Class: SparkConnect::SparkSession
- Inherits:
-
Object
- Object
- SparkConnect::SparkSession
- Defined in:
- lib/spark_connect/session.rb,
lib/spark_connect/session.rb
Overview
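A session against a Spark Connect endpoint: the entry point for creating DataFrames, running SQL, and reaching the catalog, runtime configuration, and streaming queries. Sessions are created through the fluent builder returned by SparkSession.builder.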
Defined Under Namespace
Classes: Builder
Constant Summary collapse
- Proto =
SparkConnect::Proto
Class Attribute Summary collapse
-
.active ⇒ SparkSession?
The currently active session set by #set_active / Builder#get_or_create.
Instance Attribute Summary collapse
- #client ⇒ SparkConnectClient readonly
-
#conf ⇒ RuntimeConfig
readonly
Runtime configuration facade.
Class Method Summary collapse
-
.builder ⇒ Builder
A new session builder.
Instance Method Summary collapse
-
#add_tag(tag) ⇒ void
Add an operation tag applied to all subsequent executions in this session.
-
#catalog ⇒ Catalog
The catalog facade (databases, tables, functions, cache).
-
#clear_tags ⇒ Object
Remove all operation tags.
-
#create_data_frame(data, schema = nil) ⇒ DataFrame
(also: #create_dataframe, #createDataFrame)
Build a DataFrame from local Ruby data.
- #create_data_frame_from_relation(relation) ⇒ Object private
-
#get_tags ⇒ Array<String>
The currently active operation tags.
-
#initialize(client) ⇒ SparkSession
constructor
A new instance of SparkSession.
-
#interrupt_all ⇒ Array<String>
Interrupt all operations running in this session.
-
#interrupt_operation(operation_id) ⇒ Array<String>
Interrupt a single operation by id.
-
#interrupt_tag(tag) ⇒ Array<String>
Interrupt all operations tagged with tag (see #add_tag).
-
#new_session ⇒ SparkSession
Start a brand-new session against the same endpoint (independent server-side session id, configuration, and temporary views).
-
#next_plan_id ⇒ Integer
Allocate the next unique plan id.
-
#pipeline(default_catalog: nil, default_database: nil, sql_conf: {}) ⇒ Pipeline
Create a new Spark Declarative Pipeline (a dataflow graph) in this session.
-
#range(start, end_ = nil, step = 1, num_partitions = nil) ⇒ DataFrame
Create a DataFrame with a single id column over the given integer range.
-
#read ⇒ DataFrameReader
Interface for loading external data.
-
#read_stream ⇒ DataStreamReader
(also: #readStream)
Interface for loading a streaming DataFrame.
-
#remove_tag(tag) ⇒ Object
Remove a previously added operation tag.
-
#session_id ⇒ String
The client session id (UUID).
-
#set_active ⇒ self
Make this the active/default session.
-
#sql(query, args = nil) ⇒ DataFrame
Execute a SQL query and return a lazy DataFrame over its result.
-
#stop ⇒ void
Release the server-side session and stop the client.
-
#streams ⇒ StreamingQueryManager
The manager for this session's streaming queries.
-
#table(name) ⇒ DataFrame
Return a DataFrame reading the named table or view.
-
#version ⇒ String
The Spark version reported by the server.
Constructor Details
#initialize(client) ⇒ SparkSession
Returns a new instance of SparkSession.
25 26 27 28 29 |
# File 'lib/spark_connect/session.rb', line 25 def initialize(client) @client = client @plan_id = -1 @conf = RuntimeConfig.new(client) end |
Class Attribute Details
.active ⇒ SparkSession?
The currently active session set by #set_active / SparkConnect::SparkSession::Builder#get_or_create.
39 40 41 |
# File 'lib/spark_connect/session.rb', line 39 def active @active end |
Instance Attribute Details
#client ⇒ SparkConnectClient (readonly)
22 23 24 |
# File 'lib/spark_connect/session.rb', line 22 def client @client end |
#conf ⇒ RuntimeConfig (readonly)
Returns the runtime configuration facade.
138 139 140 |
# File 'lib/spark_connect/session.rb', line 138 def conf @conf end |
Class Method Details
Instance Method Details
#add_tag(tag) ⇒ void
This method returns an undefined value.
Add an operation tag applied to all subsequent executions in this session.
184 185 186 |
# File 'lib/spark_connect/session.rb', line 184 def add_tag(tag) @client.add_tag(tag) end |
#catalog ⇒ Catalog
Returns the catalog facade (databases, tables, functions, cache).
141 142 143 |
# File 'lib/spark_connect/session.rb', line 141 def catalog @catalog ||= Catalog.new(self) end |
#clear_tags ⇒ Object
Remove all operation tags. Documented return type: void.
199 200 201 |
# File 'lib/spark_connect/session.rb', line 199 def clear_tags @client.clear_tags end |
#create_data_frame(data, schema = nil) ⇒ DataFrame Also known as: create_dataframe, createDataFrame
Build a DataFrame from local Ruby data.
127 128 129 130 131 132 133 |
# File 'lib/spark_connect/session.rb', line 127 def create_data_frame(data, schema = nil) data = data.to_a struct = resolve_schema(data, schema) arrow_bytes = ArrowConverter.from_rows(data, struct) local = Proto::LocalRelation.new(data: arrow_bytes, schema: struct.simple_string.sub(/\Astruct</, "").sub(/>\z/, "")) DataFrame.new(self, PlanBuilder.relation(self, local_relation: local)) end |
#create_data_frame_from_relation(relation) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
212 213 214 |
# File 'lib/spark_connect/session.rb', line 212 def create_data_frame_from_relation(relation) DataFrame.new(self, relation) end |
#get_tags ⇒ Array<String>
Returns the currently active operation tags.
194 195 196 |
# File 'lib/spark_connect/session.rb', line 194 def get_tags @client.get_tags.dup end |
#interrupt_all ⇒ Array<String>
Interrupt all operations running in this session.
166 167 168 |
# File 'lib/spark_connect/session.rb', line 166 def interrupt_all @client.interrupt(type: :all).interrupted_ids.to_a end |
#interrupt_operation(operation_id) ⇒ Array<String>
Interrupt a single operation by id.
178 179 180 |
# File 'lib/spark_connect/session.rb', line 178 def interrupt_operation(operation_id) @client.interrupt(type: :operation_id, value: operation_id.to_s).interrupted_ids.to_a end |
#interrupt_tag(tag) ⇒ Array<String>
Interrupt all operations tagged with tag (see #add_tag).
172 173 174 |
# File 'lib/spark_connect/session.rb', line 172 def interrupt_tag(tag) @client.interrupt(type: :tag, value: tag.to_s).interrupted_ids.to_a end |
#new_session ⇒ SparkSession
Start a brand-new session against the same endpoint (independent server-side session id, configuration, and temporary views).
160 161 162 |
# File 'lib/spark_connect/session.rb', line 160 def new_session SparkSession.new(SparkConnectClient.new(@client.channel_builder)) end |
#next_plan_id ⇒ Integer
Allocate the next unique plan id. Used by PlanBuilder.relation.
46 47 48 |
# File 'lib/spark_connect/session.rb', line 46 def next_plan_id @plan_id += 1 end |
#pipeline(default_catalog: nil, default_database: nil, sql_conf: {}) ⇒ Pipeline
Create a new Spark Declarative Pipeline (a dataflow graph) in this session.
117 118 119 |
# File 'lib/spark_connect/session.rb', line 117 def pipeline(default_catalog: nil, default_database: nil, sql_conf: {}) Pipeline.new(self, default_catalog: default_catalog, default_database: default_database, sql_conf: sql_conf) end |
#range(end_) ⇒ DataFrame #range(start, end_, step = 1, num_partitions = nil) ⇒ DataFrame
Create a DataFrame with a single id column over the given integer range.
60 61 62 63 64 65 66 67 68 |
# File 'lib/spark_connect/session.rb', line 60 def range(start, end_ = nil, step = 1, num_partitions = nil) if end_.nil? end_ = start start = 0 end r = Proto::Range.new(start: start, end: end_, step: step) r.num_partitions = num_partitions if num_partitions DataFrame.new(self, PlanBuilder.relation(self, range: r)) end |
#read ⇒ DataFrameReader
Returns the interface for loading external data.
96 97 98 |
# File 'lib/spark_connect/session.rb', line 96 def read DataFrameReader.new(self) end |
#read_stream ⇒ DataStreamReader Also known as: readStream
Returns the interface for loading a streaming DataFrame.
101 102 103 |
# File 'lib/spark_connect/session.rb', line 101 def read_stream DataStreamReader.new(self) end |
#remove_tag(tag) ⇒ Object
Remove a previously added operation tag. Documented return type: void.
189 190 191 |
# File 'lib/spark_connect/session.rb', line 189 def remove_tag(tag) @client.remove_tag(tag) end |
#session_id ⇒ String
Returns the client session id (UUID).
51 52 53 |
# File 'lib/spark_connect/session.rb', line 51 def session_id @client.session_id end |
#set_active ⇒ self
Make this the active/default session.
152 153 154 155 |
# File 'lib/spark_connect/session.rb', line 152 def set_active SparkSession.active = self self end |
#sql(query, args = nil) ⇒ DataFrame
Execute a SQL query and return a lazy DataFrame over its result.
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/spark_connect/session.rb', line 76 def sql(query, args = nil) sql = Proto::SQL.new(query: query) case args when Hash args.each { |k, v| sql.named_arguments[k.to_s] = Column.to_col(v).to_expr } when Array sql.pos_arguments += args.map { |v| Column.to_col(v).to_expr } end DataFrame.new(self, PlanBuilder.relation(self, sql: sql)) end |
#stop ⇒ void
This method returns an undefined value.
Release the server-side session and stop the client.
205 206 207 208 209 |
# File 'lib/spark_connect/session.rb', line 205 def stop @client.release_session SparkSession.active = nil if SparkSession.active.equal?(self) nil end |
#streams ⇒ StreamingQueryManager
Returns the manager for this session's streaming queries.
107 108 109 |
# File 'lib/spark_connect/session.rb', line 107 def streams StreamingQueryManager.new(self) end |
#table(name) ⇒ DataFrame
Return a DataFrame reading the named table or view.
91 92 93 |
# File 'lib/spark_connect/session.rb', line 91 def table(name) read.table(name) end |
#version ⇒ String
Returns the Spark version reported by the server.
146 147 148 |
# File 'lib/spark_connect/session.rb', line 146 def version @client.analyze(spark_version: Proto::AnalyzePlanRequest::SparkVersion.new).spark_version.version end |