Class: SparkConnect::Catalog
- Inherits: Object (ancestry: Object → SparkConnect::Catalog)
- Defined in: lib/spark_connect/catalog.rb
Overview
The catalog interface for inspecting and managing databases, tables,
functions, and the query cache. Returned by SparkSession#catalog. Mirrors
PySpark's Catalog.
Methods that return rows (#list_databases, #list_tables, ...) return arrays of Row; predicate methods return booleans.
Constant Summary
- Proto = SparkConnect::Proto
- C = Proto::Catalog
Instance Method Summary
-
#cache_table(table_name) ⇒ void
Cache a table in memory.
-
#cached?(table_name) ⇒ Boolean
Whether the table is cached.
-
#clear_cache ⇒ void
Clear all cached tables.
-
#create_external_table(table_name, path: nil, source: nil, schema: nil, options: {}) ⇒ DataFrame
Create a table backed by data at path (an external/unmanaged table).
-
#create_table(table_name, path: nil, source: nil, schema: nil, description: nil, options: {}) ⇒ DataFrame
Create a managed table and return a DataFrame over it.
-
#current_catalog ⇒ String
The current default catalog.
-
#current_database ⇒ String
The current default database.
-
#database_exists(db_name) ⇒ Boolean
Whether a database exists.
-
#drop_global_temp_view(view_name) ⇒ Boolean
Drop a global temporary view.
-
#drop_temp_view(view_name) ⇒ Boolean
Drop a session-local temporary view.
-
#function_exists(function_name, db_name = nil) ⇒ Boolean
Whether a function exists.
-
#initialize(session) ⇒ Catalog (constructor)
A new instance of Catalog.
-
#list_catalogs ⇒ Array<Row>
All catalogs.
-
#list_columns(table_name, db_name = nil) ⇒ Array<Row>
The columns of a table.
-
#list_databases ⇒ Array<Row>
All databases.
-
#list_functions(db_name = nil) ⇒ Array<Row>
Functions registered in the catalog.
-
#list_tables(db_name = nil) ⇒ Array<Row>
Tables (and views).
-
#recover_partitions(table_name) ⇒ void
Recover all partitions of a table.
-
#refresh_table(table_name) ⇒ void
Invalidate and refresh cached metadata for a table.
-
#set_current_catalog(name) ⇒ void
Set the current catalog.
-
#set_current_database(name) ⇒ void
Set the current database.
-
#table_exists(table_name, db_name = nil) ⇒ Boolean
Whether a table or view exists.
-
#uncache_table(table_name) ⇒ Object
Remove a table from the cache.
Constructor Details
#initialize(session) ⇒ Catalog
Returns a new instance of Catalog.
Source: lines 19-21
# File 'lib/spark_connect/catalog.rb', line 19
# Remember the owning session; #create_table and #create_external_table use it
# to hand back a DataFrame over the table they create.
def initialize(session)
  @session = session
end
Instance Method Details
#cache_table(table_name) ⇒ void
Cache a table in memory.
Source: lines 111-113
# File 'lib/spark_connect/catalog.rb', line 111
# Ask the catalog to cache +table_name+ in memory.
# @return [void]
def cache_table(table_name)
  request = Proto::CacheTable.new(table_name: table_name.to_s)
  run(C.new(cache_table: request))
end
#cached?(table_name) ⇒ Boolean
Returns whether the table is cached.
Source: lines 106-108
# File 'lib/spark_connect/catalog.rb', line 106
# Whether +table_name+ is currently cached.
# @return [Boolean] true only when #scalar yields a literal +true+; any other
#   value (including nil) is reported as false
def cached?(table_name)
  request = Proto::IsCached.new(table_name: table_name.to_s)
  answer = scalar(C.new(is_cached: request))
  answer == true
end
#clear_cache ⇒ void
Clear all cached tables.
Source: lines 121-123
# File 'lib/spark_connect/catalog.rb', line 121
# Drop every cached table.
# @return [void]
def clear_cache
  command = C.new(clear_cache: Proto::ClearCache.new)
  run(command)
end
#create_external_table(table_name, path: nil, source: nil, schema: nil, options: {}) ⇒ DataFrame
Create a table backed by data at path (an external/unmanaged table).
Source: lines 157-164
# File 'lib/spark_connect/catalog.rb', line 157
# Create a table backed by data at +path+ (an external/unmanaged table) and
# return a DataFrame over it.
#
# @param table_name [#to_s] name of the table to create
# @param path [Object, nil] data location; set on the command only when given
# @param source [Object, nil] data source; set on the command only when given
# @param schema [#to_proto, nil] schema, serialized with +to_proto+ when given
# @param options [Hash] extra options forwarded on the CreateExternalTable command
# @return [DataFrame] +@session.table+ for the newly created table
def create_external_table(table_name, path: nil, source: nil, schema: nil, options: {})
  name = table_name.to_s
  # The caller's options must be forwarded (the old call was stringify() with no
  # argument, silently discarding them). Assumes stringify(hash) normalizes the
  # hash for the proto map field — TODO confirm against its definition.
  ct = Proto::CreateExternalTable.new(table_name: name, options: stringify(options))
  ct.path = path if path
  ct.source = source if source
  ct.schema = schema.to_proto if schema
  catalog_df(C.new(create_external_table: ct)).collect # eagerly create the table
  @session.table(name)
end
#create_table(table_name, path: nil, source: nil, schema: nil, description: nil, options: {}) ⇒ DataFrame
Create a managed table and return a DataFrame over it.
Source: lines 144-152
# File 'lib/spark_connect/catalog.rb', line 144
# Create a managed table and return a DataFrame over it.
#
# @param table_name [#to_s] name of the table to create
# @param path [Object, nil] data location; set on the command only when given
# @param source [Object, nil] data source; set on the command only when given
# @param schema [#to_proto, nil] schema, serialized with +to_proto+ when given
# @param description [Object, nil] table description; set only when given
# @param options [Hash] extra options forwarded on the CreateTable command
# @return [DataFrame] +@session.table+ for the newly created table
def create_table(table_name, path: nil, source: nil, schema: nil, description: nil, options: {})
  name = table_name.to_s
  # The caller's options must be forwarded (the old call was stringify() with no
  # argument, silently discarding them). Assumes stringify(hash) normalizes the
  # hash for the proto map field — TODO confirm against its definition.
  ct = Proto::CreateTable.new(table_name: name, options: stringify(options))
  ct.path = path if path
  ct.source = source if source
  ct.description = description if description
  ct.schema = schema.to_proto if schema
  catalog_df(C.new(create_table: ct)).collect # eagerly create the table
  @session.table(name)
end
#current_catalog ⇒ String
Returns the current default catalog.
Source: lines 24-26
# File 'lib/spark_connect/catalog.rb', line 24
# Name of the current default catalog, as returned by #scalar.
# @return [String]
def current_catalog
  query = C.new(current_catalog: Proto::CurrentCatalog.new)
  scalar(query)
end
#current_database ⇒ String
Returns the current default database.
Source: lines 39-41
# File 'lib/spark_connect/catalog.rb', line 39
# Name of the current default database, as returned by #scalar.
# @return [String]
def current_database
  query = C.new(current_database: Proto::CurrentDatabase.new)
  scalar(query)
end
#database_exists(db_name) ⇒ Boolean
Returns whether a database exists.
Source: lines 84-86
# File 'lib/spark_connect/catalog.rb', line 84
# Whether the database +db_name+ exists.
# @return [Boolean] true only when #scalar yields a literal +true+
def database_exists(db_name)
  check = Proto::DatabaseExists.new(db_name: db_name.to_s)
  answer = scalar(C.new(database_exists: check))
  answer == true
end
#drop_global_temp_view(view_name) ⇒ Boolean
Drop a global temporary view.
Source: lines 101-103
# File 'lib/spark_connect/catalog.rb', line 101
# Drop the global temporary view +view_name+.
# @return [Boolean] true only when #scalar yields a literal +true+
def drop_global_temp_view(view_name)
  request = Proto::DropGlobalTempView.new(view_name: view_name.to_s)
  answer = scalar(C.new(drop_global_temp_view: request))
  answer == true
end
#drop_temp_view(view_name) ⇒ Boolean
Drop a session-local temporary view.
Source: lines 96-98
# File 'lib/spark_connect/catalog.rb', line 96
# Drop the session-local temporary view +view_name+.
# @return [Boolean] true only when #scalar yields a literal +true+
def drop_temp_view(view_name)
  request = Proto::DropTempView.new(view_name: view_name.to_s)
  answer = scalar(C.new(drop_temp_view: request))
  answer == true
end
#function_exists(function_name, db_name = nil) ⇒ Boolean
Returns whether a function exists.
Source: lines 89-93
# File 'lib/spark_connect/catalog.rb', line 89
# Whether +function_name+ exists, optionally scoped to +db_name+
# (the database is only set on the request when one is given).
# @return [Boolean] true only when #scalar yields a literal +true+
def function_exists(function_name, db_name = nil)
  probe = Proto::FunctionExists.new(function_name: function_name.to_s).tap do |req|
    req.db_name = db_name if db_name
  end
  scalar(C.new(function_exists: probe)) == true
end
#list_catalogs ⇒ Array<Row>
Returns all catalogs.
Source: lines 34-36
# File 'lib/spark_connect/catalog.rb', line 34
# Every catalog, one Row each (via #rows).
# @return [Array<Row>]
def list_catalogs
  query = C.new(list_catalogs: Proto::ListCatalogs.new)
  rows(query)
end
#list_columns(table_name, db_name = nil) ⇒ Array<Row>
Returns the columns of a table.
Source: lines 70-74
# File 'lib/spark_connect/catalog.rb', line 70
# Columns of +table_name+, optionally scoped to +db_name+ (only set on the
# request when given).
# @return [Array<Row>]
def list_columns(table_name, db_name = nil)
  request = Proto::ListColumns.new(table_name: table_name.to_s).tap do |req|
    req.db_name = db_name if db_name
  end
  rows(C.new(list_columns: request))
end
#list_databases ⇒ Array<Row>
Returns all databases.
Source: lines 49-51
# File 'lib/spark_connect/catalog.rb', line 49
# Every database, one Row each (via #rows).
# @return [Array<Row>]
def list_databases
  query = C.new(list_databases: Proto::ListDatabases.new)
  rows(query)
end
#list_functions(db_name = nil) ⇒ Array<Row>
Returns functions registered in the catalog.
Source: lines 62-66
# File 'lib/spark_connect/catalog.rb', line 62
# Functions registered in the catalog, optionally scoped to +db_name+
# (only set on the request when given).
# @return [Array<Row>]
def list_functions(db_name = nil)
  request = Proto::ListFunctions.new.tap { |req| req.db_name = db_name if db_name }
  rows(C.new(list_functions: request))
end
#list_tables(db_name = nil) ⇒ Array<Row>
Returns tables (and views).
Source: lines 55-59
# File 'lib/spark_connect/catalog.rb', line 55
# Tables (and views), optionally scoped to +db_name+ (only set on the
# request when given).
# @return [Array<Row>]
def list_tables(db_name = nil)
  request = Proto::ListTables.new.tap { |req| req.db_name = db_name if db_name }
  rows(C.new(list_tables: request))
end
#recover_partitions(table_name) ⇒ void
Recover all partitions of a table.
Source: lines 131-133
# File 'lib/spark_connect/catalog.rb', line 131
# Recover all partitions of +table_name+.
# @return [void]
def recover_partitions(table_name)
  request = Proto::RecoverPartitions.new(table_name: table_name.to_s)
  run(C.new(recover_partitions: request))
end
#refresh_table(table_name) ⇒ void
Invalidate and refresh cached metadata for a table.
Source: lines 126-128
# File 'lib/spark_connect/catalog.rb', line 126
# Invalidate and refresh cached metadata for +table_name+.
# @return [void]
def refresh_table(table_name)
  request = Proto::RefreshTable.new(table_name: table_name.to_s)
  run(C.new(refresh_table: request))
end
#set_current_catalog(name) ⇒ void
Set the current catalog.
Source: lines 29-31
# File 'lib/spark_connect/catalog.rb', line 29
# Make +name+ the current catalog.
# @return [void]
def set_current_catalog(name)
  request = Proto::SetCurrentCatalog.new(catalog_name: name.to_s)
  run(C.new(set_current_catalog: request))
end
#set_current_database(name) ⇒ void
Set the current database.
Source: lines 44-46
# File 'lib/spark_connect/catalog.rb', line 44
# Make +name+ the current database.
# @return [void]
def set_current_database(name)
  request = Proto::SetCurrentDatabase.new(db_name: name.to_s)
  run(C.new(set_current_database: request))
end
#table_exists(table_name, db_name = nil) ⇒ Boolean
Returns whether a table or view exists.
Source: lines 77-81
# File 'lib/spark_connect/catalog.rb', line 77
# Whether a table or view named +table_name+ exists, optionally scoped to
# +db_name+ (only set on the request when given).
# @return [Boolean] true only when #scalar yields a literal +true+
def table_exists(table_name, db_name = nil)
  probe = Proto::TableExists.new(table_name: table_name.to_s).tap do |req|
    req.db_name = db_name if db_name
  end
  scalar(C.new(table_exists: probe)) == true
end