当前位置:网站首页>基于 SSE 实现服务端消息主动推送解决方案
基于 SSE 实现服务端消息主动推送解决方案
2022-08-04 03:29:00 【小毕超】
一、SSE 服务端消息推送
SSE 是 Server-Sent Events 的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。对应的浏览器端实现 Event Source 接口被制定为HTML5 的一部分。不过现在IE不支持该技术,只能通过轮训的方式实现。相比于 WebSocket,SSE 简单很多,服务器端和客户端工作量都要小很多、简单很多,同时实现的功能也要有局限。
相比于 WebSocket 两者的区别:
WebSocket是全双工通道,可以双向通信,功能更强。SSE是单向通道,只能服务器向浏览器端发送。WebSocket是一个新的协议,需要服务器端支持。SSE则是部署在HTTP协议之上的,现有的服务器软件都支持。SSE是一个轻量级协议,相对简单。WebSocket是一种较重的协议,相对复杂。SSE默认支持断线重连,WebSocket则需要额外部署。SSE支持自定义发送的数据类型。SSE不支持CORS,参数url就是服务器网址,必须与当前网页的网址在同一个网域(domain),而且协议和端口都必须相同。
在我们平常使用 SpringBoot 进行开发中,其实已经集成好了 SSE ,里面有个 SseEmitter 类已经封装好了相关操作,可以方便的实现功能。
但是在实现的时候需要注意下是否要兼容 IE 浏览器的使用,IE 浏览器目前不支持 Event Source 接口,如果需要兼容 IE 可以创建一个顺序队列比如放在 Redis 中或 消息队列中,IE 客户端通过接口轮训的方式每次都从队列中消费消息,以实现 SSE 的功能。
下面分别从 服务端 和 客户端进行实施。
二、服务端
服务端需要考虑 IE 浏览器不支持的情况,对于 IE 可以通过轮训的方式实现,首先新建一个 SpringBoot 项目,声明 SseEmitter 连接:
@Slf4j
public class SseEmitterServer {
private static AtomicInteger count = new AtomicInteger(0);
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
public static SseEmitter connect(String userId) {
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onCompletion(() -> {
log.info("结束连接:{}", userId);
removeUser(userId);
});
sseEmitter.onError(throwable -> {
log.info("连接异常:{}", userId);
removeUser(userId);
});
sseEmitter.onTimeout(() -> {
log.info("连接超时:{}", userId);
removeUser(userId);
});
sseEmitterMap.put(userId, sseEmitter);
count.getAndIncrement();
log.info("创建新的sse连接,当前用户:{}", userId);
return sseEmitter;
}
public static void sendMessage(String userId, Object message) {
if (sseEmitterMap.containsKey(userId)) {
try {
sseEmitterMap.get(userId).send(message);
log.info("SSE 发送信息成功!id = {} , message: {} ", userId, message);
} catch (IOException e) {
log.error("[{}]推送异常:{}", userId, e.getMessage());
removeUser(userId);
}
} else {
log.warn("SSE 发送信息异常,用户不存在:id = {} ", userId);
}
}
private static void removeUser(String userId) {
sseEmitterMap.remove(userId);
count.getAndDecrement();
}
}
然后声明 SSEWebServer 释放对客户端的接口,对于 IE 的话我们就直接提供一个接口,每次都从队列中消费消息,这里以 LinkedBlockingDeque 为例实现一个单机的队列,如果是分布式的可能就要考虑 Redis 或 消息队列 :
@Slf4j
@CrossOrigin
@RestController
@RequestMapping("/sse")
public class SSEWebServer {
private static Cache<String, LinkedBlockingDeque<SseEvent>> sseCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(60, TimeUnit.MINUTES)
.build();
/* *
* sse 连接服务
*/
@GetMapping("/sseEvent/{userId}")
public SseEmitter push(@PathVariable("userId") String userId) {
return SseEmitterServer.connect(userId);
}
//IE 浏览器不支持SSE 采用轮训
@GetMapping("/sseEventIE/{userId}")
public ResponseEntity pushIe(@PathVariable("userId") String userId) {
if (StringUtils.isEmpty(userId)) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(" userId is Empty ! ");
}
log.info("IE 连接,userId = {} ", userId);
try {
SseEvent poll = Objects.requireNonNull(sseCache.getIfPresent(userId)).poll();
return poll == null ? ResponseEntity.status(HttpStatus.BAD_REQUEST).body("连接失败!") : ResponseEntity.ok().body(poll.getMsg());
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(e.getMessage());
}
}
static boolean publicMsg(SseEvent event) {
LinkedBlockingDeque<SseEvent> ifPresent = sseCache.getIfPresent(event.getUserId());
if (ifPresent == null) {
sseCache.put(event.getUserId(), new LinkedBlockingDeque<SseEvent>());
}
log.info("添加到队列,userId:{} ", event.getUserId());
return Objects.requireNonNull(sseCache.getIfPresent(event.getUserId())).offer(event);
}
}
上面为考虑 IE 的兼容性,多增加了一个接口和队列,因此在发布的时候,就需要同时向 SSE 和 队列 抛数据,因此这块可以在增加一个事件发布:
事件发布我们就使用 Spring 自带的 ApplicationListener 来实现。
首先创建一个事件交易类:
@Getter
@Setter
@ToString
public class SseEvent<T> extends ApplicationEvent {
private int code;
private String userId;
private T msg;
public SseEvent(Object source) {
super(source);
}
}
声明事件监听,在这里同时向 SSE 和 队列发送消息:
@Slf4j
@Component
public class SseListener implements ApplicationListener<SseEvent> {
@Override
public void onApplicationEvent(SseEvent event) {
SseEmitterServer.sendMessage(event.getUserId(), event.getMsg());
SSEWebServer.publicMsg(event);
}
}
最后再 创建一个测试接口,便于我们下面的测试:
@RestController
public class TestController {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@GetMapping("/test/{userId}/{message}")
public ResponseEntity test(@PathVariable("userId") String userId, @PathVariable("message") String message) {
SseEvent<String> sseEvent = new SseEvent<>(this);
sseEvent.setCode(200);
sseEvent.setMsg(message);
sseEvent.setUserId(userId);
applicationEventPublisher.publishEvent(sseEvent);
return ResponseEntity.ok().build();
}
}
到此服务端就已经完成了,下面开始客户端的对接:
三、客户端
相比于服务端,客户端就显得非常简单了,但也要考虑 IE 不支持的情况,需要进行判断,如果是 IE 的话,就换成轮训的方式:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
</body>
<script src="js/jquery-1.10.2.min.js"></script>
<script>
openSSE("1122", function (msg) {
console.log("收到服务器推送消息:" + msg);
});
function openSSE(userId, callback) {
if (window.EventSource) {
var source = new EventSource('http://localhost:8080/sse/sseEvent/' + userId);
source.onmessage = function (event) {
callback(event.data);
};
} else {
//ie 不支持sse 采用轮训
window.setInterval(function () {
$.ajax({
url: "http://localhost:8080/sse/sseEventIE/" + userId + "?" + new Date().getTime(),
method: "GET",
success: function (result) {
callback(result);
}
})
}, 1000);
}
}
</script>
</html>
四、效果测试
启动服务端,首先演示 SSE 的效果,使用 goole 浏览器打开客户端网页,可以看到服务端日志的打印:

可以看到客户端已经连接,下面使用测试接口对 1122 用户发送消息,使用浏览器访问下面地址: http://localhost:8080/test/1122/测试 SSE 发送消息!
查看服务端日志打印:
可以看到同时向SSE 和 队列抛出了消息,下面看客户端浏览器打印的日志:

已经收到了服务端推送的消息。
下面开始对 IE 浏览器进行测试,用 IE 浏览器打开页面:
开始了每秒一次的轮训,由于服务端没有消息,一直返回的 400 状态,下面使用上面的接口发送一次消息: http://localhost:8080/test/1122/测试 IE 发送消息!
查看IE浏览器打印的日志:
已经接收到了服务端推送的消息!
边栏推荐
- [Original] Start the XPS/OXPS reader that comes with Windows 10
- 数据湖(二十):Flink兼容Iceberg目前不足和Iceberg与Hudi对比
- sql注入一般流程(附例题)
- 2022支付宝C2C现金红包PHP源码DEMO/兼容苹果/安卓浏览器和扫码形式
- 创新互融|华秋赋能助力OpenHarmony生态硬件开发落地
- 【观察】超聚变:首提“算网九阶”评估模型,共建开放繁荣的算力网络
- SQL注入中 #、 --+、 --%20、 %23是什么意思?
- Simple record of Flink principle flow chart
- Mockito单元测试
- 机器学习之视频学习【更新】
猜你喜欢

Gigabit 2 X light 8 electricity management industrial Ethernet switches WEB management - a key Ring Ring net switch

DIY电工维修如何拆卸和安装开关面板插座

马尔可夫链

There are too many systems, how to realize multi-account interworking?

Y86. Chapter iv Prometheus giant monitoring system and the actual combat, Prometheus storage (17)

Polygon zkEVM网络节点

逻辑漏洞----其他类型

复制带随机指针的链表

Basic form validation process

数组相关 内容 解析
随机推荐
STM8S项目创建(STVD创建)---使用 COSMIC 创建 C 语言项目
十一种概率分布
Pine Script | How to display and typeset a plot switch?
kingbaseES V8R2/R3 表在指定表空间,为何显示为默认表空间?
【指针内功修炼】深度剖析指针笔试题(三)
多线程间的通信方式你知道几种?
Basic form validation process
Polygon zkEVM网络节点
Detailed analysis of scaffolding content
Good bosses, please ask the flink CDC oracle to Doris, found that the CPU is unusual, a run down
FPGA parsing B code----serial 3
Mockito unit testing
汇编语言之栈
sql注入一般流程(附例题)
unsafe.Pointer, pointer, reference in golang
基本表单验证流程
数据安全峰会2022 | 美创DSM获颁“数据安全产品能力验证计划”评测证书
Innovation and Integration | Huaqiu Empowerment Helps OpenHarmony Ecological Hardware Development and Landing
Sfdp 超级表单开发平台 V6.0.5 正式发布
2022年最新海南建筑八大员(材料员)模拟考试试题及答案