当前位置:网站首页>通过两种方式手写一个消息队列
通过两种方式手写一个消息队列
2022-06-11 06:11:00 【温JZ】
参考每特教育蚂蚁课堂
1.基于多线程的方式实现MQ(极简纯享版)
MQ就是在消费者服务和生产者服务传递信息的通道,生产者向队列中投递消息,然后消费者过来取。所以我们可以这样设计,用一个队列作为中间件,然后开启两个线程,一个是生产者线程,一个是消费者线程,生产者线程向队列中插入json串,消费者线程过来取这个就是一个简单的MQ的实现。
private static LinkedBlockingDeque<JSONObject> msgs = new LinkedBlockingDeque<JSONObject>();
public static void main(String[] args) {
// 生产线程
Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
Thread.sleep(1000);
JSONObject data = new JSONObject();
data.put("wjz", "nb");
// 存入消息
msgs.offer(data);
}
} catch (Exception e) {
}
}
}, "生产者");
producerThread.start();
// 消费者线程
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
JSONObject data = msgs.poll();
if (data != null) {
System.out.println(LocalTime.now() + Thread.currentThread().getName() + "," + data);
}
}
} catch (Exception e) {
}
}
}, "消费者");
consumerThread.start();
}
我们可以看到每隔1s生产者就会向消息队列中投放消息,消费者不断接收消息并打印结果,

如图所示大概是每隔1s消费者接收到消息。
这里面我们要注意的是不能用普通队列,必须用阻塞队列,因为阻塞队列当队列有东西的时候回自动触发消费者线程从队列里取东西,当队列为空的时候会自动触发生产者线程往里面放东西。如果用普通队列的话就会一直往队列里放东西而没有取东西,所以他会阻塞在生产者线程,不会进行线程之间的切换。

2.基于网络通讯的方式实现MQ
我们可以基于Netty实现一个MQ,首先我要有一个MQ服务端用来接收连接以及推送消息。
public class MayiktNettyMQServer {
public void bind(int port) throws Exception {
/** * Netty 抽象出两组线程池BossGroup和WorkerGroup * BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。 */
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workerGroup)
// 设定NioServerSocketChannel 为服务器端
.channel(NioServerSocketChannel.class)
//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
.option(ChannelOption.SO_BACKLOG, 100)
// 服务器端监听数据回调Handler
.childHandler(new MayiktNettyMQServer.ChildChannelHandler());
//绑定端口, 同步等待成功;
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("当前服务器端启动成功...");
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//优雅关闭 线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 设置异步回调监听
ch.pipeline().addLast(new MayiktNettyMQServer.MayiktServerHandler());
}
}
public static void main(String[] args) throws Exception {
int port = 9008;
new MayiktNettyMQServer().bind(port);
}
private static final String type_consumer = "consumer";
private static final String type_producer = "producer";
private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();
private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();
// 生产者投递消息的:topicName
public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {
/** * 服务器接收客户端请求 * * @param ctx * @param data * @throws Exception */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object data)
throws Exception {
// 接收生产者或者消费者的连接,根据接收到的连接做出不同的处理
JSONObject clientMsg = getData(data);
String type = clientMsg.getString("type");
switch (type) {
case type_producer:
producer(clientMsg);
break;
case type_consumer:
consumer(ctx);
break;
}
}
private void consumer(ChannelHandlerContext ctx) {
// 保存消费者连接,目的是为了生产者可以在生产消息的同时主动推送给消费者信息
ctxs.add(ctx);
// 主动拉取mq服务器端缓存中没有被消费的消息
String data = msgs.poll();
if (StringUtils.isEmpty(data)) {
return;
}
// 将该消息发送给消费者
byte[] req = data.getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
private void producer(JSONObject clientMsg) {
// 缓存生产者投递 消息
String msg = clientMsg.getString("msg");
msgs.offer(msg);
//需要将该消息推送消费者
ctxs.forEach((ctx) -> {
// 将该消息发送给消费者
String data = msgs.poll();
if (data == null) {
return;
}
byte[] req = data.getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
});
}
private JSONObject getData(Object data) throws UnsupportedEncodingException {
ByteBuf buf = (ByteBuf) data;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
return JSONObject.parseObject(body);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
}
Netty如何建立连接就不做介绍了,直接看MQ相关的部分,看一下前置的一些变量。
private static final String type_consumer = "consumer";
private static final String type_producer = "producer";
消息传进来的时候都会带着type类型,看看他是生产者发来的还是消费者发来的两种类型会有不同的处理方式。
private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();
private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();
消息队列用阻塞队列来存放消息,ctxs是存放所有消费者连接的。这里我们只是设计了一个简易的MQ,其实真的的MQ应该有Topic主题,通过订阅不同的主题,消费者能拿到不同的消息。我们可以用一个Map来实现key存放topic,value存放LinkedBlockingDeque。
同样的消费者集群也应该分为不同的组,所以存放消费者连接的容器也应该是一个Map,key存放groupId, value存放ArrayList。
这是一个服务端那就必须得有接收信息的能力所有就有了channelRead0()这个方法,这个方法有两个参数一个是ctx就是消费者连接,一个是Object data这个就是传进来的消息。然后我们将这个data转化成JSONObject类型,然后拿到他的type看看他是producer还是consumer之后执行各自的方法。
如果他是producer就执行produce这个生产者函数,生产者要生产东西,然后投放到MQ当中,然后可以主动推送给消费者。也就是从队列中拿到消息,通过保存的消费者连接发送给消费者。
如果他是consumer就执行consumer方法,consumer方法中首先我要把这个消费者的连接存放在list当中,然后拿到队列中的消息,推送给消费者。
生产者
public class MayiktNettyMQProducer {
public void connect(int port, String host) throws Exception {
//配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
// 设置为Netty客户端
.channel(NioSocketChannel.class)
/** * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。 * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。 * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。 */
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MayiktNettyMQProducer.NettyClientHandler());
1. 演示LineBasedFrameDecoder编码器
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
}
});
//绑定端口, 异步连接操作
ChannelFuture future = client.connect(host, port).sync();
//等待客户端连接端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 9008;
MayiktNettyMQProducer client = new MayiktNettyMQProducer();
try {
client.connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject data = new JSONObject();
data.put("type", "producer");
JSONObject msg = new JSONObject();
msg.put("wjz", "nb");
data.put("msg", msg);
// 生产发送数据
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
/** * 客户端读取到服务器端数据 * * @param ctx * @param msg * @throws Exception */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("客户端接收到服务器端请求:" + body);
}
// tcp属于双向传输
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
生产者会和MQ服务器建立连接,然后通过channelActive将消息投放到MQ服务器。然后通过channelRead接收来自服务器的消息。
消费者
public class MayiktNettyMQConsumer {
public void connect(int port, String host) throws Exception {
//配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
// 设置为Netty客户端
.channel(NioSocketChannel.class)
/** * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。 * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。 * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。 */
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MayiktNettyMQConsumer.NettyClientHandler());
1. 演示LineBasedFrameDecoder编码器
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
}
});
//绑定端口, 异步连接操作
ChannelFuture future = client.connect(host, port).sync();
//等待客户端连接端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 9008;
MayiktNettyMQConsumer client = new MayiktNettyMQConsumer();
try {
client.connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject data = new JSONObject();
data.put("type", "consumer");
// 生产发送数据
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
/** * 客户端读取到服务器端数据 * * @param ctx * @param msg * @throws Exception */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("客户端接收到服务器端请求:" + body);
}
// tcp属于双向传输
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
和生产者一样,消费者也是主要就两个流程一个是接收消息,一个是发送消息,生产者可以主动获取到MQ服务器的消息。也可以像MQ服务器中投放消息。
我们可以结合图示来理解一下

最后我们看一下结果

边栏推荐
- Linux Installation redis
- Error reporting injection of SQL injection
- 跨境电商测评自养号团队应该怎么做?
- FPGA设计——乒乓操作实现与modelsim仿真
- jenkins-不同风格的项目构建
- Sqli-libs range 23-24 filtration and secondary injection practice
- Chapter 6 of machine learning [series] random forest model
- Using Metasploit Trojan horse for remote control
- MMEditing中超分模型训练与测试
- 修复鼠标右键没有vscode快捷入口的问题
猜你喜欢
![Chapter 4 of machine learning [series] naive Bayesian model](/img/77/7720afe4e28cd55284bb365a16ba62.jpg)
Chapter 4 of machine learning [series] naive Bayesian model

Devsecops in Agile Environment

FPGA interview notes (III) -- implementation of handshake signal synchronization in cross clock domain, arbitrary frequency division, binary conversion, RAM memory, original code inversion and complem

Installing and using sublist3r in Kali

Deployment of Flink

做亚马逊测评要了解的知识点有哪些?

Thymeleafengine template engine

Shandong University machine learning experiment VI k-means

Using Internet of things technology to accelerate digital transformation

Login and registration based on servlet, JSP and MySQL
随机推荐
Transfer Learning
Sqoop installation tutorial
How to use the markdown editor
Data quality: the core of data governance
[reading this article is enough!!! Easy to understand] confidence level understanding (95% confidence level and confidence interval)
This point of arrow function
Global case | how an airline with a history of 100 years can expand and transform to promote innovation in the aviation industry
Jenkins voucher management
Installing MySQL for Linux
Analyze the capacity expansion mechanism of ArrayList
Continuous update of ansible learning
Linux Installation redis
Verilog realizes binocular camera image data acquisition and Modelsim simulation, and finally matlab performs image display
Review XML and JSON
Pycharm usage experience
Metasploitabile2 target learning
FPGA面试题目笔记(二)——同步异步D触发器、静动态时序分析、分频设计、Retiming
FPGA面试题目笔记(三)——跨时钟域中握手信号同步的实现、任意分频、进制转换、RAM存储器等、原码反码和补码
MMEditing中超分模型训练与测试
End of 2021 graphics of Shandong University