Hand-Writing a Thread Pool in Pure C
2022-07-27 14:00:00 【cheems~】
Preface
This article covers what a thread pool is for, its typical application scenarios, how a thread pool works, a full code implementation, and a comparison with the nginx thread pool.
The material in this column comes from online study with Zero Sound Education (零声教育), summarized into this article. Readers interested in the c/c++ linux course can click the link "C/C++ Background advanced server course introduction" to see the details of the course.
Role of thread pool
Why do we need a thread pool, and what problems does it solve?
- It reduces the cost of repeatedly creating and destroying threads (the thread perspective)
- It provides asynchronous decoupling (the design perspective)
Usage scenario: asynchronous processing with a thread pool
Take logging as an example. Calling loginfo("xxx") and actually flushing the log to disk are two different things, and the flush should be asynchronous. Asynchronous decoupling means wrapping the log write as a task, throwing that task into the thread pool, and letting the pool take care of flushing it to disk. For the application, this improves logging efficiency.
Take nginx as an example: it handles tens of thousands of requests per second, very fast. If every request also has to write a log synchronously, QPS drops sharply, because each request must wait for a disk write, and the performance of the whole server degrades. By introducing a thread pool and handing the logging task to it, the main loop only has to enqueue the task, which greatly improves the efficiency of the main thread. That is the asynchronous-decoupling role of a thread pool.
Logging is not the only use case: any time-consuming operation, such as database access or other I/O, can be offloaded to a thread pool, as in the sketch below.
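As a minimal sketch of the asynchronous-logging pattern, assuming the task_t type, thread_poll_t type and thread_poll_push_task() function implemented later in this article (plus stdio.h/stdlib.h/string.h); the file name "app.log" and the helper names are illustrative only:
// Sketch: the worker thread performs the slow disk write and frees the task.
void write_log_task(void *arg) {
    task_t *task = (task_t *) arg;            // the worker passes the task itself as arg
    FILE *fp = fopen("app.log", "a");
    if (fp) {
        fputs((char *) task->user_data, fp);  // slow disk I/O happens off the main thread
        fclose(fp);
    }
    free(task->user_data);
    free(task);
}
// Sketch: the main loop only enqueues and returns immediately.
void async_loginfo(thread_poll_t *pool, const char *line) {
    task_t *task = malloc(sizeof(task_t));
    if (task == NULL) return;
    task->task_func = write_log_task;
    task->user_data = strdup(line);           // copy, so the caller's buffer can be reused at once
    thread_poll_push_task(pool, task);
}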
Does the thread pool need to bind its threads to CPUs (CPU affinity)? If the focus is raw CPU processing power, binding can help; if the focus is asynchronous decoupling, what matters is the task itself, and there is no need to bind threads to CPUs.
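If binding is wanted, on Linux it can be done with pthread_setaffinity_np (a non-portable GNU extension). A minimal sketch:
#define _GNU_SOURCE               // needed for cpu_set_t / pthread_setaffinity_np
#include <pthread.h>
#include <sched.h>
// Sketch: pin one thread to one CPU core on Linux.
static int bind_thread_to_cpu(pthread_t tid, int cpu) {
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(cpu, &set);
    return pthread_setaffinity_np(tid, sizeof(cpu_set_t), &set);
}
For example, it could be called right after pthread_create() in the pool's create function, e.g. bind_thread_to_cpu(worker->id, idx % num_cores); the implementation below does not do this.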
How thread pools work
What APIs should a thread pool provide?

When we use a thread pool, we use it as a component, so the first thing to consider is what APIs the component should expose.
- Initialization (creation) of the thread pool: init/create
- Pushing a task into the pool: push_task
- Destruction of the thread pool: deinit/destroy
These three APIs are the core; any other APIs are optional extensions, but these three must exist. A minimal sketch of the interface follows.
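A minimal sketch of what this interface looks like, using the same names as the implementation later in this article:
// Core interface sketch; the full definitions appear in the implementation below.
typedef struct NTHREADPOLL thread_poll_t;
typedef struct NTASK task_t;
// Create thread_num worker threads; returns the number actually created.
int thread_poll_create(thread_poll_t *thread_poll, int thread_num);
// Enqueue a task and wake one sleeping worker.
int thread_poll_push_task(thread_poll_t *thread_poll, task_t *task);
// Set every worker's termination flag and wake all workers.
int thread_destroy(thread_poll_t *thread_poll);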
The three components of a thread pool

Imagine a bank branch. The tellers provide services to the customers; the customers come to do business, and from a teller's point of view each customer is a task. These two roles map naturally to pthread and task.
Then there is the bulletin board ("customer xxx, please go to counter N"). Which of the two does it belong to? Its job is to keep customers and tellers working in an orderly way; it belongs neither to the tellers nor to the customers, it is a management tool.
- Teller ----> pthread
- Customer ----> task
- Bulletin board --> keeps tellers and customers working in order (no task is ever processed by more than one thread at the same time)
So three components emerge naturally. What attributes should each of them have?
- Teller: an employee id, and a stop-working flag
- Customer: withdrawing money requires a bank card, applying for a loan requires documents; so a task needs a function func() and the corresponding argument arg
- Bulletin board: if there are no customers, a teller has to wait for one to arrive, so a condition variable cond is needed; and since the work must be coordinated, a mutex is needed to protect the critical resources
From here on, the tellers are called the execution queue, the customers are called the task queue, and the bulletin board is called the pool management component.
A common misunderstanding: "to use a thread, take one out of the pool, use it, and put it back when done." That is the model of a connection pool. In a thread pool, multiple threads pull tasks from the task queue and compete for tasks.
So the core of every worker thread is the following pseudocode:
while(1){
get_task();
task->func();
}
Code implementation
Definitions of the task queue, the execution queue and the pool management component, plus the list add/remove macros

// Execution queue (one node per worker thread)
typedef struct NWORKER {
pthread_t id;
int termination;
struct NTHREADPOLL *thread_poll;
struct NWORKER *prev;
struct NWORKER *next;
} worker_t;
// Task queue
typedef struct NTASK {
void (*task_func)(void *arg);
void *user_data;
struct NTASK *prev;
struct NTASK *next;
} task_t;
// Pool management component
typedef struct NTHREADPOLL {
worker_t *workers;
task_t *tasks;
pthread_cond_t cond;
pthread_mutex_t mutex;
} thread_poll_t;
// Head insertion into a doubly linked list
#define LL_ADD(item, list) do {         \
    item->prev = NULL;                  \
    item->next = list;                  \
    if (list != NULL) {                 \
        list->prev = item;              \
    }                                   \
    list = item;                        \
} while (0)
// Removal from a doubly linked list
#define LL_REMOVE(item, list) do {      \
    if (item->prev != NULL) {           \
        item->prev->next = item->next;  \
    }                                   \
    if (item->next != NULL) {           \
        item->next->prev = item->prev;  \
    }                                   \
    if (list == item) {                 \
        list = item->next;              \
    }                                   \
    item->prev = item->next = NULL;     \
} while (0)
The three APIs
Creating the pool means initializing the thread_poll_t structure, then creating the requested number of threads together with their workers.
push simply adds a task to the task queue and then notifies the cond with pthread_cond_signal.
Destroying sets every worker's termination flag to 1 and then broadcasts the cond.
// returns the number of threads actually created
int thread_poll_create(thread_poll_t *thread_poll, int thread_num) {
if (thread_num < 1)thread_num = 1;
memset(thread_poll, 0, sizeof(thread_poll_t));
//init cond
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&thread_poll->cond, &blank_cond, sizeof(pthread_cond_t));
//init mutex
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&thread_poll->mutex, &blank_mutex, sizeof(pthread_mutex_t));
// one thread one worker
int idx = 0;
for (idx = 0; idx < thread_num; idx++) {
worker_t *worker = malloc(sizeof(worker_t));
if (worker == NULL) {
perror("worker malloc err\n");
return idx;
}
memset(worker, 0, sizeof(worker_t));
worker->thread_poll = thread_poll;
int ret = pthread_create(&worker->id, NULL, thread_callback, worker);
if (ret) {
perror("pthread_create err\n");
free(worker);
return idx;
}
LL_ADD(worker, thread_poll->workers);
}
return idx;
}
int thread_poll_push_task(thread_poll_t *thread_poll, task_t *task) {
    pthread_mutex_lock(&thread_poll->mutex);
    LL_ADD(task, thread_poll->tasks);
    pthread_cond_signal(&thread_poll->cond);  // wake one sleeping worker
    pthread_mutex_unlock(&thread_poll->mutex);
    return 0;
}
int thread_destroy(thread_poll_t *thread_poll) {
    worker_t *worker = NULL;
    for (worker = thread_poll->workers; worker != NULL; worker = worker->next) {
        worker->termination = 1;
    }
    pthread_mutex_lock(&thread_poll->mutex);
    pthread_cond_broadcast(&thread_poll->cond);  // wake every worker so it can exit
    pthread_mutex_unlock(&thread_poll->mutex);
    return 0;
}
Thread callback function
All a worker thread does is fetch a task and execute it; the task is fetched from the task queue.
task_t *get_task(worker_t *worker) {
    while (1) {
        pthread_mutex_lock(&worker->thread_poll->mutex);
        // sleep while the task queue is empty, unless termination was requested
        while (worker->thread_poll->tasks == NULL) {
            if (worker->termination) break;
            pthread_cond_wait(&worker->thread_poll->cond, &worker->thread_poll->mutex);
        }
        if (worker->termination) {
            pthread_mutex_unlock(&worker->thread_poll->mutex);
            return NULL;
        }
        task_t *task = worker->thread_poll->tasks;
        if (task) {
            LL_REMOVE(task, worker->thread_poll->tasks);
        }
        pthread_mutex_unlock(&worker->thread_poll->mutex);
        if (task != NULL) {
            return task;
        }
    }
}
void *thread_callback(void *arg) {
worker_t *worker = (worker_t *) arg;
while (1) {
task_t *task = get_task(worker);
if (task == NULL) {
free(worker);
pthread_exit("thread termination\n");
}
task->task_func(task);
}
}
Test code
Here we create 1000 tasks and start 10 threads. Note that the task and its argument are freed from inside the task's own func.
//
// Created by 68725 on 2022/7/25.
//
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
// Head insertion into a doubly linked list
#define LL_ADD(item, list) do {         \
    item->prev = NULL;                  \
    item->next = list;                  \
    if (list != NULL) {                 \
        list->prev = item;              \
    }                                   \
    list = item;                        \
} while (0)
// Removal from a doubly linked list
#define LL_REMOVE(item, list) do {      \
    if (item->prev != NULL) {           \
        item->prev->next = item->next;  \
    }                                   \
    if (item->next != NULL) {           \
        item->next->prev = item->prev;  \
    }                                   \
    if (list == item) {                 \
        list = item->next;              \
    }                                   \
    item->prev = item->next = NULL;     \
} while (0)
// Execution queue (one node per worker thread)
typedef struct NWORKER {
pthread_t id;
int termination;
struct NTHREADPOLL *thread_poll;
struct NWORKER *prev;
struct NWORKER *next;
} worker_t;
// Task queue
typedef struct NTASK {
void (*task_func)(void *arg);
void *user_data;
struct NTASK *prev;
struct NTASK *next;
} task_t;
// Pool management component
typedef struct NTHREADPOLL {
worker_t *workers;
task_t *tasks;
pthread_cond_t cond;
pthread_mutex_t mutex;
} thread_poll_t;
task_t *get_task(worker_t *worker) {
    while (1) {
        pthread_mutex_lock(&worker->thread_poll->mutex);
        // sleep while the task queue is empty, unless termination was requested
        while (worker->thread_poll->tasks == NULL) {
            if (worker->termination) break;
            pthread_cond_wait(&worker->thread_poll->cond, &worker->thread_poll->mutex);
        }
        if (worker->termination) {
            pthread_mutex_unlock(&worker->thread_poll->mutex);
            return NULL;
        }
        task_t *task = worker->thread_poll->tasks;
        if (task) {
            LL_REMOVE(task, worker->thread_poll->tasks);
        }
        pthread_mutex_unlock(&worker->thread_poll->mutex);
        if (task != NULL) {
            return task;
        }
    }
}
void *thread_callback(void *arg) {
worker_t *worker = (worker_t *) arg;
while (1) {
task_t *task = get_task(worker);
if (task == NULL) {
free(worker);
pthread_exit("thread termination\n");
}
task->task_func(task);
}
}
// returns the number of threads actually created
int thread_poll_create(thread_poll_t *thread_poll, int thread_num) {
if (thread_num < 1)thread_num = 1;
memset(thread_poll, 0, sizeof(thread_poll_t));
//init cond
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&thread_poll->cond, &blank_cond, sizeof(pthread_cond_t));
//init mutex
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&thread_poll->mutex, &blank_mutex, sizeof(pthread_mutex_t));
// one thread one worker
int idx = 0;
for (idx = 0; idx < thread_num; idx++) {
worker_t *worker = malloc(sizeof(worker_t));
if (worker == NULL) {
perror("worker malloc err\n");
return idx;
}
memset(worker, 0, sizeof(worker_t));
worker->thread_poll = thread_poll;
int ret = pthread_create(&worker->id, NULL, thread_callback, worker);
if (ret) {
perror("pthread_create err\n");
free(worker);
return idx;
}
LL_ADD(worker, thread_poll->workers);
}
return idx;
}
int thread_poll_push_task(thread_poll_t *thread_poll, task_t *task) {
    pthread_mutex_lock(&thread_poll->mutex);
    LL_ADD(task, thread_poll->tasks);
    pthread_cond_signal(&thread_poll->cond);  // wake one sleeping worker
    pthread_mutex_unlock(&thread_poll->mutex);
    return 0;
}
int thread_destroy(thread_poll_t *thread_poll) {
    worker_t *worker = NULL;
    for (worker = thread_poll->workers; worker != NULL; worker = worker->next) {
        worker->termination = 1;
    }
    pthread_mutex_lock(&thread_poll->mutex);
    pthread_cond_broadcast(&thread_poll->cond);  // wake every worker so it can exit
    pthread_mutex_unlock(&thread_poll->mutex);
    return 0;
}
void counter(void *arg) {
    task_t *task = (task_t *) arg;  // the worker passes the task itself as the argument
    int idx = *(int *) task->user_data;
    printf("idx:%d pthread_id:%lu\n", idx, (unsigned long) pthread_self());
    free(task->user_data);
    free(task);
}
#define THREAD_COUNT 10
#define TASK_COUNT 1000
int main() {
    thread_poll_t thread_poll = {0};
    int ret = thread_poll_create(&thread_poll, THREAD_COUNT);
    if (ret != THREAD_COUNT) {
        thread_destroy(&thread_poll);
        exit(1);  // bail out if not all workers could be created
    }
int i = 0;
for (i = 0; i < TASK_COUNT; i++) {
//create task
task_t *task = (task_t *) malloc(sizeof(task_t));
if (task == NULL) {
perror("task malloc err\n");
exit(1);
}
task->task_func = counter;
task->user_data = malloc(sizeof(int));
*(int *) task->user_data = i;
//push task
thread_poll_push_task(&thread_poll, task);
}
getchar();
thread_destroy(&thread_poll);
}
Comparison with the nginx thread pool implementation
Thread pool initialization comparison
Both initialize the cond, initialize the mutex, and then create the threads.
Thread callback function comparison
Both do the same two things: fetch a task, then execute the task.

push task comparison
nginx inserts new tasks at the tail of the queue, while the implementation above inserts them at the head. A sketch of tail insertion is shown below.
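A minimal sketch of tail insertion (FIFO order, as nginx does it), assuming the pool additionally keeps a task_t *tasks_tail pointer; that field does not exist in the implementation above, and the caller is expected to hold the pool mutex just as in thread_poll_push_task():
// Sketch only: FIFO (tail) insertion with a hypothetical tasks_tail field.
void push_task_tail(thread_poll_t *pool, task_t *task) {
    task->next = NULL;
    task->prev = pool->tasks_tail;     // hypothetical field, not in the code above
    if (pool->tasks_tail != NULL) {
        pool->tasks_tail->next = task;
    } else {
        pool->tasks = task;            // the queue was empty
    }
    pool->tasks_tail = task;
}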
Choosing the number of threads
How many threads should be created at initialization? A compute-bound workload does not need many threads; a task-intensive (I/O-bound) workload can use a few more. The figures below are empirical values, not hard rules; a small sketch follows the list.
- Compute-bound: heavy, long-running computation; keep the number of threads proportional to the number of CPU cores, e.g. 1:1.
- Task-intensive: mostly handling tasks and I/O operations; more threads can be started, e.g. 2x the number of CPU cores.
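A small sketch of turning these rules of thumb into a number; the is_io_bound flag is an assumption about the workload, not something measured:
#include <unistd.h>
// Rule of thumb: 1x cores for compute-bound work, 2x cores for I/O-bound work.
int pick_thread_num(int is_io_bound) {
    long cores = sysconf(_SC_NPROCESSORS_ONLN);  // online CPU cores (POSIX/Linux)
    if (cores < 1) cores = 1;
    return is_io_bound ? (int) (cores * 2) : (int) cores;
}
For an I/O-heavy workload this could be used as thread_poll_create(&pool, pick_thread_num(1)).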
Dynamic expansion and shrinking of the thread pool
What if tasks keep piling up and there are not enough threads? We can start a monitoring thread and track n = running threads / total threads. When n rises above a high-water mark, the monitor creates a few more threads; when n falls below a low-water mark, it destroys a few. The two marks can be set to, say, 30% and 70%. A sketch of such a monitor loop follows.
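A hedged sketch of such a monitor loop; the counters pool_running()/pool_total() and the helpers add_workers()/remove_workers() are hypothetical and do not exist in the implementation above:
// Sketch only: periodic monitor that grows/shrinks the pool around watermarks.
void *pool_monitor(void *arg) {
    thread_poll_t *pool = (thread_poll_t *) arg;
    while (1) {
        sleep(1);                                                    // sample once per second
        double n = (double) pool_running(pool) / pool_total(pool);   // hypothetical getters
        if (n > 0.70) {
            add_workers(pool, 2);                                    // above the high-water mark: expand
        } else if (n < 0.30) {
            remove_workers(pool, 1);                                 // below the low-water mark: shrink
        }
    }
    return NULL;
}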