
Cluster Scripts for the Data Warehouse Project



A summary of the cluster scripts from the e-commerce data warehouse project I built earlier.

Sync and distribution script

#!/bin/bash

#1. Check the number of arguments
if [ $# -lt 1 ]
then
    echo "Not Enough Arguments!"
    exit;
fi
#2. Iterate over every machine in the cluster
for host in hadoop102 hadoop103 hadoop104
do
    echo ====================  $host  ====================
    #3. Iterate over all files and directories and send them one by one

    for file in "$@"
    do
        #4. Check whether the file exists
        if [ -e $file ]
            then
                #5. Get the parent directory
                pdir=$(cd -P $(dirname $file); pwd)

                #6. Get the file name
                fname=$(basename $file)
                ssh $host "mkdir -p $pdir"
                rsync -av $pdir/$fname $host:$pdir
            else
                echo "$file does not exist!"
        fi
    done
done
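For reference, a minimal usage sketch, assuming the script above is saved as xsync (the name is my own choice) under ~/bin and marked executable:

chmod +x ~/bin/xsync
# distribute a directory and a single file to hadoop102/103/104
xsync /opt/module/hadoop-3.1.3
xsync ~/bin/xsync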

Hadoop cluster start/stop script

#!/bin/bash

if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi

case $1 in
"start")
        echo " =================== 启动 hadoop集群 ==================="

        echo " --------------- 启动 hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
        echo " --------------- 启动 yarn ---------------"
        ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
        echo " --------------- 启动 historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")
        echo " =================== 关闭 hadoop集群 ==================="

        echo " --------------- 关闭 historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
        echo " --------------- 关闭 yarn ---------------"
        ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
        echo " --------------- 关闭 hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
    echo "Input Args Error..."
;;
esac
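A usage sketch, assuming the script is saved as hdp.sh (a hypothetical name) and made executable; jps is just a quick way to confirm which daemons are running on each node:

hdp.sh start
for host in hadoop102 hadoop103 hadoop104; do ssh $host jps; done
hdp.sh stop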

ZooKeeper cluster start/stop script

#!/bin/bash

case $1 in
"start"){
    
	for i in hadoop102 hadoop103 hadoop104; do
		echo "--------------zookeeper $i 启动--------------"
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
	done
};;	
"stop"){
    
	for i in hadoop102 hadoop103 hadoop104; do
		echo "--------------zookeeper $i 关闭--------------"
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
	done
};;	
"status"){
    
	for i in hadoop102 hadoop103 hadoop104; do
		echo "--------------zookeeper $i 状态--------------"
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
	done
};;
esac	

Kafka cluster start/stop script

#!/bin/bash

case $1 in
"start"){

	for i in hadoop102 hadoop103 hadoop104; do
		echo "--------- starting Kafka on $i ---------"
		ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
	done
};;
"stop"){

	for i in hadoop102 hadoop103 hadoop104; do
		echo "--------- stopping Kafka on $i ---------"
		ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
	done
};;
esac

Start/stop script for the user behavior log collection Flume agents

#!/bin/bash

case $1 in
"start"){
    
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
    
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file_to_kafka.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

Flume configuration for the user behavior log collection agent

The Flume source, channel, and sink use the following types:
1. TAILDIR source 2. Kafka channel 3. no sink configured (the Kafka channel itself writes the events to Kafka, so a sink is unnecessary)

# define the components
a1.sources=r1
a1.channels=c1

# configure the source (TAILDIR source)
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/module/applog/log/app.*
a1.sources.r1.positionFile=/opt/module/flume/taildir_position.json

# configure the interceptor (ETL cleaning: checks whether each event is complete JSON)
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.serenity.flume.interceptor.ETLInterceptor$Builder

# configure the channel
a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic=topic_log
a1.channels.c1.parseAsFlumeEvent=false

# no sink configured

# wire the components together
a1.sources.r1.channels=c1
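To verify that events are actually flowing into the Kafka channel, a quick spot check (not part of the project scripts) is to consume topic_log from one of the brokers configured above:

/opt/module/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server hadoop102:9092 --topic topic_log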

Start/stop script for the user behavior log consumer Flume agent

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 日志数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 日志数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9"
;;
esac

Flume configuration for the user behavior log consumer agent

The Flume source, channel, and sink use the following types:
1. Kafka source 2. file channel 3. HDFS sink

## components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.consumer.group.id = flume2
a1.sources.r1.kafka.topics=cart_info,comment_info,coupon_use,favor_info,order_detail,order_detail_activity,order_detail_coupon,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.serenity.flume.interceptor.TimestampInterceptor$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6


## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db-


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


## write the output files as gzip-compressed streams
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
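Once this agent is running, the landed files can be spot-checked on HDFS under the path configured above; order_info is just one of the topics listed in the source, used here as an example:

hadoop fs -ls /origin_data/gmall/db
hadoop fs -ls /origin_data/gmall/db/order_info_inc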

Flume configuration for incremental business tables

The start/stop script is basically the same as the earlier Flume start/stop scripts; just point it at the right configuration file path.
The Flume source, channel, and sink use the following types:
1. Kafka source 2. file channel 3. HDFS sink
The TimestampInterceptor here differs from the one used for the user behavior logs in its time unit: Maxwell emits its ts field in seconds, while the behavior log's ts is already in milliseconds, so this interceptor converts the value to milliseconds before setting the event's timestamp header.

## components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.consumer.group.id = flume2
a1.sources.r1.kafka.topics=cart_info,comment_info,coupon_use,favor_info,order_detail,order_detail_activity,order_detail_coupon,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.serenity.flume.interceptor.db.TimestampInterceptor$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6


## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db-

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## write the output files as gzip-compressed streams
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1


Maxwell script for the first-day full sync of incremental tables

After using Maxwell's bootstrap feature for the first-day full sync of an incremental table, queries on the synced data need to filter with where type='bootstrap-insert', because a bootstrap run is wrapped by start and end markers, type='bootstrap-start' and type='bootstrap-complete', which carry no data.
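A quick way to see the three record types is to peek at Maxwell's output topic and count records by type. This is only a spot check; the topic name topic_db is an assumption, so substitute whatever topic Maxwell is configured to produce to:

/opt/module/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server hadoop102:9092 \
  --topic topic_db --from-beginning --timeout-ms 10000 2>/dev/null \
  | grep -o '"type":"[^"]*"' | sort | uniq -c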

#!/bin/bash

# This script bootstraps (full-syncs) all incremental tables; it only needs to be run once

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"表名")
  import_data 表名
  ;;
.
.
.
"表名")
  import_data 表名
  ;;
"表名")
  import_data 表名
  ;;
"all")
  #一次同步多表
  import_data 表名
  import_data 表名
  import_data 表名
  import_data 表名
  import_data 表名
  import_data 表名
  import_data 表名
  import_data 表名
  import_data 表名
  import_data 表名	
  ;;
esac
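Usage sketch, assuming the script is saved as mysql_to_kafka_inc_init.sh (the name is my own choice) and the placeholder branches are filled in with the real table names:

mysql_to_kafka_inc_init.sh all        # bootstrap every incremental table
mysql_to_kafka_inc_init.sh user_info  # or bootstrap a single table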

DataX sync configuration generator script

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

# MySQL connection settings; adjust for your environment
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"

# HDFS NameNode settings; adjust for your environment
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

# Target directory for the generated config files; adjust as needed
output_path = "/opt/module/datax/job/import"

# get a MySQL connection
def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)

# get the table metadata (column names and data types)
def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall

# get the column names of the MySQL table
def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))

# map the MySQL data types from the metadata to Hive data types for the hdfswriter column definitions
def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)

# generate the DataX job json file
def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)
    

if __name__ == '__main__':
    main(sys.argv[1:])    
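A usage sketch, assuming the script is saved as gen_import_config.py and that the example date and the _full target path are just illustrative naming conventions: generate the job config for one table, make sure the HDFS target directory exists, then run DataX passing targetdir through datax.py's -p parameter:

python gen_import_config.py -d gmall -t user_info
hadoop fs -mkdir -p /origin_data/gmall/db/user_info_full/2022-06-14
python /opt/module/datax/bin/datax.py \
  -p"-Dtargetdir=/origin_data/gmall/db/user_info_full/2022-06-14" \
  /opt/module/datax/job/import/gmall.user_info.json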

Copyright notice: this article was written by [SYBY]. Please include the original link when reposting: https://blog.csdn.net/SYBYy6/article/details/125551853