当前位置:网站首页>长轮询,iframe和sse三种web消息实时推送demo实践
长轮询,iframe和sse三种web消息实时推送demo实践
2022-07-28 18:10:00 【Fire king】
长轮询,iframe和sse三种web消息实时推送demo实践
长轮询
@Controller
@RequestMapping("/polling")
public class PollingController {
private static final Long TIME_OUT = 10000l;
// 存放监听某个Id的长轮询集合
// 线程同步结构
public static Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedMultimap(HashMultimap.create());
/** * * 设置监听 */
@GetMapping(path = "watch/{id}")
@ResponseBody
public DeferredResult<String> watch(@PathVariable String id) {
// 延迟对象设置超时时间
DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT);
// 异步请求完成时移除 key,防止内存溢出
deferredResult.onCompletion(() -> {
watchRequests.remove(id, deferredResult);
});
// 注册长轮询请求
watchRequests.put(id, deferredResult);
return deferredResult;
}
/** * * 变更数据 */
@GetMapping(path = "publish/{id}")
@ResponseBody
public String publish(@PathVariable String id) {
// 数据变更 取出监听ID的所有长轮询请求(多个浏览器,单个浏览器不同窗口只认为一个,每次publish只响改变一个),并一一响应处理
if (watchRequests.containsKey(id)) {
Collection<DeferredResult<String>> deferredResults = watchRequests.get(id);
System.out.println(deferredResults.size());
for (DeferredResult<String> deferredResult : deferredResults) {
//只要set,就认为改变了
deferredResult.setResult("我更新了" + new Date());
}
}
return "success";
}
//当请求超过设置的超时时间,会抛出AsyncRequestTimeoutException异常,这里直接用@ControllerAdvice全局捕获统一返回即可,前端获取约定好的状态码后再次发起长轮询请求,如此往复调用。
@ControllerAdvice
public class AsyncRequestTimeoutHandler {
@ResponseStatus(HttpStatus.NOT_MODIFIED)
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class)
public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
System.out.println("异步请求超时");
return "304";
}
}
}
iframe
IframeController
@Controller
@RequestMapping("/iframe")
public class IframeController {
@GetMapping(path = "message")
public void message(HttpServletResponse response) throws IOException, InterruptedException {
int count = 0;
while (true) {
count++;
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().print(" <script type=\"text/javascript\">\n" +
"parent.document.getElementById('clock').innerHTML = \"" + count + "\";" +
"parent.document.getElementById('count').innerHTML = \"" + count + "\";" +
"</script>");
}
}
@GetMapping(path = "toIframePage")
public String toIframePage() {
return "/iframe";
}
}
iframe.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>iframe长轮询</title>
</head>
<body>
<iframe src="/iframe/message" style="display:none"></iframe>
<h1 id="clock"></h1>
<h1 id="count"></h1>
</body>
</html>
sse
SseEmitterServer
@Slf4j
public class SseEmitterServer {
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/** * 当前连接数 */
private static AtomicInteger count = new AtomicInteger(0);
/** * 创建连接 * * @date: 2022/7/12 14:51 * @auther: */
public static SseEmitter connect(String userId) {
try {
// 设置超时时间,0表示不过期。默认30秒
SseEmitter sseEmitter = new SseEmitter(0L);
// 注册回调
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
count.getAndIncrement();
return sseEmitter;
} catch (Exception e) {
log.info("创建新的sse连接异常,当前用户:{}", userId);
}
return null;
}
private static Consumer<Throwable> errorCallBack(String userId) {
return throwable -> {
log.info("连接异常:{}", userId);
removeUser(userId);
};
}
private static Runnable timeoutCallBack(String userId) {
return () -> {
log.info("连接超时:{}", userId);
removeUser(userId);
};
}
private static Runnable completionCallBack(String userId) {
return () -> {
log.info("结束连接:{}", userId);
removeUser(userId);
};
}
public static void removeUser(String userId) {
sseEmitterMap.remove(userId);
// 数量-1
count.getAndDecrement();
log.info("移除用户:{}", userId);
}
/** * 给指定用户发送消息 * * @date: 2022/7/12 14:51 * @auther: */
public static void sendMessage(String userId, String message) {
if (sseEmitterMap.containsKey(userId)) {
try {
sseEmitterMap.get(userId).send(message);
//不同SpringBoot的版本可能要加
// sseEmitterMap.get(userId).complete();
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", userId, e.getMessage());
removeUser(userId);
}
}
}
}
SseEmitterController
@CrossOrigin
@RestController
@RequestMapping("/sse")
public class SseEmitterController {
/** * 用于创建连接 */
@GetMapping("/connect/{userId}")
public SseEmitter connect(@PathVariable String userId) {
return SseEmitterServer.connect(userId);
}
/** * 推送给所有人 * * @param message * @return */
/* @GetMapping("/push/{message}") public ResponseEntity<String> push(@PathVariable(name = "message") String message) { SseEmitterServer.batchSendMessage(message); return ResponseEntity.ok("WebSocket 推送消息给所有人"); }*/
/** * 发送给单个人 * * @param message * @param userid * @return */
@RequestMapping("/push_one")
public ResponseEntity<String> pushOne( String message, String userid) {
SseEmitterServer.sendMessage(userid, message);
return ResponseEntity.ok("WebSocket 推送消息给" + userid);
}
/** * 关闭连接 */
@GetMapping("/close/{userid}")
public ResponseEntity<String> close(@PathVariable("userid") String userid) {
SseEmitterServer.removeUser(userid);
return ResponseEntity.ok("连接关闭");
}
}
PageController
@Controller
public class PageController {
@RequestMapping("/toSsePage")
public String toSsePage() {
return "/sse";
}
}
sse.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script> let source = null; // 用时间戳模拟登录用户 //const userId = new Date().getTime(); const userId = "1"; if (window.EventSource) {
// 建立连接 source = new EventSource('http://localhost:8080/sse/connect/' + userId); /** * 连接一旦建立,就会触发open事件 * 另一种写法:source.onopen = function (event) {} */ source.addEventListener('open', function(e) {
setMessageInnerHTML("建立连接。。。"); }, false); /** * 客户端收到服务器发来的数据 * 另一种写法:source.onmessage = function (event) {} */ source.addEventListener('message', function(e) {
setMessageInnerHTML(e.data); }); /** * 如果发生通信错误(比如连接中断),就会触发error事件 * 或者: * 另一种写法:source.onerror = function (event) {} */ source.addEventListener('error', function(e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("连接关闭"); } else {
console.log(e); } }, false); } else {
setMessageInnerHTML("你的浏览器不支持SSE"); } // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据 window.onbeforeunload = function() {
closeSse(); }; // 关闭Sse连接 function closeSse() {
source.close(); const httpRequest = new XMLHttpRequest(); httpRequest.open('GET', 'http://localhost:8080/sse/close/' + userId, true); httpRequest.send(); console.log("close"); } // 将消息显示在网页上 function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>'; } </script>
</html>
上面三种消息推送的优缺点
边栏推荐
- Crawl IP
- 通信网络基础知识01
- 2022年下半年系统集成项目管理工程师认证8月20日开班
- Rand function generates pseudo-random numbers
- 9. Pointer of C language (2) wild pointer, what is wild pointer, and the disadvantages of wild pointer
- Advanced notes (Part 2)
- English translation Arabic - batch English translation Arabic tools free of charge
- JVM(二十四) -- 性能监控与调优(五) -- 分析GC日志
- Why is there no log output in the telnet login interface?
- [network] communication across regional networks learn how routing tables work
猜你喜欢

Advanced notes (Part 2)

There is a 'single quotation mark' problem in the string when Oracle inserts data

Multi-Modal Knowledge Graph Construction and Application: A Survey

Tencent cloud deployment lamp_ Experience of building a station

Idea properties file display \u solution of not displaying Chinese

Getting started with enterprise distributed crawler framework

Deploy LNMP automatically with saltstack

Concurrent programming, do you really understand?

河北:稳粮扩豆助力粮油生产提质增效

Sprint for golden nine and silver ten, stay up at night for half a month, collect 1600 real interview questions from Android post of major manufacturers
随机推荐
Labelme (I)
WPF -- implement websocket server
熊市下PLATO如何通过Elephant Swap,获得溢价收益?
跨区域网络的通信学习静态路由
Store and guarantee rancher data based on Minio objects
Source code analysis of scripy spider
[C language] summary of methods for solving the greatest common divisor
7. Functions of C language, function definitions and the order of function calls, how to declare functions, prime examples, formal parameters and arguments, and how to write a function well
Function fitting based on MATLAB
最大交换[贪心思想&单调栈实现]
Common modules of saltstack
Read how to deploy highly available k3s with external database
[C language] string reverse order implementation (recursion and iteration)
C language - data storage
[C language] advanced pointer exercise 1
[C language] simulation implementation of strlen (recursive and non recursive)
爬取IP
zfoo增加类似于mydog的路由
English translation Italian - batch English translation Italian tools free of charge
Item exception handling in SSM