Flume configuration 4 - customize mysqlsource
2022-07-05 02:45:00 【A hard-working rookie】
Custom MySQLSource explained
- Flume ships with many Source types, but sometimes they cannot meet the needs of actual development, so we have to implement a custom Source.
- The purpose of a Source is to receive data from an external client and store it into the configured Channels.
- For example: to monitor MySQL in real time, pull data from it, and forward that data to HDFS or another storage framework, we need to implement a MySQLSource.
If MySQL is not yet installed on the virtual machine, install it first:
1. MySQL 8 (recommended)
2. MySQL 5
Implementation
1. Create a new project: Flume-MySQLSource
2. Add the dependencies
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
3. Add the configuration files. This is not a Spring project, so yml configuration is not used. (Note: if you installed MySQL 8, the mysql-connector-java 5.1.x dependency above may fail to authenticate; in that case use a newer connector version and the com.mysql.cj.jdbc.Driver class.)
- jdbc.properties
dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop100:3306/mysource?useUnicode=true&characterEncoding=utf-8
dbUser=root
# password of the MySQL database on the Linux host
dbPassword=aaaa
- log4j.properties
#--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
#log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
#log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
4.SQLSourceHelper
- Explanation
- Code analysis
- Code implementation
import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
 * @program: Flume-MySQLSource
 * @description:
 * @author: author
 * @create: 2022-06-25 18:50
 */
public class SQLSourceHelper {
private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);
private int runQueryDelay;// interval between two queries, in milliseconds
private int startFrom;// starting id
private int currentIndex;// current id (the offset)
private int recordSixe = 0;// running total of records returned, used to advance the offset
private int maxRow;// maximum number of rows per query
private String table;// table to operate on
private String columnsToSelect;// columns to query, supplied by the user
private String customQuery;// custom query statement supplied by the user
private String query;// the query statement actually built
private String defaultCharsetResultSet;// result-set charset
// context, used to read the configuration file
private Context context;
// default values for the variables above; they can be overridden in the flume task configuration file
private static final int DEFAULT_QUERY_DELAY = 10000;
private static final int DEFAULT_START_VALUE = 0;
private static final int DEFAULT_MAX_ROWS = 2000;
private static final String DEFAULT_COLUMNS_SELECT = "*";
private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
private static Connection conn = null;
private static PreparedStatement ps = null;
private static String connectionURL, connectionUserName, connectionPassword;
// Load static resources
static {
Properties p = new Properties();
try {
p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
connectionURL = p.getProperty("dbUrl");
connectionUserName = p.getProperty("dbUser");
connectionPassword = p.getProperty("dbPassword");
Class.forName(p.getProperty("dbDriver"));
} catch (IOException | ClassNotFoundException e) {
LOG.error(e.toString());
}
}
// obtain a JDBC connection
private static Connection InitConnection(String url, String user, String pw) {
try {
Connection conn = DriverManager.getConnection(url, user, pw);
if (conn == null) {
throw new SQLException();
}
return conn;
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// constructor
SQLSourceHelper(Context context) throws ParseException {
// initialize the context
this.context = context;
// parameters with defaults: read from the flume task configuration file, falling back to the default when absent
this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
// parameters without defaults: read from the flume task configuration file
this.table = context.getString("table");
this.customQuery = context.getString("custom.query");
connectionURL = context.getString("connection.url");
connectionUserName = context.getString("connection.user");
connectionPassword = context.getString("connection.password");
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
// validate the configuration; throw an exception if a parameter without a default was not set
checkMandatoryProperties();
// Get current id
currentIndex = getStatusDBIndex(startFrom);
// build the query statement
query = buildQuery();
}
// validate the required configuration (table and database connection parameters)
private void checkMandatoryProperties() {
if (table == null) {
throw new ConfigurationException("property table not set");
}
if (connectionURL == null) {
throw new ConfigurationException("connection.url property not set");
}
if (connectionUserName == null) {
throw new ConfigurationException("connection.user property not set");
}
if (connectionPassword == null) {
throw new ConfigurationException("connection.password property not set");
}
}
// build the sql statement
private String buildQuery() {
String sql = "";
// Get current id
currentIndex = getStatusDBIndex(startFrom);
LOG.info(currentIndex + "");
if (customQuery == null) {
sql = "SELECT " + columnsToSelect + " FROM " + table;
} else {
sql = customQuery;
}
StringBuilder execSql = new StringBuilder(sql);
// use id as the offset
if (!sql.contains("where")) {
execSql.append(" where ");
execSql.append("id").append(">").append(currentIndex);
return execSql.toString();
} else {
// the custom query is assumed to end with the offset; replace its trailing digits with the current index
int length = execSql.toString().length();
return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
}
}
// Execute the query
List<List<Object>> executeQuery() {
try {
// rebuild the sql before each execution, because the offset id changes
customQuery = buildQuery();
// collection of result rows
List<List<Object>> results = new ArrayList<>();
// prepare a fresh statement each time, since the sql changes with the offset;
// JDBC forbids calling executeQuery(String) on a PreparedStatement
ps = conn.prepareStatement(customQuery);
ResultSet result = ps.executeQuery();
while (result.next()) {
// holds one row of data (multiple columns)
List<Object> row = new ArrayList<>();
// copy the returned columns into the row
for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
row.add(result.getObject(i));
}
results.add(row);
}
LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());
return results;
} catch (SQLException e) {
LOG.error(e.toString());
// reconnect
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
}
return null;
}
// convert the result set to strings: each row is a list, and each inner list is joined into one string
List<String> getAllRows(List<List<Object>> queryResult) {
List<String> allRows = new ArrayList<>();
if (queryResult == null || queryResult.isEmpty()) {
return allRows;
}
StringBuilder row = new StringBuilder();
for (List<Object> rawRow : queryResult) {
Object value = null;
for (Object aRawRow : rawRow) {
value = aRawRow;
if (value == null) {
row.append(",");
} else {
row.append(aRawRow.toString()).append(",");
}
}
allRows.add(row.toString());
row = new StringBuilder();
}
return allRows;
}
// update the offset metadata; called after every batch of results. The offset must be persisted so that, after the program is interrupted, ingestion can resume where it stopped; id is used as the offset
void updateOffset2DB(int size) {
// source_tab is the key: insert if absent, update if present (one record per source table)
String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
+ this.table
+ "','" + (recordSixe += size)
+ "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
LOG.info("updateStatus Sql:" + sql);
execSql(sql);
}
// execute a sql statement
private void execSql(String sql) {
try {
ps = conn.prepareStatement(sql);
LOG.info("exec::" + sql);
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
}
}
// get the offset of the current id
private Integer getStatusDBIndex(int startFrom) {
// read the current id for this table from flume_meta
String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
if (dbIndex != null) {
return Integer.parseInt(dbIndex);
}
// no record means this is the first query or the offset has not been saved yet; return the value passed in
return startFrom;
}
// run a query that returns a single value (the current id)
private String queryOne(String sql) {
ResultSet result = null;
try {
ps = conn.prepareStatement(sql);
result = ps.executeQuery();
while (result.next()) {
return result.getString(1);
}
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// Shut down related resources
void close() {
try {
ps.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
int getCurrentIndex() {
return currentIndex;
}
void setCurrentIndex(int newValue) {
currentIndex = newValue;
}
int getRunQueryDelay() {
return runQueryDelay;
}
String getQuery() {
return query;
}
String getConnectionURL() {
return connectionURL;
}
private boolean isCustomQuerySet() {
return (customQuery != null);
}
Context getContext() {
return context;
}
public String getConnectionUserName() {
return connectionUserName;
}
public String getConnectionPassword() {
return connectionPassword;
}
String getDefaultCharsetResultSet() {
return defaultCharsetResultSet;
}
}
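The two trickiest pieces of SQLSourceHelper — the offset-appending in buildQuery() and the row flattening in getAllRows() — can be exercised in isolation. The sketch below is a hypothetical standalone mirror of that logic; the class and method signatures here are illustrative, not part of the source class above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HelperLogicDemo {

    // Mirrors buildQuery(): append "where id>offset" when the statement
    // has no where clause; otherwise replace the trailing offset digits.
    public static String buildQuery(String customQuery, String columns, String table, int currentIndex) {
        String sql = (customQuery == null)
                ? "SELECT " + columns + " FROM " + table
                : customQuery;
        if (!sql.contains("where")) {
            return sql + " where id>" + currentIndex;
        }
        // assumes the custom query ends with the previous offset value
        return sql.substring(0, sql.length() - String.valueOf(currentIndex).length()) + currentIndex;
    }

    // Mirrors getAllRows(): join each row's columns with commas;
    // a null column contributes an empty field.
    public static List<String> getAllRows(List<List<Object>> queryResult) {
        List<String> allRows = new ArrayList<>();
        if (queryResult == null || queryResult.isEmpty()) {
            return allRows;
        }
        for (List<Object> rawRow : queryResult) {
            StringBuilder row = new StringBuilder();
            for (Object col : rawRow) {
                row.append(col == null ? "" : col.toString()).append(",");
            }
            allRows.add(row.toString());
        }
        return allRows;
    }

    public static void main(String[] args) {
        System.out.println(buildQuery(null, "*", "student", 4));
        // SELECT * FROM student where id>4
        List<List<Object>> rows = new ArrayList<>();
        rows.add(Arrays.asList(1, null, "zhangsan"));
        System.out.println(getAllRows(rows));
        // [1,,zhangsan,]
    }
}
```

Note how fragile the custom-query branch is: it only works when the trailing offset embedded in the query has the same number of digits as the new currentIndex, which is one reason to prefer the default query form with the table and columns parameters.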
5.SQLSource
package com.yc;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
 * @program: Flume-MySQLSource
 * @description:
 * @author: author
 * @create: 2022-06-25 18:57
 */
public class SQLSource extends AbstractSource implements Configurable, PollableSource {
// Print log
private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);
// Definition sqlHelper
private SQLSourceHelper sqlSourceHelper;
@Override
public void configure(Context context) {
try {
// initialization
sqlSourceHelper = new SQLSourceHelper(context);
} catch (ParseException e) {
e.printStackTrace();
}
}
@Override
public Status process() throws EventDeliveryException {
try {
// query the data table
List<List<Object>> result = sqlSourceHelper.executeQuery();
// list of events to deliver
List<Event> events = new ArrayList<>();
// event header map
HashMap<String, String> header = new HashMap<>();
// if any data came back (executeQuery returns null on a SQL error), wrap each row in an event
if (result != null && !result.isEmpty()) {
List<String> allRows = sqlSourceHelper.getAllRows(result);
Event event = null;
for (String row : allRows) {
event = new SimpleEvent();
event.setBody(row.getBytes());
event.setHeaders(header);
events.add(event);
}
// write the events to the channel
this.getChannelProcessor().processEventBatch(events);
// persist the new offset
sqlSourceHelper.updateOffset2DB(result.size());
}
// wait before the next poll
Thread.sleep(sqlSourceHelper.getRunQueryDelay());
return Status.READY;
} catch (InterruptedException e) {
LOG.error("Error processing row", e);
return Status.BACKOFF;
}
}
@Override
public synchronized void stop() {
LOG.info("Stopping sql source {} ...", getName());
try {
// close resource
sqlSourceHelper.close();
} finally {
super.stop();
}
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
6. Put two jars into flume's lib directory:
- the project jar
- the mysql connector jar
7. Write the configuration file under /jobs/t9
- vim mysql-flume-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.yc.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.10.100:3306/mysource
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = aaaa
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000
# Describe the sink
a1.sinks.k1.type = logger
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
8. Create the database and tables
- Create the database
CREATE DATABASE mysource;
- Create the tables
use mysource;
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `flume_meta` (
`source_tab` varchar(255) NOT NULL,
`currentIndex` varchar(255) NOT NULL,
PRIMARY KEY (`source_tab`)
);
- Insert some test data
insert into student(name) values ('zhaoliu');
9. Start the agent
bin/flume-ng agent --conf conf --conf-file jobs/t9/mysql-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
10. If the console prints the inserted rows, the source is working.