Flume learning II - Cases
2022-06-30 09:52:00 【Keep-upup】
Official getting-started case:
Install netcat:
sudo yum install -y nc
Check whether the port is occupied:
sudo netstat -tunlp | grep <port number>
Add the following to the flume-netcat-logger.conf file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
First, name the agent's components:
a1 is the name of the current agent. sources, sinks and channels are plural because an agent can contain more than one source, sink and channel.
r1, k1 and c1 correspond to the components configured below.
Describe/configure the source: a1's input source type is netcat (a port-based source), along with the host and port it listens on.
Sink: a1's output goes to the console, so the logger type is used.
The channel type is memory (it could also be file). The total buffer capacity is 1000 (1000 means 1000 events, not bytes), and a transaction is committed once 100 events have been collected.
Because there may be multiple sources, sinks and channels, they have to be bound together at the end; only after the channel is wired up can data flow through.
r1 writes into c1, and k1 pulls data from c1.
Note that channels has an s, so one source can write to multiple channels, while channel (no s) means a sink reads from exactly one channel; one channel, however, can be read by multiple sinks.
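To illustrate that plural/singular distinction, here is a minimal sketch; channel c2 and sink k2 are hypothetical additions, not part of the case above:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# one source fans out to two channels (plural "channels")
a1.sources.r1.channels = c1 c2
# each sink reads from exactly one channel (singular "channel")
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2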
Start Flume to listen on the port:
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
This is equivalent to a client connecting to the server

Here, because I did not install a cluster, I simply connect to the virtual machine over ssh to act as the client for the listening test.
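For example, once the agent is running, open another session on the same machine and send a test line with nc (the message text is arbitrary):
nc localhost 44444
hello flume
If everything is wired correctly, the agent console prints the received event through the logger sink (headers plus a hex/ASCII dump of the body).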

Real-time monitoring of a single appended file

To monitor a file, the component naming, the channel, the sink and the bindings all stay the same; only the source configuration needs to change.

In the property table, bold entries are required configuration; non-bold entries are optional.
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f <file path to monitor>
# With -f, tail simply fails if it cannot read the file; with -F it keeps retrying. By default tail first prints the last 10 lines, so when monitoring starts, up to ten existing lines of the file are output before new appends.
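To verify, start the agent the same way as in the previous case and append a line to the monitored file (the path below is only an example):
echo "hello exec source" >> /opt/module/flume/test.log
The appended line should then show up as an event on the Flume console.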
Read a local file to HDFS in real time
Goal: monitor a single file and upload its new content to HDFS.
1. For Flume to write data to HDFS, the relevant Hadoop jar packages are required:
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
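For example, copy them into Flume's lib directory (the destination path assumes Flume is installed under /opt/module/flume; adjust to your install location):
cp commons-configuration-1.6.jar hadoop-auth-2.7.2.jar hadoop-common-2.7.2.jar \
   hadoop-hdfs-2.7.2.jar commons-io-2.4.jar htrace-core-3.1.0-incubating.jar \
   /opt/module/flume/lib/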
2. Create the flume-file-hdfs.conf file and add the following:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://spark-local:9000/flume/%Y%m%d/%H
# Upload file prefix
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# Number of time units before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
# Time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of events accumulated before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; DataStream means plain, uncompressed output
a2.sinks.k2.hdfs.fileType = DataStream
# Roll to a new file every N seconds
a2.sinks.k2.hdfs.rollInterval = 30
# Roll size of each file in bytes (about 128 MB)
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the event count (0 disables count-based rolling)
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
Flume does not wait until 1000 events have accumulated before flushing to HDFS: even if fewer than 1000 events arrive, it still flushes once the time interval has passed, and if 1000 events accumulate before the interval elapses, they are flushed as well.
Then start Flume:
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
Start Hadoop:
sbin/start-dfs.sh
sbin/start-yarn.sh
After starting, use jps to check whether the processes started successfully.
Visit spark-local:50070 to view the files.
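Alternatively you can check from the command line; the date/hour subdirectories come from the %Y%m%d/%H escapes in the sink path:
hdfs dfs -ls -R /flume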
Read directory files to HDFS in real time
Goal: watch a directory and upload files that are newly added to it.

Create the configuration file flume-dir-hdfs.conf:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://spark-local:9000/flume/%Y%m%d/%H
# Upload file prefix
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# Number of time units before a new folder is created
a3.sinks.k3.hdfs.roundValue = 1
# Time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events accumulated before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; DataStream means plain, uncompressed output
a3.sinks.k3.hdfs.fileType = DataStream
# Roll to a new file every N seconds
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size of each file in bytes (about 128 MB)
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the event count (0 disables count-based rolling)
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
Start the directory-monitoring agent:
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
Then visit spark-local:50070 to view the monitored files. spark-local is my host name; entering the IP address also works.
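To test, copy a file into the monitored upload directory (data.txt is just an example file name); once the spooldir source has consumed it, the file is renamed with the .COMPLETED suffix configured above:
cp data.txt /opt/module/flume/upload/
ls /opt/module/flume/upload/
# data.txt.COMPLETED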
Real-time monitoring of multiple appended files in a directory
Exec source monitors a single file that is appended to in real time, but it cannot guarantee that no data is lost. Spooldir source guarantees no data loss and supports resuming from a breakpoint, but its latency is high and it cannot monitor appends in real time. Taildir source supports breakpoint resumption, guarantees no data loss, and also monitors in real time.
- First, create two files, file1.txt and file2.txt, in the /opt/environment/flume-1.9.0/files directory.

- Create the configuration file files-flume-logger.conf in the flume-1.9.0/job directory and write the following:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/environment/flume-1.9.0/files/.*txt
a1.sources.r1.positionFile = /opt/environment/flume-1.9.0/position/position.json
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
This creates filegroup f1, which monitors all files ending in txt under the files directory.
Start Flume:
bin/flume-ng agent -c conf/ -f job/files-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
Append content to file1.txt and file2.txt:
echo hello >> file1.txt
echo 2022 upup_7 >> file1.txt
echo 2022 upup_7 >> file2.txt
Finally, the monitoring results:

Since Taildir supports breakpoint resumption, we now shut Flume down and then append more content to the files.
After appending, start Flume again. The result:
For file2.txt, we appended nice first and then good, and the monitored output arrives in the same order! This is breakpoint resumption: even if Flume was not running or had crashed, no data is lost, which the exec source cannot do!
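The resumption works because the Taildir source records its progress in the position file configured above; its contents look roughly like this (the inode, pos and file values below are illustrative, not taken from the run above):
[{"inode":67149965,"pos":18,"file":"/opt/environment/flume-1.9.0/files/file1.txt"},
 {"inode":67149966,"pos":10,"file":"/opt/environment/flume-1.9.0/files/file2.txt"}]
When Flume restarts, it resumes reading each file from the recorded pos offset.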