Class: SparkConnect::StreamingQuery
- Inherits:
-
Object
- Object
- SparkConnect::StreamingQuery
- Defined in:
- lib/spark_connect/streaming.rb
Overview
A handle to a running streaming query. Returned by DataStreamWriter#start.
Mirrors PySpark's StreamingQuery.
Constant Summary collapse
- Proto =
SparkConnect::Proto
- Cmd =
Proto::StreamingQueryCommand
Instance Attribute Summary collapse
-
#id ⇒ String
readonly
The stable query id (survives restarts from a checkpoint).
-
#name ⇒ String?
readonly
The query name, if one was set.
-
#run_id ⇒ String
readonly
The run id (unique per start).
Instance Method Summary collapse
-
#active? ⇒ Boolean
Whether the query is still running.
-
#await_termination(timeout_ms = nil) ⇒ Boolean
Block until the query terminates, or until
timeout_ms elapses.
-
#exception ⇒ String?
The query's exception message, if it has failed.
-
#explain(extended: false) ⇒ String
The query's execution plan.
-
#initialize(session, instance_id, name) ⇒ StreamingQuery
constructor
A new instance of StreamingQuery.
-
#last_progress ⇒ Hash?
The most recent progress object, if any.
-
#process_all_available ⇒ void
Process all available data, then return (useful for tests with bounded sources).
-
#recent_progress ⇒ Array<Hash>
Parsed JSON progress objects for recent micro-batches.
-
#status ⇒ Hash
The current status (message, is_data_available, is_trigger_active, is_active).
-
#stop ⇒ Object
Stop the query.
- #to_s ⇒ Object (also: #inspect)
Constructor Details
#initialize(session, instance_id, name) ⇒ StreamingQuery
Returns a new instance of StreamingQuery.
218 219 220 221 222 223 224 |
# File 'lib/spark_connect/streaming.rb', line 218 def initialize(session, instance_id, name) @session = session @instance_id = instance_id @id = instance_id.id @run_id = instance_id.run_id @name = name.nil? || name.empty? ? nil : name end |
Instance Attribute Details
#id ⇒ String (readonly)
Returns the stable query id (survives restarts from a checkpoint).
209 210 211 |
# File 'lib/spark_connect/streaming.rb', line 209 def id @id end |
#name ⇒ String? (readonly)
Returns the query name, if one was set.
213 214 215 |
# File 'lib/spark_connect/streaming.rb', line 213 def name @name end |
#run_id ⇒ String (readonly)
Returns the run id (unique per start).
211 212 213 |
# File 'lib/spark_connect/streaming.rb', line 211 def run_id @run_id end |
Instance Method Details
#active? ⇒ Boolean
Returns whether the query is still running.
239 240 241 |
# File 'lib/spark_connect/streaming.rb', line 239 def active? status["isActive"] end |
#await_termination(timeout_ms = nil) ⇒ Boolean
Block until the query terminates, or until timeout_ms elapses.
257 258 259 260 261 |
# File 'lib/spark_connect/streaming.rb', line 257 def await_termination(timeout_ms = nil) ac = Cmd::AwaitTerminationCommand.new ac.timeout_ms = timeout_ms if timeout_ms command(await_termination: ac).await_termination.terminated end |
#exception ⇒ String?
Returns the query's exception message, if it has failed.
277 278 279 280 |
# File 'lib/spark_connect/streaming.rb', line 277 def exception result = command(exception: true).exception result.exception_message && result.exception_message.empty? ? nil : result.exception_message end |
#explain(extended: false) ⇒ String
Returns the query's execution plan.
283 284 285 |
# File 'lib/spark_connect/streaming.rb', line 283 def explain(extended: false) command(explain: Cmd::ExplainCommand.new(extended: extended)).explain.result end |
#last_progress ⇒ Hash?
Returns the most recent progress object, if any.
249 250 251 |
# File 'lib/spark_connect/streaming.rb', line 249 def last_progress command(last_progress: true).recent_progress.recent_progress_json.map { |j| JSON.parse(j) }.last end |
#process_all_available ⇒ void
This method returns an undefined value.
Process all available data, then return (useful for tests with bounded sources).
265 266 267 268 |
# File 'lib/spark_connect/streaming.rb', line 265 def process_all_available command(process_all_available: true) nil end |
#recent_progress ⇒ Array<Hash>
Returns parsed JSON progress objects for recent micro-batches.
244 245 246 |
# File 'lib/spark_connect/streaming.rb', line 244 def recent_progress command(recent_progress: true).recent_progress.recent_progress_json.map { |j| JSON.parse(j) } end |
#status ⇒ Hash
Returns the current status (message, is_data_available,
is_trigger_active, is_active) as a Hash with the string keys
"message", "isDataAvailable", "isTriggerActive" and "isActive".
228 229 230 231 232 233 234 235 236 |
# File 'lib/spark_connect/streaming.rb', line 228 def status s = command(status: true).status { "message" => s.status_message, "isDataAvailable" => s.is_data_available, "isTriggerActive" => s.is_trigger_active, "isActive" => s.is_active, } end |
#stop ⇒ Object
Stop the query. Returns nil.
271 272 273 274 |
# File 'lib/spark_connect/streaming.rb', line 271 def stop command(stop: true) nil end |
#to_s ⇒ Object Also known as: inspect
287 288 289 |
# File 'lib/spark_connect/streaming.rb', line 287 def to_s "#<SparkConnect::StreamingQuery id=#{@id} name=#{@name.inspect}>" end |