当前位置:网站首页>Pyflink implements custom sourcefunction
Pyflink implements custom sourcefunction
2022-06-13 01:44:00 【Hua Weiyun】
brief introduction
pyflink At present, it is impossible to be like Map、FlatMap Same definition python UDF To achieve Source UDF Of , Instead, you need to implement Java SourceFunction, And then in python Introduction in operation .
// pyflink in SourceFunction The definition of class SourceFunction(JavaFunctionWrapper): """ Base class for all stream data source in Flink. """ def __init__(self, source_func: Union[str, JavaObject]): """ Constructor of SinkFunction. :param source_func: The java SourceFunction object. """ super(SourceFunction, self).__init__(source_func)Method 1
First of all use first java Simple implementation SourceFunction.
public class MyCustomSourceFunction implements SourceFunction<Row> { private static final String[] NAMES = {"Bob", "Marry", "Henry", "Mike", "Ted", "Jack"}; public void run(SourceContext sourceContext) { Random random = new Random(); for (int i = 0; i < NAMES.length; i++) { Row row = Row.of(i, NAMES[i]); sourceContext.collect(row); } } @Override public void cancel() {}}After that python Introduction in operation SourceFunction
# Fill in the classpath directly , Realize to java source References to custom_source = SourceFunction("org.apache.flink.python.util.MyCustomSourceFunction") # add_sourceds = env.add_source(custom_source, type_info=Types.ROW([Types.INT(), Types.STRING()]))Method 2
Method 1 has been able to implement simple Source 了 , But there is a problem , Method 1 cannot pass parameters , For example, to achieve a RedisSource, Hope to pass python Job transfer parameters , Such as redis Of host and port, At this point, method 1 cannot be achieved , So we have method two .
package com.test;public class MyRedisSourceFunction implements SourceFunction<Row> { private string host; private int port; public MyRedisSourceFunction(string host, int port) { this.host = host; this.port = port; } public void run(SourceContext sourceContext) { Jedis jedis = xxxx; // initialization jedis while(true) { // Connect redis, get data } } @Override public void cancel() {}}here python How to write ? First you need to define a Python class, Initialize internally Java Redis Source.
# Realization Python sourcefunction, Internal initialization java sourcefunctionclass PyRedisSourceFunction(SourceFunction): def __init__(self, host, port): # obtain Java RedisSource class JMyRedisSourceFunction = get_gateway().jvm.com.test.MyRedisSourceFunction # initialization Java redis source j_redis_source = JMyRedisSourceFunction(host, port) # Call the parent class __init__ super(PyRedisSourceFunction, self).__init__(sink_func=j_redis_source)And then in main Function Python class
host = 127.0.0.1port = 12345# initialization Python classredis_source = PyRedisSourceFunction(host, port) # add_sourceds = env.add_source(redis_source, type_info=Types.STRING())边栏推荐
- What is the path field—— Competitive advertising
- 微服务开发环境搭建
- How do you use your own data to achieve your marketing goals?
- H5 open the app. If the app is not downloaded, jump to the download page. If the app has been downloaded, wake up the app
- Auto commit attribute of MySQL
- 【软考】软件设计师知识点整理(待更新)
- [WSL2]WSL2迁移虚拟磁盘文件ext4.vhdx
- 30: Kakfa simulates JSON data generation and transmission
- Cmake has no obvious error after compilation, but prompts that pthread cannot be found
- MySQL ---- where后使用字段别名
猜你喜欢

水管工游戏
![[andoid][step pit]cts 11_ Testbootclasspathandsystemserverclasspath at the beginning of R3_ Analysis of nonduplicateclasses fail](/img/b5/7ea603775dc0448368d209de037a43.png)
[andoid][step pit]cts 11_ Testbootclasspathandsystemserverclasspath at the beginning of R3_ Analysis of nonduplicateclasses fail

Explanation and application of prefix sum (one-dimensional, two-dimensional)

How does Google's audience work?

Introduction to ROS runtime

五、库存查询功能的完善

5、 Improvement of inventory query function

Uuid/guid introduction, generation rules and generation codes

Workspace for ROS

服务器安装jupyterlab以及远程登录配置
随机推荐
Reinstall opencv and step on the pit.
Note: common gadgets in project architecture
September 3, 2021 visual notes
ng-tv-focusable
QT color extraction
Crypto JS reports uglifyjs error
On February 26, 2022, the latest news of national oil price adjustment today
Sonarqube local installation
The interviewer only asked me five questions and the interview was over
Matplotlib drawing Chinese garbled code
Getting started with phaser 3
Spit bubbles (stack)
【软考】软件设计师知识点整理(待更新)
MySQL performance optimization
移动IPv6光猫登录的一般ip地址账号与密码,移动光猫变桥接模式
How many smart bids does Google have?
Redis usage optimization summary learning
How does Google's audience work?
C language implementation of the classic eight queens problem
How to solve the problems when using TV focusable to package APK in uni app