Fix #9282: Support Databricks Unity Catalog (#11896)

Mayur Singal 2023-06-07 11:50:31 +05:30 committed by GitHub
parent c6fdbc43bb
commit 9e6e00caf9
20 changed files with 1155 additions and 370 deletions

View File

@ -143,7 +143,7 @@ plugins: Dict[str, Set[str]] = {
"dbt-artifacts-parser",
},
"db2": {"ibm-db-sa~=0.3"},
"databricks": {"sqlalchemy-databricks~=0.1"},
"databricks": {"sqlalchemy-databricks~=0.1", "databricks-sdk~=0.1"},
"datalake-azure": {
"azure-storage-blob~=12.14",
"azure-identity~=1.12",

View File

@ -8,6 +8,7 @@ source:
token: <databricks token>
hostPort: localhost:443
connectionTimeout: 120
useUnityCatalog: true
connectionArguments:
http_path: <http path of databricks cluster>
sourceConfig:

View File

@ -44,13 +44,7 @@ from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import (
get_column_fqn,
get_lineage_by_query,
get_lineage_via_table_entity,
)
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.table_metadata import OMetaTableConstraints
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -60,6 +54,7 @@ from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandl
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.filters import filter_by_table
from metadata.utils.helpers import calculate_execution_time_generator
from metadata.utils.logger import ingestion_logger
@ -430,52 +425,12 @@ class CommonDbSourceService(
for view in [
v for v in self.context.table_views if v.view_definition is not None
]:
table_name = view.table_name
schema_name = view.schema_name
db_name = view.db_name
view_definition = view.view_definition
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
yield from get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
table_name=table_name,
connection_type=self.service_connection.type.value,
)
table_entity = self.metadata.get_by_name(
entity=Table,
fqn=table_fqn,
)
try:
connection_type = str(self.service_connection.type.value)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
lineage_parser = LineageParser(view_definition, dialect)
if lineage_parser.source_tables and lineage_parser.target_tables:
yield from get_lineage_by_query(
self.metadata,
query=view_definition,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
dialect=dialect,
) or []
else:
yield from get_lineage_via_table_entity(
self.metadata,
table_entity=table_entity,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
query=view_definition,
dialect=dialect,
) or []
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Could not parse query [{view_definition}] ingesting lineage failed: {exc}"
)
def _get_foreign_constraints(
self, table_constraints: OMetaTableConstraints

View File

@ -22,12 +22,19 @@ from metadata.generated.schema.entity.services.connections.database.databricksCo
DatabricksConnection,
)
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.source.database.databricks.models import (
LineageColumnStreams,
LineageTableStreams,
)
from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
API_TIMEOUT = 10
QUERIES_PATH = "/sql/history/queries"
TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get"
COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get"
class DatabricksClient:
@ -41,7 +48,8 @@ class DatabricksClient:
api_version = "/api/2.0"
job_api_version = "/api/2.1"
auth_token = self.config.token.get_secret_value()
self.base_query_url = f"https://{base_url}{api_version}/sql/history/queries"
self.base_url = f"https://{base_url}{api_version}"
self.base_query_url = f"{self.base_url}{QUERIES_PATH}"
self.base_job_url = f"https://{base_url}{job_api_version}/jobs"
self.jobs_list_url = f"{self.base_job_url}/list"
self.jobs_run_list_url = f"{self.base_job_url}/runs/list"
@ -208,3 +216,55 @@ class DatabricksClient:
logger.error(exc)
return job_runs
def get_table_lineage(self, table_name: str) -> LineageTableStreams:
"""
Method returns table lineage details
"""
try:
data = {
"table_name": table_name,
}
response = self.client.get(
f"{self.base_url}{TABLE_LINEAGE_PATH}",
headers=self.headers,
data=json.dumps(data),
timeout=API_TIMEOUT,
).json()
if response:
return LineageTableStreams(**response)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)
return LineageTableStreams()
def get_column_lineage(
self, table_name: str, column_name: str
) -> LineageColumnStreams:
"""
Method returns column lineage details
"""
try:
data = {
"table_name": table_name,
"column_name": column_name,
}
response = self.client.get(
f"{self.base_url}{COLUMN_LINEAGE_PATH}",
headers=self.headers,
data=json.dumps(data),
timeout=API_TIMEOUT,
).json()
if response:
return LineageColumnStreams(**response)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)
return LineageColumnStreams()
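
For reference, a minimal sketch of how these new lineage endpoints could be exercised outside the ingestion workflow; the host, token, and table name are placeholders, and it assumes the openmetadata-ingestion package from this change is installed:

```python
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
    DatabricksConnection,
)
from metadata.ingestion.source.database.databricks.client import DatabricksClient

# Placeholder connection details -- substitute a real workspace host and token.
connection = DatabricksConnection(
    hostPort="adb-000000000000.0.azuredatabricks.net:443",
    token="<databricks token>",
    httpPath="<http path of databricks cluster>",
)
client = DatabricksClient(connection)

# Table names are passed in <catalog>.<schema>.<table> form, matching the
# databricks_table_fqn built by the Unity Catalog lineage source.
table_streams = client.get_table_lineage("main.default.orders")
for upstream in table_streams.upstream_tables:
    print(f"upstream table: {upstream.catalog_name}.{upstream.schema_name}.{upstream.name}")

column_streams = client.get_column_lineage("main.default.orders", column_name="order_id")
for col in column_streams.upstream_cols:
    print(f"upstream column: {col.table_name}.{col.name}")
```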

View File

@ -13,8 +13,9 @@
Source connection handler
"""
from functools import partial
from typing import Optional
from typing import Optional, Union
from databricks.sdk import WorkspaceClient
from sqlalchemy.engine import Engine
from sqlalchemy.inspection import inspect
@ -36,9 +37,11 @@ from metadata.ingestion.connections.test_connections import (
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.databricks.client import DatabricksClient
from metadata.ingestion.source.database.databricks.models import DatabricksTable
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_CATALOGS,
)
from metadata.utils.db_utils import get_host_from_host_port
def get_connection_url(connection: DatabricksConnection) -> str:
@ -46,10 +49,17 @@ def get_connection_url(connection: DatabricksConnection) -> str:
return url
def get_connection(connection: DatabricksConnection) -> Engine:
def get_connection(connection: DatabricksConnection) -> Union[Engine, WorkspaceClient]:
"""
Create connection
"""
if connection.useUnityCatalog:
return WorkspaceClient(
host=get_host_from_host_port(connection.hostPort),
token=connection.token.get_secret_value(),
)
if connection.httpPath:
if not connection.connectionArguments:
connection.connectionArguments = init_empty_connection_arguments()
@ -64,7 +74,7 @@ def get_connection(connection: DatabricksConnection) -> Engine:
def test_connection(
metadata: OpenMetadata,
engine: Engine,
connection: Union[Engine, WorkspaceClient],
service_connection: DatabricksConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
@ -72,22 +82,51 @@ def test_connection(
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
inspector = inspect(engine)
client = DatabricksClient(service_connection)
test_fn = {
"CheckAccess": partial(test_connection_engine_step, engine),
"GetSchemas": inspector.get_schema_names,
"GetTables": inspector.get_table_names,
"GetViews": inspector.get_view_names,
"GetDatabases": partial(
test_query,
engine=engine,
statement=DATABRICKS_GET_CATALOGS,
),
"GetQueries": client.test_query_api_access,
}
if service_connection.useUnityCatalog:
table_obj = DatabricksTable()
def get_catalogs(connection: WorkspaceClient, table_obj: DatabricksTable):
for catalog in connection.catalogs.list():
table_obj.catalog_name = catalog.name
break
def get_schemas(connection: WorkspaceClient, table_obj: DatabricksTable):
for schema in connection.schemas.list(catalog_name=table_obj.catalog_name):
table_obj.schema_name = schema.name
break
def get_tables(connection: WorkspaceClient, table_obj: DatabricksTable):
for table in connection.tables.list(
catalog_name=table_obj.catalog_name, schema_name=table_obj.schema_name
):
table_obj.name = table.name
break
test_fn = {
"CheckAccess": connection.catalogs.list,
"GetDatabases": partial(get_catalogs, connection, table_obj),
"GetSchemas": partial(get_schemas, connection, table_obj),
"GetTables": partial(get_tables, connection, table_obj),
"GetViews": partial(get_tables, connection, table_obj),
"GetQueries": client.test_query_api_access,
}
else:
inspector = inspect(connection)
test_fn = {
"CheckAccess": partial(test_connection_engine_step, connection),
"GetSchemas": inspector.get_schema_names,
"GetTables": inspector.get_table_names,
"GetViews": inspector.get_view_names,
"GetDatabases": partial(
test_query,
engine=connection,
statement=DATABRICKS_GET_CATALOGS,
),
"GetQueries": client.test_query_api_access,
}
test_connection_steps(
metadata=metadata,

View File

@ -0,0 +1,51 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Databricks lineage module
"""
import traceback
from datetime import datetime
from typing import Iterator, Optional
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.source.database.databricks.query_parser import (
DatabricksQueryParserSource,
)
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DatabricksLineageLegacySource(DatabricksQueryParserSource, LineageSource):
"""
Databricks Lineage Legacy Source
"""
def yield_table_query(self) -> Optional[Iterator[TableQuery]]:
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data:
try:
if self.client.is_query_valid(row):
yield TableQuery(
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=datetime.now(),
serviceName=self.config.serviceName,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing query_dict {row}: {exc}")

View File

@ -0,0 +1,313 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Databricks legacy source module"""
import re
import traceback
from copy import deepcopy
from typing import Iterable
from pyhive.sqlalchemy_hive import _type_map
from sqlalchemy import types, util
from sqlalchemy.engine import reflection
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.sqltypes import String
from sqlalchemy_databricks._dialect import DatabricksDialect
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_TABLE_COMMENTS,
DATABRICKS_VIEW_DEFINITIONS,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_view_definitions,
get_view_definition_wrapper,
)
logger = ingestion_logger()
class STRUCT(String):
# This class is added to support STRUCT datatype
"""The SQL STRUCT type."""
__visit_name__ = "STRUCT"
class ARRAY(String):
# This class is added to support ARRAY datatype
"""The SQL ARRAY type."""
__visit_name__ = "ARRAY"
class MAP(String):
# This class is added to support MAP datatype
"""The SQL MAP type."""
__visit_name__ = "MAP"
# overriding pyhive.sqlalchemy_hive._type_map
# mapping struct, array & map to custom classes instead of sqltypes.String
_type_map.update(
{
"struct": STRUCT,
"array": ARRAY,
"map": MAP,
"void": create_sqlalchemy_type("VOID"),
"interval": create_sqlalchemy_type("INTERVAL"),
"binary": create_sqlalchemy_type("BINARY"),
}
)
def _get_column_rows(self, connection, table_name, schema):
# get columns and strip whitespace
table_columns = self._get_table_columns( # pylint: disable=protected-access
connection, table_name, schema
)
column_rows = [
[col.strip() if col else None for col in row] for row in table_columns
]
# Filter out empty rows and comment
return [row for row in column_rows if row[0] and row[0] != "# col_name"]
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
"""
This function overrides the sqlalchemy_databricks._dialect.DatabricksDialect.get_columns
to add support for struct, array & map datatype
Extract the Database Name from the keyword arguments parameter if it is present. This
value should match what is provided in the 'source.config.database' field in the
Databricks ingest config file.
"""
db_name = kw["db_name"] if "db_name" in kw else None
rows = _get_column_rows(self, connection, table_name, schema)
result = []
for col_name, col_type, _comment in rows:
# Handle both oss hive and Databricks' hive partition header, respectively
if col_name in ("# Partition Information", "# Partitioning"):
break
# Take out the more detailed type information
# e.g. 'map<int,int>' -> 'map'
# 'decimal(10,1)' -> decimal
raw_col_type = col_type
col_type = re.search(r"^\w+", col_type).group(0)
try:
coltype = _type_map[col_type]
except KeyError:
util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
coltype = types.NullType
col_info = {
"name": col_name,
"type": coltype,
"nullable": True,
"default": None,
"comment": _comment,
"system_data_type": raw_col_type,
}
if col_type in {"array", "struct", "map"}:
if db_name and schema:
rows = dict(
connection.execute(
f"DESCRIBE {db_name}.{schema}.{table_name} {col_name}"
).fetchall()
)
else:
rows = dict(
connection.execute(
f"DESCRIBE {schema}.{table_name} {col_name}"
if schema
else f"DESCRIBE {table_name} {col_name}"
).fetchall()
)
col_info["system_data_type"] = rows["data_type"]
col_info["is_complex"] = True
result.append(col_info)
return result
@reflection.cache
def get_schema_names(self, connection, **kw): # pylint: disable=unused-argument
# Equivalent to SHOW DATABASES
if kw.get("database"):
connection.execute(f"USE CATALOG '{kw.get('database')}'")
return [row[0] for row in connection.execute("SHOW SCHEMAS")]
def get_schema_names_reflection(self, **kw):
"""Return all schema names."""
if hasattr(self.dialect, "get_schema_names"):
with self._operation_context() as conn: # pylint: disable=protected-access
return self.dialect.get_schema_names(conn, info_cache=self.info_cache, **kw)
return []
def get_view_names(
self, connection, schema=None, **kw
): # pylint: disable=unused-argument
query = "SHOW VIEWS"
if schema:
query += " IN " + self.identifier_preparer.quote_identifier(schema)
view_in_schema = connection.execute(query)
views = []
for row in view_in_schema:
# check number of columns in result
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
# else it is hive with 1 column in the result
if len(row) > 1:
views.append(row[1])
else:
views.append(row[0])
return views
@reflection.cache
def get_table_comment( # pylint: disable=unused-argument
self, connection, table_name, schema_name, **kw
):
"""
Returns comment of table
"""
cursor = connection.execute(
DATABRICKS_GET_TABLE_COMMENTS.format(
schema_name=schema_name, table_name=table_name
)
)
try:
for result in list(cursor):
data = result.values()
if data[0] and data[0].strip() == "Comment":
return {"text": data[1] if data and data[1] else None}
except Exception:
return {"text": None}
return {"text": None}
@reflection.cache
def get_view_definition(
self, connection, table_name, schema=None, **kw # pylint: disable=unused-argument
):
schema_name = [row[0] for row in connection.execute("SHOW SCHEMAS")]
if "information_schema" in schema_name:
return get_view_definition_wrapper(
self,
connection,
table_name=table_name,
schema=schema,
query=DATABRICKS_VIEW_DEFINITIONS,
)
return None
DatabricksDialect.get_table_comment = get_table_comment
DatabricksDialect.get_view_names = get_view_names
DatabricksDialect.get_columns = get_columns
DatabricksDialect.get_schema_names = get_schema_names
DatabricksDialect.get_view_definition = get_view_definition
DatabricksDialect.get_all_view_definitions = get_all_view_definitions
reflection.Inspector.get_schema_names = get_schema_names_reflection
class DatabricksLegacySource(CommonDbSourceService):
"""
Implements the necessary methods to extract
Database metadata from Databricks Source using
the legacy hive metastore method
"""
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: DatabricksConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DatabricksConnection):
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
)
return cls(config, metadata_config)
def set_inspector(self, database_name: str) -> None:
"""
When sources override `get_database_names`, they will need
to set up multiple inspectors. They can use this function.
:param database_name: new database to set
"""
logger.info(f"Ingesting from catalog: {database_name}")
new_service_connection = deepcopy(self.service_connection)
new_service_connection.catalog = database_name
self.engine = get_connection(new_service_connection)
self.inspector = inspect(self.engine)
def get_database_names(self) -> Iterable[str]:
configured_catalog = self.service_connection.__dict__.get("catalog")
if configured_catalog:
self.set_inspector(database_name=configured_catalog)
yield configured_catalog
else:
results = self.connection.execute("SHOW CATALOGS")
for res in results:
if res:
new_catalog = res[0]
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.context.database_service.name.__root__,
database_name=new_catalog,
)
if filter_by_database(
self.source_config.databaseFilterPattern,
database_fqn
if self.source_config.useFqnForFiltering
else new_catalog,
):
self.status.filter(database_fqn, "Database Filtered Out")
continue
try:
self.set_inspector(database_name=new_catalog)
yield new_catalog
except Exception as exc:
logger.error(traceback.format_exc())
logger.warning(
f"Error trying to process database {new_catalog}: {exc}"
)
def get_raw_database_schema_names(self) -> Iterable[str]:
if self.service_connection.__dict__.get("databaseSchema"):
yield self.service_connection.databaseSchema
else:
for schema_name in self.inspector.get_schema_names(
database=self.context.database.name.__root__
):
yield schema_name

View File

@ -11,41 +11,40 @@
"""
Databricks lineage module
"""
import traceback
from datetime import datetime
from typing import Iterator, Optional
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.source.database.databricks.query_parser import (
DatabricksQueryParserSource,
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.databricks.legacy.lineage import (
DatabricksLineageLegacySource,
)
from metadata.ingestion.source.database.databricks.unity_catalog.lineage import (
DatabricksUnityCatalogLineageSource,
)
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DatabricksLineageSource(DatabricksQueryParserSource, LineageSource):
class DatabricksLineageSource:
"""
Databricks Lineage Source
"""
def yield_table_query(self) -> Optional[Iterator[TableQuery]]:
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data:
try:
if self.client.is_query_valid(row):
yield TableQuery(
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=datetime.now(),
serviceName=self.config.serviceName,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing query_dict {row}: {exc}")
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: DatabricksConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DatabricksConnection):
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
)
if not connection.useUnityCatalog:
return DatabricksLineageLegacySource(config, metadata_config)
return DatabricksUnityCatalogLineageSource(config, metadata_config)

View File

@ -8,21 +8,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Clickhouse source module"""
"""Databricks source module"""
import re
import traceback
from copy import deepcopy
from typing import Iterable
from pyhive.sqlalchemy_hive import _type_map
from sqlalchemy import types, util
from sqlalchemy.engine import reflection
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.sqltypes import String
from sqlalchemy_databricks._dialect import DatabricksDialect
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
@ -33,214 +20,18 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_TABLE_COMMENTS,
DATABRICKS_VIEW_DEFINITIONS,
from metadata.ingestion.source.database.databricks.legacy.metadata import (
DatabricksLegacySource,
)
from metadata.ingestion.source.database.databricks.unity_catalog.metadata import (
DatabricksUnityCatalogSource,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_view_definitions,
get_view_definition_wrapper,
)
logger = ingestion_logger()
class STRUCT(String):
# This class is added to support STRUCT datatype
"""The SQL STRUCT type."""
__visit_name__ = "STRUCT"
class ARRAY(String):
# This class is added to support ARRAY datatype
"""The SQL ARRAY type."""
__visit_name__ = "ARRAY"
class MAP(String):
# This class is added to support MAP datatype
"""The SQL MAP type."""
__visit_name__ = "MAP"
# overriding pyhive.sqlalchemy_hive._type_map
# mapping struct, array & map to custom classed instead of sqltypes.String
_type_map.update(
{
"struct": STRUCT,
"array": ARRAY,
"map": MAP,
"void": create_sqlalchemy_type("VOID"),
"interval": create_sqlalchemy_type("INTERVAL"),
"binary": create_sqlalchemy_type("BINARY"),
}
)
def _get_column_rows(self, connection, table_name, schema):
# get columns and strip whitespace
table_columns = self._get_table_columns( # pylint: disable=protected-access
connection, table_name, schema
)
column_rows = [
[col.strip() if col else None for col in row] for row in table_columns
]
# Filter out empty rows and comment
return [row for row in column_rows if row[0] and row[0] != "# col_name"]
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
"""
This function overrides the sqlalchemy_databricks._dialect.DatabricksDialect.get_columns
to add support for struct, array & map datatype
Extract the Database Name from the keyword arguments parameter if it is present. This
value should match what is provided in the 'source.config.database' field in the
Databricks ingest config file.
"""
db_name = kw["db_name"] if "db_name" in kw else None
rows = _get_column_rows(self, connection, table_name, schema)
result = []
for col_name, col_type, _comment in rows:
# Handle both oss hive and Databricks' hive partition header, respectively
if col_name in ("# Partition Information", "# Partitioning"):
break
# Take out the more detailed type information
# e.g. 'map<int,int>' -> 'map'
# 'decimal(10,1)' -> decimal
raw_col_type = col_type
col_type = re.search(r"^\w+", col_type).group(0)
try:
coltype = _type_map[col_type]
except KeyError:
util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
coltype = types.NullType
col_info = {
"name": col_name,
"type": coltype,
"nullable": True,
"default": None,
"comment": _comment,
"system_data_type": raw_col_type,
}
if col_type in {"array", "struct", "map"}:
if db_name and schema:
rows = dict(
connection.execute(
f"DESCRIBE {db_name}.{schema}.{table_name} {col_name}"
).fetchall()
)
else:
rows = dict(
connection.execute(
f"DESCRIBE {schema}.{table_name} {col_name}"
if schema
else f"DESCRIBE {table_name} {col_name}"
).fetchall()
)
col_info["system_data_type"] = rows["data_type"]
col_info["is_complex"] = True
result.append(col_info)
return result
@reflection.cache
def get_schema_names(self, connection, **kw): # pylint: disable=unused-argument
# Equivalent to SHOW DATABASES
if kw.get("database"):
connection.execute(f"USE CATALOG '{kw.get('database')}'")
return [row[0] for row in connection.execute("SHOW SCHEMAS")]
def get_schema_names_reflection(self, **kw):
"""Return all schema names."""
if hasattr(self.dialect, "get_schema_names"):
with self._operation_context() as conn: # pylint: disable=protected-access
return self.dialect.get_schema_names(conn, info_cache=self.info_cache, **kw)
return []
def get_view_names(
self, connection, schema=None, **kw
): # pylint: disable=unused-argument
query = "SHOW VIEWS"
if schema:
query += " IN " + self.identifier_preparer.quote_identifier(schema)
view_in_schema = connection.execute(query)
views = []
for row in view_in_schema:
# check number of columns in result
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
# else it is hive with 1 column in the result
if len(row) > 1:
views.append(row[1])
else:
views.append(row[0])
return views
@reflection.cache
def get_table_comment( # pylint: disable=unused-argument
self, connection, table_name, schema_name, **kw
):
"""
Returns comment of table
"""
cursor = connection.execute(
DATABRICKS_GET_TABLE_COMMENTS.format(
schema_name=schema_name, table_name=table_name
)
)
try:
for result in list(cursor):
data = result.values()
if data[0] and data[0].strip() == "Comment":
return {"text": data[1] if data and data[1] else None}
except Exception:
return {"text": None}
return {"text": None}
@reflection.cache
def get_view_definition(
self, connection, table_name, schema=None, **kw # pylint: disable=unused-argument
):
schema_name = [row[0] for row in connection.execute("SHOW SCHEMAS")]
if "information_schema" in schema_name:
return get_view_definition_wrapper(
self,
connection,
table_name=table_name,
schema=schema,
query=DATABRICKS_VIEW_DEFINITIONS,
)
return None
DatabricksDialect.get_table_comment = get_table_comment
DatabricksDialect.get_view_names = get_view_names
DatabricksDialect.get_columns = get_columns
DatabricksDialect.get_schema_names = get_schema_names
DatabricksDialect.get_view_definition = get_view_definition
DatabricksDialect.get_all_view_definitions = get_all_view_definitions
reflection.Inspector.get_schema_names = get_schema_names_reflection
class DatabricksSource(CommonDbSourceService):
class DatabricksSource:
"""
Implements the necessary methods to extract
Database metadata from Databricks Source
@ -254,59 +45,6 @@ class DatabricksSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
)
return cls(config, metadata_config)
def set_inspector(self, database_name: str) -> None:
"""
When sources override `get_database_names`, they will need
to setup multiple inspectors. They can use this function.
:param database_name: new database to set
"""
logger.info(f"Ingesting from catalog: {database_name}")
new_service_connection = deepcopy(self.service_connection)
new_service_connection.catalog = database_name
self.engine = get_connection(new_service_connection)
self.inspector = inspect(self.engine)
def get_database_names(self) -> Iterable[str]:
configured_catalog = self.service_connection.__dict__.get("catalog")
if configured_catalog:
self.set_inspector(database_name=configured_catalog)
yield configured_catalog
else:
results = self.connection.execute("SHOW CATALOGS")
for res in results:
if res:
new_catalog = res[0]
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.context.database_service.name.__root__,
database_name=new_catalog,
)
if filter_by_database(
self.source_config.databaseFilterPattern,
database_fqn
if self.source_config.useFqnForFiltering
else new_catalog,
):
self.status.filter(database_fqn, "Database Filtered Out")
continue
try:
self.set_inspector(database_name=new_catalog)
yield new_catalog
except Exception as exc:
logger.error(traceback.format_exc())
logger.warning(
f"Error trying to process database {new_catalog}: {exc}"
)
def get_raw_database_schema_names(self) -> Iterable[str]:
if self.service_connection.__dict__.get("databaseSchema"):
yield self.service_connection.databaseSchema
else:
for schema_name in self.inspector.get_schema_names(
database=self.context.database.name.__root__
):
yield schema_name
if not connection.useUnityCatalog:
return DatabricksLegacySource(config, metadata_config)
return DatabricksUnityCatalogSource(config, metadata_config)

View File

@ -0,0 +1,41 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Databricks Source Model module
"""
from typing import List, Optional
from pydantic import BaseModel
class DatabricksTable(BaseModel):
name: Optional[str]
catalog_name: Optional[str]
schema_name: Optional[str]
class DatabricksColumn(BaseModel):
name: Optional[str]
catalog_name: Optional[str]
schema_name: Optional[str]
table_name: Optional[str]
class LineageTableStreams(BaseModel):
upstream_tables: Optional[List[DatabricksTable]] = []
downstream_tables: Optional[List[DatabricksTable]] = []
class LineageColumnStreams(BaseModel):
upstream_cols: Optional[List[DatabricksColumn]] = []
downstream_cols: Optional[List[DatabricksColumn]] = []
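
As an illustration, a hedged sketch of how a lineage API payload (shape assumed here for the example) would parse into these pydantic models:

```python
from metadata.ingestion.source.database.databricks.models import (
    LineageColumnStreams,
    LineageTableStreams,
)

# Hypothetical response body from the table-lineage endpoint.
table_payload = {
    "upstream_tables": [
        {"name": "orders_raw", "catalog_name": "main", "schema_name": "bronze"}
    ],
    "downstream_tables": [],
}
table_streams = LineageTableStreams(**table_payload)
assert table_streams.upstream_tables[0].catalog_name == "main"

# Missing fields fall back to the Optional defaults (empty lists).
column_streams = LineageColumnStreams()
assert column_streams.upstream_cols == []
```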

View File

@ -0,0 +1,160 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Databricks Unity Catalog Lineage Source Module
"""
from typing import Iterable, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException, Source
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_test_connection_fn
from metadata.ingestion.source.database.databricks.client import DatabricksClient
from metadata.ingestion.source.database.databricks.connection import get_connection
from metadata.ingestion.source.database.databricks.models import LineageTableStreams
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DatabricksUnityCatalogLineageSource(Source[AddLineageRequest]):
"""
Databricks Unity Catalog Lineage Source
"""
def __init__(
self,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.service_connection = self.config.serviceConnection.__root__.config
self.source_config = self.config.sourceConfig.config
self.client = DatabricksClient(self.service_connection)
self.connection_obj = get_connection(self.service_connection)
self.test_connection()
def close(self):
"""
By default, there is nothing to close
"""
def prepare(self):
"""
By default, there's nothing to prepare
"""
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: DatabricksConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DatabricksConnection):
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
)
return cls(config, metadata_config)
def _get_lineage_details(
self, from_table: Table, to_table: Table, databricks_table_fqn: str
) -> Optional[LineageDetails]:
col_lineage = []
for column in to_table.columns:
column_streams = self.client.get_column_lineage(
databricks_table_fqn, column_name=column.name.__root__
)
from_columns = []
for col in column_streams.upstream_cols:
col_fqn = get_column_fqn(from_table, col.name)
if col_fqn:
from_columns.append(col_fqn)
if from_columns:
col_lineage.append(
ColumnLineage(
fromColumns=from_columns,
toColumn=column.fullyQualifiedName.__root__,
)
)
if col_lineage:
return LineageDetails(columnsLineage=col_lineage)
return None
def next_record(self) -> Iterable[AddLineageRequest]:
"""
Based on the Unity Catalog lineage API, prepare the lineage
and send it to the sink
"""
for database in self.metadata.list_all_entities(
entity=Database, params={"service": self.config.serviceName}
):
for table in self.metadata.list_all_entities(
entity=Table, params={"database": database.fullyQualifiedName.__root__}
):
databricks_table_fqn = f"{table.database.name}.{table.databaseSchema.name}.{table.name.__root__}"
table_streams: LineageTableStreams = self.client.get_table_lineage(
databricks_table_fqn
)
for upstream_table in table_streams.upstream_tables:
from_entity_fqn = fqn.build(
metadata=self.metadata,
entity_type=Table,
database_name=upstream_table.catalog_name,
schema_name=upstream_table.schema_name,
table_name=upstream_table.name,
service_name=self.config.serviceName,
)
from_entity = self.metadata.get_by_name(
entity=Table, fqn=from_entity_fqn
)
if from_entity:
lineage_details = self._get_lineage_details(
from_table=from_entity,
to_table=table,
databricks_table_fqn=databricks_table_fqn,
)
yield AddLineageRequest(
edge=EntitiesEdge(
toEntity=EntityReference(id=table.id, type="table"),
fromEntity=EntityReference(
id=from_entity.id, type="table"
),
lineageDetails=lineage_details,
)
)
def test_connection(self) -> None:
test_connection_fn = get_test_connection_fn(self.service_connection)
test_connection_fn(self.metadata, self.connection_obj, self.service_connection)

View File

@ -0,0 +1,306 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Databricks Unity Catalog source methods.
"""
import traceback
from typing import Iterable, List, Optional, Tuple
from databricks.sdk.service.catalog import ColumnInfo
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Table, TableType
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.databricks.connection import get_connection
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DatabricksUnityCatalogSource(DatabaseServiceSource):
"""
Implements the necessary methods to extract
Database metadata from Databricks Source using
the unity catalog source
"""
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__()
self.config = config
self.source_config: DatabaseServiceMetadataPipeline = (
self.config.sourceConfig.config
)
self.context.table_views = []
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.service_connection: DatabricksConnection = (
self.config.serviceConnection.__root__.config
)
self.client = get_connection(self.service_connection)
self.connection_obj = self.client
self.test_connection()
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: DatabricksConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DatabricksConnection):
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_database_names(self) -> Iterable[str]:
"""
Default case with a single database.
It might come informed - or not - from the source.
Sources with multiple databases should overwrite this and
apply the necessary filters.
Catalog ID -> Database
"""
if self.service_connection.catalog:
yield self.service_connection.catalog
else:
for catalog in self.client.catalogs.list():
try:
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.context.database_service.name.__root__,
database_name=catalog.name,
)
if filter_by_database(
self.config.sourceConfig.config.databaseFilterPattern,
database_fqn
if self.config.sourceConfig.config.useFqnForFiltering
else catalog.name,
):
self.status.filter(
database_fqn,
"Database (Catalog ID) Filtered Out",
)
continue
yield catalog.name
except Exception as exc:
error = f"Unexpected exception to get database name [{catalog.name}]: {exc}"
logger.debug(traceback.format_exc())
logger.warning(error)
self.status.failed(catalog.name, error, traceback.format_exc())
def yield_database(self, database_name: str) -> Iterable[CreateDatabaseRequest]:
"""
From topology.
Prepare a database request and pass it to the sink
"""
yield CreateDatabaseRequest(
name=database_name,
service=self.context.database_service.fullyQualifiedName,
)
def get_database_schema_names(self) -> Iterable[str]:
"""
return schema names
"""
catalog_name = self.context.database.name.__root__
for schema in self.client.schemas.list(catalog_name=catalog_name):
try:
schema_fqn = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.database_service.name.__root__,
database_name=self.context.database.name.__root__,
schema_name=schema.name,
)
if filter_by_schema(
self.config.sourceConfig.config.schemaFilterPattern,
schema_fqn
if self.config.sourceConfig.config.useFqnForFiltering
else schema.name,
):
self.status.filter(schema_fqn, "Schema Filtered Out")
continue
yield schema.name
except Exception as exc:
error = f"Unexpected exception to get database schema [{schema.name}]: {exc}"
logger.debug(traceback.format_exc())
logger.warning(error)
self.status.failed(schema.name, error, traceback.format_exc())
def yield_database_schema(
self, schema_name: str
) -> Iterable[CreateDatabaseSchemaRequest]:
"""
From topology.
Prepare a database schema request and pass it to the sink
"""
yield CreateDatabaseSchemaRequest(
name=schema_name,
database=self.context.database.fullyQualifiedName,
)
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
"""
Handle tables and views.
Fetches them using the context information and
the Unity Catalog client set when preparing the database.
:return: tables or views, depending on config
"""
schema_name = self.context.database_schema.name.__root__
catalog_name = self.context.database.name.__root__
for table in self.client.tables.list(
catalog_name=catalog_name,
schema_name=schema_name,
):
try:
table_name = table.name
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.context.database_service.name.__root__,
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
table_name=table_name,
)
if filter_by_table(
self.config.sourceConfig.config.tableFilterPattern,
table_fqn
if self.config.sourceConfig.config.useFqnForFiltering
else table_name,
):
self.status.filter(
table_fqn,
"Table Filtered Out",
)
continue
table_type: TableType = TableType.Regular
if table.table_type.value.lower() == TableType.View.value.lower():
table_type: TableType = TableType.View
if table.table_type.value.lower() == TableType.External.value.lower():
table_type: TableType = TableType.External
self.context.table_data = table
yield table_name, table_type
except Exception as exc:
error = f"Unexpected exception to get table [{table.Name}]: {exc}"
logger.debug(traceback.format_exc())
logger.warning(error)
self.status.failed(table.Name, error, traceback.format_exc())
def yield_table(
self, table_name_and_type: Tuple[str, str]
) -> Iterable[Optional[CreateTableRequest]]:
"""
From topology.
Prepare a table request and pass it to the sink
"""
table_name, table_type = table_name_and_type
table = self.context.table_data
schema_name = self.context.database_schema.name.__root__
db_name = self.context.database.name.__root__
table_constraints = None
try:
columns = self.get_columns(table.columns)
table_request = CreateTableRequest(
name=table_name,
tableType=table_type,
description=table.comment,
columns=columns,
tableConstraints=table_constraints,
databaseSchema=self.context.database_schema.fullyQualifiedName,
)
yield table_request
if table_type == TableType.View or table.view_definition:
self.context.table_views.append(
TableView(
table_name=table_name,
schema_name=schema_name,
db_name=db_name,
view_definition=(
f'CREATE VIEW "{db_name}"."{schema_name}"'
f'."{table_name}" AS {table.view_definition}'
),
)
)
self.register_record(table_request=table_request)
except Exception as exc:
error = f"Unexpected exception to yield table [{table_name}]: {exc}"
logger.debug(traceback.format_exc())
logger.warning(error)
self.status.failed(table_name, error, traceback.format_exc())
def prepare(self):
pass
def get_columns(self, column_data: List[ColumnInfo]) -> Optional[Iterable[Column]]:
# process table regular columns info
for column in column_data:
if column.type_text.lower().startswith("union"):
column.type_text = column.type_text.replace(" ", "")
parsed_string = ColumnTypeParser._parse_datatype_string( # pylint: disable=protected-access
column.type_text.lower()
)
parsed_string["name"] = column.name[:64]
parsed_string["dataLength"] = parsed_string.get("dataLength", 1)
parsed_string["description"] = column.comment
yield Column(**parsed_string)
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
logger.info("Processing Lineage for Views")
for view in [
v for v in self.context.table_views if v.view_definition is not None
]:
yield from get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.context.database_service.name.__root__,
connection_type=self.service_connection.type.value,
)
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
pass
def close(self):
pass
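
The extraction above drives the databricks-sdk WorkspaceClient directly; a minimal standalone sketch of the same catalog/schema/table traversal, with placeholder credentials:

```python
from databricks.sdk import WorkspaceClient

# Placeholder credentials -- substitute your workspace URL and personal access token.
client = WorkspaceClient(
    host="https://adb-000000000000.0.azuredatabricks.net",
    token="<databricks token>",
)

for catalog in client.catalogs.list():
    for schema in client.schemas.list(catalog_name=catalog.name):
        for table in client.tables.list(
            catalog_name=catalog.name, schema_name=schema.name
        ):
            # table.table_type mirrors the TableType mapping done by the source above.
            print(f"{catalog.name}.{schema.name}.{table.name} ({table.table_type})")
```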

View File

@ -0,0 +1,92 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helpers module for db sources
"""
import traceback
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import (
get_lineage_by_query,
get_lineage_via_table_entity,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.logger import utils_logger
logger = utils_logger()
def get_host_from_host_port(uri: str) -> str:
"""
if uri is like "localhost:9000"
then return the host "localhost"
"""
return uri.split(":")[0]
def get_view_lineage(
view: TableView, metadata: OpenMetadata, service_name: str, connection_type: str
):
"""
Method to generate view lineage
"""
table_name = view.table_name
schema_name = view.schema_name
db_name = view.db_name
view_definition = view.view_definition
table_fqn = fqn.build(
metadata,
entity_type=Table,
service_name=service_name,
database_name=db_name,
schema_name=schema_name,
table_name=table_name,
)
table_entity = metadata.get_by_name(
entity=Table,
fqn=table_fqn,
)
try:
connection_type = str(connection_type)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
lineage_parser = LineageParser(view_definition, dialect)
if lineage_parser.source_tables and lineage_parser.target_tables:
yield from get_lineage_by_query(
metadata,
query=view_definition,
service_name=service_name,
database_name=db_name,
schema_name=schema_name,
dialect=dialect,
) or []
else:
yield from get_lineage_via_table_entity(
metadata,
table_entity=table_entity,
service_name=service_name,
database_name=db_name,
schema_name=schema_name,
query=view_definition,
dialect=dialect,
) or []
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Could not parse query [{view_definition}] ingesting lineage failed: {exc}"
)
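
A hedged sketch of how a source could call this helper; the TableView values, service name, and connection type below are illustrative, and a running OpenMetadata server is required to actually resolve lineage:

```python
from typing import Iterable

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.models import TableView
from metadata.utils.db_utils import get_view_lineage


def yield_sample_view_lineage(metadata: OpenMetadata) -> Iterable[AddLineageRequest]:
    # Illustrative view; in the sources above the TableView comes from context.table_views.
    view = TableView(
        table_name="orders_view",
        schema_name="default",
        db_name="main",
        view_definition=(
            'CREATE VIEW "main"."default"."orders_view" '
            "AS SELECT * FROM main.default.orders"
        ),
    )
    yield from get_view_lineage(
        view=view,
        metadata=metadata,
        service_name="databricks_service",
        connection_type="Databricks",
    )
```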

View File

@ -114,6 +114,12 @@ This is a sample config for Databricks:
{% /codeInfo %}
{% codeInfo srNumber=40 %}
**useUnityCatalog**: Enable this flag to extract metadata and lineage information using Databricks Unity Catalog instead of the legacy Hive metastore. When you enable this flag, make sure Unity Catalog is enabled on your instance.
{% /codeInfo %}
#### Source Configuration - Source Config
@ -196,6 +202,9 @@ source:
```yaml {% srNumber=6 %}
connectionTimeout: 120
```
```yaml {% srNumber=40 %}
useUnityCatalog: true
```
```yaml {% srNumber=7 %}
# connectionOptions:
# key: value

View File

@ -114,6 +114,12 @@ This is a sample config for Databricks:
{% /codeInfo %}
{% codeInfo srNumber=35 %}
**useUnityCatalog**: Enable this flag to extract metadata and lineage information using Databricks Unity Catalog instead of the legacy Hive metastore. When you enable this flag, make sure Unity Catalog is enabled on your instance.
{% /codeInfo %}
#### Source Configuration - Source Config
@ -196,6 +202,9 @@ source:
```yaml {% srNumber=6 %}
connectionTimeout: 120
```
```yaml {% srNumber=35 %}
useUnityCatalog: true
```
```yaml {% srNumber=7 %}
# connectionOptions:
# key: value

View File

@ -191,6 +191,7 @@ desired.
- **connectionTimeout**: The maximum amount of time (in seconds) to wait for a successful connection to the data source. If the connection attempt takes longer than this timeout period, an error will be returned.
- **Catalog**: Catalog of the data source (Example: hive_metastore). This is an optional parameter; set it if you would like to restrict the metadata reading to a single catalog. When left blank, OpenMetadata Ingestion attempts to scan all the catalogs.
- **DatabaseSchema**: databaseSchema of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single databaseSchema. When left blank, OpenMetadata Ingestion attempts to scan all the databaseSchemas.
- **Use Unity Catalog**: Enable this flag to extract metadata and lineage information using Databricks Unity Catalog instead of the legacy Hive metastore. When you enable this flag, make sure Unity Catalog is enabled on your instance.
- **Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to Databricks during the connection. These details must be added as Key-Value pairs.
- **Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Databricks during the connection. These details must be added as Key-Value pairs.
- In case you are using Single-Sign-On (SSO) for authentication, add the `authenticator` details in the Connection Arguments as a Key-Value pair as follows: `"authenticator" : "sso_login_url"`

View File

@ -64,6 +64,12 @@
"type": "integer",
"default": 120
},
"useUnityCatalog": {
"title": "Use Unity Catalog",
"description": "Use unity catalog for fetching the metadata instead of using the hive metastore",
"type": "boolean",
"default": false
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"

View File

@ -57,6 +57,11 @@ The maximum amount of time (in seconds) to wait for a successful connection to t
If your connection fails because your cluster has not had enough time to start, you can try updating this parameter with a bigger number.
$$
$$section
### Use Unity Catalog $(id="useUnityCatalog")
Enable this flag to extract metadata and lineage information using Databricks Unity Catalog instead of the legacy Hive metastore. When you enable this flag, make sure Unity Catalog is enabled on your instance.
$$
$$section
### Connection Options $(id="connectionOptions")
Additional connection options to build the URL that can be sent to service during the connection.