当前位置:网站首页>Implementing a server-side message active push solution based on SSE
Implementing a server-side message active push solution based on SSE
2022-08-04 03:32:00 【Little Bichao】
一、SSE 服务端消息推送
SSE
是 Server-Sent Events
的简称, 是一种服务器端到客户端(浏览器)的单项消息推送.The corresponding browser side to realize Event Source
Interface is set asHTML5
的一部分.不过现在IE
不支持该技术,Can only be accomplished by way of training in rotation.相比于 WebSocket
,SSE
简单很多,Client and server workload is much smaller、简单很多,At the same time, the functions also should have limited.
相比于 WebSocket
两者的区别:
WebSocket
是全双工通道,可以双向通信,功能更强.SSE
是单向通道,只能服务器向浏览器端发送.WebSocket
是一个新的协议,需要服务器端支持.SSE
则是部署在HTTP
协议之上的,现有的服务器软件都支持.SSE
是一个轻量级协议,相对简单.WebSocket
是一种较重的协议,相对复杂.SSE
默认支持断线重连,WebSocket
则需要额外部署.SSE
支持自定义发送的数据类型.SSE
不支持CORS
,参数url
就是服务器网址,必须与当前网页的网址在同一个网域(domain
),而且协议和端口都必须相同.
在我们平常使用 SpringBoot
进行开发中,其实已经集成好了 SSE
,里面有个 SseEmitter
Class already sealed the related operations,Can facilitate the realization of the function.
But at the time of implementation under the need to pay attention to whether or not to compatible IE
浏览器的使用,IE
浏览器目前不支持 Event Source
接口,如果需要兼容 IE
Can create an order queue, such as in Redis
中或 消息队列中,IE
The client through interface training in rotation consumption message from the queue every time,以实现 SSE
的功能.
下面分别从 服务端 和 The client to implement.
二、服务端
The service side need to consider IE
The situation of the browser does not support,对于 IE
Can be accomplished by way of training in rotation,首先新建一个 SpringBoot
项目,声明 SseEmitter
连接:
@Slf4j
public class SseEmitterServer {
private static AtomicInteger count = new AtomicInteger(0);
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
public static SseEmitter connect(String userId) {
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onCompletion(() -> {
log.info("结束连接:{}", userId);
removeUser(userId);
});
sseEmitter.onError(throwable -> {
log.info("连接异常:{}", userId);
removeUser(userId);
});
sseEmitter.onTimeout(() -> {
log.info("连接超时:{}", userId);
removeUser(userId);
});
sseEmitterMap.put(userId, sseEmitter);
count.getAndIncrement();
log.info("创建新的sse连接,当前用户:{}", userId);
return sseEmitter;
}
public static void sendMessage(String userId, Object message) {
if (sseEmitterMap.containsKey(userId)) {
try {
sseEmitterMap.get(userId).send(message);
log.info("SSE 发送信息成功!id = {} , message: {} ", userId, message);
} catch (IOException e) {
log.error("[{}]推送异常:{}", userId, e.getMessage());
removeUser(userId);
}
} else {
log.warn("SSE 发送信息异常,用户不存在:id = {} ", userId);
}
}
private static void removeUser(String userId) {
sseEmitterMap.remove(userId);
count.getAndDecrement();
}
}
然后声明 SSEWebServer
Release to the client interface,对于 IE
We provide an interface directly,Consumer message from the queue every time,这里以 LinkedBlockingDeque
As an example to achieve a single queue,If it's distributed may consider Redis
或 消息队列 :
@Slf4j
@CrossOrigin
@RestController
@RequestMapping("/sse")
public class SSEWebServer {
private static Cache<String, LinkedBlockingDeque<SseEvent>> sseCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(60, TimeUnit.MINUTES)
.build();
/* *
* sse 连接服务
*/
@GetMapping("/sseEvent/{userId}")
public SseEmitter push(@PathVariable("userId") String userId) {
return SseEmitterServer.connect(userId);
}
//IE 浏览器不支持SSE 采用轮训
@GetMapping("/sseEventIE/{userId}")
public ResponseEntity pushIe(@PathVariable("userId") String userId) {
if (StringUtils.isEmpty(userId)) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(" userId is Empty ! ");
}
log.info("IE 连接,userId = {} ", userId);
try {
SseEvent poll = Objects.requireNonNull(sseCache.getIfPresent(userId)).poll();
return poll == null ? ResponseEntity.status(HttpStatus.BAD_REQUEST).body("连接失败!") : ResponseEntity.ok().body(poll.getMsg());
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(e.getMessage());
}
}
static boolean publicMsg(SseEvent event) {
LinkedBlockingDeque<SseEvent> ifPresent = sseCache.getIfPresent(event.getUserId());
if (ifPresent == null) {
sseCache.put(event.getUserId(), new LinkedBlockingDeque<SseEvent>());
}
log.info("添加到队列,userId:{} ", event.getUserId());
return Objects.requireNonNull(sseCache.getIfPresent(event.getUserId())).offer(event);
}
}
The above is considered IE
的兼容性,Adding an interface and the queue,So at the time of release,You need to SSE
和 队列 Behind the data,So this may be increased in an event publishing:
Event publishing we use Spring
自带的 ApplicationListener
来实现.
First create a transactional event:
@Getter
@Setter
@ToString
public class SseEvent<T> extends ApplicationEvent {
private int code;
private String userId;
private T msg;
public SseEvent(Object source) {
super(source);
}
}
The statement event listeners,Here at the same time SSE
和 队列发送消息:
@Slf4j
@Component
public class SseListener implements ApplicationListener<SseEvent> {
@Override
public void onApplicationEvent(SseEvent event) {
SseEmitterServer.sendMessage(event.getUserId(), event.getMsg());
SSEWebServer.publicMsg(event);
}
}
最后再 创建一个测试接口,便于我们下面的测试:
@RestController
public class TestController {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@GetMapping("/test/{userId}/{message}")
public ResponseEntity test(@PathVariable("userId") String userId, @PathVariable("message") String message) {
SseEvent<String> sseEvent = new SseEvent<>(this);
sseEvent.setCode(200);
sseEvent.setMsg(message);
sseEvent.setUserId(userId);
applicationEventPublisher.publishEvent(sseEvent);
return ResponseEntity.ok().build();
}
}
To the server has been completed,Start the client under the butt:
三、客户端
相比于服务端,The client is very simple,但也要考虑 IE
不支持的情况,需要进行判断,如果是 IE 的话,The way to his training in rotation:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
</body>
<script src="js/jquery-1.10.2.min.js"></script>
<script>
openSSE("1122", function (msg) {
console.log("Receive the server push message:" + msg);
});
function openSSE(userId, callback) {
if (window.EventSource) {
var source = new EventSource('http://localhost:8080/sse/sseEvent/' + userId);
source.onmessage = function (event) {
callback(event.data);
};
} else {
//ie 不支持sse 采用轮训
window.setInterval(function () {
$.ajax({
url: "http://localhost:8080/sse/sseEventIE/" + userId + "?" + new Date().getTime(),
method: "GET",
success: function (result) {
callback(result);
}
})
}, 1000);
}
}
</script>
</html>
四、效果测试
启动服务端,首先演示 SSE 的效果,使用 goole
Open the client web browser,You can see the server log printing:
You can see the client is connected,Using the test under the interface of 1122
用户发送消息,Use the browser to visit the following address: http://localhost:8080/test/1122/测试 SSE 发送消息!
Check the server logs to print:
You can see at the same timeSSE
和 Raise the message queue,See below the client browser print log:
已经收到了服务端推送的消息.
下面开始对 IE 浏览器进行测试,用 IE 浏览器打开页面:
Started once per second training in rotation,Due to the service side no news,Has been returned 400 状态,Use the above interface sends a message below: http://localhost:8080/test/1122/测试 IE 发送消息!
查看IE
The browser print log:
Have received the server push message!
边栏推荐
猜你喜欢
2 Gigabit Optical + 6 Gigabit Electric Rail Type Managed Industrial Ethernet Switch Supports X-Ring Redundant Ring One-key Ring Switch
tkmapper的crud示例:
base address: environment variable
Mini program + new retail, play the new way of playing in the industry!
6口全千兆二层网管型工业以太网交换机千兆2光4电光纤自愈ERPS环网交换机
出现504怎么办?由于服务器更新导致的博客报504错误[详细记录]
Pine Script | How to display and typeset a plot switch?
一个属于程序员的七夕节!
Sfdp 超级表单开发平台 V6.0.5 正式发布
【Playwright测试教程】5分钟上手
随机推荐
初识Numpy
y86.第四章 Prometheus大厂监控体系及实战 -- prometheus存储(十七)
Basic form validation process
一个属于程序员的七夕节!
STM8S项目创建(STVD创建)---使用 COSMIC 创建 C 语言项目
[Playwright Test Tutorial] 5 minutes to get started
系统太多,多账号互通如何实现?
基于Qt的目录统计QDirStat
数组相关 内容 解析
自定义通用分页标签02
DIY电工维修如何拆卸和安装开关面板插座
【观察】超聚变:首提“算网九阶”评估模型,共建开放繁荣的算力网络
SQL injection in #, - +, - % 20, % 23 is what mean?
mq应用场景介绍
tkmapper的crud示例:
返回字符串中的最大回文数
Brush esp8266-01 s firmware steps
外卖店优先级
View mysql deadlock syntax
验证码业务逻辑漏洞