当前位置:网站首页>Flask1.1.4 Werkzeug1.0.1 源碼分析:啟動流程

Flask1.1.4 Werkzeug1.0.1 源碼分析:啟動流程

2022-07-07 05:51:00 某工程師$

基於QuickStart中的一個demo來分析

from flask import Flask

app = Flask(__name__)


@app.route("/")
def hello_world():
    return "<p>Hello, World!</p>"


if __name__ == '__main__':
    app.run()

@app.route(“/”) 是一個接收參數的裝飾器工廠函數,返回一個閉包了特定參數的裝飾器

# flask/app.py L1288
def route(self, rule, **options):
	# 典型的注册裝飾器
    def decorator(f):
        endpoint = options.pop("endpoint", None)
        # 此方法將url信息和view_func 注册到flask實例中
        self.add_url_rule(rule, endpoint, f, **options)
        return f

    return decorator

關鍵在 self.add_url_rule(rule, endpoint, f, **options) 方法 下面代碼做了一些精簡

# flask/app.py L1178
@setupmethod
def add_url_rule(
    self,
    rule,
    endpoint=None,
    view_func=None,
    provide_automatic_options=None,
    **options
):
	# 此處endpoint是一個重要的概念 後續在路由部分會詳細講解 
	# 在flask中是 url_map和view_functions 關聯的重要紐帶
	# 每次請求來時 去url_map中搜索得到 endpoint,args 然後走 view_functions[endpoint](args) 拿到結果
	# 若不傳 默認為view_func.__name__
    if endpoint is None:
        endpoint = _endpoint_from_view_func(view_func)
    options["endpoint"] = endpoint
    methods = options.pop("methods", None)

	# ...

	# 此處的 url_rule_class 是一個Flask類屬性 值為 werkzeug.routing.Rule
	# 所以此處即為構建 werkzeug.routing.Rule 對象
    rule = self.url_rule_class(rule, methods=methods, **options)
    rule.provide_automatic_options = provide_automatic_options
	
	# 構建好的 rule對象保存到 url_map實例屬性中 
	# url_map是werkzeug.routing.Map 對象
	# flask路由部分其實借助了werkzeug的能力
    self.url_map.add(rule)
    if view_func is not None:
        # ...
        # endpoint為key將對應的view_func保存在 view_functions屬性中
        # view_functions 是個 dict
        self.view_functions[endpoint] = view_func

很簡單,add_url_rule 主要作用就是 將rule和view_func 信息維護到 flask實例的 url_map和view_functions屬性中

ok,繼續研究 app.run() 啟動流程就在其中(app.run()會啟動一個簡易的web服務器用於開發和測試,生產環境會用其他高性能的web server,不過借助這個研究下啟動流程還是可以的)

# flask/app.py L889
def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
  	# ...
  	
    from werkzeug.serving import run_simple

    try:
    	# 借助了 werkzeug.serving.run_simple
    	# 注意第三個參數 將flask實例傳給了 run_simple方法
    	# 後續 web server會調用flask實例進行邏輯處理
        run_simple(host, port, self, **options)
    finally:
        # reset the first request information if the development server
        # reset normally. This makes it possible to restart the server
        # without reloader and that stuff from an interactive shell.
        self._got_first_request = False

接著看 run_simple代碼,這邊切換為研究 werkzeug 源碼

# werkzeug/serving.py L876
def run_simple(
    hostname,
    port,
    application,
    use_reloader=False,
    use_debugger=False,
    use_evalex=True,
    extra_files=None,
    reloader_interval=1,
    reloader_type="auto",
    threaded=False,
    processes=1,
    request_handler=None,
    static_files=None,
    passthrough_errors=False,
    ssl_context=None,
):
    
 	#...
 	# application 參數對應的是 flask實例

    def inner():
        try:
            fd = int(os.environ["WERKZEUG_SERVER_FD"])
        except (LookupError, ValueError):
            fd = None
        # 關鍵於此 創建一個server實例
        srv = make_server(
            hostname,
            port,
            application,
            threaded,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
        if fd is None:
            log_startup(srv.socket)
        # 調用web server實例的serve_forever方法
        srv.serve_forever()

    if use_reloader:
        #...
    else:
        inner()

繼續往下,先看 make_server方法

# werkzeug/serving.py L830
def make_server(
    host=None,
    port=None,
    app=None,
    threaded=False,
    processes=1,
    request_handler=None,
    passthrough_errors=False,
    ssl_context=None,
    fd=None,
):
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and multi process server.")
    elif threaded:
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
    elif processes > 1:
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
    else:
        return BaseWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )

看出來就是根據具體參數 構建一個server實例返回 挑一個看下 看 BaseWSGIServer

# werkzeug/serving.py L708
class BaseWSGIServer(HTTPServer, object):
    def __init__(
        self,
        host,
        port,
        app,
        handler=None,
        passthrough_errors=False,
        ssl_context=None,
        fd=None,
    ):
        if handler is None:
        	# 此處注意下 後續的請求具體處理會走 WSGIRequestHandler
            handler = WSGIRequestHandler

      	# 這邊進行了 端口綁定和監聽
      	# 並且傳入了類 WSGIRequestHandler
        HTTPServer.__init__(self, server_address, handler)
		# app即flaks實例
        self.app = app
       
    # 此方法在make_server 返回實例後立馬調用
    def serve_forever(self):
        self.shutdown_signal = False
        try:
        	# 此處的 HTTPServer 比特於python內置庫http中 http.server.HTTPServer
        	# 將當前server實例作為參數傳入
            HTTPServer.serve_forever(self)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

在往下看 HTTPServer.serve_forever() 可以發現 serve_forever 其實是其父類的父類 內置庫 socketserver中 BaseServer 類實現的方法

# socketserver.py L153
class BaseServer:
    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor. May be extended, do not override."""
        self.server_address = server_address
        # 這個屬性就是 WSGIRequestHandler
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def serve_forever(self, poll_interval=0.5):
        self.__is_shut_down.clear()
        try:
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)
				# 就是個無限循環 接收一個請求處理一個請求
                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                    	# 接收到具體請求後 進行處理
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
            
	def _handle_request_noblock(self):
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
            	# 接收到具體請求後 進行處理
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)
            
    def process_request(self, request, client_address):
    	# 接收到具體請求後 進行處理
        self.finish_request(request, client_address)
        self.shutdown_request(request)
        
    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        # 每次都實例化了 WSGIRequestHandler 進行請求處理
        # 注意哦 第三個參數把當前的server實例傳入
        self.RequestHandlerClass(request, client_address, self)

好的 最終最終 請求的處理是在 WSGIRequestHandler 實例化過程中處理的
那我們來看看 WSGIRequestHandler的代碼吧
WSGIRequestHandler 的初始化其實走父類的父類 BaseRequestHandler 的__init__方法

# socketserver.py 696
class BaseRequestHandler:
    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
        	# 具體的處理方法 其實是調用的子類的實現
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

# werkzeug/serving.py L163
class WSGIRequestHandler(BaseHTTPRequestHandler, object):
	# 處理請求的核心方法
    def run_wsgi(self):
		# 此處經典的 將http內容轉化為符合wsgi格式的內容 因為後面是個wsgi的app啊
        self.environ = environ = self.make_environ()
        headers_set = []
        headers_sent = []
		# 眼熟吧 調用wsgi app的參數之一
        def start_response(status, response_headers, exc_info=None):
            if exc_info:
                try:
                    if headers_sent:
                        reraise(*exc_info)
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            headers_set[:] = [status, response_headers]
            return write

        def execute(app):
        	# 經典的wsgi app調用 返回值可迭代
        	# 此處的app是flask實例 實例可調用自然是因為Flask實現了__call__方法
            application_iter = app(environ, start_response)
            
        try:
        	# 執行此方法 參數為flask實例
            execute(self.server.app)
        except (_ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception:
            #...

	# 上一個類調用的handle就是這個
    def handle(self):
        """Handles a request ignoring dropped connections."""
        try:
        	# 調用父類的handle
            BaseHTTPRequestHandler.handle(self)
        except (_ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is None or not is_ssl_error(e):
                raise
        if self.server.shutdown_signal:
            self.initiate_shutdown()
    
    # 沒錯 下面的類又調用了這個方法
    def handle_one_request(self):
        """Handle a single HTTP request."""
        self.raw_requestline = self.rfile.readline()
        if not self.raw_requestline:
            self.close_connection = 1
        elif self.parse_request():
        	# 最終最終 走了這個方法
            return self.run_wsgi()

# http/server.py 147
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
	# 上一個類調用的就是這個
    def handle(self):
        """Handle multiple requests if necessary."""
        self.close_connection = True
		# 此處又調用回了 WSGIRequestHandler 的 handle_one_request
        self.handle_one_request()
        while not self.close_connection:
            self.handle_one_request()

上面的調用路線看起來稍微有點繞,是因為涉及到了繼承方法的覆蓋,有些調用走的父類的,有些走的子類的方法。

可以看到:其實就是監聽端口,並開啟一個無限的循環,每次接收到一個請求之後,就實例化WSGIRequestHandler進行處理,而WSGIRequestHandler主要做了HTTP格式數據到WSGI數據的轉換,然後用WSGI的方式調用了flask實例進行實際的邏輯處理並返回數據。

下面來看看Flask實例支持調用的代碼吧。

# flask/app.py L103
class Flask(_PackageBoundObject):

    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)
        
    def wsgi_app(self, environ, start_response):
    	# 重要 創建當前請求的上下文對象 flask.ctx.RequestContext對象
    	# 此處已經對請求信息進行了處理(獲得了 endpoint和view_func_args)
        ctx = self.request_context(environ)
        error = None
        try:
            try:
            	# 將當前請求的上下文入棧(LocalStack)
            	# 此處是Flask上下文的實現細節 通過共享的方式來傳遞上下文 給後面的view_func
                ctx.push()
                # 分發執行
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            # 請求處理完成後 清理上下文 出棧
            ctx.auto_pop(error)
            
    def full_dispatch_request(self):
    	# hook
        self.try_trigger_before_first_request_functions()
        try:
        	# hook
            request_started.send(self)
            rv = self.preprocess_request()
            if rv is None:
            	# 主要執行邏輯的方法
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        return self.finalize_request(rv)

    def dispatch_request(self):
    	# 直接從上下文棧中獲取當前的請求上下文
        req = _request_ctx_stack.top.request
        if req.routing_exception is not None:
            self.raise_routing_exception(req)
        rule = req.url_rule
        # ...
        # 根據endpoint獲取對應的view_func 執行並返回
        return self.view_functions[rule.endpoint](**req.view_args)

okk,到此處Flask啟動流程基本就差不多了。
總結一下步驟:

  1. 創建一個Flask實例 app
  2. 將url和view_func 通過 app.add_url_rule() 維護到app的url_map和view_functions 屬性中。url_map包含了路由邏輯,view_functions存儲了對應的邏輯函數,二者通過endpoint相關聯。
  3. 步驟1和2 完成後,其實一個遵循WSGI協議的 web application已經准備好了,接下來我們要做的就是將其掛到一個同樣支持WSGI協議的web server下面。web server接受HTTP協議的請求,並將其轉化為WSGI格式的內容,然後調用 app(environ, start_response) 執行具體邏輯處理,並返回WSGI格式的結果。之後再把WSGI格式的結果轉換為HTTP格式返回給客戶端就可以啦。
  4. werkzeug 中的 BaseWSGIServer 繼承了Python內置庫的 http.server.HTTPServer,從中獲得了HTTPServer監聽端口並獲取請求的能力,並整合了 app和 WSGIRequestHandler。每當一個請求就緒時,就交給一個WSGIRequestHandler實例處理。WSGIRequestHandler做了HTTP格式數據到WSGI格式的轉換,並執行app(environ, start_response) ,返回響應。
  5. app(environ, start_response) 這步就又回到flask的請求處理邏輯,根據environ的信息配合事先已經綁定好的 url_map,得到具體的路由信息和參數(endpoint,view_func_args),然後從 view_functions 字典中取出對應的view_function運行並返回結果。
原网站

版权声明
本文为[某工程師$]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/188/202207070027415912.html