当前位置:网站首页>一文走近ZMQ
一文走近ZMQ
2022-06-22 09:10:00 【InfoQ】
一、我们为什么需要ZMQ
- 如何处理I/O?是让程序阻塞等待响应,还是在后台处理这些事?这是软件设计的关键因素。阻塞式的I/O操作会让程序架构难以扩展,而后台处理I/O也是比较困难的。
- 如何处理那些临时的、来去自由的组件?我们是否要将组件分为客户端和服务端两种,并要求服务端永不消失?那如果我们想要将服务端相连怎么办?我们要每隔几秒就进行重连吗?
- 如何表示一条消息?我们怎样通过拆分消息,让其变得易读易写,不用担心缓存溢出,既能高效地传输小消息,又能胜任视频等大型文件的传输?
- 如何处理那些不能立刻发送出去的消息?比如我们需要等待一个网络组件重新连接的时候?我们是直接丢弃该条消息,还是将它存入数据库,或是内存中的一个队列?
- 要在哪里保存消息队列?如果某个组件读取消息队列的速度很慢,造成消息的堆积怎么办?我们要采取什么样的策略?
- 如何处理丢失的消息?我们是等待新的数据,请求重发,还是需要建立一套新的可靠性机制以保证消息不会丢失?如果这个机制自身崩溃了呢?
- 如果想换一种网络连接协议,如用广播代替TCP单播?或者改用IPv6?我们是否需要重写所有的应用程序,或者将这种协议抽象到一个单独的层中?
- 如何对消息进行路由?我们可以将消息同时发送给多个节点吗?是否能将应答消息返回给请求的发送方?
- 如何为另一种语言写一个API?我们是否需要完全重写某项协议,还是重新打包一个类库?
- 怎样才能做到在不同的架构之间传送消息?是否需要为消息规定一种编码?
- 如何处理网络通信错误?等待并重试,还是直接忽略或取消?
- ZMQ会在后台线程异步地处理I/O操作,它使用一种不会死锁的数据结构来存储消息。
- 网络组件可以来去自如,ZMQ会负责自动重连,这就意味着你可以以任何顺序启动组件;用它创建的面向服务架构(SOA)中,服务端可以随意地加入或退出网络。
- ZMQ会在有必要的情况下自动将消息放入队列中保存,一旦建立了连接就开始发送。
- ZMQ有阈值(HWM)的机制,可以避免消息溢出。当队列已满,ZMQ会自动阻塞发送者,或丢弃部分消息,这些行为取决于你所使用的消息模式。
- ZMQ可以让你用不同的通信协议进行连接,如TCP、广播、进程内、进程间。改变通信协议时你不需要去修改代码。
- ZMQ会恰当地处理速度较慢的节点,会根据消息模式使用不同的策略。
- ZMQ提供了多种模式进行消息路由,如请求-应答模式、发布-订阅模式等。这些模式可以用来搭建网络拓扑结构。
- ZMQ中可以根据消息模式建立起一些中间装置(很小巧),可以用来降低网络的复杂程度。
- ZMQ会发送整个消息,使用消息帧的机制来传递。如果你发送了10KB大小的消息,你就会收到10KB大小的消息。
- ZMQ不强制使用某种消息格式,消息可以是0字节的,或是大到GB级的数据。当你表示这些消息时,可以选用诸如谷歌的protocol buffers,XDR等序列化产品。
- ZMQ能够智能地处理网络错误,有时它会进行重试,有时会告知你某项操作发生了错误。
- ZMQ甚至可以降低对环境的污染,因为节省了CPU时间意味着节省了电能。
二、ZeroMQ 背景介绍
三、ZMQ是什么
四、三种模型
4.1 应答模式
4.2 订阅发布模式
4.3 基于分布式处理(管道模式)
package cn.edu.ujn.pub_sub;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
* Pubsub envelope publisher
*/
public class psenvpub {
public static void main (String[] args) throws Exception {
// Prepare our context and publisher
Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://*:5563");
while (!Thread.currentThread ().isInterrupted ()) {
// Write two messages, each with an envelope and content
publisher.sendMore ("A");
publisher.send ("We don't want to see this");
publisher.sendMore ("B");
publisher.send("We would like to see this");
}
publisher.close ();
context.term ();
}
}context.socket(ZMQ.PUB)package cn.edu.ujn.pub_sub;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
* Pubsub envelope subscriber
*/
public class psenvsub {
public static void main (String[] args) {
// Prepare our context and subscriber
Context context = ZMQ.context(1);
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://localhost:5563");
subscriber.subscribe("B".getBytes());
while (!Thread.currentThread ().isInterrupted ()) {
// Read envelope with address
String address = subscriber.recvStr ();
// Read message contents
String contents = subscriber.recvStr ();
System.out.println(address + " : " + contents);
}
subscriber.close ();
context.term ();
}
}package cn.edu.ujn.req_rep;
//
// Hello World server in Java
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//
import org.zeromq.ZMQ;
public class hwserver {
private static int i = 0;
public static void main(String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to clients
ZMQ.Socket responder = context.socket(ZMQ.REP);
responder.bind("tcp://*:5555");
while (!Thread.currentThread().isInterrupted()) {
// Wait for next request from the client
byte[] request = responder.recv(0);
System.out.println("Received " + new String(request) + i++);
// Do some 'work'
Thread.sleep(1000);
// Send reply back to client
String reply = "World";
responder.send(reply.getBytes(), 0);
}
responder.close();
context.term();
}
}package cn.edu.ujn.req_rep;
// Hello World client in Java
// Connects REQ socket to tcp://localhost:5555
// Sends "Hello" to server, expects "World" back
import org.zeromq.ZMQ;
public class hwclient {
public static void main(String[] args) {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to server
System.out.println("Connecting to hello world server…");
ZMQ.Socket requester = context.socket(ZMQ.REQ);
requester.connect("tcp://localhost:5555");
for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
String request = "Hello";
System.out.println("Sending Hello " + requestNbr);
requester.send(request.getBytes(), 0);
byte[] reply = requester.recv(0);
System.out.println("Received " + new String(reply) + " " + requestNbr);
}
requester.close();
context.term();
}
}package cn.edu.ujn.para_pipe;
import java.util.Random;
import org.zeromq.ZMQ;
// Task ventilator in Java
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
public class taskvent {
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to send messages on
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.bind("tcp://*:5557");
// Socket to send messages on
ZMQ.Socket sink = context.socket(ZMQ.PUSH);
sink.connect("tcp://localhost:5558");
System.out.println("Press Enter when the workers are ready: ");
System.in.read();
System.out.println("Sending tasks to workers\n");
// The first message is "0" and signals start of batch
sink.send("0", 0);
// Initialize random number generator
Random srandom = new Random(System.currentTimeMillis());
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = srandom.nextInt(100) + 1;
total_msec += workload;
System.out.print(workload + ".");
String string = String.format("%d", workload);
sender.send(string, 0);
}
System.out.println("Total expected cost: " + total_msec + " msec");
Thread.sleep(1000); // Give 0MQ time to deliver
sink.close();
sender.close();
context.term();
}
}package cn.edu.ujn.para_pipe;
import org.zeromq.ZMQ;
// Task worker in Java
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
public class taskwork {
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to receive messages on
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.connect("tcp://localhost:5557");
// Socket to send messages to
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.connect("tcp://localhost:5558");
// Process tasks forever
while (!Thread.currentThread ().isInterrupted ()) {
String string = new String(receiver.recv(0)).trim();
long msec = Long.parseLong(string);
// Simple progress indicator for the viewer
System.out.flush();
System.out.print(string + '.');
// Do the work
Thread.sleep(msec);
// Send results to sink
sender.send("".getBytes(), 0);
}
sender.close();
receiver.close();
context.term();
}
}package cn.edu.ujn.para_pipe;
import org.zeromq.ZMQ;
// Task sink in Java
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
public class tasksink {
public static void main (String[] args) throws Exception {
// Prepare our context and socket
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.bind("tcp://*:5558");
// Wait for start of batch
String string = new String(receiver.recv(0));
// Start our clock now
long tstart = System.currentTimeMillis();
// Process 100 confirmations
int task_nbr;
int total_msec = 0; // Total calculated cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
string = new String(receiver.recv(0)).trim();
if ((task_nbr / 10) * 10 == task_nbr) {
System.out.print(":");
} else {
System.out.print(".");
}
}
// Calculate and report duration of batch
long tend = System.currentTimeMillis();
System.out.println("\nTotal elapsed time: " + (tend - tstart) + " msec");
receiver.close();
context.term();
}
}五、注意事项
5.1 正确地使用上下文
5.2 正确地退出和清理
- 处理完消息后,记得用zmq_msg_close()函数关闭消息;
- 如果你同时打开或关闭了很多套接字,那可能需要重新规划一下程序的结构了;
- 退出程序时,应该先关闭所有的套接字,最后调用zmq_term()函数,销毁上下文对象。
5.3 你的想法可能会被颠覆
边栏推荐
- 国外十款免费开源PHP网店程序介绍
- 函数总结(1)
- PHP seven methods to obtain complete instances of file name suffixes [collect]
- Medical information management system database mysql
- Didi's two-sided summary
- MSSQL injection of SQL injection
- Kali Trojan invades win7 system
- [detailed explanation] point multiplication and cross multiplication of neural network matrix (pytorch version)
- C语言刷题 | 判断某年是否只闰年(12)
- Final典型案例
猜你喜欢

面试突击59:一个表中可以有多个自增列吗?

OpenCV每日函数 直方图相关(3)

Sound and shadow 2022 heavy release! Detailed explanation of the new functions of Huisheng Huiying 2022

DOM programming

pytorch OSError: DLL load failed: 问题解决方法

【node】快收下爬虫,我们不再为数据发愁

MSSQL injection of SQL injection

报告:在技术领域,男性更有可能获得工作面试的机会

Interview shock 59: can there be multiple auto increment columns in a table?

yolov5 export Gpu推理模型导出
随机推荐
【node】脚手架搭建服务器,完成token验证
Php+mysqli create a table, read multiple items, add, modify and query a complete instance
np.arange与np.linspace细微区别(数据溢出问题)
Luogu p5406 [thupc2019] find tree
threejs实现简单全景看房demo
PHP login registration page
看看volatile你深知多少
Monaco editor format JSON, SQL
Xshell远程服务器tensorboard/visdom的本地可视化方法【亲测一步有效】
When easypoi imports the secondary header data of an excel file, the data in the first column of the @excelentity entity class is null
yolov5 export Gpu推理模型导出
为啥要使用梯度下降法
Servlet的生命周期
景联文科技:机器学习AI数据集产品汇总(三)
ffmpeg之volumedetect
[detailed explanation] point multiplication and cross multiplication of neural network matrix (pytorch version)
Win11 mongodb installation tutorial
C语言刷题 | 三目运算实现判断大写(16)
Message Oriented Middleware (message queue)
PHP seven methods to obtain complete instances of file name suffixes [collect]