Flink real-time data warehouse (7): Flink realizes the full pull module to extract data in MySQL
2022-07-02 15:50:00 【wx5ba7ab4695f27】
The full-pull job reads the `zyd_goods` table from MySQL with `JDBCInputFormat` and writes each row into HBase through Hadoop's `TableOutputFormat`:

```java
package dbus.fullPull;

import dbus.config.GlobalConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class FullPullApp {

    // Schema of zyd_goods: id, goodsName, sellingPrice, goodsStock, appraiseNum
    public static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.BIG_DEC_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO
    );

    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Build a JDBC input format that reads the goods table
        JDBCInputFormat.JDBCInputFormatBuilder jdbcInputFormatBuilder = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(GlobalConfig.DRIVER_CLASS)
                .setDBUrl(GlobalConfig.DB_URL)
                .setUsername(GlobalConfig.USER_MAME)
                .setPassword(GlobalConfig.PASSWORD)
                .setQuery("select * from zyd_goods")
                .setRowTypeInfo(ROW_TYPE_INFO);

        // Read the MySQL data
        DataSet<Row> source = env.createInput(jdbcInputFormatBuilder.finish());
        // Note: do not call source.print() here -- in the DataSet API print()
        // triggers an immediate job execution, so the table would be read twice.

        // Convert the MySQL rows into HBase mutations
        DataSet<Tuple2<Text, Mutation>> hbaseResult = convertMysqlToHbase(source);

        // Configure the HBase output
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "note01,note02,note03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set(TableOutputFormat.OUTPUT_TABLE, "learing_flink:zyd_goods");
        conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp");

        // Create a new Job instance to carry the Hadoop configuration
        Job job = Job.getInstance(conf);
        hbaseResult.output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<>(), job));

        env.execute("FullPullerAPP");
    }

    private static DataSet<Tuple2<Text, Mutation>> convertMysqlToHbase(DataSet<Row> dataSet) {
        return dataSet.map(new RichMapFunction<Row, Tuple2<Text, Mutation>>() {
            private transient Tuple2<Text, Mutation> resultTp;
            // Column family of the target table
            private final byte[] cf = "F".getBytes(ConfigConstants.DEFAULT_CHARSET);

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                resultTp = new Tuple2<>();
            }

            @Override
            public Tuple2<Text, Mutation> map(Row value) throws Exception {
                // The primary key (field 0) becomes the HBase row key
                resultTp.f0 = new Text(value.getField(0).toString());
                Put put = new Put(value.getField(0).toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
                if (null != value.getField(1)) {
                    put.addColumn(cf, Bytes.toBytes("goodsName"), Bytes.toBytes(value.getField(1).toString()));
                }
                put.addColumn(cf, Bytes.toBytes("sellingPrice"), Bytes.toBytes(value.getField(2).toString()));
                put.addColumn(cf, Bytes.toBytes("goodsStock"), Bytes.toBytes(value.getField(3).toString()));
                put.addColumn(cf, Bytes.toBytes("appraiseNum"), Bytes.toBytes(value.getField(4).toString()));
                resultTp.f1 = put;
                return resultTp;
            }
        });
    }
}
```
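The job references a `dbus.config.GlobalConfig` class that holds the JDBC connection settings but is not shown in this article. A minimal sketch, assuming plain string constants; the driver class, URL, and credentials below are placeholders for your own environment:

```java
package dbus.config;

// Minimal sketch of the GlobalConfig referenced by the job above; the
// original class is not shown in the article, so all values here are
// placeholders to be replaced with your own environment settings.
public class GlobalConfig {
    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    public static final String DB_URL = "jdbc:mysql://localhost:3306/dbus?useSSL=false";
    // Keeps the constant name (including its spelling) used by the job code.
    public static final String USER_MAME = "root";
    public static final String PASSWORD = "123456";
}
```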
A new dependency needs to be added for the Hadoop output format:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>1.6.2</version>
</dependency>
```
Note that no higher version of this artifact is available.
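The job also assumes that the `learing_flink` namespace and the `zyd_goods` table with column family `F` already exist in HBase. A minimal sketch for creating them with the HBase 1.x client Admin API (the cluster settings match the job above; everything else is an assumption):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTargetTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "note01,note02,note03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Create the namespace and the table with column family "F",
            // matching the names used by the full-pull job.
            admin.createNamespace(NamespaceDescriptor.create("learing_flink").build());
            HTableDescriptor table = new HTableDescriptor(TableName.valueOf("learing_flink:zyd_goods"));
            table.addFamily(new HColumnDescriptor("F"));
            admin.createTable(table);
        }
    }
}
```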