Class: SparkConnect::DataStreamWriter
- Inherits: Object
  - Object
  - SparkConnect::DataStreamWriter
- Defined in: lib/spark_connect/streaming.rb
Overview
Writes a streaming DataFrame to a streaming sink and starts the query.
Returned by SparkConnect::DataFrame#write_stream. Mirrors PySpark's DataStreamWriter.
foreach/foreach_batch are intentionally unsupported: they require
user-defined functions, whose Spark Connect protobuf definitions are not yet
finalized.
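The fluent style described above can be tried without a Spark Connect server. The `SketchWriter` class below is a hypothetical stand-in, not part of the library: it copies three of the setter bodies listed on this page and replaces `start` with a method that merely returns the collected settings (an assumption made for illustration; the real `#start` launches a query).

```ruby
# Hypothetical stand-in for illustration; not the real SparkConnect::DataStreamWriter.
class SketchWriter
  def initialize
    @format = nil
    @output_mode = nil
    @query_name = nil
  end

  # Each setter stores its argument as a String and returns self,
  # which is what makes the calls chainable.
  def format(source)
    @format = source.to_s
    self
  end

  def output_mode(mode)
    @output_mode = mode.to_s
    self
  end

  def query_name(name)
    @query_name = name.to_s
    self
  end

  # Assumption: only returns the accumulated settings instead of starting a query.
  def start
    { format: @format, output_mode: @output_mode, query_name: @query_name }
  end
end

settings = SketchWriter.new
                       .format(:memory)
                       .output_mode("append")
                       .query_name("events")
                       .start
p settings
```

Because every setter returns `self`, the calls can be chained in any order before the terminal `start`.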
Constant Summary
- Proto = SparkConnect::Proto
- WSO = Proto::WriteStreamOperationStart
Instance Method Summary
- #format(source) ⇒ self
  Set the sink format ("console", "memory", "kafka", ...).
- #initialize(df) ⇒ DataStreamWriter (constructor)
  A new instance of DataStreamWriter.
- #option(key, value) ⇒ self
  Set a single sink option.
- #options(opts) ⇒ self
  Set multiple sink options.
- #output_mode(mode) ⇒ self
  Set the output mode ("append", "complete", "update").
- #partition_by(*cols) ⇒ self
  Partition the output by these columns.
- #query_name(name) ⇒ self
  Name the streaming query (required by the memory sink).
- #start(path = nil) ⇒ StreamingQuery
  Start the streaming query, optionally writing to a file/data path.
- #to_table(name) ⇒ StreamingQuery (also: #toTable)
  Start the streaming query, writing into the given table.
- #trigger(processing_time: nil, once: nil, available_now: nil, continuous: nil) ⇒ self
  Configure the query trigger.
Constructor Details
#initialize(df) ⇒ DataStreamWriter
Returns a new instance of DataStreamWriter.
    # File 'lib/spark_connect/streaming.rb', line 97
    def initialize(df)
      @df = df
      @format = nil
      @options = {}
      @partitioning = []
      @output_mode = nil
      @query_name = nil
      @trigger = nil
      @path = nil
      @table = nil
    end
Instance Method Details
#format(source) ⇒ self
Sets the sink format ("console", "memory", "kafka", ...). The argument is converted with to_s. Returns self.
    # File 'lib/spark_connect/streaming.rb', line 110
    def format(source)
      @format = source.to_s
      self
    end
#option(key, value) ⇒ self
Sets a single sink option. Both key and value are stored as strings. Returns self.
    # File 'lib/spark_connect/streaming.rb', line 122
    def option(key, value)
      @options[key.to_s] = value.to_s
      self
    end
#options(opts) ⇒ self
Sets multiple sink options from a hash. Keys and values are stored as strings, and an existing key is overwritten. Returns self.
    # File 'lib/spark_connect/streaming.rb', line 128
    def options(opts)
      opts.each { |k, v| @options[k.to_s] = v.to_s }
      self
    end
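Since #option and #options write into the same hash and convert everything with to_s, the stored result can be surprising for non-string values. The following stand-alone sketch reproduces only the two method bodies shown above with a local hash; the lambdas and the option names (`numRows`, `truncate`) are illustrative assumptions, not library API.

```ruby
# Stand-alone sketch of the option-merging logic; `store` plays the role of @options.
store = {}

# Same body as #option: one key/value pair, both converted with to_s.
set_option = ->(key, value) { store[key.to_s] = value.to_s }

# Same body as #options: every pair of the hash goes through the same conversion.
set_options = ->(opts) { opts.each { |k, v| store[k.to_s] = v.to_s } }

set_option.call(:numRows, 20)                        # Symbol key, Integer value
set_options.call(truncate: false, "numRows" => 50)   # overwrites "numRows"
set_option.call(:checkpoint, nil)                    # nil.to_s is the empty string

p store
```

Symbol and String keys collapse into the same entry, booleans become "true"/"false", and a nil value is stored as an empty string rather than removing the key.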
#output_mode(mode) ⇒ self
Sets the output mode ("append", "complete", "update"). Returns self.
    # File 'lib/spark_connect/streaming.rb', line 116
    def output_mode(mode)
      @output_mode = mode.to_s
      self
    end
#partition_by(*cols) ⇒ self
Partitions the output by these columns. Accepts individual names or arrays of names; each call replaces any previously set partitioning. Returns self.
    # File 'lib/spark_connect/streaming.rb', line 134
    def partition_by(*cols)
      @partitioning = cols.flatten.map(&:to_s)
      self
    end
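The `cols.flatten.map(&:to_s)` expression is what lets #partition_by accept a splat, a single array, or a mix of both. A minimal sketch of that normalization, extracted into a lambda named `normalize` (a name chosen here for illustration only):

```ruby
# Same expression as the body of #partition_by, extracted into a lambda.
normalize = ->(*cols) { cols.flatten.map(&:to_s) }

a = normalize.call(:year, :month)       # individual Symbols
b = normalize.call(%w[year month])      # one Array of Strings
c = normalize.call([:year, ["month"]])  # nested Arrays are flattened recursively

p a, b, c
```

All three calls yield the same list of column-name strings, so callers do not need to splat an existing array themselves.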
#query_name(name) ⇒ self
Names the streaming query (required by the memory sink). Returns self.
    # File 'lib/spark_connect/streaming.rb', line 140
    def query_name(name)
      @query_name = name.to_s
      self
    end
#start(path = nil) ⇒ StreamingQuery
Start the streaming query, optionally writing to a file/data path. When path is nil, any previously stored path is left unchanged.
    # File 'lib/spark_connect/streaming.rb', line 166
    def start(path = nil)
      @path = path if path
      run
    end
#to_table(name) ⇒ StreamingQuery Also known as: toTable
Start the streaming query, writing into the given table.
    # File 'lib/spark_connect/streaming.rb', line 175
    def to_table(name)
      @table = name.to_s
      run
    end
#trigger(processing_time: nil, once: nil, available_now: nil, continuous: nil) ⇒ self
Configure the query trigger. Provide exactly one of processing_time, once, available_now, or continuous. If several are given, the first truthy one in that order is used; if none is given, the trigger is reset to nil. Returns self.
    # File 'lib/spark_connect/streaming.rb', line 152
    def trigger(processing_time: nil, once: nil, available_now: nil, continuous: nil)
      @trigger =
        if processing_time then [:processing_time_interval, processing_time.to_s]
        elsif once then [:once, true]
        elsif available_now then [:available_now, true]
        elsif continuous then [:continuous_checkpoint_interval, continuous.to_s]
        end
      self
    end
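The if/elsif chain in #trigger does not validate that exactly one keyword was passed; it simply takes the first truthy one. The sketch below copies that branch logic into a free-standing method, `pick_trigger` (a hypothetical name used only here), so the precedence can be checked in isolation.

```ruby
# Copy of the branch logic from #trigger as a free-standing method.
# Returns the pair that #trigger would store in @trigger.
def pick_trigger(processing_time: nil, once: nil, available_now: nil, continuous: nil)
  if processing_time then [:processing_time_interval, processing_time.to_s]
  elsif once then [:once, true]
  elsif available_now then [:available_now, true]
  elsif continuous then [:continuous_checkpoint_interval, continuous.to_s]
  end
end

p pick_trigger(processing_time: "10 seconds")        # interval kept as a String
p pick_trigger(available_now: true)
p pick_trigger(once: true, continuous: "1 second")   # once is checked before continuous
p pick_trigger                                        # no keyword given
p pick_trigger(once: false)                           # a falsy value counts as "not given"
```

Passing two keywords silently drops the later one, and calling with no truthy keyword yields nil, so callers who need strict validation would have to add it themselves.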