当前位置:网站首页>Teach you to learn dapr - 6 Publish subscription
Teach you to learn dapr - 6 Publish subscription
2022-06-26 16:46:00 【Masa technical team】
Introduce
Release / Subscription mode allows microservices to communicate with each other using messages . Producers or publishers send messages to topics without knowing which application will receive them . This involves writing them to the input channel . Again , Consumers or subscribers subscribe to the topic and receive its messages , I don't know what service generated these messages . This involves receiving messages from the output channel . The intermediate message broker is responsible for copying each message from the input channel to the output channel of all subscribers interested in the message . When you need to separate microservices from each other , This model is particularly useful .
Dapr Release in / subscribe API Provide at least one (at-least-once) Guarantee , It integrates with various message brokers and queue systems . The specific implementation used by your service is pluggable , And configured to run at runtime Dapr Pub/Sub Components . This approach eliminates the dependency of your services , So that your service can be more portable , More flexibility to adapt to changes .

Dapr Release / The subscription building block provides a platform independent API To send and receive messages . Your service publishes messages to named topics , And subscribe to topics to use these messages .
The figure below shows a “shipping” Service and a “email” Examples of services , They're all subscribed to “cart” The topic of service publishing . Each service will be loaded to point to the same publication / Subscribe to the publication of the message bus component / Subscribe to the component configuration file , for example Redis Streams、NATS Streaming、Azure Service Bus or GCP Pub/Sub.

The following figure has the same service , But this time it shows Dapr Release API, It sends “ Order ” Order endpoints on topic and subscription services , These subject messages are provided by Dapr Publish to .
characteristic
Cloud Events The message format
To enable message routing and provide additional context for each message ,Dapr Use CloudEvents 1.0 Specification as its message format . Application usage Dapr Any message sent to the subject is automatically “ packing ” stay Cloud Events In the envelope , Use datacontenttype Attribute Content-Type Header value .
Dapr The following is achieved Cloud Events Field :
idsourcespecversiontypedatacontenttype(Optional)
The following example shows CloudEvent v1.0 Serialized as JSON Of XML Content :
{ "specversion" : "1.0", "type" : "xml.message", "source" : "https://example.com/message", "subject" : "Test XML Message", "id" : "id-1234-5678-9101", "time" : "2020-09-23T06:23:21Z", "datacontenttype" : "text/xml", "data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"}News subscription
Dapr Applications can subscribe to published topics . Dapr There are two ways to allow your application to subscribe to topics :
- declarative , Where the subscription is defined in an external file
- Procedural , Define subscriptions in user code
The messaging
In principle, , When the subscriber responds with a non error response after processing the message ,Dapr Consider that the message has been successfully delivered . For finer control ,Dapr Release / subscribe API An explicit state defined in the response payload is also provided , Subscribers can use these states to send messages to Dapr Indicates a specific processing instruction ( for example RETRY or DROP). If two different applications ( Different app-ID) Subscribed to the same topic ,Dapr Pass each message to only one instance of each application .

Subject area
By default , All support Dapr Release / Subscription component ( for example Kafka、Redis Stream、RabbitMQ) The topics of are available for each application that configures the component . To limit which applications can publish or subscribe to topics ,Dapr Provides a range of topics . This enables you to allow the application to publish and subscribe to what topics .
Message lifetime (TTL)
Dapr Timeout messages can be set on a per message basis , This means that if not from pub/sub Component reads the message , The message will be discarded . This is to prevent the accumulation of unread messages . In the queue than configured TTL Long news is called dead news .
notes : You can also set messages for a given queue when the component is created TTL.
And no use Dapr and CloudEvents Application communication for
For an application, use Dapr A scenario that is not used by another application , You can disable... For publishers or subscribers CloudEvent packing . This allows the use of... Not all at once Dapr Some of our applications use Dapr Publish subscribe .
Use .Net call Dapr Publish subscription
The following example creates an application to publish and subscribe to an application named deathStarStatus The theme of

precondition
function dapr init when ,Redis Streams It is installed on the local machine by default . If it is dapr init --slim You need to do something yourself , I'm not going to do that here .
Verify by opening your component file %UserProfile%\.dapr\components\pubsub.yaml
apiVersion: dapr.io/v1alpha1kind: Componentmetadata: name: pubsubspec: type: pubsub.redis version: v1 metadata: - name: redisHost value: localhost:6379 - name: redisPassword value: "" notes : here redisHost Of value It can be set according to the actual situation , For example, a cloud redis Examples, etc . Someone asked if you can switch the default db, Certainly. ,name Set up redisDB,value Set to the... You want to use db that will do
If you want more details yaml Configuration parameters , For example, concurrency settings 、 The maximum number of retries and so on can be seen here https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-redis-pubsub/
Subscribe to topics
There are two ways to subscribe to a topic : declarative 、 Procedural
Declarative subscriptions
You can use the following custom resource definitions (CRD) Subscribe to topics . Create a file called subscription.yaml And paste the following :
apiVersion: dapr.io/v1alpha1kind: Subscriptionmetadata: name: myevent-subscriptionspec: topic: deathStarStatus route: /dsstatus pubsubname: pubsubscopes:- app1- app2 The above example shows a pair of pubsub Components pubsub The theme deathStarStatus Event subscription for .
take CRD Put the file in the component directory , I won't go on here .
Programmed subscription
Dapr The instance calls your application at startup and looks forward to the topic subscription JSON Respond to :
- pubsubname:Dapr Which should I use pub/sub Components
- topic: Which topic to subscribe to
- route: When the message relates to the subject ,Dapr Call which endpoint
notes : You might think , Isn't it troublesome ? Yes , So we use dapr dotnet-sdk To help us automate these things
.Net subscribe
The above is to let dapr sidecar Know who the subscription to this message will eventually go to . But how do we write it in our program ?
If you choose a declarative subscription , You make one route that will do , If it is a programmed subscription, you don't need to write one more yaml file , And it can support , So let's see .Net SDK How to do it .
establish Assignment.Server(Sub)
establish ASP.NET Core empty project , At the same time, add... According to the content of the previous article Dapr.AspNetCoreNuGet Package and modifier ports are 5000
modify program.cs Code
using Microsoft.AspNetCore.Mvc;var builder = WebApplication.CreateBuilder(args);var app = builder.Build();app.UseRouting();app.UseCloudEvents();app.UseEndpoints(endpoints =>{ endpoints.MapSubscribeHandler();});app.MapPost("/dsstatus", ([FromBody] string word) => Console.WriteLine($"Hello {word}!")).WithTopic("pubsub", "deathStarStatus");app.Run(); notes : To tell Dapr The message was processed successfully , Please return 200 OK Respond to . If Dapr Received except 200 Any other return status code other than , Or if your application crashes ,Dapr Will try to follow At-Least-Once Semantic retransmission of messages .
function Assignment.Server
Use Dapr CLI To start up , First use the command line tool to jump to the directory dapr-study-room\Assignment06\Assignment.Server, Then execute the following command
dapr run --app-id testpubsub --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet runestablish Assignment.Client(Publish)
establish Console project , And modify it program.cs
var client = new Dapr.Client.DaprClientBuilder().Build();await client.PublishEventAsync<string>("pubsub", "deathStarStatus", "World"); function Assignment.Client You can see Assignment.Server Will print Hello World!
Route messages to different event handlers
Content based routing is a way to use DSL Instead of the messaging pattern of imperative application code .PubSub Routing is an implementation of this pattern , It allows developers to use expressions based on CloudEvents The content of the is routed to different nodes in the application URI/ Paths and event handlers .
notes : This is a preview function , If you are interested, you can try it yourself , It is worth mentioning that Common Expression Language (CEL) Very interesting , Here is just a piece of code to see .
[Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)] [HttpPost("widgets")] public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient) { // Logic return stock; } [Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)] [HttpPost("gadgets")] public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient) { // Logic return stock; } [Topic("pubsub", "inventory")] [HttpPost("products")] public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient) { // Logic return stock; }Release / Subscribe to topic access
A namespace or component scope can be used to restrict component access to a particular application . The scope of these applications added to the component is limited only to those with specific characteristics ID Your application can use this component .
Beyond the scope of this common component , Release / The subscription component can also limit the following :
- What topics can be used ( Published or subscribed )
- Which applications are allowed to publish to specific topics
- Which applications are allowed to subscribe to specific topics
Subject access
apiVersion: dapr.io/v1alpha1kind: Componentmetadata: name: pubsub namespace: defaultspec: type: pubsub.redis version: v1 metadata: - name: redisHost value: "localhost:6379" - name: redisPassword value: "" - name: publishingScopes value: "app1=topic1;app2=topic2,topic3;app3=" - name: subscriptionScopes value: "app2=;app3=topic1"The following table shows which applications are allowed to publish to topics :
| topic1 | topic2 | topic3 | |
|---|---|---|---|
| app1 | X | ||
| app2 | X | X | |
| app3 |
The following table shows which applications are allowed to subscribe to topics :
| topic1 | topic2 | topic3 | |
|---|---|---|---|
| app1 | X | X | X |
| app2 | |||
| app3 | X |
notes : If the application is not listed ( for example subscriptionScopes Medium app1), You are allowed to subscribe to all topics . Because not used allowedTopics And app1 No subscription scope , So it can also use other topics not listed above .
Limit allowed topics
If Dapr The application sends a message to it , A theme will be created . In some cases , You should control the creation of this topic . for example :
- stay Dapr In the application , Errors in generating topic names can cause an unlimited number of topics to be created
- Compact topic name and total number , Prevent the theme from growing indefinitely
apiVersion: dapr.io/v1alpha1kind: Componentmetadata: name: pubsub namespace: defaultspec: type: pubsub.redis version: v1 metadata: - name: redisHost value: "localhost:6379" - name: redisPassword value: "" - name: allowedTopics value: "topic1,topic2,topic3"combination allowedTopics and scopes
Sometimes you want to combine these two scopes , Therefore, only a fixed set of topics is allowed , And assign the scope to a specific application .
apiVersion: dapr.io/v1alpha1kind: Componentmetadata: name: pubsub namespace: defaultspec: type: pubsub.redis version: v1 metadata: - name: redisHost value: "localhost:6379" - name: redisPassword value: "" - name: allowedTopics value: "A,B" - name: publishingScopes value: "app1=A" - name: subscriptionScopes value: "app1=;app2=A" notes : The third application is not listed , Because if an application does not specify in scope , It is allowed to use all topics .
The following table shows the applications that are allowed to be published to topics :
| A | B | C | |
|---|---|---|---|
| app1 | X | ||
| app2 | X | X | |
| app3 | X | X |
The following table shows which applications can subscribe to topics :
| A | B | C | |
|---|---|---|---|
| app1 | |||
| app2 | X | ||
| app3 | X | X |
news TTL
Same status management , Use metadata Of ttlInSeconds
Source code of this chapter
Assignment06
https://github.com/doddgu/dapr-study-room
We are acting , New framework 、 New ecology
Our goal is The freedom of the 、 Easy-to-use 、 Highly malleable 、 functional 、 Robust .
So we learn from Building blocks Design concept of , Working on a new framework MASA Framework, What are its characteristics ?
- Native support Dapr, And allow Dapr Replace with traditional means of communication
- Unlimited architecture , Single application 、SOA、 Micro services support
- Support .Net Native framework , Reduce the burden of learning , In addition to the concepts that must be introduced in a specific field , Insist on not making new wheels
- Rich ecological support , In addition to the framework, there are component libraries 、 Authority Center 、 Configuration center 、 Troubleshooting center 、 A series of products such as Alarm Center
- Unit test coverage of the core code base 90%+
- Open source 、 free 、 Community driven
- What is the ? We are waiting for you , Come together and discuss
After several months of production project practice , Completed POC, At present, the previous accumulation is being refactored into new open source projects
At present, the source code has been synchronized to Github( The document site is under planning , Will gradually improve ):
QQ Group :7424099
Wechat group : Plus technology operation wechat (MasaStackTechOps), Remarks , Invite in

边栏推荐
- Structure the graduation project of actual combat camp
- Redis 概述整理
- Failed to upload hyperf framework using alicloud OSS
- Redis migration (recommended operation process)
- Hyperf框架使用阿里云OSS上传失败
- Memory partition model
- Mono 的一些实例方法
- 经典同步问题
- Calculate the average of N numbers in the index group of X, and return the number that is less than the average and closest to the average through formal parameters
- num[i]++
猜你喜欢
随机推荐
Redis 概述整理
当一个程序员一天被打扰 10 次,后果很惊人!
108. 简易聊天室11:实现客户端群聊
【从删库到跑路】JDBC 完结篇(一天学完系列!!学完赶紧跑!)
MHA switching (recommended operation process)
MS|谢黎炜组发现混合益生菌制剂及其代谢产物可缓解结肠炎
Arduino UNO + DS1302简单获取时间并串口打印
《软件工程》期末重点复习笔记
Failed to upload hyperf framework using alicloud OSS
108. simple chat room 11: realize client group chat
Set up your own website (16)
Développer un opérateur basé sur kubebuilder (démarrer)
经典同步问题
Science | giant bacteria found in mangroves challenge the traditional concept of nuclear free membrane
用Attention和微调BERT进行自然语言推断-PyTorch
Hyperf框架使用阿里云OSS上传失败
Kept to implement redis autofailover (redisha) 1
[understanding of opportunity -31]: Guiguzi - Daoyu [x ī] Crisis is the coexistence of danger and opportunity
100+ data science interview questions and answers Summary - basic knowledge and data analysis
电路中缓存的几种形式









