Introduction to thread pool
2022-07-04 00:26:00 【Hey, poof】
A thread pool is a fairly low-level component. In day-to-day development we mostly call the interfaces that such components expose rather than write them ourselves. On a server, this underlying component code is very generic. Thread pools are mainly used for writing logs, computing results, and create/read/update/delete (CRUD) operations.
A thread pool consists mainly of a task queue, an execution queue, and a pool-management component. Here is the code.
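#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

/* Push item onto the head of a doubly linked list. */
#define LL_ADD(item, list) do { \
    item->prev = NULL; \
    item->next = list; \
    if (list != NULL) list->prev = item; \
    list = item; \
} while(0)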
/* Unlink item from the list, updating the head if item was first. */
#define LL_REMOVE(item, list) do { \
    if (item->prev != NULL) item->prev->next = item->next; \
    if (item->next != NULL) item->next->prev = item->prev; \
    if (list == item) list = item->next; \
    item->prev = item->next = NULL; \
} while(0)
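/* One worker thread, linked into the pool's worker list. */
typedef struct NWORKER
{
    pthread_t thread;
    int terminate;                  /* set by shutdown, tested under jobs_mtx */
    struct NWORKQUEUE *workqueue;   /* owning pool */
    struct NWORKER *prev;
    struct NWORKER *next;
} nWorker;

/* One unit of work. The pool never frees a job, so job_function
 * (or the caller) is responsible for releasing it. */
typedef struct NJOB
{
    void (*job_function)(struct NJOB *job);
    void *user_data;
    struct NJOB *prev;
    struct NJOB *next;
} nJob;

/* The pool itself: worker list, pending jobs, and their lock and condition. */
typedef struct NWORKQUEUE
{
    struct NWORKER *workers;
    struct NJOB *waiting_jobs;
    pthread_mutex_t jobs_mtx;
    pthread_cond_t jobs_cond;
} nWorkQueue;

typedef nWorkQueue nThreadPool;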
/* Worker loop: wait for a job, take it off the queue, run it outside the lock. */
static void *ntyWorkerThread(void *ptr)
{
    nWorker *worker = (nWorker*)ptr;
    while (1)
    {
        pthread_mutex_lock(&worker->workqueue->jobs_mtx);
        /* Re-test the condition in a loop to guard against spurious wakeups. */
        while (worker->workqueue->waiting_jobs == NULL)
        {
            if (worker->terminate) break;
            pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
        }
        if (worker->terminate)
        {
            pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
            break;
        }
        nJob *job = worker->workqueue->waiting_jobs;
        if (job != NULL)
        {
            LL_REMOVE(job, worker->workqueue->waiting_jobs);
        }
        pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
        if (job == NULL) continue;
        job->job_function(job);
    }
    free(worker);
    pthread_exit(NULL);
}
/* Initialize the pool and start numWorkers threads. Returns 0 on success, 1 on failure. */
int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers)
{
    if (numWorkers < 1) numWorkers = 1;
    memset(workqueue, 0, sizeof(nThreadPool));

    /* The static initializers are only valid in a declaration, so
     * initialize temporaries and copy them into the pool. */
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));

    int i = 0;
    for (i = 0; i < numWorkers; i++)
    {
        nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
        if (worker == NULL)
        {
            perror("malloc");
            return 1;
        }
        memset(worker, 0, sizeof(nWorker));
        worker->workqueue = workqueue;

        int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
        if (ret)
        {
            /* pthread_create returns the error number and does not set errno. */
            fprintf(stderr, "pthread_create: %s\n", strerror(ret));
            free(worker);
            return 1;
        }
        LL_ADD(worker, worker->workqueue->workers);
    }
    return 0;
}
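void ntyThreadPoolShutdown(nThreadPool *workqueue)
{
    nWorker *worker = NULL;

    /* Set the flags while holding the mutex: workers test terminate under
     * the same mutex, so none can exit and free itself while the list is
     * still being walked. */
    pthread_mutex_lock(&workqueue->jobs_mtx);
    for (worker = workqueue->workers; worker != NULL; worker = worker->next)
    {
        worker->terminate = 1;
    }
    workqueue->workers = NULL;
    workqueue->waiting_jobs = NULL;   /* pending jobs are dropped, not run */
    pthread_cond_broadcast(&workqueue->jobs_cond);
    pthread_mutex_unlock(&workqueue->jobs_mtx);
}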
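/* Add a job and wake one waiting worker. LL_ADD inserts at the head and
 * workers take from the head, so the newest job runs first. */
void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job)
{
    pthread_mutex_lock(&workqueue->jobs_mtx);
    LL_ADD(job, workqueue->waiting_jobs);
    pthread_cond_signal(&workqueue->jobs_cond);
    pthread_mutex_unlock(&workqueue->jobs_mtx);
}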
Two points in the code need explanation. First, the linked-list operations are written as macros, because a macro works on any struct that has prev and next members, which avoids defining a separate set of functions for each data type (here nWorker and nJob). Second, the mutex and condition variable are initialized with the static initializers PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER. These can only be used in a declaration and cannot be assigned to a variable that has already been defined, so the code initializes a temporary and copies it into the pool with memcpy().
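As an alternative to copying a static initializer, a mutex and condition variable can also be initialized at run time with pthread_mutex_init() and pthread_cond_init(). The following is a minimal sketch of that approach, not the article's code: sync_pair, sync_pair_init and sync_pair_destroy are hypothetical names introduced only for illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <string.h>

/* Hypothetical stand-in for the pool struct; only the
 * synchronization members matter for this sketch. */
typedef struct {
    pthread_mutex_t jobs_mtx;
    pthread_cond_t  jobs_cond;
    int             initialized;
} sync_pair;

/* Initialize both objects at run time. Returns 0 on success or the
 * error number reported by the failing pthread call. */
int sync_pair_init(sync_pair *sp)
{
    assert(sp != NULL);
    memset(sp, 0, sizeof(*sp));

    int ret = pthread_mutex_init(&sp->jobs_mtx, NULL);  /* NULL = default attributes */
    if (ret != 0) return ret;

    ret = pthread_cond_init(&sp->jobs_cond, NULL);
    if (ret != 0) {
        pthread_mutex_destroy(&sp->jobs_mtx);           /* roll back the half-built pair */
        return ret;
    }
    sp->initialized = 1;
    return 0;
}

/* Release both objects; no thread may still be using them. */
int sync_pair_destroy(sync_pair *sp)
{
    assert(sp != NULL && sp->initialized);
    int r1 = pthread_cond_destroy(&sp->jobs_cond);
    int r2 = pthread_mutex_destroy(&sp->jobs_mtx);
    sp->initialized = 0;
    return r1 != 0 ? r1 : r2;
}
```

Unlike the memcpy() approach, the init calls report failure through their return value, and the matching destroy calls give the pool a defined point at which to release the objects.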
A short digression on how a server processes a request. There are three steps: first, detecting whether an IO event is ready; second, reading and writing the IO; third, parsing the data and acting on it. These three steps correspond to epoll, recv()/send(), and parse.
A server can therefore be built in three ways. In the first, a single thread handles all three steps. In the second, both the IO reads/writes and the parsing are handed to the thread pool. In the third, only the parsing is handed to the thread pool, while the IO reads/writes are still handled by a single thread.
Of these three, the second gives the fastest server response, but it has a problem: multiple threads share one fd. Suppose the thread pool holds two tasks that operate on the same fd, and they are assigned to two different threads. If both threads send data at the same time, the data on the wire is corrupted (dirty data). Or one thread may still be sending or receiving while the other calls close(), which causes a crash. The second approach suits scenarios where operations on the fd take a long time, and the third suits scenarios where the business processing takes a long time.
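One possible way to guard against the shared-fd problem described above is to give each connection its own mutex, so that a send runs to completion before another send or a close() can touch the same fd. This is only a sketch under stated assumptions: it assumes a blocking fd, and conn_t, conn_init, conn_send and conn_close are hypothetical names that are not part of the thread pool code in this article.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical per-connection state shared by every task that uses the fd. */
typedef struct {
    int fd;
    int closed;              /* set once conn_close() has run */
    pthread_mutex_t lock;    /* serializes writes and close on this fd */
} conn_t;

/* Returns 0 on success or the pthread error number. */
int conn_init(conn_t *c, int fd)
{
    assert(c != NULL);
    c->fd = fd;
    c->closed = 0;
    return pthread_mutex_init(&c->lock, NULL);
}

/* Write the whole buffer while holding the lock, so data from another
 * task cannot be interleaved with it. Returns 0 on success, -1 on a
 * write error or if the connection is already closed. */
int conn_send(conn_t *c, const void *buf, size_t len)
{
    const char *p = buf;
    int rc = 0;

    pthread_mutex_lock(&c->lock);
    if (c->closed) {
        rc = -1;
    } else {
        while (len > 0) {
            ssize_t n = write(c->fd, p, len);
            if (n < 0) {
                if (errno == EINTR) continue;  /* interrupted, retry */
                rc = -1;
                break;
            }
            p += n;
            len -= (size_t)n;
        }
    }
    pthread_mutex_unlock(&c->lock);
    return rc;
}

/* Close at most once; later sends fail instead of using a dead fd. */
void conn_close(conn_t *c)
{
    pthread_mutex_lock(&c->lock);
    if (!c->closed) {
        close(c->fd);
        c->closed = 1;
    }
    pthread_mutex_unlock(&c->lock);
}
```

The trade-off is that a slow peer keeps the lock held for the whole write, which stalls other tasks on the same connection. With a non-blocking fd, write() can fail with EAGAIN, which this sketch simply reports as an error; a real server would more likely buffer the remaining data.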