Configuration and Connection
Connection strings
You connect by passing the builder a Spark Connect connection string. The grammar matches the official Spark Connect clients:
sc://host[:port]/;param1=value1;param2=value2
The host is required; the port defaults to 15002. Parameters follow a `/` and are separated by `;`. Common parameters:
| Parameter | Meaning |
|---|---|
| `token` | Bearer token. Implies TLS and adds an authorization header. |
| `user_id` | The Spark user id. |
| `user_agent` | Client user agent. |
| `use_ssl` | `true` / `false` to force TLS on or off. |
| `session_id` | Reuse a specific server-side session id. |
Examples:
// Plain local server, no TLS.
SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
// A remote host with a bearer token (TLS is implied by the token).
SparkSession.builder
  .remote("sc://spark.example.com:443/;token=abc123;user_id=alice")
  .getOrCreate()
// Force TLS on and set a custom user agent.
SparkSession.builder
  .remote("sc://spark.example.com:15002/;use_ssl=true;user_agent=my-app/1.0")
  .getOrCreate()
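To make the grammar concrete, here is a minimal sketch of how such a string could be split into host, port, and parameters. It is not the client's parser: `ConnInfo` and `ConnString` are hypothetical names introduced for illustration, the sketch assumes Scala 2.13 or newer (for `toIntOption`), and it does not handle IPv6 literals or URL-encoded values.

```scala
// Hypothetical illustration only; not part of the client's API.
final case class ConnInfo(host: String, port: Int, params: Map[String, String])

object ConnString {
  private val Prefix = "sc://"
  val DefaultPort = 15002 // default port stated in the text above

  def parse(s: String): Either[String, ConnInfo] = {
    if (!s.startsWith(Prefix)) Left(s"expected a string starting with sc://, got: $s")
    else {
      val rest = s.drop(Prefix.length)
      // Everything before the first '/' is host[:port]; the remainder is the parameter list.
      val (hostPort, tail) = rest.span(_ != '/')
      val (host, portPart) = hostPort.span(_ != ':')
      if (host.isEmpty) Left("host is required")
      else {
        val port: Either[String, Int] =
          if (portPart.isEmpty) Right(DefaultPort)
          else portPart.drop(1).toIntOption.toRight(s"invalid port: ${portPart.drop(1)}")
        port.map { p =>
          val params = tail
            .drop(1) // drop the leading '/'
            .split(';')
            .iterator
            .filter(_.nonEmpty)
            .flatMap { kv =>
              // Split on the first '=' only, so values may themselves contain '='.
              kv.split("=", 2) match {
                case Array(k, v) => Some(k -> v)
                case _           => None // ignore entries without a value
              }
            }
            .toMap
          ConnInfo(host, p, params)
        }
      }
    }
  }
}
```

Because only the first `/` separates the host from the parameters, a value such as `user_agent=my-app/1.0` survives intact in this sketch.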
Building a session
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .remote("sc://localhost:15002")
  .appName("analytics")
  .userAgent("analytics-job/1.0")
  .config("spark.sql.shuffle.partitions", "8")
  .getOrCreate()
- `remote(url)` sets the connection string.
- `appName(name)` sets the application name.
- `userAgent(agent)` sets the gRPC user agent.
- `config(key, value)` sets a session configuration option. Overloads accept `String`, `Boolean`, `Long`, and `Double` values.
- `getOrCreate()` returns the active session if one exists, otherwise creates and activates a new session.
- `create()` always builds a fresh session.
Runtime configuration
spark.conf is the live, server-side configuration for the session:
spark.conf.set("spark.sql.shuffle.partitions", "16")
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.get("spark.sql.shuffle.partitions") // throws if unset
spark.conf.get("spark.sql.shuffle.partitions", "200") // with a default
spark.conf.getOption("spark.sql.shuffle.partitions") // Option[String]
spark.conf.getAll // Map[String, String]
spark.conf.unset("spark.sql.shuffle.partitions")
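Since the getters above return strings, numeric settings need parsing on the client side. The following is a small sketch of one way to do that. `ConfReaders.intConf` is a hypothetical helper, not part of the client, and its `lookup` parameter stands in for `spark.conf.getOption`; it assumes Scala 2.13 or newer.

```scala
// Hypothetical helper for illustration; not part of the client's API.
object ConfReaders {
  // `lookup` plays the role of spark.conf.getOption (String => Option[String]).
  // Falls back to `default` when the key is unset or the value is not an integer.
  def intConf(lookup: String => Option[String], key: String, default: Int): Int =
    lookup(key).flatMap(_.trim.toIntOption).getOrElse(default)
}
```

With a live session this might be called as `ConfReaders.intConf(key => spark.conf.getOption(key), "spark.sql.shuffle.partitions", 200)`. Note that the sketch silently falls back on malformed values; a stricter variant could raise an error instead.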
Session lifecycle
spark.version // the Spark version reported by the server
spark.sessionId // the client session id
spark.setActive() // mark this session as the active one
spark.stop() // release the server-side session and close the connection
A robust pattern wraps the work in try/finally:
val spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
try {
  // ... work ...
} finally {
  spark.stop()
}
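If the try/finally shape recurs in many places, it can be factored into a loan-pattern helper. The sketch below is generic and does not depend on the client; `Loan.withResource` is a hypothetical name introduced here.

```scala
// Hypothetical helper for illustration; not part of the client's API.
object Loan {
  // Acquire a resource, run the body, and always release the resource,
  // even when the body throws.
  def withResource[R, A](acquire: => R)(release: R => Unit)(body: R => A): A = {
    val resource = acquire
    try body(resource)
    finally release(resource)
  }
}
```

With the session API shown above, a call might look like `Loan.withResource(SparkSession.builder.remote("sc://localhost:15002").getOrCreate())(_.stop()) { spark => /* work */ }`.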
Pointing examples and tests at a server
The example programs and integration tests read the connection string from an environment variable so you can retarget them without editing code:
export SPARK_REMOTE=sc://localhost:15002
sbt "examples/runMain examples.QuickStart"
SPARK_CONNECT_TEST_REMOTE=sc://localhost:15002 sbt "client/testOnly *Integration*"
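Your own programs can follow the same convention. The sketch below resolves the connection string from an environment map. `RemoteFromEnv` is a hypothetical name, and the fallback to `sc://localhost:15002` when the variable is unset is an assumption made for this example, not documented behavior of the example programs.

```scala
// Hypothetical helper for illustration; not part of the client's API.
object RemoteFromEnv {
  // Assumed fallback for this sketch; the text does not say what happens when the variable is unset.
  val DefaultRemote = "sc://localhost:15002"

  // `env` defaults to the process environment; pass a Map explicitly in tests.
  def resolve(env: Map[String, String] = sys.env, variable: String = "SPARK_REMOTE"): String =
    env.get(variable).map(_.trim).filter(_.nonEmpty).getOrElse(DefaultRemote)
}
```

The result can be passed straight to `SparkSession.builder.remote(...)`.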
Resiliency: retries and reattachable execution
The client is resilient to transient network failures out of the box:
- Automatic retries. Idempotent RPCs and the initial `ExecutePlan` are retried on transient gRPC errors (for example `UNAVAILABLE`) using exponential backoff with jitter, honoring any server-provided `RetryInfo` retry delay.
- Reattachable execution. A query's result stream is requested as reattachable. If the connection drops part-way through, the client resumes from the last response it saw via `ReattachExecute` instead of re-running the query, and periodically issues `ReleaseExecute` so the server can free buffered results. This is the same mechanism used by the official PySpark and JVM clients and works against Spark 3.5 and newer.
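The retry behavior described in the first bullet can be illustrated with a generic sketch of exponential backoff with jitter. This is not the client's implementation: `RetrySketch`, `Policy`, and every default value below are hypothetical, and preferring the server-provided delay outright over the computed backoff is an assumption about how the two are combined. It assumes Scala 2.13 or newer.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Random, Success, Try}

// Hypothetical illustration only; not the client's retry code.
object RetrySketch {
  // Assumed values; the client's real defaults are not stated in the text.
  final case class Policy(
      maxRetries: Int = 5,
      initialBackoff: FiniteDuration = 50.millis,
      maxBackoff: FiniteDuration = 1.second,
      multiplier: Double = 2.0)

  // Delay before retry number `attempt` (0-based): exponential growth,
  // capped at maxBackoff, then "full jitter" (uniform in [0, capped]).
  def backoff(attempt: Int, policy: Policy, rng: Random): FiniteDuration = {
    val exp = policy.initialBackoff.toMillis * math.pow(policy.multiplier, attempt.toDouble)
    val capped = math.min(exp, policy.maxBackoff.toMillis.toDouble).toLong
    rng.between(0L, capped + 1).millis
  }

  def retry[A](
      policy: Policy,
      isTransient: Throwable => Boolean,
      // Stands in for a delay extracted from a server-provided RetryInfo.
      serverDelay: Throwable => Option[FiniteDuration] = _ => None,
      sleep: FiniteDuration => Unit = d => Thread.sleep(d.toMillis),
      rng: Random = new Random())(op: () => A): A = {
    @tailrec def loop(attempt: Int): A = Try(op()) match {
      case Success(a) => a
      case Failure(e) if attempt < policy.maxRetries && isTransient(e) =>
        // Assumption: a server-provided delay replaces the computed backoff.
        sleep(serverDelay(e).getOrElse(backoff(attempt, policy, rng)))
        loop(attempt + 1)
      case Failure(e) => throw e // non-transient, or retries exhausted
    }
    loop(0)
  }
}
```

Injecting `sleep` and `rng` keeps the sketch testable without real waiting; only operations that are safe to repeat should be wrapped this way, which matches the restriction to idempotent RPCs above.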
Reattachable execution is enabled by default. It can be turned off (for debugging, or against a server that does not support it) when constructing the client: