Pyspark writes data to iceberg
2022-07-28 08:34:00 · Lu Xinhang
PySpark environment setup

1. Copy the following jars into D:\Python\python37\Lib\site-packages\pyspark\jars:
iceberg-spark3-runtime-0.13.1.jar
alluxio-2.6.2-client.jar

2. Under D:\Python\python37\Lib\site-packages\pyspark, create a conf folder and put hdfs-site.xml and hive-site.xml into it.
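The two placement steps above can be scripted. A minimal sketch that resolves the pyspark install directory at run time instead of hard-coding the Windows path from the post (the copy commands, shown commented out, assume the jars and XML files sit in the current directory):

```shell
# Resolve pyspark's install directory; fall back to a demo directory
# if pyspark is not installed.
PYSPARK_DIR=$(python3 -c "import pyspark, os; print(os.path.dirname(pyspark.__file__))" 2>/dev/null || echo /tmp/pyspark)
mkdir -p "$PYSPARK_DIR/jars" "$PYSPARK_DIR/conf"
# Stage the runtime jars and the Hadoop/Hive client configs (file names from the post):
# cp iceberg-spark3-runtime-0.13.1.jar alluxio-2.6.2-client.jar "$PYSPARK_DIR/jars/"
# cp hdfs-site.xml hive-site.xml "$PYSPARK_DIR/conf/"
echo "jars dir: $PYSPARK_DIR/jars"
```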
Code
import os
import warnings
import argparse  # imported in the original post but not used below
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructField, StructType, DecimalType, IntegerType, TimestampType, StringType
import pypinyin

warnings.filterwarnings("ignore")
def get_spark():
    os.environ.setdefault('HADOOP_USER_NAME', 'root')
    spark = SparkSession.builder \
        .config('spark.sql.debug.maxToStringFields', 2000) \
        .config('spark.debug.maxToStringFields', 2000) \
        .getOrCreate()
    spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
    spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.x.xx:9083")
    # Spark cannot natively handle Iceberg's timestamp-without-timezone type; this flag
    # makes it treat all such fields as timestamp with timezone. Without it:
    # "Cannot handle timestamp without timezone fields in Spark. Spark does not natively
    # support this type but if you would like to handle all timestamps as timestamp with
    # timezone set 'spark.sql.iceberg.handle-timestamp-without-timezone' to true"
    spark.conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", True)
    # Dynamic mode so an insert overwrite only replaces the partitions it writes.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
    # spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
    # Setting LEGACY raises: pyspark.sql.utils.AnalysisException: LEGACY store assignment
    # policy is disallowed in Spark data source V2. Please set the configuration
    # spark.sql.storeAssignmentPolicy to other values.
    # See https://www.cnblogs.com/songchaolin/p/12098618.html
    return spark
def Capitalize_hanzipinyin(word):
    # Placeholder in the original post: meant to convert Chinese text to capitalized pinyin.
    return ''
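The post leaves Capitalize_hanzipinyin as an empty stub. One possible implementation using the pypinyin package the script already imports (the author's intended behavior is an assumption; this sketch capitalizes each syllable and falls back to the raw text if pypinyin is unavailable):

```python
def capitalize_hanzi_pinyin(word):
    """Convert Chinese text to capitalized pinyin syllables, e.g. '中国' -> 'ZhongGuo'."""
    if not word:
        return ''
    try:
        import pypinyin  # third-party: pip install pypinyin
    except ImportError:
        return word  # graceful fallback when pypinyin is missing
    return ''.join(s.capitalize() for s in pypinyin.lazy_pinyin(word))
```

Wrapped in `udf(..., StringType())`, it drops in where the original `toPinyinUDF` is registered.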
def main_run(dt):
    table_name = 'iceberg.xxx.xxx'
    target_table_name = 'iceberg.xxx.xxx'
    target_table_name_columns = ['A', 'B']
    sql = """ select A,B from %s where dt = '%s' """ % (table_name, dt)
    spark = get_spark()
    spark_df = spark.sql(sql)
    toPinyinUDF = udf(Capitalize_hanzipinyin, StringType())
    spark_df = spark_df.withColumn('A_pinyin', toPinyinUDF('A'))

    # Solution 1: delete the target partition, then append
    delete_sql = "delete from %s where dt = '%s' " % (target_table_name, dt)
    spark.sql(delete_sql)
    spark_df.write.saveAsTable(target_table_name, None, "append", partitionBy='dt')
    # Solution 2: register a temporary view and insert overwrite the partition
    spark_df.createOrReplaceTempView("test")
    spark.sql(
        "insert overwrite table %s partition(dt) select A,B,A_pinyin from test" % target_table_name)
    # Using select * instead of an explicit column list fails with:
    # Cannot safely cast '': string to int
    # Solution 3: insertInto with overwrite=True
    new_spark_df = spark.sql("SELECT A,B,A_pinyin from test")
    new_spark_df.write.insertInto(target_table_name, True)
    # Solution 4: overwrites ALL data in the table, not just one partition
    # new_spark_df.write.saveAsTable(target_table_name, None, "overwrite", partitionBy='dt')
    # Solution 5: convert the Spark DataFrame to a pandas DataFrame; dtype matching may fail
    df = spark_df.toPandas()
    # After toPandas(), an integer column from Spark can arrive in pandas as float.
    df['A_pinyin'] = df['A'].apply(Capitalize_hanzipinyin)
    df = df[target_table_name_columns]  # reorder columns
    schema = StructType([
        StructField("A", StringType(), True),
        # ... remaining StructFields elided in the original post
    ])
    # Without an explicit schema: ValueError: Some of types cannot be determined after
    # inferring -- Spark cannot infer the type of some fields.
    # Even with the schema, a float column may still fail:
    # field A: IntegerType can not accept object 2.0 in type <class 'float'>
    DF = spark.createDataFrame(df, schema)
    DF.write.insertInto(target_table_name, True)
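The float-conversion pitfall behind solution 5 is easy to reproduce in pandas alone; a small sketch (the column name `B` is illustrative):

```python
import pandas as pd

# An integer column that contains NULLs arrives in pandas as float64,
# so 2 becomes 2.0 and IntegerType later rejects it in createDataFrame.
df = pd.DataFrame({'B': [1, 2, None]})
assert str(df['B'].dtype) == 'float64'  # NaN forces the integer column to float

# One workaround before round-tripping back to Spark: pandas' nullable Int64 dtype.
df['B'] = df['B'].astype('Int64')
assert str(df['B'].dtype) == 'Int64'
```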