Introduction to thread pool
2022-07-04 00:26:00 · Hey, poof
A thread pool is a low-level component. During development we usually only call its interfaces; for a server, this underlying code is very generic. Thread pools are typically used for work such as writing logs, returning computation results, and database operations (create, read, update, delete).
A thread pool consists of a task queue, a worker (execution) queue, and a pool-management component. The code is below.
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define LL_ADD(item, list) do {                                       \
    (item)->prev = NULL;                                              \
    (item)->next = (list);                                            \
    if ((list) != NULL) (list)->prev = (item); /* link old head back */ \
    (list) = (item);                                                  \
} while (0)

#define LL_REMOVE(item, list) do {                                    \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next;      \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev;      \
    if ((list) == (item)) (list) = (item)->next;                      \
    (item)->prev = (item)->next = NULL;                               \
} while (0)
typedef struct NWORKER
{
pthread_t thread;
int terminate;
struct NWORKQUEUE *workqueue;
struct NWORKER *prev;
struct NWORKER *next;
} nWorker;
typedef struct NJOB
{
void (*job_function)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
} nJob;
typedef struct NWORKQUEUE
{
struct NWORKER *workers;
struct NJOB *waiting_jobs;
pthread_mutex_t jobs_mtx;
pthread_cond_t jobs_cond;
} nWorkQueue;
typedef nWorkQueue nThreadPool;
static void *ntyWorkerThread(void *ptr)
{
    nWorker *worker = (nWorker *)ptr;
    while (1)
    {
        pthread_mutex_lock(&worker->workqueue->jobs_mtx);
        /* Sleep until a job is queued or the pool is shutting down.
           The while loop guards against spurious wakeups. */
        while (worker->workqueue->waiting_jobs == NULL)
        {
            if (worker->terminate) break;
            pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
        }
        if (worker->terminate)
        {
            pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
            break;
        }
        /* Take one job off the head of the waiting list while still holding the lock. */
        nJob *job = worker->workqueue->waiting_jobs;
        if (job != NULL)
        {
            LL_REMOVE(job, worker->workqueue->waiting_jobs);
        }
        pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
        if (job == NULL) continue;
        /* Run the job outside the lock; the job function owns the nJob
           (and its user_data) from here on. */
        job->job_function(job);
    }
    free(worker); /* each worker frees its own descriptor on exit */
    pthread_exit(NULL);
}
int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers)
{
    if (numWorkers < 1) numWorkers = 1;
    memset(workqueue, 0, sizeof(nThreadPool));
    /* PTHREAD_COND_INITIALIZER / PTHREAD_MUTEX_INITIALIZER may only
       appear in a definition, so initialize local copies and memcpy
       them into the pool. */
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));
    int i = 0;
    for (i = 0; i < numWorkers; i++)
    {
        nWorker *worker = (nWorker *)malloc(sizeof(nWorker));
        if (worker == NULL)
        {
            perror("malloc");
            return 1; /* note: workers already started keep running */
        }
        memset(worker, 0, sizeof(nWorker));
        worker->workqueue = workqueue;
        int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
        if (ret)
        {
            perror("pthread_create");
            free(worker);
            return 1;
        }
        LL_ADD(worker, worker->workqueue->workers);
    }
    return 0;
}
void ntyThreadPoolShutdown(nThreadPool *workqueue)
{
    nWorker *worker = NULL;
    for (worker = workqueue->workers; worker != NULL; worker = worker->next)
    {
        worker->terminate = 1;
    }
    pthread_mutex_lock(&workqueue->jobs_mtx);
    /* Drop the bookkeeping lists; workers free themselves as they exit.
       Jobs still waiting at this point are abandoned (their memory
       remains owned by whoever queued them). */
    workqueue->workers = NULL;
    workqueue->waiting_jobs = NULL;
    /* Wake every worker so each one can observe terminate and exit. */
    pthread_cond_broadcast(&workqueue->jobs_cond);
    pthread_mutex_unlock(&workqueue->jobs_mtx);
}
void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job)
{
pthread_mutex_lock(&workqueue->jobs_mtx);
LL_ADD(job, workqueue->waiting_jobs);
pthread_cond_signal(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
Two points in the code deserve explanation. First, the linked-list operations are implemented as macros, which avoids having to define a separate set of functions for each node type. Second, the mutex and condition variable are initialized by copying from statically initialized local variables; this works around the fact that PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER can only be used at definition time, not in a later assignment.
A short digression on server processing flow. A server does three things: detect whether an IO event is ready, read or write the IO, and parse and act on the data. These three steps correspond to epoll, recv()/send(), and parsing. Accordingly there are three threading models: first, a single thread handles all three steps; second, both IO read/write and parsing are handed to the thread pool; third, only parsing goes to the thread pool while IO read/write stays on a single thread. Of the three, the second gives the fastest response, but it has a problem: multiple threads end up sharing one fd. Imagine two tasks in the pool that both operate on the same fd and are assigned to two different threads. If both threads send data at the same time, the output is interleaved into dirty data; or one thread is sending and receiving while another calls close(), causing a crash. The second model suits scenarios where operations on an fd take a long time, while the third suits scenarios where the business processing (parsing) takes a long time.