当前位置:网站首页>线程池原理

线程池原理

2022-08-04 05:31:00 小羊的预备程序员

目录

一、线程池原理

1、任务队列

2、工作的线程

3、管理者线程

二、代码实现

1、任务队列

2、线程池定义

3、头文件声明

4、源文件定义

6、测试代码


一、线程池原理

我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件), 则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

在各个编程语言的语种中都有线程池的概念,并且很多语言中直接提供了线程池,作为程序猿直接使用就可以了,下面给大家介绍一下线程池的实现原理:

1、任务队列

任务队列,存储需要处理的任务,由工作的线程来处理这些任务

①通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
②已处理的任务会被从任务队列中删除
③线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程

2、工作的线程

工作的线程(任务队列任务的消费者) ,N个

①线程池中维护了一定数量的工作线程,他们的作用是是不停的读任务队列,从里边取出任务并处理
②工作的线程相当于是任务队列的消费者角色
③如果任务队列为空,工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞)
④如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作

3、管理者线程

①它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
②当任务过多的时候,可以适当的创建一些新的工作线程
③当任务过少的时候,可以适当的销毁一些工作的线程

二、代码实现

1、任务队列

//任务结构体
//符合结构,包括函数指针和参数
typedef struct Task
{
	void (*function)(void* arg);
	void* arg;
}Task;

2、线程池定义

//线程池结构体
//包括任务队列,管理者ID和工作的线程数组指针
struct ThreadPool
{
	//任务队列
	Task* taskQ;
	int queueCapacity;		//容量
	int queueSize;			//当前任务个数
	int queueFront;			//队头->取数据
	int queueRear;			//队尾->放数据

	pthread_t managerID;	//管理者线程ID
	pthread_t* threadIDs;	//工作的线程ID

	int minNum;				//最小线程个数
	int maxNum;				//最大线程个数
	int busyNum;			//忙线程个数
	int liveNum;			//存活线程个数
	int exitNum;			//要销毁的线程个数

	pthread_mutex_t mutexPool;		//锁整个的线程池,防止数据混乱,操作非法内存,对整个任务队列加入同步
	pthread_mutex_t mutexBusy;		//锁busyNum变量
	pthread_cond_t notFull;			//任务队列是不是满了
	pthread_cond_t notEmpty;		//任务队列是不是空了

	int shutdown;			//是不是要销毁线程池,销毁为1,不销毁为0
};

3、头文件声明

#pragma once

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize);

// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);

// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);

//
// 工作的线程(消费者线程)任务函数
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);

4、源文件定义

//创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
	ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
	//为什么要用一个do while循环,因为如果出现了线程池成功开辟内存但是工作线程数组或者任务队列开辟内存失败的情况,
	//需要释放掉前一个开辟的内存空间,例如我判断工作线程数组是否开辟成功的时候还需要释放掉pool,这样非常麻烦,
	//所以加了一个do while循环,如果开辟失败直接break,在结束的时候判断是否需要释放内存
	do
	{
		if (pool == NULL)
		{
			printf("malloc threadpool failed...\n");
			break;
		}

		pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
		if (pool->threadIDs == NULL)
		{
			printf("malloc threadIDs failed...\n");
			break;
		}

		//将threadIDs数组初始化
		memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
		pool->minNum = min;
		pool->maxNum = max;
		pool->busyNum = 0;
		pool->liveNum = min;		//和最小个数相等
		pool->exitNum = 0;

		//初始化锁和环境变量
		if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
			pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
			pthread_cond_init(&pool->notFull, NULL) != 0 ||
			pthread_cond_init(&pool->notEmpty, NULL) != 0)
		{
			printf("mutex or condition init failed...\n");
			break;
		}

		//任务队列
		pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
		pool->queueCapacity = queueSize;
		pool->queueSize = 0;
		pool->queueFront = 0;
		pool->queueRear = 0;

		pool->shutdown = 0;

		//创建线程
		pthread_create(&pool->managerID, NULL, manager, pool);
		for (int i = 0; i < min; i++)
		{
			pthread_create(&pool->threadIDs[i], NULL, worker, pool);
		}

		return pool;
	} while (0);

	//释放资源
	if (pool->threadIDs) free(pool->threadIDs);
	if (pool->taskQ) free(pool->taskQ);
	if (pool) free(pool);

	return NULL;
}

//销毁线程池函数
int threadPoolDestroy(ThreadPool* pool)
{
	if (pool == NULL)
	{
		return -1;
	}

	//关闭线程池
	pool->shutdown = 1;
	//阻塞回收管理者线程
	pthread_join(pool->managerID, NULL);
	//唤醒阻塞的消费者线程(因为唤醒之后会自杀)
	for (int i = 0; i < pool->liveNum; i++)
	{
		pthread_cond_signal(&pool->notEmpty);
	}
	//释放堆内存
	//1.任务队列
	if (pool->taskQ)
	{
		free(pool->taskQ);
	}
	//2.工作线程ID
	if (pool->threadIDs)
	{
		free(pool->threadIDs);
	}
	//3.互斥锁和环境变量
	pthread_mutex_destroy(&pool->mutexPool);
	pthread_mutex_destroy(&pool->mutexBusy);
	pthread_cond_destroy(&pool->notEmpty);
	pthread_cond_destroy(&pool->notFull);
	//4.pool
	free(pool);
	pool = NULL;

	return 0;
}

void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
	pthread_mutex_lock(&pool->mutexPool);
	//判断是不是满了
	while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
	{
		//阻塞生产者线程
		pthread_cond_wait(&pool->notFull, &pool->mutexPool);	
	}
	//判断线程池是不是被关闭了
	if (pool->shutdown)
	{
		//先解锁再退出,防止死锁
		pthread_mutex_unlock(&pool->mutexPool);
		return;
	}
	//添加任务
	pool->taskQ[pool->queueRear].function = func;
	pool->taskQ[pool->queueRear].arg = arg;

	//把任务队列数组维护一个环形队列 
	//移动尾结点
	pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
	pool->queueSize++;

	//唤醒阻塞的消费者线程
	pthread_cond_signal(&pool->notEmpty);
	pthread_mutex_unlock(&pool->mutexPool);
}

//获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool)
{
	pthread_mutex_lock(&pool->mutexBusy);
	int busyNum = pool->busyNum;
	pthread_mutex_unlock(&pool->mutexBusy);
	return busyNum;
}

//获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool)
{
	pthread_mutex_lock(&pool->mutexPool);
	int aliveNum = pool->liveNum;
	pthread_mutex_unlock(&pool->mutexPool);
	return aliveNum;
}

void* worker(void* arg)
{
	ThreadPool* pool = (ThreadPool*)arg;

	while (1)
	{
		//因为是线程池是一个共享资源,使用之前需要加锁
		pthread_mutex_lock(&pool->mutexPool);
		//当前任务队列是否为空
		while (pool->queueSize == 0 && !pool->shutdown)
		{
			//阻塞工作线程
			pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

			//判断是不是要销毁线程
			if (pool->exitNum > 0)
			{
				pool->exitNum--;
				if (pool->liveNum > pool->minNum)
				{
					pool->liveNum--;
					//只要是从上面解除阻塞的线程向下执行,就已经将mutexPool互斥锁给到了这个线程,就已经将这把互斥锁锁上了
					//所以我们退出之前需要解锁,防止死锁
					pthread_mutex_unlock(&pool->mutexPool);
					//自杀
					threadExit(pool);
				}
			}
		}

		//判断线程池是否被关闭了
		if (pool->shutdown)
		{
			//先解锁,再退出,防止死锁
			pthread_mutex_unlock(&pool->mutexPool);
			threadExit(pool);
		}

		//从任务队列中取出一个任务
		Task task;
		task.function = pool->taskQ[pool->queueFront].function;
		task.arg = pool->taskQ[pool->queueFront].arg;

		//把任务队列数组维护一个环形队列 
		//移动头结点
		pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
		pool->queueSize--;

		//解锁
		//唤醒阻塞的生产者线程
		pthread_cond_signal(&pool->notFull);
		pthread_mutex_unlock(&pool->mutexPool);

		printf("thread %ld start working\n", pthread_self());
		pthread_mutex_lock(&pool->mutexBusy);
		pool->busyNum++;
		pthread_mutex_unlock(&pool->mutexBusy);
		task.function(task.arg);
		free(task.arg);
		task.arg = NULL;

		printf("thread %ld end working\n", pthread_self());
		pthread_mutex_lock(&pool->mutexBusy);
		pool->busyNum--;
		pthread_mutex_unlock(&pool->mutexBusy);
	}
	return NULL;
}

//管理线程主要是做两件事
//每隔三秒钟检测一次,判断是不是要添加线程或者销毁线程
void* manager(void* arg)
{
	ThreadPool* pool = (ThreadPool*)arg;
	while (!pool->shutdown)
	{
		//每隔3s检测一次
		sleep(3);

		//取出线程池中任务的数量和当前线程的数量
		pthread_mutex_lock(&pool->mutexPool);
		int queueSize = pool->queueSize;
		int liveNum = pool->liveNum;
		pthread_mutex_unlock(&pool->mutexPool);

		//取出忙线程数量
		pthread_mutex_lock(&pool->mutexBusy);
		int busyNum = pool->busyNum;
		pthread_mutex_unlock(&pool->mutexBusy);

		//添加线程
		//任务的个数 > 存活的线程个数 && 存活的线程数 < 最大的线程数
		if (queueSize > liveNum && liveNum < pool->maxNum)
		{
			pthread_mutex_lock(&pool->mutexPool);
			int counter = 0;
			for (int i = 0; i < pool->maxNum && counter < NUMBER
				&& pool->liveNum < pool->maxNum; ++i)
			{
				if (pool->threadIDs[i] == 0)
				{
					pthread_create(&pool->threadIDs[i], NULL, worker, pool);
					counter++;
					pool->liveNum++;
				}
			}
			pthread_mutex_unlock(&pool->mutexPool);
		}

		//销毁线程
		//忙的线程 * 2 < 存活的线程 && 存活的线程 > 最小线程数
		if (busyNum * 2 < liveNum && liveNum > pool->minNum)
		{
			pthread_mutex_lock(&pool->mutexPool);
			pool->exitNum = NUMBER;
			pthread_mutex_unlock(&pool->mutexPool);
			//让工作的线程自杀
			//没有事情可做的工作线程会阻塞,我们需要先唤醒这部分线程
			//唤醒之后工作的线程会进入阻塞下的if模块,会让线程自杀
			for (int i = 0; i < NUMBER; i++)
			{
				pthread_cond_signal(&pool->notEmpty);
			}
		}
	}
	return NULL;
}

//当线程退出之后将对于数组位置清0
void threadExit(ThreadPool* pool)
{
	pthread_t tid = pthread_self();
	for (int i = 0; i < pool->maxNum; i++)
	{
		if (pool->threadIDs[i] == tid)
		{
			pool->threadIDs[i] = 0;
			printf("threadExit() called, %ld exiting...\n", tid);
			break;
		}
	}
	pthread_exit(NULL);
}

6、测试代码

void taskFunc(void* arg)
{
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n", pthread_self(), num);
    sleep(1);
}

int main()
{
    //创建出一个线程池
    ThreadPool* pool = threadPoolCreate(3, 10, 100);
    for (int i = 0; i < 100; i++)
    {
        int* num = (int*)malloc(sizeof(int));
        *num = i + 100;
        threadPoolAdd(pool,taskFunc, num);
    }

    //让主线程睡眠一段时间,保证工作的线程处理完毕再退出
    sleep(30);

    threadPoolDestroy(pool);
    return 0;
}

原网站

版权声明
本文为[小羊的预备程序员]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_51654808/article/details/125789081