当前位置:网站首页>Flume配置4——自定义MYSQLSource
Flume配置4——自定义MYSQLSource
2022-07-05 02:41:00 【一个正在努力的菜鸡】
自定义MySQLSource说明
- 官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Source
- Source 的目的是从外部客户端接收数据并将其存储到配置的 Channels 中
- 如:实时监控MySQL,从MySQL中获取数据传输到HDFS或者其他存储框架,所以此时需要我们自己实现MySQLSource
虚拟机未安装MySQL先安装
1.mysql8(推荐)
2.mysql5
实现
1.新建项目Flume-MySQLSource
2.添加依赖
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
3.添加配置信息,不是spring项目,所以没有使用yml配置
- jdbc.properties
dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop100:3306/mysource?useUnicode=true&characterEncoding=utf-8
dbUser=root
dbPassword=aaaa #linux中的数据库
- log4j. properties
#--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
#log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
#log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
4.SQLSourceHelper
- 说明


- 代码分析

- 代码实现
import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/** * @program: Flume-MySQLSource * @description: * @author: 作者 * @create: 2022-06-25 18:50 */
public class SQLSourceHelper {
private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);
private int runQueryDelay;//两次查询的时间间隔
private int startFrom;//开始id
private int currentIndex;//当前id
private int recordSixe = 0;//每次查询返回结果的条数
private int maxRow;//每次查询的最大条数
private String table;//要操作的表
private String columnsToSelect;//用户传入的查询的列
private String customQuery;//用户传入的查询语句
private String query;//构建的查询语句
private String defaultCharsetResultSet;//编码集
//上下文,用来获取配置文件
private Context context;
//为定义的变量赋值(默认值),可在flume任务的配置文件中修改
private static final int DEFAULT_QUERY_DELAY = 10000;
private static final int DEFAULT_START_VALUE = 0;
private static final int DEFAULT_MAX_ROWS = 2000;
private static final String DEFAULT_COLUMNS_SELECT = "*";
private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
private static Connection conn = null;
private static PreparedStatement ps = null;
private static String connectionURL, connectionUserName, connectionPassword;
//加载静态资源
static {
Properties p = new Properties();
try {
p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
connectionURL = p.getProperty("dbUrl");
connectionUserName = p.getProperty("dbUser");
connectionPassword = p.getProperty("dbPassword");
Class.forName(p.getProperty("dbDriver"));
} catch (IOException | ClassNotFoundException e) {
LOG.error(e.toString());
}
}
//获取JDBC连接
private static Connection InitConnection(String url, String user, String pw) {
try {
Connection conn = DriverManager.getConnection(url, user, pw);
if (conn == null) {
throw new SQLException();
}
return conn;
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
//构造方法
SQLSourceHelper(Context context) throws ParseException {
//初始化上下文
this.context = context;
//有默认值参数:获取flume任务配置文件中的参数,读不到的采用默认值
this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
//无默认值参数:获取flume任务配置文件中的参数
this.table = context.getString("table");
this.customQuery = context.getString("custom.query");
connectionURL = context.getString("connection.url");
connectionUserName = context.getString("connection.user");
connectionPassword = context.getString("connection.password");
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
//校验相应的配置信息,如果没有默认值的参数也没赋值,抛出异常
checkMandatoryProperties();
//获取当前的id
currentIndex = getStatusDBIndex(startFrom);
//构建查询语句
query = buildQuery();
}
//校验相应的配置信息(表,查询语句以及数据库连接的参数)
private void checkMandatoryProperties() {
if (table == null) {
throw new ConfigurationException("property table not set");
}
if (connectionURL == null) {
throw new ConfigurationException("connection.url property not set");
}
if (connectionUserName == null) {
throw new ConfigurationException("connection.user property not set");
}
if (connectionPassword == null) {
throw new ConfigurationException("connection.password property not set");
}
}
//构建sql语句
private String buildQuery() {
String sql = "";
//获取当前id
currentIndex = getStatusDBIndex(startFrom);
LOG.info(currentIndex + "");
if (customQuery == null) {
sql = "SELECT " + columnsToSelect + " FROM " + table;
} else {
sql = customQuery;
}
StringBuilder execSql = new StringBuilder(sql);
//以id作为offset
if (!sql.contains("where")) {
execSql.append(" where ");
execSql.append("id").append(">").append(currentIndex);
return execSql.toString();
} else {
int length = execSql.toString().length();
return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
}
}
//执行查询
List<List<Object>> executeQuery() {
try {
//每次执行查询时都要重新生成sql,因为id不同
customQuery = buildQuery();
//存放结果的集合
List<List<Object>> results = new ArrayList<>();
if (ps == null) {
//
ps = conn.prepareStatement(customQuery);
}
ResultSet result = ps.executeQuery(customQuery);
while (result.next()) {
//存放一条数据的集合(多个列)
List<Object> row = new ArrayList<>();
//将返回结果放入集合
for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
row.add(result.getObject(i));
}
results.add(row);
}
LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());
return results;
} catch (SQLException e) {
LOG.error(e.toString());
// 重新连接
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
}
return null;
}
//将结果集转化为字符串,每一条数据是一个list集合,将每一个小的list集合转化为字符串
List<String> getAllRows(List<List<Object>> queryResult) {
List<String> allRows = new ArrayList<>();
if (queryResult == null || queryResult.isEmpty()) {
return allRows;
}
StringBuilder row = new StringBuilder();
for (List<Object> rawRow : queryResult) {
Object value = null;
for (Object aRawRow : rawRow) {
value = aRawRow;
if (value == null) {
row.append(",");
} else {
row.append(aRawRow.toString()).append(",");
}
}
allRows.add(row.toString());
row = new StringBuilder();
}
return allRows;
}
//更新offset元数据状态,每次返回结果集后调用。必须记录每次查询的offset值,为程序中断续跑数据时使用,以id为offset
void updateOffset2DB(int size) {
//以source_tab做为KEY,如果不存在则插入,存在则更新(每个源表对应一条记录)
String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
+ this.table
+ "','" + (recordSixe += size)
+ "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
LOG.info("updateStatus Sql:" + sql);
execSql(sql);
}
//执行sql语句
private void execSql(String sql) {
try {
ps = conn.prepareStatement(sql);
LOG.info("exec::" + sql);
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
}
}
//获取当前id的offset
private Integer getStatusDBIndex(int startFrom) {
//从flume_meta表中查询出当前的id是多少
String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
if (dbIndex != null) {
return Integer.parseInt(dbIndex);
}
//如果没有数据,则说明是第一次查询或者数据表中还没有存入数据,返回最初传入的值
return startFrom;
}
//查询一条数据的执行语句(当前id)
private String queryOne(String sql) {
ResultSet result = null;
try {
ps = conn.prepareStatement(sql);
result = ps.executeQuery();
while (result.next()) {
return result.getString(1);
}
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
//关闭相关资源
void close() {
try {
ps.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
int getCurrentIndex() {
return currentIndex;
}
void setCurrentIndex(int newValue) {
currentIndex = newValue;
}
int getRunQueryDelay() {
return runQueryDelay;
}
String getQuery() {
return query;
}
String getConnectionURL() {
return connectionURL;
}
private boolean isCustomQuerySet() {
return (customQuery != null);
}
Context getContext() {
return context;
}
public String getConnectionUserName() {
return connectionUserName;
}
public String getConnectionPassword() {
return connectionPassword;
}
String getDefaultCharsetResultSet() {
return defaultCharsetResultSet;
}
}
5.SQLSource
package com.yc;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/** * @program: Flume-MySQLSource * @description: * @author: 作者 * @create: 2022-06-25 18:57 */
public class SQLSource extends AbstractSource implements Configurable, PollableSource {
//打印日志
private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);
//定义sqlHelper
private SQLSourceHelper sqlSourceHelper;
@Override
public void configure(Context context) {
try {
//初始化
sqlSourceHelper = new SQLSourceHelper(context);
} catch (ParseException e) {
e.printStackTrace();
}
}
@Override
public Status process() throws EventDeliveryException {
try {
//查询数据表
List<List<Object>> result = sqlSourceHelper.executeQuery();
//存放event的集合
List<Event> events = new ArrayList<>();
//存放event头集合
HashMap<String, String> header = new HashMap<>();
//如果有返回数据,则将数据封装为event
if (!result.isEmpty()) {
List<String> allRows = sqlSourceHelper.getAllRows(result);
Event event = null;
for (String row : allRows) {
event = new SimpleEvent();
event.setBody(row.getBytes());
event.setHeaders(header);
events.add(event);
}
//将event写入channel
this.getChannelProcessor().processEventBatch(events);
//更新数据表中的offset信息
sqlSourceHelper.updateOffset2DB(result.size());
}
//等待时长
Thread.sleep(sqlSourceHelper.getRunQueryDelay());
return Status.READY;
} catch (InterruptedException e) {
LOG.error("Error procesing row", e);
return Status.BACKOFF;
}
}
@Override
public synchronized void stop() {
LOG.info("Stopping sql source {} ...", getName());
try {
//关闭资源
sqlSourceHelper.close();
} finally {
super.stop();
}
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
6.两个jar放入flume的lib目录下
- 项目jar


- mysql

7./jobs/t9下编写配置文件
- vim mysql-flume-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.10.100:3306/mysource
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = aaaa
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000
# Describe the sink
a1.sinks.k1.type = logger
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
8.建库,建表
- 建库
CREATE DATABASE mysource;
- 建表
use mysource;
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `flume_meta` (
`source_tab` varchar(255) NOT NULL,
`currentIndex` varchar(255) NOT NULL,
PRIMARY KEY (`source_tab`)
);
- 插4条数据
insert into student(name) values ('zhaoliu');

9.启动
bin/flume-ng agent --conf conf --conf-file jobs/t9/mysql-flume-logger.conf --name a1 -Dflume.root.logger==INFO,console
10.类似于如图结果输出则成功

边栏推荐
- Introduce reflow & repaint, and how to optimize it?
- [illumination du destin - 38]: Ghost Valley - chapitre 5 Flying clamp - one of the Warnings: There is a kind of killing called "hold Kill"
- 返回二叉树中两个节点的最低公共祖先
- RichView TRVStyle MainRVStyle
- The perfect car for successful people: BMW X7! Superior performance, excellent comfort and safety
- Returns the lowest common ancestor of two nodes in a binary tree
- Blue bridge - maximum common divisor and minimum common multiple
- ELFK部署
- Hmi-32- [motion mode] add light panel and basic information column
- [技术发展-26]:新型信息与通信网络的数据安全
猜你喜欢

Unpool(nn.MaxUnpool2d)

Application and Optimization Practice of redis in vivo push platform

Character painting, I use characters to draw a Bing Dwen Dwen
![[uc/os-iii] chapter 1.2.3.4 understanding RTOS](/img/33/1d94583a834060cc31cab36db09d6e.jpg)
[uc/os-iii] chapter 1.2.3.4 understanding RTOS

【LeetCode】106. Construct binary tree from middle order and post order traversal sequence (wrong question 2)

Visual studio 2019 set transparent background (fool teaching)

2021 Li Hongyi machine learning (1): basic concepts

Design of KTV intelligent dimming system based on MCU

ELFK部署

CAM Pytorch
随机推荐
Exploration of short text analysis in the field of medical and health (I)
[understanding of opportunity -38]: Guiguzi - Chapter 5 flying clamp - warning one: there is a kind of killing called "killing"
数据库和充值都没有了
Design and implementation of campus epidemic prevention and control system based on SSM
Visual studio 2019 set transparent background (fool teaching)
Practical case of SQL optimization: speed up your database
openresty ngx_lua執行階段
Last words record
spoon插入更新oracle数据库,插了一部分提示报错Assertion botch: negative time
Learn game model 3D characters, come out to find a job?
使用druid连接MySQL数据库报类型错误
Acwing第 58 场周赛【完结】
Acwing game 58 [End]
Erreur de type de datagramme MySQL en utilisant Druid
Avoid material "minefields"! Play with super high conversion rate
[illumination du destin - 38]: Ghost Valley - chapitre 5 Flying clamp - one of the Warnings: There is a kind of killing called "hold Kill"
Scientific research: are women better than men?
Pytest (5) - assertion
Advanced learning of MySQL -- Application -- Introduction
The perfect car for successful people: BMW X7! Superior performance, excellent comfort and safety