当前位置:网站首页>utlis 线程池
utlis 线程池
2022-08-03 22:23:00 【Zip-List】
utlis 线程池
必须熟练掌握的线程池结构
万变不离其宗,仍然是熟悉的生产者消费者模式,仍然是熟悉的互斥锁+条件变量实现信号量语义,通知工作线程从任务队列中取数据,仍然是熟悉的flag标志位控制工作线程退出。非常熟悉!
struct NWORKER *workers; //执行队列
struct NJOB *waiting_jobs;//任务队列
pthread_mutex_t jobs_mtx; //互斥锁
pthread_cond_t jobs_cond; //条件变量
工作线程和主线程之间的交互方式即任务队列,
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
//为什么用宏,宏==模板了,可以操作不同的类型
//头插
#define LL_ADD(item, list) do {
\ item->prev = NULL; \ item->next = list; \ list = item; \ } while(0)
//删除某个节点
#define LL_REMOVE(item, list) do {
\ if (item->prev != NULL) item->prev->next = item->next; \ if (item->next != NULL) item->next->prev = item->prev; \ if (list == item) list = item->next; \ item->prev = item->next = NULL; \ } while(0)
//执行
typedef struct NWORKER {
pthread_t thread;
int terminate;
struct NWORKQUEUE *workqueue; //回指指针,指向所有worker共享的任务队列
struct NWORKER *prev;
struct NWORKER *next;
} nWorker;
//任务
typedef struct NJOB {
void (*job_function)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
} nJob;
//池组件管理
typedef struct NWORKQUEUE {
struct NWORKER *workers; //执行队列
struct NJOB *waiting_jobs;//任务队列
pthread_mutex_t jobs_mtx; //互斥锁
pthread_cond_t jobs_cond; //条件变量
} nWorkQueue;
typedef nWorkQueue nThreadPool;
static void *ntyWorkerThread(void *ptr) {
nWorker *worker = (nWorker*)ptr;
while (1) {
pthread_mutex_lock(&worker->workqueue->jobs_mtx);
while (worker->workqueue->waiting_jobs == NULL) {
if (worker->terminate) break;
pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
}
if (worker->terminate) {
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
break;
}
nJob *job = worker->workqueue->waiting_jobs;
if (job != NULL) {
LL_REMOVE(job, worker->workqueue->waiting_jobs);
}
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
if (job == NULL) continue;
job->job_function(job);
}
free(worker);
pthread_exit(NULL);
}
int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers) {
if (numWorkers < 1) numWorkers = 1;
memset(workqueue, 0, sizeof(nThreadPool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));
int i = 0;
for (i = 0;i < numWorkers;i ++) {
nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
if (worker == NULL) {
perror("malloc");
return 1;
}
memset(worker, 0, sizeof(nWorker));
worker->workqueue = workqueue;
int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
if (ret) {
perror("pthread_create");
free(worker);
return 1;
}
LL_ADD(worker, worker->workqueue->workers);
}
return 0;
}
void ntyThreadPoolShutdown(nThreadPool *workqueue) {
nWorker *worker = NULL;
for (worker = workqueue->workers;worker != NULL;worker = worker->next) {
worker->terminate = 1;
}
pthread_mutex_lock(&workqueue->jobs_mtx);
workqueue->workers = NULL;
workqueue->waiting_jobs = NULL;
pthread_cond_broadcast(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job) {
pthread_mutex_lock(&workqueue->jobs_mtx);
LL_ADD(job, workqueue->waiting_jobs);
pthread_cond_signal(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
/************************** debug thread pool **************************/
//sdk --> software develop kit
// 提供SDK给其他开发者使用
#if 1
#define KING_MAX_THREAD 80
#define KING_COUNTER_SIZE 1000
void king_counter(nJob *job) {
int index = *(int*)job->user_data;
printf("index : %d, selfid : %lu\n", index, pthread_self());
free(job->user_data);
free(job);
}
int main(int argc, char *argv[]) {
nThreadPool pool;
ntyThreadPoolCreate(&pool, KING_MAX_THREAD);
int i = 0;
for (i = 0;i < KING_COUNTER_SIZE;i ++) {
nJob *job = (nJob*)malloc(sizeof(nJob));
if (job == NULL) {
perror("malloc");
exit(1);
}
job->job_function = king_counter;
job->user_data = malloc(sizeof(int));
*(int*)job->user_data = i;
ntyThreadPoolQueue(&pool, job);
}
getchar();
printf("\n");
}
#endif
边栏推荐
- 一文带你了解软件测试是干什么的?薪资高不高?0基础怎么学?
- 从0到1看支付
- 【day6】类与对象、封装、构造方法
- With 4 years of work experience, the 5 communication methods between multi-threads can't be said, can you believe it?
- 21天打卡挑战学习MySQL—Day第一周 第一篇
- Flink--Join以及Flink函数
- CAS:1260586-88-6_Biotin-C5-Azide_Biotin-C5-Azide
- HCIP第十五天
- 2022-08-02 mysql/stonedb慢SQL-Q18-内存使用暴涨分析
- 投资性大于游戏性 NFT游戏到底是不是门好生意
猜你喜欢

2022-08-02 mysql/stonedb slow SQL-Q18 - memory usage surge analysis

113. Teach a Man how to fish - How to query the documentation and technical implementation details of any SAP UI5 control property by yourself

【历史上的今天】8 月 3 日:微软研究院的创始人诞生;陌陌正式上线;苹果发布 Newton OS

2019年10月SQL注入的两倍

Go开发工具GoLand V2022.2 来了——Go 工作区重大升级

VLAN实验

Summary bug 】 【 Elipse garbled solution project code in Chinese!

Diazo Biotin-PEG3-DBCO|重氮化合物修饰生物素-三聚乙二醇-二苯并环辛炔

On the Qixi Festival of 2022, I will offer 7 exquisite confession codes, and at the same time teach you to quickly change the source code for your own use

『百日百题 · 基础篇』备战面试,坚持刷题 第四话——循环语句!
随机推荐
21天打卡挑战学习MySQL——《Window下安装MySql》第一周 第三篇
授人以渔 - 如何自行查询任意 SAP UI5 控件属性的文档和技术实现细节试读版
Golang第一章:入门
CAS:908007-17-0_Biotin-azide_Biotin azide
Causes of Mysql Disk Holes and Several Ways to Rebuild Tables
On the Qixi Festival of 2022, I will offer 7 exquisite confession codes, and at the same time teach you to quickly change the source code for your own use
嵌入式开发:嵌入式基础——代码和数据空间揭秘
L2-029 特立独行的幸福
E-commerce data warehouse ODS layer-----log data loading
老板:公司系统太多,能不能实现账号互通?
关于IDO预售系统开发技术讲解丨浅谈IDO预售合约系统开发原理分析
Golang Chapter 1: Getting Started
Adobe是什么?
21天打卡挑战学习MySQL—Day第一周 第一篇
[b01lers2020]Life on Mars
东西向和南北向通信的统一
.NET6之MiniAPI(十四):跨域CORS(上)
CAS:1797415-74-7_TAMRA-Azide-PEG-Biotin
决策树、GBDT、XGBOOST树的可视化
2019年10月SQL注入的两倍