当前位置:网站首页>基于 SSE 实现服务端消息主动推送解决方案
基于 SSE 实现服务端消息主动推送解决方案
2022-08-04 03:29:00 【小毕超】
一、SSE 服务端消息推送
SSE
是 Server-Sent Events
的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。对应的浏览器端实现 Event Source
接口被制定为HTML5
的一部分。不过现在IE
不支持该技术,只能通过轮训的方式实现。相比于 WebSocket
,SSE
简单很多,服务器端和客户端工作量都要小很多、简单很多,同时实现的功能也要有局限。
相比于 WebSocket
两者的区别:
WebSocket
是全双工通道,可以双向通信,功能更强。SSE
是单向通道,只能服务器向浏览器端发送。WebSocket
是一个新的协议,需要服务器端支持。SSE
则是部署在HTTP
协议之上的,现有的服务器软件都支持。SSE
是一个轻量级协议,相对简单。WebSocket
是一种较重的协议,相对复杂。SSE
默认支持断线重连,WebSocket
则需要额外部署。SSE
支持自定义发送的数据类型。SSE
不支持CORS
,参数url
就是服务器网址,必须与当前网页的网址在同一个网域(domain
),而且协议和端口都必须相同。
在我们平常使用 SpringBoot
进行开发中,其实已经集成好了 SSE
,里面有个 SseEmitter
类已经封装好了相关操作,可以方便的实现功能。
但是在实现的时候需要注意下是否要兼容 IE
浏览器的使用,IE
浏览器目前不支持 Event Source
接口,如果需要兼容 IE
可以创建一个顺序队列比如放在 Redis
中或 消息队列中,IE
客户端通过接口轮训的方式每次都从队列中消费消息,以实现 SSE
的功能。
下面分别从 服务端 和 客户端进行实施。
二、服务端
服务端需要考虑 IE
浏览器不支持的情况,对于 IE
可以通过轮训的方式实现,首先新建一个 SpringBoot
项目,声明 SseEmitter
连接:
@Slf4j
public class SseEmitterServer {
private static AtomicInteger count = new AtomicInteger(0);
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
public static SseEmitter connect(String userId) {
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onCompletion(() -> {
log.info("结束连接:{}", userId);
removeUser(userId);
});
sseEmitter.onError(throwable -> {
log.info("连接异常:{}", userId);
removeUser(userId);
});
sseEmitter.onTimeout(() -> {
log.info("连接超时:{}", userId);
removeUser(userId);
});
sseEmitterMap.put(userId, sseEmitter);
count.getAndIncrement();
log.info("创建新的sse连接,当前用户:{}", userId);
return sseEmitter;
}
public static void sendMessage(String userId, Object message) {
if (sseEmitterMap.containsKey(userId)) {
try {
sseEmitterMap.get(userId).send(message);
log.info("SSE 发送信息成功!id = {} , message: {} ", userId, message);
} catch (IOException e) {
log.error("[{}]推送异常:{}", userId, e.getMessage());
removeUser(userId);
}
} else {
log.warn("SSE 发送信息异常,用户不存在:id = {} ", userId);
}
}
private static void removeUser(String userId) {
sseEmitterMap.remove(userId);
count.getAndDecrement();
}
}
然后声明 SSEWebServer
释放对客户端的接口,对于 IE
的话我们就直接提供一个接口,每次都从队列中消费消息,这里以 LinkedBlockingDeque
为例实现一个单机的队列,如果是分布式的可能就要考虑 Redis
或 消息队列 :
@Slf4j
@CrossOrigin
@RestController
@RequestMapping("/sse")
public class SSEWebServer {
private static Cache<String, LinkedBlockingDeque<SseEvent>> sseCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(60, TimeUnit.MINUTES)
.build();
/* *
* sse 连接服务
*/
@GetMapping("/sseEvent/{userId}")
public SseEmitter push(@PathVariable("userId") String userId) {
return SseEmitterServer.connect(userId);
}
//IE 浏览器不支持SSE 采用轮训
@GetMapping("/sseEventIE/{userId}")
public ResponseEntity pushIe(@PathVariable("userId") String userId) {
if (StringUtils.isEmpty(userId)) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(" userId is Empty ! ");
}
log.info("IE 连接,userId = {} ", userId);
try {
SseEvent poll = Objects.requireNonNull(sseCache.getIfPresent(userId)).poll();
return poll == null ? ResponseEntity.status(HttpStatus.BAD_REQUEST).body("连接失败!") : ResponseEntity.ok().body(poll.getMsg());
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(e.getMessage());
}
}
static boolean publicMsg(SseEvent event) {
LinkedBlockingDeque<SseEvent> ifPresent = sseCache.getIfPresent(event.getUserId());
if (ifPresent == null) {
sseCache.put(event.getUserId(), new LinkedBlockingDeque<SseEvent>());
}
log.info("添加到队列,userId:{} ", event.getUserId());
return Objects.requireNonNull(sseCache.getIfPresent(event.getUserId())).offer(event);
}
}
上面为考虑 IE
的兼容性,多增加了一个接口和队列,因此在发布的时候,就需要同时向 SSE
和 队列 抛数据,因此这块可以在增加一个事件发布:
事件发布我们就使用 Spring
自带的 ApplicationListener
来实现。
首先创建一个事件交易类:
@Getter
@Setter
@ToString
public class SseEvent<T> extends ApplicationEvent {
private int code;
private String userId;
private T msg;
public SseEvent(Object source) {
super(source);
}
}
声明事件监听,在这里同时向 SSE
和 队列发送消息:
@Slf4j
@Component
public class SseListener implements ApplicationListener<SseEvent> {
@Override
public void onApplicationEvent(SseEvent event) {
SseEmitterServer.sendMessage(event.getUserId(), event.getMsg());
SSEWebServer.publicMsg(event);
}
}
最后再 创建一个测试接口,便于我们下面的测试:
@RestController
public class TestController {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@GetMapping("/test/{userId}/{message}")
public ResponseEntity test(@PathVariable("userId") String userId, @PathVariable("message") String message) {
SseEvent<String> sseEvent = new SseEvent<>(this);
sseEvent.setCode(200);
sseEvent.setMsg(message);
sseEvent.setUserId(userId);
applicationEventPublisher.publishEvent(sseEvent);
return ResponseEntity.ok().build();
}
}
到此服务端就已经完成了,下面开始客户端的对接:
三、客户端
相比于服务端,客户端就显得非常简单了,但也要考虑 IE
不支持的情况,需要进行判断,如果是 IE 的话,就换成轮训的方式:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
</body>
<script src="js/jquery-1.10.2.min.js"></script>
<script>
openSSE("1122", function (msg) {
console.log("收到服务器推送消息:" + msg);
});
function openSSE(userId, callback) {
if (window.EventSource) {
var source = new EventSource('http://localhost:8080/sse/sseEvent/' + userId);
source.onmessage = function (event) {
callback(event.data);
};
} else {
//ie 不支持sse 采用轮训
window.setInterval(function () {
$.ajax({
url: "http://localhost:8080/sse/sseEventIE/" + userId + "?" + new Date().getTime(),
method: "GET",
success: function (result) {
callback(result);
}
})
}, 1000);
}
}
</script>
</html>
四、效果测试
启动服务端,首先演示 SSE 的效果,使用 goole
浏览器打开客户端网页,可以看到服务端日志的打印:
可以看到客户端已经连接,下面使用测试接口对 1122
用户发送消息,使用浏览器访问下面地址: http://localhost:8080/test/1122/测试 SSE 发送消息!
查看服务端日志打印:
可以看到同时向SSE
和 队列抛出了消息,下面看客户端浏览器打印的日志:
已经收到了服务端推送的消息。
下面开始对 IE 浏览器进行测试,用 IE 浏览器打开页面:
开始了每秒一次的轮训,由于服务端没有消息,一直返回的 400 状态,下面使用上面的接口发送一次消息: http://localhost:8080/test/1122/测试 IE 发送消息!
查看IE
浏览器打印的日志:
已经接收到了服务端推送的消息!
边栏推荐
- Sfdp 超级表单开发平台 V6.0.5 正式发布
- FPGA parsing B code----serial 3
- 一文看懂推荐系统:召回04:离散特征处理,one-hot编码和embedding特征嵌入
- FFmpeg —— 录制麦克风声音(附源码)
- Functions, recursion and simple dom operations
- KingbaseES数据库启动失败,报“内存段超过可用内存”
- Polygon zkEVM network node
- How many ways do you know about communication between multiple threads?
- MySQL Query Exercise (1)
- 【源码】使用深度学习训练一个游戏
猜你喜欢
基于Qt的目录统计QDirStat
Countdown to 2 days, the "New Infrastructure of Cultural Digital Strategy and Ecological Construction of Cultural Art Chain" will kick off soon
2022杭电多校联赛第五场 题解
Shell 函数
SQL injection in #, - +, - % 20, % 23 is what mean?
单片机C语言->的用法,和意思
出现504怎么办?由于服务器更新导致的博客报504错误[详细记录]
一文详解DHCP原理及配置
LeetCode每日一题(2285. Maximum Total Importance of Roads)
Mini program + new retail, play the new way of playing in the industry!
随机推荐
初识Numpy
STM8S project creation (STVD creation) --- use COSMIC to create a C language project
数据湖(二十):Flink兼容Iceberg目前不足和Iceberg与Hudi对比
sqoop ETL tool
Deep Learning (3) Classification Theory Part
if,case,for,while
用户与用户互发红包/支付宝C2C/B2C现金红包php源码示例/H5方式/兼容苹果/安卓
Polygon zkEVM network node
学会iframe并用其解决跨域问题
汇编语言之栈
Homemade bluetooth mobile app to control stm8/stm32/C51 onboard LED
Innovation and Integration | Huaqiu Empowerment Helps OpenHarmony Ecological Hardware Development and Landing
目标检测-中篇
三分建设,七分管理!产品、系统、组织三管齐下节能降耗
TOML配置文件格式,YAML最有力的竞争者
"Introduction to nlp + actual combat: Chapter 8: Using Pytorch to realize handwritten digit recognition"
FPGA parsing B code----serial 3
力扣(LeetCode)215. 数组中的第K个最大元素(2022.08.03)
深度学习——以CNN服装图像分类为例,探讨怎样评价神经网络模型
机器学习模型的“可解释性”