当前位置:网站首页>Flume配置4——自定义MYSQLSource
Flume配置4——自定义MYSQLSource
2022-07-05 02:41:00 【一个正在努力的菜鸡】
自定义MySQLSource说明
- 官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Source
- Source 的目的是从外部客户端接收数据并将其存储到配置的 Channels 中
- 如:实时监控MySQL,从MySQL中获取数据传输到HDFS或者其他存储框架,所以此时需要我们自己实现MySQLSource
虚拟机未安装MySQL先安装
1.mysql8(推荐)
2.mysql5
实现
1.新建项目Flume-MySQLSource
2.添加依赖
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
3.添加配置信息,不是spring项目,所以没有使用yml配置
- jdbc.properties
dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop100:3306/mysource?useUnicode=true&characterEncoding=utf-8
dbUser=root
dbPassword=aaaa #linux中的数据库
- log4j. properties
#--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
#log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
#log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
4.SQLSourceHelper
- 说明
- 代码分析
- 代码实现
import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/** * @program: Flume-MySQLSource * @description: * @author: 作者 * @create: 2022-06-25 18:50 */
public class SQLSourceHelper {
private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);
private int runQueryDelay;//两次查询的时间间隔
private int startFrom;//开始id
private int currentIndex;//当前id
private int recordSixe = 0;//每次查询返回结果的条数
private int maxRow;//每次查询的最大条数
private String table;//要操作的表
private String columnsToSelect;//用户传入的查询的列
private String customQuery;//用户传入的查询语句
private String query;//构建的查询语句
private String defaultCharsetResultSet;//编码集
//上下文,用来获取配置文件
private Context context;
//为定义的变量赋值(默认值),可在flume任务的配置文件中修改
private static final int DEFAULT_QUERY_DELAY = 10000;
private static final int DEFAULT_START_VALUE = 0;
private static final int DEFAULT_MAX_ROWS = 2000;
private static final String DEFAULT_COLUMNS_SELECT = "*";
private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
private static Connection conn = null;
private static PreparedStatement ps = null;
private static String connectionURL, connectionUserName, connectionPassword;
//加载静态资源
static {
Properties p = new Properties();
try {
p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
connectionURL = p.getProperty("dbUrl");
connectionUserName = p.getProperty("dbUser");
connectionPassword = p.getProperty("dbPassword");
Class.forName(p.getProperty("dbDriver"));
} catch (IOException | ClassNotFoundException e) {
LOG.error(e.toString());
}
}
//获取JDBC连接
private static Connection InitConnection(String url, String user, String pw) {
try {
Connection conn = DriverManager.getConnection(url, user, pw);
if (conn == null) {
throw new SQLException();
}
return conn;
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
//构造方法
SQLSourceHelper(Context context) throws ParseException {
//初始化上下文
this.context = context;
//有默认值参数:获取flume任务配置文件中的参数,读不到的采用默认值
this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
//无默认值参数:获取flume任务配置文件中的参数
this.table = context.getString("table");
this.customQuery = context.getString("custom.query");
connectionURL = context.getString("connection.url");
connectionUserName = context.getString("connection.user");
connectionPassword = context.getString("connection.password");
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
//校验相应的配置信息,如果没有默认值的参数也没赋值,抛出异常
checkMandatoryProperties();
//获取当前的id
currentIndex = getStatusDBIndex(startFrom);
//构建查询语句
query = buildQuery();
}
//校验相应的配置信息(表,查询语句以及数据库连接的参数)
private void checkMandatoryProperties() {
if (table == null) {
throw new ConfigurationException("property table not set");
}
if (connectionURL == null) {
throw new ConfigurationException("connection.url property not set");
}
if (connectionUserName == null) {
throw new ConfigurationException("connection.user property not set");
}
if (connectionPassword == null) {
throw new ConfigurationException("connection.password property not set");
}
}
//构建sql语句
private String buildQuery() {
String sql = "";
//获取当前id
currentIndex = getStatusDBIndex(startFrom);
LOG.info(currentIndex + "");
if (customQuery == null) {
sql = "SELECT " + columnsToSelect + " FROM " + table;
} else {
sql = customQuery;
}
StringBuilder execSql = new StringBuilder(sql);
//以id作为offset
if (!sql.contains("where")) {
execSql.append(" where ");
execSql.append("id").append(">").append(currentIndex);
return execSql.toString();
} else {
int length = execSql.toString().length();
return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
}
}
//执行查询
List<List<Object>> executeQuery() {
try {
//每次执行查询时都要重新生成sql,因为id不同
customQuery = buildQuery();
//存放结果的集合
List<List<Object>> results = new ArrayList<>();
if (ps == null) {
//
ps = conn.prepareStatement(customQuery);
}
ResultSet result = ps.executeQuery(customQuery);
while (result.next()) {
//存放一条数据的集合(多个列)
List<Object> row = new ArrayList<>();
//将返回结果放入集合
for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
row.add(result.getObject(i));
}
results.add(row);
}
LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());
return results;
} catch (SQLException e) {
LOG.error(e.toString());
// 重新连接
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
}
return null;
}
//将结果集转化为字符串,每一条数据是一个list集合,将每一个小的list集合转化为字符串
List<String> getAllRows(List<List<Object>> queryResult) {
List<String> allRows = new ArrayList<>();
if (queryResult == null || queryResult.isEmpty()) {
return allRows;
}
StringBuilder row = new StringBuilder();
for (List<Object> rawRow : queryResult) {
Object value = null;
for (Object aRawRow : rawRow) {
value = aRawRow;
if (value == null) {
row.append(",");
} else {
row.append(aRawRow.toString()).append(",");
}
}
allRows.add(row.toString());
row = new StringBuilder();
}
return allRows;
}
//更新offset元数据状态,每次返回结果集后调用。必须记录每次查询的offset值,为程序中断续跑数据时使用,以id为offset
void updateOffset2DB(int size) {
//以source_tab做为KEY,如果不存在则插入,存在则更新(每个源表对应一条记录)
String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
+ this.table
+ "','" + (recordSixe += size)
+ "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
LOG.info("updateStatus Sql:" + sql);
execSql(sql);
}
//执行sql语句
private void execSql(String sql) {
try {
ps = conn.prepareStatement(sql);
LOG.info("exec::" + sql);
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
}
}
//获取当前id的offset
private Integer getStatusDBIndex(int startFrom) {
//从flume_meta表中查询出当前的id是多少
String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
if (dbIndex != null) {
return Integer.parseInt(dbIndex);
}
//如果没有数据,则说明是第一次查询或者数据表中还没有存入数据,返回最初传入的值
return startFrom;
}
//查询一条数据的执行语句(当前id)
private String queryOne(String sql) {
ResultSet result = null;
try {
ps = conn.prepareStatement(sql);
result = ps.executeQuery();
while (result.next()) {
return result.getString(1);
}
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
//关闭相关资源
void close() {
try {
ps.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
int getCurrentIndex() {
return currentIndex;
}
void setCurrentIndex(int newValue) {
currentIndex = newValue;
}
int getRunQueryDelay() {
return runQueryDelay;
}
String getQuery() {
return query;
}
String getConnectionURL() {
return connectionURL;
}
private boolean isCustomQuerySet() {
return (customQuery != null);
}
Context getContext() {
return context;
}
public String getConnectionUserName() {
return connectionUserName;
}
public String getConnectionPassword() {
return connectionPassword;
}
String getDefaultCharsetResultSet() {
return defaultCharsetResultSet;
}
}
5.SQLSource
package com.yc;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/** * @program: Flume-MySQLSource * @description: * @author: 作者 * @create: 2022-06-25 18:57 */
public class SQLSource extends AbstractSource implements Configurable, PollableSource {
//打印日志
private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);
//定义sqlHelper
private SQLSourceHelper sqlSourceHelper;
@Override
public void configure(Context context) {
try {
//初始化
sqlSourceHelper = new SQLSourceHelper(context);
} catch (ParseException e) {
e.printStackTrace();
}
}
@Override
public Status process() throws EventDeliveryException {
try {
//查询数据表
List<List<Object>> result = sqlSourceHelper.executeQuery();
//存放event的集合
List<Event> events = new ArrayList<>();
//存放event头集合
HashMap<String, String> header = new HashMap<>();
//如果有返回数据,则将数据封装为event
if (!result.isEmpty()) {
List<String> allRows = sqlSourceHelper.getAllRows(result);
Event event = null;
for (String row : allRows) {
event = new SimpleEvent();
event.setBody(row.getBytes());
event.setHeaders(header);
events.add(event);
}
//将event写入channel
this.getChannelProcessor().processEventBatch(events);
//更新数据表中的offset信息
sqlSourceHelper.updateOffset2DB(result.size());
}
//等待时长
Thread.sleep(sqlSourceHelper.getRunQueryDelay());
return Status.READY;
} catch (InterruptedException e) {
LOG.error("Error procesing row", e);
return Status.BACKOFF;
}
}
@Override
public synchronized void stop() {
LOG.info("Stopping sql source {} ...", getName());
try {
//关闭资源
sqlSourceHelper.close();
} finally {
super.stop();
}
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
6.两个jar放入flume的lib目录下
- 项目jar
- mysql
7./jobs/t9下编写配置文件
- vim mysql-flume-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.10.100:3306/mysource
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = aaaa
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000
# Describe the sink
a1.sinks.k1.type = logger
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
8.建库,建表
- 建库
CREATE DATABASE mysource;
- 建表
use mysource;
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `flume_meta` (
`source_tab` varchar(255) NOT NULL,
`currentIndex` varchar(255) NOT NULL,
PRIMARY KEY (`source_tab`)
);
- 插4条数据
insert into student(name) values ('zhaoliu');
9.启动
bin/flume-ng agent --conf conf --conf-file jobs/t9/mysql-flume-logger.conf --name a1 -Dflume.root.logger==INFO,console
10.类似于如图结果输出则成功
边栏推荐
- [uc/os-iii] chapter 1.2.3.4 understanding RTOS
- ELFK部署
- Asp+access campus network goods trading platform
- Design of KTV intelligent dimming system based on MCU
- Yyds dry goods inventory intelligent fan based on CC2530 design
- Can you really learn 3DMAX modeling by self-study?
- PHP cli getting input from user and then dumping into variable possible?
- Naacl 2021 | contrastive learning sweeping text clustering task
- Traditional chips and AI chips
- The perfect car for successful people: BMW X7! Superior performance, excellent comfort and safety
猜你喜欢
Acwing第 58 场周赛【完结】
A label making navigation bar
[source code attached] Intelligent Recommendation System Based on knowledge map -sylvie rabbit
【LeetCode】106. Construct binary tree from middle order and post order traversal sequence (wrong question 2)
Asp+access campus network goods trading platform
Exploration of short text analysis in the field of medical and health (I)
Hmi-31- [motion mode] solve the problem of picture display of music module
Write a thread pool by hand, and take you to learn the implementation principle of ThreadPoolExecutor thread pool
Prometheus monitors the correct posture of redis cluster
CAM Pytorch
随机推荐
ELK日志分析系统
低度酒赛道进入洗牌期,新品牌如何破局三大难题?
Bumblebee: build, deliver, and run ebpf programs smoothly like silk
Redis distributed lock, lock code logic
打破信息茧房-我主动获取信息的方法 -#3
Which common ports should the server open
[uc/os-iii] chapter 1.2.3.4 understanding RTOS
Bert fine tuning skills experiment
d3js小记
[機緣參悟-38]:鬼穀子-第五飛箝篇 - 警示之一:有一種殺稱為“捧殺”
PHP cli getting input from user and then dumping into variable possible?
Design and implementation of high availability website architecture
Asynchronous and promise
Summary and practice of knowledge map construction technology
丸子百度小程序详细配置教程,审核通过。
A label colorful navigation bar
TCP security of network security foundation
The most powerful new household god card of Bank of communications. Apply to earn 2100 yuan. Hurry up if you haven't applied!
Elk log analysis system
[daily problem insight] Li Kou - the 280th weekly match (I really didn't know it could be so simple to solve other people's problems)