当前位置:网站首页>Zeromq from getting started to mastering
Zeromq from getting started to mastering
2022-06-26 04:05:00 【Dreamers on the road】
One 、ZeroMQ sketch
ZeroMQ Is a multi-threaded network library based on message queue , For socket type 、 Connection processing 、 frame 、 Even the underlying details of routing are abstracted , Provides sockets that span multiple transport protocols . There are three common communication modes :
Request response model
The requester initiates the request , And wait for the responder to respond to the request . From the point of view of the requester, it must be a pair of transceiver pairs ; conversely , At the response end, it must be the sender receiver pair . Both the requestor and the responder can be 1:N Model of . Usually put 1 Think it's server ,N Think it's Client .ZeroMQ It can support routing function very well ( The component that implements the routing function is called Device), hold 1:N Expand to N:M ( Only a few routing nodes need to be added ). From this model , The bottom endpoint address is hidden from the top . Each request contains an implicit response address , And apps don't care about it .
Publish and subscribe model
In this model , The publisher only sends data in one direction , And don't care whether all the information is sent to the subscriber . If the publisher starts publishing information , The subscriber is not connected yet , This information is discarded directly . But once the subscriber is connected , There will be no loss of information . Again , The subscriber is only responsible for receiving , Without feedback . If the publisher and subscriber need to interact ( For example, confirm whether the subscriber is connected ), Use additional socket Use the request response model to meet this requirement .
Pipe model
In this model , The pipe is one-way , from PUSH The end is unidirectional PULL One way push data flow at the end .
Two 、 Process steps
1、 Request response model
Server implementation :
(1)zmq_ctx_new()
// return ctx_t object
void *zmq_ctx_new (void)
{
// We do this before the ctx constructor since its embedded mailbox_t
// object needs the network to be up and running (at least on Windows).
if (!zmq::initialize_network ()) {
return NULL;
}
// Create 0MQ context.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
if (ctx) {
if (!ctx->valid ()) {
delete ctx;
return NULL;
}
}
return ctx;
}The function returns context( Context ), It's actually calling theta ctx_t object ( Instantiation ),
zmq::ctx_t::ctx_t () :
_tag (ZMQ_CTX_TAG_VALUE_GOOD),
_starting (true),
_terminating (false),
_reaper (NULL),
_max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
_max_msgsz (INT_MAX),
_io_thread_count (ZMQ_IO_THREADS_DFLT),
_blocky (true),
_ipv6 (false),
_zero_copy (true)
{
#ifdef HAVE_FORK
_pid = getpid ();
#endif
#ifdef ZMQ_HAVE_VMCI
_vmci_fd = -1;
_vmci_family = -1;
#endif
// Initialise crypto library, if needed.
zmq::random_open ();
#ifdef ZMQ_USE_NSS
NSS_NoDB_Init (NULL);
#endif
#ifdef ZMQ_USE_GNUTLS
gnutls_global_init ();
#endif
}It mainly sets initialization parameters ; Such as ;_max_sockets = 1024;_io_thread_count = 1; Of course, there are some status settings and so on ;
(2)zmq_socket()
void *zmq_socket (void *ctx_, int type_)
{
// The object is NULL, Then return to
if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
errno = EFAULT;
return NULL;
}
// Strong go
zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
zmq::socket_base_t *s = ctx->create_socket (type_);
return (void *) s;
}
Parameters :
void *ctx_;zmq_ctx_new Context parameter returned ;
int type_:Socket types.
Socket types:
ZMQ_PAIR 0
ZMQ_PUB 1
ZMQ_SUB 2
ZMQ_REQ 3
ZMQ_REP 4
ZMQ_DEALER 5
ZMQ_ROUTER 6
ZMQ_PULL 7
ZMQ_PUSH 8
ZMQ_XPUB 9
ZMQ_XSUB 10
ZMQ_STREAM 11Return value : Create the generated socket, call create_socket().
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
scoped_lock_t locker (_slot_sync);
// Initialize mailbox array , Add two slots (slots),
//zmq_ctx_term thread and reaper thread
if (unlikely (_starting)) {
if (!start ())
return NULL;
}
// Once zmq_ctx_term() was called, we can't create new sockets.
if (_terminating) {
errno = ETERM;
return NULL;
}
// If max_sockets limit was reached, return error.
if (_empty_slots.empty ()) {
errno = EMFILE;
return NULL;
}
// Choose a slot for the socket.
uint32_t slot = _empty_slots.back ();
_empty_slots.pop_back ();
// Generate new unique socket ID.
// Generate a new unique socket ID.
int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
// Create the socket and register its mailbox.
// establish socket And register email
socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
if (!s) {
_empty_slots.push_back (slot);
return NULL;
}
_sockets.push_back (s);
_slots[slot] = s->get_mailbox ();
return s;
}call start () function :
bool zmq::ctx_t::start ()
{
// Initialise the array of mailboxes. Additional two slots are for
// zmq_ctx_term thread and reaper thread.
_opt_sync.lock ();
const int term_and_reaper_threads_count = 2;
const int mazmq = _max_sockets; //1023
const int ios = _io_thread_count;//1
_opt_sync.unlock ();
int slot_count = mazmq + ios + term_and_reaper_threads_count;//1026
try {
// Add capacity , Don't create objects
_slots.reserve (slot_count);
_empty_slots.reserve (slot_count - term_and_reaper_threads_count);
}
catch (const std::bad_alloc &) {
errno = ENOMEM;
return false;
}
// Changed the size of the container , And created the objects in the container
_slots.resize (term_and_reaper_threads_count);
// Initialise the infrastructure for zmq_ctx_term thread.
_slots[term_tid] = &_term_mailbox;
// Create thread
_reaper = new (std::nothrow) reaper_t (this, reaper_tid);
if (!_reaper) {
errno = ENOMEM;
goto fail_cleanup_slots;
}
if (!_reaper->get_mailbox ()->valid ())
goto fail_cleanup_reaper;
_slots[reaper_tid] = _reaper->get_mailbox ();
_reaper->start ();
// Create I/O thread objects and launch them.
_slots.resize (slot_count, NULL);
// establish IO Thread and start
for (int i = term_and_reaper_threads_count;
i != ios + term_and_reaper_threads_count; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
if (!io_thread) {
errno = ENOMEM;
goto fail_cleanup_reaper;
}
if (!io_thread->get_mailbox ()->valid ()) {
delete io_thread;
goto fail_cleanup_reaper;
}
_io_threads.push_back (io_thread);
_slots[i] = io_thread->get_mailbox ();
io_thread->start ();
}
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
_empty_slots.push_back (i);
}
_starting = false;
return true;
fail_cleanup_reaper:
_reaper->stop ();
delete _reaper;
_reaper = NULL;
fail_cleanup_slots:
_slots.clear ();
return false;
}(3)zmq_bind()
(4)zmq_recv()
(5)zmq_send()
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
int main(int argc, char* argv[])
{
void* context=zmq_ctx_new();
void* socket=zmq_socket(context,ZMQ_REP);
zmq_bind(socket,"tcp://192.168.207.129:9999");
while(true)
{
char buf[10];
int bytes=zmq_recv(socket,buf,10,0);
buf[bytes]='\0';
printf("[Server] Received Request Message:%d bytes,content:\"%s\"\n",bytes,buf);
sleep(1);
const char* replyMsg="World";
bytes=zmq_send(socket,replyMsg,strlen(replyMsg),0);
printf("[Server] Sended Reply Message:%d bytes,content:\"%s\"\n",bytes,replyMsg);
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}Client implementation :
(1)zmq_ctx_new()
(2)zmq_socket()
(3)zmq_bind()
(4)zmq_send()
(5)zmq_recv()
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <zmq.h>
int main(int argc, char* argv[])
{
printf("Connect to server...\n");
void* context=zmq_ctx_new();
void* socket=zmq_socket(context,ZMQ_REQ);
zmq_connect(socket,"tcp://192.168.207.129:9999");
int k=0;
while(true)
{
char buf[10];
const char* requestMsg="Hello";
int bytes=zmq_send(socket,requestMsg,strlen(requestMsg),0);
printf("[Client] [%d] Send request Message: %d bytes,content:\"%s\"\n",k,bytes,requestMsg);
bytes=zmq_recv(socket,buf,10,0);
buf[bytes]='\0';
printf("[Client] [%d] Received Reply Message: %d bytes,content:\"%s\"\n",k,bytes,buf);
k++;
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}MakeFile
all:client server
client:client.cpp
g++ -std=c++11 client.cpp -o client -lzmq -lpthread -g
server:server.cpp
g++ -std=c++11 server.cpp -o server -lzmq -lpthread -g
clean:
rm -f server clientmake Is a run operation ,make clean eliminate server,client
2、 subscribe - Release pattern :
ZeroMQ The subscription publication mode of is a one-way data publication , After the client subscribes to the server , The server will continuously push the generated messages to the subscribers .
characteristic :
- A publisher , Multiple subscribers , namely 1:n;
- Publish data when publisher data changes , All subscribers can receive data and process it . This is the release / A subscription model .
- Be careful : Use SUB When setting up a subscription , You have to use zmq_setsockopt( ) Filter messages ;
Publisher use PUB Socket sends message to queue , Subscribers use SUB Sockets flow from the queue iesho9u news . New subscribers can join at any time , But previous messages cannot be received ; Existing subscribers can exit at any time ; Subscribers can also add “ filter ” Used to selectively receive information .

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
int main(int arg,char* argv[])
{
void* context=zmq_ctx_new();
assert(context!=NULL);
void* socket=zmq_socket(context,ZMQ_PUB);//socket Publisher mode
assert(socket!=NULL);
int ret=zmq_bind(socket,"tcp://192.168.207.129:9999");
assert(ret==0);
int k=0;
while(true)
{
char buf[1024];
memset(buf,0,sizeof(buf));
snprintf(buf,sizeof(buf),"server i=%d",k);
ret=zmq_send(socket,buf,strlen(buf)+1,0);
k++;
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
#include <thread>
#define TRUE 1
void Recv(void* arg)
{
while(TRUE)
{
void* socket=arg;
printf("into while\n");
char buf[1024];
memset(buf,0,sizeof(buf));
int ret=zmq_recv(socket,buf,sizeof(buf)-1,0);
if(ret>0)
{
printf("Recv:%s\n",buf);
}
}
}
int main(int argc, char** argv)
{
printf("Hello,world!\n");
void* context=zmq_ctx_new();
assert(context!=NULL);
void* socket=zmq_socket(context,ZMQ_SUB);// Subscriber pattern
assert(socket!=NULL);
int ret=zmq_connect(socket,"tcp://192.168.207.129:9999");
assert(ret==0);
ret=zmq_setsockopt(socket,ZMQ_SUBSCRIBE,"",0);
assert(ret==0);
std::thread t1(Recv,socket);
std::thread t2(Recv,socket);
t1.join();
t2.join();
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}makefile:
all:pub sub
CXX=g++
CXXFLAGS=-fPIC -std=c++11 -o
LDFLAGS=-lzmq -lpthread
pub:pub.cpp
$(CXX) pub.cpp $(CXXFLAGS) pub $(LDFLAGS)
sub:sub.cpp
$(CXX) sub.cpp $(CXXFLAGS) sub $(LDFLAGS)
clean:
rm -f sub pub3、 Push pull mode
Push pull mode ,PUSH send out ,send.PULL Party A receives ,recv.PUSH Can and multiple PULL Establishing a connection ,PUSH The data sent is sent to in sequence PULL Fang . Like you PUSH And three PULL Establishing a connection , Namely A,B,C.PUSH The first data sent will be sent to A, The second data will be given to B, The third data is for C, The fourth data to A. It's been a cycle .

- At the top is the task generation Distributor ventilator
- In the middle is the executor worker
- Here are the recipients of the collected results sink
Distributor ventilator
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <zmq.h>
int main(int argc,char **argv)
{
void * context=zmq_ctx_new();
void * sender=zmq_socket(context, ZMQ_PUSH);
zmq_bind(sender, "tcp://*:6666");
printf("Press Enter when the workers are ready: ");
getchar();
printf("Sending tasks to workers...\n");
while(true)
{
const char * replyMsg="World";
zmq_send(sender, replyMsg, strlen(replyMsg), 0);
printf("[Server] Sended Reply Message content == \"%s\"\n", replyMsg);
}
zmq_close(sender);
zmq_ctx_destroy(context);
return 0;
}practitioners worker
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
int main(int argc, char **argv)
{
void * context=zmq_ctx_new();
void * recviver=zmq_socket(context, ZMQ_PULL);
zmq_connect(recviver, "tcp://localhost:6666");
void * sender=zmq_socket(context, ZMQ_PUSH);
zmq_connect(sender, "tcp://localhost:5555");
while(1)
{
char buffer [256];
int size=zmq_recv (recviver, buffer, 255, 0);
if(size < 0)
{
break;
}
printf("buffer:%s\n",buffer);
const char * replyMsg="World";
zmq_send(sender, replyMsg, strlen(replyMsg), 0);
printf("[Server] Sended Reply Message content=\"%s\"\n", replyMsg);
}
zmq_close(recviver);
zmq_close(sender);
zmq_ctx_destroy(context);
return 0;
}The receiver sink
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
int main(int argc, char **argv)
{
void * context=zmq_ctx_new();
void * socket=zmq_socket(context,ZMQ_PULL);
zmq_bind(socket, "tcp://*:5555");
while(true)
{
char buffer [256];
int size=zmq_recv(socket,buffer,255,0);
if(size<0)
{
break;
}
printf("buffer:%s\n",buffer);
}
zmq_close(socket);
zmq_ctx_destroy(context);
return 0;
}
边栏推荐
- Nailing open platform - applet development practice (nailing applet client)
- What should I do if the 51 SCM board cannot find the device in keil
- DETR3D 多2d图片3D检测框架
- 【掘金运营套路揭露】真心被掘金的套路....
- 力扣 515. 在每个树行中找最大值
- 【Flink】Flink 批处理模式Map端数据聚合 NormalizedKeySorter
- In the matter of getting customers at sea, how can advertisers play besides digital advertising?
- 1.基础关
- Spark - 一文搞懂 parquet
- asp. Net web page, ASP connects to the database, and uses asp:panel and asp:dropdownlist controls
猜你喜欢

Spark - understand parquet

Oracle technology sharing Oracle 19.14 upgrade 19.15

Conditional variables for thread synchronization

用eclipse连mysql数据库出错然后出现图中的话是咋回事呀

Tencent Interviewer: How did binder get its system services?

高性能算力中心 — RoCE — Overview

What if the serial port fails to open when the SCM uses stc-isp to download software?

Matplotlib line chart, text display, win10

An error occurred using the connection to database 'on server' 10.28.253.2‘

Small record of neural network learning 71 - tensorflow2 deep learning with Google Lab
随机推荐
Part 4: drawing quadrilateral
xml 解析bean工具类
ABP framework Practice Series (II) - Introduction to domain layer
Detailed explanation of widget construction process of fluent
[MySQL] MySQL export database
【MySQL】 MySQL 导出数据库
Machine learning notes - trend components of time series
Threejs special sky box materials, five kinds of sky box materials are downloaded for free
如何解决 Iterative 半监督训练 在 ASR 训练中难以落地的问题丨RTC Dev Meetup
Conditional variables for thread synchronization
I/o virtualization technology - UIO framework
【Flink】Flink Sort-Shuffle写流程简析
软件调试测试的十大重要基本准则
QPS的概念和实现
I/o virtualization technology - vfio
[Flink] a brief analysis of the writing process of Flink sort shuffle
816. fuzzy coordinates
The style of the mall can also change a lot. DIY can learn about it
Question brushing record day01
169. most elements