Aggregations and Windows
This page covers grouped aggregation (group_by, rollup, cube), the
GroupedData interface, and window functions built with
Window / WindowSpec. The API mirrors PySpark closely,
so if you know DataFrame.groupBy(...).agg(...) and Window.partitionBy(...)
you will feel at home.
All examples assume a session and the functions module are in scope:
require "spark-connect"
spark = SparkConnect::SparkSession.builder.remote("sc://localhost:15002").get_or_create
F = SparkConnect::F # alias for SparkConnect::Functions
A sample DataFrame used throughout this page:
employees = spark.create_data_frame(
[
{ dept: "eng", name: "ana", salary: 130, year: 2023 },
{ dept: "eng", name: "ben", salary: 110, year: 2023 },
{ dept: "eng", name: "cy", salary: 150, year: 2024 },
{ dept: "sales", name: "dot", salary: 90, year: 2023 },
{ dept: "sales", name: "evan", salary: 120, year: 2024 }
]
)
Grouping
DataFrame#group_by (aliased groupBy / groupby) returns a
GroupedData. Columns may be given as names or Column
objects.
employees.group_by("dept").count.show
# +-----+-----+
# | dept|count|
# +-----+-----+
# | eng| 3|
# |sales| 2|
# +-----+-----+
rollup and cube produce multi-dimensional aggregates with subtotals. A
rollup of (a, b) yields groupings for (a, b), (a), and (); a cube
additionally yields (b).
employees.rollup("dept", "year").sum("salary").order_by("dept", "year").show
employees.cube("dept", "year").count.show
To aggregate the whole DataFrame with no grouping columns, call DataFrame#agg
directly (it is shorthand for group_by.agg):
employees.agg(F.sum("salary").alias("total"), F.avg("salary").alias("avg")).show
GroupedData
GroupedData exposes the standard aggregates. Each method returns a new
DataFrame.
agg
agg accepts either a list of aggregate Columns or a single Hash mapping
column names to function names.
# Column form (most flexible: supports aliases, expressions, multiple stats).
employees.group_by("dept").agg(
F.avg("salary").alias("avg_salary"),
F.max("salary").alias("max_salary"),
F.count("*").alias("headcount")
).show
# Hash form: {column => function_name}.
employees.group_by("dept").agg("salary" => "max", "year" => "min").show
count, sum, avg, max, min
count counts rows per group. sum, avg (aliased mean), max, and min
take one or more numeric column names; with no arguments they apply to every
numeric column.
employees.group_by("dept").count.show
employees.group_by("dept").sum("salary").show
employees.group_by("dept").avg("salary").show # avg and mean are equivalent
employees.group_by("dept").max("salary", "year").show
employees.group_by("dept").min("salary").show
pivot
pivot rotates the values of a column into separate output columns. Supplying
the list of values explicitly is faster and deterministic because the client
does not have to scan for distinct values first.
# Inferred pivot values.
employees.group_by("dept").pivot("year").sum("salary").show
# Explicit pivot values (recommended in production).
employees.group_by("dept").pivot("year", [2023, 2024]).sum("salary").show
# +-----+----+----+
# | dept|2023|2024|
# +-----+----+----+
# | eng| 240| 150|
# |sales| 90| 120|
# +-----+----+----+
Window functions
Window functions compute a value for each row over a related set of rows (a
“window”) without collapsing them into one row per group. Build a WindowSpec
with the SparkConnect::Window factory and attach it to an analytic column with
Column#over.
WindowSpec is immutable: each of partition_by, order_by, rows_between,
and range_between returns a new spec, so you can chain them freely.
w = SparkConnect::Window.partition_by("dept").order_by(F.col("salary").desc)
Ranking functions
row_number, rank, and dense_rank are no-argument functions; call them and
attach a window with over.
ranked = employees.select(
F.col("dept"),
F.col("name"),
F.col("salary"),
F.row_number.over(w).alias("row_num"),
F.rank.over(w).alias("rank"),
F.dense_rank.over(w).alias("dense_rank")
)
ranked.show
percent_rank, cume_dist, and ntile(n) are also available:
employees.select(
F.col("dept"),
F.col("salary"),
F.percent_rank.over(w).alias("pct_rank"),
F.cume_dist.over(w).alias("cume_dist"),
F.ntile(2).over(w).alias("half")
).show
Analytic functions: lag and lead
lag and lead look at preceding or following rows in the window. Both take an
optional offset (default 1) and an optional default value.
ordered = SparkConnect::Window.partition_by("dept").order_by("year")
employees.select(
F.col("dept"),
F.col("year"),
F.col("salary"),
F.lag("salary", 1).over(ordered).alias("prev_salary"),
F.lead("salary", 1, 0).over(ordered).alias("next_salary")
).show
Frames: rows_between and range_between
A window frame restricts which rows in the partition are included relative to
the current row. rows_between counts physical rows; range_between compares
values of the ordering column. Use the boundary constants for unbounded edges
and the current row.
include SparkConnect # brings Window into scope; or qualify with SparkConnect::Window
running = Window
.partition_by("dept")
.order_by("year")
.rows_between(Window::UNBOUNDED_PRECEDING, Window::CURRENT_ROW)
employees.select(
F.col("dept"),
F.col("year"),
F.col("salary"),
F.sum("salary").over(running).alias("running_total")
).show
The boundary constants are:
| Constant | Meaning |
|---|---|
Window::UNBOUNDED_PRECEDING |
start of the partition |
Window::UNBOUNDED_FOLLOWING |
end of the partition |
Window::CURRENT_ROW |
the current row (offset 0) |
Any integer is also valid as an offset, e.g. rows_between(-1, 1) for a
three-row sliding window. range_between uses the same boundaries but
interprets offsets as values of the ordering expression:
salary_band = Window.partition_by("dept").order_by("salary").range_between(-20, 20)
employees.select(
F.col("name"),
F.col("salary"),
F.count("*").over(salary_band).alias("peers_within_20")
).show
See also
- Functions for the full list of aggregate and analytic functions.
- DataFrames for the underlying transformation API.
- Catalog for inspecting tables you aggregate over.