#14115: Separate Unity Catalog From Databricks (#14138)

Mayur Singal 2023-12-04 11:22:46 +05:30 committed by GitHub
parent 59d0e82ec3
commit 389ae79d3c
49 changed files with 1517 additions and 598 deletions

View File

@ -0,0 +1,85 @@
-- update service type to UnityCatalog - update database entity
UPDATE database_entity de
SET de.json = JSON_INSERT(
JSON_REMOVE(de.json, '$.serviceType'),
'$.serviceType',
'UnityCatalog'
)
where id in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and JSON_EXTRACT(
dbe.json, '$.connection.config.useUnityCatalog'
) = true
));
-- update service type to UnityCatalog - update database schema entity
UPDATE database_schema_entity dse
SET dse.json = JSON_INSERT(
JSON_REMOVE(dse.json, '$.serviceType'),
'$.serviceType',
'UnityCatalog'
)
where JSON_EXTRACT(dse.json, '$.database.id') in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and JSON_EXTRACT(
dbe.json, '$.connection.config.useUnityCatalog'
) = true
));
-- update service type to UnityCatalog - update table entity
UPDATE table_entity te
SET te.json = JSON_INSERT(
JSON_REMOVE(te.json, '$.serviceType'),
'$.serviceType',
'UnityCatalog'
)
where JSON_EXTRACT(te.json, '$.database.id') in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and JSON_EXTRACT(
dbe.json, '$.connection.config.useUnityCatalog'
) = true
));
-- update service type to UnityCatalog - update db service entity
UPDATE dbservice_entity de
SET de.json = JSON_INSERT(
JSON_REMOVE(de.json, '$.connection.config.type'),
'$.connection.config.type',
'UnityCatalog'
),de.json = JSON_INSERT(
JSON_REMOVE(de.json, '$.serviceType'),
'$.serviceType',
'UnityCatalog'
)
WHERE de.serviceType = 'Databricks'
AND JSON_EXTRACT(de.json, '$.connection.config.useUnityCatalog') = True
;
-- remove `useUnityCatalog` flag from service connection details of databricks
UPDATE dbservice_entity de
SET de.json = JSON_REMOVE(de.json, '$.connection.config.useUnityCatalog')
WHERE de.serviceType IN ('Databricks','UnityCatalog');
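
The MySQL statements above key off the `useUnityCatalog` flag stored in each Databricks service's connection config. Below is a minimal pre-flight sketch, assuming the default OpenMetadata MySQL backend and the table names used in this migration; the connection URL and credentials are placeholders, not part of this change.

```python
# Hypothetical pre-flight check: count the Databricks services the migration
# above will convert to UnityCatalog. The connection URL is a placeholder.
from sqlalchemy import create_engine, text

engine = create_engine(
    "mysql+pymysql://openmetadata_user:openmetadata_password@localhost:3306/openmetadata_db"
)

CHECK = text(
    """
    SELECT COUNT(*) FROM dbservice_entity
    WHERE serviceType = 'Databricks'
      AND JSON_EXTRACT(json, '$.connection.config.useUnityCatalog') = true
    """
)

with engine.connect() as conn:
    affected = conn.execute(CHECK).scalar()
    print(f"{affected} Databricks service(s) would be migrated to UnityCatalog")
```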

View File

@ -10,3 +10,87 @@ SET json = jsonb_set(
WHERE json #>> '{pipelineType}' = 'metadata'
AND json #>> '{sourceConfig,config,type}' = 'DatabaseMetadata'
AND json #>> '{sourceConfig,config,viewParsingTimeoutLimit}' is not null;
-- update service type to UnityCatalog - update database entity
UPDATE database_entity de
SET json = jsonb_set(
json #- '{serviceType}',
'{serviceType}',
'"UnityCatalog"',
true
)
where id in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and (dbe.json #>> '{connection,config,useUnityCatalog}')::bool = true
));
-- update service type to UnityCatalog - update database schema entity
UPDATE database_schema_entity dse
SET json = jsonb_set(
json #- '{serviceType}',
'{serviceType}',
'"UnityCatalog"',
true
)
where json #>> '{database,id}' in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and (dbe.json #>> '{connection,config,useUnityCatalog}')::bool = true
));
-- update service type to UnityCatalog - update table entity
UPDATE table_entity te
SET json = jsonb_set(
json #- '{serviceType}',
'{serviceType}',
'"UnityCatalog"',
true
)
where json #>> '{database,id}' in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and (dbe.json #>> '{connection,config,useUnityCatalog}')::bool = true
));
-- update service type to UnityCatalog - update db service entity
UPDATE dbservice_entity de
SET json = jsonb_set(
jsonb_set(
de.json #- '{serviceType}',
'{serviceType}',
'"UnityCatalog"'
) #- '{connection,config,type}',
'{connection,config,type}',
'"UnityCatalog"'
)
WHERE de.serviceType = 'Databricks'
AND (de.json #>> '{connection,config,useUnityCatalog}')::bool = True
;
-- remove `useUnityCatalog` flag from service connection details of databricks
UPDATE dbservice_entity de
SET json = json #- '{connection,config,useUnityCatalog}'
WHERE de.serviceType IN ('Databricks','UnityCatalog');
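
The PostgreSQL variant relies on `#-` to drop a key and `jsonb_set` to write the new value. The sketch below mirrors the same transformation on an in-memory document so the intent is easy to follow; the sample service JSON is made up for illustration.

```python
# Illustration only: what the jsonb_set / '#-' rewrites above do to one
# Databricks service document (sample data, not a real service).
service = {
    "name": "my_databricks",
    "serviceType": "Databricks",
    "connection": {
        "config": {
            "type": "Databricks",
            "useUnityCatalog": True,
            "catalog": "hive_metastore",
        }
    },
}

config = service["connection"]["config"]
if service["serviceType"] == "Databricks" and config.get("useUnityCatalog"):
    service["serviceType"] = "UnityCatalog"   # '{serviceType}' rewrite
    config["type"] = "UnityCatalog"           # '{connection,config,type}' rewrite
    config.pop("useUnityCatalog", None)       # flag removal in the final UPDATE

print(service)
```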

View File

@ -8,7 +8,6 @@ source:
token: <databricks token>
hostPort: localhost:443
connectionTimeout: 120
useUnityCatalog: true
connectionArguments:
http_path: <http path of databricks cluster>
sourceConfig:

View File

@ -0,0 +1,27 @@
source:
type: unitycatalog
serviceName: local_unitycatalog
serviceConnection:
config:
type: UnityCatalog
catalog: hive_metastore
databaseSchema: default
token: <databricks token>
hostPort: localhost:443
connectionTimeout: 120
connectionArguments:
http_path: <http path of databricks cluster>
sourceConfig:
config:
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@ -0,0 +1,18 @@
source:
type: unitycatalog-lineage
serviceName: local_unitycatalog
sourceConfig:
config:
type: DatabaseLineage
queryLogDuration: 1
resultLimit: 10000
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@ -0,0 +1,35 @@
source:
type: unitycatalog-usage
serviceName: local_unitycatalog
serviceConnection:
config:
type: UnityCatalog
catalog: hive_metastore
databaseSchema: default
token: <databricks token>
hostPort: localhost:443
connectionTimeout: 120
connectionArguments:
http_path: <http path of databricks cluster>
sourceConfig:
config:
type: DatabaseUsage
queryLogDuration: 10
processor:
type: query-parser
config: {}
stage:
type: table-usage
config:
filename: /tmp/databricks_usage
bulkSink:
type: metadata-usage
config:
filename: /tmp/databricks_usage
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@ -22,10 +22,6 @@ from metadata.generated.schema.entity.services.connections.database.databricksCo
DatabricksConnection,
)
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.source.database.databricks.models import (
LineageColumnStreams,
LineageTableStreams,
)
from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger
@ -33,8 +29,6 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
API_TIMEOUT = 10
QUERIES_PATH = "/sql/history/queries"
TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get"
COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get"
class DatabricksClient:
@ -216,55 +210,3 @@ class DatabricksClient:
logger.error(exc)
return job_runs
def get_table_lineage(self, table_name: str) -> LineageTableStreams:
"""
Method returns table lineage details
"""
try:
data = {
"table_name": table_name,
}
response = self.client.get(
f"{self.base_url}{TABLE_LINEAGE_PATH}",
headers=self.headers,
data=json.dumps(data),
timeout=API_TIMEOUT,
).json()
if response:
return LineageTableStreams(**response)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)
return LineageTableStreams()
def get_column_lineage(
self, table_name: str, column_name: str
) -> LineageColumnStreams:
"""
Method returns table lineage details
"""
try:
data = {
"table_name": table_name,
"column_name": column_name,
}
response = self.client.get(
f"{self.base_url}{COLUMN_LINEAGE_PATH}",
headers=self.headers,
data=json.dumps(data),
timeout=API_TIMEOUT,
).json()
if response:
return LineageColumnStreams(**response)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)
return LineageColumnStreams()

View File

@ -13,9 +13,8 @@
Source connection handler
"""
from functools import partial
from typing import Optional, Union
from typing import Optional
from databricks.sdk import WorkspaceClient
from sqlalchemy.engine import Engine
from sqlalchemy.exc import DatabaseError
from sqlalchemy.inspection import inspect
@ -37,11 +36,9 @@ from metadata.ingestion.connections.test_connections import (
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.databricks.client import DatabricksClient
from metadata.ingestion.source.database.databricks.models import DatabricksTable
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_CATALOGS,
)
from metadata.utils.db_utils import get_host_from_host_port
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -52,17 +49,11 @@ def get_connection_url(connection: DatabricksConnection) -> str:
return url
def get_connection(connection: DatabricksConnection) -> Union[Engine, WorkspaceClient]:
def get_connection(connection: DatabricksConnection) -> Engine:
"""
Create connection
"""
if connection.useUnityCatalog:
return WorkspaceClient(
host=get_host_from_host_port(connection.hostPort),
token=connection.token.get_secret_value(),
)
if connection.httpPath:
if not connection.connectionArguments:
connection.connectionArguments = init_empty_connection_arguments()
@ -77,7 +68,7 @@ def get_connection(connection: DatabricksConnection) -> Union[Engine, WorkspaceC
def test_connection(
metadata: OpenMetadata,
connection: Union[Engine, WorkspaceClient],
connection: Engine,
service_connection: DatabricksConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
@ -99,49 +90,19 @@ def test_connection(
except DatabaseError as soe:
logger.debug(f"Failed to fetch catalogs due to: {soe}")
if service_connection.useUnityCatalog:
table_obj = DatabricksTable()
def get_catalogs(connection: WorkspaceClient, table_obj: DatabricksTable):
for catalog in connection.catalogs.list():
table_obj.catalog_name = catalog.name
break
def get_schemas(connection: WorkspaceClient, table_obj: DatabricksTable):
for schema in connection.schemas.list(catalog_name=table_obj.catalog_name):
table_obj.schema_name = schema.name
break
def get_tables(connection: WorkspaceClient, table_obj: DatabricksTable):
for table in connection.tables.list(
catalog_name=table_obj.catalog_name, schema_name=table_obj.schema_name
):
table_obj.name = table.name
break
test_fn = {
"CheckAccess": connection.catalogs.list,
"GetDatabases": partial(get_catalogs, connection, table_obj),
"GetSchemas": partial(get_schemas, connection, table_obj),
"GetTables": partial(get_tables, connection, table_obj),
"GetViews": partial(get_tables, connection, table_obj),
"GetQueries": client.test_query_api_access,
}
else:
inspector = inspect(connection)
test_fn = {
"CheckAccess": partial(test_connection_engine_step, connection),
"GetSchemas": inspector.get_schema_names,
"GetTables": inspector.get_table_names,
"GetViews": inspector.get_view_names,
"GetDatabases": partial(
test_database_query,
engine=connection,
statement=DATABRICKS_GET_CATALOGS,
),
"GetQueries": client.test_query_api_access,
}
inspector = inspect(connection)
test_fn = {
"CheckAccess": partial(test_connection_engine_step, connection),
"GetSchemas": inspector.get_schema_names,
"GetTables": inspector.get_table_names,
"GetViews": inspector.get_view_names,
"GetDatabases": partial(
test_database_query,
engine=connection,
statement=DATABRICKS_GET_CATALOGS,
),
"GetQueries": client.test_query_api_access,
}
test_connection_steps(
metadata=metadata,

View File

@ -1,51 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Databricks lineage module
"""
import traceback
from datetime import datetime
from typing import Iterator
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.source.database.databricks.query_parser import (
DatabricksQueryParserSource,
)
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DatabricksLineageLegacySource(DatabricksQueryParserSource, LineageSource):
"""
Databricks Lineage Legacy Source
"""
def yield_table_query(self) -> Iterator[TableQuery]:
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data:
try:
if self.client.is_query_valid(row):
yield TableQuery(
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=datetime.now(),
serviceName=self.config.serviceName,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing query_dict {row}: {exc}")

View File

@ -1,339 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Databricks legacy source module"""
import re
import traceback
from copy import deepcopy
from typing import Iterable, Optional
from pyhive.sqlalchemy_hive import _type_map
from sqlalchemy import types, util
from sqlalchemy.engine import reflection
from sqlalchemy.exc import DatabaseError
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.sqltypes import String
from sqlalchemy_databricks._dialect import DatabricksDialect
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_CATALOGS,
DATABRICKS_GET_TABLE_COMMENTS,
DATABRICKS_VIEW_DEFINITIONS,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.utils import fqn
from metadata.utils.constants import DEFAULT_DATABASE
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_view_definitions,
get_view_definition_wrapper,
)
logger = ingestion_logger()
class STRUCT(String):
# This class is added to support STRUCT datatype
"""The SQL STRUCT type."""
__visit_name__ = "STRUCT"
class ARRAY(String):
# This class is added to support ARRAY datatype
"""The SQL ARRAY type."""
__visit_name__ = "ARRAY"
class MAP(String):
# This class is added to support MAP datatype
"""The SQL MAP type."""
__visit_name__ = "MAP"
# overriding pyhive.sqlalchemy_hive._type_map
# mapping struct, array & map to custom classed instead of sqltypes.String
_type_map.update(
{
"struct": STRUCT,
"array": ARRAY,
"map": MAP,
"void": create_sqlalchemy_type("VOID"),
"interval": create_sqlalchemy_type("INTERVAL"),
"binary": create_sqlalchemy_type("BINARY"),
}
)
def _get_column_rows(self, connection, table_name, schema):
# get columns and strip whitespace
table_columns = self._get_table_columns( # pylint: disable=protected-access
connection, table_name, schema
)
column_rows = [
[col.strip() if col else None for col in row] for row in table_columns
]
# Filter out empty rows and comment
return [row for row in column_rows if row[0] and row[0] != "# col_name"]
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
"""
This function overrides the sqlalchemy_databricks._dialect.DatabricksDialect.get_columns
to add support for struct, array & map datatype
Extract the Database Name from the keyword arguments parameter if it is present. This
value should match what is provided in the 'source.config.database' field in the
Databricks ingest config file.
"""
db_name = kw["db_name"] if "db_name" in kw else None
rows = _get_column_rows(self, connection, table_name, schema)
result = []
for col_name, col_type, _comment in rows:
# Handle both oss hive and Databricks' hive partition header, respectively
if col_name in ("# Partition Information", "# Partitioning"):
break
# Take out the more detailed type information
# e.g. 'map<ixnt,int>' -> 'map'
# 'decimal(10,1)' -> decimal
raw_col_type = col_type
col_type = re.search(r"^\w+", col_type).group(0)
try:
coltype = _type_map[col_type]
except KeyError:
util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
coltype = types.NullType
col_info = {
"name": col_name,
"type": coltype,
"nullable": True,
"default": None,
"comment": _comment,
"system_data_type": raw_col_type,
}
if col_type in {"array", "struct", "map"}:
if db_name and schema:
rows = dict(
connection.execute(
f"DESCRIBE {db_name}.{schema}.{table_name} {col_name}"
).fetchall()
)
else:
rows = dict(
connection.execute(
f"DESCRIBE {schema}.{table_name} {col_name}"
if schema
else f"DESCRIBE {table_name} {col_name}"
).fetchall()
)
col_info["system_data_type"] = rows["data_type"]
col_info["is_complex"] = True
result.append(col_info)
return result
@reflection.cache
def get_schema_names(self, connection, **kw): # pylint: disable=unused-argument
# Equivalent to SHOW DATABASES
if kw.get("database") and kw.get("is_old_version") is not True:
connection.execute(f"USE CATALOG '{kw.get('database')}'")
return [row[0] for row in connection.execute("SHOW SCHEMAS")]
def get_schema_names_reflection(self, **kw):
"""Return all schema names."""
if hasattr(self.dialect, "get_schema_names"):
with self._operation_context() as conn: # pylint: disable=protected-access
return self.dialect.get_schema_names(conn, info_cache=self.info_cache, **kw)
return []
def get_view_names(
self, connection, schema=None, **kw
): # pylint: disable=unused-argument
query = "SHOW VIEWS"
if schema:
query += " IN " + self.identifier_preparer.quote_identifier(schema)
view_in_schema = connection.execute(query)
views = []
for row in view_in_schema:
# check number of columns in result
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
# else it is hive with 1 column in the result
if len(row) > 1:
views.append(row[1])
else:
views.append(row[0])
return views
@reflection.cache
def get_table_comment( # pylint: disable=unused-argument
self, connection, table_name, schema_name, **kw
):
"""
Returns comment of table
"""
cursor = connection.execute(
DATABRICKS_GET_TABLE_COMMENTS.format(
schema_name=schema_name, table_name=table_name
)
)
try:
for result in list(cursor):
data = result.values()
if data[0] and data[0].strip() == "Comment":
return {"text": data[1] if data and data[1] else None}
except Exception:
return {"text": None}
return {"text": None}
@reflection.cache
def get_view_definition(
self, connection, table_name, schema=None, **kw # pylint: disable=unused-argument
):
schema_name = [row[0] for row in connection.execute("SHOW SCHEMAS")]
if "information_schema" in schema_name:
return get_view_definition_wrapper(
self,
connection,
table_name=table_name,
schema=schema,
query=DATABRICKS_VIEW_DEFINITIONS,
)
return None
DatabricksDialect.get_table_comment = get_table_comment
DatabricksDialect.get_view_names = get_view_names
DatabricksDialect.get_columns = get_columns
DatabricksDialect.get_schema_names = get_schema_names
DatabricksDialect.get_view_definition = get_view_definition
DatabricksDialect.get_all_view_definitions = get_all_view_definitions
reflection.Inspector.get_schema_names = get_schema_names_reflection
class DatabricksLegacySource(CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Databricks Source using
the legacy hive metastore method
"""
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
super().__init__(config, metadata)
self.is_older_version = False
self._init_version()
def _init_version(self):
try:
self.connection.execute(DATABRICKS_GET_CATALOGS).fetchone()
self.is_older_version = False
except DatabaseError as soe:
logger.debug(f"Failed to fetch catalogs due to: {soe}")
self.is_older_version = True
@classmethod
def create(cls, config_dict, metadata: OpenMetadata):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: DatabricksConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DatabricksConnection):
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
)
return cls(config, metadata)
def set_inspector(self, database_name: str) -> None:
"""
When sources override `get_database_names`, they will need
to setup multiple inspectors. They can use this function.
:param database_name: new database to set
"""
logger.info(f"Ingesting from catalog: {database_name}")
new_service_connection = deepcopy(self.service_connection)
new_service_connection.catalog = database_name
self.engine = get_connection(new_service_connection)
self.inspector = inspect(self.engine)
def get_configured_database(self) -> Optional[str]:
return self.service_connection.catalog
def get_database_names_raw(self) -> Iterable[str]:
if not self.is_older_version:
results = self.connection.execute(DATABRICKS_GET_CATALOGS)
for res in results:
if res:
row = list(res)
yield row[0]
else:
yield DEFAULT_DATABASE
def get_database_names(self) -> Iterable[str]:
configured_catalog = self.service_connection.catalog
if configured_catalog:
self.set_inspector(database_name=configured_catalog)
yield configured_catalog
else:
for new_catalog in self.get_database_names_raw():
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.context.database_service,
database_name=new_catalog,
)
if filter_by_database(
self.source_config.databaseFilterPattern,
database_fqn
if self.source_config.useFqnForFiltering
else new_catalog,
):
self.status.filter(database_fqn, "Database Filtered Out")
continue
try:
self.set_inspector(database_name=new_catalog)
yield new_catalog
except Exception as exc:
logger.error(traceback.format_exc())
logger.warning(
f"Error trying to process database {new_catalog}: {exc}"
)
def get_raw_database_schema_names(self) -> Iterable[str]:
if self.service_connection.__dict__.get("databaseSchema"):
yield self.service_connection.databaseSchema
else:
for schema_name in self.inspector.get_schema_names(
database=self.context.database,
is_old_version=self.is_older_version,
):
yield schema_name

View File

@ -11,38 +11,41 @@
"""
Databricks lineage module
"""
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.databricks.legacy.lineage import (
DatabricksLineageLegacySource,
)
from metadata.ingestion.source.database.databricks.unity_catalog.lineage import (
DatabricksUnityCatalogLineageSource,
import traceback
from datetime import datetime
from typing import Iterator
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.source.database.databricks.query_parser import (
DatabricksQueryParserSource,
)
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DatabricksLineageSource:
class DatabricksLineageSource(DatabricksQueryParserSource, LineageSource):
"""
Databricks Lineage Source
Databricks Lineage Legacy Source
"""
@classmethod
def create(cls, config_dict, metadata: OpenMetadata):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: DatabricksConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DatabricksConnection):
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
)
if not connection.useUnityCatalog:
return DatabricksLineageLegacySource(config, metadata)
return DatabricksUnityCatalogLineageSource(config, metadata)
def yield_table_query(self) -> Iterator[TableQuery]:
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data:
try:
if self.client.is_query_valid(row):
yield TableQuery(
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=datetime.now(),
serviceName=self.config.serviceName,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing query_dict {row}: {exc}")

View File

@ -8,8 +8,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Databricks source module"""
"""Databricks legacy source module"""
import re
import traceback
from copy import deepcopy
from typing import Iterable, Optional
from pyhive.sqlalchemy_hive import _type_map
from sqlalchemy import types, util
from sqlalchemy.engine import reflection
from sqlalchemy.exc import DatabaseError
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.sqltypes import String
from sqlalchemy_databricks._dialect import DatabricksDialect
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
@ -18,23 +32,236 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.databricks.legacy.metadata import (
DatabricksLegacySource,
)
from metadata.ingestion.source.database.databricks.unity_catalog.metadata import (
DatabricksUnityCatalogSource,
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_CATALOGS,
DATABRICKS_GET_TABLE_COMMENTS,
DATABRICKS_VIEW_DEFINITIONS,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.utils import fqn
from metadata.utils.constants import DEFAULT_DATABASE
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_view_definitions,
get_view_definition_wrapper,
)
logger = ingestion_logger()
class DatabricksSource:
class STRUCT(String):
# This class is added to support STRUCT datatype
"""The SQL STRUCT type."""
__visit_name__ = "STRUCT"
class ARRAY(String):
# This class is added to support ARRAY datatype
"""The SQL ARRAY type."""
__visit_name__ = "ARRAY"
class MAP(String):
# This class is added to support MAP datatype
"""The SQL MAP type."""
__visit_name__ = "MAP"
# overriding pyhive.sqlalchemy_hive._type_map
# mapping struct, array & map to custom classed instead of sqltypes.String
_type_map.update(
{
"struct": STRUCT,
"array": ARRAY,
"map": MAP,
"void": create_sqlalchemy_type("VOID"),
"interval": create_sqlalchemy_type("INTERVAL"),
"binary": create_sqlalchemy_type("BINARY"),
}
)
def _get_column_rows(self, connection, table_name, schema):
# get columns and strip whitespace
table_columns = self._get_table_columns( # pylint: disable=protected-access
connection, table_name, schema
)
column_rows = [
[col.strip() if col else None for col in row] for row in table_columns
]
# Filter out empty rows and comment
return [row for row in column_rows if row[0] and row[0] != "# col_name"]
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
"""
This function overrides the sqlalchemy_databricks._dialect.DatabricksDialect.get_columns
to add support for struct, array & map datatype
Extract the Database Name from the keyword arguments parameter if it is present. This
value should match what is provided in the 'source.config.database' field in the
Databricks ingest config file.
"""
db_name = kw["db_name"] if "db_name" in kw else None
rows = _get_column_rows(self, connection, table_name, schema)
result = []
for col_name, col_type, _comment in rows:
# Handle both oss hive and Databricks' hive partition header, respectively
if col_name in ("# Partition Information", "# Partitioning"):
break
# Take out the more detailed type information
# e.g. 'map<ixnt,int>' -> 'map'
# 'decimal(10,1)' -> decimal
raw_col_type = col_type
col_type = re.search(r"^\w+", col_type).group(0)
try:
coltype = _type_map[col_type]
except KeyError:
util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
coltype = types.NullType
col_info = {
"name": col_name,
"type": coltype,
"nullable": True,
"default": None,
"comment": _comment,
"system_data_type": raw_col_type,
}
if col_type in {"array", "struct", "map"}:
if db_name and schema:
rows = dict(
connection.execute(
f"DESCRIBE {db_name}.{schema}.{table_name} {col_name}"
).fetchall()
)
else:
rows = dict(
connection.execute(
f"DESCRIBE {schema}.{table_name} {col_name}"
if schema
else f"DESCRIBE {table_name} {col_name}"
).fetchall()
)
col_info["system_data_type"] = rows["data_type"]
col_info["is_complex"] = True
result.append(col_info)
return result
@reflection.cache
def get_schema_names(self, connection, **kw): # pylint: disable=unused-argument
# Equivalent to SHOW DATABASES
if kw.get("database") and kw.get("is_old_version") is not True:
connection.execute(f"USE CATALOG '{kw.get('database')}'")
return [row[0] for row in connection.execute("SHOW SCHEMAS")]
def get_schema_names_reflection(self, **kw):
"""Return all schema names."""
if hasattr(self.dialect, "get_schema_names"):
with self._operation_context() as conn: # pylint: disable=protected-access
return self.dialect.get_schema_names(conn, info_cache=self.info_cache, **kw)
return []
def get_view_names(
self, connection, schema=None, **kw
): # pylint: disable=unused-argument
query = "SHOW VIEWS"
if schema:
query += " IN " + self.identifier_preparer.quote_identifier(schema)
view_in_schema = connection.execute(query)
views = []
for row in view_in_schema:
# check number of columns in result
# if it is > 1, we use spark thrift server with 3 columns in the result (schema, table, is_temporary)
# else it is hive with 1 column in the result
if len(row) > 1:
views.append(row[1])
else:
views.append(row[0])
return views
@reflection.cache
def get_table_comment( # pylint: disable=unused-argument
self, connection, table_name, schema_name, **kw
):
"""
Returns comment of table
"""
cursor = connection.execute(
DATABRICKS_GET_TABLE_COMMENTS.format(
schema_name=schema_name, table_name=table_name
)
)
try:
for result in list(cursor):
data = result.values()
if data[0] and data[0].strip() == "Comment":
return {"text": data[1] if data and data[1] else None}
except Exception:
return {"text": None}
return {"text": None}
@reflection.cache
def get_view_definition(
self, connection, table_name, schema=None, **kw # pylint: disable=unused-argument
):
schema_name = [row[0] for row in connection.execute("SHOW SCHEMAS")]
if "information_schema" in schema_name:
return get_view_definition_wrapper(
self,
connection,
table_name=table_name,
schema=schema,
query=DATABRICKS_VIEW_DEFINITIONS,
)
return None
DatabricksDialect.get_table_comment = get_table_comment
DatabricksDialect.get_view_names = get_view_names
DatabricksDialect.get_columns = get_columns
DatabricksDialect.get_schema_names = get_schema_names
DatabricksDialect.get_view_definition = get_view_definition
DatabricksDialect.get_all_view_definitions = get_all_view_definitions
reflection.Inspector.get_schema_names = get_schema_names_reflection
class DatabricksSource(CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Databricks Source
Database metadata from Databricks Source using
the legacy hive metastore method
"""
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
super().__init__(config, metadata)
self.is_older_version = False
self._init_version()
def _init_version(self):
try:
self.connection.execute(DATABRICKS_GET_CATALOGS).fetchone()
self.is_older_version = False
except DatabaseError as soe:
logger.debug(f"Failed to fetch catalogs due to: {soe}")
self.is_older_version = True
@classmethod
def create(cls, config_dict, metadata: OpenMetadata):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@ -43,6 +270,70 @@ class DatabricksSource:
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
)
if not connection.useUnityCatalog:
return DatabricksLegacySource(config, metadata)
return DatabricksUnityCatalogSource(config, metadata)
return cls(config, metadata)
def set_inspector(self, database_name: str) -> None:
"""
When sources override `get_database_names`, they will need
to setup multiple inspectors. They can use this function.
:param database_name: new database to set
"""
logger.info(f"Ingesting from catalog: {database_name}")
new_service_connection = deepcopy(self.service_connection)
new_service_connection.catalog = database_name
self.engine = get_connection(new_service_connection)
self.inspector = inspect(self.engine)
def get_configured_database(self) -> Optional[str]:
return self.service_connection.catalog
def get_database_names_raw(self) -> Iterable[str]:
if not self.is_older_version:
results = self.connection.execute(DATABRICKS_GET_CATALOGS)
for res in results:
if res:
row = list(res)
yield row[0]
else:
yield DEFAULT_DATABASE
def get_database_names(self) -> Iterable[str]:
configured_catalog = self.service_connection.catalog
if configured_catalog:
self.set_inspector(database_name=configured_catalog)
yield configured_catalog
else:
for new_catalog in self.get_database_names_raw():
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.context.database_service,
database_name=new_catalog,
)
if filter_by_database(
self.source_config.databaseFilterPattern,
database_fqn
if self.source_config.useFqnForFiltering
else new_catalog,
):
self.status.filter(database_fqn, "Database Filtered Out")
continue
try:
self.set_inspector(database_name=new_catalog)
yield new_catalog
except Exception as exc:
logger.error(traceback.format_exc())
logger.warning(
f"Error trying to process database {new_catalog}: {exc}"
)
def get_raw_database_schema_names(self) -> Iterable[str]:
if self.service_connection.__dict__.get("databaseSchema"):
yield self.service_connection.databaseSchema
else:
for schema_name in self.inspector.get_schema_names(
database=self.context.database,
is_old_version=self.is_older_version,
):
yield schema_name

View File

@ -35,8 +35,12 @@ class DatabricksQueryParserSource(QueryParserSource, ABC):
filters: str
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
def _init_super(self, config: WorkflowSource, metadata: OpenMetadata):
super().__init__(config, metadata, False)
# pylint: disable=super-init-not-called
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
self._init_super(config=config, metadata=metadata)
self.client = DatabricksClient(self.service_connection)
@classmethod
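
Splitting `__init__` into `_init_super` lets a subclass reuse the shared setup while binding a different client, which is what the new Unity Catalog query parser does. A generic sketch of the pattern with toy classes (names here are illustrative, not the actual sources):

```python
# Illustrative only: the init-splitting pattern with stand-in classes.
class BaseParser:
    def _init_super(self, config):
        self.config = config                  # shared setup lives here

    def __init__(self, config):
        self._init_super(config)
        self.client = "databricks-client"     # default client


class UnityParser(BaseParser):
    # pylint: disable=super-init-not-called
    def __init__(self, config):
        self._init_super(config)              # reuse shared setup...
        self.client = "unity-catalog-client"  # ...but bind a different client


print(UnityParser({"hostPort": "localhost:443"}).client)  # unity-catalog-client
```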

View File

@ -13,7 +13,7 @@ Databricks usage module
"""
import traceback
from datetime import datetime
from typing import Iterable, Optional
from typing import Iterable
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.source.database.databricks.query_parser import (
@ -30,7 +30,7 @@ class DatabricksUsageSource(DatabricksQueryParserSource, UsageSource):
Databricks Usage Source
"""
def yield_table_queries(self) -> Optional[Iterable[TableQuery]]:
def yield_table_queries(self) -> Iterable[TableQuery]:
"""
Method to yield TableQueries
"""

View File

@ -0,0 +1,87 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client to interact with databricks apis
"""
import json
import traceback
from metadata.ingestion.source.database.databricks.client import (
API_TIMEOUT,
DatabricksClient,
)
from metadata.ingestion.source.database.unitycatalog.models import (
LineageColumnStreams,
LineageTableStreams,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get"
COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get"
class UnityCatalogClient(DatabricksClient):
"""
UnityCatalogClient creates a Databricks connection based on DatabricksCredentials.
"""
def get_table_lineage(self, table_name: str) -> LineageTableStreams:
"""
Method returns table lineage details
"""
try:
data = {
"table_name": table_name,
}
response = self.client.get(
f"{self.base_url}{TABLE_LINEAGE_PATH}",
headers=self.headers,
data=json.dumps(data),
timeout=API_TIMEOUT,
).json()
if response:
return LineageTableStreams(**response)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)
return LineageTableStreams()
def get_column_lineage(
self, table_name: str, column_name: str
) -> LineageColumnStreams:
"""
Method returns column lineage details
"""
try:
data = {
"table_name": table_name,
"column_name": column_name,
}
response = self.client.get(
f"{self.base_url}{COLUMN_LINEAGE_PATH}",
headers=self.headers,
data=json.dumps(data),
timeout=API_TIMEOUT,
).json()
if response:
return LineageColumnStreams(**response)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)
return LineageColumnStreams()
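
A sketch of how this client might be exercised on its own, assuming `token` and `hostPort` are the only required fields of `UnityCatalogConnection` (as in the YAML examples above); the credentials and the fully qualified table name are placeholders.

```python
# Sketch: calling the lineage endpoints directly. Placeholders throughout.
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
    UnityCatalogConnection,
)
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient

service_connection = UnityCatalogConnection(
    hostPort="localhost:443",
    token="<databricks token>",
)
client = UnityCatalogClient(service_connection)

table_streams = client.get_table_lineage("hive_metastore.default.orders")
column_streams = client.get_column_lineage(
    table_name="hive_metastore.default.orders", column_name="order_id"
)

# Both calls fall back to empty models on API errors, so dict() is always safe.
print(table_streams.dict())
print(column_streams.dict())
```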

View File

@ -0,0 +1,97 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Source connection handler
"""
from functools import partial
from typing import Optional
from databricks.sdk import WorkspaceClient
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
UnityCatalogConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient
from metadata.ingestion.source.database.unitycatalog.models import DatabricksTable
from metadata.utils.db_utils import get_host_from_host_port
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
def get_connection_url(connection: UnityCatalogConnection) -> str:
url = f"{connection.scheme.value}://token:{connection.token.get_secret_value()}@{connection.hostPort}"
return url
def get_connection(connection: UnityCatalogConnection) -> WorkspaceClient:
"""
Create connection
"""
return WorkspaceClient(
host=get_host_from_host_port(connection.hostPort),
token=connection.token.get_secret_value(),
)
def test_connection(
metadata: OpenMetadata,
connection: WorkspaceClient,
service_connection: UnityCatalogConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
client = UnityCatalogClient(service_connection)
table_obj = DatabricksTable()
def get_catalogs(connection: WorkspaceClient, table_obj: DatabricksTable):
for catalog in connection.catalogs.list():
table_obj.catalog_name = catalog.name
break
def get_schemas(connection: WorkspaceClient, table_obj: DatabricksTable):
for schema in connection.schemas.list(catalog_name=table_obj.catalog_name):
table_obj.schema_name = schema.name
break
def get_tables(connection: WorkspaceClient, table_obj: DatabricksTable):
for table in connection.tables.list(
catalog_name=table_obj.catalog_name, schema_name=table_obj.schema_name
):
table_obj.name = table.name
break
test_fn = {
"CheckAccess": connection.catalogs.list,
"GetDatabases": partial(get_catalogs, connection, table_obj),
"GetSchemas": partial(get_schemas, connection, table_obj),
"GetTables": partial(get_tables, connection, table_obj),
"GetViews": partial(get_tables, connection, table_obj),
"GetQueries": client.test_query_api_access,
}
test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
timeout_seconds=service_connection.connectionTimeout,
)
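
Since `get_connection` now returns a `databricks.sdk.WorkspaceClient`, the same objects the test steps walk through can be driven directly. A short sketch using only the SDK calls seen above; host and token are placeholders.

```python
# Sketch: catalogs -> schemas -> tables, the traversal the test steps exercise.
from databricks.sdk import WorkspaceClient

workspace = WorkspaceClient(host="https://<workspace-host>", token="<databricks token>")

for catalog in workspace.catalogs.list():
    print("catalog:", catalog.name)
    for schema in workspace.schemas.list(catalog_name=catalog.name):
        print("  schema:", schema.name)
        for table in workspace.tables.list(
            catalog_name=catalog.name, schema_name=schema.name
        ):
            print("    table:", table.name)
        break  # keep the sketch short: first schema only
    break  # first catalog only
```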

View File

@ -16,8 +16,8 @@ from typing import Iterable, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
UnityCatalogConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
@ -33,18 +33,18 @@ from metadata.ingestion.api.steps import InvalidSourceException, Source
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_test_connection_fn
from metadata.ingestion.source.database.databricks.client import DatabricksClient
from metadata.ingestion.source.database.databricks.connection import get_connection
from metadata.ingestion.source.database.databricks.models import LineageTableStreams
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient
from metadata.ingestion.source.database.unitycatalog.connection import get_connection
from metadata.ingestion.source.database.unitycatalog.models import LineageTableStreams
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DatabricksUnityCatalogLineageSource(Source):
class UnitycatalogLineageSource(Source):
"""
Databricks Lineage Unity Catalog Source
Lineage Unity Catalog Source
"""
def __init__(
@ -57,7 +57,7 @@ class DatabricksUnityCatalogLineageSource(Source):
self.metadata = metadata
self.service_connection = self.config.serviceConnection.__root__.config
self.source_config = self.config.sourceConfig.config
self.client = DatabricksClient(self.service_connection)
self.client = UnityCatalogClient(self.service_connection)
self.connection_obj = get_connection(self.service_connection)
self.test_connection()
@ -75,10 +75,10 @@ class DatabricksUnityCatalogLineageSource(Source):
def create(cls, config_dict, metadata: OpenMetadata):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: DatabricksConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DatabricksConnection):
connection: UnityCatalogConnection = config.serviceConnection.__root__.config
if not isinstance(connection, UnityCatalogConnection):
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
f"Expected UnityCatalogConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -38,8 +38,8 @@ from metadata.generated.schema.entity.data.table import (
TableConstraint,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
UnityCatalogConnection,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
@ -54,15 +54,15 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.databricks.connection import get_connection
from metadata.ingestion.source.database.databricks.models import (
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure
from metadata.ingestion.source.database.unitycatalog.connection import get_connection
from metadata.ingestion.source.database.unitycatalog.models import (
ColumnJson,
ElementType,
ForeignConstrains,
Type,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
@ -85,7 +85,7 @@ def from_dict(cls, dct: Dict[str, Any]) -> "TableConstraintList":
TableConstraintList.from_dict = from_dict
class DatabricksUnityCatalogSource(DatabaseServiceSource, MultiDBSource):
class UnitycatalogSource(DatabaseServiceSource, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Databricks Source using
@ -100,7 +100,7 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource, MultiDBSource):
)
self.context.table_views = []
self.metadata = metadata
self.service_connection: DatabricksConnection = (
self.service_connection: UnityCatalogConnection = (
self.config.serviceConnection.__root__.config
)
self.client = get_connection(self.service_connection)
@ -118,10 +118,10 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource, MultiDBSource):
@classmethod
def create(cls, config_dict, metadata: OpenMetadata):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: DatabricksConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DatabricksConnection):
connection: UnityCatalogConnection = config.serviceConnection.__root__.config
if not isinstance(connection, UnityCatalogConnection):
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
f"Expected UnityCatalogConnection, but got {connection}"
)
return cls(config, metadata)
@ -233,7 +233,7 @@ class DatabricksUnityCatalogSource(DatabaseServiceSource, MultiDBSource):
)
)
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
def get_tables_name_and_type(self) -> Iterable[Tuple[str, str]]:
"""
Handle table and views.

View File

@ -0,0 +1,60 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
UnityCatalog Query parser module
"""
from abc import ABC
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
UnityCatalogConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.databricks.query_parser import (
DatabricksQueryParserSource,
)
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class UnityCatalogQueryParserSource(
DatabricksQueryParserSource, QueryParserSource, ABC
):
"""
UnityCatalog Query Parser Source
This class would be inheriting all the methods
from DatabricksQueryParserSource
"""
filters: str
# pylint: disable=super-init-not-called
def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
self._init_super(config=config, metadata=metadata)
self.client = UnityCatalogClient(self.service_connection)
@classmethod
def create(cls, config_dict, metadata: OpenMetadata):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: UnityCatalogConnection = config.serviceConnection.__root__.config
if not isinstance(connection, UnityCatalogConnection):
raise InvalidSourceException(
f"Expected UnityCatalogConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -0,0 +1,31 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
unity catalog usage module
"""
from metadata.ingestion.source.database.databricks.usage import DatabricksUsageSource
from metadata.ingestion.source.database.unitycatalog.query_parser import (
UnityCatalogQueryParserSource,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class UnitycatalogUsageSource(UnityCatalogQueryParserSource, DatabricksUsageSource):
"""
UnityCatalog Usage Source
This class would be inheriting all the methods from
DatabricksUsageSource as both the sources would call
the same API for fetching Usage Queries
"""

View File

@ -254,7 +254,7 @@ class DatabricksUnitTest(TestCase):
"metadata.ingestion.source.database.common_db_source.CommonDbSourceService.test_connection"
)
@patch(
"metadata.ingestion.source.database.databricks.legacy.metadata.DatabricksLegacySource._init_version"
"metadata.ingestion.source.database.databricks.metadata.DatabricksSource._init_version"
)
def __init__(self, methodName, test_connection, db_init_version) -> None:
super().__init__(methodName)

View File

@ -49,14 +49,7 @@ To deploy OpenMetadata, check the Deployment guides.
## Unity Catalog
If you are using unity catalog in Data Bricks, You can enable "use Unity catalog" option while configuring the DataBricks connection.
OpenMetadata extracts following metadata from Unity Catalog
1. Table and Column Descriptions
2. Ownership
3. Column Level Lineage
4. Queries
If you are using Unity Catalog in Databricks, check out the [Unity Catalog](/connectors/database/unity-catalog) connector.
## Metadata Ingestion
@ -81,7 +74,6 @@ OpenMetadata extracts following metadata from Unity Catalog
- **connectionTimeout**: The maximum amount of time (in seconds) to wait for a successful connection to the data source. If the connection attempt takes longer than this timeout period, an error will be returned.
- **Catalog**: Catalog of the data source (Example: hive_metastore). This is an optional parameter; use it if you would like to restrict metadata reading to a single catalog. When left blank, OpenMetadata Ingestion attempts to scan all catalogs.
- **DatabaseSchema**: databaseSchema of the data source. This is an optional parameter; use it if you would like to restrict metadata reading to a single databaseSchema. When left blank, OpenMetadata Ingestion attempts to scan all databaseSchemas.
- **Use Unity Catalog**: Enable this flag to extract the metadata and lineage information using databricks unity catalog instead of using legacy hive metastore. When you enable this flag make sure you have enabled the unity catalog on your instance.
{% partial file="/v1.2/connectors/database/advanced-configuration.md" /%}

View File

@ -115,12 +115,6 @@ This is a sample config for Databricks:
{% /codeInfo %}
{% codeInfo srNumber=35 %}
**useUnityCatalog**: Enable this flag to extract the metadata and lineage information using databricks unity catalog instead of using legacy hive metastore. When you enable this flag make sure you have enabled the unity catalog on your instance.
{% /codeInfo %}
{% partial file="/v1.2/connectors/yaml/database/source-config-def.md" /%}
@ -174,9 +168,6 @@ source:
```yaml {% srNumber=6 %}
connectionTimeout: 120
```
```yaml {% srNumber=35 %}
useUnityCatalog: true
```
```yaml {% srNumber=7 %}
# connectionOptions:
# key: value

View File

@ -32,6 +32,7 @@ This is the supported list of connectors for Database Services:
- [SingleStore](/connectors/database/singlestore)
- [Snowflake](/connectors/database/snowflake)
- [Trino](/connectors/database/trino)
- [Unity Catalog](/connectors/database/unity-catalog)
- [Vertica](/connectors/database/vertica)
If you have a request for a new connector, don't hesitate to reach out in [Slack](https://slack.open-metadata.org/) or

View File

@ -0,0 +1,87 @@
---
title: Unity Catalog
slug: /connectors/database/unity-catalog
---
# Unity Catalog
{% multiTablesWrapper %}
| Feature | Status |
| :----------------- | :--------------------------- |
| Stage | PROD |
| Metadata | {% icon iconName="check" /%} |
| Query Usage | {% icon iconName="check" /%} |
| Data Profiler | {% icon iconName="cross" /%} |
| Data Quality | {% icon iconName="check" /%} |
| Stored Procedures | {% icon iconName="cross" /%} |
| DBT | {% icon iconName="check" /%} |
| Supported Versions | Databricks Runtime Version 9+ |
| Feature | Status |
| :----------- | :--------------------------- |
| Lineage | {% icon iconName="check" /%} |
| Table-level | {% icon iconName="check" /%} |
| Column-level | {% icon iconName="check" /%} |
{% /multiTablesWrapper %}
In this section, we provide guides and references to use the Unity Catalog connector.
Configure and schedule Unity Catalog metadata workflow from the OpenMetadata UI:
- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Query Usage](/connectors/ingestion/workflows/usage)
- [Data Quality](/connectors/ingestion/workflows/data-quality)
- [Lineage](/connectors/ingestion/lineage)
- [dbt Integration](/connectors/ingestion/workflows/dbt)
{% partial file="/v1.2/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/database/unity-catalog/yaml"} /%}
{% partial file="/v1.2/connectors/external-ingestion-deployment.md" /%}
{%inlineCallout icon="description" bold="OpenMetadata 0.12 or later" href="/deployment"%}
To deploy OpenMetadata, check the Deployment guides.
{%/inlineCallout%}
## Metadata Ingestion
{% partial
file="/v1.2/connectors/metadata-ingestion-ui.md"
variables={
connector: "Unity Catalog",
selectServicePath: "/images/v1.2/connectors/unitycatalog/select-service.png",
addNewServicePath: "/images/v1.2/connectors/unitycatalog/add-new-service.png",
serviceConnectionPath: "/images/v1.2/connectors/unitycatalog/service-connection.png",
}
/%}
{% stepsContainer %}
{% extraContent parentTagName="stepsContainer" %}
#### Connection Details
- **Host and Port**: Enter the fully qualified hostname and port number for your Databricks deployment in the Host and Port field.
- **Token**: Generated Token to connect to Databricks.
- **HTTP Path**: Databricks compute resources URL.
- **connectionTimeout**: The maximum amount of time (in seconds) to wait for a successful connection to the data source. If the connection attempt takes longer than this timeout period, an error will be returned.
- **Catalog**: Catalog of the data source (e.g., `hive_metastore`). This is an optional parameter; set it if you would like to restrict the metadata reading to a single catalog. When left blank, OpenMetadata Ingestion attempts to scan all the catalogs.
- **DatabaseSchema**: databaseSchema of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single databaseSchema. When left blank, OpenMetadata Ingestion attempts to scan all the databaseSchemas.
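For orientation, the sketch below shows how these fields fit together in the resulting `serviceConnection` block; the host, token, and HTTP path values are placeholders, and the optional `catalog`/`databaseSchema` keys can be omitted to scan everything.

```yaml
# Minimal Unity Catalog serviceConnection sketch (placeholder values)
serviceConnection:
  config:
    type: UnityCatalog
    hostPort: adb-1234567890123456.7.azuredatabricks.net:443
    token: <databricks personal access token>
    httpPath: /sql/1.0/warehouses/abcdef1234567890
    catalog: hive_metastore      # optional: restrict to one catalog
    databaseSchema: default      # optional: restrict to one schema
    connectionTimeout: 120
```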
{% partial file="/v1.2/connectors/database/advanced-configuration.md" /%}
{% /extraContent %}
{% partial file="/v1.2/connectors/test-connection.md" /%}
{% partial file="/v1.2/connectors/database/configure-ingestion.md" /%}
{% partial file="/v1.2/connectors/ingestion-schedule-and-deploy.md" /%}
{% /stepsContainer %}
{% partial file="/v1.2/connectors/troubleshooting.md" /%}
{% partial file="/v1.2/connectors/database/related.md" /%}

View File

@ -0,0 +1,81 @@
---
title: Unity Catalog Connector Troubleshooting
slug: /connectors/database/unity-catalog/troubleshooting
---
# Troubleshooting
## Unity Catalog connection details
```yaml
source:
type: unitycatalog
serviceName: local_unity_catalog
serviceConnection:
config:
catalog: hive_metastore
databaseSchema: default
token: <databricks token>
hostPort: localhost:443
connectionArguments:
http_path: <http path of databricks cluster>
sourceConfig:
config:
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth
```
Here are the steps to get `hostPort`, `token` and `http_path`.
First, log in to Azure Databricks and, from the sidebar, select SQL Warehouses (in the SQL section).
{% image
src="/images/v1.2/connectors/unitycatalog/select-sql-warehouse.png"
alt="Select Sql Warehouse"
caption="Select Sql Warehouse" /%}
Now click on a SQL Warehouse from the SQL Warehouses list.
{% image
src="/images/v1.2/connectors/unitycatalog/Open-sql-warehouse.png"
alt="Open Sql Warehouse"
caption="Open Sql Warehouse" /%}
Inside that page, go to the Connection details section.
On this page, the Server hostname and Port together form your `hostPort`, and the HTTP path is your `http_path`; these map into the connection config as sketched after the screenshot below.
{% image
src="/images/v1.2/connectors/unitycatalog/Connection-details.png"
alt="Connection details"
caption="Connection details" /%}
On the Connection details page, click on Create a personal access token.
{% image
src="/images/v1.2/connectors/unitycatalog/Open-create-token-page.png"
alt="Open create token"
caption="Open create token" /%}
On this page you can create a new `token`.
{% image
src="/images/v1.2/connectors/unitycatalog/Generate-token.png"
alt="Generate token"
caption="Generate token" /%}

View File

@ -0,0 +1,200 @@
---
title: Run the Unity Catalog Connector Externally
slug: /connectors/database/unity-catalog/yaml
---
# Run the Unity Catalog Connector Externally
{% multiTablesWrapper %}
| Feature | Status |
| :----------------- | :--------------------------- |
| Stage | PROD |
| Metadata | {% icon iconName="check" /%} |
| Query Usage | {% icon iconName="check" /%} |
| Data Profiler | {% icon iconName="cross" /%} |
| Data Quality | {% icon iconName="check" /%} |
| Stored Procedures | {% icon iconName="cross" /%} |
| DBT | {% icon iconName="check" /%} |
| Supported Versions | Databricks Runtime Version 9+|
| Feature | Status |
| :----------- | :--------------------------- |
| Lineage | {% icon iconName="check" /%} |
| Table-level | {% icon iconName="check" /%} |
| Column-level | {% icon iconName="check" /%} |
{% /multiTablesWrapper %}
In this section, we provide guides and references to use the Unity Catalog connector.
Configure and schedule Unity Catalog metadata workflow from the OpenMetadata UI:
- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Query Usage](#query-usage)
- [Lineage](#lineage)
- [dbt Integration](#dbt-integration)
{% partial file="/v1.2/connectors/external-ingestion-deployment.md" /%}
## Requirements
{%inlineCallout icon="description" bold="OpenMetadata 0.12 or later" href="/deployment"%}
To deploy OpenMetadata, check the Deployment guides.
{%/inlineCallout%}
### Python Requirements
To run the Unity Catalog ingestion, you will need to install:
```bash
pip3 install "openmetadata-ingestion[databricks]"
```
## Metadata Ingestion
All connectors are defined as JSON Schemas.
[Here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/unityCatalogConnection.json)
you can find the structure to create a connection to Unity Catalog.
In order to create and run a Metadata Ingestion workflow, we will follow
the steps to create a YAML configuration able to connect to the source,
process the Entities if needed, and reach the OpenMetadata server.
The workflow is modeled around the following
[JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json)
### 1. Define the YAML Config
This is a sample config for Unity Catalog:
{% codePreview %}
{% codeInfoContainer %}
#### Source Configuration - Service Connection
{% codeInfo srNumber=1 %}
**catalog**: Catalog of the data source (e.g., `hive_metastore`). This is an optional parameter; set it if you would like to restrict the metadata reading to a single catalog. When left blank, OpenMetadata Ingestion attempts to scan all the catalogs.
{% /codeInfo %}
{% codeInfo srNumber=2 %}
**databaseSchema**: DatabaseSchema of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single databaseSchema. When left blank, OpenMetadata Ingestion attempts to scan all the databaseSchemas.
{% /codeInfo %}
{% codeInfo srNumber=3 %}
**hostPort**: Enter the fully qualified hostname and port number for your Databricks deployment in the Host and Port field.
{% /codeInfo %}
{% codeInfo srNumber=4 %}
**token**: Generated Token to connect to Databricks.
{% /codeInfo %}
{% codeInfo srNumber=5 %}
**httpPath**: Databricks compute resources URL.
{% /codeInfo %}
{% codeInfo srNumber=6 %}
**connectionTimeout**: The maximum amount of time (in seconds) to wait for a successful connection to the data source. If the connection attempt takes longer than this timeout period, an error will be returned.
{% /codeInfo %}
{% partial file="/v1.2/connectors/yaml/database/source-config-def.md" /%}
{% partial file="/v1.2/connectors/yaml/ingestion-sink-def.md" /%}
{% partial file="/v1.2/connectors/yaml/workflow-config-def.md" /%}
#### Advanced Configuration
{% codeInfo srNumber=7 %}
**Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to the service during the connection. These details must be added as Key-Value pairs.
{% /codeInfo %}
{% codeInfo srNumber=8 %}
**Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to the service during the connection. These details must be added as Key-Value pairs.
- In case you are using Single-Sign-On (SSO) for authentication, add the `authenticator` details in the Connection Arguments as a Key-Value pair as follows: `"authenticator" : "sso_login_url"`
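As a sketch (the login URL is a placeholder), that key-value pair would sit under the connection config like this:

```yaml
# Hypothetical SSO example: authenticator passed as a connection argument
connectionArguments:
  authenticator: https://your-sso-login-url
```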
{% /codeInfo %}
{% /codeInfoContainer %}
{% codeBlock fileName="filename.yaml" %}
```yaml
source:
type: unitycatalog
serviceName: local_unitycatalog
serviceConnection:
config:
type: UnityCatalog
```
```yaml {% srNumber=1 %}
catalog: hive_metastore
```
```yaml {% srNumber=2 %}
databaseSchema: default
```
```yaml {% srNumber=3 %}
token: <databricks token>
```
```yaml {% srNumber=4 %}
hostPort: <databricks connection host & port>
```
```yaml {% srNumber=5 %}
httpPath: <http path of databricks cluster>
```
```yaml {% srNumber=6 %}
connectionTimeout: 120
```
```yaml {% srNumber=7 %}
# connectionOptions:
# key: value
```
```yaml {% srNumber=8 %}
# connectionArguments:
# key: value
```
{% partial file="/v1.2/connectors/yaml/database/source-config.md" /%}
{% partial file="/v1.2/connectors/yaml/ingestion-sink.md" /%}
{% partial file="/v1.2/connectors/yaml/workflow-config.md" /%}
{% /codeBlock %}
{% /codePreview %}
{% partial file="/v1.2/connectors/yaml/ingestion-cli.md" /%}
{% partial file="/v1.2/connectors/yaml/query-usage.md" variables={connector: "unitycatalog"} /%}
## Lineage
You can learn more about how to ingest lineage [here](/connectors/ingestion/workflows/lineage).
## dbt Integration
You can learn more about how to ingest dbt models' definitions and their lineage [here](/connectors/ingestion/workflows/dbt).

View File

@ -62,6 +62,7 @@ the following docs to run the Ingestion Framework in any orchestrator externally
- [Snowflake](/connectors/database/snowflake)
- [SQLite](/connectors/database/sqlite)
- [Trino](/connectors/database/trino)
- [Unity Catalog](/connectors/database/unity-catalog)
- [Vertica](/connectors/database/vertica)
## Dashboard Services

View File

@ -351,6 +351,12 @@ site_menu:
url: /connectors/database/trino
- category: Connectors / Database / Trino / Run Externally
url: /connectors/database/trino/yaml
- category: Connectors / Database / Unity Catalog
url: /connectors/database/unity-catalog
- category: Connectors / Database / Unity Catalog / Run Externally
url: /connectors/database/unity-catalog/yaml
- category: Connectors / Database / Unity Catalog / Troubleshooting
url: /connectors/database/unity-catalog/troubleshooting
- category: Connectors / Database / Vertica
url: /connectors/database/vertica
- category: Connectors / Database / Vertica / Run Externally

8 binary image files not shown (new Unity Catalog connector screenshots, 65 KiB–968 KiB).

View File

@ -0,0 +1,44 @@
{
"name": "UnityCatalog",
"displayName": "UnityCatalog Test Connection",
"description": "This Test Connection validates the access against the database and basic metadata extraction of schemas and tables.",
"steps": [
{
"name": "CheckAccess",
"description": "Validate that we can properly reach the database and authenticate with the given credentials.",
"errorMessage": "Failed to connect to unity catalog, please validate to token, http path & hostport",
"shortCircuit": true,
"mandatory": true
},
{
"name": "GetDatabases",
"description": "List all the databases available to the user.",
"errorMessage": "Failed to fetch databases, please validate if the user has enough privilege to fetch databases.",
"mandatory": true
},
{
"name": "GetSchemas",
"description": "List all the schemas available to the user.",
"errorMessage": "Failed to fetch schemas, please validate if the user has enough privilege to fetch schemas.",
"mandatory": true
},
{
"name": "GetTables",
"description": "From a given schema, list the tables belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"errorMessage": "Failed to fetch tables, please validate if the user has enough privilege to fetch tables.",
"mandatory": true
},
{
"name": "GetViews",
"description": "From a given schema, list the views belonging to that schema. If no schema is specified, we'll list the tables of a random schema.",
"errorMessage": "Failed to fetch views, please validate if the user has enough privilege to fetch views.",
"mandatory": false
},
{
"name": "GetQueries",
"description": "Check if we can access the queries form `https://{your_host}/api/2.0/sql/history/queries` API. NOTE: To access this api you must have a premium subscription to unity catalog.",
"errorMessage": "Failed to fetch queries, please validate if the user has access to `https://{your_host}/api/2.0/sql/history/queries` API.",
"mandatory": false
}
]
}

View File

@ -64,12 +64,6 @@
"type": "integer",
"default": 120
},
"useUnityCatalog": {
"title": "Use Unity Catalog",
"description": "Use unity catalog for fetching the metadata instead of using the hive metastore",
"type": "boolean",
"default": false
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"

View File

@ -0,0 +1,104 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/database/unityCatalogConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "UnityCatalogConnection",
"description": "UnityCatalog Connection Config",
"type": "object",
"javaType": "org.openmetadata.schema.services.connections.database.UnityCatalogConnection",
"definitions": {
"databricksType": {
"description": "Service type.",
"type": "string",
"enum": ["UnityCatalog"],
"default": "UnityCatalog"
},
"databricksScheme": {
"description": "SQLAlchemy driver scheme options.",
"type": "string",
"enum": ["databricks+connector"],
"default": "databricks+connector"
}
},
"properties": {
"type": {
"title": "Service Type",
"description": "Service Type",
"$ref": "#/definitions/databricksType",
"default": "Databricks"
},
"scheme": {
"title": "Connection Scheme",
"description": "SQLAlchemy driver scheme options.",
"$ref": "#/definitions/databricksScheme",
"default": "databricks+connector"
},
"hostPort": {
"title": "Host and Port",
"description": "Host and port of the Databricks service.",
"type": "string"
},
"token": {
"title": "Token",
"description": "Generated Token to connect to Databricks.",
"type": "string",
"format": "password"
},
"httpPath": {
"title": "Http Path",
"description": "Databricks compute resources URL.",
"type": "string"
},
"catalog": {
"title": "Catalog",
"description": "Catalog of the data source(Example: hive_metastore). This is optional parameter, if you would like to restrict the metadata reading to a single catalog. When left blank, OpenMetadata Ingestion attempts to scan all the catalog.",
"type": "string"
},
"databaseSchema": {
"title": "Database Schema",
"description": "Database Schema of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single schema. When left blank, OpenMetadata Ingestion attempts to scan all the schemas.",
"type": "string"
},
"connectionTimeout": {
"title": "Connection Timeout",
"description": "The maximum amount of time (in seconds) to wait for a successful connection to the data source. If the connection attempt takes longer than this timeout period, an error will be returned.",
"type": "integer",
"default": 120
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
},
"connectionArguments": {
"title": "Connection Arguments",
"$ref": "../connectionBasicType.json#/definitions/connectionArguments"
},
"supportsUsageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction"
},
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDatabase": {
"title": "Supports Database",
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
},
"supportsQueryComment": {
"title": "Supports Query Comment",
"$ref": "../connectionBasicType.json#/definitions/supportsQueryComment"
},
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
}
},
"additionalProperties": false,
"required": ["hostPort", "token"]
}

View File

@ -50,7 +50,8 @@
"MongoDB",
"Couchbase",
"Greenplum",
"Doris"
"Doris",
"UnityCatalog"
],
"javaEnums": [
{
@ -160,7 +161,11 @@
},
{
"name": "Doris"
},
{
"name": "UnityCatalog"
}
]
},
"databaseConnection": {
@ -274,6 +279,9 @@
},
{
"$ref": "./connections/database/dorisConnection.json"
},
{
"$ref": "./connections/database/unityCatalogConnection.json"
}
]
}

View File

@ -59,11 +59,6 @@ The maximum amount of time (in seconds) to wait for a successful connection to t
If your connection fails because your cluster has not had enough time to start, you can try updating this parameter with a bigger number.
$$
$$section
### Use Unity Catalog $(id="useUnityCatalog")
Enable this flag to extract the metadata and lineage information using databricks unity catalog instead of using legacy hive metastore. When you enable this flag make sure you have enabled the unity catalog on your instance.
$$
$$section
### Connection Options $(id="connectionOptions")
Additional connection options to build the URL that can be sent to service during the connection.

View File

@ -0,0 +1,70 @@
# Unity Catalog
In this section, we provide guides and references to use the Unity Catalog connector. You can view the full documentation for Unity Catalog [here](https://docs.open-metadata.org/connectors/database/unity-catalog).
## Requirements
To learn more about the Unity Catalog connection details (`hostPort`, `token`, `http_path`), visit these [docs](https://docs.open-metadata.org/connectors/database/unity-catalog/troubleshooting).
$$note
We support Databricks runtime version 9 and above.
$$
### Usage & Lineage
$$note
To get Query Usage and Lineage details, you need a Databricks Premium account, since we will be extracting this information from your SQL Warehouse's history API.
$$
You can find further information on the Unity Catalog connector in the [docs](https://docs.open-metadata.org/connectors/database/unity-catalog).
## Connection Details
$$section
### Scheme $(id="scheme")
SQLAlchemy driver scheme options. If you are unsure about this setting, you can use the default value.
$$
$$section
### Host Port $(id="hostPort")
This parameter specifies the host and port of the Databricks instance. This should be specified as a string in the format `hostname:port`. For example, you might set the hostPort parameter to `adb-xyz.azuredatabricks.net:443`.
If you are running the OpenMetadata ingestion in Docker and your services are hosted on `localhost`, then use `host.docker.internal:3000` as the value.
$$
$$section
### Token $(id="token")
Generated Token to connect to Databricks. E.g., `dapw488e89a7176f7eb39bbc718617891564`.
$$
$$section
### HTTP Path $(id="httpPath")
Databricks compute resources URL. E.g., `/sql/1.0/warehouses/xyz123`.
$$
$$section
### Catalog $(id="catalog")
Catalog of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single catalog. When left blank, OpenMetadata Ingestion attempts to scan all the catalogs. E.g., `hive_metastore`
$$
$$section
### Database Schema $(id="databaseSchema")
Schema of the data source. This is an optional parameter; set it if you would like to restrict the metadata reading to a single schema. When left blank, OpenMetadata Ingestion attempts to scan all the schemas.
$$
$$section
### Connection Timeout $(id="connectionTimeout")
The maximum amount of time (in seconds) to wait for a successful connection to the data source. If the connection attempt takes longer than this timeout period, an error will be returned.
If your connection fails because your cluster has not had enough time to start, you can try updating this parameter with a bigger number.
$$
$$section
### Connection Options $(id="connectionOptions")
Additional connection options to build the URL that can be sent to service during the connection.
$$
$$section
### Connection Arguments $(id="connectionArguments")
Additional connection arguments such as security or protocol configs that can be sent to service during connection.
$$

View File

@ -142,6 +142,7 @@ export const METABASE = metabase;
export const AZURESQL = azuresql;
export const CLICKHOUSE = clickhouse;
export const DATABRICK = databrick;
export const UNITYCATALOG = databrick;
export const IBMDB2 = ibmdb2;
export const DORIS = doris;
export const DRUID = druid;

View File

@ -47,6 +47,7 @@ import singleStoreConnection from '../jsons/connectionSchemas/connections/databa
import snowflakeConnection from '../jsons/connectionSchemas/connections/database/snowflakeConnection.json';
import sqliteConnection from '../jsons/connectionSchemas/connections/database/sqliteConnection.json';
import trinoConnection from '../jsons/connectionSchemas/connections/database/trinoConnection.json';
import unityCatalogConnection from '../jsons/connectionSchemas/connections/database/unityCatalogConnection.json';
import verticaConnection from '../jsons/connectionSchemas/connections/database/verticaConnection.json';
export const getDatabaseConfig = (type: DatabaseServiceType) => {
@ -224,6 +225,11 @@ export const getDatabaseConfig = (type: DatabaseServiceType) => {
break;
}
case DatabaseServiceType.UnityCatalog: {
schema = unityCatalogConnection;
break;
}
default: {
schema = {};

View File

@ -81,6 +81,7 @@ import {
TABLEAU,
TOPIC_DEFAULT,
TRINO,
UNITYCATALOG,
VERTICA,
} from '../constants/Services.constant';
import { StorageServiceType } from '../generated/entity/data/container';
@ -199,6 +200,9 @@ class ServiceUtilClassBase {
case DatabaseServiceType.Databricks:
return DATABRICK;
case DatabaseServiceType.UnityCatalog:
return UNITYCATALOG;
case DatabaseServiceType.Db2:
return IBMDB2;