Flume learning 4
2022-06-30 09:52:00 【Keep-upup】
Custom Interceptor
Requirement: use Flume to collect local server logs and send different types of logs to different analysis systems. In real development, a single server may produce many types of logs, and different types may need to go to different analysis systems. This calls for the Multiplexing structure in Flume's topology. Multiplexing works by sending each event to a Channel chosen according to the value of a particular key in the event's Header, so we need a custom Interceptor that assigns different values to that Header key for different types of events.
In this case, we simulate logs with port data, using data that contains "up" and data that does not contain "up" to stand for different log types. The custom interceptor checks whether the data contains "up" and tags each event accordingly, so that the two kinds are sent to different analysis systems (Channels).

Data arrives at flume1 through a netcat source listening on a port. The channel selector then sorts the data: events containing "up" are passed to flume2, and events without "up" are passed to flume3. The sinks at the end of flume2 and flume3 simply print to the console.
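The tag-then-route idea described above can be sketched in plain Java before any Flume code is written. This is only a simulation under stated assumptions: it uses no Flume classes, the class name `MultiplexingSketch` and its methods are hypothetical, and the channel names `c1` and `c2` are taken from the flume1 configuration in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of "tag the header, then route by header value".
// It does not use any Flume classes; all names here are hypothetical.
public class MultiplexingSketch {

    // Mirrors the interceptor's job: derive the "type" header value from the body.
    public static String typeOf(String body) {
        return body.contains("up") ? "up" : "notup";
    }

    // Mirrors the selector's job: look up the channel mapped to the header value.
    public static String channelFor(Map<String, String> headers, Map<String, String> mapping) {
        return mapping.get(headers.get("type"));
    }

    // Tags each body, then groups the bodies by the channel they would be routed to.
    public static Map<String, List<String>> route(List<String> bodies, Map<String, String> mapping) {
        Map<String, List<String>> channels = new LinkedHashMap<>();
        for (String body : bodies) {
            Map<String, String> headers = new HashMap<>();
            headers.put("type", typeOf(body));
            String channel = channelFor(headers, mapping);
            channels.computeIfAbsent(channel, k -> new ArrayList<>()).add(body);
        }
        return channels;
    }

    public static void main(String[] args) {
        // Same mapping as selector.mapping.up / selector.mapping.notup in the flume1 config.
        Map<String, String> mapping = new HashMap<>();
        mapping.put("up", "c1");
        mapping.put("notup", "c2");
        // prints {c1=[startup, keep-upup], c2=[hello]}
        System.out.println(route(Arrays.asList("startup", "hello", "keep-upup"), mapping));
    }
}
```

In the real setup the first step is done by the custom interceptor and the second by Flume's multiplexing channel selector; the sketch only shows how the header value connects the two.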
Write the code in IDEA. First, add the Flume dependency:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
Code section:
Define a class that implements the Interceptor interface and override its methods:
package test.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {
    // Holds the events after the header has been added
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        // Initialize the list
        addHeaderEvents = new ArrayList<Event>();
    }

    // Single event interception
    @Override
    public Event intercept(Event event) {
        // 1. Get the headers of the event
        Map<String, String> headers = event.getHeaders();
        // 2. Get the body of the event (decoded explicitly as UTF-8)
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // 3. Decide which header value to add based on whether the body contains "up"
        if (body.contains("up")) {
            headers.put("type", "up");
        } else {
            headers.put("type", "notup");
        }
        return event;
    }

    // Batch event interception
    @Override
    public List<Event> intercept(List<Event> events) {
        // 1. Empty the list
        addHeaderEvents.clear();
        // 2. Traverse the events
        for (Event event : events) {
            // 3. Add the header to each event
            addHeaderEvents.add(intercept(event));
        }
        return addHeaderEvents;
    }

    @Override
    public void close() {
    }

    // Flume creates the interceptor through this nested builder class.
    // The class name is spelled "Bulider" here, so the interceptor type
    // in the agent configuration must use exactly the same spelling.
    public static class Bulider implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
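After writing the code, package it into a jar.
Once packaging succeeds, upload the jar to the cluster so that the Flume agent can load it (typically by placing it in Flume's lib directory).
Then write the configuration files.
First, write the configuration file for flume2 on hadoop03: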
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
Next, write the configuration file for flume3 on hadoop04:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
Finally, write the configuration file for flume1 on hadoop02:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = test.flume.TypeInterceptor$Bulider
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.up = c1
a1.sources.r1.selector.mapping.notup = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
In the configuration file:
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.up = c1
a1.sources.r1.selector.mapping.notup = c2
The type here corresponds to the "type" header key set in the interceptor's if statement. Because events containing "up" are sent to flume2, c1 corresponds to up and c2 corresponds to notup.
The last step is to start the Flume agents. The downstream agents must be started first, that is, flume2 and flume3 on hadoop03 and hadoop04, because flume1's avro sinks connect to their avro sources. Start flume1 on hadoop02 after that.
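Once all three agents are running, test data has to be sent to the netcat source of flume1. A command-line tool such as nc is the usual way to do this; as an alternative, here is a minimal Java sketch of a client. The class name `NetcatTestClient` and its methods are hypothetical, and the host and port (`localhost`, `44444`) are assumed from the flume1 configuration above. It relies on the netcat source turning each line of text into one event.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical test client: writes newline-terminated lines to a TCP port.
public class NetcatTestClient {

    // Joins the lines into newline-terminated text, one line per intended event.
    public static String buildPayload(List<String> lines) {
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            sb.append(line).append('\n');
        }
        return sb.toString();
    }

    // Opens a connection, sends all lines as UTF-8, then closes the connection.
    public static void send(String host, int port, List<String> lines) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(buildPayload(lines).getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Host and port assumed from a1.sources.r1.bind / a1.sources.r1.port in flume1.
        // Run this on the machine where flume1 is listening.
        send("localhost", 44444, Arrays.asList("startup", "hello"));
    }
}
```

With the configuration in this post, "startup" should appear on the console of flume2 and "hello" on the console of flume3.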