Source code analysis - Ruia, a lightweight asynchronous crawler framework
2022-07-02 22:32:00 [Lazy Programming - Erliang]
I mentioned ruia in the article "Source code analysis - Liuli, a reader for WeChat Official Account articles". This article briefly records what I found in ruia.
Why look at it at all? While reading Liuli, I glanced at the ruia repository and found that it has very little code. Its README also stresses that everything except the crawler's core features is implemented through plug-ins. That made me curious about how its plug-in system works. Does it import modules dynamically, the way Flask does, or does it use some approach I don't know?
As usual, decide what you want to learn before you start reading. Otherwise you dive into the details and come away with nothing. I'm mainly interested in the following 2 points:
1. What is ruia's design architecture?
2. How does ruia's plug-in system work?
Spoiler: ruia has no real plug-in system. Its so-called plug-ins are implemented as middleware, which is not what I understand a plug-in to be.
ruia's design architecture
If you know Scrapy, ruia's usage and architecture will feel very familiar. I walked through the Scrapy framework's code in my advanced crawler course, so I know Scrapy well. If you don't, parts of this article may be confusing.
ruia is much lighter than Scrapy. It has no scheduler logic. Instead, the Spider drives the whole crawl directly. Let's start with a demo to see ruia's basic usage. Part of the code is as follows:
from ruia import Spider

# DoubanItem (a ruia Item subclass that extracts the movie fields) is defined elsewhere in the demo


class DoubanSpider(Spider):
    # Spider name
    name = "DoubanSpider"
    # Entry URLs
    start_urls = ["https://movie.douban.com/top250"]
    # Crawler-related configuration
    request_config = {"RETRIES": 3, "DELAY": 0, "TIMEOUT": 20}
    concurrency = 10
    # aiohttp config
    aiohttp_kwargs = {}

    # Asynchronous parse method
    async def parse(self, response):
        # Await the response body without blocking the event loop
        html = await response.text()
        etree = response.html_etree(html=html)
        pages = ["?start=0&filter="] + [
            # Use a CSS selector to collect the URLs that need further crawling
            i.get("href") for i in etree.cssselect(".paginator>a")
        ]
        for index, page in enumerate(pages):
            url = self.start_urls[0] + page
            # Build a new request
            # Its callback method is parse_item
            yield self.request(
                url=url, metadata={"index": index}, callback=self.parse_item
            )

    async def parse_item(self, response):
        async for item in DoubanItem.get_items(html=await response.text()):
            yield item

    async def process_item(self, item: DoubanItem):
        # Body not shown in the original excerpt
        pass


if __name__ == "__main__":
    # Start the crawler
    DoubanSpider.start()
As the code above shows, the crawler is started through the Spider's start method. Its code is as follows:
@classmethod
def start(
    cls,
    middleware: typing.Union[typing.Iterable, Middleware] = None,
    loop=None,
    after_start=None,
    before_stop=None,
    close_event_loop=True,
    **spider_kwargs
):
    # Get the event loop
    loop = loop or asyncio.new_event_loop()
    # Instantiate the current class: spider_ins is an instance of it (Scrapy does the same)
    spider_ins = cls(middleware=middleware, loop=loop, **spider_kwargs)
    # Run the event loop until the spider instance's _start coroutine finishes
    spider_ins.loop.run_until_complete(
        spider_ins._start(after_start=after_start, before_stop=before_stop)
    )
    # Shut down the asynchronous generators still open in the event loop
    spider_ins.loop.run_until_complete(spider_ins.loop.shutdown_asyncgens())
    if close_event_loop:
        # Close the event loop
        spider_ins.loop.close()
    return spider_ins
From the code above, the complete flow of the crawler is:
1. Create an event loop
2. Instantiate the crawler
3. Run the crawling logic asynchronously on the event loop until it completes
4. Shut down the asynchronous generators that are still open in the event loop. Roughly, this means closing the tasks left in the loop.
5. Close the event loop itself
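To make the five steps concrete, here is a minimal sketch of the same lifecycle using plain asyncio. MiniSpider and its _start coroutine are hypothetical stand-ins, not ruia classes. Only the event-loop calls (new_event_loop, run_until_complete, shutdown_asyncgens, close) are the real asyncio API.

```python
import asyncio


class MiniSpider:
    """Hypothetical stand-in for ruia's Spider; it only mirrors start()'s lifecycle."""

    def __init__(self, loop):
        self.loop = loop
        self.crawled = []

    async def _start(self):
        # Placeholder for the real crawling logic
        await asyncio.sleep(0)
        self.crawled.append("done")

    @classmethod
    def start(cls, loop=None, close_event_loop=True):
        # 1. Create (or reuse) an event loop
        loop = loop or asyncio.new_event_loop()
        # 2. Instantiate the spider
        spider_ins = cls(loop=loop)
        # 3. Run the crawling coroutine until it finishes
        spider_ins.loop.run_until_complete(spider_ins._start())
        # 4. Finalize any async generators that are still open
        spider_ins.loop.run_until_complete(spider_ins.loop.shutdown_asyncgens())
        # 5. Close the loop itself
        if close_event_loop:
            spider_ins.loop.close()
        return spider_ins


spider = MiniSpider.start()
print(spider.crawled, spider.loop.is_closed())  # ['done'] True
```

Passing loop and close_event_loop=False would let a caller reuse its own loop. This appears to be why ruia exposes both parameters.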
run_until_complete is one of asyncio's lower-level methods. It takes a coroutine object or a Future/Task, schedules it on the event loop, runs the loop until it finishes, and returns its result. Basic usage:
In [1]: import asyncio
In [2]: # Define a coroutine function
In [3]: async def fun(a):
   ...:     print(a)
In [4]: # Calling the coroutine function creates a coroutine object; the function body does not run yet
In [5]: coroutine = fun('hello world')
In [6]: # Create an event loop
In [7]: loop = asyncio.new_event_loop()
In [8]: # Schedule the coroutine on the event loop and run it to completion
In [9]: loop.run_until_complete(coroutine)
hello world
For more on Python's event loop, see the documentation: https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html
The start method shows that the crawler's main logic actually lives in the _start method. That method is fairly long, so let's look at it piece by piece. First comes the logic that adds signal handlers:
# Add signal handlers
for signal in (SIGINT, SIGTERM):
    try:
        self.loop.add_signal_handler(
            signal, lambda: asyncio.ensure_future(self.stop(signal))
        )
    except NotImplementedError:
        self.logger.warning(
            f"{self.name} tried to use loop.add_signal_handler "
            "but it is not implemented on this platform."
        )
The code above registers handlers for SIGINT and SIGTERM on the current event loop. Besides these two, SIGKILL is another common signal. Note that loop.add_signal_handler only works on Unix-like platforms such as Linux and macOS. Windows has no SIGKILL and handles signals in its own way. Its event loops raise NotImplementedError from add_signal_handler, which is exactly what the except branch catches.
What do SIGINT, SIGTERM and SIGKILL do?
When we press Ctrl+C to stop a process, the terminal actually sends SIGINT. The signal goes to every process in the foreground process group. By default, the process stops, and so do the child processes it started in that group. This is also why SIGINT from the terminal only ends foreground processes.
On Linux, the kill command terminates a process. Without options, kill sends SIGTERM. Note that only the target process receives this signal, not its children. If the parent is killed, its children keep running and are re-parented, typically to init (the process with PID 1). So sending SIGTERM to a parent does not shut down the child processes it created.
For a forced shutdown we use kill -9, which makes Linux send SIGKILL. This signal cannot be caught or ignored. The process is terminated immediately, with no chance to run cleanup code. As with SIGTERM, only the targeted process receives it. To kill a process together with its children, send the signal to the whole process group (kill with a negative PID).
To check the signal handling described above, you can write a small script that mimics ruia's logic:
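A quick way to see that SIGKILL is special is to try registering a Python handler for each signal. This sketch assumes a Linux or macOS machine and must run in the main thread, because signal.signal only works there. can_install_handler is a hypothetical helper. It installs a no-op handler and then restores the previous one.

```python
import signal


def can_install_handler(sig: int) -> bool:
    """Try to install a no-op Python handler for sig, then put the old one back."""
    try:
        previous = signal.signal(sig, lambda signum, frame: None)
    except OSError:
        # The kernel rejects handlers for SIGKILL (and SIGSTOP) with EINVAL
        return False
    # signal.signal() returns None when the old handler was not set from Python
    signal.signal(sig, previous if previous is not None else signal.SIG_DFL)
    return True


for name in ("SIGINT", "SIGTERM", "SIGKILL"):
    # Expected on Linux: SIGINT True, SIGTERM True, SIGKILL False
    print(name, can_install_handler(getattr(signal, name)))
```

This is also why ruia only registers SIGINT and SIGTERM: there is no way for a program to react to SIGKILL.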
import sys
import asyncio
import traceback
from signal import SIGINT, SIGTERM

# Different Python versions expose the task helpers differently
if sys.version_info >= (3, 9):
    async_all_tasks = asyncio.all_tasks
    async_current_task = asyncio.current_task
else:
    async_all_tasks = asyncio.Task.all_tasks
    async_current_task = asyncio.tasks.Task.current_task


async def fun(a):
    print(a)
    # Add signal handlers
    for signal in (SIGINT, SIGTERM):
        try:
            loop.add_signal_handler(
                signal, lambda: asyncio.ensure_future(stop(signal))
            )
        except Exception:
            traceback.print_exc()
    await asyncio.sleep(600)


async def stop(_signal):
    print("Stopping async")
    # Cancel all tasks in the event loop
    await cancel_all_tasks()
    # Stop the event loop
    loop.stop()


async def cancel_all_tasks():
    tasks = []
    for task in async_all_tasks():
        if task is not async_current_task():
            tasks.append(task)
            task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


coroutine = fun('hello world')
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(coroutine)
except asyncio.CancelledError:
    # fun() was cancelled by stop(), so run_until_complete has no result to return
    pass
The script above reproduces ruia's logic in a few lines. Run it on Linux and stop it with Ctrl+C, and you will see Stopping async printed.
Back to _start. After the signal handlers are added, the code continues as follows:
try:
    # Hook: logic to run after the spider starts, before crawling
    await self._run_spider_hook(after_start)
    # Main crawling logic
    await self.start_master()
    # Hook: logic to run before the spider stops
    await self._run_spider_hook(before_stop)
finally:
    # Close the shared request session
    await self.request_session.close()
Let's look at start_master first. Its code is as follows:
async def start_master(self):
    """
    Actually start crawling
    """
    # Get Request instances
    async for request_ins in self.process_start_urls():
        # Enqueue the handle_request coroutine; it performs the request and runs the callback
        self.request_queue.put_nowait(self.handle_request(request_ins))
    workers = [
        # asyncio.ensure_future wraps the start_worker coroutine in a Task
        asyncio.ensure_future(self.start_worker())
        for i in range(self.worker_numbers)
    ]
    for worker in workers:
        self.logger.info(f"Worker started: {id(worker)}")
    # Block until every item put into the queue has been received and processed
    await self.request_queue.join()

    if not self.is_async_start:
        await self.stop(SIGINT)
    else:
        if self.cancel_tasks:
            await self.cancel_all_tasks()
start_master has two pieces of logic. The first builds the asynchronous tasks, and the second executes them concurrently. Let's go step by step, starting with process_start_urls:
async def process_start_urls(self):
    for url in self.start_urls:
        # Get a Request instance
        yield self.request(url=url, callback=self.parse, metadata=self.metadata)
process_start_urls builds a Request object for each URL in the start_urls list. At this point, no page data has been requested yet. Note the callback argument: the parse method is passed directly as the callback.
The actual request logic lives in the handle_request method (only the key code is shown):
async def handle_request(
    self, request: Request
) -> typing.Tuple[AsyncGeneratorType, Response]:
    callback_result, response = None, None
    # Run the request middleware
    await self._run_request_middleware(request)
    # Complete the request
    callback_result, response = await request.fetch_callback(self.sem)
    # Run the response middleware
    await self._run_response_middleware(request, response)
    await self._process_response(request=request, response=response)
    # Return the callback result and the response
    return callback_result, response
handle_request is where middleware comes into play. Before the request, _run_request_middleware calls the relevant middleware to process the Request instance. After the request, _run_response_middleware does the same. Finally, _process_response handles the resulting Response instance.
Now look at fetch_callback, which performs the request and obtains the corresponding result.
async def fetch_callback(
    self, sem: Semaphore
) -> Tuple[AsyncGeneratorType, Response]:
    try:
        async with sem:
            # Request logic
            response = await self.fetch()
    except Exception as e:
        response = None
        self.logger.error(f"<Error: {self.url} {e}>")

    if self.callback is not None:
        if iscoroutinefunction(self.callback):
            # Await the callback and keep its result
            callback_result = await self.callback(response)
        else:
            callback_result = self.callback(response)
    else:
        callback_result = None
    return callback_result, response
fetch_callback is really a thin wrapper. It uses fetch to request the page and the callback to process the response.
Now look at fetch. It calls _make_request to send the request and then wraps the result in a Response instance (only the relevant code is shown):
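The sketch below imitates the shape of fetch_callback to show two details. First, the semaphore caps how many requests run at once. Second, an async-generator callback such as parse is not a coroutine function. Calling it therefore returns an async generator object rather than a finished result, which is why start_worker checks for AsyncGeneratorType. fake_fetch, parse and the peak counter are hypothetical, and no real network request is made.

```python
import asyncio
from inspect import iscoroutinefunction
from types import AsyncGeneratorType

# Hypothetical bookkeeping to observe how many "requests" run at once
active = 0
peak = 0


async def fake_fetch(url: str) -> str:
    """Stand-in for Request.fetch(); no real network I/O."""
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)
    active -= 1
    return f"<html>{url}</html>"


async def parse(response):
    # Like a ruia parse method: an async generator, not a plain coroutine
    yield response.upper()


async def fetch_callback(url, sem, callback):
    # Mirrors the shape of ruia's fetch_callback: limit concurrency, then dispatch
    async with sem:
        response = await fake_fetch(url)
    if callback is None:
        return None, response
    if iscoroutinefunction(callback):
        return await callback(response), response
    # Async generator functions land here: calling one returns a generator object
    return callback(response), response


async def main():
    sem = asyncio.Semaphore(2)
    urls = [f"page{i}" for i in range(5)]
    results = await asyncio.gather(*(fetch_callback(u, sem, parse) for u in urls))
    items = []
    for callback_result, _response in results:
        assert isinstance(callback_result, AsyncGeneratorType)
        async for item in callback_result:
            items.append(item)
    return items


items = asyncio.run(main())
print(peak, items[0])  # 2 <HTML>PAGE0</HTML>
```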
async def fetch(self, delay=True) -> Response:
    ...
    async with async_timeout.timeout(timeout):
        # Send the request
        resp = await self._make_request()
    try:
        resp_encoding = resp.get_encoding()
    except Exception:
        resp_encoding = self.encoding
    # Build a Response object from the result of the request
    response = Response(
        ...
    )
The key method, _make_request, uses aiohttp to send the asynchronous request. Its code is as follows:
async def _make_request(self):
    self.logger.info(f"<{self.method}: {self.url}>")
    if self.method == "GET":
        request_func = self.current_request_session.get(
            self.url, headers=self.headers, ssl=self.ssl, **self.aiohttp_kwargs
        )
    else:
        request_func = self.current_request_session.post(
            self.url, headers=self.headers, ssl=self.ssl, **self.aiohttp_kwargs
        )
    resp = await request_func
    return resp
With fetch covered, let's look at the callback logic. In process_start_urls in spider.py, the callback argument is the parse method, so parse is called here to process the data returned by the page. parse is normally implemented in the outermost subclass, where we write the page-parsing logic ourselves.
Back to start_master. That completes the first step of the method. This step enqueues, in request_queue, the work that will eventually produce callback_result and response. Note, however, that everything here is asynchronous. What request_queue actually holds are awaitable coroutine objects, not callback_result and response themselves.
Now for the second part. The first part only queued asynchronous work. The second part executes that work concurrently and obtains callback_result and response. Its main logic is in start_worker.
start_worker's code is as follows:
async def start_worker(self):
    """
    Start spider worker
    """
    while True:
        # Get the next queued item (a handle_request coroutine)
        request_item = await self.request_queue.get()
        # Add it to the task list
        self.worker_tasks.append(request_item)
        if self.request_queue.empty():
            # Run the awaitables in self.worker_tasks concurrently
            results = await asyncio.gather(
                *self.worker_tasks, return_exceptions=True
            )
            for task_result in results:
                if not isinstance(task_result, RuntimeError) and task_result:
                    callback_results, response = task_result
                    if isinstance(callback_results, AsyncGeneratorType):
                        await self._process_async_callback(
                            callback_results, response
                        )
            self.worker_tasks = []
        # Mark the item as processed so request_queue.join() can return
        self.request_queue.task_done()
There is not much to add here. asyncio.gather runs the coroutines in worker_tasks concurrently, and each result is unpacked into callback_results and response. Async-generator callback results are then handed to _process_async_callback.
In start_master, request_queue.join() waits until every item in the queue has been processed, that is, until task_done() has been called for each one.
That covers ruia's main logic. It is not very complicated, and it is good reference code for learning Python asyncio.
ruia's plug-in system
ruia doesn't really design a plug-in system. It uses middleware as its so-called plug-ins. As in Scrapy, ruia calls middleware before and after each request, through the _run_request_middleware and _run_response_middleware methods seen in the code above.
To understand ruia's plug-ins better, let's pull the code of the ruia-ua project. This plug-in, written by ruia's author, replaces the User-Agent in the request headers. Crawling HackerNews data with ruia-ua and ruia looks like this:
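The producer/worker pattern in start_master and start_worker can be reproduced in isolation. This is a simplified sketch. handle_request here is a hypothetical stub, and errors are filtered with BaseException instead of ruia's RuntimeError check. The queue holds coroutine objects. A worker drains a batch and runs it with asyncio.gather. queue.join() returns only after task_done() has been called for every item.

```python
import asyncio


async def handle_request(url: str):
    """Hypothetical stand-in for Spider.handle_request."""
    await asyncio.sleep(0)
    return f"callback_result for {url}", f"response for {url}"


async def start_worker(queue: asyncio.Queue, collected: list):
    worker_tasks = []
    while True:
        # What comes out of the queue is a coroutine object, not a result
        request_item = await queue.get()
        worker_tasks.append(request_item)
        if queue.empty():
            # Drain the batch: run every pending coroutine concurrently
            results = await asyncio.gather(*worker_tasks, return_exceptions=True)
            for task_result in results:
                if not isinstance(task_result, BaseException):
                    collected.append(task_result)
            worker_tasks = []
        queue.task_done()


async def main(urls):
    queue: asyncio.Queue = asyncio.Queue()
    collected: list = []
    # Like start_master: enqueue coroutine objects; nothing runs yet
    for url in urls:
        queue.put_nowait(handle_request(url))
    workers = [asyncio.ensure_future(start_worker(queue, collected)) for _ in range(2)]
    # Returns once task_done() has been called for every queued item
    await queue.join()
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return collected


results = asyncio.run(main(["a", "b", "c"]))
print(results[0])  # ('callback_result for a', 'response for a')
```

Because Queue.get() does not suspend while the queue is non-empty, the first worker to run drains the whole batch before any other worker gets a turn. The gather call is where the actual concurrency happens.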
from ruia import AttrField, TextField, Item, Spider
from ruia_ua import middleware


class HackerNewsItem(Item):
    target_item = TextField(css_select='tr.athing')
    title = TextField(css_select='a.storylink')
    url = AttrField(css_select='a.storylink', attr='href')


class HackerNewsSpider(Spider):
    start_urls = ['https://news.ycombinator.com/news?p=1', 'https://news.ycombinator.com/news?p=2']

    async def parse(self, response):
        # Do something...
        pass


if __name__ == '__main__':
    # Use the ruia-ua plug-in
    HackerNewsSpider.start(middleware=middleware)
As the code shows, "using the ruia-ua plug-in" simply means using the middleware that ruia-ua provides.
Browsing ruia-ua's code, the key logic is as follows:
middleware = Middleware()


@middleware.request
async def add_random_ua(spider_ins, request):
    ua = await get_random_user_agent()
    if request.headers:
        request.headers.update({'User-Agent': ua})
    else:
        request.headers = {
            'User-Agent': ua
        }
The code above is simple. The middleware.request decorator registers the add_random_ua function, and add_random_ua implements the logic that replaces the User-Agent.
The middleware.request decorator lives in ruia/middleware.py. The relevant code is as follows:
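def request(self, *args, **kwargs):
    """
    Define a Decorate to be called before a request.
    eg: @middleware.request
    """
    middleware = args[0]

    def register_middleware(*args, **kwargs):
        # Register the decorated function in the request middleware list
        self.request_middleware.append(middleware)
        return middleware

    return register_middleware()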
The logic of middleware.request is also very simple. It appends the decorated function object to the request_middleware list, and _run_request_middleware later calls everything in that list.
The code of _run_request_middleware is as follows:
async def _run_request_middleware(self, request: Request):
    if self.middleware.request_middleware:
        # Call the middleware in registration order
        for middleware in self.middleware.request_middleware:
            if callable(middleware):
                try:
                    aws_middleware_func = middleware(self, request)
                    if isawaitable(aws_middleware_func):
                        await aws_middleware_func
                    else:
                        self.logger.error(
                            f"<Middleware {middleware.__name__}: must be a coroutine function"
                        )
                except Exception as e:
                    self.logger.exception(f"<Middleware {middleware.__name__}: {e}")
This is ruia's so-called plug-in. A decorator adds the functions you want to use to a list object. While ruia requests a website, the functions in that list get called before and after each request, and that is how the so-called plug-in is realized.
Overall, ruia is good material for learning Python asyncio, and its asyncio patterns can be borrowed directly. Its plug-in system, however, is just another name for middleware and falls well short of a real plug-in system.
That's all for this article. I'm Erliang; see you in the next one.
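Stripped of ruia's details, the "plug-in" mechanism is a list registry plus a loop that awaits each registered function. The following is a simplified, hypothetical re-implementation for illustration. The Middleware class, add_ua, log_status and run_request are not ruia's code, and the response-middleware signature is an assumption. Requests and responses are plain dicts, so the example runs without network access.

```python
import asyncio
from inspect import isawaitable


class Middleware:
    """Hypothetical, stripped-down version of the registry pattern in ruia/middleware.py."""

    def __init__(self):
        self.request_middleware = []
        self.response_middleware = []

    def request(self, func):
        # Registering is just appending to a list; the function is returned unchanged
        self.request_middleware.append(func)
        return func

    def response(self, func):
        self.response_middleware.append(func)
        return func


middleware = Middleware()


@middleware.request
async def add_ua(spider_ins, request):
    request.setdefault("headers", {})["User-Agent"] = "demo-agent/1.0"


@middleware.response
async def log_status(spider_ins, request, response):
    response["seen_by_middleware"] = True


async def run_request(request, mw):
    # Before the request: call every registered request middleware in order
    for func in mw.request_middleware:
        result = func(None, request)
        if isawaitable(result):
            await result
    response = {"status": 200, "url": request["url"]}  # fake response, no network
    # After the request: call every registered response middleware in order
    for func in mw.response_middleware:
        result = func(None, request, response)
        if isawaitable(result):
            await result
    return response


req = {"url": "https://example.com"}
resp = asyncio.run(run_request(req, middleware))
print(req["headers"], resp["seen_by_middleware"])  # {'User-Agent': 'demo-agent/1.0'} True
```

Registration happens when the decorator runs, so importing a module like ruia_ua is enough to fill the list. Passing that object to Spider.start is what wires it into the request path.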