当前位置:网站首页>数据实时反馈技术
数据实时反馈技术
2022-07-28 14:40:00 【我不是码神】
其实不知道怎么起这个标题,这是一个这样的场景,在开发后台管理系统,尤其是实时监控系统的时候,往往需要展示数据的不断更新变化。常用的技术就是轮询,或者使用websocket进行长连接实时通讯。我们知道webpack在调试模式的时候有个热更新功能,它是通过服务器数据推送功能实现的。就是所谓的Server-Sent Events(SSE).
具体原理和用法可以参考阮一峰 的文章
http://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
本文将结合Node.js、SSE、Koa、Pm2、Rxjs技术来实现一个优雅的数据实时反馈的开发技术。 在Node.js端,我们可以安装一个NPM包:http-event-stream来做服务端推送。这个包源码十分简单,读者可以自己看一下,自己写也不难。 到目前为止,从服务端出发到浏览器端,数据实时更新是很简单了,但还差最后的开发体验,就是如何将服务器端的数据实时“推送”到带有http-event-stream的请求中去呢?
一种简单的方法,就是当得到来自客户端的SSE请求的时候,启动一个定时器,在定时器里面去获取数据库或者内存中的数据,然后再发送给客户端。 在写这段代码之前,我们需要准备一个中间件用来将Rxjs的事件转换成SSE发送出去。
const { createEventStream } = require("http-event-stream")
const { Sink } = require('fastrx')
module.exports = async function(ctx, next) {
const sink = new Sink
const stream = createEventStream(ctx.res, {
onClose() {
sink.dispose()
}
})
sink.next = data => stream.sendMessage({ event: 'message', data: JSON.stringify(data) })
sink.complete = err => stream.close();
sink.subscribe(await next())
ctx.respond = false
}这里的fastrx库,是我自己研发的高速Rxjs实现,大家可以去NPM网站上查看。我也有详细写过相关原理的文章。 有了这个中间件后,假定我们需要从MongoDB中每隔5秒读取一次数据。 我们的Controller响应函数就可以这么写(koa-router)
async function facade({collection }) {
return pipe(interval(5000), startWith(0), switchMap(x => fromPromise(collection('apps').aggregate([{
$lookup: {
from: "servers",
as: "servers",
let: { app: "$_id" },
pipeline: [
{ $match: { $expr: { $eq: ["$app", "$$app"] } } },
{ $group: { _id: "$env", envs: { $push: "$$ROOT" } } },
{ $project: { k: "$_id", v: "$envs", _id: 0 } }
]
}
}, { $addFields: { servers: { $arrayToObject: "$servers" } } }]).toArray())))
}其中collection是来自Mongodb的client提前在Koa中进行了封装。
进阶
定时获取数据有许多局限性,真实场景中,我们往往需要在事件发生的时候及时广播数据到监控前台,而且有些数据并非保存在某地待你去获取的。那么我就需要建立一个数据源到Koa控制器中间的管道。这里介绍在pm2管理下的进程间通讯方式。这种方法也可以扩展到网络间通讯。
const subject_rounds = rx.subject()
pm2.launchBus((err, bus) => bus.on('myEvent', ({ data, process }) => subject_rounds.next(data)))结合koa-router
router.get('/getData', eventStreamM, ctx => subject_rounds)其中eventStreamM就是之前讲的中间件
浏览器端可以通过EventSource对象监听数据了。 在另一个数据源发生的进程中,我们用process.send({type:"myEvent",data:数据})方式广播事件到pm2的事件总线中去就可以了。
对于其他的系统架构,我们可以用不同的技术进行数据的广播,比如消息队列等等。但最终都可以用到Rxjs中的subject作为桥梁给SSE推送事件。Subject就是这种存在,同时担任观察者和可观察对象,对于SSE来说是可观察对象,对于其他数据源来说就是观察者。如果有多个人打开了SSE进行监听,都可以完美应对。
边栏推荐
猜你喜欢

800V高压系统

shell编程规范与变量

Endnote is associated with word

生命的感悟
![[delete specified number leetcode]](/img/16/b40492d8414a363a3a24f00b4afd47.png)
[delete specified number leetcode]

Sharing of award-winning activities: you can get up to iphone13 after using WordPress to build your own blog

About how Simulink generates model coverage reports

Vs dynamic library debugging

ECCV 2022 | ssp: a new idea of small sample tasks with self-supporting matching

Easyexcel complex header export (one to many)
随机推荐
根据输入target,返回数组的两个下标。
flowable工作流所有业务概念
File and directory operations (5)
字符数组和字符串的区别
The subst command mirrors a folder to a local disk
Summarize the knowledge points of the ten JVM modules. If you don't believe it, you still don't understand it
2022年最火的十大测试工具,你掌握了几个
H265 streaming on OBS
Flowable workflow all business concepts
samba服务器如何配置
10、相关数据累积任务实现
About how Simulink generates model coverage reports
字符串(3)
monkey压力测试
Summary and arrangement of postgraduate entrance examination information of 985 colleges and universities nationwide
Return the two subscripts of the array according to the input target.
7. Definitions of real-time data backup and real-time clock
Heap operation
关于word文档中插入的图片只显示下面一部分
Baidu proposes a dynamic self distillation method to realize dense paragraph retrieval by combining interactive model and double tower model