Flume configuration 4 - customize mysqlsource
2022-07-05 02:45:00 【A vegetable chicken that is working hard】
Custom MySQLSource explained
- Flume ships with many Source types, but sometimes they cannot meet actual development needs, and we have to implement a custom Source.
- A Source's job is to receive data from an external client and store it into the configured Channels.
- For example: to monitor MySQL in real time, pulling data from MySQL and delivering it to HDFS or another storage framework, we need to implement a MySQLSource.
If MySQL is not installed on the virtual machine, install it first:
1. MySQL 8 (recommended)
2. MySQL 5
Implementation
1. Create a new project, Flume-MySQLSource
2. Add dependencies
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
3. Add configuration information (this is not a Spring project, so no yml configuration is used)
- jdbc.properties (note: with MySQL 8 and Connector/J 8.x the driver class is com.mysql.cj.jdbc.Driver; also, a `#` placed after a value in a properties file becomes part of the value, so the comment goes on its own line)
dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop100:3306/mysource?useUnicode=true&characterEncoding=utf-8
dbUser=root
# password of the MySQL database on the Linux host
dbPassword=aaaa
- log4j.properties
#--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
#log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
#log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
4.SQLSourceHelper
- Explanation
- Code analysis
- Code implementation
import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
 * @program: Flume-MySQLSource
 * @description:
 * @author: author
 * @create: 2022-06-25 18:50
 */
public class SQLSourceHelper {
private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);
private int runQueryDelay;// interval between two queries
private int startFrom;// starting id
private int currentIndex;// current id
private int recordSixe = 0;// number of results returned per query
private int maxRow;// maximum rows per query
private String table;// table to operate on
private String columnsToSelect;// columns to query, passed in by the user
private String customQuery;// query statement passed in by the user
private String query;// query statement that gets built
private String defaultCharsetResultSet;// result-set character set
// Context, used to read the configuration file
private Context context;
// Default values for the variables above; they can be overridden in the flume task configuration file
private static final int DEFAULT_QUERY_DELAY = 10000;
private static final int DEFAULT_START_VALUE = 0;
private static final int DEFAULT_MAX_ROWS = 2000;
private static final String DEFAULT_COLUMNS_SELECT = "*";
private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
private static Connection conn = null;
private static PreparedStatement ps = null;
private static String connectionURL, connectionUserName, connectionPassword;
// Load static resources
static {
Properties p = new Properties();
try {
p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
connectionURL = p.getProperty("dbUrl");
connectionUserName = p.getProperty("dbUser");
connectionPassword = p.getProperty("dbPassword");
Class.forName(p.getProperty("dbDriver"));
} catch (IOException | ClassNotFoundException e) {
LOG.error(e.toString());
}
}
// Obtain a JDBC connection
private static Connection InitConnection(String url, String user, String pw) {
try {
Connection conn = DriverManager.getConnection(url, user, pw);
if (conn == null) {
throw new SQLException();
}
return conn;
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// Constructor
SQLSourceHelper(Context context) throws ParseException {
// Initialize the context
this.context = context;
// Parameters with defaults: read from the flume task configuration file; fall back to the default when absent
this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
// Parameters without defaults: read from the flume task configuration file
this.table = context.getString("table");
this.customQuery = context.getString("custom.query");
connectionURL = context.getString("connection.url");
connectionUserName = context.getString("connection.user");
connectionPassword = context.getString("connection.password");
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
// Validate the configuration; throw if a parameter without a default was not set
checkMandatoryProperties();
// Get the current id
currentIndex = getStatusDBIndex(startFrom);
// Build the query statement
query = buildQuery();
}
// Validate the configuration (table, query statement and database connection parameters)
private void checkMandatoryProperties() {
if (table == null) {
throw new ConfigurationException("property table not set");
}
if (connectionURL == null) {
throw new ConfigurationException("connection.url property not set");
}
if (connectionUserName == null) {
throw new ConfigurationException("connection.user property not set");
}
if (connectionPassword == null) {
throw new ConfigurationException("connection.password property not set");
}
}
// Build the SQL statement
private String buildQuery() {
String sql = "";
// Get the current id
currentIndex = getStatusDBIndex(startFrom);
LOG.info(currentIndex + "");
if (customQuery == null) {
sql = "SELECT " + columnsToSelect + " FROM " + table;
} else {
sql = customQuery;
}
StringBuilder execSql = new StringBuilder(sql);
// Use id as the offset
if (!sql.contains("where")) {
execSql.append(" where ");
execSql.append("id").append(">").append(currentIndex);
return execSql.toString();
} else {
// A custom query is assumed to end in "... where id > <n>";
// chop off the trailing number and append the current offset
int length = execSql.toString().length();
return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
}
}
// Execute the query
List<List<Object>> executeQuery() {
try {
// Rebuild the SQL on every execution, because the id offset changes
customQuery = buildQuery();
// Result collection
List<List<Object>> results = new ArrayList<>();
// The SQL text changes every time, so prepare a fresh statement;
// calling executeQuery(String) on a PreparedStatement is not allowed by JDBC
ps = conn.prepareStatement(customQuery);
ResultSet result = ps.executeQuery();
while (result.next()) {
// Collection holding one row (multiple columns)
List<Object> row = new ArrayList<>();
// Put the returned columns into the collection
for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
row.add(result.getObject(i));
}
results.add(row);
}
LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());
return results;
} catch (SQLException e) {
LOG.error(e.toString());
// Reconnect
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
}
return null;
}
// Convert the result set to strings: each row is a List, and each inner List is joined into one string
List<String> getAllRows(List<List<Object>> queryResult) {
List<String> allRows = new ArrayList<>();
if (queryResult == null || queryResult.isEmpty()) {
return allRows;
}
StringBuilder row = new StringBuilder();
for (List<Object> rawRow : queryResult) {
Object value = null;
for (Object aRawRow : rawRow) {
value = aRawRow;
if (value == null) {
row.append(",");
} else {
row.append(aRawRow.toString()).append(",");
}
}
allRows.add(row.toString());
row = new StringBuilder();
}
return allRows;
}
// Update the offset metadata; called each time a result set is returned. The offset must be
// persisted so a run can resume after the program is interrupted; id serves as the offset.
void updateOffset2DB(int size) {
// Use source_tab as the key: insert if absent, update if present (one record per source table)
String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
+ this.table
+ "','" + (recordSixe += size)
+ "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
LOG.info("updateStatus Sql:" + sql);
execSql(sql);
}
// Execute a SQL statement
private void execSql(String sql) {
try {
ps = conn.prepareStatement(sql);
LOG.info("exec::" + sql);
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
}
}
// Get the offset of the current id
private Integer getStatusDBIndex(int startFrom) {
// Query flume_meta for the current id of this table
String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
if (dbIndex != null) {
return Integer.parseInt(dbIndex);
}
// No data yet: either this is the first query or no state has been saved; return the initial value
return startFrom;
}
// Execute a query that returns a single value (the current id)
private String queryOne(String sql) {
ResultSet result = null;
try {
ps = conn.prepareStatement(sql);
result = ps.executeQuery();
while (result.next()) {
return result.getString(1);
}
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// Close resources
void close() {
try {
ps.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
int getCurrentIndex() {
return currentIndex;
}
void setCurrentIndex(int newValue) {
currentIndex = newValue;
}
int getRunQueryDelay() {
return runQueryDelay;
}
String getQuery() {
return query;
}
String getConnectionURL() {
return connectionURL;
}
private boolean isCustomQuerySet() {
return (customQuery != null);
}
Context getContext() {
return context;
}
public String getConnectionUserName() {
return connectionUserName;
}
public String getConnectionPassword() {
return connectionPassword;
}
String getDefaultCharsetResultSet() {
return defaultCharsetResultSet;
}
}
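The helper's incremental logic can be exercised without a database: buildQuery() and getAllRows() are pure string/list manipulation. The sketch below re-implements both with illustrative inputs (the table name, columns and row values are made up for the demo, not taken from a real database); the custom-query branch assumes, like the original, that the query already ends in "... where id><n>".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the two pure methods in SQLSourceHelper (inputs are illustrative).
public class SQLSourceHelperDemo {

    // Mirrors buildQuery(): append "where id>offset" to a generated query, or
    // replace the trailing offset of a custom query that ends in "... id><n>"
    static String buildQuery(String customQuery, String columns, String table, int currentIndex) {
        String sql = (customQuery == null) ? "SELECT " + columns + " FROM " + table : customQuery;
        StringBuilder execSql = new StringBuilder(sql);
        if (!sql.contains("where")) {
            execSql.append(" where id>").append(currentIndex);
        } else {
            // keep everything up to and including '>' and append the new offset
            execSql.setLength(sql.lastIndexOf('>') + 1);
            execSql.append(currentIndex);
        }
        return execSql.toString();
    }

    // Mirrors getAllRows(): flatten each row into one comma-terminated string;
    // a null column still emits its separator so field positions stay aligned
    static List<String> getAllRows(List<List<Object>> queryResult) {
        List<String> allRows = new ArrayList<>();
        if (queryResult == null || queryResult.isEmpty()) return allRows;
        for (List<Object> rawRow : queryResult) {
            StringBuilder row = new StringBuilder();
            for (Object value : rawRow) {
                row.append(value == null ? "" : value.toString()).append(",");
            }
            allRows.add(row.toString());
        }
        return allRows;
    }

    public static void main(String[] args) {
        System.out.println(buildQuery(null, "*", "student", 4));
        System.out.println(buildQuery("select id,name from student where id>0", "*", "student", 4));
        System.out.println(getAllRows(Arrays.asList(
                Arrays.asList((Object) 1, "zhaoliu"),
                Arrays.asList((Object) 2, null))));
    }
}
```

Running main prints `SELECT * FROM student where id>4`, then `select id,name from student where id>4`, then `[1,zhaoliu,, 2,,]`.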
5.SQLSource
package com.yc;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
 * @program: Flume-MySQLSource
 * @description:
 * @author: author
 * @create: 2022-06-25 18:57
 */
public class SQLSource extends AbstractSource implements Configurable, PollableSource {
// Logger
private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);
// The SQL source helper
private SQLSourceHelper sqlSourceHelper;
@Override
public void configure(Context context) {
try {
// initialization
sqlSourceHelper = new SQLSourceHelper(context);
} catch (ParseException e) {
e.printStackTrace();
}
}
@Override
public Status process() throws EventDeliveryException {
try {
// Query the data table
List<List<Object>> result = sqlSourceHelper.executeQuery();
// List of events
List<Event> events = new ArrayList<>();
// Event header map
HashMap<String, String> header = new HashMap<>();
// If data was returned, wrap each row as an event
// (executeQuery() returns null on a SQL error, so guard against that as well)
if (result != null && !result.isEmpty()) {
List<String> allRows = sqlSourceHelper.getAllRows(result);
Event event = null;
for (String row : allRows) {
event = new SimpleEvent();
event.setBody(row.getBytes());
event.setHeaders(header);
events.add(event);
}
// Write the events to the channel
this.getChannelProcessor().processEventBatch(events);
// Update the offset
sqlSourceHelper.updateOffset2DB(result.size());
}
// Wait before the next poll
Thread.sleep(sqlSourceHelper.getRunQueryDelay());
return Status.READY;
} catch (InterruptedException e) {
LOG.error("Error processing row", e);
return Status.BACKOFF;
}
}
@Override
public synchronized void stop() {
LOG.info("Stopping sql source {} ...", getName());
try {
// close resource
sqlSourceHelper.close();
} finally {
super.stop();
}
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
6. Put two jars into flume's lib directory
- the project jar
- the mysql connector jar
7. Write the configuration file under /jobs/t9
- vim mysql-flume-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.10.100:3306/mysource
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = aaaa
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000
# Describe the sink
a1.sinks.k1.type = logger
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
8. Create the database and tables
- Create the database
CREATE DATABASE mysource;
- Create the tables
use mysource;
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `flume_meta` (
`source_tab` varchar(255) NOT NULL,
`currentIndex` varchar(255) NOT NULL,
PRIMARY KEY (`source_tab`)
);
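For reference, the upsert that updateOffset2DB() issues against the flume_meta table above looks like the following (the table name student and the offset value 4 are illustrative):

```sql
-- One offset record per source table: inserted on the first run, updated afterwards
insert into flume_meta(source_tab, currentIndex)
values ('student', '4')
on duplicate key update currentIndex = values(currentIndex);
```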
- Insert a few test rows (e.g. 4 of them), for example:
insert into student(name) values ('zhaoliu');
9. Start the agent
bin/flume-ng agent --conf conf --conf-file jobs/t9/mysql-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
10. Output similar to the figure indicates the run succeeded