Class: SparkConnect::Column

Inherits:
Object
  • Object
show all
Defined in:
lib/spark_connect/column.rb

Overview

A column expression: a lazily-evaluated reference to a column or a computation over columns. Columns are immutable; operators and methods return new Columns.

A Column wraps a protobuf Expression. Build them with Functions.col, Functions.lit, by indexing a DataFrame (df["id"]), or by combining other columns with operators.

Examples:

F = SparkConnect::F
(F.col("age") + 1).alias("next_age")
F.col("name").like("a%") & (F.col("age") >= 18)

Constant Summary collapse

Proto =
SparkConnect::Proto

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(expr) ⇒ Column

Returns a new instance of Column.

Parameters:

  • expr (Spark::Connect::Expression)


26
27
28
# File 'lib/spark_connect/column.rb', line 26

# Wrap a protobuf expression. Columns are immutable: operators and methods
# build new Columns around new expressions rather than changing this one.
#
# @param expr [Spark::Connect::Expression]
def initialize(expr)
  @expr = expr
end

Instance Attribute Details

#expr ⇒ Spark::Connect::Expression (readonly)

Returns the wrapped protobuf expression.

Returns:

  • (Spark::Connect::Expression)

    the wrapped protobuf expression.



23
24
25
# File 'lib/spark_connect/column.rb', line 23

# @return [Spark::Connect::Expression] the wrapped protobuf expression.
def expr = @expr

Class Method Details

.from_expr(expr) ⇒ Column

Wrap an existing protobuf expression.

Returns:



38
39
40
# File 'lib/spark_connect/column.rb', line 38

# Wrap an existing protobuf expression in a Column.
#
# @param expr [Spark::Connect::Expression]
# @return [Column]
def from_expr(expr) = new(expr)

.from_name(name) ⇒ Column

An unresolved attribute reference by (possibly dotted) name. The special name "*" expands to all columns.

Parameters:

  • name (String)

Returns:



47
48
49
50
51
52
53
54
55
# File 'lib/spark_connect/column.rb', line 47

# Build an unresolved column reference from a (possibly dotted) name.
#
# "*" (String or Symbol) expands to all columns. A qualified star such as
# "t.*" or "a.b.*" expands to all columns under that target, as PySpark's
# col() does. Any other name becomes an UnresolvedAttribute.
#
# @param name [String, Symbol]
# @return [Column]
def from_name(name)
  ident = name.to_s
  if ident == "*"
    new(Proto::Expression.new(unresolved_star: Proto::Expression::UnresolvedStar.new))
  elsif ident.end_with?(".*")
    # The Spark Connect proto expects unparsed_target to keep its trailing ".*".
    new(Proto::Expression.new(
          unresolved_star: Proto::Expression::UnresolvedStar.new(unparsed_target: ident)
        ))
  else
    new(Proto::Expression.new(
          unresolved_attribute: Proto::Expression::UnresolvedAttribute.new(unparsed_identifier: ident)
        ))
  end
end

.infer_type(value) ⇒ Types::DataType

Infer the Spark Types::DataType for a Ruby value (used when building array/map literals). Mirrors PySpark's literal type inference.

Parameters:

  • value (Object)

Returns:



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/spark_connect/column.rb', line 152

# Infer the Spark Types::DataType for a Ruby value, mirroring PySpark's
# literal type inference. Used when building array and map literals.
#
# Containers are typed from their first non-nil element (arrays) or first
# non-nil key / value (hashes). Leading nils therefore do not force a null
# element or value type. An empty or all-nil array gets a null element type;
# an empty Hash defaults to map<string, string>.
#
# @param value [Object]
# @return [Types::DataType]
# @raise [IllegalArgumentError] if no Spark type corresponds to value's class.
def infer_type(value)
  case value
  when nil then Types.null
  when true, false then Types.boolean
  when Integer
    value.between?(-2_147_483_648, 2_147_483_647) ? Types.integer : Types.long
  when Float, Rational then Types.double
  when BigDecimal then Types.decimal(38, 18)
  when String then value.encoding == Encoding::ASCII_8BIT ? Types.binary : Types.string
  when Symbol then Types.string
  # DateTime is a subclass of Date, so it must be matched before Date.
  when Time, DateTime then Types.timestamp
  when Date then Types.date
  when Array then Types.array(infer_type(value.find { |v| !v.nil? }))
  when Hash
    if value.empty?
      Types.map(Types.string, Types.string)
    else
      Types.map(infer_type(value.keys.find { |k| !k.nil? }),
                infer_type(value.values.find { |v| !v.nil? }))
    end
  else
    raise IllegalArgumentError, "Cannot infer Spark type for #{value.class}"
  end
end

.invoke(name, *args, is_distinct: false) ⇒ Column

Build an UnresolvedFunction call column.

Parameters:

  • name (String)

    the Spark function name.

  • args (Array<Column, Object>)

    arguments (non-columns become literals).

  • is_distinct (Boolean) (defaults to: false)

Returns:



74
75
76
77
78
79
80
81
82
# File 'lib/spark_connect/column.rb', line 74

# Build a Column that calls the Spark function +name+.
#
# @param name [String, Symbol] the Spark function name.
# @param args [Array<Column, Object>] arguments; non-Columns become literals via to_col.
# @param is_distinct [Boolean] whether to apply DISTINCT to the call.
# @return [Column] wrapping an UnresolvedFunction expression.
def invoke(name, *args, is_distinct: false)
  fn_name = name.to_s
  arg_exprs = args.map { |arg| to_col(arg).to_expr }
  call = Proto::Expression::UnresolvedFunction.new(
    function_name: fn_name,
    arguments: arg_exprs,
    is_distinct: is_distinct
  )
  new(Proto::Expression.new(unresolved_function: call))
end

.lit(value) ⇒ Column

Build a literal column from a Ruby value.

Parameters:

  • value (Object)

    nil, Boolean, Integer, Float, String, Symbol, Time, Date, BigDecimal, Array, Hash, or an existing SparkConnect::Column.

Returns:



62
63
64
65
66
# File 'lib/spark_connect/column.rb', line 62

# Build a literal Column from a Ruby value. Existing Columns are returned
# unchanged; anything else is encoded with to_literal.
#
# @param value [Object] nil, Boolean, Integer, Float, String, Symbol, Time,
#   Date, BigDecimal, Array, Hash, or a Column.
# @return [Column]
def lit(value)
  if value.is_a?(Column)
    value
  else
    new(Proto::Expression.new(literal: to_literal(value)))
  end
end

.to_col(value) ⇒ Column

Coerce a value into a SparkConnect::Column (literals are wrapped).

Returns:



86
87
88
# File 'lib/spark_connect/column.rb', line 86

# Coerce +value+ into a Column: Columns pass through, and everything else
# is wrapped as a literal via lit.
#
# @param value [Column, Object]
# @return [Column]
def to_col(value)
  return value if value.is_a?(Column)

  lit(value)
end

.to_literal(value) ⇒ Spark::Connect::Expression::Literal

Encode a Ruby value as a protobuf Expression.Literal.

Parameters:

  • value (Object)

Returns:

  • (Spark::Connect::Expression::Literal)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/spark_connect/column.rb', line 94

# Encode a Ruby value as a protobuf Expression.Literal.
#
# Integers use the 32-bit +integer+ field when they fit and +long+ otherwise.
# Strings tagged ASCII-8BIT are sent as binary. Time/DateTime become
# microseconds since the Unix epoch; Date becomes days since 1970-01-01.
# Hash keys/values are typed from their first non-nil entries; an empty Hash
# defaults to map<string, string>.
#
# @param value [Object]
# @return [Spark::Connect::Expression::Literal]
# @raise [IllegalArgumentError] if the value's class has no literal encoding.
def to_literal(value)
  l = Proto::Expression::Literal
  case value
  when nil
    l.new(null: Types.null.to_proto)
  when true, false
    l.new(boolean: value)
  when Integer
    if value.between?(-2_147_483_648, 2_147_483_647)
      l.new(integer: value)
    else
      l.new(long: value)
    end
  when Float
    l.new(double: value)
  when BigDecimal
    # "F" formatting yields plain (non-exponent) notation.
    l.new(decimal: l::Decimal.new(value: value.to_s("F")))
  when Rational
    l.new(double: value.to_f)
  when String
    if value.encoding == Encoding::ASCII_8BIT
      l.new(binary: value)
    else
      l.new(string: value)
    end
  when Symbol
    l.new(string: value.to_s)
  when Time
    # Exact rational seconds since the epoch, scaled to microseconds (to_i truncates toward zero).
    l.new(timestamp: (value.to_r * 1_000_000).to_i)
  when DateTime
    # Matched before Date because DateTime is a subclass of Date.
    l.new(timestamp: (value.to_time.to_r * 1_000_000).to_i)
  when Date
    l.new(date: (value - Date.new(1970, 1, 1)).to_i)
  when Array
    elem_type = infer_array_element_type(value)
    l.new(array: l::Array.new(
      element_type: elem_type.to_proto,
      elements: value.map { |v| to_literal(v) }
    ))
  when Hash
    # Type keys/values from the first non-nil entry so a leading nil value
    # does not declare the whole map's value type as null.
    key_type = value.empty? ? Types.string : infer_type(value.keys.find { |k| !k.nil? })
    val_type = value.empty? ? Types.string : infer_type(value.values.find { |v| !v.nil? })
    l.new(map: l::Map.new(
      key_type: key_type.to_proto,
      value_type: val_type.to_proto,
      keys: value.keys.map { |k| to_literal(k) },
      values: value.values.map { |v| to_literal(v) }
    ))
  else
    raise IllegalArgumentError, "Unsupported literal value of type #{value.class}: #{value.inspect}"
  end
end

Instance Method Details

#! ⇒ Object Also known as: not



210
211
212
# File 'lib/spark_connect/column.rb', line 210

# Logical negation of this (boolean) column, via Spark's "not" function.
def !() = Column.invoke("not", self)

#!=(other) ⇒ Object



196
# File 'lib/spark_connect/column.rb', line 196

# Inequality comparison, returning a Column (not a Ruby Boolean).
#
# Spark's function registry lists "!=" only as a documentation entry for the
# SQL operator, not as a callable function, so an UnresolvedFunction named
# "!=" cannot be resolved. Build not(self == other) instead, which is the
# same shape PySpark's Connect client sends.
def !=(other)
  Column.invoke("not", bin_op("==", other))
end

#%(other) ⇒ Object



186
# File 'lib/spark_connect/column.rb', line 186

# Remainder of this column divided by +other+ (Spark "%").
def %(other)
  bin_op("%", other)
end

#&(other) ⇒ Object

---- Boolean -----------------------------------------------------------



207
# File 'lib/spark_connect/column.rb', line 207

# Logical AND with +other+ (Spark "and").
def &(other)
  bin_op("and", other)
end

#*(other) ⇒ Object



184
# File 'lib/spark_connect/column.rb', line 184

# Product of this column and +other+ (Spark "*").
def *(other)
  bin_op("*", other)
end

#**(other) ⇒ Column

Raise this column to the power of other.

Returns:



192
# File 'lib/spark_connect/column.rb', line 192

# Raise this column to the power of +other+ (Spark "power").
#
# @return [Column]
def **(other)
  bin_op("power", other)
end

#+(other) ⇒ Object

---- Arithmetic --------------------------------------------------------



182
# File 'lib/spark_connect/column.rb', line 182

# Sum of this column and +other+ (Spark "+").
def +(other)
  bin_op("+", other)
end

#+@ ⇒ Object



188
# File 'lib/spark_connect/column.rb', line 188

# Unary plus is the identity: the same Column is returned.
def +@
  self
end

#-(other) ⇒ Object



183
# File 'lib/spark_connect/column.rb', line 183

# Difference of this column and +other+ (Spark "-").
def -(other)
  bin_op("-", other)
end

#-@ ⇒ Object



187
# File 'lib/spark_connect/column.rb', line 187

# Arithmetic negation of this column (Spark "negative").
def -@
  Column.invoke("negative", self)
end

#/(other) ⇒ Object



185
# File 'lib/spark_connect/column.rb', line 185

# Quotient of this column divided by +other+ (Spark "/").
def /(other)
  bin_op("/", other)
end

#<(other) ⇒ Object



197
# File 'lib/spark_connect/column.rb', line 197

# Strictly-less-than comparison with +other+ (Spark "<").
def <(other)
  bin_op("<", other)
end

#<=(other) ⇒ Object



198
# File 'lib/spark_connect/column.rb', line 198

# Less-than-or-equal comparison with +other+ (Spark "<=").
def <=(other)
  bin_op("<=", other)
end

#==(other) ⇒ Object

---- Comparison --------------------------------------------------------



195
# File 'lib/spark_connect/column.rb', line 195

# Equality comparison with +other+ (Spark "=="). Like the other operators,
# this builds a new Column rather than returning a Ruby Boolean.
def ==(other)
  bin_op("==", other)
end

#>(other) ⇒ Object



199
# File 'lib/spark_connect/column.rb', line 199

# Strictly-greater-than comparison with +other+ (Spark ">").
def >(other)
  bin_op(">", other)
end

#>=(other) ⇒ Object



200
# File 'lib/spark_connect/column.rb', line 200

# Greater-than-or-equal comparison with +other+ (Spark ">=").
def >=(other)
  bin_op(">=", other)
end

#[](key) ⇒ Column

---- Complex-type access ----------------------------------------------

Extract an array element by index, a map value by key, or a struct field.

Returns:



258
259
260
# File 'lib/spark_connect/column.rb', line 258

# Extract an array element by index, a map value by key, or a struct field
# by name. Equivalent to get_item.
#
# @return [Column]
def [](key) = get_item(key)

#alias(*names, metadata: nil) ⇒ Column Also known as: name, as

---- Aliasing / naming -------------------------------------------------

Assign one or more output names. With multiple names, the expression must produce a struct or multiple columns (e.g. inline).

Parameters:

  • names (Array<String>)
  • metadata (Hash, nil) (defaults to: nil)

    optional JSON metadata for a single alias.

Returns:



283
284
285
286
287
# File 'lib/spark_connect/column.rb', line 283

# Assign one or more output names to this column.
#
# With several names the expression must produce multiple columns (for
# example +inline+). Metadata may only accompany a single name.
#
# @param names [Array<String, Symbol>] output name(s); at least one is required.
# @param metadata [Hash, nil] optional metadata, serialized to JSON on the alias.
# @return [Column]
# @raise [IllegalArgumentError] if no name is given, or if metadata is combined
#   with more than one name.
def alias(*names, metadata: nil)
  raise IllegalArgumentError, "alias() requires at least one name" if names.empty?
  if metadata && names.size > 1
    raise IllegalArgumentError, "metadata can only be provided for a single alias name"
  end

  a = Proto::Expression::Alias.new(expr: @expr, name: names.map(&:to_s))
  a.metadata = JSON.generate(metadata) if metadata
  Column.new(Proto::Expression.new(alias: a))
end

#asc ⇒ Object

---- Sort ordering -----------------------------------------------------



310
# File 'lib/spark_connect/column.rb', line 310

# Ascending sort order; nulls sort first.
def asc
  sort_order(:SORT_DIRECTION_ASCENDING, :SORT_NULLS_FIRST)
end

#asc_nulls_first ⇒ Object



312
# File 'lib/spark_connect/column.rb', line 312

# Ascending sort order with nulls placed before non-null values.
def asc_nulls_first
  sort_order(:SORT_DIRECTION_ASCENDING, :SORT_NULLS_FIRST)
end

#asc_nulls_last ⇒ Object



313
# File 'lib/spark_connect/column.rb', line 313

# Ascending sort order with nulls placed after non-null values.
def asc_nulls_last
  sort_order(:SORT_DIRECTION_ASCENDING, :SORT_NULLS_LAST)
end

#between(lower, upper) ⇒ Column

True if lower <= self <= upper.

Returns:



237
238
239
# File 'lib/spark_connect/column.rb', line 237

# True where lower <= self <= upper (both bounds inclusive).
# Built as (self >= lower) & (self <= upper).
#
# @return [Column]
def between(lower, upper)
  at_least_lower = self >= lower
  at_most_upper = self <= upper
  at_least_lower & at_most_upper
end

#bitwise_and(other) ⇒ Object

---- Bitwise -----------------------------------------------------------



216
# File 'lib/spark_connect/column.rb', line 216

# Bitwise AND with +other+ (Spark "&").
def bitwise_and(other)
  bin_op("&", other)
end

#bitwise_or(other) ⇒ Object



217
# File 'lib/spark_connect/column.rb', line 217

# Bitwise OR with +other+ (Spark "|").
def bitwise_or(other)
  bin_op("|", other)
end

#bitwise_xor(other) ⇒ Object



218
# File 'lib/spark_connect/column.rb', line 218

# Bitwise XOR with +other+ (Spark "^").
def bitwise_xor(other)
  bin_op("^", other)
end

#cast(data_type) ⇒ Column Also known as: as_type, astype

---- Casting -----------------------------------------------------------

Cast to another type, given either a Types::DataType or a DDL type string (e.g. "int", "decimal(10,2)").

Parameters:

Returns:



297
298
299
300
301
302
303
304
305
# File 'lib/spark_connect/column.rb', line 297

# Cast this column to another type.
#
# @param data_type [Types::DataType, String, Symbol] a DataType, or a DDL
#   type name such as "int" or "decimal(10,2)". Symbols such as :int are
#   accepted and treated like the equivalent string.
# @return [Column]
def cast(data_type)
  c = Proto::Expression::Cast.new(expr: @expr)
  case data_type
  when String, Symbol
    # DDL type names go into type_str rather than a structured DataType.
    c.type_str = data_type.to_s
  else
    c.type = data_type.to_proto
  end
  Column.new(Proto::Expression.new(cast: c))
end

#contains(other) ⇒ Object



245
# File 'lib/spark_connect/column.rb', line 245

# True where this string column contains +other+ (Spark "contains").
def contains(other)
  bin_op("contains", other)
end

#desc ⇒ Object



311
# File 'lib/spark_connect/column.rb', line 311

# Descending sort order; nulls sort last.
def desc
  sort_order(:SORT_DIRECTION_DESCENDING, :SORT_NULLS_LAST)
end

#desc_nulls_first ⇒ Object



314
# File 'lib/spark_connect/column.rb', line 314

# Descending sort order with nulls placed before non-null values.
def desc_nulls_first
  sort_order(:SORT_DIRECTION_DESCENDING, :SORT_NULLS_FIRST)
end

#desc_nulls_last ⇒ Object



315
# File 'lib/spark_connect/column.rb', line 315

# Descending sort order with nulls placed after non-null values.
def desc_nulls_last
  sort_order(:SORT_DIRECTION_DESCENDING, :SORT_NULLS_LAST)
end

#endswith(other) ⇒ Object



247
# File 'lib/spark_connect/column.rb', line 247

# True where this string column ends with +other+ (Spark "endswith").
def endswith(other)
  bin_op("endswith", other)
end

#eq_null_safe(other) ⇒ Column

Null-safe equality (<=> in Spark SQL): null <=> null is true.

Returns:



204
# File 'lib/spark_connect/column.rb', line 204

# Null-safe equality (Spark SQL "<=>"): null <=> null is true.
#
# @return [Column]
def eq_null_safe(other)
  bin_op("<=>", other)
end

#get_field(name) ⇒ Column

Extract a struct field by name.

Returns:



272
273
274
# File 'lib/spark_connect/column.rb', line 272

# Extract a struct field by name. The name is stringified, so Symbols work.
#
# @return [Column]
def get_field(name) = get_item(name.to_s)

#get_item(key) ⇒ Object



262
263
264
265
266
267
268
# File 'lib/spark_connect/column.rb', line 262

# Extract an element/value/field from this complex-typed column. +key+ is
# turned into an expression with Column.lit, so it may be a Column or a plain
# Ruby value.
#
# @return [Column] wrapping an UnresolvedExtractValue expression.
def get_item(key)
  extraction = Column.lit(key).to_expr
  extract = Proto::Expression::UnresolvedExtractValue.new(child: @expr, extraction: extraction)
  Column.new(Proto::Expression.new(unresolved_extract_value: extract))
end

#ilike(pattern) ⇒ Object



244
# File 'lib/spark_connect/column.rb', line 244

# Case-insensitive SQL LIKE match against +pattern+ (Spark "ilike").
def ilike(pattern)
  bin_op("ilike", pattern)
end

#is_nan ⇒ Object



223
# File 'lib/spark_connect/column.rb', line 223

# True where this column's value is NaN (Spark "isNaN").
def is_nan
  Column.invoke("isNaN", self)
end

#is_not_null ⇒ Object Also known as: isNotNull



222
# File 'lib/spark_connect/column.rb', line 222

# True where this column's value is not null (Spark "isNotNull").
def is_not_null
  Column.invoke("isNotNull", self)
end

#is_null ⇒ Object Also known as: isNull

---- Null / membership predicates -------------------------------------



221
# File 'lib/spark_connect/column.rb', line 221

# True where this column's value is null (Spark "isNull").
def is_null
  Column.invoke("isNull", self)
end

#isin(*values) ⇒ Column Also known as: in_list

True if the column's value is in values.

Returns:



229
230
231
232
# File 'lib/spark_connect/column.rb', line 229

# True where this column's value is one of +values+ (Spark "in").
#
# Accepts either varargs (isin(1, 2)) or a single Array (isin([1, 2])). A
# lone Array argument is unpacked into the candidate list.
#
# @return [Column]
def isin(*values)
  candidates = (values.size == 1 && values.first.is_a?(Array)) ? values.first : values
  Column.invoke("in", self, *candidates)
end

#like(pattern) ⇒ Object

---- String predicates -------------------------------------------------



242
# File 'lib/spark_connect/column.rb', line 242

# SQL LIKE match against +pattern+ (Spark "like").
def like(pattern)
  bin_op("like", pattern)
end

#otherwise(value) ⇒ Column

Provide the default (ELSE) value for a CASE expression.

Returns:



334
335
336
337
338
339
340
341
342
343
# File 'lib/spark_connect/column.rb', line 334

# Provide the default (ELSE) value for a CASE expression built by when().
#
# The when() call's arguments are (condition, value) pairs, so an odd
# argument count means an ELSE value has already been appended. Applying
# otherwise() again is rejected instead of producing a malformed call.
# (Assumes Functions#when starts the call with exactly one pair.)
#
# @param value [Column, Object] the ELSE value; non-Columns become literals.
# @return [Column]
# @raise [IllegalArgumentError] if this Column was not produced by when(), or
#   if otherwise() was already applied.
def otherwise(value)
  call = @expr.unresolved_function if @expr.expr_type == :unresolved_function
  unless call && call.function_name == "when"
    raise IllegalArgumentError, "otherwise() can only be applied on a Column previously generated by when()"
  end

  branches = call.arguments.to_a
  if branches.size.odd?
    raise IllegalArgumentError, "otherwise() can only be applied once on a Column previously generated by when()"
  end

  args = branches + [Column.to_col(value).to_expr]
  Column.new(Proto::Expression.new(
               unresolved_function: Proto::Expression::UnresolvedFunction.new(function_name: "when", arguments: args)
             ))
end

#over(window) ⇒ Column

---- Windowing ---------------------------------------------------------

Define a windowed aggregation or analytic computation over this column.

Parameters:

Returns:



350
351
352
353
354
355
356
357
358
# File 'lib/spark_connect/column.rb', line 350

# Evaluate this column (an aggregate or analytic function) over +window+.
#
# Copies the window's partition_spec and order_spec. A frame_spec is set
# only when the window provides one.
#
# @param window an object responding to partition_spec, order_spec and frame_spec.
# @return [Column] wrapping a Window expression.
def over(window)
  spec = Proto::Expression::Window.new(
    window_function: @expr,
    partition_spec: window.partition_spec,
    order_spec: window.order_spec
  ).tap { |w| w.frame_spec = window.frame_spec if window.frame_spec }
  Column.new(Proto::Expression.new(window: spec))
end

#rlike(pattern) ⇒ Object



243
# File 'lib/spark_connect/column.rb', line 243

# Regular-expression match against +pattern+ (Spark "rlike").
def rlike(pattern)
  bin_op("rlike", pattern)
end

#startswith(other) ⇒ Object



246
# File 'lib/spark_connect/column.rb', line 246

# True where this string column starts with +other+ (Spark "startswith").
def startswith(other)
  bin_op("startswith", other)
end

#substr(start, len) ⇒ Column

Substring of length len starting at 1-based position start.

Returns:



251
252
253
# File 'lib/spark_connect/column.rb', line 251

# Substring of length +len+ starting at 1-based position +start+ (Spark "substr").
#
# @return [Column]
def substr(start, len) = Column.invoke("substr", self, start, len)

#to_expr ⇒ Spark::Connect::Expression

Returns:

  • (Spark::Connect::Expression)


31
32
33
# File 'lib/spark_connect/column.rb', line 31

# @return [Spark::Connect::Expression] the wrapped protobuf expression (same object as #expr).
def to_expr = @expr

#to_s ⇒ Object Also known as: inspect



360
361
362
# File 'lib/spark_connect/column.rb', line 360

# Short debug representation showing the kind of the wrapped expression,
# e.g. "Column<unresolved_function>".
def to_s = "Column<#{@expr.expr_type}>"

#when(condition, value) ⇒ Column

---- CASE WHEN ---------------------------------------------------------

Add a branch to a CASE expression started by Functions#when.

Returns:



321
322
323
324
325
326
327
328
329
330
# File 'lib/spark_connect/column.rb', line 321

# Add a (condition, value) branch to a CASE expression started by Functions#when.
#
# The when() call's arguments are (condition, value) pairs, so an odd count
# means otherwise() already appended the ELSE value. Appending another pair
# after it would misalign the pairs. That case is therefore rejected.
# (Assumes Functions#when starts the call with exactly one pair.)
#
# @param condition [Column, Object] branch condition; non-Columns become literals.
# @param value [Column, Object] branch result; non-Columns become literals.
# @return [Column]
# @raise [IllegalArgumentError] if this Column was not produced by when(), or
#   if otherwise() has already been applied.
def when(condition, value)
  call = @expr.unresolved_function if @expr.expr_type == :unresolved_function
  unless call && call.function_name == "when"
    raise IllegalArgumentError, "when() can only be applied on a Column previously generated by when()"
  end

  branches = call.arguments.to_a
  if branches.size.odd?
    raise IllegalArgumentError, "when() cannot be applied once otherwise() is applied"
  end

  args = branches + [Column.to_col(condition).to_expr, Column.to_col(value).to_expr]
  Column.new(Proto::Expression.new(
               unresolved_function: Proto::Expression::UnresolvedFunction.new(function_name: "when", arguments: args)
             ))
end

#|(other) ⇒ Object



208
# File 'lib/spark_connect/column.rb', line 208

# Logical OR with +other+ (Spark "or").
def |(other)
  bin_op("or", other)
end