当前位置:网站首页>手写一个的在线聊天系统(原理篇1)
手写一个的在线聊天系统(原理篇1)
2022-07-06 10:47:00 【Java知音_】
点击关注公众号,实用技术文章及时了解
摘要:边学 Netty 相关原理,边实现一个在线聊天系统。本篇文章主要讲述一些 Netty 的基本原理、流程,可能有点枯燥,不过也很重要,相关 demo 代码最好可以自己动手敲一遍,这样更容易理解。不过也不必担心过于复杂,这只是相当于 Netty 的 Hello World。
一、目录介绍
前置知识点
NIO
Netty 的核心组件
Channel
Callback
Future 和 Promise
事件和 ChannelHandler
Hello World
二、前置知识点
1、NIO
首先我们需要回顾一下,同步、异步、阻塞、非阻塞的相关概念。
同步:调用 API 后,调用者能“马上”就知道操作的结果。
异步:相对于同步,调用 API 后,调用者不能“马上”知道操作的结果,要等被调用方 回调 通知结果。
阻塞:等待全部数据读取(写入)完成后,才返回。
非阻塞:读取时,读多少返回多少;写入时,写入多少返回多少。不用等待,全部数据完成操作后,才返回。
NIO 是一种 同步非阻塞 的 I/O模型。
同步是指线程不断轮询 I/O 事件是否就绪。
非阻塞是指线程在等待 I/O 的时候,可以同时做其他任务。
同步的核心是 选择器,选择器代替了线程本身轮询 I/O 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是 通道和缓冲区,当 I/O 事件就绪时,可以通过写到缓冲区,保证 I/O 的成功,而无需线程阻塞式地等待。
NIO主要有三大核心部分:
Channel(通道)
Buffer(缓冲区)
Selector(选择器)
传统 I/O 基于 字节流和字符流 进行操作,而 NIO 基于 Channel 和 Buffer 进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector 用于监听多个通道的事件(连接打开,数据到达等)。因此,单个线程可以监听多个数据通道,如下图所示:
三、Netty 的核心组件
1、Channel
Channel 是一个通道,用于连接字节缓冲区 Buffer 和另一端的实体。在 NIO 网络编程模型中,服务端和客户端进行 I/O 数据交互(得到彼此推送的信息)的媒介就是 Channel。
Netty 对 JDK 原生的 ServerSocketChannel
进行了封装和增强。
Netty的Channel增加了如下的组件:
id 标识唯一身份信息
可能存在的 parent Channel
管道 pepiline
用于数据读写的 unsafe 内部类
事件循环执行器 NioEventLoop
Channel可以分成两类:
服务端: NioServerSocketChannel
客户端: NioSocketChannel
具体依赖关系如下图所示:
服务端: NioServerSocketChannel
NioServerSocketChannel
客户端: NioSocketChannel
2、Callback
callback 就是回调,一个方法可以在适当的时候回过头来调用这个 callback 方法。callback 是用于通知相关方某个操作已经完成最常用的方法之一。
Netty 在处理事件时内部使用了 callback。当一个 callback 被触发,事件可以被 ChannelHandler 的接口实现处理。
一个简单的例子如下所示:
public class ConnectHandler extends ChannelInboundHandlerAdapter {
// 当一个新的连接建立时,channelActive 被调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress());
}
}
当一个新的连接建立后,ChannelHandler 的 callback 方法 channelActive() 会被调用,然后打印一条消息。
这个 ConnectHandler 实例(相当于被调用者)以参数的形式传入创建 Channel 连接的函数(调用者)中,之后这个函数创建新连接后,就会回来调用这个 ConnectHandler 的 channelActive 方法,这个过程就叫回调。
3、Future 和 Promise
Future 和 Promise 起源于函数式编程,目的是将值(Future)与其计算方式(Promise)分离,从而允许更灵活地进行计算,特别是通过并行化。
Future 表示目标计算的返回值,Promise 表示计算的方式,这个模型将返回结果和计算逻辑分离,目的是为了让计算逻辑不影响返回结果,从而抽象出一套异步编程模型。它们之间的纽带就是 Callback。
简单来说:Future 表示一个 异步任务的结果,针对这个结果可以添加 Callback 方法以便在任务 执行成功或失败后做出对应的操作,而 Promise 交由任务执行者,任务执行者通过 Promise 可以标记任务完成或者失败。
在 Netty 中:
Future 接口定义了 isSuccess(),isCancellable(),cause() 等方法,这些判断异步执行状态的方法都是只读的。
Promise 接口在 extends Future 的基础上增加了 setSuccess(),setFailure() 等方法,这些方法是可写的,即 Promise 是可写的 Future。
4、事件(event) 和 ChannelHandler
ChannelHandler
Netty 是一个事件驱动的框架,所有的 event(事件) 都是由 Handler 来进行处理。
ChannelHandler 可以处理 I/O、拦截 I/O 或者将 event 传递给 ChannelPipeline 中的下一个 Handler 进行处理。
ChannelHandler 的结构很简单,只有三个方法,分别是:
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
event
Netty 用细分的 event(事件) 来通知我们状态的变化或者操作的状况。这让我们可以基于发的 event 来触发适当的行为。这类行为可能包括:
日志记录
数据传送
流控制
应用逻辑
event 按输入或者输出数据流的关系来分类。可能被输入数据或者相关状态改变触发的 event 包括:
活跃或者停用的连接
读数据
用户 event
错误 event
而输出 event 则是会触发将来行为的操作的结果,可能会是:
打开或者关闭到远端的连接
写或者刷数据到一个 socket
每一个 event 都可以被分派到一个用户实现的 handler 对象的方法。
Hello World
一个简单的 websocket 服务端,如下所示:
Server 代码:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
public class Server {
public static void main(String[] args) throws InterruptedException {
// 用来接收客户端传进来的连接
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// 用来处理已被接收的连接
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建 netty 服务
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(bossGroup, workerGroup)
// 设置 NIO 模式
.channel(NioServerSocketChannel.class)
// 设置 tcp 缓冲区
.option(ChannelOption.SO_BACKLOG, 1024)
// 设置发送缓冲区数据大小
.childOption(ChannelOption.SO_SNDBUF, 64 * 1024)
// 设置接收缓冲区数据大小
.option(ChannelOption.SO_RCVBUF, 64 * 1024)
// 保持长连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// HttpClient编解码器
pipeline.addLast(new HttpServerCodec());
// 设置最大内容长度
pipeline.addLast(new HttpObjectAggregator(65536));
// WebSocket 数据压缩扩展
pipeline.addLast(new WebSocketServerCompressionHandler());
// WebSocket 握手、控制帧处理
pipeline.addLast(new WebSocketServerProtocolHandler("/", null, true));
// 通道的初始化,数据传输过来进行拦截及执行
pipeline.addLast(new ServerHandler());
}
});
// 绑定端口启动服务
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
ServerHandler 代码:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("通道激活(回调)");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 仅处理 TextWebSocketFrame
if (msg instanceof TextWebSocketFrame) {
String request = ((TextWebSocketFrame) msg).text();
System.out.println("收到请求:" + request);
ctx.writeAndFlush(new TextWebSocketFrame("PONG"));
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("数据读取完成");
}
}
pom 依赖
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
</dependencies>
然后运行 Server 即可。
接下来我们来测试一下程序是否正常,这里使用到一个在线测试网站:http://www.easyswoole.com/wstool.html
连接上我们的服务,如下图所示:
如果出现 OPENED => 127.0.0.1:8080 的提示,则表示连接成功。否则请排查是否程序和示例代码一致。
然后我们点击开始发送按钮,如果出现以下提示则表示,消息发送成功啦。
好了到这里,我们的 Hello World 已经完成了。
推荐
PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!
边栏推荐
猜你喜欢
Coco2017 dataset usage (brief introduction)
Compilation Principle -- C language implementation of prediction table
The role of applet in industrial Internet
关于npm install 报错问题 error 1
2022-2024年CIFAR Azrieli全球学者名单公布,18位青年学者加入6个研究项目
Xu Xiang's wife Ying Ying responded to the "stock review": she wrote it!
None of the strongest kings in the monitoring industry!
巨杉数据库首批入选金融信创解决方案!
用于远程医疗的无创、无袖带血压测量【翻译】
Hongke shares | plate by plate ar application in Beijing Winter Olympics
随机推荐
Reprint: defect detection technology of industrial components based on deep learning
小程序在产业互联网中的作用
[Sun Yat sen University] information sharing of postgraduate entrance examination and re examination
Afnetworking framework_ Upload file or image server
UFIDA OA vulnerability learning - ncfindweb directory traversal vulnerability
2022暑期项目实训(三)
一种用于夜间和无袖测量血压手臂可穿戴设备【翻译】
Reproduce ThinkPHP 2 X Arbitrary Code Execution Vulnerability
Execution process of MySQL query request - underlying principle
This article discusses the memory layout of objects in the JVM, as well as the principle and application of memory alignment and compression pointer
CRMEB 商城系统如何助力营销?
Atcoder a mountaineer
Grafana 9.0 is officially released! It's the strongest!
十、进程管理
287. Find duplicates
Comparative examples of C language pointers *p++, * (p++), * ++p, * (++p), (*p) + +, +(*p)
Recursive way
重磅硬核 | 一文聊透对象在 JVM 中的内存布局,以及内存对齐和压缩指针的原理及应用
Cobra 快速入门 - 专为命令行程序而生
图片缩放中心