Class: SparkConnect::DataFrameWriter

Inherits:
Object
  • Object
show all
Defined in:
lib/spark_connect/writer.rb

Overview

Saves the contents of a DataFrame to external storage. Returned by SparkConnect::DataFrame#write. Mirrors PySpark's DataFrameWriter.

Examples:

df.write.format("parquet").mode(:overwrite).save("out.parquet")
df.write.mode(:append).save_as_table("my_table")

Constant Summary collapse

# Shorthand for the SparkConnect::Proto namespace.
Proto =
SparkConnect::Proto
# Shorthand for Proto::WriteOperation (used below to build WO::SaveTable).
WO =
Proto::WriteOperation
# Maps each accepted save-mode name to its :SAVE_MODE_* symbol.
# #mode looks names up here after downcasing and symbolizing its argument.
# :error, :errorifexists and :error_if_exists all map to the same value;
# :default maps to :SAVE_MODE_UNSPECIFIED, the value a new writer starts with.
SAVE_MODES =
{
  append: :SAVE_MODE_APPEND,
  overwrite: :SAVE_MODE_OVERWRITE,
  error: :SAVE_MODE_ERROR_IF_EXISTS,
  errorifexists: :SAVE_MODE_ERROR_IF_EXISTS,
  error_if_exists: :SAVE_MODE_ERROR_IF_EXISTS,
  ignore: :SAVE_MODE_IGNORE,
  default: :SAVE_MODE_UNSPECIFIED,
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(df) ⇒ DataFrameWriter

Returns a new instance of DataFrameWriter.

Parameters:



25
26
27
28
29
30
31
32
33
34
# File 'lib/spark_connect/writer.rb', line 25

# Builds a writer bound to +df+ with nothing configured yet: no format,
# an unspecified save mode, no options, and no partition/sort/bucket columns.
#
# @param df the DataFrame whose contents will be written
def initialize(df)
  @df = df
  @mode = :SAVE_MODE_UNSPECIFIED
  @options = {}
  # Two separate array literals so the lists never share one object.
  @partitioning_columns = []
  @sort_columns = []
  # Format and bucketing stay unset until the caller configures them.
  @source = @bucket_cols = @num_buckets = nil
end

Instance Method Details

#bucket_by(num_buckets, *cols) ⇒ self Also known as: bucketBy

Buckets the output into num_buckets buckets by the given columns and returns self for chaining.

Returns:

  • (self)

    bucket the output into num_buckets by these columns.



78
79
80
81
82
# File 'lib/spark_connect/writer.rb', line 78

# Records the bucketing spec: the bucket count and the columns to bucket by.
#
# @param num_buckets number of buckets (stored as given)
# @param cols column names; nested arrays are flattened and every entry is
#   converted with #to_s
# @return [self] so calls can be chained
def bucket_by(num_buckets, *cols)
  tap do
    @num_buckets = num_buckets
    @bucket_cols = cols.flatten.map { |col| col.to_s }
  end
end

#csv(path) ⇒ Object



115
# File 'lib/spark_connect/writer.rb', line 115

# Shortcut that selects the "csv" format and then saves to +path+.
#
# @param path destination passed straight to #save
# @return whatever #save returns
def csv(path)
  format("csv").save(path)
end

#format(source) ⇒ self

Sets the output format and returns self for chaining.

Returns:

  • (self)

    set the output format.



37
38
39
40
# File 'lib/spark_connect/writer.rb', line 37

# Chooses the output format.
#
# @param source format name; converted with #to_s before being stored
# @return [self] so calls can be chained
def format(source)
  tap { @source = source.to_s }
end

#insert_into(name) ⇒ void Also known as: insertInto

This method returns an undefined value.

Insert into an existing table (by position).



105
106
107
108
109
# File 'lib/spark_connect/writer.rb', line 105

# Inserts the DataFrame into the table called +name+ by executing a write
# operation whose table target uses the INSERT_INTO save method.
#
# @param name table name; converted with #to_s
# @return whatever #execute returns
def insert_into(name)
  write_op = base_operation
  write_op.table = WO::SaveTable.new(
    table_name: name.to_s,
    save_method: :TABLE_SAVE_METHOD_INSERT_INTO
  )
  execute(write_op)
end

#json(path) ⇒ Object



114
# File 'lib/spark_connect/writer.rb', line 114

# Shortcut that selects the "json" format and then saves to +path+.
#
# @param path destination passed straight to #save
# @return whatever #save returns
def json(path)
  format("json").save(path)
end

#mode(save_mode) ⇒ self

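Sets the save mode (:append, :overwrite, :ignore, :error, :errorifexists, :error_if_exists or :default; strings are accepted and matched case-insensitively) and returns self for chaining.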

Returns:

  • (self)

    set the save mode (:append, :overwrite, :ignore, :error).



44
45
46
47
48
49
# File 'lib/spark_connect/writer.rb', line 44

# Selects the save mode by name.
#
# The argument is stringified, downcased and symbolized before the lookup,
# so :append, "append" and "APPEND" are all treated the same.
#
# @param save_mode [Symbol, String] one of the keys of SAVE_MODES
# @return [self] so calls can be chained
# @raise [IllegalArgumentError] when the name is not a key of SAVE_MODES;
#   the previously selected mode is left untouched in that case
def mode(save_mode)
  lookup_key = save_mode.to_s.downcase.to_sym
  raise IllegalArgumentError, "Unknown save mode: #{save_mode}" unless SAVE_MODES.key?(lookup_key)

  @mode = SAVE_MODES[lookup_key]
  self
end

#option(key, value) ⇒ self

Sets a single write option and returns self for chaining.

Returns:

  • (self)

    set a write option.



52
53
54
55
# File 'lib/spark_connect/writer.rb', line 52

# Stores one write option, replacing any existing value for the same key.
#
# @param key option name; converted with #to_s
# @param value option value; converted with #to_s
# @return [self] so calls can be chained
def option(key, value)
  @options.store(key.to_s, value.to_s)
  self
end

#options(opts) ⇒ self

Sets multiple write options and returns self for chaining.

Returns:

  • (self)

    set multiple write options.



58
59
60
61
# File 'lib/spark_connect/writer.rb', line 58

# Stores several write options at once; existing keys are overwritten.
#
# @param opts key/value pairs to iterate; each key and value is converted
#   with #to_s
# @return [self] so calls can be chained
def options(opts)
  opts.each do |name, setting|
    @options.store(name.to_s, setting.to_s)
  end
  self
end

#orc(path) ⇒ Object



116
# File 'lib/spark_connect/writer.rb', line 116

# Shortcut that selects the "orc" format and then saves to +path+.
#
# @param path destination passed straight to #save
# @return whatever #save returns
def orc(path)
  format("orc").save(path)
end

#parquet(path) ⇒ void

This method returns an undefined value.

Convenience for format("parquet").save(path).



113
# File 'lib/spark_connect/writer.rb', line 113

# Shortcut that selects the "parquet" format and then saves to +path+.
#
# @param path destination passed straight to #save
# @return whatever #save returns
def parquet(path)
  format("parquet").save(path)
end

#partition_by(*cols) ⇒ self Also known as: partitionBy

Partitions the output by the given columns and returns self for chaining.

Returns:

  • (self)

    partition the output by these columns.



64
65
66
67
# File 'lib/spark_connect/writer.rb', line 64

# Replaces the list of partitioning columns.
#
# @param cols column names; nested arrays are flattened and every entry is
#   converted with #to_s
# @return [self] so calls can be chained
def partition_by(*cols)
  column_names = cols.flatten.map { |col| col.to_s }
  @partitioning_columns = column_names
  self
end

#save(path = nil) ⇒ void

This method returns an undefined value.

Save to a path.

Parameters:

  • path (String, nil) (defaults to: nil)


88
89
90
91
92
# File 'lib/spark_connect/writer.rb', line 88

# Executes the write, optionally targeting +path+.
#
# @param path [String, nil] destination; when nil or false the operation's
#   path is left as built by base_operation
# @return whatever #execute returns
def save(path = nil)
  base_operation.then do |write_op|
    write_op.path = path if path
    execute(write_op)
  end
end

#save_as_table(name) ⇒ void Also known as: saveAsTable

This method returns an undefined value.

Save as a managed/registered table.



96
97
98
99
100
# File 'lib/spark_connect/writer.rb', line 96

# Saves the DataFrame as the table called +name+ by executing a write
# operation whose table target uses the SAVE_AS_TABLE save method.
#
# @param name table name; converted with #to_s
# @return whatever #execute returns
def save_as_table(name)
  write_op = base_operation
  write_op.table = WO::SaveTable.new(
    table_name: name.to_s,
    save_method: :TABLE_SAVE_METHOD_SAVE_AS_TABLE
  )
  execute(write_op)
end

#sort_by(*cols) ⇒ self Also known as: sortBy

Sorts the output within partitions/buckets by the given columns and returns self for chaining.

Returns:

  • (self)

    sort within partitions/buckets by these columns.



71
72
73
74
# File 'lib/spark_connect/writer.rb', line 71

# Replaces the list of sort columns.
#
# @param cols column names; nested arrays are flattened and every entry is
#   converted with #to_s
# @return [self] so calls can be chained
def sort_by(*cols)
  tap { @sort_columns = cols.flatten.map { |col| col.to_s } }
end

#text(path) ⇒ Object



117
# File 'lib/spark_connect/writer.rb', line 117

# Shortcut that selects the "text" format and then saves to +path+.
#
# @param path destination passed straight to #save
# @return whatever #save returns
def text(path)
  format("text").save(path)
end