Module: SparkConnect::ArrowConverter

Defined in:
lib/spark_connect/arrow.rb

Overview

Decodes the Apache Arrow IPC stream payloads returned by the server into Ruby Rows. Each ExecutePlanResponse.arrow_batch.data chunk is a complete, self-contained Arrow IPC stream (schema + record batches); this converter reads each chunk and flattens all record batches into rows.

Named ArrowConverter (not Arrow) so that references to the red-arrow top-level Arrow constant inside the gem are unambiguous.

Class Method Summary collapse

Class Method Details

.arrow_field_type(data_type) ⇒ Object

Map a Spark type to the corresponding red-arrow data type used when building local relations.



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/spark_connect/arrow.rb', line 94

def arrow_field_type(data_type)
  case data_type
  when Types::BooleanType then :boolean
  when Types::ByteType then :int8
  when Types::ShortType then :int16
  when Types::IntegerType then :int32
  when Types::LongType then :int64
  when Types::FloatType then :float
  when Types::DoubleType then :double
  when Types::StringType, Types::CharType, Types::VarcharType then :string
  when Types::BinaryType then :binary
  when Types::DateType then :date32
  when Types::TimestampType, Types::TimestampNTZType then { type: :timestamp, unit: :micro }
  when Types::ArrayType
    { type: :list, field: { name: "element", type: arrow_field_type(data_type.element_type) } }
  when Types::StructType
    { type: :struct, fields: data_type.fields.map { |f| { name: f.name, type: arrow_field_type(f.data_type) } } }
  else :string # rubocop:disable Lint/DuplicateBranch -- string default for unsupported-locally types
  end
end

.build_arrow_schema(schema) ⇒ Object



73
74
75
76
77
78
# File 'lib/spark_connect/arrow.rb', line 73

def build_arrow_schema(schema)
  fields = schema.fields.map do |f|
    ::Arrow::Field.new(f.name, arrow_field_type(f.data_type))
  end
  ::Arrow::Schema.new(fields)
end

.extract_value(row, name, idx) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/spark_connect/arrow.rb', line 80

def extract_value(row, name, idx)
  case row
  when Row then row[name]
  when Hash
    if row.key?(name) then row[name]
    elsif row.key?(name.to_sym) then row[name.to_sym]
    end
  when Array then row[idx]
  else row
  end
end

.from_rows(rows, schema) ⇒ String

Serialize an array of Ruby hashes into a single Arrow IPC stream given a Spark Types::StructType. Used by SparkSession#create_data_frame to ship local data to the server as a LocalRelation.

Parameters:

Returns:

  • (String)

    Arrow IPC stream bytes.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/spark_connect/arrow.rb', line 58

def from_rows(rows, schema)
  arrow_schema = build_arrow_schema(schema)
  raw_rows = rows.map do |row|
    schema.fields.each_with_index.map { |field, idx| extract_value(row, field.name, idx) }
  end
  record_batch = ::Arrow::RecordBatch.new(arrow_schema, raw_rows)
  buffer = ::Arrow::ResizableBuffer.new(1024)
  ::Arrow::BufferOutputStream.open(buffer) do |output|
    ::Arrow::RecordBatchStreamWriter.open(output, arrow_schema) do |writer|
      writer.write_record_batch(record_batch)
    end
  end
  buffer.data.to_s
end

.to_rows(batches) ⇒ Array<Row>

Decode IPC stream chunks into rows.

Parameters:

  • batches (Array<String>)

    Arrow IPC stream byte chunks.

Returns:



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/spark_connect/arrow.rb', line 20

def to_rows(batches)
  rows = []
  field_names = nil
  batches.each do |data|
    next if data.nil? || data.empty?

    reader = ::Arrow::RecordBatchStreamReader.new(::Arrow::BufferInputStream.new(::Arrow::Buffer.new(data)))
    reader.each do |record_batch|
      field_names ||= record_batch.schema.fields.map(&:name)
      record_batch.raw_records.each do |values|
        rows << Row.new(values, fields: field_names)
      end
    end
  end
  rows
end

.to_table(batches) ⇒ Arrow::Table?

Decode IPC stream chunks into a single Arrow Table (for advanced/columnar consumers who want zero-copy access to the underlying data).

Parameters:

  • batches (Array<String>)

Returns:

  • (Arrow::Table, nil)


42
43
44
45
46
47
48
49
# File 'lib/spark_connect/arrow.rb', line 42

def to_table(batches)
  tables = batches.reject { |b| b.nil? || b.empty? }.map do |data|
    ::Arrow::RecordBatchStreamReader.new(::Arrow::BufferInputStream.new(::Arrow::Buffer.new(data))).read_all
  end
  return nil if tables.empty?

  tables.reduce { |acc, t| acc.concatenate([t]) }
end