Class: SparkConnect::DataStreamReader
- Inherits: Object
- Ancestors: Object → SparkConnect::DataStreamReader
- Defined in: lib/spark_connect/streaming.rb
Overview
Loads a streaming DataFrame from a streaming source. Returned by
SparkSession#read_stream. Mirrors PySpark's DataStreamReader.
Constant Summary
- Proto = SparkConnect::Proto
Instance Method Summary
- #csv(path) ⇒ DataFrame
  Convenience for format("csv").load(path).
- #format(source) ⇒ self
  Set the streaming source format ("rate", "kafka", ...).
- #initialize(session) ⇒ DataStreamReader (constructor)
  A new instance of DataStreamReader.
- #json(path) ⇒ Object
  Convenience for format("json").load(path).
- #load(path = nil) ⇒ DataFrame
  Load a streaming DataFrame from the configured source.
- #option(key, value) ⇒ self
  Set a single source option.
- #options(opts) ⇒ self
  Set multiple source options.
- #orc(path) ⇒ Object
  Convenience for format("orc").load(path).
- #parquet(path) ⇒ Object
  Convenience for format("parquet").load(path).
- #schema(schema) ⇒ self
  Set the input schema (a Types::StructType or DDL string).
- #table(name) ⇒ DataFrame
  Load a streaming DataFrame from a registered table.
- #text(path) ⇒ Object
  Convenience for format("text").load(path).
Constructor Details
#initialize(session) ⇒ DataStreamReader
Returns a new instance of DataStreamReader.
15 16 17 18 19 20 |
# File 'lib/spark_connect/streaming.rb', line 15

# Creates a reader bound to +session+ with no format, schema, or options
# configured yet.
#
# @param session the session this reader builds relations for
def initialize(session)
  @session = session
  @format = @schema = nil
  @options = {}
end
Instance Method Details
#csv(path) ⇒ DataFrame
Convenience for format("csv").load(path).
64 |
# File 'lib/spark_connect/streaming.rb', line 64

# Shorthand for selecting the "csv" source and loading +path+.
#
# @param path location passed through to #load
# @return whatever #load returns for the configured source
def csv(path)
  reader = format("csv")
  reader.load(path)
end
#format(source) ⇒ self
Sets the streaming source format (e.g. "rate", "kafka") and returns self for chaining.
23 24 25 26 |
# File 'lib/spark_connect/streaming.rb', line 23

# Records the streaming source format, stringified.
#
# @param source format name, e.g. "rate" or :kafka
# @return [self] the reader, for chaining
def format(source)
  tap { @format = source.to_s }
end
#json(path) ⇒ Object
Convenience for format("json").load(path).
65 |
# File 'lib/spark_connect/streaming.rb', line 65

# Shorthand for selecting the "json" source and loading +path+.
#
# @param path location passed through to #load
# @return whatever #load returns for the configured source
def json(path)
  reader = format("json")
  reader.load(path)
end
#load(path = nil) ⇒ DataFrame
Load a streaming DataFrame from the configured source.
50 51 52 53 54 55 |
# File 'lib/spark_connect/streaming.rb', line 50

# Builds a DataSource read from the accumulated format, schema, and options,
# and hands it to #stream_relation.
#
# @param path optional location; when truthy it becomes the sole entry of
#   the data source's paths, otherwise paths is empty
# @return the relation produced by #stream_relation (documented as a DataFrame)
def load(path = nil)
  paths = []
  paths << path.to_s if path
  source = Proto::Read::DataSource.new(options: @options, paths: paths).tap do |ds|
    # Only assign fields that were configured, leaving the rest unset.
    ds.format = @format if @format
    ds.schema = @schema if @schema
  end
  stream_relation(data_source: source)
end
#option(key, value) ⇒ self
Sets a single source option (key and value are converted to strings) and returns self.
35 36 37 38 |
# File 'lib/spark_connect/streaming.rb', line 35

# Stores one source option, stringifying both key and value.
#
# @param key option name
# @param value option value
# @return [self] the reader, for chaining
def option(key, value)
  @options.store(key.to_s, value.to_s)
  self
end
#options(opts) ⇒ self
Sets multiple source options (keys and values are converted to strings) and returns self.
41 42 43 44 |
# File 'lib/spark_connect/streaming.rb', line 41

# Stores several source options at once. Keys and values are stringified,
# and an entry overwrites any option already stored under the same key.
#
# @param opts [#each] pairs of option name and value (typically a Hash)
# @return [self] the reader, for chaining
def options(opts)
  opts.each { |key, value| @options[key.to_s] = value.to_s }
  self
end
#orc(path) ⇒ Object
Convenience for format("orc").load(path).
67 |
# File 'lib/spark_connect/streaming.rb', line 67

# Shorthand for selecting the "orc" source and loading +path+.
#
# @param path location passed through to #load
# @return whatever #load returns for the configured source
def orc(path)
  reader = format("orc")
  reader.load(path)
end
#parquet(path) ⇒ Object
Convenience for format("parquet").load(path).
66 |
# File 'lib/spark_connect/streaming.rb', line 66

# Shorthand for selecting the "parquet" source and loading +path+.
#
# @param path location passed through to #load
# @return whatever #load returns for the configured source
def parquet(path)
  reader = format("parquet")
  reader.load(path)
end
#schema(schema) ⇒ self
Sets the input schema (a Types::StructType, stored as its simple_string, or a DDL string) and returns self.
29 30 31 32 |
# File 'lib/spark_connect/streaming.rb', line 29

# Records the input schema as a string.
#
# @param schema a Types::StructType (stored via #simple_string) or anything
#   else, which is stored via #to_s (e.g. a DDL string)
# @return [self] the reader, for chaining
def schema(schema)
  @schema =
    if schema.is_a?(Types::StructType)
      schema.simple_string
    else
      schema.to_s
    end
  self
end
#table(name) ⇒ DataFrame
Load a streaming DataFrame from a registered table.
59 60 61 |
# File 'lib/spark_connect/streaming.rb', line 59

# Reads a registered table as a stream: wraps the stringified table name and
# the accumulated options in a NamedTable and hands it to #stream_relation.
#
# @param name table identifier, passed unparsed
# @return the relation produced by #stream_relation (documented as a DataFrame)
def table(name)
  named = Proto::Read::NamedTable.new(unparsed_identifier: name.to_s, options: @options)
  stream_relation(named_table: named)
end
#text(path) ⇒ Object
Convenience for format("text").load(path).
68 |
# File 'lib/spark_connect/streaming.rb', line 68

# Shorthand for selecting the "text" source and loading +path+.
#
# @param path location passed through to #load
# @return whatever #load returns for the configured source
def text(path)
  reader = format("text")
  reader.load(path)
end