当前位置:网站首页>Web server code parsing - thread pool
Web server code parsing - thread pool
2022-07-03 14:50:00 【III VII】
locker.h
The header file
#ifndef LOCKER_H
#define LOCKER_H
#include <exception>
#include <pthread.h>
#include <semaphore.h>
// Thread synchronization mechanism encapsulates classes
#endif
Define mutex classes
// Mutex class
class locker {
public:
locker() {
// Constructors
if(pthread_mutex_init(&m_mutex, NULL) != 0) {
throw std::exception();// Throw an exception
}// Initialize mutex
}
~locker() {
// Destructor
pthread_mutex_destroy(&m_mutex);
}// Destroy mutex
bool lock() {
return pthread_mutex_lock(&m_mutex) == 0;
}// locked
bool unlock() {
return pthread_mutex_unlock(&m_mutex) == 0;
}// Unlock
pthread_mutex_t *get()
{
return &m_mutex;
}// Get the lock ;
private:
pthread_mutex_t m_mutex;// Define mutex ;
};
Encapsulate conditional variable classes ;
class cond {
public:
cond(){
if (pthread_cond_init(&m_cond, NULL) != 0) {
throw std::exception();
}
}// Constructors , initialization m_cond;
~cond() {
pthread_cond_destroy(&m_cond);
}
// Destructor , Destroy semaphore
bool wait(pthread_mutex_t *m_mutex) {
int ret = 0;
ret = pthread_cond_wait(&m_cond, m_mutex);
return ret == 0;
}
// Condition variables, wait, You need conditional variables and mutexes ;
bool timewait(pthread_mutex_t *m_mutex, struct timespec t) {
int ret = 0;
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
return ret == 0;
}
// Timeout time
bool signal() {
return pthread_cond_signal(&m_cond) == 0;
}
bool broadcast() {
return pthread_cond_broadcast(&m_cond) == 0;
}
private:
pthread_cond_t m_cond;
};
Encapsulate semaphore classes ;
// Semaphore class
class sem {
public:
sem() {
if( sem_init( &m_sem, 0, 0 ) != 0 ) {
throw std::exception();
}
}// Constructor creation , Initialize semaphores
sem(int num) {
if( sem_init( &m_sem, 0, num ) != 0 ) {
throw std::exception();
}
}// When constructing, the initialized value is passed in , Two constructors ;
~sem() {
sem_destroy( &m_sem );
}
// Wait for the semaphore
bool wait() {
return sem_wait( &m_sem ) == 0;
}
// Increase the semaphore
bool post() {
return sem_post( &m_sem ) == 0;
}
private:
sem_t m_sem;
};
The overall code :
#ifndef LOCKER_H
#define LOCKER_H
#include <exception>
#include <pthread.h>
#include <semaphore.h>
// Thread synchronization mechanism encapsulates classes
// Mutex class
class locker {
public:
locker() {
// Constructors
if(pthread_mutex_init(&m_mutex, NULL) != 0) {
throw std::exception();// Throw an exception
}// Initialize mutex
}
~locker() {
// Destructor
pthread_mutex_destroy(&m_mutex);
}// Destroy mutex
bool lock() {
return pthread_mutex_lock(&m_mutex) == 0;
}// locked
bool unlock() {
return pthread_mutex_unlock(&m_mutex) == 0;
}// Unlock
pthread_mutex_t *get()
{
return &m_mutex;
}// Get the lock ;
private:
pthread_mutex_t m_mutex;
};
// Conditional variable class
class cond {
public:
cond(){
if (pthread_cond_init(&m_cond, NULL) != 0) {
throw std::exception();
}
}// Constructors , initialization m_cond;
~cond() {
pthread_cond_destroy(&m_cond);
}
// Destructor , Destroy semaphore
bool wait(pthread_mutex_t *m_mutex) {
int ret = 0;
ret = pthread_cond_wait(&m_cond, m_mutex);
return ret == 0;
}
// Condition variables, wait, You need conditional variables and mutexes ;
bool timewait(pthread_mutex_t *m_mutex, struct timespec t) {
int ret = 0;
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
return ret == 0;
}
// Timeout time
bool signal() {
return pthread_cond_signal(&m_cond) == 0;
}
bool broadcast() {
return pthread_cond_broadcast(&m_cond) == 0;
}
private:
pthread_cond_t m_cond;
};
// Semaphore class
class sem {
public:
sem() {
if( sem_init( &m_sem, 0, 0 ) != 0 ) {
throw std::exception();
}
}// Constructor creation , Initialize semaphores
sem(int num) {
if( sem_init( &m_sem, 0, num ) != 0 ) {
throw std::exception();
}
}// When constructing, the initialized value is passed in , Two constructors ;
~sem() {
sem_destroy( &m_sem );
}
// Wait for the semaphore
bool wait() {
return sem_wait( &m_sem ) == 0;
}
// Increase the semaphore
bool post() {
return sem_post( &m_sem ) == 0;
}
private:
sem_t m_sem;
};
#endif
Thread pool class
threadpool.h
Thread pool class , Define it as a template class , It can be applied to code reuse ;T Represents task class ;
template<typename T>
class threadpool
{
public:
private:
}
stay private Define various variables in
// Number of threads
int m_thread_number;
// An array describing the thread pool , The size is m_thread_number
pthread_t * m_threads;
// Maximum allowed in the request queue 、 Number of requests waiting to be processed
int m_max_requests;
// Request queue
std::list< T* > m_workqueue;
// Protect the mutex of the request queue
locker m_queuelocker; //locker.h As defined in
// Is there a task to deal with
sem m_queuestat; //locker.h As defined in
// End thread or not
bool m_stop;
public All the functions in ;
/*thread_number Is the number of threads in the thread pool ,max_requests Is the maximum number of... Allowed in the request queue 、 Number of requests waiting to be processed */
threadpool(int thread_number = 8, int max_requests = 10000);// Constructors
~threadpool();
bool append(T* request);// Add tasks ;
Constructors :
template< typename T >
threadpool< T >::threadpool(int thread_number, int max_requests) :
m_thread_number(thread_number), m_max_requests(max_requests),
m_stop(false), m_threads(NULL) {
if((thread_number <= 0) || (max_requests <= 0) ) {
throw std::exception();
}
m_threads = new pthread_t[m_thread_number];
if(!m_threads) {
throw std::exception();
}
// establish thread_number Threads , And set them to be out of thread .
for ( int i = 0; i < thread_number; ++i ) {
printf( "create the %dth thread\n", i);
if(pthread_create(m_threads + i, NULL, worker, this ) != 0) {
delete [] m_threads;
throw std::exception();
}
// In order to release resources after the thread is used
if( pthread_detach( m_threads[i] ) ) {
delete [] m_threads;
throw std::exception();
}// Thread detach failed , Release thread array
}
}
Destructor
template< typename T >
threadpool< T >::~threadpool() {
delete [] m_threads;
m_stop = true;
}
Add tasks to the queue :
template< typename T >
bool threadpool< T >::append( T* request )
{
// Be sure to lock the work queue , Because it is shared by all threads .
m_queuelocker.lock();
if ( m_workqueue.size() > m_max_requests ) {
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();// Increase the semaphore , Semaphores are used to determine whether a thread is blocked or can start immediately ;
return true;
}
private:
/* Functions run by worker threads , It constantly takes tasks from the work queue and executes them */
static void* worker(void* arg);
void run();
template< typename T >
void* threadpool< T >::worker( void* arg )
{
threadpool* pool = ( threadpool* )arg;
pool->run();
return pool;
}
template< typename T >
void threadpool< T >::run() {
while (!m_stop) {
m_queuestat.wait();
m_queuelocker.lock();
if ( m_workqueue.empty() ) {
// If the work queue is empty ;
m_queuelocker.unlock();
continue;
}
T* request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if ( !request ) {
continue;
}
request->process();
}
}
The overall code
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "locker.h"
// Thread pool class , It is defined as a template class for code reuse , Template parameter T It's a task class
template<typename T>
class threadpool {
public:
/*thread_number Is the number of threads in the thread pool ,max_requests Is the maximum number of... Allowed in the request queue 、 Number of requests waiting to be processed */
threadpool(int thread_number = 8, int max_requests = 10000);
~threadpool();
bool append(T* request);
private:
/* Functions run by worker threads , It constantly takes tasks from the work queue and executes them */
static void* worker(void* arg);
void run();
private:
// Number of threads
int m_thread_number;
// An array describing the thread pool , The size is m_thread_number
pthread_t * m_threads;
// Maximum allowed in the request queue 、 Number of requests waiting to be processed
int m_max_requests;
// Request queue
std::list< T* > m_workqueue;
// Protect the mutex of the request queue
locker m_queuelocker;
// Is there a task to deal with
sem m_queuestat;
// End thread or not
bool m_stop;
};
template< typename T >
threadpool< T >::threadpool(int thread_number, int max_requests) :
m_thread_number(thread_number), m_max_requests(max_requests),
m_stop(false), m_threads(NULL) {
if((thread_number <= 0) || (max_requests <= 0) ) {
throw std::exception();
}
m_threads = new pthread_t[m_thread_number];
if(!m_threads) {
throw std::exception();
}
// establish thread_number Threads , And set them to be out of thread .
for ( int i = 0; i < thread_number; ++i ) {
printf( "create the %dth thread\n", i);
if(pthread_create(m_threads + i, NULL, worker, this ) != 0) {
delete [] m_threads;
throw std::exception();
}
// Release resources by yourself after use
if( pthread_detach( m_threads[i] ) ) {
delete [] m_threads;
throw std::exception();
}
}
}
template< typename T >
threadpool< T >::~threadpool() {
delete [] m_threads;
m_stop = true;
}
template< typename T >
bool threadpool< T >::append( T* request )
{
// Be sure to lock the work queue , Because it is shared by all threads .
m_queuelocker.lock();
if ( m_workqueue.size() > m_max_requests ) {
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
template< typename T >
void* threadpool< T >::worker( void* arg )
{
threadpool* pool = ( threadpool* )arg;
pool->run();
return pool;
}
template< typename T >
void threadpool< T >::run() {
while (!m_stop) {
m_queuestat.wait();
m_queuelocker.lock();
if ( m_workqueue.empty() ) {
m_queuelocker.unlock();
continue;
}
T* request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if ( !request ) {
continue;
}
request->process();
}
}
#endif
边栏推荐
- Zzuli:1047 logarithmic table
- Qt development - scrolling digital selector commonly used in embedded system
- Zzuli:1044 failure rate
- Zzuli:1045 numerical statistics
- Amazon, express, lazada, shopee, eBay, wish, Wal Mart, Alibaba international, meikeduo and other cross-border e-commerce platforms evaluate how Ziyang account can seize traffic by using products in th
- mmdetection 学习率与batch_size关系
- [opengl] face pinching system
- .NET六大设计原则个人白话理解,有误请大神指正
- Creation of data table of Doris' learning notes
- Luogu p3065 [usaco12dec]first! G problem solution
猜你喜欢
[ue4] Niagara's indirect draw
Introduction to opengl4.0 tutorial computing shaders
Sub GHz wireless solution Z-Wave 800 Series zg23 SOC and zgm230s modules
Detailed explanation of four modes of distributed transaction (Seata)
[ue4] geometry drawing pipeline
Code writing and playing method of tonybot humanoid robot at fixed distance
Adobe Premiere Pro 15.4 has been released. It natively supports Apple M1 and adds the function of speech to text
My QT learning path -- how qdatetimeedit is empty
[ue4] material and shader permutation
链表有环,快慢指针走3步可以吗
随机推荐
adc128s022 ADC verilog设计实现
Zzuli:1041 sum of sequence 2
Zzuli:1044 failure rate
Zzuli:1059 highest score
Tensor 省略号(三个点)切片
Tonybot humanoid robot checks the port and corresponds to port 0701
NOI OPENJUDGE 1.3(06)
Fundamentals of PHP deserialization
How to color ordinary landscape photos, PS tutorial
Niuke bm83 string deformation (case conversion, string inversion, string replacement)
How can entrepreneurial teams implement agile testing to improve quality and efficiency? Voice network developer entrepreneurship lecture Vol.03
B2020 分糖果
Zzuli:1045 numerical statistics
Solve the problem that PR cannot be installed on win10 system. Pr2021 version -premiere Pro 2021 official Chinese version installation tutorial
Zhonggan micro sprint technology innovation board: annual revenue of 240million, net loss of 17.82 million, proposed to raise 600million
ASTC texture compression (adaptive scalable texture compression)
Detailed explanation of four modes of distributed transaction (Seata)
Adobe Premiere Pro 15.4 has been released. It natively supports Apple M1 and adds the function of speech to text
Vs+qt multithreading implementation -- run and movetothread
C language STR function