Flume (5 demos to get you started)
2022-07-28 02:14:00 【kjshuan】
Integrating Kafka with Flume
Three questions to keep in mind: putting massive orders into a message queue (premise: triggered manually or by a cron expression / timer); Spring Boot integration and acks, i.e. simple synchronous sending versus asynchronous sending (faster, but data can be lost); and what to do when the sharded database cannot keep up (Flume is installed as the extension).

Common component versions: Oracle 11g, MySQL 5.7, Tomcat, Spring 5.2.12, Nacos 1.4.2, Seata, Sentinel, Canal 1.1.6, Flume 1.6-cdh5.14.2, Redis 5, Kafka 2.0.0, Nginx 1.8.1, ZooKeeper.

Pay attention to the inside of an Agent: it contains three parts, Source, Channel and Sink, and an Agent itself is a JVM process. Before looking at these three parts, understand one concept: the Flume event. An event wraps the data to be transmitted; Flume moves data around as events, which are its basic unit of data transfer. Now the three parts:

Source: collects data. The data can come from many places, for example a web server, and can be of many types, including custom types.

Channel: buffers data. When the Source collects data, it caches it in one or more Channels (in memory or in disk files) until the Sink takes it out.

Sink: takes data out of the Channel and sends it to the destination. The destination can be HDFS, or the Source of another Agent.

Reliability: a Channel clears its cache only after the data has reached the destination or the next Agent.

Recoverability: a Channel can cache data in a file or in memory, so data can be recovered after a failure.

Configuring an Agent: the Agent configuration is stored in a local configuration file, which follows the Java properties file format. One configuration file can configure one or more Agents. The configuration contains the property settings of each Source, Channel and Sink, and describes how they are wired together to form data flows.
Kafka basics (prerequisites)
Kafka command-line operations on topics.

View all topics on the current server:
bin/kafka-topics.sh --zookeeper host:2181 --list

Create a topic:
bin/kafka-topics.sh --zookeeper host:2181 --create --replication-factor 3 --partitions 1 --topic name

Delete a topic (requires delete.topic.enable=true in server.properties, otherwise the topic is only marked for deletion):
bin/kafka-topics.sh --zookeeper host:2181 --delete --topic name

Send messages:
bin/kafka-console-producer.sh --broker-list host:9092 --topic name
>hello world

Consume messages:
bin/kafka-console-consumer.sh --zookeeper host:2181 --topic name
bin/kafka-console-consumer.sh --bootstrap-server host:9092 --topic name
bin/kafka-console-consumer.sh --bootstrap-server host:9092 --from-beginning --topic name

Describe a topic:
bin/kafka-topics.sh --zookeeper host:2181 --describe --topic name

Change the number of partitions:
bin/kafka-topics.sh --zookeeper host:2181 --alter --topic name --partitions 6
demo1

1. Unpack Flume
cd /opt/
ls
tar -zxf flume-ng-1.6.0-cdh5.14.2.tar.gz
ls
mv apache-flume-1.6.0-cdh5.14.2-bin/ soft/flume160
ls
cd soft/flume160/conf/
ls
cp flume-conf.properties.template flume-conf.properties
2. Write the first.conf configuration

# Under /opt, create the flumecfg directory and the configuration file first.conf, then add the following
cd /opt/
mkdir flumecfg
cd flumecfg/
ls
vim first.conf

# File contents
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=9999
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

3. Install netcat
# Install the netcat tool
yum install -y nc
# Check whether port 44444 is in use
sudo netstat -nlp | grep 44444
# Check whether port 9999 is in use
sudo netstat -nlp | grep 9999
# Wait until the window from step 3.1 has started, then run
nc localhost 9999

3.1 Switch to a new window
# Start the Flume agent (the listening port)
cd /opt/soft/flume160/bin/
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/first.conf -Dflume.root.logger=INFO,console

Note: start this window (step 3.1) first, before running nc in the step 3 window!!!
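If you prefer, the same smoke test can be driven from Java instead of nc. A minimal, hypothetical sketch (host and port taken from first.conf) that writes one line to the netcat source:

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NetcatSourceTest {
    public static void main(String[] args) throws Exception {
        // Connect to the Flume netcat source configured in first.conf (localhost:9999)
        try (Socket socket = new Socket("localhost", 9999)) {
            OutputStream out = socket.getOutputStream();
            // The netcat source treats each newline-terminated line as one event
            out.write("hello flume\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}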

demo2
1. Reuse the window from step 3.1

vim /opt/flumecfg/first.conf
touch /opt/flumecfg/sec.conf
vim sec.conf

# Add the following
# Name the sources, channels and sinks
a1.channels = c1
a1.sources = s1
a1.sinks = k1
# The monitored content comes from the /opt/data directory
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/data
a1.channels.c1.type=memory
# The output destination is the console (logger type)
a1.sinks.k1.type = logger
# Bind the sink and source to the channel
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
# Save with :wq
2. Drop a file into the /opt/data directory

3. Start the Flume agent (the listening port)
cd /opt/soft/flume160/bin/
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/sec.conf -Dflume.root.logger=INFO,console
demo3
1. Configure third.conf
cd /opt/flumecfg/
ls
cp sec.conf third.conf
vim third.conf

# Add the following to the configuration
a1.channels = c1
a1.sources = s1
a1.sinks = k1
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/data
a1.channels.c1.type=memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mydemo
a1.sinks.k1.kafka.bootstrap.servers = 192.168.64.138:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
# Save with :wq
2. Start Kafka in a new window A
# Start ZooKeeper first
zkServer.sh start
# Start Kafka
kafka-server-start.sh /opt/soft/kafka200/config/server.properties
# In a new window B, start a console consumer to monitor the topic
kafka-console-consumer.sh --bootstrap-server 192.168.64.138:9092 --topic mydemo
3. Rename the log file in /opt/data (e.g. give it a .log suffix)
4. Start Flume in a new window C
cd /opt/soft/flume160/bin/
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/third.conf
demo4 (secondary development)
1. New project

1.1 Import pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>tcinterceptor</artifactId>
<version>1.0-SNAPSHOT</version>
<name>tcinterceptor</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.kgc.towercrane.intceptors.SimplePartitioner</mainClass><!-- Change the location of your main class here -->
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2. Write the SimplePartitioner class
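The article does not include the class body, so here is a minimal sketch of what a SimplePartitioner registered as kafka.producer.partitioner.class could look like. It implements Kafka's producer Partitioner interface and spreads records by the TCxxx id that the regex interceptor (configured in step 4 below) extracts as the message key. The mapping rule is an assumption, and the package name must match the a1.sinks.k1.kafka.producer.partitioner.class entry in third.conf.

package com.kgc.towercrane.intceptor;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SimplePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        Integer count = cluster.partitionCountForTopic(topic);
        int numPartitions = (count == null) ? 1 : count;
        if (key == null) {
            // No key extracted by the interceptor: fall back to partition 0
            return 0;
        }
        // Keys look like "TC001": keep only the digits and spread by the numeric id
        String id = key.toString().replaceAll("\\D", "");
        if (id.isEmpty()) {
            return Math.abs(key.toString().hashCode()) % numPartitions;
        }
        return Integer.parseInt(id) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}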
3. Drag in the packaged jar (copy it into Flume's lib directory, e.g. /opt/soft/flume160/lib)

4. Configure the regex interceptor
cd /opt/flumecfg
vim third.conf

# Add the interceptor and partitioner settings
a1.sources.s1.spoolDir = /opt/data
a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type=regex_extractor
a1.sources.s1.interceptors.i1.regex=(TC[0-9]+).*
a1.sources.s1.interceptors.i1.serializers=ssl
a1.sources.s1.interceptors.i1.serializers.ssl.name=key
a1.channels.c1.type=memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mydemo
a1.sinks.k1.kafka.bootstrap.servers = 192.168.64.138:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.partitioner.class=com.kgc.towercrane.intceptor.SimplePartitioner
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
# Save with :wq
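To see what the regex_extractor does, here is a small self-contained check (the sample log line is hypothetical): the pattern (TC[0-9]+).* captures the tower-crane id at the start of each line, Flume stores the captured group in the event header named key, and the KafkaSink uses that header as the Kafka record key that SimplePartitioner receives.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main(String[] args) {
        // Same pattern as a1.sources.s1.interceptors.i1.regex
        Pattern p = Pattern.compile("(TC[0-9]+).*");
        // Hypothetical line from a file dropped into /opt/data
        String line = "TC001,2022-07-28 02:14:00,load=3.5";
        Matcher m = p.matcher(line);
        if (m.matches()) {
            // Prints TC001 -> this becomes the "key" header and the Kafka record key
            System.out.println(m.group(1));
        }
    }
}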
5. Enable topic deletion in server.properties
vim /opt/soft/kafka200/config/server.properties
# Must be set to true, otherwise a deleted topic is only marked for deletion
delete.topic.enable=true
# Save with :wq

6. Delete the old mydemo topic
kafka-topics.sh --delete --zookeeper 192.168.64.138:2181 --topic mydemo
# Check the result of the deletion
cd /opt/soft/kafka200/kafka-logs/
ls

7. Create the mydemo topic
# Create the topic with 3 partitions
kafka-topics.sh --create --zookeeper 192.168.64.138:2181 --topic mydemo --replication-factor 1 --partitions 3
# Check the partition directories
cd /opt/soft/kafka200/kafka-logs/
ls

# Open four windows A, B, C and D first
# 1. Start ZooKeeper first (window A)
zkServer.sh start
# 2. Start Kafka (window A)
kafka-server-start.sh /opt/soft/kafka200/config/server.properties
# 3. Monitor the mydemo topic (window B)
kafka-console-consumer.sh --bootstrap-server 192.168.64.138:9092 --topic mydemo
# 4. Enter the Flume directory (window C)
cd /opt/soft/flume160/bin
# 4.1 Start Flume (window C)
./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/third.conf -Dflume.root.logger=INFO,console
# 5. Check the offsets of mydemo to see how data spreads across partitions (window D)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.64.138:9092 --topic mydemo
# Reset the consumer group
./kafka-consumer-groups.sh --bootstrap-server=192.168.64.138:9092 --execute --reset-offsets --topic=tc --group=cm --to-earliest
8. Problem testing


9. Configure the MyConsumer class in IDEA
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {
    // Listen on topic "tc" as consumer group "cm" and acknowledge manually
    @KafkaListener(topics = "tc", groupId = "cm")
    public void readKafka(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println(record.key() + "====>" + record.value() + "====>" + record.partition());
        ack.acknowledge();
    }
}
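The Acknowledgment parameter above only works when the listener container runs in manual-ack mode. A minimal sketch of such a configuration, assuming spring-kafka is on the classpath; the broker address and group id come from the commands earlier in this article, while the class and bean names are illustrative:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.138:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cm");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Disable auto commit so offsets are committed only on ack.acknowledge()
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Manual ack mode so the listener's Acknowledgment parameter takes effect
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

With Spring Boot, the same effect can also be achieved through the spring.kafka.listener.ack-mode property instead of a configuration class.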
# Start the Spring Boot application
10. Reset the consumer group
./kafka-consumer-groups.sh --bootstrap-server=192.168.64.138:9092 --execute --reset-offsets --topic=tc --group=cm --to-earliest
demo5 (Spring Boot integrates a Kafka consumer; see the next article for details)
1. In IDEA, configure the TcinfoPushService in tcmanager

# Start Nacos
startup.sh
