Flume learning II - Cases
2022-06-30 09:52:00 【Keep-upup】
Official getting-started case:
Install netcat:
sudo yum install -y nc
Check whether the port is occupied:
sudo netstat -tunlp | grep <port number>
Add the following to the flume-netcat-logger.conf file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
First, name the components of the agent:
a1 is the name of the current agent; an agent can have more than one source, sink, and channel, which is why sources, sinks, and channels are written in the plural.
r1, k1, c1 correspond to the components configured below.
Describe/configure the source: a1's input source is of the netcat (port) type, together with the host and port that a1 listens on.
Sink: a1's output goes to the console, i.e. the logger type.
The channel type is memory (it can also be file-based); the total buffer capacity is 1000 (meaning 1000 events, not bytes), and a transaction is committed once 100 events have been collected.
Because there may be several sources, sinks, and channels, they must be bound together at the end; only after each channel is wired up can data flow through.
r1 writes into c1, and k1 pulls data from c1.
Note that channels has an "s", so one source can buffer into multiple channels, while channel below has no "s": a sink reads from only one channel, but one channel can be read by multiple sinks.
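As a hypothetical sketch of what the plural form allows (the extra channel c2 and sink k2 below are made up for illustration), a fan-out binding would look like this:
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# plural "channels": one source fans out to two channels
a1.sources.r1.channels = c1 c2
# singular "channel": each sink reads from exactly one channel
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2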
Start flume and listen on the port:
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
This is equivalent to a client connecting to the server

Here, because I did not set up a cluster, I simply connect to the virtual machine remotely over ssh in order to test the listener.
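For example (assuming nc is installed on that machine), open a second terminal, connect as the client, and type a line of text; the logger sink should then print the received event on the flume console:
nc localhost 44444
hello flume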

Real-time monitoring of a single appended file

To monitor a file, the component naming, channel, sink, and binding stay the same as above; only the source configuration needs to change.

Bold items are required configuration; items that are not bold are optional.
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f <file path to monitor>
# -f gives up if monitoring fails, while -F keeps retrying.
# tail prints the last 10 lines by default, so ten existing lines are output before new appends to the monitored file start streaming.
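Putting the pieces together, a minimal exec-source job might look like the sketch below (the monitored path /opt/module/data/test.log is made up for illustration):
# Minimal exec-to-logger sketch; replace the path with the file you want to monitor
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/data/test.log
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1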
Read a local file in real time into HDFS
Goal: monitor a single file
1. For Flume to output data to HDFS, the relevant Hadoop jar packages must be present:
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
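These can be copied into Flume's lib directory, for example (the Hadoop and Flume install paths below are assumptions; adjust them to your environment):
# hypothetical paths -- adjust to your own installation
cp /opt/module/hadoop-2.7.2/share/hadoop/common/lib/commons-configuration-1.6.jar /opt/module/flume/lib/
cp /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-common-2.7.2.jar /opt/module/flume/lib/
cp /opt/module/hadoop-2.7.2/share/hadoop/hdfs/hadoop-hdfs-2.7.2.jar /opt/module/flume/lib/
# ...and likewise for the remaining jars listed above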
2. Create the flume-file-hdfs.conf file and add the following:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://spark-local:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll over to a new folder based on time
a2.sinks.k2.hdfs.round = true
# How many time units before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
# The unit of time used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Flush to HDFS only after this many events
a2.sinks.k2.hdfs.batchSize = 1000
# Set the file type (supports compression)
a2.sinks.k2.hdfs.fileType = DataStream
# How often (in seconds) to roll to a new file
a2.sinks.k2.hdfs.rollInterval = 30
# Roll to a new file once it reaches this size (in bytes)
a2.sinks.k2.hdfs.rollSize = 134217700
# Setting 0 makes file rolling independent of the number of events
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
It is not the case that data is flushed to HDFS only after 1000 events: even if fewer than 1000 events arrive, they are still flushed within the hour; and if 1000 events accumulate before the hour is up, they are flushed as well.
Then start flume:
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
Start Hadoop:
sbin/start-dfs.sh
sbin/start-yarn.sh
After they start, run jps to check whether everything came up successfully.
Visit spark-local:50070 to view the files.
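Alternatively, assuming the HDFS client is configured on this machine, the output can also be checked from the command line (the date/hour path in the second command is only an example):
# list the date/hour directories created by the hdfs sink
hdfs dfs -ls -R /flume
# view the contents of one uploaded file
hdfs dfs -cat /flume/20220630/09/logs-*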
Read directory files in real time into HDFS
Goal: monitor files that are newly added to a directory
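The Spooldir source expects the monitored directory to exist, so create it first (a sketch; the path matches the spoolDir setting used below):
mkdir -p /opt/module/flume/upload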

Create the configuration file flume-dir-hdfs.conf:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore (do not upload) all files ending in .tmp
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://spark-local:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll over to a new folder based on time
a3.sinks.k3.hdfs.round = true
# How many time units before a new folder is created
a3.sinks.k3.hdfs.roundValue = 1
# The unit of time used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Flush to HDFS only after this many events
a3.sinks.k3.hdfs.batchSize = 100
# Set the file type (supports compression)
a3.sinks.k3.hdfs.fileType = DataStream
# How often (in seconds) to roll to a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll to a new file once it reaches roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# Setting 0 makes file rolling independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
Start the directory-monitoring agent:
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
Then visit spark-local:50070 to view the uploaded files; spark-local is your hostname, and an IP address works just as well.
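To test it (the file name test1.txt is made up), drop a file into the monitored directory and check that flume renames it with the configured suffix:
echo "hello spooldir" > /opt/module/flume/upload/test1.txt
ls /opt/module/flume/upload
# expected: test1.txt.COMPLETED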
Real-time monitoring of multiple appended files in a directory
Exec source is suited to monitoring a single file that is appended to in real time, but it cannot guarantee that no data is lost; Spooldir source guarantees no data loss and can resume where it left off, but the latency is high and it cannot monitor in real time; Taildir source can resume from a checkpoint, guarantees no data loss, and monitors in real time.
- First, create two files, file1.txt and file2.txt, in the /opt/environment/flume-1.9.0/files directory

- In the flume-1.9.0/job directory, create the configuration file files-flume-logger.conf and write the following:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/environment/flume-1.9.0/files/.*txt
a1.sources.r1.positionFile = /opt/environment/flume-1.9.0/position/position.json
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
This creates the filegroup f1, which monitors all files ending in txt under the files directory.
Start flume:
bin/flume-ng agent -c conf/ -f job/files-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
Append content to file1.txt and file2.txt:
echo hello >> file1.txt
echo 2022 upup_7 >> file1.txt
echo 2022 upup_7 >> file2.txt
Finally, the monitoring results:

Since this supports resuming from a checkpoint, let's shut flume down and then append content to the files.
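For example, with the agent stopped, append the two lines that appear in the result below:
echo nice >> file2.txt
echo good >> file2.txt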
After appending, start flume again; the result:
For file2.txt we appended nice first and then good, and the monitored output arrives in the same order! This is checkpoint-based monitoring: even if flume has not yet started or has crashed, it still guarantees that no data is lost, which is impossible with exec!
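This works because the Taildir source persists its read offsets in the position file configured above; after the agent has run, that file records, roughly, the inode, offset, and path of each tracked file, something like the sketch below (the inode and offset values are made up):
[{"inode":67890,"pos":18,"file":"/opt/environment/flume-1.9.0/files/file1.txt"},
{"inode":67891,"pos":24,"file":"/opt/environment/flume-1.9.0/files/file2.txt"}]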