[flume] notes
2022-06-11 23:54:00 【Twelve winged fallen angel】
1. Flume Overview
1.1 Flume Definition
Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive amounts of log data. Flume is based on a streaming architecture, which is flexible and simple.

1.2 Flume Architecture

Agent
An Agent is a JVM process that moves data from a source to a destination in the form of events.
An Agent consists of three main components: Source, Channel, and Sink.
Source
The Source is the component responsible for receiving data into the Flume Agent. Source components can handle log data of various types and formats, including avro, thrift, exec, jms, spooling directory, netcat, taildir, sequence generator, syslog, http, and legacy.
Sink
The Sink continuously polls the Channel for events and removes them in batches, writing them in bulk to a storage or indexing system, or sending them to another Flume Agent.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, hbase, solr, and custom sinks.
Channel
The Channel is a buffer between the Source and the Sink, which allows the Source and the Sink to operate at different rates. The Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
Flume ships with two commonly used Channels:
Memory Channel
The Memory Channel is an in-memory queue. It is suitable when data loss is acceptable. If data loss cannot be tolerated, the Memory Channel should not be used, because a program crash, machine failure, or restart will lose the data.
File Channel
The File Channel writes all events to disk, so no data is lost if the program shuts down or the machine goes down.
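A minimal sketch of a File Channel definition (the checkpoint and data directory paths are illustrative assumptions, not taken from the cases below):
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/flume/checkpoint
a1.channels.c1.dataDirs = /opt/flume/data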
Event
The Event is the basic unit of data transfer in Flume; data is sent from source to destination in the form of Events. An Event consists of a Header and a Body: the Header stores attributes of the Event as key-value pairs, and the Body carries the data itself as a byte array.
2. Getting Started with Flume
2.1 Flume Installation and deployment
Download and extract
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt
mv /opt/apache-flume-1.9.0-bin /opt/flume
Configure environment variables
echo 'export FLUME_HOME=/opt/flume' >> /etc/profile
source /etc/profile
Resolve the Guava dependency conflict
rm $(find $FLUME_HOME/lib -name 'guava-*')
cp $(find $HADOOP_HOME/share/hadoop/common/lib -name 'guava-*' | head -n 1) $FLUME_HOME/lib
Resolve the SLF4J dependency conflict
rm $(find $FLUME_HOME/lib -name 'slf4j-*')
Adjust the JVM memory settings
cd $FLUME_HOME
cp conf/flume-env.sh.template conf/flume-env.sh
vim conf/flume-env.sh
Uncomment and modify the corresponding line:
export JAVA_OPTS="-Xms1024m -Xmx1024m -Dcom.sun.management.jmxremote"
2.2 Flume Introductory cases
2.2.1 Official case: monitoring port data
Requirements
Use Flume to listen on a port, collect the data sent to that port, and print it to the console.
Implementation steps
Check whether port 44444 is already in use
netstat -nlp | grep 44444
Create the Flume configuration file
jobs/flume-netcat-logger.conf
cd $FLUME_HOME
mkdir -p jobs
vim jobs/flume-netcat-logger.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# NetCat TCP Source
a1.sources.r1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Logger Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
Start the task
bin/flume-ng agent \
  --conf conf/ \
  --conf-file jobs/flume-netcat-logger.conf \
  --name a1 \
  -Dflume.root.logger=INFO,console
Send data to port 44444 on this machine
nc localhost 44444
2.2.2 Real-time monitoring of a single appended file
Requirements
Monitor the file /opt/flume/input/a.txt in real time and upload its contents to HDFS.
Implementation steps
Create the Flume configuration file
jobs/flume-file-hdfs.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Exec Source
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/flume/input/a.txt
a1.sources.r1.shell = /bin/bash -c
# HDFS Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node6:9000/flume/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
Start the task
bin/flume-ng agent -c conf/ -f jobs/flume-file-hdfs.conf -n a1
Append to the file
mkdir -p /opt/flume/input
echo $RANDOM >> /opt/flume/input/a.txt
2.2.3 Real-time monitoring of new files in a directory
Requirements
Use Flume to monitor the entire /opt/flume/input directory for new files and upload them to HDFS.
Implementation steps
Create the Flume configuration file
jobs/flume-dir-hdfs.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Spooling Directory Source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/flume/input
a1.sources.r1.fileHeader = true
# HDFS Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node6:9000/flume/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
Start the task
bin/flume-ng agent -c conf/ -f jobs/flume-dir-hdfs.conf -n a1
Create new files
touch /opt/flume/input/1.txt
touch /opt/flume/input/2.txt
Watch the directory for changes
watch ls /opt/flume/input
Notes
spoolDir does not support paths containing ~. For example, ~/input will raise an error:
java.lang.IllegalStateException: Directory does not exist: /opt/flume/~/input
When using the Spooling Directory Source:
- Do not create or continuously modify files in the monitored directory
- Files that have been fully ingested are renamed with the .COMPLETED suffix
- The monitored directory is scanned for file changes every 500 milliseconds (both the suffix and the scan interval can be overridden; see the sketch below)
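A minimal sketch of overriding those two defaults on the source above (the values shown are the defaults themselves, used here only for illustration):
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/flume/input
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.pollDelay = 500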
2.2.4 Real-time monitoring of multiple appended files in a directory
Background
The Exec Source is suitable for monitoring a single file that is appended to in real time, but it cannot resume from where it left off. The Spooling Directory Source is suitable for synchronizing new files, but not for monitoring files whose logs are appended in real time. The Taildir Source is suitable for monitoring multiple files that are appended to in real time, and it supports resuming from a checkpoint.
Requirements
Use Flume to monitor real-time appends to files across the entire directory and upload them to HDFS.
Implementation steps
Create the Flume configuration file
jobs/flume-taildir-hdfs.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Taildir Source
a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/flume/input/.*txt.*
a1.sources.r1.positionFile = /opt/flume/taildir_position.json
a1.sources.r1.fileHeader = true
# HDFS Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node6:9000/flume/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
Start the task
bin/flume-ng agent -c conf/ -f jobs/flume-taildir-hdfs.conf -n a1
Append to the files
echo $RANDOM >> /opt/flume/input/a.txt
echo $RANDOM >> /opt/flume/input/b.txt
Notes
The Taildir Source maintains a position file in JSON format and periodically records the latest read position of each file in it, which is how it resumes from where it left off. The position file looks like this:
[
  { "inode": 1579598, "pos": 12, "file": "/opt/flume/input/a.txt" },
  { "inode": 1579593, "pos": 6, "file": "/opt/flume/input/b.txt" }
]
On Linux, the area that stores file metadata is called the inode. Every inode has a number, and the operating system uses inode numbers to distinguish files: internally, Unix/Linux systems identify files by inode number rather than by file name.
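For example, the inode numbers recorded in the position file above can be checked directly with ls:
ls -i /opt/flume/input/a.txt /opt/flume/input/b.txt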
3. Flume Advanced
3.1 Flume Transactions

3.2 Flume Agent internals

ChannelSelector
The ChannelSelector decides which Channel an Event will be sent to. There are two types: Replicating and Multiplexing.
The ReplicatingSelector sends the same Event to every Channel, while the MultiplexingSelector routes different Events to different Channels according to configured rules.
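A minimal multiplexing-selector sketch, assuming two channels c1 and c2 and a header key named type (the custom Interceptor case in section 3.5 uses the same idea):
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.info = c1
a1.sources.r1.selector.mapping.error = c2
a1.sources.r1.selector.default = c1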
SinkProcessor
There are three types of SinkProcessor (a configuration sketch follows this list):
- DefaultSinkProcessor: works with a single Sink.
- LoadBalancingSinkProcessor: works with a Sink Group and provides load balancing.
- FailoverSinkProcessor: works with a Sink Group and provides failover.
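A minimal load-balancing Sink Group sketch, assuming two Avro sinks k1 and k2 as in the later cases (the failover variant appears in section 3.4.2):
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin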
3.3 Flume topology
Simple series connection

In this pattern, multiple Flume agents are chained one after another, from the Source of the first agent all the way to the Sink of the last agent, which writes to the destination storage system.
It is not recommended to chain too many Flume agents: too many agents not only reduce the transfer rate, but also mean that if any agent along the chain goes down, the entire pipeline is affected.
Replication and multiplexing

Flume Support the flow of events to one or more destinations . This mode can copy the same data to multiple Channel in , Or distribute different data to different Channel in ,Sink You can choose to send to different destinations .
Load balancing and failover

Flume supports grouping multiple Sinks into a logical Sink Group; by pairing the Sink Group with different SinkProcessors, load balancing and failover can be achieved.
Aggregation

This is the most common and most practical pattern. Everyday web applications are usually spread across hundreds, or even thousands or tens of thousands, of servers, and the logs they produce are troublesome to handle individually.
This aggregation topology solves the problem nicely: each server deploys a Flume agent for collection and forwards the data to a centralized collector Flume agent, which then uploads it to HDFS, Hive, HBase, etc. for log analysis.
3.4 Flume Development Cases
3.4.1 Replication and multiplexing
Requirements
Flume1 monitors file changes and passes them to Flume2, which stores them in HDFS. At the same time, Flume1 passes the changes to Flume3, which writes them to the local file system.
Implementation steps
Create the configuration file for the Flume agent that reads the file
jobs/flume1-file-flume.conf
# Agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Memory Channel
a1.channels.c1.type = memory
# Memory Channel
a1.channels.c2.type = memory
# Exec Source
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/flume/input/a.txt
a1.sources.r1.shell = /bin/bash -c
# Avro Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 4141
# Avro Sink
a1.sinks.k2.channel = c2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 4142
Create the configuration file for the Flume agent that writes to HDFS
jobs/flume2-flume-hdfs.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Avro Source
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4141
# HDFS Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node6:9000/flume/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
Create the configuration file for the Flume agent that writes to the local directory
jobs/flume3-flume-dir.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Avro Source
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4142
# File Roll Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /opt/flume/output
Note: the output directory must already exist; if it does not, it will not be created automatically.
mkdir -p /opt/flume/output
Start the tasks in order
bin/flume-ng agent -c conf/ -f jobs/flume3-flume-dir.conf -n a1
bin/flume-ng agent -c conf/ -f jobs/flume2-flume-hdfs.conf -n a1
bin/flume-ng agent -c conf/ -f jobs/flume1-file-flume.conf -n a1
Write to the file
echo $RANDOM >> /opt/flume/input/a.txt
Check HDFS and the local directory
watch ls /opt/flume/output
3.4.2 Load balancing and failover
Requirements
Flume1 monitors a port; the Sinks in its Sink Group connect to Flume2 and Flume3 respectively, and the FailoverSinkProcessor is used to achieve failover.
Implementation steps
Create the configuration file for the Flume agent that receives port data
jobs/flume1-netcat-flume.conf
# Agent
a1.sources = r1
a1.sinkgroups = g1
a1.sinks = k1 k2
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# NetCat TCP Source
a1.sources.r1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Sink Group
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Avro Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 4141
# Avro Sink
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 4142
Create the two configuration files for the Flume agents that output to the local console
jobs/flume2-flume-console.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Avro Source
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4141
# Logger Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
jobs/flume3-flume-console.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Avro Source
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4142
# Logger Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
Start the tasks in order
bin/flume-ng agent -c conf/ -f jobs/flume2-flume-console.conf -n a1 -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -f jobs/flume3-flume-console.conf -n a1 -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -f jobs/flume1-netcat-flume.conf -n a1
Send data to the local port
nc localhost 44444
Check the console log output of Flume2 and Flume3
Stop Flume2 and check the console log output of Flume3
3.4.3 Aggregation
Requirements
- Flume1 monitors the file /opt/flume/input/a.txt
- Flume2 monitors port 44444
- Flume1 and Flume2 send their data to Flume3, and Flume3 prints the final data to the console
Implementation steps
Configuration file for Flume1, which monitors the file
jobs/flume1-file-flume.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Exec Source
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/flume/input/a.txt
a1.sources.r1.shell = /bin/bash -c
# Avro Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 4141
Configuration file for Flume2 on Hadoop102, which monitors port data
jobs/flume2-netcat-flume.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# NetCat TCP Source
a1.sources.r1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Avro Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 4141
Configuration file for Flume3 on Hadoop103, which aggregates the data and prints it to the console
jobs/flume3-flume-logger.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Avro Source
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4141
# Logger Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
Start the tasks
# Flume3
bin/flume-ng agent -c conf/ -f jobs/flume3-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
# Flume2
bin/flume-ng agent -c conf/ -f jobs/flume2-netcat-flume.conf -n a1
# Flume1
bin/flume-ng agent -c conf/ -f jobs/flume1-file-flume.conf -n a1
Append to the file and send data to the port
# Flume1
echo $RANDOM >> /opt/flume/input/a.txt
# Flume2
nc localhost 44444
Check the console output of Flume3
3.5 Custom Interceptor
Requirements
Use Flume to collect local server logs; depending on the log type, different logs must be sent to different analysis systems.
Analysis
In real development, a single server may produce many types of logs, and different log types may need to be sent to different analysis systems.
This uses the multiplexing structure of the Flume topology. Multiplexing routes each Event to a Channel based on the value of a key in its Header, so we need a custom Interceptor that assigns different values to that Header key for different types of Events.
Implementation steps
Create a Maven project and add the dependency
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
Create a custom Interceptor that implements the Interceptor interface
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyInterceptor implements Interceptor {
    private List<Event> eventList;

    @Override
    public void initialize() {
        eventList = new ArrayList<>();
    }

    // Single-event interception: tag the header based on the body content
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());
        if (body.contains("info")) {
            headers.put("type", "info");
        } else if (body.contains("error")) {
            headers.put("type", "error");
        }
        return event;
    }

    // Batch-event interception
    @Override
    public List<Event> intercept(List<Event> events) {
        eventList.clear();
        for (Event event : events) {
            eventList.add(intercept(event));
        }
        return eventList;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Flume configuration file
flume1-netcat-flume.conf
# Agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Memory Channel
a1.channels.c1.type = memory
# Memory Channel
a1.channels.c2.type = memory
# NetCat TCP Source
a1.sources.r1.channels = c1 c2
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = xyz.icefery.demo.interceptor.MyInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.info = c1
a1.sources.r1.selector.mapping.error = c2
# Avro Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 4141
# Avro Sink
a1.sinks.k2.channel = c2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 4142
Flume configuration file
flume2-flume-logger.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Avro Source
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4141
# Logger Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
Flume configuration file
flume3-flume-logger.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Avro Source
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4142
# Logger Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
Start the tasks in order: Flume2, Flume3, Flume1
Send data to the port
echo 'info' | nc localhost 44444
3.6 Custom Source
Background
The Source is the component responsible for receiving data into the Flume Agent. Source components can handle log data of various types and formats, including Avro, Thrift, Exec, JMS, Spooling Directory, NetCat, Sequence Generator, Syslog, HTTP, and Legacy.
There are many official Source types, but sometimes they cannot meet the needs of actual development, in which case we need to implement a custom Source.
Requirements
Use Flume to receive data, add a prefix to each record, and output it to the console. The prefix can be read from the Flume configuration file.
Code
Create a custom Source that extends the AbstractSource class and implements the Configurable and PollableSource interfaces
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class MySource extends AbstractSource implements Configurable, PollableSource {
    private Long delay;
    private String prefix;

    @Override
    public void configure(Context context) {
        // Read the emit interval and prefix from the agent configuration;
        // defaults are used when a key is absent (flume-custom-logger.conf below only sets prefix)
        delay = context.getLong("delay", 1000L);
        prefix = context.getString("prefix", "custom-");
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            Map<String, String> headers = new HashMap<>();
            Event event = new SimpleEvent();
            // Emit a small batch of prefixed events into the channel
            for (int i = 0; i < 5; i++) {
                event.setHeaders(headers);
                event.setBody((prefix + i).getBytes());
                getChannelProcessor().processEvent(event);
                TimeUnit.MILLISECONDS.sleep(delay);
            }
            return Status.READY;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}
Package the jar and upload it to Flume's lib directory.
The configuration file
flume-custom-logger.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# Custom Source
a1.sources.r1.channels = c1
a1.sources.r1.type = xyz.icefery.demo.source.MySource
a1.sources.r1.prefix = custom-
# Logger Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
3.7 Custom Sink
Background
The Sink continuously polls the Channel for events and removes them in batches, writing them to a storage or indexing system or sending them to another Flume Agent.
The Sink is fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch of events has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction with the Channel. Only after the transaction is committed does the Channel remove the events from its internal buffer.
Sink destinations include HDFS, Avro, Thrift, IPC, File, Null, HBase, Solr, and custom sinks. There are many official Sink types, but sometimes they cannot meet the needs of actual development, in which case we need to implement a custom Sink.
Requirements
Use Flume to receive data, add a prefix and a suffix to each record in the Sink, and output it to the console. The prefix and suffix can be configured in the Flume task configuration file.
Code
Create a custom Sink that extends AbstractSink and implements the Configurable interface
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {
    private static final Logger log = LoggerFactory.getLogger(AbstractSink.class);
    private String prefix;
    private String suffix;

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        tx.begin();
        // Read an event from the Channel
        Event event;
        while (true) {
            event = channel.take();
            if (event != null) {
                break;
            }
        }
        // Handle the event: log it with the configured prefix and suffix
        Status status;
        try {
            log.info(prefix + new String(event.getBody()) + suffix);
            tx.commit();
            status = Status.READY;
        } catch (Exception e) {
            tx.rollback();
            status = Status.BACKOFF;
        } finally {
            tx.close();
        }
        return status;
    }
}
Package the jar and upload it to Flume's lib directory.
The configuration file
flume-netcat-custom.conf
# Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Memory Channel
a1.channels.c1.type = memory
# NetCat TCP Source
a1.sources.r1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Custom Sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = xyz.icefery.demo.sink.MySink
a1.sinks.k1.prefix = custom-
a1.sinks.k1.suffix = -custom
4. Interview Questions
4.1 How do you monitor Flume data transfer?
Use the third-party framework Ganglia to monitor Flume in real time.
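A minimal sketch of pointing Flume's built-in monitoring at Ganglia when starting an agent (the Ganglia host and port below are assumptions for illustration):
bin/flume-ng agent -c conf/ -f jobs/flume-netcat-logger.conf -n a1 \
  -Dflume.monitoring.type=ganglia \
  -Dflume.monitoring.hosts=node6:8649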
4.2 What are the roles of Flume's Source, Sink, and Channel? What Source types have you used?
Roles
- The Source component collects data and can handle log data of various types and formats, including Avro, Thrift, Exec, JMS, Spooling Directory, NetCat, Sequence Generator, Syslog, HTTP, and Legacy.
- The Sink component sends data to its destination; destinations include HDFS, Logger, Avro, Thrift, IPC, File, HBase, Solr, and custom sinks.
- The Channel component buffers the collected data; it can be kept in memory or in files.
Source types used
- Exec: to monitor backend log files
- NetCat: to monitor the port on which backend logs are generated
4.3 Flume Channel Selectors

4.4 Flume Parameter tuning
Source
Increasing the number of Sources increases the capacity to read data (with the Taildir Source this can also be achieved by increasing the number of FileGroups). For example, when a single directory produces too many files, split it into multiple directories and configure multiple Sources so that there is enough capacity to pick up new data.
The batchSize parameter determines how many Events a Source ships to the Channel in one batch; increasing it appropriately improves the throughput of moving Events from the Source to the Channel.
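A minimal Taildir Source sketch combining these two ideas (the paths and batch size are illustrative assumptions):
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/flume/input1/.*log.*
a1.sources.r1.filegroups.f2 = /opt/flume/input2/.*log.*
a1.sources.r1.batchSize = 1000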
Channel
Setting type to memory gives the best Channel performance, but data may be lost if the Flume process dies unexpectedly. Setting type to file gives better fault tolerance, but worse performance than the Memory Channel.
When using the File Channel, configuring dataDirs with multiple directories on different disks can improve performance.
The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events a Source can write to the Channel per transaction and the maximum number of Events a Sink can read from the Channel per transaction. transactionCapacity must be no smaller than the batchSize of the Source and the Sink.
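A minimal sizing sketch for a Memory Channel (the values are illustrative; transactionCapacity stays above the batchSize used by the Source and Sink):
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000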
Sink
Increasing the number of Sinks increases the capacity to consume Events. More Sinks is not always better, though: enough is enough, and excessive Sinks occupy system resources and cause unnecessary waste.
The batchSize parameter determines how many Events a Sink reads from the Channel in one batch; increasing it appropriately improves the throughput of moving Events out of the Channel.
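For the HDFS Sink used in the earlier cases, the batch size is set with the hdfs.batchSize property (the value below is an illustrative assumption):
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.batchSize = 1000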
4.5 Flume's transaction mechanism
Flume uses two independent transactions to manage event delivery: one from Source to Channel, and one from Channel to Sink.
For example, the Spooling Directory Source creates an event for every line of a file; only after all events in the transaction have been delivered to the Channel and committed successfully does the Source mark the file as complete.
Similarly, a transaction handles the Channel-to-Sink transfer in the same way: if for some reason the events cannot be written out, the transaction is rolled back, and all the events remain in the Channel waiting to be redelivered.
4.6 Will Flume lose collected data?
Based on Flume's architecture, Flume normally does not lose data. It has a complete internal transaction mechanism: Source to Channel is transactional, and Channel to Sink is transactional, so no data is lost in those two stages. The only situations where data can be lost are when a Memory Channel is used and the Agent goes down, or when the Channel fills up so the Source can no longer write, in which case the unwritten data is lost.
Flume does not lose data, but it can produce duplicates: for example, if data has already been sent out by the Sink but no acknowledgement is received, the Sink will send it again, which can result in duplication.