Flume (5 demos to get you started)
2022-07-28 02:14:00 【kjshuan】
Integrating Kafka with Flume
Background notes: the scenario is putting massive orders into a message queue (driven by a timer), integrating Spring Booot's producer with Kafka and choosing its ack behavior — a simple synchronous send is safe, while asynchronous sending speeds things up but can lose data — and, when even a sharded database cannot keep up, installing Flume as the ingestion layer.

Versions used throughout: Oracle 11g, MySQL 5.7, Tomcat, Spring 5.2.12, Nacos 1.4.2, Seata, Sentinel, Canal 1.1.6, Flume 1.6 (CDH 5.14.2), Redis 5, Kafka 2.0.0, Nginx 1.8.1, ZooKeeper.

Focus on the inside of an Agent: it is a JVM process containing three parts, Source, Channel and Sink. Before looking at the three parts, one concept: the Flume event. An event encapsulates the data to be transmitted; Flume transmits data as events, and the event is its basic unit of data transfer. Now the three parts:

- Source: collects data. The data can come from many places, for example a web server, and can be of many types, including custom types.
- Channel: buffers data. When the Source collects data, it caches it in one or more Channels (in memory or in files on disk) until the Sink takes it out.
- Sink: takes data out of the Channel and sends it to the destination, for example HDFS, or the Source of another Agent.

Reliability: a Channel clears its buffer only after all cached events have reached the destination or the next Agent.

Recoverability: because a Channel can buffer in a file (or in memory), data can be recovered after a failure.

Agent configuration: the configuration is stored in a local file that follows the Java properties format. One file can configure one or more Agents; it holds the property settings of every Source, Channel and Sink and describes how they are wired together into data flows.
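To make the synchronous-vs-asynchronous note concrete, here is a minimal sketch using the plain kafka-clients producer API. It is not code from this post: the broker address and topic are taken from the demos below, and the key and value are invented for illustration.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendModes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.64.138:9092"); // broker used in the demos
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1"); // leader-only ack, the same setting the Flume sink uses below

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("mydemo", "TC001", "order-1");

            // Synchronous: block until the broker acknowledges.
            // Safe, but every send pays a full round trip.
            producer.send(record).get();

            // Asynchronous: returns immediately, so throughput is much higher,
            // but a record still unacknowledged when the process dies is lost;
            // failures only surface in the callback.
            producer.send(record, (metadata, e) -> {
                if (e != null) e.printStackTrace();
            });
        }
    }
}
```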
Kafka basics (prerequisites)
Common Kafka command-line operations:

```sh
# List all topics on the current server
bin/kafka-topics.sh --zookeeper host:2181 --list

# Create a topic
bin/kafka-topics.sh --zookeeper host:2181 --create --replication-factor 3 --partitions 1 --topic name

# Delete a topic (requires delete.topic.enable=true in server.properties,
# otherwise the topic is only marked for deletion)
bin/kafka-topics.sh --zookeeper host:2181 --delete --topic name

# Produce messages
bin/kafka-console-producer.sh --broker-list host:9092 --topic name
>hello world

# Consume messages (old ZooKeeper-based consumer, then the bootstrap-server forms)
bin/kafka-console-consumer.sh --zookeeper host:2181 --topic name
bin/kafka-console-consumer.sh --bootstrap-server host:9092 --topic name
bin/kafka-console-consumer.sh --bootstrap-server host:9092 --from-beginning --topic name

# Describe a topic
bin/kafka-topics.sh --zookeeper host:2181 --describe --topic name

# Change the number of partitions
bin/kafka-topics.sh --zookeeper host:2181 --alter --topic name --partitions 6
```
demo1

1. Unpack the archive
```sh
cd /opt/
ls
tar -zxf flume-ng-1.6.0-cdh5.14.2.tar.gz
ls
mv apache-flume-1.6.0-cdh5.14.2-bin/ soft/flume160
ls
cd soft/flume160/conf/
ls
cp flume-conf.properties.template flume-conf.properties
```
2. Write a hello-world configuration

Create the configuration file first.conf under /opt/flumecfg and add the following:

```sh
cd /opt/
mkdir flumecfg
cd flumecfg/
ls
vim first.conf
```

```properties
# first.conf
a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=9999

a1.sinks.k1.type=logger

a1.channels.c1.type=memory

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
```

3. Install netcat
```sh
# Install the netcat tool
yum install -y nc

# Check whether port 44444 is in use
sudo netstat -nlp | grep 44444
# Check whether port 9999 is in use
sudo netstat -nlp | grep 9999

# Wait for the agent in step 3.1 to start, then run:
nc localhost 9999
```

3.1 In a new window
Start the Flume listening agent:

```sh
cd /opt/soft/flume160/bin/
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/first.conf -Dflume.root.logger=INFO,console
```
Note: start the agent in window 3.1 before typing into the nc window!

demo2
1. In the window from step 3.1

```sh
vim /opt/flumecfg/first.conf
touch /opt/flumecfg/sec.conf
vim sec.conf
```

Add the following:

```properties
# Name the sources, channels and sinks
a1.channels = c1
a1.sources = s1
a1.sinks = k1

# Watch the /opt/data directory for new files
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/data

a1.channels.c1.type = memory

# Output goes to the console (logger sink)
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
```

Save with `:wq`.
2. Drop a file into /opt/data (see the sketch below)
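To test, drop any text file into the watched directory, for example (file name and content invented for illustration):

```sh
mkdir -p /opt/data    # make sure the watched directory exists
echo "hello from spooldir" > /opt/data/test1.txt
# Once consumed, the spooling source renames the file with a .COMPLETED suffix
```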

3. Start the Flume listening agent

```sh
cd /opt/soft/flume160/bin/
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/sec.conf -Dflume.root.logger=INFO,console
```
demo3
1. Configure
```sh
cd /opt/flumecfg/
ls
cp sec.conf third.conf
vim third.conf
```

Add the following:

```properties
a1.channels = c1
a1.sources = s1
a1.sinks = k1

a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/data

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mydemo
a1.sinks.k1.kafka.bootstrap.servers = 192.168.64.138:9092
a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
```

Save with `:wq`.
2. In a new window A, start Kafka
```sh
# Start ZooKeeper first
zkServer.sh start
# Start Kafka
kafka-server-start.sh /opt/soft/kafka200/config/server.properties

# In a new window B, start a consumer to monitor the topic
kafka-console-consumer.sh --bootstrap-server 192.168.64.138:9092 --topic mydemo
```
3. Rename the data file

Give the file in /opt/data a new name ending in .log so the spooling source treats it as a new file and feeds it to Kafka, as in the sketch below.
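A hypothetical example (the file name is assumed; the spooling source tracks files by name, so a renamed file is picked up as new):

```sh
mv /opt/data/test1.txt.COMPLETED /opt/data/test1.log
```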
4. In a new window C, start Flume

```sh
cd /opt/soft/flume160/bin/
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/third.conf
```
demo4 (custom development)
1. New project

1.1 Import the pom
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>tcinterceptor</artifactId>
<version>1.0-SNAPSHOT</version>
<name>tcinterceptor</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.kgc.towercrane.intceptors.SimplePartitioner</mainClass><!-- Change the location of your main class here -->
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```
2. Write the SimplePartitioner class
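The post leaves the class body to the reader, so here is a minimal sketch of what such a partitioner could look like. It assumes the record key is the "TCnnn" id extracted by the regex interceptor configured in step 4 below; the route-by-number logic is illustrative, not the author's exact code. Whatever you write, the package must match both the mainClass in the pom above and the partitioner.class in the Flume config (the post spells the package intceptors in one place and intceptor in the other).

```java
package com.kgc.towercrane.intceptors;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SimplePartitioner implements Partitioner {

    // Route each record by the numeric part of its "TCnnn" key so that
    // events from the same tower crane always land in the same partition.
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (key == null) {
            return 0; // keyless records all go to partition 0
        }
        String digits = key.toString().replaceAll("\\D", "");
        long n;
        try {
            n = Long.parseLong(digits);
        } catch (NumberFormatException e) {
            // Fall back to a non-negative hash if the key carries no number.
            n = key.toString().hashCode() & Integer.MAX_VALUE;
        }
        return (int) (n % numPartitions);
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
```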
3. Drop the assembled jar into Flume (typically into its lib directory)

4. Configure the regex interceptor
```sh
cd /opt/flumecfg
vim third.conf
```

Add the interceptor and partitioner settings:

```properties
a1.sources.s1.spoolDir = /opt/data

# Extract the leading "TCnnn" id from each line into the event header "key";
# the Kafka sink uses that header as the record key
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = regex_extractor
a1.sources.s1.interceptors.i1.regex = (TC[0-9]+).*
a1.sources.s1.interceptors.i1.serializers = ssl
a1.sources.s1.interceptors.i1.serializers.ssl.name = key

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mydemo
a1.sinks.k1.kafka.bootstrap.servers = 192.168.64.138:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.partitioner.class = com.kgc.towercrane.intceptor.SimplePartitioner

a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
```

Save with `:wq`.
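To exercise the regex, a hypothetical sample file (ids and values invented for illustration) whose lines match `(TC[0-9]+).*` could look like this:

```sh
cat > /opt/data/tc_sample.log <<'EOF'
TC001,2022-07-28 02:14:00,load=1.5t
TC002,2022-07-28 02:14:01,load=2.0t
TC010,2022-07-28 02:14:02,load=0.8t
EOF
# The interceptor copies TC001/TC002/TC010 into each event's "key" header,
# and SimplePartitioner then routes every id to a fixed partition of mydemo.
```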
5. Enable topic deletion in Kafka's server.properties
```sh
vim /opt/soft/kafka200/config/server.properties
```

```properties
# Must be true for delete to actually remove the topic
delete.topic.enable=true
```

6. Delete the topic
```sh
kafka-topics.sh --delete --zookeeper 192.168.64.138:2181 --topic mydemo

# Verify the deletion
cd /opt/soft/kafka200/kafka-logs/
ls
```

7. Create mydemo
```sh
# Create the topic with 3 partitions
kafka-topics.sh --create --zookeeper 192.168.64.138:2181 --topic mydemo --replication-factor 1 --partitions 3

# Check the partition directories
cd /opt/soft/kafka200/kafka-logs/
ls
```

Then open four windows A, B, C, D:

```sh
#1 Start ZooKeeper first (window A)
zkServer.sh start

#2 Start Kafka (window A)
kafka-server-start.sh /opt/soft/kafka200/config/server.properties

#3 Monitor the topic (window B)
kafka-console-consumer.sh --bootstrap-server 192.168.64.138:9092 --topic mydemo

#4 Start Flume (window C)
cd /opt/soft/flume160/bin
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/third.conf -Dflume.root.logger=INFO,console

#5 Check per-partition offsets of mydemo (window D)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.64.138:9092 --topic mydemo
```
8. Test the pipeline


9. Configure the MyConsumer class in IDEA
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {
    @KafkaListener(topics = "tc", groupId = "cm")
    public void readKafka(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println(record.key() + "====>" + record.value() + "====>" + record.partition());
        ack.acknowledge();   // manually commit the offset
    }
}
```

Then start the application.
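Because readKafka takes an Acknowledgment parameter, the listener must run with manual acknowledgment. The post does not show the Spring Boot configuration, so the following application.properties sketch is an assumption based on standard spring-kafka settings:

```properties
# Assumed broker address, taken from the demos above
spring.kafka.bootstrap-servers=192.168.64.138:9092
# Manual ack: disable auto-commit and let the listener commit per record
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```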
10. Reset the consumer group offsets
```sh
./kafka-consumer-groups.sh --bootstrap-server=192.168.64.138:9092 --execute --reset-offsets --topic=tc --group=cm --to-earliest
```
demo5 (Spring Boot integrating a Kafka consumer; see the next article for details)
1. In IDEA, configure TcinfoPushService in the tcmanager module

```sh
# Start Nacos
startup.sh
```
