Flume configuration 4 - Custom source+sink
2022-06-29 19:48:00 【A vegetable chicken that is working hard】
Custom Source
1. Overview
- Flume ships with many official Source types, but sometimes none of them meets the needs of actual development; in that case we write a custom Source
- A Source receives data from an external client and stores it in the configured Channels
2. Steps (see the official documentation)
- class MySource extends AbstractSource implements Configurable, PollableSource
- Methods
getBackOffSleepIncrement() // back-off sleep increment; not used for now
getMaxBackOffSleepInterval() // maximum back-off sleep; not used for now
configure(Context context) // initialization: reads the settings from the configuration file through the Context, one value per configuration item
process() // gets data, wraps it in an Event and writes it to the Channel; this method is called in a loop
stop() // closes related resources
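The two back-off methods only come into play when process() returns Status.BACKOFF. As far as I understand Flume's PollableSourceRunner, the runner then sleeps for min(consecutive back-offs × getBackOffSleepIncrement(), getMaxBackOffSleepInterval()) milliseconds and resets the count after a READY result. The plain-Java sketch below models only that formula under this assumption. The class BackoffSketch and the sample values are hypothetical, not Flume code.

```java
// Hypothetical model of the sleep a PollableSource runner applies after BACKOFF results.
// Assumption: sleep = min(consecutiveBackoffs * increment, maxInterval), as in PollableSourceRunner.
public class BackoffSketch {

    /** Sleep in ms after the given number of consecutive BACKOFF results. */
    static long backoffSleep(long consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        long increment = 1000L; // a value getBackOffSleepIncrement() might return
        long max = 5000L;       // a value getMaxBackOffSleepInterval() might return
        for (long n = 1; n <= 7; n++) {
            System.out.println("backoff #" + n + " -> sleep " + backoffSleep(n, increment, max) + " ms");
        }
        // MySource returns 0 from both methods, so a BACKOFF result causes no pause at all
        System.out.println("MySource: " + backoffSleep(3, 0L, 0L) + " ms");
    }
}
```

Because MySource returns 0 from both methods, the pause between its batches comes entirely from its own Thread.sleep(delay).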
3. Requirements
- Flume produces data, adds a prefix and a suffix to each record (defaults are used if they are not configured), and finally outputs it to the console
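You can try the prefix/suffix rule without Flume. Context.getString(key, defaultValue) returns the default when the key is missing. Context.getString(key) returns null instead, and Java string concatenation turns that null into the text "null". The sketch below uses a plain Map as a hypothetical stand-in for Context; DecorateSketch and its helper names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flume's Context: a plain Map of configuration keys.
public class DecorateSketch {

    /** Mimics context.getString(key, default): returns the default when the key is absent. */
    static String getString(Map<String, String> conf, String key, String defaultValue) {
        String value = conf.get(key);
        return value != null ? value : defaultValue;
    }

    /** Builds the body as prefix + "hello" + suffix, the way MySource does. */
    static String decorate(Map<String, String> conf) {
        String prefix = getString(conf, "pre", "pre-");
        String suffix = getString(conf, "sub", ""); // "" avoids a literal "null" suffix
        return prefix + "hello" + suffix;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(decorate(conf)); // pre-hello
        conf.put("pre", "pre-pre");
        conf.put("sub", "sub");
        System.out.println(decorate(conf)); // pre-prehellosub
        // Without a default, a missing key yields null, which concatenation prints as "null"
        System.out.println("hello" + conf.get("missing")); // hellonull
    }
}
```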
4. Schematic diagram

5. Implementation
- Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
- MySource code
package com.yc.source; // must match a1.sources.r1.type in the configuration file

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * @program: Flume-MyISS
 * @description: custom Source that emits "prefix + hello + suffix" events
 * @author: author
 * @create: 2022-06-26 21:21
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {
    // Prefix and suffix added to every event body, and the pause between batches
    private String prefix;
    private String suffix;
    private Long delay;

    @Override // Read the settings from the .conf file when the source is initialized
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-"); // the second argument is the default value
        suffix = context.getString("sub", "");     // default "" so an unset suffix is not printed as "null"
        delay = context.getLong("delay", 2000L);   // pause in milliseconds after each batch
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // Create 5 events per call and hand each one to the Channel
            for (int i = 0; i < 5; i++) {
                // A fresh Event per iteration: the channel may keep a reference to each one
                Event event = new SimpleEvent();
                Map<String, String> headers = new HashMap<>();
                event.setHeaders(headers);
                event.setBody((prefix + "hello" + suffix).getBytes(StandardCharsets.UTF_8));
                getChannelProcessor().processEvent(event);
                /*
                 * processEvent() passes the event through the interceptors first. Abridged source:
                 *   event = this.interceptorChain.intercept(event); // interceptors may drop the event by returning null
                 *   if (event != null) {
                 *     // the channel selector returns a List because one event may go to several channels
                 *     List<Channel> requiredChannels = this.selector.getRequiredChannels(event);
                 *     Iterator var3 = requiredChannels.iterator(); // iterate over the channels
                 *     while (var3.hasNext()) {
                 *       Channel reqChannel = (Channel) var3.next();
                 *       Transaction tx = reqChannel.getTransaction(); // get the transaction
                 *       Preconditions.checkNotNull(tx, "Transaction object must not be null");
                 *       try {
                 *         tx.begin();            // open the transaction
                 *         reqChannel.put(event);
                 *         tx.commit();           // commit the transaction
                 *       } catch (Throwable var17) {
                 *         tx.rollback();         // roll back if an exception occurs
                 *         ......
                 *       }....
                 */
            }
            // After READY the runner calls process() again immediately, so pause here
            Thread.sleep(delay);
            return Status.READY;
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0; // not used in this example
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0; // not used in this example
    }
}
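The long comment inside process() describes what ChannelProcessor.processEvent does. The simplified model below reproduces that flow in plain Java, under two assumptions. First, an interceptor may drop the event by returning null. Second, each required channel gets its own begin/put/commit transaction, with a rollback on failure. ToyChannel, the String events and the upper-casing interceptor are hypothetical. The real method also handles optional channels and wraps failures in ChannelException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of ChannelProcessor.processEvent (not Flume code).
public class ProcessEventSketch {

    /** Toy channel: put() stages an event, commit() publishes it, rollback() discards it. */
    static class ToyChannel {
        final List<String> committed = new ArrayList<>();
        private final List<String> staged = new ArrayList<>();

        void begin() { staged.clear(); }
        void put(String event) { staged.add(event); }
        void commit() { committed.addAll(staged); staged.clear(); }
        void rollback() { staged.clear(); }
    }

    /** Intercept the event, then run one transaction per required channel. */
    static void processEvent(String event, UnaryOperator<String> interceptor, List<ToyChannel> requiredChannels) {
        String intercepted = interceptor.apply(event);
        if (intercepted == null) {
            return; // an interceptor returning null drops the event
        }
        for (ToyChannel channel : requiredChannels) {
            channel.begin();
            try {
                channel.put(intercepted);
                channel.commit();
            } catch (RuntimeException e) {
                channel.rollback(); // undo the put, then report the failure
                throw e;
            }
        }
    }

    public static void main(String[] args) {
        ToyChannel c1 = new ToyChannel();
        ToyChannel c2 = new ToyChannel();
        List<ToyChannel> channels = Arrays.asList(c1, c2);
        // Hypothetical interceptor: drops empty bodies and upper-cases the rest
        UnaryOperator<String> interceptor = body -> body.isEmpty() ? null : body.toUpperCase();
        processEvent("pre-hello", interceptor, channels);
        processEvent("", interceptor, channels); // dropped by the interceptor
        System.out.println(c1.committed + " " + c2.committed); // [PRE-HELLO] [PRE-HELLO]
    }
}
```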
- Package the project as a jar and put it in Flume's lib directory


- In the jobs/t7 directory, write the configuration file mysource-flume-logger.conf
vim mysource-flume-logger.conf
---------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.source.MySource
a1.sources.r1.delay = 4000
#a1.sources.r1.pre = pre-pre
#a1.sources.r1.sub = sub
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
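Component settings are scoped by name. The keys under a1.sources.r1. are what MySource sees through its Context, so configure() asks for "delay", not "a1.sources.r1.delay". The commented-out pre/sub lines are simply absent, which is why the defaults apply. The sketch below imitates that prefix stripping with java.util.Properties. The helper subProperties is hypothetical; Flume's Context has a similar getSubProperties method. The sketch ignores how Flume treats reserved keys such as type and channels.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical illustration: keys under a component prefix reach the component with the prefix stripped.
public class ComponentConfigSketch {

    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>(); // sorted for stable printing
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String conf = String.join("\n",
                "a1.sources.r1.type = com.yc.source.MySource",
                "a1.sources.r1.delay = 4000",
                "#a1.sources.r1.pre = pre-pre",
                "a1.sources.r1.channels = c1");
        Properties props = new Properties();
        props.load(new StringReader(conf)); // lines starting with '#' are comments
        System.out.println(subProperties(props, "a1.sources.r1."));
        // {channels=c1, delay=4000, type=com.yc.source.MySource}
    }
}
```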
- Start Flume
bin/flume-ng agent --conf conf --conf-file jobs/t7/mysource-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
- Output with the prefix and suffix not set

- Output with the prefix and suffix set (uncomment the pre and sub lines in the configuration)

Custom Sink
1. Overview
- A Sink continuously polls the Channel for events and removes them in batches, then writes them in bulk to a storage or indexing system, or sends them to another Flume Agent
- A Sink is fully transactional:
before removing a batch of events from the Channel, each Sink starts a transaction with the Channel;
once the batch has been successfully written to the storage system or the next Flume Agent, the Sink commits the transaction with the Channel;
once the transaction is committed, the Channel removes the events from its internal buffer
- Flume ships with many official Sink types, but sometimes none of them meets the needs of actual development; in that case we write a custom Sink
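A toy in-memory channel can model this transactional contract. Events taken inside a transaction leave the buffer. commit() discards them for good, and rollback() puts them back in their original order. SinkTransactionSketch is a hypothetical simplification, not Flume's MemoryChannel, and it supports only one open transaction at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy channel showing the take/commit/rollback contract a Sink relies on.
public class SinkTransactionSketch {

    private final Deque<String> buffer = new ArrayDeque<>();
    private final List<String> taken = new ArrayList<>(); // events taken in the open transaction

    void put(String event) { buffer.addLast(event); }

    /** Take inside a transaction: the event leaves the buffer but is remembered. */
    String take() {
        String event = buffer.pollFirst();
        if (event != null) {
            taken.add(event);
        }
        return event;
    }

    /** Commit: the taken events are gone for good. */
    void commit() { taken.clear(); }

    /** Rollback: the taken events go back to the head of the buffer, in original order. */
    void rollback() {
        for (int i = taken.size() - 1; i >= 0; i--) {
            buffer.addFirst(taken.get(i));
        }
        taken.clear();
    }

    int size() { return buffer.size(); }

    public static void main(String[] args) {
        SinkTransactionSketch channel = new SinkTransactionSketch();
        channel.put("a");
        channel.put("b");
        channel.take();
        channel.take();
        channel.rollback();                 // write failed: both events are restored
        System.out.println(channel.size()); // 2
        channel.take();
        channel.commit();                   // write succeeded: "a" is removed permanently
        System.out.println(channel.size()); // 1
    }
}
```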
2. Steps (see the official documentation)
- class MySink extends AbstractSink implements Configurable
- Methods
configure(Context context) // initialization: reads the settings from the configuration file through the Context, one value per configuration item
process() // takes Events from the Channel and writes them out; this method is called in a loop
3. Requirements
- Flume receives data, adds a prefix and a suffix to each record (defaults are used if they are not configured), and outputs it to the console
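Event.getBody() returns a byte[], so the prefix and suffix can only be attached after the bytes are decoded. Concatenating the array directly prints its identity string (something like [B@1b6d3586) instead of the text. The sketch below contrasts both forms in plain Java, assuming UTF-8 bodies. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Shows why a Sink must decode the body before concatenating it.
public class BodyToStringSketch {

    /** Wrong: concatenating a byte[] uses Object.toString(), e.g. "[B@1b6d3586". */
    static String naive(String prefix, byte[] body, String suffix) {
        return prefix + body + suffix;
    }

    /** Right: decode the bytes (UTF-8 assumed) before building the log line. */
    static String decoded(String prefix, byte[] body, String suffix) {
        return prefix + new String(body, StandardCharsets.UTF_8) + suffix;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(naive("pre-hello:", body, ""));   // pre-hello:[B@... (hash varies)
        System.out.println(decoded("pre-hello:", body, "")); // pre-hello:hello
    }
}
```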
4. Schematic diagram

5. Implementation
- Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
- MySink code
package com.yc.sink; // must match a1.sinks.k1.type in the configuration file

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;

/**
 * @program: Flume-MyISS
 * @description: custom Sink that logs "prefix + body + suffix" for each event
 * @author: author
 * @create: 2022-06-26 22:39
 */
public class MySink extends AbstractSink implements Configurable {
    // Logger used to print events to the console
    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);
    private String prefix;
    private String suffix;

    @Override
    public void configure(Context context) {
        // Read from the configuration file, with a default value
        prefix = context.getString("pre", "pre-hello:");
        // Read from the configuration file; default "" instead of null so "null" is not printed
        suffix = context.getString("sub", "");
    }

    @Override
    public Status process() throws EventDeliveryException {
        // 1. Get the channel and open a transaction
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        // 2. Take data from the channel and print it to the console
        try {
            // 2.1 Take one event; null means the channel is currently empty
            Event event = channel.take();
            if (event == null) {
                // Do not spin inside the open transaction: commit the empty
                // transaction and let the runner back off before polling again
                transaction.commit();
                return Status.BACKOFF;
            }
            // 2.2 Process the data: decode the byte[] body before concatenating
            LOG.info(prefix + new String(event.getBody(), StandardCharsets.UTF_8) + suffix);
            // 2.3 Commit the transaction
            transaction.commit();
            return Status.READY;
        } catch (Exception e) {
            // Roll back so the event stays in the channel
            transaction.rollback();
            LOG.error("Failed to process event", e);
            return Status.BACKOFF;
        } finally {
            transaction.close();
        }
    }
}
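MySink handles one event per transaction, while the overview above describes Sinks removing events in batches. The plain-Java sketch below shows what a hypothetical batch variant of process() could look like. It takes up to batchSize events and returns BACKOFF when the channel is empty. Otherwise it "writes" the whole batch at the point where a real Sink would commit. The Deque stands in for the Channel, and rollback handling is deliberately left out. BatchSinkSketch and batchSize are illustrative names, not Flume API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical batch version of a Sink's process(); plain Java, no Flume types.
public class BatchSinkSketch {

    enum Status { READY, BACKOFF }

    private final Deque<String> channel;                   // stands in for the Flume Channel
    private final List<String> output = new ArrayList<>(); // stands in for the console/log
    private final int batchSize;                           // hypothetical setting, e.g. read in configure()

    BatchSinkSketch(Deque<String> channel, int batchSize) {
        this.channel = channel;
        this.batchSize = batchSize;
    }

    Status process() {
        // Take up to batchSize events; pollFirst() returns null once the channel is empty
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize) {
            String event = channel.pollFirst();
            if (event == null) {
                break;
            }
            batch.add(event);
        }
        if (batch.isEmpty()) {
            return Status.BACKOFF; // nothing to do: let the runner pause instead of spinning
        }
        output.addAll(batch); // "write" the whole batch; a real Sink would commit here
        return Status.READY;
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("e1", "e2", "e3"));
        BatchSinkSketch sink = new BatchSinkSketch(queue, 2);
        System.out.println(sink.process() + " " + sink.output()); // READY [e1, e2]
        System.out.println(sink.process() + " " + sink.output()); // READY [e1, e2, e3]
        System.out.println(sink.process() + " " + sink.output()); // BACKOFF [e1, e2, e3]
    }
}
```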
- Package the project as a jar and put it in Flume's lib directory


- In the jobs/t8 directory, write the configuration file netcat-flume-mysink.conf
vim netcat-flume-mysink.conf
------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.yc.sink.MySink
#a1.sinks.k1.pre = pre-pre
#a1.sinks.k1.sub = sub
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Start Flume
bin/flume-ng agent --conf conf --conf-file jobs/t8/netcat-flume-mysink.conf --name a1 -Dflume.root.logger=INFO,console
- Send data to the port
telnet localhost 44444
- Output with the prefix and suffix not set

- Output with the prefix and suffix set (uncomment the pre and sub lines in the configuration)
