Flume Configuration 3: Interceptor Filtering
2022-06-29 19:46:00 【一个正在努力的菜鸡】
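Case requirements
- Use Flume to collect local logs and send them to different analysis systems depending on the log type (comments, likes, favorites, etc.)
- Flume routes each event to a different channel based on the value of a particular header key of that event
- In this example, whether the event body contains "hello" is used to simulate assigning different values to that key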
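The routing rule above has two halves: an interceptor writes a header, and a multiplexing channel selector reads it. The plain-Java model below is a sketch of that idea only; it uses no Flume classes, and the names `RoutingSketch`, `tag` and `route` are hypothetical. The header key `type`, the values `hello!`/`other` and the channel names `c1`/`c2` are taken from this article's configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the routing rule; it does not use any Flume classes.
class RoutingSketch {
    // Mirrors a1.sources.r1.selector.mapping.* in the agent config (header value -> channel).
    static final Map<String, String> MAPPING = new HashMap<>();
    static {
        MAPPING.put("hello!", "c1");
        MAPPING.put("other", "c2");
    }

    // Interceptor step: build the headers for an event from its body text.
    static Map<String, String> tag(String body) {
        Map<String, String> headers = new HashMap<>();
        headers.put("type", body.contains("hello") ? "hello!" : "other");
        return headers;
    }

    // Selector step: pick the channel for the value of the "type" header.
    // Returns null when no mapping matches (this sketch models no default channel).
    static String route(Map<String, String> headers) {
        return MAPPING.get(headers.get("type"));
    }

    public static void main(String[] args) {
        for (String body : new String[] {"hello flume", "someone liked a post"}) {
            System.out.println(body + " -> " + route(tag(body)));
        }
    }
}
```

Keeping the two steps separate mirrors the real design: the interceptor only labels events, and the selector configuration alone decides where each label goes.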
Schematic

Implementation
1. Create a new project, Flume-MySQLSource
2. Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
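3. TypeInterceptor
- The interceptor runs after the Source has received the data and packaged it into an event, and before the event is written to the Channel
- Implementing an interceptor takes two steps
1. Implement the Interceptor interface
Methods of the Interceptor implementation class:
public void initialize(): initialization
public Event intercept(Event event): processes a single event; this method is mandatory, because events do not always arrive in batches
public List<Event> intercept(List<Event> list): processes a batch of events
public void close(): called when the interceptor is shut down
2. Write a static inner class that implements the Interceptor.Builder interface; it is used to build the interceptor
Methods of the Interceptor.Builder implementation class:
public Interceptor build() {
    return new TypeInterceptor();
}
public void configure(Context context)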
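Why a static inner Builder? As far as I understand, Flume reads the class name given in `interceptors.i1.type`, loads that Builder class by reflection, calls `configure(Context)` and then `build()`. The sketch below imitates that lifecycle with stand-in types so it compiles without the Flume jar; `SimpleInterceptor`, `KeywordInterceptor`, `BuilderLoader` and the `keyword` property are all hypothetical, and a `Map` stands in for Flume's `Context`.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-ins that only mimic the shape of Flume's Interceptor and Interceptor.Builder;
// they are NOT the Flume API, so the sketch compiles without the Flume jar.
interface SimpleInterceptor {
    String typeOf(String body);
}

class KeywordInterceptor implements SimpleInterceptor {
    private final String keyword;

    KeywordInterceptor(String keyword) {
        this.keyword = keyword;
    }

    // Same "<keyword>!" / "other" convention as the article's "hello!" / "other".
    @Override
    public String typeOf(String body) {
        return body.contains(keyword) ? keyword + "!" : "other";
    }

    // Static nested builder; its binary class name ends in "KeywordInterceptor$Builder",
    // the same "$" form used for the interceptor type in a Flume agent config.
    public static class Builder {
        private String keyword = "hello";

        // Stand-in for configure(Context); "keyword" is a hypothetical property name.
        public void configure(Map<String, String> context) {
            keyword = context.getOrDefault("keyword", "hello");
        }

        public SimpleInterceptor build() {
            return new KeywordInterceptor(keyword);
        }
    }
}

class BuilderLoader {
    // Roughly the framework's job: load the builder class by name, configure it, build.
    static SimpleInterceptor load(String builderClassName, Map<String, String> context) {
        try {
            Class<?> cls = Class.forName(builderClassName);
            KeywordInterceptor.Builder builder =
                (KeywordInterceptor.Builder) cls.getDeclaredConstructor().newInstance();
            builder.configure(context);
            return builder.build();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot load " + builderClassName, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> context = new HashMap<>();
        context.put("keyword", "like");
        SimpleInterceptor interceptor = load(KeywordInterceptor.Builder.class.getName(), context);
        System.out.println(interceptor.typeOf("someone liked a post"));
    }
}
```

Because the Builder has a no-argument constructor and receives its settings through `configure`, the framework never needs to know the interceptor's own constructor.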
- Official configuration reference for channel selectors

- Official configuration reference for interceptors

- Code
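package com.yc.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @program: Flume-MyISS
 * @description: sets a "type" header depending on whether the event body contains "hello"
 * @author: author
 * @create: 2022-06-26 19:33
 */
public class TypeInterceptor implements Interceptor {

    // Reused list that holds the result of batch interception
    private List<Event> events;

    @Override // Initialization
    public void initialize() {
        events = new ArrayList<>();
    }

    @Override // Processes a single event; mandatory, because events do not always arrive in batches
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // Decode as UTF-8 explicitly instead of relying on the platform default charset
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // Set the "type" header depending on whether the body contains "hello";
        // the multiplexing channel selector routes on this header
        if (body.contains("hello")) {
            headers.put("type", "hello!");
        } else {
            headers.put("type", "other");
        }
        return event; // Returning null means this event is discarded
    }

    @Override // Processes a batch of events
    public List<Event> intercept(List<Event> list) {
        // 1. Clear the shared list
        events.clear();
        // 2. Run every event through intercept(Event) so batched events also get the header;
        //    events for which it returns null are dropped
        for (Event event : list) {
            Event intercepted = intercept(event);
            if (intercepted != null) {
                events.add(intercepted);
            }
        }
        return events;
    }

    @Override // Shutdown; nothing to release here
    public void close() {
    }

    // Static inner class that builds the interceptor;
    // the agent config refers to it as com.yc.interceptor.TypeInterceptor$Builder
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configurable parameters
        }
    }
}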
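A batch `intercept(List)` normally reuses the single-event `intercept(Event)` for every element and skips events for which it returns `null`, so that batched and single events are treated the same way. The sketch below shows that pattern with a hypothetical stand-in event type (`FakeEvent`, not `org.apache.flume.Event`) so it runs without Flume; the rule "drop blank bodies" is an illustrative assumption, not part of the article's interceptor.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-in for a Flume event: headers plus raw body bytes (NOT org.apache.flume.Event).
class FakeEvent {
    final Map<String, String> headers = new HashMap<>();
    final byte[] body;

    FakeEvent(String text) {
        this.body = text.getBytes(StandardCharsets.UTF_8);
    }
}

class BatchSketch {
    // Single-event logic: tag the event, or return null to drop it.
    // Dropping blank bodies is a hypothetical rule added only for illustration.
    static FakeEvent intercept(FakeEvent e) {
        String body = new String(e.body, StandardCharsets.UTF_8);
        if (body.trim().isEmpty()) {
            return null;
        }
        e.headers.put("type", body.contains("hello") ? "hello!" : "other");
        return e;
    }

    // Batch logic: delegate to the single-event method and skip dropped (null) events.
    static List<FakeEvent> intercept(List<FakeEvent> batch) {
        List<FakeEvent> out = new ArrayList<>(batch.size());
        for (FakeEvent e : batch) {
            FakeEvent kept = intercept(e);
            if (kept != null) {
                out.add(kept);
            }
        }
        return out;
    }
}
```

Returning a fresh list keeps the sketch simple; the article's class instead reuses one list field, which also works as long as the caller consumes the result before the next batch.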
4. Package the project as a jar and put it in Flume's lib directory


5. Under /jobs/t6, write the configuration file netcat-interceptor-avro.conf
- vim netcat-interceptor-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
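# Interceptor (keep comments on their own lines: a trailing "# ..." would become part of the value)
a1.sources.r1.interceptors = i1
# Interceptor type: the static inner Builder class of the custom interceptor
a1.sources.r1.interceptors.i1.type = com.yc.interceptor.TypeInterceptor$Builder
# Channel selector
a1.sources.r1.selector.type = multiplexing
# Header key to route on; the interceptor code puts this key-value pair into the event's header map
a1.sources.r1.selector.header = type
# If type=hello!, send the event to c1
a1.sources.r1.selector.mapping.hello! = c1
# If type=other, send the event to c2
a1.sources.r1.selector.mapping.other = c2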
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node1
a1.sinks.k2.port = 4142
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
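A pitfall worth knowing when writing these files: to my knowledge, Flume's properties-file configuration provider loads the agent file with `java.util.Properties`, where only a `#` or `!` at the start of a line begins a comment. A trailing `# ...` after a value therefore stays in the value. The demo below (class name `InlineCommentDemo` is hypothetical) shows that behavior with the JDK alone, and also that a key ending in `!`, such as `mapping.hello!`, is read correctly.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Shows how java.util.Properties treats '#': it starts a comment only at the
// beginning of a line, so an inline "# ..." stays part of the value.
class InlineCommentDemo {
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return p;
    }

    public static void main(String[] args) {
        Properties inline = parse("a1.sources.r1.selector.type = multiplexing #selector\n");
        Properties separate = parse("# selector\na1.sources.r1.selector.type = multiplexing\n");
        System.out.println("[" + inline.getProperty("a1.sources.r1.selector.type") + "]");   // [multiplexing #selector]
        System.out.println("[" + separate.getProperty("a1.sources.r1.selector.type") + "]"); // [multiplexing]
    }
}
```

With an inline comment, the selector type would be read as `multiplexing #selector`, which is not a valid selector type; putting each comment on its own line avoids this.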
6. Under /jobs/t6, write the configuration file avro-flume-logger1.conf
- vim avro-flume-logger1.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = node1
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
7. Under /jobs/t6, write the configuration file avro-flume-logger2.conf
- vim avro-flume-logger2.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = node1
a1.sources.r1.port = 4142
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
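The three files only work together if each avro sink in the first agent points at the host and port that an avro source in a downstream agent binds to (k1 to port 4141, k2 to port 4142 here). The helper below is a hypothetical sketch, not part of Flume, that checks one such pairing; it assumes every agent is named `a1`, as in these files, and compares hostnames as plain strings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper (not part of Flume): checks that an avro sink of the upstream
// agent targets the host/port that an avro source of a downstream agent binds to.
// Assumes both agents are named "a1", as in this article's config files.
class AvroWiringCheck {
    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return p;
    }

    static boolean wired(Properties upstream, String sink, Properties downstream, String source) {
        String k = "a1.sinks." + sink + ".";
        String r = "a1.sources." + source + ".";
        return "avro".equals(upstream.getProperty(k + "type"))
            && "avro".equals(downstream.getProperty(r + "type"))
            // Plain string comparison: a bind address such as 0.0.0.0 would not match here.
            && upstream.getProperty(k + "hostname", "").equals(downstream.getProperty(r + "bind"))
            && upstream.getProperty(k + "port", "").equals(downstream.getProperty(r + "port"));
    }
}
```

For example, after reading netcat-interceptor-avro.conf and avro-flume-logger1.conf into `Properties`, `wired(up, "k1", logger1, "r1")` should be true, while `wired(up, "k2", logger1, "r1")` should be false because k2 targets port 4142.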
8. Startup order: 3, 2, 1 (start the downstream agents first, so that the avro sinks find their avro sources already listening)
- bin/flume-ng agent --conf conf --conf-file jobs/t6/avro-flume-logger2.conf --name a1 -Dflume.root.logger=INFO,console
- bin/flume-ng agent --conf conf --conf-file jobs/t6/avro-flume-logger1.conf --name a1 -Dflume.root.logger=INFO,console
- bin/flume-ng agent --conf conf --conf-file jobs/t6/netcat-interceptor-avro.conf --name a1 -Dflume.root.logger=INFO,console
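The 3, 2, 1 order exists because the upstream agent's avro sinks connect to the downstream avro sources. Before starting the upstream agent, one way to confirm the downstream agents are up is a plain TCP connect check, sketched below; `PortProbe` and `isListening` are hypothetical names, and the host `node1` and ports 4142/4141 come from the configs above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether something is accepting TCP connections on host:port,
// e.g. a downstream avro source, before the upstream agent is started.
class PortProbe {
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports of the two downstream avro sources in this article's configs.
        for (int port : new int[] {4142, 4141}) {
            System.out.println("node1:" + port + " listening = " + isListening("node1", port, 1000));
        }
    }
}
```

This only proves the port accepts connections, not that it is a healthy Flume agent; the console log of each agent remains the authoritative check.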
9. On node1, send messages with telnet
- telnet localhost 44444
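Instead of typing into telnet, test lines can also be sent from a small program. The sketch below writes newline-terminated lines to the netcat source and reads one reply line per message; it assumes the netcat source acknowledges each event with an `OK` line, which I believe is its default behavior (`ack-every-event`). `LineSender` is a hypothetical name, and `localhost:44444` is taken from the source config.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sends newline-terminated lines, as if typed into telnet, and reads one reply line
// per message. Assumes the netcat source acknowledges each event with a line ("OK").
class LineSender {
    static List<String> send(String host, int port, List<String> lines) throws IOException {
        List<String> replies = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             PrintWriter out = new PrintWriter(
                 new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
             BufferedReader in = new BufferedReader(
                 new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.print(line + "\n"); // explicit "\n" instead of the platform line separator
                out.flush();
                replies.add(in.readLine()); // null if the server closed the connection
            }
        }
        return replies;
    }

    public static void main(String[] args) throws IOException {
        // Host and port from the netcat source config (bind = localhost, port = 44444).
        System.out.println(send("localhost", 44444, Arrays.asList("hello flume", "someone liked a post")));
    }
}
```

Reading one reply per line keeps the client in lockstep with the source, so a missing acknowledgement shows up immediately as a `null` entry.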
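10. Results
- All messages containing "hello" were intercepted and routed to c1, i.e. to the avro-flume-logger1 agent on port 4141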

