Writing data to Iceberg with PySpark
2022-07-28 06:30:00 【路新航】
Setting up the PySpark environment

1. Copy the following jars into D:\Python\python37\Lib\site-packages\pyspark\jars:
   iceberg-spark3-runtime-0.13.1.jar
   alluxio-2.6.2-client.jar
2. Under D:\Python\python37\Lib\site-packages\pyspark, create a conf folder and place hdfs-site.xml and hive-site.xml in it.
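Before starting Spark it can help to confirm the jars and config files actually landed where PySpark expects them. A minimal sketch; the helper name is illustrative, not from the article, and the paths are the ones given above:

```python
import os

def check_pyspark_layout(pyspark_home, jars, confs):
    """Return the jar/conf files missing from a local PySpark install."""
    missing = [j for j in jars
               if not os.path.exists(os.path.join(pyspark_home, "jars", j))]
    missing += [c for c in confs
                if not os.path.exists(os.path.join(pyspark_home, "conf", c))]
    return missing

# Example with the article's paths:
# check_pyspark_layout(r"D:\Python\python37\Lib\site-packages\pyspark",
#                      ["iceberg-spark3-runtime-0.13.1.jar", "alluxio-2.6.2-client.jar"],
#                      ["hdfs-site.xml", "hive-site.xml"])
```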
Code
import os
import warnings
import argparse
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructField, StructType, DecimalType, IntegerType, TimestampType, StringType
import pypinyin

warnings.filterwarnings("ignore")

def get_spark():
    os.environ.setdefault('HADOOP_USER_NAME', 'root')
    spark = SparkSession.builder \
        .config('spark.sql.debug.maxToStringFields', 2000) \
        .config('spark.debug.maxToStringFields', 2000) \
        .getOrCreate()
    spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
    spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.x.xx:9083")
    # Spark cannot natively handle timestamp-without-timezone fields; setting
    # 'spark.sql.iceberg.handle-timestamp-without-timezone' to true treats
    # them all as timestamp with timezone.
    spark.conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", True)
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
    # spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
    # https://www.cnblogs.com/songchaolin/p/12098618.html
    # pyspark.sql.utils.AnalysisException: LEGACY store assignment policy is
    # disallowed in Spark data source V2. Please set the configuration
    # spark.sql.storeAssignmentPolicy to other values.
    return spark

def Capitalize_hanzipinyin(word):
    # The original article left this body empty; one plausible implementation
    # capitalizes each pinyin syllable produced by pypinyin.
    return ''.join(s.capitalize() for s in pypinyin.lazy_pinyin(word))

def main_run(dt):
    table_name = 'iceberg.xxx.xxx'
    target_table_name = 'iceberg.xxx.xxx'
    target_table_name_columns = ['A', 'B']
    sql = """ select A,B from %s where dt = '%s' """ % (table_name, dt)
    spark = get_spark()
    spark_df = spark.sql(sql)
    toPinyinUDF = udf(Capitalize_hanzipinyin, StringType())
    spark_df = spark_df.withColumn('A_pinyin', toPinyinUDF('A'))

    # Solution 1: delete the target partition, then append.
    delete_sql = "delete from %s where dt = '%s' " % (target_table_name, dt)
    spark.sql(delete_sql)
    spark_df.write.saveAsTable(target_table_name, None, "append", partitionBy='dt')

    # Solution 2: register a temp view and use dynamic insert overwrite.
    spark_df.createOrReplaceTempView("test")
    spark.sql(
        "insert overwrite table %s partition(dt) select A,B,A_pinyin from test" % target_table_name)
    # Using "select *" fails with: Cannot safely cast '': string to int

    # Solution 3: select the needed columns, then insertInto with overwrite.
    new_spark_df = spark.sql("SELECT A,B,A_pinyin from test")
    new_spark_df.write.insertInto(target_table_name, True)

    # Solution 4: overwrites ALL of the table's data, not just the partition.
    # new_spark_df.write.saveAsTable(target_table_name, None, "overwrite", partitionBy='dt')

    # Solution 5: convert the Spark DataFrame to pandas; beware that data
    # types may fail to match afterwards.
    df = spark_df.toPandas()
    # When this DataFrame is converted to pandas, column types can change
    # from integer in Spark to float in pandas.
    df['A_pinyin'] = df['A'].apply(Capitalize_hanzipinyin)
    df = df[target_table_name_columns]  # reorder columns
    schema = StructType([
        StructField("A", StringType(), True),
        # ... remaining fields
    ])
    # With an explicit schema: "field A: IntegerType can not accept object
    # 2.0 in type <class 'float'>"
    DF = spark.createDataFrame(df, schema)
    # Without a schema: "ValueError: Some of types cannot be determined
    # after inferring" -- some columns exist whose type Spark cannot infer.
    DF.write.insertInto(target_table_name, True)
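The string step inside the pinyin UDF can be sketched without Spark or pypinyin: capitalize each syllable, then join. The tiny lookup table below is a stand-in assumption so the sketch is self-contained; in the real code pypinyin would supply the syllables.

```python
# Hypothetical syllable lookup standing in for a real pinyin library.
FAKE_PINYIN = {"北": "bei", "京": "jing"}

def capitalize_hanzipinyin(word):
    # Look up each character's pinyin (fall back to the character itself),
    # capitalize every syllable, and concatenate.
    syllables = [FAKE_PINYIN.get(ch, ch) for ch in word]
    return "".join(s.capitalize() for s in syllables)

print(capitalize_hanzipinyin("北京"))  # BeiJing
```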
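The difference between solutions 1-3 (replace only the affected dt partition) and solution 4 (overwrite the whole table) can be modelled with a plain dict keyed by partition value. This is a toy sketch of the semantics, not Iceberg internals:

```python
# Table contents as {partition value: rows}.
table = {"2022-07-27": ["old_a"], "2022-07-28": ["old_b"]}
new_data = {"2022-07-28": ["new_b1", "new_b2"]}

# Dynamic partition overwrite: only partitions present in the new data change.
dynamic = dict(table)
dynamic.update(new_data)

# Full table overwrite (solution 4): every other partition is gone.
full = dict(new_data)

print(dynamic)  # {'2022-07-27': ['old_a'], '2022-07-28': ['new_b1', 'new_b2']}
print(full)     # {'2022-07-28': ['new_b1', 'new_b2']}
```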
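Solution 5's type pitfall can be reproduced with pandas alone: a nullable integer column comes back from toPandas() as float64 (2 becomes 2.0), which an IntegerType schema then rejects. A sketch, assuming pandas >= 0.24 for the nullable Int64 dtype:

```python
import pandas as pd

# A None in an integer column forces pandas to float64 -- the source of
# "IntegerType can not accept object 2.0 in type <class 'float'>".
df = pd.DataFrame({"A": [1, 2, None]})
print(df["A"].dtype)  # float64

# One workaround before createDataFrame: cast to nullable Int64 (keeps NaN).
df["A"] = df["A"].astype("Int64")
print(df["A"].dtype)  # Int64
```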