当前位置:网站首页>Implementing parallel computing framework with QT
Implementing parallel computing framework with QT
2022-06-09 03:28:00 【Manon Feifei】
Catalog
Communication protocol between node and main application
Calculation result data structure
Communication message data structure
Message transformation tool class
TCP Communication server and client implementation
Task distribution, recovery and progress monitoring
With thread pool, we can make full use of local computing performance , But sometimes the local computing performance can no longer meet the business requirements , At this time, we need to decompose the task and distribute it to multiple machines for calculation and processing . We split the computing task and distribute it , The calculation result is recycled , The business process of computing result integration is called parallel computing . This will use QT Build a parallel computing framework , Meet the large-scale computing tasks in daily business .
A main application in the parallel computing framework is responsible for splitting and dispatching the tasks requested by users , At the same time, the application is also responsible for the management of computing child nodes . Multiple computing nodes can exist simultaneously in parallel computing framework , These nodes are managed by the host application , Responsible for actual computing tasks . The node relationship diagram of the framework is shown in the following figure :

The difficulties of parallel computing framework are mainly divided into the following three aspects :
1. Communication protocol between node and main application
2.TCP Communication server and client implementation
3. Task distribution, recovery and progress monitoring
These three aspects are introduced below
Communication protocol between node and main application
In order to realize the effective communication between the main application and the child nodes , We must first determine the communication protocol between the two sides . The communication protocol is shared between the master application and the child nodes , We put it in a SDK Inside . The communication protocol defines the data structure of the computing task 、 The data structure of the calculation results 、 And the methods of serialization and deserialization of data structures .SDK The structure of is shown in the figure below .

Here, a test task is taken as an example .:
Computing task data structure
By rewriting QDataStream The operator of , We can realize the conversion between binary stream data and task structure ( Note that the corresponding sequence structure should be consistent when serializing and deserializing ).
//JobRequest.h
#ifndef JOBREQUEST_H
#define JOBREQUEST_H
#include <QSize>
#include <QPointF>
#include <QDataStream>
struct JobRequest
{
int int_param;
QPointF point_param;
double double_param;
QSize size_param;
int max_value;
};
// Serialize the structure of the task into a binary stream
inline QDataStream& operator<<(QDataStream& out, const JobRequest& jobRequest)
{
out << jobRequest.int_param
<< jobRequest.point_param
<< jobRequest.double_param
<< jobRequest.size_param
<< jobRequest.max_value;
return out;
}
// The binary stream is inversely sequenced into a structure
inline QDataStream& operator>>(QDataStream& in, JobRequest& jobRequest)
{
in >> jobRequest.int_param;
in >> jobRequest.point_param;
in >> jobRequest.double_param;
in >> jobRequest.size_param;
in >> jobRequest.max_value;
return in;
}
// Register the structure to QT In the system, it can be used when the signal communicates with the slot
Q_DECLARE_METATYPE(JobRequest)
#endif // JOBREQUEST_HCalculation result data structure
By rewriting QDataStream The operator of , We can realize the conversion between binary stream data and calculation result structure ( Note that the corresponding sequence structure should be consistent when serializing and deserializing ).
//JobResult.h
#ifndef JOBRESULT_H
#define JOBRESULT_H
#include <QSize>
#include <QVector>
#include <QPointF>
#include <QDataStream>
struct JobResult
{
JobResult(int valueCount = 1) :
size_result(0, 0),
int_result(0),
point_result(0, 0),
double_result(0.0),
vector_values(valueCount)
{
}
QSize size_result;
int int_result;
QPointF point_result;
double double_result;
QVector<int> vector_values;
};
// Convert the structure of the calculation result into a binary stream
inline QDataStream &operator<<(QDataStream& out, const JobResult& jobResult)
{
out << jobResult.size_result
<< jobResult.int_result
<< jobResult.point_result
<< jobResult.double_result
<< jobResult.vector_values;
return out;
}
// A structure that converts a binary stream into a calculation result
inline QDataStream &operator>>(QDataStream& in, JobResult& jobResult)
{
in >> jobResult.size_result;
in >> jobResult.int_result;
in >> jobResult.point_result;
in >> jobResult.double_result;
in >> jobResult.vector_values;
return in;
}
// towards QT Register the data structure of calculation results in the framework
Q_DECLARE_METATYPE(JobResult)
#endif // JOBRESULT_HCommunication message data structure
After realizing the conversion between binary stream and structure , We can go through TCP/IP The communication mechanism realizes the dispatch of tasks and the recovery of calculation results . The structure of network communication , There are two main components :
1. Type of request
2. Requested data body ( Binary data after task request and task result serialization )
The message structure of network communication is as follows
Match the serialized data with the corresponding task type , Let's serialize again , And then you can go through TCP Send data .
//Message.h
#ifndef MESSAGE_H
#define MESSAGE_H
#include <QByteArray>
#include <QDataStream>
struct Message {
enum class Type {
WORKER_REGISTER, // Register the compute node
WORKER_UNREGISTER, // De registration node
ALL_JOBS_ABORT, // Cancel calculation task
JOB_REQUEST, // Calculation request
JOB_RESULT, // The result of the calculation is
};
Message(const Type type = Type::WORKER_REGISTER,
const QByteArray& data = QByteArray()) :
type(type),
data(data)
{
}
~Message() {}
Type type;
QByteArray data;
};
// Serialize the structure of the message into a binary stream
inline QDataStream &operator<<(QDataStream &out, const Message &message)
{
out << static_cast<qint8>(message.type)
<< message.data;
return out;
}
// Deserialize the binary stream into a message structure
inline QDataStream &operator>>(QDataStream &in, Message &message)
{
qint8 type;
in >> type;
in >> message.data;
message.type = static_cast<Message::Type>(type);
return in;
}
#endif // MESSAGE_HBy sending a message of the corresponding type , We can implement the registration and logout of computing nodes 、 Calculate the dispatch and cancellation of tasks 、 The recovery of calculation results and other tasks .
Message transformation tool class
In order to improve the reusability of code , It is convenient to call in the process of network communication , We are right. TCP The process of sending and receiving messages further encapsulates , The package is as follows .
//MessageUtils.h
#ifndef MESSAGEUTILS_H
#define MESSAGEUTILS_H
#include <memory>
#include <vector>
#include <QByteArray>
#include <QTcpSocket>
#include <QDataStream>
#include "Message.h"
namespace MessageUtils {
// Read messages from the data stream
// There may be multiple messages in the data flow at the same time , Therefore, it is necessary to read circularly to prevent message loss
inline std::unique_ptr<std::vector<std::unique_ptr<Message>>> readMessages(QDataStream& stream)
{
auto messages = std::make_unique<std::vector<std::unique_ptr<Message>>>();
bool commitTransaction = true;
while (commitTransaction
&& stream.device()->bytesAvailable() > 0) {
stream.startTransaction();
auto message = std::make_unique<Message>();
stream >> *message;
commitTransaction = stream.commitTransaction();
if (commitTransaction) {
messages->push_back(std::move(message));
}
}
return messages;
}
// Send some messages containing data body through socket , Such as task dispatch and result recovery
//forceFlush Set to true Send directly without caching
inline void sendMessage(QTcpSocket& socket,
Message::Type messageType,
QByteArray& data,
bool forceFlush = false)
{
Message message(messageType, data);
QByteArray byteArray;
QDataStream stream(&byteArray, QIODevice::WriteOnly);
stream << message;
socket.write(byteArray);
if (forceFlush) {
socket.flush();
}
}
// Send some messages without data body through socket , Such as node deregistration , Cancel tasks and so on
//forceFlush Set to true Send directly without caching
inline void sendMessage(QTcpSocket& socket,
Message::Type messageType,
bool forceFlush = false) {
QByteArray data;
sendMessage(socket, messageType, data, forceFlush);
}
}
#endif // MESSAGEUTILS_HTCP Communication server and client implementation
Master Application is the main application of parallel computing framework , Be responsible for controlling the overall computing task .Master The structure of the application is shown in the following figure :

TCP Communication server
TCP Communication server and Master Application nodes are deployed together , Be responsible for receiving and managing the connections of computing nodes , It is also responsible for sending computing tasks 、 Recovery of calculation results .QT Provide ready-made TCP The server of communication QTcpServer, By rewriting QTcpServer Realize the server of task communication management :
//masterserver.h
#ifndef MASTER_SERVER_H
#define MASTER_SERVER_H
#include <memory>
#include <vector>
#include <QTcpServer>
#include <QList>
#include <QThread>
#include <QMap>
#include <QElapsedTimer>
#include "WorkerClient.h"
#include "JobResult.h"
#include "JobRequest.h"
class MasterServer : public QTcpServer
{
Q_OBJECT
public:
MasterServer(QObject* parent = 0);
~MasterServer();
void generateNewTask(int taskStartValue, int taskEndValue);
signals:
void resultGenerated(QList<JobResult> jobResults);
// A signal to terminate all computing tasks
void abortAllJobs();
private slots:
void process(WorkerClient* workerClient, JobResult jobResult);
// Remove a compute node
void removeWorkerClient(WorkerClient* workerClient);
protected:
// Function triggered after receiving a new connection
void incomingConnection(qintptr socketDescriptor) override;
private:
std::unique_ptr<JobRequest> createJobRequest(int jobValue);
// Send a computing task to a computing node
void sendJobRequests(WorkerClient& client, int jobRequestCount = 1);
// Clear all tasks
void clearJobs();
private:
int mJobCount = 0;
int mReceivedJobResults;
QList<JobResult> mJobResults; // The container of calculation results
QMap<WorkerClient*, QThread*> mWorkerClients; // Computing node - Mapping table of corresponding thread
std::vector<std::unique_ptr<JobRequest>> mJobRequests;// Container for requests to be sent
QElapsedTimer mTimer;
};
#endif //masterserver.cpp
#include "MasterServer.h"
#include <QDebug>
#include <QThread>
using namespace std;
MasterServer::MasterServer(QObject* parent) :
QTcpServer(parent),
mReceivedJobResults(0),
mWorkerClients(),
mJobRequests(),
mTimer()
{
// Listening port 5000
listen(QHostAddress::Any, 5000);
}
MasterServer::~MasterServer()
{
// Clear the connection of the computing node
while (!mWorkerClients.empty()) {
removeWorkerClient(mWorkerClients.firstKey());
}
}
void MasterServer::generateNewTask(int taskStartValue, int taskEndValue)
{
if (taskStartValue == taskEndValue) {
return;
}
mTimer.start();
// When you receive a new task , Clear previous tasks
clearJobs();
// Split the task according to the step size , Here, different task splitting rules are adopted according to different tasks
for(int indexValue = taskStartValue; indexValue <= taskEndValue; ++indexValue) {
mJobRequests.push_back(move(createJobRequest(indexValue)));
}
// Determine the number of tasks to be dispatched to the computing node according to the number of cores of the node
for(WorkerClient* client : mWorkerClients.keys()) {
sendJobRequests(*client, client->cpuCoreCount() * 2);
}
}
void MasterServer::process(WorkerClient* workerClient, JobResult jobResult)
{
mReceivedJobResults++;
mJobResults.append(jobResult);
// If all the tasks have been calculated, send the task calculation results , At the same time, empty the task container
if (mReceivedJobResults == mJobCount)
{
emit resultGenerated(mJobResults);
mJobResults.clear();
}
// If the calculation is not completed, send a new task to the idle node
if (mReceivedJobResults < mJobCount) {
sendJobRequests(*workerClient);
} else {
qDebug() << "Generated in" << mTimer.elapsed() << "ms";
}
}
void MasterServer::removeWorkerClient(WorkerClient* workerClient)
{
QThread* thread = mWorkerClients.take(workerClient);
thread->quit();
thread->wait(1000);
delete thread;
delete workerClient;
}
void MasterServer::incomingConnection(qintptr socketDescriptor)
{
// Create a thread after receiving a new connection , Put the connection into the new thread
QThread* thread = new QThread(this);
WorkerClient* client = new WorkerClient(socketDescriptor);
mWorkerClients.insert(client, thread);
client->moveToThread(thread);
// Correlates the signals between the server and the computing node
connect(this, &MasterServer::abortAllJobs,client, &WorkerClient::abortJob);
connect(client, &WorkerClient::unregistered,this, &MasterServer::removeWorkerClient);
connect(client, &WorkerClient::jobCompleted,this, &MasterServer::process);
connect(thread, &QThread::started, client, &WorkerClient::start);
thread->start();
}
std::unique_ptr<JobRequest> MasterServer::createJobRequest(int jobValue)
{
// Different task parameters can be added according to specific business needs
auto jobRequest = make_unique<JobRequest>();
jobRequest->int_param = jobValue;
return jobRequest;
}
// Send a task to a computing node
void MasterServer::sendJobRequests(WorkerClient& client, int jobRequestCount)
{
QList<JobRequest> listJobRequest;
for (int i = 0; i < jobRequestCount; ++i) {
if (mJobRequests.empty()) {
break;
}
auto jobRequest = move(mJobRequests.back());
mJobRequests.pop_back();
listJobRequest.append(*jobRequest);
}
if (!listJobRequest.empty()) {
emit client.sendJobRequests(listJobRequest);
}
}
// Clear all computing tasks
void MasterServer::clearJobs()
{
mReceivedJobResults = 0;
mJobRequests.clear();
emit abortAllJobs();
}TCP The server of the is through WorkerClient Class to save the communication connection of each computing node , The communication connection server can realize two-way network communication with each computing node .
The implementation of the communication connection class is as follows :
//WorkerClient.h
#ifndef WORKERTHREAD_H
#define WORKERTHREAD_H
#include <QTcpSocket>
#include <QList>
#include <QDataStream>
#include "JobRequest.h"
#include "JobResult.h"
#include "Message.h"
class WorkerClient : public QObject
{
Q_OBJECT
public:
WorkerClient(int socketDescriptor);
// Get the of the compute node CPU Check the number
int cpuCoreCount() const;
signals:
// Node logout signal
void unregistered(WorkerClient* workerClient);
// Calculate the signal of the end of the task
void jobCompleted(WorkerClient* workerClient, JobResult jobResult);
// Send signals for computing tasks
void sendJobRequests(QList<JobRequest> requests);
public slots:
// The client starts data receiving
void start();
// Stop the calculation task of this node
void abortJob();
private slots:
// Send calculation task
void doSendJobRequests(QList<JobRequest> requests);
// Read message
void readMessages();
private:
// Node registered slot function
void handleWorkerRegistered(Message& message);
// Slot function for node logout
void handleWorkerUnregistered(Message& message);
// Processing computing tasks
void handleJobResult(Message& message);
private:
int mSocketDescriptor; // Socket descriptor is used to obtain the corresponding connection
int mCpuCoreCount; // The calculation node contains CPU Check the number
QTcpSocket mSocket; // Socket
QDataStream mSocketReader;
};
Q_DECLARE_METATYPE(WorkerClient*)
#endif // WORKERTHREAD_H//WorkerClient.cpp
#include "WorkerClient.h"
#include "MessageUtils.h"
WorkerClient::WorkerClient(int socketDescriptor) :
QObject(),
mSocketDescriptor(socketDescriptor),
mSocket(this),
mSocketReader(&mSocket)
{
connect(this, &WorkerClient::sendJobRequests,this, &WorkerClient::doSendJobRequests);
}
void WorkerClient::start()
{
// Start receiving messages
connect(&mSocket, &QTcpSocket::readyRead,this, &WorkerClient::readMessages);
mSocket.setSocketDescriptor(mSocketDescriptor);
}
void WorkerClient::abortJob()
{
// Send a message to stop calculation
MessageUtils::sendMessage(mSocket, Message::Type::ALL_JOBS_ABORT, true);
}
void WorkerClient::doSendJobRequests(QList<JobRequest> requests)
{
QByteArray data;
QDataStream stream(&data, QIODevice::WriteOnly);
stream << requests;
MessageUtils::sendMessage(mSocket, Message::Type::JOB_REQUEST, data);
}
void WorkerClient::readMessages()
{
auto messages = MessageUtils::readMessages(mSocketReader);
for(auto& message : *messages) {
switch (message->type) {
case Message::Type::WORKER_REGISTER:
handleWorkerRegistered(*message);
break;
case Message::Type::WORKER_UNREGISTER:
handleWorkerUnregistered(*message);
break;
case Message::Type::JOB_RESULT:
handleJobResult(*message);
break;
default:
break;
}
}
}
void WorkerClient::handleWorkerRegistered(Message& message)
{
// The registration message contains the CPU Check the number
QDataStream in(&message.data, QIODevice::ReadOnly);
in >> mCpuCoreCount;
}
void WorkerClient::handleWorkerUnregistered(Message& /*message*/)
{
emit unregistered(this);
}
void WorkerClient::handleJobResult(Message& message)
{
QDataStream in(&message.data, QIODevice::ReadOnly);
JobResult jobResult;
in >> jobResult;
emit jobCompleted(this, jobResult);
}
int WorkerClient::cpuCoreCount() const
{
return mCpuCoreCount;
}TCP Communication client
Work Applications are the computing nodes of the parallel computing framework , Responsible for actual computing tasks , The structure of the calculation node is shown in the following figure :

TCP The communication client and the compute node program are deployed together , Be responsible for receiving computing tasks sent by the server , When the computing task ends, it is responsible for sending the computing results to the server . The client is implemented as follows :
//Worker.h
#ifndef WORKER_H
#define WORKER_H
#include <QObject>
#include <QTcpSocket>
#include <QDataStream>
#include "Message.h"
#include "JobResult.h"
class Job;
class JobRequest;
class Worker : public QObject
{
Q_OBJECT
public:
Worker(QObject* parent = 0);
~Worker();
int receivedJobsCounter() const; // Number of tasks received
int sentJobCounter() const; // Number of tasks completed
signals:
// Disable all tasks
void abortAllJobs();
private slots:
// Read the messages sent by the server
void readMessages();
private:
// Register the computing node with the server
void sendRegister();
// Send calculation results
void sendJobResult(JobResult jobResult);
// Send node logout message
void sendUnregister();
// Process the task messages sent by the server
void handleJobRequest(Message& message);
// Terminate all computing tasks
void handleAllJobsAbort(Message& message);
// Create a new task
Job* createJob(const JobRequest& jobRequest);
private:
QTcpSocket mSocket; // Socket is responsible for network communication
QDataStream mSocketReader; // The binary stream is responsible for reading the communication data
int mReceivedJobsCounter; // Record the number of tasks received
int mSentJobsCounter; // Record the number of tasks completed
};
#endif // WORKER_H//Worker.cpp
#include "Worker.h"
#include <QDebug>
#include <QThread>
#include <QThreadPool>
#include <QHostAddress>
#include "Job.h"
#include "JobRequest.h"
#include "MessageUtils.h"
Worker::Worker(QObject* parent) :
QObject(parent),
mSocket(this),
mSocketReader(&mSocket),
mReceivedJobsCounter(0),
mSentJobsCounter(0)
{
connect(&mSocket, &QTcpSocket::connected, [this] {
qDebug() << "Connected";
sendRegister();
});
connect(&mSocket, &QTcpSocket::disconnected, [] {
qDebug() << "Disconnected";
});
connect(&mSocket, &QTcpSocket::readyRead,
this, &Worker::readMessages);
// Connect to the server through the specified address
mSocket.connectToHost(QHostAddress::LocalHost, 5000);
}
Worker::~Worker()
{
sendUnregister();
}
void Worker::sendRegister()
{
// When registering a compute node, provide local CPU Check the number
QByteArray data;
QDataStream out(&data, QIODevice::WriteOnly);
out << QThread::idealThreadCount();
MessageUtils::sendMessage(mSocket,
Message::Type::WORKER_REGISTER,
data);
}
// Send calculation results
void Worker::sendJobResult(JobResult jobResult)
{
mSentJobsCounter++;
QByteArray data;
QDataStream out(&data, QIODevice::WriteOnly);
out << jobResult;
MessageUtils::sendMessage(mSocket,
Message::Type::JOB_RESULT,
data);
}
// Send a logout message
void Worker::sendUnregister()
{
MessageUtils::sendMessage(mSocket,
Message::Type::WORKER_UNREGISTER,
true);
}
// Read the messages sent by the server
void Worker::readMessages()
{
auto messages = MessageUtils::readMessages(mSocketReader);
for(auto& message : *messages) {
switch (message->type) {
case Message::Type::JOB_REQUEST:
handleJobRequest(*message);
break;
case Message::Type::ALL_JOBS_ABORT:
handleAllJobsAbort(*message);
break;
default:
break;
}
}
}
void Worker::handleJobRequest(Message& message)
{
// After receiving the calculation task, create a new thread to execute the corresponding calculation task
QDataStream in(&message.data, QIODevice::ReadOnly);
QList<JobRequest> requests;
in >> requests;
mReceivedJobsCounter += requests.size();
for(const JobRequest& jobRequest : requests) {
QThreadPool::globalInstance()->start(createJob(jobRequest));
}
}
void Worker::handleAllJobsAbort(Message& /*message*/)
{
emit abortAllJobs();
QThreadPool::globalInstance()->clear();
mReceivedJobsCounter = 0;
mSentJobsCounter = 0;
}
Job* Worker::createJob(const JobRequest& jobRequest)
{
// Create corresponding tasks according to specific business requirements
Job* job = new Job(jobRequest);
connect(this, &Worker::abortAllJobs,job, &Job::abort);
connect(job, &Job::jobCompleted,this, &Worker::sendJobResult);
return job;
}
int Worker::sentJobCounter() const
{
return mSentJobsCounter;
}
int Worker::receivedJobsCounter() const
{
return mReceivedJobsCounter;
}Each computing task of a computing node is a thread running in the thread pool , In order to make the thread run in the thread pool and use the signal and slot mechanism , Calculation task Job You need to inherit at the same time QRunnable Classes and QObject class . The corresponding computing tasks are implemented as follows :
//Job.h
#ifndef JOB_H
#define JOB_H
#include <QObject>
#include <QRunnable>
#include <QAtomicInteger>
#include "JobRequest.h"
#include "JobResult.h"
class Job : public QObject, public QRunnable
{
Q_OBJECT
public:
explicit Job(const JobRequest& jobRequest, QObject *parent = 0);
void run() override;
signals:
// A signal of the end of a mission
void jobCompleted(JobResult jobResult);
public slots:
// Destruction task
void abort();
private:
QAtomicInteger<bool> mAbort;
JobRequest mJobRequest;
};
#endif // JOB_H//Job.cpp
#include "Job.h"
Job::Job(const JobRequest& jobRequest, QObject *parent) :
QObject(parent),
mAbort(false),
mJobRequest(jobRequest)
{
}
void Job::run()
{
// Different computing tasks are performed here according to different businesses
JobResult jobResult(mJobRequest.int_param);
for (int index = 0; index < mJobRequest.int_param; ++index)
{
if (mAbort.load()) {
return;
}
}
emit jobCompleted(jobResult);
}
void Job::abort()
{
mAbort.store(true);
}Task distribution, recovery and progress monitoring
Task dispatch and recovery
In the main application, we dispatch corresponding tasks according to business needs and present the collected calculation results in a specific form . Main application UI The invocation process in is as follows :
//ServerAppWidget.h
#ifndef SERVER_APP_WIDGET_H
#define SERVER_APP_WIDGET_H
#include <memory>
#include <QWidget>
#include <QPoint>
#include <QThread>
#include <QList>
#include "masterserver.h"
class QResizeEvent;
class ServerAppWidget : public QWidget
{
Q_OBJECT
public:
explicit ServerAppWidget(QWidget *parent = 0);
~ServerAppWidget();
public slots:
// Deal with the calculation
void processJobResults(QList<JobResult> jobResults);
// Dispatch new computing tasks
void generateNewTask(int start_value,int end_value);
private:
MasterServer mMasterServer;
QThread mThreadServer;
};
#endif//ServerAppWidget.cpp
#include "ServerAppWidget.h"
#include <QResizeEvent>
#include <QImage>
#include <QPainter>
#include <QtMath>
ServerAppWidget::ServerAppWidget(QWidget *parent) :
QWidget(parent),
mMasterServer(),
mThreadServer(this)
{
// Put the listening Work into the child thread
mMasterServer.moveToThread(&mThreadServer);
// A signal of the end of a mission
connect(&mMasterServer, &MasterServer::resultGenerated,
this, &ServerAppWidget::processJobResults);
mThreadServer.start();
}
ServerAppWidget::~ServerAppWidget()
{
mThreadServer.quit();
mThreadServer.wait(1000);
}
void ServerAppWidget::processJobResults(QList<JobResult> jobResults)
{
for(JobResult& jobResult : jobResults)
{
;
}
}
void ServerAppWidget::generateNewTask(int start_value, int end_value)
{
mMasterServer.generateNewTask(start_value,end_value);
}Progress monitoring
By comparing the number of completed tasks with the total number of computing tasks received by the computing node , We can view the calculation progress of the current node , The calculation progress of the current node is displayed as a progress bar :
//WorkerWidget.h
#ifndef WORKERWIDGET_H
#define WORKERWIDGET_H
#include <QWidget>
#include <QThread>
#include <QProgressBar>
#include <QTimer>
#include "Worker.h"
class WorkerWidget : public QWidget
{
Q_OBJECT
public:
explicit WorkerWidget(QWidget *parent = 0);
~WorkerWidget();
private:
QProgressBar mStatus; // Progress bar for calculating progress
Worker mWorker; // Computing node
QThread mWorkerThread; // Thread of computing node
QTimer mRefreshTimer; // Timer refresh
};
#endif//WorkerWidget.cpp
#include "WorkerWidget.h"
#include <QVBoxLayout>
WorkerWidget::WorkerWidget(QWidget *parent) :
QWidget(parent),
mStatus(this),
mWorker(),
mWorkerThread(this),
mRefreshTimer()
{
QVBoxLayout* layout = new QVBoxLayout(this);
layout->addWidget(&mStatus);
// The thread of task calculation
mWorker.moveToThread(&mWorkerThread);
// Refresh the calculation progress of the task
connect(&mRefreshTimer, &QTimer::timeout, [this] {
mStatus.setMaximum(mWorker.receivedJobsCounter());
mStatus.setValue(mWorker.sentJobCounter());
});
mWorkerThread.start();
mRefreshTimer.start(100);
}
WorkerWidget::~WorkerWidget()
{
mWorkerThread.quit();
mWorkerThread.wait(1000);
}Through the above design and processing, we can get a compact parallel computing framework . Through the parallel computing framework, we can upgrade single machine applications to multi machine cluster applications , So as to make full use of the computing performance of the local hardware .
边栏推荐
- 85. (leaflet house) leaflet military plotting - line arrow drawing
- Understand the difference between cookie+session, redis+token and JWT
- Ccf-csp 202012-5 Star Trek 80 points
- 故障分析 | MySQL 中新建用户无法登陆的一种特殊场景
- Leetcode 1185. Day of the week
- Do you know websocket?
- oracle 连接PLSQL
- The writing speed is increased by tens of times. The application of tdengine in tostar intelligent factory solution
- Redis the distance between two points and the center point
- Laravel view access routes
猜你喜欢

神经网络学习(五)----常见的网络结构对比

Multi scale aligned distillation for low resolution detection

RTSP/Onvif协议视频平台EasyNVR如何配置用户的视频流播放时长?

6.18 approaching, Ott becomes a new marketing battlefield

The writing speed is increased by tens of times. The application of tdengine in tostar intelligent factory solution

Traversal of binary tree

Some bugs of unity physics engine caused by squeezing

What taxes do Tami dogs need to pay for equity transfer? What are the possible tax risks?

關於JS console.log() 是同步 or 异步引發的問題

Simple use of Wireshark
随机推荐
InfoQ geek media's 15th anniversary solicitation | one minute Automated Deployment Based on ECs [elastic ECS]
Vivado HLS int8/9 multiplication optimization
Foxconn suffered another blackmail attack, and a large number of unencrypted files were leaked
word+正则表达式==快速批量添加图注题注(保姆级图文)
Ccf-csp 202203-1 uninitialized warning
Free video format converter
Some thoughts on callback
Redis6学习笔记-第一章-Redis的介绍与环境搭建
Do you know the specifications of e-commerce background permission settings!
ERP从内部集成起步开篇
Ccf-csp 201503-3 Festival
Embedded related open source projects, libraries and materials
Generation of random numbers in C language [detailed explanation]
服务器注册使用
Leetcode 1185. Day of the week
Neural network learning (IV) -- a simple summary of the knowledge of each layer of neural network
Android 程序常用功能《清除缓存》
Word+ regular expression = = quickly batch add caption (nanny level text)
What does the seven layer network structure do?
Ccf-csp 201409-3 string matching