Class: SparkConnect::DataFrameReader

Inherits:
Object
  • Object
show all
Defined in:
lib/spark_connect/reader.rb

Overview

Loads data from external sources into a DataFrame. Returned by SparkSession#read. Mirrors PySpark's DataFrameReader.

Examples:

spark.read.format("csv").option("header", true).load("data.csv")
spark.read.json("events.json")
spark.read.table("my_table")

Constant Summary collapse

Proto =
SparkConnect::Proto

Instance Method Summary collapse

Constructor Details

#initialize(session) ⇒ DataFrameReader

Returns a new instance of DataFrameReader.

Parameters:

  • session

    the session this reader is bound to (see SparkSession#read); stored for later use.

15
16
17
18
19
20
# File 'lib/spark_connect/reader.rb', line 15

# Builds a reader bound to `session` with no format, no schema and an
# empty option set.
#
# @param session the session object this reader keeps a reference to.
def initialize(session)
  # Format and schema start out unset; #load only forwards them once set.
  @format = @schema = nil
  @session = session
  @options = {}
end

Instance Method Details

#csv(*paths) ⇒ DataFrame

Returns CSV at paths.

Returns:

  • (DataFrame)

    CSV at paths.

71
72
# File 'lib/spark_connect/reader.rb', line 71

# Shorthand for selecting the "csv" format and loading `paths`.
#
# @param paths [Array<String>] forwarded unchanged to #load.
# @return [DataFrame] CSV at `paths`.
def csv(*paths)
  format("csv").load(*paths)
end
# @return [DataFrame] JSON at `paths`.

#format(source) ⇒ self

Set the input format ("csv", "json", "parquet", "orc", ...).

Returns:

  • (self)


24
25
26
27
# File 'lib/spark_connect/reader.rb', line 24

# Set the input format ("csv", "json", "parquet", "orc", ...).
#
# @param source [#to_s] format name; stored as a String.
# @return [self] the reader, so calls can be chained.
def format(source)
  tap { @format = source.to_s }
end

#jdbc(url, table, properties = {}) ⇒ DataFrame

Read from a JDBC source.

Parameters:

  • url (String)

    the JDBC URL.

  • table (String)

    the table name (or subquery).

  • properties (Hash) (defaults to: {})

    connection properties (user, password, ...).

Returns:

  • (DataFrame)

87
88
89
90
# File 'lib/spark_connect/reader.rb', line 87

# Read from a JDBC source.
#
# The explicit `url` and `table` arguments always take precedence: a "url"
# or "dbtable" entry inside `properties` cannot silently redirect the read.
# This matches the precedence used by Spark's own DataFrameReader#jdbc.
#
# @param url [String] the JDBC URL.
# @param table [String] the table name (or subquery).
# @param properties [Hash] connection properties (user, password, ...);
#   keys may be Strings or Symbols and are normalised to Strings.
# @return [DataFrame]
def jdbc(url, table, properties = {})
  # Merge the explicit arguments last so they override colliding properties.
  opts = properties.transform_keys(&:to_s).merge("url" => url, "dbtable" => table)
  format("jdbc").options(opts).load
end

#json(*paths) ⇒ DataFrame

Returns JSON at paths.

Returns:

  • (DataFrame)

    JSON at paths.

73
74
# File 'lib/spark_connect/reader.rb', line 73

# Shorthand for selecting the "json" format and loading `paths`.
#
# @param paths [Array<String>] forwarded unchanged to #load.
# @return [DataFrame] JSON at `paths`.
def json(*paths)
  format("json").load(*paths)
end
# @return [DataFrame] Parquet at `paths`.

#load(*paths) ⇒ DataFrame

Load data from the given path(s) using the configured format.

Parameters:

  • paths (Array<String>)

Returns:

  • (DataFrame)

54
55
56
57
58
59
# File 'lib/spark_connect/reader.rb', line 54

# Load data from the given path(s) using the configured format.
#
# Builds a Proto::Read::DataSource carrying the accumulated options and the
# stringified paths, attaches the format and schema only when they have been
# set, and hands the result to `read_relation`.
#
# @param paths [Array<String>] paths; nested arrays are flattened and every
#   entry is converted with `to_s`.
# @return [DataFrame]
def load(*paths)
  path_strings = paths.flatten.map(&:to_s)
  source = Proto::Read::DataSource.new(options: @options, paths: path_strings)

  # Format and schema are optional; leave the fields untouched when unset.
  if @format
    source.format = @format
  end
  if @schema
    source.schema = @schema
  end

  read_relation(data_source: source)
end

#option(key, value) ⇒ self

Set a single read option.

Returns:

  • (self)


38
39
40
41
# File 'lib/spark_connect/reader.rb', line 38

# Set a single read option.
#
# Both the key and the value are stored as Strings (via `to_s`).
#
# @param key [#to_s] option name.
# @param value [#to_s] option value.
# @return [self] the reader, so calls can be chained.
def option(key, value)
  @options.store(key.to_s, value.to_s)
  self
end

#options(opts) ⇒ self

Set multiple read options.

Returns:

  • (self)


45
46
47
48
# File 'lib/spark_connect/reader.rb', line 45

# Set multiple read options.
#
# Every key/value pair yielded by `opts` is stored with both sides converted
# to Strings (via `to_s`); existing entries with the same key are replaced.
#
# @param opts [Hash] option names mapped to values.
# @return [self] the reader, so calls can be chained.
def options(opts)
  opts.each do |name, setting|
    @options.store(name.to_s, setting.to_s)
  end
  self
end

#orc(*paths) ⇒ DataFrame

Returns ORC at paths.

Returns:

  • (DataFrame)

    ORC at paths.

77
78
# File 'lib/spark_connect/reader.rb', line 77

# Shorthand for selecting the "orc" format and loading `paths`.
#
# @param paths [Array<String>] forwarded unchanged to #load.
# @return [DataFrame] ORC at `paths`.
def orc(*paths)
  format("orc").load(*paths)
end
# @return [DataFrame] text at `paths` (one `value` column per line).

#parquet(*paths) ⇒ DataFrame

Returns Parquet at paths.

Returns:

  • (DataFrame)

    Parquet at paths.

75
76
# File 'lib/spark_connect/reader.rb', line 75

# Shorthand for selecting the "parquet" format and loading `paths`.
#
# @param paths [Array<String>] forwarded unchanged to #load.
# @return [DataFrame] Parquet at `paths`.
def parquet(*paths)
  format("parquet").load(*paths)
end
# @return [DataFrame] ORC at `paths`.

#schema(schema) ⇒ self

Set the input schema (a Types::StructType or DDL string).

Returns:

  • (self)


31
32
33
34
# File 'lib/spark_connect/reader.rb', line 31

# Set the input schema (a Types::StructType or DDL string).
#
# A Types::StructType is stored as its `simple_string`; anything else is
# stored as its `to_s`.
#
# @param schema [Types::StructType, #to_s] the schema to apply when loading.
# @return [self] the reader, so calls can be chained.
def schema(schema)
  @schema =
    if schema.is_a?(Types::StructType)
      schema.simple_string
    else
      schema.to_s
    end
  self
end

#table(name) ⇒ DataFrame

Read a registered table or view.

Parameters:

  • name (String)

Returns:

  • (DataFrame)

65
66
67
68
# File 'lib/spark_connect/reader.rb', line 65

# Read a registered table or view.
#
# Wraps the stringified name and the accumulated options in a
# Proto::Read::NamedTable and hands it to `read_relation`.
#
# @param name [String] table or view identifier; converted with `to_s`.
# @return [DataFrame]
def table(name)
  read_relation(
    named_table: Proto::Read::NamedTable.new(
      unparsed_identifier: name.to_s,
      options: @options
    )
  )
end

#text(*paths) ⇒ DataFrame

Returns text at paths (one value column per line).

Returns:

  • (DataFrame)

    text at paths (one value column per line).



79
# File 'lib/spark_connect/reader.rb', line 79

# Shorthand for selecting the "text" format and loading `paths`.
#
# @param paths [Array<String>] forwarded unchanged to #load.
# @return [DataFrame] text at `paths` (one `value` column per line).
def text(*paths)
  format("text").load(*paths)
end