Class: SparkConnect::Catalog

Inherits:
Object
Defined in:
lib/spark_connect/catalog.rb

Overview

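The catalog interface for inspecting and managing databases, tables, views, functions, and the query cache. An instance is returned by SparkSession#catalog; the API mirrors PySpark's Catalog, with method names in snake_case.

Methods that list catalog objects (#list_catalogs, #list_databases, #list_tables, #list_functions, #list_columns) return arrays of Row. The predicate methods (#table_exists, #database_exists, #function_exists, #cached?) and the drop methods (#drop_temp_view, #drop_global_temp_view) return true or false.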

Examples:

spark.catalog.list_tables.each { |t| puts t["name"] }
spark.catalog.table_exists("my_table")  #=> true

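Constant Summary

Proto = SparkConnect::Proto
C = Proto::Catalog

C is shorthand for Proto::Catalog. Every method wraps its specific request in it, e.g. C.new(cache_table: Proto::CacheTable.new(...)).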

Instance Method Summary

Constructor Details

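#initialize(session) ⇒ Catalog

Returns a new instance of Catalog.

Parameters:

  • session (SparkSession)

    the owning session; #create_table and #create_external_table use it to return a DataFrame over the new table.

# File 'lib/spark_connect/catalog.rb', line 19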

# Bind a new catalog to its owning session.
#
# @param session [SparkSession] the session this catalog belongs to; the
#   table-creating methods use it to hand back a DataFrame via +table+.
def initialize(session)
  @session = session
end

Instance Method Details

#cache_table(table_name) ⇒ void

Cache a table in memory.

Parameters:

  • table_name (String)

Returns:

  • (void)

# File 'lib/spark_connect/catalog.rb', line 111

# Cache a table in memory.
#
# @param table_name [String, #to_s] name of the table; coerced with +to_s+.
# @return [void]
def cache_table(table_name)
  request = Proto::CacheTable.new(table_name: table_name.to_s)
  run(C.new(cache_table: request))
end

#cached?(table_name) ⇒ Boolean

Returns whether the table is cached.

Parameters:

  • table_name (String)

Returns:

  • (Boolean)

    true if the table is cached; any other server answer yields false.

# File 'lib/spark_connect/catalog.rb', line 106

# Report whether a table is currently cached.
#
# @param table_name [String, #to_s] name of the table.
# @return [Boolean] true only when the server answers +true+; any other
#   answer (including nil) yields false.
def cached?(table_name)
  request = Proto::IsCached.new(table_name: table_name.to_s)
  answer = scalar(C.new(is_cached: request))
  answer == true
end

#clear_cache ⇒ void

Clear all cached tables.

Returns:

  • (void)

# File 'lib/spark_connect/catalog.rb', line 121

# Remove every table from the cache.
#
# @return [void]
def clear_cache
  request = Proto::ClearCache.new
  run(C.new(clear_cache: request))
end

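#create_external_table(table_name, path: nil, source: nil, schema: nil, options: {}) ⇒ DataFrame

Create a table backed by data at path (an external/unmanaged table) and return a DataFrame over it. The table is created eagerly, before this method returns.

Parameters:

  • table_name (String)
  • path (String, nil) (defaults to: nil)

    location of the data backing the table.

  • source (String, nil) (defaults to: nil)

    the data source/format.

  • schema (Types::StructType, nil) (defaults to: nil)
  • options (Hash{String=>String}) (defaults to: {})

Returns:

  • (DataFrame)

    a DataFrame over the newly created table.

# File 'lib/spark_connect/catalog.rb', line 157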

# Create a table backed by data at +path+ (an external/unmanaged table) and
# return a DataFrame over it.
#
# The catalog command is executed eagerly (via +collect+) so the table exists
# before the DataFrame is built through the owning session.
#
# @param table_name [String, #to_s] name of the table to create.
# @param path [String, Pathname, nil] location of the data; coerced with +to_s+.
# @param source [String, Symbol, nil] the data source/format; coerced with +to_s+.
# @param schema [Types::StructType, nil] optional schema, sent via +to_proto+.
# @param options [Hash] extra data-source options, passed through +stringify+.
# @return [DataFrame] a DataFrame over the newly created table.
def create_external_table(table_name, path: nil, source: nil, schema: nil, options: {})
  name = table_name.to_s
  ct = Proto::CreateExternalTable.new(table_name: name, options: stringify(options))
  # Proto string fields reject arbitrary objects (e.g. a Pathname), so coerce
  # the optional string arguments the same way table_name is coerced.
  ct.path = path.to_s if path
  ct.source = source.to_s if source
  ct.schema = schema.to_proto if schema
  catalog_df(C.new(create_external_table: ct)).collect # eagerly create the table
  @session.table(name)
end

#create_table(table_name, path: nil, source: nil, schema: nil, description: nil, options: {}) ⇒ DataFrame

Create a managed table and return a DataFrame over it. The table is created eagerly, before this method returns.

Parameters:

  • table_name (String)
  • path (String, nil) (defaults to: nil)
  • source (String, nil) (defaults to: nil)

    the data source/format.

  • schema (Types::StructType, nil) (defaults to: nil)
  • description (String, nil) (defaults to: nil)
  • options (Hash{String=>String}) (defaults to: {})

Returns:

  • (DataFrame)

    a DataFrame over the newly created table.

# File 'lib/spark_connect/catalog.rb', line 144

# Create a managed table and return a DataFrame over it.
#
# The catalog command is executed eagerly (via +collect+) so the table exists
# before the DataFrame is built through the owning session.
#
# @param table_name [String, #to_s] name of the table to create.
# @param path [String, Pathname, nil] optional location; coerced with +to_s+.
# @param source [String, Symbol, nil] the data source/format; coerced with +to_s+.
# @param schema [Types::StructType, nil] optional schema, sent via +to_proto+.
# @param description [String, nil] optional table description.
# @param options [Hash] extra data-source options, passed through +stringify+.
# @return [DataFrame] a DataFrame over the newly created table.
def create_table(table_name, path: nil, source: nil, schema: nil, description: nil, options: {})
  name = table_name.to_s
  ct = Proto::CreateTable.new(table_name: name, options: stringify(options))
  # Proto string fields reject arbitrary objects (e.g. a Pathname), so coerce
  # the optional string arguments the same way table_name is coerced.
  ct.path = path.to_s if path
  ct.source = source.to_s if source
  ct.description = description.to_s if description
  ct.schema = schema.to_proto if schema
  catalog_df(C.new(create_table: ct)).collect # eagerly create the table
  @session.table(name)
end

#current_catalog ⇒ String

Returns the current default catalog.

Returns:

  • (String)

    the name of the current default catalog.

# File 'lib/spark_connect/catalog.rb', line 24

# Name of the session's current default catalog.
#
# @return [String] the value the server reports for the current catalog.
def current_catalog
  request = Proto::CurrentCatalog.new
  scalar(C.new(current_catalog: request))
end

#current_database ⇒ String

Returns the current default database.

Returns:

  • (String)

    the name of the current default database.

# File 'lib/spark_connect/catalog.rb', line 39

# Name of the session's current default database.
#
# @return [String] the value the server reports for the current database.
def current_database
  request = Proto::CurrentDatabase.new
  scalar(C.new(current_database: request))
end

#database_exists(db_name) ⇒ Boolean

Returns whether a database exists.

Parameters:

  • db_name (String)

Returns:

  • (Boolean)

    true if the database exists, false otherwise.

# File 'lib/spark_connect/catalog.rb', line 84

# Check whether a database exists.
#
# @param db_name [String, #to_s] name of the database.
# @return [Boolean] true only when the server answers +true+.
def database_exists(db_name)
  request = Proto::DatabaseExists.new(db_name: db_name.to_s)
  found = scalar(C.new(database_exists: request))
  found == true
end

#drop_global_temp_view(view_name) ⇒ Boolean

Drop a global temporary view.

Parameters:

  • view_name (String)

Returns:

  • (Boolean)

    true if the server reports the view was dropped, false otherwise.

# File 'lib/spark_connect/catalog.rb', line 101

# Drop a global temporary view.
#
# @param view_name [String, #to_s] name of the global temporary view.
# @return [Boolean] true only when the server answers +true+.
def drop_global_temp_view(view_name)
  request = Proto::DropGlobalTempView.new(view_name: view_name.to_s)
  dropped = scalar(C.new(drop_global_temp_view: request))
  dropped == true
end

#drop_temp_view(view_name) ⇒ Boolean

Drop a session-local temporary view.

Parameters:

  • view_name (String)

Returns:

  • (Boolean)

    true if the server reports the view was dropped, false otherwise.

# File 'lib/spark_connect/catalog.rb', line 96

# Drop a session-local temporary view.
#
# @param view_name [String, #to_s] name of the temporary view.
# @return [Boolean] true only when the server answers +true+.
def drop_temp_view(view_name)
  request = Proto::DropTempView.new(view_name: view_name.to_s)
  dropped = scalar(C.new(drop_temp_view: request))
  dropped == true
end

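#function_exists(function_name, db_name = nil) ⇒ Boolean

Returns whether a function exists.

Parameters:

  • function_name (String)
  • db_name (String, nil) (defaults to: nil)

    restrict the lookup to a database.

Returns:

  • (Boolean)

    true if the function exists, false otherwise.

# File 'lib/spark_connect/catalog.rb', line 89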

# Check whether a function exists, optionally within a specific database.
#
# @param function_name [String, #to_s] name of the function.
# @param db_name [String, #to_s, nil] database to look in; when nil,
#   db_name is left unset on the request.
# @return [Boolean] true only when the server answers +true+.
def function_exists(function_name, db_name = nil)
  fe = Proto::FunctionExists.new(function_name: function_name.to_s)
  # Coerce with to_s, as function_name and #database_exists do, so a
  # non-String database identifier is not rejected by the proto string field.
  fe.db_name = db_name.to_s if db_name
  # `== true` normalizes nil or any non-true answer to a strict false.
  scalar(C.new(function_exists: fe)) == true
end

#list_catalogs ⇒ Array<Row>

Returns all catalogs.

Returns:

  • (Array<Row>)

    all catalogs.

# File 'lib/spark_connect/catalog.rb', line 34

# List every catalog visible to the session.
#
# @return [Array<Row>] the catalogs, as produced by +rows+.
def list_catalogs
  request = Proto::ListCatalogs.new
  rows(C.new(list_catalogs: request))
end

#list_columns(table_name, db_name = nil) ⇒ Array<Row>

Returns the columns of a table.

Parameters:

  • table_name (String)
  • db_name (String, nil) (defaults to: nil)

    the database containing the table.

Returns:

  • (Array<Row>)

    the columns of the table.

# File 'lib/spark_connect/catalog.rb', line 70

# List the columns of a table.
#
# @param table_name [String, #to_s] name of the table.
# @param db_name [String, #to_s, nil] database containing the table; when nil,
#   db_name is left unset on the request.
# @return [Array<Row>] the table's columns, as produced by +rows+.
def list_columns(table_name, db_name = nil)
  lc = Proto::ListColumns.new(table_name: table_name.to_s)
  # Coerce with to_s, as table_name and #database_exists do, so a non-String
  # database identifier is not rejected by the proto string field.
  lc.db_name = db_name.to_s if db_name
  rows(C.new(list_columns: lc))
end

#list_databases ⇒ Array<Row>

Returns all databases.

Returns:

  • (Array<Row>)

    all databases.

# File 'lib/spark_connect/catalog.rb', line 49

# List every database in the current catalog context.
#
# @return [Array<Row>] the databases, as produced by +rows+.
def list_databases
  request = Proto::ListDatabases.new
  rows(C.new(list_databases: request))
end

#list_functions(db_name = nil) ⇒ Array<Row>

Returns functions registered in the catalog.

Parameters:

  • db_name (String, nil) (defaults to: nil)

    restrict to a database.

Returns:

  • (Array<Row>)

    functions registered in the catalog.

# File 'lib/spark_connect/catalog.rb', line 62

# List functions registered in the catalog, optionally for one database.
#
# @param db_name [String, #to_s, nil] database to list; when nil, db_name is
#   left unset on the request.
# @return [Array<Row>] the functions, as produced by +rows+.
def list_functions(db_name = nil)
  lf = Proto::ListFunctions.new
  # Coerce with to_s, as #database_exists does, so a non-String database
  # identifier is not rejected by the proto string field.
  lf.db_name = db_name.to_s if db_name
  rows(C.new(list_functions: lf))
end

#list_tables(db_name = nil) ⇒ Array<Row>

Returns tables (and views).

Parameters:

  • db_name (String, nil) (defaults to: nil)

    restrict to a database.

Returns:

  • (Array<Row>)

    tables (and views).

# File 'lib/spark_connect/catalog.rb', line 55

# List tables and views, optionally restricted to one database.
#
# @param db_name [String, #to_s, nil] database to list; when nil, db_name is
#   left unset on the request.
# @return [Array<Row>] the tables and views, as produced by +rows+.
def list_tables(db_name = nil)
  lt = Proto::ListTables.new
  # Coerce with to_s, as #database_exists does, so a non-String database
  # identifier is not rejected by the proto string field.
  lt.db_name = db_name.to_s if db_name
  rows(C.new(list_tables: lt))
end

#recover_partitions(table_name) ⇒ void

Recover all partitions of a table.

Parameters:

  • table_name (String)

Returns:

  • (void)

# File 'lib/spark_connect/catalog.rb', line 131

# Recover all partitions of a table.
#
# @param table_name [String, #to_s] name of the partitioned table.
# @return [void]
def recover_partitions(table_name)
  request = Proto::RecoverPartitions.new(table_name: table_name.to_s)
  run(C.new(recover_partitions: request))
end

#refresh_table(table_name) ⇒ void

Invalidate and refresh cached metadata for a table.

Parameters:

  • table_name (String)

Returns:

  • (void)

# File 'lib/spark_connect/catalog.rb', line 126

# Invalidate and refresh cached metadata for a table.
#
# @param table_name [String, #to_s] name of the table to refresh.
# @return [void]
def refresh_table(table_name)
  request = Proto::RefreshTable.new(table_name: table_name.to_s)
  run(C.new(refresh_table: request))
end

#set_current_catalog(name) ⇒ void

Set the current catalog.

Parameters:

  • name (String)

    the catalog to make current.

Returns:

  • (void)

# File 'lib/spark_connect/catalog.rb', line 29

# Switch the session's current catalog.
#
# @param name [String, #to_s] the catalog to make current.
# @return [void]
def set_current_catalog(name)
  request = Proto::SetCurrentCatalog.new(catalog_name: name.to_s)
  run(C.new(set_current_catalog: request))
end

#set_current_database(name) ⇒ void

Set the current database.

Parameters:

  • name (String)

    the database to make current.

Returns:

  • (void)

# File 'lib/spark_connect/catalog.rb', line 44

# Switch the session's current database.
#
# @param name [String, #to_s] the database to make current.
# @return [void]
def set_current_database(name)
  request = Proto::SetCurrentDatabase.new(db_name: name.to_s)
  run(C.new(set_current_database: request))
end

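#table_exists(table_name, db_name = nil) ⇒ Boolean

Returns whether a table or view exists.

Parameters:

  • table_name (String)
  • db_name (String, nil) (defaults to: nil)

    restrict the lookup to a database.

Returns:

  • (Boolean)

    true if the table or view exists, false otherwise.

# File 'lib/spark_connect/catalog.rb', line 77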

# Check whether a table or view exists, optionally within a specific database.
#
# @param table_name [String, #to_s] name of the table or view.
# @param db_name [String, #to_s, nil] database to look in; when nil, db_name
#   is left unset on the request.
# @return [Boolean] true only when the server answers +true+.
def table_exists(table_name, db_name = nil)
  te = Proto::TableExists.new(table_name: table_name.to_s)
  # Coerce with to_s, as table_name and #database_exists do, so a non-String
  # database identifier is not rejected by the proto string field.
  te.db_name = db_name.to_s if db_name
  # `== true` normalizes nil or any non-true answer to a strict false.
  scalar(C.new(table_exists: te)) == true
end

#uncache_table(table_name) ⇒ void

Remove a table from the cache.

Parameters:

  • table_name (String)

Returns:

  • (void)

# File 'lib/spark_connect/catalog.rb', line 116

# Remove a table from the cache.
#
# @param table_name [String, #to_s] name of the table to uncache.
# @return [void]
def uncache_table(table_name)
  request = Proto::UncacheTable.new(table_name: table_name.to_s)
  run(C.new(uncache_table: request))
end