
Implementation of epoll+threadpool high concurrency network IO model

2022-06-22 02:50:00 SS_ zico

Choice of network model

When many tasks arrive, the server must respond promptly and dispatch each one to a worker thread for processing. The traditional server model — synchronous blocking I/O with a single thread polling a listening socket — usually cannot meet the required efficiency and concurrency. Using epoll is an excellent solution to this problem.
For background on epoll, see: I/O multiplexing and epoll on Linux explained in detail
For the common network IO models, see: The five network IO models

ThreadPool

When implementing a high-concurrency server, we need multiple threads working concurrently to handle many client connections. There are two approaches:

Approaches:

  1. Whenever a client sends a request, the server creates a new thread to handle that client's connection.

  2. Create several threads in advance and let them block. Whenever a request arrives, wake one thread to process it; after it finishes, it blocks again.

Comparison:

  1. Approach 1 creates threads dynamically: a thread is created when a task arrives and destroyed once the task is done.
  2. Approach 2 creates threads in advance: a thread need not be destroyed after finishing a task; it simply blocks again and waits for the next one.
    Clearly the overhead of approach 1 is much greater than that of approach 2, so from this angle we should choose approach 2.

However, approach 1 also has an advantage: by creating threads on demand (within what the kernel can sustain) it can handle every task promptly. So how can we combine the strengths of both? On top of approach 2, create one extra thread that keeps polling the number of pending tasks and dynamically increases or decreases the number of worker threads. This is called a thread pool.

Implementation

Producer-consumer model

For a detailed introduction and implementation, see: The producer-consumer problem implemented in C
First, we build the server on the producer-consumer model. Who plays the producer and who plays the consumer? The rough responsibilities of each role in this module are described below.

The producer:

  1. Listens for connection requests from clients.
  2. Stores each client request in a container.
  3. When the container is non-empty, sends a "not empty" signal to notify the consumers.

The consumer:

  1. Waits for the "not empty" signal from the producer.
  2. On receiving it, takes a task from the container and processes it.
  3. Sends a "not full" signal back to the producer. (If tasks arrive too quickly the container may fill up, and the producer would no longer be able to accept client requests.)

From the above, it follows that the producer is the epoll model and the consumers are the thread pool.

Thread pool

1. The thread pool (a structure).
A thread pool first needs a flag indicating whether it is running or shut down. It also needs an array recording each worker thread's tid, plus an mtid variable storing the manager thread's id, so that threads can be managed and ordinary worker threads can be reclaimed. The following structure members are required:

int thread_shutdown; // 1 = pool shut down, 0 = pool open
pthread_t *tids;     // array of worker thread ids
pthread_t mtid;      // manager thread id
int thread_max;      // maximum number of threads
int thread_min;      // minimum number of threads (needed when destroying)
int thread_alive;    // number of live threads
int thread_busy;     // number of threads currently processing tasks

The producer needs to store tasks in a container. Managing that container separately would be troublesome (how to lock it, how to pass it between functions), so we may as well put it straight into the structure (a circular queue here, though other containers would also work). Since multiple threads access this critical resource, a lock is indispensable as well. So the following structure members are also needed:

task_t *task_queue;
int queue_cur;   // number of tasks currently stored
int queue_max;   // maximum number of tasks the queue can hold
int queue_front; // head index (similar to a stack top)
int queue_rear;  // tail index

pthread_mutex_t plock; // lock
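As a minimal sketch of how the queue members above fit together, here is a ring-buffer push/pop pair. The names `task_ring_t`, `ring_push`, and `ring_pop` are illustrative, not from the original code, and the caller is assumed to hold `plock`:

```c
/* task as stored in the queue */
typedef struct {
    void *(*job)(void *);
    void *arg;
} task_t;

/* just the queue-related members of the pool structure */
typedef struct {
    task_t *task_queue;
    int queue_cur;   /* number of tasks currently stored */
    int queue_max;   /* capacity of the ring */
    int queue_front; /* index of the next task to take out */
    int queue_rear;  /* index of the next free slot */
} task_ring_t;

/* Enqueue; returns 0 on success, -1 if full. Caller holds plock. */
int ring_push(task_ring_t *r, task_t t) {
    if (r->queue_cur == r->queue_max) return -1;
    r->task_queue[r->queue_rear] = t;
    r->queue_rear = (r->queue_rear + 1) % r->queue_max; /* wrap around */
    r->queue_cur++;
    return 0;
}

/* Dequeue; returns 0 on success, -1 if empty. Caller holds plock. */
int ring_pop(task_ring_t *r, task_t *out) {
    if (r->queue_cur == 0) return -1;
    *out = r->task_queue[r->queue_front];
    r->queue_front = (r->queue_front + 1) % r->queue_max;
    r->queue_cur--;
    return 0;
}
```

The modulo on `queue_rear` and `queue_front` is what makes the queue circular: the indices wrap back to slot 0 instead of running off the end of the array.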

As mentioned above, the producer must tell the consumers that tasks are available, and the consumers must block waiting for that notification. How do we do this? Condition variables fit perfectly: they let the consumers block, and they let the producer wake a consumer via pthread_cond_signal()
(beware of the thundering-herd problem here; how to avoid it is explained in detail later).
So we need two condition variables:

  1. The producer tells the consumers that the container has tasks to process.
  2. The consumers tell the producer that the container is not full and can accept more tasks.
pthread_cond_t pnot_full;  // consumer -> producer: container not full, keep adding tasks
pthread_cond_t pnot_empty; // producer -> consumer: tasks are available to process
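A minimal sketch of the handshake between the two condition variables, reduced to a single counter. The function names `produce_one`/`consume_one` and the `QMAX` capacity are invented for illustration; the `while` loops are the key point, since a woken thread must re-check the predicate:

```c
#include <pthread.h>

#define QMAX 4

static int count = 0;  /* tasks currently in the container */
static pthread_mutex_t plock      = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  pnot_empty = PTHREAD_COND_INITIALIZER;
static pthread_cond_t  pnot_full  = PTHREAD_COND_INITIALIZER;

/* Producer side: wait until the container is not full, add a task,
 * then signal one consumer that the container is not empty. */
void produce_one(void) {
    pthread_mutex_lock(&plock);
    while (count == QMAX)              /* re-check after every wakeup */
        pthread_cond_wait(&pnot_full, &plock);
    count++;
    pthread_cond_signal(&pnot_empty);
    pthread_mutex_unlock(&plock);
}

/* Consumer side: the while loop is the thundering-herd guard — a
 * woken thread goes back to waiting unless a task is really there. */
void consume_one(void) {
    pthread_mutex_lock(&plock);
    while (count == 0)
        pthread_cond_wait(&pnot_empty, &plock);
    count--;
    pthread_cond_signal(&pnot_full);
    pthread_mutex_unlock(&plock);
}
```

Because the predicate is re-evaluated under the lock, it does not matter how many threads are woken: only those that find `count > 0` leave the loop, and each decrement happens while holding `plock`.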

Finally, because the manager thread needs to destroy ordinary worker threads (when only a few threads are working and most are suspended), one more int member is needed to record how many threads should be destroyed:

int thread_exitcode;

Tasks

A task is the task_t mentioned above, also a structure.
Because we want this model to be generic, the task must be replaceable. So the structure encapsulates two members: a function pointer representing the job to run, and a void* pointer storing the arguments that function needs.

typedef struct {
    void *(*job)(void *);
    void *arg;
} task_t;
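A small example of how a worker executes such a task. The `to_upper` job here is only a stand-in for the blogger's lowercase-to-uppercase logic, and `run_task` is an invented helper name:

```c
#include <ctype.h>

typedef struct {
    void *(*job)(void *);  /* the work to perform */
    void *arg;             /* the argument it needs */
} task_t;

/* Example job: uppercase a NUL-terminated string in place
 * (a stand-in for the article's lowercase-to-uppercase logic). */
void *to_upper(void *arg) {
    for (char *p = arg; *p; p++)
        *p = (char)toupper((unsigned char)*p);
    return arg;
}

/* What a consumer thread does with a task it has dequeued. */
void run_task(task_t *t) {
    t->job(t->arg);
}
```

Because the worker only ever calls `t->job(t->arg)`, swapping in a different job function is all it takes to reuse the pool for another purpose.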

The producer: epoll

epoll itself needs only one epfd file descriptor, which identifies the root of its red-black tree. We declare it in the main function (a C++ implementation could encapsulate it as a class member). Putting it all together, the structure definitions look like this:


typedef struct {
    int thread_shutdown;
    int thread_max;
    int thread_min;
    int thread_alive;
    int thread_busy;
    int thread_exitcode;

    task_t *task_queue;
    int queue_max;
    int queue_cur;
    int queue_front;
    int queue_rear;

    pthread_t *tids;
    pthread_t mtid;

    pthread_mutex_t plock;
    pthread_cond_t pnot_full;
    pthread_cond_t pnot_empty;
} pool_t;
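A sketch of what the allocation half of `thread_pool_init` (described in step 1.2.1 below) might look like. Thread creation and error handling are abbreviated, and the parameter names are assumptions:

```c
#include <pthread.h>
#include <stdlib.h>

typedef struct {
    void *(*job)(void *);
    void *arg;
} task_t;

typedef struct {
    int thread_shutdown;
    int thread_max;
    int thread_min;
    int thread_alive;
    int thread_busy;
    int thread_exitcode;
    task_t *task_queue;
    int queue_max;
    int queue_cur;
    int queue_front;
    int queue_rear;
    pthread_t *tids;
    pthread_t mtid;
    pthread_mutex_t plock;
    pthread_cond_t pnot_full;
    pthread_cond_t pnot_empty;
} pool_t;

/* Allocate and initialize the pool; thread creation is omitted here. */
pool_t *thread_pool_init(int thread_min, int thread_max, int queue_max) {
    pool_t *pool = calloc(1, sizeof(pool_t));
    if (!pool) return NULL;

    pool->thread_shutdown = 0;   /* 0 = pool open */
    pool->thread_min = thread_min;
    pool->thread_max = thread_max;
    pool->queue_max  = queue_max;

    pool->task_queue = malloc((size_t)queue_max * sizeof(task_t));
    pool->tids = calloc((size_t)thread_max, sizeof(pthread_t));
    if (!pool->task_queue || !pool->tids) {
        free(pool->task_queue);
        free(pool->tids);
        free(pool);
        return NULL;
    }

    pthread_mutex_init(&pool->plock, NULL);
    pthread_cond_init(&pool->pnot_full, NULL);
    pthread_cond_init(&pool->pnot_empty, NULL);

    /* the real thread_pool_init would now create thread_min consumer
     * threads (thread_customer_job) and one manager (thread_manger_job) */
    return pool;
}
```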

Basic implementation logic

main function:
1.0. Initialize the networking side of the pool (thread_pool_netinit).
1.0.1. thread_pool_netinit: standard socket setup for the server end; it returns the initialized serverfd to the main function.
1.1. Initialize the task. Here the task is simply converting lowercase letters to uppercase (thread_user_job), and its parameter is the serverfd.
1.1.1. thread_user_job: since epoll already watches for client connection requests, we only need to listen on serverfd; the rest is the main processing logic (the blogger's logic is lowercase-to-uppercase conversion).
1.2. Initialize the pool (thread_pool_init).
1.2.1. thread_pool_init: mainly allocates space for the thread pool (malloc for the pool structure, the tids array, and the task_queue circular queue) and initializes each member (thread_shutdown must be initialized to 0, declaring the pool open). It then creates the specified number of consumer threads (thread_customer_job) and one manager thread (thread_manger_job). Finally, the structure's address is returned to the main function.
1.2.1.1. thread_customer_job: first check whether the thread pool is open, then acquire the lock (because the condition variable's shared state is itself a critical resource), then pthread_cond_wait(&pnot_empty, &plock) to block waiting for the pnot_empty condition variable. Note: this is where the thundering-herd problem mentioned above is handled. As the pthread_cond_wait manual describes, several threads (note: more than one) blocked on the condition variable may be woken, even though in principle each signal needs only one thread; this causes the thundering herd. The fix is a while loop that re-checks queue_cur (whether the current number of tasks is zero): only a thread that actually finds a task exits the loop and decrements queue_cur, while the other woken threads stay in the loop and go back to waiting. (The blogger suspects that between one thread exiting the loop and decrementing the counter, another woken thread might also exit the loop; this is left as an open question for now.) After processing, the logic updates the thread counters and the circular-queue bookkeeping, then goes back to blocking on the condition variable.
1.2.1.2. thread_manger_job: first check whether the thread pool is open; acquire the lock and snapshot the bookkeeping data; then decide whether to create or destroy threads (the blogger uses a simple ratio of live threads thread_alive, the current task count, and busy threads thread_busy).
1.3. Initialize epoll (thread_epoll_init).
1.3.1. thread_epoll_init: mainly epoll_create, then putting serverfd on the red-black tree via epoll_ctl for monitoring; finally it returns epfd. (It is worth noting that epoll should be switched to ET mode: the event is handed off as a task once enqueued, so epoll should not keep reporting the same readiness while the task waits to be processed.)
1.4. epoll starts listening (thread_epoll_start).
1.4.1. thread_epoll_start: first check whether the thread pool is open, then epoll_wait blocks monitoring serverfd; a return value > 0 means there are connection requests to enqueue (thread_epoll_addtask).
1.4.1.1. thread_epoll_addtask: first check whether the thread pool is shut down, then check whether the container is full; if not full, add the task to the queue; if full, pthread_cond_wait blocks waiting on pnot_full.
1.5. Destroy the global lock.
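The epoll side of the steps above can be sketched as follows. This is Linux-only; `epoll_watch` and `epoll_next_ready` are illustrative names standing in for parts of thread_epoll_init and thread_epoll_start, and the handoff to thread_epoll_addtask is elided:

```c
#include <sys/epoll.h>

/* Put fd on the epoll red-black tree in edge-triggered (ET) mode. */
int epoll_watch(int epfd, int fd) {
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET;  /* ET: one notification per readiness edge */
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

/* One step of the thread_epoll_start loop: block in epoll_wait and
 * return the ready fd, which would then go to thread_epoll_addtask. */
int epoll_next_ready(int epfd) {
    struct epoll_event ev;
    int n = epoll_wait(epfd, &ev, 1, -1);
    return n == 1 ? ev.data.fd : -1;
}
```

In ET mode epoll reports an fd once per readiness transition rather than repeatedly while data remains, which matches the enqueue-and-move-on design described in step 1.3.1.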


Copyright notice

This article was created by [SS_ zico]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/172/202206211652524059.html