当前位置:网站首页>Data real-time feedback technology
Data real-time feedback technology
2022-07-28 15:52:00 【I'm not a code God】
In fact, I don't know how to start this title , This is a scene like this , In the development of background management system , Especially when real-time monitoring system , It is often necessary to show the continuous update and change of data . The common technology is polling , Or use websocket Long connection real-time communication . We know webpack There is a hot update function in debugging mode , It is achieved through the server data push function . It's called Server-Sent Events(SSE).
For specific principles and usage, please refer to Ruan Yifeng The article
http://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
This article will combine Node.js、SSE、Koa、Pm2、Rxjs Technology to achieve an elegant development technology of real-time data feedback . stay Node.js End , We can install one NPM package :http-event-stream To do server push . The source code of this package is very simple , Readers can have a look for themselves , It's not difficult to write by yourself . up to now , Start from the server to the browser , Real time data update is very simple , But we still need the final development experience , That is, how to make the data on the server real-time “ push ” To with http-event-stream In your request ?
A simple way , That is, when you get from the client SSE On request , Start a timer , Get the data in the database or memory in the timer , And then send it to the client . Before writing this code , We need to prepare a middleware to use Rxjs The event of is converted into SSE Send out .
const { createEventStream } = require("http-event-stream")
const { Sink } = require('fastrx')
module.exports = async function(ctx, next) {
const sink = new Sink
const stream = createEventStream(ctx.res, {
onClose() {
sink.dispose()
}
})
sink.next = data => stream.sendMessage({ event: 'message', data: JSON.stringify(data) })
sink.complete = err => stream.close();
sink.subscribe(await next())
ctx.respond = false
}there fastrx library , It's my own high-speed Rxjs Realization , You can go NPM Check it out on the website . I have also written articles on relevant principles in detail . With this middleware , Suppose we need to start from MongoDB Every 5 Read data once per second . our Controller The response function can be written like this (koa-router)
async function facade({collection }) {
return pipe(interval(5000), startWith(0), switchMap(x => fromPromise(collection('apps').aggregate([{
$lookup: {
from: "servers",
as: "servers",
let: { app: "$_id" },
pipeline: [
{ $match: { $expr: { $eq: ["$app", "$$app"] } } },
{ $group: { _id: "$env", envs: { $push: "$$ROOT" } } },
{ $project: { k: "$_id", v: "$envs", _id: 0 } }
]
}
}, { $addFields: { servers: { $arrayToObject: "$servers" } } }]).toArray())))
}among collection Is from Mongodb Of client Ahead of time Koa Encapsulation in .
Advanced
There are many limitations in obtaining data regularly , In the real scene , We often need to broadcast data to the monitoring front desk in time when an event occurs , And some data is not stored somewhere for you to obtain . Then I need to establish a data source to Koa The pipe in the middle of the controller . Here is the introduction pm2 Managed interprocess communication . This method can also be extended to network communication .
const subject_rounds = rx.subject()
pm2.launchBus((err, bus) => bus.on('myEvent', ({ data, process }) => subject_rounds.next(data)))combination koa-router
router.get('/getData', eventStreamM, ctx => subject_rounds)among eventStreamM Middleware mentioned before
The browser side can use EventSource The object is listening for data . In the process of another data source , We use it process.send({type:"myEvent",data: data }) Broadcast events to pm2 Just go to the event bus .
For other system architectures , We can use different technologies to broadcast data , Such as message queue . But it can be used in the end Rxjs Medium subject As a bridge to SSE Push Events .Subject It is this kind of existence , Act as both observer and observable object , about SSE It is an observable object , For other data sources, they are observers . If more than one person opens SSE monitor , Can perfectly deal with .
边栏推荐
- 热敏电阻PT100,NTC转0-10V/4-20mA转换器
- FTP file transfer protocol
- 跟我学Rx编程——Concat
- Software architecture and design (IV) -- data flow architecture
- PXE network installation
- Framework customization series (VI) -- shield fallbackhome mobile phone from pop-up during startup and directly enter the launcher
- 管理区解耦架构见过吗?能帮客户搞定大难题的
- Late 2021 year-end summary
- 突发!微星CEO江胜昌坠楼身亡
- How to obtain and embed go binary execution package information
猜你喜欢

DNS域名解析协议

Open light input / relay output rs485/232 remote data acquisition IO module ibf70

Encoder high speed pulse counter Modbus RTU module ibf150

PXE网络装机

Endnote is associated with word

Communication between client and server based on rsocket protocol

软件架构与设计(八)-----分布式架构

比例电磁阀控制阀4-20mA转0-165mA/330mA信号隔离放大器

编码器高速脉冲计数器Modbus RTU模块IBF150

便携式钻孔测斜仪数据采集仪测量原理与测斜探头的连接及使用方法
随机推荐
德国电信否认加强与华为合作,并称过去3年已缩减与华为的合作
Love programming
Summary and arrangement of postgraduate entrance examination information of 211 colleges and universities nationwide
Rxdart is used instead of stateful in fluent
学习方法123
【直播预约】数据架构演进下的新挑战——上海站
Learn RX programming from me -- concat
Shell编程规范与变量
软件架构与设计(四)-----数据流架构
Encoder high speed pulse counter Modbus RTU module ibf150
5路DI/DO继电器输出远程IO采集模块Modbus TCP/IBF95
samba服务器搭建指南
Samba Server Setup Guide
Software architecture and design (VI) -- hierarchy
800V high voltage system
[delete specified number leetcode]
Framework定制系列(十)-----SystemUI定制状态栏statusbar和导航栏navigationbar教程
PyQt5快速开发与实战 5.1 表格与树
Youdao cloud notes remove the bottom advertisement
Framework定制系列(六)-----屏蔽FallbackHome手机启动中弹窗直接进入Launcher