Flink Real-Time Data Warehouse (Part 7): Implementing a Full-Pull Module in Flink to Extract Data from MySQL
2022-07-02 15:50:00 【wx5ba7ab4695f27】
package dbus.fullPull;
import dbus.config.GlobalConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
/**
 * Full-pull batch job: reads every row of the MySQL table {@code zyd_goods}
 * ("select * from zyd_goods") with Flink's DataSet API and writes each row to the
 * HBase table {@code learing_flink:zyd_goods} as a {@code Put}.
 *
 * NOTE(review): this listing is incomplete. Constructor arguments, builder settings and
 * closing braces are missing below. TODO restore them before compiling.
 */
public class fullPullApp {
// Row type for the rows produced by the JDBC input.
// NOTE(review): the constructor arguments are missing from this listing. Given the
// BasicTypeInfo import, they are presumably one BasicTypeInfo per selected column, and
// the field is presumably passed to the JDBC builder via setRowTypeInfo. Confirm both.
public static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
/**
 * Entry point: wires up the MySQL -> HBase full-pull batch pipeline.
 *
 * @param args command-line arguments (not read in the visible code)
 * @throws Exception propagated from Flink / Hadoop / HBase setup
 */
public static void main(String[] args) throws Exception {
// Get the batch (DataSet) execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Build a JDBC input format that reads the whole goods table.
// NOTE(review): the driver name, DB URL, credentials, row type (ROW_TYPE_INFO) and the
// terminating ';' of this builder chain are missing from this listing. They presumably
// come from the imported GlobalConfig. Confirm and restore.
JDBCInputFormat.JDBCInputFormatBuilder jdbcInputFormatBuilder = JDBCInputFormat.buildJDBCInputFormat()
.setQuery("select * from zyd_goods")
// Read the MySQL rows as a DataSet<Row>
DataSet<Row> source = env.createInput(jdbcInputFormatBuilder.finish());
// Convert each row into a (row key, Put) pair for HBase
DataSet<Tuple2<Text, Mutation>> hbaseResult = convertMysqlToHbase(source);
// HBase client configuration: ZooKeeper connection and target table.
// NOTE(review): the hosts and table name are hard-coded. Consider moving them into config.
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "note01,note02,note03");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
conf.set(TableOutputFormat.OUTPUT_TABLE, "learing_flink:zyd_goods");
// NOTE(review): presumably set because the wrapped Hadoop output committer expects an
// output directory, even though TableOutputFormat writes to HBase. Confirm.
conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp");
// Create a Hadoop Job that carries the configuration into the output format
Job job = Job.getInstance(conf);
// Sink: HBase's TableOutputFormat wrapped in Flink's HadoopOutputFormat.
// NOTE(review): no env.execute() call is visible after this sink. DataSet sinks are
// presumably only run when the job is executed, so confirm that call was not lost here.
hbaseResult.output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<>(),job));
/**
 * Maps each MySQL row to a (row key, Put) pair for HBase's TableOutputFormat.
 * Field 0 becomes the row key (as Text and as the Put row). Fields 1-4 are added as the
 * columns goodsName, sellingPrice, goodsStock and appraiseNum in column family "F",
 * stored as the bytes of their toString() form.
 *
 * @param dataSet rows read from the goods table
 * @return a DataSet with one (row key, Put) tuple per input row
 */
private static DataSet<Tuple2<Text, Mutation>> convertMysqlToHbase(DataSet<Row> dataSet) {
return dataSet.map(new RichMapFunction<Row, Tuple2<Text, Mutation>>() {
// Output tuple that map() fills and returns on every call.
// It is transient, so it is created in open() rather than serialized with the function.
private transient Tuple2<Text, Mutation> resultTp;
// Column family "F" encoded as bytes
private byte[] cf = "F".getBytes(ConfigConstants.DEFAULT_CHARSET);
// Allocate the reusable output tuple once, before any map() call
public void open(Configuration parameters) throws Exception {
resultTp = new Tuple2<>();
// NOTE(review): closing braces (and likely @Override annotations) are missing in this listing.
public Tuple2<Text, Mutation> map(Row value) throws Exception {
// Row key taken from field 0. NOTE(review): throws NPE if field 0 is SQL NULL.
resultTp.f0 = new Text(value.getField(0).toString());
Put put = new Put(value.getField(0).toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
// NOTE(review): only field 1 is null-checked. Fields 2-4 call toString() unguarded and
// would throw NPE on SQL NULL. The if's closing brace is missing, so its extent
// (just L55, or L55-58) is unclear from this listing. Confirm and guard each field.
if (null != value.getField(1)) {
put.addColumn(cf, Bytes.toBytes("goodsName"), Bytes.toBytes(value.getField(1).toString()));
put.addColumn(cf, Bytes.toBytes("sellingPrice"), Bytes.toBytes(value.getField(2).toString()));
put.addColumn(cf, Bytes.toBytes("goodsStock"), Bytes.toBytes(value.getField(3).toString()));
put.addColumn(cf, Bytes.toBytes("appraiseNum"), Bytes.toBytes(value.getField(4).toString()));
resultTp.f1 = put;
return resultTp;
The following new dependency needs to be added:
- 1.
- 2.
- 3.
- 4.
- 5.
Do not use a higher version of this dependency.
