当前位置:网站首页>utlis thread pool
utlis thread pool
2022-08-03 22:26:00 【Zip-List】
utlis 线程池
The thread pool structure must be mastered
万变不离其宗,Still the familiar producer-consumer pattern,Still the familiar mutex+Condition variables implement semaphore semantics,Notifies the worker thread to fetch data from the task queue,still familiarflagThe flag bit controls the worker thread to exit.非常熟悉!
struct NWORKER *workers; //执行队列
struct NJOB *waiting_jobs;//任务队列
pthread_mutex_t jobs_mtx; //互斥锁
pthread_cond_t jobs_cond; //条件变量
The interaction between the worker thread and the main thread is the task queue,
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
//Why use macros,宏==模板了,Different types can be manipulated
//头插
#define LL_ADD(item, list) do {
\ item->prev = NULL; \ item->next = list; \ list = item; \ } while(0)
//删除某个节点
#define LL_REMOVE(item, list) do {
\ if (item->prev != NULL) item->prev->next = item->next; \ if (item->next != NULL) item->next->prev = item->prev; \ if (list == item) list = item->next; \ item->prev = item->next = NULL; \ } while(0)
//执行
typedef struct NWORKER {
pthread_t thread;
int terminate;
struct NWORKQUEUE *workqueue; //回指指针,指向所有workerShared task queue
struct NWORKER *prev;
struct NWORKER *next;
} nWorker;
//任务
typedef struct NJOB {
void (*job_function)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
} nJob;
//Pool component management
typedef struct NWORKQUEUE {
struct NWORKER *workers; //执行队列
struct NJOB *waiting_jobs;//任务队列
pthread_mutex_t jobs_mtx; //互斥锁
pthread_cond_t jobs_cond; //条件变量
} nWorkQueue;
typedef nWorkQueue nThreadPool;
static void *ntyWorkerThread(void *ptr) {
nWorker *worker = (nWorker*)ptr;
while (1) {
pthread_mutex_lock(&worker->workqueue->jobs_mtx);
while (worker->workqueue->waiting_jobs == NULL) {
if (worker->terminate) break;
pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
}
if (worker->terminate) {
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
break;
}
nJob *job = worker->workqueue->waiting_jobs;
if (job != NULL) {
LL_REMOVE(job, worker->workqueue->waiting_jobs);
}
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
if (job == NULL) continue;
job->job_function(job);
}
free(worker);
pthread_exit(NULL);
}
int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers) {
if (numWorkers < 1) numWorkers = 1;
memset(workqueue, 0, sizeof(nThreadPool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));
int i = 0;
for (i = 0;i < numWorkers;i ++) {
nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
if (worker == NULL) {
perror("malloc");
return 1;
}
memset(worker, 0, sizeof(nWorker));
worker->workqueue = workqueue;
int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
if (ret) {
perror("pthread_create");
free(worker);
return 1;
}
LL_ADD(worker, worker->workqueue->workers);
}
return 0;
}
void ntyThreadPoolShutdown(nThreadPool *workqueue) {
nWorker *worker = NULL;
for (worker = workqueue->workers;worker != NULL;worker = worker->next) {
worker->terminate = 1;
}
pthread_mutex_lock(&workqueue->jobs_mtx);
workqueue->workers = NULL;
workqueue->waiting_jobs = NULL;
pthread_cond_broadcast(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job) {
pthread_mutex_lock(&workqueue->jobs_mtx);
LL_ADD(job, workqueue->waiting_jobs);
pthread_cond_signal(&workqueue->jobs_cond);
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
/************************** debug thread pool **************************/
//sdk --> software develop kit
// 提供SDK给其他开发者使用
#if 1
#define KING_MAX_THREAD 80
#define KING_COUNTER_SIZE 1000
void king_counter(nJob *job) {
int index = *(int*)job->user_data;
printf("index : %d, selfid : %lu\n", index, pthread_self());
free(job->user_data);
free(job);
}
int main(int argc, char *argv[]) {
nThreadPool pool;
ntyThreadPoolCreate(&pool, KING_MAX_THREAD);
int i = 0;
for (i = 0;i < KING_COUNTER_SIZE;i ++) {
nJob *job = (nJob*)malloc(sizeof(nJob));
if (job == NULL) {
perror("malloc");
exit(1);
}
job->job_function = king_counter;
job->user_data = malloc(sizeof(int));
*(int*)job->user_data = i;
ntyThreadPoolQueue(&pool, job);
}
getchar();
printf("\n");
}
#endif
边栏推荐
猜你喜欢

PowerMockup 4.3.4::::Crack

CAS: 773888-45-2_BIOTIN ALKYNE_Biotin-alkynyl

CAS:1620523-64-9_Azide-SS-biotin_biotin-disulfide-azide

PowerMockup 4.3.4::::Crack
![[N1CTF 2018]eating_cms](/img/09/3599d889d9007eb45c6eab3043f0c4.png)
[N1CTF 2018]eating_cms

数据一致性:双删为什么要延时?

七夕快乐!
![navicat 连接 mongodb 报错[13][Unauthorized] command listDatabases requires authentication](/img/09/a579c60e07cdc145175e72673409f7.png)
navicat 连接 mongodb 报错[13][Unauthorized] command listDatabases requires authentication

如何设计 DAO 的 PoW 评判标准 并平衡不可能三角

Canvas App中点击图标生成PDF并保存到Dataverse中
随机推荐
趣链的产品构架
.NET6之MiniAPI(十四):跨域CORS(上)
Golang第二章:程序结构
start with connect by implements recursive query
【刷题篇】二叉树的右视图
Teach a Man How to Fish - How to Query the Properties of Any SAP UI5 Control by Yourself Documentation and Technical Implementation Details Demo
override学习(父类和子类)
466. Count The Repetitions
21天打卡挑战学习MySQL—Day第一周 第一篇
utils 定时器
中国企业构建边缘计算解决方案的最佳实践
Bytebase database schema change management tool
VLAN实验
UVa 10003 - Cutting Sticks (White Book, Interval DP)
Golang Chapter 1: Getting Started
navicat 连接 mongodb 报错[13][Unauthorized] command listDatabases requires authentication
Golang Chapter 2: Program Structure
With 4 years of work experience, the 5 communication methods between multi-threads can't be said, can you believe it?
Lift, Splat, Shoot: Encoding Images from Arbitrary Camera Rigs by Implicitly Unprojecting to 3D 论文笔记
Testng监听器