当前位置:网站首页>数据实时反馈技术
数据实时反馈技术
2022-07-28 14:40:00 【我不是码神】
其实不知道怎么起这个标题,这是一个这样的场景,在开发后台管理系统,尤其是实时监控系统的时候,往往需要展示数据的不断更新变化。常用的技术就是轮询,或者使用websocket进行长连接实时通讯。我们知道webpack在调试模式的时候有个热更新功能,它是通过服务器数据推送功能实现的。就是所谓的Server-Sent Events(SSE).
具体原理和用法可以参考阮一峰 的文章
http://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
本文将结合Node.js、SSE、Koa、Pm2、Rxjs技术来实现一个优雅的数据实时反馈的开发技术。 在Node.js端,我们可以安装一个NPM包:http-event-stream来做服务端推送。这个包源码十分简单,读者可以自己看一下,自己写也不难。 到目前为止,从服务端出发到浏览器端,数据实时更新是很简单了,但还差最后的开发体验,就是如何将服务器端的数据实时“推送”到带有http-event-stream的请求中去呢?
一种简单的方法,就是当得到来自客户端的SSE请求的时候,启动一个定时器,在定时器里面去获取数据库或者内存中的数据,然后再发送给客户端。 在写这段代码之前,我们需要准备一个中间件用来将Rxjs的事件转换成SSE发送出去。
const { createEventStream } = require("http-event-stream")
const { Sink } = require('fastrx')
module.exports = async function(ctx, next) {
const sink = new Sink
const stream = createEventStream(ctx.res, {
onClose() {
sink.dispose()
}
})
sink.next = data => stream.sendMessage({ event: 'message', data: JSON.stringify(data) })
sink.complete = err => stream.close();
sink.subscribe(await next())
ctx.respond = false
}这里的fastrx库,是我自己研发的高速Rxjs实现,大家可以去NPM网站上查看。我也有详细写过相关原理的文章。 有了这个中间件后,假定我们需要从MongoDB中每隔5秒读取一次数据。 我们的Controller响应函数就可以这么写(koa-router)
async function facade({collection }) {
return pipe(interval(5000), startWith(0), switchMap(x => fromPromise(collection('apps').aggregate([{
$lookup: {
from: "servers",
as: "servers",
let: { app: "$_id" },
pipeline: [
{ $match: { $expr: { $eq: ["$app", "$$app"] } } },
{ $group: { _id: "$env", envs: { $push: "$$ROOT" } } },
{ $project: { k: "$_id", v: "$envs", _id: 0 } }
]
}
}, { $addFields: { servers: { $arrayToObject: "$servers" } } }]).toArray())))
}其中collection是来自Mongodb的client提前在Koa中进行了封装。
进阶
定时获取数据有许多局限性,真实场景中,我们往往需要在事件发生的时候及时广播数据到监控前台,而且有些数据并非保存在某地待你去获取的。那么我就需要建立一个数据源到Koa控制器中间的管道。这里介绍在pm2管理下的进程间通讯方式。这种方法也可以扩展到网络间通讯。
const subject_rounds = rx.subject()
pm2.launchBus((err, bus) => bus.on('myEvent', ({ data, process }) => subject_rounds.next(data)))结合koa-router
router.get('/getData', eventStreamM, ctx => subject_rounds)其中eventStreamM就是之前讲的中间件
浏览器端可以通过EventSource对象监听数据了。 在另一个数据源发生的进程中,我们用process.send({type:"myEvent",data:数据})方式广播事件到pm2的事件总线中去就可以了。
对于其他的系统架构,我们可以用不同的技术进行数据的广播,比如消息队列等等。但最终都可以用到Rxjs中的subject作为桥梁给SSE推送事件。Subject就是这种存在,同时担任观察者和可观察对象,对于SSE来说是可观察对象,对于其他数据源来说就是观察者。如果有多个人打开了SSE进行监听,都可以完美应对。
边栏推荐
- 软件架构与设计(四)-----数据流架构
- 4.8 hd-gr GNSS navigation software source code
- Give you a linked list, delete the penultimate node of the linked list, and return the head node of the linked list.
- Docker容器实现MySQL主从复制
- VS使用技巧
- ECCV 2022 | ssp: a new idea of small sample tasks with self-supporting matching
- Tencent interview -- please design a thread pool to implement sequential execution
- 21. Definition of message processing task
- Late 2021 year-end summary
- 10. Implementation of related data accumulation task
猜你喜欢
随机推荐
19、通道分配任务定义
Qt创建文件夹的两种方式区别
如何通过adb打开和关闭飞行模式
Communication between client and server based on rsocket protocol
Camera连拍自动化测试shell脚本
融云实时社区解决方案
Endnote 与word关联
3. Basic constants and macro definitions
Voice social system -- improve the audio system industry chain
2、开源GPS项目HD-GR GNSS的自叙
Five connection modes of QT signal and slot
Canoe tutorial
ECCV 2022 | ssp: a new idea of small sample tasks with self-supporting matching
Getting started with crawlers (1) -- requests (1)
9. Related data accumulation task definition
软件架构与设计(七)-----互动架构
软件架构与设计(一)-----关键原则
Summarize the knowledge points of the ten JVM modules. If you don't believe it, you still don't understand it
华为全球员工总数创新高:19.4万人,研发人员占比近50%
EasyExcel复杂表头导出(一对多)









