Flask基础全套

Flask基础全套,第1张

概述Flask简介 Flask是主流PythonWeb三大框架之一,其特点是短小精悍以及功能强大从而获得众多Pythoner的追捧,相比于Django它更加简单更易上手,Flask拥有非常强大的三方库,提 Flask简介

   Flask是主流PythonWeb三大框架之一,其特点是短小精悍以及功能强大从而获得众多Pythoner的追捧,相比于Django它更加简单更易上手,Flask拥有非常强大的三方库,提供各式各样的模块对其本身进行扩充:

   Flask扩展模块

   下面是FlaskDjango本身的一些区别:

 FlaskDjango
网关接口(Wsgi)werkzeugwsgiref
模板语言(Template)Jinja2DjangoTemplate
ORMsqlAlchemyDjangoORM

   下载Flask

pip install flask
werkzeug模块

   Flask本质就是对werkzeug模块进行一些更高层次的封装,就如同Django是对wsgiref模块做了一些更高层次封装一样。所以先来看一下werkzeug模块如何使用:

from werkzeug.wrappers import Request,Responsefrom werkzeug.serving import run_simple @Request.applicationdef index(request):    return Response("Hello werkzeug")if __name__ == '__main__':    run_simple("localhost",5000,index)
简单入门基本使用

   使用Flask的小案例:

from flask import Flask# 1.创建Flask对象实例,填入构造参数app = Flask(__name__)# 2.视图函数中书写route以及vIEw@app.route("/")def index():    return "Hello Flask!"# 3.启动监听,等待链接请求  默认端口号:5000,可在run()时添加形参if __name__ == '__main__':    app.run()    # app.run(Thread=True)  # 开启多线程  

  

构造参数

   对于创建Flask实例对象,传入的构造参数有以下选项:

形参描述默认值
import_name为Flask对象取名,一般为__name__即可
static_url_path模板中访问的静态文件存放目录,默认情况下与static_folder同名None
static_folder静态文件存放的目录名称,默认当前项目中的static目录static
static_host远程静态文件所用的Host地址None
host_matching如果不是特别需要的话,慎用,否则所有的route都需要host=""的参数False
subdomain_matchingSERVER_name子域名,暂时未GET到其作用False
template_foldertemplate模板目录,默认当前项目中的templates目录templates
instance_path指向另一个Flask实例的路径None
instance_relative_config是否加载另一个实例的配置False
root_path主模块所在的目录的绝对路径,默认项目目录None
Flask配置项

   如同Django中的settings.py一样,在Flask中也拥有它自己的一些配置项。通过以下方式可对配置项进行修改。

deBUG模式

   一般来说对于Flask的开发模式都是用app.deBUG=True来完成的:

app = Flask(__name__)app.deBUG = True

   当然你也可以依照下面的方式进行修改。

config修改

   对Flask实例直接进行config的字典 *** 作修改配置项:

app = Flask(__name__)app.config["DEBUG"] = True
from_pyfile

   以py文件形式进行配置:

app = Flask(__name__)app.config.from_pyfile("flask_settings.py")# flask_settings.pyDEBUG = True
from_object

   以class与类属性的方式书写配置项:

app = Flask(__name__)app.config.from_object("flask_settings.DevelopmentConfig")# flask_settings.pyclass BaseConfig(object):    """    抽象类,只用于继承    """    DEBUG = False    TESTING = False    # 其他配置项class ProductionConfig(BaseConfig):    """    上线时的配置项    """    DATABASE_URI = 'MysqL://user@localhost/foo'class DevelopmentConfig(BaseConfig):    """    开发时的配置项    """    DEBUG = True
其他配置

   通过环境变量配置:

app.config.from_envvar("环境变量名称")# 环境变量的值为python文件名称名称,内部调用from_pyfile方法

   通过JsON格式文件配置:

app.config.from_Json("Json文件名称")# JsON文件名称,必须是Json格式,因为内部会执行Json.loads

   通过字典格式配置:

app.config.from_mapPing({'DEBUG':True})
配置项大全

   以下是Flask的配置项大全:

	'DEBUG': False,# 是否开启DeBUG模式    'TESTING': False,# 是否开启测试模式    'PROPAGATE_EXCEPTIONS': None,# 异常传播(是否在控制台打印LOG) 当DeBUG或者testing开启后,自动为True    'PRESERVE_CONTEXT_ON_EXCEPTION': None,# 一两句话说不清楚,一般不用它    'SECRET_KEY': None,# 之前遇到过,在启用Session的时候,一定要有它    'PERMANENT_SESSION_liFETIME': 31,# days,Session的生命周期(天)默认31天    'USE_X_SENDfile': False,# 是否弃用 x_sendfile    'LOGGER_name': None,# 日志记录器的名称    'LOGGER_HANDLER_POliCY': 'always','SERVER_name': None,# 服务访问域名    'APPliCATION_ROOT': None,# 项目的完整路径    'SESSION_cookie_name': 'session',# 在cookies中存放session加密字符串的名字    'SESSION_cookie_DOMAIN': None,# 在哪个域名下会产生session记录在cookies中    'SESSION_cookie_PATH': None,# cookies的路径    'SESSION_cookie_httpONLY': True,# 控制 cookie 是否应被设置 httponly 的标志,    'SESSION_cookie_SECURE': False,# 控制 cookie 是否应被设置安全标志    'SESSION_REFRESH_EACH_REQUEST': True,# 这个标志控制永久会话如何刷新    'MAX_CONTENT_LENGTH': None,# 如果设置为字节数, Flask 会拒绝内容长度大于此值的请求进入,并返回一个 413 状态码    'SEND_file_MAX_AGE_DEFAulT': 12,# hours 默认缓存控制的最大期限    'TRAP_BAD_REQUEST_ERRORS': False,# 如果这个值被设置为 True ,Flask不会执行 http 异常的错误处理,而是像对待其它异常一样,    # 通过异常栈让它冒泡地抛出。这对于需要找出 http 异常源头的可怕调试情形是有用的。    'TRAP_http_EXCEPTIONS': False,# Werkzeug 处理请求中的特定数据的内部数据结构会抛出同样也是“错误的请求”异常的特殊的 key errors 。    # 同样地,为了保持一致,许多 *** 作可以显式地抛出 BadRequest 异常。    # 因为在调试中,你希望准确地找出异常的原因,这个设置用于在这些情形下调试。    # 如果这个值被设置为 True ,你只会得到常规的回溯。    'EXPLAIN_TEMPLATE_LOADING': False,'PREFERRED_URL_SCHEME': 'http',# 生成URL的时候如果没有可用的 URL 模式话将使用这个值    'JsON_AS_ASCII': True,# 默认情况下 Flask 使用 ascii 编码来序列化对象。如果这个值被设置为 False ,    # Flask不会将其编码为 ASCII,并且按原样输出,返回它的 unicode 字符串。    # 比如 Jsonfiy 会自动地采用 utf-8 来编码它然后才进行传输。    'JsON_SORT_KEYS': True,#默认情况下 Flask 按照 JsON 对象的键的顺序来序来序列化它。    # 这样做是为了确保键的顺序不会受到字典的哈希种子的影响,从而返回的值每次都是一致的,不会造成无用的额外 http 缓存。    # 你可以通过修改这个配置的值来覆盖默认的 *** 作。但这是不被推荐的做法因为这个默认的行为可能会给你在性能的代价上带来改善。    'JsONIFY_PRETTYPRINT_REGulAR': True,'JsONIFY_MIMETYPE': 'application/Json','TEMPLATES_auto_RELOAD': None,
路由路由参数

   所有路由中的参数如下:

@app.route("/index",methods=["POST","GET"],endpoint="别名",defaults={"默认参数": 1},strict_slashes=True,redirect_to="/",subdomain=None)

   详细描述:

参数描述
methods访问方式,默认只支持GET
endpoint别名、默认为函数名,不可重复。默认为函数名
defaults当视图函数拥有一个形参时,可将它作为默认参数传递进去
strict_slashes是否严格要求路径访问,如定义的时候是/index,访问是/index/,默认是严格访问
redirect_to301永久重定向,如函数help的redirect_to是/doc,则访问help将跳转到doc函数
subdomain通过指定的域名进行访问,在浏览器中输入域名即可,本地需配置hosts文件
转换器

   Flask中拥有Django3中的转换器来捕捉用户请求的地址栏参数:

转换器含义
default接收字符串,默认的转换器
string接收字符串,和默认的一样
any可以指定多个路径
int接收整数
float接收浮点数和整数
uuID唯一标识码
path和字符串一样,但是它可以配置/,字符串不可以

   如下所示:

http://localhost:5000/article/2020-01-29@app.route("/article/<int:year>-<int:month>-<int:day>","GET"])def article(year,month,day):  	# 相当于有命分组,必须使用同样的变量名接收    # 并且还会自动转换类型,int捕捉到的就都是int类型    return f"{year}-{month}-{day}"
正则匹配

   由于参数捕捉只支持转换器,所以我们可以自定义一个转换器让其能够支持正则匹配:

from flask import Flask,url_forfrom werkzeug.routing import BaseConverterapp = Flask(import_name=__name__)class RegexConverter(BaseConverter):    """    自定义URL匹配正则表达式    """    def __init__(self,map,regex):        super(RegexConverter,self).__init__(map)        self.regex = regex    def to_python(self,value):        """        路由匹配时,匹配成功后传递给视图函数中参数的值        :param value:         :return:         """        return int(value)    def to_url(self,value):        """        使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数        :param value:         :return:         """        val = super(RegexConverter,self).to_url(value)        return val# 添加到flask中app.url_map.converters['regex'] = RegexConverter@app.route('/index/<regex("\d+"):nID>')def index(nID):    print(url_for('index',nID='888'))    return 'Index'if __name__ == '__main__':    app.run()
反向解析

   使用url_for()可在视图中反向解析出url

# url_for(endpoint,**values)print(url_for("article",**{"year": 2010,"month": 11,"day": 11}))print(url_for("article",year=2010,month=11,day=11))

   如果在模板中,也可以使用url_for()进行反向解析:

<a href='{{ url_for("article",day=11)) }}'>点我</a>
app.add_url_rule

   可以发现,Flask的路由与Django的有非常大的区别,但是通过app.add_url_rule也可以做到和Django相似。

   但是这样的做法很少,函数签名如下:

    def add_url_rule(        self,rule,# 规则        endpoint=None,# 别名        vIEw_func=None,# 视图函数        provIDe_automatic_options=None,# 控制是否自动添加options        **options    ):

   实际应用如下:

from flask import Flaskapp = Flask(__name__)def index():    return "index"def home(name):    return "Welcome Home,%s" % namerouters = [    ("/index",None,index),("/home/<string:name>",home),]for rule in routers:    app.add_url_rule(*rule)if __name__ == '__main__':    app.run()
视图请求相关

   Flaskrequest对象不是通过参数传递,而是通过导入:

from flask import request

   下面是一些常用的属性与方法:

属性/方法描述
request.headers查看所有的请求头
request.method存放请求方式
request.form存放form表单中的序列化数据,一般来说就是POST请求的数据
request.args存放url里面的序列化数据,一般来说就是GET请求的数据
request.data查看传过来所有解析不了的内容
request.Json查看前端传过来的Json格式数据,内部会自动反序列化
request.values.to_dict()存放url和from中的所有数据
request.cookies前端传过来的cookies
request.path路由地址,如:/index
request.full_path带参数的请求路由地址,如:/index?name=yunya
request.url全部地址,如:http://127.0.0.1:5000/index?name=yunya
request.host主机位,如:127.0.0.1:5000
request.host_url将主机位转换成url,如:http://127.0.0.1:5000/
request.url_root域名
file = request.files前端传过来的文件
file.filename返回文件名称
file.save()保存文件

   *** 作演示:

from flask import Flaskfrom flask import requestapp = Flask(__name__)@app.route('/index',"GET"])def index():    print(request.method)    if request.method == "GET":        return "GET"    elif request.method == "POST":        return "POST"    else:        return "ERROR"if __name__ == '__main__':    app.run()
返回响应

   返回响应一般有五种:

返回响应描述
return 'string'返回字符串
return render_template()返回模板文件
return redirect()302,重定向,可填入别名或者路由匹配地址
return Jsonify()返回Json格式数据
return Response对象直接返回一个对象,常用于取消XSS攻击预防、设置返回头等

   注意,在Flask中,都会返回csrftoken,它存放在浏览器的cookie中。当Flask模板渲染的页面发送请求时会自动携带csrftoken,这与Django并不相同

   此外,如果返回的对象不是字符串、不是元组也不是Response对象,它会将值传递给Flask.force_type类方法,将它转换成为一个响应对象

   如下所示:

from flask import Flaskapp = Flask(__name__)@app.route('/templateTest')def templatetest():    # 返回模板    from flask import render_template    return render_template("result.HTML")@app.route('/redirectTest')def redirecttest():    # 302重定向    from flask import redirect    return redirect("templateTest")@app.route('/JsonTest')def Jsontest():    # 返回Json数据    from flask import Jsonify    message = {"book": "flask","price": 199,"publish": "BeiJing"}    return Jsonify(message)@app.route('/makeResponseTest')def makeResponsetest():    # 返回Response对象    from flask import make_response    # 取消XSS攻击预防    from flask import Markup    element = Markup("<a href='https://www.Google.com'>点我一下</a>")    response = make_response(element)    #  *** 作cookie    response.set_cookie("key","oldValue")    response.delete_cookie("key")    response.set_cookie("key","newValue")    #  *** 作返回头    response.headers["jwt"] = "ajfkdasi#@#kjdfsas9f(**jfd"    return responseif __name__ == '__main__':    app.run()
session

   在Flask中,session也是通过导入来 *** 纵的,而不是通过request对象。

   需要注意的是在Flask`session的保存时长为31天,并且默认是保存在内存中,并未做任何持久化处理。

   如果想做持久化处理,则可以通过其他的一些第三方模块。

*** 作描述
session.get("key",None)获取session
session["key"]=value设置session
session.pop("key",None)删除session

   如下案例所示:

from flask import Flaskfrom flask import requestfrom flask import sessionfrom flask import Markupfrom flask import render_templatefrom flask import redirect# 第一步,导入sessionapp = Flask(__name__)# 第二步,加盐,也可以在配置文件中加盐app.secret_key = "salt"@app.route('/home')def home():    username = session.get("username")    print(username)    if username:        return "欢迎回家%s"%username    return redirect("login")@app.route('/login',methods=["GET","POST"])def login():    if request.method == "GET":        return render_template("login.HTML")    if request.method == "POST":        username = request.form.get("username")        if username:            session["username"] = username            return "您已登录" + Markup("<a href='/home'>返回home</a>")        return redirect("login")if __name__ == '__main__':    app.run()
<form method="POST">    <p><input type="text" name="username" placeholder="username"></p>    <p><input type="text" name="password" placeholder="password"></p>    <button type="submit">登录</button></form>
flash

   消息闪现flash是基于session来做的,它只会允许值被取出一次,内部通过pop()实现。

   使用方式如下:

flash("data",category="sort")  # 存入数据以及分类get_flashed_messages(with_categorIEs=False,category_filter=()) # 取出flash中的数据# with_categorIEs为True时返回一个tuple# category_filter指定数据类别,如不指定则代表取出所有

   如下所示:

from flask import Flaskfrom flask import flashfrom flask import get_flashed_messagesapp = Flask(__name__)app.secret_key = "salt"@app.route('/set_flash')def set_flash():    flash(message="dataA",category="sortA")    flash(message="dataB",category="sortB")    return "OK!!"@app.route('/get_flash/<string:choice>')def get_flash(choice):    if choice == "all":        all_data = get_flashed_messages()  # 取所有闪现消息        return str(all_data)  # ['dataA','dataB']    elif choice == "sortA":        sortA_data = get_flashed_messages(category_filter=("sortA",)) # 取类别A的所有闪现消息        return str(sortA_data)  # ['dataA']    elif choice == "sortB":        sortB_data = get_flashed_messages(category_filter=("sortB",))  # 取类别B的所有闪现消息        return str(sortB_data)  # ['dataB']    else:        return "ERROR"if __name__ == '__main__':    app.run()
FBV

   如果不是做前后端分离,那么Flask应用最多的还是FBV

from flask import Flaskapp = Flask(__name__)@app.route('/index')def index():    return "index"if __name__ == '__main__':    app.run()
CBV

   使用CBV必须导入vIEws.MethodVIEw且继承它,初此之外必须使用app.add_url_rule添加路由与视图的关系映射:

from flask import Flaskfrom flask.vIEws import MethodVIEwapp = Flask(__name__)class Home(MethodVIEw):    methods = ["GET","POST"]  # 该类中允许的请求方式    decorators = []  # 装饰器添加在这里    def dispatch_request(self,*args,**kwargs):        print("首先执行该方法")        return super(Home,self).dispatch_request(*args,**kwargs)    def get(self):        return "Home,Get"    def post(self):        return "Home,Post"app.add_url_rule("/home",vIEw_func=Home.as_vIEw(name="home"))if __name__ == '__main__':    app.run()
RESTAPI

   如果项目是前后端分离的,则需要借助第三方模块flask-restful,详情查阅官网:

   点我跳转

文件上传案例

   保存上传文件的案例:

from flask import Flask,request app = Flask(__name__) '''因为是文件,所以只能是POST方式'''@app.route("/upload",methods=["POST"])def upload():    """接受前端传送来的文件"""    file_obj = request.files.get("pic")    if file_obj is None:        # 表示没有发送文件        return "未上传文件"     '''        将文件保存到本地(即当前目录)        直接使用上传的文件对象保存    '''    file_obj.save('pic.jpg')  # 和前端上传的文件类型要相同    return "上传成功"     # 将文件保存到本地(即当前目录) 普通的保存方法    # with open("./pic.jpg",'wb') as f:    #     data = file_obj.read()    #     f.write(data)    #     return "上传成功" if __name__ == '__main__':    app.run(deBUG=True)

   其他的一些补充知识:

file_obj.stream  # 文件流,即文件的二进制对象from werkzeug.datastructures import fileStorage  # 查看详情,文件对象的具体方法
模板jinja2简介

   jinja2Flask中默认的模板语言,相比于DjangoTemplate它更加的符合Python语法。

   如在模板传参中,如果视图中传入是一个dict,那么在DTL中只能通过.的方式进行深度获取,而在jinja2中则可以通过[]的方式进行获取。

   此外,在DTL中如果视图传入一个function则会自动加括号进行调用,而在jinja2中就不会进行自动调用而是要自己手动加括号进行调用。

   总而言之,jinja2相比于DTL来说更加的人性化。

模板传参

   模板传参可以通过k=v的方式传递,也可以通过**dict的方式进行解包传递:

@app.route('/index')def index():    context = {        "name": "云崖","age": 18,"hobby": ["篮球","足球"]    }    return render_template("index.HTML",**context)    # return render_template("index.HTML",name="云崖",age=18)

   渲染,通过{{}}进行:

<body>    <p>{{name}}</p>    <p>{{age}}</p>    <p>{{hobby.0}}-{{hobby[1]}}</p></body>
内置过滤器

   常用的内置过滤器如下:

过滤器描述
escape转义字符
safe关闭XSS预防,关闭转义
stripTags删除字符串中所有的HTML标签,如果有多个空格连续,将替换为一个空格
first返回容器中的第一个元素
last返回容器中的最后一个元素
length返回容器总长度
abs绝对值
int转换为int类型
float转换为float类型
join字符串拼接
lower转换为小写
upper转换为大写
cAPItialize把值的首字母转换成大写,其他子母转换为小写
Title把值中每个单词的首字母都转换成大写
trim把值的首尾空格去掉
round默认对数字进行四舍五入,也可以用参数进行控制
replace替换
format格式化字符串
truncate截取length长度的字符串
default相当于or,如果渲染变量没有值就用default中的值

   使用内置过滤器:

<p>{{gender | default("性别不详")}}</p>
分支循环

   iffor都用{% %}进行包裹,与DTL中使用相似。

   在for中拥有以下变量,用来获取当前的遍历状态:

for循环的遍历描述
loop.index当前遍历次数,从1开始计算
loop.index0当前遍历次数,从0开始计算
loop.first第一次遍历
loop.last最后一次遍历
loop.length遍历对象的长度
loop.revindex到循环结束的次数,从1开始
loop.revindex0到循环结束的次数,从0开始

   下面是一则示例:

<body>    {% for item in range(10) %}        {% if loop.first %}            <p>第一次遍历开始--->{{loop.index}}</p>        {% elif loop.last %}            <p>最后一次遍历开始-->{{loop.index}}</p>            <p>遍历了一共{{loop.length}}次</p>        {% else %}            <p>{{loop.index}}</p>        {% endif %}    {% endfor %}</body>

   结果如下:

第一次遍历开始--->123456789最后一次遍历开始-->10遍历了一共10次
宏的使用

   在模板中的宏类似于Python中的函数,可对其进行传值:

<body>    <!--定义宏,后面是默认的参数-->    {% macro input(name,value="",type="text") %}        <input name="{{ name }}" value="{{ value }}" type="{{ type }}">    {% endmacro %}        <!--使用宏-->    <form action="">        <p>username:{{ input("username") }}</p>        <p>password:{{ input("pwd",type="password")}}</p>        <p>{{ input(value="login",type="submit") }}</p>    </form></body>

   可以在一个模板中专门定义宏,其他模板中再进行导入:

# 导入方式一# with context可以把后端传到当前模板的变量传到定义的宏里面{% import "macros.HTML" as macro with context %}      <form>        <p>用户名:{{ macro.input('username') }}</p>        <p>密码:{{ macro.input('password',type="password" )}}</p>        <p> {{ macro.input(value="提交",type="submit" )}}</p>    </form># 导入方式二{% from "macros.HTML" import input as input_fIEld %}     <form>        <p>用户名:{{ input_fIEld('username') }}</p>        <p>密码:{{ input_fIEld('password',type="password" )}}</p>        <p> {{ input_fIEld(value="提交",type="submit" )}}</p>    </form>
定义变量

   在模板中可通过{% set %}{% with %}定义变量。

   {% set %}是全局变量,可在当前模板任意位置使用

   {% with %}是局部变量,只能在{% with %}语句块中使用

<body>    {% set name="云崖" %}    <p>名字是:{{name}}</p>    {% with age=18 %}        <p>年龄是:{{age}}</p>    {% enDWith %}</body>
模板继承

   使用{% extends %}引入一个定义号的模板。

   使用{% blocak %}{% endblock %}定义块

   使用{{ super() }}引入原本的模板块内容

   定义模板如下:

<!DOCTYPE HTML><HTML lang="en"><head>    <Meta charset="UTF-8">    <Meta name="vIEwport"          content="wIDth=device-wIDth,user-scalable=no,initial-scale=1.0,maximum-scale=1.0,minimum-scale=1.0">    <Meta http-equiv="X-UA-Compatible" content="IE=edge">    <Title>jinja2学习</Title>    {% block style %}    {% endblock %}</head><body><header>    {% block header %}    <p>头部信息</p>    {% endblock %}</header><main>    {% block main %}    <p>主体信息</p>    {% endblock %}</main><footer>    <p>页面尾部</p></footer>{% block script %}{% endblock %}</body></HTML>

   导入模板并使用:

{% extends "base.HTML" %}{% block style %}<style>    h1{        color:red;    }</style>{% endblock %}{% block header %}<!--调用父模板内容-->    {{ super() }}{% endblock %}{% block main %}<h1>HELLO,欢迎来到Jinja2学习</h1>{% endblock %}{% block script %}<script>    "use strict;"    console.log("HELLO,WORLD")</script>{% endblock %}
中间件

   在Flask中的中间件使用非常少。由于Flask是基于werkzeug模块来完成的,所以按理说我们只需要在werkzeug的启动流程中添加代码即可。

   下面是中间件的使用方式,如果想了解它的原理在后面的源码分析中会有涉及。

   在Flask请求来临时会执行wsgi_app这样的一个方法,所以就在这个方法上入手:

from flask import Flaskfrom flask import render_templateapp = Flask(__name__)# 中间件class MIDdleware(object):    def __init__(self,old_wsgi_app):        self.old_wsgi_app = old_wsgi_app  # 原本要执行的wsgi_app方法    def __call__(self,environ,start_response):        print("书写代码...中间件。请求来时")        result = self.old_wsgi_app(environ,start_response)        print("书写代码...中间件。请求走时")        return result@app.route('/index')def index():    return "Hello,world"if __name__ == '__main__':    app.wsgi_app = MIDdleware(app.wsgi_app)  # 传入要原本执行的wsgi_app    app.run()
装饰器如何添加装饰器

   由于Flask的每个视图函数头顶上都有一个装饰器,且具有endpoint不可重复的限制。

   所以我们为单独的某一个视图函数添加装饰器时一定要将其添加在下方(执行顺序自下而上),此外还要使用functools.wraps修改装饰器inner()让每个装饰器的inner.__name__都不相同,来突破endpoint不可重复的限制。

   如下所示,为单独的某一个接口书写频率限制的装饰器:

from flask import Flaskfrom functools import wrapsapp = Flask(__name__)def flow(func):    @wraps(func)  # 如果不加这个,路由的别名一致都是inner就会抛出异常。func.__name__    def inner(*args,**kwargs):        # 书写逻辑,用random代替。如果是0就代表不让通过        import random        access = random.randint(0,1)        if access:            result = func(*args,**kwargs)            return result        else:            return "频率太快了"    return inner@app.route('/backend')@flowdef backend():    return "backend"@app.route('/index')@flowdef index():    return "index"if __name__ == '__main__':    app.run()
befor_request

   每次请求来的时候都会走它,由于Flask的中间件比较弱鸡,所以这种方式更常用。

   类似于Django中间件中的process_request,如果有多个顺序是从上往下,可以用它做session认证。

   如果返回的不是None,就拦截请求

@app.before_requestdef before(*args,**kwargs):    if request.path=='/login':        return None      else:        name=session.get('user')        if not name:            return redirect('/login')        else:            return None
after_request

   请求走了就会触发,类似于Djangoprocess_response,如果有多个,顺序是从下往上执行:

   必须传入一个参数,就是视图的return值

@app.after_requestdef after(response):    print('我走了')    return response
before_first_request

   目启动起来第一次会走,以后都不会走了,也可以配多个(项目启动初始化的一些 *** 作)

   如果返回的不是None,就拦截请求

@app.before_first_requestdef first():    print('我的第一次')
teardown_request

   每次视图函数执行完了都会走它。

   可以用来记录出错日志:

@app.teardown_requestdef ter(e):    print(e)    print('我是teardown_request ')
errorhandler

   绑定错误的状态码,只要码匹配就走它。

   常用于重写404页面等:

@app.errorhandler(404)def error_404(arg):    return render_template('error.HTML',message='404错误')
template_global

   定义全局的标签,如下所示:

@app.template_global()def add(a1,a2):    return a1 + a2    # 在模板中:{{ add(3,4) }}
template_filter

   定义全局过滤器,如下所示:

@app.template_filter()def db(a1,a2,a3):  # 第一个值永远都是|左边的值    return a1 + a2 + a3    # 在模板中{{ 1|db(2,3)}}
多request顺序

   如果存在多个berfor_request与多个after_request那么执行顺序是怎样的?

from flask import Flaskfrom functools import wrapsapp = Flask(__name__)@app.before_requestdef before_fist():    print("第一个before_request")@app.before_requestdef before_last():    print("第二个before_request")@app.after_requestdef before_fist(response):    print("第一个after_request")    return response@app.after_requestdef before_last(response):    print("第二个after_request")    return response@app.route('/index')def index():    return "index"if __name__ == '__main__':    app.run()    """第一个before_request第二个before_request第二个after_request第一个after_request"""

   如果第一个before_request就返回了非None进行拦截,执行顺序则和Django的不一样,Django会返回同级的process_response,而Flask还必须要走所有的after_request的:

@app.before_requestdef before_fist():    print("第一个before_request")    return "拦截了"第一个before_request第二个after_request第一个after_request
蓝图

   蓝图Blueprint的作用就是为了将功能和主服务分开。

   说的直白点就是构建项目目录,划分内置的装饰器作用域等,类似于Djangoapp的概念。

  

小型项目

   下面有一个基本的项目目录,如下所示:

- mysite	- mysite  # 包,项目根目录		- vIEws  # 文件夹,视图相关			- index.py			- backend.py		- templates			- index  # 文件夹,index相关的模板			- backend # 文件夹,backend相关的资源		- static			- index			- backend		- __init__.py	- manage.py  # 启动文件	- settings.py # 配置文件

   这样的目录结构看起来就比较清晰,那么如何对它进行管理呢?就可以使用蓝图:

# backend.pyfrom flask import Blueprintfrom flask import render_templatebck = Blueprint("bck",__name__)  # 创建蓝图对象 bck@bck.route("/login")  # 路由使用蓝图对象bck为前缀,而不是appdef login():    return render_template("backend/backend_login.HTML")
# index.pyfrom flask import Blueprintfrom flask import render_templateIDx = Blueprint("IDx",__name__)@IDx.route("/login")def login():    return render_template("index/index_login.HTML")
# __init__.pyfrom flask import Flaskfrom .vIEws.backend import bckfrom .vIEws.index import IDxdef create_app():    app = Flask(import_name=__name__)      app.register_blueprint(bck)  # 注册蓝图对象 bck    app.register_blueprint(IDx)  # 注册蓝图对象 IDx    return app
# manage.pyfrom mysite import create_appif __name__ == '__main__':    app = create_app()    app.run()
url前缀

   启动服务后发现两个功能区的login都是相同的url,导致后注册的蓝图对象永远无法访问登录页面。

   在__init__.py中注册蓝图对象的代码中添加前缀:

from flask import Flaskfrom .vIEws.backend import bckfrom .vIEws.index import IDxdef create_app():    app = Flask(import_name=__name__)    app.register_blueprint(bck,url_prefix="/backend/")    app.register_blueprint(IDx,url_prefix="/index/")    return app

   访问时:

http://127.0.0.1:5000/backend/loginhttp://127.0.0.1:5000/index/login
蓝图资源

   每个蓝图应用的资源都不相同,如下:

templates/index  # 这是index访问的模板路径templates/backend  # 这是backend访问的模板路径static/indexstatic/backend

   如何指定他们的资源呢?其实在创建蓝图对象的时候就可以指定:

from flask import Blueprintfrom flask import render_template# 使用相对路径bck = Blueprint("bck",__name__,template_folder="../templates/backend",static_folder="../static/backend",)@bck.route("/login")def login():    return render_template("backend_login.HTML")  # 注意不同蓝图对象之间的模板应该尽量不重名,重名可能导致一些错误

   如果是静态资源的访问,并不会加上前缀/backend

<img src="/static/backend/logo@2x.png" alt="">
蓝图装饰器

   蓝图装饰器分为全局装饰器和局部装饰器两种:

   全局装饰器全局有效:

def create_app():    app = Flask(import_name=__name__)    app.register_blueprint(bck,url_prefix="/backend/",)    app.register_blueprint(IDx,url_prefix="/index/")        @app.before_request    def func():        print("全局有效")            return app

   局部装饰器只在当前蓝图对象bck有效:

bck = Blueprint("bck",)@bck.before_requestdef func():    print("局部有效")

  

大型项目

   构建大型项目,就完全可以将它做的和Django相似,让每个蓝图对象都拥有自己的templatesstatic

- mysite	- mysite		- index # 包,单独的一个蓝图对象            - static  # 文件夹            - templates # 文件夹            - vIEws.py            - __init__.py # 创建蓝图对象,指定template与static	        - backend            - static            - templates            - vIEws.py            - __init__.py	- manage.py  # 启动文件	- settings.py # 配置文件
多app应用

   一个Flask程序允许多个实例,如下所示:

from werkzeug.wsgi import dispatcherMIDdlewarefrom werkzeug.serving import run_simplefrom flask import Flaskapp01 = Flask('app01')app02 = Flask('app02')@app01.route('/index')def index():    return "app01"@app02.route('/index')def index2():    return "app02"app = dispatcherMIDdleware(app01,{    '/app01': app01,'/app02': app02,})#默认使用app01的路由,也就是访问 http://127.0.0.1:5000/index 返回app01#当以app01开头时候使用app01的路由,也就是http://127.0.0.1:5000/app01/index 返回app01#当以app02开头时候使用app02的路由,也就是http://127.0.0.1:5000/app02/index 返回app02if __name__ == "__main__":    run_simple('127.0.0.1',app)
解决跨域

   解决跨域请求,可以用第三方插件,也可以自定义响应头:

@app.after_request  # 解决CORS跨域请求def cors(response):    response.headers['Access-Control-Allow-Origin'] = "*"    if request.method == "OPTIONS":        response.headers["Access-Control-Allow-headers"] = "Origin,Content-Type,cookie,Accept,Token,authorization"    return response
上下文机制全局变量

   在Flask项目启动时,会自动初始化一些全局变量。其中有几个变量尤为重要,可通过以下命令查看:

from flask import globals

   就是下面的6个变量,将贯穿整个http请求流程。

_request_ctx_stack = LocalStack() _app_ctx_stack = LocalStack()current_app = LocalProxy(_find_app)request = LocalProxy(partial(_lookup_req_object,"request"))  # from flask import request 拿的就是它session = LocalProxy(partial(_lookup_req_object,"session"))  # from flask import session 拿的就是它g = LocalProxy(partial(_lookup_app_object,"g"))
偏函数

   上面的6个变量中有两个变量执行了类的实例化,并且有传入了一个偏函数。

   偏函数的作用在于不用传递一个参数,设置好后自动传递:

from functools import partialdef add(x,y):    return x + yadd = partial(add,1)  # 自动传递第一个参数为1,返回一个新的函数result = add(2)print(result)  # 3
列表实现栈

   栈是一种后进先出的数据结构,使用列表可以实现一个栈:

class Stack(object):    def __init__(self):        self.__stack = []    def push(self,value):        self.__stack.append(value)    @property    def top(self):        try:            return self.__stack[-1]        except IndexError as e:            return Nonestack = Stack()stack.push(1)print(stack.top)

   在Flask源码中多次有构建这个栈的地方(目前来看至少两处)。

Local

   Local对象在全局变量中会实例化两次,作用是实例化出一个字典,用于存放线程中的东西:

_request_ctx_stack = LocalStack()_app_ctx_stack = LocalStack()

   我们来看看它的源码:

try:  # 导入协程获取pID,或者是线程模块获取pID的函数    from greenlet import getcurrent as get_IDentexcept importError:    try:        from thread import get_IDent    except importError:        from _thread import get_IDentclass Local(object):	# 只能 . 这里面的    __slots__ = ("__storage__","__IDent_func__")    def __init__(self):        object.__setattr__(self,"__storage__",{})        object.__setattr__(self,"__IDent_func__",get_IDent)	# 返回可迭代对象,这里可以看出__storage__是一个字典    def __iter__(self):        return iter(self.__storage__.items())            def __call__(self,proxy):        return LocalProxy(self,proxy)    def __release_local__(self):        self.__storage__.pop(self.__IDent_func__(),None)	# 通过pID返回字典中的一个name对应的value    def __getattr__(self,name):        try:            return self.__storage__[self.__IDent_func__()][name]        except KeyError:            raise AttributeError(name)	# 构建出一个字典,{pID:{name:value}}    def __setattr__(self,name,value):        IDent = self.__IDent_func__()        storage = self.__storage__        try:            storage[IDent][name] = value        except KeyError:            storage[IDent] = {name: value}    def __delattr__(self,name):        try:            del self.__storage__[self.__IDent_func__()][name]        except KeyError:            raise AttributeError(name)
LocalStack

   用于 *** 纵Local中构建的字典:

class LocalStack(object):   	# 实例化    def __init__(self):        self._local = Local()    def __release_local__(self):        self._local.__release_local__()    @property    def __IDent_func__(self):        return self._local.__IDent_func__    @__IDent_func__.setter    def __IDent_func__(self,value):        object.__setattr__(self._local,value)    def __call__(self):        def _lookup():            rv = self.top            if rv is None:                raise RuntimeError("object unbound")            return rv        return LocalProxy(_lookup)	# 向Local字典中添加一个名为stack的列表	# {pID:{"stack":[]}}    def push(self,obj):            rv = getattr(self._local,"stack",None)        if rv is None:            self._local.stack = rv = []        rv.append(obj)        return rv        	# 消息闪现的实现原理,获取或者移除    def pop(self):        stack = getattr(self._local,None)        if stack is None:            return None        elif len(stack) == 1:            release_local(self._local)            return stack[-1]        else:            return stack.pop()	# 只获取    @property    def top(self):        try:            return self._local.stack[-1]        except (AttributeError,IndexError):            return None
LocalProxy

   访问Local时用LocalProxy,实际上是一个代理对象:

current_app = LocalProxy(_find_app)request = LocalProxy(partial(_lookup_req_object,"request"))session = LocalProxy(partial(_lookup_req_object,"session"))g = LocalProxy(partial(_lookup_app_object,"g"))

   这四句话代表四个意思,使用最多的范围如下:

from flask import requestfrom flask import sessionfrom flask import gfrom flask import current_app

   源码如下:

@implements_boolclass LocalProxy(object):    __slots__ = ("__local","__dict__","__name__","__wrapped__")    def __init__(self,local,name=None):        object.__setattr__(self,"_LocalProxy__local",local)        object.__setattr__(self,name)        if callable(local) and not hasattr(local,"__release_local__"):            object.__setattr__(self,"__wrapped__",local)    def _get_current_object(self):        if not hasattr(self.__local,"__release_local__"):            return self.__local()        try:            return getattr(self.__local,self.__name__)        except AttributeError:            raise RuntimeError("no object bound to %s" % self.__name__)    @property    def __dict__(self):        try:            return self._get_current_object().__dict__        except RuntimeError:            raise AttributeError("__dict__")    def __repr__(self):        try:            obj = self._get_current_object()        except RuntimeError:            return "<%s unbound>" % self.__class__.__name__        return repr(obj)    def __bool__(self):        try:            return bool(self._get_current_object())        except RuntimeError:            return False    def __unicode__(self):        try:            return unicode(self._get_current_object())  # noqa        except RuntimeError:            return repr(self)    def __dir__(self):        try:            return dir(self._get_current_object())        except RuntimeError:            return []    def __getattr__(self,name):        if name == "__members__":            return dir(self._get_current_object())        return getattr(self._get_current_object(),name)    def __setitem__(self,key,value):        self._get_current_object()[key] = value    def __delitem__(self,key):        del self._get_current_object()[key]    if PY2:        __getslice__ = lambda x,i,j: x._get_current_object()[i:j]        def __setslice__(self,j,seq):            self._get_current_object()[i:j] = seq        def __delslice__(self,j):            del self._get_current_object()[i:j]    __setattr__ = lambda x,n,v: setattr(x._get_current_object(),v)    __delattr__ = lambda x,n: delattr(x._get_current_object(),n)    __str__ = lambda x: str(x._get_current_object())    __lt__ = lambda x,o: x._get_current_object() < o    __le__ = lambda x,o: x._get_current_object() <= o    __eq__ = lambda x,o: x._get_current_object() == o    __ne__ = lambda x,o: x._get_current_object() != o    __gt__ = lambda x,o: x._get_current_object() > o    __ge__ = lambda x,o: x._get_current_object() >= o    __cmp__ = lambda x,o: cmp(x._get_current_object(),o)  # noqa    __hash__ = lambda x: hash(x._get_current_object())    __call__ = lambda x,*a,**kw: x._get_current_object()(*a,**kw)    __len__ = lambda x: len(x._get_current_object())    __getitem__ = lambda x,i: x._get_current_object()[i]    __iter__ = lambda x: iter(x._get_current_object())    __contains__ = lambda x,i: i in x._get_current_object()    __add__ = lambda x,o: x._get_current_object() + o    __sub__ = lambda x,o: x._get_current_object() - o    __mul__ = lambda x,o: x._get_current_object() * o    __floordiv__ = lambda x,o: x._get_current_object() // o    __mod__ = lambda x,o: x._get_current_object() % o    __divmod__ = lambda x,o: x._get_current_object().__divmod__(o)    __pow__ = lambda x,o: x._get_current_object() ** o    __lshift__ = lambda x,o: x._get_current_object() << o    __rshift__ = lambda x,o: x._get_current_object() >> o    __and__ = lambda x,o: x._get_current_object() & o    __xor__ = lambda x,o: x._get_current_object() ^ o    __or__ = lambda x,o: x._get_current_object() | o    __div__ = lambda x,o: x._get_current_object().__div__(o)    __truediv__ = lambda x,o: x._get_current_object().__truediv__(o)    __neg__ = lambda x: -(x._get_current_object())    __pos__ = lambda x: +(x._get_current_object())    __abs__ = lambda x: abs(x._get_current_object())    __invert__ = lambda x: ~(x._get_current_object())    __complex__ = lambda x: complex(x._get_current_object())    __int__ = lambda x: int(x._get_current_object())    __long__ = lambda x: long(x._get_current_object())  # noqa    __float__ = lambda x: float(x._get_current_object())    __oct__ = lambda x: oct(x._get_current_object())    __hex__ = lambda x: hex(x._get_current_object())    __index__ = lambda x: x._get_current_object().__index__()    __coerce__ = lambda x,o: x._get_current_object().__coerce__(x,o)    __enter__ = lambda x: x._get_current_object().__enter__()    __exit__ = lambda x,**kw: x._get_current_object().__exit__(*a,**kw)    __radd__ = lambda x,o: o + x._get_current_object()    __rsub__ = lambda x,o: o - x._get_current_object()    __rmul__ = lambda x,o: o * x._get_current_object()    __rdiv__ = lambda x,o: o / x._get_current_object()    if PY2:        __rtruediv__ = lambda x,o: x._get_current_object().__rtruediv__(o)    else:        __rtruediv__ = __rdiv__    __rfloordiv__ = lambda x,o: o // x._get_current_object()    __rmod__ = lambda x,o: o % x._get_current_object()    __rdivmod__ = lambda x,o: x._get_current_object().__rdivmod__(o)    __copy__ = lambda x: copy.copy(x._get_current_object())    __deepcopy__ = lambda x,memo: copy.deepcopy(x._get_current_object(),memo)
基本概念

   在Flask中,每一次http请求的到来都会执行一些 *** 作。

   举个例子,Django里面request是通过形参的方式传递进视图函数,这个很好实现,那么Flask中的request则是通过导入的方式作用于视图函数,这意味着每次request中的数据都要进行更新。

   它是如何做到的呢?这个就是Flask的精髓,上下文管理。

   上面说过,Local对象会实例化两次:

_app_ctx_stack = LocalStack()_request_ctx_stack = LocalStack()

   它的实现原理是这样的,每一次http请求来临都会创建一个线程,Local对象就会依照这个线程的pID来构建出一个字典,这里用掉的对象是_request_ctx_stack,它内部有一个叫做__storage__的变量,最终会搞成下面的数据格式:

{	pID001:{stack:[<app_ctx = RequestContext request,session]},# 存储request对象以及session	pID002:{stack:[<app_ctx = RequestContext request,}

   而除开_request_ctx_stack外还会有一个叫做_app_ctx_stack的东西,它会存放当前Flask实例app以及一个g对象:

{	pID001:{stack:[<app_ctx = flask.ctx.AppContext app,g>]},pID002:{stack:[<app_ctx = flask.ctx.AppContext app,}

   每一次请求来的时候都会创建这样的两个字典,请求走的时候进行销毁。

   在每次导入request/session时都会从上面的这个__storage__字典中,拿出当前线程对应的pID中的request/session,以达到更行的目的。

功能
Local构建大字典
Localstack构建stack这个列表实现的栈
LocalProxy控制获取stack列表中栈的数据,如导入时引入request,怎么样将stack中的request拿出来
Flask流程之__call__

   Flask基本请求流程是建立在werkzeug之上:

from werkzeug.wrappers import Request,index)

   可以看到,werkzeug在开启服务后,会执行一个叫run_simple的函数,并且会调用被装饰器包装过后的index函数。

   也就意味着,在run_simple传参时,第三个参数形参名application会加括号进行调用。

   如果你传入一个类,它将执行__init__方法,如果你传入一个实例对象,它将执行其类的__call__方法。

   如下所示:

from werkzeug.serving import run_simpleclass Test:    def __init__(self,**kwargs):        print("run init")if __name__ == '__main__':    run_simple("localhost",Test)    # run init

   示例二:

from werkzeug.serving import run_simpleclass Test:    def __init__(self,**kwargs):        super(Test,self).__init__(*args,**kwargs)    def __call__(self,**kwargs):        print("run call")test = test()if __name__ == '__main__':    run_simple("localhost",test)# run call

   OK,现在牢记一点,如果传入的是函数,执行其类的__call__

   接下来我们看Flask程序:

from flask import Flaskapp = Flask(__name__)@app.route('/index')def index():    return "index"if __name__ == '__main__':    app.run()

   当请求来时,会执行run方法,我们朝里看源码,直接拉到run方法的下面,在run方法中调用了run_simple方法,其中的第三个参数就是当前的Flask实例对象app

try:    run_simple(host,port,self,**options)finally:    self._got_first_request = False

   所以到了这里,实例调用父类的__call__Flask类本身),继续看app.__call__,实例本身没有找其类的。

#  app.__call__ 鼠标左键点进去def __call__(self,start_response):	return self.wsgi_app(environ,start_response)

   可以看见,在这里它调用的是app.wsgi_app方法,这也就能解释Flask中间件为什么重写下面这段代码。

app.wsgi_app = MIDdleware(app.wsgi_app)  # 传入要原本执行的wsgi_app

  

Flask流程之wsgi_app

   原生的Flask.wsgi_app中的代码是整个Flask框架中的核心,如下所示:

    def wsgi_app(self,start_response):    	# 参数 self就是Flask实例对象app,environ是werkzeug的原生http请求对象        ctx = self.request_context(environ)  # 对environ这个原生的请求对象进行封装,封装成一个request对象,可以拥有request.method/.args/.form等方法                error = None        try:            try:            	# 上下文管理,将ctx放入Local大字典中,并且会更新session,从http请求中拿出session,此外还会将当前实例app,也就是self放入Local的另一个大字典中,当然还有g对象                ctx.push()                                # 执行视图函数                response = self.full_dispatch_request()                             	# 捕获异常            except Exception as e:                error = e                response = self.handle_exception(e)            except:                 error = sys.exc_info()[1]                raise                                          # 返回请求            return response(environ,start_response)         finally:            if self.should_ignore_error(error):                  error = None            # 清除上下文管理内容            ctx.auto_pop(error)
Flask流程之Resquest

   看下面这一行代码,它其实是实例化一个对象,用于封装environ这个原始的http请求:

    def wsgi_app(self,start_response):     	# self:app,也就是Flask实例        ctx = self.request_context(environ)

   点开它,发现会返回一个实例对象:

    def request_context(self,environ):    	# self:app,也就是Flask实例        return RequestContext(self,environ)  # 注意这里传参,__init__的第二个参数是app实例

   去找它的__init__方法:

class RequestContext(object):    def __init__(self,app,request=None,session=None):    	# self:ReuqetContext对象 app是Flask实例        self.app = app        if request is None:            request = app.request_class(environ)  # 执行这里、封装request        self.request = request        self.url_adapter = None        try:            self.url_adapter = app.create_url_adapter(self.request)  # 创建url与视图函数对应关系,这里不看        except httpException as e:            self.request.routing_exception = e        self.flashes = None        self.session = session  # 注意 session 是一个None

   先看上面的一句,封装request对象,由于selfapp,所以找Flask类中的request_class,它是一个类属性:

request_class = Request

   加括号进行调用,并且传递了environ,所以要进行实例化,找它的__init__方法,发现是一个多继承类:

class Request(RequestBase,JsONMixin):	# 该类本身未实现__init__,去找它的父类,从左到右找

   找它的父类,RequestBase,也是一个多继承类:

class Request(    BaseRequest,AcceptMixin,ETagRequestMixin,UserAgentMixin,AuthorizationMixin,CORSRequestMixin,CommonRequestDescriptorsMixin,):	# 该类本身未实现__init__,去找它的父类,从上到下

   再继续向上找,找BaseRequest类:

class BaseRequest(object):    def __init__(self,populate_request=True,shallow=False):    # self:Request,因为是Request对象要实例化    self.environ = environ    if populate_request and not shallow:    	self.environ["werkzeug.request"] = self    self.shallow = shallow        # 该类还实现了args\form\files等方法    # 大体意思就是说调用这些方法的时候会将environ中的数据解析出来

   然后将结果返回给ctx,这个ctx就是RequestContext的实例对象,里面有个Request实例对象request,还有个session,不过是None

<ctx=RequestContext request,session=None>
Flask流程之ctx.push

   接着往下看代码,记住现在的线索:

def wsgi_app(self,start_response):	ctx = self.request_context(environ)	# <ctx=RequestContext request,session=None>	    error = None    try:    	try:        	ctx.push()  # 接下来着重看这里            response = self.full_dispatch_request()		except Exception as e:

   执行ctx.push,这个方法可以说非常的绕。

	 def push(self):		# 参数:self是ctx,也就是 <ctx=RequestContext request,session=None>				# _request_ctx_stack = LocalStack() 全局变量,已经做好了        top = _request_ctx_stack.top  # None        if top is not None and top.preserved:  # False 不走这里            top.pop(top._preserved_exc)		# _app_ctx_stack = LocalStack() 全局变量,已经做好了        app_ctx = _app_ctx_stack.top  # None        if app_ctx is None or app_ctx.app != self.app:  # 会走这里,因为第一个条件成立            app_ctx = self.app.app_context()  # 走这里,实际上就是实例化            app_ctx.push()             self._implicit_app_ctx_stack.append(app_ctx)        else:            self._implicit_app_ctx_stack.append(None)        if hasattr(sys,"exc_clear"):            sys.exc_clear()        _request_ctx_stack.push(self)               if self.session is None:            session_interface = self.app.session_interface            self.session = session_interface.open_session(self.app,self.request)            if self.session is None:                self.session = session_interface.make_null_session(self.app)        if self.url_adapter is not None:            self.match_request()

   现在来看 app_ctx到底是个神马玩意儿:

    def app_context(self):    	# self:Flask实例化对象,app        return AppContext(self)

   继续走:

class AppContext(object):    def __init__(self,app):    	# self:AppContext的实例对象,app就是Flask的实例对象        self.app = app  # AppContext实例对象的app就是Flask的实例对象        self.url_adapter = app.create_url_adapter(None)        self.g = app.app_ctx_globals_class()  # 一个空的g对象        self._refcnt = 0

   现在回来,第二个重要点来了:

 		app_ctx = _app_ctx_stack.top  # None        if app_ctx is None or app_ctx.app != self.app:  # 会走这里,因为第一个条件成立            app_ctx = self.app.app_context()  # <app_ctx = flask.ctx.AppContext app,g> app是当前Flask实例对象,g是一个空的玩意儿,详细代码不用看了            app_ctx.push()   # 又执行push            self._implicit_app_ctx_stack.append(app_ctx)        else:            self._implicit_app_ctx_stack.append(None)
Flask流程之app_ctx.push

   这个pushAppContext中的push

    def push(self):    	# self: <app_ctx = flask.ctx.AppContext app,g>        self._refcnt += 1        if hasattr(sys,"exc_clear"):            sys.exc_clear()        _app_ctx_stack.push(self) # 存入        appcontext_pushed.send(self.app) # 这里不看了

   其实就是往Local的大字典中进行存放,不过是通过LocalStack这个类的push方法:

    def push(self,obj):        rv = getattr(self._local,None)  # 触发Local.__getattr__返回一个None        if rv is None: # 走这里,触发Local.__setattr__            self._local.stack = rv = []        rv.append(obj)  # 直接存,触发Local.__setattr__        return rv

   数据结构:

{	pID001:{stack:[<app_ctx = flask.ctx.AppContext app,}

   然后返回app_ctx.push:

		app_ctx = _app_ctx_stack.top  # None        if app_ctx is None or app_ctx.app != self.app:  # 会走这里,因为第一个条件成立            app_ctx = self.app.app_context()  # <app_ctx = flask.ctx.AppContext app,g> app是当前Flask实例对象,g是一个空的玩意儿,详细代码不用看了            app_ctx.push()   # 存入成功            self._implicit_app_ctx_stack.append(app_ctx)  # 走这里了 ctx类RequestContext中有一个列表,把他存进来        else:            self._implicit_app_ctx_stack.append(None) 
Flask流程之_request_ctx_stack.push

   继续向下看ctx.push中的代码,这里主要是封装请求上下文:

	 def push(self):		# self:ctx对象 <ctx=RequestContext request,session=None>				# _request_ctx_stack = LocalStack() 全局变量,已经做好了        top = _request_ctx_stack.top  # None        if top is not None and top.preserved:  # False 不走这里            top.pop(top._preserved_exc)		# _app_ctx_stack = LocalStack() 全局变量,已经做好了        app_ctx = _app_ctx_stack.top  # None        if app_ctx is None or app_ctx.app != self.app:  # 会走这里,因为第一个条件成立            app_ctx = self.app.app_context()  # 走这里,实际上就是实例化            app_ctx.push()  # 存入Local中            self._implicit_app_ctx_stack.append(app_ctx)  # 存入ctx的列表中        else:            self._implicit_app_ctx_stack.append(None)        if hasattr(sys,"exc_clear"):  # 不管            sys.exc_clear()        _request_ctx_stack.push(self)  # 重点是这里,和上面相同,又创建了一个字典。放进去,需要注意的是这里是_request_ctx_stack.push,是另一个不同的Local实例化对象                """        {			pID001:{stack:[<app_ctx = RequestContext request,session=None]},}        """               if self.session is None:  # 设置session            session_interface = self.app.session_interface            self.session = session_interface.open_session(self.app,self.request)  # 打开session,从request中读取出cookie然后进行load反序列化            if self.session is None:                self.session = session_interface.make_null_session(self.app)        """        现在session就不是None了        {			pID001:{stack:[<app_ctx = RequestContext request,}        """        if self.url_adapter is not None:            self.match_request()
Flask流程之session

   保存session,从cookie中获取数据,反序列化后保存到session中。

class SecurecookieSessionInterface(SessionInterface):     serializer = session_Json_serializer    session_class = SecurecookieSession    def get_signing_serializer(self,app):        if not app.secret_key:            return None        signer_kwargs = dict(            key_derivation=self.key_derivation,digest_method=self.digest_method        )        return URLSafeTimedSerializer(            app.secret_key,salt=self.salt,serializer=self.serializer,signer_kwargs=signer_kwargs,)    def open_session(self,request):        s = self.get_signing_serializer(app)        if s is None:            return None        val = request.cookies.get(app.session_cookie_name)        if not val:            return self.session_class()        max_age = total_seconds(app.permanent_session_lifetime)        try:            data = s.loads(val,max_age=max_age)            return self.session_class(data)        except BadSignature:            return self.session_class()    def save_session(self,session,response):        domain = self.get_cookie_domain(app)        path = self.get_cookie_path(app)        # If the session is modifIEd to be empty,remove the cookie.        # If the session is empty,return without setting the cookie.        if not session:            if session.modifIEd:                response.delete_cookie(                    app.session_cookie_name,domain=domain,path=path                )            return        # Add a "vary: cookie" header if the session was accessed at all.        if session.accessed:            response.vary.add("cookie")        if not self.should_set_cookie(app,session):            return        httponly = self.get_cookie_httponly(app)        secure = self.get_cookie_secure(app)        samesite = self.get_cookie_samesite(app)        expires = self.get_expiration_time(app,session)        val = self.get_signing_serializer(app).dumps(dict(session))        response.set_cookie(            app.session_cookie_name,val,expires=expires,httponly=httponly,path=path,secure=secure,samesite=samesite,)
导入源码

   如果使用from flask import request它会通过LocalProxy去拿到Local中的ctx对象然后进行解析,拿到request对象。

request = LocalProxy(partial(_lookup_req_object,"request"))  # 实例化

   在这里可以看见实例化了一个LocalProxy对象:

@implements_boolclass LocalProxy(object):    def __init__(self,name=None):    	# local:偏函数    	# name:None        object.__setattr__(self,local)  # self.__local = local  双下开头会改变名字        object.__setattr__(self,name)  # None        if callable(local) and not hasattr(local,"__release_local__"):  # 运行,如果偏函数可执行,并且偏函数没有属性__release_local__时执行             object.__setattr__(self,local)  # 当前实例增加属性,指向偏函数

   当要使用request.method时,触发LocalProxy__getattr__方法:

    def __getattr__(self,name):    	# name:method        if name == "__members__":            return dir(self._get_current_object())        return getattr(self._get_current_object(),name)  # 执行这里,name = method

   在_get_current_object中执行self.__local()

   def _get_current_object(self):        if not hasattr(self.__local,"__release_local__"):            return self.__local()   #  self.__local就是偏函数,偏函数自动传参。request        try:            return getattr(self.__local,self.__name__)         except AttributeError:            raise RuntimeError("no object bound to %s" % self.__name__)

   偏函数,第二个参数是request,也就是说_lookup_req_object的参数默认就是request

def _lookup_req_object(name):	# name:request字符串    top = _request_ctx_stack.top  # 返回RequestContext对象,里面有request和session    if top is None:        raise RuntimeError(_request_ctx_err_msg)    return getattr(top,name)  # 返回的就是request对象,从RequestContext对象中拿到request对象

   拿到request对象后继续看__getattr__方法,获取method

    def __getattr__(self,name)  # 执行这里,name = method
Flask流程之pop

   wsgi_app返回和清栈还没有看:

	def wsgi_app(self,start_response):		ctx = self.request_context(environ)  # 封装request、session到RequestContext对象        error = None        try:            try:                ctx.push()  # 封装app上下文和请求上下文,填充session内容                response = self.full_dispatch_request()  # 执行视图函数            except Exception as e:                error = e                response = self.handle_exception(e)            except:  # noqa: B001                error = sys.exc_info()[1]                raise            return response(environ,start_response)  # 封装返回对象        finally:            if self.should_ignore_error(error):  # 清除上下文,两个字典中的内容                error = None            ctx.auto_pop(error)

   最主要就是看清栈:

    def auto_pop(self,exc):        if self.request.environ.get("flask._preserve_context") or (            exc is not None and self.app.preserve_context_on_exception        ):            self.preserved = True            self._preserved_exc = exc        else:            self.pop(exc)  # 看这里就行了

   接着看pop方法:

    def pop(self,exc=_sentinel):        app_ctx = self._implicit_app_ctx_stack.pop() # 清除Flask实例中存放的app和g对象, [<app_ctx = flask.ctx.AppContext app,g>]        try:            clear_request = False            if not self._implicit_app_ctx_stack:             	# 这里都会执行,但是不看                self.preserved = False                self._preserved_exc = None                if exc is _sentinel:                    exc = sys.exc_info()[1]                self.app.do_teardown_request(exc)                if hasattr(sys,"exc_clear"):                    sys.exc_clear()                request_close = getattr(self.request,"close",None)                if request_close is not None:                    request_close()                clear_request = True        finally:        	# 清除请求上下文中的数据            rv = _request_ctx_stack.pop() # # LocalProxy.pop()          	# 修改request中的werkzeug.request为None,本身是Request对象本身            if clear_request:                rv.request.environ["werkzeug.request"] = None        	# 清除应用上下文中的数据            if app_ctx is not None:                app_ctx.pop(exc)  # LocalProxy.pop()            assert rv is self,"Popped wrong request context. (%r instead of %r)" % (                rv,)

   LocalProxy.pop中的代码:

   def pop(self):        stack = getattr(self._local,None)        if stack is None:            return None        elif len(stack) == 1:            release_local(self._local)  # Local.__storage__.pop(self.__IDent_func__(),None)            return stack[-1]  # 如果只剩下一个,就返回        else:            return stack.pop() # 清理干净 {pID:{"statck":[]}}
Flask流程之before_request

   before_request实现挺简单的,用一个列表,将所有被装饰函数放进来。再执行视图函数之前把列表中所有被berore_request装饰的函数先执行一遍:

    @setupmethod    def before_request(self,f):		self:app,Flask实例		f:被装饰函数		        self.before_request_funcs.setdefault(None,[]).append(f)  # 从与i个字典中获取None,如果没获取到就是一个空列表,后面使用了append代表None对应的k就是一个列表。        # 这句话的意思就是说,从一个字典中{None:[func,func,func]}出一个列表,获取不到就创建一个空列表{None:[]},并且把被装饰的函数f添加进去        return f

   在wsgi_app中查看源码:

# 查看执行视图函数这一句response = self.full_dispatch_request()

   点进去看,执行视图函数前发生了什么:

    def full_dispatch_request(self):        self.try_trigger_before_first_request_functions()  # 先执行这berfor_first_request装饰的函数        try:            request_started.send(self)            rv = self.preprocess_request()  # 在执行berfor_first_request装饰的函数            if rv is None:                rv = self.dispatch_request()  # 开始执行视图函数,rv=返回值        except Exception as e:            rv = self.handle_user_exception(e)        return self.finalize_request(rv)

   关键代码:

   def preprocess_request(self):		# self:app,Flask实例对象        bp = _request_ctx_stack.top.request.blueprint  # 获取蓝图        funcs = self.url_value_preprocessors.get(None,())        if bp is not None and bp in self.url_value_preprocessors:             funcs = chain(funcs,self.url_value_preprocessors[bp])        for func in funcs:            func(request.endpoint,request.vIEw_args)        funcs = self.before_request_funcs.get(None,())   # 获取列表,[func1,func2],key是None        if bp is not None and bp in self.before_request_funcs:   # 如果蓝图存在,将蓝图的全局before_request也添加到列表中            funcs = chain(funcs,self.before_request_funcs[bp])          for func in funcs:   # 运河,执行            rv = func()            if rv is not None:  # 返回值如果不是None就拦截                return rv

   而使用after_request装上后的函数也会被放到一个列表中,其他的具体实现都差不多:

  @setupmethod    def after_request(self,f):		self:app,Flask实例		f:被装饰函数        self.after_request_funcs.setdefault(None,[]).append(f)        return f
源码流程图

  

  

g的作用

   一次请求流程中的一些共同数据,可以用g进行存储:

from flask import Flask,gapp = Flask(__name__)@app.before_requestdef func():    g.message = "仅当次请求有效"@app.route('/index')def index():    print(g.message)    return "index"if __name__ == '__main__':    app.run()
current_app的作用

   可以导入当前的配置:

from flask import Flask,current_appapp = Flask(__name__)@app.before_requestdef func():    if current_app.deBUG == False:        current_app.deBUG = True        print("修改成功")@app.route('/index')def index():    return "index"if __name__ == '__main__':    app.run()
WTforms

   wtforms类似于Django中的forms组件,用于数据验证和生成HTML

   官方文档:https://wtforms.readthedocs.io/en/stable/index.html#

基本使用

   首先进行安装:

pip3 install wtforms

   一个简单的注册示例:

from flask import Flaskfrom flask import requestfrom flask import render_templatefrom flask import Markupfrom wtforms import Form  # 必须继承from wtforms import valIDators  # 自定义认证器from wtforms import Widgets  # HTML生成插件from wtforms import fIElds # 字段导入class LoginForm(Form):    name = fIElds.StringFIEld(        label="用户名",Widget=Widgets.Textinput(),render_kw={"class": "form-control"},valIDators=[            valIDators.Datarequired(message="用户名不能为空"),valIDators.Length(max=8,min=3,message="用户名长度必须大于%(max)d且小于%(min)d")        ]    )    pwd = fIElds.PasswordFIEld(        label="密码",Widget=Widgets.Passwordinput(),render_kw={"class": "form-control",},valIDators=[            valIDators.Datarequired(message="密码不能为空"),valIDators.Length(max=18,min=4,message="密码长度必须大于%(max)d且小于%(min)d"),valIDators.Regexp(regex="\d+",message="密码必须是数字"),]    )app = Flask(__name__,template_folder="templates")@app.route('/login',"POST"])def login():    if request.method == "GET":        form = LoginForm()        return render_template("login.HTML",**{"form":form})    form = LoginForm(formdata=request.form)    if form.valIDate():        print("用户提交的数据用过格式验证,值为:%s" % form.data)        return "登录成功"    else:        print(form.errors,"错误信息")    return render_template("login.HTML",**{"form":form})if __name__ == '__main__':    app.run()

   前端渲染:

<form method="POST" novalIDate>    {% for item in form %}      <p> {{item.label}}:{{item}}</p>      <p >{{item.errors[0]}}</p>    {% endfor %}    <p>        <button type="submit">提交</button>    </p></form>

  

Form实例化

   以下是Form类的实例化参数:

参数描述
formdata需要被验证的form表单数据
obj如果formdata为空或未提供,则检查此对象的属性是否与表单字段名称匹配,这些属性将用于字段值
prefix字段前缀匹配,当传入该参数时,所有验证字段必须以这个开头(无太大意义)
data当formdata参数和obj参数都有时候,可以使用该参数传入字典格式的待验证数据或者生成HTML的默认值,列如:{'usernam':'admin’}
Meta用于覆盖当前已经定义的form类的Meta配置,参数格式为字典

   使用data来构建默认值,常用于文章编辑等,需要填入原本数据库中查询出的文章。

   下面用默认用户做演示:

def login():    if request.method == "GET":        form = LoginForm(data={"name":"默认用户"})        return render_template("login.HTML",**{"form":form})

  

字段介绍

   字段的继承,fIElds是常用类,它继承了其他的一些类:

from wtforms.fIElds.core import *from wtforms.fIElds.simple import *from wtforms.fIElds.core import Label,FIEld,SelectFIEldBase,Flagsfrom wtforms.utils import unset_value as _unset_value

   一般都字段都会默认生成一种HTML标签,但是也可以通过Widget进行更改,下面是常用的一些字段:

字段类型描述
StringFIEld文本字段, 相当于type类型为text的input标签
TextAreaFIEld多行文本字段
PasswordFIEld密码文本字段
HIDdenFIEld隐藏文本字段
DateFIEld文本字段, 值为datetime.date格式
DateTimeFIEld文本字段, 值为datetime.datetime格式
IntegerFIEld文本字段, 值为整数
DecimalFIEld文本字段, 值为decimal.Decimal
floatFIEld文本字段, 值为浮点数
BooleanFIEld复选框, 值为True 和 False
RadioFIEld一组单选框
SelectFIEld下拉列表
SelectMultipleFIEld下拉列表, 可选择多个值
fileFIEld文件上传字段
submitFIEld表单提交按钮
Formfiled把表单作为字段嵌入另一个表单
FIEldList子组指定类型的字段

   每个字段可以为其配置一些额外的属性,如下所示:

字段属性描述
label字段别名
valIDators验证规则列表
filters过滤器列表
description字段详细描述
default默认值
Widget自定义插件,替换默认生成的HTML标签
render_kw为生成的HTML标签配置属性
choices复选框的类型

   如下所示:

class Formlearn(Form):    fIEld = fIElds.StringFIEld(        label="测试字段",# 使用插件,代替默认生成标签        valIDators=[            valIDators.Datarequired(message="必填"),valIDators.Length(max=12,message="长度验证"),],description="这是一个测试字段",default="默认值",render_kw={"style":"wIDth:60px"},# 设置键值对    )
内置验证

   使用valIDators为字段进行验证时,可指定如下的内置验证规则:

验证函数说明
Email验证电子邮件地址
EqualTo比较两个字段的值,常用于要求输入两次密码进行确认的情况
IPAddress验证IPv4网络地址
Length验证输入字符串的长度
NumberRange验证输入的值在数字范围内
Optional无输入值时跳过其他验证函数
Datarequired确保字段中有数据
Regexp使用正则表达式验证输入值
URL验证URL
AnyOf确保输入值在可选值列表中
NoneOf确保输入值不在可选列表中

   EqualTo是常用的验证方式,验证两次密码是否输入一致:

class Formlearn(Form):    password = fIElds.StringFIEld(        label="用户密码",valIDators=[            valIDators.Datarequired(message="必填"),valIDators.Length(min=8,max=16,message="必须小于8位大于16位")        ],)    re_password = fIElds.StringFIEld(        label="密码验证",valIDators=[            valIDators.EqualTo(fIEldname="password",message="两次密码输入不一致",)        ],render_kw={"placeholder": "重新输入密码"},)
Meta配置

   Meta主要用于自定义wtforms的功能,用的比较少,大多都是配置选项,以下是配置参数:

from wtforms import Form  # 必须继承from wtforms import fIElds  # 字段导入from wtforms.csrf.core import CSRF  # 自带的CSRF验证和生成from hashlib import md5  # 加密class MyCSRF(CSRF):    def setup_form(self,form):        self.csrf_context = form.Meta.csrf_context()        self.csrf_secret = form.Meta.csrf_secret        return super(MyCSRF,self).setup_form(form)    def generate_csrf_token(self,csrf_token):        gID = self.csrf_secret + self.csrf_context        token = md5(gID.encode('utf-8')).hexdigest()        return token    def valIDate_csrf_token(self,form,fIEld):        if fIEld.data != fIEld.current_token:            raise ValueError('InvalID CSRF')class Formlearn(Form):    username = fIElds.StringFIEld(label="用户名")    password = fIElds.PasswordFIEld(label="密码")    class Meta:        # CSRF相关        csrf = False  # 是否自动生成CSRF标签        csrf_fIEld_name = "csrf_token"  # 生成的CSRF标签名字        csrf_secret = "2d728321*fd&"  # 自动生成标签的值,加密用csrf_context        csrf_context = lambda x:request.url        csrf_class = MyCSRF  # 生成和比较的CSRF标签        # 其他配置        locales = ('zh','en')  # 是否支持本地化 locales = False        cache_translations = True  # 是否对本地化进行缓存        translations_cache = {}  # 保存本地化缓存信息的字段
钩子函数

   一般都用局部钩子:

class Formlearn(Form):    username = fIElds.StringFIEld(label="用户名")    password = fIElds.PasswordFIEld(label="密码")    def valIDate_username(self,obj):        """        :param obj:  字段对象,属性data就是用户输入的内容        :return:  如果不进行返回,则默认返回obj对象        """        if len(obj.data) < 6:            # raise valIDators.ValIDationError("用户名太短了") # 继续后续验证            raise valIDators.StopValIDation("用户名太短了")  # 不再继续后续验证        return obj    def valIDate_password(self,obj):        print("局部钩子")        return obj
自定义验证

   自定义验证规则:

from wtforms import valIDatorsclass ValIDatorsRule(object):    """自定义验证规则"""    def __call__(self,fIEld):        """        :param form:        :param fIEld: 使用fIEld.data,取出用户输入的信息        :return: 当return是None,则验证通过        """        import re        error_char = re.search(r"\W",fIEld.data).group(0)  # 取出第一个匹配结果        if error_char:        	raise valIDators.StopValIDation("提交数据含有特殊字符,如:%s"%error_char")  # 不再继续后续验证            # raise valIDators.ValIDationError("提交数据含有特殊字符,如:%s"%error_char) # 继续后续验证

   使用:

class LoginForm(Form):    name = simple.StringFIEld(        label="用户名",message="用户名长度必须大于%(max)d且小于%(min)d"),ValIDatorsRule(),# 使用自定义验证        ]    )
插件大全

   以下代码包含所有可能用到的插件:

from flask import Flask,render_template,redirect,requestfrom wtforms import Formfrom wtforms.fIElds import corefrom wtforms.fIElds import HTML5from wtforms.fIElds import simplefrom wtforms import valIDatorsfrom wtforms import Widgetsapp = Flask(__name__,template_folder="templates")app.deBUG = True=======================simple===========================class RegisterForm(Form):    name = simple.StringFIEld(        label="用户名",valIDators=[            valIDators.Datarequired()        ],render_kw={"class":"form-control"},default="wd"    )    pwd = simple.PasswordFIEld(        label="密码",valIDators=[            valIDators.Datarequired(message="密码不能为空")        ]    )    pwd_confim = simple.PasswordFIEld(        label="重复密码",valIDators=[            valIDators.Datarequired(message='重复密码不能为空.'),valIDators.EqualTo('pwd',message="两次密码不一致")        ],render_kw={'class': 'form-control'}    )  ========================HTML5============================    email = HTML5.EmailFIEld(  #注意这里用的是HTML5.EmailFIEld        label='邮箱',valIDators=[            valIDators.Datarequired(message='邮箱不能为空.'),valIDators.Email(message='邮箱格式错误')        ],Widget=Widgets.Textinput(input_type='email'),render_kw={'class': 'form-control'}    )  ===================以下是用core来调用的=======================    gender = core.RadioFIEld(        label="性别",choices=(            (1,"男"),(1,"女"),),coerce=int  # 传入时自动转换位int类型,否则是str类型    )    city = core.SelectFIEld(        label="城市",choices=(            ("bj","北京"),("sh","上海"),)    )    hobby = core.SelectMultipleFIEld(        label='爱好','篮球'),(2,'足球'),coerce=int    )    favor = core.SelectMultipleFIEld(        label="喜好",Widget = Widgets.ListWidget(prefix_label=False),option_Widget = Widgets.CheckBoxinput(),coerce = int,default = [1,2]    )            def __init__(self,**kwargs):  #这里的self是一个RegisterForm对象        '''        	解决数据库不及时更新的问题        	重写__init__方法        '''        super(RegisterForm,**kwargs)  #继承父类的init方法        self.favor.choices =((1,(3,'羽毛球'))  #把RegisterForm这个类里面的favor重新赋值,实现动态改变复选框中的选项    def valIDate_pwd_confim(self,fIEld,):        '''        自定义pwd_config字段规则,例:与pwd字段是否一致        :param fIEld:        :return:        '''        # 最开始初始化时,self.data中已经有所有的值        if fIEld.data != self.data['pwd']:            # raise valIDators.ValIDationError("密码不一致") # 继续后续验证            raise valIDators.StopValIDation("密码不一致")  # 不再继续后续验证          @app.route('/register',"POST"])def register():    if request.method=="GET":        form = RegisterForm(data={'gender': 1})  #默认是1,return render_template("register.HTML",form=form)    else:        form = RegisterForm(formdata=request.form)        if form.valIDate():  #判断是否验证成功            print('用户提交数据通过格式验证,提交的值为:',form.data)  #所有的正确信息        else:            print(form.errors)  #所有的错误信息        return render_template('register.HTML',form=form)if __name__ == '__main__':    app.run()
源码解析

   点我跳转

Flask-session

   、通过第三方插件Flask-session能够将session存放至其他地方,而不是只能存放在内存中:

pip install Flask-session

   使用的时候:

from flask import Flaskfrom flask import sessionfrom flask_session import Sessionimport redisapp = Flask(__name__)app.config["SESSION_TYPE"] = "redis"app.config["SESSION_REdis"] = redis.Redis(host="127.0.0.1",port=6379,password="")  # 有密码就填上app.config["SESSION_KEY_PREFIX"] = "session"  # 前缀Session(app)  # 修改flask的默认session接口@app.route('/set')def set():    session["key"] = "value"    return "ok"@app.route('/get')def get():    res = session.get("key")    return resif __name__ == '__main__':    app.run()

   原理也很简单,替换掉了默认的session接口,与Djangoredis缓存差不多。

Flask-sqlALchemy

   Flask-sqlALchemysqlALchemyFlask之间的粘合剂。让FlasksqlALchemy之间的关系更为紧密:

pip install flask-sqlalchemy

   使用Flask-sqlALchemy也非常简单,首先是创建项目:

- mysite # 项目根目录	- mysite  # 包		- static		- templates		- vIEws			index.py		- __init__.py		- models.py	- manage.py	- settings.py

   代码如下,做主蓝图,使用flasksqlAlchemy模块:

# __init__.pyfrom flask import Flaskfrom flask_session import Sessionfrom flask_sqlalchemy import sqlAlchemy# 第一步,实例化db = sqlAlchemy()  # db包含了所有需要的东西,如commit,remove,Base类等from .models import *  # 导入模型类from .vIEws.index import index  # 防止循环导入def create_app():    app = Flask(import_name=__name__,template_folder='../templates',static_folder='../static',static_url_path='/static')    # 加载配置文件    app.config.from_pyfile("../settings.py")    # 将db注册到app中,必须在注册蓝图之前    db.init_app(app)    # 配置Session    Session(app)    # 注册蓝图    app.register_blueprint(index,url_prefix="/index/")  # 注册蓝图对象 index    return app

   settings.py中的配置项:

import redis# sqlalchemy相关配置sqlALCHEMY_DATABASE_URI = "MysqL+pyMysqL://root@127.0.0.1:3306/db2?charset=utf8"sqlALCHEMY_POol_SIZE = 5    # 链接池的连接数量sqlALCHEMY_POol_TIMEOUT = 10    # 链接池连接超时时间sqlALCHEMY_POol_RECYCLE  = 60*60*4  # 关闭连接的时间:默认MysqL是2小时sqlALCHEMY_MAX_OVERFLOW = 3   # 控制在连接池达到最大值后可以创建的连接数。当这些额外的连接回收到连接池后将会被断开和抛弃sqlALCHEMY_TRACK_MODIFICATIONS = False  # 追踪对象的修改并且发送信号# session相关配置SESSION_TYPE = 'redis'  # session类型为redisSESSION_KEY_PREFIX = 'session:'  # 保存到session中的值的前缀SESSION_PERMANENT = True  # 如果设置为False,则关闭浏览器session就失效。SESSION_USE_SIGNER = False  # 是否对发送到浏览器上 session:cookie值进行加密SESSION_REdis = redis.Redis(host="127.0.0.1",password="")

   然后是书写模型类:

# models.pyfrom . import dbclass UserProfile(db.Model):  # 必须继承Base    __tablename__ = 'userprofile'    ID = db.Column(db.Integer,primary_key=True)    username = db.Column(db.String(64),unique=True,nullable=False)    password = db.Column(db.String(64),nullable=False)    email = db.Column(db.String(128),nullable=False)    def __repr__(self):        return '<user %s>' % self.username

   视图:

# index.pyfrom flask import Blueprintfrom .. import dbfrom .. import modelsindex = Blueprint("index",__name__)@index.route("/model")def model():    import uuID    db.session.add(  # 使用session添加数据        models.UserProfile(            username="user%s" % (uuID.uuID4()),password="password%s" % str(uuID.uuID4()),email="%s@gamil.com" % str(uuID.uuID4())        )    )    db.session.commit()  # 提交    result = db.session.query(models.UserProfile).all()  # 查询数据    db.session.remove()  # 关闭    print(result)    return "ok"

   启动文件:

# manage.pyfrom mysite import create_appfrom mysite import dbif __name__ == '__main__':    app = create_app()    with app.app_context():  # 执行脚本创建数据库表        # db.drop_all()        db.create_all()    app.run()
Flask-Script

   该插件的作用通过脚本的形式启动Flask项目,同时还具有自定义脚本命令的功能。

   下载安装:

pip install flask-script

   在启动文件中进行使用:

from flask_script import Managerfrom mysite import create_appfrom mysite import dbif __name__ == '__main__':    app = create_app()    manager = Manager(app)  # 注册    with app.app_context():  # 执行脚本创建数据库表        # db.drop_all()        db.create_all()    manager.run()  # 使用manager.run启动flask项目

   启动命令:

python manage.py runserver -h 127.0.0.1 -p 5000

   你可以自定义一些启动脚本,如下所示:

@manager.commanddef cmd(args):	print(args)        # 命令:python manage.py cmd 12345# 结果:12345

   也可以使用关键字传参的方式,进行脚本的启动:

 @manager.option('-n','--name',dest='name')    @manager.option('-u','--url',dest='url')    def cmd(name,url):        print(name,url)        # 命令:python manage.py cmd -n test -u www.xxx.com# 结果:test www.xxx.com

   自定义脚本可以配置是否创建数据库,配合Flask-sqlAlchemy,如下所示:

from flask_script import Managerfrom mysite import create_appfrom mysite import dbif __name__ == '__main__':    app = create_app()    manager = Manager(app)  # 注册    @manager.command    def create_tables():        with app.app_context():  # 执行脚本创建数据库表            # db.drop_all()            db.create_all()    manager.run()  # 使用manager.run启动flask项目    # 命令:python manage.py create_tables
Flask-Migrate

   该插件的作用类似于Django中对model的命令行 *** 作,由于原生Flask-sqlALchemy不支持表结构的修改,所以用该插件的命令行来弥补。

   值得一提的是,该插件依赖于Flask-Script

pip install flask-migrate

   在启动文件中进行导入:

from flask_script import Managerfrom flask_migrate import Migrate,MigrateCommandfrom mysite import create_appif __name__ == '__main__':    app = create_app()    manager = Manager(app)  # 注册 flask-scripts组件        Migrate(app)  # 注册 flask-migrate组件    manager.add_command("db",MigrateCommand)        manager.run()  # 使用manager.run启动flask项目

   命令行:

命令描述
python 启动文件.py db init初始化model
python 启动文件.py db migrate类型于makemigrations,生成模型类
python 启动文件.py db upgrade类似于migrate,将模型类映射到物理表中

   在第一次使用时,三条命令都敲一遍。

   如果修改了表结构,只用敲第二条,第三条命令即可,弥补Flask-sqlALchemy不能修改表结构的缺点。

最后记录

   一个完整基础Flask项目基础架构:

- mysite # 项目根目录	- mysite  # 包		- static		- templates		- vIEws			index.py		- __init__.py		- models.py	- manage.py	- settings.py
# __init__.pyfrom flask import Flaskfrom flask_session import Sessionfrom flask_sqlalchemy import sqlAlchemy# 第一步,实例化db = sqlAlchemy()from .models import *from .vIEws.index import index  # 防止循环导入def create_app():    app = Flask(import_name=__name__,url_prefix="/index/")  # 注册蓝图对象 index    return app
# models.pyfrom . import dbclass UserProfile(db.Model):    __tablename__ = 'userprofile'    ID = db.Column(db.Integer,nullable=False)    # email = db.Column(db.String(128),nullable=False)    def __repr__(self):        return '<user %s>' % self.username
# manage.py from flask_script import Managerfrom flask_migrate import Migrate,MigrateCommandfrom mysite import create_appfrom mysite import dbif __name__ == '__main__':    app = create_app()    manager = Manager(app)  # 注册 flask-scripts组件    Migrate(app,db)  # 注册 flask-migrate组件    manager.add_command("db",MigrateCommand)    manager.run()  # 使用manager.run启动flask项目
# settings.pyimport redis# sqlalchemy相关配置sqlALCHEMY_DATABASE_URI = "MysqL+pyMysqL://root@127.0.0.1:3306/db2?charset=utf8"sqlALCHEMY_POol_SIZE = 5    # 链接池的连接数量sqlALCHEMY_POol_TIMEOUT = 10    # 链接池连接超时时间sqlALCHEMY_POol_RECYCLE  = 60*60*4  # 关闭连接的时间:默认MysqL是2小时sqlALCHEMY_MAX_OVERFLOW = 3   # 控制在连接池达到最大值后可以创建的连接数。当这些额外的连接回收到连接池后将会被断开和抛弃sqlALCHEMY_TRACK_MODIFICATIONS = False  # 追踪对象的修改并且发送信号# session相关配置SESSION_TYPE = 'redis'  # session类型为redisSESSION_KEY_PREFIX = 'session:'  # 保存到session中的值的前缀SESSION_PERMANENT = True  # 如果设置为False,则关闭浏览器session就失效。SESSION_USE_SIGNER = False  # 是否对发送到浏览器上 session:cookie值进行加密SESSION_REdis = redis.Redis(host="127.0.0.1",password="")
# index.pyfrom flask import Blueprintfrom .. import dbfrom .. import modelsindex = Blueprint("index",__name__)@index.route("/model")def model():    # 插入数据    import uuID    db.session.add(        models.UserProfile(            username="user%s" % (uuID.uuID4()),email="%s@gamil.com" % str(uuID.uuID4())        )    )    db.session.commit()  # 提交    result = db.session.query(models.UserProfile).all()    db.session.remove()  # 关闭    print(result)    return "ok"
总结

以上是内存溢出为你收集整理的Flask基础全套全部内容,希望文章能够帮你解决Flask基础全套所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1210138.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-04
下一篇 2022-06-04

发表评论

登录后才能评论

评论列表(0条)

保存