Class: SparkConnect::DataFrameReader
- Inherits: Object
  - Object
    - SparkConnect::DataFrameReader
- Defined in: lib/spark_connect/reader.rb
Overview
Loads data from external sources into a DataFrame. Returned by
SparkSession#read. Mirrors PySpark's DataFrameReader.
Constant Summary
- Proto = SparkConnect::Proto
Instance Method Summary
- #csv(*paths) ⇒ DataFrame — Read CSV data at `paths`.
- #format(source) ⇒ self — Set the input format ("csv", "json", "parquet", "orc", ...).
- #initialize(session) ⇒ DataFrameReader (constructor) — Returns a new instance of DataFrameReader.
- #jdbc(url, table, properties = {}) ⇒ DataFrame — Read from a JDBC source.
- #json(*paths) ⇒ DataFrame — Read JSON data at `paths`.
- #load(*paths) ⇒ DataFrame — Load data from the given path(s) using the configured format.
- #option(key, value) ⇒ self — Set a single read option.
- #options(opts) ⇒ self — Set multiple read options.
- #orc(*paths) ⇒ DataFrame — Read ORC data at `paths`.
- #parquet(*paths) ⇒ DataFrame — Read Parquet data at `paths`.
- #schema(schema) ⇒ self — Set the input schema (a Types::StructType or DDL string).
- #table(name) ⇒ DataFrame — Read a registered table or view.
- #text(*paths) ⇒ DataFrame — Read text data at `paths` (one `value` column per line).
Constructor Details
#initialize(session) ⇒ DataFrameReader
Returns a new instance of DataFrameReader.
15 16 17 18 19 20 |
# File 'lib/spark_connect/reader.rb', line 15
# Builds a fresh reader with no format, no schema and an empty option map.
# The builder methods (#format, #schema, #option, #options) fill these in.
#
# @param session [SparkSession] the session that created this reader
def initialize(session)
  @session = session
  @format, @schema = nil, nil
  @options = {}
end
Instance Method Details
#csv(*paths) ⇒ DataFrame
Returns CSV at paths.
71 72 |
# File 'lib/spark_connect/reader.rb', line 71
# Shorthand for `format("csv").load(*paths)`.
# Note: this leaves the reader's format set to "csv" afterwards.
#
# @param paths [Array] path arguments, forwarded unchanged to #load
# @return [DataFrame] CSV at `paths`
def csv(*paths)
  reader = format("csv")
  reader.load(*paths)
end
#format(source) ⇒ self
Set the input format ("csv", "json", "parquet", "orc", ...).
24 25 26 27 |
# File 'lib/spark_connect/reader.rb', line 24
# Sets the input format ("csv", "json", "parquet", "orc", ...).
# Inside this class it shadows Kernel#format.
#
# @param source [#to_s] format name; stored as a String
# @return [self] the reader, for chaining
def format(source)
  tap { @format = source.to_s }
end
#jdbc(url, table, properties = {}) ⇒ DataFrame
Read from a JDBC source.
87 88 89 90 |
# File 'lib/spark_connect/reader.rb', line 87
# Reads from a JDBC source.
#
# Switches the format to "jdbc" and sends `url` and `table` as the "url" and
# "dbtable" options, together with any extra connection `properties`.
# Because `properties` is merged last, an entry there with the same key
# overrides "url"/"dbtable".
#
# @param url [#to_s] JDBC connection URL (sent as the "url" option)
# @param table [#to_s] table to read (sent as the "dbtable" option)
# @param properties [Hash] additional JDBC options; keys are converted to Strings
# @return [DataFrame]
def jdbc(url, table, properties = {})
  opts = { "url" => url, "dbtable" => table }.merge(properties.transform_keys(&:to_s))
  # Bug fix: the chain previously read `.(opts)`, i.e. `self.call(opts)`,
  # which raises NoMethodError. The options must go through #options.
  format("jdbc").options(opts).load
end
#json(*paths) ⇒ DataFrame
Returns JSON at paths.
73 74 |
# File 'lib/spark_connect/reader.rb', line 73
# Shorthand for `format("json").load(*paths)`.
# Note: this leaves the reader's format set to "json" afterwards.
#
# @param paths [Array] path arguments, forwarded unchanged to #load
# @return [DataFrame] JSON at `paths`
def json(*paths)
  reader = format("json")
  reader.load(*paths)
end
#load(*paths) ⇒ DataFrame
Load data from the given path(s) using the configured format.
54 55 56 57 58 59 |
# File 'lib/spark_connect/reader.rb', line 54
# Loads data from the given path(s) using the configured format, schema and options.
#
# Paths may be passed as separate arguments and/or (nested) arrays. They are
# flattened and converted to Strings. The format and schema fields are set on
# the DataSource message only when they have been configured.
#
# @param paths [Array<#to_s>] zero or more input paths
# @return [DataFrame]
def load(*paths)
  path_strings = paths.flatten.map(&:to_s)
  source = Proto::Read::DataSource.new(options: @options, paths: path_strings).tap do |ds|
    ds.format = @format if @format
    ds.schema = @schema if @schema
  end
  read_relation(data_source: source)
end
#option(key, value) ⇒ self
Set a single read option.
38 39 40 41 |
# File 'lib/spark_connect/reader.rb', line 38
# Sets a single read option. Key and value are both stored as Strings.
# A later call with the same key replaces the earlier value.
#
# @param key [#to_s] option name
# @param value [#to_s] option value
# @return [self] the reader, for chaining
def option(key, value)
  tap { @options.store(key.to_s, value.to_s) }
end
#options(opts) ⇒ self
Set multiple read options.
45 46 47 48 |
# File 'lib/spark_connect/reader.rb', line 45
# Sets several read options at once. Each key and value is stored as a
# String, the same way #option stores a single pair. Existing keys are
# overwritten.
#
# Bug fix: the definition had lost its name (`def (opts)`), which is not
# valid Ruby and left #options undefined.
#
# @param opts [Hash, #each] key/value pairs to add
# @return [self] the reader, for chaining
def options(opts)
  opts.each { |k, v| @options[k.to_s] = v.to_s }
  self
end
#orc(*paths) ⇒ DataFrame
Returns ORC at paths.
77 78 |
# File 'lib/spark_connect/reader.rb', line 77
# Shorthand for `format("orc").load(*paths)`.
# Note: this leaves the reader's format set to "orc" afterwards.
#
# @param paths [Array] path arguments, forwarded unchanged to #load
# @return [DataFrame] ORC at `paths`
def orc(*paths)
  reader = format("orc")
  reader.load(*paths)
end
#parquet(*paths) ⇒ DataFrame
Returns Parquet at paths.
75 76 |
# File 'lib/spark_connect/reader.rb', line 75
# Shorthand for `format("parquet").load(*paths)`.
# Note: this leaves the reader's format set to "parquet" afterwards.
#
# @param paths [Array] path arguments, forwarded unchanged to #load
# @return [DataFrame] Parquet at `paths`
def parquet(*paths)
  reader = format("parquet")
  reader.load(*paths)
end
#schema(schema) ⇒ self
Set the input schema (a Types::StructType or DDL string).
31 32 33 34 |
# File 'lib/spark_connect/reader.rb', line 31
# Sets the input schema.
#
# A Types::StructType is stored as its `simple_string` form. Anything else
# (typically a DDL string) is stored via `to_s`.
# NOTE(review): PySpark's Connect reader sends StructType schemas as JSON;
# confirm the server also accepts the simple_string form.
#
# @param schema [Types::StructType, #to_s] struct type or DDL string
# @return [self] the reader, for chaining
def schema(schema)
  @schema =
    if schema.is_a?(Types::StructType)
      schema.simple_string
    else
      schema.to_s
    end
  self
end
#table(name) ⇒ DataFrame
Read a registered table or view.
65 66 67 68 |
# File 'lib/spark_connect/reader.rb', line 65
# Reads a registered table or view by name.
#
# The current read options are attached to the request. Any configured
# format or schema is not sent.
#
# @param name [#to_s] table/view identifier, sent unparsed
# @return [DataFrame]
def table(name)
  identifier = name.to_s
  read_relation(
    named_table: Proto::Read::NamedTable.new(unparsed_identifier: identifier, options: @options)
  )
end
#text(*paths) ⇒ DataFrame
Returns text at paths (one value column per line).
79 |
# File 'lib/spark_connect/reader.rb', line 79
# Shorthand for `format("text").load(*paths)`.
# Note: this leaves the reader's format set to "text" afterwards.
#
# @param paths [Array] path arguments, forwarded unchanged to #load
# @return [DataFrame] text at `paths` (one `value` column per line)
def text(*paths)
  reader = format("text")
  reader.load(*paths)
end