Flume Configuration 4: Custom Source + Sink
2022-06-29 19:48:00 【一個正在努力的菜雞】
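Custom Source
1. Overview
- Flume ships with many Source types, but they do not always cover what a real project needs. In that case we write our own Source for the specific requirement.
- A Source receives data from an external client and stores it in the configured Channels.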
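2. Steps (following the official documentation)
- class MySource extends AbstractSource implements Configurable, PollableSource
- Methods
getBackOffSleepIncrement() // backoff tuning, not used here
getMaxBackOffSleepInterval() // backoff tuning, not used here
configure(Context context) // initialize from the Context, i.e. read this source's settings from the config file; each setting is one key
process() // fetch data, wrap it in an Event and write it to the Channel; this method is called in a loop
stop() // release any resources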
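How the two backoff methods are used: from my reading of Flume 1.8's PollableSourceRunner, the runner calls process() in a loop. After a BACKOFF it sleeps for min(consecutiveBackoffs × getBackOffSleepIncrement(), getMaxBackOffSleepInterval()) milliseconds, and a READY resets the counter. The class below is a standalone model of that arithmetic for choosing values other than 0. BackoffModel and sleeps are hypothetical names, not Flume classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, dependency-free model of the runner's backoff arithmetic (not Flume code). */
public class BackoffModel {
    public enum Status { READY, BACKOFF }

    /**
     * Returns the sleep (ms) the runner would apply after each process() result,
     * assuming sleep = min(consecutiveBackoffs * increment, maxInterval) on BACKOFF
     * and no sleep plus a counter reset on READY.
     */
    public static List<Long> sleeps(List<Status> results, long increment, long maxInterval) {
        List<Long> out = new ArrayList<>();
        long consecutive = 0;
        for (Status s : results) {
            if (s == Status.BACKOFF) {
                consecutive++;
                out.add(Math.min(consecutive * increment, maxInterval));
            } else {
                consecutive = 0; // READY resets the streak
                out.add(0L);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Status> results = List.of(Status.BACKOFF, Status.BACKOFF, Status.BACKOFF,
                Status.READY, Status.BACKOFF);
        System.out.println(sleeps(results, 1000L, 2500L)); // [1000, 2000, 2500, 0, 1000]
    }
}
```

If both methods return 0, as in MySource, a process() that keeps returning BACKOFF is retried immediately with no pause.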
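3. Requirement
- Flume collects data and adds a prefix and a suffix to each record, using defaults when none are configured. It then prints the result to the console.
4. Architecture diagram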

5. Implementation
- Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
- MySource code
package com.yc.source; // must match a1.sources.r1.type in the agent config

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import java.nio.charset.StandardCharsets;

/** Custom Source: generates "hello" events wrapped in a configurable prefix and suffix. */
public class MySource extends AbstractSource implements Configurable, PollableSource {
    // Prefix, suffix, and the pause between batches (ms)
    private String prefix;
    private String subfix;
    private Long delay;

    @Override // Read this source's settings from the .conf file
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-"); // second argument is the default value
        subfix = context.getString("sub", "");     // default "" so an unset suffix does not print "null"
        delay = context.getLong("delay", 2000L);   // pause between batches
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // Create 5 events per call and hand each one to the channel processor
            for (int i = 0; i < 5; i++) {
                // Build a fresh Event every time instead of reusing one object
                Event event = EventBuilder.withBody(
                        (prefix + "hello" + subfix).getBytes(StandardCharsets.UTF_8));
                getChannelProcessor().processEvent(event);
                /*
                 * What processEvent does (from reading the ChannelProcessor source):
                 *   event = interceptorChain.intercept(event);    // interceptors run first
                 *   if (event != null) {                           // null means an interceptor dropped it
                 *     requiredChannels = selector.getRequiredChannels(event); // may be several channels
                 *     for each reqChannel:
                 *       tx = reqChannel.getTransaction();
                 *       tx.begin(); reqChannel.put(event); tx.commit();
                 *       on exception: tx.rollback() and rethrow; finally tx.close()
                 *   }
                 */
            }
            // After READY, process() is called again immediately, so sleep to throttle
            Thread.sleep(delay);
            return Status.READY;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the runner
            return Status.BACKOFF;
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0; // not used in this example
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0; // not used in this example
    }
}
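The comment inside process() summarizes ChannelProcessor.processEvent. Below is a dependency-free toy model of that flow, useful for experimenting without a running agent. An interceptor may drop the event by returning null. Each required channel gets its own begin/put/commit transaction, which is rolled back if the put fails. ToyChannel, processEvent and the String events are hypothetical stand-ins. The real code also handles optional channels and closes every transaction in a finally block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Simplified model of ChannelProcessor.processEvent; all types are hypothetical, not Flume classes. */
public class ProcessEventModel {

    /** A toy channel whose puts are staged and only become visible on commit. */
    public static class ToyChannel {
        public final List<String> committed = new ArrayList<>();
        private final List<String> staged = new ArrayList<>();
        private final boolean failOnPut;

        public ToyChannel(boolean failOnPut) { this.failOnPut = failOnPut; }

        void begin() { staged.clear(); }
        void put(String event) {
            if (failOnPut) throw new IllegalStateException("channel full");
            staged.add(event);
        }
        void commit() { committed.addAll(staged); staged.clear(); }
        void rollback() { staged.clear(); }
    }

    /** interceptor -> drop if null -> one transaction per required channel. */
    public static void processEvent(String event, UnaryOperator<String> interceptor,
                                    List<ToyChannel> requiredChannels) {
        String intercepted = interceptor.apply(event);
        if (intercepted == null) {
            return; // an interceptor dropped the event
        }
        for (ToyChannel ch : requiredChannels) {
            ch.begin();
            try {
                ch.put(intercepted);
                ch.commit();
            } catch (RuntimeException e) {
                ch.rollback(); // nothing from this attempt reaches the channel
                throw new RuntimeException("Unable to put event on required channel", e);
            }
        }
    }

    public static void main(String[] args) {
        ToyChannel c1 = new ToyChannel(false);
        processEvent("hello", e -> "pre-" + e, List.of(c1));
        processEvent("drop-me", e -> e.startsWith("drop") ? null : e, List.of(c1));
        System.out.println(c1.committed); // [pre-hello]
    }
}
```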
- Build the project into a jar and copy it into Flume's lib directory


- Write the configuration file mysource-flume-logger.conf in jobs/t7 (under the Flume home directory)
vim mysource-flume-logger.conf
---------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.source.MySource
a1.sources.r1.delay = 4000
#a1.sources.r1.pre = pre-pre
#a1.sources.r1.sub = sub
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
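How the keys above reach configure(Context): Flume gives each component only the properties under its own prefix, with that prefix stripped. So a1.sources.r1.delay = 4000 is read in MySource as context.getLong("delay", 2000L). A commented-out key such as pre is simply absent, so the default applies. The sketch below imitates that scoping with java.util.Properties. scope and getString are hypothetical helpers, not Flume's implementation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper mimicking how a component's settings are scoped before configure(Context). */
public class ComponentConfigModel {

    /** Keeps only keys starting with prefix and strips the prefix from them. */
    public static Map<String, String> scope(Properties props, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                out.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return out;
    }

    /** Same contract as Context.getString(key, defaultValue): fall back when the key is absent. */
    public static String getString(Map<String, String> ctx, String key, String defaultValue) {
        return ctx.getOrDefault(key, defaultValue);
    }

    public static void main(String[] args) throws IOException {
        String conf = String.join("\n",
                "a1.sources.r1.type = com.yc.source.MySource",
                "a1.sources.r1.delay = 4000",
                "#a1.sources.r1.pre = pre-pre",
                "a1.sinks.k1.type = logger");
        Properties props = new Properties();
        props.load(new StringReader(conf)); // '#' lines are comments, as in the .conf file

        Map<String, String> r1 = scope(props, "a1.sources.r1.");
        System.out.println(getString(r1, "delay", "2000")); // 4000
        System.out.println(getString(r1, "pre", "pre-"));   // pre- (the line is commented out)
    }
}
```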
- Start Flume
bin/flume-ng agent --conf conf --conf-file jobs/t7/mysource-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
- Output with no prefix or suffix configured

- Output with the prefix and suffix configured

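Custom Sink
1. Overview
- A Sink continuously polls the Channel for events and removes them in batches. It then writes each batch to a storage or indexing system, or sends it to another Flume agent.
- A Sink is fully transactional:
Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel.
Once the batch has been written successfully to the storage system or the next Flume agent, the Sink commits the transaction through the Channel.
Once the transaction is committed, the Channel deletes those events from its internal buffer.
- Flume ships with many Sink types, but they do not always cover what a real project needs. In that case we write our own Sink for the specific requirement.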
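The take/commit/rollback contract described above can be modeled in a few lines. This is a single-threaded toy. ToyChannel and drainBatch are hypothetical names, and Flume's MemoryChannel additionally enforces capacity and transactionCapacity. Events taken inside a transaction only disappear on commit. A rollback puts them back in their original order, so the next attempt sees them again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a sink draining a transactional channel; names are hypothetical, not Flume APIs. */
public class SinkTransactionModel {

    public static class ToyChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private final Deque<String> taken = new ArrayDeque<>(); // events taken in the open transaction

        public void offer(String e) { queue.addLast(e); }
        public int size() { return queue.size(); }

        /** Returns null when the channel is empty. */
        public String take() {
            String e = queue.pollFirst();
            if (e != null) taken.addLast(e);
            return e;
        }
        public void commit() { taken.clear(); } // taken events are gone for good
        public void rollback() {                // taken events go back, in order
            while (!taken.isEmpty()) queue.addFirst(taken.pollLast());
        }
    }

    /** Takes up to batchSize events in one transaction and hands them to the writer.
     *  Returns how many were delivered; if the writer fails, nothing is removed. */
    public static int drainBatch(ToyChannel ch, int batchSize, Consumer<List<String>> writer) {
        List<String> batch = new ArrayList<>();
        try {
            for (int i = 0; i < batchSize; i++) {
                String e = ch.take();
                if (e == null) break;
                batch.add(e);
            }
            writer.accept(batch); // "write to storage or the next agent"
            ch.commit();
            return batch.size();
        } catch (RuntimeException ex) {
            ch.rollback();
            return 0;
        }
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel();
        for (String s : List.of("a", "b", "c")) ch.offer(s);
        drainBatch(ch, 2, batch -> { throw new IllegalStateException("sink down"); });
        System.out.println(ch.size());                              // 3 (rolled back)
        System.out.println(drainBatch(ch, 2, System.out::println)); // prints [a, b] then 2
        System.out.println(ch.size());                              // 1
    }
}
```

A real sink would usually return BACKOFF when the batch comes back empty, so the runner can pause before polling again.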
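2. Steps (following the official documentation)
- extends AbstractSink implements Configurable
- Methods
configure(Context context) // initialize from the Context, i.e. read this sink's settings from the config file; each setting is one key
process() // take Events from the Channel and write them out (here: log them to the console); this method is called in a loop
3. Requirement
- Flume receives data and adds a prefix and a suffix to each record, using defaults when none are configured. It then prints the result to the console.
4. Architecture diagram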

5. Implementation
- Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
- MySink code
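package com.yc.sink; // must match a1.sinks.k1.type in the agent config

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;

/** Custom Sink: takes events from the channel and logs each body wrapped in a prefix and suffix. */
public class MySink extends AbstractSink implements Configurable {
    // Logger used to print to the console
    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);
    private String prefix;
    private String subfix;

    @Override
    public void configure(Context context) {
        // Read from the config file, with a default value
        prefix = context.getString("pre", "pre-hello:");
        // Read from the config file; default "" so an unset suffix does not print "null"
        subfix = context.getString("sub", "");
    }

    @Override
    public Status process() throws EventDeliveryException {
        // 1. Get the channel and start a transaction
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            // 2. Take one event; take() returns null when the channel is empty
            Event event = channel.take();
            if (event == null) {
                // Nothing to do: close out the empty transaction and let the runner back off,
                // instead of spinning inside an open transaction
                transaction.commit();
                return Status.BACKOFF;
            }
            // 3. Process the event: decode the byte[] body before concatenating
            LOG.info(prefix + new String(event.getBody(), StandardCharsets.UTF_8) + subfix);
            // 4. Commit so the channel removes the event
            transaction.commit();
            return Status.READY;
        } catch (Exception e) {
            // Roll back so the event stays in the channel and is retried later
            LOG.error("Failed to process event", e);
            transaction.rollback();
            return Status.BACKOFF;
        } finally {
            transaction.close();
        }
    }
}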
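A pitfall worth isolating: Event.getBody() returns a byte[]. Concatenating a byte[] with a String calls Object.toString(), which prints a type tag and hash such as [B@1b6d3586 instead of the text. The sketch below shows the difference (BodyDecodeDemo and format are hypothetical names). Decoding with an explicit charset also keeps Chinese text intact regardless of the platform's default encoding.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical demo: formatting an event body that arrives as raw bytes. */
public class BodyDecodeDemo {

    /** Wraps the decoded body in a prefix and suffix, decoding the bytes explicitly as UTF-8. */
    public static String format(String prefix, byte[] body, String suffix) {
        return prefix + new String(body, StandardCharsets.UTF_8) + suffix;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        // Wrong: byte[] has no content-aware toString(), so this prints something like "pre-[B@1b6d3586-sub"
        System.out.println("pre-" + body + "-sub");
        // Right: decode first
        System.out.println(format("pre-", body, "-sub")); // pre-hello-sub
    }
}
```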
- Build the project into a jar and copy it into Flume's lib directory


- Write the configuration file netcat-flume-mysink.conf in jobs/t8 (under the Flume home directory)
vim netcat-flume-mysink.conf
------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.yc.sink.MySink
#a1.sinks.k1.pre = pre-pre
#a1.sinks.k1.sub = sub
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Start Flume
bin/flume-ng agent --conf conf --conf-file jobs/t8/netcat-flume-mysink.conf --name a1 -Dflume.root.logger=INFO,console
- Send data to the port
telnet localhost 44444
- Output with no prefix or suffix configured

- Output with the prefix and suffix configured
