Flume learning 4
2022-06-30 09:52:00 【Keep-upup】
Custom Interceptor
Case requirements: use Flume to collect a server's local logs and send different types of logs to different analysis systems. In real development, one server may generate many types of logs, and different types may need to go to different analysis systems. This uses Flume's multiplexing topology. The principle of multiplexing is to route each event to a different channel according to the value of a certain key in the event's header, so we need to write a custom Interceptor that assigns different values to that header key for different types of events.
In this case, we use port data to simulate logs: lines containing "up" and lines not containing "up" stand for different log types. The custom interceptor distinguishes the two and sends them to different analysis systems (channels).

flume1 listens on a netcat port for incoming data; a channel selector then routes the data: events containing "up" are forwarded to flume2, and events not containing "up" are forwarded to flume3. The sinks of flume2 and flume3 simply print to the console (logger sink).
Write the code in IDEA. First, add the Flume dependency:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
Next, the code: define a class that implements the Interceptor interface and override its methods:
package test.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {

    // Collection of events that have had a header added
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        // Initialization
        addHeaderEvents = new ArrayList<Event>();
    }

    // Single-event interception
    @Override
    public Event intercept(Event event) {
        // 1. Get the header map of the event
        Map<String, String> headers = event.getHeaders();
        // 2. Get the event body
        String body = new String(event.getBody());
        // 3. Decide which header value to add according to whether the body contains "up"
        if (body.contains("up")) {
            headers.put("type", "up");
        } else {
            headers.put("type", "notup");
        }
        return event;
    }

    // Batch-event interception
    @Override
    public List<Event> intercept(List<Event> events) {
        // 1. Clear the collection
        addHeaderEvents.clear();
        // 2. Traverse the events
        for (Event event : events) {
            // 3. Add header information to each event
            addHeaderEvents.add(intercept(event));
        }
        return addHeaderEvents;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
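Before packaging, it can be worth sanity-checking the interceptor locally. The snippet below is a minimal sketch added for illustration (the test class, its main method, and the use of SimpleEvent are my own assumptions, not part of the original post); it builds the interceptor through its Builder and prints the header attached to each event:

```java
package test.flume;

import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

public class TypeInterceptorTest {
    public static void main(String[] args) {
        // Build the interceptor the same way Flume would: through its Builder
        Interceptor interceptor = new TypeInterceptor.Builder().build();
        interceptor.initialize();

        // An event whose body contains "up" should get header type=up
        Event upEvent = new SimpleEvent();
        upEvent.setBody("startup log line".getBytes(StandardCharsets.UTF_8));
        interceptor.intercept(upEvent);
        System.out.println(upEvent.getHeaders()); // expected: {type=up}

        // Any other body should get header type=notup
        Event otherEvent = new SimpleEvent();
        otherEvent.setBody("hello flume".getBytes(StandardCharsets.UTF_8));
        interceptor.intercept(otherEvent);
        System.out.println(otherEvent.getHeaders()); // expected: {type=notup}

        interceptor.close();
    }
}
```

If the first line prints {type=up} and the second {type=notup}, the interceptor behaves the way the multiplexing selector expects.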
After writing the code, package the project into a JAR. Once packaging succeeds, upload the JAR to the cluster.
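As a rough sketch of this step (the Maven command, JAR name, Flume installation path, and target host below are assumptions about this article's environment, not details given in the original post):

```bash
# Package the interceptor project into a JAR on the development machine
mvn clean package

# Copy the JAR into Flume's lib directory on the node that will run flume1
# (JAR name and the /opt/module/flume path are assumptions; adjust to your setup)
scp target/flume-interceptor-1.0-SNAPSHOT.jar hadoop102:/opt/module/flume/lib/
```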
Then write the configuration files. First, the configuration file for flume2 on hadoop103:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

Next, the configuration file for flume3 on hadoop104:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

Finally, the configuration file for flume1 on hadoop102:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = test.flume.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.up = c1
a1.sources.r1.selector.mapping.notup = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
In the configuration file:
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.up = c1
a1.sources.r1.selector.mapping.notup = c2
The header key "type" here corresponds to the key set in the interceptor's if/else branches. Events containing "up" should flow to flume2, so c1 maps to up and c2 maps to notup. Note also that the ports of sinks k1 and k2 on flume1 must match the ports that the avro sources of flume2 and flume3 listen on (4141 and 4242 respectively).
The last step is to start the Flume agents. The downstream agents must be started first, that is, flume2 on hadoop103 and flume3 on hadoop104, and only then flume1 on hadoop102.
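A minimal sketch of the startup commands (the job file names and the assumption that the commands are run from the Flume installation directory are mine; adjust them to your environment). Start the two downstream agents first, then flume1, and finally send test data with netcat:

```bash
# On hadoop103: start flume2 (avro source on 4141, logger sink)
bin/flume-ng agent -n a1 -c conf/ -f job/flume2.conf -Dflume.root.logger=INFO,console

# On hadoop104: start flume3 (avro source on 4242, logger sink)
bin/flume-ng agent -n a1 -c conf/ -f job/flume3.conf -Dflume.root.logger=INFO,console

# On hadoop102: start flume1 last (netcat source + interceptor + multiplexing selector)
bin/flume-ng agent -n a1 -c conf/ -f job/flume1.conf -Dflume.root.logger=INFO,console

# Send test data to flume1; lines containing "up" should appear on the hadoop103
# console, everything else on the hadoop104 console
nc localhost 44444
```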