Spark TPCDS Data Gen
2022-07-06 17:31:00 【zhixingheyi_tian】
Start Spark-Shell
$SPARK_HOME/bin/spark-shell --master local[10] --jars {PATH}/spark-sql-perf-1.2/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
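Here --master local[10] runs Spark locally with 10 worker threads, and --jars puts the spark-sql-perf jar on the session classpath ({PATH} is a placeholder for wherever you built the spark-sql-perf project).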
Gen Data
Gen TPCDS Parquet
val tools_path = "/opt/Beaver/tpcds-kit/tools"
val data_path = "hdfs://{IP}:9000/tpcds_parquet_tpcds_kit_1_0/1"
val database_name = "tpcds_parquet_tpcds_kit_1_0_scale_1_db"
val scale = "1"
val p = scale.toInt / 2048.0
val catalog_returns_p = (263 * p + 1).toInt
val catalog_sales_p = (2285 * p * 0.5 * 0.5 + 1).toInt
val store_returns_p = (429 * p + 1).toInt
val store_sales_p = (3164 * p * 0.5 * 0.5 + 1).toInt
val web_returns_p = (198 * p + 1).toInt
val web_sales_p = (1207 * p * 0.5 * 0.5 + 1).toInt
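// These heuristics grow the fact-table input-partition counts linearly with the
// scale factor. At scale = 1, p ≈ 0.000488, so every expression above evaluates to 1.
// Illustrative arithmetic (not from the original post): at scale = 1000, p ≈ 0.488,
// giving catalog_sales_p = 279 and store_sales_p = 387.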
val format = "parquet"
val codec = "snappy"
val useDoubleForDecimal = false
val partitionTables = false
val clusterByPartitionColumns = partitionTables
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
spark.sqlContext.setConf(s"spark.sql.$format.compression.codec", codec)
val tables = new TPCDSTables(spark, spark.sqlContext, tools_path, scale, useDoubleForDecimal)
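// Parameter order in the calls below (matching spark-sql-perf's genData, which the
// TPC-H section further down calls with named arguments): location, format,
// overwrite, partitionTables, clusterByPartitionColumns,
// filterOutNullPartitionValues, tableFilter, numPartitions. The small dimension
// tables use 1 (or 6) input partitions; the fact tables use the counts computed above.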
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "call_center", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_page", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer", 6)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer_address", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer_demographics", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "date_dim", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "household_demographics", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "income_band", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "inventory", 6)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "item", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "promotion", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "reason", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "ship_mode", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "time_dim", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "warehouse", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_page", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_site", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_sales", catalog_sales_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_returns", catalog_returns_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store_sales", store_sales_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store_returns", store_returns_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_sales", web_sales_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_returns", web_returns_p)
tables.createExternalTables(data_path, format, database_name, overwrite = true, discoverPartitions = partitionTables)
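A quick sanity check after generation, using standard Spark SQL (a sketch added here, not part of the original flow):
spark.sql(s"use $database_name")
spark.sql("show tables").show(30, false)
spark.sql("select count(*) from store_sales").show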
Gen TPCH ORC
import com.databricks.spark.sql.perf.tpch._
val tools_path = "/opt/Beaver/tpch-dbgen"
val format = "orc"
val useDoubleForDecimal = false
val partitionTables = false
val scaleFactor = "1"
val data_path = s"hdfs://{IP}:9000/tpch_${format}_${scaleFactor}"
val numPartitions = 1
val databaseName = s"tpch_${format}_${scaleFactor}_db"
val clusterByPartitionColumns = partitionTables
val tables = new TPCHTables(spark, spark.sqlContext,
  dbgenDir = tools_path,
  scaleFactor = scaleFactor,
  useDoubleForDecimal = useDoubleForDecimal,
  useStringForDate = false)
spark.sqlContext.setConf("spark.sql.files.maxRecordsPerFile", "200000000")
tables.genData(
  location = data_path,
  format = format,
  overwrite = true,                        // overwrite any data already at the location
  partitionTables = partitionTables,       // whether to create partitioned fact tables (false here)
  clusterByPartitionColumns = clusterByPartitionColumns, // shuffle so partitions coalesce into single files
  filterOutNullPartitionValues = false,    // true would filter out partitions with a NULL key value
  tableFilter = "",                        // "" means generate all tables
  numPartitions = numPartitions)           // how many dbgen partitions to run - number of input tasks
// Create the specified database
sql(s"drop database if exists $databaseName CASCADE")
sql(s"create database $databaseName")
// Create metastore tables in a specified database for your data.
// Once tables are created, the current database will be switched to the specified database.
tables.createExternalTables(data_path, format, databaseName, overwrite = true, discoverPartitions = false)
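As with TPC-DS, a quick sanity check (a sketch using standard Spark SQL, not part of the original post; lineitem is the largest TPC-H table):
sql(s"use $databaseName")
sql("select count(*) from lineitem").show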
Create Metadata
Parquet create database/tables
val tools_path = "/opt/Beaver/tpcds-kit/tools"
val data_path = "hdfs://10.1.2.206:9000/user/sparkuser/part_tpcds_decimal_1000/"
val database_name = "sr242_parquet_part_tpcds_decimal_1000"
val scale = "1000"
val useDoubleForDecimal = false
val format = "parquet"
val partitionTables = true
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
val tables = new TPCDSTables(spark, spark.sqlContext, tools_path, scale, useDoubleForDecimal)
tables.createExternalTables(data_path, format, database_name, overwrite = true, discoverPartitions = partitionTables)
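Optionally, table statistics can be collected for Spark's cost-based optimizer. A sketch, assuming this spark-sql-perf build exposes Tables.analyzeTables (present upstream, but verify in your fork):
tables.analyzeTables(database_name, analyzeColumns = true)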
Arrow create database/tables
val data_path= "hdfs://{IP}:9000/{PATH}/part_tpcds_decimal_1000/"
val databaseName = "arrow_part_tpcds_decimal_1000"
val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")
val partitionTables = true
spark.sql(s"DROP database if exists $databaseName CASCADE")
if (spark.catalog.databaseExists(s"$databaseName")) {
println(s"$databaseName has exists!")
}else{
spark.sql(s"create database if not exists $databaseName").show
spark.sql(s"use $databaseName").show
for (table <- tables) {
if (spark.catalog.tableExists(s"$table")){
println(s"$table has exists!")
}else{
spark.catalog.createTable(s"$table", s"$data_path/$table", "arrow")
}
}
if (partitionTables) {
for (table <- tables) {
try{
spark.sql(s"ALTER TABLE $table RECOVER PARTITIONS").show
}catch{
case e: Exception => println(e)
}
}
}
}
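To confirm the registration worked, a sketch using standard Spark SQL (whether SHOW PARTITIONS is supported depends on the arrow data source; treat this as an assumption):
spark.sql("show tables").show(30, false)
spark.sql("show partitions store_sales").show(5, false)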
Use ALTER to update table meta information
val data_path= "hdfs://{IP}:9000/{PATH}/part_tpcds_decimal_1000/"
val databaseName = "parquet_part_tpcds_decimal_1000"
val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")
spark.sql(s"use $databaseName").show
for (table <- tables) {
  try {
    spark.sql(s"ALTER TABLE $table SET LOCATION '$data_path/$table'").show
  } catch {
    case e: Exception => println(e)
  }
}
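To verify the new location took effect, DESCRIBE FORMATTED prints the table's current location (standard Spark SQL; added here as a sketch):
spark.sql("describe formatted store_sales").show(50, false)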