Flume Configuration 3: Filtering with Interceptors
2022-06-29 19:46:00 【一个正在努力的菜鸡】
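Case Requirements
- Use Flume to collect local logs and send them to different analysis systems depending on the log type (comments, likes, favorites, etc.)
- Flume routes events to different channels based on the value of a given key in each event's headers
- In this case, whether the message contains "hello" is used to simulate assigning different values to that key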
[Figure: architecture diagram]

Implementation
1. Create a new project, Flume-MySQLSource
2. Add the dependency
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.8.0</version>
</dependency>
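3. TypeInterceptor
- The interceptor runs after the Source has received the data and packaged it into an event, and before the event is put into the Channel
- Implementing an interceptor takes two steps:
  1. Implement the Interceptor interface. Its methods are:
     public void initialize(): initialization
     public Event intercept(Event event): processes a single event. This one is essential, because events do not always arrive in batches
     public List<Event> intercept(List<Event> list): processes a batch of events
     public void close(): called when the interceptor is shut down
  2. Write a static nested class that implements the Interceptor.Builder interface and builds the interceptor. Its methods are:
     public Interceptor build() {
         return new TypeInterceptor();
     }
     public void configure(Context context): receives the interceptor's properties from the agent configuration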
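To make the call order of these methods concrete, here is a minimal, self-contained sketch that does not need the Flume jar. `SimpleInterceptor`, `SimpleBuilder` and `InterceptorLifecycleDemo` are hypothetical stand-ins that only mirror the shape of `Interceptor` and `Interceptor.Builder` (String bodies instead of `Event`), and the driver assumes the order configure → build → initialize → intercept → close as a simplified model of what the Flume runtime does, not its actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.flume.interceptor.Interceptor,
// using String bodies instead of Flume Event objects.
interface SimpleInterceptor {
    void initialize();
    String intercept(String event);              // single event; null means "drop it"
    List<String> intercept(List<String> events); // batch of events
    void close();
}

// Hypothetical stand-in for Interceptor.Builder.
interface SimpleBuilder {
    void configure(Map<String, String> context);
    SimpleInterceptor build();
}

class InterceptorLifecycleDemo {
    // Records each lifecycle call so the order can be checked.
    static final List<String> calls = new ArrayList<>();

    // Example interceptor that drops blank lines and keeps everything else.
    static class DropBlankInterceptor implements SimpleInterceptor {
        public void initialize() { calls.add("initialize"); }

        public String intercept(String event) {
            return event.trim().isEmpty() ? null : event;
        }

        public List<String> intercept(List<String> events) {
            calls.add("intercept(batch)");
            List<String> out = new ArrayList<>();
            for (String e : events) {
                String kept = intercept(e); // reuse the single-event logic
                if (kept != null) {
                    out.add(kept);
                }
            }
            return out;
        }

        public void close() { calls.add("close"); }

        // Step 2: a static nested class that builds the interceptor.
        static class Builder implements SimpleBuilder {
            public void configure(Map<String, String> context) { calls.add("configure"); }

            public SimpleInterceptor build() {
                calls.add("build");
                return new DropBlankInterceptor();
            }
        }
    }

    // Drives one interceptor through configure -> build -> initialize -> intercept -> close.
    static List<String> run(SimpleBuilder builder, List<String> batch) {
        builder.configure(Map.of());
        SimpleInterceptor interceptor = builder.build();
        interceptor.initialize();
        try {
            return interceptor.intercept(batch);
        } finally {
            interceptor.close();
        }
    }
}
```

With the batch "a", " ", "b", `run` returns [a, b], and `calls` ends up as configure, build, initialize, intercept(batch), close. The batch method reusing the single-event method is the design choice worth copying: the filtering rule then lives in one place.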
- [Figure: official configuration reference for channel selectors]
- [Figure: official configuration reference for interceptors]
- Code
package com.yc.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @program: Flume-MyISS
 * @description:
 * @author: author
 * @create: 2022-06-26 19:33
 */
public class TypeInterceptor implements Interceptor {

    // Reused output list for batch processing
    private List<Event> events;

    @Override // Initialization
    public void initialize() {
        events = new ArrayList<>();
    }

    @Override // Processes a single event; essential, because events do not always arrive in batches
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // Set the "type" header depending on whether the body contains "hello"
        if (body.contains("hello")) {
            headers.put("type", "hello!");
        } else {
            headers.put("type", "other");
        }
        return event; // returning null would mean this event is dropped
    }

    @Override // Processes a batch of events
    public List<Event> intercept(List<Event> list) {
        // 1. Clear the shared list
        events.clear();
        // 2. Apply the single-event logic to every event, skipping dropped (null) ones
        for (Event event : list) {
            Event intercepted = intercept(event);
            if (intercepted != null) {
                events.add(intercepted);
            }
        }
        return events;
    }

    @Override // Close; nothing to release here
    public void close() {
    }

    // Static nested class that helps build the interceptor
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
            // This interceptor has no configurable properties
        }
    }
}
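Because the class above needs the Flume runtime, the tagging and routing logic can also be checked on its own with a self-contained sketch. `SimpleEvent`, `tag`, `route` and `routeAll` are hypothetical: `tag` reproduces what `TypeInterceptor.intercept` does to the headers, and `route` is a simplified model of the multiplexing selector from the configuration in step 5 (header `type`, mapping `hello!` → c1 and `other` → c2), not Flume's actual selector code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TypeRoutingDemo {
    // Hypothetical minimal event: a header map plus a byte[] body, like a Flume Event.
    static class SimpleEvent {
        final Map<String, String> headers = new HashMap<>();
        final byte[] body;

        SimpleEvent(String text) {
            body = text.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Same rule as TypeInterceptor.intercept: set the "type" header from the body.
    static SimpleEvent tag(SimpleEvent event) {
        String body = new String(event.body, StandardCharsets.UTF_8);
        event.headers.put("type", body.contains("hello") ? "hello!" : "other");
        return event;
    }

    // Simplified multiplexing selection: look the header value up in the mapping.
    // Returns null for an unmapped value (not reachable here, since tag() always sets a mapped value).
    static String route(SimpleEvent event, Map<String, String> mapping) {
        return mapping.get(event.headers.get("type"));
    }

    // Tags each line and returns the channel it would be sent to.
    static List<String> routeAll(List<String> lines) {
        Map<String, String> mapping = Map.of("hello!", "c1", "other", "c2");
        List<String> channels = new ArrayList<>();
        for (String line : lines) {
            channels.add(route(tag(new SimpleEvent(line)), mapping));
        }
        return channels;
    }
}
```

For example, `routeAll(List.of("hello flume", "hi there", "say hello"))` yields [c1, c2, c1]. Note that `contains` is case-sensitive, so a line such as "Hello" would go to c2.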
4. Package the project as a jar and put it into Flume's lib directory


5. In jobs/t6 under the Flume installation directory, write the configuration file netcat-interceptor-avro.conf
- vim netcat-interceptor-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Interceptor
a1.sources.r1.interceptors = i1
# Interceptor type: the static nested Builder class
a1.sources.r1.interceptors.i1.type = com.yc.interceptor.TypeInterceptor$Builder
# Channel selector
a1.sources.r1.selector.type = multiplexing
# Route on the "type" header that the interceptor code puts into the header map
a1.sources.r1.selector.header = type
# If type=hello!, send to c1
a1.sources.r1.selector.mapping.hello! = c1
# If type=other, send to c2
a1.sources.r1.selector.mapping.other = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node1
a1.sinks.k2.port = 4142
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
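Flume's properties-file configuration provider reads agent files with java.util.Properties, where `#` starts a comment only at the beginning of a line; text after a value on the same line becomes part of the value. That is why comments in these files belong on their own lines. The sketch below demonstrates this with the JDK alone; the class name `InlineCommentDemo` is hypothetical, and the property key is copied from the configuration above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class InlineCommentDemo {
    // Parses a config snippet the same way java.util.Properties does.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with a StringReader
        }
        return props;
    }

    public static void main(String[] args) {
        String key = "a1.sources.r1.selector.type";
        String inline = key + " = multiplexing #selector\n";
        String ownLine = "# selector\n" + key + " = multiplexing\n";
        // Prints "multiplexing #selector": the inline comment became part of the value.
        System.out.println(parse(inline).getProperty(key));
        // Prints "multiplexing": a comment on its own line is ignored.
        System.out.println(parse(ownLine).getProperty(key));
    }
}
```

With an inline comment, the selector type would no longer be exactly `multiplexing`, so the agent would not get the selector this case relies on.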
6. In jobs/t6, write the configuration file avro-flume-logger1.conf
- vim avro-flume-logger1.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = node1
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
7. In jobs/t6, write the configuration file avro-flume-logger2.conf
- vim avro-flume-logger2.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = node1
a1.sources.r1.port = 4142
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
8. Start the agents in reverse order (3, 2, 1): first the two downstream logger agents, then the upstream agent, each in its own terminal
- bin/flume-ng agent --conf conf --conf-file jobs/t6/avro-flume-logger2.conf --name a1 -Dflume.root.logger=INFO,console
- bin/flume-ng agent --conf conf --conf-file jobs/t6/avro-flume-logger1.conf --name a1 -Dflume.root.logger=INFO,console
- bin/flume-ng agent --conf conf --conf-file jobs/t6/netcat-interceptor-avro.conf --name a1 -Dflume.root.logger=INFO,console
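The order matters because the avro sinks k1 and k2 in netcat-interceptor-avro.conf connect to the avro sources on node1:4141 and node1:4142; if those agents are not listening yet, the sinks cannot connect. Before starting the last agent, a quick check such as the hypothetical `PortCheck.isListening` helper below (plain java.net, not part of Flume) can confirm that both ports are open. The 1000 ms timeout is an assumed value.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unknown host, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Ports of the two downstream avro sources (avro-flume-logger1.conf and avro-flume-logger2.conf).
        for (int port : new int[] {4141, 4142}) {
            System.out.println("node1:" + port + " listening = " + isListening("node1", port, 1000));
        }
    }
}
```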
9. On node1, send messages with telnet
- telnet localhost 44444
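If telnet is not installed, a few lines of Java can play the same role. The hypothetical `NetcatClient.send` below writes each message as one newline-terminated line (the netcat source turns each line of text into an event), signals end of input, and collects whatever the source sends back. By default the netcat source answers each event with `OK` (its `ack-every-event` setting), but the code does not depend on that; the 2-second read timeout is an assumption so the client never waits forever.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class NetcatClient {
    // How long to wait for replies before giving up (assumed value).
    static final int READ_TIMEOUT_MS = 2000;

    // Sends each message as one line and returns the reply lines.
    static List<String> send(String host, int port, List<String> messages) {
        // Closing the socket also closes the streams obtained from it.
        try (Socket socket = new Socket(host, port)) {
            socket.setSoTimeout(READ_TIMEOUT_MS);
            Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
            for (String message : messages) {
                out.write(message + "\n"); // one line = one event for the netcat source
            }
            out.flush();
            socket.shutdownOutput(); // tell the server there is no more input

            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            List<String> replies = new ArrayList<>();
            try {
                String line;
                while ((line = in.readLine()) != null) {
                    replies.add(line);
                }
            } catch (SocketTimeoutException e) {
                // The server kept the connection open; return what arrived so far.
            }
            return replies;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Same target as "telnet localhost 44444" above.
        System.out.println(send("localhost", 44444, List.of("hello flume", "just another line")));
    }
}
```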
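10. Result
- Every message containing "hello" was routed to channel c1 and printed by the agent listening on port 4141 (avro-flume-logger1.conf); all other messages went through c2 to the agent on port 4142 (avro-flume-logger2.conf)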