Flink real-time data warehouse (IX): incremental synchronization of data in MySQL
Contents
- Configuration bean object
- Flink broadcast state programming
- MD5 encryption
- HBase sink templates
Configure the flow table in the database
CREATE TABLE `dbus_flow` (
  `flowId` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment ID',
  `mode` int(11) NOT NULL COMMENT 'Storage mode (PHOENIX / NATIVE / STRING, default STRING)',
  `databaseName` varchar(50) NOT NULL COMMENT 'Source database name',
  `tableName` varchar(50) NOT NULL COMMENT 'Source table name',
  `hbaseTable` varchar(50) NOT NULL COMMENT 'Target HBase table name',
  `family` varchar(50) NOT NULL COMMENT 'HBase column family',
  `uppercaseQualifier` tinyint(1) NOT NULL COMMENT 'Uppercase the field names, default true',
  `commitBatch` int(11) NOT NULL COMMENT 'Batch commit size',
  `rowKey` varchar(100) NOT NULL COMMENT 'Field names composing the rowkey, comma-separated',
  `status` int(11) NOT NULL COMMENT 'Status: 0 - initial, 1 - ready, 2 - running',
  PRIMARY KEY (`flowId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Sample data

INSERT INTO `dbus_flow` VALUES ('1', '0', 'test', 'zyd_orders', 'learing_flink:zyd_orders', '0', '1', '10', 'orderId', '2');

This row registers test.zyd_orders for synchronization into the HBase table learing_flink:zyd_orders: STRING storage mode (0), column family "0", uppercase qualifiers, a commit batch of 10, orderId as the rowkey field, and status 2 (running).
JDBC utility class
package dbus.utils;

import dbus.config.GlobalConfig;

import java.sql.*;

/**
 * Common JDBC helper methods.
 */
public class JdbcUtil {

    // URL
    private static String url = GlobalConfig.DB_URL;
    // user
    private static String user = GlobalConfig.USER_MAME;
    // password
    private static String password = GlobalConfig.PASSWORD;
    // driver class
    private static String driverClass = GlobalConfig.DRIVER_CLASS;

    /**
     * Register the driver only once, in a static block.
     */
    static {
        try {
            Class.forName(driverClass);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a connection.
     */
    public static Connection getConnection() {
        try {
            return DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * Release resources.
     */
    public static void close(Statement stmt, Connection conn) {
        close(null, stmt, conn);
    }

    /**
     * Release resources, including the ResultSet.
     */
    public static void close(ResultSet rs, Statement stmt, Connection conn) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(JdbcUtil.getConnection());
    }
}
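The article never shows the code that actually reads the dbus_flow table. A minimal sketch of such a loader, using JdbcUtil and the Flow bean defined later in this post, might look like the following; the class name FlowDao and method listFlows are hypothetical, not part of the original project.

package dbus.utils;

import dbus.model.Flow;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical DAO sketch: loads all flow configurations from dbus_flow.
 */
public class FlowDao {

    public static List<Flow> listFlows() {
        String sql = "SELECT flowId, mode, databaseName, tableName, hbaseTable, family, "
                + "uppercaseQualifier, commitBatch, rowKey, status FROM dbus_flow";
        List<Flow> flows = new ArrayList<>();
        Connection conn = JdbcUtil.getConnection();
        Statement stmt = null;
        ResultSet rs = null;
        try {
            stmt = conn.createStatement();
            rs = stmt.executeQuery(sql);
            while (rs.next()) {
                Flow flow = new Flow();
                flow.setFlowId(rs.getInt("flowId"));
                flow.setMode(rs.getInt("mode"));
                flow.setDatabaseName(rs.getString("databaseName"));
                flow.setTableName(rs.getString("tableName"));
                flow.setHbaseTable(rs.getString("hbaseTable"));
                flow.setFamily(rs.getString("family"));
                flow.setUppercaseQualifier(rs.getBoolean("uppercaseQualifier"));
                flow.setCommitBatch(rs.getInt("commitBatch"));
                flow.setRowKey(rs.getString("rowKey"));
                flow.setStatus(rs.getInt("status"));
                flows.add(flow);
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        } finally {
            JdbcUtil.close(rs, stmt, conn);
        }
        return flows;
    }
}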
Enumeration classes
CodeEnum
package enums;

public interface CodeEnum {

    /**
     * Gets the enumeration's code value.
     *
     * @return the code value
     */
    Integer getCode();
}
FlowStatusEnum
package enums;

import lombok.Getter;

@Getter
public enum FlowStatusEnum implements CodeEnum {

    /**
     * Initial state (newly added).
     */
    FLOWSTATUS_INIT(0, "initial"),

    /**
     * Ready state; after the initial full load, the status can be switched to ready.
     */
    FLOWSTATUS_READY(1, "ready"),

    /**
     * Running state (incremental collection in progress).
     */
    FLOWSTATUS_RUNNING(2, "running");

    private Integer code;
    private String message;

    FlowStatusEnum(Integer code, String message) {
        this.code = code;
        this.message = message;
    }
}
HBaseStorageModeEnum
package enums;

import lombok.Getter;

@Getter
public enum HBaseStorageModeEnum implements CodeEnum {

    STRING(0, "STRING"),
    NATIVE(1, "NATIVE"),
    PHOENIX(2, "PHOENIX");

    private Integer code;
    private String message;

    HBaseStorageModeEnum(Integer code, String message) {
        this.code = code;
        this.message = message;
    }
}
Configuration bean object
Flow
package dbus.model;

import enums.FlowStatusEnum;
import enums.HBaseStorageModeEnum;
import lombok.Data;
import lombok.ToString;

import java.io.Serializable;

@Data
@ToString
public class Flow implements Serializable {

    private Integer flowId;

    /**
     * Storage type in HBase; everything is saved as STRING by default.
     */
    private int mode = HBaseStorageModeEnum.STRING.getCode();

    /**
     * Source database/schema name.
     */
    private String databaseName;

    /**
     * MySQL table name.
     */
    private String tableName;

    /**
     * HBase table name.
     */
    private String hbaseTable;

    /**
     * Column family name (a single family is used by default).
     */
    private String family;

    /**
     * Whether to uppercase field names; defaults to true.
     */
    private boolean uppercaseQualifier = true;

    /**
     * Batch commit size, used by the ETL.
     */
    private int commitBatch;

    /**
     * Field names composing the rowkey, comma-separated.
     */
    private String rowKey;

    /**
     * Status.
     */
    private int status = FlowStatusEnum.FLOWSTATUS_INIT.getCode();
}
Flink broadcast state programming
package dbus.function;

import com.alibaba.otter.canal.protocol.FlatMessage;
import dbus.incrementssync.IncrementSyncApp;
import dbus.model.Flow;
import enums.FlowStatusEnum;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DbusProcessFunction extends KeyedBroadcastProcessFunction<String, FlatMessage, Flow, Tuple2<FlatMessage, Flow>> {

    @Override
    public void processElement(FlatMessage flatMessage, ReadOnlyContext ctx, Collector<Tuple2<FlatMessage, Flow>> out) throws Exception {
        // Look up the flow configuration for this database + table.
        Flow flow = ctx.getBroadcastState(IncrementSyncApp.flowStateDescriptor).get(flatMessage.getDatabase() + flatMessage.getTable());
        // Only forward messages whose flow is in the running state.
        if (null != flow && flow.getStatus() == FlowStatusEnum.FLOWSTATUS_RUNNING.getCode()) {
            out.collect(Tuple2.of(flatMessage, flow));
        }
    }

    @Override
    public void processBroadcastElement(Flow flow, Context ctx, Collector<Tuple2<FlatMessage, Flow>> collector) throws Exception {
        // Get the broadcast state.
        BroadcastState<String, Flow> broadcastState = ctx.getBroadcastState(IncrementSyncApp.flowStateDescriptor);
        // Update the state with the latest configuration for this table.
        broadcastState.put(flow.getDatabaseName() + flow.getTableName(), flow);
    }
}
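The function above reads IncrementSyncApp.flowStateDescriptor, which this installment does not show. A minimal sketch of that descriptor, assuming the state is keyed by databaseName + tableName exactly as in the lookups above (the descriptor name "flowBroadcastState" is an assumption):

package dbus.incrementssync;

import dbus.model.Flow;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;

public class IncrementSyncApp {

    // Broadcast-state descriptor: keys are databaseName + tableName, values are Flow configs.
    public static final MapStateDescriptor<String, Flow> flowStateDescriptor =
            new MapStateDescriptor<>("flowBroadcastState", Types.STRING, Types.POJO(Flow.class));
}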
MD5 encryption
package dbus.utils;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Utils {

    public static String getMD5String(String str) {
        try {
            MessageDigest instance = MessageDigest.getInstance("MD5");
            byte[] digest = instance.digest(str.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte by : digest) {
                // Keep only the low eight bits of the byte.
                int i = by & 0xff;
                // Convert the integer to hexadecimal.
                String hexString = Integer.toHexString(i);
                if (hexString.length() < 2) {
                    // Pad single-digit values with a leading zero.
                    hexString = "0" + hexString;
                }
                sb.append(hexString);
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
            return null;
        }
    }
}
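As the getRowKey method in the HBase sync service below shows, this hash is used as a short rowkey prefix. A usage sketch:

String value = "10001";
// An 8-character hex prefix followed by the original value ("xxxxxxxx_10001");
// the hash prefix spreads monotonically increasing keys across HBase regions.
String rowKey = Md5Utils.getMD5String(value).substring(0, 8) + "_" + value;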
HBase sink templates
Bean object
package dbus.sink;

import java.util.ArrayList;
import java.util.List;

public class HRow {

    private byte[] rowkey;
    private List<HCell> cells = new ArrayList<>();

    public HRow() {
    }

    public HRow(byte[] rowkey) {
        this.rowkey = rowkey;
    }

    public byte[] getRowkey() {
        return rowkey;
    }

    public void setRowkey(byte[] rowkey) {
        this.rowkey = rowkey;
    }

    public List<HCell> getCells() {
        return cells;
    }

    public void setCells(List<HCell> cells) {
        this.cells = cells;
    }

    public void addCell(String family, String qualifier, byte[] value) {
        cells.add(new HCell(family, qualifier, value));
    }

    // Static nested class: HCell does not need a reference to its enclosing HRow.
    public static class HCell {

        private String family;
        private String qualifier;
        private byte[] value;

        public HCell() {
        }

        public HCell(String family, String qualifier, byte[] value) {
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }

        public String getFamily() {
            return family;
        }

        public void setFamily(String family) {
            this.family = family;
        }

        public String getQualifier() {
            return qualifier;
        }

        public void setQualifier(String qualifier) {
            this.qualifier = qualifier;
        }

        public byte[] getValue() {
            return value;
        }

        public void setValue(byte[] value) {
            this.value = value;
        }
    }
}
HBase utility class
package dbus.sink;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * HBase utility class.
 */
@Slf4j
public class HbaseTemplate implements Serializable {

    private Configuration hbaseConfig; // HBase configuration object
    private Connection conn;           // HBase connection

    public HbaseTemplate(Configuration hbaseConfig) {
        this.hbaseConfig = hbaseConfig;
        initConn();
    }

    private void initConn() {
        try {
            this.conn = ConnectionFactory.createConnection(hbaseConfig);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public Connection getConnection() {
        if (conn == null || conn.isAborted() || conn.isClosed()) {
            initConn();
        }
        return conn;
    }

    public boolean tableExists(String tableName) {
        try (Admin admin = getConnection().getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void createTable(String tableName, String... familyNames) {
        try (Admin admin = getConnection().getAdmin()) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
            // Add the column families.
            if (familyNames != null) {
                for (String familyName : familyNames) {
                    desc.addFamily(new HColumnDescriptor(familyName));
                }
            }
            admin.createTable(desc);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void disableTable(String tableName) {
        try (Admin admin = getConnection().getAdmin()) {
            admin.disableTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    public void deleteTable(String tableName) {
        try (Admin admin = getConnection().getAdmin()) {
            if (admin.isTableEnabled(TableName.valueOf(tableName))) {
                disableTable(tableName);
            }
            admin.deleteTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Insert a single row.
     *
     * @param tableName table name
     * @param hRow      row data object
     * @return whether the operation succeeded
     */
    public Boolean put(String tableName, HRow hRow) {
        boolean flag = false;
        // try-with-resources so the Table handle is always released.
        try (Table table = getConnection().getTable(TableName.valueOf(tableName))) {
            Put put = new Put(hRow.getRowkey());
            for (HRow.HCell hCell : hRow.getCells()) {
                put.addColumn(Bytes.toBytes(hCell.getFamily()), Bytes.toBytes(hCell.getQualifier()), hCell.getValue());
            }
            table.put(put);
            flag = true;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        return flag;
    }

    /**
     * Batch insert.
     *
     * @param tableName table name
     * @param rows      collection of row data objects
     * @return whether the operation succeeded
     */
    public Boolean puts(String tableName, List<HRow> rows) {
        boolean flag = false;
        try (Table table = getConnection().getTable(TableName.valueOf(tableName))) {
            List<Put> puts = new ArrayList<>();
            for (HRow hRow : rows) {
                Put put = new Put(hRow.getRowkey());
                for (HRow.HCell hCell : hRow.getCells()) {
                    put.addColumn(Bytes.toBytes(hCell.getFamily()),
                            Bytes.toBytes(hCell.getQualifier()),
                            hCell.getValue());
                }
                puts.add(put);
            }
            if (!puts.isEmpty()) {
                table.put(puts);
            }
            flag = true;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        return flag;
    }

    /**
     * Batch delete.
     *
     * @param tableName table name
     * @param rowKeys   set of rowkeys
     * @return whether the operation succeeded
     */
    public Boolean deletes(String tableName, Set<byte[]> rowKeys) {
        boolean flag = false;
        try (Table table = getConnection().getTable(TableName.valueOf(tableName))) {
            List<Delete> deletes = new ArrayList<>();
            for (byte[] rowKey : rowKeys) {
                deletes.add(new Delete(rowKey));
            }
            if (!deletes.isEmpty()) {
                table.delete(deletes);
            }
            flag = true;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        return flag;
    }

    public void close() throws IOException {
        if (conn != null) {
            conn.close();
        }
    }
}
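A quick usage sketch of HRow plus HbaseTemplate, matching the sample dbus_flow row above; the ZooKeeper quorum and the rowkey value are assumptions, and the learing_flink namespace must already exist:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HbaseTemplateDemo {

    public static void main(String[] args) throws IOException {
        org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", "node01,node02,node03"); // assumed quorum
        HbaseTemplate template = new HbaseTemplate(hbaseConfig);

        // Family "0" matches the sample dbus_flow row.
        if (!template.tableExists("learing_flink:zyd_orders")) {
            template.createTable("learing_flink:zyd_orders", "0");
        }

        HRow row = new HRow(Bytes.toBytes("rowkey-001")); // illustrative rowkey
        row.addCell("0", "ORDERID", Bytes.toBytes("10001"));
        template.put("learing_flink:zyd_orders", row);

        template.close();
    }
}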
Parsing the Canal data into the HBase data format
package dbus.sink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.protocol.FlatMessage;
import dbus.model.Flow;
import dbus.utils.Md5Utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * HBase synchronization business logic.
 */
@Slf4j
public class HbaseSyncService implements Serializable {

    private HbaseTemplate hbaseTemplate; // HBase operation template

    public HbaseSyncService(HbaseTemplate hbaseTemplate) {
        this.hbaseTemplate = hbaseTemplate;
    }

    public void sync(Flow flow, FlatMessage dml) {
        if (flow != null) {
            String type = dml.getType();
            if (type != null && type.equalsIgnoreCase("INSERT")) {
                insert(flow, dml);
            } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
                // update(flow, dml);
            } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                // delete(flow, dml);
            }
            if (log.isDebugEnabled()) {
                log.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
            }
        }
    }

    /**
     * Handle inserts.
     *
     * @param flow configuration item
     * @param dml  DML data
     */
    private void insert(Flow flow, FlatMessage dml) {
        List<Map<String, String>> data = dml.getData();
        if (data == null || data.isEmpty()) {
            return;
        }
        int i = 1;
        boolean complete = false;
        List<HRow> rows = new ArrayList<>();
        for (Map<String, String> r : data) {
            HRow hRow = new HRow();
            // Assemble the compound rowkey.
            if (flow.getRowKey() != null) {
                String[] rowKeyColumns = flow.getRowKey().trim().split(",");
                String rowKeyValue = getRowKey(rowKeyColumns, r);
                hRow.setRowkey(Bytes.toBytes(rowKeyValue));
            }
            convertData2Row(flow, hRow, r);
            if (hRow.getRowkey() == null) {
                throw new RuntimeException("empty rowKey: " + hRow.toString() + ", Flow: " + flow.toString());
            }
            rows.add(hRow);
            complete = false;
            // Flush once commitBatch rows have accumulated.
            if (i % flow.getCommitBatch() == 0 && !rows.isEmpty()) {
                hbaseTemplate.puts(flow.getHbaseTable(), rows);
                rows.clear();
                complete = true;
            }
            i++;
        }
        // Flush the trailing partial batch.
        if (!complete && !rows.isEmpty()) {
            hbaseTemplate.puts(flow.getHbaseTable(), rows);
        }
    }

    /**
     * Build the compound rowkey by concatenating the configured fields.
     *
     * @param rowKeyColumns fields composing the compound rowkey
     * @param data          row data
     * @return the rowkey, prefixed with a short MD5 hash
     */
    private static String getRowKey(String[] rowKeyColumns, Map<String, String> data) {
        StringBuilder rowKeyValue = new StringBuilder();
        for (String rowKeyColumnName : rowKeyColumns) {
            Object obj = data.get(rowKeyColumnName);
            if (obj != null) {
                rowKeyValue.append(obj.toString());
            }
            rowKeyValue.append("|");
        }
        int len = rowKeyValue.length();
        if (len > 0) {
            rowKeyValue.delete(len - 1, len);
        }
        // Other rowkey generation strategies could be plugged in; an MD5 prefix is hard-coded here.
        return Md5Utils.getMD5String(rowKeyValue.toString()).substring(0, 8) + "_" + rowKeyValue.toString();
    }

    /**
     * Convert Map data into HRow row data.
     *
     * @param flow HBase mapping configuration
     * @param hRow row object
     * @param data Map data
     */
    private static void convertData2Row(Flow flow, HRow hRow, Map<String, String> data) {
        String familyName = flow.getFamily();
        for (Map.Entry<String, String> entry : data.entrySet()) {
            if (entry.getValue() != null) {
                byte[] bytes = Bytes.toBytes(entry.getValue());
                String qualifier = entry.getKey();
                if (flow.isUppercaseQualifier()) {
                    qualifier = qualifier.toUpperCase();
                }
                hRow.addCell(familyName, qualifier, bytes);
            }
        }
    }
}
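The update and delete branches are stubbed out above. A hedged sketch of what the delete path could look like, reusing getRowKey and hbaseTemplate.deletes (this is an assumption, not the author's implementation; it would live inside HbaseSyncService and needs java.util.HashSet and java.util.Set imports):

// Sketch of the stubbed-out delete branch.
private void delete(Flow flow, FlatMessage dml) {
    List<Map<String, String>> data = dml.getData();
    if (data == null || data.isEmpty() || flow.getRowKey() == null) {
        return;
    }
    Set<byte[]> rowKeys = new HashSet<>();
    String[] rowKeyColumns = flow.getRowKey().trim().split(",");
    for (Map<String, String> r : data) {
        // Rebuild the same MD5-prefixed rowkey that insert() wrote.
        rowKeys.add(Bytes.toBytes(getRowKey(rowKeyColumns, r)));
    }
    hbaseTemplate.deletes(flow.getHbaseTable(), rowKeys);
}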
Universal sink
package dbus.sink;

import com.alibaba.otter.canal.protocol.FlatMessage;
import dbus.config.GlobalConfig;
import dbus.model.Flow;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;

@Slf4j
public class HbaseSyncSink extends RichSinkFunction<Tuple2<FlatMessage, Flow>> {

    // Built in open(), so these never travel with the serialized function.
    private transient HbaseTemplate hbaseTemplate;
    private transient HbaseSyncService hbaseSyncService;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", GlobalConfig.HBASE_ZOOKEEPER_QUORUM);
        hbaseConfig.set("hbase.zookeeper.property.clientPort", GlobalConfig.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT);
        hbaseConfig.set("zookeeper.znode.parent", GlobalConfig.ZOOKEEPER_ZNODE_PARENT);
        hbaseTemplate = new HbaseTemplate(hbaseConfig);
        hbaseSyncService = new HbaseSyncService(hbaseTemplate);
    }

    @Override
    public void close() throws Exception {
        // Release the HBase connection when the task shuts down.
        if (hbaseTemplate != null) {
            hbaseTemplate.close();
        }
        super.close();
    }

    @Override
    public void invoke(Tuple2<FlatMessage, Flow> value, Context context) throws Exception {
        hbaseSyncService.sync(value.f1, value.f0);
    }
}
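For completeness, here is a hedged sketch of how the pieces above could be wired into a job. The two source functions are hypothetical placeholders (the article does not show where the FlatMessage stream or the Flow configurations come from, though a Kafka topic fed by Canal and a periodic reload of dbus_flow are typical choices), and the actual IncrementSyncApp may differ:

import com.alibaba.otter.canal.protocol.FlatMessage;
import dbus.function.DbusProcessFunction;
import dbus.incrementssync.IncrementSyncApp;
import dbus.model.Flow;
import dbus.sink.HbaseSyncSink;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementSyncJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Canal FlatMessage stream; a Kafka source deserializing canal-json is assumed here.
        DataStream<FlatMessage> messages = env.addSource(new CanalFlatMessageSource()); // hypothetical source

        // Flow configurations, e.g. periodically reloaded from dbus_flow; source is an assumption.
        BroadcastStream<Flow> flowBroadcast = env
                .addSource(new FlowConfigSource()) // hypothetical source
                .broadcast(IncrementSyncApp.flowStateDescriptor);

        messages
                .keyBy(msg -> msg.getDatabase() + msg.getTable()) // same key as the broadcast state
                .connect(flowBroadcast)
                .process(new DbusProcessFunction())
                .addSink(new HbaseSyncSink());

        env.execute("IncrementSyncApp");
    }
}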