Class: SparkConnect::SparkSession

Inherits:
Object
  • Object
show all
Defined in:
lib/spark_connect/session.rb

Overview

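Entry point to a Spark Connect server: runs SQL, builds DataFrames from ranges, local data, and tables, and exposes the catalog, runtime configuration, streaming, and operation-tag/interrupt APIs. Obtain a session through SparkSession.builder, which returns the fluent Builder.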

Defined Under Namespace

Classes: Builder

Constant Summary collapse

Proto =
SparkConnect::Proto

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ SparkSession

Returns a new instance of SparkSession.

Parameters:

  • client (SparkConnectClient)

    the connection this session sends its requests through.



25
26
27
28
29
# File 'lib/spark_connect/session.rb', line 25

# Create a session bound to +client+.
#
# The plan-id counter is primed at -1 so that the very first call to
# #next_plan_id hands out 0.
#
# @param client [SparkConnectClient] connection used for every request
def initialize(client)
  @client = client
  @conf = RuntimeConfig.new(client)
  @plan_id = -1
end

Class Attribute Details

.active ⇒ SparkSession?

The currently active session, as set by #set_active / SparkConnect::SparkSession::Builder#get_or_create; nil when no session is active.

Returns:

  • (SparkSession, nil)



39
40
41
# File 'lib/spark_connect/session.rb', line 39

# The session currently registered as the default, as set by #set_active or
# the builder's get_or_create; nil when none is registered (#stop resets it
# to nil when the registered session is stopped).
#
# @return [SparkSession, nil]
def active
  @active
end

Instance Attribute Details

#client ⇒ SparkConnectClient (readonly)

Returns the client this session sends its requests through.

Returns:

  • (SparkConnectClient)



22
23
24
# File 'lib/spark_connect/session.rb', line 22

# The client this session delegates to for server analysis, operation tags,
# interrupts, and session release.
#
# @return [SparkConnectClient]
def client
  @client
end

#conf ⇒ RuntimeConfig (readonly)

Returns the runtime configuration facade.

Returns:

  • (RuntimeConfig)



138
139
140
# File 'lib/spark_connect/session.rb', line 138

# Runtime configuration facade, created once in #initialize from the client.
#
# @return [RuntimeConfig]
def conf
  @conf
end

Class Method Details

.builder ⇒ Builder

Returns a new session builder.

Returns:

  • (Builder)

    a new session builder.



33
34
35
# File 'lib/spark_connect/session.rb', line 33

# Start configuring a new session. A new Builder instance is created on
# every call.
#
# @return [Builder]
def builder
  Builder.new
end

Instance Method Details

#add_tag(tag) ⇒ void

This method returns an undefined value.

Add an operation tag applied to all subsequent executions in this session.



184
185
186
# File 'lib/spark_connect/session.rb', line 184

# Add an operation tag applied to subsequent executions in this session.
# The tag is handed to the client unchanged (it is not stringified here);
# #get_tags reads the client's tags back.
#
# @param tag [String] the tag to add — NOTE(review): accepted types are decided by the client
# @return [void]
def add_tag(tag)
  @client.add_tag(tag)
end

#catalog ⇒ Catalog

Returns the catalog facade (databases, tables, functions, cache).

Returns:

  • (Catalog)

    the catalog facade (databases, tables, functions, cache).



141
142
143
# File 'lib/spark_connect/session.rb', line 141

# Catalog facade for databases, tables, functions, and caching.
#
# Built lazily on first access and reused afterwards.
#
# @return [Catalog]
def catalog
  return @catalog if @catalog

  @catalog = Catalog.new(self)
end

#clear_tags ⇒ void

This method returns an undefined value.

Remove all operation tags.



199
200
201
# File 'lib/spark_connect/session.rb', line 199

# Remove every operation tag from this session by delegating to the client.
#
# @return [void]
def clear_tags
  @client.clear_tags
end

#create_data_frame(data, schema = nil) ⇒ DataFrame Also known as: create_dataframe, createDataFrame

Build a DataFrame from local Ruby data.

Parameters:

  • data (Array<Hash>, Array<Array>, Array<Row>)
  • schema (Types::StructType, Array<String>, String, nil) (defaults to: nil)

    an explicit schema, a list of column names, a DDL string, or nil to infer.

Returns:

  • (DataFrame)



127
128
129
130
131
132
133
# File 'lib/spark_connect/session.rb', line 127

# Build a DataFrame from local Ruby data.
#
# The rows are converted to Arrow bytes with ArrowConverter.from_rows and
# shipped in a LocalRelation. The schema sent alongside is the resolved
# struct's simple_string with its outer "struct<" ... ">" wrapper removed.
#
# NOTE(review): nothing here quotes field names, so names containing ',',
# ':', '<' or '>' may not survive the server-side schema parse — confirm.
#
# @param data [Array<Hash>, Array<Array>, Array<Row>] anything responding to to_a
# @param schema [Types::StructType, Array<String>, String, nil] an explicit
#   schema, a list of column names, a DDL string, or nil to infer
# @return [DataFrame]
def create_data_frame(data, schema = nil)
  rows = data.to_a
  struct = resolve_schema(rows, schema)
  payload = ArrowConverter.from_rows(rows, struct)
  field_list = struct.simple_string.delete_prefix("struct<").delete_suffix(">")
  local = Proto::LocalRelation.new(data: payload, schema: field_list)
  DataFrame.new(self, PlanBuilder.relation(self, local_relation: local))
end

#create_data_frame_from_relation(relation) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



212
213
214
# File 'lib/spark_connect/session.rb', line 212

# @api private
# Wrap an already-built relation in a DataFrame bound to this session.
# Unlike #range / #sql / #create_data_frame, this does not go through
# PlanBuilder.relation.
# NOTE(review): +relation+ is presumably a Proto::Relation — confirm with callers.
#
# @param relation the relation to wrap
# @return [DataFrame]
def create_data_frame_from_relation(relation)
  DataFrame.new(self, relation)
end

#get_tags ⇒ Array<String>

Returns the currently active operation tags.

Returns:

  • (Array<String>)

    the currently active operation tags.



194
195
196
# File 'lib/spark_connect/session.rb', line 194

# The operation tags currently attached to this session.
# A shallow copy of the client's tags is returned, so mutating the result
# does not change the client's own collection.
#
# @return [Array<String>]
def get_tags
  @client.tags.dup
end

#interrupt_all ⇒ Array<String>

Interrupt all operations running in this session.

Returns:

  • (Array<String>)

    the ids of the interrupted operations.



166
167
168
# File 'lib/spark_connect/session.rb', line 166

# Interrupt every operation running in this session.
#
# @return [Array<String>] ids of the operations the server interrupted
def interrupt_all
  response = @client.interrupt(type: :all)
  response.interrupted_ids.to_a
end

#interrupt_operation(operation_id) ⇒ Array<String>

Interrupt a single operation by id.

Returns:

  • (Array<String>)

    the ids of the interrupted operations.


178
179
180
# File 'lib/spark_connect/session.rb', line 178

# Interrupt a single operation identified by +operation_id+.
#
# @param operation_id [#to_s] the operation id; stringified before sending
# @return [Array<String>] ids of the operations the server interrupted
def interrupt_operation(operation_id)
  id = operation_id.to_s
  response = @client.interrupt(type: :operation_id, value: id)
  response.interrupted_ids.to_a
end

#interrupt_tag(tag) ⇒ Array<String>

Interrupt all operations tagged with tag (see #add_tag).

Returns:

  • (Array<String>)

    the ids of the interrupted operations.


172
173
174
# File 'lib/spark_connect/session.rb', line 172

# Interrupt every operation carrying +tag+ (see #add_tag).
#
# @param tag [#to_s] the tag to match; stringified before sending
# @return [Array<String>] ids of the operations the server interrupted
def interrupt_tag(tag)
  label = tag.to_s
  response = @client.interrupt(type: :tag, value: label)
  response.interrupted_ids.to_a
end

#new_session ⇒ SparkSession

Start a brand-new session against the same endpoint (independent server-side session id, configuration, and temporary views).

Returns:

  • (SparkSession)



160
161
162
# File 'lib/spark_connect/session.rb', line 160

# Open a second, independent session against the same endpoint.
#
# A new client is constructed from this client's channel_builder, so the
# connection settings are reused but the client object is not.
#
# @return [SparkSession]
def new_session
  fresh_client = SparkConnectClient.new(@client.channel_builder)
  SparkSession.new(fresh_client)
end

#next_plan_id ⇒ Integer

Allocate the next unique plan id. Used by PlanBuilder.relation.

Returns:

  • (Integer)


46
47
48
# File 'lib/spark_connect/session.rb', line 46

# Hand out the next plan id for this session (used by PlanBuilder.relation).
#
# Ids increase by one per call; the counter starts at -1, so the first id
# after construction is 0.
#
# @return [Integer]
def next_plan_id
  @plan_id = @plan_id.succ
end

#pipeline(default_catalog: nil, default_database: nil, sql_conf: {}) ⇒ Pipeline

Create a new Spark Declarative Pipeline (a dataflow graph) in this session.

Parameters:

  • default_catalog (String, nil) (defaults to: nil)
  • default_database (String, nil) (defaults to: nil)
  • sql_conf (Hash{String=>String}) (defaults to: {})

    SQL configs applied to all flows.

Returns:

  • (Pipeline)



117
118
119
# File 'lib/spark_connect/session.rb', line 117

# Create a new Spark Declarative Pipeline (dataflow graph) bound to this session.
#
# @param default_catalog [String, nil] catalog used when a flow does not name one
# @param default_database [String, nil] database used when a flow does not name one
# @param sql_conf [Hash{String=>String}] SQL configs applied to all flows
# @return [Pipeline]
def pipeline(default_catalog: nil, default_database: nil, sql_conf: {})
  Pipeline.new(
    self,
    default_catalog: default_catalog,
    default_database: default_database,
    sql_conf: sql_conf
  )
end

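#range(end_) ⇒ DataFrame
#range(start, end_, step = 1, num_partitions = nil) ⇒ DataFrame

Create a DataFrame with a single id column over the given integer range. When called with one argument, the range starts at 0 and that argument is used as end_.

Returns:

  • (DataFrame)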



60
61
62
63
64
65
66
67
68
# File 'lib/spark_connect/session.rb', line 60

# Create a DataFrame over an integer range.
#
# Called as range(n), the range starts at 0 and n is used as the end;
# otherwise +start+ and +end_+ are used as given.
#
# @param start [Integer] range start, or the end when +end_+ is omitted
# @param end_ [Integer, nil] range end
# @param step [Integer] increment between values
# @param num_partitions [Integer, nil] only sent when given
# @return [DataFrame]
def range(start, end_ = nil, step = 1, num_partitions = nil)
  lo, hi = end_.nil? ? [0, start] : [start, end_]
  spec = Proto::Range.new(start: lo, end: hi, step: step)
  spec.num_partitions = num_partitions if num_partitions
  DataFrame.new(self, PlanBuilder.relation(self, range: spec))
end

#read ⇒ DataFrameReader

Returns an interface for loading external data.

Returns:

  • (DataFrameReader)



96
97
98
# File 'lib/spark_connect/session.rb', line 96

# Interface for loading external data into a DataFrame.
# A new DataFrameReader bound to this session is built on every call.
#
# @return [DataFrameReader]
def read
  DataFrameReader.new(self)
end

#read_stream ⇒ DataStreamReader Also known as: readStream

Returns an interface for loading a streaming DataFrame.

Returns:

  • (DataStreamReader)



101
102
103
# File 'lib/spark_connect/session.rb', line 101

# Interface for loading a streaming DataFrame.
# A new DataStreamReader bound to this session is built on every call.
#
# @return [DataStreamReader]
def read_stream
  DataStreamReader.new(self)
end

#remove_tag(tag) ⇒ void

This method returns an undefined value.

Remove a previously added operation tag.



189
190
191
# File 'lib/spark_connect/session.rb', line 189

# Remove a previously added operation tag (see #add_tag) by delegating to
# the client.
#
# @param tag [String] the tag to remove — NOTE(review): passed through unchanged
# @return [void]
def remove_tag(tag)
  @client.remove_tag(tag)
end

#session_id ⇒ String

Returns the client session id (UUID).

Returns:

  • (String)

    the client session id (UUID).



51
52
53
# File 'lib/spark_connect/session.rb', line 51

# The client-side session id, read from the client.
#
# @return [String] the client session id (UUID)
def session_id
  @client.session_id
end

#set_active ⇒ self

Make this the active/default session.

Returns:

  • (self)


152
153
154
155
# File 'lib/spark_connect/session.rb', line 152

# Register this session as SparkSession.active (the default session).
#
# @return [self] the receiver, so the call can be chained
def set_active
  tap { |session| SparkSession.active = session }
end

#sql(query, args = nil) ⇒ DataFrame

Execute a SQL query and return a lazy DataFrame over its result.

Parameters:

  • query (String)
  • args (Hash{String=>Object}, Array<Object>, nil) (defaults to: nil)

    named or positional parameters bound into the query.

Returns:

  • (DataFrame)

    a lazy DataFrame over the query result.



76
77
78
79
80
81
82
83
84
85
# File 'lib/spark_connect/session.rb', line 76

# Execute a SQL query and return a lazy DataFrame over its result.
#
# Parameters are attached to the SQL relation instead of being spliced into
# the query text. A Hash binds named arguments (keys converted with to_s).
# An Array binds positional arguments in order. Each value is converted
# with Column.to_col(...).to_expr.
#
# @param query [String] the SQL text
# @param args [Hash{String=>Object}, Array<Object>, nil] named or positional
#   parameters bound into the query; nil binds nothing
# @return [DataFrame]
# @raise [ArgumentError] if +args+ is not a Hash, an Array, or nil. Such
#   values were previously dropped without notice, leaving the query's
#   parameters unbound.
def sql(query, args = nil)
  sql = Proto::SQL.new(query: query)
  case args
  when nil
    # no parameters to bind
  when Hash
    args.each { |k, v| sql.named_arguments[k.to_s] = Column.to_col(v).to_expr }
  when Array
    sql.pos_arguments += args.map { |v| Column.to_col(v).to_expr }
  else
    raise ArgumentError, "sql args must be a Hash, an Array, or nil (got #{args.class})"
  end
  DataFrame.new(self, PlanBuilder.relation(self, sql: sql))
end

#stop ⇒ void

This method returns an undefined value.

Release the server-side session and stop the client.



205
206
207
208
209
# File 'lib/spark_connect/session.rb', line 205

# Release the server-side session, then clear SparkSession.active if it
# still points at this session.
#
# The reset of the active session runs in an +ensure+ clause. If
# release_session raises, the stopped session is still unregistered as the
# class-level default, and the release error still propagates to the caller.
#
# @return [void] always nil when the release succeeds
def stop
  @client.release_session
  nil
ensure
  # Only clear the slot if it is ours; another session may have been made active since.
  SparkSession.active = nil if SparkSession.active.equal?(self)
end

#streams ⇒ StreamingQueryManager

Returns the manager for this session's streaming queries.

Returns:

  • (StreamingQueryManager)



107
108
109
# File 'lib/spark_connect/session.rb', line 107

# Manager for this session's streaming queries.
# A new StreamingQueryManager bound to this session is built on every call.
#
# @return [StreamingQueryManager]
def streams
  StreamingQueryManager.new(self)
end

#table(name) ⇒ DataFrame

Return a DataFrame reading the named table or view.

Parameters:

  • name (String)

Returns:

  • (DataFrame)



91
92
93
# File 'lib/spark_connect/session.rb', line 91

# DataFrame reading the named table or view; shorthand for read.table(name).
#
# @param name [String] table or view name
# @return [DataFrame]
def table(name)
  read.table(name)
end

#version ⇒ String

Returns the Spark version reported by the server.

Returns:

  • (String)

    the Spark version reported by the server.



146
147
148
# File 'lib/spark_connect/session.rb', line 146

# Spark version reported by the server.
#
# Issues an analyze request carrying a SparkVersion sub-request and reads
# the version string from the response. The result is not cached, so every
# call makes a round trip.
#
# @return [String]
def version
  request = Proto::AnalyzePlanRequest::SparkVersion.new
  response = @client.analyze(spark_version: request)
  response.spark_version.version
end