当前位置:网站首页>GrowingIO 响应式编程探索和实践

GrowingIO 响应式编程探索和实践

2020-11-07 20:15:00 InfoQ

{"type":"doc","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"作者:林生生,GrowingIO 运营产品线研发经理,主要负责 GrowingIO 智能运营产品线研发管理工作。"}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"背景"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"GrowingIO 是一家提供增长平台的公司。在 2018 年初我们推出了基于底层数据能力的智能运营平台,结合精准的用户分群,数据采集以及多种运营方式,帮助企业客户用数据驱动用户运营,随时验证假设,助力产品增长。产品有以下特点:"}]},{"type":"bulletedlist","content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"支持多种触达用户的渠道 :站内:弹窗、资源位,站外:Push、短信、Webhook。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"多平台支持,弹窗支持:App、Web、H5和小程序。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"轻松建立数据运营的闭环。"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"下图是运营平台站外触达业务流程图。用户可以随时发起一次站外运营活动,通常是一个站外的触点(推送、短信、Webhook)。后台系统需要查询底层的数据平台接口,获取此次活动对应的人群信息,同时组装活动数据并对外投递任务。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/27/27cae57ec4ca5a91dc92205951b69ac8.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在这个业务场景,需要解决如下几个问题:"}]},{"type":"numberedlist","attrs":{"start":null,"normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"系统外界输入是突发的,无法提前预估量级,系统需要在不断变化的负载中保持即时响应。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"依赖底层数据服务,如果外部系统无法工作,为了保证回弹性需要有熔断和恢复机制。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":3,"align":null,"origin":null},"content":[{"type":"text","text":"业务流程较长,为保证及时响应需要对任务进行异步处理。"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"综上,为了最大化利用服务器资源、提高服务稳定性和优化终端用户体验,GrowingIO 服务端团队在异步与反应式编程上做了一些实践。本文将介绍在优化过程中的探索与思考,希望能为读者带来帮助。"}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"异步与响应式"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"传统服务端程序一般采用同步阻塞模型,通过分配更多线程来支撑更多请求,这符合常人思维模式,但在突发流量的情况下,同步模型可能会导致线程池耗尽,基于一个请求一个线程的服务模式无法做到动态伸缩。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/3c/3c587c075ca798b4a5133d9636248070.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"而异步编程的做法是基于一个共享的线程池,所有操作都是回调。如果遇到耗时的操作,线程并不会阻塞等待操作完成,而是会被释放回线程池中继续接受新的请求。等到耗时操作完成后(一般都是IO操作),通过消息机制重新向线程池申请线程恢复之前的请求代码。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/b6/b65f748e1ae39ae02f6a018b59324571.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" 我们可以简单写个程序简单实验一下,实现相同逻辑:1. 查询 db 2. 查询外部系统 3. 组装信息返回。唯一区别是一个是同步调用的实现,另一个是采用完全异步的方式实现。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"本地使用相同的 jmeter 参数模拟并发测试,得到结果如下,从左到右每列的含义分别为:请求名称、请求数目、失败请求数目、错误率(本次测试中出现错误的请求的数量/请求的总数)、平均响应时间、最短响应时间、最大响应时间、90%用户响应时间、95%用户响应时间、99%用户响应时间、吞吐量。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"总体测试结果如下:"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/04/04a4fa4abb53228258e3b9a7cd60614a.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/57/578322ef9871b221c27aa582ab1ac243.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":"center","origin":null},"content":[{"type":"text","marks":[{"type":"size","attrs":{"size":10}}],"text":"同步代码测试结果"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/49/49e9f5b8076db88f9e03a4a73f61f1ab.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":"center","origin":null},"content":[{"type":"text","marks":[{"type":"size","attrs":{"size":10}}],"text":"异步代码测试结果"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"同步代码总共完成了 260 次请求,平均响应时间约 5 秒,因为阻塞程序耗尽了线程池导致程序出现了拒绝服务的情况,产生了 13% 的错误率。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"异步代码整体吞吐量有明显提升,相同时间内完成了 3000 次请求。错误率为 0 ,并且整体没有出现拒绝服务的情况。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"可以看到基于消息驱动机制的异步系统能极大提高资源利用率,提高系统的吞吐量。而响应式系统则在消息驱动的基础上增加了三个要求:及时响应性、回弹性和可扩展性。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"简单来说具备以下四个特点的系统可以称为一个响应式系统:"}]},{"type":"numberedlist","attrs":{"start":null,"normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"即时响应性,这个是响应式系统的核心目标。一个具有响应性的系统就是一个无论在什么情况下都能快速对客户的操作做出反馈的系统,包括事件、用户请求、失败场景,最终目的是保证客户良好的体验。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"回弹性,指的是系统从故障灾难中恢复的能力。主要分两部分,一个是系统需要考虑失败的情况,二是系统要能从失败中恢复回来。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":3,"align":null,"origin":null},"content":[{"type":"text","text":"扩展性(弹性),指的是系统在不断变化的工作负载之下依然保持即时响应性。可扩展分为单机纵向扩展和横向线性扩展。这里主要指的是系统可以通过分片、复制等方式进行横向扩展,从而避免系统产生明显的性能瓶颈。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":4,"align":null,"origin":null},"content":[{"type":"text","text":"消息驱动的,这是响应式系统的基础。从上面异步系统的优势和原理分析可以看到,基于消息驱动的程序能最大化利用机器资源,同时松散耦合的设计创建了一个能让业务逻辑保持清洁的环境,显式的隔离失败有利于系统自动恢复。"}]}]}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/91/910e210c8013f2b66a498fd2b09d4af5.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"对应的响应式编程是一种程序设计思想,在 java 8 中首次引入了响应式流的规范,即 Reactive Streams 接口。Reactive Streams 非常类似于 JPA 或 JDBC,都是 API 规范,实际使用时需要采用对应的具体实现。JDK 提供的 Reactive Streams 接口:"}]},{"type":"bulletedlist","content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"link","attrs":{"href":"https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Publisher.java","title":"https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Publisher.java"},"content":[{"type":"text","text":"org.reactivestreams.Publisher"}]},{"type":"text","text":": 代表一个潜在的无界数据源,根据 Subscriber 的需要发布新的数据。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"link","attrs":{"href":"https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Subscriber.java","title":"https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Subscriber.java"},"content":[{"type":"text","text":"org.reactivestreams.Subscriber"}]},{"type":"text","text":": 数据源消费者,通过 Subscription 向数据源请求数据。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"link","attrs":{"href":"https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Subscription.java","title":"https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Subscription.java"},"content":[{"type":"text","text":"org.reactivestreams.Subscription"}]},{"type":"text","text":": 代表一次数据消费请求。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"link","attrs":{"href":"https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Processor.java","title":"https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Processor.java"},"content":[{"type":"text","text":"org.reactivestreams.Processor"}]},{"type":"text","text":": 处理器,代表一个既能发布数据也能消费数据的组件。"}]}]}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/77/77e3a93604df2dc0a0b58fe96fa4b955.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Reactive Streams API 的范围是找到一组最小的接口,这些接口将描述必要的操作和实体,从而实现具有非阻塞背压的异步数据流。社区对于 Reactive Streams 的实现比较多,这里做一个简单的汇总和对比。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/df/df444a5aafde6247ea0c6c40596912b0.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"总结一下,如果是移动设备使用 rxjava 是比较合适的选择。如果是在服务端使用 spring 框架做开发,采用基于 reactor 实现的 webflux 更合适。如果是对性能要求很高,业务相对简单的场景,选择 vertx 可以最大限度发挥机器性能。而 gio 的真实场景是服务端的复杂业务系统,同时使用 scala 作为开发语言并且使用 play 作为 web 开发框架。所以在系统构建之初很自然的选择了 akka 作为我们的响应式系统的实现基础。"}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"使用 Actor 构建反应式系统"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在最初的时候并没有直接采用 akka-stream,而是选择更为简单,建模能力更强的 akka-actor 作为系统实现的基础。Akka-actor 是基于 actor 模型构建的异步工具包, 使用 akka-actor 可以很轻松的进行基于消息驱动的异步编程。Actor 的基础就是消息传递,一个 actor 可以认为是一个基本的计算单元,它能接收消息并执行运算,它也可以发送消息给其他 actor。Actors 之间相互隔离,它们之间并不共享内存,所以 Actor 不需要去关注锁和内存原子性等一系列多线程常见的问题。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/2c/2c89fb57c5767845107b433d3df02b24.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Akka-actor 最核心的实现包含三个部分:"}]},{"type":"numberedlist","attrs":{"start":null,"normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"Mailbox:可以是一个有界或者无界的消息队列,用于存放所有收到的消息。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"Behavior:具体的消息处理逻辑。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":3,"align":null,"origin":null},"content":[{"type":"text","text":"State:actor 包含的状态,每个 actor 的状态都是独立的避免锁竞争。"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Actor 本身是不绑定线程的,相同进程的 actor 共享一个线程池,mailbox 是一个 runnable 对象,核心逻辑就是从队列中取出消息调用 behavior 进行处理。"}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":" override final def run(): Unit = {\n try {\n if (!isClosed) { //Volatile read, needed here\n processAllSystemMessages() // 先处理系统级别消息\n processMailbox() // 然后处理普通消息\n }\n } finally {\n setAsIdle() //Volatile write, needed here\n dispatcher.registerForExecution(this, false, false)\n }\n }"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在同一个进程中,可以通过调整 akka-actor 线程池大小来进行纵向负载伸缩。同时,akka-actor 支持在一个系统中绑定不同类型、数量的线程池。比如在一些耗时较长的 IO 场景下可以单独配置一个线程池起到隔离的目的。对需要横向扩展的场景,akka 提供了基于 gossip 协议的点对点去中心化集群解决方案 akka-cluster。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/bc/bc480b382871e121d45c5d10c206bc0a.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Akka-cluster 通过 gossip 协议进行成员之间的发现和状态同步,同时提供了更高层的集群工具:"}]},{"type":"numberedlist","attrs":{"start":null,"normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"Cluster Singleton:全局唯一实例,能保证实例的全局唯一性,同时在实例出现问题的时 clsuter 能在另一个节点上重建它。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"Cluster Sharding:通过 sharding,集群中的 actor 能跨越多个节点通过 actorRef 标识进行交互,不需要关心它们在集群中的物理定位。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":3,"align":null,"origin":null},"content":[{"type":"text","text":"Distributed Data"},{"type":"text","marks":[{"type":"strong"}],"text":":"},{"type":"text","text":"当需要在一个cluster的节点之间共享数据时,Distributed Data 提供了 k/v 存储 API。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":4,"align":null,"origin":null},"content":[{"type":"text","text":"Distributed Publish Subscribe"},{"type":"text","marks":[{"type":"strong"}],"text":":"},{"type":"text","text":"集群中的 actor 可以发布订阅点对点的广播消息。"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"理论上使用 akka-actor 和 akka-cluster 可以使系统具备极强的扩展性(弹性)。但是在实际使用中我们并没有采用 akka-cluster 去扩展系统,原因也很简单,akka-cluster 生产案例太少,功能上过于复杂,不利于大规模推广。最终我们使用了传统的消息中间件作为系统横向扩展的解决方案。在单机内使用 akka-actor,涉及到跨节点通信的场景使用消息中间件进行通信。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/e6/e6c594f564fdeff687b65ebdf2fa2bdc.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"在系统回弹性方面,akka-actor 提供了基于层级的监督机制。可以把整个 actor 系统看做是一棵树,每个 actor 实例都是树中的一个节点。监督机制指的是每个 actor 都是其子 actor 的监督者,需要针对子 actor 制定一个错误处理策略。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/fb/fb806beb0b70f95f196c2664445c407a.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"对应到具体的业务系统里,我们将整个流程分割成多个 actor 实现,为了实现监督与错误恢复,需要创建一个顶层 route actor 来引用所有具体的业务 actor 。如果某个业务actor 遇到问题并抛出了异常,异常会被监管者 route actor 来处理。监管者可以选择恢复出现问题的 actor 或者重启,也可能会将其停止掉,这依赖于问题的严重程度和恢复策略。Akka-actor 中有以下 4 种错误处理策略:"}]},{"type":"numberedlist","attrs":{"start":null,"normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"恢复子节点,保持子节点当前积累的内部状态。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"重启子节点,清除子节点的内部状态。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":3,"align":null,"origin":null},"content":[{"type":"text","text":"永久地停止子节点。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":4,"align":null,"origin":null},"content":[{"type":"text","text":"抛出错误向上传递错误,由更高级的节点处理。"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"最终我们基于 actor 实现了整个业务流程:当一个用户发起一次站外活动请求,主应用(基于 play)会将活动的元数据写入数据库中然后立马返回结果到前端,达到及时响应的目的。同时将活动请求封装成一个 actor 消息,异步的投递给 route actor 进行后续的任务处理。Route actor 会根据接收到的具体消息类型进行路由分发,分别是 User Insight Actor(查询人群信息) - Build Push Task Actor(查询 db 组装 task) - Checkpoint Actor(存储 task 信息)- Publish Task Actor(发布 task 到 kafka)。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/2c/2c30538b5e3ebcbfeb15ab113bf243b6.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"采用消息驱动的方式设计系统取得了一些好处:"}]},{"type":"numberedlist","attrs":{"start":null,"normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"程序之间耦合性更低,每个 actor 只需要维护好一小段逻辑。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"整个流程是异步处理的,用户体验良好。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":3,"align":null,"origin":null},"content":[{"type":"text","text":"消息的生产和消费可以跨服务器,横向扩展变得简单。"}]}]}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"从 Actor 到 Stream"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上文提到我们创建了一个 route actor 将所有业务 actor 组织到一起,这样既能起到一个监督的作用,也可以知道全局的逻辑视图。但是这种实现方式也会带来一个问题,整体编排较为复杂。对于带有分支与合并逻辑的处理流更是难以描述,对后续新增流程也没有约束,只能人为约定一个顺序,比如在上面的比较靠前,可维护性比较差。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/84/84ad974f4fcf767e5bb72c651be33d60.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"又因为整个流程中 User Insight Actor 部分依赖外部数据查询系统,比较容易成为整个系统的瓶颈。在负载不断变化的情况下,外部查询可能会失败,从而对系统整体可用性造成影响。针对这个问题需要设计对应的限流机制和重试机制。 上面提到 Actor 的 mailbox 本身就是一个队列,如果在负载过高的情况下消息是可以丢弃的,只需要指定 actor 的 maibox 类型为有界队列即可。假如消息不能被丢弃,可以采用令牌桶算法实现限流功能。对于重试机制,User Insight Actor 本身是无状态的,这里很自然想到在失败时重新发送试消息到 User Insight Actor 本身进行重试。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/80/80a49ee25a7ee223d3d14bae33e59271.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这个方案比较简单,如果要满足一些特殊场景下的需求,比如设定重试次数,延迟执行重试请求,指定重试失败后的降级策略,只能通过定制一些逻辑实现,但是要做到代码灵活复用需要花费大量时间进行设计。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上述方案都能满足业务需求,总体来讲通过 actor 模型可以快速实现轻量业务异步封装,但面对相对复杂业务逻辑时还是存在一些局限:"}]},{"type":"numberedlist","attrs":{"start":null,"normalizeStart":1},"content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":1,"align":null,"origin":null},"content":[{"type":"text","text":"难以简单优雅实现多异步任务编排,路由方案过于复杂,不直观。"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":2,"align":null,"origin":null},"content":[{"type":"text","text":"重试机制、限流机制等和业务无关的功能复用性不高。"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"这也是为什么后来采用了 akka-stream 来对处理流程进行重构。Akka-stream 是基于 akka-actor 的 Reactive Streams 规范实现,具备以下特点:"}]},{"type":"bulletedlist","content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"具有处理无限数量的元素的能力"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"异步地按序处理元素"}]}]},{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"实现了非阻塞的背压"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"并在上层提供了更加抽象灵活的 DSL 封装,即 source、sink、flow 组件。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Source 即响应流的源头,源头具有一个数据出口。我们可以通过各种数据来创建一个 Source:"}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"val sourceFromRange = Source(1 to 10)\nval sourceFromIterable = Source(List(1, 2, 3))\nval sourceFromFuture = Source.fromFuture(Future.successful(\"hello\"))"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Sink 就是流的最终目的地,包含一个数据入口,我们可以如下来创建 Sink:"}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"val sinkPrintingOutElements = Sink.foreach[String](println(_))\nval sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Flow 就是流的中间组件,包含一个数据入口和数据出口。我们可以这样来创建 Flow:"}]},{"type":"codeblock","attrs":{"lang":"text"},"content":[{"type":"text","text":"val flowDoublingElements = Flow[Int].map(_ * 2)\nval flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)\nval flowBatchingElements = Flow[Int].grouped(10)"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"而整个业务流可以通过基础组件构成的图和网络来表示:"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/96/968594b61d9d7345a80f84399ac3e1ee.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"流式操作可以类比成流水线,每个算子都是一道处理工序,数据源就是加工原材料,经过多道工序处理后最后输出一个成品。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"上文提到为了实现对系统速率的控制,引入了限流的逻辑,比如基于令牌桶算法的实现,只有程序拿到了令牌才能进入下一段处理逻辑,本质上这种实现方式是同步阻塞的,而且真实情况下下游节点可能完全能承载更多的请求。为了解决数据源和下游节点处理速度不一致的问题,在 Reactive Streams 的规范里引入了背压机制,本质上是一种由处理者向数据源发起数据请求,从而进行速度调整的一种方式。Akka-Stream 提供了一套开箱即用的背压功能,其实现方式和 Reactive Streams 一致,下游 subscriber 通过发送 subscription 到上游的 publisher 主动请求需要处理的元素数量。这样就能从整个数据流的源头进行速率控制,采用 pull 而不是 push 的模式能让系统按需保持最大的处理能力,同时又不会崩溃。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/14/149310577035dfe60bab6ab7e8dc5e1b.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"下面是基于 akka stream 重构后的处理流,简单对比 akka actor 的实现方式,基于操作符的组合代码更加清晰易读,可以轻松实现复杂任务编排。"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/45/45ab5438bf067adce89cb89a10fbd6ba.png","alt":null,"title":"","style":[{"key":"width","value":"100%"},{"key":"bordertype","value":"none"}],"href":"","fromPaste":false,"pastePass":false}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"从底层实现来讲,akka-stream 底层还是基于 akka-actor 进行工作的,只是在上层提供了更高视角的 DSL 封装 。这种灵活的编程方式能极大提高代码复用性和可维护性。"}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"总结"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"本文记录了 GrowingIO 服务团队在针对具体业务场景进行反应式系统设计的实践总结,从异步编程到使用 actor 模型构建基于消息驱动的系统,为了降低系统复杂度提高可维护性又引入了 akka-stram 作为反应式流的编排框架。最后,希望能与对反应式技术感兴趣的同学多多交流,打个小广告:我们的工程团队持续在招聘中~ 服务端、前端、大数据各种攻城狮都缺,感兴趣朋友欢迎砸简历 "},{"type":"link","attrs":{"href":"https://www.growingio.com/joinus","title":"https://www.growingio.com/joinus"},"content":[{"type":"text","text":"https://www.growingio.com/joinus"}]},{"type":"text","text":"。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"参考资料:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"link","attrs":{"href":"https://info.lightbend.com/rs/558-NCX-702/images/COLL-ebook-Reactive-Microservices-Architecture.pdf","title":"https://info.lightbend.com/rs/558-NCX-702/images/COLL-ebook-Reactive-Microservices-Architecture.pdf"},"content":[{"type":"text","text":"https://info.lightbend.com/rs/558-NCX-702/images/COLL-ebook-Reactive-Microservices-Architecture.pdf"}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"link","attrs":{"href":"https://learning.oreilly.com/library/view/applied-akka-patterns","title":"https://learning.oreilly.com/library/view/applied-akka-patterns"},"content":[{"type":"text","text":"https://learning.oreilly.com/library/view/applied-akka-patterns"}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"link","attrs":{"href":"https://freecontent.manning.com/akka-in-action-why-use-clustering/","title":"https://freecontent.manning.com/akka-in-action-why-use-clustering/"},"content":[{"type":"text","text":"https://freecontent.manning.com/akka-in-action-why-use-clustering/"}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"link","attrs":{"href":"https://doc.akka.io/","title":"https://doc.akka.io/"},"content":[{"type":"text","text":"https://doc.akka.io/"}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong"}],"text":"关于 GrowingIO"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"GrowingIO 是国内领先的一站式数字化增长整体方案服务商。为产品、运营、市场、数据团队及管理者提供客户数据平台、广告分析、产品分析、智能运营等产品和咨询服务,帮助企业在数字化转型的路上,提升数据驱动能力,实现更好的增长。"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}}]}

版权声明
本文为[InfoQ]所创,转载请带上原文链接,感谢
https://xie.infoq.cn/article/23c3eaf78dc3a2c226bf0bfa9?utm_source=rss&utm_medium=article