Class: SparkConnect::DataFrameWriter
- Inherits: Object
  - Object
    - SparkConnect::DataFrameWriter
- Defined in:
- lib/spark_connect/writer.rb
Overview
Saves the contents of a DataFrame to external storage. Returned by
SparkConnect::DataFrame#write. Mirrors PySpark's DataFrameWriter.
Constant Summary collapse
- Proto =
SparkConnect::Proto
- WO =
Proto::WriteOperation
- SAVE_MODES =
{ append: :SAVE_MODE_APPEND, overwrite: :SAVE_MODE_OVERWRITE, error: :SAVE_MODE_ERROR_IF_EXISTS, errorifexists: :SAVE_MODE_ERROR_IF_EXISTS, error_if_exists: :SAVE_MODE_ERROR_IF_EXISTS, ignore: :SAVE_MODE_IGNORE, default: :SAVE_MODE_UNSPECIFIED, }.freeze
Instance Method Summary collapse
-
#bucket_by(num_buckets, *cols) ⇒ self
(also: #bucketBy)
Bucket the output into num_buckets by these columns.
- #csv(path) ⇒ Object
-
#format(source) ⇒ self
Set the output format.
-
#initialize(df) ⇒ DataFrameWriter
constructor
A new instance of DataFrameWriter.
-
#insert_into(name) ⇒ void
(also: #insertInto)
Insert into an existing table (by position).
- #json(path) ⇒ Object
-
#mode(save_mode) ⇒ self
Set the save mode (:append, :overwrite, :ignore, :error).
-
#option(key, value) ⇒ self
Set a write option.
-
#options(opts) ⇒ self
Set multiple write options.
- #orc(path) ⇒ Object
-
#parquet(path) ⇒ void
Convenience for format("parquet").save(path).
-
#partition_by(*cols) ⇒ self
(also: #partitionBy)
Partition the output by these columns.
-
#save(path = nil) ⇒ void
Save to a path.
-
#save_as_table(name) ⇒ void
(also: #saveAsTable)
Save as a managed/registered table.
-
#sort_by(*cols) ⇒ self
(also: #sortBy)
Sort within partitions/buckets by these columns.
- #text(path) ⇒ Object
Constructor Details
#initialize(df) ⇒ DataFrameWriter
Returns a new instance of DataFrameWriter.
25 26 27 28 29 30 31 32 33 34 |
# File 'lib/spark_connect/writer.rb', line 25 def initialize(df) @df = df @source = nil @mode = :SAVE_MODE_UNSPECIFIED @options = {} @partitioning_columns = [] @sort_columns = [] @bucket_cols = nil @num_buckets = nil end |
Instance Method Details
#bucket_by(num_buckets, *cols) ⇒ self Also known as: bucketBy
Buckets the output into num_buckets by these columns. Returns self.
78 79 80 81 82 |
# File 'lib/spark_connect/writer.rb', line 78 def bucket_by(num_buckets, *cols) @num_buckets = num_buckets @bucket_cols = cols.flatten.map(&:to_s) self end |
#csv(path) ⇒ Object
115 |
# File 'lib/spark_connect/writer.rb', line 115 def csv(path) = format("csv").save(path) |
#format(source) ⇒ self
Sets the output format. Returns self.
37 38 39 40 |
# File 'lib/spark_connect/writer.rb', line 37 def format(source) @source = source.to_s self end |
#insert_into(name) ⇒ void Also known as: insertInto
This method returns an undefined value.
Insert into an existing table (by position).
105 106 107 108 109 |
# File 'lib/spark_connect/writer.rb', line 105 def insert_into(name) op = base_operation op.table = WO::SaveTable.new(table_name: name.to_s, save_method: :TABLE_SAVE_METHOD_INSERT_INTO) execute(op) end |
#json(path) ⇒ Object
114 |
# File 'lib/spark_connect/writer.rb', line 114 def json(path) = format("json").save(path) |
#mode(save_mode) ⇒ self
Sets the save mode (:append, :overwrite, :ignore, :error). Returns self.
Raises IllegalArgumentError when the mode is not one of the SAVE_MODES keys.
44 45 46 47 48 49 |
# File 'lib/spark_connect/writer.rb', line 44 def mode(save_mode) @mode = SAVE_MODES.fetch(save_mode.to_s.downcase.to_sym) do raise IllegalArgumentError, "Unknown save mode: #{save_mode}" end self end |
#option(key, value) ⇒ self
Sets a write option. Returns self.
52 53 54 55 |
# File 'lib/spark_connect/writer.rb', line 52 def option(key, value) @options[key.to_s] = value.to_s self end |
#options(opts) ⇒ self
Sets multiple write options. Returns self.
58 59 60 61 |
# File 'lib/spark_connect/writer.rb', line 58 def options(opts) opts.each { |k, v| @options[k.to_s] = v.to_s } self end |
#orc(path) ⇒ Object
116 |
# File 'lib/spark_connect/writer.rb', line 116 def orc(path) = format("orc").save(path) |
#parquet(path) ⇒ void
This method returns an undefined value.
Convenience for format("parquet").save(path).
113 |
# File 'lib/spark_connect/writer.rb', line 113 def parquet(path) = format("parquet").save(path) |
#partition_by(*cols) ⇒ self Also known as: partitionBy
Partitions the output by these columns. Returns self.
64 65 66 67 |
# File 'lib/spark_connect/writer.rb', line 64 def partition_by(*cols) @partitioning_columns = cols.flatten.map(&:to_s) self end |
#save(path = nil) ⇒ void
This method returns an undefined value.
Save to a path.
88 89 90 91 92 |
# File 'lib/spark_connect/writer.rb', line 88 def save(path = nil) op = base_operation op.path = path if path execute(op) end |
#save_as_table(name) ⇒ void Also known as: saveAsTable
This method returns an undefined value.
Save as a managed/registered table.
96 97 98 99 100 |
# File 'lib/spark_connect/writer.rb', line 96 def save_as_table(name) op = base_operation op.table = WO::SaveTable.new(table_name: name.to_s, save_method: :TABLE_SAVE_METHOD_SAVE_AS_TABLE) execute(op) end |
#sort_by(*cols) ⇒ self Also known as: sortBy
Sorts within partitions/buckets by these columns. Returns self.
71 72 73 74 |
# File 'lib/spark_connect/writer.rb', line 71 def sort_by(*cols) @sort_columns = cols.flatten.map(&:to_s) self end |
#text(path) ⇒ Object
117 |
# File 'lib/spark_connect/writer.rb', line 117 def text(path) = format("text").save(path) |