Spark Structured Streaming 实战
前言
近期博主开发过程中, 需要利用 Kafka 进行大批量数据推送, 经过调研, 发现 Spark Structured Streaming 是个很好的实现思路, 无需自行实现生产者和消费者, 实现批数据推送.
Structured Streaming
在 Spark2.x
中,新开放了一个基于 DataFrame
的无下限的流式处理组件 Structured Streaming
. 实现了
有且仅有一次(Exectly Once) 语义.
在 Kafka 应用
注: 需要
Kafka broker
不低于0.10.0
如下示例, 仅是最为简单的读写操作, 可以查看 参考资料-1
进行拓展设置(如 多Topic订阅等).
实时流
// 实时读取 KAFKA 数据成 DATAFRAME
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 实时将 DATAFRAME 数据写入 KAFKA
val ds = df
.selectExpr("CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
批数据
// 批量读取 KAFKA 数据成 DATAFRAME
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 批量将 DATAFRAME 数据写入 KAFKA
df.selectExpr("CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
参考资料
Spark Structured Streaming 实战
https://www.windism.cn/3388315422.html