当前位置:网站首页>Web server code parsing - thread pool
Web server code parsing - thread pool
2022-07-03 14:50:00 【III VII】
locker.h
The header file
#ifndef LOCKER_H
#define LOCKER_H
#include <exception>
#include <pthread.h>
#include <semaphore.h>
// Thread synchronization mechanism encapsulates classes
#endif
Define mutex classes
// Mutex class
class locker {
public:
locker() {
// Constructors
if(pthread_mutex_init(&m_mutex, NULL) != 0) {
throw std::exception();// Throw an exception
}// Initialize mutex
}
~locker() {
// Destructor
pthread_mutex_destroy(&m_mutex);
}// Destroy mutex
bool lock() {
return pthread_mutex_lock(&m_mutex) == 0;
}// locked
bool unlock() {
return pthread_mutex_unlock(&m_mutex) == 0;
}// Unlock
pthread_mutex_t *get()
{
return &m_mutex;
}// Get the lock ;
private:
pthread_mutex_t m_mutex;// Define mutex ;
};
Encapsulate conditional variable classes ;
class cond {
public:
cond(){
if (pthread_cond_init(&m_cond, NULL) != 0) {
throw std::exception();
}
}// Constructors , initialization m_cond;
~cond() {
pthread_cond_destroy(&m_cond);
}
// Destructor , Destroy semaphore
bool wait(pthread_mutex_t *m_mutex) {
int ret = 0;
ret = pthread_cond_wait(&m_cond, m_mutex);
return ret == 0;
}
// Condition variables, wait, You need conditional variables and mutexes ;
bool timewait(pthread_mutex_t *m_mutex, struct timespec t) {
int ret = 0;
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
return ret == 0;
}
// Timeout time
bool signal() {
return pthread_cond_signal(&m_cond) == 0;
}
bool broadcast() {
return pthread_cond_broadcast(&m_cond) == 0;
}
private:
pthread_cond_t m_cond;
};
Encapsulate semaphore classes ;
// Semaphore class
class sem {
public:
sem() {
if( sem_init( &m_sem, 0, 0 ) != 0 ) {
throw std::exception();
}
}// Constructor creation , Initialize semaphores
sem(int num) {
if( sem_init( &m_sem, 0, num ) != 0 ) {
throw std::exception();
}
}// When constructing, the initialized value is passed in , Two constructors ;
~sem() {
sem_destroy( &m_sem );
}
// Wait for the semaphore
bool wait() {
return sem_wait( &m_sem ) == 0;
}
// Increase the semaphore
bool post() {
return sem_post( &m_sem ) == 0;
}
private:
sem_t m_sem;
};
The overall code :
#ifndef LOCKER_H
#define LOCKER_H
#include <exception>
#include <pthread.h>
#include <semaphore.h>
// Thread synchronization mechanism encapsulates classes
// Mutex class
class locker {
public:
locker() {
// Constructors
if(pthread_mutex_init(&m_mutex, NULL) != 0) {
throw std::exception();// Throw an exception
}// Initialize mutex
}
~locker() {
// Destructor
pthread_mutex_destroy(&m_mutex);
}// Destroy mutex
bool lock() {
return pthread_mutex_lock(&m_mutex) == 0;
}// locked
bool unlock() {
return pthread_mutex_unlock(&m_mutex) == 0;
}// Unlock
pthread_mutex_t *get()
{
return &m_mutex;
}// Get the lock ;
private:
pthread_mutex_t m_mutex;
};
// Conditional variable class
class cond {
public:
cond(){
if (pthread_cond_init(&m_cond, NULL) != 0) {
throw std::exception();
}
}// Constructors , initialization m_cond;
~cond() {
pthread_cond_destroy(&m_cond);
}
// Destructor , Destroy semaphore
bool wait(pthread_mutex_t *m_mutex) {
int ret = 0;
ret = pthread_cond_wait(&m_cond, m_mutex);
return ret == 0;
}
// Condition variables, wait, You need conditional variables and mutexes ;
bool timewait(pthread_mutex_t *m_mutex, struct timespec t) {
int ret = 0;
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
return ret == 0;
}
// Timeout time
bool signal() {
return pthread_cond_signal(&m_cond) == 0;
}
bool broadcast() {
return pthread_cond_broadcast(&m_cond) == 0;
}
private:
pthread_cond_t m_cond;
};
// Semaphore class
class sem {
public:
sem() {
if( sem_init( &m_sem, 0, 0 ) != 0 ) {
throw std::exception();
}
}// Constructor creation , Initialize semaphores
sem(int num) {
if( sem_init( &m_sem, 0, num ) != 0 ) {
throw std::exception();
}
}// When constructing, the initialized value is passed in , Two constructors ;
~sem() {
sem_destroy( &m_sem );
}
// Wait for the semaphore
bool wait() {
return sem_wait( &m_sem ) == 0;
}
// Increase the semaphore
bool post() {
return sem_post( &m_sem ) == 0;
}
private:
sem_t m_sem;
};
#endif
Thread pool class
threadpool.h
Thread pool class , Define it as a template class , It can be applied to code reuse ;T Represents task class ;
template<typename T>
class threadpool
{
public:
private:
}
stay private Define various variables in
// Number of threads
int m_thread_number;
// An array describing the thread pool , The size is m_thread_number
pthread_t * m_threads;
// Maximum allowed in the request queue 、 Number of requests waiting to be processed
int m_max_requests;
// Request queue
std::list< T* > m_workqueue;
// Protect the mutex of the request queue
locker m_queuelocker; //locker.h As defined in
// Is there a task to deal with
sem m_queuestat; //locker.h As defined in
// End thread or not
bool m_stop;
public All the functions in ;
/*thread_number Is the number of threads in the thread pool ,max_requests Is the maximum number of... Allowed in the request queue 、 Number of requests waiting to be processed */
threadpool(int thread_number = 8, int max_requests = 10000);// Constructors
~threadpool();
bool append(T* request);// Add tasks ;
Constructors :
template< typename T >
threadpool< T >::threadpool(int thread_number, int max_requests) :
m_thread_number(thread_number), m_max_requests(max_requests),
m_stop(false), m_threads(NULL) {
if((thread_number <= 0) || (max_requests <= 0) ) {
throw std::exception();
}
m_threads = new pthread_t[m_thread_number];
if(!m_threads) {
throw std::exception();
}
// establish thread_number Threads , And set them to be out of thread .
for ( int i = 0; i < thread_number; ++i ) {
printf( "create the %dth thread\n", i);
if(pthread_create(m_threads + i, NULL, worker, this ) != 0) {
delete [] m_threads;
throw std::exception();
}
// In order to release resources after the thread is used
if( pthread_detach( m_threads[i] ) ) {
delete [] m_threads;
throw std::exception();
}// Thread detach failed , Release thread array
}
}
Destructor
template< typename T >
threadpool< T >::~threadpool() {
delete [] m_threads;
m_stop = true;
}
Add tasks to the queue :
template< typename T >
bool threadpool< T >::append( T* request )
{
// Be sure to lock the work queue , Because it is shared by all threads .
m_queuelocker.lock();
if ( m_workqueue.size() > m_max_requests ) {
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();// Increase the semaphore , Semaphores are used to determine whether a thread is blocked or can start immediately ;
return true;
}
private:
/* Functions run by worker threads , It constantly takes tasks from the work queue and executes them */
static void* worker(void* arg);
void run();
template< typename T >
void* threadpool< T >::worker( void* arg )
{
threadpool* pool = ( threadpool* )arg;
pool->run();
return pool;
}
template< typename T >
void threadpool< T >::run() {
while (!m_stop) {
m_queuestat.wait();
m_queuelocker.lock();
if ( m_workqueue.empty() ) {
// If the work queue is empty ;
m_queuelocker.unlock();
continue;
}
T* request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if ( !request ) {
continue;
}
request->process();
}
}
The overall code
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "locker.h"
// Thread pool class , It is defined as a template class for code reuse , Template parameter T It's a task class
template<typename T>
class threadpool {
public:
/*thread_number Is the number of threads in the thread pool ,max_requests Is the maximum number of... Allowed in the request queue 、 Number of requests waiting to be processed */
threadpool(int thread_number = 8, int max_requests = 10000);
~threadpool();
bool append(T* request);
private:
/* Functions run by worker threads , It constantly takes tasks from the work queue and executes them */
static void* worker(void* arg);
void run();
private:
// Number of threads
int m_thread_number;
// An array describing the thread pool , The size is m_thread_number
pthread_t * m_threads;
// Maximum allowed in the request queue 、 Number of requests waiting to be processed
int m_max_requests;
// Request queue
std::list< T* > m_workqueue;
// Protect the mutex of the request queue
locker m_queuelocker;
// Is there a task to deal with
sem m_queuestat;
// End thread or not
bool m_stop;
};
template< typename T >
threadpool< T >::threadpool(int thread_number, int max_requests) :
m_thread_number(thread_number), m_max_requests(max_requests),
m_stop(false), m_threads(NULL) {
if((thread_number <= 0) || (max_requests <= 0) ) {
throw std::exception();
}
m_threads = new pthread_t[m_thread_number];
if(!m_threads) {
throw std::exception();
}
// establish thread_number Threads , And set them to be out of thread .
for ( int i = 0; i < thread_number; ++i ) {
printf( "create the %dth thread\n", i);
if(pthread_create(m_threads + i, NULL, worker, this ) != 0) {
delete [] m_threads;
throw std::exception();
}
// Release resources by yourself after use
if( pthread_detach( m_threads[i] ) ) {
delete [] m_threads;
throw std::exception();
}
}
}
template< typename T >
threadpool< T >::~threadpool() {
delete [] m_threads;
m_stop = true;
}
template< typename T >
bool threadpool< T >::append( T* request )
{
// Be sure to lock the work queue , Because it is shared by all threads .
m_queuelocker.lock();
if ( m_workqueue.size() > m_max_requests ) {
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
template< typename T >
void* threadpool< T >::worker( void* arg )
{
threadpool* pool = ( threadpool* )arg;
pool->run();
return pool;
}
template< typename T >
void threadpool< T >::run() {
while (!m_stop) {
m_queuestat.wait();
m_queuelocker.lock();
if ( m_workqueue.empty() ) {
m_queuelocker.unlock();
continue;
}
T* request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if ( !request ) {
continue;
}
request->process();
}
}
#endif
边栏推荐
- Zzuli:1040 sum of sequence 1
- Write a 2-minute countdown.
- Zzuli:1053 sine function
- Zzuli:1058 solving inequalities
- Pyqt interface production (login + jump page)
- C # realizes the login interface, and the password asterisk is displayed (hide the input password)
- Get permissions dynamically
- C language DUP function
- Devaxpress: range selection control rangecontrol uses
- Optical cat super account password and broadband account password acquisition
猜你喜欢

Talking about part of data storage in C language
![[graphics] real shading in Unreal Engine 4](/img/8d/53775c7570c5578f4fe985592bb305.jpg)
[graphics] real shading in Unreal Engine 4

Four data flows and cases of grpc

【微信小程序】WXSS 模板样式

C language DUP function

Qt development - scrolling digital selector commonly used in embedded system

Rasterization: a practical implementation (2)

Protobuf and grpc

Zzuli:1053 sine function

Adobe Premiere Pro 15.4 has been released. It natively supports Apple M1 and adds the function of speech to text
随机推荐
The latest M1 dedicated Au update Adobe audit CC 2021 Chinese direct installation version has solved the problems of M1 installation without flash back!
mmdetection 学习率与batch_size关系
Zzuli:1056 lucky numbers
NOI OPENJUDGE 1.5(23)
Zzuli:1041 sum of sequence 2
[qingniaochangping campus of Peking University] in the Internet industry, which positions are more popular as they get older?
Vs+qt multithreading implementation -- run and movetothread
Zhejiang University Edition "C language programming (4th Edition)" topic set reference ideas set
[engine development] in depth GPU and rendering optimization (basic)
[graphics] hair simulation in tressfx
Introduction to opengl4.0 tutorial computing shaders
To improve efficiency or increase costs, how should developers understand pair programming?
Special research report on the market of lithium battery electrolyte industry in China (2022 Edition)
亚马逊、速卖通、Lazada、Shopee、eBay、wish、沃尔玛、阿里国际、美客多等跨境电商平台,测评自养号该如何利用产品上新期抓住流量?
Four data flows and cases of grpc
C language to implement a password manager (under update)
NOI OPENJUDGE 1.4(15)
dllexport和dllimport
1017 a divided by B (20 points)
[opengl] advanced chapter of texture - principle of flowmap