当前位置:网站首页>Flink real-time data warehouse (7): Flink realizes the full pull module to extract data in MySQL
Flink real-time data warehouse (7): Implementing the full-pull module in Flink to extract data from MySQL
2022-07-02 15:50:00 【wx5ba7ab4695f27】
List of articles
package dbus.fullPull;
import dbus.config.GlobalConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class fullPullApp {
public static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BIG_DEC_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);
public static void main(String[] args) throws Exception {
// Get the implementation environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read the product table
JDBCInputFormat.JDBCInputFormatBuilder jdbcInputFormatBuilder = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(GlobalConfig.DRIVER_CLASS)
.setDBUrl(GlobalConfig.DB_URL)
.setUsername(GlobalConfig.USER_MAME)
.setPassword(GlobalConfig.PASSWORD)
.setQuery("select * from zyd_goods")
.setRowTypeInfo(ROW_TYPE_INFO);
// Read MySQL data
DataSet<Row> source = env.createInput(jdbcInputFormatBuilder.finish());
source.print();
// Generate hbase Output data of
DataSet<Tuple2<Text, Mutation>> hbaseResult = convertMysqlToHbase(source);
// Data output to hbase
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "note01,note02,note03");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
conf.set(TableOutputFormat.OUTPUT_TABLE, "learing_flink:zyd_goods");
conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp");
// Create a new one job example
Job job = Job.getInstance(conf);
hbaseResult.output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<>(),job));
env.execute("FullPullerAPP");
}
private static DataSet<Tuple2<Text, Mutation>> convertMysqlToHbase(DataSet<Row> dataSet) {
return dataSet.map(new RichMapFunction<Row, Tuple2<Text, Mutation>>() {
private transient Tuple2<Text, Mutation> resultTp;
private byte[] cf = "F".getBytes(ConfigConstants.DEFAULT_CHARSET);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
resultTp = new Tuple2<>();
}
@Override
public Tuple2<Text, Mutation> map(Row value) throws Exception {
resultTp.f0 = new Text(value.getField(0).toString());
Put put = new Put(value.getField(0).toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
if (null != value.getField(1)) {
put.addColumn(cf, Bytes.toBytes("goodsName"), Bytes.toBytes(value.getField(1).toString()));
}
put.addColumn(cf, Bytes.toBytes("sellingPrice"), Bytes.toBytes(value.getField(2).toString()));
put.addColumn(cf, Bytes.toBytes("goodsStock"), Bytes.toBytes(value.getField(3).toString()));
put.addColumn(cf, Bytes.toBytes("appraiseNum"), Bytes.toBytes(value.getField(4).toString()));
resultTp.f1 = put;
return resultTp;
}
});
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
A new Maven dependency needs to be added:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>1.6.2</version>
</dependency>
- 1.
- 2.
- 3.
- 4.
- 5.
No higher version
边栏推荐
- 2303. Calculate the total tax payable
- PTA ladder game exercise set l2-001 inter city emergency rescue
- How to import a billion level offline CSV into Nepal graph
- Moveit 避障路径规划 demo
- 2303. 计算应缴税款总额
- Pyinstaller打包exe附带图片的方法
- /bin/ld: 找不到 -lxslt
- 使用FFmpeg命令行进行UDP、RTP推流(H264、TS),ffplay接收
- [leetcode] 1254 - count the number of closed Islands
- (Wanzi essence knowledge summary) basic knowledge of shell script programming
猜你喜欢

可视化技术在 Nebula Graph 中的应用

How to use percona tool to add fields to MySQL table after interruption

Soul torture, what is AQS???

Introduction to dynamic planning I, BFS of queue (70.121.279.200)
![[leetcode] 417 - Pacific Atlantic current problem](/img/30/c541bc1e81eb4e348ca11116a05e84.png)
[leetcode] 417 - Pacific Atlantic current problem

Experiment collection of University "Fundamentals of circuit analysis". Experiment 7 - Research on sinusoidal steady-state circuit
![[leetcode] 1905 statistics sub Island](/img/82/d2f7b829f5beb7f9f1eabe8d101ecb.png)
[leetcode] 1905 statistics sub Island

Experiment collection of University Course "Fundamentals of circuit analysis". Experiment 5 - Research on equivalent circuit of linear active two terminal network

Deux séquences ergodiques connues pour construire des arbres binaires

Two traversal sequences are known to construct binary trees
随机推荐
/Bin/ld: cannot find -lssl
Folium, diagnosis and close contact trajectory above
Tree binary search tree
Moveit obstacle avoidance path planning demo
6091. 划分数组使最大差为 K
Ssh/scp does not prompt all activities are monitored and reported
SQL修改语句
数据湖(十一):Iceberg表数据组织与查询
《大学“电路分析基础”课程实验合集.实验六》丨典型信号的观察与测量
/Bin/ld: cannot find -lgssapi_ krb5
Soul torture, what is AQS???
[idea] recommend an idea translation plug-in: translation "suggestions collection"
2278. Percentage of letters in string
Application of visualization technology in Nebula graph
Make p12 certificate [easy to understand]
/Bin/ld: cannot find -lxslt
[experience cloud] how to get the metadata of experience cloud in vscode
Golang MD5 encryption and MD5 salt value encryption
【LeetCode】19-删除链表的倒数第N个结点
The outline dimension function application of small motherboard