当前位置:网站首页>Flask1.1.4 Werkzeug1.0.1 源码分析:启动流程
Flask1.1.4 Werkzeug1.0.1 源码分析:启动流程
2022-07-07 00:28:00 【某工程师$】
基于QuickStart中的一个demo来分析
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello_world():
return "<p>Hello, World!</p>"
if __name__ == '__main__':
app.run()
@app.route(“/”) 是一个接收参数的装饰器工厂函数,返回一个闭包了特定参数的装饰器
# flask/app.py L1288
def route(self, rule, **options):
# 典型的注册装饰器
def decorator(f):
endpoint = options.pop("endpoint", None)
# 此方法将url信息和view_func 注册到flask实例中
self.add_url_rule(rule, endpoint, f, **options)
return f
return decorator
关键在 self.add_url_rule(rule, endpoint, f, **options) 方法 下面代码做了一些精简
# flask/app.py L1178
@setupmethod
def add_url_rule(
self,
rule,
endpoint=None,
view_func=None,
provide_automatic_options=None,
**options
):
# 此处endpoint是一个重要的概念 后续在路由部分会详细讲解
# 在flask中是 url_map和view_functions 关联的重要纽带
# 每次请求来时 去url_map中搜索得到 endpoint,args 然后走 view_functions[endpoint](args) 拿到结果
# 若不传 默认为view_func.__name__
if endpoint is None:
endpoint = _endpoint_from_view_func(view_func)
options["endpoint"] = endpoint
methods = options.pop("methods", None)
# ...
# 此处的 url_rule_class 是一个Flask类属性 值为 werkzeug.routing.Rule
# 所以此处即为构建 werkzeug.routing.Rule 对象
rule = self.url_rule_class(rule, methods=methods, **options)
rule.provide_automatic_options = provide_automatic_options
# 构建好的 rule对象保存到 url_map实例属性中
# url_map是werkzeug.routing.Map 对象
# flask路由部分其实借助了werkzeug的能力
self.url_map.add(rule)
if view_func is not None:
# ...
# endpoint为key将对应的view_func保存在 view_functions属性中
# view_functions 是个 dict
self.view_functions[endpoint] = view_func
很简单,add_url_rule 主要作用就是 将rule和view_func 信息维护到 flask实例的 url_map和view_functions属性中
ok,继续研究 app.run() 启动流程就在其中(app.run()会启动一个简易的web服务器用于开发和测试,生产环境会用其他高性能的web server,不过借助这个研究下启动流程还是可以的)
# flask/app.py L889
def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
# ...
from werkzeug.serving import run_simple
try:
# 借助了 werkzeug.serving.run_simple
# 注意第三个参数 将flask实例传给了 run_simple方法
# 后续 web server会调用flask实例进行逻辑处理
run_simple(host, port, self, **options)
finally:
# reset the first request information if the development server
# reset normally. This makes it possible to restart the server
# without reloader and that stuff from an interactive shell.
self._got_first_request = False
接着看 run_simple代码,这边切换为研究 werkzeug 源码
# werkzeug/serving.py L876
def run_simple(
hostname,
port,
application,
use_reloader=False,
use_debugger=False,
use_evalex=True,
extra_files=None,
reloader_interval=1,
reloader_type="auto",
threaded=False,
processes=1,
request_handler=None,
static_files=None,
passthrough_errors=False,
ssl_context=None,
):
#...
# application 参数对应的是 flask实例
def inner():
try:
fd = int(os.environ["WERKZEUG_SERVER_FD"])
except (LookupError, ValueError):
fd = None
# 关键于此 创建一个server实例
srv = make_server(
hostname,
port,
application,
threaded,
processes,
request_handler,
passthrough_errors,
ssl_context,
fd=fd,
)
if fd is None:
log_startup(srv.socket)
# 调用web server实例的serve_forever方法
srv.serve_forever()
if use_reloader:
#...
else:
inner()
继续往下,先看 make_server方法
# werkzeug/serving.py L830
def make_server(
host=None,
port=None,
app=None,
threaded=False,
processes=1,
request_handler=None,
passthrough_errors=False,
ssl_context=None,
fd=None,
):
if threaded and processes > 1:
raise ValueError("cannot have a multithreaded and multi process server.")
elif threaded:
return ThreadedWSGIServer(
host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
)
elif processes > 1:
return ForkingWSGIServer(
host,
port,
app,
processes,
request_handler,
passthrough_errors,
ssl_context,
fd=fd,
)
else:
return BaseWSGIServer(
host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
)
看出来就是根据具体参数 构建一个server实例返回 挑一个看下 看 BaseWSGIServer
# werkzeug/serving.py L708
class BaseWSGIServer(HTTPServer, object):
def __init__(
self,
host,
port,
app,
handler=None,
passthrough_errors=False,
ssl_context=None,
fd=None,
):
if handler is None:
# 此处注意下 后续的请求具体处理会走 WSGIRequestHandler
handler = WSGIRequestHandler
# 这边进行了 端口绑定和监听
# 并且传入了类 WSGIRequestHandler
HTTPServer.__init__(self, server_address, handler)
# app即flaks实例
self.app = app
# 此方法在make_server 返回实例后立马调用
def serve_forever(self):
self.shutdown_signal = False
try:
# 此处的 HTTPServer 位于python内置库http中 http.server.HTTPServer
# 将当前server实例作为参数传入
HTTPServer.serve_forever(self)
except KeyboardInterrupt:
pass
finally:
self.server_close()
在往下看 HTTPServer.serve_forever() 可以发现 serve_forever 其实是其父类的父类 内置库 socketserver中 BaseServer 类实现的方法
# socketserver.py L153
class BaseServer:
timeout = None
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
# 这个属性就是 WSGIRequestHandler
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
def serve_forever(self, poll_interval=0.5):
self.__is_shut_down.clear()
try:
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)
# 就是个无限循环 接收一个请求处理一个请求
while not self.__shutdown_request:
ready = selector.select(poll_interval)
# bpo-35017: shutdown() called during select(), exit immediately.
if self.__shutdown_request:
break
if ready:
# 接收到具体请求后 进行处理
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
def _handle_request_noblock(self):
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
# 接收到具体请求后 进行处理
self.process_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
else:
self.shutdown_request(request)
def process_request(self, request, client_address):
# 接收到具体请求后 进行处理
self.finish_request(request, client_address)
self.shutdown_request(request)
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
# 每次都实例化了 WSGIRequestHandler 进行请求处理
# 注意哦 第三个参数把当前的server实例传入
self.RequestHandlerClass(request, client_address, self)
好的 最终最终 请求的处理是在 WSGIRequestHandler 实例化过程中处理的
那我们来看看 WSGIRequestHandler的代码吧
WSGIRequestHandler 的初始化其实走父类的父类 BaseRequestHandler 的__init__方法
# socketserver.py 696
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
# 具体的处理方法 其实是调用的子类的实现
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
# werkzeug/serving.py L163
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
# 处理请求的核心方法
def run_wsgi(self):
# 此处经典的 将http内容转化为符合wsgi格式的内容 因为后面是个wsgi的app啊
self.environ = environ = self.make_environ()
headers_set = []
headers_sent = []
# 眼熟吧 调用wsgi app的参数之一
def start_response(status, response_headers, exc_info=None):
if exc_info:
try:
if headers_sent:
reraise(*exc_info)
finally:
exc_info = None
elif headers_set:
raise AssertionError("Headers already set")
headers_set[:] = [status, response_headers]
return write
def execute(app):
# 经典的wsgi app调用 返回值可迭代
# 此处的app是flask实例 实例可调用自然是因为Flask实现了__call__方法
application_iter = app(environ, start_response)
try:
# 执行此方法 参数为flask实例
execute(self.server.app)
except (_ConnectionError, socket.timeout) as e:
self.connection_dropped(e, environ)
except Exception:
#...
# 上一个类调用的handle就是这个
def handle(self):
"""Handles a request ignoring dropped connections."""
try:
# 调用父类的handle
BaseHTTPRequestHandler.handle(self)
except (_ConnectionError, socket.timeout) as e:
self.connection_dropped(e)
except Exception as e:
if self.server.ssl_context is None or not is_ssl_error(e):
raise
if self.server.shutdown_signal:
self.initiate_shutdown()
# 没错 下面的类又调用了这个方法
def handle_one_request(self):
"""Handle a single HTTP request."""
self.raw_requestline = self.rfile.readline()
if not self.raw_requestline:
self.close_connection = 1
elif self.parse_request():
# 最终最终 走了这个方法
return self.run_wsgi()
# http/server.py 147
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
# 上一个类调用的就是这个
def handle(self):
"""Handle multiple requests if necessary."""
self.close_connection = True
# 此处又调用回了 WSGIRequestHandler 的 handle_one_request
self.handle_one_request()
while not self.close_connection:
self.handle_one_request()
上面的调用路线看起来稍微有点绕,是因为涉及到了继承方法的覆盖,有些调用走的父类的,有些走的子类的方法。
可以看到:其实就是监听端口,并开启一个无限的循环,每次接收到一个请求之后,就实例化WSGIRequestHandler进行处理,而WSGIRequestHandler主要做了HTTP格式数据到WSGI数据的转换,然后用WSGI的方式调用了flask实例进行实际的逻辑处理并返回数据。
下面来看看Flask实例支持调用的代码吧。
# flask/app.py L103
class Flask(_PackageBoundObject):
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)
def wsgi_app(self, environ, start_response):
# 重要 创建当前请求的上下文对象 flask.ctx.RequestContext对象
# 此处已经对请求信息进行了处理(获得了 endpoint和view_func_args)
ctx = self.request_context(environ)
error = None
try:
try:
# 将当前请求的上下文入栈(LocalStack)
# 此处是Flask上下文的实现细节 通过共享的方式来传递上下文 给后面的view_func
ctx.push()
# 分发执行
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except: # noqa: B001
error = sys.exc_info()[1]
raise
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
# 请求处理完成后 清理上下文 出栈
ctx.auto_pop(error)
def full_dispatch_request(self):
# hook
self.try_trigger_before_first_request_functions()
try:
# hook
request_started.send(self)
rv = self.preprocess_request()
if rv is None:
# 主要执行逻辑的方法
rv = self.dispatch_request()
except Exception as e:
rv = self.handle_user_exception(e)
return self.finalize_request(rv)
def dispatch_request(self):
# 直接从上下文栈中获取当前的请求上下文
req = _request_ctx_stack.top.request
if req.routing_exception is not None:
self.raise_routing_exception(req)
rule = req.url_rule
# ...
# 根据endpoint获取对应的view_func 执行并返回
return self.view_functions[rule.endpoint](**req.view_args)
okk,到此处Flask启动流程基本就差不多了。
总结一下步骤:
- 创建一个Flask实例 app
- 将url和view_func 通过 app.add_url_rule() 维护到app的url_map和view_functions 属性中。url_map包含了路由逻辑,view_functions存储了对应的逻辑函数,二者通过endpoint相关联。
- 步骤1和2 完成后,其实一个遵循WSGI协议的 web application已经准备好了,接下来我们要做的就是将其挂到一个同样支持WSGI协议的web server下面。web server接受HTTP协议的请求,并将其转化为WSGI格式的内容,然后调用 app(environ, start_response) 执行具体逻辑处理,并返回WSGI格式的结果。之后再把WSGI格式的结果转换为HTTP格式返回给客户端就可以啦。
- werkzeug 中的 BaseWSGIServer 继承了Python内置库的 http.server.HTTPServer,从中获得了HTTPServer监听端口并获取请求的能力,并整合了 app和 WSGIRequestHandler。每当一个请求就绪时,就交给一个WSGIRequestHandler实例处理。WSGIRequestHandler做了HTTP格式数据到WSGI格式的转换,并执行app(environ, start_response) ,返回响应。
- app(environ, start_response) 这步就又回到flask的请求处理逻辑,根据environ的信息配合事先已经绑定好的 url_map,得到具体的路由信息和参数(endpoint,view_func_args),然后从 view_functions 字典中取出对应的view_function运行并返回结果。
边栏推荐
- 集群、分布式、微服务的区别和介绍
- 【日常训练--腾讯精选50】235. 二叉搜索树的最近公共祖先
- 消息队列:如何确保消息不会丢失
- 微信小程序蓝牙连接硬件设备并进行通讯,小程序蓝牙因距离异常断开自动重连,js实现crc校验位
- 原生小程序 之 input切換 text與password類型
- 分布式事务介绍
- 5阶多项式轨迹
- [reading of the paper] a multi branch hybrid transformer network for channel terminal cell segmentation
- Simple case of SSM framework
- Industrial Finance 3.0: financial technology of "dredging blood vessels"
猜你喜欢
Leetcode: maximum number of "balloons"
Web authentication API compatible version information
《ClickHouse原理解析与应用实践》读书笔记(6)
1. AVL tree: left-right rotation -bite
Paper reading [open book video captioning with retrieve copy generate network]
Getting started with DES encryption
EMMC打印cqhci: timeout for tag 10提示分析与解决
Web Authentication API兼容版本信息
Five core elements of architecture design
4. Object mapping Mapster
随机推荐
淘宝商品详情页API接口、淘宝商品列表API接口,淘宝商品销量API接口,淘宝APP详情API接口,淘宝详情API接口
Codeforces Round #416 (Div. 2) D. Vladik and Favorite Game
WEB架构设计过程
Tablayout modification of customized tab title does not take effect
分布式事务解决方案之TCC
得物客服一站式工作台卡顿优化之路
2pc of distributed transaction solution
Leetcode 1189 maximum number of "balloons" [map] the leetcode road of heroding
TCC of distributed transaction solutions
Message queue: how to deal with message backlog?
async / await
linear regression
纪念下,我从CSDN搬家到博客园啦!
Introduction to distributed transactions
Common skills and understanding of SQL optimization
【Shell】清理nohup.out文件
Web architecture design process
What EDA companies are there in China?
集群、分布式、微服務的區別和介紹
AI人脸编辑让Lena微笑