当前位置:网站首页>Flink real-time data warehouse (IX): incremental synchronization of data in MySQL
Flink real-time data warehouse (IX): incremental synchronization of data in MySQL
2022-07-02 15:50:00 【wx5ba7ab4695f27】
List of articles
- Configuration class bean object
- flink State class programming
- MD5 encryption
- Hbase sink Templates
Configure the flow table in the database
-- Configuration table for synchronization flows: one row maps a MySQL table to an HBase table.
-- The status codes match FlowStatusEnum (0, 1, 2); only flows with status 2 are synchronized.
CREATE TABLE `dbus_flow` (
  `flowId` int(11) NOT NULL AUTO_INCREMENT COMMENT ' Self increasing ID',
  `mode` int(11) NOT NULL COMMENT ' Storage type (#PHOENIX #NATIVE #STRING, Default STRING)',
  `databaseName` varchar(50) NOT NULL COMMENT 'database',
  `tableName` varchar(50) NOT NULL COMMENT 'table',
  `hbaseTable` varchar(50) NOT NULL COMMENT 'hbaseTable',
  `family` varchar(50) NOT NULL COMMENT 'family',
  `uppercaseQualifier` tinyint(1) NOT NULL COMMENT ' The field name is capitalized , The default is true',
  `commitBatch` int(11) NOT NULL COMMENT 'Number of rows written to HBase per batch commit',
  `rowKey` varchar(100) NOT NULL COMMENT ' form rowkey Field name , Must be separated by commas ',
  `status` int(11) NOT NULL COMMENT 'Status: 0-initial, 1-ready, 2-running',
  PRIMARY KEY (`flowId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
Sample data
-- Sample flow: synchronizes test.zyd_orders into the HBase table learing_flink:zyd_orders.
INSERT INTO `dbus_flow`
  (`flowId`, `mode`, `databaseName`, `tableName`, `hbaseTable`, `family`, `uppercaseQualifier`, `commitBatch`, `rowKey`, `status`)
VALUES
  ('1', '0', 'test', 'zyd_orders', 'learing_flink:zyd_orders', '0', '1', '10', 'orderId', '2');
- 1.
JDBC utility class
package dbus.utils;
import dbus.config.GlobalConfig;
import java.sql.*;
/**
 * Small JDBC helper: registers the driver class once, opens connections using the
 * settings read from {@link GlobalConfig}, and releases JDBC resources.
 */
public class JdbcUtil {
    // Connection settings, read from GlobalConfig when this class is loaded.
    private static final String url = GlobalConfig.DB_URL;
    private static final String user = GlobalConfig.USER_MAME;
    private static final String password = GlobalConfig.PASSWORD;
    private static final String driverClass = GlobalConfig.DRIVER_CLASS;

    /*
     * Load the driver class once. A missing driver is only reported here (as in the
     * original code); the failure then surfaces when getConnection() is called.
     */
    static {
        try {
            Class.forName(driverClass);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Opens a new connection with the configured url, user and password.
     *
     * @return a new connection; the caller is responsible for closing it
     * @throws RuntimeException wrapping the {@link SQLException} if the connection cannot be opened
     */
    public static Connection getConnection() {
        try {
            return DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            // The cause is preserved in the rethrown exception, so it is not printed separately.
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes a statement and a connection. Either argument may be {@code null}.
     *
     * @param stmt statement to close, or {@code null}
     * @param conn connection to close, or {@code null}
     * @throws RuntimeException wrapping the first {@link SQLException} raised while closing
     */
    public static void close(Statement stmt, Connection conn) {
        close(null, stmt, conn);
    }

    /**
     * Closes a result set, a statement and a connection, in that order. Every non-null
     * resource is closed even when closing an earlier one fails, so a failing
     * {@code rs.close()} or {@code stmt.close()} no longer leaks the connection.
     *
     * @param rs result set to close, or {@code null}
     * @param stmt statement to close, or {@code null}
     * @param conn connection to close, or {@code null}
     * @throws RuntimeException wrapping the first {@link SQLException} raised while closing;
     *         later failures are attached to it as suppressed exceptions
     */
    public static void close(ResultSet rs, Statement stmt, Connection conn) {
        RuntimeException failure = null;
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                failure = collect(failure, e);
            }
        }
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
                failure = collect(failure, e);
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                failure = collect(failure, e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Wraps the first close failure; attaches any later one as a suppressed exception. */
    private static RuntimeException collect(RuntimeException pending, SQLException e) {
        if (pending == null) {
            return new RuntimeException(e);
        }
        pending.addSuppressed(e);
        return pending;
    }

    /** Manual smoke test: opens a connection and prints it. */
    public static void main(String[] args) {
        System.out.println(JdbcUtil.getConnection());
    }
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
Enumeration class
CodeEnum
package enums;
// Contract for enums that expose an integer code.
public interface CodeEnum {
/**
 * Gets the enumeration's code value.
 *
 * @return the integer code of the enum constant
 */
Integer getCode();
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
FlowStatusEnum
package enums;
import lombok.Getter;
/**
 * Status of a synchronization flow. Each constant carries an integer code and a
 * message; getters are generated by Lombok's {@code @Getter}.
 */
@Getter
public enum FlowStatusEnum implements CodeEnum {
    /**
     * Initial state (newly added flow).
     */
    FLOWSTATUS_INIT(0, " The initial state "),
    /**
     * Ready state; after the initial acquisition the status can be changed to ready.
     */
    FLOWSTATUS_READY(1, " Ready state "),
    /**
     * Running state (incremental collection is running).
     */
    FLOWSTATUS_RUNNING(2, " Running state ");

    // Enum state is assigned once in the constructor, so the fields are final.
    private final Integer code;
    private final String message;

    FlowStatusEnum(Integer code, String message) {
        this.code = code;
        this.message = message;
    }
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
HBaseStorageModeEnum
package enums;
import lombok.Getter;
/**
 * HBase storage mode of a flow. Each constant carries an integer code and a
 * message; getters are generated by Lombok's {@code @Getter}.
 */
@Getter
public enum HBaseStorageModeEnum implements CodeEnum {
    /**
     * STRING
     */
    STRING(0, "STRING"),
    /**
     * NATIVE
     */
    NATIVE(1, "NATIVE"),
    /**
     * PHOENIX
     */
    PHOENIX(2, "PHOENIX");

    // Enum state is assigned once in the constructor, so the fields are final.
    private final Integer code;
    private final String message;

    HBaseStorageModeEnum(Integer code, String message) {
        this.code = code;
        this.message = message;
    }
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
Configuration class bean object
Flow
package dbus.model;
import enums.FlowStatusEnum;
import enums.HBaseStorageModeEnum;
import lombok.Data;
import lombok.ToString;
/**
 * Configuration of one synchronization flow: which MySQL table is written to which
 * HBase table, and how. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ToString
// Serializable is fully qualified because the visible import list does not import java.io.Serializable.
public class Flow implements java.io.Serializable {
    /**
     * Flow identifier.
     */
    private Integer flowId;
    /**
     * HBase storage mode code; initialized to the code of HBaseStorageModeEnum.STRING.
     */
    private int mode = HBaseStorageModeEnum.STRING.getCode();
    /**
     * Database name / schema name.
     */
    private String databaseName;
    /**
     * MySQL table name.
     */
    private String tableName;
    /**
     * HBase table name.
     */
    private String hbaseTable;
    /**
     * Column family name used for all columns.
     */
    private String family;
    /**
     * Whether qualifier (field) names are upper-cased; initialized to true.
     */
    private boolean uppercaseQualifier = true;
    /**
     * Batch size for commits.
     */
    private int commitBatch;
    /**
     * Field names that form the rowkey, separated by commas.
     */
    private String rowKey;
    /**
     * Status code; initialized to the code of FlowStatusEnum.FLOWSTATUS_INIT.
     */
    private int status = FlowStatusEnum.FLOWSTATUS_INIT.getCode();
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
flink State class programming
package dbus.function;
import com.alibaba.otter.canal.protocol.FlatMessage;
import dbus.incrementssync.IncrementSyncApp;
import dbus.model.Flow;
import enums.FlowStatusEnum;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
/**
 * Pairs each change message with the flow configuration held in broadcast state for
 * its database and table, and forwards only messages whose flow is in the running state.
 */
public class DbusProcessFunction extends KeyedBroadcastProcessFunction<String, FlatMessage, Flow, Tuple2<FlatMessage, Flow>> {

    /**
     * Looks up the flow for the message's database and table; emits the pair when the
     * flow exists and its status equals the running code, otherwise drops the message.
     */
    @Override
    public void processElement(FlatMessage flatMessage, ReadOnlyContext ctx, Collector<Tuple2<FlatMessage, Flow>> out) throws Exception {
        String lookupKey = stateKey(flatMessage.getDatabase(), flatMessage.getTable());
        Flow matched = ctx.getBroadcastState(IncrementSyncApp.flowStateDescriptor).get(lookupKey);
        if (matched == null) {
            return;
        }
        if (matched.getStatus() != FlowStatusEnum.FLOWSTATUS_RUNNING.getCode()) {
            return;
        }
        out.collect(Tuple2.of(flatMessage, matched));
    }

    /**
     * Stores (or replaces) the broadcast flow under the key of its database and table.
     */
    @Override
    public void processBroadcastElement(Flow flow, Context ctx, Collector<Tuple2<FlatMessage, Flow>> collector) throws Exception {
        BroadcastState<String, Flow> flows = ctx.getBroadcastState(IncrementSyncApp.flowStateDescriptor);
        String storeKey = stateKey(flow.getDatabaseName(), flow.getTableName());
        flows.put(storeKey, flow);
    }

    /**
     * Builds the broadcast-state key by concatenating database and table names.
     * NOTE(review): there is no separator, so e.g. ("ab", "c") and ("a", "bc") share a key.
     */
    private static String stateKey(String database, String table) {
        return database + table;
    }
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
MD5 encryption
package dbus.utils;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
 * MD5 helper producing lowercase hexadecimal digests.
 */
public class Md5Utils {
    private static final char[] HEX_DIGITS = "0123456789abcdef".toCharArray();

    /**
     * Computes the MD5 digest of the UTF-8 bytes of {@code str}.
     *
     * @param str text to hash; must not be {@code null}
     * @return the digest as a 32-character lowercase hex string (never {@code null})
     * @throws IllegalStateException if the JVM provides no MD5 implementation; the
     *         previous version returned {@code null} here, which callers dereferenced
     */
    public static String getMD5String(String str) {
        final MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 digest is not available on this JVM", e);
        }
        byte[] digest = md5.digest(str.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder(digest.length * 2);
        for (byte b : digest) {
            // Mask to the unsigned value, then emit the high and low nibble (zero-padded by construction).
            int unsigned = b & 0xff;
            hex.append(HEX_DIGITS[unsigned >>> 4]).append(HEX_DIGITS[unsigned & 0x0f]);
        }
        return hex.toString();
    }
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
Hbase sink Templates
bean object
package dbus.sink;
import java.util.ArrayList;
import java.util.List;
/**
 * One row to be written to HBase: a rowkey plus a list of cells.
 */
public class HRow {
    private byte[] rowkey;
    private List<HCell> cells = new ArrayList<>();

    /** Creates a row without a rowkey; set it later with {@link #setRowkey(byte[])}. */
    public HRow() {
    }

    /**
     * @param rowkey rowkey bytes of this row
     */
    public HRow(byte[] rowkey) {
        this.rowkey = rowkey;
    }

    public byte[] getRowkey() {
        return rowkey;
    }

    public void setRowkey(byte[] rowkey) {
        this.rowkey = rowkey;
    }

    public List<HCell> getCells() {
        return cells;
    }

    public void setCells(List<HCell> cells) {
        this.cells = cells;
    }

    /**
     * Appends a cell to this row.
     *
     * @param family column family name
     * @param qualifier column qualifier
     * @param value cell value bytes
     */
    public void addCell(String family, String qualifier, byte[] value) {
        cells.add(new HCell(family, qualifier, value));
    }

    /**
     * A single cell (family, qualifier, value). Declared {@code static} so a cell does
     * not hold a hidden reference to its enclosing {@link HRow} and can be created on its own.
     */
    public static class HCell {
        private String family;
        private String qualifier;
        private byte[] value;

        public HCell() {
        }

        public HCell(String family, String qualifier, byte[] value) {
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }

        public String getFamily() {
            return family;
        }

        public void setFamily(String family) {
            this.family = family;
        }

        public String getQualifier() {
            return qualifier;
        }

        public void setQualifier(String qualifier) {
            this.qualifier = qualifier;
        }

        public byte[] getValue() {
            return value;
        }

        public void setValue(byte[] value) {
            this.value = value;
        }
    }
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
HBase utility class
package dbus.sink;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
 * Thin wrapper around an HBase {@link Connection}: table administration plus
 * single/batch put and batch delete helpers. Failures are rethrown as
 * {@link RuntimeException} with the original exception as cause.
 */
@Slf4j
public class HbaseTemplate implements Serializable {
    private Configuration hbaseConfig; // HBase configuration object
    private Connection conn; // HBase connection

    /**
     * Stores the configuration and opens the connection immediately.
     *
     * @param hbaseConfig HBase configuration used to create the connection
     * @throws RuntimeException if the connection cannot be created
     */
    public HbaseTemplate(Configuration hbaseConfig) {
        this.hbaseConfig = hbaseConfig;
        initConn();
    }

    private void initConn() {
        try {
            this.conn = ConnectionFactory.createConnection(hbaseConfig);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the connection, re-creating it first when it is missing, aborted or closed.
     */
    public Connection getConnection() {
        if (conn == null || conn.isAborted() || conn.isClosed()) {
            initConn();
        }
        return conn;
    }

    /**
     * @param tableName table name
     * @return whether the table exists
     */
    public boolean tableExists(String tableName) {
        try (Admin admin = getConnection().getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a table with the given column families.
     *
     * @param tableName table name
     * @param familyNames column family names; may be {@code null}
     */
    public void createTable(String tableName, String... familyNames) {
        try (Admin admin = getConnection().getAdmin()) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
            if (familyNames != null) {
                for (String familyName : familyNames) {
                    desc.addFamily(new HColumnDescriptor(familyName));
                }
            }
            admin.createTable(desc);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Disables a table.
     *
     * @param tableName table name
     */
    public void disableTable(String tableName) {
        try (Admin admin = getConnection().getAdmin()) {
            admin.disableTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes a table, disabling it first when it is enabled.
     *
     * @param tableName table name
     */
    public void deleteTable(String tableName) {
        TableName name = TableName.valueOf(tableName);
        try (Admin admin = getConnection().getAdmin()) {
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            }
            admin.deleteTable(name);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Inserts a single row.
     *
     * @param tableName table name
     * @param hRow row data object
     * @return {@code true} on success; failures are thrown, never reported as {@code false}
     */
    public Boolean put(String tableName, HRow hRow) {
        // The Table handle is closed after use; previously it was never released.
        try (Table table = getConnection().getTable(TableName.valueOf(tableName))) {
            table.put(toPut(hRow));
            return true;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Inserts a batch of rows. An empty list performs no write.
     *
     * @param tableName table name
     * @param rows row data objects
     * @return {@code true} on success; failures are thrown, never reported as {@code false}
     */
    public Boolean puts(String tableName, List<HRow> rows) {
        try (Table table = getConnection().getTable(TableName.valueOf(tableName))) {
            log.debug("Writing {} row(s) to HBase table {}", rows.size(), tableName);
            List<Put> puts = new ArrayList<>(rows.size());
            for (HRow hRow : rows) {
                puts.add(toPut(hRow));
            }
            if (!puts.isEmpty()) {
                table.put(puts);
            }
            return true;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes a batch of rows. An empty set performs no delete.
     *
     * @param tableName table name
     * @param rowKeys rowkeys to delete
     * @return {@code true} on success; failures are thrown, never reported as {@code false}
     */
    public Boolean deletes(String tableName, Set<byte[]> rowKeys) {
        try (Table table = getConnection().getTable(TableName.valueOf(tableName))) {
            List<Delete> deletes = new ArrayList<>(rowKeys.size());
            for (byte[] rowKey : rowKeys) {
                deletes.add(new Delete(rowKey));
            }
            if (!deletes.isEmpty()) {
                table.delete(deletes);
            }
            return true;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the underlying connection if one was opened.
     *
     * @throws IOException if closing the connection fails
     */
    public void close() throws IOException {
        if (conn != null) {
            conn.close();
        }
    }

    /** Converts a row object into an HBase Put carrying all of its cells. */
    private static Put toPut(HRow hRow) {
        Put put = new Put(hRow.getRowkey());
        for (HRow.HCell hCell : hRow.getCells()) {
            put.addColumn(Bytes.toBytes(hCell.getFamily()),
                    Bytes.toBytes(hCell.getQualifier()),
                    hCell.getValue());
        }
        return put;
    }
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
- 106.
- 107.
- 108.
- 109.
- 110.
- 111.
- 112.
- 113.
- 114.
- 115.
- 116.
- 117.
- 118.
- 119.
- 120.
- 121.
- 122.
- 123.
- 124.
- 125.
- 126.
- 127.
- 128.
- 129.
- 130.
- 131.
- 132.
- 133.
- 134.
- 135.
- 136.
- 137.
- 138.
- 139.
- 140.
- 141.
- 142.
- 143.
- 144.
- 145.
- 146.
- 147.
- 148.
- 149.
- 150.
- 151.
- 152.
- 153.
- 154.
- 155.
- 156.
- 157.
- 158.
- 159.
- 160.
- 161.
- 162.
- 163.
- 164.
- 165.
- 166.
- 167.
- 168.
- 169.
- 170.
- 171.
- 172.
- 173.
- 174.
- 175.
- 176.
- 177.
- 178.
- 179.
- 180.
- 181.
- 182.
- 183.
- 184.
- 185.
Parse the Canal data into the HBase data format
package dbus.sink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.protocol.FlatMessage;
import dbus.model.Flow;
import dbus.utils.Md5Utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Applies change messages to HBase according to a flow configuration.
 * Only INSERT messages are written; other message types are ignored.
 */
@Slf4j
public class HbaseSyncService implements Serializable {
    private HbaseTemplate hbaseTemplate; // HBase operation template

    public HbaseSyncService(HbaseTemplate hbaseTemplate) {
        this.hbaseTemplate = hbaseTemplate;
    }

    /**
     * Dispatches one change message. Does nothing when {@code flow} is {@code null}.
     *
     * @param flow flow configuration for the message's table
     * @param dml change message
     */
    public void sync(Flow flow, FlatMessage dml) {
        if (flow == null) {
            return;
        }
        String type = dml.getType();
        if ("INSERT".equalsIgnoreCase(type)) {
            insert(flow, dml);
        }
        // TODO: UPDATE and DELETE messages are not implemented yet and are ignored.
        if (log.isDebugEnabled()) {
            log.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
        }
    }

    /**
     * Writes the rows of an INSERT message, flushing every {@code commitBatch} rows.
     * A non-positive {@code commitBatch} (the int default of an unset Flow field) used
     * to raise an ArithmeticException in the modulo; it now means "one batch at the end".
     *
     * @param flow configuration item
     * @param dml DML data
     * @throws RuntimeException if no rowkey can be built because the flow has no rowKey configured
     */
    private void insert(Flow flow, FlatMessage dml) {
        List<Map<String, String>> data = dml.getData();
        if (data == null || data.isEmpty()) {
            return;
        }
        // The rowkey column list is the same for every row, so split it once.
        String[] rowKeyColumns = flow.getRowKey() == null ? null : flow.getRowKey().trim().split(",");
        int batchSize = flow.getCommitBatch();
        List<HRow> rows = new ArrayList<>();
        for (Map<String, String> r : data) {
            HRow hRow = new HRow();
            if (rowKeyColumns != null) {
                hRow.setRowkey(Bytes.toBytes(getRowKey(rowKeyColumns, r)));
            }
            convertData2Row(flow, hRow, r);
            if (hRow.getRowkey() == null) {
                throw new RuntimeException("empty rowKey: " + hRow.toString()+",Flow: "+flow.toString());
            }
            rows.add(hRow);
            if (batchSize > 0 && rows.size() >= batchSize) {
                hbaseTemplate.puts(flow.getHbaseTable(), rows);
                rows.clear();
            }
        }
        // Flush the remainder that did not fill a whole batch.
        if (!rows.isEmpty()) {
            hbaseTemplate.puts(flow.getHbaseTable(), rows);
        }
    }

    /**
     * Builds the rowkey: the values of the rowkey columns joined with {@code |}
     * (a missing value contributes an empty part), prefixed with the first 8 hex
     * characters of the MD5 of that joined string and an underscore.
     *
     * @param rowKeyColumns names of the columns forming the rowkey
     * @param data row data
     * @return the rowkey string
     */
    private static String getRowKey(String[] rowKeyColumns, Map<String, String> data) {
        StringBuilder joined = new StringBuilder();
        for (int idx = 0; idx < rowKeyColumns.length; idx++) {
            if (idx > 0) {
                joined.append("|");
            }
            String part = data.get(rowKeyColumns[idx]);
            if (part != null) {
                joined.append(part);
            }
        }
        String rowKeyValue = joined.toString();
        // The rowkey generation strategy is fixed to an MD5 prefix here.
        return Md5Utils.getMD5String(rowKeyValue).substring(0, 8) + "_" + rowKeyValue;
    }

    /**
     * Copies every non-null entry of the map into the row as a cell of the flow's column family.
     *
     * @param flow HBase mapping configuration
     * @param hRow row object to fill
     * @param data row data keyed by column name
     */
    private static void convertData2Row(Flow flow, HRow hRow, Map<String, String> data) {
        String familyName = flow.getFamily();
        for (Map.Entry<String, String> entry : data.entrySet()) {
            String value = entry.getValue();
            if (value == null) {
                continue;
            }
            String qualifier = entry.getKey();
            if (flow.isUppercaseQualifier()) {
                // Locale.ROOT keeps the casing independent of the JVM's default locale.
                qualifier = qualifier.toUpperCase(java.util.Locale.ROOT);
            }
            hRow.addCell(familyName, qualifier, Bytes.toBytes(value));
        }
    }
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
- 106.
- 107.
- 108.
- 109.
- 110.
- 111.
- 112.
- 113.
- 114.
- 115.
- 116.
- 117.
- 118.
- 119.
- 120.
- 121.
- 122.
- 123.
- 124.
- 125.
- 126.
- 127.
- 128.
- 129.
- 130.
- 131.
- 132.
- 133.
- 134.
- 135.
- 136.
- 137.
- 138.
- 139.
- 140.
- 141.
- 142.
- 143.
- 144.
- 145.
Universal sink
package dbus.sink;
import com.alibaba.otter.canal.protocol.FlatMessage;
import dbus.config.GlobalConfig;
import dbus.model.Flow;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
/**
 * Sink that writes (message, flow) pairs to HBase through {@link HbaseSyncService}.
 */
@Slf4j
public class HbaseSyncSink extends RichSinkFunction<Tuple2<FlatMessage, Flow>> {
    // Kept so the HBase connection opened in open() can be released in close().
    private transient HbaseTemplate hbaseTemplate;
    private HbaseSyncService hbaseSyncService;

    /**
     * Builds the HBase configuration from the ZooKeeper settings in GlobalConfig and
     * creates the template and sync service used by {@link #invoke}.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", GlobalConfig.HBASE_ZOOKEEPER_QUORUM);
        hbaseConfig.set("hbase.zookeeper.property.clientPort", GlobalConfig.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT);
        hbaseConfig.set("zookeeper.znode.parent", GlobalConfig.ZOOKEEPER_ZNODE_PARENT);
        hbaseTemplate = new HbaseTemplate(hbaseConfig);
        hbaseSyncService = new HbaseSyncService(hbaseTemplate);
    }

    /**
     * Closes the HBase connection opened in {@link #open}; previously it was never closed.
     */
    @Override
    public void close() throws Exception {
        try {
            if (hbaseTemplate != null) {
                hbaseTemplate.close();
                hbaseTemplate = null;
            }
        } finally {
            super.close();
        }
    }

    /**
     * Synchronizes one element: {@code value.f0} is the change message, {@code value.f1} its flow.
     */
    @Override
    public void invoke(Tuple2<FlatMessage, Flow> value, Context context) throws Exception {
        hbaseSyncService.sync(value.f1, value.f0);
    }
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
边栏推荐
- /bin/ld: 找不到 -lcrypto
- 全是精华的模电专题复习资料:基本放大电路知识点
- The task cannot be submitted after the installation of flick is completed
- 《大学“电路分析基础”课程实验合集.实验六》丨典型信号的观察与测量
- 数字藏品系统开发(程序开发)丨数字藏品3D建模经济模式系统开发源码
- Floyed "suggestions collection"
- [leetcode] 577 reverse word III in string
- 6091. Divide the array so that the maximum difference is K
- 蚂蚁集团大规模图计算系统TuGraph通过国家级评测
- Solve the problem of base64encoder error
猜你喜欢

Pattern matching extraction of specific subgraphs in graphx graph Computing Practice

隐藏在 Nebula Graph 背后的星辰大海

中科大脑知识图谱平台建设及业务实践

《大学“电路分析基础”课程实验合集.实验四》丨线性电路特性的研究

如何实现十亿级离线 CSV 导入 Nebula Graph

The outline dimension function application of small motherboard
![[experience cloud] how to get the metadata of experience cloud in vscode](/img/45/012c2265402ba1b44f4497f468bc61.png)
[experience cloud] how to get the metadata of experience cloud in vscode
![[development environment] install the Chinese language pack for the 2013 version of visual studio community (install test agents 2013 | install visual studio 2013 simplified Chinese)](/img/cf/38e4035c3b318814672f21c8a42618.jpg)
[development environment] install the Chinese language pack for the 2013 version of visual studio community (install test agents 2013 | install visual studio 2013 simplified Chinese)

动态规划入门二(5.647.62)

《大学“电路分析基础”课程实验合集.实验六》丨典型信号的观察与测量
随机推荐
Wise target detection 23 - pytoch builds SSD target detection platform
动态规划入门一,队列的bfs(70.121.279.200)
/bin/ld: 找不到 -lgssapi_krb5
[leetcode] 344 reverse string
[leetcode] 1020 number of enclaves
The outline dimension function application of small motherboard
(万字精华知识总结)Shell脚本编程基础知识
Pyinstaller打包exe附带图片的方法
Pyinstaller's method of packaging pictures attached to exe
[development environment] install Visual Studio Ultimate 2013 development environment (download software | install software | run software)
6090. Minimax games
制作p12证书[通俗易懂]
fastjson List转JSONArray以及JSONArray转List「建议收藏」
[leetcode] 695 - maximum area of the island
Make p12 certificate [easy to understand]
Why does the system convert the temp environment variable to a short file name?
【LeetCode】19-删除链表的倒数第N个结点
数组和链表的区别浅析
Introduction to dynamic planning I, BFS of queue (70.121.279.200)
/Bin/ld: cannot find -lcrypto