当前位置:网站首页>数据实时反馈技术
数据实时反馈技术
2022-07-28 14:40:00 【我不是码神】
其实不知道怎么起这个标题,这是一个这样的场景,在开发后台管理系统,尤其是实时监控系统的时候,往往需要展示数据的不断更新变化。常用的技术就是轮询,或者使用websocket进行长连接实时通讯。我们知道webpack在调试模式的时候有个热更新功能,它是通过服务器数据推送功能实现的。就是所谓的Server-Sent Events(SSE).
具体原理和用法可以参考阮一峰 的文章
http://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
本文将结合Node.js、SSE、Koa、Pm2、Rxjs技术来实现一个优雅的数据实时反馈的开发技术。 在Node.js端,我们可以安装一个NPM包:http-event-stream来做服务端推送。这个包源码十分简单,读者可以自己看一下,自己写也不难。 到目前为止,从服务端出发到浏览器端,数据实时更新是很简单了,但还差最后的开发体验,就是如何将服务器端的数据实时“推送”到带有http-event-stream的请求中去呢?
一种简单的方法,就是当得到来自客户端的SSE请求的时候,启动一个定时器,在定时器里面去获取数据库或者内存中的数据,然后再发送给客户端。 在写这段代码之前,我们需要准备一个中间件用来将Rxjs的事件转换成SSE发送出去。
const { createEventStream } = require("http-event-stream")
const { Sink } = require('fastrx')
module.exports = async function(ctx, next) {
const sink = new Sink
const stream = createEventStream(ctx.res, {
onClose() {
sink.dispose()
}
})
sink.next = data => stream.sendMessage({ event: 'message', data: JSON.stringify(data) })
sink.complete = err => stream.close();
sink.subscribe(await next())
ctx.respond = false
}这里的fastrx库,是我自己研发的高速Rxjs实现,大家可以去NPM网站上查看。我也有详细写过相关原理的文章。 有了这个中间件后,假定我们需要从MongoDB中每隔5秒读取一次数据。 我们的Controller响应函数就可以这么写(koa-router)
async function facade({collection }) {
return pipe(interval(5000), startWith(0), switchMap(x => fromPromise(collection('apps').aggregate([{
$lookup: {
from: "servers",
as: "servers",
let: { app: "$_id" },
pipeline: [
{ $match: { $expr: { $eq: ["$app", "$$app"] } } },
{ $group: { _id: "$env", envs: { $push: "$$ROOT" } } },
{ $project: { k: "$_id", v: "$envs", _id: 0 } }
]
}
}, { $addFields: { servers: { $arrayToObject: "$servers" } } }]).toArray())))
}其中collection是来自Mongodb的client提前在Koa中进行了封装。
进阶
定时获取数据有许多局限性,真实场景中,我们往往需要在事件发生的时候及时广播数据到监控前台,而且有些数据并非保存在某地待你去获取的。那么我就需要建立一个数据源到Koa控制器中间的管道。这里介绍在pm2管理下的进程间通讯方式。这种方法也可以扩展到网络间通讯。
const subject_rounds = rx.subject()
pm2.launchBus((err, bus) => bus.on('myEvent', ({ data, process }) => subject_rounds.next(data)))结合koa-router
router.get('/getData', eventStreamM, ctx => subject_rounds)其中eventStreamM就是之前讲的中间件
浏览器端可以通过EventSource对象监听数据了。 在另一个数据源发生的进程中,我们用process.send({type:"myEvent",data:数据})方式广播事件到pm2的事件总线中去就可以了。
对于其他的系统架构,我们可以用不同的技术进行数据的广播,比如消息队列等等。但最终都可以用到Rxjs中的subject作为桥梁给SSE推送事件。Subject就是这种存在,同时担任观察者和可观察对象,对于SSE来说是可观察对象,对于其他数据源来说就是观察者。如果有多个人打开了SSE进行监听,都可以完美应对。
边栏推荐
猜你喜欢
随机推荐
全国211院校考研信息汇总整理
堆操作
有道云笔记去除底部广告
在OBS上进行H265推流
Easyexcel complex header export (one to many)
Perception of life
Framework customization series (I) -- systemui NavigationBar slide back to launcher on the navigation bar
The subst command mirrors a folder to a local disk
Stateflow逻辑系统建模
如何通过adb打开和关闭飞行模式
EasyExcel复杂表头导出(一对多)
Communication between client and server based on rsocket protocol
Explain the difference set, intersection set and union set of complex type set in detail.Net
使用Mock技术帮助提升测试效率的小tips,你知道几个?
字符串(3)
华为全球员工总数创新高:19.4万人,研发人员占比近50%
Set structure byte alignment
Problem of fetching combinatorial numbers
Daily news on July 28, 2022: Science: AI has made another breakthrough in protein design, and can design specific functional proteins
21. Definition of message processing task









