diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/client.py b/ingestion/src/metadata/ingestion/source/database/databricks/client.py
index d2bf4d94fa0..2462de40ddd 100644
--- a/ingestion/src/metadata/ingestion/source/database/databricks/client.py
+++ b/ingestion/src/metadata/ingestion/source/database/databricks/client.py
@@ -14,14 +14,23 @@ Client to interact with databricks apis
 import json
 import traceback
 from datetime import timedelta
-from typing import Iterable, List
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 
 import requests
+from sqlalchemy import text
+from sqlalchemy.engine import Engine
 
 from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
     DatabricksConnection,
 )
+from metadata.generated.schema.entity.services.connections.pipeline.databricksPipelineConnection import (
+    DatabricksPipelineConnection,
+)
 from metadata.ingestion.ometa.client import APIError
+from metadata.ingestion.source.database.databricks.queries import (
+    DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB,
+    DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB,
+)
 from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION
 from metadata.utils.helpers import datetime_to_ts
 from metadata.utils.logger import ingestion_logger
@@ -30,6 +39,8 @@ logger = ingestion_logger()
 API_TIMEOUT = 10
 PAGE_SIZE = 100
 QUERIES_PATH = "/sql/history/queries"
+API_VERSION = "/api/2.0"
+JOB_API_VERSION = "/api/2.1"
 
 
 class DatabricksClientException(Exception):
@@ -43,30 +54,58 @@ class DatabricksClient:
     DatabricksClient creates a Databricks connection based on DatabricksCredentials.
     """
 
-    def __init__(self, config: DatabricksConnection):
+    def __init__(
+        self,
+        config: Union[DatabricksConnection, DatabricksPipelineConnection],
+        engine: Optional[Engine] = None,
+    ):
         self.config = config
         base_url, *_ = self.config.hostPort.split(":")
-        api_version = "/api/2.0"
-        job_api_version = "/api/2.1"
         auth_token = self.config.token.get_secret_value()
-        self.base_url = f"https://{base_url}{api_version}"
+        self.base_url = f"https://{base_url}{API_VERSION}"
         self.base_query_url = f"{self.base_url}{QUERIES_PATH}"
-        self.base_job_url = f"https://{base_url}{job_api_version}/jobs"
+        self.base_job_url = f"https://{base_url}{JOB_API_VERSION}/jobs"
         self.jobs_list_url = f"{self.base_job_url}/list"
         self.jobs_run_list_url = f"{self.base_job_url}/runs/list"
         self.headers = {
             "Authorization": f"Bearer {auth_token}",
             "Content-Type": "application/json",
         }
+        self.api_timeout = self.config.connectionTimeout or 120
+        self.job_table_lineage: Dict[str, List[Dict[str, str]]] = {}
+        self.job_column_lineage: Dict[
+            str, Dict[Tuple[str, str], List[Tuple[str, str]]]
+        ] = {}
+        self.engine = engine
         self.client = requests
 
     def test_query_api_access(self) -> None:
         res = self.client.get(
-            self.base_query_url, headers=self.headers, timeout=API_TIMEOUT
+            self.base_query_url, headers=self.headers, timeout=self.api_timeout
         )
         if res.status_code != 200:
             raise APIError(res.json)
 
+    def test_lineage_query(self) -> None:
+        try:
+            with self.engine.connect() as connection:
+                test_table_lineage = connection.execute(
+                    text(DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB + " LIMIT 1")
+                )
+                test_column_lineage = connection.execute(
+                    text(DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB + " LIMIT 1")
+                )
+                # Fetch a row from each result to confirm the token can read the tables
+                test_table_lineage.fetchone()
+                test_column_lineage.fetchone()
+                logger.info("Lineage queries executed successfully")
+        except Exception as exc:
logger.debug(f"Error testing lineage queries: {traceback.format_exc()}") + raise DatabricksClientException( + f"Failed to test lineage queries Make sure you have access" + "to the tables table_lineage and column_lineage: {exc}" + ) + def _run_query_paginator(self, data, result, end_time, response): while True: if response: @@ -85,7 +124,7 @@ class DatabricksClient: self.base_query_url, data=json.dumps(data), headers=self.headers, - timeout=API_TIMEOUT, + timeout=self.api_timeout, ).json() yield from response.get("res") or [] @@ -117,7 +156,7 @@ class DatabricksClient: self.base_query_url, data=json.dumps(data), headers=self.headers, - timeout=API_TIMEOUT, + timeout=self.api_timeout, ).json() result = response.get("res") or [] @@ -145,7 +184,7 @@ class DatabricksClient: self.jobs_list_url, data=json.dumps(data), headers=self.headers, - timeout=API_TIMEOUT, + timeout=self.api_timeout, ) if response.status_code != 200: raise DatabricksClientException(response.text) @@ -154,6 +193,7 @@ class DatabricksClient: """ Method returns List all the created jobs in a Databricks Workspace """ + self.cache_lineage() try: iteration_count = 1 data = {"limit": PAGE_SIZE, "expand_tasks": True, "offset": 0} @@ -162,7 +202,7 @@ class DatabricksClient: self.jobs_list_url, data=json.dumps(data), headers=self.headers, - timeout=API_TIMEOUT, + timeout=self.api_timeout, ).json() yield from response.get("jobs") or [] @@ -174,7 +214,7 @@ class DatabricksClient: self.jobs_list_url, data=json.dumps(data), headers=self.headers, - timeout=API_TIMEOUT, + timeout=self.api_timeout, ).json() iteration_count += 1 yield from response.get("jobs") or [] @@ -200,7 +240,7 @@ class DatabricksClient: self.jobs_run_list_url, params=params, headers=self.headers, - timeout=API_TIMEOUT, + timeout=self.api_timeout, ).json() yield from response.get("runs") or [] @@ -212,7 +252,7 @@ class DatabricksClient: self.jobs_run_list_url, params=params, headers=self.headers, - timeout=API_TIMEOUT, + timeout=self.api_timeout, ).json() yield from response.get("runs") or [] @@ -220,3 +260,98 @@ class DatabricksClient: except Exception as exc: logger.debug(traceback.format_exc()) logger.error(exc) + + def get_table_lineage(self, job_id: str) -> List[dict[str, str]]: + """ + Method returns table lineage for a job by the specified job_id + """ + try: + return self.job_table_lineage.get(str(job_id)) + except Exception as exc: + logger.debug( + f"Error getting table lineage for job {job_id} due to {traceback.format_exc()}" + ) + logger.error(exc) + return [] + + def get_column_lineage( + self, job_id: str, TableKey: Tuple[str, str] + ) -> List[Tuple[str, str]]: + """ + Method returns column lineage for a job by the specified job_id and table key + """ + try: + return self.job_column_lineage.get(str(job_id), {}).get(TableKey) + except Exception as exc: + logger.debug( + f"Error getting column lineage for table {TableKey} due to {traceback.format_exc()}" + ) + logger.error(exc) + return [] + + def run_lineage_query(self, query: str) -> List[dict]: + """ + Method runs a lineage query and returns the result + """ + try: + with self.engine.connect() as connection: + result = connection.execute(text(query)) + return result + + except Exception as exc: + logger.debug(f"Error caching table lineage due to {traceback.format_exc()}") + logger.error(exc) + return [] + + def cache_lineage(self): + """ + Method caches table and column lineage for a job by the specified job_id + """ + logger.info(f"Caching table lineage") + table_lineage = 
+        if table_lineage:
+            for row in table_lineage:
+                try:
+                    # Store keys as str so lookups in get_table_lineage match
+                    self.job_table_lineage.setdefault(str(row.job_id), []).append(
+                        {
+                            "source_table_full_name": row.source_table_full_name,
+                            "target_table_full_name": row.target_table_full_name,
+                        }
+                    )
+                except Exception:
+                    logger.debug(
+                        f"Error parsing table lineage row {row}: {traceback.format_exc()}"
+                    )
+                    continue
+
+        # Not every job has column lineage, so we only cache it for the jobs
+        # that show up in the column_lineage table
+        logger.info("Caching column lineage")
+        column_lineage = self.run_lineage_query(DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB)
+        if column_lineage:
+            for row in column_lineage:
+                try:
+                    table_key = (
+                        row.source_table_full_name,
+                        row.target_table_full_name,
+                    )
+                    column_pair = (
+                        row.source_column_name,
+                        row.target_column_name,
+                    )
+
+                    self.job_column_lineage.setdefault(str(row.job_id), {}).setdefault(
+                        table_key, []
+                    ).append(column_pair)
+
+                except Exception:
+                    logger.debug(
+                        f"Error parsing column lineage row {row}: {traceback.format_exc()}"
+                    )
+                    continue
diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py
index c45afd47e2d..80b86819f70 100644
--- a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py
+++ b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py
@@ -86,3 +86,29 @@ DATABRICKS_GET_COLUMN_TAGS = textwrap.dedent(
 )
 
 DATABRICKS_DDL = "SHOW CREATE TABLE `{table_name}`"
+
+DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB = textwrap.dedent(
+    """
+    SELECT DISTINCT
+        entity_id AS job_id,
+        entity_type AS job_type,
+        source_table_full_name,
+        target_table_full_name
+    FROM system.access.table_lineage
+    WHERE entity_type ILIKE 'job'
+    """
+)
+
+DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB = textwrap.dedent(
+    """
+    SELECT DISTINCT
+        entity_id AS job_id,
+        entity_type AS job_type,
+        source_table_full_name,
+        source_column_name,
+        target_table_full_name,
+        target_column_name
+    FROM system.access.column_lineage
+    WHERE entity_type ILIKE 'job'
+    """
+)
diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py
index c6739b70e85..7164b481d24 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py
@@ -24,17 +24,38 @@ from metadata.generated.schema.entity.services.connections.pipeline.databricksPi
 from metadata.generated.schema.entity.services.connections.testConnectionResult import (
     TestConnectionResult,
 )
+from metadata.ingestion.connections.builders import (
+    create_generic_db_connection,
+    get_connection_args_common,
+    init_empty_connection_arguments,
+)
 from metadata.ingestion.connections.test_connections import test_connection_steps
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.database.databricks.client import DatabricksClient
 from metadata.utils.constants import THREE_MIN
 
 
+def get_connection_url(connection: DatabricksPipelineConnection) -> str:
+    url = f"databricks+connector://token:{connection.token.get_secret_value()}@{connection.hostPort}"
+    return url
+
+
 def get_connection(connection: DatabricksPipelineConnection) -> DatabricksClient:
     """
     Create connection
     """
-    return DatabricksClient(connection)
+
+    if connection.httpPath:
+        if not connection.connectionArguments:
+            connection.connectionArguments = init_empty_connection_arguments()
+        connection.connectionArguments.root["http_path"] = connection.httpPath
+
+    engine = create_generic_db_connection(
+        connection=connection,
+        get_connection_url_fn=get_connection_url,
+        get_connection_args_fn=get_connection_args_common,
+    )
+    return DatabricksClient(connection, engine)
 
 
 def test_connection(
@@ -49,7 +70,10 @@ def test_connection(
     of a metadata workflow or during an Automation Workflow
     """
 
-    test_fn = {"GetPipelines": client.list_jobs_test_connection}
+    test_fn = {
+        "GetPipelines": client.list_jobs_test_connection,
+        "GetLineage": client.test_lineage_query,
+    }
 
     return test_connection_steps(
         metadata=metadata,
diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py
index 31b734f8ece..c32c89a5c69 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py
@@ -14,7 +14,7 @@ Databricks pipeline source to extract metadata
 """
 
 import traceback
-from typing import Iterable, List, Optional
+from typing import Iterable, List, Optional, Tuple
 
 from pydantic import ValidationError
 
@@ -27,6 +27,7 @@ from metadata.generated.schema.entity.data.pipeline import (
     Task,
     TaskStatus,
 )
+from metadata.generated.schema.entity.data.table import Table
 from metadata.generated.schema.entity.services.connections.pipeline.databricksPipelineConnection import (
     DatabricksPipelineConnection,
 )
@@ -43,8 +44,16 @@ from metadata.generated.schema.type.basic import (
     SourceUrl,
     Timestamp,
 )
+from metadata.generated.schema.type.entityLineage import (
+    ColumnLineage,
+    EntitiesEdge,
+    LineageDetails,
+)
+from metadata.generated.schema.type.entityLineage import Source as LineageSource
+from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.models import Either
 from metadata.ingestion.api.steps import InvalidSourceException
+from metadata.ingestion.lineage.sql_lineage import get_column_fqn
 from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.pipeline.databrickspipeline.models import (
@@ -54,7 +63,6 @@ from metadata.ingestion.source.pipeline.databrickspipeline.models import (
 from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
 from metadata.utils import fqn
 from metadata.utils.logger import ingestion_logger
-from metadata.utils.time_utils import convert_timestamp_to_milliseconds
 
 logger = ingestion_logger()
 
@@ -200,21 +208,15 @@ class DatabrickspipelineSource(PipelineServiceSource):
                             executionStatus=STATUS_MAP.get(
                                 run.state.result_state, StatusType.Failed
                             ),
-                            startTime=Timestamp(
-                                convert_timestamp_to_milliseconds(run.start_time)
-                            ),
-                            endTime=Timestamp(
-                                convert_timestamp_to_milliseconds(run.end_time)
-                            ),
+                            startTime=Timestamp(run.start_time),
+                            endTime=Timestamp(run.end_time) if run.end_time else None,
                             logLink=run.run_page_url,
                         )
                         for task in run.tasks or []
                     ]
                     pipeline_status = PipelineStatus(
                         taskStatus=task_status,
-                        timestamp=Timestamp(
-                            convert_timestamp_to_milliseconds(run.start_time)
-                        ),
+                        timestamp=Timestamp(run.start_time),
                         executionStatus=STATUS_MAP.get(
                             run.state.result_state,
                             StatusType.Failed,
@@ -235,13 +237,173 @@ class DatabrickspipelineSource(PipelineServiceSource):
         except Exception as exc:
             yield Either(
                 left=StackTraceError(
-                    name=pipeline_fqn,
+                    name=str(pipeline_details.job_id),
                     error=f"Failed to yield pipeline status: {exc}",
                     stackTrace=traceback.format_exc(),
                 )
             )
 
+    def _process_and_validate_column_lineage(
+        self,
+        column_lineage: List[Tuple[str, str]],
+        from_entity: Table,
+        to_entity: Table,
+    ) -> List[ColumnLineage]:
+        """
+        Process and validate column lineage
+        """
+        processed_column_lineage = []
+        for column_tuple in column_lineage or []:
+            try:
+                if len(column_tuple) < 2:
+                    logger.debug(f"Skipping invalid column tuple: {column_tuple}")
+                    continue
+
+                source_col = column_tuple[0]
+                target_col = column_tuple[-1]
+
+                if not source_col or not target_col:
+                    logger.debug(
+                        f"Skipping column tuple with empty values: source={source_col}, "
+                        f"target={target_col}, to_entity={to_entity.name}"
+                    )
+                    continue
+
+                from_column = get_column_fqn(
+                    table_entity=from_entity, column=str(source_col)
+                )
+                to_column = get_column_fqn(
+                    table_entity=to_entity,
+                    column=str(target_col),
+                )
+                if from_column and to_column:
+                    processed_column_lineage.append(
+                        ColumnLineage(
+                            fromColumns=[from_column],
+                            toColumn=to_column,
+                        )
+                    )
+            except Exception as err:
+                logger.warning(
+                    f"Error processing column lineage {column_tuple}: {err}"
+                )
+                logger.debug(traceback.format_exc())
+                continue
+        if not processed_column_lineage:
+            logger.warning(
+                f"No column lineage found for {from_entity.name} to {to_entity.name}"
+            )
+        return processed_column_lineage
+
     def yield_pipeline_lineage_details(
         self, pipeline_details: DataBrickPipelineDetails
     ) -> Iterable[Either[AddLineageRequest]]:
-        """Get lineage between pipeline and data sources. Not Implemented."""
+        """Get lineage between the pipeline and its source and target tables"""
+        try:
+            pipeline_fqn = fqn.build(
+                metadata=self.metadata,
+                entity_type=Pipeline,
+                service_name=self.context.get().pipeline_service,
+                pipeline_name=self.context.get().pipeline,
+            )
+
+            pipeline_entity = self.metadata.get_by_name(
+                entity=Pipeline, fqn=pipeline_fqn
+            )
+
+            table_lineage_list = self.client.get_table_lineage(
+                job_id=pipeline_details.job_id
+            )
+            if table_lineage_list:
+                for table_lineage in table_lineage_list:
+                    source_table_full_name = table_lineage.get("source_table_full_name")
+                    target_table_full_name = table_lineage.get("target_table_full_name")
+                    if source_table_full_name and target_table_full_name:
+                        source = fqn.split_table_name(source_table_full_name)
+                        target = fqn.split_table_name(target_table_full_name)
+                        for db_service_name in self.get_db_service_names() or ["*"]:
+
+                            from_entity = self.metadata.get_by_name(
+                                entity=Table,
+                                fqn=fqn.build(
+                                    metadata=self.metadata,
+                                    entity_type=Table,
+                                    table_name=source.get("table"),
+                                    database_name=source.get("database"),
+                                    schema_name=source.get("database_schema"),
+                                    service_name=db_service_name,
+                                ),
+                            )
+
+                            if from_entity is None:
+                                continue
+
+                            to_entity = self.metadata.get_by_name(
+                                entity=Table,
+                                fqn=fqn.build(
+                                    metadata=self.metadata,
+                                    entity_type=Table,
+                                    table_name=target.get("table"),
+                                    database_name=target.get("database"),
+                                    schema_name=target.get("database_schema"),
+                                    service_name=db_service_name,
+                                ),
+                            )
+
+                            if to_entity is None:
+                                continue
+
+                            processed_column_lineage = (
+                                self._process_and_validate_column_lineage(
+                                    column_lineage=self.client.get_column_lineage(
+                                        job_id=pipeline_details.job_id,
+                                        table_key=(
+                                            source_table_full_name,
+                                            target_table_full_name,
+                                        ),
+                                    ),
+                                    from_entity=from_entity,
+                                    to_entity=to_entity,
+                                )
+                            )
+
+                            lineage_details = LineageDetails(
+                                pipeline=EntityReference(
+                                    id=pipeline_entity.id.root, type="pipeline"
+                                ),
+                                source=LineageSource.PipelineLineage,
+                                columnsLineage=processed_column_lineage,
+                            )
+
+                            yield Either(
+                                right=AddLineageRequest(
+                                    edge=EntitiesEdge(
+                                        fromEntity=EntityReference(
+                                            id=from_entity.id,
+                                            type="table",
+                                        ),
+                                        toEntity=EntityReference(
+                                            id=to_entity.id,
+                                            type="table",
+                                        ),
+                                        lineageDetails=lineage_details,
+                                    )
+                                )
+                            )
+
+                    else:
+                        logger.debug(
+                            f"No source or target table full name found for job {pipeline_details.job_id}"
+                        )
+            else:
+                logger.debug(
+                    f"No table lineage found for job {pipeline_details.job_id}"
+                )
+        except Exception as exc:
+            yield Either(
+                left=StackTraceError(
+                    name=str(pipeline_details.job_id),
+                    error=f"Error ingesting pipeline lineage for {pipeline_details}: {exc}",
+                    stackTrace=traceback.format_exc(),
+                )
+            )
diff --git a/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py b/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py
index 5e3ab452b7d..7333b846ec2 100644
--- a/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py
+++ b/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py
@@ -13,17 +13,20 @@ Databricks Pipeline utils tests
 """
 import json
+import uuid
 from pathlib import Path
 from unittest import TestCase
 from unittest.mock import patch
 
 from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
+from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 from metadata.generated.schema.entity.data.pipeline import (
     Pipeline,
     PipelineStatus,
     Task,
     TaskStatus,
 )
+from metadata.generated.schema.entity.data.table import Column, Table
 from metadata.generated.schema.entity.services.pipelineService import (
     PipelineConnection,
     PipelineService,
@@ -33,8 +36,14 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
 )
 from metadata.generated.schema.type.basic import FullyQualifiedEntityName
+from metadata.generated.schema.type.entityLineage import (
+    ColumnLineage,
+    EntitiesEdge,
+    LineageDetails,
+)
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.pipeline.databrickspipeline.metadata import (
     DatabrickspipelineSource,
 )
@@ -67,6 +76,7 @@ mock_databricks_config = {
         "type": "DatabricksPipeline",
         "token": "random_token",
         "hostPort": "localhost:443",
+        "connectionTimeout": 120,
         "connectionArguments": {
             "http_path": "sql/1.0/endpoints/path",
         },
@@ -198,6 +208,36 @@ EXPECTED_PIPELINE_STATUS = [
 
 PIPELINE_LIST = [DataBrickPipelineDetails(**data) for data in mock_data]
 
+EXPECTED_PIPELINE_LINEAGE = AddLineageRequest(
+    edge=EntitiesEdge(
+        fromEntity=EntityReference(
+            id="cced5342-12e8-45fb-b50a-918529d43ed1", type="table"
+        ),
+        toEntity=EntityReference(
+            id="6f5ad342-12e8-45fb-b50a-918529d43ed1", type="table"
+        ),
+        lineageDetails=LineageDetails(
+            columnsLineage=[
+                ColumnLineage(
+                    fromColumns=[
+                        FullyQualifiedEntityName(
+                            root="local_table.dev.table_1.column_1"
+                        )
+                    ],
+                    toColumn=FullyQualifiedEntityName(
+                        root="local_table.dev.table_2.column_2"
+                    ),
+                )
+            ],
+            pipeline=EntityReference(
+                id="1fa49082-a32c-4e71-ba4a-6a111b489ed6",
+                type="pipeline",
+            ),
+            source="PipelineLineage",
+        ),
+    )
+)
+
 
 class DatabricksPipelineTests(TestCase):
     """
@@ -224,6 +264,9 @@ class DatabricksPipelineTests(TestCase):
         self.databricks.context.get().__dict__[
             "pipeline_service"
         ] = MOCK_PIPELINE_SERVICE.name.root
+        self.databricks.metadata = OpenMetadata(
+            config.workflowConfig.openMetadataServerConfig
+        )
 
     @patch(
         "metadata.ingestion.source.database.databricks.client.DatabricksClient.list_jobs"
@@ -256,3 +299,141 @@ class DatabricksPipelineTests(TestCase):
             )
         ]
         self.assertEqual(pipeline_status, EXPECTED_PIPELINE_STATUS)
+
+    def test_databricks_pipeline_lineage(self):
+        self.databricks.context.get().__dict__["pipeline"] = "11223344"
+        self.databricks.context.get().__dict__[
+            "pipeline_service"
+        ] = "databricks_pipeline_test"
+        mock_pipeline = Pipeline(
+            id=uuid.uuid4(),
+            name="11223344",
+            fullyQualifiedName="databricks_pipeline_test.11223344",
+            service=EntityReference(id=uuid.uuid4(), type="pipelineService"),
+        )
+
+        # Create source and target tables
+        mock_source_table = Table(
+            id="cced5342-12e8-45fb-b50a-918529d43ed1",
+            name="table_1",
+            fullyQualifiedName="local_table.dev.table_1",
+            database=EntityReference(id=uuid.uuid4(), type="database"),
+            columns=[
+                Column(
+                    name="column_1",
+                    fullyQualifiedName="local_table.dev.table_1.column_1",
+                    dataType="VARCHAR",
+                )
+            ],
+            databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"),
+        )
+
+        mock_target_table = Table(
+            id="6f5ad342-12e8-45fb-b50a-918529d43ed1",
+            name="table_2",
+            fullyQualifiedName="local_table.dev.table_2",
+            database=EntityReference(id=uuid.uuid4(), type="database"),
+            columns=[
+                Column(
+                    name="column_2",
+                    fullyQualifiedName="local_table.dev.table_2.column_2",
+                    dataType="VARCHAR",
+                )
+            ],
+            databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"),
+        )
+
+        with patch.object(self.databricks.metadata, "get_by_name") as mock_get_by_name:
+
+            def get_by_name_side_effect(entity, fqn):
+                if entity == Pipeline:
+                    if fqn == "databricks_pipeline_test.11223344":
+                        return mock_pipeline
+                elif entity == Table:
+                    if "table_1" in fqn:
+                        return mock_source_table
+                    elif "table_2" in fqn:
+                        return mock_target_table
+                return None
+
+            mock_get_by_name.side_effect = get_by_name_side_effect
+
+            with patch.object(
+                self.databricks.client, "get_table_lineage"
+            ) as mock_get_table_lineage:
+                mock_get_table_lineage.return_value = [
+                    {
+                        "source_table_full_name": "local_table.dev.table_1",
+                        "target_table_full_name": "local_table.dev.table_2",
+                    }
+                ]
+                with patch.object(
+                    self.databricks.client, "get_column_lineage"
+                ) as mock_get_column_lineage:
+                    mock_get_column_lineage.return_value = [
+                        ("column_1", "column_2"),
+                        ("column_3", "column_4"),
+                    ]
+                    lineage_details = list(
+                        self.databricks.yield_pipeline_lineage_details(
+                            DataBrickPipelineDetails(**mock_data[0])
+                        )
+                    )[0].right
+                    self.assertEqual(
+                        lineage_details.edge.fromEntity.id,
+                        EXPECTED_PIPELINE_LINEAGE.edge.fromEntity.id,
+                    )
+                    self.assertEqual(
+                        lineage_details.edge.toEntity.id,
+                        EXPECTED_PIPELINE_LINEAGE.edge.toEntity.id,
+                    )
+                    self.assertEqual(
+                        lineage_details.edge.lineageDetails.columnsLineage,
+                        EXPECTED_PIPELINE_LINEAGE.edge.lineageDetails.columnsLineage,
+                    )
+
+        with patch.object(self.databricks.metadata, "get_by_name") as mock_get_by_name:
+
+            def get_by_name_side_effect(entity, fqn):
+                if entity == Pipeline:
+                    if fqn == "databricks_pipeline_test.11223344":
+                        return mock_pipeline
+                elif entity == Table:
+                    if "table_1" in fqn:
+                        return mock_source_table
+                    elif "table_2" in fqn:
+                        return mock_target_table
+                return None
+
+            mock_get_by_name.side_effect = get_by_name_side_effect
+
+            with patch.object(
+                self.databricks.client, "get_table_lineage"
+            ) as mock_get_table_lineage:
+                mock_get_table_lineage.return_value = [
+                    {
+                        "source_table_full_name": "local_table.dev.table_1",
+                        "target_table_full_name": "local_table.dev.table_2",
+                    }
+                ]
+                with patch.object(
+                    self.databricks.client, "get_column_lineage"
+                ) as mock_get_column_lineage:
+                    mock_get_column_lineage.return_value = []  # No column lineage
+                    lineage_details = list(
+                        self.databricks.yield_pipeline_lineage_details(
+                            DataBrickPipelineDetails(**mock_data[0])
+                        )
+                    )[0].right
+                    self.assertEqual(
+                        lineage_details.edge.fromEntity.id,
+                        EXPECTED_PIPELINE_LINEAGE.edge.fromEntity.id,
+                    )
+                    self.assertEqual(
+                        lineage_details.edge.toEntity.id,
+                        EXPECTED_PIPELINE_LINEAGE.edge.toEntity.id,
+                    )
+                    self.assertEqual(
+                        lineage_details.edge.lineageDetails.columnsLineage,
+                        [],
+                    )
diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/pipeline/databrickspipeline.json b/openmetadata-service/src/main/resources/json/data/testConnections/pipeline/databrickspipeline.json
index ba07eff5ad7..786a6fb32dc 100644
--- a/openmetadata-service/src/main/resources/json/data/testConnections/pipeline/databrickspipeline.json
+++ b/openmetadata-service/src/main/resources/json/data/testConnections/pipeline/databrickspipeline.json
@@ -9,6 +9,13 @@
       "errorMessage": "Failed to fetch pipelines, please validate the credentials or validate if user has access to fetch pipelines",
       "shortCircuit": true,
       "mandatory": true
+    },
+    {
+      "name": "GetLineage",
+      "description": "Get lineage for the pipelines",
+      "errorMessage": "Failed to fetch lineage, please validate the credentials or validate if user has access to system.access.table_lineage and system.access.column_lineage tables",
+      "shortCircuit": false,
+      "mandatory": false
     }
   ]
 }
\ No newline at end of file
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/databricksPipelineConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/databricksPipelineConnection.json
index 4a0038e2c08..18eb2813bc9 100644
--- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/databricksPipelineConnection.json
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/databricksPipelineConnection.json
@@ -33,6 +33,12 @@
       "type": "string",
       "format": "password"
     },
+    "connectionTimeout": {
+      "title": "Connection Timeout",
+      "description": "Connection timeout in seconds.",
+      "type": "number",
+      "default": 120
+    },
     "httpPath": {
       "title": "Http Path",
       "description": "Databricks compute resources URL.",
diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/DatabricksPipeline.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/DatabricksPipeline.md
index 36a19ccc661..fb766e14a59 100644
--- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/DatabricksPipeline.md
+++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/DatabricksPipeline.md
@@ -8,6 +8,24 @@ To learn more about the Databricks Connection Details (`hostPort`,`token`, `http
 You can find further information on the Databricks Pipeline connector in the [docs](https://docs.open-metadata.org/connectors/pipeline/databrickspipeline).
 
+## Lineage Requirements
+
+To enable lineage extraction for Databricks pipelines, the user associated with the provided token must have `SELECT` privileges on the following system tables:
+
+- `system.access.table_lineage` - Required for table-level lineage information
+- `system.access.column_lineage` - Required for column-level lineage information
+
+Without these permissions, the lineage extraction step will fail with an access error. Ensure your Databricks workspace administrator has granted the necessary permissions to the user whose token is being used for the connection.
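+
+For example (an illustrative sketch, assuming a Unity Catalog workspace; the principal `user@example.com` is a placeholder for the user or group tied to your token), an administrator can grant the required access with:
+
+```sql
+GRANT USE CATALOG ON CATALOG system TO `user@example.com`;
+GRANT USE SCHEMA ON SCHEMA system.access TO `user@example.com`;
+GRANT SELECT ON TABLE system.access.table_lineage TO `user@example.com`;
+GRANT SELECT ON TABLE system.access.column_lineage TO `user@example.com`;
+```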
+
 ## Connection Details
 
 $$section
@@ -20,6 +38,11 @@
 Generated Token to connect to Databricks. E.g., `dapw488e89a7176f7eb39bbc718617891564`.
 $$
 
+$$section
+### Connection Timeout $(id="connectionTimeout")
+Connection timeout in seconds. The default value is 120 seconds if not specified.
+$$
+
 $$section
 ### HTTP Path $(id="httpPath")
 Databricks compute resources URL. E.g., `/sql/1.0/warehouses/xyz123`.
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/databricksPipelineConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/databricksPipelineConnection.ts
index d2129643477..2c132c0ca55 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/databricksPipelineConnection.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/pipeline/databricksPipelineConnection.ts
@@ -15,6 +15,10 @@
  */
 export interface DatabricksPipelineConnection {
     connectionArguments?: { [key: string]: any };
+    /**
+     * Connection timeout in seconds.
+     */
+    connectionTimeout?: number;
     /**
      * Host and port of the Databricks service.
      */