当前位置:网站首页>Configuration du Flume 4 - source personnalisée + sink
Configuration du Flume 4 - source personnalisée + sink
2022-06-29 19:48:00 【Un poulet qui travaille dur.】
PersonnalisationSource
1.Description
- Officially providedsourceIl y a déjà beaucoup de types,Mais parfois ce n'est pas suffisant pour le développement réel,À ce stade, nous devons personnaliser certainsSource
- Source Le but est de recevoir des données d'un client externe et de les stocker dans la configuration Channels Moyenne
2.Étapes personnalisées(Voir les documents officiels)
- class MySource extends AbstractSource implements Configurable,PollableSource
- Méthodes
getBackOffSleepIncrement()//Pas encore.
getMaxBackOffSleepInterval()//Pas encore.
configure(Context context)//Initialisationcontext,C'est - à - dire lire les informations du profil,Chaque message correspond à un élément de configuration
process()//Obtenir des données,Encapsulé dansEventEt écritChannel,Cette méthode est appelée en boucle
stop()//Fermer les ressources associées
3.Besoins en matière de cas
- flumeCollecte de données,Et ajouter un préfixe et un suffixe à chaque donnée,Utilisez par défaut si vous n'ajoutez pas,Sortie finale vers la console
4.Schéma

5.Réalisation
- Ajouter une dépendance
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
- MySourceCode
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
import java.util.Map;
/** * @program: Flume-MyISS * @description: * @author: Auteur * @create: 2022-06-26 21:21 */
public class MySource extends AbstractSource implements Configurable, PollableSource {
//Préfixe,Suffixe
private String prefix;
private String subfix;
private Long delay;
@Override//Initialisation des informations de configuration,Accès.confConfiguration dans
public void configure(Context context) {
prefix = context.getString("pre", "pre-");// Le deuxième paramètre est la valeur par défaut donnée
subfix = context.getString("sub");// Aucune valeur par défaut n'est donnée , C'est - à - dire qu'il n'y a pas de
delay = context.getLong("delay", 2000L);//Délai
}
@Override
public Status process() throws EventDeliveryException {
Event event = new SimpleEvent();
Map<String, String> headers = new HashMap<>();
try {
//ctrl+alt+t
//Création de bouclesEvent,Produits versChannel
for (int i = 0; i < 5; i++) {
event.setHeaders(headers);
event.setBody((prefix + "hello" + subfix).getBytes());
getChannelProcessor().processEvent(event);
/** * sourceSoumettreevent Passe devant l'intercepteur --->Lire le code sourceprocessEvent * event = this.interceptorChain.intercept(event);// Intercepté par un intercepteur * if (event != null) {// Si l'intercepteur ne retourne pas nullPour entrerif,Poursuivre la mise en œuvre * List<Channel> requiredChannels = this.selector.getRequiredChannels(event);//selectorC'est unchannelSélecteur,Retourlist C'est parce qu'il peut être nécessaire de passer à plusieurs channel * Iterator var3 = requiredChannels.iterator();//Besoin de traverserchannel * while(var3.hasNext()) { * Channel reqChannel = (Channel)var3.next(); * Transaction tx = reqChannel.getTransaction();//Obtenir une transaction * Preconditions.checkNotNull(tx, "Transaction object must not be null"); * try { * tx.begin();//Ouvrir la transaction * reqChannel.put(event); * tx.commit();//Soumettre une transaction * } catch (Throwable var17) { * tx.rollback();// Si une exception se produit, elle est retournée * ...... * }.... */
}
Thread.sleep(delay);// Une fois que la valeur de retour est READY, Encore une fois. processFonctions
return Status.READY;
} catch (Exception e) {
e.printStackTrace();
return Status.BACKOFF;
}
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
- ProjetsjarMettre dansflumeDelibSous la table des matières


- /jobs/t7Écrire le profil suivantmysource-flume-logger.conf
vim mysource-flume-logger.conf
---------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.source.MySource
a1.sources.r1.delay = 4000
#a1.sources.r1.pre = pre-pre
#a1.sources.r1.sub = sub
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Séquence de démarrageflume
bin/flume-ng agent --conf conf --conf-file jobs/t7/mysource-flume-logger.conf --name a1 -Dflume.root.logger==INFO,console
- Préfixe et suffixe non définis

- Définir les préfixes et les suffixes

PersonnalisationSink
1.Description
- Sink Des sondages constants Channel Événements dans et les supprimer par lots,Et écrire ces événements en vrac sur le système de stockage ou d'index、Ou envoyé à un autre FlumeAgent
- Sink Est entièrement transactionnel
De Channel Avant de supprimer les données par lots,Chaque Sink Avec Channel Démarrer une transaction
Une fois que l'événement Batch a été écrit avec succès sur le système de stockage ou suivant Flume Agent,Sink Utilise - le. Channel Soumettre une transaction
Une fois la transaction engagée,Le Channel Supprimer l'événement de son propre tampon interne
- Officially provided Sink Il y a déjà beaucoup de types,Mais parfois ce n'est pas suffisant pour le développement réel,À ce stade, nous devons personnaliser certains Sink
2.Étapes personnalisées(Voir les documents officiels)
- extends AbstractSink implements Configurable
- Méthodes
configure(Context context)//Initialisationcontext,C'est - à - dire lire les informations du profil,Chaque message correspond à un élément de configuration
process()//Obtenir des données,Encapsulé dansEventEt écritChannel,Cette méthode est appelée en boucle
3.Besoins en matière de cas
- flume Réception des données,Et ajouter un préfixe et un suffixe à chaque donnée(Utilisez par défaut si vous n'ajoutez pas)Sortie vers la console
4.Schéma

5.Réalisation
- Ajouter une dépendance
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
- MySinkCode
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** * @program: Flume-MyISS * @description: * @author: Auteur * @create: 2022-06-26 22:39 */
public class MySink extends AbstractSink implements Configurable {
//Création Logger Objet
private static final Logger LOG = LoggerFactory.getLogger(MySink.class);
private String prefix;
private String subfix;
@Override
public void configure(Context context) {
//Lire le contenu du profil,Il y a une valeur par défaut
prefix = context.getString("pre", "pre-hello:");
//Lire le contenu du profil,Aucune valeur par défaut
subfix = context.getString("sub");
}
@Override
public Status process() throws EventDeliveryException {
//1.Accèschannel,Ouvrir la transaction
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
//2.Dechannel Pour saisir les données ,Imprimer sur la console
try {
//2.1Saisir les données
Event event;
while (true) {
// Une prise qui tourne sans données
event = channel.take();
if (event != null) {
break;
}
}
//2.2Traitement des données
LOG.info(new String(prefix + event.getBody() + subfix));
//2.3Soumettre une transaction
transaction.commit();
return Status.READY;
} catch (Exception e) {
//Retour en arrière
transaction.rollback();
return Status.BACKOFF;
} finally {
transaction.close();
}
}
}
- ProjetsjarMettre dansflumeDelibSous la table des matières


- /jobs/t8Écrire le profil suivantnetcat-flume-mysink.conf
vim netcat-flume-mysink.conf
------------------------------
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.yc.sink.MySink
#a1.sinks.k1.pre = pre-pre
#a1.sinks.k1.sub = sub
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Séquence de démarrageflume
bin/flume-ng agent --conf conf --conf-file jobs/t8/netcat-flume-mysink.conf --name a1 -Dflume.root.logger==INFO,console
- Port send Data
telnet localhost 44444
- Préfixe et suffixe non définis

- Définir les préfixes et les suffixes

边栏推荐
- Zotero journal automatic matching update influence factor
- There are more than 20 databases in a MySQL with 3306 ports. How can I backup more than 20 databases with one click and do system backup to prevent data from being deleted by mistake?
- Understanding of software test logic coverage
- ArrayList&lt; Integer&gt; Use = = to compare whether the values are equal, and -129=- 129 situation thinking
- Lock4j -- distributed lock Middleware -- customize the logic of lock acquisition failure
- 2022年理财利率都降了,那该如何选择理财产品?
- 1404万!四川省人社厅关系型数据库及中间件软件系统升级采购招标!
- freemarker模板框架生成图片
- 2022年深圳市福田区支持招商引资若干政策
- Flume配置4——自定义Source+Sink
猜你喜欢

@Sneakythlows annotation

Game Maker 基金会呈献:归属之谷

Shell bash script note: there must be no other irrelevant characters after the escape character \ at the end of a single line (multi line command)

MySQL remote connection

ASP.Net Core创建Razor页面上传多个文件(缓冲方式)(续)

@SneakyThrows注解

洞见科技作为「唯一」隐私计算数商,「首批」入驻长三角数据要素流通服务平台

JVM(4) 字节码技术+运行期优化

How to solve the problem of insufficient memory space in Apple iPhone upgrade system?

idea中方法上没有小绿色三角
随机推荐
罗清启:高端家电已成红海?卡萨帝率先破局
【剑指Offer】51. 数组中的逆序对
WPS和Excele
How to use filters in jfinal to monitor Druid for SQL execution?
Sword finger offer 41 Median in data stream
Linux安装MySQL5
剑指 Offer 59 - II. 队列的最大值
数据安全解决方案的大时代
docker compose 部署Flask项目并构建redis服务
Zotero期刊自動匹配更新影響因子
JVM(4) 字节码技术+运行期优化
Where is the win11 installation permission set? Win11 installation permission setting method
Union find
What about frequent network disconnection of win11 system? Solution to win11 network instability
Arm comprehensive computing solution redefines visual experience and powerfully enables mobile games
Have you mastered all the testing methods of technology to ensure quality and software testing?
Canonical engineers are trying to solve the performance problem of Firefox snap
freemarker模板框架生成图片
KDD 2022 | characterization alignment and uniformity are considered in collaborative filtering
Test method learning