Data real-time feedback technology
2022-07-28 15:52:00 【I'm not a code God】
I wasn't sure what to call this post. The scenario is this: when building back-office management systems, and real-time monitoring systems in particular, you often need to display data that keeps updating. The usual approaches are polling or real-time communication over a long-lived WebSocket connection. webpack's hot-update feature in development mode, as we know, is implemented with server-side data push. That mechanism is called Server-Sent Events (SSE).
For the underlying principles and usage, see Ruan Yifeng's article:
http://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
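As a rough illustration of what SSE puts on the wire, here is a minimal sketch. An SSE response is served as text/event-stream, and each event is a group of "field: value" lines terminated by a blank line. The helper name formatSseEvent and the sample payload are made up for this example; this is not how http-event-stream is implemented.

```javascript
// Hypothetical helper: serializes one event into the SSE text format.
function formatSseEvent({ event, data, id }) {
  const lines = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);
  // A payload containing line breaks has to be sent as several "data:" lines.
  for (const line of String(data).split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // An empty line marks the end of the event.
  return lines.join('\n') + '\n\n';
}

// Sample payload (invented for illustration).
const frame = formatSseEvent({ event: 'message', data: JSON.stringify({ cpu: 0.42 }) });
console.log(JSON.stringify(frame));
```

A server would write such frames to the open response one after another, without ever ending the response.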
This article combines Node.js, SSE, Koa, PM2 and RxJS into an elegant way of developing real-time data feedback. On the Node.js side, we can install the npm package http-event-stream to handle server push. Its source code is very simple; readers can read through it themselves, and writing your own would not be hard. With that in place, pushing real-time updates from the server to the browser is straightforward. What remains is the development experience: how do we get data on the server "pushed" in real time into the request held open by http-event-stream?
A simple way is to start a timer when the SSE request arrives from the client, fetch data from the database or memory on each tick, and send it to the client. Before writing that code, we need a middleware that converts RxJS events into SSE messages:
const { createEventStream } = require("http-event-stream")
const { Sink } = require('fastrx')

module.exports = async function(ctx, next) {
  const sink = new Sink
  const stream = createEventStream(ctx.res, {
    // When the client disconnects, stop consuming the data source.
    onClose() {
      sink.dispose()
    }
  })
  // Forward every emitted value to the client as a JSON-encoded "message" event.
  sink.next = data => stream.sendMessage({ event: 'message', data: JSON.stringify(data) })
  // Close the event stream when the source completes.
  sink.complete = err => stream.close();
  // The downstream handler returns the observable to subscribe to.
  sink.subscribe(await next())
  // Tell Koa not to handle the response itself; the event stream owns it.
  ctx.respond = false
}

The fastrx library used here is my own high-performance RxJS implementation; you can look it up on npm. I have also written detailed articles on how it works. With this middleware in place, suppose we need to read data from MongoDB every 5 seconds. The controller's handler (for koa-router) can be written like this:
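// Assumption: these operators come from fastrx; the original snippet omits the import.
const { pipe, interval, startWith, switchMap, fromPromise } = require('fastrx')

async function facade({ collection }) {
  // Emit once immediately, then every 5 seconds; each tick runs the aggregation again.
  return pipe(interval(5000), startWith(0), switchMap(x => fromPromise(collection('apps').aggregate([{
    $lookup: {
      from: "servers",
      as: "servers",
      let: { app: "$_id" },
      pipeline: [
        { $match: { $expr: { $eq: ["$app", "$$app"] } } },
        { $group: { _id: "$env", envs: { $push: "$$ROOT" } } },
        { $project: { k: "$_id", v: "$envs", _id: 0 } }
      ]
    }
  }, { $addFields: { servers: { $arrayToObject: "$servers" } } }]).toArray())))
}

Here, collection is a helper wrapped around the MongoDB client and attached to the Koa context ahead of time.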
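To make the result of the aggregation in facade easier to picture, here is a plain-JavaScript sketch of the same reshaping: each app gets a servers object whose keys are environment names and whose values are the matching server documents. The function name groupServersByEnv and the sample documents are invented for illustration; only the field names _id, app and env come from the pipeline.

```javascript
// Hypothetical in-memory equivalent of the $lookup / $group / $arrayToObject steps.
function groupServersByEnv(apps, servers) {
  return apps.map(app => {
    const byEnv = {};
    for (const server of servers) {
      if (server.app !== app._id) continue;            // $match: server.app equals app._id
      if (!byEnv[server.env]) byEnv[server.env] = [];  // $group by "$env"
      byEnv[server.env].push(server);                  // $push "$$ROOT"
    }
    return { ...app, servers: byEnv };                 // result of $arrayToObject
  });
}

// Sample data (invented).
const sampleApps = [{ _id: 'a1', name: 'shop' }, { _id: 'a2', name: 'blog' }];
const sampleServers = [
  { _id: 's1', app: 'a1', env: 'prod' },
  { _id: 's2', app: 'a1', env: 'test' },
  { _id: 's3', app: 'a1', env: 'prod' }
];

const grouped = groupServersByEnv(sampleApps, sampleServers);
console.log(JSON.stringify(grouped, null, 2));
```

This is the object that the middleware would serialize with JSON.stringify and push to the browser on every tick.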
Advanced
Fetching data on a timer has many limitations. In real scenarios we often need to broadcast data to the monitoring front end as soon as an event occurs, and some data is never stored anywhere you could query it. So we need a pipe between the data source and the Koa controller. Here we use the inter-process communication that pm2 manages; the same approach can be extended to network communication.
const subject_rounds = rx.subject()
// Relay every 'myEvent' packet from the pm2 bus into the subject.
pm2.launchBus((err, bus) => {
  if (err) throw err
  bus.on('myEvent', ({ data, process }) => subject_rounds.next(data))
})

Combined with koa-router:

router.get('/getData', eventStreamM, ctx => subject_rounds)

where eventStreamM is the middleware described earlier.
On the browser side, an EventSource object listens for the data. In the process that acts as the data source, we call process.send({ type: "myEvent", data: data }) to broadcast the event onto pm2's event bus.
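A minimal browser-side sketch might look like the following. The /getData path matches the route registered above and the 'message' event name matches what the middleware sends; the helper names parseSseData and listen, and the render callback, are hypothetical.

```javascript
// Decode the JSON payload; returns undefined for anything that is not valid JSON.
function parseSseData(raw) {
  try {
    return JSON.parse(raw);
  } catch (err) {
    return undefined;
  }
}

// Open the stream and hand every decoded payload to the render callback.
// Returns a function that closes the connection.
function listen(url, render) {
  const source = new EventSource(url);
  source.addEventListener('message', e => {
    const payload = parseSseData(e.data);
    if (payload !== undefined) render(payload);
  });
  source.onerror = () => {
    console.warn('SSE connection error (the browser normally reconnects on its own)');
  };
  return () => source.close();
}

// Only start listening when running in a browser page.
if (typeof EventSource !== 'undefined' && typeof document !== 'undefined') {
  listen('/getData', payload => console.log('update', payload));
}
```

Calling the function returned by listen closes the connection, for example when the monitoring view is removed from the page.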
Other system architectures can broadcast data with different technologies, such as a message queue. In the end, though, an RxJS Subject can always serve as the bridge for pushing events over SSE. A Subject is both an observer and an observable: to the SSE side it is an observable, and to the data sources it is an observer. It also copes well when more than one person has the SSE monitor open, since every subscriber receives each event.
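The dual role of a Subject can be sketched in a few lines of plain JavaScript. SimpleSubject below is a deliberately tiny, hypothetical stand-in, not the fastrx or RxJS implementation: data sources call next() on it (it acts as an observer), while each SSE connection subscribes to it (it acts as an observable).

```javascript
class SimpleSubject {
  constructor() {
    this.observers = new Set();
  }
  // Observable side: register a listener and return a function that removes it.
  subscribe(observer) {
    this.observers.add(observer);
    return () => this.observers.delete(observer);
  }
  // Observer side: fan the value out to everyone currently subscribed.
  next(value) {
    for (const observer of [...this.observers]) observer(value);
  }
}

const subject = new SimpleSubject();
const clientA = [];
const clientB = [];

// Two monitoring pages open at the same time.
const closeA = subject.subscribe(v => clientA.push(v));
subject.subscribe(v => clientB.push(v));

subject.next({ round: 1 }); // both clients receive it
closeA();                   // client A closes its page
subject.next({ round: 2 }); // only client B receives it

console.log(clientA.length, clientB.length); // prints "1 2"
```

Each new SSE connection simply adds one more subscriber, and closing a connection removes it, so the data source never needs to know how many monitors are open.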