当前位置:网站首页>Introduction to thread pool
Introduction to thread pool
2022-07-04 00:26:00 【Hey, poof】
Thread pool is a lower level component , Usually we are in the process of development , It may be more about the interfaces that call these components . For the server , The underlying component code is very general . Thread pool is mainly used to write logs 、 The result of the calculation is 、 Adding, deleting, modifying and checking .
Thread pool is mainly composed of task queue 、 The execution queue and pool management components constitute , Here is the code .
#define LL_ADD(item, list) do \
{
\
item->prev = NULL; \
item->next = list; \
list = item; \
} while(0)
#define LL_REMOVE(item, list) do \
{
\
if (item->prev != NULL) item->prev->next = item->next; \
if (item->next != NULL) item->next->prev = item->prev; \
if (list == item) list = item->next; \
item->prev = item->next = NULL; \
} while(0)
typedef struct NWORKER
{
pthread_t thread;
int terminate;
struct NWORKQUEUE *workqueue;
struct NWORKER *prev;
struct NWORKER *next;
} nWorker;
typedef struct NJOB
{
void (*job_function)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
} nJob;
typedef struct NWORKQUEUE
{
struct NWORKER *workers;
struct NJOB *waiting_jobs;
pthread_mutex_t jobs_mtx;
pthread_cond_t jobs_cond;
} nWorkQueue;
typedef nWorkQueue nThreadPool;
static void *ntyWorkerThread(void *ptr)
{
nWorker *worker = (nWorker*)ptr;
while (1)
{
pthread_mutex_lock(&worker->workqueue->jobs_mtx);
while (worker->workqueue->waiting_jobs == NULL)
{
if (worker->terminate) break;
pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
}
if (worker->terminate)
{
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
break;
}
nJob *job = worker->workqueue->waiting_jobs;
if (job != NULL)
{
LL_REMOVE(job, worker->workqueue->waiting_jobs);
}
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
if (job == NULL) continue;
job->job_function(job);
}
free(worker);
pthread_exit(NULL);
}
int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers)
{
if (numWorkers < 1) numWorkers = 1;
memset(workqueue, 0, sizeof(nThreadPool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));
int i = 0;
for (i = 0;i < numWorkers;i ++)
{
nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
if (worker == NULL)
{
perror("malloc");
return 1;
}
memset(worker, 0, sizeof(nWorker));
worker->workqueue = workqueue;
int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
if (ret)
{
perror("pthread_create");
free(worker);
return 1;
}
LL_ADD(worker, worker->workqueue->workers);
}
return 0;
}
void ntyThreadPoolShutdown(nThreadPool *workqueue)
{
nWorker *worker = NULL;
for (worker = workqueue->workers;worker != NULL;worker = worker->next)
{
worker->terminate = 1;
}
pthread_mutex_lock(&workqueue->jobs_mtx);
workqueue->workers = NULL;
workqueue->waiting_jobs = NULL;
pthread_cond_broadcast(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job)
{
pthread_mutex_lock(&workqueue->jobs_mtx);
LL_ADD(job, workqueue->waiting_jobs);
pthread_cond_signal(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
There are several points in the code that need to be explained , First, macro functions are used for the operation of linked lists , Because this can avoid the problem of defining multiple functions due to different data types , The second is the initialization method of mutexes and conditional variables , The static initialization method used here , It avoids the problem that variables cannot be statically initialized and assigned after being defined .
A little more verbose here , The processing flow of the server , One is testing IO Is the event ready , The second is right IO Read and write , Third, analyze and operate the data . These three steps correspond to epoll、recv()/send()、parse. therefore , There are three ways for servers , One is the single thread approach , One thread handles the above three steps , Second, put IO Both read / write and parsing are processed in the process pool , Third, only put the parsing process in the process pool , Reading and writing IO Or is it handled by a single thread . Of the above three approaches , The second method is that the server responds the fastest , But there's a problem , That is, multiple threads will share one fd. Imagine such a situation , There are two tasks in the thread pool , All operations are the same fd, These two tasks are assigned to two threads , If these two threads send data at the same time , There will be dirty data , Or is it , A thread is sending and receiving data , Another thread called close(), Cause crash . The second method applies to fd Scenarios with long operation time , And the third method is applicable to the situation where the business processing time is long .
边栏推荐
- Pair
- Global and Chinese market of glossometer 2022-2028: Research Report on technology, participants, trends, market size and share
- Development and application of fcitx functional plug-ins
- [NLP] text classification still stays at Bert? Duality is too strong than learning framework
- Stock price forecast
- P1656 bombing Railway
- ISBN number
- 【leetcode】374. Guess the size of the number
- [MySQL] classification of multi table queries
- A method to solve Bert long text matching
猜你喜欢
![[C language] break and continue in switch statement](/img/ae/5967fefcf3262c9d3096e5c7d644fd.jpg)
[C language] break and continue in switch statement

Test the influence of influent swacth on the electromagnetic coil of quartz meter

Bodong medical sprint Hong Kong stocks: a 9-month loss of 200million Hillhouse and Philips are shareholders

Iclr2022: how does AI recognize "things I haven't seen"?

Unity elementary case notes of angry birds Siki college 1-6

Ningde times and BYD have refuted rumors one after another. Why does someone always want to harm domestic brands?

Eight year test old bird, some suggestions for 1-3 year programmers

Investment demand and income forecast report of China's building ceramics industry, 2022-2028

Correlation analysis summary

Alibaba cloud container service differentiation SLO hybrid technology practice
随机推荐
Pytorch learning notes 5: model creation
Similarities and differences of text similarity between Jaccard and cosine
Is the securities account opened by Caicai for individuals safe? Is there a routine
Pair
Arc 135 supplementary report
Gossip about redis source code 78
Distributed transaction -- middleware of TCC -- selection / comparison
The upload experience version of uniapp wechat applet enters the blank page for the first time, and the page data can be seen only after it is refreshed again
P1656 bombing Railway
Zipper table in data warehouse (compressed storage)
[PHP basics] cookie basics, application case code and attack and defense
What is the potential of pocket network, which is favored by well-known investors?
What is the difference between NFT, SFT and dnft? How to build NFT platform applications?
1214 print diamond
Collation of the most complete Chinese naturallanguageprocessing data sets, platforms and tools
Alibaba cloud container service differentiation SLO hybrid technology practice
NLP Chinese corpus project: large scale Chinese natural language processing corpus
Selenium library 4.5.0 keyword explanation (II)
Social network analysis -social network analysis
Solution to the impact of Remote Code Execution Vulnerability of log4j2 component on December 9, 2021