Offline Data Synchronization Platform DataX + Report Visualization Platform Metabase
2022-06-09 05:48:00 【liaomin416100569】
DataX
DataX is an offline data synchronization tool/platform used widely across Alibaba Group, providing efficient data synchronization among heterogeneous data sources including MySQL, Oracle, SQLServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.
Features
As a data synchronization framework, DataX abstracts synchronization between different data sources into a Reader plugin that reads from the source data source and a Writer plugin that writes to the target, so in theory the DataX framework can support synchronization for any type of data source. At the same time, the plugin system forms an ecosystem: each newly developed data source plugin immediately interoperates with every existing one.
DataX 3.0 framework design

As an offline data synchronization framework, DataX is built on a Framework + plugin architecture: data source reads and writes are abstracted as Reader/Writer plugins and integrated into the overall synchronization framework.
- Reader: the data collection module, responsible for reading data from the source data source and sending it to the Framework.
- Writer: the data writing module, responsible for fetching data from the Framework and writing it to the destination.
- Framework: connects Reader and Writer, acts as the data transmission channel between them, and handles core concerns such as buffering, flow control, concurrency, and data conversion.
DataX 3.0 plugin ecosystem
After years of development, DataX has a fairly comprehensive plugin ecosystem: mainstream RDBMS databases, NoSQL stores, and big data computing systems are all supported. The data sources DataX currently supports are listed below:
| Type | Data Source | Reader (read) | Writer (write) |
|---|---|---|---|
| RDBMS (relational databases) | MySQL | √ | √ |
| | Oracle | √ | √ |
| | SQLServer | √ | √ |
| | PostgreSQL | √ | √ |
| | DRDS | √ | √ |
| | DM (DaMeng) | √ | √ |
| | Universal RDBMS (supports all relational databases) | √ | √ |
| Alibaba Cloud data warehouse storage | ODPS | √ | √ |
| | ADS | | √ |
| | OSS | √ | √ |
| | OCS | √ | √ |
| NoSQL data storage | OTS | √ | √ |
| | HBase 0.94 | √ | √ |
| | HBase 1.1 | √ | √ |
| | MongoDB | √ | √ |
| | Hive | √ | √ |
| Unstructured data storage | TxtFile | √ | √ |
| | FTP | √ | √ |
| | HDFS | √ | √ |
| | Elasticsearch | | √ |
The DataX Framework exposes a simple interface for interacting with plugins and a simple plugin registration mechanism: add a new plugin and it connects seamlessly with every other data source. For details, see the DataX data source guide.
Metabase
Overview: Metabase helps you present the data in your database to more people. Data analysts build "questions" (Metabase's term for a query) to refine the data, then combine and share them with company members through dashboards.
Features:
- Set up in only five minutes
- Let team members ask questions without knowing SQL
- Rich, beautiful dashboards with auto-refresh and full-screen mode
- A dedicated SQL mode for analysts and data experts
- Define canonical segments and metrics for your team to reuse
- Send data to Slack or email on a schedule with Pulses
- Query data in Slack at any time with Metabot
- Humanize data for your team by renaming, annotating, and hiding fields

Example walkthrough
Example design
Suppose the following scenario in a microservice system:
- The user management service owns the database that associates users with merchants.
- The product management service owns the table relating products to product categories; the product table carries a category id, but the merchant table does not live in the product database.
The following report statistics are required:
- Product counts by product category (e.g. clothes -> 200, electronics -> 300).
- Product counts by merchant.
- Product counts by country.
- Product creation counts by year (e.g. 2019 -> 2000, 2020 -> 5000).
The design diagram is shown below:
Hands-on tasks
Create the original tables according to the design diagram (user table + merchant table + country table + product table + category table).
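The design diagram itself is not reproduced here, so as a minimal sketch, the following DDL is inferred from the join queries used in the tasks below; the table and column names come from those queries, while the column types and key constraints are assumptions:
-- Assumed schema, inferred from the Task 1 queries; types are guesses.
-- User management service database (ums_docker_fat):
CREATE TABLE user (
  id      BIGINT PRIMARY KEY,
  name    VARCHAR(64),
  country VARCHAR(8)            -- references region.code
);
CREATE TABLE region (
  code VARCHAR(8) PRIMARY KEY,  -- country code
  name VARCHAR(64)              -- country name
);
CREATE TABLE tenant (           -- merchant
  id   BIGINT PRIMARY KEY,
  uid  BIGINT,                  -- references user.id
  name VARCHAR(64)
);
-- Product management service database (gvtgms_test):
CREATE TABLE tenant_class (     -- product category
  id   BIGINT PRIMARY KEY,
  name VARCHAR(64)
);
CREATE TABLE tenant_product (
  id              BIGINT PRIMARY KEY,
  tenant_id       BIGINT,       -- merchant id
  tenant_class_id BIGINT,       -- references tenant_class.id
  name            VARCHAR(128),
  create_time     DATETIME
);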
Install DataX
Download DataX: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
Extract it to a directory of your choice; jobs are then run from the bin directory:
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
Task 1:
This task joins tables that live in different databases. Because the DataX mysqlreader accepts SQL as its data source, a join query can merge data into a new table directly. In internet applications built on microservices, however, each service's database stands alone, so the related data must first be synchronized into the same database before the wide table can be generated from the join.
SQL to consolidate the user-merchant table:
SELECT t.id AS tenant_id,t.uid,t.name AS tenant_name,u.name AS userName,g.code,g.name AS country_name FROM tenant t INNER JOIN USER u ON t.uid=u.id LEFT JOIN region g ON g.code=u.country
SQL to consolidate the product detail table:
SELECT p.tenant_id,p.id AS product_id,p.name AS product_name,p.create_time,c.id AS class_id,c.name AS class_name FROM tenant_product p INNER JOIN tenant_class c ON p.tenant_class_id=c.id
Create a landing database myreport, then create the user-merchant table and the product detail table. The CREATE TEMPORARY TABLE ... LIMIT 0 pattern builds an empty table whose columns match the join query, and CREATE TABLE ... LIKE then clones that structure into a permanent table:
DROP TEMPORARY TABLE IF EXISTS myreport.usertenanttmp;
CREATE TEMPORARY TABLE myreport.usertenanttmp AS SELECT t.id AS tenant_id,t.uid,t.name AS tenant_name,u.name AS userName,g.code,g.name AS country_name FROM tenant t INNER JOIN USER u ON t.uid=u.id LEFT JOIN region g ON g.code=u.country LIMIT 0;
DROP TABLE IF EXISTS myreport.usertenant;
CREATE TABLE myreport.usertenant LIKE myreport.usertenanttmp;
DROP TEMPORARY TABLE IF EXISTS myreport.tenantproductdetailtmp;
CREATE TEMPORARY TABLE myreport.tenantproductdetailtmp AS SELECT p.tenant_id,p.id AS product_id,p.name AS product_name,p.create_time,c.id AS class_id,c.name AS class_name FROM tenant_product p INNER JOIN tenant_class c ON p.tenant_class_id=c.id LIMIT 0;
DROP TABLE IF EXISTS myreport.tenantproductdetail;
CREATE TABLE myreport.tenantproductdetail LIKE myreport.tenantproductdetailtmp;
Create the user-merchant consolidation job JSON (usertenant.json):
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "Qjkj2018",
"connection": [
{
"querySql": [
"SELECT t.id AS tenant_id,t.uid,t.name AS tenant_name,u.name AS userName,g.code,g.name AS country_name FROM tenant t INNER JOIN USER u ON t.uid=u.id LEFT JOIN region g ON g.code=u.country;"
],
"jdbcUrl": [
"jdbc:mysql://192.168.1.230:3306/ums_docker_fat"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "Qjkj2018",
"column": [
"tenant_id", "uid", "tenant_name", "userName","code", "country_name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from usertenant"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.1.230:3306/myreport",
"table": [
"usertenant"
]
}
]
}
}
}
]
}
}
Note: DataX requires Python 2.7; Python 3 is not supported. You can use conda to create a 2.7 environment and activate it.
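For example, with conda (the environment name py27 matches the batch script later in this article):
conda create -n py27 python=2.7
conda activate py27
python --version   # should print Python 2.7.x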
Run the job:
python datax.py usertenant.json
Verify the synchronized data after the job completes.
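For example, a quick sanity check on the landed table:
-- Row count plus a sample of the synchronized rows
SELECT COUNT(*) FROM myreport.usertenant;
SELECT * FROM myreport.usertenant LIMIT 10;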
Create the product detail consolidation job JSON (tenantproductdetail.json):
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "Qjkj2018",
"connection": [
{
"querySql": [
"SELECT p.tenant_id,p.id AS product_id,p.name AS product_name,p.create_time,c.id AS class_id,c.name AS class_name FROM tenant_product p INNER JOIN tenant_class c ON p.tenant_class_id=c.id;"
],
"jdbcUrl": [
"jdbc:mysql://192.168.1.230:3306/gvtgms_test"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "Qjkj2018",
"column": [
"tenant_id","product_id", "product_name", "create_time", "class_id", "class_name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from tenantproductdetail"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.1.230:3306/myreport",
"table": [
"tenantproductdetail"
]
}
]
}
}
}
]
}
}
Run the job:
python datax.py tenantproductdetail.json
Verify the synchronized data after the job completes.
Task 2:
Merge the two generated tables into one large wide table; during statistical analysis, single-table queries are faster.
Wide-table merge SQL (t.create_time is included because the yearly report in Task 3 depends on it):
SELECT u.tenant_id,u.`tenant_name`,u.`uid`,u.`userName`,u.`code`,u.`country_name`,t.`class_id`,t.`class_name`,t.product_id,t.`product_name`,t.`create_time`
FROM usertenant u INNER JOIN tenantproductdetail t ON u.tenant_id=t.tenant_id
In the myreport database, create the wide table using the same LIMIT 0 + LIKE pattern:
DROP TEMPORARY TABLE IF EXISTS myreport.widetabletmp;
CREATE TEMPORARY TABLE myreport.widetabletmp AS SELECT u.tenant_id,u.tenant_name,u.uid,u.userName,u.code,u.country_name,t.class_id,t.class_name,t.product_id,t.product_name,t.create_time
FROM usertenant u INNER JOIN tenantproductdetail t ON u.tenant_id=t.tenant_id LIMIT 0;
DROP TABLE IF EXISTS myreport.widetable;
CREATE TABLE myreport.widetable LIKE myreport.widetabletmp;
Create the wide-table merge job JSON (wide.json):
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "Qjkj2018",
"connection": [
{
"querySql": [
"SELECT u.tenant_id,u.tenant_name,u.uid,u.userName,u.code,u.country_name,t.class_id,t.class_name,t.product_id,t.product_name,t.create_time FROM usertenant u INNER JOIN tenantproductdetail t ON u.tenant_id=t.tenant_id;"
],
"jdbcUrl": [
"jdbc:mysql://192.168.1.230:3306/myreport"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "Qjkj2018",
"column": [
"tenant_id", "tenant_name", "uid", "userName","code", "country_name", "class_id", "class_name", "product_id", "product_name","create_time"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from widetable"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.1.230:3306/myreport",
"table": [
"widetable"
]
}
]
}
}
}
]
}
}
Run the job:
python datax.py wide.json

Task 3:
Here we only show how to generate the yearly product-count report; the other reports are derived from the wide table with similar SQL and are not listed one by one (one example is sketched after the job config below).
Write the statistics SQL
Create the result table:
CREATE TABLE year_report (report_year VARCHAR(4),report_count INT)
The statistics query (COUNT(*) must be aliased to report_count to match the result table):
SELECT YEAR(DATE_FORMAT(create_time, '%Y-%m-%d %H:%i:%s')) AS report_year,COUNT(*) AS report_count FROM widetable GROUP BY YEAR(DATE_FORMAT(create_time, '%Y-%m-%d %H:%i:%s'))
Create the yearly statistics job JSON (year.json):
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "Qjkj2018",
"connection": [
{
"querySql": [
"SELECT YEAR(DATE_FORMAT(create_time, '%Y-%m-%d %H:%i:%s')) AS report_year,COUNT(*) as report_count FROM widetable GROUP BY YEAR(DATE_FORMAT(create_time, '%Y-%m-%d %H:%i:%s'));"
],
"jdbcUrl": [
"jdbc:mysql://192.168.1.230:3306/myreport"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "Qjkj2018",
"column": [
"report_year", "report_count"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from year_report"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.1.230:3306/myreport",
"table": [
"year_report"
]
}
]
}
}
}
]
}
}
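The other reports from the example design follow the same pattern against the wide table. For instance, a sketch of the per-category product count:
-- Product count per category (e.g. clothes -> 200, electronics -> 300)
SELECT class_name, COUNT(*) AS product_count
FROM widetable
GROUP BY class_name;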

Automation:
Write a script that runs the jobs in sequence, then schedule it to run automatically (crontab on Linux, Task Scheduler on Windows; a Linux sketch follows the batch file below). First confirm the Python version with python --version. A Windows batch example:
@pushd %~dp0
@echo """"""""""""""""""""""""""""""""""""
@echo "------ Start merging user merchant tables --------"
@echo """"""""""""""""""""""""""""""""""""
@C:\Users\liaomin\anaconda3\envs\py27\python datax.py usertenant.json
@echo """"""""""""""""""""""""""""""""""""
@echo "------ Start merging commodity classification tables --------"
@echo """"""""""""""""""""""""""""""""""""
@C:\Users\liaomin\anaconda3\envs\py27\python datax.py tenantproductdetail.json
@echo """"""""""""""""""""""""""""""""""""
@echo "------ Start merging wide tables --------"
@echo """"""""""""""""""""""""""""""""""""
@C:\Users\liaomin\anaconda3\envs\py27\python datax.py wide.json
@echo """"""""""""""""""""""""""""""""""""
@echo "------ From the wide table statistics commodity year report -------"
@echo """"""""""""""""""""""""""""""""""""
@C:\Users\liaomin\anaconda3\envs\py27\python datax.py year.json
@pause
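On Linux, an equivalent bash script plus a crontab entry might look like this; the paths and the 02:00 schedule are assumptions:
#!/bin/bash
# run_reports.sh - run the DataX jobs in order, aborting on the first failure.
# Assumes the python on PATH is the 2.7 environment DataX requires.
set -e
cd /opt/datax/bin
python datax.py usertenant.json
python datax.py tenantproductdetail.json
python datax.py wide.json
python datax.py year.json

# crontab entry: rebuild the reports every day at 02:00
# 0 2 * * * /opt/datax/bin/run_reports.sh >> /var/log/datax_reports.log 2>&1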
Displaying reports:
Install Metabase with Docker:
docker run -d -p 3000:3000 --name metabase metabase/metabase
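By default the embedded H2 application database lives inside the container and is lost if the container is removed; Metabase's documented MB_DB_FILE variable lets you keep it on a mounted volume (a sketch; the host path is an assumption):
docker run -d -p 3000:3000 \
  -v ~/metabase-data:/metabase-data \
  -e "MB_DB_FILE=/metabase-data/metabase.db" \
  --name metabase metabase/metabase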
During initial setup you are asked for an administrator account/password and for the data source the reports will be generated from:
http://192.168.0.49:3000/
After entering the system you can add a new database (click Admin):
Navigation bar - Admin - Databases - Add database
After leaving the admin area, select Ask a question (the Chinese UI translation is slightly off), choose Native query, select your database, write the SQL, and click the query button in the lower right.
Click Visualization at the bottom left; a pie chart, for example, can be generated from the data.
Click Done, then Save; you can also create a dashboard to aggregate multiple reports.
Click the analytics entry to view your own reports and dashboards.
You can also integrate your reports and dashboards into your own projects.
Click Sharing and choose Embed this question in an application.
The corresponding front-end embed code is provided there.
In the bottom-left corner of an embedded report there is a "Powered by Metabase" footer; it can only be removed in the commercial edition.
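For reference, signed embedding works by generating a short-lived JWT with the secret key from Admin -> Embedding and placing it in the iframe URL. A minimal Python sketch; the site URL, secret key, and question id 1 are placeholders:
# Build a signed Metabase embed URL for a question (requires: pip install pyjwt)
import time
import jwt  # PyJWT; version 2.x returns a str token

METABASE_SITE_URL = "http://192.168.0.49:3000"  # placeholder
METABASE_SECRET_KEY = "<embedding-secret-key>"  # placeholder, from Admin -> Embedding

payload = {
    "resource": {"question": 1},    # hypothetical question id
    "params": {},
    "exp": int(time.time()) + 600,  # token valid for 10 minutes
}
token = jwt.encode(payload, METABASE_SECRET_KEY, algorithm="HS256")

# Use this URL as the src of an <iframe> in your page
iframe_url = METABASE_SITE_URL + "/embed/question/" + token + "#bordered=true&titled=true"
print(iframe_url)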