当前位置:网站首页>实现web实时消息推送的7种方案
实现web实时消息推送的7种方案
2022-07-30 16:36:00 【InfoQ】


什么是消息推送(push)
push
web端消息推送
移动端消息推送


+1

push
pull
短轮询
polling
短轮询
长轮询
HTTP
setInterval(() => {
// 方法请求
messageCount().then((res) => {
if (res.code === 200) {
this.messageCount = res.data
}
})
}, 1000);

长轮询
Nacos
apollo
kafka
RocketMQ
apollo
DeferredResult
servelet3.0

DeferredResult
DeferredResult.setResult(200)
guava
Multimap
@[email protected]("/polling")public class PollingController {
// 存放监听某个Id的长轮询集合
// 线程同步结构
public static Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedMultimap(HashMultimap.create());
/**
* 公众号:程序员小富
* 设置监听
*/
@GetMapping(path = "watch/{id}")
@ResponseBody
public DeferredResult<String> watch(@PathVariable String id) {
// 延迟对象设置超时时间
DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT);
// 异步请求完成时移除 key,防止内存溢出
deferredResult.onCompletion(() -> {
watchRequests.remove(id, deferredResult);
});
// 注册长轮询请求
watchRequests.put(id, deferredResult);
return deferredResult;
}
/**
* 公众号:程序员小富
* 变更数据
*/
@GetMapping(path = "publish/{id}")
@ResponseBody
public String publish(@PathVariable String id) {
// 数据变更 取出监听ID的所有长轮询请求,并一一响应处理
if (watchRequests.containsKey(id)) {
Collection<DeferredResult<String>> deferredResults = watchRequests.get(id);
for (DeferredResult<String> deferredResult : deferredResults) {
deferredResult.setResult("我更新了" + new Date());
}
}
return "success";
}
AsyncRequestTimeoutException
@ControllerAdvice
@ControllerAdvicepublic class AsyncRequestTimeoutHandler {
@ResponseStatus(HttpStatus.NOT_MODIFIED)
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class)
public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
System.out.println("异步请求超时");
return "304";
}
/polling/watch/10086
/polling/publish/10086

iframe流
<iframe>
src
iframe
HTML
javascript

<iframe>
<iframe src="/iframe/message" style="display:none"></iframe>
response
@Controller
@RequestMapping("/iframe")
public class IframeController {
@GetMapping(path = "message")
public void message(HttpServletResponse response) throws IOException, InterruptedException {
while (true) {
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().print(" <script type=\"text/javascript\">\n" +
"parent.document.getElementById('clock').innerHTML = \"" + count.get() + "\";" +
"parent.document.getElementById('count').innerHTML = \"" + count.get() + "\";" +
"</script>");
}
}
}

SSE (我的方式)
WebSocket
Server-sent events
SSE
SSE
HTTP

text/event-stream

SSE
WebSocket
- SSE 是基于HTTP协议的,它们不需要特殊的协议或服务器实现即可工作;
WebSocket
需单独服务器来处理协议。
- SSE 单向通信,只能由服务端向客户端单向通信;webSocket全双工通信,即通信的双方可以同时发送和接受信息。
- SSE 实现简单开发成本低,无需引入其他组件;WebSocket传输数据需做二次解析,开发门槛高一些。
- SSE 默认支持断线重连;WebSocket则需要自己实现。
- SSE 只能传送文本消息,二进制数据需要经过编码后传送;WebSocket默认支持传送二进制数据。
SEE
WebSockets
自动重新连接
事件ID
发送任意事件
<script>
let source = null;
let userId = 7777
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:7777/sse/sub/'+userId);
setMessageInnerHTML("连接用户=" + userId);
/**
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
} else {
setMessageInnerHTML("你的浏览器不支持SSE");
}
</script>
SseEmitter
sseEmitterMap
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 创建连接
*
* @date: 2022/7/12 14:51
* @auther: 公众号:程序员小富
*/
public static SseEmitter connect(String userId) {
try {
// 设置超时时间,0表示不过期。默认30秒
SseEmitter sseEmitter = new SseEmitter(0L);
// 注册回调
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
count.getAndIncrement();
return sseEmitter;
} catch (Exception e) {
log.info("创建新的sse连接异常,当前用户:{}", userId);
}
return null;
}
/**
* 给指定用户发送消息
*
* @date: 2022/7/12 14:51
* @auther: 公众号:程序员小富
*/
public static void sendMessage(String userId, String message) {
if (sseEmitterMap.containsKey(userId)) {
try {
sseEmitterMap.get(userId).send(message);
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", userId, e.getMessage());
removeUser(userId);
}
}
}

IE

MQTT
MQTT
publish
subscribe
轻量级
Internet of Thing
publisher
subscriber

TCP
MQTT
MQTT
TCP/IP
TCP/IP
MQTT
MQTT
HTTP
- 首先
HTTP
协议它是一种同步协议,客户端请求后需要等待服务器的响应。而在物联网(IOT)环境中,设备会很受制于环境的影响,比如带宽低、网络延迟高、网络通信不稳定等,显然异步消息协议更为适合IOT
应用程序。
HTTP
是单向的,如果要获取消息客户端必须发起连接,而在物联网(IOT)应用程序中,设备或传感器往往都是客户端,这意味着它们无法被动地接收来自网络的命令。
- 通常需要将一条命令或者消息,发送到网络上的所有设备上。
HTTP
要实现这样的功能不但很困难,而且成本极高。
websocket
TCP

websocket
<!-- 引入websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
@ServerEndpoint
ws://localhost:7777/webSocket/10086
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
// 用来存在线连接数
private static final Map<String, Session> sessionPool = new HashMap<String, Session>();
/**
* 公众号:程序员小富
* 链接成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try {
this.session = session;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("websocket消息: 有新的连接,总数为:" + webSockets.size());
} catch (Exception e) {
}
}
/**
* 公众号:程序员小富
* 收到客户端消息后调用的方法
*/
@OnMessage
public void onMessage(String message) {
log.info("websocket消息: 收到客户端消息:" + message);
}
/**
* 公众号:程序员小富
* 此为单点消息
*/
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("websocket消: 单点消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
<script>
var ws = new WebSocket('ws://localhost:7777/webSocket/10086');
// 获取连接状态
console.log('ws连接状态:' + ws.readyState);
//监听是否连接成功
ws.onopen = function () {
console.log('ws连接状态:' + ws.readyState);
//连接成功则发送一个数据
ws.send('test1');
}
// 接听服务器发回的信息并处理展示
ws.onmessage = function (data) {
console.log('接收到来自服务器的消息:');
console.log(data);
//完成通信后关闭WebSocket连接
ws.close();
}
// 监听连接关闭事件
ws.onclose = function () {
// 监听整个过程中websocket的状态
console.log('ws连接状态:' + ws.readyState);
}
// 监听并处理error事件
ws.onerror = function (error) {
console.log(error);
}
function sendMessage() {
var content = $("#message").val();
$.ajax({
url: '/socket/publish?userId=10086&message=' + content,
type: 'GET',
data: { "id": "7777", "content": content },
success: function (data) {
console.log(data)
}
})
}
</script>


自定义推送

边栏推荐
猜你喜欢
Nervegrowold d2l (7) kaggle housing forecast model, numerical stability and the initialization and activation function
Navisworks切换语言
Qt 容器控件之Tab Widget 使用详解
DTSE Tech Talk丨Phase 2: 1 hour in-depth interpretation of SaaS application system design
rhce笔记3
数据库的三大范式
【C语言】指针和数组的深入理解(第二期)
3D激光SLAM:LeGO-LOAM论文解读---激光雷达里程计与建图
go 学习03 基础知识变量类型
Leetcode 118. 杨辉三角
随机推荐
服务器装好系统的电脑怎么分区
配置Path环境变量
2022-07-30 Androd 进入深度休眠后把WIFI给关掉,唤醒之后重新打开WIFI
huato 热更新环境搭建(DLL方式热更新C#代码)
第六章:决胜秋招
打印1-100之间的奇数
Minio 入门
gvim命令记录
DTSE Tech Talk丨第2期:1小时深度解读SaaS应用系统设计
Goland opens file saving and automatically formats
The case of five little pigs (five little pigs compare the size of the body weight)
PCIE入门
Invalid or corrupt jarfile xxx.jar
How to connect redis in node.js?
DTSE Tech Talk丨第2期:1小时深度解读SaaS应用系统设计
DTSE Tech Talk丨Phase 2: 1 hour in-depth interpretation of SaaS application system design
DTSE Tech Talk丨第2期:1小时深度解读SaaS应用系统设计
李沐d2l(七)kaggle房价预测+数值稳定性+模型初始化和激活函数
How does the new retail saas applet explore the way to break the digital store?
Redis 复习计划 - Redis 数据结构和持久化机制