Flume configuration 4 - customize MySQLSource
2022-07-05 02:45:00 【A vegetable chicken that is working hard】
Custom MySQLSource explained
- Flume ships with many official Source types, but sometimes they cannot meet actual development needs, so we need to implement a custom Source.
- The purpose of a Source is to receive data from an external client and store it into the configured Channels.
- For example, to monitor MySQL in real time, pull data from it, and forward that data to HDFS or another storage framework, we need to implement a MySQLSource.
If MySQL is not installed on the virtual machine, install it first:
1. MySQL 8 (recommended)
2. MySQL 5
Implementation
1. Create a new project: Flume-MySQLSource
2. Add dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
</dependencies>
3. Add configuration files (this is not a Spring project, so YAML is not used). Note: if you installed MySQL 8, use mysql-connector-java 8.x and the driver class com.mysql.cj.jdbc.Driver instead of the values below.
- jdbc.properties
dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop100:3306/mysource?useUnicode=true&characterEncoding=utf-8
dbUser=root
# password of the MySQL instance on the Linux host
dbPassword=aaaa
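The helper class in step 4 loads this file from the classpath with `java.util.Properties`. One caution: in a .properties file, a `#` only starts a comment at the beginning of a line; a trailing `# ...` after a value becomes part of the value, so comments must go on their own line. A minimal, self-contained sketch of the parsing (using an in-memory string instead of a classpath resource; the values are placeholders, not a real database):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class JdbcPropsDemo {
    // Parses jdbc.properties-style text. The keys mirror the ones used in
    // this article (dbDriver, dbUrl, dbUser, dbPassword).
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        String cfg = "dbDriver=com.mysql.jdbc.Driver\n"
                + "dbUrl=jdbc:mysql://hadoop100:3306/mysource\n"
                + "dbUser=root\n"
                + "dbPassword=aaaa\n";
        Properties p = parse(cfg);
        // Print the connection settings that SQLSourceHelper would read
        System.out.println(p.getProperty("dbUser") + "@" + p.getProperty("dbUrl"));
    }
}
```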
- log4j.properties
#--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
#log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
#log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
4. SQLSourceHelper
- Explanation (diagram from the original post omitted)
- Code analysis (diagram from the original post omitted)
- Code implementation
package com.yc;
import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
 * @program: Flume-MySQLSource
 * @description:
 * @author: author
 * @create: 2022-06-25 18:50
 */
public class SQLSourceHelper {
private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);
private int runQueryDelay;// interval between two queries, in milliseconds
private int startFrom;// starting id
private int currentIndex;// current id
private int recordSixe = 0;// running total of returned rows (used as the offset)
private int maxRow;// maximum number of rows per query
private String table;// table to operate on
private String columnsToSelect;// columns to query, passed in by the user
private String customQuery;// query statement passed in by the user
private String query;// the constructed query statement
private String defaultCharsetResultSet;// result-set character encoding
// Context, used to read the Flume task configuration file
private Context context;
// Default values for the variables above; they can be overridden in the Flume task configuration file
private static final int DEFAULT_QUERY_DELAY = 10000;
private static final int DEFAULT_START_VALUE = 0;
private static final int DEFAULT_MAX_ROWS = 2000;
private static final String DEFAULT_COLUMNS_SELECT = "*";
private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
private static Connection conn = null;
private static PreparedStatement ps = null;
private static String connectionURL, connectionUserName, connectionPassword;
// Load static resources
static {
Properties p = new Properties();
try {
p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
connectionURL = p.getProperty("dbUrl");
connectionUserName = p.getProperty("dbUser");
connectionPassword = p.getProperty("dbPassword");
Class.forName(p.getProperty("dbDriver"));
} catch (IOException | ClassNotFoundException e) {
LOG.error(e.toString());
}
}
// Obtain a JDBC connection
private static Connection InitConnection(String url, String user, String pw) {
try {
Connection conn = DriverManager.getConnection(url, user, pw);
if (conn == null) {
throw new SQLException();
}
return conn;
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// Constructor
SQLSourceHelper(Context context) throws ParseException {
// Initialize the context
this.context = context;
// Parameters with default values: read from the Flume task configuration file, falling back to the defaults when absent
this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
// Parameters without default values: read from the Flume task configuration file
this.table = context.getString("table");
this.customQuery = context.getString("custom.query");
connectionURL = context.getString("connection.url");
connectionUserName = context.getString("connection.user");
connectionPassword = context.getString("connection.password");
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
// Validate the configuration: throw an exception if any parameter without a default value is missing
checkMandatoryProperties();
// Get the current id
currentIndex = getStatusDBIndex(startFrom);
// Build the query statement
query = buildQuery();
}
// Validate the configuration (table name, query statement, and database connection parameters)
private void checkMandatoryProperties() {
if (table == null) {
throw new ConfigurationException("property table not set");
}
if (connectionURL == null) {
throw new ConfigurationException("connection.url property not set");
}
if (connectionUserName == null) {
throw new ConfigurationException("connection.user property not set");
}
if (connectionPassword == null) {
throw new ConfigurationException("connection.password property not set");
}
}
// Build the SQL statement
private String buildQuery() {
String sql = "";
// Get the current id
currentIndex = getStatusDBIndex(startFrom);
LOG.info(currentIndex + "");
if (customQuery == null) {
sql = "SELECT " + columnsToSelect + " FROM " + table;
} else {
sql = customQuery;
}
StringBuilder execSql = new StringBuilder(sql);
// Use id as the offset
if (!sql.contains("where")) {
execSql.append(" where ");
execSql.append("id").append(">").append(currentIndex);
return execSql.toString();
} else {
int length = execSql.toString().length();
return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
}
}
// Execute the query
List<List<Object>> executeQuery() {
try {
// Rebuild the SQL on every execution, because the id changes
customQuery = buildQuery();
// Result collection
List<List<Object>> results = new ArrayList<>();
// Re-prepare the statement on each poll: the SQL text changes every time,
// and the JDBC API forbids calling executeQuery(String) on a PreparedStatement
ps = conn.prepareStatement(customQuery);
ResultSet result = ps.executeQuery();
while (result.next()) {
// One row of data (multiple columns)
List<Object> row = new ArrayList<>();
// Copy each returned column into the row
for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
row.add(result.getObject(i));
}
results.add(row);
}
LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());
return results;
} catch (SQLException e) {
LOG.error(e.toString());
// Reconnect
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
}
return null;
}
// Convert the result set to strings: each row is a List, and each inner List is joined into one comma-separated string
List<String> getAllRows(List<List<Object>> queryResult) {
List<String> allRows = new ArrayList<>();
if (queryResult == null || queryResult.isEmpty()) {
return allRows;
}
StringBuilder row = new StringBuilder();
for (List<Object> rawRow : queryResult) {
Object value = null;
for (Object aRawRow : rawRow) {
value = aRawRow;
if (value == null) {
row.append(",");
} else {
row.append(aRawRow.toString()).append(",");
}
}
allRows.add(row.toString());
row = new StringBuilder();
}
return allRows;
}
// Update the offset metadata; called each time a result set is returned. The offset must be persisted so that, after an interruption, ingestion can resume where it stopped (id is used as the offset).
void updateOffset2DB(int size) {
// source_tab is the key: insert if absent, update if present (one record per source table)
String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
+ this.table
+ "','" + (recordSixe += size)
+ "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
LOG.info("updateStatus Sql:" + sql);
execSql(sql);
}
// Execute a SQL statement
private void execSql(String sql) {
try {
ps = conn.prepareStatement(sql);
LOG.info("exec::" + sql);
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
}
}
// Get the offset for the current id
private Integer getStatusDBIndex(int startFrom) {
// Read the current id from flume_meta
String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
if (dbIndex != null) {
return Integer.parseInt(dbIndex);
}
// No record yet: either this is the first query or no offset has been saved, so return the value passed in
return startFrom;
}
// Execute a query that returns a single value (the current id)
private String queryOne(String sql) {
ResultSet result = null;
try {
ps = conn.prepareStatement(sql);
result = ps.executeQuery();
while (result.next()) {
return result.getString(1);
}
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// Close related resources
void close() {
try {
ps.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
int getCurrentIndex() {
return currentIndex;
}
void setCurrentIndex(int newValue) {
currentIndex = newValue;
}
int getRunQueryDelay() {
return runQueryDelay;
}
String getQuery() {
return query;
}
String getConnectionURL() {
return connectionURL;
}
private boolean isCustomQuerySet() {
return (customQuery != null);
}
Context getContext() {
return context;
}
public String getConnectionUserName() {
return connectionUserName;
}
public String getConnectionPassword() {
return connectionPassword;
}
String getDefaultCharsetResultSet() {
return defaultCharsetResultSet;
}
}
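The offset handling in buildQuery is the heart of this helper. The sketch below re-implements that construction in isolation (no Flume or JDBC dependencies; the static method signature is ad hoc for illustration), so the `id > currentIndex` logic can be inspected and tested on its own:

```java
public class BuildQueryDemo {
    // Mirrors SQLSourceHelper.buildQuery: when no custom query is given,
    // build "SELECT <columns> FROM <table> where id><offset>"; otherwise
    // assume the custom query ends with the previous offset and replace
    // that trailing number with the current one (same behavior, and the
    // same digit-count assumption, as the article's code).
    static String buildQuery(String customQuery, String columns, String table, int currentIndex) {
        String sql = (customQuery == null)
                ? "SELECT " + columns + " FROM " + table
                : customQuery;
        StringBuilder execSql = new StringBuilder(sql);
        if (!sql.contains("where")) {
            execSql.append(" where id>").append(currentIndex);
            return execSql.toString();
        }
        int length = execSql.length();
        return execSql.substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
    }

    public static void main(String[] args) {
        // Rows with id greater than the stored offset (here 4) are selected
        System.out.println(buildQuery(null, "*", "student", 4));
    }
}
```

Note the built-in assumption: the trailing-replace branch only works when the old and new offsets have the same number of digits, which is a limitation carried over from the original implementation.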
5.SQLSource
package com.yc;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
 * @program: Flume-MySQLSource
 * @description:
 * @author: author
 * @create: 2022-06-25 18:57
 */
public class SQLSource extends AbstractSource implements Configurable, PollableSource {
// Logger
private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);
// Helper that encapsulates the JDBC logic
private SQLSourceHelper sqlSourceHelper;
@Override
public void configure(Context context) {
try {
// Initialize the helper
sqlSourceHelper = new SQLSourceHelper(context);
} catch (ParseException e) {
e.printStackTrace();
}
}
@Override
public Status process() throws EventDeliveryException {
try {
// Query the data table
List<List<Object>> result = sqlSourceHelper.executeQuery();
// List of events to emit
List<Event> events = new ArrayList<>();
// Event headers
HashMap<String, String> header = new HashMap<>();
// If rows were returned, wrap each one in an event
if (result != null && !result.isEmpty()) {
List<String> allRows = sqlSourceHelper.getAllRows(result);
Event event = null;
for (String row : allRows) {
event = new SimpleEvent();
event.setBody(row.getBytes());
event.setHeaders(header);
events.add(event);
}
// Write the events to the channel
this.getChannelProcessor().processEventBatch(events);
// Update the offset
sqlSourceHelper.updateOffset2DB(result.size());
}
// Wait before the next poll
Thread.sleep(sqlSourceHelper.getRunQueryDelay());
return Status.READY;
} catch (InterruptedException e) {
LOG.error("Error processing row", e);
return Status.BACKOFF;
}
}
@Override
public synchronized void stop() {
LOG.info("Stopping sql source {} ...", getName());
try {
// Close resources
sqlSourceHelper.close();
} finally {
super.stop();
}
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
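The serialization step performed by getAllRows is what actually ends up in each event body above. A standalone sketch of the same logic (no Flume dependency; the class name is for illustration only), showing that each row becomes one comma-terminated string and null columns become empty fields:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowSerializeDemo {
    // Mirrors SQLSourceHelper.getAllRows: each row (a List of column values)
    // is joined into a single string with a comma after every field; null
    // columns contribute only the comma.
    static List<String> getAllRows(List<List<Object>> queryResult) {
        List<String> allRows = new ArrayList<>();
        if (queryResult == null || queryResult.isEmpty()) {
            return allRows;
        }
        for (List<Object> rawRow : queryResult) {
            StringBuilder row = new StringBuilder();
            for (Object col : rawRow) {
                row.append(col == null ? "" : col.toString()).append(",");
            }
            allRows.add(row.toString());
        }
        return allRows;
    }

    public static void main(String[] args) {
        List<List<Object>> rs = new ArrayList<>();
        rs.add(Arrays.asList((Object) 1, "zhaoliu"));
        rs.add(Arrays.asList((Object) 2, null));
        // Each line below is one serialized row / future event body
        for (String s : getAllRows(rs)) {
            System.out.println(s);
        }
    }
}
```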
6. Put the two jars into Flume's lib directory
- the project jar (screenshot omitted)
- the mysql-connector jar (screenshot omitted)
7. Write the configuration file under /jobs/t9
- vim mysql-flume-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.10.100:3306/mysource
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = aaaa
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000
# Describe the sink
a1.sinks.k1.type = logger
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
8. Create the database and tables
- Create the database
CREATE DATABASE mysource;
- Create the tables
use mysource;
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `flume_meta` (
`source_tab` varchar(255) NOT NULL,
`currentIndex` varchar(255) NOT NULL,
PRIMARY KEY (`source_tab`)
);
- Insert 4 rows of test data (one statement shown; repeat with different names)
insert into student(name) values ('zhaoliu');
9. Start the agent
bin/flume-ng agent --conf conf --conf-file jobs/t9/mysql-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
10. If console output similar to the figure appears, the source is working (screenshot omitted)