当前位置:网站首页>Spark Learning: implement compact table command
Spark Learning: implement compact table command
2022-07-24 09:37:00 【I love evening primrose a】
Realization compact table command
requirement : Add add compact table command , Used to merge small files
One 、 modify SqlBase.g4
#statement Add below
| COMPACT TABLE target=tableIdentifier partitionSpec?
(INTO fileNum=INTEGER_VALUE FILES)? #compactTable
#ansiNonReserved Add below
| FILES
#nonReserved Add below
| FILES
--keywords list Add below
FILES:'FILES';
Two 、maven compile

3、 ... and 、 modify SparkSqlParser.scala
override def visitCompactTable(
ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
val table = visitTableIdentifier(ctx.tableIdentifier())
val filesNum = if (ctx.INTEGER_VALUE() != null) {
Some(ctx.INTEGER_VALUE().getText)
} else {
None
}
val partition = if (ctx.partitionSpec() != null) {
Some(ctx.partitionSpec().getText)
} else {
None
}
CompactTableCommand(table,filesNum,partition);
}
Four 、 increase compactTableCommand class
package org.apache.spark.sql.execution.command
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{
Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{
Row, SaveMode, SparkSession}
case class CompactTableCommand(table: TableIdentifier,
filesNum: Option[String],
partitionSpec: Option[String]) extends LeafRunnableCommand {
private val defaultSize = 128 * 1024 * 1024
override def output: Seq[Attribute] = Seq(
AttributeReference("COMPACT_TABLE", StringType, nullable = false)()
)
override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.catalog.setCurrentDatabase(table.database.getOrElse("default"))
val tempTableName = "`" + table.identifier + "_" + System.currentTimeMillis() + "`"
val originDataFrame = sparkSession.table(table.identifier)
val partitions = filesNum match {
case Some(files) => files.toInt
case None => (sparkSession.sessionState
.executePlan(originDataFrame.queryExecution.logical)
.optimizedPlan.stats.sizeInBytes / defaultSize).toInt + 1
}
// scalastyle:off println
println(partitions, tempTableName)
if (partitionSpec.nonEmpty) {
// https://stackoverflow.com/questions/38487667/
// overwrite-specific-partitions-in-spark-dataframe-write-method
sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
val conditionExpr = partitionSpec.get.trim.stripPrefix("partition(").dropRight(1)
.replace(",", "AND")
// scalastyle:off println
println(conditionExpr)
originDataFrame.where(conditionExpr).repartition(partitions)
.write
.mode(SaveMode.Overwrite)
.saveAsTable(tempTableName)
sparkSession.table(tempTableName).write
.mode(SaveMode.Overwrite)
.insertInto(table.identifier)
} else {
//
originDataFrame.repartition(partitions)
.write
.mode(SaveMode.Overwrite)
.saveAsTable(tempTableName)
sparkSession.table(tempTableName)
.write
.mode(SaveMode.Overwrite)
.saveAsTable(table.identifier)
}
// sparkSession.sql(s"DROP TABLE ${tempTableName}")
Seq(Row(s"compact table ${
table.identifier} finished."))
}
}
5、 ... and 、 Report errors
1、 Recompile to make it effective
mvn package -DskipTests -Phive -Phive-thriftserver
2、 Create test data
#spark-sql Create a table
create table if not exists orders
(
id int COMMENT 'order id'
,name string COMMENT 'user name'
);
# insert data
INSERT INTO orders
VALUES
(1, 'part-1'),
(2, 'part-2'),
(3, 'part-3'),
(4, 'part-4'),
(5, 'part-5'),
(6, 'part-6');
3、 Report errors 
This error report took a day or two , It's useless to refer to online solutions , Finally, we reconfigured a set hadoop Environmental Science
1) download hadoop, I downloaded it 3.3.2
hadoop Mirror address
2) To configure HADOOP_HOME To the directory just extracted
3) Download the corresponding version of winutils.exe and hadoop.dll Put it in the decompressed bin Catalog , And put hadoop.dll Put it in System32 Under the table of contents
hadoop.dll、winutils.exe Download address
4) Reinsert data 
6、 ... and 、 verification
newly build orders surface , Insert 6 Data , You can see that there is 6 File 
Use compact table command command
compact table orders into 3 Files;

As shown in the figure below , You can see that it becomes three files , And a backup table is added 
边栏推荐
- NVIDIA set persistent mode
- Embedded development: Tools - optimizing firmware using DRT
- It is reported that the prices of some Intel FPGA chip products have increased by up to 20%
- SQL server2012 installation method details [easy to understand]
- Getting started with web security - open source firewall pfsense installation configuration
- [don't bother with reinforcement learning] video notes (I) 1. What is reinforcement learning?
- Gin framework uses session and redis to realize distributed session & Gorm operation mysql
- [example] v-contextmenu right click menu component
- What does CRM mean? Three "key points" for CRM management software selection
- Racecar multi-point navigation experiment based on ROS communication mechanism
猜你喜欢

科目1-3

详解LinkedList
![[MySQL] - deep understanding of index](/img/a6/6ca1356fe11bd33ec7362ce7cdc652.png)
[MySQL] - deep understanding of index
![[leetcode] 31. Next arrangement](/img/83/50a3cc17fc252582458bf32d1dd36b.png)
[leetcode] 31. Next arrangement

What if path is deleted by mistake when configuring system environment variables?

Why is TCP a triple handshake

Write a simple memo using localstorage

Linked list - 24. Exchange nodes in the linked list in pairs

Recursion - if the function calls itself internally, then the function is a recursive function & the effect is the same as that of the loop & the push condition return should be added, otherwise stack

Scarcity in Web3: how to become a winner in a decentralized world
随机推荐
dp最长公共子序列详细版本(LCS)
Dorissql syntax Usage Summary
[leetcode] 31. Next arrangement
Open source summer interview | learn with problems, Apache dolphin scheduler, Wang Fuzheng
What is the component customization event we are talking about?
It's eleven again. Those jokes about nagging programmers going home for blind dates
[200 opencv routines] 236. Principal component analysis of feature extraction (openCV)
Leetcode skimming: dynamic planning 03 (climb stairs with minimum cost)
[don't bother to strengthen learning] video notes (II) 2. Write a small example of Q learning
[Luogu p5410] [template] extend KMP (Z function) (string)
Huawei wireless device security policy configuration command
Map processing background management menu data
Makefile变量及动态库静态库
力扣300-最长递增子序列——动态规划
PHP Basics - PHP magic constants
(5) Cloud integrated gateway gateway +swagger documentation tool
What if path is deleted by mistake when configuring system environment variables?
We were tossed all night by a Kong performance bug
NVIDIA set persistent mode
Embedded development: Tools - optimizing firmware using DRT