当前位置:网站首页>Flume learning 4
Flume learning 4
2022-06-30 09:52:00 【Keep-upup】
Customize Interceptor
Case needs : Use Flume Collect server local logs , Different log types are required , Send different kinds of logs to different analysis systems . In actual development , There may be many types of logs generated by a server , Different types of logs may need to be sent to different analysis systems . This will be used Flume In topology Multiplexing structure ,Multiplexing The principle is , according to event in Header One of the key Value , Will be different event Send to different Channel in , So we need to customize a Interceptor, For different types event Of Header Medium key Give different values .
In this case , We simulate logs with port data , With data containing “up” And without “up” Simulate different types of logs , We need to customize it interceptor Distinguish between numbers and letters , Send them to different analysis systems (Channel).

adopt netcat Port listens for incoming data flume1, adopt channel Selectors sort the data , contain “up” The introduction of flume2, Not included “up” The introduction of flume3.flume2 and flume3 Terminal sink Just print to the console OK 了 .
stay idea It's code , First, add flume Dependence :
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>Code section :
Define a class implementation Intercept Interface to rewrite the methods inside :
package test.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TypeInterceptor implements Interceptor {
// A collection of events
private List<Event> addHeaderEvents;
public void initialize() {
// initialization
addHeaderEvents = new ArrayList<Event>();
}
// Single event interception
public Event intercept(Event event) {
//1. Get the header information in the event header
Map<String, String> headers = event.getHeaders();
//2. Gets the in the event body
String body = new String(event.getBody());
//3. according to body Whether contains 'hello' To decide what header information to add
if(body.contains("up")){
headers.put("type", "up");
}else {
headers.put("type", "notup");
}
return event;
}
// Batch event interception
public List<Event> intercept(List<Event> events) {
//1. Empty the set
addHeaderEvents.clear();
//2. Traverse envents
for (Event event :events){
//3. Add header information to each event
addHeaderEvents.add(intercept(event));
}
return addHeaderEvents;
}
public void close() {
}
public static class Bulider implements Interceptor.Builder{
public Interceptor build() {
return new TypeInterceptor();
}
public void configure(Context context) {
}
}
}
Package after writing :
After packing successfully :
Upload to the cluster
Then write the configuration file :
First write hadoop03 Upper flume2 The configuration file :
a1.sources - r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1To configure hadoop04 Of flume3 Configuration file for :
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1Final configuration hadoop02 Upper flume1 file :
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.up = c1
a1.sources.r1.selector.mapping.notup= c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
In profile :
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.up = c1
a1.sources.r1.selector.mapping.notup= c2
there type And if Statement type Corresponding , Because it contains up Incoming to flume2 in , therefore c1 The corresponding is up,c1 The corresponding is notup
The last is to start our flume 了 , When it starts , It must be started first and then , That is to say hadoop03 and hadoop04
边栏推荐
- CRF (conditional random field) learning summary
- JVM memory common parameter configuration set
- Experience of an acmer
- 工作小记: sendto失败 errno 22
- Galaxy Kirin server-v10 configuration image source
- 近期学习遇到的比较问题
- Machine learning note 9: prediction model optimization (to prevent under fitting and over fitting problems)
- MySQL optimization
- Good partner for cloud skill improvement, senior brother cloud of Amazon officially opened today
- Create thread pool demo
猜你喜欢

八大排序(二)

【新书推荐】Deno Web Development

【新书推荐】Cleaning Data for Effective Data Science

I once met a girl whom I most wanted to take care of all my life. Later... No later

【新书推荐】MongoDB Performance Tuning

云技能提升好伙伴,亚马逊云师兄今天正式营业

布隆过滤器

MySQL internal component structure

AutoUpdater. Net client custom update file

Differences and relationships among hyper convergence, software defined storage (SDS), distributed storage and server San
随机推荐
JVM tuning tool introduction and constant pool explanation
1, 基本配置
Galaxy Kirin server-v10 configuration image source
Why won't gold depreciate???
【新书推荐】MongoDB Performance Tuning
OCX child thread cannot trigger event event (forward)
MySQL index optimization miscellaneous
MySQL directory
Financial private cloud infrastructure scheme evaluation (Architecture and storage)
MCU firmware packaging Script Software
1. Basic configuration
桂林 穩健醫療收購桂林乳膠100%股權 填補乳膠產品線空白
Demo of guavacache
CentOS MySQL installation details
utils session&rpc
Properties of string
oracle跨数据库复制数据表-dblink
DDD interview
Experience of an acmer
Flutter 中的 ValueNotifier 和 ValueListenableBuilder