Structured Streaming Error Log: Overloaded method foreachBatch with alternatives
2022-07-30 20:37:00 | 7&
0. Environment
- Spark: 3.0.0
- Scala: 2.12
1. The Error
overloaded method value foreachBatch with alternatives:
2. Code and Error Messages
Error:(48, 12) overloaded method value foreachBatch with alternatives:
(function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
(function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
.foreachBatch((df, batchId) => {
The program that produced the error above:

import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ForeachSink1")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("update")
      .foreachBatch((df, batchId) => { // untyped parameters: this is what fails to compile
        val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
        result.persist()
        result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
        result.write.mode("overwrite").json("./foreach1")
        result.unpersist()
      })
      // .trigger(Trigger.ProcessingTime(0))
      .trigger(Trigger.Continuous(10))
      .start

    query.awaitTermination()
  }
}
/**
 * Error:(43, 12) overloaded method value foreachBatch with alternatives:
 *   (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 *   (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 * cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
 *   .foreachBatch((df, batchId) => {
 */
import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ForeachSink")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("complete")
      .foreachBatch((df, batchId) => { // untyped parameters: fails to compile the same way
        val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
        result.persist()
        result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
        result.write.mode("overwrite").json("./foreach")
        result.unpersist()
      })
      .start

    query.awaitTermination()
  }
}
3. Cause and Fix
Scala 2.12 resolves lambdas passed to overloaded methods differently from Scala 2.11, so a foreachBatch() call that worked under 2.11 breaks here. DataStreamWriter.foreachBatch has two overloads: a Java-friendly one taking org.apache.spark.api.java.function.VoidFunction2[Dataset[Row], java.lang.Long], and a Scala one taking (Dataset[Row], Long) => Unit. With an untyped lambda, the 2.12 compiler cannot commit to either alternative: as the error message shows, batchId is inferred as Any and the block's result type is a DataFrame rather than Unit, so neither overload applies. The fix is to annotate the parameter types explicitly, (df: Dataset[Row], batchId: Long), so the Scala overload is selected.
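The ambiguity is easy to reproduce outside Spark. Below is a minimal, self-contained sketch for Scala 2.12 — the names VoidFunction2, Writer, and Demo here are illustrative stand-ins, not Spark's actual classes — mirroring the two overload shapes:

// mirrors the shape of org.apache.spark.api.java.function.VoidFunction2
trait VoidFunction2[T, U] { def call(t: T, u: U): Unit }

object Writer {
  // mirrors the Java-friendly overload
  def foreachBatch(f: VoidFunction2[String, java.lang.Long]): Unit = f.call("batch", 0L)
  // mirrors the Scala overload
  def foreachBatch(f: (String, Long) => Unit): Unit = f("batch", 0L)
}

object Demo extends App {
  // Fails to compile: with two overloads, the untyped lambda's parameter
  // types cannot be inferred, so neither alternative applies:
  // Writer.foreachBatch((s, id) => println(s + id))

  // Compiles: explicit parameter types select the Scala overload
  Writer.foreachBatch((s: String, id: Long) => println(s + ": " + id))
}

With the parameter types written out, only the Function2 overload applies (a Scala Long is not a java.lang.Long, so SAM conversion to VoidFunction2 is off the table) and resolution succeeds.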
The corrected code follows.
import java.util.Properties

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSink {

  def myFun(df: Dataset[Row], batchId: Long, props: Properties): Unit = {
    println("BatchId = " + batchId)
    if (df.count() != 0) {
      df.persist()
      df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
      df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
      df.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ForeachBatchSink")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // TODO set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val wordCount: DataFrame = lines.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count() // columns: value, count

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = wordCount.writeStream
      .outputMode("complete")
      .foreachBatch((df: Dataset[Row], batchId: Long) => { // explicitly typed parameters
        myFun(df, batchId, props)
      })
      .start

    query.awaitTermination()
  }
}
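Annotating the Scala lambda is the lightest fix, but the first alternative listed in the error message also works: target the Java-friendly overload explicitly with a VoidFunction2. A sketch of the same writeStream call from the program above rewritten that way (note the java.lang.Long batch id, which unboxes to Long at the myFun call):

import org.apache.spark.api.java.function.VoidFunction2

val query: StreamingQuery = wordCount.writeStream
  .outputMode("complete")
  .foreachBatch(new VoidFunction2[Dataset[Row], java.lang.Long] {
    // the SAM's call method fixes both parameter types, so there is no ambiguity
    override def call(df: Dataset[Row], batchId: java.lang.Long): Unit =
      myFun(df, batchId, props)
  })
  .start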
import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSink1 {

  def myFun(df: Dataset[Row], batchId: Long, props: Properties, spark: SparkSession): Unit = {
    import spark.implicits._
    println("BatchId = " + batchId)
    if (df.count() != 0) {
      val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
      result.persist()
      result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
      result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
      result.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ForeachBatchSink1")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // TODO set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("update")
      .foreachBatch((df: Dataset[Row], batchId: Long) => { // explicitly typed parameters
        myFun(df, batchId, props, spark)
      })
      .trigger(Trigger.Continuous(10)) // caution: see the note below on continuous triggers
      .start

    query.awaitTermination()
  }
}
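One caveat on the trigger: per the Spark documentation, foreachBatch relies on micro-batch execution and does not work with continuous processing mode, so the Trigger.Continuous(10) above is rejected at runtime even though it compiles. If you want periodic execution with this sink, a ProcessingTime trigger is the usual pairing — a sketch of the writeStream call from the program above with that one change:

val query: StreamingQuery = lines.writeStream
  .outputMode("update")
  .foreachBatch((df: Dataset[Row], batchId: Long) => myFun(df, batchId, props, spark))
  .trigger(Trigger.ProcessingTime("10 seconds")) // run a micro-batch every 10 seconds
  .start

query.awaitTermination()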
4. References
https://blog.csdn.net/Shockang/article/details/120961968