当前位置:网站首页>Pyflink implements custom sourcefunction
Pyflink implements custom sourcefunction
2022-06-13 01:44:00 【Hua Weiyun】
brief introduction
pyflink At present, it is impossible to be like Map、FlatMap Same definition python UDF To achieve Source UDF Of , Instead, you need to implement Java SourceFunction, And then in python Introduction in operation .
// pyflink in SourceFunction The definition of class SourceFunction(JavaFunctionWrapper): """ Base class for all stream data source in Flink. """ def __init__(self, source_func: Union[str, JavaObject]): """ Constructor of SinkFunction. :param source_func: The java SourceFunction object. """ super(SourceFunction, self).__init__(source_func)Method 1
First of all use first java Simple implementation SourceFunction.
public class MyCustomSourceFunction implements SourceFunction<Row> { private static final String[] NAMES = {"Bob", "Marry", "Henry", "Mike", "Ted", "Jack"}; public void run(SourceContext sourceContext) { Random random = new Random(); for (int i = 0; i < NAMES.length; i++) { Row row = Row.of(i, NAMES[i]); sourceContext.collect(row); } } @Override public void cancel() {}}After that python Introduction in operation SourceFunction
# Fill in the classpath directly , Realize to java source References to custom_source = SourceFunction("org.apache.flink.python.util.MyCustomSourceFunction") # add_sourceds = env.add_source(custom_source, type_info=Types.ROW([Types.INT(), Types.STRING()]))Method 2
Method 1 has been able to implement simple Source 了 , But there is a problem , Method 1 cannot pass parameters , For example, to achieve a RedisSource, Hope to pass python Job transfer parameters , Such as redis Of host and port, At this point, method 1 cannot be achieved , So we have method two .
package com.test;public class MyRedisSourceFunction implements SourceFunction<Row> { private string host; private int port; public MyRedisSourceFunction(string host, int port) { this.host = host; this.port = port; } public void run(SourceContext sourceContext) { Jedis jedis = xxxx; // initialization jedis while(true) { // Connect redis, get data } } @Override public void cancel() {}}here python How to write ? First you need to define a Python class, Initialize internally Java Redis Source.
# Realization Python sourcefunction, Internal initialization java sourcefunctionclass PyRedisSourceFunction(SourceFunction): def __init__(self, host, port): # obtain Java RedisSource class JMyRedisSourceFunction = get_gateway().jvm.com.test.MyRedisSourceFunction # initialization Java redis source j_redis_source = JMyRedisSourceFunction(host, port) # Call the parent class __init__ super(PyRedisSourceFunction, self).__init__(sink_func=j_redis_source)And then in main Function Python class
host = 127.0.0.1port = 12345# initialization Python classredis_source = PyRedisSourceFunction(host, port) # add_sourceds = env.add_source(redis_source, type_info=Types.STRING())边栏推荐
- How to solve the problems when using TV focusable to package APK in uni app
- PyFlink实现自定义SourceFunction
- [从零开始学习FPGA编程-21]:进阶篇 - 架构 - VerilogHDL编码规范
- This of phaser3 add. add. image
- Machine learning basic SVM (support vector machine)
- JSON and protobuf Any interchange
- Workspace for ROS
- leetcode743. Network latency (medium, Dijkstra)
- Tweets movement description and chart display
- QT color extraction
猜你喜欢

About tkinter Canvas does not display pictures

How to solve practical problems through audience positioning?

Magics 23.0如何激活和使用视图工具页的切片预览功能

路径字段是什么? ——竞价广告

Set and array conversion, list, array

The second round of mesa

6、 Implementation of warehouse out management function

【MathType】利用MathType输出LaTex样式的公式

leetcode743. Network latency (medium, Dijkstra)
![[MathType] use MathType to output latex style formula](/img/46/ac2041fbc2eb90474e2e2d7d1b0a40.png)
[MathType] use MathType to output latex style formula
随机推荐
五、库存查询功能的完善
Workspace for ROS
水管工遊戲
Unity JsonUtility 无法序列化List
D template instance does not match declaration
Record the VMware installation process of VMware Tools and some problems encountered
How many smart bids does Google have?
The interviewer only asked me five questions and the interview was over
C language implementation of the classic eight queens problem
#pragma comment(lib,“urlmon.lib“)
MySQL ---- where后使用字段别名
Sliding window summary of TCP connections
谷歌的智能出价有几种?
Leetcode question 20
如何利用您的自有数据来实现营销目标?
How to solve the problems when using TV focusable to package APK in uni app
On February 26, 2022, the latest news of national oil price adjustment today
Spit bubbles (stack)
Golang context (context summary)
受众群体应该选择观察模式还是定位模式?