当前位置:网站首页>Flask1.1.4 Werkzeug1.0.1 源码分析:启动流程
Flask1.1.4 Werkzeug1.0.1 源码分析:启动流程
2022-07-07 00:28:00 【某工程师$】
基于QuickStart中的一个demo来分析
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello_world():
return "<p>Hello, World!</p>"
if __name__ == '__main__':
app.run()
@app.route(“/”) 是一个接收参数的装饰器工厂函数,返回一个闭包了特定参数的装饰器
# flask/app.py L1288
def route(self, rule, **options):
# 典型的注册装饰器
def decorator(f):
endpoint = options.pop("endpoint", None)
# 此方法将url信息和view_func 注册到flask实例中
self.add_url_rule(rule, endpoint, f, **options)
return f
return decorator
关键在 self.add_url_rule(rule, endpoint, f, **options) 方法 下面代码做了一些精简
# flask/app.py L1178
@setupmethod
def add_url_rule(
self,
rule,
endpoint=None,
view_func=None,
provide_automatic_options=None,
**options
):
# 此处endpoint是一个重要的概念 后续在路由部分会详细讲解
# 在flask中是 url_map和view_functions 关联的重要纽带
# 每次请求来时 去url_map中搜索得到 endpoint,args 然后走 view_functions[endpoint](args) 拿到结果
# 若不传 默认为view_func.__name__
if endpoint is None:
endpoint = _endpoint_from_view_func(view_func)
options["endpoint"] = endpoint
methods = options.pop("methods", None)
# ...
# 此处的 url_rule_class 是一个Flask类属性 值为 werkzeug.routing.Rule
# 所以此处即为构建 werkzeug.routing.Rule 对象
rule = self.url_rule_class(rule, methods=methods, **options)
rule.provide_automatic_options = provide_automatic_options
# 构建好的 rule对象保存到 url_map实例属性中
# url_map是werkzeug.routing.Map 对象
# flask路由部分其实借助了werkzeug的能力
self.url_map.add(rule)
if view_func is not None:
# ...
# endpoint为key将对应的view_func保存在 view_functions属性中
# view_functions 是个 dict
self.view_functions[endpoint] = view_func
很简单,add_url_rule 主要作用就是 将rule和view_func 信息维护到 flask实例的 url_map和view_functions属性中
ok,继续研究 app.run() 启动流程就在其中(app.run()会启动一个简易的web服务器用于开发和测试,生产环境会用其他高性能的web server,不过借助这个研究下启动流程还是可以的)
# flask/app.py L889
def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
# ...
from werkzeug.serving import run_simple
try:
# 借助了 werkzeug.serving.run_simple
# 注意第三个参数 将flask实例传给了 run_simple方法
# 后续 web server会调用flask实例进行逻辑处理
run_simple(host, port, self, **options)
finally:
# reset the first request information if the development server
# reset normally. This makes it possible to restart the server
# without reloader and that stuff from an interactive shell.
self._got_first_request = False
接着看 run_simple代码,这边切换为研究 werkzeug 源码
# werkzeug/serving.py L876
def run_simple(
hostname,
port,
application,
use_reloader=False,
use_debugger=False,
use_evalex=True,
extra_files=None,
reloader_interval=1,
reloader_type="auto",
threaded=False,
processes=1,
request_handler=None,
static_files=None,
passthrough_errors=False,
ssl_context=None,
):
#...
# application 参数对应的是 flask实例
def inner():
try:
fd = int(os.environ["WERKZEUG_SERVER_FD"])
except (LookupError, ValueError):
fd = None
# 关键于此 创建一个server实例
srv = make_server(
hostname,
port,
application,
threaded,
processes,
request_handler,
passthrough_errors,
ssl_context,
fd=fd,
)
if fd is None:
log_startup(srv.socket)
# 调用web server实例的serve_forever方法
srv.serve_forever()
if use_reloader:
#...
else:
inner()
继续往下,先看 make_server方法
# werkzeug/serving.py L830
def make_server(
host=None,
port=None,
app=None,
threaded=False,
processes=1,
request_handler=None,
passthrough_errors=False,
ssl_context=None,
fd=None,
):
if threaded and processes > 1:
raise ValueError("cannot have a multithreaded and multi process server.")
elif threaded:
return ThreadedWSGIServer(
host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
)
elif processes > 1:
return ForkingWSGIServer(
host,
port,
app,
processes,
request_handler,
passthrough_errors,
ssl_context,
fd=fd,
)
else:
return BaseWSGIServer(
host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
)
看出来就是根据具体参数 构建一个server实例返回 挑一个看下 看 BaseWSGIServer
# werkzeug/serving.py L708
class BaseWSGIServer(HTTPServer, object):
def __init__(
self,
host,
port,
app,
handler=None,
passthrough_errors=False,
ssl_context=None,
fd=None,
):
if handler is None:
# 此处注意下 后续的请求具体处理会走 WSGIRequestHandler
handler = WSGIRequestHandler
# 这边进行了 端口绑定和监听
# 并且传入了类 WSGIRequestHandler
HTTPServer.__init__(self, server_address, handler)
# app即flaks实例
self.app = app
# 此方法在make_server 返回实例后立马调用
def serve_forever(self):
self.shutdown_signal = False
try:
# 此处的 HTTPServer 位于python内置库http中 http.server.HTTPServer
# 将当前server实例作为参数传入
HTTPServer.serve_forever(self)
except KeyboardInterrupt:
pass
finally:
self.server_close()
在往下看 HTTPServer.serve_forever() 可以发现 serve_forever 其实是其父类的父类 内置库 socketserver中 BaseServer 类实现的方法
# socketserver.py L153
class BaseServer:
timeout = None
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
# 这个属性就是 WSGIRequestHandler
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
def serve_forever(self, poll_interval=0.5):
self.__is_shut_down.clear()
try:
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)
# 就是个无限循环 接收一个请求处理一个请求
while not self.__shutdown_request:
ready = selector.select(poll_interval)
# bpo-35017: shutdown() called during select(), exit immediately.
if self.__shutdown_request:
break
if ready:
# 接收到具体请求后 进行处理
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
def _handle_request_noblock(self):
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
# 接收到具体请求后 进行处理
self.process_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
else:
self.shutdown_request(request)
def process_request(self, request, client_address):
# 接收到具体请求后 进行处理
self.finish_request(request, client_address)
self.shutdown_request(request)
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
# 每次都实例化了 WSGIRequestHandler 进行请求处理
# 注意哦 第三个参数把当前的server实例传入
self.RequestHandlerClass(request, client_address, self)
好的 最终最终 请求的处理是在 WSGIRequestHandler 实例化过程中处理的
那我们来看看 WSGIRequestHandler的代码吧
WSGIRequestHandler 的初始化其实走父类的父类 BaseRequestHandler 的__init__方法
# socketserver.py 696
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
# 具体的处理方法 其实是调用的子类的实现
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self):
pass
def finish(self):
pass
# werkzeug/serving.py L163
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
# 处理请求的核心方法
def run_wsgi(self):
# 此处经典的 将http内容转化为符合wsgi格式的内容 因为后面是个wsgi的app啊
self.environ = environ = self.make_environ()
headers_set = []
headers_sent = []
# 眼熟吧 调用wsgi app的参数之一
def start_response(status, response_headers, exc_info=None):
if exc_info:
try:
if headers_sent:
reraise(*exc_info)
finally:
exc_info = None
elif headers_set:
raise AssertionError("Headers already set")
headers_set[:] = [status, response_headers]
return write
def execute(app):
# 经典的wsgi app调用 返回值可迭代
# 此处的app是flask实例 实例可调用自然是因为Flask实现了__call__方法
application_iter = app(environ, start_response)
try:
# 执行此方法 参数为flask实例
execute(self.server.app)
except (_ConnectionError, socket.timeout) as e:
self.connection_dropped(e, environ)
except Exception:
#...
# 上一个类调用的handle就是这个
def handle(self):
"""Handles a request ignoring dropped connections."""
try:
# 调用父类的handle
BaseHTTPRequestHandler.handle(self)
except (_ConnectionError, socket.timeout) as e:
self.connection_dropped(e)
except Exception as e:
if self.server.ssl_context is None or not is_ssl_error(e):
raise
if self.server.shutdown_signal:
self.initiate_shutdown()
# 没错 下面的类又调用了这个方法
def handle_one_request(self):
"""Handle a single HTTP request."""
self.raw_requestline = self.rfile.readline()
if not self.raw_requestline:
self.close_connection = 1
elif self.parse_request():
# 最终最终 走了这个方法
return self.run_wsgi()
# http/server.py 147
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
# 上一个类调用的就是这个
def handle(self):
"""Handle multiple requests if necessary."""
self.close_connection = True
# 此处又调用回了 WSGIRequestHandler 的 handle_one_request
self.handle_one_request()
while not self.close_connection:
self.handle_one_request()
上面的调用路线看起来稍微有点绕,是因为涉及到了继承方法的覆盖,有些调用走的父类的,有些走的子类的方法。
可以看到:其实就是监听端口,并开启一个无限的循环,每次接收到一个请求之后,就实例化WSGIRequestHandler进行处理,而WSGIRequestHandler主要做了HTTP格式数据到WSGI数据的转换,然后用WSGI的方式调用了flask实例进行实际的逻辑处理并返回数据。
下面来看看Flask实例支持调用的代码吧。
# flask/app.py L103
class Flask(_PackageBoundObject):
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)
def wsgi_app(self, environ, start_response):
# 重要 创建当前请求的上下文对象 flask.ctx.RequestContext对象
# 此处已经对请求信息进行了处理(获得了 endpoint和view_func_args)
ctx = self.request_context(environ)
error = None
try:
try:
# 将当前请求的上下文入栈(LocalStack)
# 此处是Flask上下文的实现细节 通过共享的方式来传递上下文 给后面的view_func
ctx.push()
# 分发执行
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except: # noqa: B001
error = sys.exc_info()[1]
raise
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
# 请求处理完成后 清理上下文 出栈
ctx.auto_pop(error)
def full_dispatch_request(self):
# hook
self.try_trigger_before_first_request_functions()
try:
# hook
request_started.send(self)
rv = self.preprocess_request()
if rv is None:
# 主要执行逻辑的方法
rv = self.dispatch_request()
except Exception as e:
rv = self.handle_user_exception(e)
return self.finalize_request(rv)
def dispatch_request(self):
# 直接从上下文栈中获取当前的请求上下文
req = _request_ctx_stack.top.request
if req.routing_exception is not None:
self.raise_routing_exception(req)
rule = req.url_rule
# ...
# 根据endpoint获取对应的view_func 执行并返回
return self.view_functions[rule.endpoint](**req.view_args)
okk,到此处Flask启动流程基本就差不多了。
总结一下步骤:
- 创建一个Flask实例 app
- 将url和view_func 通过 app.add_url_rule() 维护到app的url_map和view_functions 属性中。url_map包含了路由逻辑,view_functions存储了对应的逻辑函数,二者通过endpoint相关联。
- 步骤1和2 完成后,其实一个遵循WSGI协议的 web application已经准备好了,接下来我们要做的就是将其挂到一个同样支持WSGI协议的web server下面。web server接受HTTP协议的请求,并将其转化为WSGI格式的内容,然后调用 app(environ, start_response) 执行具体逻辑处理,并返回WSGI格式的结果。之后再把WSGI格式的结果转换为HTTP格式返回给客户端就可以啦。
- werkzeug 中的 BaseWSGIServer 继承了Python内置库的 http.server.HTTPServer,从中获得了HTTPServer监听端口并获取请求的能力,并整合了 app和 WSGIRequestHandler。每当一个请求就绪时,就交给一个WSGIRequestHandler实例处理。WSGIRequestHandler做了HTTP格式数据到WSGI格式的转换,并执行app(environ, start_response) ,返回响应。
- app(environ, start_response) 这步就又回到flask的请求处理逻辑,根据environ的信息配合事先已经绑定好的 url_map,得到具体的路由信息和参数(endpoint,view_func_args),然后从 view_functions 字典中取出对应的view_function运行并返回结果。
边栏推荐
- Flinksql 读写pgsql
- win配置pm2开机自启node项目
- R language [logic control] [mathematical operation]
- [daily training -- Tencent selected 50] 292 Nim games
- 消息队列:重复消息如何处理?
- 淘宝商品详情页API接口、淘宝商品列表API接口,淘宝商品销量API接口,淘宝APP详情API接口,淘宝详情API接口
- OpenSergo 即将发布 v1alpha1,丰富全链路异构架构的服务治理能力
- 判断文件是否为DICOM文件
- Determine whether the file is a DICOM file
- nodejs获取客户端ip
猜你喜欢
Get the way to optimize the one-stop worktable of customer service
三级菜单数据实现,实现嵌套三级菜单数据
2pc of distributed transaction solution
Harmonyos practice - Introduction to development, analysis of atomized services
Web architecture design process
R language [logic control] [mathematical operation]
Lombok plug-in
目标检测中的损失函数与正负样本分配:RetinaNet与Focal loss
Paper reading [semantic tag enlarged xlnv model for video captioning]
TCC of distributed transaction solutions
随机推荐
Things about data storage 2
980. 不同路径 III DFS
随机生成session_id
TCC of distributed transaction solutions
zabbix_get测试数据库失败
Taobao store release API interface (New), Taobao oauth2.0 store commodity API interface, Taobao commodity release API interface, Taobao commodity launch API interface, a complete set of launch store i
Message queue: how to handle repeated messages?
C#可空类型
JVM the truth you need to know
What are the common message queues?
Codeforces Round #416 (Div. 2) D. Vladik and Favorite Game
【日常训练--腾讯精选50】235. 二叉搜索树的最近公共祖先
Leetcode 1189 maximum number of "balloons" [map] the leetcode road of heroding
linear regression
Message queuing: how to ensure that messages are not lost
Input of native applet switches between text and password types
Educational Codeforces Round 22 B. The Golden Age
SQL query: subtract the previous row from the next row and make corresponding calculations
Five core elements of architecture design
[reading of the paper] a multi branch hybrid transformer network for channel terminal cell segmentation