问题描述
当使用SDK连接到Azure Event Hub时,The most common way is to use a connection string.This website documents can be successful completion of the code:https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send
只是,如果使用Azure ADAuthentication for a visit,Code needs to be how to modify? 如何来使用AAD的 TokenCredential呢?
分析问题
在使用Connection String的时候,EventProcessorClientBuilder使用connectionStringMethod configuration connection string.
如果使用Azure AD认证,it needs to be based onAADRegistered in the application to obtainClient ID, Tenant ID, Client Secret,Then set these contents as system environment variables
- AZURE_TENANT_ID :对应AAD registration application page Tenant ID
- AZURE_CLIENT_ID :对应AAD registration application page Application (Client) ID
- AZURE_CLIENT_SECRET :对应AAD 注册应用的 Certificates & secrets 中创建的Client Secrets
然后使用 credential 初始化 EventProcessorClientBuilder 对象
注意点:
1) DefaultAzureCredentialBuilder 需要指定 Authority Host为 Azure China
2) EventProcessorClientBuilder . Credential 方法需要指定Event Hub Namespce 的域名
操作实现
第一步:为AADRegistering an application grants an actionEvent Hub Data的权限
Azure 提供了以下 Azure 内置角色,用于通过 Azure AD 和 OAuth Awarded for event center data access:
- Azure Data owners event center (Azure Event Hubs Data Owner): Use this role to grant full access to Event Hub resources.
- Azure event hub data sender (Azure Event Hubs Data Sender) : Use this role can be granted to send access to events center resources.
- Azure Event Hub Data Receiver (Azure Event Hubs Data Receiver): Use this role to grant access to Event Hub resources/Receive access.
本实例中,Only need to receive data,So only given Azure Event Hubs Data Receiver权限.
第二步:在Java 项目中添加SDK依赖
添加在pom.xml文件中 dependencies 部分的内容为:azure-identity , azure-messaging-eventhubs , azure-messaging-eventhubs-checkpointstore-blob,Best to use the latest version,Avoid run-time type conflicts or not found
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>spdemo</artifactId> <version>1.0-SNAPSHOT</version> <name>spdemo</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.azure</groupId> <artifactId>azure-messaging-eventhubs</artifactId> <version>5.12.2</version> </dependency> <dependency> <groupId>com.azure</groupId> <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId> <version>1.14.0</version> </dependency> <dependency> <groupId>com.azure</groupId> <artifactId>azure-identity</artifactId> <version>1.5.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.5.5</version> <configuration> <archive> <manifest> <mainClass>com.example.App</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> </build> </project>
第三步:add full code
package com.example; import com.azure.core.credential.TokenCredential; import com.azure.identity.AzureAuthorityHosts; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.io.IOException; import java.sql.Date; import java.time.Instant; import java.time.temporal.TemporalUnit; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; /** * Hello world! * */ public class App { // private static final String connectionString ="<connectionString>"; // private static final String eventHubName = "<eventHubName>"; // private static final String storageConnectionString = "<storageConnectionString>"; // private static final String storageContainerName = "<storageContainerName>"; public static void main(String[] args) throws IOException { System.out.println("Hello World!"); // String connectionString ="<connectionString>"; // The fully qualified namespace for the Event Hubs instance. This is likely to // be similar to: // {your-namespace}.servicebus.windows.net // String fullyQualifiedNamespace ="<your event hub namespace>.servicebus.chinacloudapi.cn"; // String eventHubName = "<eventHubName>"; String storageConnectionString = System.getenv("storageConnectionString"); String storageContainerName = System.getenv("storageContainerName"); String fullyQualifiedNamespace = System.getenv("fullyQualifiedNamespace"); String eventHubName = System.getenv("eventHubName"); TokenCredential credential = new DefaultAzureCredentialBuilder().authorityHost(AzureAuthorityHosts.AZURE_CHINA) .build(); // Create a blob container client that you use later to build an event processor // client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .connectionString(storageConnectionString) .containerName(storageContainerName) .buildAsyncClient(); // EventHubProducerClient // EventHubProducerClient client = new EventHubClientBuilder() // .credential(fullyQualifiedNamespace, eventHubName, credential) // .buildProducerClient(); Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>(); initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000)); // EventProcessorClientBuilder // Create a builder object that you will use later to build an event processor // client to receive and process events and errors. EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder() // .connectionString(connectionString, eventHubName) .credential(fullyQualifiedNamespace, eventHubName, credential) .initialPartitionEventPosition(initialPartitionEventPosition) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)); // EventPosition.f // Use the builder object to create an event processor client EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process"); } public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob // Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); }; }
附录:自定义设置 Event Position,当程序运行时,指定从Event Hub中获取消息的 Sequence Number
使用EventPosition对象中的fromSequenceNumber方法,A serial number can be specified,ConsumeAfter get the news of the end will be according to the number.其他的方法还有 fromOffset(指定游标) / fromEnqueuedTime(specify a point in time,get the message after) / earliest(从最早开始) / latest(Get new data from the end,old data not fetched)
Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>(); initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));
注意:
Map<String, EventPosition> 中的String 对象为Event Hub的分区ID,如果Event Hub有2个分区,then its value is0,1.
EventPosition 设置的值,只在Storage Accountstored inCheckPoint StoreThe value in no,or less than the value set here,才会起效果.否则,Consume 会根据从Checkpoint中获取的SequenceNumber为准.
参考资料
对使用 Azure Active Directory Authenticate applications accessing Event Hub resources : https://docs.azure.cn/zh-cn/event-hubs/authenticate-application
使用 Java 向/从 Azure 事件中心 (azure-messaging-eventhubs) 发送/接收事件 : https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send
[END]