PySpark: changing column order before writing to an Iceberg database
2022-07-28 06:30:00 【路新航】
Create the environment and specify the catalog
import os

from pyspark.sql import SparkSession


def get_spark():
    os.environ.setdefault('HADOOP_USER_NAME', 'root')
    # Errors seen when memory is insufficient (hence the memory settings below):
    #   total size of serialized results of tasks is bigger than spark.driver.maxResultSize
    #   ERROR DataWritingSparkTask: Aborting commit for partition 2 (task 2, attempt 0, stage 0.0)
    spark = SparkSession.builder \
        .config('spark.sql.debug.maxToStringFields', 2000) \
        .config('spark.debug.maxToStringFields', 2000) \
        .config('spark.driver.memory', '16g') \
        .config('spark.executor.memory', '16g') \
        .config('spark.driver.maxResultSize', '4g') \
        .config('spark.network.timeout', 180) \
        .getOrCreate()
    # Register a catalog named "iceberg" backed by the Hive metastore
    spark.conf.set("spark.sql.catalog.iceberg",
                   "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
    spark.conf.set("spark.sql.catalog.iceberg.uri",
                   "thrift://192.168.x.xx:9083")
    spark.conf.set("spark.sql.session.timeZone", "GMT+8")
    spark.conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", True)
    return spark
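The DataFrame has columns A, B but the table expects B, A. Because insertInto matches columns by position rather than by name, writing without reordering fails with a column mismatch error.
1. Create a temporary view from the Spark DataFrame, then query it back into a Spark DataFrame in the required order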
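spark = get_spark()
spark_df = spark.sql('select A from ***')
# The expression for column B is elided in the original post
spark_df = spark_df.withColumn("B", ...)
spark_df.createOrReplaceTempView("test")
# Select the columns in the order the target table expects
DF = spark.sql("select B, A from test")
# Second argument is overwrite=True
DF.write.insertInto('xxx', True)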
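The temporary view is one way to reorder the columns; selecting them directly on the DataFrame should work as well. The following is a minimal sketch under that assumption. The helper name `align_to_target` is hypothetical, and the usage comment assumes the target column order can be read with `spark.table('xxx').columns`.

```python
def align_to_target(df, target_columns):
    """Return df with its columns reordered to match target_columns.

    Works with any object exposing `.columns` and `.select(*names)`,
    such as a pyspark.sql.DataFrame. Raises ValueError when the two
    column sets differ, because a position-based insert would then
    fail or put values into the wrong columns.
    """
    missing = [c for c in target_columns if c not in df.columns]
    extra = [c for c in df.columns if c not in target_columns]
    if missing or extra:
        raise ValueError(f"column mismatch: missing={missing}, extra={extra}")
    return df.select(*target_columns)


# Hypothetical usage with the names from the example above:
#   target_columns = spark.table('xxx').columns
#   align_to_target(spark_df, target_columns).write.insertInto('xxx', True)
```

Checking the column sets before the write turns a confusing position-based failure into an explicit error message.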
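2. Create a schema (not recommended)
2.1 Calling spark.createDataFrame(df) directly may fail with: ValueError: Some of types cannot be determined after inferring
This happens when there is a field whose type Spark cannot infer.
2.2 Specify the type of every field
The data conversion may then fail with: IntegerType can not accept object 2.0 in type <class 'float'>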
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

schema = StructType([
    StructField("xx", StringType(), True),
    StructField("xx", IntegerType(), True)
])
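One way to avoid the "IntegerType can not accept object 2.0" error is to coerce the values to int before building the DataFrame. The sketch below assumes the data is available as a list of row tuples; `to_int_or_none` and `clean_rows` are hypothetical helper names, not part of PySpark.

```python
import math


def to_int_or_none(value):
    """Coerce a value for an IntegerType column.

    None and NaN become None (written as NULL), integral floats such
    as 2.0 become int, and floats with a fractional part raise ValueError.
    """
    if value is None:
        return None
    if isinstance(value, float):
        if math.isnan(value):
            return None
        if not value.is_integer():
            raise ValueError(f"not an integer: {value!r}")
    return int(value)


def clean_rows(rows, int_positions):
    """Return rows as tuples with the given positions coerced to int."""
    return [
        tuple(to_int_or_none(v) if i in int_positions else v
              for i, v in enumerate(row))
        for row in rows
    ]


rows = [("a", 2.0), ("b", float("nan")), ("c", None)]
cleaned = clean_rows(rows, int_positions={1})
# Hypothetical usage: spark.createDataFrame(cleaned, schema)
```

Raising on fractional values rather than truncating them keeps silent data loss out of the table.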