Spark Structured Streaming 实战

前言

近期博主开发过程中, 需要利用 Kafka 进行大批量数据推送, 经过调研, 发现 Spark Structured Streaming 是个很好的实现思路, 无需自行实现生产者和消费者, 实现批数据推送.

Structured Streaming

Spark2.x 中,新开放了一个基于 DataFrame 的无下限的流式处理组件 Structured Streaming. 实现了 有且仅有一次(Exectly Once) 语义.

在 Kafka 应用

注: 需要 Kafka broker 不低于 0.10.0

如下示例, 仅是最为简单的读写操作, 可以查看 参考资料-1 进行拓展设置(如 多Topic订阅等).

实时流

// 实时读取 KAFKA 数据成 DATAFRAME
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 实时将 DATAFRAME 数据写入 KAFKA
val ds = df
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

批数据

// 批量读取 KAFKA 数据成 DATAFRAME
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 批量将 DATAFRAME 数据写入 KAFKA
df.selectExpr("CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

参考资料


Spark Structured Streaming 实战
https://www.windism.cn/3388315422.html
作者
windism
发布于
2021年8月9日
许可协议