当前位置:网站首页>Flask1.1.4 Werkzeug1.0.1 源码分析:启动流程

Flask1.1.4 Werkzeug1.0.1 源码分析:启动流程

2022-07-07 00:28:00 某工程师$

基于QuickStart中的一个demo来分析

from flask import Flask

app = Flask(__name__)


@app.route("/")
def hello_world():
    return "<p>Hello, World!</p>"


if __name__ == '__main__':
    app.run()

@app.route(“/”) 是一个接收参数的装饰器工厂函数,返回一个闭包了特定参数的装饰器

# flask/app.py L1288
def route(self, rule, **options):
	# 典型的注册装饰器
    def decorator(f):
        endpoint = options.pop("endpoint", None)
        # 此方法将url信息和view_func 注册到flask实例中
        self.add_url_rule(rule, endpoint, f, **options)
        return f

    return decorator

关键在 self.add_url_rule(rule, endpoint, f, **options) 方法 下面代码做了一些精简

# flask/app.py L1178
@setupmethod
def add_url_rule(
    self,
    rule,
    endpoint=None,
    view_func=None,
    provide_automatic_options=None,
    **options
):
	# 此处endpoint是一个重要的概念 后续在路由部分会详细讲解 
	# 在flask中是 url_map和view_functions 关联的重要纽带
	# 每次请求来时 去url_map中搜索得到 endpoint,args 然后走 view_functions[endpoint](args) 拿到结果
	# 若不传 默认为view_func.__name__
    if endpoint is None:
        endpoint = _endpoint_from_view_func(view_func)
    options["endpoint"] = endpoint
    methods = options.pop("methods", None)

	# ...

	# 此处的 url_rule_class 是一个Flask类属性 值为 werkzeug.routing.Rule
	# 所以此处即为构建 werkzeug.routing.Rule 对象
    rule = self.url_rule_class(rule, methods=methods, **options)
    rule.provide_automatic_options = provide_automatic_options
	
	# 构建好的 rule对象保存到 url_map实例属性中 
	# url_map是werkzeug.routing.Map 对象
	# flask路由部分其实借助了werkzeug的能力
    self.url_map.add(rule)
    if view_func is not None:
        # ...
        # endpoint为key将对应的view_func保存在 view_functions属性中
        # view_functions 是个 dict
        self.view_functions[endpoint] = view_func

很简单,add_url_rule 主要作用就是 将rule和view_func 信息维护到 flask实例的 url_map和view_functions属性中

ok,继续研究 app.run() 启动流程就在其中(app.run()会启动一个简易的web服务器用于开发和测试,生产环境会用其他高性能的web server,不过借助这个研究下启动流程还是可以的)

# flask/app.py L889
def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
  	# ...
  	
    from werkzeug.serving import run_simple

    try:
    	# 借助了 werkzeug.serving.run_simple
    	# 注意第三个参数 将flask实例传给了 run_simple方法
    	# 后续 web server会调用flask实例进行逻辑处理
        run_simple(host, port, self, **options)
    finally:
        # reset the first request information if the development server
        # reset normally. This makes it possible to restart the server
        # without reloader and that stuff from an interactive shell.
        self._got_first_request = False

接着看 run_simple代码,这边切换为研究 werkzeug 源码

# werkzeug/serving.py L876
def run_simple(
    hostname,
    port,
    application,
    use_reloader=False,
    use_debugger=False,
    use_evalex=True,
    extra_files=None,
    reloader_interval=1,
    reloader_type="auto",
    threaded=False,
    processes=1,
    request_handler=None,
    static_files=None,
    passthrough_errors=False,
    ssl_context=None,
):
    
 	#...
 	# application 参数对应的是 flask实例

    def inner():
        try:
            fd = int(os.environ["WERKZEUG_SERVER_FD"])
        except (LookupError, ValueError):
            fd = None
        # 关键于此 创建一个server实例
        srv = make_server(
            hostname,
            port,
            application,
            threaded,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
        if fd is None:
            log_startup(srv.socket)
        # 调用web server实例的serve_forever方法
        srv.serve_forever()

    if use_reloader:
        #...
    else:
        inner()

继续往下,先看 make_server方法

# werkzeug/serving.py L830
def make_server(
    host=None,
    port=None,
    app=None,
    threaded=False,
    processes=1,
    request_handler=None,
    passthrough_errors=False,
    ssl_context=None,
    fd=None,
):
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and multi process server.")
    elif threaded:
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
    elif processes > 1:
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
    else:
        return BaseWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )

看出来就是根据具体参数 构建一个server实例返回 挑一个看下 看 BaseWSGIServer

# werkzeug/serving.py L708
class BaseWSGIServer(HTTPServer, object):
    def __init__(
        self,
        host,
        port,
        app,
        handler=None,
        passthrough_errors=False,
        ssl_context=None,
        fd=None,
    ):
        if handler is None:
        	# 此处注意下 后续的请求具体处理会走 WSGIRequestHandler
            handler = WSGIRequestHandler

      	# 这边进行了 端口绑定和监听
      	# 并且传入了类 WSGIRequestHandler
        HTTPServer.__init__(self, server_address, handler)
		# app即flaks实例
        self.app = app
       
    # 此方法在make_server 返回实例后立马调用
    def serve_forever(self):
        self.shutdown_signal = False
        try:
        	# 此处的 HTTPServer 位于python内置库http中 http.server.HTTPServer
        	# 将当前server实例作为参数传入
            HTTPServer.serve_forever(self)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

在往下看 HTTPServer.serve_forever() 可以发现 serve_forever 其实是其父类的父类 内置库 socketserver中 BaseServer 类实现的方法

# socketserver.py L153
class BaseServer:
    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor. May be extended, do not override."""
        self.server_address = server_address
        # 这个属性就是 WSGIRequestHandler
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def serve_forever(self, poll_interval=0.5):
        self.__is_shut_down.clear()
        try:
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)
				# 就是个无限循环 接收一个请求处理一个请求
                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                    	# 接收到具体请求后 进行处理
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
            
	def _handle_request_noblock(self):
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
            	# 接收到具体请求后 进行处理
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)
            
    def process_request(self, request, client_address):
    	# 接收到具体请求后 进行处理
        self.finish_request(request, client_address)
        self.shutdown_request(request)
        
    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        # 每次都实例化了 WSGIRequestHandler 进行请求处理
        # 注意哦 第三个参数把当前的server实例传入
        self.RequestHandlerClass(request, client_address, self)

好的 最终最终 请求的处理是在 WSGIRequestHandler 实例化过程中处理的
那我们来看看 WSGIRequestHandler的代码吧
WSGIRequestHandler 的初始化其实走父类的父类 BaseRequestHandler 的__init__方法

# socketserver.py 696
class BaseRequestHandler:
    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
        	# 具体的处理方法 其实是调用的子类的实现
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

# werkzeug/serving.py L163
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
	# 处理请求的核心方法
    def run_wsgi(self):
		# 此处经典的 将http内容转化为符合wsgi格式的内容 因为后面是个wsgi的app啊
        self.environ = environ = self.make_environ()
        headers_set = []
        headers_sent = []
		# 眼熟吧 调用wsgi app的参数之一
        def start_response(status, response_headers, exc_info=None):
            if exc_info:
                try:
                    if headers_sent:
                        reraise(*exc_info)
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            headers_set[:] = [status, response_headers]
            return write

        def execute(app):
        	# 经典的wsgi app调用 返回值可迭代
        	# 此处的app是flask实例 实例可调用自然是因为Flask实现了__call__方法
            application_iter = app(environ, start_response)
            
        try:
        	# 执行此方法 参数为flask实例
            execute(self.server.app)
        except (_ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception:
            #...

	# 上一个类调用的handle就是这个
    def handle(self):
        """Handles a request ignoring dropped connections."""
        try:
        	# 调用父类的handle
            BaseHTTPRequestHandler.handle(self)
        except (_ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is None or not is_ssl_error(e):
                raise
        if self.server.shutdown_signal:
            self.initiate_shutdown()
    
    # 没错 下面的类又调用了这个方法
    def handle_one_request(self):
        """Handle a single HTTP request."""
        self.raw_requestline = self.rfile.readline()
        if not self.raw_requestline:
            self.close_connection = 1
        elif self.parse_request():
        	# 最终最终 走了这个方法
            return self.run_wsgi()

# http/server.py 147
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
	# 上一个类调用的就是这个
    def handle(self):
        """Handle multiple requests if necessary."""
        self.close_connection = True
		# 此处又调用回了 WSGIRequestHandler 的 handle_one_request
        self.handle_one_request()
        while not self.close_connection:
            self.handle_one_request()

上面的调用路线看起来稍微有点绕,是因为涉及到了继承方法的覆盖,有些调用走的父类的,有些走的子类的方法。

可以看到:其实就是监听端口,并开启一个无限的循环,每次接收到一个请求之后,就实例化WSGIRequestHandler进行处理,而WSGIRequestHandler主要做了HTTP格式数据到WSGI数据的转换,然后用WSGI的方式调用了flask实例进行实际的逻辑处理并返回数据。

下面来看看Flask实例支持调用的代码吧。

# flask/app.py L103
class Flask(_PackageBoundObject):

    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)
        
    def wsgi_app(self, environ, start_response):
    	# 重要 创建当前请求的上下文对象 flask.ctx.RequestContext对象
    	# 此处已经对请求信息进行了处理(获得了 endpoint和view_func_args)
        ctx = self.request_context(environ)
        error = None
        try:
            try:
            	# 将当前请求的上下文入栈(LocalStack)
            	# 此处是Flask上下文的实现细节 通过共享的方式来传递上下文 给后面的view_func
                ctx.push()
                # 分发执行
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            # 请求处理完成后 清理上下文 出栈
            ctx.auto_pop(error)
            
    def full_dispatch_request(self):
    	# hook
        self.try_trigger_before_first_request_functions()
        try:
        	# hook
            request_started.send(self)
            rv = self.preprocess_request()
            if rv is None:
            	# 主要执行逻辑的方法
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        return self.finalize_request(rv)

    def dispatch_request(self):
    	# 直接从上下文栈中获取当前的请求上下文
        req = _request_ctx_stack.top.request
        if req.routing_exception is not None:
            self.raise_routing_exception(req)
        rule = req.url_rule
        # ...
        # 根据endpoint获取对应的view_func 执行并返回
        return self.view_functions[rule.endpoint](**req.view_args)

okk,到此处Flask启动流程基本就差不多了。
总结一下步骤:

  1. 创建一个Flask实例 app
  2. 将url和view_func 通过 app.add_url_rule() 维护到app的url_map和view_functions 属性中。url_map包含了路由逻辑,view_functions存储了对应的逻辑函数,二者通过endpoint相关联。
  3. 步骤1和2 完成后,其实一个遵循WSGI协议的 web application已经准备好了,接下来我们要做的就是将其挂到一个同样支持WSGI协议的web server下面。web server接受HTTP协议的请求,并将其转化为WSGI格式的内容,然后调用 app(environ, start_response) 执行具体逻辑处理,并返回WSGI格式的结果。之后再把WSGI格式的结果转换为HTTP格式返回给客户端就可以啦。
  4. werkzeug 中的 BaseWSGIServer 继承了Python内置库的 http.server.HTTPServer,从中获得了HTTPServer监听端口并获取请求的能力,并整合了 app和 WSGIRequestHandler。每当一个请求就绪时,就交给一个WSGIRequestHandler实例处理。WSGIRequestHandler做了HTTP格式数据到WSGI格式的转换,并执行app(environ, start_response) ,返回响应。
  5. app(environ, start_response) 这步就又回到flask的请求处理逻辑,根据environ的信息配合事先已经绑定好的 url_map,得到具体的路由信息和参数(endpoint,view_func_args),然后从 view_functions 字典中取出对应的view_function运行并返回结果。
原网站

版权声明
本文为[某工程师$]所创,转载请带上原文链接,感谢
https://blog.csdn.net/weixin_37882382/article/details/125622294