Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架,对于Werkzeug本质是Socket服务端,其用于接收http请求并对请求进行预处理,然后触发Flask框架,开发人员基于Flask框架提供的功能对请求进行相应的处理,并返回给用户,如果要返回给用户复杂的内容时,需要借助jinja2模板来实现对模板的处理,即:将模板和数据进行渲染,将渲染后的字符串返回给用户浏览器。
“微”(micro) 并不表示你需要把整个 Web 应用塞进单个 Python 文件(虽然确实可以 ),也不意味着 Flask 在功能上有所欠缺。微框架中的“微”意味着 Flask 旨在保持核心简单而易于扩展。Flask 不会替你做出太多决策——比如使用何种数据库。而那些 Flask 所选择的——比如使用何种模板引擎——则很容易替换。除此之外的一切都由可由你掌握。如此,Flask 可以与您珠联璧合。
默认情况下,Flask 不包含数据库抽象层、表单验证,或是其它任何已有多种库可以胜任的功能。然而,Flask 支持用扩展来给应用添加这些功能,如同是 Flask 本身实现的一样。众多的扩展提供了数据库集成、表单验证、上传处理、各种各样的开放认证技术等功能。Flask 也许是“微小”的,但它已准备好在需求繁杂的生产环境中投入使用
wsgiref 最简单的Web应用就是先把HTML用文件保存好,用一个现成的HTTP服务器软件,接收用户请求,从文件中读取HTML,返回。
正确的做法是底层代码由专门的服务器软件实现,我们用Python专注于生成HTML文档。因为我们不希望接触到TCP连接、HTTP原始请求和响应格式,所以,需要一个统一的接口协议来实现这样的服务器软件,让我们专心用Python编写Web业务。这个接口就是WSGI:Web Server Gateway Interface。而wsgiref模块就是python基于wsgi协议开发的服务模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Copyfrom wsgiref.simple_server import make_server def mya (environ, start_response ): print (environ) start_response('200 OK' , [('Content-Type' , 'text/html' )]) if environ.get('PATH_INFO' ) == '/index' : with open ('index.html' ,'rb' ) as f: data=f.read() elif environ.get('PATH_INFO' ) == '/login' : with open ('login.html' , 'rb' ) as f: data = f.read() else : data=b'<h1>Hello, web!</h1>' return [data] if __name__ == '__main__' : myserver = make_server('' , 8011 , mya) print ('监听8010' ) myserver.serve_forever() wsgiref简单应用
1.安装 pip3 install flask
2.werkzeug简介 Werkzeug是一个WSGI工具包,他可以作为一个Web框架的底层库。这里稍微说一下, werkzeug 不是一个web服务器,也不是一个web框架,而是一个工具包,官方的介绍说是一个 WSGI 工具包,它可以作为一个 Web 框架的底层库,因为它封装好了很多 Web 框架的东西,例如 Request,Response 等等
1 2 3 4 5 6 7 8 9 Copyfrom werkzeug.wrappers import Request, Response @Request.application def hello (request ): return Response('Hello World!' ) if __name__ == '__main__' : from werkzeug.serving import run_simple run_simple('localhost' , 4000 , hello)
3.flask快速使用 1 2 3 4 5 6 7 8 9 10 Copyfrom flask import Flask app = Flask(__name__) @app.route('/' ) def hello_world (): return 'Hello World!' if __name__ == '__main__' : app.run()
案例:登录,显示用户信息 main.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 Copyfrom flask import Flask,render_template,request,redirect,session,url_for app = Flask(__name__) app.debug = True app.secret_key = 'sdfsdfsdfsdf' USERS = { 1 :{'name' :'张三' ,'age' :18 ,'gender' :'男' ,'text' :"道路千万条" }, 2 :{'name' :'李四' ,'age' :28 ,'gender' :'男' ,'text' :"安全第一条" }, 3 :{'name' :'王五' ,'age' :18 ,'gender' :'女' ,'text' :"行车不规范" }, } @app.route('/detail/<int:nid>' ,methods=['GET' ] ) def detail (nid ): user = session.get('user_info' ) if not user: return redirect('/login' ) info = USERS.get(nid) return render_template('detail.html' ,info=info) @app.route('/index' ,methods=['GET' ] ) def index (): user = session.get('user_info' ) if not user: url = url_for('l1' ) return redirect(url) return render_template('index.html' ,user_dict=USERS) @app.route('/login' ,methods=['GET' ,'POST' ],endpoint='l1' ) def login (): if request.method == "GET" : return render_template('login.html' ) else : user = request.form.get('user' ) pwd = request.form.get('pwd' ) if user == 'cxw' and pwd == '123' : session['user_info' ] = user return redirect('http://www.baidu.com' ) return render_template('login.html' ,error='用户名或密码错误' ) if __name__ == '__main__' : app.run()
1 2 3 4 5 6 7 8 9 10 11 12 13 Copy<!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <h1 > 详细信息 {{info.name}}</h1 > <div > {{info.text}} </div > </body > </html >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Copy<!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <h1 > 用户列表</h1 > <table > {% for k,v in user_dict.items() %} <tr > <td > {{k}}</td > <td > {{v.name}}</td > <td > {{v['name']}}</td > <td > {{v.get('name')}}</td > <td > <a href ="/detail/{{k}}" > 查看详细</a > </td > </tr > {% endfor %} </table > </body > </html >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Copy<!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <h1 > 用户登录</h1 > <form method ="post" > <input type ="text" name ="user" > <input type ="text" name ="pwd" > <input type ="submit" value ="登录" > {{error}} </form > </body > </html >
作业:登录认证装饰器 -多个装饰器执行顺序
4.配置文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为: Copy { 'DEBUG' : get_debug_flag(default=False ), 是否开启Debug 模式 'TESTING' : False , 是否开启测试模式 'PROPAGATE_EXCEPTIONS' : None , 'PRESERVE_CONTEXT_ON_EXCEPTION' : None , 'SECRET_KEY' : None , 'PERMANENT_SESSION_LIFETIME' : timedelta(days=31 ), 'USE_X_SENDFILE' : False , 'LOGGER_NAME' : None , 'LOGGER_HANDLER_POLICY' : 'always' , 'SERVER_NAME' : None , 'APPLICATION_ROOT' : None , 'SESSION_COOKIE_NAME' : 'session' , 'SESSION_COOKIE_DOMAIN' : None , 'SESSION_COOKIE_PATH' : None , 'SESSION_COOKIE_HTTPONLY' : True , 'SESSION_COOKIE_SECURE' : False , 'SESSION_REFRESH_EACH_REQUEST' : True , 'MAX_CONTENT_LENGTH' : None , 'SEND_FILE_MAX_AGE_DEFAULT' : timedelta(hours=12 ), 'TRAP_BAD_REQUEST_ERRORS' : False , 'TRAP_HTTP_EXCEPTIONS' : False , 'EXPLAIN_TEMPLATE_LOADING' : False , 'PREFERRED_URL_SCHEME' : 'http' , 'JSON_AS_ASCII' : True , 'JSON_SORT_KEYS' : True , 'JSONIFY_PRETTYPRINT_REGULAR' : True , 'JSONIFY_MIMETYPE' : 'application/json' , 'TEMPLATES_AUTO_RELOAD' : None , }
方式一 1 2 Copy app.config['DEBUG' ] = True PS: 由于Config对象本质上是字典,所以还可以使用app.config.update(...)
方式二 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 Copy app.config.from_pyfile("python文件名称" ) 如: settings.py DEBUG = True app.config.from_pyfile("settings.py" ) app.config.from_envvar("环境变量名称" ) 环境变量的值为python文件名称名称,内部调用from_pyfile方法 app.config.from_json("json文件名称" ) JSON文件名称,必须是json格式,因为内部会执行json.loads app.config.from_mapping({'DEBUG' : True }) 字典格式 app.config.from_object("python类或类的路径" ) app.config.from_object('pro_flask.settings.TestingConfig' ) settings.py class Config (object ): DEBUG = False TESTING = False DATABASE_URI = 'sqlite://:memory:' class ProductionConfig (Config ): DATABASE_URI = 'mysql://user@localhost/foo' class DevelopmentConfig (Config ): DEBUG = True class TestingConfig (Config ): TESTING = True PS: 从sys.path中已经存在路径开始写 PS: settings.py文件默认路径要放在程序root_path目录,如果instance_relative_config为True ,则就是instance_path目录(Flask对象init方法的参数)
5.路由系统 典型写法 1 Copy@app.route('/detail/<int:nid>' ,methods=['GET' ],endpoint='detail' )
默认转换器 1 2 3 4 5 6 7 8 9 CopyDEFAULT_CONVERTERS = { 'default' : UnicodeConverter, 'string' : UnicodeConverter, 'any' : AnyConverter, 'path' : PathConverter, 'int' : IntegerConverter, 'float' : FloatConverter, 'uuid' : UUIDConverter, }
路由系统本质 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Copy""" 1. decorator = app.route('/',methods=['GET','POST'],endpoint='n1') def route(self, rule, **options): # app对象 # rule= / # options = {methods=['GET','POST'],endpoint='n1'} def decorator(f): endpoint = options.pop('endpoint', None) self.add_url_rule(rule, endpoint, f, **options) return f return decorator 2. @decorator decorator(index) """ def login (): return '登录' app.add_url_rule('/login' , 'n2' , login, methods=['GET' ,"POST" ])
CBV(源码分析) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 Copydef auth(func): def inner (*args, **kwargs ): print ('before' ) result = func(*args, **kwargs) print ('after' ) return result return inner class IndexView (views.View): methods = ['GET' ] decorators = [auth, ] def dispatch_request (self ): print ('Index' ) return 'Index!' app.add_url_rule('/index' , view_func=IndexView.as_view(name='index' )) class IndexView (views.MethodView): methods = ['GET' ] decorators = [auth, ] def get (self ): return 'Index.GET' def post (self ): return 'Index.POST' app.add_url_rule('/index' , view_func=IndexView.as_view(name='index' ))
app.add_url_rule参数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 Copy@app.route和app.add_url_rule参数: rule, URL规则 view_func, 视图函数名称 defaults = None , 默认值, 当URL中无参数,函数需要参数时,使用defaults = {'k' : 'v' } 为函数提供参数 endpoint = None , 名称,用于反向生成URL,即: url_for('名称' ) methods = None , 允许的请求方式,如:["GET" , "POST" ] strict_slashes = None ''' @app.route('/index', strict_slashes=False) #访问http://www.xx.com/index/ 或http://www.xx.com/index均可 @app.route('/index', strict_slashes=True) #仅访问http://www.xx.com/index ''' redirect_to = None , ''' @app.route('/index/<int:nid>', redirect_to='/home/<nid>') ''' subdomain = None , ''' #C:\Windows\System32\drivers\etc\hosts www.liuqingzheng.com admin.liuqingzheng.com buy.liuqingzheng.com from flask import Flask, views, url_for app = Flask(import_name=__name__) app.config['SERVER_NAME'] = 'liuqingzheng.com:5000' @app.route("/", subdomain="admin") def static_index(): """Flask supports static subdomains This is available at static.your-domain.tld""" return "static.your-domain.tld" #可以传入任意的字符串,如传入的字符串为aa,显示为 aa.liuqingzheng.com @app.route("/dynamic", subdomain="<username>") def username_index(username): """Dynamic subdomains are also supported Try going to user1.your-domain.tld/dynamic""" return username + ".your-domain.tld" if __name__ == '__main__': app.run() 访问: http://www.liuqingzheng.com:5000/dynamic http://admin.liuqingzheng.com:5000/dynamic http://buy.liuqingzheng.com:5000/dynamic '''
支持正则 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 Copy from flask import Flask, views, url_forfrom werkzeug.routing import BaseConverterapp = Flask(import_name=__name__) class RegexConverter (BaseConverter ): """ 自定义URL匹配正则表达式 """ def __init__ (self, map , regex ): super (RegexConverter, self).__init__(map ) self.regex = regex def to_python (self, value ): """ 路由匹配时,匹配成功后传递给视图函数中参数的值 """ return int (value) def to_url (self, value ): """ 使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数 """ val = super (RegexConverter, self).to_url(value) return val app.url_map.converters['regex' ] = RegexConverter @app.route('/index/<regex("\d+"):nid>' ) def index (nid ): print (url_for('index' , nid='888' )) return 'Index' if __name__ == '__main__' : app.run()
6.模版 6.1渲染变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Copy<!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <h1 > 用户列表</h1 > <table > {% for k,v in user_dict.items() %} <tr > <td > {{k}}</td > <td > {{v.name}}</td > <td > {{v['name']}}</td > <td > {{v.get('name')}}</td > <td > <a href ="/detail/{{k}}" > 查看详细</a > </td > </tr > {% endfor %} </table > </body > </html >
6.2变量的循环 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Copy<!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <h1 > 用户列表</h1 > <table > {% for k,v in user_dict.items() %} <tr > <td > {{k}}</td > <td > {{v.name}}</td > <td > {{v['name']}}</td > <td > {{v.get('name')}}</td > <td > <a href ="/detail/{{k}}" > 查看详细</a > </td > </tr > {% endfor %} </table > </body > </html >
6.3逻辑判断 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 Copy<!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <h1 > 用户列表</h1 > <table > {% if name %} <h1 > Hello {{ name }}!</h1 > {% else %} <h1 > Hello World!</h1 > {% endif %} </table > </body > </html >
1 2 3 4 5 6 7 8 9 10 11 Copyfrom flask import Flask,render_template,Markup,jsonify,make_response app = Flask(__name__) def func1 (arg ): return Markup("<input type='text' value='%s' />" %(arg,)) @app.route('/' ) def index (): return render_template('index.html' ,ff = func1) if __name__ == '__main__' : app.run()
1 2 3 4 5 6 7 8 9 10 11 12 13 Copy<!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > {{ff('六五')}} {{ff('六五')|safe}} </body > </html >
1.Markup等价django的mark_safe ,
7.请求响应 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 Copy from flask import Flask from flask import request from flask import render_template from flask import redirect from flask import make_response app = Flask(__name__) @app.route('/login.html' , methods=['GET' , "POST" ] ) def login (): return "内容" if __name__ == '__main__' : app.run()
8.session 1 2 3 Copycookie: 存放在客户端的键值对session:存放在客户端的键值对 token: 存放在客户端,通过算法来校验
1 Copyapp.secret_key="asdas"
除请求对象之外,还有一个 session 对象。它允许你在不同请求间存储特定用户的信息。它是在 Cookies 的基础上实现的,并且对 Cookies 进行密钥签名要使用会话,你需要设置一个密钥。 (app.session_interface对象)
1 2 3 4 5 6 7 8 Copy设置:session['username' ] = 'xxx' 删除:session.pop('username' , None )
app.session_interface中save_session的参数(设置cookie的参数) 1 2 3 4 5 6 7 8 Copykey, 键 value='' , 值 max_age=None , 超时时间 cookie需要延续的时间(以秒为单位)如果参数是\ None `` ,这个cookie会延续到浏览器关闭为止 expires=None , 超时时间(IE requires expires, so set it if hasn't been already.) path=' /', Cookie生效的路径,/ 表示根路径,特殊的:根路径的cookie可以被任何url的页面访问,浏览器只会把cookie回传给带有该路径的页面,这样可以避免将cookie传给站点中的其他的应用。 domain=None, Cookie生效的域名 你可用这个参数来构造一个跨站cookie。如, domain=".example.com"所构造的cookie对下面这些站点都是可读的:www.example.com 、 www2.example.com 和an.other.sub.domain.example.com 。如果该参数设置为 None ,cookie只能由设置它的站点读取 secure=False, 浏览器将通过HTTPS来回传cookie httponly=False 只能http协议传输,无法被JavaScript获取(不是绝对,底层抓包可以获取到也可以被覆盖)
session源码的执行流程 1 2 3 4 Copy -save_seesion -响应的时候,把session 中的值加密序列化放大到了cookie中,返回到浏览器中 -open_session -请求来了,从cookie中取出值,反解,生成session 对象,以后再视图函数中直接用sessoin就可以了。
9.闪现(message) 1 2 3 4 Copy-设置:flash('aaa' ) -取值:get_flashed_message() - -假设在a页面操作出错,跳转到b页面,在b页面显示a页面的错误信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 Copyfrom flask import Flask,flash,get_flashed_messages,request,redirect app = Flask(__name__) app.secret_key = 'asdfasdf' @app.route('/index' ) def index (): val = request.args.get('v' ) if val == 'oldboy' : return 'Hello World!' flash('超时错误' ,category="x1" ) return "ssdsdsdfsd" @app.route('/error' ) def error (): """ 展示错误信息 :return: 如果get_flashed_messages(with_category=True) """ data = get_flashed_messages(category_filter=['x1' ]) if data: msg = data[0 ] else : msg = "..." return "错误信息:%s" %(msg,) if __name__ == '__main__' : app.run()
10.请求扩展 1 before_request 类比django中间件中的process_request,在请求收到之前绑定一个函数做一些事情
1 2 3 4 5 6 7 8 9 Copy @app.before_request def process_request (*args,**kwargs ): if request.path == '/login' : return None user = session.get('user_info' ) if user: return None return redirect('/login' )
2 after_request 类比django中间件中的process_response,每一个请求之后绑定一个函数,如果请求没有异常
1 2 3 4 Copy@app.after_request def process_response1 (response ): print ('process_response1 走了' ) return response
3 before_first_request 第一次请求时,跟浏览器无关
1 2 3 Copy@app.before_first_request def first (): pass
4 teardown_request 每一个请求之后绑定一个函数,即使遇到了异常
1 2 3 Copy@app.teardown_request def ter (e ): pass
5 errorhandler 路径不存在时404,服务器内部错误500
1 2 3 Copy@app.errorhandler(404 ) def error_404 (arg ): return "404错误了"
6 template_global 标签
1 2 3 4 Copy@app.template_global() def sb (a1, a2 ): return a1 + a2
7 template_filter 过滤器
1 2 3 4 Copy@app.template_filter() def db (a1, a2, a3 ): return a1 + a2 + a3
1 重点掌握before_request和after_request,
2 注意有多个的情况,执行顺序
3 before_request请求拦截后(也就是有return值),response所有都执行
11 中间件(了解) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 Copyfrom flask import Flask app = Flask(__name__) @app.route('/' ) def index (): return 'Hello World!' class Md (object ): def __init__ (self,old_wsgi_app ): self.old_wsgi_app = old_wsgi_app def __call__ (self, environ, start_response ): print ('开始之前' ) ret = self.old_wsgi_app(environ, start_response) print ('结束之后' ) return ret if __name__ == '__main__' : app.wsgi_app = Md(app.wsgi_app) app.run()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Copyctx = self.request_context(environ) error = None try : try : ctx.push() response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except : error = sys.exc_info()[1 ] raise return response(environ, start_response) finally : if self.should_ignore_error(error): error = None ctx.auto_pop(error)
12.蓝图 对程序进行目录结构划分
不使用蓝图,自己分文件 目录结构:
1 2 3 4 5 6 Copy-templates -views -__init__.py -user.py -order.py -app.py
1 2 3 Copyfrom views import app if __name__ == '__main__' : app.run()
1 2 3 4 5 6 Copyfrom flask import Flask,request app = Flask(__name__) from . import accountfrom . import orderfrom . import user
1 2 3 4 Copyfrom . import app @app.route('/user' ) def user (): return 'user'
1 2 3 4 Copyfrom . import app @app.route('/order' ) def order (): return 'order'
使用蓝图之中小型系统 详见代码:pro_flask_简单应用程序目录示例.zip
1 2 3 4 5 6 7 8 9 10 Copy-flask_pro -flask_test -__init__.py -static -templates -views -order.py -user.py -manage.py
_init .py
1 2 3 4 5 6 Copyfrom flask import Flask app=Flask(__name__) from flask_test.views import userfrom flask_test.views import orderapp.register_blueprint(user.us) app.register_blueprint(order.ord )
1 2 3 Copyfrom flask_test import app if __name__ == '__main__' : app.run(port=8008 )
1 2 3 4 5 6 Copyfrom flask import Blueprint us=Blueprint('user' ,__name__) @us.route('/login' ) def login (): return 'login'
1 2 3 4 5 6 Copyfrom flask import Blueprint ord =Blueprint('order' ,__name__)@ord.route('/test' ) def test (): return 'order test'
使用蓝图之大型系统 详见代码:pro_flask_大型应用目录示例.zip
1 xxx = Blueprint(‘account’, name ,url_prefix=’/xxx’) :蓝图URL前缀,表示url的前缀,在该蓝图下所有url都加前缀
2 xxx = Blueprint(‘account’, name,url_prefix=’/xxx’,template_folder=’tpls’):给当前蓝图单独使用templates,向上查找,当前找不到,会找总templates
3 蓝图的befort_request,对当前蓝图有效
4 大型项目,可以模拟出类似于django中app的概念
13.请求上下文源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Copy第一阶段:将ctx(request,session)放到Local对象上 第二阶段:视图函数导入:request/session request.method -LocalProxy对象.method,执行getattr 方法,getattr (self._get_current_object(), name) -self._get_current_object()返回return self.__local(),self.__local(),在LocakProxy实例化的时候,object .__setattr__(self, '_LocalProxy__local' , local),此处local就是:partial(_lookup_req_object, 'request' ) -def _lookup_req_object (name ): top = _request_ctx_stack.top if top is None : raise RuntimeError(_request_ctx_err_msg) return getattr (top, name) 第三阶段:请求处理完毕 - 获取session并保存到cookie - 将ctx删除
14.g对象 专门用来存储用户信息的g对象,g的全称的为global
g对象和session的区别 1 Copysession对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次
15.flask-session 作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy
安装:pip3 install flask-session
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Copyfrom flask import Flask,session from flask_session import RedisSessionInterfaceimport redisapp = Flask(__name__) conn=redis.Redis(host='' ,port=6379 ) app.session_interface=RedisSessionInterface(conn,key_prefix='lqz' ) @app.route('/' ) def hello_world (): session['name' ]='lqz' return 'Hello World!' if __name__ == '__main__' : app.run()
1 2 3 4 5 Copyfrom redis import Redis from flask.ext.session import Sessionapp.config['SESSION_TYPE' ] = 'redis' app.config['SESSION_REDIS' ] = Redis(host='' ,port='6379' ) Session(app)
1 2 3 4 Copyresponse.set_cookie('k' ,'v' ,exipre=None ) app.session_interface=RedisSessionInterface(conn,key_prefix='lqz' ,permanent=False )
1 2 Copy 'PERMANENT_SESSION_LIFETIME' : timedelta(days=31 ),
16.数据库连接池 pymsql链接数据库 1 2 3 4 5 6 7 8 9 10 11 12 Copyimport pymysql conn = pymysql.connect(host='' , port=3306 , user='root' , passwd='123456' , db='s8day127db' ) cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s" ,{'user' :'lqz' ,'pwd' :'123' }) obj = cursor.fetchone() conn.commit() cursor.close() conn.close() print (obj)
数据库连接池版 setting.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 Copyfrom datetime import timedelta from redis import Redisimport pymysqlfrom DBUtils.PooledDB import PooledDB, SharedDBConnectionclass Config (object ): DEBUG = True SECRET_KEY = "umsuldfsdflskjdf" PERMANENT_SESSION_LIFETIME = timedelta(minutes=20 ) SESSION_REFRESH_EACH_REQUEST= True SESSION_TYPE = "redis" PYMYSQL_POOL = PooledDB( creator=pymysql, maxconnections=6 , mincached=2 , maxcached=5 , maxshared=3 , blocking=True , maxusage=None , setsession=[], ping=0 , host='' , port=3306 , user='root' , password='123456' , database='s8day127db' , charset='utf8' ) class ProductionConfig (Config ): SESSION_REDIS = Redis(host='' , port='6379' ) class DevelopmentConfig (Config ): SESSION_REDIS = Redis(host='' , port='6379' ) class TestingConfig (Config ): pass
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 Copyimport pymysql from settings import Configclass SQLHelper (object ): @staticmethod def open (cursor ): POOL = Config.PYMYSQL_POOL conn = POOL.connection() cursor = conn.cursor(cursor=cursor) return conn,cursor @staticmethod def close (conn,cursor ): conn.commit() cursor.close() conn.close() @classmethod def fetch_one (cls,sql,args,cursor =pymysql.cursors.DictCursor ): conn,cursor = cls.open (cursor) cursor.execute(sql, args) obj = cursor.fetchone() cls.close(conn,cursor) return obj @classmethod def fetch_all (cls,sql, args,cursor =pymysql.cursors.DictCursor ): conn, cursor = cls.open (cursor) cursor.execute(sql, args) obj = cursor.fetchall() cls.close(conn, cursor) return obj
1 Copyobj = SQLHelper.fetch_one("select id,name from users where name=%(user)s and pwd=%(pwd)s" , form.data)
安装:pip3 install wtforms
使用1: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 Copyfrom flask import Flask, render_template, request, redirect from wtforms import Formfrom wtforms.fields import simplefrom wtforms import validatorsfrom wtforms import widgetsapp = Flask(__name__, template_folder='templates' ) app.debug = True class LoginForm (Form ): name = simple.StringField( label='用户名' , validators=[ validators.DataRequired(message='用户名不能为空.' ), validators.Length(min =6 , max =18 , message='用户名长度必须大于%(min)d且小于%(max)d' ) ], widget=widgets.TextInput(), render_kw={'class' : 'form-control' } ) pwd = simple.PasswordField( label='密码' , validators=[ validators.DataRequired(message='密码不能为空.' ), validators.Length(min =8 , message='用户名长度必须大于%(min)d' ), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}" , message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符' ) ], widget=widgets.PasswordInput(), render_kw={'class' : 'form-control' } ) @app.route('/login' , methods=['GET' , 'POST' ] ) def login (): if request.method == 'GET' : form = LoginForm() return render_template('login.html' , form=form) else : form = LoginForm(formdata=request.form) if form.validate(): print ('用户提交数据通过格式验证,提交的值为:' , form.data) else : print (form.errors) return render_template('login.html' , form=form) if __name__ == '__main__' : app.run()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Copy<!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <h1 > 登录</h1 > <form method ="post" > <p > {{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p > <p > {{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p > <input type ="submit" value ="提交" > </form > </body > </html >
使用2: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 Copyfrom flask import Flask, render_template, request, redirect from wtforms import Formfrom wtforms.fields import corefrom wtforms.fields import html5from wtforms.fields import simplefrom wtforms import validatorsfrom wtforms import widgetsapp = Flask(__name__, template_folder='templates' ) app.debug = True class RegisterForm (Form ): name = simple.StringField( label='用户名' , validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class' : 'form-control' }, default='alex' ) pwd = simple.PasswordField( label='密码' , validators=[ validators.DataRequired(message='密码不能为空.' ) ], widget=widgets.PasswordInput(), render_kw={'class' : 'form-control' } ) pwd_confirm = simple.PasswordField( label='重复密码' , validators=[ validators.DataRequired(message='重复密码不能为空.' ), validators.EqualTo('pwd' , message="两次密码输入不一致" ) ], widget=widgets.PasswordInput(), render_kw={'class' : 'form-control' } ) email = html5.EmailField( label='邮箱' , validators=[ validators.DataRequired(message='邮箱不能为空.' ), validators.Email(message='邮箱格式错误' ) ], widget=widgets.TextInput(input_type='email' ), render_kw={'class' : 'form-control' } ) gender = core.RadioField( label='性别' , choices=( (1 , '男' ), (2 , '女' ), ), coerce=int ) city = core.SelectField( label='城市' , choices=( ('bj' , '北京' ), ('sh' , '上海' ), ) ) hobby = core.SelectMultipleField( label='爱好' , choices=( (1 , '篮球' ), (2 , '足球' ), ), coerce=int ) favor = core.SelectMultipleField( label='喜好' , choices=( (1 , '篮球' ), (2 , '足球' ), ), widget=widgets.ListWidget(prefix_label=False ), option_widget=widgets.CheckboxInput(), coerce=int , default=[1 , 2 ] ) def __init__ (self, *args, **kwargs ): super (RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1 , '篮球' ), (2 , '足球' ), (3 , '羽毛球' )) def validate_pwd_confirm (self, field ): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ if field.data != self.data['pwd' ]: raise validators.StopValidation("密码不一致" ) @app.route('/register' , methods=['GET' , 'POST' ] ) def register (): if request.method == 'GET' : form = RegisterForm(data={'gender' : 2 ,'hobby' :[1 ,]}) return render_template('register.html' , form=form) else : form = RegisterForm(formdata=request.form) if form.validate(): print ('用户提交数据通过格式验证,提交的值为:' , form.data) else : print (form.errors) return render_template('register.html' , form=form) if __name__ == '__main__' : app.run() Copy<!DOCTYPE html> <html lang="en" > <head> <meta charset="UTF-8" > <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px" > {% for field in form %} <p>{{field.label}}: {{field}} {{field.errors[0 ] }}</p> {% endfor %} <input type ="submit" value="提交" > </form> </body> </html>
18.信号 Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为
安装:pip3 install blinker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Copyrequest_started = _signals.signal('request-started' ) request_finished = _signals.signal('request-finished' ) before_render_template = _signals.signal('before-render-template' ) template_rendered = _signals.signal('template-rendered' ) got_request_exception = _signals.signal('got-request-exception' ) request_tearing_down = _signals.signal('request-tearing-down' ) appcontext_tearing_down = _signals.signal('appcontext-tearing-down' ) appcontext_pushed = _signals.signal('appcontext-pushed' ) appcontext_popped = _signals.signal('appcontext-popped' ) message_flashed = _signals.signal('message-flashed' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 Copyfrom flask import Flask,signals,render_template app = Flask(__name__) def func (*args,**kwargs ): print ('触发型号' ,args,kwargs) signals.request_started.connect(func) @app.before_first_request def before_first1 (*args,**kwargs ): pass @app.before_first_request def before_first2 (*args,**kwargs ): pass @app.before_request def before_first3 (*args,**kwargs ): pass @app.route('/' ,methods=['GET' ,"POST" ] ) def index (): print ('视图' ) return render_template('index.html' ) if __name__ == '__main__' : app.wsgi_app app.run()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Copya. before_first_request b. 触发 request_started 信号 c. before_request d. 模板渲染 渲染前的信号 before_render_template.send(app, template=template, context=context) rv = template.render(context) 渲染后的信号 template_rendered.send(app, template=template, context=context) e. after_request f. session.save_session() g. 触发 request_finished信号 如果上述过程出错: 触发错误处理信号 got_request_exception.send(self, exception=e) h. 触发信号 request_tearing_down
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Copyfrom flask import Flask, current_app, flash, render_template from flask.signals import _signalsapp = Flask(import_name=__name__) xxxxx = _signals.signal('xxxxx' ) def func (sender, *args, **kwargs ): print (sender) xxxxx.connect(func) @app.route("/x" ) def index (): xxxxx.send('123123' , k1='v1' ) return 'Index' if __name__ == '__main__' : app.run()
19.多app应用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Copyfrom werkzeug.wsgi import DispatcherMiddleware from werkzeug.serving import run_simplefrom flask import Flask, current_appapp1 = Flask('app01' ) app2 = Flask('app02' ) @app1.route('/index' ) def index (): return "app01" @app2.route('/index2' ) def index2 (): return "app2" dm = DispatcherMiddleware(app1, { '/sec' : app2, }) if __name__ == "__main__" : run_simple('localhost' , 5000 , dm)
20.flask-script 用于实现类似于django中 python3 manage.py runserver …类似的命令
安装:pip3 install flask-script
使用 1 2 3 4 5 6 7 8 Copyfrom flask_script import Manager app = Flask(__name__) manager=Manager(app) ... if __name__ == '__main__' : manager.run()
自定制命令 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 Copy@manager.command def custom (arg ): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print (arg) @manager.option('-n' , '--name' , dest='name' ) def cmd (name, url ): """ 自定义命令(-n也可以写成--name) 执行: python manage.py cmd -n lqz -u http://www.oldboyedu.com 执行: python manage.py cmd --name lqz --url http://www.oldboyedu.com :param name: :param url: :return: """ print (name, url)
21flask-admin 安装 1 Copypip3 install flask_admin
简单使用 1 2 3 4 5 6 7 8 9 10 11 12 Copyfrom flask import Flask from flask_admin import Adminapp = Flask(__name__) admin = Admin(app) if __name__=="mian" : app.run()
将表模型注册到admin中 1 2 3 4 5 6 7 8 9 10 11 12 13 14 Copy SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:@" SQLALCHEMY_POOL_SIZE = 5 SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_POOL_RECYCLE = -1 from flask_admin.contrib.sqla import ModelViewfrom api.models import Stock,Product,Images,Category,Wxuser,Banneradmin.add_view(ModelView(Stock, db.session)) admin.add_view(ModelView(Product, db.session)) admin.add_view(ModelView(Category, db.session))
如果有个字段是图片指端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Copy from flask_admin.contrib.fileadmin import FileAdmin,formfile_path = op.join(op.dirname(__file__), 'static' ) admin = Admin(app) admin.add_view(FileAdmin(file_path, '/static/' , name='文件' )) class ImagesView (ModelView ): form_extra_fields = { 'image_url' : form.ImageUploadField('Image' , base_path=file_path, relative_path='uploadFile/' ) } admin.add_view(ImagesView(Images, db.session))