当前位置:网站首页>Spark TPCDS Data Gen
Spark TPCDS Data Gen
2022-07-07 01:16:00 【zhixingheyi_ tian】
Turn on Spark-Shell
$SPARK_HOME/bin/spark-shell --master local[10] --jars {PATH}/spark-sql-perf-1.2/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar
Gen Data
Gen TCPDS Parquet
val tools_path = "/opt/Beaver/tpcds-kit/tools"
val data_path = "hdfs://{IP}:9000/tpcds_parquet_tpcds_kit_1_0/1"
val database_name = "tpcds_parquet_tpcds_kit_1_0_scale_1_db"
val scale = "1"
val p = scale.toInt / 2048.0
val catalog_returns_p = (263 * p + 1).toInt
val catalog_sales_p = (2285 * p * 0.5 * 0.5 + 1).toInt
val store_returns_p = (429 * p + 1).toInt
val store_sales_p = (3164 * p * 0.5 * 0.5 + 1).toInt
val web_returns_p = (198 * p + 1).toInt
val web_sales_p = (1207 * p * 0.5 * 0.5 + 1).toInt
val format = "parquet"
val codec = "snappy"
val useDoubleForDecimal = false
val partitionTables = false
val clusterByPartitionColumns = partitionTables
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
spark.sqlContext.setConf(s"spark.sql.$format.compression.codec", codec)
val tables = new TPCDSTables(spark, spark.sqlContext, tools_path, scale, useDoubleForDecimal)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "call_center", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_page", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer", 6)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer_address", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "customer_demographics", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "date_dim", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "household_demographics", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "income_band", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "inventory", 6)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "item", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "promotion", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "reason", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "ship_mode", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "time_dim", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "warehouse", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_page", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_site", 1)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_sales", catalog_sales_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "catalog_returns", catalog_returns_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store_sales", store_sales_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "store_returns", store_returns_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_sales", web_sales_p)
tables.genData(data_path, format, true, partitionTables, clusterByPartitionColumns, false, "web_returns", web_returns_p)
tables.createExternalTables(data_path, format, database_name, overwrite = true, discoverPartitions = partitionTables)
Gen TPCH ORC
import com.databricks.spark.sql.perf.tpch._
val tools_path = "/opt/Beaver/tpch-dbgen"
val format = "orc"
val useDoubleForDecimal = false
val partitionTables = false
val scaleFactor = "1"
val data_path = s"hdfs://{IP}:9000/tpch_${format}_${scaleFactor}"
val numPartitions =1
val databaseName = s"tpch_${format}_${scaleFactor}_db"
val clusterByPartitionColumns = partitionTables
val tables = new TPCHTables(spark, spark.sqlContext,
dbgenDir = tools_path,
scaleFactor = scaleFactor,
useDoubleForDecimal = useDoubleForDecimal,
useStringForDate = false)
spark.sqlContext.setConf("spark.sql.files.maxRecordsPerFile", "200000000")
tables.genData(
location = data_path,
format = format,
overwrite = true, // overwrite the data that is already there
partitionTables, // do not create the partitioned fact tables
clusterByPartitionColumns, // shuffle to get partitions coalesced into single files.
filterOutNullPartitionValues = false, // true to filter out the partition with NULL key value
tableFilter = "", // "" means generate all tables
numPartitions = numPartitions) // how many dsdgen partitions to run - number of input tasks.
// Create the specified database
sql(s"drop database if exists $databaseName CASCADE")
sql(s"create database $databaseName")
// Create metastore tables in a specified database for your data.
// Once tables are created, the current database will be switched to the specified database.
tables.createExternalTables(data_path, format, databaseName, overwrite = true, discoverPartitions = false)
establish Metadata
Parquet create database/tables
val tools_path = "/opt/Beaver/tpcds-kit/tools"
val data_path = "hdfs://10.1.2.206:9000/user/sparkuser/part_tpcds_decimal_1000/"
val database_name = "sr242_parquet_part_tpcds_decimal_1000"
val scale = "1000"
val useDoubleForDecimal = false
val format = "parquet"
val partitionTables = true
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
val tables = new TPCDSTables(spark, spark.sqlContext, tools_path, scale, useDoubleForDecimal)
tables.createExternalTables(data_path, format, database_name, overwrite = true, discoverPartitions = partitionTables)
Arrow create database/tables
val data_path= "hdfs://{IP}:9000/{PATH}/part_tpcds_decimal_1000/"
val databaseName = "arrow_part_tpcds_decimal_1000"
val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")
val partitionTables = true
spark.sql(s"DROP database if exists $databaseName CASCADE")
if (spark.catalog.databaseExists(s"$databaseName")) {
println(s"$databaseName has exists!")
}else{
spark.sql(s"create database if not exists $databaseName").show
spark.sql(s"use $databaseName").show
for (table <- tables) {
if (spark.catalog.tableExists(s"$table")){
println(s"$table has exists!")
}else{
spark.catalog.createTable(s"$table", s"$data_path/$table", "arrow")
}
}
if (partitionTables) {
for (table <- tables) {
try{
spark.sql(s"ALTER TABLE $table RECOVER PARTITIONS").show
}catch{
case e: Exception => println(e)
}
}
}
}
Use ALTER modify meta Information
val data_path= "hdfs://{IP}:9000/{PATH}/part_tpcds_decimal_1000/"
val databaseName = "parquet_part_tpcds_decimal_1000"
val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")
spark.sql(s"use $databaseName").show
for (table <- tables) {
try{
spark.sql(s"ALTER TABLE $table SET LOCATION '$data_path/$table'").show
}catch{
case e: Exception => println(e)
}
}
边栏推荐
- The MySQL database in Alibaba cloud was attacked, and finally the data was found
- Neon Optimization: summary of performance optimization experience
- THREE.AxesHelper is not a constructor
- [Niuke] b-complete square
- Mongodb client operation (mongorepository)
- Activereportsjs 3.1 Chinese version | | | activereportsjs 3.1 English version
- Deep learning framework TF installation
- "Exquisite store manager" youth entrepreneurship incubation camp - the first phase of Shunde market has been successfully completed!
- Can the system hibernation file be deleted? How to delete the system hibernation file
- Failed to successfully launch or connect to a child MSBuild. exe process. Verify that the MSBuild. exe
猜你喜欢
资产安全问题或制约加密行业发展 风控+合规成为平台破局关键
HMM notes
如何管理分布式团队?
Part VI, STM32 pulse width modulation (PWM) programming
Boot - Prometheus push gateway use
Body mass index program, entry to write dead applet project
Data type of pytorch tensor
[牛客] B-完全平方数
BFS realizes breadth first traversal of adjacency matrix (with examples)
线段树(SegmentTree)
随机推荐
Activereportsjs 3.1 Chinese version | | | activereportsjs 3.1 English version
Dell笔记本周期性闪屏故障
Build your own website (17)
实现mysql与ES的增量数据同步
「笔记」折半搜索(Meet in the Middle)
golang中的WaitGroup实现原理
MySQL script batch queries all tables containing specified field types in the database
Neon Optimization: performance optimization FAQ QA
UI control telerik UI for WinForms new theme - vs2022 heuristic theme
Openjudge noi 1.7 10: simple password
[100 cases of JVM tuning practice] 05 - Method area tuning practice (Part 2)
Install Firefox browser on raspberry pie /arm device
Realize incremental data synchronization between MySQL and ES
gnet: 一个轻量级且高性能的 Go 网络框架 使用笔记
How to evaluate load balancing performance parameters?
Can the system hibernation file be deleted? How to delete the system hibernation file
Force buckle 1037 Effective boomerang
Boot - Prometheus push gateway use
Grc: personal information protection law, personal privacy, corporate risk compliance governance
Niuke cold training camp 6B (Freund has no green name level)