Flume configuration 3 - interceptor filtering
2022-06-29 19:49:00 【A vegetable chicken that is working hard】
Requirements
- Use Flume to collect local logs and send them to different analysis systems according to the log type (comments, likes, favorites, etc.)
- Flume routes each event to a different channel according to the value of one key in the event's headers
- In this case, whether the body contains "hello" is used to simulate giving that key different values
Schematic diagram

Implementation
1. Create a new project named Flume-MySQLSource
2. Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
3. TypeInterceptor
- An interceptor runs after the Source has received data and wrapped it into an event, and before the event is put into a Channel
- Implementing an interceptor takes two steps
1. Implement the Interceptor interface
Methods of the Interceptor interface to implement
public void initialize() Initialization method
public Event intercept(Event event) Handles a single event; this is required, because events do not always arrive in batches
public List<Event> intercept(List<Event> list) Handles a batch of events
public void close() Cleanup method
2. Write a static inner class that implements the Interceptor.Builder interface and is used to build the interceptor
Methods of the Interceptor.Builder interface to implement
public Interceptor build() {
return new TypeInterceptor();
}
public void configure(Context context)
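The two steps above imply a call order: the builder is configured, it builds the interceptor, the interceptor is initialized, events pass through it, and finally it is closed. The sketch below replays that order with stand-in types. MiniInterceptor, MiniBuilder and TracingBuilder are hypothetical names invented for this illustration and are not Flume classes; the order shown reflects how Flume is understood to drive an interceptor, and is a simplification rather than a description of its internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in types for illustration only; they are NOT Flume's classes.
final class LifecycleSketch {
    interface MiniInterceptor {
        void initialize();
        String intercept(String body);
        void close();
    }

    interface MiniBuilder {
        void configure(String keyword);
        MiniInterceptor build();
    }

    // Builder that records every call so the order can be inspected afterwards.
    static final class TracingBuilder implements MiniBuilder {
        final List<String> trace = new ArrayList<>();
        private String keyword = "hello";

        @Override
        public void configure(String keyword) {
            trace.add("configure");
            this.keyword = keyword;
        }

        @Override
        public MiniInterceptor build() {
            trace.add("build");
            final String kw = keyword;
            return new MiniInterceptor() {
                @Override public void initialize() { trace.add("initialize"); }
                @Override public String intercept(String body) {
                    trace.add("intercept");
                    return body.contains(kw) ? "hello!" : "other";
                }
                @Override public void close() { trace.add("close"); }
            };
        }
    }

    // Plays the role of the framework: configure, build, initialize, intercept, close.
    static List<String> run(TracingBuilder builder, List<String> bodies) {
        builder.configure("hello");
        MiniInterceptor interceptor = builder.build();
        interceptor.initialize();
        List<String> types = new ArrayList<>();
        for (String body : bodies) {
            types.add(interceptor.intercept(body));
        }
        interceptor.close();
        return types;
    }

    public static void main(String[] args) {
        TracingBuilder builder = new TracingBuilder();
        System.out.println(run(builder, Arrays.asList("hello flume", "bye")));
        System.out.println(builder.trace);
    }
}
```

The point of the builder is that configuration is read once, before any event arrives, so the interceptor itself stays free of configuration parsing.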
- Official configuration reference for the channel selector

- Official configuration reference for interceptors

- Code
// The package must match the class name used in the agent configuration
package com.yc.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @program: Flume-MyISS
 * @description:
 * @author: author
 * @create: 2022-06-26 19:33
 */
public class TypeInterceptor implements Interceptor {
    // Buffer reused by the batch method
    private List<Event> events;

    // Initialization
    @Override
    public void initialize() {
        events = new ArrayList<>();
    }

    // Handles a single event; this is required, because events do not always arrive in batches
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // Set the "type" header depending on whether the body contains "hello"
        if (body.contains("hello")) {
            headers.put("type", "hello!");
        } else {
            headers.put("type", "other");
        }
        return event; // Returning null would mean this event is not wanted
    }

    // Handles a batch of events
    @Override
    public List<Event> intercept(List<Event> list) {
        // 1. Clear the shared buffer
        events.clear();
        // 2. Tag every event through the single-event method
        for (Event event : list) {
            events.add(intercept(event));
        }
        return events;
    }

    // Close
    @Override
    public void close() {
    }

    // Static inner class that builds the interceptor
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
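The comment in the single-event method notes that returning null marks an event as unwanted. A batch method that honours that convention has to skip null results instead of adding them to the output list. The following is a minimal sketch of that pattern using a generic stand-in instead of Flume's Event type; BatchFilterSketch and interceptBatch are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Generic stand-in for a batch interceptor; not part of Flume.
final class BatchFilterSketch {
    // Applies the single-event handler to each element and keeps only non-null results.
    static <E> List<E> interceptBatch(List<E> events, UnaryOperator<E> single) {
        List<E> kept = new ArrayList<>(events.size());
        for (E event : events) {
            E out = single.apply(event);
            if (out != null) {
                kept.add(out);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("hello a", "noise", "hello b");
        // The handler drops everything that does not contain "hello".
        List<String> out = interceptBatch(in, s -> s.contains("hello") ? s : null);
        System.out.println(out); // prints [hello a, hello b]
    }
}
```

Delegating to the single-event handler keeps the tagging and filtering rules in one place, so the batch and single-event paths cannot drift apart.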
4. Package the project as a jar and put it in the lib directory of Flume


5. In /jobs/t6, write the configuration file netcat-interceptor-avro.conf
- vim netcat-interceptor-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Comments are kept on their own lines: in a properties file a trailing "#" becomes part of the value
# Interceptor
a1.sources.r1.interceptors = i1
# Builder inner class of the custom interceptor
a1.sources.r1.interceptors.i1.type = com.yc.interceptor.TypeInterceptor$Builder
# Channel selector
a1.sources.r1.selector.type = multiplexing
# Header key that the interceptor code sets in the event's header map
a1.sources.r1.selector.header = type
# If type=hello!, send the event to c1
a1.sources.r1.selector.mapping.hello! = c1
# If type=other, send the event to c2
a1.sources.r1.selector.mapping.other = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node1
a1.sinks.k2.port = 4142
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
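The selector lines in this configuration amount to a lookup: read the type header, find the channel mapped to its value, and send the event there. The sketch below models only that lookup in plain Java. HeaderRoutingSketch, selectChannel and the fallback parameter are hypothetical names for illustration; this is not how Flume implements its multiplexing selector, and the fallback argument is an assumption that the configuration above does not set.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of header-based routing; not Flume's selector implementation.
final class HeaderRoutingSketch {
    // Returns the channel mapped to the header's value, or the fallback if nothing matches.
    static String selectChannel(Map<String, String> headers, String headerName,
                                Map<String, String> mapping, String fallback) {
        String value = headers.get(headerName);
        String channel = (value == null) ? null : mapping.get(value);
        return (channel != null) ? channel : fallback;
    }

    public static void main(String[] args) {
        // Same mapping as selector.mapping.hello! = c1 and selector.mapping.other = c2
        Map<String, String> mapping = new HashMap<>();
        mapping.put("hello!", "c1");
        mapping.put("other", "c2");

        Map<String, String> helloEvent = Collections.singletonMap("type", "hello!");
        Map<String, String> otherEvent = Collections.singletonMap("type", "other");

        System.out.println(selectChannel(helloEvent, "type", mapping, "c2")); // prints c1
        System.out.println(selectChannel(otherEvent, "type", mapping, "c2")); // prints c2
    }
}
```

Because the interceptor always sets type to either hello! or other, every event matches one of the two mappings in this case.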
6. In /jobs/t6, write the configuration file avro-flume-logger1.conf
- vim avro-flume-logger1.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = node1
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
7. In /jobs/t6, write the configuration file avro-flume-logger2.conf
- vim avro-flume-logger2.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = node1
a1.sources.r1.port = 4142
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
8. Start the agents in the order 3, 2, 1, so that both avro receivers are listening before the netcat agent starts
- bin/flume-ng agent --conf conf --conf-file jobs/t6/avro-flume-logger2.conf --name a1 -Dflume.root.logger=INFO,console
- bin/flume-ng agent --conf conf --conf-file jobs/t6/avro-flume-logger1.conf --name a1 -Dflume.root.logger=INFO,console
- bin/flume-ng agent --conf conf --conf-file jobs/t6/netcat-interceptor-avro.conf --name a1 -Dflume.root.logger=INFO,console
9. Use telnet on node1 to send messages
- telnet localhost 44444
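If telnet is not installed, the same messages can be sent from a few lines of Java. The sketch below opens a TCP connection, writes one line per message, and reads one reply line after each. It assumes the source acknowledges every line with a short reply (the netcat source is understood to answer OK per event by default); if no reply arrives within the timeout, an exception is thrown. NetcatClientSketch and sendLines are hypothetical names for this example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal line-oriented TCP client, usable in place of telnet for this test.
final class NetcatClientSketch {
    // Sends each line terminated by '\n' and collects one reply line per message.
    static List<String> sendLines(String host, int port, List<String> lines) throws IOException {
        List<String> replies = new ArrayList<>();
        try (Socket socket = new Socket(host, port)) {
            socket.setSoTimeout(2000); // assumption: a reply arrives within 2 seconds
            Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            for (String line : lines) {
                out.write(line + "\n");
                out.flush();
                replies.add(in.readLine());
            }
        }
        return replies;
    }

    public static void main(String[] args) throws IOException {
        // Same host and port as the netcat source in netcat-interceptor-avro.conf
        System.out.println(sendLines("localhost", 44444,
                Arrays.asList("hello flume", "something else")));
    }
}
```

One message containing hello and one without it is enough to see both routes being used.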
10. Result
- All messages containing hello were intercepted

