Flume (5 demos easy to get started)
2022-07-28 02:14:00 【kjshuan】
Kafka integrated with Flume
Three questions frame this post: (1) how to put massive volumes of orders into a message queue (here the orders are cloned by hand with a timer); (2) how Spring Boot integrates with Kafka: the acks setting, plain sends, synchronous sends, and asynchronous sends that speed things up but can lose data; (3) what to do when the (sharded) database cannot keep up, so Flume is installed on the extension nodes.
Common component versions: Oracle 11g, MySQL 5.7, Tomcat, Spring 5.2.12, Nacos 1.4.2, Seata, Sentinel, Canal 1.1.6, Flume 1.6 (CDH 5.14.2), Redis 5, Kafka 2.0.0, Nginx 1.8.1, ZooKeeper.
Pay attention to the inside of a Flume Agent: it contains three parts (Source, Channel and Sink), and the Agent itself is a JVM process. Before looking at these three parts, understand one concept: the Flume event. An event encapsulates the data to be transferred, and data is transmitted event by event; the event is Flume's basic unit of data transfer. The three parts work as follows.
Source: collects data. The data source can be many things, for example a web server. It can handle all kinds of data, including custom types.
Channel: buffers data. When the Source collects data, it caches it in one or more Channels (in memory or in disk files) until the Sink takes the data out.
Sink: takes data out of the Channel and sends it to the destination. The destination can be HDFS, or the Source of another Agent.
Reliability: a Channel clears its cache only after all of it has reached the destination or the next Agent.
Recoverability: a Channel can cache data in a file or in memory, so data can be recovered after a failure.
Agent configuration: the configuration is stored in a local file that follows the Java properties format. One configuration file can configure one or more Agents. The configuration contains the property settings for each Source, Channel and Sink, and describes how they are wired together to form data flows.
Kafka basics (prerequisites for the demos)
Kafka command-line operations on topics:
# List all topics on the current server
bin/kafka-topics.sh --zookeeper host:2181 --list
# Create a topic
bin/kafka-topics.sh --zookeeper host:2181 --create --replication-factor 3 --partitions 1 --topic name
# Delete a topic (requires delete.topic.enable=true in server.properties, otherwise the topic is only marked for deletion)
bin/kafka-topics.sh --zookeeper host:2181 --delete --topic name
# Send messages
bin/kafka-console-producer.sh --broker-list host:9092 --topic name
>hello world
# Consume messages
bin/kafka-console-consumer.sh --zookeeper host:2181 --topic name
bin/kafka-console-consumer.sh --bootstrap-server host:9092 --topic name
bin/kafka-console-consumer.sh --bootstrap-server host:9092 --from-beginning --topic name
# Describe a topic
bin/kafka-topics.sh --zookeeper host:2181 --describe --topic name
# Change the number of partitions
bin/kafka-topics.sh --zookeeper host:2181 --alter --topic name --partitions 6
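One of the questions listed at the top of the post concerns acks and synchronous versus asynchronous sending (async is faster, but records can be lost if send errors are ignored). As a side illustration, not part of the original demos, here is a minimal kafka-clients producer sketch showing both styles; the broker address, topic name and key are placeholder values borrowed from the later demos.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.64.138:9092"); // placeholder broker address
        props.put("acks", "1");                                // 0 = fire and forget, 1 = leader ack, all = full ISR ack
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("mydemo", "TC1", "hello");

            // Synchronous send: block until the broker acknowledges the record (safe but slower)
            RecordMetadata meta = producer.send(record).get();
            System.out.println("sync -> partition " + meta.partition() + ", offset " + meta.offset());

            // Asynchronous send: returns immediately; failures must be handled in the callback,
            // otherwise the failed records are silently lost
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("async -> partition " + metadata.partition() + ", offset " + metadata.offset());
                }
            });
            producer.flush(); // make sure the async record is actually sent before the producer closes
        }
    }
}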
demo1

1. Unpack Flume
cd /opt/
ls
tar -zxf flume-ng-1.6.0-cdh5.14.2.tar.gz
ls
mv apache-flume-1.6.0-cdh5.14.2-bin/ soft/flume160
ls
cd soft/flume160/conf/
ls
cp flume-conf.properties.template flume-conf.properties
2. Write the hello-world config

# Create the directory /opt/flumecfg and the config file first.conf inside it, then add the following
cd /opt/
mkdir flumecfg
cd flumecfg/
ls
vim first.conf

# File contents
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=9999
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

3. Install netcat
# Install the netcat tool
yum install -y nc
# Check whether port 44444 is in use
sudo netstat -nlp | grep 44444
# Check whether port 9999 is in use
sudo netstat -nlp | grep 9999
# Wait until the Flume agent in window 3.1 (below) has started, then run:
nc localhost 9999

3.1 In a second window
# Start the Flume agent and open the listening port
cd /opt/soft/flume160/bin/
# Start the agent with the config file
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/first.conf -Dflume.root.logger=INFO,console
Note: the agent in window 3.1 must be started before running the nc command in the first window!!!

demo2
1. Reuse the window from step 3.1 to write the config

vim /opt/flumecfg/first.conf
touch /opt/flumecfg/sec.conf
vim sec.conf

# Add the following
# Name the sources, channels and sinks
a1.channels = c1
a1.sources = s1
a1.sinks = k1
a1.sources.s1.type = spooldir
# Monitor the contents of the /opt/data directory
a1.sources.s1.spoolDir = /opt/data
a1.channels.c1.type=memory
# The destination of the output is the console (logger type)
a1.sinks.k1.type = logger
# Bind the sink and the source to the channel
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
# Save with :wq
2. Drop a data file into the monitored directory /opt/data

3. Start the Flume agent
cd /opt/soft/flume160/bin/
# Start the agent with the config file
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/sec.conf -Dflume.root.logger=INFO,console
demo3
1. Write the configuration
cd /opt/flumecfg/
ls
cp sec.conf third.conf
vim third.conf

# Change the configuration to the following
a1.channels = c1
a1.sources = s1
a1.sinks = k1
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/data
a1.channels.c1.type=memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mydemo
a1.sinks.k1.kafka.bootstrap.servers = 192.168.64.138:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
# Save with :wq
2. In a new window A, start Kafka
# Start ZooKeeper first
zkServer.sh start
# Start Kafka
kafka-server-start.sh /opt/soft/kafka200/config/server.properties
# In a new window B, start a console consumer to monitor the topic
kafka-console-consumer.sh --bootstrap-server 192.168.64.138:9092 --topic mydemo
3. Rename the data file so that it ends with .log and drop it into the monitored directory /opt/data
4. In a new window C, start Flume
cd /opt/soft/flume160/bin/
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/third.conf
demo4 (secondary development)
1. New project

1.1 Import pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>tcinterceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>tcinterceptor</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.kgc.towercrane.intceptors.SimplePartitioner</mainClass><!-- Change the location of your main class here -->
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. Write the SimplePartitioner class
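The original post does not show the source of this class, so the following is only a sketch of what it could look like. The assumption here is that SimplePartitioner implements Kafka's org.apache.kafka.clients.producer.Partitioner interface and routes each record to a partition derived from the key extracted by the regex interceptor (TC1, TC2, ...), with a hash fallback when the key contains no digits; the package name must match the partitioner.class value configured in third.conf below.

package com.kgc.towercrane.intceptor; // must match a1.sinks.k1.kafka.producer.partitioner.class in third.conf

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class SimplePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (key == null || keyBytes == null) {
            return 0; // no key extracted by the interceptor: fall back to partition 0
        }
        String k = key.toString(); // e.g. "TC1", "TC23", set by the regex_extractor interceptor
        try {
            // Route by the numeric device id so each tower crane always lands in the same partition
            return Integer.parseInt(k.replaceAll("\\D", "")) % numPartitions;
        } catch (NumberFormatException e) {
            // Key contains no digits: fall back to a murmur2 hash of the key bytes
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}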
3. Build the jar (jar-with-dependencies) and copy it onto the server, typically into Flume's lib directory so the class is on the agent's classpath

4. Configure the regex interceptor
The regex_extractor interceptor copies the text matched by the first capture group (the TCxx prefix) into an event header named key; the Flume Kafka sink then uses that header as the Kafka message key, which is what the custom partitioner configured below receives.
cd /opt/flumecfg
vim third.conf

# Add / adjust the following
a1.sources.s1.spoolDir = /opt/data
a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type=regex_extractor
a1.sources.s1.interceptors.i1.regex=(TC[0-9]+).*
a1.sources.s1.interceptors.i1.serializers=ssl
a1.sources.s1.interceptors.i1.serializers.ssl.name=key
a1.channels.c1.type=memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mydemo
a1.sinks.k1.kafka.bootstrap.servers = 192.168.64.138:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.partitioner.class=com.kgc.towercrane.intceptor.SimplePartitioner
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
# Save with :wq
5. Configure the Kafka broker to allow topic deletion
vim /opt/soft/kafka200/config/server.properties
# Must be set to true for topics to actually be deleted
delete.topic.enable=true
# Save with :wq

6. Delete the old topic
kafka-topics.sh --delete --zookeeper 192.168.64.138:2181 --topic mydemo
# Check the result of the deletion
cd /opt/soft/kafka200/kafka-logs/
ls

7. Recreate the mydemo topic
# Create the topic with 3 partitions
kafka-topics.sh --create --zookeeper 192.168.64.138:2181 --topic mydemo --replication-factor 1 --partitions 3
# Check the partition directories
cd /opt/soft/kafka200/kafka-logs/
ls

# Open four windows: A, B, C and D
# 1. Start ZooKeeper first (window A)
zkServer.sh start
# 2. Start Kafka (window A)
kafka-server-start.sh /opt/soft/kafka200/config/server.properties
# 3. Monitor the topic (window B)
kafka-console-consumer.sh --bootstrap-server 192.168.64.138:9092 --topic mydemo
# 4. Go to the Flume bin directory (window C)
cd /opt/soft/flume160/bin
# 4.1 Start Flume (window C)
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/third.conf -Dflume.root.logger=INFO,console
# 5. Check the per-partition offsets of mydemo, which also shows the number of partitions (window D)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.64.138:9092 --topic mydemo
# Reset the consumer group offsets
./kafka-consumer-groups.sh --bootstrap-server=192.168.64.138:9092 --execute --reset-offsets --topic=tc --group=cm --to-earliest
8. Problem testing


9. In IDEA, write the MyConsumer class
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    // Listen on topic "tc" as consumer group "cm" and commit offsets manually
    @KafkaListener(topics = "tc", groupId = "cm")
    public void readKafka(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println(record.key() + "====>" + record.value() + "====>" + record.partition());
        ack.acknowledge();
    }
}
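Note (an assumption, not stated in the original post): for the Acknowledgment parameter to work, Spring Kafka generally has to run with manual acknowledgement, typically spring.kafka.listener.ack-mode=manual (or manual_immediate) together with spring.kafka.consumer.enable-auto-commit=false; with the default ack mode, offsets are committed automatically and the listener may refuse to start when an Acknowledgment parameter is declared.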
# Start the Spring Boot application
10. Reset the consumer group offsets
./kafka-consumer-groups.sh --bootstrap-server=192.168.64.138:9092 --execute --reset-offsets --topic=tc --group=cm --to-earliest
demo5 (Spring Boot integrating a Kafka consumer; see the next article for details)
1. In IDEA, configure TcinfoPushService in the tcmanager project

# Start Nacos
startup.sh
