diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py b/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py
index 18b77b41de1..15b0c5919c6 100644
--- a/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py
@@ -35,9 +35,17 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.source.connections import get_connection
 from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
+from metadata.ingestion.source.database.databricks.queries import (
+    DATABRICKS_GET_TABLE_COMMENTS,
+    DATABRICKS_VIEW_DEFINITIONS,
+)
 from metadata.utils import fqn
 from metadata.utils.filters import filter_by_database
 from metadata.utils.logger import ingestion_logger
+from metadata.utils.sqlalchemy_utils import (
+    get_all_view_definitions,
+    get_view_definition_wrapper,
+)
 
 logger = ingestion_logger()
 
@@ -167,9 +175,62 @@ def get_view_names(
     return views
 
 
+@reflection.cache
+def get_table_comment(  # pylint: disable=unused-argument
+    self, connection, table_name, schema_name, **kw
+):
+    """
+    Return the comment of a table as ``{"text": <comment or None>}``.
+
+    Runs ``DATABRICKS_GET_TABLE_COMMENTS`` for the table and returns the
+    second column of the first row whose first column is ``Comment``.
+    Reading the rows is best-effort: a failure is logged and reported as
+    "no comment" instead of propagating.
+    """
+    cursor = connection.execute(
+        DATABRICKS_GET_TABLE_COMMENTS.format(
+            schema_name=schema_name, table_name=table_name
+        )
+    )
+    try:
+        for result in list(cursor):
+            data = result.values()
+            if data[0] and data[0].strip() == "Comment":
+                # A falsy (e.g. empty) comment is reported as None
+                return {"text": data[1] or None}
+    except Exception as exc:  # pylint: disable=broad-except
+        logger.warning(
+            "Unable to read the comment of table [%s.%s]: %s",
+            schema_name,
+            table_name,
+            exc,
+        )
+    return {"text": None}
+
+
+@reflection.cache
+def get_view_definition(
+    self, connection, table_name, schema=None, **kw  # pylint: disable=unused-argument
+):
+    """
+    Return the definition of a view by delegating to
+    ``get_view_definition_wrapper`` with ``DATABRICKS_VIEW_DEFINITIONS``.
+    """
+    return get_view_definition_wrapper(
+        self,
+        connection,
+        table_name=table_name,
+        schema=schema,
+        query=DATABRICKS_VIEW_DEFINITIONS,
+    )
+
+
+DatabricksDialect.get_table_comment = get_table_comment
 DatabricksDialect.get_view_names = get_view_names
 DatabricksDialect.get_columns = get_columns
 DatabricksDialect.get_schema_names = get_schema_names
+DatabricksDialect.get_view_definition = get_view_definition
+DatabricksDialect.get_all_view_definitions = get_all_view_definitions
 reflection.Inspector.get_schema_names = get_schema_names_reflection
 
 
diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py
new file mode 100644
index 00000000000..0882c8475aa
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py
@@ -0,0 +1,32 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+SQL Queries used during ingestion
+"""
+
+import textwrap
+
+# One row per view that has a stored definition: name, schema, definition
+DATABRICKS_VIEW_DEFINITIONS = textwrap.dedent(
+    """
+    select
+        TABLE_NAME as view_name,
+        TABLE_SCHEMA as schema,
+        VIEW_DEFINITION as view_def
+    from INFORMATION_SCHEMA.VIEWS WHERE VIEW_DEFINITION IS NOT NULL
+    """
+)
+
+
+# Template filled with str.format(schema_name=..., table_name=...).
+# Identifiers are backtick-quoted so that names containing special
+# characters or reserved words still form a valid statement.
+DATABRICKS_GET_TABLE_COMMENTS = "DESCRIBE TABLE EXTENDED `{schema_name}`.`{table_name}`"