当前位置:网站首页>采坑websocket总结
采坑websocket总结
2022-07-24 00:24:00 【一笑杯莫停】
需求:
- 要Nginx代理(不向外暴露端口号)
- 创建namespace监听(事件名称:connectTask,namespace名称:/remote)
- 可随时终止服务逻辑,但不关闭客户端(异步执行服务逻辑)
- 同一浏览器不同页面打开建立新的客户端(session要不同)
socketio采坑
版本
<!--netty socketio-->
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.18</version>
</dependency>properties
netty.io.host=0.0.0.0
netty.io.port=9092
# 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
socketio.maxFramePayloadLength=1048576
# 设置http交互最大内容长度
socketio.maxHttpContentLength=1048576
# socket连接数大小(如只监听一个端口boss线程组为1即可)
socketio.bossCount=1
socketio.workCount=100
socketio.allowCustomRequests=true
# 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
socketio.upgradeTimeout=10000
# Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
socketio.pingTimeout=60000
# Ping消息间隔(毫秒),默认10秒。客户端向服务器发送一条心跳消息间隔
socketio.pingInterval=10000配置类
同一浏览器不同页面打开建立新的客户端,所以每次请求需要创建一个随机的session id。
坑
这里踩了个坑,之前的老版本比如1.7.14,要生成随机的session id就必须自己重写相关逻辑
1.LocalAuthorizeHandler extends AuthorizeHandler------->
然后重写方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}
然后实现随机UUID的生成
2.class LocalSocketIoChannelInitializer extends SocketIOChannelInitializer------->
重写里面需要设置AuthorizeHandler的方法,替换为LocalAuthorizeHandler
3.在config类中将LocalSocketIoChannelInitializer set到SocketIOServer 里面,像这样
SocketIOServer server = new SocketIOServer(config);
server.setPipelineFactory(new SksSocketIoChannelInitializer());
简直low上天了,最后用新版本发现只需要在config中setRandomSession(true)就搞定了
config.setRandomSession(true);config类
@org.springframework.context.annotation.Configuration
public class SocketIoConfig {
private final Logger LOGGER = LoggerFactory.getLogger(SocketIoConfig.class);
@Value("${netty.io.host}")
private String host;
@Value("${netty.io.port}")
private Integer port;
@Value("${socketio.bossCount}")
private int bossCount;
@Value("${socketio.workCount}")
private int workCount;
@Value("${socketio.upgradeTimeout}")
private int upgradeTimeout;
@Value("${socketio.pingTimeout}")
private int pingTimeout;
@Value("${socketio.pingInterval}")
private int pingInterval;
@Bean
public SocketIOServer socketIOServer() {
SocketConfig socketConfig = new SocketConfig();
socketConfig.setTcpNoDelay(true);
Configuration config = new Configuration();
config.setRandomSession(true);
config.setAllowCustomRequests(true);
config.setSocketConfig(socketConfig);
config.setPort(port);
config.setHostname(host);
config.setBossThreads(bossCount);
config.setWorkerThreads(workCount);
config.setPingTimeout(pingTimeout);
config.setPingInterval(pingInterval);
config.setUpgradeTimeout(upgradeTimeout);
config.setMaxHttpContentLength(maxHttpContentLength);
//如果要用到Nginx代理,需要添加一个路径/localPath区分不同的websocket,否则默认路径是/socket.io很难搞
config.setContext("/localPath" + config.getContext());
config.setMaxFramePayloadLength(maxFramePayloadLength);
//该处可以用来进行身份验证
config.setAuthorizationListener(new AuthorizationListener() {
@Override
public boolean isAuthorized(HandshakeData data) {
........
});
return new SocketIOServer(config);
}
}
WebSocketHandler 类
添加namespace,这儿没有添加room。namespace就相当于给连接分类,方便细化管理和广播。这里不深入说namespace和room
@Component
public class WebSocketHandler {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final SocketIOServer socketIOServer;
private final ConnectService connectService;
public final static String DEV_CONNECT = "connectTask";
/**
* 关联client和webSocket session
*/
public static final Map<UUID, SocketIOClient> CLIENT_MAP = new ConcurrentHashMap<>();
public WebSocketHandler(SocketIOServer socketIOServer,ConnectService connectService) {
this.socketIOServer = socketIOServer;
this.connectService= connectService;
}
@PostConstruct
public void autoStart() {
this.start();
}
@PreDestroy
private void onDestroy() {
if (socketIOServer != null) {
socketIOServer.stop();
}
}
public void start() {
socketIOServer.addConnectListener(client -> {
if (CLIENT_MAP.containsKey(client.getSessionId())) {
client.disconnect();
} else {
CLIENT_MAP.put(client.getSessionId(), client);
}
});
socketIOServer.addDisconnectListener(client -> {
remoteWebHandler.onDisconnect(client);
remoteWebHandler.cleanContext(client);
log.info("clean client:{}", client.getSessionId());
client.disconnect();
});
socketIOServer.start();
log.info("start finish");
addOnDataNamespace();
}
private void addOnDataNamespace() {
final SocketIONamespace namespace = socketIOServer.addNamespace("/remote");
namespace.addEventListener(WebSocketHandler.DEV_CONNECT, String.class, new DataListener<String>() {
/**
* Invokes when data object received from client
*
* @param client - receiver
* @param data - received object
* @param ackSender - ack request
* @throws Exception
*/
@Override
public void onData(SocketIOClient client, String data, AckRequest ackSender) {
try {
connectService.onData(client, data);
} catch (Exception e) {
log.error("onData error", e);
}
}
});
}
}service类大致写了些逻辑
connect类消息
- 使用线程池来实现异步执行相关逻辑
- 缓存Future等信息作为上下文
- 执行逻辑
disconnect类消息
- 1.设置上下文的中断标志isStop为true()
- 2.future.cancel(true)取消任务
- 参数true:如果任务已经被执行,则会尝试中断处理。(中断处理会改变中断标志位,任务应该判断isInterrupted()或者在任务过程中使用sleep,这样任务才可以被中断)
- 参数false:如果任务已经被执行,则会等待任务执行完毕。如果是个无线循环任务,将会无法停止。
3.发送消息给客户端,如果想关闭连接可以client.disconnect();还要清除上下文缓存
@Component
public class DeviceRemoteWebHandler {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ThreadPoolTaskExecutor threadPool;
private Map<UUID, RemoteWebContext> remoteWebContexts = new ConcurrentHashMap<>();
public DeviceRemoteWebHandler(@Qualifier(BeanConsts.USER_DATA_EXECUTOR_SERVICE) ThreadPoolTaskExecutor threadPool) {
this.threadPool = threadPool;
}
public void onData(final SocketIOClient client, final String data) {
getContext(client);
ClientMessage message = JsonUtils.deserializeJson(JsonUtils.getObjectMapper(), data, ClientMessage.class);
if (message == null) {
log.info("client message is empty");
return;
}
//remote的connect信息
if (connect message) {
try {
connect(client, message.getData());
} catch (InterruptedException i) {
return;
} catch (Exception e) {
log.error("connect event failed", e);
sendMsgToClient(client, getFailResult(DeviceConnectResultType.SERVER_FAIL, client));
return;
}
}
//remote的disconnect信息
if (disconnect message) {
onDisconnect(client);
}
}
private RemoteWebContext getContext(final SocketIOClient client) {
if (remoteWebContexts.containsKey(client.getSessionId())) {
return remoteWebContexts.get(client.getSessionId());
}
RemoteWebContext remoteWebContext = new RemoteWebContext(client.getSessionId());
remoteWebContexts.put(client.getSessionId(), remoteWebContext);
return remoteWebContext;
}
public void cleanContext(final SocketIOClient client) {
if (remoteWebContexts.containsKey(client.getSessionId())) {
log.info("remove remoteWebContext by sessionId:{}", client.getSessionId());
remoteWebContexts.remove(client.getSessionId());
}
}
public void onDisconnect(final SocketIOClient client) {
if (remoteWebContexts.containsKey(client.getSessionId())) {
RemoteWebContext context = getContext(client);
context.setIsStop();
Future future = remoteWebContexts.get(client.getSessionId()).getFuture();
log.info("Try to Interrupt if running");
if (future == null) {
//判空处理
return;
}
try {
future.cancel(true);
} catch (Exception e) {
log.error("DisConnect Failed.", e);
return;
}
sendMsgToClient(client);
}
}
/**
* 连接建立后处理
*/
public void connect(final SocketIOClient client, final DeviceConnectModel connectData) throws InterruptedException {
log.info("接收到参数:{}", client.getHandshakeData().getUrlParams());
Future<?> deviceConnectFuture = threadPool.submit(() -> {
@Todo 需要异步执行的逻辑
});
//对Future进行缓存
remoteWebContexts.put(client.getSessionId(), new RemoteWebContext(deviceConnectFuture));
}
/**
* send msg
*
* @param client socket.io client
*/
private void sendMsgToClient(SocketIOClient client, DeviceConnectResult connectResult) {
MessageBase<DeviceConnectResult> result = new MessageBase(configInstance.getDeviceConnectType(), connectResult);
log.info("starting send connect message,sid={}\nvalue {}", client.getSessionId(), result);
try {
client.sendEvent(WebSocketHandler.DEV_CONNECT, result);
} catch (Exception e) {
log.error("send message {} to client failed", result, e);
throw new RuntimeException(e.getMessage(), e);
}
}
}
边栏推荐
- Scheme for importing XMIND use cases into tapd (with code)
- How to open a low commission account? Is it safe?
- 如何提升数据质量
- NGFW portal authentication experiment
- GBase 8c 字符串操作符
- Inode, soft link, hard link
- I like investing
- 高数_第1章空间解析几何与向量代数__两点距
- Try new methods
- Pytest interface automated testing framework | common running parameters of pytest
猜你喜欢
随机推荐
Take stock of 10 new layer1 to prepare for the next bull market
Gbase 8C mode visibility query function (I)
GBase 8c 二进制字符串操作符
Redis cluster hash sharding algorithm (slot location algorithm)
[low code] limitations of low code development
Jenkins 使用sonarqube构建流水线代码审查项目
English语法_指示代词 -such / the same
Gbase 8C session information function (I)
Redis data structure
How to improve data quality
Pytest interface automation test framework | summary
vulnhub wpwn: 1
Mysql database foundation
Pytest interface automated testing framework | how to get help
English grammar_ Demonstrative pronoun -such / the same
It basic English
What are blue-green deployment, Canary release and a/b test
GBase 8c 访问权限访问函数(四)
Redis 主从、哨兵、集群架构有缺点比较
数仓数据标准详解-2022









