Class: SparkConnect::StreamingQueryManager
- Inherits:
-
Object
- Object
- SparkConnect::StreamingQueryManager
- Defined in:
- lib/spark_connect/streaming.rb
Overview
Manages the streaming queries of a session. Returned by SparkConnect::SparkSession#streams.
Mirrors PySpark's StreamingQueryManager.
Constant Summary collapse
- Proto =
SparkConnect::Proto
- MCmd =
Proto::StreamingQueryManagerCommand
Instance Method Summary collapse
-
#active ⇒ Array<StreamingQuery>
The currently active queries.
-
#await_any_termination(timeout_ms = nil) ⇒ Boolean
Block until any query terminates, or until
timeout_mselapses. -
#get(id) ⇒ StreamingQuery?
Look up an active query by its id.
-
#initialize(session) ⇒ StreamingQueryManager
constructor
A new instance of StreamingQueryManager.
-
#reset_terminated ⇒ Object
Forget the cached termination state of all queries (so a subsequent #await_any_termination blocks again).
Constructor Details
#initialize(session) ⇒ StreamingQueryManager
Returns a new instance of StreamingQueryManager.
307 308 309 |
# File 'lib/spark_connect/streaming.rb', line 307 def initialize(session) @session = session end |
Instance Method Details
#active ⇒ Array<StreamingQuery>
Returns the currently active queries.
312 313 314 |
# File 'lib/spark_connect/streaming.rb', line 312 def active command(active: true).active.active_queries.map { |q| StreamingQuery.new(@session, q.id, q.name) } end |
#await_any_termination(timeout_ms = nil) ⇒ Boolean
Block until any query terminates, or until timeout_ms elapses.
331 332 333 334 335 |
# File 'lib/spark_connect/streaming.rb', line 331 def await_any_termination(timeout_ms = nil) ac = MCmd::AwaitAnyTerminationCommand.new ac.timeout_ms = timeout_ms if timeout_ms command(await_any_termination: ac).await_any_termination.terminated end |
#get(id) ⇒ StreamingQuery?
Look up an active query by its id.
320 321 322 323 324 325 |
# File 'lib/spark_connect/streaming.rb', line 320 def get(id) result = command(get_query: id.to_s) return nil unless result.result_type == :query StreamingQuery.new(@session, result.query.id, result.query.name) end |
#reset_terminated ⇒ Object
Forget the cached termination state of all queries (so a subsequent #await_any_termination blocks again). @return [void]
339 340 341 342 |
# File 'lib/spark_connect/streaming.rb', line 339 def reset_terminated command(reset_terminated: true) nil end |