Technical Deep Dive | How to Quickly and Seamlessly Ingest Pulsar Data into Apache Doris
2022-08-03 15:42:00 [InfoQ]
KoP Architecture Overview:
- Simplify operations with enterprise-grade multi-tenancy.
- Avoid data relocation and simplify operations.
- Retain event streams durably with Apache BookKeeper and tiered storage.
- Process events serverlessly with Pulsar Functions.
How Routine Load Subscribes to Pulsar Data
Doris Routine Load consumes data over the Kafka protocol through librdkafka, and KoP lets a Pulsar broker accept Kafka-protocol connections. Routine Load can therefore subscribe to Pulsar topics directly, with no changes on the Doris side; a minimal client-side sketch follows the diagram.
--------------------------
|      Apache Doris      |
|   ------------------   |
|   |  Routine Load  |   |
|   ------------------   |
--------------------------
             | Kafka protocol (librdkafka)
             v
--------------------------
|   ------------------   |
|   |      KoP       |   |
|   ------------------   |
|      Apache Pulsar     |
--------------------------
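As an illustration of this idea (not part of the original article), the sketch below uses the standard Kafka Java consumer against the KoP listener configured in step 3 below (127.0.0.1:9092); the topic name test matches the one subscribed by the Routine Load job later. Any client that speaks the Kafka protocol this way, including librdkafka inside Doris, can read a Pulsar topic through KoP:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KopConsumeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // KoP listener of the Pulsar broker (see kafkaListeners in step 3) -- assumed address
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "kop-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        // A plain Kafka consumer reads the Pulsar topic through KoP,
        // which is exactly what Doris Routine Load does via librdkafka.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}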
Hands-On Walkthrough
1. Prepare the Pulsar Standalone environment:
- JDK installation: omitted.
- Download the Pulsar binary package and extract it:
# Download
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
# Extract and enter the installation directory
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0
2. Build and install the KoP component:
- Download the KoP source code:
git clone https://github.com/streamnative/kop.git
cd kop
- Build the KoP project:
mvn clean install -DskipTests
- Configure the protocol handler: create a protocols directory under the extracted apache-pulsar directory and copy the built NAR package into it.
mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols
- Check the result:
$ ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar
3. Add the KoP configuration:
- Add the following settings to standalone.conf (or broker.conf):
# Protocol handled by KoP
messagingProtocols=kafka
# Directory containing the KoP NAR file
protocolHandlerDirectory=./protocols
# Whether topics may be created automatically (as partitioned topics)
allowAutoTopicCreationType=partitioned
- Add the following listener settings:
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0
kafkaListeners=PLAINTEXT://127.0.0.1:9092
# This config is not required unless you want to expose another address to the Kafka client.
# If it is not configured, it defaults to the same value as `kafkaListeners`.
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
# Required by KoP so that messages carry continuous, Kafka-style offsets
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
# Do not delete inactive topics automatically
brokerDeleteInactiveTopicsEnabled=false
- If you intend to use Kafka transactions, also enable the transaction coordinator; otherwise transactional Kafka clients fail with:
java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.
kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true
4. Start Pulsar (a quick check that the KoP listener is reachable follows the commands):
# Start in the foreground
# bin/pulsar standalone
# Start in the background
bin/pulsar-daemon start standalone
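Before wiring up Doris, you can optionally confirm that the KoP listener answers on the Kafka protocol. The sketch below is not from the original article; it assumes the kafka-clients library is on the classpath and uses the 127.0.0.1:9092 listener configured above to list the topics visible through KoP:

import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KopEndpointCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The kafkaListeners address configured in standalone.conf
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // If the KoP protocol handler loaded correctly, this call succeeds
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("Topics visible through KoP: " + topics);
        }
    }
}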
5. Create the Doris database and table
# Connect to Doris
mysql -u root -h 127.0.0.1 -P 9030
# Create the database
create database pulsar_doris;
# Switch to the database
use pulsar_doris;
# Create the clicklog table
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
(
    `clickTime` DATETIME NOT NULL COMMENT "Click time",
    `type` VARCHAR(50) NOT NULL COMMENT "Click type",
    `id` VARCHAR(100) COMMENT "Unique ID",
    `user` VARCHAR(100) COMMENT "User name",
    `city` VARCHAR(50) COMMENT "City"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
6. Create the Routine Load job
CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "127.0.0.1:9092",
"kafka_topic" = "test",
"property.group.id" = "doris"
);
- pulsar_doris: the database that the Routine Load job belongs to.
- load_from_pulsar_test: the name of the Routine Load job.
- clicklog: the target table of the Routine Load job, i.e., the Doris table the data is loaded into.
- strict_mode: whether the load runs in strict mode; set to false here.
- format: the format of the imported data; JSON here.
- kafka_broker_list: the address of the Kafka broker service (here, the KoP listener).
- kafka_topic: the name of the Kafka topic, i.e., which topic's data is synchronized.
- property.group.id: the consumer group ID.
The job can also be inspected programmatically, as sketched below.
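Doris exposes the MySQL protocol on port 9030 (the same endpoint used with the mysql client in step 5), so the job state can be checked from code as well. The following is a hedged sketch using plain JDBC; the MySQL driver dependency, the empty root password, and the class name are assumptions rather than part of the original article:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class RoutineLoadStatusCheck {
    public static void main(String[] args) throws Exception {
        // Doris FE speaks the MySQL protocol; same endpoint as `mysql -u root -h 127.0.0.1 -P 9030`
        String url = "jdbc:mysql://127.0.0.1:9030/pulsar_doris";
        try (Connection conn = DriverManager.getConnection(url, "root", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW ROUTINE LOAD FOR load_from_pulsar_test")) {
            // SHOW ROUTINE LOAD returns one row per job; State and Statistic mirror the \G output in step 7
            while (rs.next()) {
                System.out.println("State: " + rs.getString("State"));
                System.out.println("Statistic: " + rs.getString("Statistic"));
            }
        }
    }
}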
7. Load data and verify
- Load the data
- Define a ClickLog data structure and send 50 million records to Pulsar with a Kafka producer.
public class ClickLog {
    private String id;
    private String user;
    private String city;
    private String clickTime;
    private String type;
    ... // getters and setters omitted
}
String strDateFormat = "yyyy-MM-dd HH:mm:ss";
@Autowired
private Producer producer;   // project-specific wrapper around a Kafka producer
try {
    // 50,000 batches x 1,000 records = 50 million messages in total
    for (int j = 0; j < 50000; j++) {
        int batchSize = 1000;
        for (int i = 0; i < batchSize; i++) {
            ClickLog clickLog = new ClickLog();
            clickLog.setId(UUID.randomUUID().toString());
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
            clickLog.setClickTime(simpleDateFormat.format(new Date()));
            clickLog.setType("webset");
            clickLog.setUser("user" + new Random().nextInt(1000) + i);
            // Serialize to JSON and send to the Pulsar topic over the Kafka protocol
            producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
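The Producer bean and Constant.topicName above are project-specific helpers that the article does not show. For completeness, here is a hedged sketch of what such a wrapper might look like using the plain Kafka Java client; the class name is hypothetical and the bootstrap address is the KoP listener configured earlier:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical stand-in for the article's Producer bean: a thin wrapper
// around the Kafka Java client pointed at the KoP listener.
public class SimpleKopProducer {
    private final KafkaProducer<String, String> producer;

    public SimpleKopProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // KoP listener, assumed address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    public void sendMessage(String topic, String payload) {
        // Fire-and-forget send; the Pulsar broker receives it through KoP
        producer.send(new ProducerRecord<>(topic, payload));
    }

    public void close() {
        producer.close();
    }
}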
- Check the ROUTINE LOAD job: run SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G to view the status of the load job.
mysql> SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;
*************************** 1. row ***************************
Id: 87873
Name: load_from_pulsar_test
CreateTime: 2022-05-31 12:03:34
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:pulsar_doris
TableName: clicklog1
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Europe/London","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}
CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END","client.id":"doris.client"}
Statistic: {"receivedBytes":5739001913,"runningTxns":[],"errorRows":0,"committedTaskNum":168,"loadedRows":50000000,"loadRowsRate":23000,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":50000000,"unselectedRows":0,"receivedBytesRate":2675000,"taskExecuteTimeMs":2144799}
Progress: {"0":"51139566"}
Lag: {"0":0}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)
ERROR:
No query specified
- Verify the row count: run the following query to count the rows in the table; the result is 50,000,000, as expected.
mysql> select count(*) from clicklog;
+----------+
| count(*) |
+----------+
| 50000000 |
+----------+
1 row in set (3.73 sec)
mysql>