当前位置:网站首页>长轮询,iframe和sse三种web消息实时推送demo实践
长轮询,iframe和sse三种web消息实时推送demo实践
2022-07-28 18:10:00 【Fire king】
长轮询,iframe和sse三种web消息实时推送demo实践
长轮询
@Controller
@RequestMapping("/polling")
public class PollingController {
private static final Long TIME_OUT = 10000l;
// 存放监听某个Id的长轮询集合
// 线程同步结构
public static Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedMultimap(HashMultimap.create());
/** * * 设置监听 */
@GetMapping(path = "watch/{id}")
@ResponseBody
public DeferredResult<String> watch(@PathVariable String id) {
// 延迟对象设置超时时间
DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT);
// 异步请求完成时移除 key,防止内存溢出
deferredResult.onCompletion(() -> {
watchRequests.remove(id, deferredResult);
});
// 注册长轮询请求
watchRequests.put(id, deferredResult);
return deferredResult;
}
/** * * 变更数据 */
@GetMapping(path = "publish/{id}")
@ResponseBody
public String publish(@PathVariable String id) {
// 数据变更 取出监听ID的所有长轮询请求(多个浏览器,单个浏览器不同窗口只认为一个,每次publish只响改变一个),并一一响应处理
if (watchRequests.containsKey(id)) {
Collection<DeferredResult<String>> deferredResults = watchRequests.get(id);
System.out.println(deferredResults.size());
for (DeferredResult<String> deferredResult : deferredResults) {
//只要set,就认为改变了
deferredResult.setResult("我更新了" + new Date());
}
}
return "success";
}
//当请求超过设置的超时时间,会抛出AsyncRequestTimeoutException异常,这里直接用@ControllerAdvice全局捕获统一返回即可,前端获取约定好的状态码后再次发起长轮询请求,如此往复调用。
@ControllerAdvice
public class AsyncRequestTimeoutHandler {
@ResponseStatus(HttpStatus.NOT_MODIFIED)
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class)
public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
System.out.println("异步请求超时");
return "304";
}
}
}
iframe
IframeController
@Controller
@RequestMapping("/iframe")
public class IframeController {
@GetMapping(path = "message")
public void message(HttpServletResponse response) throws IOException, InterruptedException {
int count = 0;
while (true) {
count++;
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().print(" <script type=\"text/javascript\">\n" +
"parent.document.getElementById('clock').innerHTML = \"" + count + "\";" +
"parent.document.getElementById('count').innerHTML = \"" + count + "\";" +
"</script>");
}
}
@GetMapping(path = "toIframePage")
public String toIframePage() {
return "/iframe";
}
}
iframe.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>iframe长轮询</title>
</head>
<body>
<iframe src="/iframe/message" style="display:none"></iframe>
<h1 id="clock"></h1>
<h1 id="count"></h1>
</body>
</html>
sse
SseEmitterServer
@Slf4j
public class SseEmitterServer {
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/** * 当前连接数 */
private static AtomicInteger count = new AtomicInteger(0);
/** * 创建连接 * * @date: 2022/7/12 14:51 * @auther: */
public static SseEmitter connect(String userId) {
try {
// 设置超时时间,0表示不过期。默认30秒
SseEmitter sseEmitter = new SseEmitter(0L);
// 注册回调
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
count.getAndIncrement();
return sseEmitter;
} catch (Exception e) {
log.info("创建新的sse连接异常,当前用户:{}", userId);
}
return null;
}
private static Consumer<Throwable> errorCallBack(String userId) {
return throwable -> {
log.info("连接异常:{}", userId);
removeUser(userId);
};
}
private static Runnable timeoutCallBack(String userId) {
return () -> {
log.info("连接超时:{}", userId);
removeUser(userId);
};
}
private static Runnable completionCallBack(String userId) {
return () -> {
log.info("结束连接:{}", userId);
removeUser(userId);
};
}
public static void removeUser(String userId) {
sseEmitterMap.remove(userId);
// 数量-1
count.getAndDecrement();
log.info("移除用户:{}", userId);
}
/** * 给指定用户发送消息 * * @date: 2022/7/12 14:51 * @auther: */
public static void sendMessage(String userId, String message) {
if (sseEmitterMap.containsKey(userId)) {
try {
sseEmitterMap.get(userId).send(message);
//不同SpringBoot的版本可能要加
// sseEmitterMap.get(userId).complete();
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", userId, e.getMessage());
removeUser(userId);
}
}
}
}
SseEmitterController
@CrossOrigin
@RestController
@RequestMapping("/sse")
public class SseEmitterController {
/** * 用于创建连接 */
@GetMapping("/connect/{userId}")
public SseEmitter connect(@PathVariable String userId) {
return SseEmitterServer.connect(userId);
}
/** * 推送给所有人 * * @param message * @return */
/* @GetMapping("/push/{message}") public ResponseEntity<String> push(@PathVariable(name = "message") String message) { SseEmitterServer.batchSendMessage(message); return ResponseEntity.ok("WebSocket 推送消息给所有人"); }*/
/** * 发送给单个人 * * @param message * @param userid * @return */
@RequestMapping("/push_one")
public ResponseEntity<String> pushOne( String message, String userid) {
SseEmitterServer.sendMessage(userid, message);
return ResponseEntity.ok("WebSocket 推送消息给" + userid);
}
/** * 关闭连接 */
@GetMapping("/close/{userid}")
public ResponseEntity<String> close(@PathVariable("userid") String userid) {
SseEmitterServer.removeUser(userid);
return ResponseEntity.ok("连接关闭");
}
}
PageController
@Controller
public class PageController {
@RequestMapping("/toSsePage")
public String toSsePage() {
return "/sse";
}
}
sse.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script> let source = null; // 用时间戳模拟登录用户 //const userId = new Date().getTime(); const userId = "1"; if (window.EventSource) {
// 建立连接 source = new EventSource('http://localhost:8080/sse/connect/' + userId); /** * 连接一旦建立,就会触发open事件 * 另一种写法:source.onopen = function (event) {} */ source.addEventListener('open', function(e) {
setMessageInnerHTML("建立连接。。。"); }, false); /** * 客户端收到服务器发来的数据 * 另一种写法:source.onmessage = function (event) {} */ source.addEventListener('message', function(e) {
setMessageInnerHTML(e.data); }); /** * 如果发生通信错误(比如连接中断),就会触发error事件 * 或者: * 另一种写法:source.onerror = function (event) {} */ source.addEventListener('error', function(e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("连接关闭"); } else {
console.log(e); } }, false); } else {
setMessageInnerHTML("你的浏览器不支持SSE"); } // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据 window.onbeforeunload = function() {
closeSse(); }; // 关闭Sse连接 function closeSse() {
source.close(); const httpRequest = new XMLHttpRequest(); httpRequest.open('GET', 'http://localhost:8080/sse/close/' + userId, true); httpRequest.send(); console.log("close"); } // 将消息显示在网页上 function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>'; } </script>
</html>
上面三种消息推送的优缺点
边栏推荐
- 5. Difference between break and continue (easy to understand version)
- How to use pycharm to quickly create a flask project
- 数字图像理论知识(一)(个人浅析)
- Design of air combat game based on qtgui image interface
- WFST decoding process
- Tencent cloud deployment lamp_ Experience of building a station
- English Translation Spanish - batch English Translation Spanish tools free of charge
- 河北邯郸:拓展基层就业空间 助力高校毕业生就业
- 通信网络基础知识01
- BeanFactory not initialized or already closed - call ‘refresh‘ before accessing beans via the Applic
猜你喜欢
![[C language] summary of methods for solving the greatest common divisor](/img/38/3a099948ebf51fd0da3076f71f9dad.png)
[C language] summary of methods for solving the greatest common divisor

Sprint for golden nine and silver ten, stay up at night for half a month, collect 1600 real interview questions from Android post of major manufacturers

Const pointer of C language and parameter passing of main function

Prometheus deployment

Multi-Modal Knowledge Graph Construction and Application: A Survey
![[C language] Gobang game [array and function]](/img/73/1e708b0640c2dbc86393206840b059.png)
[C language] Gobang game [array and function]

zfoo增加类似于mydog的路由

What is the variance?

C language pointer and two-dimensional array

党员故事|李青艾用漫画带动农民增收致富
随机推荐
基于 MinIO 对象存储保障 Rancher 数据
NEIL: Extracting Visual Knowledge from Web Data
[C language] print pattern summary
私有化部署的即时通讯平台,为企业移动业务安全保驾护航
9. Pointer of C language (4) pointer and one-dimensional array, pointer operation
Why is there no log output in the telnet login interface?
4. Const and difine and the problem of initializing arrays with const and define
C language - data type
[C language] shutdown game [loop and switch statement]
Source insight project import and use tutorial
[C language] simulation implementation of strlen (recursive and non recursive)
Rand function generates pseudo-random numbers
跨区域网络的通信学习静态路由
WFST decoding process
1、 Relationship among CPU, memory and hard disk
熊市下PLATO如何通过Elephant Swap,获得溢价收益?
一文读懂如何部署具有外部数据库的高可用 K3s
2. Floating point number, the difference between float and double in C language and how to choose them
Sequential linear table - practice in class
WPF--实现WebSocket服务端