An expandable and shrinkable thread pool in C: principles and code implementation
2022-06-11 02:23:00 【Prison code department】
1. Principles of the thread pool model
The client acts as the producer and the server acts as the consumer.
Compared with the traditional one-thread-per-request model, a thread pool saves the overhead of creating and destroying a thread every time.
Instead of creating a thread for each request, the pool creates a batch of threads once, up front.
The "pool" is a virtual concept: it refers to the accessible storage where the created threads are kept.
When there are no tasks, the threads in the pool block in pthread_cond_wait() on the condition variable that represents "the task queue is not empty".
When a client sends a task or request, the server needs to wake up threads in the pool.
Two functions are available:
pthread_cond_signal(): wakes up at least one thread blocked on the given condition variable.
pthread_cond_broadcast(): wakes up all threads blocked on the given condition variable, which can produce the thundering herd effect.
A woken thread takes a task from the task queue, processes it, then returns to the pool and waits to be woken up again.
You can see an example of a thread pool on Ubuntu:
Open Firefox, then in a shell run
ps aux | grep firefox
Take the process ID from the output, then in the shell run
ps -Lf [PID]
Although only one browser is open, many threads have already been started in the background; a thread pool is in use here. The NLWP column gives the number of threads, which in this example is 38.
1.1 I/O multiplexing compared with a thread pool
I/O multiplexing deals with how the server establishes connections with clients and receives their requests;
the thread pool deals with how the requests the server has received are processed.
The two can therefore be used together to improve efficiency.
1.2 Parameters required to create and maintain the pool's threads
First, the pool needs an initial number of threads, for example: thread_init_num = 38;
Obviously 38 threads will not always be enough, because there are traffic peaks. When a peak arrives the number of threads has to grow, but it cannot grow without limit, because: ① the number of threads a process can create is limited; ② although the threads of a process share one address space, each thread needs its own stack.
A value is therefore needed to cap the maximum number of threads, for example: thread_max_num = 500;
On Windows the default stack size is 1 MiB, while on Linux the default stack size is commonly 8 MiB or 10 MiB.
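To make the stack cost concrete, the following sketch asks the pthread library for the default stack size and multiplies it by a thread count. The helper names default_thread_stack_size and stack_budget are hypothetical and not part of the article's code. The result is reserved address space, not necessarily resident memory.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

/* Returns the stack size (in bytes) that a thread created with default
 * attributes would get, or 0 if the attribute object cannot be queried. */
size_t default_thread_stack_size(void)
{
    pthread_attr_t attr;
    size_t size = 0;

    if (pthread_attr_init(&attr) != 0)
        return 0;
    if (pthread_attr_getstacksize(&attr, &size) != 0)
        size = 0;
    pthread_attr_destroy(&attr);
    return size;
}

/* Upper bound on the stack space reserved by n_threads threads. */
size_t stack_budget(size_t n_threads, size_t stack_size)
{
    /* Guard against overflow of the multiplication. */
    assert(n_threads == 0 || stack_size <= SIZE_MAX / n_threads);
    return n_threads * stack_size;
}
```

With thread_max_num = 500 and an 8 MiB stack, stack_budget(500, 8 * 1024 * 1024) comes to roughly 4 GB of reserved stack space, which is one reason the maximum has to be capped.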
In addition, to decide when the pool should grow or shrink, two more values are needed:
- the current number of busy threads: thread_busy_num;
- the current total number of live threads: thread_live_num;
Whether to grow or shrink is decided from the ratio of these two values.
Growing needs a step size, thread_step, that is, how many threads are added at a time.
Where there is a traffic peak there is also a trough. If the server kept the peak number of threads alive, it would pay unnecessary resource costs (memory in particular), so the pool also needs to shrink. The shrink step can equal the growth step or be set separately.
A key question for growing and shrinking is who computes the ratio of thread_busy_num to thread_live_num.
It is unreasonable to have the server's main thread do this. Its main job is to listen for connections, and the whole point of the thread pool is to keep it undistracted by "outsourcing" the data processing to the pool. So there should be an additional manager thread that maintains the pool.
2. Thread pool description structure
typedef struct {
    void *(*function)(void *);          /* function pointer: the task callback */
    void *arg;                          /* argument passed to the callback */
} threadpool_task_t;                    /* one task for a worker thread */

/* Describes the thread pool */
struct threadpool_t {
    pthread_mutex_t lock;               /* protects this structure */
    pthread_mutex_t thread_counter;     /* protects the busy-thread counter busy_thr_num */
    pthread_cond_t queue_not_full;      /* when the task queue is full, threads adding tasks block on this condition variable */
    pthread_cond_t queue_not_empty;     /* signalled when the task queue is not empty, to wake threads waiting for tasks */
    pthread_t *threads;                 /* array holding the tid of each worker thread */
    pthread_t adjust_tid;               /* tid of the manager thread */
    threadpool_task_t *task_queue;      /* task queue (base address of the array) */
    int min_thr_num;                    /* minimum number of threads in the pool */
    int max_thr_num;                    /* maximum number of threads in the pool */
    int live_thr_num;                   /* current number of live threads */
    int busy_thr_num;                   /* number of busy threads */
    int wait_exit_thr_num;              /* number of threads to be destroyed */
    int queue_front;                    /* index of the head of task_queue */
    int queue_rear;                     /* index of the tail of task_queue */
    int queue_size;                     /* number of tasks currently in task_queue */
    int queue_max_size;                 /* maximum number of tasks task_queue can hold */
    int shutdown;                       /* flag: whether the pool is shut down, true or false */
};
A production thread pool can add or remove fields of this structure as the business requires.
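As a small illustration of the task structure, the sketch below stores a callback and its argument in a struct of the same shape as threadpool_task_t and then invokes it the way a worker would. The names demo_task_t, square_in_place and run_square_task are hypothetical and exist only for this example.

```c
#include <assert.h>
#include <stddef.h>

/* Same shape as threadpool_task_t: a callback plus its argument. */
typedef struct {
    void *(*function)(void *);
    void *arg;
} demo_task_t;

/* Example callback: squares the int that arg points to. */
static void *square_in_place(void *arg)
{
    int *n = (int *)arg;
    *n = (*n) * (*n);
    return arg;
}

/* Packs a task, runs it through the function pointer, returns the result. */
int run_square_task(int value)
{
    demo_task_t task = { square_in_place, &value };

    assert(task.function != NULL);
    (*(task.function))(task.arg);   /* same call form the worker thread uses */
    return value;
}
```

Because arg is a void pointer, the object it points to must stay valid until the callback has run; here value lives for the whole call.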
3. Structure of the main function
int main(void)
{
    /* threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size); */
    threadpool_t *thp = threadpool_create(3, 100, 100);  /* create a pool with at least 3 threads, at most 100 threads, and a queue of at most 100 tasks */
    /* the pool structure is shared by all threads */
    printf("pool inited\n");
    //int *num = (int *)malloc(sizeof(int)*20);
    int num[20], i;
    for (i = 0; i < 20; i++) {
        num[i] = i;
        printf("add task %d\n", i);
        /* int threadpool_add(threadpool_t *pool, void *(*function)(void *arg), void *arg) */
        threadpool_add(thp, process, (void *)&num[i]);   /* add a task to the pool */
    }
    sleep(10);                  /* wait for the worker threads to finish; simulates the main thread spending CPU time on other work */
    threadpool_destroy(thp);    /* destroy the pool */
    return 0;
}
Structure of main():
- Create the thread pool
- Simulate adding tasks to the pool; the tasks are handled by a callback
- Destroy the thread pool
4. Thread pool creation
/* Called as: threadpool_create(3, 100, 100); */
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
    int i;
    threadpool_t *pool = NULL;          /* the thread pool structure */

    /* Allocate the thread pool structure */
    do {
        if ((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
            printf("malloc threadpool fail");
            break;                      /* leave the do-while */
        }
        pool->min_thr_num = min_thr_num;        /* minimum number of threads */
        pool->max_thr_num = max_thr_num;        /* maximum number of threads */
        pool->busy_thr_num = 0;                 /* current number of busy threads */
        pool->live_thr_num = min_thr_num;       /* live threads start at the minimum */
        pool->wait_exit_thr_num = 0;            /* number of threads waiting to be destroyed */
        pool->queue_size = 0;                   /* the queue starts with 0 tasks */
        pool->queue_max_size = queue_max_size;  /* capacity of the task queue */
        pool->queue_front = 0;                  /* head index of the task queue */
        pool->queue_rear = 0;                   /* tail index of the task queue */
        pool->shutdown = false;                 /* the pool is not shut down */

        /* Allocate the worker thread array for the maximum number of threads and zero it */
        pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num);
        if (pool->threads == NULL) {
            printf("malloc threads fail");
            break;
        }
        memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num);

        /* Allocate the task queue */
        pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size);
        if (pool->task_queue == NULL) {
            printf("malloc task_queue fail");
            break;
        }

        /* Initialize the mutexes and condition variables (NULL selects the default attributes) */
        if (pthread_mutex_init(&(pool->lock), NULL) != 0
            || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
            || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
            || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
        {
            printf("init the lock or cond fail");
            break;
        }

        /* Start min_thr_num worker threads */
        for (i = 0; i < min_thr_num; i++) {
            /* threadpool_thread is the start routine of each worker thread */
            pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);  /* pool points to this thread pool and is passed as the start routine's void *arg */
            /* Every worker receives the pool as its argument. Note that it is the address that is passed, not a copy. */
            printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);
        }

        /* Create 1 manager thread; adjust_thread is its start routine */
        pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);
        return pool;
    } while (0);

    threadpool_free(pool);      /* a previous step failed: release the storage held by pool */
    return NULL;
}
Structure of threadpool_create():
- Create the pointer to the thread pool structure
- Initialize the thread pool structure (its N member variables)
- Create N worker threads
- Create 1 manager thread
- On failure, release all the storage that was allocated
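The do { ... } while (0) block with break, which threadpool_create() uses to route every failure to one cleanup call, can be shown in isolation. This is a minimal sketch, not the article's threadpool_free(): demo_pool_t, demo_pool_create and demo_pool_free are hypothetical names. It also sets every pointer to NULL before the first possible break, so that the cleanup function never frees an uninitialized pointer.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct {
    int *slots;     /* hypothetical stand-in for the thread array */
    int *queue;     /* hypothetical stand-in for the task queue */
} demo_pool_t;

/* Safe to call with NULL or with a partially built pool. */
void demo_pool_free(demo_pool_t *p)
{
    if (p == NULL)
        return;
    free(p->queue);     /* free(NULL) is a no-op */
    free(p->slots);
    free(p);
}

demo_pool_t *demo_pool_create(size_t n_slots, size_t n_queue)
{
    demo_pool_t *p = NULL;

    do {
        p = malloc(sizeof(*p));
        if (p == NULL)
            break;
        p->slots = NULL;            /* known state before any later break */
        p->queue = NULL;
        if (n_slots == 0 || n_queue == 0)
            break;                  /* reject invalid sizes */
        p->slots = calloc(n_slots, sizeof(int));
        if (p->slots == NULL)
            break;
        p->queue = calloc(n_queue, sizeof(int));
        if (p->queue == NULL)
            break;
        assert(p->slots != NULL && p->queue != NULL);  /* invariant on success */
        return p;                   /* success: skip the cleanup below */
    } while (0);

    demo_pool_free(p);              /* single cleanup point for all failures */
    return NULL;
}
```

The design choice is that every failure path ends in the same place, so adding another allocation step only requires one more break and one more free().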
5. Adding tasks to the thread pool
First, a brief introduction to how condition variables and mutexes are used together.
The pthread_cond_XXX() functions work together with a mutex:
- pthread_cond_wait() blocks the calling thread until another thread wakes it with pthread_cond_signal() or pthread_cond_broadcast().
- pthread_cond_wait() must be used together with a pthread_mutex. On entering the wait, the function automatically releases the mutex (unlocks it). When another thread wakes the waiter through pthread_cond_signal() or pthread_cond_broadcast() and pthread_cond_wait() returns, the thread automatically holds the mutex again (locked).
- pthread_cond_signal() sends a signal to another thread that is blocked waiting, so that it leaves the blocked state and carries on. If no thread is waiting, pthread_cond_signal() still returns successfully.
- pthread_cond_signal() usually does not cause the thundering herd problem, because it is meant to wake a single waiter. If several threads are blocked on the condition variable, the priority of the waiting threads determines which one receives the signal and continues; if they all have the same priority, how long each has been waiting decides. In any case, one call to pthread_cond_signal() delivers the signal at most once.
- However, on a multiprocessor pthread_cond_signal() may wake several threads at the same time. When only one thread may handle the task, the other woken threads have to go back to waiting. The specification only requires pthread_cond_signal() to wake at least one thread blocked in pthread_cond_wait(), and some implementations wake several threads even on a single processor for simplicity. In addition, some applications, thread pools among them, use pthread_cond_broadcast() to wake all threads although only some of them are needed for the work, so the rest have to wait again. It is therefore strongly recommended to call pthread_cond_wait() inside a while loop that re-checks the condition (the worker thread callback later in this article is written this way).
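The recommendation to call pthread_cond_wait() inside a while loop can be sketched with one producer and one waiter. This is a minimal example under stated assumptions: demo_lock, demo_ready_cond, demo_ready, demo_payload, demo_producer and wait_for_payload are hypothetical names, and the predicate is a plain int flag protected by the mutex.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t demo_ready_cond = PTHREAD_COND_INITIALIZER;
static int demo_ready = 0;      /* the predicate, guarded by demo_lock */
static int demo_payload = 0;    /* the data published by the producer */

/* Producer: publish the value, set the predicate, then signal. */
static void *demo_producer(void *arg)
{
    assert(arg != NULL);
    pthread_mutex_lock(&demo_lock);
    demo_payload = *(int *)arg;
    demo_ready = 1;
    pthread_cond_signal(&demo_ready_cond);
    pthread_mutex_unlock(&demo_lock);
    return NULL;
}

/* Starts a producer and waits until it has published its value.
 * Returns the published value, or -1 if the thread cannot be created. */
int wait_for_payload(int value)
{
    pthread_t tid;
    int result;

    pthread_mutex_lock(&demo_lock);
    demo_ready = 0;                         /* reset the predicate */
    pthread_mutex_unlock(&demo_lock);

    if (pthread_create(&tid, NULL, demo_producer, &value) != 0)
        return -1;

    pthread_mutex_lock(&demo_lock);
    while (!demo_ready)                     /* re-check after every wake-up */
        pthread_cond_wait(&demo_ready_cond, &demo_lock);
    result = demo_payload;
    pthread_mutex_unlock(&demo_lock);

    pthread_join(tid, NULL);                /* value must outlive the producer */
    return result;
}
```

The loop covers two cases at once: a spurious or surplus wake-up sends the thread back to waiting, and a signal sent before the waiter arrives is not lost, because the flag is already set and the loop body is skipped.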
Add tasks to the thread pool
/* Run by a worker thread of the pool; simulates business processing */
void *process(void *arg)
{
    int task_no = *(int *)arg;  /* arg is the address of an int (main passes &num[i]) */
    printf("thread 0x%x working on task %d\n", (unsigned int)pthread_self(), task_no);
    sleep(1);                   /* simulate work */
    printf("task %d is end\n", task_no);
    return NULL;
}

/* Add one task to the thread pool */
/* Called as: threadpool_add(thp, process, (void *)&num[i]); */
/* This function writes the task into the task queue */
int threadpool_add(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
    /* Lock the pool structure first */
    pthread_mutex_lock(&(pool->lock));

    /* While the queue is full (and the pool is not shut down), block in wait */
    while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
        pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    }
    if (pool->shutdown) {
        pthread_cond_broadcast(&(pool->queue_not_empty));
        pthread_mutex_unlock(&(pool->lock));
        return 0;
    }

    /* Clear the callback argument left in the slot at the tail of the queue */
    if (pool->task_queue[pool->queue_rear].arg != NULL) {
        pool->task_queue[pool->queue_rear].arg = NULL;
    }

    /* Add the task to the task queue */
    pool->task_queue[pool->queue_rear].function = function;
    pool->task_queue[pool->queue_rear].arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;  /* advance the tail index, wrapping around like a ring */
    pool->queue_size++;

    /* The queue is now non-empty: wake a pool thread that is waiting for a task */
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));
    return 0;
}
The actual business logic goes in the void *process(void *arg) function.
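The queue_front, queue_rear and queue_size bookkeeping used by threadpool_add() is an array-backed circular queue. The sketch below shows only that indexing, without any locking; ring_t, RING_CAP, ring_init, ring_push and ring_pop are hypothetical names, and the capacity is fixed at 4 to make the wrap-around easy to follow.

```c
#include <assert.h>
#include <stddef.h>

#define RING_CAP 4              /* plays the role of queue_max_size */

typedef struct {
    int items[RING_CAP];
    int front;                  /* index of the next item to take */
    int rear;                   /* index of the next free slot */
    int size;                   /* number of items currently stored */
} ring_t;

void ring_init(ring_t *q)
{
    assert(q != NULL);
    q->front = q->rear = q->size = 0;
}

/* Returns 0 on success, -1 if the queue is full. */
int ring_push(ring_t *q, int value)
{
    if (q->size == RING_CAP)
        return -1;
    q->items[q->rear] = value;
    q->rear = (q->rear + 1) % RING_CAP;     /* wrap to 0 after the last slot */
    q->size++;
    return 0;
}

/* Returns 0 on success, -1 if the queue is empty. */
int ring_pop(ring_t *q, int *out)
{
    if (q->size == 0)
        return -1;
    *out = q->items[q->front];
    q->front = (q->front + 1) % RING_CAP;
    q->size--;
    return 0;
}
```

Keeping a separate size counter is what lets front == rear mean either "empty" or "full" without ambiguity. In the pool, a full queue makes the adding thread wait on queue_not_full instead of returning an error.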
6. Thread callback functions
6.1 Manager thread callback function
/* Manager thread */
void *adjust_thread(void *threadpool)
{
    int i;
    threadpool_t *pool = (threadpool_t *)threadpool;

    while (!pool->shutdown) {
        sleep(DEFAULT_TIME);    /* growing and shrinking are rarely urgent, so the manager does not compete for the CPU all the time; it wakes up periodically to manage the pool */

        pthread_mutex_lock(&(pool->lock));              /* lock the pool */
        int queue_size = pool->queue_size;              /* number of queued tasks */
        int live_thr_num = pool->live_thr_num;          /* number of live threads */
        pthread_mutex_unlock(&(pool->lock));            /* unlock the pool */

        pthread_mutex_lock(&(pool->thread_counter));    /* lock the busy-thread counter */
        int busy_thr_num = pool->busy_thr_num;          /* number of busy threads */
        pthread_mutex_unlock(&(pool->thread_counter));  /* unlock the busy-thread counter */

        /* 1. Create new threads. Rule: the number of queued tasks is at least MIN_WAIT_TASK_NUM and the number of live threads is below the maximum, e.g. 30 >= 10 && 40 < 100 */
        if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
            pthread_mutex_lock(&(pool->lock));
            int add = 0;
            /* add up to DEFAULT_THREAD_VARY threads at a time */
            for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
                    && pool->live_thr_num < pool->max_thr_num; i++) {
                if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
                    pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
                    add++;
                    pool->live_thr_num++;
                }
            }
            pthread_mutex_unlock(&(pool->lock));
        }

        /* 2. Destroy surplus idle threads. Rule: busy threads x 2 is less than the number of live threads, and the number of live threads is greater than the minimum */
        if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {
            /* retire DEFAULT_THREAD_VARY threads at a time; any 10 idle threads will do */
            pthread_mutex_lock(&(pool->lock));
            pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;  /* number of threads to retire, 10 here */
            pthread_mutex_unlock(&(pool->lock));
            for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
                /* notify idle threads; they terminate themselves */
                /* "notifying" an idle thread means waking a thread blocked on the queue_not_empty condition variable */
                /* each pthread_cond_signal() call wakes one waiting thread */
                pthread_cond_signal(&(pool->queue_not_empty));
            }
        }
    }
    return NULL;
}
Structure of adjust_thread():
- Receive the callback parameter void *arg and cast it to the pool structure
- Loop, waking up once every DEFAULT_TIME seconds. Growing and shrinking are rarely urgent, so there is no need for the manager thread to keep competing for the CPU; it wakes up periodically to manage the pool.
- Holding the mutexes, read the number of queued tasks, the number of live threads and the number of busy threads
- Using these three values and the fixed rules, decide whether to grow or shrink, that is, whether to create or destroy one step's worth of threads (DEFAULT_THREAD_VARY).
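The two rules applied by adjust_thread() can be isolated into a pure function, which makes the thresholds easy to check by hand. This is a sketch under assumptions, not the article's code: pool_adjustment and its parameter names are hypothetical, the growth rule takes precedence over the shrink rule, and the step is clamped so that the result never crosses the minimum or the maximum.

```c
#include <assert.h>

/* Returns how many threads to add (> 0), to retire (< 0), or 0 for no change.
 * min_wait_tasks plays the role of MIN_WAIT_TASK_NUM and step the role of
 * DEFAULT_THREAD_VARY. */
int pool_adjustment(int queue_size, int live, int busy,
                    int min_thr, int max_thr, int min_wait_tasks, int step)
{
    assert(step > 0 && min_thr <= max_thr);

    /* Rule 1: a backlog of tasks and room to grow. */
    if (queue_size >= min_wait_tasks && live < max_thr) {
        int room = max_thr - live;
        return step < room ? step : room;
    }

    /* Rule 2: fewer than half of the live threads are busy. */
    if (busy * 2 < live && live > min_thr) {
        int spare = live - min_thr;
        return -(step < spare ? step : spare);
    }

    return 0;
}
```

For instance, with 30 queued tasks, 40 live threads and the limits 3 and 100, the function asks for 10 more threads; with an empty queue, 40 live threads and only 5 of them busy, it asks to retire 10.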
6.2 Worker thread callback function
/* Each worker thread of the pool runs this function */
void *threadpool_thread(void *threadpool)   /* the parameter is a pointer to the pool structure */
{
    threadpool_t *pool = (threadpool_t *)threadpool;    /* recover the pool structure */
    threadpool_task_t task;                             /* a local task structure */

    while (true) {
        /* The lock must be held in order to wait on the condition variable */
        /* A newly created thread waits for a task: if the queue is empty it blocks until a task arrives, then wakes up and takes it */
        /* Take the mutex before touching the shared pool structure */
        pthread_mutex_lock(&(pool->lock));

        /* queue_size == 0 means there is no task: block on the condition variable; if there is a task, skip this while */
        while ((pool->queue_size == 0) && (!pool->shutdown)) {
            printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
            /* Block on queue_not_empty and release lock; when woken, re-acquire lock and continue */
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

            /* Retire the requested number of idle threads: if the number of threads to end is greater than 0, end this thread */
            /* wait_exit_thr_num is the number of threads to destroy */
            if (pool->wait_exit_thr_num > 0) {
                pool->wait_exit_thr_num--;
                /* Only terminate this thread if the pool has more threads than the minimum */
                /* Note that this if is nested inside the previous if */
                /* live_thr_num is the number of threads currently alive */
                if (pool->live_thr_num > pool->min_thr_num) {
                    printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
                    pool->live_thr_num--;
                    /* Unlock, then exit */
                    /* Keep the critical section as small as possible. In particular, if the lock were taken at the top of the while loop and released only at its end, other threads would almost never get the lock. */
                    pthread_mutex_unlock(&(pool->lock));
                    /* pthread_exit() ends this thread here, so the rest of threadpool_thread is not executed */
                    pthread_exit(NULL);
                }
            }
        }

        /* If shutdown is true, every thread in the pool exits by itself: the pool is being destroyed */
        if (pool->shutdown) {
            pthread_mutex_unlock(&(pool->lock));
            printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
            pthread_detach(pthread_self());     /* detach so that the thread's resources are reclaimed automatically */
            pthread_exit(NULL);                 /* detaching does not end the thread; pthread_exit() is still needed */
        }

        /* Take a task from the task queue (dequeue) */
        task.function = pool->task_queue[pool->queue_front].function;   /* the callback */
        task.arg = pool->task_queue[pool->queue_front].arg;             /* the callback's argument */
        pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;  /* advance the head index, wrapping around like a ring */
        pool->queue_size--;

        /* Notify that new tasks can now be added */
        /* pthread_cond_broadcast() wakes all threads waiting on the condition variable */
        pthread_cond_broadcast(&(pool->queue_not_full));

        /* Once the task has been copied into the local variable task, release the pool mutex immediately */
        pthread_mutex_unlock(&(pool->lock));

        /* thread_counter is the mutex dedicated to the busy-thread counter (busy_thr_num) */
        pthread_mutex_lock(&(pool->thread_counter));    /* lock the busy-thread counter */
        pool->busy_thr_num++;                           /* busy threads + 1 */
        pthread_mutex_unlock(&(pool->thread_counter));

        /* Run the task */
        printf("thread 0x%x start working\n", (unsigned int)pthread_self());
        /* threadpool_thread is itself a callback, so here one callback calls another */
        (*(task.function))(task.arg);                   /* execute the task callback */
        //task.function(task.arg);                      /* equivalent call form */

        /* The task is finished */
        printf("thread 0x%x end working\n", (unsigned int)pthread_self());
        pthread_mutex_lock(&(pool->thread_counter));
        pool->busy_thr_num--;                           /* one task done: busy threads - 1 */
        pthread_mutex_unlock(&(pool->thread_counter));
    }
    pthread_exit(NULL);
}
Structure of threadpool_thread():
1. Accept the thread callback parameter void *arg and cast it to the pool structure
2. Lock: take lock, which protects the whole pool structure
3. Check the condition that belongs to queue_not_empty:
3.1 If the task queue is empty:
- Call pthread_cond_wait() to block the thread
- After being woken, check whether this thread should exit (this cooperates with the manager thread's shrink operation; the number of live threads must not fall below the minimum)
4. Check whether the pool is being destroyed; if so, the thread exits by itself
5. Take a task from the task queue (dequeue)
6. Call pthread_cond_broadcast() to tell the threads blocked on pool->queue_not_full that new tasks can be added
7. Once the task has been copied into the local variable task, release the pool mutex lock immediately
8. Holding the mutex pool->thread_counter, increase the number of busy threads by 1
9. Run the task: (*(task.function))(task.arg); threadpool_thread is itself a callback, so here one callback calls another
10. When the task is finished, hold the mutex pool->thread_counter and decrease the number of busy threads by 1
7. Source code
7.1 threadpool.c
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include "threadpool.h"

#define DEFAULT_TIME 10             /* the manager checks the pool every 10 s */
#define MIN_WAIT_TASK_NUM 10        /* if queue_size >= MIN_WAIT_TASK_NUM, add new threads to the pool */
#define DEFAULT_THREAD_VARY 10      /* number of threads created or destroyed at a time */
#define true 1
#define false 0
typedef struct {
    void *(*function)(void *);          /* function pointer: the task callback */
    void *arg;                          /* argument passed to the callback */
} threadpool_task_t;                    /* one task for a worker thread */

/* Describes the thread pool */
struct threadpool_t {
    pthread_mutex_t lock;               /* protects this structure */
    pthread_mutex_t thread_counter;     /* protects the busy-thread counter busy_thr_num */
    pthread_cond_t queue_not_full;      /* when the task queue is full, threads adding tasks block on this condition variable */
    pthread_cond_t queue_not_empty;     /* signalled when the task queue is not empty, to wake threads waiting for tasks */
    pthread_t *threads;                 /* array holding the tid of each worker thread */
    pthread_t adjust_tid;               /* tid of the manager thread */
    threadpool_task_t *task_queue;      /* task queue (base address of the array) */
    int min_thr_num;                    /* minimum number of threads in the pool */
    int max_thr_num;                    /* maximum number of threads in the pool */
    int live_thr_num;                   /* current number of live threads */
    int busy_thr_num;                   /* number of busy threads */
    int wait_exit_thr_num;              /* number of threads to be destroyed */
    int queue_front;                    /* index of the head of task_queue */
    int queue_rear;                     /* index of the tail of task_queue */
    int queue_size;                     /* number of tasks currently in task_queue */
    int queue_max_size;                 /* maximum number of tasks task_queue can hold */
    int shutdown;                       /* flag: whether the pool is shut down, true or false */
};

void *threadpool_thread(void *threadpool);
void *adjust_thread(void *threadpool);
int is_thread_alive(pthread_t tid);
int threadpool_free(threadpool_t *pool);
/* Called as: threadpool_create(3, 100, 100); */
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
    int i;
    threadpool_t *pool = NULL;          /* the thread pool structure */

    do {
        if ((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
            printf("malloc threadpool fail");
            break;                      /* leave the do-while */
        }
        pool->min_thr_num = min_thr_num;
        pool->max_thr_num = max_thr_num;
        pool->busy_thr_num = 0;
        pool->live_thr_num = min_thr_num;       /* live threads start at the minimum */
        pool->wait_exit_thr_num = 0;
        pool->queue_size = 0;                   /* the queue starts with 0 tasks */
        pool->queue_max_size = queue_max_size;  /* capacity of the task queue */
        pool->queue_front = 0;
        pool->queue_rear = 0;
        pool->shutdown = false;                 /* the pool is not shut down */

        /* Allocate the worker thread array for the maximum number of threads and zero it */
        pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num);
        if (pool->threads == NULL) {
            printf("malloc threads fail");
            break;
        }
        memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num);

        /* Allocate the task queue */
        pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size);
        if (pool->task_queue == NULL) {
            printf("malloc task_queue fail");
            break;
        }

        /* Initialize the mutexes and condition variables */
        if (pthread_mutex_init(&(pool->lock), NULL) != 0
            || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
            || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
            || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
        {
            printf("init the lock or cond fail");
            break;
        }

        /* Start min_thr_num worker threads */
        for (i = 0; i < min_thr_num; i++) {
            pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);  /* pool points to this thread pool and is passed as the start routine's void *arg */
            printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);
        }

        /* Create 1 manager thread */
        pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);
        return pool;
    } while (0);

    threadpool_free(pool);      /* a previous step failed: release the storage held by pool */
    return NULL;
}
/* Add one task to the thread pool */
/* Called as: threadpool_add(thp, process, (void *)&num[i]); */
/* This function writes the task into the task queue */
int threadpool_add(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
    /* Lock the pool structure first */
    pthread_mutex_lock(&(pool->lock));

    /* While the queue is full (and the pool is not shut down), block in wait */
    while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
        pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    }
    if (pool->shutdown) {
        pthread_cond_broadcast(&(pool->queue_not_empty));
        pthread_mutex_unlock(&(pool->lock));
        return 0;
    }

    /* Clear the callback argument left in the slot at the tail of the queue */
    if (pool->task_queue[pool->queue_rear].arg != NULL) {
        pool->task_queue[pool->queue_rear].arg = NULL;
    }

    /* Add the task to the task queue */
    pool->task_queue[pool->queue_rear].function = function;
    pool->task_queue[pool->queue_rear].arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;  /* advance the tail index, wrapping around like a ring */
    pool->queue_size++;

    /* The queue is now non-empty: wake a pool thread that is waiting for a task */
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));
    return 0;
}
/* Each worker thread of the pool runs this function */
void *threadpool_thread(void *threadpool)   /* the parameter is a pointer to the pool structure */
{
    threadpool_t *pool = (threadpool_t *)threadpool;    /* recover the pool structure */
    threadpool_task_t task;

    while (true) {
        /* The lock must be held in order to wait on the condition variable */
        /* A newly created thread waits for a task: if the queue is empty it blocks until a task arrives, then wakes up and takes it */
        /* Take the mutex before touching the shared pool structure */
        pthread_mutex_lock(&(pool->lock));

        /* queue_size == 0 means there is no task: block on the condition variable; if there is a task, skip this while */
        while ((pool->queue_size == 0) && (!pool->shutdown)) {
            printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
            /* Block on queue_not_empty and release lock; when woken, re-acquire lock and continue */
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

            /* Retire the requested number of idle threads: if the number of threads to end is greater than 0, end this thread */
            /* wait_exit_thr_num is the number of threads to destroy */
            if (pool->wait_exit_thr_num > 0) {
                pool->wait_exit_thr_num--;
                /* Only terminate this thread if the pool has more threads than the minimum */
                /* Note that this if is nested inside the previous if */
                /* live_thr_num is the number of threads currently alive */
                if (pool->live_thr_num > pool->min_thr_num) {
                    printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
                    pool->live_thr_num--;
                    /* Unlock, then exit */
                    /* Keep the critical section as small as possible. In particular, if the lock were taken at the top of the while loop and released only at its end, other threads would almost never get the lock. */
                    pthread_mutex_unlock(&(pool->lock));
                    /* pthread_exit() ends this thread here, so the rest of threadpool_thread is not executed */
                    pthread_exit(NULL);
                }
            }
        }

        /* If shutdown is true, every thread in the pool exits by itself: the pool is being destroyed */
        if (pool->shutdown) {
            pthread_mutex_unlock(&(pool->lock));
            printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
            pthread_detach(pthread_self());     /* detach so that the thread's resources are reclaimed automatically */
            pthread_exit(NULL);                 /* detaching does not end the thread; pthread_exit() is still needed */
        }

        /* Take a task from the task queue (dequeue) */
        task.function = pool->task_queue[pool->queue_front].function;   /* the callback */
        task.arg = pool->task_queue[pool->queue_front].arg;             /* the callback's argument */
        pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;  /* advance the head index, wrapping around like a ring */
        pool->queue_size--;

        /* Notify that new tasks can now be added */
        /* pthread_cond_broadcast() wakes all threads waiting on the condition variable */
        pthread_cond_broadcast(&(pool->queue_not_full));

        /* Once the task has been copied into the local variable task, release the pool mutex immediately */
        pthread_mutex_unlock(&(pool->lock));

        /* thread_counter is the mutex dedicated to the busy-thread counter (busy_thr_num) */
        pthread_mutex_lock(&(pool->thread_counter));    /* lock the busy-thread counter */
        pool->busy_thr_num++;                           /* busy threads + 1 */
        pthread_mutex_unlock(&(pool->thread_counter));

        /* Run the task */
        printf("thread 0x%x start working\n", (unsigned int)pthread_self());
        /* threadpool_thread is itself a callback, so here one callback calls another */
        (*(task.function))(task.arg);                   /* execute the task callback */
        //task.function(task.arg);                      /* equivalent call form */

        /* The task is finished */
        printf("thread 0x%x end working\n", (unsigned int)pthread_self());
        pthread_mutex_lock(&(pool->thread_counter));
        pool->busy_thr_num--;                           /* one task done: busy threads - 1 */
        pthread_mutex_unlock(&(pool->thread_counter));
    }
    pthread_exit(NULL);
}
/* Manager thread: periodically grows or shrinks the pool. */
void *adjust_thread(void *threadpool)
{
    int i;
    threadpool_t *pool = (threadpool_t *)threadpool;

    while (!pool->shutdown) {
        sleep(DEFAULT_TIME); /* inspect the pool at a fixed interval */

        pthread_mutex_lock(&(pool->lock));
        int queue_size = pool->queue_size;     /* tasks waiting in the queue */
        int live_thr_num = pool->live_thr_num; /* threads currently alive */
        pthread_mutex_unlock(&(pool->lock));

        pthread_mutex_lock(&(pool->thread_counter));
        int busy_thr_num = pool->busy_thr_num; /* threads currently running a task */
        pthread_mutex_unlock(&(pool->thread_counter));

        /* 1. Grow. Rule: at least MIN_WAIT_TASK_NUM tasks are waiting and the number
         * of live threads is below the maximum, e.g. 30 >= 10 && 40 < 100. */
        if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
            pthread_mutex_lock(&(pool->lock));
            int add = 0;
            /* Add up to DEFAULT_THREAD_VARY threads, reusing free slots in threads[]. */
            for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
                    && pool->live_thr_num < pool->max_thr_num; i++) {
                if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
                    if (pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool) != 0) {
                        break; /* creation failed; try again on the next cycle */
                    }
                    add++;
                    pool->live_thr_num++;
                }
            }
            pthread_mutex_unlock(&(pool->lock));
        }

        /* 2. Shrink. Rule: fewer than half of the live threads are busy and the
         * number of live threads is above the minimum. */
        if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) {
            /* Ask DEFAULT_THREAD_VARY idle threads to exit; it does not matter which ones. */
            pthread_mutex_lock(&(pool->lock));
            pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* number of threads to destroy (10) */
            pthread_mutex_unlock(&(pool->lock));

            for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
                /* Notifying an idle thread means waking a worker blocked on queue_not_empty.
                 * Each pthread_cond_signal() wakes at least one such thread, which then
                 * sees wait_exit_thr_num > 0 and terminates itself. */
                pthread_cond_signal(&(pool->queue_not_empty));
            }
        }
    }
    return NULL;
}
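int threadpool_destroy(threadpool_t *pool)
{
    int i;

    if (pool == NULL) {
        return -1;
    }

    /* Set the flag under the lock so that a worker cannot miss it between
     * checking shutdown and calling pthread_cond_wait(). */
    pthread_mutex_lock(&(pool->lock));
    pool->shutdown = true;
    pthread_mutex_unlock(&(pool->lock));

    /* Stop the manager thread first. */
    pthread_join(pool->adjust_tid, NULL);

    /* Wake every worker blocked on queue_not_empty; a single broadcast reaches all of them. */
    pthread_cond_broadcast(&(pool->queue_not_empty));

    /* Wait for the workers. This assumes the live workers occupy slots
     * 0 .. live_thr_num - 1, which may not hold once the pool has shrunk and grown again. */
    for (i = 0; i < pool->live_thr_num; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    threadpool_free(pool);
    return 0;
}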
int threadpool_free(threadpool_t *pool)
{
    if (pool == NULL) {
        return -1;
    }
    if (pool->task_queue) {
        free(pool->task_queue);
    }
    if (pool->threads) {
        free(pool->threads);
        /* Destroy the mutexes while they are unlocked: destroying a locked mutex
         * is undefined behavior. */
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_destroy(&(pool->thread_counter));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
    }
    free(pool);
    /* pool is a local copy of the caller's pointer, so setting it to NULL here would have no effect. */
    return 0;
}
int threadpool_all_threadnum(threadpool_t *pool)
{
    int all_threadnum = -1; /* total number of threads */

    pthread_mutex_lock(&(pool->lock));
    all_threadnum = pool->live_thr_num; /* number of live threads */
    pthread_mutex_unlock(&(pool->lock));
    return all_threadnum;
}

int threadpool_busy_threadnum(threadpool_t *pool)
{
    int busy_threadnum = -1; /* number of busy threads */

    pthread_mutex_lock(&(pool->thread_counter));
    busy_threadnum = pool->busy_thr_num;
    pthread_mutex_unlock(&(pool->thread_counter));
    return busy_threadnum;
}
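int is_thread_alive(pthread_t tid)
{
    /* Signal 0 delivers nothing; it only tests whether the thread can be signalled.
     * Caution: POSIX leaves the behavior undefined when a thread ID is used after
     * the thread's lifetime has ended, so treat this check as best-effort only. */
    int kill_rc = pthread_kill(tid, 0);
    if (kill_rc == ESRCH) {
        return false;
    }
    return true;
}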
/* Test driver */
#if 1
/* Task executed by the pool's worker threads; simulates handling a request. */
void *process(void *arg)
{
    int id = *(int *)arg; /* arg points at the task number stored in main()'s num[] */

    printf("thread 0x%x working on task %d\n", (unsigned int)pthread_self(), id);
    sleep(1); /* simulate work */
    printf("task %d is end\n", id);
    return NULL;
}

int main(void)
{
    /* threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size); */
    threadpool_t *thp = threadpool_create(3, 100, 100); /* at least 3 threads, at most 100, queue capacity 100 */
    if (thp == NULL) {
        return 1;
    }
    printf("pool inited\n");

    /* num[] must outlive the tasks, because each task receives a pointer into it. */
    int num[20], i;
    for (i = 0; i < 20; i++) {
        num[i] = i;
        printf("add task %d\n", i);
        /* int threadpool_add(threadpool_t *pool, void *(*function)(void *arg), void *arg); */
        threadpool_add(thp, process, (void *)&num[i]); /* add the task to the pool */
    }
    sleep(10); /* give the worker threads time to finish the tasks */
    threadpool_destroy(thp);
    return 0;
}
#endif
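The two rules that adjust_thread applies on each cycle can be pulled out into a pure function, which makes the thresholds easy to check in isolation. The following is a minimal sketch and not part of threadpool.c: the names `pool_action_t` and `pool_adjust_decision` are hypothetical, and `MIN_WAIT_TASK_NUM` is assumed to be 10, matching the `30>=10` example in the comment. Unlike adjust_thread, which evaluates both conditions in the same cycle, this sketch reports only the first rule that matches.

```c
#include <assert.h>

/* Assumed value for illustration; the real constant is defined in threadpool.c. */
#define MIN_WAIT_TASK_NUM 10

/* Hypothetical result type: what the manager thread should do this cycle. */
typedef enum { POOL_KEEP, POOL_GROW, POOL_SHRINK } pool_action_t;

/*
 * Hypothetical helper mirroring the conditions in adjust_thread().
 * queue_size: tasks waiting, live: threads alive, busy: threads running a task.
 */
pool_action_t pool_adjust_decision(int queue_size, int live, int busy,
                                   int min_thr, int max_thr)
{
    assert(min_thr <= max_thr);

    /* Grow: enough tasks are waiting and the pool is below its maximum. */
    if (queue_size >= MIN_WAIT_TASK_NUM && live < max_thr) {
        return POOL_GROW;
    }
    /* Shrink: fewer than half of the live threads are busy
     * and the pool is above its minimum. */
    if (busy * 2 < live && live > min_thr) {
        return POOL_SHRINK;
    }
    return POOL_KEEP;
}
```

With the values from the comment, `pool_adjust_decision(30, 40, 35, 3, 100)` yields `POOL_GROW`, while a mostly idle pool such as `pool_adjust_decision(0, 40, 5, 3, 100)` yields `POOL_SHRINK`.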
7.2 threadpool.h
#ifndef THREADPOOL_H_
#define THREADPOOL_H_

typedef struct threadpool_t threadpool_t;

/**
 * @function threadpool_create
 * @desc Creates a threadpool_t object.
 * @param min_thr_num minimum number of threads
 * @param max_thr_num maximum number of threads
 * @param queue_max_size size of the queue
 * @return a newly created thread pool, or NULL
 */
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);

/**
 * @function threadpool_add
 * @desc Adds a new task to the queue of a thread pool.
 * @param pool thread pool to which the task is added
 * @param function pointer to the function that will perform the task
 * @param arg argument to be passed to the function
 * @return 0 if all goes well, else -1
 */
int threadpool_add(threadpool_t *pool, void *(*function)(void *arg), void *arg);

/**
 * @function threadpool_destroy
 * @desc Stops and destroys a thread pool.
 * @param pool thread pool to destroy
 * @return 0 if the pool was destroyed successfully, else -1
 */
int threadpool_destroy(threadpool_t *pool);

/**
 * @desc Gets the number of threads.
 * @param pool thread pool
 * @return number of threads
 */
int threadpool_all_threadnum(threadpool_t *pool);

/**
 * @desc Gets the number of busy threads.
 * @param pool thread pool
 * @return number of busy threads
 */
int threadpool_busy_threadnum(threadpool_t *pool);

#endif
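Because `threadpool_add` takes the task argument as a `void *`, the caller decides how to pack it. One common approach is to give each task its own heap-allocated argument that the task frees when it is done, so the value stays valid no matter when the task runs. The sketch below illustrates only that idea; `task_arg_t`, `make_task_arg`, and `consume_task_arg` are hypothetical names and not part of the header above.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical per-task argument; not part of threadpool.h. */
typedef struct {
    int task_id;
} task_arg_t;

/* Allocate an argument that the task will own. Returns NULL if malloc fails. */
void *make_task_arg(int task_id)
{
    task_arg_t *a = malloc(sizeof(*a));
    if (a != NULL) {
        a->task_id = task_id;
    }
    return a;
}

/* Called at the start of a task: read the value, then release the storage. */
int consume_task_arg(void *arg)
{
    assert(arg != NULL);

    task_arg_t *a = arg;
    int id = a->task_id;
    free(a);
    return id;
}
```

A task function with the `void *(*)(void *)` signature would call `consume_task_arg(arg)` first, and the producer would pass `make_task_arg(i)` as the last argument to `threadpool_add`. If `threadpool_add` returns -1, the producer still owns the argument and has to free it.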