Cluster scripts for a data warehouse project
2022-07-05 05:29:00 【SYBY】
Catalog
- A recap of the previous e-commerce data warehouse project
- File distribution (sync) script
- Hadoop cluster start/stop script
- ZooKeeper cluster start/stop script
- Kafka cluster start/stop script
- Flume start/stop script for the user-behavior log collection side
- Flume configuration for the user-behavior log collection side
- Flume start/stop script for the user-behavior log consumer side
- Flume configuration for the user-behavior log consumer side
- Flume configuration for the business-data incremental tables
- Maxwell script for first-day full synchronization of the incremental tables
- DataX synchronization script
A recap of the previous e-commerce data warehouse project
File distribution (sync) script
#!/bin/bash
#1. Check the number of arguments
if [ $# -lt 1 ]
then
    echo "Not Enough Arguments!"
    exit
fi
#2. Iterate over every machine in the cluster
for host in hadoop102 hadoop103 hadoop104
do
    echo ==================== $host ====================
    #3. Iterate over all files/directories and send them one by one
    for file in "$@"
    do
        #4. Check whether the file exists
        if [ -e $file ]
        then
            #5. Get its parent directory (resolving symlinks)
            pdir=$(cd -P $(dirname $file); pwd)
            #6. Get the file name
            fname=$(basename $file)
            ssh $host "mkdir -p $pdir"
            rsync -av $pdir/$fname $host:$pdir
        else
            echo "$file does not exist!"
        fi
    done
done
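For reference, a typical invocation might look like the following; the script name xsync and its location under ~/bin are assumptions for this sketch, not something fixed by the project.
# make the script executable once (assumed name/location: ~/bin/xsync)
chmod +x ~/bin/xsync
# distribute a directory and a single file to hadoop102/103/104
xsync /opt/module/flume
xsync /opt/module/flume/job/file_to_kafka.conf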
Hadoop cluster start/stop script
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
    echo " =================== Starting the Hadoop cluster ==================="
    echo " --------------- starting HDFS ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
    echo " --------------- starting YARN ---------------"
    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
    echo " --------------- starting the history server ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")
    echo " =================== Stopping the Hadoop cluster ==================="
    echo " --------------- stopping the history server ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
    echo " --------------- stopping YARN ---------------"
    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
    echo " --------------- stopping HDFS ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
    echo "Input Args Error..."
;;
esac
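A usage sketch, assuming the script is saved as hdp.sh in a directory on PATH (the name is an assumption):
chmod +x ~/bin/hdp.sh
hdp.sh start   # starts HDFS on hadoop102, YARN on hadoop103, and the JobHistory server
hdp.sh stop    # stops the same services in reverse order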
ZooKeeper cluster start/stop script
#!/bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104; do
        echo "-------------- starting zookeeper on $i --------------"
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104; do
        echo "-------------- stopping zookeeper on $i --------------"
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
    done
};;
"status"){
    for i in hadoop102 hadoop103 hadoop104; do
        echo "-------------- zookeeper status on $i --------------"
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
    done
};;
esac
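Usage, assuming the script is saved as zk.sh (an assumed name):
zk.sh start    # start ZooKeeper on hadoop102-104
zk.sh status   # print the leader/follower status of each node
zk.sh stop     # stop ZooKeeper on all three nodes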
Kafka cluster start/stop script
#!/bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104; do
        echo "--------- starting Kafka on $i ---------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104; do
        echo "--------- stopping Kafka on $i ---------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
    done
};;
esac
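Usage, assuming the script is saved as kf.sh and the ZooKeeper script as zk.sh (both names are assumptions). Since the brokers register in ZooKeeper, start ZooKeeper before Kafka and stop Kafka before ZooKeeper.
zk.sh start && kf.sh start   # bring up ZooKeeper first, then the Kafka brokers
kf.sh stop                   # stop the brokers before shutting down ZooKeeper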
Flume start/stop script for the user-behavior log collection side
#!/bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103
    do
        echo " -------- starting collection Flume on $i -------"
        ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
    done
};;
"stop"){
    for i in hadoop102 hadoop103
    do
        echo " -------- stopping collection Flume on $i -------"
        ssh $i "ps -ef | grep file_to_kafka.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
    done
};;
esac
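Usage, assuming the script is saved as f1.sh (an assumed name); it starts one collection agent on hadoop102 and one on hadoop103:
f1.sh start   # start the file_to_kafka.conf agents on hadoop102 and hadoop103
f1.sh stop    # kill the agents by grepping for the config file name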
Flume configuration for the user-behavior log collection side
The Flume source, channel, and sink use the following types:
1. TAILDIR source 2. Kafka channel 3. no sink is configured (the Kafka channel writes events to Kafka directly)
# Define the components
a1.sources=r1
a1.channels=c1
# Configure the source (TAILDIR source)
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/module/applog/log/app.*
a1.sources.r1.positionFile=/opt/module/flume/taildir_position.json
# Configure the interceptor (ETL data cleaning: check whether each JSON record is complete)
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.serenity.flume.interceptor.ETLInterceptor$Builder
# Configure the channel
a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic=topic_log
a1.channels.c1.parseAsFlumeEvent=false
# Configure the sink (none)
# Bind the components together
a1.sources.r1.channels=c1
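To sanity-check the configuration before relying on the start/stop script, one option is to run the agent in the foreground and watch the Kafka topic; a sketch, assuming the configuration above is saved as /opt/module/flume/job/file_to_kafka.conf:
# run the agent in the foreground with console logging to spot configuration errors
/opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ \
  -f /opt/module/flume/job/file_to_kafka.conf -Dflume.root.logger=INFO,console
# in another shell, confirm that log events reach the topic_log topic
/opt/module/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server hadoop102:9092 --topic topic_log --from-beginning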
Flume start/stop script for the user-behavior log consumer side
#!/bin/bash
case $1 in
"start")
    echo " -------- starting log-data Flume on hadoop104 -------"
    ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs.conf >/dev/null 2>&1 &"
;;
"stop")
    echo " -------- stopping log-data Flume on hadoop104 -------"
    ssh hadoop104 "ps -ef | grep kafka_to_hdfs.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
;;
esac
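Usage, assuming the script is saved as f2.sh (an assumed name):
f2.sh start   # start the kafka_to_hdfs.conf agent on hadoop104
f2.sh stop    # stop it by grepping for the config file name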
Flume configuration for the user-behavior log consumer side
The Flume source, channel, and sink use the following types:
1. Kafka source 2. file channel 3. HDFS sink
## Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.consumer.group.id = flume2
a1.sources.r1.kafka.topics=cart_info,comment_info,coupon_use,favor_info,order_detail,order_detail_activity,order_detail_coupon,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.serenity.flume.interceptor.TimestampInterceptor$Builder
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db-
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## Output compressed files (gzip) instead of plain text
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
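A quick way to confirm the sink is writing to HDFS, assuming data has been produced today for one of the listed topics (order_info is used as an example, and the output file name pattern is approximate):
# list today's output files for the order_info incremental data
hadoop fs -ls /origin_data/gmall/db/order_info_inc/$(date +%Y-%m-%d)
# preview a few decompressed records (hadoop fs -text handles the gzip codec)
hadoop fs -text "/origin_data/gmall/db/order_info_inc/$(date +%Y-%m-%d)/db-*" | head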
Flume configuration for the business-data incremental tables
The start/stop script is essentially the same as the previous Flume start/stop script; only the configuration file path changes (see the sketch below).
The Flume source, channel, and sink use the following types:
1. Kafka source 2. file channel 3. HDFS sink
This TimestampInterceptor differs from the one used by the user-behavior-log Flume only in the time unit it handles.
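A minimal sketch of the adapted start/stop script; it assumes the agent runs on hadoop104 and that the configuration below is saved as /opt/module/flume/job/kafka_to_hdfs_db.conf (both are assumptions, adjust them to your deployment):
#!/bin/bash
case $1 in
"start")
    echo " -------- starting business-data Flume on hadoop104 -------"
    ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")
    echo " -------- stopping business-data Flume on hadoop104 -------"
    ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
;;
esac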
## Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.consumer.group.id = flume2
a1.sources.r1.kafka.topics=cart_info,comment_info,coupon_use,favor_info,order_detail,order_detail_activity,order_detail_coupon,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.serenity.flume.interceptor.db.TimestampInterceptor$Builder
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db-
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## Output compressed files (gzip) instead of plain text
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## Bind the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Maxwell script for first-day full synchronization of the incremental tables
After using Maxwell to perform the first-day full synchronization of the incremental tables, queries against the data need to filter with where type='bootstrap-insert', because a bootstrap full synchronization also writes start and end marker records (type='bootstrap-start' and type='bootstrap-complete') that carry no data.
#!/bin/bash
# This script performs the first-day (one-time) initialization of all incremental tables
MAXWELL_HOME=/opt/module/maxwell
import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}
case $1 in
"table_name")           # one case per table; replace table_name with the actual table name
    import_data table_name
    ;;
.
.
.
"table_name")
    import_data table_name
    ;;
"table_name")
    import_data table_name
    ;;
"all")
    # Synchronize all the tables at once
    import_data table_name
    import_data table_name
    import_data table_name
    import_data table_name
    import_data table_name
    import_data table_name
    import_data table_name
    import_data table_name
    import_data table_name
    import_data table_name
    ;;
esac
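Usage, assuming the script is saved as mysql_to_kafka_inc_init.sh and that user_info is one of the configured tables (both names are examples):
chmod +x mysql_to_kafka_inc_init.sh
./mysql_to_kafka_inc_init.sh user_info   # bootstrap a single incremental table
./mysql_to_kafka_inc_init.sh all         # bootstrap every incremental table once
# downstream queries should keep only rows WHERE type = 'bootstrap-insert'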
DataX synchronization script
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

# MySQL connection settings; change them to match your environment
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"

# HDFS NameNode settings; change them to match your environment
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

# Target directory for the generated DataX job files; change as needed
output_path = "/opt/module/datax/job/import"


# Get a MySQL connection
def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


# Get the table's metadata: column names and data types
def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


# Get the table's column names
def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))


# Map the MySQL data types from the metadata to Hive data types for the hdfswriter column list
def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)


# Generate the DataX job JSON file
def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""
    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])
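A usage sketch for the generator, assuming it is saved as gen_import_config.py and runs under Python 2 with the MySQLdb module installed (under Python 3, map() returns an iterator that json.dump cannot serialize, so the column lists would need to be wrapped in list()). The table user_info and the HDFS target directory are examples only; DataX's -p option fills in the ${targetdir} placeholder.
# 1. generate the DataX job file for gmall.user_info
python gen_import_config.py -d gmall -t user_info
# 2. create the HDFS target directory (hdfswriter expects it to exist)
hadoop fs -mkdir -p /origin_data/gmall/db/user_info_full/$(date +%Y-%m-%d)
# 3. run DataX, passing the target directory as a job parameter
python /opt/module/datax/bin/datax.py \
  -p"-Dtargetdir=/origin_data/gmall/db/user_info_full/$(date +%Y-%m-%d)" \
  /opt/module/datax/job/import/gmall.user_info.json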