当前位置:网站首页>Flask1.1.4 Werkzeug1.0.1 源码分析:启动流程
Flask1.1.4 Werkzeug1.0.1 源码分析:启动流程
2022-07-07 00:28:00 【某工程师$】
基于QuickStart中的一个demo来分析
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello_world():
return "<p>Hello, World!</p>"
if __name__ == '__main__':
app.run()
@app.route(“/”) 是一个接收参数的装饰器工厂函数,返回一个闭包了特定参数的装饰器
# flask/app.py L1288
def route(self, rule, **options):
# 典型的注册装饰器
def decorator(f):
endpoint = options.pop("endpoint", None)
# 此方法将url信息和view_func 注册到flask实例中
self.add_url_rule(rule, endpoint, f, **options)
return f
return decorator
关键在 self.add_url_rule(rule, endpoint, f, **options) 方法 下面代码做了一些精简
# flask/app.py L1178
@setupmethod
def add_url_rule(
self,
rule,
endpoint=None,
view_func=None,
provide_automatic_options=None,
**options
):
# 此处endpoint是一个重要的概念 后续在路由部分会详细讲解
# 在flask中是 url_map和view_functions 关联的重要纽带
# 每次请求来时 去url_map中搜索得到 endpoint,args 然后走 view_functions[endpoint](args) 拿到结果
# 若不传 默认为view_func.__name__
if endpoint is None:
endpoint = _endpoint_from_view_func(view_func)
options["endpoint"] = endpoint
methods = options.pop("methods", None)
# ...
# 此处的 url_rule_class 是一个Flask类属性 值为 werkzeug.routing.Rule
# 所以此处即为构建 werkzeug.routing.Rule 对象
rule = self.url_rule_class(rule, methods=methods, **options)
rule.provide_automatic_options = provide_automatic_options
# 构建好的 rule对象保存到 url_map实例属性中
# url_map是werkzeug.routing.Map 对象
# flask路由部分其实借助了werkzeug的能力
self.url_map.add(rule)
if view_func is not None:
# ...
# endpoint为key将对应的view_func保存在 view_functions属性中
# view_functions 是个 dict
self.view_functions[endpoint] = view_func
很简单,add_url_rule 主要作用就是 将rule和view_func 信息维护到 flask实例的 url_map和view_functions属性中
ok,继续研究 app.run() 启动流程就在其中(app.run()会启动一个简易的web服务器用于开发和测试,生产环境会用其他高性能的web server,不过借助这个研究下启动流程还是可以的)
# flask/app.py L889
def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
# ...
from werkzeug.serving import run_simple
try:
# 借助了 werkzeug.serving.run_simple
# 注意第三个参数 将flask实例传给了 run_simple方法
# 后续 web server会调用flask实例进行逻辑处理
run_simple(host, port, self, **options)
finally:
# reset the first request information if the development server
# reset normally. This makes it possible to restart the server
# without reloader and that stuff from an interactive shell.
self._got_first_request = False
接着看 run_simple代码,这边切换为研究 werkzeug 源码
# werkzeug/serving.py L876
def run_simple(
hostname,
port,
application,
use_reloader=False,
use_debugger=False,
use_evalex=True,
extra_files=None,
reloader_interval=1,
reloader_type="auto",
threaded=False,
processes=1,
request_handler=None,
static_files=None,
passthrough_errors=False,
ssl_context=None,
):
#...
# application 参数对应的是 flask实例
def inner():
try:
fd = int(os.environ["WERKZEUG_SERVER_FD"])
except (LookupError, ValueError):
fd = None
# 关键于此 创建一个server实例
srv = make_server(
hostname,
port,
application,
threaded,
processes,
request_handler,
passthrough_errors,
ssl_context,
fd=fd,
)
if fd is None:
log_startup(srv.socket)
# 调用web server实例的serve_forever方法
srv.serve_forever()
if use_reloader:
#...
else:
inner()
继续往下,先看 make_server方法
# werkzeug/serving.py L830
def make_server(
host=None,
port=None,
app=None,
threaded=False,
processes=1,
request_handler=None,
passthrough_errors=False,
ssl_context=None,
fd=None,
):
if threaded and processes > 1:
raise ValueError("cannot have a multithreaded and multi process server.")
elif threaded:
return ThreadedWSGIServer(
host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
)
elif processes > 1:
return ForkingWSGIServer(
host,
port,
app,
processes,
request_handler,
passthrough_errors,
ssl_context,
fd=fd,
)
else:
return BaseWSGIServer(
host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
)
看出来就是根据具体参数 构建一个server实例返回 挑一个看下 看 BaseWSGIServer
# werkzeug/serving.py L708
class BaseWSGIServer(HTTPServer, object):
def __init__(
self,
host,
port,
app,
handler=None,
passthrough_errors=False,
ssl_context=None,
fd=None,
):
if handler is None:
# 此处注意下 后续的请求具体处理会走 WSGIRequestHandler
handler = WSGIRequestHandler
# 这边进行了 端口绑定和监听
# 并且传入了类 WSGIRequestHandler
HTTPServer.__init__(self, server_address, handler)
# app即flaks实例
self.app = app
# 此方法在make_server 返回实例后立马调用
def serve_forever(self):
self.shutdown_signal = False
try:
# 此处的 HTTPServer 位于python内置库http中 http.server.HTTPServer
# 将当前server实例作为参数传入
HTTPServer.serve_forever(self)
except KeyboardInterrupt:
pass
finally:
self.server_close()
在往下看 HTTPServer.serve_forever() 可以发现 serve_forever 其实是其父类的父类 内置库 socketserver中 BaseServer 类实现的方法
# socketserver.py L153
class BaseServer:
timeout = None
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
# 这个属性就是 WSGIRequestHandler
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
def serve_forever(self, poll_interval=0.5):
self.__is_shut_down.clear()
try:
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)
# 就是个无限循环 接收一个请求处理一个请求
while not self.__shutdown_request:
ready = selector.select(poll_interval)
# bpo-35017: shutdown() called during select(), exit immediately.
if self.__shutdown_request:
break
if ready:
# 接收到具体请求后 进行处理
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
def _handle_request_noblock(self):
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
# 接收到具体请求后 进行处理
self.process_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
else:
self.shutdown_request(request)
def process_request(self, request, client_address):
# 接收到具体请求后 进行处理
self.finish_request(request, client_address)
self.shutdown_request(request)
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
# 每次都实例化了 WSGIRequestHandler 进行请求处理
# 注意哦 第三个参数把当前的server实例传入
self.RequestHandlerClass(request, client_address, self)
好的 最终最终 请求的处理是在 WSGIRequestHandler 实例化过程中处理的
那我们来看看 WSGIRequestHandler的代码吧
WSGIRequestHandler 的初始化其实走父类的父类 BaseRequestHandler 的__init__方法
# socketserver.py 696
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
# 具体的处理方法 其实是调用的子类的实现
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
# werkzeug/serving.py L163
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
# 处理请求的核心方法
def run_wsgi(self):
# 此处经典的 将http内容转化为符合wsgi格式的内容 因为后面是个wsgi的app啊
self.environ = environ = self.make_environ()
headers_set = []
headers_sent = []
# 眼熟吧 调用wsgi app的参数之一
def start_response(status, response_headers, exc_info=None):
if exc_info:
try:
if headers_sent:
reraise(*exc_info)
finally:
exc_info = None
elif headers_set:
raise AssertionError("Headers already set")
headers_set[:] = [status, response_headers]
return write
def execute(app):
# 经典的wsgi app调用 返回值可迭代
# 此处的app是flask实例 实例可调用自然是因为Flask实现了__call__方法
application_iter = app(environ, start_response)
try:
# 执行此方法 参数为flask实例
execute(self.server.app)
except (_ConnectionError, socket.timeout) as e:
self.connection_dropped(e, environ)
except Exception:
#...
# 上一个类调用的handle就是这个
def handle(self):
"""Handles a request ignoring dropped connections."""
try:
# 调用父类的handle
BaseHTTPRequestHandler.handle(self)
except (_ConnectionError, socket.timeout) as e:
self.connection_dropped(e)
except Exception as e:
if self.server.ssl_context is None or not is_ssl_error(e):
raise
if self.server.shutdown_signal:
self.initiate_shutdown()
# 没错 下面的类又调用了这个方法
def handle_one_request(self):
"""Handle a single HTTP request."""
self.raw_requestline = self.rfile.readline()
if not self.raw_requestline:
self.close_connection = 1
elif self.parse_request():
# 最终最终 走了这个方法
return self.run_wsgi()
# http/server.py 147
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
# 上一个类调用的就是这个
def handle(self):
"""Handle multiple requests if necessary."""
self.close_connection = True
# 此处又调用回了 WSGIRequestHandler 的 handle_one_request
self.handle_one_request()
while not self.close_connection:
self.handle_one_request()
上面的调用路线看起来稍微有点绕,是因为涉及到了继承方法的覆盖,有些调用走的父类的,有些走的子类的方法。
可以看到:其实就是监听端口,并开启一个无限的循环,每次接收到一个请求之后,就实例化WSGIRequestHandler进行处理,而WSGIRequestHandler主要做了HTTP格式数据到WSGI数据的转换,然后用WSGI的方式调用了flask实例进行实际的逻辑处理并返回数据。
下面来看看Flask实例支持调用的代码吧。
# flask/app.py L103
class Flask(_PackageBoundObject):
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)
def wsgi_app(self, environ, start_response):
# 重要 创建当前请求的上下文对象 flask.ctx.RequestContext对象
# 此处已经对请求信息进行了处理(获得了 endpoint和view_func_args)
ctx = self.request_context(environ)
error = None
try:
try:
# 将当前请求的上下文入栈(LocalStack)
# 此处是Flask上下文的实现细节 通过共享的方式来传递上下文 给后面的view_func
ctx.push()
# 分发执行
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except: # noqa: B001
error = sys.exc_info()[1]
raise
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
# 请求处理完成后 清理上下文 出栈
ctx.auto_pop(error)
def full_dispatch_request(self):
# hook
self.try_trigger_before_first_request_functions()
try:
# hook
request_started.send(self)
rv = self.preprocess_request()
if rv is None:
# 主要执行逻辑的方法
rv = self.dispatch_request()
except Exception as e:
rv = self.handle_user_exception(e)
return self.finalize_request(rv)
def dispatch_request(self):
# 直接从上下文栈中获取当前的请求上下文
req = _request_ctx_stack.top.request
if req.routing_exception is not None:
self.raise_routing_exception(req)
rule = req.url_rule
# ...
# 根据endpoint获取对应的view_func 执行并返回
return self.view_functions[rule.endpoint](**req.view_args)
okk,到此处Flask启动流程基本就差不多了。
总结一下步骤:
- 创建一个Flask实例 app
- 将url和view_func 通过 app.add_url_rule() 维护到app的url_map和view_functions 属性中。url_map包含了路由逻辑,view_functions存储了对应的逻辑函数,二者通过endpoint相关联。
- 步骤1和2 完成后,其实一个遵循WSGI协议的 web application已经准备好了,接下来我们要做的就是将其挂到一个同样支持WSGI协议的web server下面。web server接受HTTP协议的请求,并将其转化为WSGI格式的内容,然后调用 app(environ, start_response) 执行具体逻辑处理,并返回WSGI格式的结果。之后再把WSGI格式的结果转换为HTTP格式返回给客户端就可以啦。
- werkzeug 中的 BaseWSGIServer 继承了Python内置库的 http.server.HTTPServer,从中获得了HTTPServer监听端口并获取请求的能力,并整合了 app和 WSGIRequestHandler。每当一个请求就绪时,就交给一个WSGIRequestHandler实例处理。WSGIRequestHandler做了HTTP格式数据到WSGI格式的转换,并执行app(environ, start_response) ,返回响应。
- app(environ, start_response) 这步就又回到flask的请求处理逻辑,根据environ的信息配合事先已经绑定好的 url_map,得到具体的路由信息和参数(endpoint,view_func_args),然后从 view_functions 字典中取出对应的view_function运行并返回结果。
边栏推荐
- Educational Codeforces Round 22 B. The Golden Age
- Preliminary practice of niuke.com (9)
- Message queue: how to deal with message backlog?
- 谈fpga和asic的区别
- Differences and introduction of cluster, distributed and microservice
- Mybaits multi table query (joint query, nested query)
- Hcip seventh operation
- AI face editor makes Lena smile
- Go language context explanation
- 三级菜单数据实现,实现嵌套三级菜单数据
猜你喜欢
集群、分布式、微服務的區別和介紹
消息队列:消息积压如何处理?
5. 数据访问 - EntityFramework集成
Message queuing: how to ensure that messages are not lost
Forkjoin is the most comprehensive and detailed explanation (from principle design to use diagram)
pytorch_ 01 automatic derivation mechanism
Reading the paper [sensor enlarged egocentric video captioning with dynamic modal attention]
Distributed global ID generation scheme
English grammar_ Noun possessive
C nullable type
随机推荐
Mysql-centos7 install MySQL through yum
Go 語言的 Context 詳解
《2022中国低/无代码市场研究及选型评估报告》发布
JVM the truth you need to know
Codeforces Round #416 (Div. 2) D. Vladik and Favorite Game
分布式事务介绍
Bat instruction processing details
纪念下,我从CSDN搬家到博客园啦!
Tablayout modification of customized tab title does not take effect
OpenSergo 即将发布 v1alpha1,丰富全链路异构架构的服务治理能力
Determine whether the file is a DICOM file
What EDA companies are there in China?
得物客服一站式工作台卡顿优化之路
pytorch_ 01 automatic derivation mechanism
2pc of distributed transaction solution
集群、分布式、微服務的區別和介紹
Common skills and understanding of SQL optimization
AI face editor makes Lena smile
SQLSTATE[HY000][1130] Host ‘host. docker. internal‘ is not allowed to connect to this MySQL server
【Shell】清理nohup.out文件