Flume configuration 4 - Custom source+sink
2022-06-29 19:48:00 【A vegetable chicken that is working hard】
Custom Source
1. Overview
- Flume ships with many official Source types, but sometimes they do not meet the needs of a real project; in that case we write a custom Source
- The purpose of a Source is to receive data from an external client and store it in the configured Channels
2. Implementation steps (see the official documentation)
- class MySource extends AbstractSource implements Configurable, PollableSource
- Methods
getBackOffSleepIncrement() // not used for now
getMaxBackOffSleepInterval() // not used for now
configure(Context context) // initializes from the context, i.e. reads the settings from the configuration file; each setting corresponds to one configuration key
process() // gets data, wraps it in an Event and writes it to the Channel; this method is called in a loop
stop() // releases related resources
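The two back-off getters only matter when process() returns BACKOFF. The sketch below shows how a polling runner could turn them into a sleep time, assuming it behaves roughly like Flume's PollableSourceRunner: the pause grows by the increment for each consecutive BACKOFF and is capped by the maximum interval. BackoffDemo, sleepMillis and the sample values are illustrative names and numbers, not Flume API.

```java
// Illustrative only: BackoffDemo and sleepMillis are hypothetical names, not part of Flume.
class BackoffDemo {
    // Sleep time after the n-th consecutive BACKOFF: grows linearly, capped at maxInterval.
    static long sleepMillis(long consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        long increment = 1000L;   // assumed return value of getBackOffSleepIncrement()
        long maxInterval = 5000L; // assumed return value of getMaxBackOffSleepInterval()
        for (long n = 1; n <= 7; n++) {
            System.out.println("backoff #" + n + " -> sleep "
                    + sleepMillis(n, increment, maxInterval) + " ms");
        }
    }
}
```

Under this model, returning 0 from both getters means the source is polled again without any pause after a BACKOFF.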
3. Case requirements
- Flume collects data and adds a prefix and a suffix to each record (a default is used when none is configured), then prints the result to the console
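The "default when not configured" rule can be sketched with a plain Map standing in for Flume's Context. DefaultsDemo, getString and decorate are hypothetical helpers written for this illustration; only the keys pre and sub and the default prefix pre- come from the case itself, and the empty default for the suffix is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a Flume Context; DefaultsDemo is a hypothetical name.
class DefaultsDemo {
    // Return the configured value, or the fallback when the key is absent.
    static String getString(Map<String, String> conf, String key, String fallback) {
        String value = conf.get(key);
        return value != null ? value : fallback;
    }

    // Build the event text from the optional "pre" and "sub" settings.
    static String decorate(Map<String, String> conf, String data) {
        String prefix = getString(conf, "pre", "pre-"); // default prefix
        String suffix = getString(conf, "sub", "");     // assumed empty default
        return prefix + data + suffix;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(decorate(conf, "hello")); // nothing configured
        conf.put("pre", "pre-pre");
        conf.put("sub", "sub");
        System.out.println(decorate(conf, "hello")); // both configured
    }
}
```

If a lookup has no fallback and the key is missing, the value is null, and Java string concatenation renders it as the literal text null.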
4. Schematic diagram

5. Implementation
- Add dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
- MySource Code
package com.yc.source; // must match a1.sources.r1.type in the agent configuration

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * @program: Flume-MyISS
 * @description:
 * @author: author
 * @create: 2022-06-26 21:21
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {
    // Prefix and suffix added to each event body
    private String prefix;
    private String subfix;
    // Pause between batches, in milliseconds
    private Long delay;

    // Read the settings from the agent's .conf file
    @Override
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-"); // the second argument is the default value
        subfix = context.getString("sub");         // no default: null when the key is not configured
        delay = context.getLong("delay", 2000L);   // delay between batches
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // Create five events per call and hand each one to the channel processor
            for (int i = 0; i < 5; i++) {
                // A fresh event per iteration, so queued events do not share one mutable object
                Event event = new SimpleEvent();
                Map<String, String> headers = new HashMap<>();
                event.setHeaders(headers);
                event.setBody((prefix + "hello" + subfix).getBytes(StandardCharsets.UTF_8));
                getChannelProcessor().processEvent(event);
                /*
                 * An event submitted by the source passes through the interceptors first.
                 * Abridged from the source code of processEvent:
                 *
                 * event = this.interceptorChain.intercept(event); // run the interceptors
                 * if (event != null) { // continue only if the interceptors did not return null
                 *     // the selector is a channel selector; it returns a list because
                 *     // the event may have to go to several channels
                 *     List<Channel> requiredChannels = this.selector.getRequiredChannels(event);
                 *     Iterator var3 = requiredChannels.iterator(); // iterate over the channels
                 *     while(var3.hasNext()) {
                 *         Channel reqChannel = (Channel)var3.next();
                 *         Transaction tx = reqChannel.getTransaction(); // get the transaction
                 *         Preconditions.checkNotNull(tx, "Transaction object must not be null");
                 *         try {
                 *             tx.begin(); // open the transaction
                 *             reqChannel.put(event);
                 *             tx.commit(); // commit the transaction
                 *         } catch (Throwable var17) {
                 *             tx.rollback(); // roll back if an exception occurs
                 *             ......
                 *         }....
                 */
            }
            // When process() returns READY it is called again immediately,
            // so sleep here to space out the batches
            Thread.sleep(delay);
            return Status.READY;
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}
- Package the project as a jar and copy it into Flume's lib directory
- In jobs/t7, create the configuration file mysource-flume-logger.conf
vim mysource-flume-logger.conf
---------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.source.MySource
a1.sources.r1.delay = 4000
#a1.sources.r1.pre = pre-pre
#a1.sources.r1.sub = sub
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
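Each a1.sources.r1.* line ends up as a key the source can read in configure(), with the component prefix removed (which is why the code asks for "pre", "sub" and "delay"). The sketch below mimics that mapping with java.util.Properties; it is not Flume's actual configuration parser, and SubPropertiesDemo, parse and subProperties are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Illustrative only: this is not Flume's configuration parser.
class SubPropertiesDemo {
    // Collect the keys under a component prefix, with the prefix stripped.
    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    // Parse properties-style text held in memory.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "a1.sources.r1.type = com.yc.source.MySource",
                "a1.sources.r1.delay = 4000",
                "#a1.sources.r1.pre = pre-pre",
                "a1.sources.r1.channels = c1");
        // Commented-out keys such as "pre" are absent, so the code falls back to its default
        System.out.println(subProperties(parse(conf), "a1.sources.r1."));
    }
}
```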
- Start Flume
bin/flume-ng agent --conf conf --conf-file jobs/t7/mysource-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
- Output when no prefix or suffix is configured

- Output when the prefix and suffix are configured

Custom Sink
1. Overview
- A Sink continuously polls the Channel for events and removes them in batches; the events are written in bulk to a storage or indexing system, or sent to another Flume Agent
- A Sink is fully transactional
Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel
Once the batch of events has been written successfully to the storage system or to the next Flume Agent, the Sink commits the transaction through the Channel
Once the transaction is committed, the Channel removes the events from its internal buffer
- Flume ships with many official Sink types, but sometimes they do not meet the needs of a real project; in that case we write a custom Sink
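The take, commit and rollback behaviour described above can be illustrated with a toy in-memory channel. This is a simplified model written for this article, not Flume's Channel or Transaction classes; ToyChannel and ToyTransactionDemo are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a transactional channel; these classes are hypothetical, not Flume's.
class ToyTransactionDemo {
    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        channel.put("e1");
        channel.put("e2");

        channel.take();     // the sink takes e1 ...
        channel.rollback(); // ... but writing it out fails, so e1 goes back
        System.out.println("after rollback: " + channel.size());

        channel.take();     // e1 is delivered again
        channel.commit();   // the write succeeded, so e1 is removed for good
        System.out.println("after commit: " + channel.size());
    }
}

class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> taken = new ArrayList<>(); // events taken in the open transaction

    void put(String event) {
        queue.addLast(event);
    }

    // Take one event inside the current transaction, or null if the channel is empty.
    String take() {
        String event = queue.pollFirst();
        if (event != null) {
            taken.add(event);
        }
        return event;
    }

    // Commit: the taken events are dropped permanently.
    void commit() {
        taken.clear();
    }

    // Rollback: put the taken events back at the head of the queue in their original order.
    void rollback() {
        for (int i = taken.size() - 1; i >= 0; i--) {
            queue.addFirst(taken.get(i));
        }
        taken.clear();
    }

    int size() {
        return queue.size();
    }
}
```

The point of the model is that an event only leaves the channel after a successful commit, so a failed write leads to redelivery rather than data loss.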
2. Implementation steps (see the official documentation)
- class MySink extends AbstractSink implements Configurable
- Methods
configure(Context context) // initializes from the context, i.e. reads the settings from the configuration file; each setting corresponds to one configuration key
process() // takes Events from the Channel and writes them out to the destination; this method is called in a loop
3. Case requirements
- Flume receives data, adds a prefix and a suffix to each record (a default is used when none is configured), and prints the result to the console
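In a sink the event body arrives as a byte array, so it has to be decoded before the prefix and suffix are attached. A minimal sketch, assuming UTF-8 text; BodyFormatDemo and format are hypothetical names used only for this illustration.

```java
import java.nio.charset.StandardCharsets;

// Illustrative helper; BodyFormatDemo is a hypothetical name.
class BodyFormatDemo {
    // Decode the raw body bytes first, then attach the prefix and suffix.
    static String format(String prefix, byte[] body, String suffix) {
        return prefix + new String(body, StandardCharsets.UTF_8) + suffix;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(format("pre-hello:", body, "-sub"));
        // Concatenating the array directly prints its type and hash code instead of the text
        System.out.println("pre-hello:" + body);
    }
}
```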
4. Schematic diagram

5. Implementation
- Add dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
- MySink Code
package com.yc.sink; // must match a1.sinks.k1.type in the agent configuration

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

/**
 * @program: Flume-MyISS
 * @description:
 * @author: author
 * @create: 2022-06-26 22:39
 */
public class MySink extends AbstractSink implements Configurable {
    // Logger used to print the events to the console
    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);
    private String prefix;
    private String subfix;

    @Override
    public void configure(Context context) {
        // Read from the configuration file, with a default value
        prefix = context.getString("pre", "pre-hello:");
        // Read from the configuration file, no default (null when not configured)
        subfix = context.getString("sub");
    }

    @Override
    public Status process() throws EventDeliveryException {
        // 1. Get the channel and open a transaction
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        // 2. Take data from the channel and print it to the console
        try {
            // 2.1 Fetch the data
            Event event;
            while (true) {
                // Keep polling until an event is available
                event = channel.take();
                if (event != null) {
                    break;
                }
            }
            // 2.2 Process the data: decode the body bytes first, otherwise
            // concatenating the byte[] prints its identity (e.g. [B@...) instead of the text
            LOG.info(prefix + new String(event.getBody(), StandardCharsets.UTF_8) + subfix);
            // 2.3 Commit the transaction
            transaction.commit();
            return Status.READY;
        } catch (Exception e) {
            // Roll back so the event stays in the channel
            transaction.rollback();
            return Status.BACKOFF;
        } finally {
            transaction.close();
        }
    }
}
- Package the project as a jar and copy it into Flume's lib directory
- In jobs/t8, create the configuration file netcat-flume-mysink.conf
vim netcat-flume-mysink.conf
------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.yc.sink.MySink
#a1.sinks.k1.pre = pre-pre
#a1.sinks.k1.sub = sub
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Start Flume
bin/flume-ng agent --conf conf --conf-file jobs/t8/netcat-flume-mysink.conf --name a1 -Dflume.root.logger=INFO,console
- Send data to the port
telnet localhost 44444
- Output when no prefix or suffix is configured

- Output when the prefix and suffix are configured
