PySpark: change the column order and save into an Iceberg database
2022-07-28 08:34:00 【Lu Xinhang】
Create the environment and specify the catalog
import os

from pyspark.sql import SparkSession


def get_spark():
    os.environ.setdefault('HADOOP_USER_NAME', 'root')
    # total size of serialized results of tasks is bigger than spark.driver.maxResultSize
    # ERROR DataWritingSparkTask: Aborting commit for partition 2 (task 2, attempt 0, stage 0.0) Out of memory
    spark = SparkSession.builder \
        .config('spark.sql.debug.maxToStringFields', 2000) \
        .config('spark.debug.maxToStringFields', 2000) \
        .config('spark.driver.memory', '16g') \
        .config('spark.executor.memory', '16g') \
        .config('spark.driver.maxResultSize', '4g') \
        .config('spark.network.timeout', 180) \
        .getOrCreate()
    # Register an Iceberg catalog named "iceberg", backed by the Hive metastore
    spark.conf.set("spark.sql.catalog.iceberg",
                   "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
    spark.conf.set("spark.sql.catalog.iceberg.uri",
                   "thrift://192.168.x.xx:9083")
    spark.conf.set("spark.sql.session.timeZone", "GMT+8")
    spark.conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", True)
    return spark
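The catalog properties set in get_spark() all share the prefix spark.sql.catalog.<name>. As a small sketch, they could be gathered in one dictionary so that the catalog name and metastore URI are stated once. The helper name iceberg_catalog_conf is hypothetical, and the URI below is a placeholder, not a value from this post.

```python
def iceberg_catalog_conf(name, uri):
    """Build the Spark properties for a Hive-backed Iceberg catalog.

    Hypothetical helper: it only assembles the same keys used in get_spark().
    """
    prefix = f"spark.sql.catalog.{name}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "hive",
        f"{prefix}.uri": uri,
    }


# Placeholder URI for illustration only.
conf = iceberg_catalog_conf("iceberg", "thrift://metastore-host:9083")
for key, value in conf.items():
    print(key, "=", value)
```

Each key/value pair can then be passed to spark.conf.set(key, value), as get_spark() does.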
The DataFrame has columns A, B, but the table expects B, A. If you do not change the order before writing, the insert fails with a column mismatch error.
1. Create a temporary view from the Spark DataFrame, then select from it back into a Spark DataFrame
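spark = get_spark()
spark_df = spark.sql('select A from ***')
spark_df = spark_df.withColumn("B", )  # the expression for column B is omitted in the original
spark_df.createOrReplaceTempView("test")
# Select the columns in the order the target table expects
DF = spark.sql("select B, A from test")
DF.write.insertInto('xxx', True)  # second argument is overwrite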
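insertInto matches columns by position, not by name, which is why the order has to be fixed before writing. The following sketch computes the column order from the target table's column list and fails early on a mismatch. reorder_for_insert is a hypothetical helper, written in plain Python so that it can be checked without a cluster.

```python
def reorder_for_insert(df_columns, table_columns):
    """Return the DataFrame's columns arranged in the target table's order.

    Raises ValueError when the two column sets differ, since a
    positional insert would then write values into the wrong columns.
    """
    missing = [c for c in table_columns if c not in df_columns]
    extra = [c for c in df_columns if c not in table_columns]
    if missing or extra:
        raise ValueError(f"column mismatch: missing={missing}, extra={extra}")
    return list(table_columns)


ordered = reorder_for_insert(["A", "B"], ["B", "A"])
print(ordered)  # ['B', 'A']
```

With a live session, the table's order can be read from spark.table('xxx').columns and applied with spark_df.select(*ordered) before calling insertInto, which avoids the temporary view.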
2. Create a schema (not recommended)
2.1 Calling spark.createDataFrame(df) directly may fail with a schema error: ValueError: Some of types cannot be determined after inferring
This happens when Spark cannot infer the type of some fields.
2.2 Specify the type of each field
The data conversion may then fail with: IntegerType can not accept object 2.0 in type <class 'float'>
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("xx", StringType(), True),
    StructField("xx", IntegerType(), True)
])
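The IntegerType error above appears when a Python float such as 2.0 is supplied for an integer field, because createDataFrame verifies each value against the declared schema. One possible workaround, sketched here under the assumption that the rows are plain Python dictionaries, is to cast the values before building the DataFrame. coerce_rows and the field names "name" and "count" are hypothetical.

```python
def coerce_rows(rows, casts):
    """Cast each field with the given Python type, leaving None untouched."""
    coerced = []
    for row in rows:
        new_row = {}
        for name, value in row.items():
            cast = casts.get(name)
            # Keep nulls and fields without a declared cast as they are.
            if value is None or cast is None:
                new_row[name] = value
            else:
                new_row[name] = cast(value)
        coerced.append(new_row)
    return coerced


rows = [{"name": "a", "count": 2.0}, {"name": "b", "count": None}]
clean = coerce_rows(rows, {"name": str, "count": int})
print(clean)
```

The cleaned rows could then be passed to spark.createDataFrame together with the explicit schema. Note that int() truncates, so a value like 2.7 would silently become 2; this is only appropriate when the floats are known to hold whole numbers.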