Configuration, Observations & Errors

Runtime configuration

spark.conf returns a {SparkConnect::RuntimeConfig} for getting and setting Spark SQL runtime properties.

spark.conf.set("spark.sql.shuffle.partitions", 8)
spark.conf.get("spark.sql.shuffle.partitions")          #=> "8"
spark.conf.get("spark.sql.session.timeZone", "UTC")     # with a default
spark.conf.unset("spark.sql.shuffle.partitions")
spark.conf.get_all("spark.sql")                         # filtered by prefix -> Hash
spark.conf.modifiable?("spark.sql.shuffle.partitions")  #=> true

Session-level options can also be supplied when building the session:

SparkConnect::SparkSession.builder
  .remote("sc://localhost:15002")
  .config("spark.sql.shuffle.partitions", 16)
  .get_or_create

Observations

An Observation collects named aggregate metrics while a DataFrame is materialised - without a second pass over the data.

obs = SparkConnect::Observation.new("metrics")
df.observe(obs, F.count(F.lit(1)).alias("rows"), F.max("id").alias("max_id")).collect
obs.get   #=> {"rows" => 1000, "max_id" => 999}

Error handling

All library errors descend from SparkConnect::Error:

Class Raised when
SparkConnect::ConnectionError a connection string is malformed
SparkConnect::IllegalArgumentError an argument is invalid before any request
SparkConnect::SparkConnectError the server returns an error (base)
SparkConnect::AnalysisError analysis-time failure (e.g. unresolved column)
SparkConnect::ParseError SQL parse failure
SparkConnect::RetriesExceededError transient failures exhausted the retry budget
begin
  spark.sql("SELECT * FROM does_not_exist").collect
rescue SparkConnect::AnalysisError => e
  warn "Analysis failed: #{e.message} (error_class=#{e.error_class})"
end

Server errors carry Spark’s canonical error_class and sql_state when the server provides them, plus the originating gRPC status code (grpc_code).

Tags and interrupts

Tag the operations a session runs, then interrupt by tag, by operation id, or all at once:

spark.add_tag("nightly-etl")
# ... run queries; each is tagged "nightly-etl" ...
spark.get_tags                 #=> ["nightly-etl"]
spark.interrupt_tag("nightly-etl")   # cancel everything with that tag
spark.interrupt_all                  # cancel all running operations
spark.clear_tags

New sessions

other = spark.new_session   # independent server-side session, config, temp views

Retries

The client automatically retries transient gRPC failures (UNAVAILABLE, DEADLINE_EXCEEDED, ABORTED, RESOURCE_EXHAUSTED) with exponential backoff and jitter. Tune the policy when constructing a client directly via SparkConnect::SparkConnectClient.new(channel_builder, max_retries:, retry_base_delay:, max_retry_delay:).