Connecting PyFlink to Iceberg in Practice
2022-07-28 06:30:00 【路新航】
Reference: https://nightlies.apache.org/flink/flink-docs-release-1.13/api/python/index.html
What is PyFlink
- A framework for processing data streams
- The framework runs across multiple machines at the same time
- Those machines communicate with one another through some mechanism
- It can also run on a single machine
PyFlink is essentially a Python entry point into the Java Flink runtime; source and sink connectors cannot be implemented directly in Python.
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
PyFlink Architecture
The core goals of PyFlink:
1. Expose Flink's capabilities to Python users, so that Python users can use everything Flink offers.
2. Run the existing analytics and computation tools of the Python ecosystem on Flink, strengthening Python's ability to handle big-data problems.
Use Cases
- Event-driven applications, e.g. fake-order (fraud) detection and monitoring;
- Data analytics, e.g. inventory reporting and Double 11 real-time dashboards;
- Data pipelines, i.e. ETL scenarios such as log parsing;
- Machine learning, e.g. personalized recommendation.
API
Flink provides different levels of abstraction for developing streaming/batch applications:
- SQL
- Table API
- DataStream/DataSet API(核心 API)
- Stateful Stream Processing
The PyFlink Table API is fully aligned with the Java Table API: all relational operations are supported, and windows are well supported too. Beyond these APIs, PyFlink also offers several ways to define Python UDFs.
User-Defined Functions (UDFs)
First, you can extend ScalarFunction; this route gives access to extra facilities such as registering metrics. Beyond that, every way of defining a function that Python supports works for PyFlink UDFs: lambda functions, named functions, callable objects, and so on.
Once the function is defined, wrap it with the decorator provided by PyFlink and declare the input and output data types. Later versions can simplify this further by using Python type hints for type inference.
from pyflink.table import ScalarFunction, DataTypes
from pyflink.table.udf import udf

# Extend ScalarFunction
class ADD(ScalarFunction):
    def eval(self, i, j):
        return i + j

add1 = udf(ADD(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

# Named function
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add2(i, j):
    return i + j

# Lambda function
add3 = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())

# Callable object
class CallableAdd(object):
    def __call__(self, i, j):
        return i + j

add4 = udf(CallableAdd(), [DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
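To use one of these UDFs from SQL, it has to be registered on a TableEnvironment first. Below is a minimal sketch; t_env, the table my_table, and its columns a and b are hypothetical names, not part of the original example. It also shows the type-hint style of declaration, where input_types can be omitted on recent PyFlink releases.

# Register a UDF under a SQL-visible name on an existing TableEnvironment.
t_env.create_temporary_function("add1", add1)
# Hypothetical table/column names, for illustration only.
t_env.sql_query("SELECT add1(a, b) FROM my_table")

# Type-hint style: recent PyFlink versions infer input types from the hints,
# so only result_type needs to be declared.
@udf(result_type=DataTypes.BIGINT())
def add5(i: int, j: int) -> int:
    return i + j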
Installing PyFlink
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple/ apache-flink==1.13.5
It is best to pin an explicit version; the hands-on examples below were run against 1.13.2.
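A quick sanity check that the expected build was installed (a minimal snippet using only the standard library, Python 3.8+):

import importlib.metadata

# Print the installed apache-flink version.
print(importlib.metadata.version("apache-flink"))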
Hands-On Examples
Environment: apache-flink 1.13.2
Reading Iceberg data
Copy the following JARs into the lib directory of the PyFlink installation (a snippet to locate this directory follows the list):
flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
hive-exec-3.1.2.jar
alluxio-2.6.2-client.jar
iceberg-flink-runtime-1.13-0.13.1.jar
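If you are not sure where pip put the distribution, this small sketch prints the lib directory of a pip-installed apache-flink (it assumes the standard layout, where the bundled JARs sit next to the pyflink package):

import os
import pyflink

# Locate the lib directory that ships with the pip-installed PyFlink.
print(os.path.join(os.path.dirname(os.path.abspath(pyflink.__file__)), "lib"))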
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

os.environ.setdefault('HADOOP_USER_NAME', 'root')

env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
# t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

iceberg_hive_catalog = """
CREATE CATALOG iceberg WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',                  -- optional: hive / hadoop / custom
  'property-version' = '1',                 -- optional: property version, currently 1 (backward compatible)
  'cache-enabled' = 'true',                 -- optional: catalog caching, default true
  'uri' = 'thrift://192.168.xxx.xxx:9083',  -- required: Hive metastore URI
  'clients' = '5',                          -- Hive metastore client pool size, default 2
  'warehouse' = 'hdfs://ns1/lakehouse/'
)
"""

t_env.get_current_catalog()
t_env.get_current_database()

# Register the Iceberg catalog (must run before "use catalog iceberg").
t_env.execute_sql(iceberg_hive_catalog)
t_env.execute_sql("use catalog iceberg").print()
t_env.execute_sql("show current catalog").print()

t_env.execute_sql("show databases").print()
t_env.execute_sql("use dbname").print()
t_env.execute_sql("show tables").print()

table1 = t_env.execute_sql("select * from ***")
table2 = t_env.sql_query("select * from xxx")
pd = table2.to_pandas()
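Writing back into Iceberg goes through the same SQL entry point. A hedged sketch: the table name xxx matches the placeholder query above, the literal values are illustrative only and must be adjusted to the real schema, and .wait() blocks until the insert job finishes.

# Placeholder table name and values; replace with the real schema.
t_env.execute_sql("INSERT INTO xxx VALUES (1, 'a')").wait()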
Reading MySQL data
Configuration
Copy the following JARs into the lib directory of the PyFlink installation:
mysql-connector-java-8.0.16.jar
flink-connector-jdbc_2.12-1.13.2.jar(https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.12/1.13.2)
Create a test table in MySQL:
CREATE TABLE `flink_test` (
`f0` int(11) DEFAULT NULL,
`f1` int(11) DEFAULT NULL
)
insert into flink_test VALUES(1,11)
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a batch TableEnvironment.
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

# Legacy connector properties; kept for reference, overwritten by the DDL below.
mysql_sink_ddl = """
CREATE TABLE flink_test (
  id BIGINT, word VARCHAR, `count` BIGINT, PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://192.168.xx.xx:3306/test',
  'connector.table' = 'flink_test', 'connector.username' = 'xxx',
  'connector.password' = 'xxx', 'connector.write.flush.interval' = '1s'
)
"""

# New-style JDBC connector options.
mysql_sink_ddl = """
create table flink_test (
  f0 INT, f1 INT
) WITH (
  'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.xx.xxx:3306/test',
  'table-name' = 'flink_test', 'username' = 'xx', 'password' = 'xx'
)
"""
t_env.execute_sql(mysql_sink_ddl)

t_env.execute_sql("select * from flink_test").print()
# +-------------+-------------+
# |          f0 |          f1 |
# +-------------+-------------+
# |           1 |          11 |
# +-------------+-------------+
# 1 row in set

t_env.execute_sql("insert into flink_test values(2, 22)").wait()

table2 = t_env.sql_query('select * from flink_test')
# tab = t_env.from_path('flink_test')
table2.to_pandas()
#    f0  f1
# 0   1  11
# 1   2  22
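The Table API can also write into the JDBC sink directly, without an INSERT statement. A minimal sketch against the flink_test table defined above:

# Build a derived table and write it back through the JDBC sink (by position).
result_table = t_env.sql_query("select f0 + 10, f1 + 10 from flink_test")
result_table.execute_insert("flink_test").wait()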
Read a CSV file, compute word counts, and write the result to an output file
File: word_count.py
Reference: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/python/table_api_tutorial/
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

# Create a TableEnvironment: the entry point for Python Table API jobs.
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

# Write all the data to one file.
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

# Create the source and sink tables, registered as mySource and mySink.
t_env.connect(FileSystem().path('./input/word_count.csv')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('./output/output.csv')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

# The DDL version below writes output files with generated names:
# the filesystem connector treats '/output/output.csv' as a directory.
'''
my_source_ddl = """
create table mySource (word VARCHAR) with (
  'connector' = 'filesystem', 'format' = 'csv', 'path' = './input/word_count.csv'
)
"""
my_sink_ddl = """
create table mySink (word VARCHAR, `count` BIGINT) with (
  'connector' = 'filesystem', 'format' = 'csv', 'path' = '/output/output.csv'
)
"""
t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
'''

# The job reads the data from table mySource.
tab = t_env.from_path('mySource')

# Run the Flink Python Table API job.
# The job is only submitted to the cluster (or executed locally) when
# execute_insert(sink_name) is called.
tab.group_by(tab.word) \
    .select(tab.word, lit(1).count) \
    .execute_insert('mySink').wait()
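To inspect the aggregation without going through the file sink, the same result can be printed directly (a sketch reusing the tab defined above):

# Print the word counts to stdout instead of writing to mySink.
tab.group_by(tab.word) \
    .select(tab.word, lit(1).count) \
    .execute().print()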
Errors
Could not find any factory for identifier 'jdbc'
Cause: flink-connector-jdbc_2.12-1.13.2.jar was not placed in the lib directory.
org.apache.flink.sql.parser.impl.ParseException: Encountered ")" at line 12, column 1.
Cause: a trailing comma before the final closing parenthesis of the CREATE TABLE statement.
References
CSV format options: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/csv/
Connecting Flink to MySQL from Python: https://blog.csdn.net/weixin_32136203/article/details/112684291