Flume Configuration 4: Custom Source + Sink
2022-06-29 19:46:00 【一个正在努力的菜鸡】
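Custom Source
1. Overview
- Flume ships with many source types, but they do not always meet the needs of a real project; in that case you write a custom Source for your own requirements.
- A Source receives data from an external client and stores it in the configured Channels.
2. Implementation steps (see the official documentation)
- class MySource extends AbstractSource implements Configurable, PollableSource
- Methods
getBackOffSleepIncrement() // not used for now
getMaxBackOffSleepInterval() // not used for now
configure(Context context) // reads the settings from the configuration file; each setting corresponds to one configuration key
process() // produces the data, wraps it in an Event and writes it to the Channel; this method is called in a loop
stop() // releases any resources that were opened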
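The two back-off getters are marked "not used for now", so a short illustration of what they are for may help. As I understand it, when process() returns BACKOFF the runner that drives a pollable source sleeps before calling it again, and the sleep grows with the number of consecutive back-offs in steps of getBackOffSleepIncrement(), capped at getMaxBackOffSleepInterval(). The sketch below is a simplified, stdlib-only model of that arithmetic and not Flume's own code; the class BackoffModel and the method sleepMillis are hypothetical names.

```java
/** Simplified model of the back-off arithmetic; an assumption, not Flume's code. */
final class BackoffModel {
    /**
     * Sleep time after the given number of consecutive BACKOFF results:
     * grows by `increment` per back-off and never exceeds `maxInterval`.
     */
    static long sleepMillis(int consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        // Increment 1000 ms, cap 5000 ms: the sleep grows, then stays at the cap
        for (int n = 1; n <= 6; n++) {
            System.out.println(n + " back-offs -> " + sleepMillis(n, 1000L, 5000L) + " ms");
        }
        // Both getters returning 0 means no sleep at all under this model
        System.out.println("increment 0, cap 0 -> " + sleepMillis(3, 0L, 0L) + " ms");
    }
}
```

Under this model, returning 0 from both getters gives a sleep of 0 ms, so a source that keeps returning BACKOFF would be retried without any pause; non-zero values are probably the safer choice outside a demo.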
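3. Case requirement
- Flume collects data, adds a prefix and a suffix to each record (falling back to the default when none is configured), and finally prints the result to the console.
4. Diagram
5. Implementation
- Add the dependency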
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
- MySource code
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
import java.util.Map;
/**
 * @program: Flume-MyISS
 * @description:
 * @author: author
 * @create: 2022-06-26 21:21
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {
    // Prefix and suffix added to each event body
    private String prefix;
    private String subfix;
    // Pause between batches, in milliseconds
    private Long delay;

    @Override // Reads the settings from the .conf file
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-"); // the second argument is the default value
        subfix = context.getString("sub"); // no default: null when "sub" is not configured, so the body then ends with the text "null"
        delay = context.getLong("delay", 2000L); // delay between batches
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // Create events in a loop and hand them to the Channel
            for (int i = 0; i < 5; i++) {
                // A fresh Event per iteration, so no object is shared between puts
                Event event = new SimpleEvent();
                Map<String, String> headers = new HashMap<>();
                event.setHeaders(headers);
                event.setBody((prefix + "hello" + subfix).getBytes());
                getChannelProcessor().processEvent(event);
                /*
                 * An event submitted by a source passes through the interceptors first
                 * (see the source of processEvent):
                 *
                 * event = this.interceptorChain.intercept(event); // run the interceptor chain
                 * if (event != null) { // continue only if the interceptors did not return null
                 *     List<Channel> requiredChannels = this.selector.getRequiredChannels(event); // the selector is a channel selector; it returns a list because the event may go to several channels
                 *     Iterator var3 = requiredChannels.iterator(); // iterate over the channels
                 *     while (var3.hasNext()) {
                 *         Channel reqChannel = (Channel) var3.next();
                 *         Transaction tx = reqChannel.getTransaction(); // get the transaction
                 *         Preconditions.checkNotNull(tx, "Transaction object must not be null");
                 *         try {
                 *             tx.begin(); // begin the transaction
                 *             reqChannel.put(event);
                 *             tx.commit(); // commit the transaction
                 *         } catch (Throwable var17) {
                 *             tx.rollback(); // roll back if an exception occurs
                 *             ......
                 *         }....
                 */
            }
            Thread.sleep(delay); // process() is called again as soon as READY is returned, so pause here
            return Status.READY;
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}
- Put the project jar in Flume's lib directory
- Create the configuration file mysource-flume-logger.conf under /jobs/t7
vim mysource-flume-logger.conf
---------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.source.MySource
a1.sources.r1.delay = 4000
#a1.sources.r1.pre = pre-pre
#a1.sources.r1.sub = sub
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Start Flume
bin/flume-ng agent --conf conf --conf-file jobs/t7/mysource-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
- Output when no prefix and suffix are configured
- Output when a prefix and suffix are configured
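The two runs differ only in what configure() reads from the .conf file. The sketch below imitates that lookup with a plain Map so it runs without Flume; ConfigDefaults, getString and body are hypothetical stand-ins, not Flume's Context API. It shows what the event body looks like with the pre and sub keys commented out and with them set to the values from the configuration file above.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for the configuration lookup in MySource; not Flume's Context. */
final class ConfigDefaults {
    /** Returns the configured value, or defaultValue when the key is absent. */
    static String getString(Map<String, String> conf, String key, String defaultValue) {
        String value = conf.get(key);
        return value != null ? value : defaultValue;
    }

    /** Builds the body the way MySource does: prefix + "hello" + suffix. */
    static String body(Map<String, String> conf) {
        String prefix = getString(conf, "pre", "pre-");
        String suffix = getString(conf, "sub", null); // no default, as in MySource
        return prefix + "hello" + suffix;
    }

    public static void main(String[] args) {
        // Both keys commented out in the .conf file
        Map<String, String> unset = new HashMap<>();
        System.out.println(body(unset)); // prints "pre-hellonull"

        // Both keys set
        Map<String, String> set = new HashMap<>();
        set.put("pre", "pre-pre");
        set.put("sub", "sub");
        System.out.println(body(set)); // prints "pre-prehellosub"
    }
}
```

Because a missing suffix is null, string concatenation appends the literal text "null"; supplying an empty string as the default for sub would avoid that.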
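Custom Sink
1. Overview
- A Sink continuously polls the Channel for events, removes them in batches, and writes each batch to a storage or indexing system or sends it to another Flume Agent.
- A Sink is fully transactional.
Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel.
Once the batch has been written successfully to the storage system or the next Flume Agent, the Sink commits the transaction through the Channel.
Once the transaction is committed, the Channel deletes the events from its internal buffer.
- Flume ships with many Sink types, but they do not always meet the needs of a real project; in that case you write a custom Sink for your own requirements.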
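The take, commit and rollback behaviour described above can be pictured with a toy in-memory channel. This is only a stdlib sketch under simplifying assumptions (single thread, one open transaction, String events); ToyChannel and its methods are hypothetical and are not Flume's Channel or Transaction API. Taken events are remembered until the transaction ends, so a rollback can put them back and a commit can drop them for good.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy channel with take/commit/rollback; an illustration, not Flume's API. */
final class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    // Events taken inside the currently open transaction
    private final List<String> taken = new ArrayList<>();

    void put(String event) {
        queue.addLast(event);
    }

    /** Takes the next event, or returns null when the channel is empty. */
    String take() {
        String event = queue.pollFirst();
        if (event != null) {
            taken.add(event);
        }
        return event;
    }

    /** Commit: the taken events are removed for good. */
    void commit() {
        taken.clear();
    }

    /** Rollback: the taken events go back to the head in their original order. */
    void rollback() {
        for (int i = taken.size() - 1; i >= 0; i--) {
            queue.addFirst(taken.get(i));
        }
        taken.clear();
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel();
        ch.put("a");
        ch.put("b");
        ch.put("c");
        ch.take();
        ch.take();
        ch.rollback(); // the write failed, nothing is lost
        System.out.println(ch.size()); // prints 3
        ch.take();
        ch.take();
        ch.commit(); // the write succeeded, the two events are gone
        System.out.println(ch.size()); // prints 1
    }
}
```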
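2. Implementation steps (see the official documentation)
- extends AbstractSink implements Configurable
- Methods
configure(Context context) // reads the settings from the configuration file; each setting corresponds to one configuration key
process() // takes Events from the Channel and writes them to the destination; this method is called in a loop
3. Case requirement
- Flume receives data, adds a prefix and a suffix to each record (falling back to the default when none is configured), and prints the result to the console.
4. Diagram
5. Implementation
- Add the dependency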
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
- MySink code
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @program: Flume-MyISS
 * @description:
 * @author: author
 * @create: 2022-06-26 22:39
 */
public class MySink extends AbstractSink implements Configurable {
    // Logger used to print events to the console
    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);
    private String prefix;
    private String subfix;

    @Override
    public void configure(Context context) {
        // Read from the configuration file, with a default value
        prefix = context.getString("pre", "pre-hello:");
        // Read from the configuration file, no default (null when "sub" is not set)
        subfix = context.getString("sub");
    }

    @Override
    public Status process() throws EventDeliveryException {
        // 1. Get the channel and begin a transaction
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        // 2. Take data from the channel and print it to the console
        try {
            // 2.1 Take one event; take() returns null when the channel is empty
            Event event = channel.take();
            Status status;
            if (event != null) {
                // 2.2 Process the data: decode the byte[] body before concatenating,
                // otherwise the array reference (e.g. "[B@...") is printed
                LOG.info(prefix + new String(event.getBody()) + subfix);
                status = Status.READY;
            } else {
                // Nothing to take: back off instead of spinning inside the transaction
                status = Status.BACKOFF;
            }
            // 2.3 Commit the transaction
            transaction.commit();
            return status;
        } catch (Exception e) {
            // Roll back on failure
            transaction.rollback();
            return Status.BACKOFF;
        } finally {
            transaction.close();
        }
    }
}
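One detail in the sink is easy to get wrong: Event.getBody() returns a byte[], and concatenating a byte array with a String prints the array's identity rather than its content. The stdlib sketch below shows the difference; BodyDecoding and format are hypothetical names, and decoding as UTF-8 is an assumption about how the body was encoded.

```java
import java.nio.charset.StandardCharsets;

/** Shows why an event body must be decoded before string concatenation. */
final class BodyDecoding {
    /** Decodes the body as UTF-8 (an assumption) and wraps it in prefix and suffix. */
    static String format(String prefix, byte[] body, String suffix) {
        return prefix + new String(body, StandardCharsets.UTF_8) + suffix;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        // Concatenating the array itself uses Object.toString(): "[B@" plus a hash
        System.out.println("pre-" + body + "-sub");
        // Decoding first gives the text that was sent
        System.out.println(format("pre-", body, "-sub")); // prints "pre-hello-sub"
    }
}
```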
- Put the project jar in Flume's lib directory
- Create the configuration file netcat-flume-mysink.conf under /jobs/t8
vim netcat-flume-mysink.conf
------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.yc.sink.MySink
#a1.sinks.k1.pre = pre-pre
#a1.sinks.k1.sub = sub
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Start Flume
bin/flume-ng agent --conf conf --conf-file jobs/t8/netcat-flume-mysink.conf --name a1 -Dflume.root.logger=INFO,console
- Send data to the port
telnet localhost 44444
- Output when no prefix and suffix are configured
- Output when a prefix and suffix are configured