当前位置:网站首页>Mina disconnects and reconnects
Mina disconnects and reconnects
2022-08-05 06:21:00 【Sajia Roshan Demon King】
Mina断线重连
这里的Mina 断线重连是指使用mina作为客户端软件,Connect to other offersSocket通讯服务的服务器端.Socket服务器可以是Mina提供的服务器,也可以是C++提供的服务器.
1. The method of disconnection and reconnection
1.1 增加监听器/拦截器方式
在创建MinaAdd a listener to the client,Or add an interceptor,当检测到Session关闭时,Automatically reconnect.
1.2 基于监听器/拦截器,Increase read and write idle checks
在第1种方式的基础上,Increase the client's read and write channel idle check,当发生SessionClosed or when reading and writing are idle,进行重连.
1.3 Comparison of two disconnection and reconnection methods
- 第一种方式比较传统,优点是简单方便,Suitable for network stability、数据量不大(1Mbelow the bandwidth)的环境;However, the disadvantage is that it cannot capture system-level disconnection blocking.
- The second way is more elaborate,Basically the app is captured、网络、System level disconnect.
2. reconnect purpose
在使用Minaas a client,Often because of the network、服务器、A problem with the application caused the connection to drop,And automatically reconnect,It's the only way to solve the disconnection.
如果网线断开、服务器宕机、应用程序挂了,All are disconnected,这个时候,By adding a listener or interceptor,reconnection can be achieved.
但是生产环境中,The reason for the disconnection may be more complicated:网络不稳定、延时、服务器负载高、Problems such as the full send or receive buffer of the server or application may cause a situation similar to disconnection during data transmission,这个时候,光检测SessionClosing is not enough,At this time, a reconnection mechanism is needed,For example, read and write idle exceeds30秒,reconnect.Uninterrupted for data、实时性高、Application scenarios with large amounts of data,It is more practical.
3. 实例:
3.1 方式1-监听器方式
Create a listener implementationmina的IoServiceListener接口,The methods inside can be implemented without writing
import org.apache.mina.core.service.IoService;
import org.apache.mina.core.service.IoServiceListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class IoListener implements IoServiceListener{
@Override
public void serviceActivated(IoService arg0) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void serviceDeactivated(IoService arg0) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void serviceIdle(IoService arg0, IdleStatus arg1) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void sessionCreated(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void sessionDestroyed(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
}
}
Join the listener when creating the client
// 创建连接客户端
NioSocketConnector connector = new NioSocketConnector();
// 设置连接超时
connector.setConnectTimeoutMillis(30000);
// 设置接收缓冲区的大小
connector.getSessionConfig().setReceiveBufferSize(10240);
// Sets the size of the output buffer
connector.getSessionConfig().setSendBufferSize(10240);
// 加入解码器
TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName("GBK"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());
factory.setDecoderMaxLineLength(10240);
factory.setEncoderMaxLineLength(10240);
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));
// Set the default access address
connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));
// 添加处理器
connector.setHandler(new IoHandler());
// Add reconnection monitor
connector.addListener(new IoListener() {
@Override
public void sessionDestroyed(IoSession arg0) throws Exception {
for (;;) {
try {
Thread.sleep(3000);
ConnectFuture future = connector.connect();
// 等待连接创建成功
future.awaitUninterruptibly();
// 获取会话
session = future.getSession();
if (session.isConnected()) {
logger.info("断线重连[" + connector.getDefaultRemoteAddress().getHostName() + ":" + connector.getDefaultRemoteAddress().getPort() + "]成功");
break;
}
} catch (Exception ex) {
logger.info("Reconnection server login failed,3Connect again in seconds:" + ex.getMessage());
}
}
}
});
for (;;) {
try {
ConnectFuture future = connector.connect();
// 等待连接创建成功
future.awaitUninterruptibly();
// 获取会话
session = future.getSession();
logger.info("连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
break;
} catch (RuntimeIoException e) {
logger.error("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage(), e);
// 连接失败后,重连间隔5s
Thread.sleep(5000);
}
}
3.2 方式1-拦截器方式
connector = new NioSocketConnector(); //创建连接客户端
connector.setConnectTimeoutMillis(30000); //设置连接超时
// Callback interceptor for disconnection and reconnection
connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {
@Override
public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {
for(;;){
try{
Thread.sleep(3000);
ConnectFuture future = connector.connect();
future.awaitUninterruptibly();// 等待连接创建成功
session = future.getSession();// 获取会话
if(session.isConnected()){
logger.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");
break;
}
}catch(Exception ex){
logger.info("Reconnection server login failed,3Connect again in seconds:" + ex.getMessage());
}
}
}
});
TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName(encoding), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());
factory.setDecoderMaxLineLength(10240);
factory.setEncoderMaxLineLength(10240);
//加入解码器
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));
//添加处理器
connector.setHandler(new IoHandler());
connector.getSessionConfig().setReceiveBufferSize(10240); // 设置接收缓冲区的大小
connector.getSessionConfig().setSendBufferSize(10240); // Sets the size of the output buffer
connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));// Set the default access address
for (;;) {
try {
ConnectFuture future = connector.connect();
// 等待连接创建成功
future.awaitUninterruptibly();
// 获取会话
session = future.getSession();
logger.error("连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
break;
} catch (RuntimeIoException e) {
logger.error("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage(), e);
Thread.sleep(5000);// 连接失败后,重连间隔5s
}
}
3.2 方式2-Add idle detection mechanism
The idle detection mechanism is required when the client is created,Added idle timeout,Then in the processorhandler端的sessionIdleA method to pre-close the connection is added to the method.让SessionCloses passed to listeners or interceptorssessionClosemethod to implement reconnection.
Take the interceptor method as an example,在创建客户端时,Add read and write channel idle check timeout mechanism.
connector = new NioSocketConnector(); //创建连接客户端
connector.setConnectTimeoutMillis(30000); //设置连接超时
// Callback interceptor for disconnection and reconnection
connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {
@Override
public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {
for(;;){
try{
Thread.sleep(3000);
ConnectFuture future = connector.connect();
future.awaitUninterruptibly();// 等待连接创建成功
session = future.getSession();// 获取会话
if(session.isConnected()){
logger.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");
break;
}
}catch(Exception ex){
logger.info("Reconnection server login failed,3Connect again in seconds:" + ex.getMessage());
}
}
}
});
connector.getFilterChain().addLast("mdc", new MdcInjectionFilter());
TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName(encoding), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue());
factory.setDecoderMaxLineLength(10240);
factory.setEncoderMaxLineLength(10240);
//加入解码器
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));
connector.getSessionConfig().setReceiveBufferSize(10240); // 设置接收缓冲区的大小
connector.getSessionConfig().setSendBufferSize(10240);// Sets the size of the output buffer
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30000); //读写都空闲时间:30秒
connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40000);//读(接收通道)空闲时间:40秒
connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50000);//写(发送通道)空闲时间:50秒
//添加处理器
connector.setHandler(new IoHandler());
connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));// Set the default access address
for (;;) {
try {
ConnectFuture future = connector.connect();
// 等待连接创建成功
future.awaitUninterruptibly();
// 获取会话
session = future.getSession();
logger.error("连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
break;
} catch (RuntimeIoException e) {
System.out.println("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage());
logger.error("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage(), e);
Thread.sleep(5000);// 连接失败后,重连10次,间隔30s
}
}
Then in the data processorIoHandler中sessionIdle方法中加入SessionCode for session closing,这样sessionClosing can be passed to the interceptor or listener,Then reconnect.
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class IoHandler extends IoHandlerAdapter {
//部分代码忽略...
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
logger.info("-客户端与服务端连接[空闲] - " + status.toString());
if(session != null){
session.close(true);
}
}
//部分代码忽略...
}
4. 总结-最佳实践:
In the above two ways, I personally think it is best to use the second one.在实际的生产环境,For small amounts of data,A thread needs to be added to send heartbeat information,Then respond to the heartbeat on the server side,This ensures that the read and write channels do not appear idle.如果数据量比较大,大到24Data is available every hour,Then there is no need for a heartbeat thread,可以直接在IoHandleron the processor sidemessageReceivedThe method periodically sends a heartbeat to the server.Since read and write monitoring can also handle servers、网络、Uncertainties in application, etc,So it is recommended to use the second method.
5. 注意事项
第一:Disconnection and reconnection are for long connections,也就是说,After the connection, the two ends have been sending data.
第二:Disconnection and reconnection are for the client,If you use it on the server side,There may be failures depending on the scenario.Because the server will automatically send a heartbeat.
第三:Disconnect and reconnect if the test setup receives a timeout condition,It should always send data,The server only receives and does not send,Heartbeat data is also not sent,It will only work after time.
边栏推荐
- NAT实验
- 交换机原理
- Billions of IT operations in the market, the product by strength to speak
- 单臂路由实验和三层交换机实验
- RAID disk array
- LeetCode Interview Questions
- IP packet format (ICMP protocol and ARP protocol)
- Introduction to Network Layer Protocols
- Autoware--Beike Tianhui rfans lidar uses the camera & lidar joint calibration file to verify the fusion effect of point cloud images
- 技术分享杂七杂八技术
猜你喜欢
VLAN details and experiments
运维的高光时刻,从智能化开始
增长:IT运维发展趋势报告
传输层协议(TCP3次握手)
路由器和静态路由的配置
Configuration of TensorFlow ObjecDetectionAPI under Anaconda3 of win10 system
[issue resolved] - jenkins pipeline checkout timeout
单臂路由实验和三层交换机实验
[Day5] Soft and hard links File storage, deletion, directory management commands
The problem of calling ds18b20 through a single bus
随机推荐
ROS video tutorial
What's the point of monitoring the involution of the system?
[问题已处理]-虚拟机报错contains a file system with errors check forced
带你深入了解Cookie
vim教程:vimtutor
Spark source code - task submission process - 6-sparkContext initialization
传输层协议
[Day1] VMware software installation
spark operator-wholeTextFiles operator
干货!教您使用工业树莓派结合CODESYS配置EtherCAT主站
静态路由
Cloud computing - osi seven layers and TCP\IP protocol
Small example of regular expression--remove spaces in the middle and on both sides of the string
RAID disk array
Logical volume creation
[问题已处理]-jenkins流水线checkout超时
运维的高光时刻,从智能化开始
IP地址及子网的划分
RAID磁盘阵列
多线程之传递参数