
[flume] notes

2022-06-11 23:54:00 Twelve winged fallen angel

1. Flume Overview

1.1 Flume Definition

Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and moving massive amounts of log data. Flume is based on a streaming architecture and is flexible and simple to use.



1.2 Flume Architecture

Agent

An Agent is a JVM process that sends data from a source to a destination in the form of events.

An Agent consists of three main parts: Source, Channel, and Sink.

Source

A Source is the component responsible for receiving data into the Flume Agent. The Source component can handle log data of various types and formats, including avro, thrift, exec, jms, spooling directory, netcat, taildir, sequence generator, syslog, http, and legacy.

Sink

A Sink continuously polls the Channel for events and removes them in batches, writing them in bulk to a storage or indexing system, or sending them on to another Flume Agent.

Sink component destinations include hdfs, logger, avro, thrift, ipc, file, hbase, solr, and custom sinks.

Channel

A Channel is a buffer located between the Source and the Sink, which allows the Source and the Sink to operate at different rates. A Channel is thread-safe and can handle write operations from several Sources and read operations from several Sinks at the same time.

Flume ships with two commonly used Channels (a sample configuration of each follows this list):

  • Memory Channel

    Memory Channel is an in-memory queue. It is suitable when losing data is acceptable. If data loss is not acceptable, Memory Channel should not be used, because a program crash, machine failure, or restart will cause data loss.

  • File Channel

    File Channel writes all events to disk, so no data is lost if the program exits or the machine goes down.
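
As a rough illustration, the two channel types might be configured roughly as follows. This is a minimal sketch: the agent/channel names (a1, c1, c2) and the directories are illustrative and not taken from the cases below.

    # Memory Channel: fast, but events are lost if the agent dies
    a1.channels.c1.type = memory

    # File Channel: durable, events are persisted to disk
    a1.channels.c2.type = file
    a1.channels.c2.checkpointDir = /opt/flume/checkpoint
    a1.channels.c2.dataDirs = /opt/flume/data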

Event

An Event is the basic unit of Flume data transfer; data is sent from the source to the destination in the form of Events. An Event consists of two parts, a Header and a Body: the Header stores attributes of the Event as key-value pairs, and the Body stores the data itself as a byte array.
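
The structure is easy to see in code. Below is a minimal sketch using Flume's EventBuilder helper; the class name and the header/body values are made up for illustration and only require flume-ng-core on the classpath.

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    public class EventDemo {
        public static void main(String[] args) {
            // Header: K-V attributes describing the event
            Map<String, String> headers = new HashMap<>();
            headers.put("type", "info");

            // Body: the payload, stored as a byte array
            Event event = EventBuilder.withBody("hello flume", StandardCharsets.UTF_8, headers);

            System.out.println(event.getHeaders());                                  // {type=info}
            System.out.println(new String(event.getBody(), StandardCharsets.UTF_8)); // hello flume
        }
    }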



2. Flume Getting Started

2.1 Flume Installation and deployment

  • Download and extract

    tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt
    mv /opt/apache-flume-1.9.0-bin /opt/flume
    
  • Configure environment variables

    echo 'export FLUME_HOME=/opt/flume' >> /etc/profile
    source /etc/profile
    
  • Resolve the Guava dependency conflict (align with Hadoop's version)

    rm $(find $FLUME_HOME/lib -name 'guava-*')
    cp $(find $HADOOP_HOME/share/hadoop/common/lib -name 'guava-*' | head -n 1) $FLUME_HOME/lib
    
  • Resolve the SLF4J dependency conflict

    rm $(find $FLUME_HOME/lib -name 'slf4j-*')
    
  • Adjust the JVM heap size

    cd $FLUME_HOME
    
    cp conf/flume-env.sh.template conf/flume-env.sh
    
    vim conf/flume-env.sh
    

    Uncomment and modify the corresponding line:

    export JAVA_OPTS="-Xms1024m -Xmx1024m -Dcom.sun.management.jmxremote"
    

2.2 Flume Introductory cases

2.2.1 Monitoring port data (official example)

Requirement

Use Flume to listen on a port, collect the data arriving at that port, and print it to the console.

Implementation steps
  • Check whether port 44444 is already in use

    netstat -nlp | grep 44444
    
  • Create the Flume configuration file jobs/flume-netcat-logger.conf

    cd $FLUME_HOME
    
    mkdir -p jobs
    
    vim jobs/flume-netcat-logger.conf
    
    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # NetCat TCP Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Logger Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger
    
  • Start the task

    bin/flume-ng agent                          \
      --conf conf/                              \
      --conf-file jobs/flume-netcat-logger.conf \
      --name a1                                 \
      -Dflume.root.logger=INFO,console
    
  • Send data to port 44444 on the local machine

    nc localhost 44444
    

2.2.2 Monitoring a single appended file in real time

Requirement

Monitor the file /opt/flume/input/a.txt in real time and upload its new content to HDFS.

Implementation steps
  • Create the Flume configuration file jobs/flume-file-hdfs.conf

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Exec Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/flume/input/a.txt
    a1.sources.r1.shell = /bin/bash -c
    
    # HDFS Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://node6:9000/flume/%Y-%m-%d/%H
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
  • Start the task

    bin/flume-ng agent -c conf/ -f jobs/flume-file-hdfs.conf -n a1
    
  • Append to the file

    mkdir -p /opt/flume/input
    echo $RANDOM >> /opt/flume/input/a.txt
    

2.2.3 Monitoring new files in a directory in real time

Requirement

Use Flume to monitor the entire /opt/flume/input directory for new files and upload them to HDFS.

Implementation steps
  • Create the Flume configuration file jobs/flume-dir-hdfs.conf

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Spooling Directory Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /opt/flume/input
    a1.sources.r1.fileHeader = true
    
    # HDFS Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://node6:9000/flume/%Y-%m-%d/%H
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
  • Start the task

    bin/flume-ng agent -c conf/ -f jobs/flume-dir-hdfs.conf -n a1
    
  • Create new files

    touch /opt/flume/input/1.txt
    touch /opt/flume/input/2.txt
    
  • View directory changes

    watch ls /opt/flume/input
    
Notes

spoolDir does not support paths containing ~. For example, ~/input will raise an error:

java.lang.IllegalStateException: Directory does not exist: /opt/flume/~/input

When using the Spooling Directory Source, keep the following in mind (related settings are sketched after this list):

  1. Do not create or continuously modify files inside the monitored directory
  2. Files that have finished uploading are renamed with a .COMPLETED suffix
  3. The monitored directory is scanned for file changes every 500 milliseconds
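
These behaviours correspond to Spooling Directory Source properties. A hedged sketch of the related settings; the values shown are, to my understanding, the defaults, so treat them as illustrative:

    a1.sources.r1.fileSuffix = .COMPLETED   # suffix appended to fully ingested files
    a1.sources.r1.pollDelay = 500           # scan interval for new files, in milliseconds
    a1.sources.r1.ignorePattern = ^$        # regex of file names to ignore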

2.2.4 Monitoring multiple appended files in a directory in real time

Introduction

Exec Source is suitable for monitoring a single file that is appended to in real time, but it cannot resume from a breakpoint. Spooling Directory Source is suitable for synchronizing new files, but not for monitoring files whose logs are appended in real time. Taildir Source is suitable for monitoring multiple files that are appended in real time, and it supports resuming from a breakpoint.

Requirement

Use Flume to monitor files that are appended in real time across an entire directory and upload the new content to HDFS.

Implementation steps
  • Create the Flume configuration file jobs/flume-taildir-hdfs.conf

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Taildir Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/flume/input/.*txt.*
    a1.sources.r1.positionFile = /opt/flume/taildir_position.json
    a1.sources.r1.fileHeader = true
    
    # HDFS Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://node6:9000/flume/%Y-%m-%d/%H
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
  • Start the task

    bin/flume-ng agent -c conf/ -f jobs/flume-taildir-hdfs.conf -n a1
    
  • Append to the files

    echo $RANDOM >> /opt/flume/input/a.txt
    echo $RANDOM >> /opt/flume/input/b.txt
    
Notes

Taildir Source maintains a position file in JSON format. It periodically writes the latest read position of each file into this position file, which is what makes resuming from a breakpoint possible. The position file looks like this:

[
  { "inode": 1579598, "pos": 12, "file": "/opt/flume/input/a.txt" },
  { "inode": 1579593, "pos": 6,  "file": "/opt/flume/input/b.txt" }
]

In Linux, the area where file metadata is stored is called an inode. Every inode has a number, and the operating system uses inode numbers to identify files; internally, Unix/Linux identifies files by inode number rather than by file name.
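
You can view inode numbers with ls -i; the output below is illustrative, not captured from the case above:

    ls -i /opt/flume/input
    # 1579598 a.txt  1579593 b.txt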



3. Flume Advanced

3.1 Flume Transactions

(Diagram omitted. Flume wraps delivery in two independent transactions: a Put transaction from Source to Channel and a Take transaction from Channel to Sink; see Section 4.5 for details.)


3.2 Flume Agent internals


ChannelSelector

The job of a ChannelSelector is to decide which Channel(s) an Event will be sent to. There are two types: Replicating and Multiplexing.

The ReplicatingSelector sends the same Event to every configured Channel, while the MultiplexingSelector routes different Events to different Channels according to configured rules.
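
As a rough sketch of how the two selectors are configured (the two blocks are alternatives shown side by side, not meant to be used together; the names a1/r1/c1/c2 and the header values are illustrative, and a concrete multiplexing setup appears in Section 3.5):

    # Replicating (the default): copy every Event to c1 and c2
    a1.sources.r1.selector.type = replicating
    a1.sources.r1.channels = c1 c2

    # Multiplexing: route by the value of the "type" header
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = type
    a1.sources.r1.selector.mapping.info = c1
    a1.sources.r1.selector.mapping.error = c2
    a1.sources.r1.selector.default = c1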

SinkProcessor

There are three types of SinkProcessor (a load-balancing sink group sketch follows this list):

  • DefaultSinkProcessor: used with a single Sink.
  • LoadBalancingSinkProcessor: used with a Sink Group; provides load balancing.
  • FailoverSinkProcessor: used with a Sink Group; provides failover.
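
A failover sink group is configured in Section 3.4.2; for comparison, a load-balancing sink group might look roughly like this (a minimal sketch; the group/sink names are illustrative):

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = round_robin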

3.3 Flume topology

Simple chaining

In this pattern, multiple Flume agents are connected in sequence, from the initial Source all the way to the storage system behind the final Sink.

It is not recommended to chain too many Flume agents in this pattern: too many agents not only reduce the transfer rate, but also mean that if any intermediate Flume node goes down, the whole pipeline is affected.

Replication and multiplexing

Flume supports routing a flow of events to one or more destinations. In this pattern, the same data can be replicated to multiple Channels, or different data can be distributed to different Channels, with each Sink sending to a different destination.

Load balancing and failover


Flume supports grouping multiple Sinks into a logical Sink Group; combined with different SinkProcessors, a Sink Group can provide load balancing or failover.

Aggregation

This is the most common and most practical pattern. A typical web application is deployed across hundreds or even thousands of servers, and the logs they produce are cumbersome to handle individually.

This kind of Flume topology solves the problem well: each server runs a Flume agent that collects its logs and forwards them to a central log-collecting Flume agent, which then uploads the data to HDFS, Hive, HBase, and so on for log analysis.


3.4 Flume Development Cases

3.4.1 Replication and multiplexing

Requirement

Flume1 monitors a file for changes and passes the changes to both Flume2 and Flume3. Flume2 stores the data in HDFS, while Flume3 writes it to the local file system.

Implementation steps
  • Create the Flume configuration file jobs/flume1-file-flume.conf for the agent that monitors the file

    # Agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Memory Channel
    a1.channels.c2.type = memory
    
    # Exec Source
    a1.sources.r1.selector.type = replicating
    a1.sources.r1.channels = c1 c2
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/flume/input/a.txt
    a1.sources.r1.shell = /bin/bash -c
    
    # Avro Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 4141
    
    # Avro Sink
    a1.sinks.k2.channel = c2
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = localhost
    a1.sinks.k2.port = 4142
    
  • Create the Flume configuration file jobs/flume2-flume-hdfs.conf for the agent that writes to HDFS

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Avro Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4141
    
    # HDFS Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://node6:9000/flume/%Y-%m-%d/%H
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 1
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
  • Create the Flume configuration file jobs/flume3-flume-dir.conf for the agent that writes to a local directory

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Avro Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4142
    
    # File Roll Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /opt/flume/output
    

    Note: the output directory must already exist; if it does not exist, it will not be created automatically.

    mkdir -p /opt/flume/output
    
  • Start the tasks in sequence

    bin/flume-ng agent -c conf/ -f jobs/flume3-flume-dir.conf -n a1
    
    bin/flume-ng agent -c conf/ -f jobs/flume2-flume-hdfs.conf -n a1
    
    bin/flume-ng agent -c conf/ -f jobs/flume1-file-flume.conf -n a1
    
  • Write to the file

    echo $RANDOM >> /opt/flume/input/a.txt
    
  • Check HDFS and the local directory

    watch ls /opt/flume/output
    

3.4.2 Load balancing and failover

Requirement

Flume1 monitors a port; the Sinks in its Sink Group connect to Flume2 and Flume3 respectively, and a FailoverSinkProcessor provides failover.

Implementation steps
  • Create the Flume configuration file jobs/flume1-netcat-flume.conf for the agent that receives port data

    # Agent
    a1.sources = r1
    a1.sinkgroups = g1
    a1.sinks = k1 k2
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # NetCat TCP Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Sink Group
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    
    # Avro Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 4141
    
    # Avro Sink
    a1.sinks.k2.channel = c1
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = localhost
    a1.sinks.k2.port = 4142
    
  • Create two Flume configuration files for the agents that output to the local console: jobs/flume2-flume-console.conf and jobs/flume3-flume-console.conf

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Avro Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4141
    
    # Logger Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger
    
    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Avro Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4142
    
    # Logger Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger
    
  • Start the tasks in order

    bin/flume-ng agent -c conf/ -f jobs/flume2-flume-console.conf -n a1 -Dflume.root.logger=INFO,console
    
    bin/flume-ng agent -c conf/ -f jobs/flume3-flume-console.conf -n a1 -Dflume.root.logger=INFO,console
    
    bin/flume-ng agent -c conf/ -f jobs/flume1-netcat-flume.conf -n a1
    
  • Send data to the local port

    nc localhost 44444
    
  • Check the console log output of Flume2 and Flume3

  • Stop Flume2 and check the console log output of Flume3

3.4.3 Aggregation

Requirement
  • Flume1 monitors the file /opt/flume/input/a.txt
  • Flume2 monitors port 44444
  • Flume1 and Flume2 both send their data to Flume3, which prints the final data to the console
Implementation steps
  • Create the Flume1 configuration file jobs/flume1-file-flume.conf for monitoring the file

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Exec Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/flume/input/a.txt
    a1.sources.r1.shell = /bin/bash -c
    
    # Avro Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 4141
    
  • Create the Flume2 configuration file jobs/flume2-netcat-flume.conf for monitoring the port data

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # NetCat TCP Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Avro Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 4141
    
  • Create the Flume3 configuration file jobs/flume3-flume-logger.conf for aggregating the data and printing it to the console

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Avro Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4141
    
    # Logger Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger
    
  • Start the task

    # Flume3
    bin/flume-ng agent -c conf/ -f jobs/flume3-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
    
    # Flume2
    bin/flume-ng agent -c conf/ -f jobs/flume2-netcat-flume.conf -n a1
    
    # Flume1
    bin/flume-ng agent -c conf/ -f jobs/flume1-file-flume.conf -n a1
    
  • Append to the file and send data to the port

    # Flume1
    echo $RANDOM >> /opt/flume/input/a.txt
    
    # Flume2
    nc localhost 44444
    
  • Check the Flume3 console output


3.5 Custom Interceptor

Requirement

Use Flume to collect a server's local logs. Logs of different types need to be sent to different analysis systems.

Analysis

In real development, a single server may produce many types of logs, and different types of logs may need to be sent to different analysis systems.

This calls for the Multiplexing structure in the Flume topology. Multiplexing works by routing each Event to a different Channel according to the value of a certain key in the Event's Header. So we need to write a custom Interceptor that assigns different values to that Header key depending on the Event type.

Implementation steps
  • Create a Maven project and add the dependency

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.9.0</version>
    </dependency>
    
  • Create a custom Interceptor class that implements the Interceptor interface

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class MyInterceptor implements Interceptor {

        private List<Event> eventList;

        @Override
        public void initialize() {
            eventList = new ArrayList<>();
        }

        // Intercept a single event: tag the header according to the body content
        @Override
        public Event intercept(Event event) {
            Map<String, String> headers = event.getHeaders();
            String body = new String(event.getBody());
            if (body.contains("info")) {
                headers.put("type", "info");
            } else if (body.contains("error")) {
                headers.put("type", "error");
            }
            return event;
        }

        // Intercept a batch of events
        @Override
        public List<Event> intercept(List<Event> events) {
            eventList.clear();
            for (Event event : events) {
                eventList.add(intercept(event));
            }
            return eventList;
        }

        @Override
        public void close() {
        }

        public static class Builder implements Interceptor.Builder {

            @Override
            public Interceptor build() {
                return new MyInterceptor();
            }

            @Override
            public void configure(Context context) {
            }
        }
    }
    
  • Create the Flume configuration file flume1-netcat-flume.conf

    # Agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Memory Channel
    a1.channels.c2.type = memory
    
    # NetCat TCP Source
    a1.sources.r1.channels = c1 c2
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = xyz.icefery.demo.interceptor.MyInterceptor$Builder
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = type
    a1.sources.r1.selector.mapping.info = c1
    a1.sources.r1.selector.mapping.error = c2
    
    # Avro Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 4141
    
    # Avro Sink
    a1.sinks.k2.channel = c2
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = localhost
    a1.sinks.k2.port = 4142
    
  • Create the Flume configuration file flume2-flume-logger.conf

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Avro Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4141
    
    # Logger Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger
    
  • Create the Flume configuration file flume3-flume-logger.conf

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Avro Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4142
    
    # Logger Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger
    
  • Start the tasks in order: Flume2, Flume3, Flume1

  • Send data to the port

    echo 'info' | nc localhost 44444
    

3.6 Custom Source

Introduction

A Source is the component responsible for receiving data into the Flume Agent. The Source component can handle log data of various types and formats, including Avro, Thrift, Exec, JMS, Spooling Directory, NetCat, Sequence Generator, Syslog, HTTP, and Legacy.

There are many official Source types, but sometimes they cannot meet the needs of actual development, and in that case we need to write a custom Source.

Requirement

Use Flume to receive data, add a prefix to each record, and output it to the console. The prefix is read from the Flume configuration file.

Code
  • Create a custom Source that extends the AbstractSource class and implements the Configurable and PollableSource interfaces

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class MySource extends AbstractSource implements Configurable, PollableSource {

        private Long delay;
        private String field;

        // Read the delay and the prefix field from the agent configuration file
        @Override
        public void configure(Context context) {
            delay = context.getLong("delay");
            field = context.getString("field");
        }

        @Override
        public Status process() throws EventDeliveryException {
            try {
                Map<String, String> headers = new HashMap<>();
                Event event = new SimpleEvent();
                // Generate five prefixed records per poll and hand them to the channel processor
                for (int i = 0; i < 5; i++) {
                    event.setHeaders(headers);
                    event.setBody((field + i).getBytes());
                    getChannelProcessor().processEvent(event);
                    TimeUnit.MILLISECONDS.sleep(delay);
                }
                return Status.READY;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return Status.BACKOFF;
            }
        }

        @Override
        public long getBackOffSleepIncrement() {
            return 0;
        }

        @Override
        public long getMaxBackOffSleepInterval() {
            return 0;
        }
    }
    

    Package the project and copy the jar to Flume's lib directory.
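
    For example (a hedged sketch; the artifact name flume-demo-1.0.jar is hypothetical and depends on your pom.xml):

    mvn -DskipTests package
    cp target/flume-demo-1.0.jar $FLUME_HOME/lib/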

  • The configuration file flume-custom-logger.conf

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # Custom Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = xyz.icefery.demo.source.MySource
    a1.sources.r1.delay = 1000
    a1.sources.r1.field = custom-
    
    # Logger Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger
    

3.7 Custom Sink

Introduction

A Sink continuously polls the Channel for events and removes them in batches, writing them in bulk to a storage or indexing system, or sending them on to another Flume Agent.

Sinks are fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch of events has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction with the Channel. Only after the transaction is committed does the Channel remove those events from its internal buffer.

Sink destinations include HDFS, Avro, Thrift, IPC, File, Null, HBase, Solr, and custom sinks. There are many official Sink types, but sometimes they cannot meet the needs of actual development, and in that case we need to write a custom Sink.

Requirement

Use Flume to receive data, add a prefix and a suffix to each record in the Sink, and output it to the console. The prefix and suffix are configured in the Flume task configuration file.

Code
  • Create a custom Sink that extends AbstractSink and implements the Configurable interface

    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MySink extends AbstractSink implements Configurable {
          
        private static final Logger log = LoggerFactory.getLogger(AbstractSink.class);
    
        private String prefix;
        private String suffix;
    
        @Override
        public void configure(Context context) {
          
            prefix = context.getString("prefix");
            suffix = context.getString("suffix");
        }
    
        @Override
        public Status process() throws EventDeliveryException {
          
            Channel channel = getChannel();
            Transaction tx = channel.getTransaction();
            tx.begin();
            //  Read  Channel  In the event 
            Event event;
            while (true) {
          
                event = channel.take();
                if (event != null) {
          
                    break;
                }
            }
            //  Handling events 
            Status status;
            try {
          
                log.info(prefix + new String(event.getBody()) + suffix);
                tx.commit();
                status = Status.READY;
            } catch (Exception e) {
          
                tx.rollback();
                status = Status.BACKOFF;
            } finally {
          
                tx.close();
            }
            return status;
        }
    }
    

    Package the project and copy the jar to Flume's lib directory (as shown in Section 3.6).

  • The configuration file flume-netcat-custom.conf

    # Agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Memory Channel
    a1.channels.c1.type = memory
    
    # NetCat TCP Source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Custom Sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = xyz.icefery.demo.sink.MySink
    a1.sinks.k1.prefix = custom-
    a1.sinks.k1.suffix = -custom
    


4. Interview Questions

4.1 How do you monitor Flume data transfer?

Use the third-party framework Ganglia to monitor Flume in real time.


4.2 What are the roles of Flume's Source, Sink, and Channel? Which Source types have you used?

Roles

  • The Source component collects data. It can handle log data of various types and formats, including Avro, Thrift, Exec, JMS, Spooling Directory, NetCat, Sequence Generator, Syslog, HTTP, and Legacy.
  • The Sink component sends data to its destination. Destinations include HDFS, Logger, Avro, Thrift, IPC, File, HBase, Solr, and custom sinks.
  • The Channel component buffers the collected data. It can store the data either in memory or in files.

Source types used

  • Monitoring background log files: Exec
  • Monitoring the port where background logs are produced: NetCat

4.3 Flume Channel Selectors

(Diagram omitted. Flume provides two Channel Selectors: Replicating, which copies each Event to every configured Channel, and Multiplexing, which routes each Event to a Channel based on a header value; see Section 3.2.)

4.4 Flume Parameter tuning

Source

Increasing the number of Sources can increase the capacity to read data (with Taildir Source, you can also increase the number of FileGroups). For example, when a directory produces too many files, split it into multiple directories and configure a Source for each, so that the Sources have enough capacity to pick up new data.

The batchSize parameter determines how many Events a Source ships to the Channel in one batch; increasing it appropriately can improve the throughput of moving Events from the Source to the Channel.

Channel

With type set to memory, the Channel has the best performance, but data may be lost if the Flume process dies unexpectedly. With type set to file, the Channel has better fault tolerance, but its performance is worse than Memory Channel.

When using File Channel, configuring dataDirs with multiple directories on different disks can improve performance.

The capacity parameter determines the maximum number of Events a Channel can hold. The transactionCapacity parameter determines the maximum number of Events the Source writes to the Channel per transaction and the maximum number the Sink reads from the Channel per transaction; transactionCapacity must be no smaller than the batchSize of the Source and the Sink.
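
A hedged sketch of these Channel settings for a File Channel; the paths and numbers are illustrative starting points, not recommendations:

    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /data1/flume/checkpoint
    # spreading dataDirs across different physical disks can improve throughput
    a1.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data
    a1.channels.c1.capacity = 1000000
    # must be at least as large as the Source/Sink batchSize
    a1.channels.c1.transactionCapacity = 10000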

Sink

Increasing the number of Sinks can increase the capacity to consume Events. However, more Sinks is not always better; enough is enough, since too many Sinks occupy system resources and cause unnecessary waste.

The batchSize parameter determines how many Events a Sink reads from the Channel in one batch; increasing it appropriately can improve the throughput of moving Events out of the Channel.
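
For the HDFS Sink used in the earlier cases, the batch size sits alongside the file-roll settings; a hedged sketch with illustrative values:

    a1.sinks.k1.hdfs.batchSize = 100       # events flushed to HDFS per batch
    a1.sinks.k1.hdfs.rollInterval = 3600   # roll the file every hour (seconds, 0 = disable)
    a1.sinks.k1.hdfs.rollSize = 134217728  # roll when the file reaches ~128 MB (bytes, 0 = disable)
    a1.sinks.k1.hdfs.rollCount = 0         # do not roll based on event count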


4.5 Flume's transaction mechanism

Flume uses two independent transactions to handle event delivery: one from the Source to the Channel, and one from the Channel to the Sink.

For example, the Spooling Directory Source creates an event for each line of a file; only once all the events in the transaction have been delivered to the Channel and committed successfully does the Source mark the file as completed.

Likewise, a transaction covers the transfer from the Channel to the Sink in a similar way: if for some reason the events cannot be written out, the transaction is rolled back and all the events remain in the Channel, waiting to be redelivered.


4.6 Can Flume lose the data it collects?

Given Flume's architecture, data is not normally lost. Flume has a complete internal transaction mechanism: delivery from Source to Channel is transactional, and delivery from Channel to Sink is transactional, so data will not be lost in either of those stages. The only situations where data can be lost are when Memory Channel is used and the Agent goes down, or when the Channel fills up, so the Source can no longer write and data that has not yet been written is dropped.

Flume does not lose data, but it can produce duplicates. For example, if data has already been sent out by the Sink but no acknowledgement is received, the Sink will send the data again, which can result in duplicates.


Copyright notice
This article was written by [Twelve winged fallen angel]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/03/202203011556211110.html