I have seven ways to implement real-time web message push, seven!
2022-07-25 05:10:00 【InfoQ】


What is message push (push)
Push scenarios are usually divided into web-side message push and mobile message push.

Messages reach the client in one of two ways: push or pull.
Short polling
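Polling comes in two forms: short polling and long polling. Short polling is the simplest to implement: the browser sends an HTTP request to the server on a fixed timer, and the server returns the current data. Most of those requests return nothing new, which wastes bandwidth and server resources.
setInterval(() => {
  // Request the current message count
  messageCount().then((res) => {
    if (res.code === 200) {
      this.messageCount = res.data
    }
  })
}, 1000);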

Long polling
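Long polling is widely used in middleware: configuration centers such as Nacos and Apollo and message queues such as Kafka and RocketMQ all rely on it. The example below borrows the approach used by Nacos and Apollo and is built on DeferredResult, the asynchronous request mechanism Spring provides on top of Servlet 3.0.
DeferredResult keeps the request open until DeferredResult.setResult(200) is called or the timeout expires. Because several requests may watch the same ID, a Guava Multimap holds them.
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;

@Controller
@RequestMapping("/polling")
public class PollingController {
    // Timeout in milliseconds (assumed value; the original excerpt does not define TIME_OUT)
    private static final long TIME_OUT = 30_000L;
    // Pending long-polling requests grouped by the ID they watch (thread-safe wrapper)
    public static Multimap<String, DeferredResult<String>> watchRequests =
            Multimaps.synchronizedMultimap(HashMultimap.create());

    /**
     * Official account: Programmer Xiaofu
     * Register a watch on an ID.
     */
    @GetMapping(path = "watch/{id}")
    @ResponseBody
    public DeferredResult<String> watch(@PathVariable String id) {
        // Deferred result with a timeout
        DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT);
        // Remove the entry when the async request completes, to avoid a memory leak
        deferredResult.onCompletion(() -> watchRequests.remove(id, deferredResult));
        // Register the long-polling request
        watchRequests.put(id, deferredResult);
        return deferredResult;
    }

    /**
     * Publish a data change.
     */
    @GetMapping(path = "publish/{id}")
    @ResponseBody
    public String publish(@PathVariable String id) {
        // Copy the watchers under the multimap lock; Guava requires manual
        // synchronization when iterating a view of a synchronized multimap
        Collection<DeferredResult<String>> deferredResults;
        synchronized (watchRequests) {
            deferredResults = new ArrayList<>(watchRequests.get(id));
        }
        // Respond to every pending long-polling request for this ID
        for (DeferredResult<String> deferredResult : deferredResults) {
            deferredResult.setResult("Updated at " + new Date());
        }
        return "success";
    }
}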
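When a request exceeds the timeout, Spring throws AsyncRequestTimeoutException. A global @ControllerAdvice handler catches it and returns an agreed status code (304 here); on receiving it, the front end issues the next long-polling request.
@ControllerAdvice
public class AsyncRequestTimeoutHandler {
    @ResponseStatus(HttpStatus.NOT_MODIFIED)
    @ResponseBody
    @ExceptionHandler(AsyncRequestTimeoutException.class)
    public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
        System.out.println("Asynchronous request timed out");
        return "304";
    }
}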
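To test it, have the page request /polling/watch/10086, which stays pending until the data changes or the timeout is reached. Then call /polling/publish/10086 to change the data, and the pending request receives its response.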
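The controller above covers only the server side. A minimal sketch of the matching browser loop might look like the following. It assumes the `/polling/watch/{id}` endpoint and the 304 timeout response shown earlier; `longPoll`, `fetchFn`, `onMessage` and `maxRounds` are hypothetical names, and the one-second pause after a network error is an arbitrary choice.

```javascript
// Hypothetical client loop for the /polling/watch/{id} endpoint.
// fetchFn is injected so the loop can use fetch in a browser or a stub in
// tests; maxRounds only exists to keep a demo run finite.
async function longPoll(fetchFn, url, onMessage, maxRounds = Infinity) {
  for (let round = 0; round < maxRounds; round++) {
    try {
      const res = await fetchFn(url);
      if (res.status === 200) {
        // Data changed: hand the body to the caller, then poll again.
        onMessage(await res.text());
      }
      // Any other status (such as the 304 sent on timeout) carries nothing
      // new, so the loop simply issues the next request.
    } catch (err) {
      // Network error: pause briefly so a dead server is not hammered.
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}
```

In a page this could be started with `longPoll((u) => fetch(u), '/polling/watch/10086', (msg) => console.log(msg))`.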
iframe streaming
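iframe streaming inserts a hidden <iframe> tag into the page and points its src attribute at a long-lived request, so the server can keep pushing HTML and JavaScript into the iframe to update the page in real time.
The front end only needs one <iframe> tag:
<iframe src="/iframe/message" style="display:none"></iframe>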
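The server writes script fragments straight into the response, and the browser runs each one as it arrives:
@Controller
@RequestMapping("/iframe")
public class IframeController {
    // Shared counter shown on the page (assumed; the original excerpt does not show where count is declared or updated)
    static final AtomicInteger count = new AtomicInteger();

    @GetMapping(path = "message")
    public void message(HttpServletResponse response) throws IOException, InterruptedException {
        // Headers must be set once, before any of the body is written
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        while (true) {
            response.getWriter().print("<script type=\"text/javascript\">\n" +
                    "parent.document.getElementById('clock').innerHTML = \"" + count.get() + "\";" +
                    "parent.document.getElementById('count').innerHTML = \"" + count.get() + "\";" +
                    "</script>");
            // Flush so the fragment reaches the browser immediately
            response.getWriter().flush();
            // Pause between pushes (interval assumed) instead of spinning in a busy loop
            Thread.sleep(1000);
        }
    }
}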

SSE (my approach)
Many people assume WebSocket is the only way for a server to push messages to the browser, but Server-Sent Events (SSE) can do it too. SSE works over plain HTTP: the client opens a request, the server answers with the content type text/event-stream and keeps the connection open, and messages then flow one way from server to client.
SSE and WebSocket differ in several ways:
- SSE is based on HTTP and needs no special protocol or server implementation; WebSocket requires a server that can handle the WebSocket protocol.
- SSE is one-way: only the server can send to the client. WebSocket is full duplex: both sides can send and receive at the same time.
- SSE is simple to implement and cheap to develop, with no extra components; WebSocket data needs an extra parsing step, so the barrier to entry is higher.
- SSE reconnects after a dropped connection by default; with WebSocket you have to implement that yourself.
- SSE can only carry text, so binary data must be encoded before transmission; WebSocket supports binary transfer by default.
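SSE also has several features that WebSockets lack by design: automatic reconnection, event IDs, and the ability to send arbitrary named events. On the front end, a single HTTP request carrying a unique ID opens the event stream, and the page then listens for events pushed by the server:
<script>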
  // setMessageInnerHTML is a page helper that appends text to the page; it is not shown in this excerpt
  let source = null;
  let userId = 7777;
  if (window.EventSource) {
    // Open the connection
    source = new EventSource('http://localhost:7777/sse/sub/' + userId);
    setMessageInnerHTML("Connecting user = " + userId);
    /**
     * The open event fires once the connection is established.
     * Equivalent form: source.onopen = function (event) {}
     */
    source.addEventListener('open', function (e) {
      setMessageInnerHTML("Connection established...");
    }, false);
    /**
     * The message event fires when data arrives from the server.
     * Equivalent form: source.onmessage = function (event) {}
     */
    source.addEventListener('message', function (e) {
      setMessageInnerHTML(e.data);
    });
  } else {
    setMessageInnerHTML("Your browser does not support SSE");
  }
</script>
On the server, Spring's SseEmitter does the work. Each new connection creates an SseEmitter, which is stored in sseEmitterMap keyed by user ID.
// count, log, removeUser and the *CallBack helpers belong to the same class and are not shown in this excerpt
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

/**
 * Create a connection.
 *
 * @date 2022/7/12 14:51
 * @author official account: Programmer Xiaofu
 */
public static SseEmitter connect(String userId) {
    try {
        // Timeout in milliseconds; 0 means never expire. The default is 30 seconds.
        SseEmitter sseEmitter = new SseEmitter(0L);
        // Register callbacks
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId, sseEmitter);
        count.getAndIncrement();
        return sseEmitter;
    } catch (Exception e) {
        log.error("Failed to create SSE connection, current user: {}", userId, e);
    }
    return null;
}

/**
 * Send a message to the specified user.
 *
 * @date 2022/7/12 14:51
 * @author official account: Programmer Xiaofu
 */
public static void sendMessage(String userId, String message) {
    // Look the emitter up once so a concurrent removal cannot cause a NullPointerException
    SseEmitter sseEmitter = sseEmitterMap.get(userId);
    if (sseEmitter != null) {
        try {
            sseEmitter.send(message);
        } catch (IOException e) {
            log.error("Push failed for user [{}]: {}", userId, e.getMessage());
            removeUser(userId);
        }
    }
}

Note: Internet Explorer does not support SSE.
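The event IDs and named events mentioned above travel as plain text lines in the text/event-stream body. The sketch below shows roughly how one event could be framed. The field names (`id`, `event`, `retry`, `data`) come from the SSE format itself, while `formatSseEvent` and its argument shape are hypothetical names used only for illustration.

```javascript
// Build one text/event-stream frame from its fields.
function formatSseEvent({ id, event, data, retry }) {
  const lines = [];
  if (id !== undefined) lines.push(`id: ${id}`);          // remembered by the browser for reconnects
  if (event !== undefined) lines.push(`event: ${event}`); // named event; without it the type is "message"
  if (retry !== undefined) lines.push(`retry: ${retry}`); // reconnection delay in milliseconds
  // Each line of a multi-line payload needs its own "data:" field.
  for (const line of String(data).split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n';
}
```

A frame built with `event: 'notice'` would be received by `source.addEventListener('notice', ...)` rather than by the `message` listener. After a dropped connection, the browser sends the last `id` it saw in the `Last-Event-ID` request header when it reconnects.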
MQTT
MQTT is a lightweight communication protocol based on the publish/subscribe model and widely used in the Internet of Things. Messages are published to and subscribed from topics, which decouples the publisher from the subscriber.
TCP is a transport-layer protocol, while MQTT is an application-layer protocol that runs on top of TCP/IP, so MQTT can be used wherever the TCP/IP stack is available. Why use MQTT instead of HTTP?
- First, HTTP is a synchronous protocol: after sending a request, the client has to wait for the server's response. In Internet of Things (IoT) environments, devices suffer from low bandwidth, high latency and unstable connections, so an asynchronous messaging protocol is clearly a better fit for IoT applications.
- HTTP is one-way: to get a message, the client must initiate the connection. In IoT applications the devices or sensors are usually the clients, which means they cannot passively receive commands from the network.
- Commands or messages often need to be sent to every device on the network. With HTTP this is not only hard to implement but also very costly.
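The decoupling that publish/subscribe gives can be shown with a small in-memory sketch. This is not an MQTT client and does not speak the protocol; `TinyBroker`, its methods and the topic name in the usage note are hypothetical and only illustrate the pattern of publishers and subscribers that know nothing about each other.

```javascript
// In-memory stand-in for a broker: routes each published message to every
// callback subscribed to the same topic.
class TinyBroker {
  constructor() {
    this.subscribers = new Map(); // topic -> Set of callbacks
  }

  // Register a callback for a topic; returns a function that unsubscribes it.
  subscribe(topic, callback) {
    if (!this.subscribers.has(topic)) this.subscribers.set(topic, new Set());
    this.subscribers.get(topic).add(callback);
    return () => this.subscribers.get(topic).delete(callback);
  }

  // Deliver a message to all subscribers of the topic; returns how many received it.
  publish(topic, message) {
    const callbacks = this.subscribers.get(topic);
    if (!callbacks) return 0;
    let delivered = 0;
    for (const cb of callbacks) {
      cb(message);
      delivered++;
    }
    return delivered;
  }
}
```

One `publish('devices/all/cmd', 'reboot')` call reaches every device subscribed to that topic, which is the broadcast case the last bullet describes as costly over HTTP.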
WebSocket
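WebSocket is a protocol for full-duplex communication between browser and server over a single TCP connection.
To integrate WebSocket with Spring Boot, first add the dependency:
<!-- WebSocket starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>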
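The server marks the class with the @ServerEndpoint annotation to expose it as a WebSocket endpoint. Clients connect to it at ws://localhost:7777/webSocket/10086.
// javax.websocket applies to Spring Boot 2.x; Spring Boot 3 uses jakarta.websocket instead.
// With an embedded container, a ServerEndpointExporter bean must also be registered.
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

@Component
@Slf4j
@ServerEndpoint("/webSocket/{userId}") // path case must match the URL the client uses
public class WebSocketServer {
    // Session with the connected client; needed to send data to it
    private Session session;
    // User ID bound to this connection
    private String userId;
    private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
    // Sessions keyed by user ID; concurrent because connections open and close on different threads
    private static final Map<String, Session> sessionPool = new ConcurrentHashMap<>();

    /**
     * Called when a connection is established.
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        this.session = session;
        this.userId = userId;
        webSockets.add(this);
        sessionPool.put(userId, session);
        log.info("websocket: new connection, total: {}", webSockets.size());
    }

    /**
     * Called when the connection closes. Added here because the original
     * excerpt never removes closed sessions.
     */
    @OnClose
    public void onClose() {
        webSockets.remove(this);
        // Remove only if the pool still maps this user to this session
        sessionPool.remove(userId, session);
    }

    /**
     * Called when a message arrives from the client.
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("websocket: received client message: {}", message);
    }

    /**
     * Send a message to a single user.
     */
    public void sendOneMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                log.info("websocket: single-user message: {}", message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                log.error("websocket: failed to send to user {}", userId, e);
            }
        }
    }
}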
<script>
  var ws = new WebSocket('ws://localhost:7777/webSocket/10086');
  // Log the initial connection state
  console.log('ws connection state: ' + ws.readyState);
  // Fired when the connection is established
  ws.onopen = function () {
    console.log('ws connection state: ' + ws.readyState);
    // Send a test message once connected
    ws.send('test1');
  };
  // Receive messages pushed by the server
  ws.onmessage = function (event) {
    console.log('Received a message from the server:');
    console.log(event.data);
    // Close the WebSocket connection once the exchange is done (demo only)
    ws.close();
  };
  // Fired when the connection closes
  ws.onclose = function () {
    console.log('ws connection state: ' + ws.readyState);
  };
  // Fired on errors
  ws.onerror = function (error) {
    console.log(error);
  };
  // Ask the server to push a message to user 10086 (requires jQuery)
  function sendMessage() {
    var content = $("#message").val();
    $.ajax({
      url: '/socket/publish?userId=10086&message=' + encodeURIComponent(content),
      type: 'GET',
      data: { "id": "7777", "content": content },
      success: function (data) {
        console.log(data);
      }
    });
  }
</script>
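As the SSE comparison points out, WebSocket does not reconnect on its own. A minimal sketch of reconnection with exponential backoff could look like this; `backoffDelay`, `connectWithRetry`, the injected `createSocket` factory and the 1 s / 30 s limits are all assumptions made for illustration and are not part of the original article.

```javascript
// Delay before reconnect attempt number `attempt` (0-based): doubles on
// each attempt and is capped at maxMs.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// createSocket is injected (for example () => new WebSocket(url)) so the
// sketch does not depend on a browser global.
function connectWithRetry(createSocket, onMessage, attempt = 0) {
  const ws = createSocket();
  // A successful connection resets the backoff.
  ws.onopen = () => { attempt = 0; };
  ws.onmessage = (event) => onMessage(event.data);
  // When the connection closes, schedule a new attempt after the backoff delay.
  ws.onclose = () => {
    setTimeout(
      () => connectWithRetry(createSocket, onMessage, attempt + 1),
      backoffDelay(attempt)
    );
  };
  return ws;
}
```

In the page above it might be used as `connectWithRetry(() => new WebSocket('ws://localhost:7777/webSocket/10086'), (data) => console.log(data))`, provided the handler does not call `ws.close()` after every message as the demo does.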


Custom push

Github Address
Github