当前位置:网站首页>Flask 常用组件

Flask 常用组件

2022-07-04 07:35:00 A-L-Kun

Flask常用组件

一、 flask_session

session 是基于cookie实现, 保存在服务端的键值对(形式为 {随机字符串:'uuid'}), 同时在浏览器中的cookie中也对应一相同的随机字符串,用来再次请求的 时候验证,这个组件的作用是将session数据存储到数据库中

1、 常用配置

# 对 session 进行保护
SECRET_KEY = os.urandom(32)  # 创建密钥
SESSION_USE_SIGNER = True  # 是否签名保护

PERMANENT_SESSION_LIFETIME = timedelta(minutes=30)  # 设置时间延迟
SESSION_REFRESH_EACH_REQUEST = True  # 每刷新一次就更新一次session

SESSION_TYPE = "redis"  # 设置redis存储session数据
SESSION_REDIS = Redis("127.0.0.1")  # 连接数据库
SESSION_PREFIX = "MyWebPrefix:"  # 设置session前缀,默认为session:

注意:session中存储的是字典,当修改字典内部(第二级)元素时,会造成数据不更新

解决方法:

我们要先阅读一下部分源码

class SecureCookieSession(CallbackDict, SessionMixin):
    #: When data is changed, this is set to ``True``. Only the session
    modified = False  # 只有当session里面的数据改变的时候,这个才会被设置成True

    #: When data is read or written, this is set to ``True``. Used by
    accessed = False

    def __init__(self, initial: t.Any = None) -> None:
        def on_update(self) -> None:
            self.modified = True  # 如果数据更新,就会被改为True,后面就会对session设置到cookies中
            """ if not (session.modified # 如果session修改,就会重新设置到cookies中 or ( session.permanent # 是否设置到cookie里面 and app.config["SESSION_REFRESH_EACH_REQUEST"] # 如果为True,则每次请求刷新一次 )): return ... response.set_cookie(...) """
            self.accessed = True  # 如果session已读,则将session写入response中
            """ if session.accessed: response.vary.add("Cookie") """
		# 将on_update()传递给CallbackDict
        super().__init__(initial, on_update)

解决方法:

  1. 方法一

    session.modified = True  # 手动使得session标记为session的内容被修改
    
  2. 方法二

    app.config["SESSION_REFRESH_EACH_REQUEST"] = True  # 每刷新一次就更新
    # 同时,在登录成功之后,设置
    session.permanent = True  # 默认为 False,但是如果使用redis的话,就不需要设置,其默认为True
    

2、 使用方法

2.1 session_interface

通过session_interface来设置

from flask_session import RedisSessionInterface
from flask import Flask
from os import urandom
from redis import Redis

app = Flask(__name__)
app.serect_key = urandom(32)  # 设置32位随机密钥
app.config["SESSION_USE_SIGNER"] = True  
# 通过redis保存session
app.session_interface = RedisSessionInterface(
    redis=Redis("127.0.0.1"),  # 连接Redis数据库
    key_prefix="flask_login",  # 设置随机字符串的前缀,默认为session:
)

默认session设置为

from flask.sessions import SecureCookieSessionInterface
app.session_interface = SecureCookieSessionInterface()

修改后,就可以直接使用了,其使用方法和原先的一样

2.2 config

通过配置文件来设置

from flask_session import Session
from flask import Flask
from os import urandom
from redis import Redis

settings = {
    
    "SESSION_TYPE": "redis",  # 使用redis数据库连接
    "SESSION_REDIS": Redis("127.0.0.1"),  # 连接redis数据库
    "SECRET_KEY": os.urandom(32),  # 创建密钥
    "SESSION_USE_SIGNER": True,  # 是否签名保护
    "SESSION_REFRESH_EACH_REQUEST"True# 每刷新一次就更新数据
}
app.config.from_mapping(settings)  # 设置配置
Session(app)  # 内部封装了设置app.session_interface的方法

二、 DBUtils

1、 引言

当我们要对数据库进行操作时,可以这么干:

import pymysql
from functools import wraps
SQL_CONFIG = {
    
    # 对数据库的配置
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": "qwe123",
    "db": "flask1",
}

def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""
    @wraps(fun)
    def inner(sql, *args):
        conn = pymysql.connect(**SQL_CONFIG)  # 连接数据库
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 返回字典类型的SQL数据
        obj = fun(sql, cursor, *args)  # 将参数传入函数中,进行运行
        conn.commit()  # 提交事务
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj  # 返回数据

    return inner


class SQLTool:
    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj

if __name__ == '__main__':
    obj = SQLTool.fetch_one("SELECT id, name FROM users WHERE name=%s and pwd=%s", ["kun", "123"])
    print(obj)

问题来了,如果有很多连接的话,开启数据库再关闭是不是很麻烦呢?

  • 我们可以使用数据库连接池:DBUtils

2、 DBUtils

使用数据库连接池

此连接池有两种模式:

  1. 为每一个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新发到连接池,供自己线程再次使用。当线程终止,连接自动关闭
  2. 创建一批连接到连接池,供所有线程共享使用(主要)

2.1 模式一

POOL = PersistentDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)

def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()

func()

2.2 模式二

import time
import pymysql
import threading
from dbutils.pooled_db import PooledDB, SharedDBConnection
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)


def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    # 否则
    # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    conn = POOL.connection()

    # print(th, '链接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '\r\n')

    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()

func()

3、 代码封装

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
# 使用单例模式创建一个数据库连接池
from dbutils.pooled_db import PooledDB
import pymysql
SQL_CONFIG = {
    
    # 对数据库的配置
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": "qwe123",
    "db": "flask1",
    "charset": "utf8",
}
POOL_CONFIG = {
    
    # 对数据库连接池的配置
    "maxconnections": 6,
    "mincached": 2,
    "maxcached": 5,
    "maxshared": 3,
    "blocking": True,
    "maxusage": None,
    "setsession": [],
    "ping": 0,
}

POOL = PooledDB(
    creator=pymysql,
    **SQL_CONFIG,
    **POOL_CONFIG
)

def select_sql(type_):
    def sql_outer(fun):
        """使用装饰器,可以对数据库便捷操作"""

        @wraps(fun)
        def inner(sql, *args):
            if type_ == "sql":
                """如果通过数据库来获取值的话"""
                conn = pymysql.connect(**SQL_CONFIG)  # 连接数据库
                cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 返回字典类型的SQL数据
                obj = fun(sql, cursor, *args)  # 将参数传入函数中,进行运行
                conn.commit()  # 提交事务
                cursor.close()  # 关闭游标
                conn.close()  # 关闭连接
                return obj  # 返回数据
            elif type_ == "pool":
                # 通过数据库连接池取值
                conn = POOL.connection()
                cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
                obj = fun(sql, cursor, *args)
                conn.commit()
                cursor.close()  # 关闭游标
                conn.close()  # 关闭连接
                return obj
            raise ValueError("type_ value error, value = pool or sql")
        return inner

    return sql_outer


class SQLTool:
    """使用pymysql单线程连接"""

    @staticmethod
    @select_sql("sql")
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @select_sql("sql")
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj


class DBUtilsTool:
    """使用数据库连接池"""

    @staticmethod
    @select_sql("pool")
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @select_sql("pool")
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj

obj = DBUtilsTool.fetch_one("SELECT id, name FROM users WHERE name=%s and pwd=%s", ["kun", "123"])
print(obj)

4、 结合flask使用

4.1 方式一

无法创建的原因是flask的上下文管理机制,当flask还没run()的时候,无法访问current_app里面的配置信息

将配置信息放到settings配置文件中,再使用init_app方法——模拟Session(app),把数据库连接池放到配置文件config里面

在存放数据库连接池的文件sql.py中:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
from flask import current_app
from dbutils.pooled_db import PooledDB


def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""

    @wraps(fun)
    def inner(sql, *args):
        # 通过数据库连接池取值
        POOL = current_app.config["SQL_POOL"]  # 将其存储到配置文件中,防止连接池不存在
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        obj = fun(sql, cursor, *args)
        conn.commit()
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj

    return inner


class DBUtilsTool:
    """使用数据库连接池,模拟Session(app)"""
    def __init__(self, app=None):
        self.app = app
        if app:
            self.init_app(app)

    def init_app(self, app):
        POOL = PooledDB(
            **app.config["POOL_CONFIG"]
        )
        app.config["SQL_POOL"] = POOL

    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj

settings.py文件中:

import pymysql

class BaseConfig:
    POOL_CONFIG = {
    
        # 对数据库连接池的配置
        "creator": pymysql,
        "maxconnections": 6,
        "mincached": 2,
        "maxcached": 5,
        "maxshared": 3,
        "blocking": True,
        "maxusage": None,
        "setsession": [],
        "ping": 4,
        # 对数据库的配置
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "passwd": "qwe123",
        "db": "flask1",
        "charset": "utf8",
    }

manage.py主运行程序中

from sql import DBUtilsTool
from flask import Flask

def create_app():
    app = Flask(__name__)
    app.config.from_object("settings.BaseConfig")
    # 模拟session_redis 将app传入,初始化数据库连接池
    DBUtilsTool(app)  # 在程序刚启动的时候,flask还没开始运行,就把配置文件放到其中,flask开启的时候,就已经将数据库连接池存储到了config中
    return app


if __name__ == '__main__':
    app = create_app()
    app.run("0.0.0.0")

4.2 方式二

直接将数据库连接池添加到配置文件中

sql.py

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
from flask import current_app


def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""

    @wraps(fun)
    def inner(sql, *args):
        # 通过数据库连接池取值
        POOL = current_app.config["SQL_POOL"]  # 将其存储到配置文件中,防止连接池不存在
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        obj = fun(sql, cursor, *args)
        conn.commit()
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj

    return inner


class DBUtilsTool:
    """使用数据库连接池"""
    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj

settings.py

from dbutils.pooled_db import PooledDB

class BaseConfig:
    SQL_POOL = PooledDB(
                # 对数据库连接池的配置
                "creator": pymysql,
                "maxconnections": 6,
                "mincached": 2,
                "maxcached": 5,
                "maxshared": 3,
                "blocking": True,
                "maxusage": None,
                "setsession": [],
                "ping": 4,
                # 对数据库的配置
                "host": "127.0.0.1",
                "port": 3306,
                "user": "root",
                "passwd": "qwe123",
                "db": "flask1",
                "charset": "utf8",
            )

manage.py

from sql import DBUtilsTool
from flask import Flask

def create_app():
    app = Flask(__name__)
    app.config.from_object("settings.BaseConfig")
    return app


if __name__ == '__main__':
    app = create_app()
    app.run("0.0.0.0")

不建议使用方法二,其没有做到配置与程序分离的效果

4.3 方法三

使用pool.py在里面创建POOL

from flask import current_app


POOL = PooledDB(
                **current_app.config["POOL_CONFIG"]
            )

sql.py

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
# from .pool import POOL # 注意不能在这里导入,因为app创建时,注册蓝图的时候,app并没有运行,故current_app里面没有配置信息

def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""

    @wraps(fun)
    def inner(sql, *args):
        # 通过数据库连接池取值
        from .pool import POOL  # 但路由中调用的时候导入,app肯定运行起来了,故current_app里面肯定有配置信息
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        obj = fun(sql, cursor, *args)
        conn.commit()
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj

    return inner


class DBUtilsTool:
    """使用数据库连接池"""
    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj

要注意current_app的创建时期,其他的使用方法都一样

只要写原生SQL,就要使用数据库连接池

三、 wtforms

作用:专门用于对python web框架做表单验证

1、 支持的字段

字段描述
StringField文本字段
TextAreaField多行文本字段
PasswordField密码文本字段
HiddenField隐藏文本字段
DateField文本字段,值为 datetime.date 格式
DateTimeField文本字段,值为 datetime.datetime 格式
IntegerField文本字段,值为整数
DecimalField文本字段,值为 decimal.Decimal
FloatField文本字段,值为浮点数
BooleanField复选框,值为 True 和 False
RadioField一组单选框
SelectField下拉列表
SelectMultipleField下拉列表,可选择多个值
FileField文件上传字段
SubmitField表单提交按钮
FormField把表单作为字段嵌入另一个表单
FieldList一组指定类型的字段

2、 字段参数

参数描述
label字段别名,在页面中可以通过字段.label展示
validator验证规则列表
filters过氯器列表,用于对提交数据进行过滤
description描述信息,通常用于生成帮助信息
id表示在form类定义时候字段的位置,通常你不需要定义它,默认会按照定义的先后顺序排序
default默认值
widgethtml插件,通过该插件可以覆盖默认的插件,更多通过用户自定义
render_kw自定义html属性
choices复选类型的选项

3、 验证函数

验证函数说明
Email验证是电子邮件地址
EqualTo比较两个字段的值; 常用于要求输入两次密钥进行确认的情况
IPAddress验证IPv4网络地址
Length验证输入字符串的长度
NumberRange验证输入的值在数字范围内
Optional无输入值时跳过其它验证函数
DataRequired确保字段中有数据
Regexp使用正则表达式验证输入值
URL验证url
AnyOf确保输入值在可选值列表中
NoneOf确保输入值不在可选列表中

可以根据验证函数的源码,来自定义验证函数

4、 代码实例

创建一个上传文件功能的app,里面包含了对其工作流程的关键步骤分析,希望对理解有用

代码实例:

app.py

from flask import Flask
from wtforms import FileField, Form, SubmitField, widgets, validators
from flask import (request, render_template, current_app)
from werkzeug.utils import secure_filename
import os


class UploadForm(Form):
    """创建上传文件的字段"""
    # UploadForm.checkbox = UnboundField(BooleanField, *args, **kwargs, creation_counter=1)
    file1 = FileField(
        render_kw={
    "class": "file"},
        widget=widgets.FileInput(),
        validators=[
            validators.DataRequired(message="请选择要上传的文件哦!")
        ]
    )
    # UploadForm.sbtn = UnboundField(SubmitField, *args, **kwargs, creation_counter=2)
    sbtn = SubmitField(
        render_kw={
    
            "id": "upload_submit",
            "value": "上传文件",
        },
        widget=widgets.SubmitInput(),
    )


config = {
    
    "UPLOAD_FOLDER": "upload/",  # 设置存储文件的文件夹
}
app = Flask(__name__)
app.config.from_mapping(config)


@app.route("/upload", methods=['GET', 'POST'])
def upload():
    # 用于专门处理提交的数据的路由
    if request.method == "POST":
        form = UploadForm(formdata=request.files)  # 上传文件
        if form.validate():  # 对数据进行验证
            f = form.file1.data
            f.save(os.path.join(current_app.config['UPLOAD_FOLDER'], secure_filename(f.filename)))  # 将文件名称安全处理
            return render_template("index.html", form=form, msg=f"上传文件成功!上传的文件为{
      f.filename}")
        else:
            return render_template("index.html", form=form, msg="请选择文件哦!")
    form = UploadForm()
    """ # 对该类创建时的源码分析 metaclass=FormMeta # 其为元类 # 故,创建对象时 1、 FormMeta.__call__ # 在 __call__ 方法里面进行的步骤 UploadForm._unbound_fields = None UploadForm._wtforms_meta = None _unbound_fields = [ # 其根据counter来排序 ("checkbox": UnboundField(BooleanField, *args, **kwargs, creation_counter=1)) ("sbtn": UnboundField(SubmitField, *args, **kwargs, creation_counter=1)) ] _wtforms_meta = type("Meta", tuple([DefaultMeta]), {}) # DefaultMeta = Form.Meta = class Meta(DefaultMeta): pass 2、 UploadForm.__new__ # 在 __new__ 方法里面进行的步骤,然后发现没有 __new__ 方法,除非自定义 pass 3、 UploadForm.__init__ # 执行 __init__ 方法 UploadForm()._fields = { "file1": FileField(...), "sbtn": SubmitField(...) } UploadForm().name=FileField(...) UploadForm().sbtn=SubmitField(...) """
    print(form.file1)
    """ # 访问类的属性时的源码分析 # 故,form.name 执行的是字段类中的 __str__ 方法 Field.__str__ -> return self() # 执行字段的 __call__ 方法 Field.__call__ -> return self.meta.render_field(self, kwargs) DefaultMeta.render_field(self, kwargs) -> return field.widget(field, **render_kw) # 执行 widget 的 __call__ 方法 Input.__call__ -> return Markup("<input %s>" % self.html_params(name=field.name, **kwargs)) # 进行渲染 """
    for i in form:
        print(i)  # 发现i是可以遍历的,
        """ # 其内部有一个iter方法 BaseField.__iter__ -> iter(self._fields.values()) print(i) # 执行字段内部的 __str__ 方法 """
    return render_template("index.html", form=form)


if __name__ == '__main__':
    app.run()

还有验证流程,可以自己尝试一下

templates/index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>文件上传</title>
</head>
<body>
    <div class="uploadFile">
        <h1>请选择要上传的文件</h1>
        <form method="POST" enctype="multipart/form-data">
            <p>{
    {
     form.file1 }}</p>
            <p style="color: red">{
    {
     form.file1.errors[0] }}</p>
            <p>{
    {
     form.sbtn }}</p>
        </form>
        <p style="color: green">{
    {
     msg }}</p>
    </div>
</body>
</html>

四、 flask_mail

1、 简介

在web程序中,经常会需要发送电子邮件。比如,在用户注册账户时发送确认邮件;定期向用户发送热门内容或是促销信息等等。在Web程序中发送电子邮件并不复杂,借助扩展Flask-Mail或是第三方邮件服务,只需要几行代码就可以发送邮件

配置信息:

参数描述
MAIL_SERVER邮件服务器的名称/IP地址
MAIL_PORT所用服务器的端口号
MAIL_USE_TLS启用/禁用传输安全层加密
MAIL_USE_SSL启用/禁用安全套接字层加密
MAIL_DEBUG调试支持,默认是Flask应用程序的调试状态
MAIL_USERNAME发件人的用户名
MAIL_PASSWORD发件人的密码
MAIL_DEFAULT_SENDER设置默认发件人
MAIL_MAX_EMAILS设置要发送的最大邮件
MAIL_SUPPRESS_SEND如果app.testing设置为true,则发送被抑制
MAIL_ASCII_ATTACHMENTS如果设置为true,则将附加的文件名转换为ASCII

2、 Mail

它管理电子邮件消息的要求。 类构造函数采用以下形式

方法描述
send()发送Message类对象的内容
connect()与邮件主机打开连接
send_message()发送消息对象

3、 Massage

3.1 实例化对象

Message(
    subject='',  # 设置标题
    recipients=[],  # 收件人
    body=None,  # 发内容
    html=None,
    sender=None,  # 发件人
    cc=None,
    bcc=None,
    attachments=None,  # 附件
    reply_to=None,  
    date=None,
    charset=None,
    extra_headers=None,
    mail_options=None,
    rcpt_options=None
)

3.2 类方法

attach() - 向消息添加附件。 该方法采用以下参数:

  • filename - 要附加的文件的名称
  • content_type - 文件的MIME类型
  • data - 原始文件数据
  • disposition - 内容处置,如果有的话

add_recipient() - 向消息添加另一个收件人

4、 使用方法

  1. 初始化邮箱

    from flask_mail import Mail, Message
    mail = Mail(app)
    
  2. 配置邮箱信息

    class BaseConfig2():
        MAIL_SERVER = "smtp.qq.com"  # 设置SMTP服务器
        MAIL_PORT = 465  # 设置端口
        MAIL_USE_TLS = False  # 是否使用TLSSL加密
        MAIL_USER_SSL = True  # 是否使用SSL加密
        MAIL_USERNAME = "[email protected]"  # 邮箱
        MAIL_PASSWORD = "xtisaddfdfntdcjf"  # 密码
        MAIL_DEFAULT_SENDER = "A.L.Kun<[email protected]>"  # 发件人
    
  3. 创建信息

    msg = Message(subject="This is title", recipients="[email protected]")
    msg.body = "This is body"
    
  4. 发送信息

    mail.send(msg)
    

总代码:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
__author__ = "A.L.Kun"
__file__ = "app_.py"
__time__ = "2022/6/24 19:56"
__email__ = "[email protected]"
from settings import BaseConfig2
from flask import Flask
from flask_mail import Mail, Message
app = Flask(__name__)
app.config.from_object(BaseConfig2)
mail = Mail(app)


@app.route('/')
def hello():
    msg = Message(subject="This is title", recipients="[email protected]")
    msg.body = "This is body"
    message.html = render_template('content.html')  # 可以发送一个html页面
    mail.send(msg)
    return "信息发送完成!"

if __name__ == '__main__':
    app.run(debug=True)

五、 flask_script

1、 简介

安装:pip install flask-script

通过使用Flask-Script扩展,我们可以在Flask服务器启动的时候,通过命令行的方式传入参数。而不仅仅通过app.run()方法中传参,比如我们可以通过python hello.py runserver –host ip地址,告诉服务器在哪个网络接口监听来自客户端的连接

2、 启动服务

from flask import Flask

app = Flask(__name__)

"""使用flask-script启动项目"""
from flask_script import Manager
manager = Manager(app)

@app.route('/')
def index():
    return 'hello world'

if  __name__ == "__main__"
    manager.run()

在命令行输入python manage.py runserver,即可启动服务

3、 传入参数

from flaskScript import create_app
from flask_script import Manager

app = create_app()  # 返回一个app对象
manager = Manager(app)


@manager.command
def custom(arg):
    """ 这个方法可以接收从命令行输入的参数,位置参数 如: 运行:python manage.py custom 123 其会在控制台打印 123 """
    print(arg)


@manager.option("-n", "--name", dest='name')
@manager.option("-u", "--url", dest="url")
def cmd_(name, url):
    """ 自定义命令,可选参数 :param name:从命令行传入的姓名 :param url: 从命令行传入的url :return: 如: 输入:python manage.py cmd_ -n "lihua" -u "127.0.0.1" 则会输出:lihua 127.0.0.1 """
    print(name, url)


if __name__ == '__main__':
    manager.run()

作用:

  • python manage.py runserver ...
  • python manage.py 自定义命令

六、 flask_sqlalchemy

1、 简介

sqlalchemy这里面有sqlalchemy的一些基本操作

flask中一般使用flask-sqlalchemy来操作数据库,使用起来比较简单,易于操作

安装:

pip install flask-sqlalchemy

配置参数

配置选项说明
SQLALCHEMY_DATABASE_URI连接数据库。示例:mysql://username:[email protected]/post/db?charset=utf-8
SQLALCHEMY_BINDS一个将会绑定多种数据库的字典。 更多详细信息请看官文 绑定多种数据库.
SQLALCHEMY_ECHO调试设置为true
SQLALCHEMY_POOL_SIZE数据库池的大小,默认值为5。
SQLALCHEMY_POOL_TIMEOUT连接超时时间
SQLALCHEMY_POOL_RECYCLE自动回收连接的秒数。
SQLALCHEMY_MAX_OVERFLOW控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
SQLALCHEMY_TRACK_MODIFICATIONS如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。

2、 使用步骤

  1. 配置数据库信息,在settings.py文件中添加

    SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:[email protected]:3306/flask1?charset=utf8"
    
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ECHO = True
    
  2. 创建一个app.py文件,在里面添加

    from flask_sqlalchemy import SQLAlchemy
    
    db = SQLAlchemy()  # 实例化对象
    app = Flask(__name__)
    app.config.from_file("settings.py")  # 导入配置文件
    db.init_app(app)  # 初始化sqlalchemy,使用app里面的配置文件
    
  3. 编写数据库结构,创建一个models.py,与app.py同级

    from app import db
    
    class Users(db.Model):
        __table__ = "user"
        id = db.Column(db.INTEGER, primary_key=True, autoincrement=True)
        name = db.Column(db.String(32))
    
  4. 在路由中使用

    import models
    
    @app.route("/login", methods=["GET", "POST"])
    def login():
        data = db.session.query(models.Users).all()  # 查找数据库里面的所有信息
        print(data)
        db.session.remove()  # 移除session
        return "Login"
    

扩展,离线脚本的使用

from app import app
with app.app_context():
    pass  # 里面可以运行flask运行时进行的操作

七、 flask_migrate

作用:做数据库迁移

其依赖于:flask-script/flask-sqlalchemy

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_script import Manager

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite:///app.db"  # 连接数据库

db = SQLAlchemy(app)  
manager = Manager(app)
Migrate(app, db)  # 实例化组件

""" flask db init # 初始化表 flask db migrate # 将表在数据库中创建出来 flask db upgrade # 更新表的结构 """


# Model
class User(db.Model):
    __tablename__ = "users"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128))


if __name__ == '__main__':
    manager.run()

八、 自定义组件

auto.py中添加:

from flask import request, session, redirect


class Auth:
    def __init__(self, app=None):
        self.app = app
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        app.auto = self  # 将这个类的信息全部写入app里面
        self.app = app
        self.app.before_request(self.check_login)

    def check_login(self):
        print("检测用户是否登录")
        usr = session.get("usr")
        print(usr)

    def login(self, data):
        """创建session"""
        session["usr"] = data

    def login_out(self):
        """用户登出"""
        del session["usr"]

使用这个组件时:

from auto import Auto
from flask import Flask


app = Flask(__name__)

at = Auto()
at.init_app(app)


################################################
from flask import current_app
cerrent_app.login_out()  # 调用组件登出的功能 

最后,总目录结构为:

https://images.cnblogs.com/cnblogs_com/blogs/722174/galleries/2074790/o_220701084945_sort.png

点击我,查看源代码

九、 其它

1、 多app应用

from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from werkzeug.serving import run_simple

app01 = Flask("app01")
app02 = Flask("app02")
# //login -> 访问 app01下面的login
dm = DispatcherMiddleware(app01, {
    
    "/app02": app02  # /appo2/index 访问app02下面的index
})


@app01.route("/login")
def login():
    return "app01.login"


@app02.route("/index")
def index():
    return "app02.index"


if __name__ == '__main__':
    # 请求一旦进来会执行 run_simple 方法的第三个参数加括号:dm(),对象加括号会调用对象的__call__方法
    run_simple("127.0.0.1", 5000, dm)

2、 信号

from flask import Flask, signals

app = Flask(__name__)


def func(*args, **kwargs):
    print("请求开始,触发的app为", *args, **kwargs)


signals.request_started.connect(func)  # 连接信号
""" 含有的全部信号 template_rendered before_render_template request_started request_finished request_tearing_down got_request_exception appcontext_tearing_down appcontext_pushed appcontext_popped message_flashed 这里可以在源码中查看这些信号的触发条件,源码内部使用send方法触发信号,或者百度也OK """

@app.route("/")
def index():
    return "首页面"


if __name__ == '__main__':
    app.run(debug=True)

注意,信号和装饰器的区别是,信号无法控制程序的进行,其只是提供一个提示功能,在原来的基础增加额外的操作和值;而装饰器可以控制请求是否可以继续往后执行

原网站

版权声明
本文为[A-L-Kun]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_62789540/article/details/125563241

随机推荐