diff --git a/ingestion/src/metadata/ingestion/source/databricks.py b/ingestion/src/metadata/ingestion/source/databricks.py index cc83875ebb9..e404cc32b20 100644 --- a/ingestion/src/metadata/ingestion/source/databricks.py +++ b/ingestion/src/metadata/ingestion/source/databricks.py @@ -70,6 +70,12 @@ def _get_column_rows(self, connection, table_name, schema): def get_columns(self, connection, table_name, schema=None, **kw): # This function overrides the sqlalchemy_databricks._dialect.DatabricksDialect.get_columns # to add support for struct, array & map datatype + + # Extract the Database Name from the keyword arguments parameter if it is present. This + # value should match what is provided in the 'source.config.database' field in the + # Databricks ingest config file. + db_name = kw["db_name"] if "db_name" in kw else None + rows = _get_column_rows(self, connection, table_name, schema) result = [] for (col_name, col_type, _comment) in rows: @@ -96,11 +102,17 @@ def get_columns(self, connection, table_name, schema=None, **kw): "comment": _comment, } if col_type in {"array", "struct", "map"}: - rows = dict( - connection.execute( - "DESCRIBE {} {}".format(table_name, col_name) - ).fetchall() - ) + if db_name is not None: + rows = dict( + connection.execute( + f"DESCRIBE {db_name}.{table_name} {col_name}" + ).fetchall() + ) + else: + rows = dict( + connection.execute(f"DESCRIBE {table_name} {col_name}").fetchall() + ) + col_info["raw_data_type"] = rows["data_type"] result.append(col_info) return result diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index ee23f7ebf70..70f192b0ac1 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -621,7 +621,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): unique_columns.extend(constraint.get("column_names")) table_columns = [] - columns = inspector.get_columns(table, schema) + columns = inspector.get_columns(table, schema, db_name=self.service_connection.database) try: for column in columns: try: