Databricks comments & view definitions in bulk (#10112)

* Add databricks comments

* Add view definition

* Add view definition
This commit is contained in:
Pere Miquel Brull 2023-02-06 08:55:15 +01:00 committed by GitHub
parent 97747d9803
commit 1f11fdefb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 0 deletions

View File

@ -35,9 +35,17 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_TABLE_COMMENTS,
DATABRICKS_VIEW_DEFINITIONS,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_view_definitions,
get_view_definition_wrapper,
)
logger = ingestion_logger()
@ -167,9 +175,47 @@ def get_view_names(
return views
@reflection.cache
def get_table_comment( # pylint: disable=unused-argument
self, connection, table_name, schema_name, **kw
):
"""
Returns comment of table
"""
cursor = connection.execute(
DATABRICKS_GET_TABLE_COMMENTS.format(
schema_name=schema_name, table_name=table_name
)
)
try:
for result in list(cursor):
data = result.values()
if data[0] and data[0].strip() == "Comment":
return {"text": data[1] if data and data[1] else None}
except Exception:
return {"text": None}
return {"text": None}
@reflection.cache
def get_view_definition(
self, connection, table_name, schema=None, **kw # pylint: disable=unused-argument
):
return get_view_definition_wrapper(
self,
connection,
table_name=table_name,
schema=schema,
query=DATABRICKS_VIEW_DEFINITIONS,
)
DatabricksDialect.get_table_comment = get_table_comment
DatabricksDialect.get_view_names = get_view_names
DatabricksDialect.get_columns = get_columns
DatabricksDialect.get_schema_names = get_schema_names
DatabricksDialect.get_view_definition = get_view_definition
DatabricksDialect.get_all_view_definitions = get_all_view_definitions
reflection.Inspector.get_schema_names = get_schema_names_reflection

View File

@ -0,0 +1,28 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SQL Queries used during ingestion
"""
import textwrap
DATABRICKS_VIEW_DEFINITIONS = textwrap.dedent(
"""
select
TABLE_NAME as view_name,
TABLE_SCHEMA as schema,
VIEW_DEFINITION as view_def
from INFORMATION_SCHEMA.VIEWS WHERE VIEW_DEFINITION IS NOT NULL
"""
)
DATABRICKS_GET_TABLE_COMMENTS = "DESCRIBE TABLE EXTENDED {schema_name}.{table_name}"