Flume incrementally collects MySQL data to Kafka
2022-07-27 16:16:00, by Carrot eating crocodile
Flume installation and configuration (single node, non-cluster)
1. flume-env.sh
Set JAVA_HOME:
export JAVA_HOME=/usr/java/jdk1.8.0_221

2. Preparing the Flume MySQL source driver package
Download the source code from:
https://github.com/keedio/flume-ng-sql-source
The latest version at the time of writing is 1.5.3 (this walkthrough compiles 1.4.3).
Unpack the archive, then enter the directory and compile it with Maven:
C:\Users\asus>D:
D:\>cd \Utilities\New folder\flume-ng-sql-source-1.4.3\flume-ng-sql-source-1.4.3
D:\Utilities\New folder\flume-ng-sql-source-1.4.3\flume-ng-sql-source-1.4.3>mvn package


The build completes successfully.
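After a successful build, the jar produced under target/ has to be copied into Flume's lib directory on the server (the jar version and FLUME_HOME below are assumptions; adjust them to your environment):
# Copy the compiled plugin to where Flume loads its jars
cp target/flume-ng-sql-source-1.4.3.jar $FLUME_HOME/lib/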
3. Integrating Flume with Kafka and MySQL
The agent configuration (flume-conf.properties) is as follows:
agent.sources = sql-source
agent.sinks = k1
agent.channels = ch
# This is the driver Flume uses to collect from MySQL (https://github.com/keedio/flume-ng-sql-source). It must be compiled; afterwards put flume-ng-sql-source-1.x.x.jar into FLUME_HOME/lib. For the CDH version of Flume under CM, the path is /opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib
agent.sources.sql-source.type= org.keedio.flume.source.SQLSource
# URL to connect to database (currently only mysql is supported)
# The parameters ?useUnicode=true&characterEncoding=utf-8&useSSL=false need to be appended to the URL
agent.sources.sql-source.hibernate.connection.url=jdbc:mysql://hostname:3306/yinqing?useUnicode=true&characterEncoding=utf-8&useSSL=false
# Database connection properties
agent.sources.sql-source.hibernate.connection.user=root
agent.sources.sql-source.hibernate.connection.password = password
agent.sources.sql-source.hibernate.dialect = org.hibernate.dialect.MySQLDialect
# mysql-connector-java-X-bin.jar must also be placed in FLUME_HOME/lib (for the CDH version of Flume under CM: /opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib). Version 5.1.48 is used here (in theory any MySQL 5.x connector works): wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
agent.sources.sql-source.hibernate.driver_class = com.mysql.jdbc.Driver
agent.sources.sql-source.hibernate.connection.autocommit = true
# Name of the table to collect from
agent.sources.sql-source.table = table_name
agent.sources.sql-source.columns.to.select = *
# Query delay: a query is issued every this many milliseconds
agent.sources.sql-source.run.query.delay=10000
# The status file saves the last row read; Flume needs this state because the collection is incremental
agent.sources.sql-source.status.file.path = /var/lib/flume-ng
agent.sources.sql-source.status.file.name = sql-source.status
# Kafka sink configuration. This is a cluster setup, so the ZooKeeper and Kafka cluster addresses, with port numbers, are required; the Kafka configuration is covered below
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = yinqing
agent.sinks.k1.brokerList = kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
agent.sinks.k1.batchSize = 200
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
# The ZooKeeper port is configured here; mine is 2180, but it is normally 2181
agent.sinks.k1.zookeeperConnect=zookeeper-node1:2180,zookeeper-node2:2180,zookeeper-node3:2180
agent.channels.ch.type = memory
agent.channels.ch.capacity = 10000
agent.channels.ch.transactionCapacity = 10000
agent.channels.ch.keep-alive = 20
agent.sources.sql-source.channels = ch
agent.sinks.k1.channel = ch
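For finer control over which rows count as new, flume-ng-sql-source also supports a custom query; the sketch below is an assumption-laden example (my_table and id are placeholder names), where $@$ is replaced by the source with the last value recorded in the status file:
# Optional: custom incremental query; $@$ expands to the last saved value
agent.sources.sql-source.custom.query = SELECT id, name FROM my_table WHERE id > $@$ ORDER BY id ASC
# Initial value of the incremental field on first run
agent.sources.sql-source.start.from = 0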

Kafka cluster installation and configuration
1. Configure the server.properties file
# Unique broker ID within the cluster
broker.id=2
# Internal hostname/IP
host.name=host_name/IP
# Internal hostname/IP
listeners=PLAINTEXT://host_name/IP:9092
# If the server has a public IP, the public IP must be configured here
advertised.listeners=PLAINTEXT://123.xxx.xxx.xxx:9092
log.dirs=/opt/moudle/kafka_2.11-2.1.0/logs/tmp/kafka-logs
zookeeper.connect=zookeeper_node1_hostname:2180,zookeeper_node2_hostname:2180,zookeeper_node3_hostname:2180
Only the settings at the beginning and end need configuring; the defaults in the middle are generally fine and can be modified as needed.
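Once each broker is configured and started, you can confirm that all three registered with ZooKeeper using the shell that ships with Kafka (the hostname and the 2180 port follow the config above):
# Open a ZooKeeper shell, then list registered broker IDs
bin/zookeeper-shell.sh zookeeper_node1_hostname:2180
ls /brokers/ids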


2. Configure zookeeper.properties
dataDir=/tmp/zookeeper
dataLogDir=/tmp/log/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
# the port at which the clients will connect
clientPort=2180
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=100
server.1=192.168.100.81:2888:3888
server.2=192.168.100.95:2888:3888
server.3=192.168.100.224:2888:3888
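One step the properties file does not cover: every node in the ensemble also needs a myid file under dataDir whose content matches its server.N line above. For example, on 192.168.100.81 (server.1):
# Repeat on each node with its own ID (2 on .95, 3 on .224)
mkdir -p /tmp/zookeeper
echo 1 > /tmp/zookeeper/myid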

Notes
1. No suitable driver found for jdbc:mysql://xxxxx
First: the connection URL format is wrong ("jdbc:mysql://localhost:3306/XX", "root", "XXXX")
Second: the driver class string is wrong (com.mysql.jdbc.Driver)
Third: the MySQL JDBC driver was not added to the classpath
Fourth: put mysql-connector-java-5.1.22-bin.jar into $JAVA_HOME/jre/lib/ext
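A quick way to rule out the third cause is to confirm the connector jar is actually where Flume loads it from (FLUME_HOME below is an assumption; use your install path):
# The MySQL connector jar should appear in this listing
ls $FLUME_HOME/lib | grep -i mysql-connector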
2. Common Kafka commands
Delete a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_topic
Note: delete.topic.enable=true must be set in the broker's server.properties for the topic to actually be deleted.
Describe a topic
bin/kafka-topics.sh --describe --zookeeper localhost:2180 --topic test_topic
Add partitions
bin/kafka-topics.sh --alter --zookeeper localhost:2180 --topic test_topic --partitions 3
Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test_topic
List topics
bin/kafka-topics.sh --list --zookeeper localhost:2180
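To smoke-test a topic end to end, pair a console producer with the consumer script shown in the next section (localhost:9092 is a placeholder; use a real broker address):
# Type messages on stdin; each line becomes one Kafka record
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic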
3. Kafka shell scripts
Kafka startup script
#!/bin/bash
echo "start kafka......"
bin/kafka-server-start.sh -daemon /opt/hiteam_moudle/kafka_2.11-2.1.0/config/server.properties
Create a topic
#!/bin/bash
echo "create topic of kafka ...."
bin/kafka-topics.sh --create --zookeeper k8s-master:2181,k8s-node1:2181,k8s-node2:2181 --replication-factor 3 --partitions 4 --topic topic2
List topics
#!/bin/bash
echo "list topic........"
./bin/kafka-topics.sh --list --zookeeper 192.168.100.81:2180
Start a consumer
#!/bin/bash
echo "kafka-flume starting........"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yinqing --from-beginning
Flume script
Start Flume with the specified configuration file
#!/bin/bash
echo "flume:mysql-kafka start....."
bin/flume-ng agent --conf conf --conf-file /opt/hiteam_moudle/apache-flume-1.8.0-bin/conf/flume-conf.properties --name agent -Dflume.root.logger=INFO,console
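With the agent, Kafka, and a console consumer all running, incremental collection can be sanity-checked by inserting a row into the source table; within run.query.delay (10 seconds in the config above) it should appear on the consumer. The column name below is hypothetical; substitute your table's actual schema:
#!/bin/bash
# Insert a test row; the agent should pick it up on its next query
# cycle and publish it to the 'yinqing' topic
mysql -uroot -p yinqing -e "INSERT INTO table_name (some_column) VALUES ('flume-test');"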