Structured Streaming
spark-connect supports Spark Structured Streaming: read from streaming
sources, build streaming DataFrames with the same transformation API, write
to streaming sinks, and manage the resulting queries.
foreach/foreachBatch sinks are not supported: they rely on user-defined
functions (UDFs), which are themselves unsupported because their Spark Connect
protobuf definitions are not yet finalized. Everything else (file/Kafka/console/memory
sinks, triggers, output modes, watermarks, the query manager) works.
Reading a stream
Use {SparkConnect::SparkSession#read_stream} (alias readStream):
F = SparkConnect::F
stream = spark.read_stream
.format("rate")
.option("rowsPerSecond", 10)
.load
stream.streaming? #=> true
stream.schema.simple_string #=> "struct<timestamp:timestamp,value:bigint>"
format/option/options/schema/load/table mirror the batch
{SparkConnect::DataFrameReader}, plus csv/json/parquet/orc/text
shortcuts. The returned DataFrame is a normal {SparkConnect::DataFrame}: apply
select, filter, group_by, with_watermark, and so on.
Watermarks
# events is a streaming DataFrame with an event_time timestamp column
events.with_watermark("event_time", "10 minutes")
.group_by(F.window(F.col("event_time"), "5 minutes"))
.count
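To make the two durations concrete, here is a plain-Ruby sketch (no Spark involved) of the bookkeeping that a 10-minute watermark over 5-minute tumbling windows implies. It is a simplified model, not the engine's implementation: it assumes the watermark is the newest event time seen so far minus the delay, and that an event is too late once the watermark has reached the end of its window. `window_start` and `too_late?` are hypothetical names used only for this illustration.

```ruby
# Simplified model of watermarking over tumbling windows (illustration only).
# All times are integer seconds since the epoch.

# Start of the tumbling window that contains event time t.
def window_start(t, window: 5 * 60)
  (t / window) * window
end

# True once the watermark (newest event time seen minus the delay) has
# reached the end of the event's window, so the window no longer accepts rows.
def too_late?(event_time, max_event_time_seen, window: 5 * 60, delay: 10 * 60)
  watermark = max_event_time_seen - delay
  window_start(event_time, window: window) + window <= watermark
end

max_seen = 3600                 # newest event time observed so far
puts too_late?(3100, max_seen)  # window [3000, 3300), watermark 3000
puts too_late?(2500, max_seen)  # window [2400, 2700), watermark 3000
```

With the newest event at 3600 s, the watermark sits at 3000 s, so the window ending at 2700 s is closed while the one ending at 3300 s still accepts rows.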
Writing a stream
{SparkConnect::DataFrame#write_stream} (alias writeStream) returns a
{SparkConnect::DataStreamWriter}. Calling start (or to_table) launches the
query and returns a {SparkConnect::StreamingQuery}.
query = stream
.write_stream
.format("memory") # or "console", "parquet", "kafka", ...
.query_name("rates") # required for the memory sink
.output_mode("append") # "append" | "complete" | "update"
.trigger(processing_time: "1 second")
.start
query.id #=> stable query id (survives checkpoint restarts)
query.run_id #=> unique per start
query.active? #=> true
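Because a started query keeps running until it is stopped, it can help to tie its lifetime to a block. The sketch below is one possible wrapper, assuming only the `start`, `active?`, and `stop` calls shown above; `with_streaming_query` is a hypothetical helper, not part of spark-connect.

```ruby
# Hypothetical helper: start a query from a configured writer, yield it,
# and stop it afterwards even if the block raises.
def with_streaming_query(writer)
  query = writer.start
  yield query
ensure
  # query is nil if start itself raised; skip queries that already ended.
  query.stop if query && query.active?
end

# Usage (needs a live session, so it is left as a comment):
#
#   writer = stream.write_stream.format("memory").query_name("rates")
#   with_streaming_query(writer) do |query|
#     query.process_all_available
#   end
```

The helper returns whatever the block returns, so results computed while the query is running can be passed back to the caller.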
Triggers
| Call | Meaning |
|---|---|
| trigger(processing_time: "10 seconds") | micro-batch every interval |
| trigger(once: true) | process available data once, then stop |
| trigger(available_now: true) | process all available data (multiple batches), then stop |
| trigger(continuous: "1 second") | continuous processing with the given checkpoint interval |
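When the trigger comes from configuration rather than being hard-coded, a small validation step keeps bad values from reaching the writer. The following is a minimal sketch that maps a mode name and an optional interval onto the keyword arguments listed in the table; `trigger_options` is a hypothetical helper and the validation rules are an assumption about what is sensible, not behaviour taken from the library.

```ruby
# Hypothetical helper: build the keyword arguments for trigger from a mode
# name (Symbol or String) and an optional interval string.
def trigger_options(mode, interval = nil)
  key = mode.to_sym
  case key
  when :processing_time, :continuous
    # Interval-based triggers need a duration such as "10 seconds".
    raise ArgumentError, "#{key} trigger needs an interval" if interval.nil?
    { key => interval }
  when :once, :available_now
    # Flag-style triggers take no interval.
    raise ArgumentError, "#{key} trigger takes no interval" unless interval.nil?
    { key => true }
  else
    raise ArgumentError, "unknown trigger mode: #{mode}"
  end
end

p trigger_options(:processing_time, "10 seconds")
p trigger_options("available_now")
```

The result can be splatted into the writer, for example `writer.trigger(**trigger_options(:available_now))`.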
Sinks
# Files (provide a checkpoint location)
stream.write_stream.format("parquet")
.option("checkpointLocation", "/chk/out")
.start("/data/out")
# A catalog table
stream.write_stream.format("parquet")
.option("checkpointLocation", "/chk/tbl")
.to_table("db.events")
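Since the file sink example above calls for a checkpoint location, one option is to make it a required argument of a small wrapper so it cannot be forgotten. This is only a sketch built from the calls shown above (`write_stream`, `format`, `option`, `start`); `start_file_sink` and its keyword names are hypothetical.

```ruby
# Hypothetical helper: start a file sink, refusing to run without a
# checkpoint location.
def start_file_sink(stream, format:, path:, checkpoint:)
  if checkpoint.nil? || checkpoint.to_s.empty?
    raise ArgumentError, "a checkpoint location is required for file sinks"
  end

  stream.write_stream
        .format(format)
        .option("checkpointLocation", checkpoint)
        .start(path)
end

# Usage (needs a live session, so it is left as a comment):
#
#   query = start_file_sink(stream, format: "parquet",
#                           path: "/data/out", checkpoint: "/chk/out")
```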
Inspecting and controlling a query
query.status #=> {"message"=>..., "isActive"=>true, ...}
query.recent_progress #=> [ {parsed progress JSON}, ... ]
query.last_progress #=> the most recent progress object
query.process_all_available
query.await_termination(10_000) # block up to 10 s (timeout in ms); returns whether the query terminated
query.explain
query.exception #=> error message if the query failed, else nil
query.stop
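These calls combine naturally into a polling loop, for instance to wait until a freshly started query has processed its first batch. The sketch below assumes that `last_progress` returns nil until a batch has completed (an assumption, not confirmed by this guide) and relies on `exception` returning nil while the query is healthy, as described above. `wait_for_first_progress` is a hypothetical helper.

```ruby
# Hypothetical helper: poll until the query reports a progress object,
# raising if the query fails or the timeout (in seconds) elapses.
def wait_for_first_progress(query, timeout: 30, interval: 0.5)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    # Surface a failed query immediately instead of waiting for the timeout.
    error = query.exception
    raise "streaming query failed: #{error}" if error

    # Assumption: last_progress is nil before the first batch completes.
    progress = query.last_progress
    return progress if progress

    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      raise "no progress reported after #{timeout}s"
    end
    sleep interval
  end
end
```

A monotonic clock is used for the deadline so that wall-clock adjustments do not shorten or extend the wait.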
Managing queries
{SparkConnect::SparkSession#streams} returns a {SparkConnect::StreamingQueryManager}:
spark.streams.active #=> [StreamingQuery, ...]
spark.streams.get(query.id) #=> the query, or nil
spark.streams.await_any_termination(30_000)
spark.streams.reset_terminated
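A common use of the manager is shutting everything down at exit. The following sketch uses only `streams.active`, `stop`, and `id` from the examples above; `stop_all_queries` is a hypothetical helper, not part of spark-connect.

```ruby
# Hypothetical helper: stop every active query on a session and return
# the ids of the queries that were stopped.
def stop_all_queries(spark)
  spark.streams.active.map do |query|
    query.stop
    query.id
  end
end

# Usage (needs a live session, so it is left as a comment):
#
#   at_exit { stop_all_queries(spark) }
```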