Added Databricks pipeline Lineage (#22014)

This commit is contained in:
Suman Maharana 2025-06-30 10:41:22 +05:30 committed by GitHub
parent b4cd7b7046
commit e36e5da26e
9 changed files with 589 additions and 29 deletions

View File

@ -14,14 +14,23 @@ Client to interact with databricks apis
import json
import traceback
from datetime import timedelta
from typing import Iterable, List
from typing import Iterable, List, Optional, Tuple, Union
import requests
from sqlalchemy import text
from sqlalchemy.engine import Engine
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.databricksPipelineConnection import (
DatabricksPipelineConnection,
)
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB,
DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB,
)
from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger
@ -30,6 +39,8 @@ logger = ingestion_logger()
API_TIMEOUT = 10
PAGE_SIZE = 100
QUERIES_PATH = "/sql/history/queries"
API_VERSION = "/api/2.0"
JOB_API_VERSION = "/api/2.1"
class DatabricksClientException(Exception):
@ -43,30 +54,58 @@ class DatabricksClient:
DatabricksClient creates a Databricks connection based on DatabricksCredentials.
"""
def __init__(self, config: DatabricksConnection):
def __init__(
self,
config: Union[DatabricksConnection, DatabricksPipelineConnection],
engine: Optional[Engine] = None,
):
self.config = config
base_url, *_ = self.config.hostPort.split(":")
api_version = "/api/2.0"
job_api_version = "/api/2.1"
auth_token = self.config.token.get_secret_value()
self.base_url = f"https://{base_url}{api_version}"
self.base_url = f"https://{base_url}{API_VERSION}"
self.base_query_url = f"{self.base_url}{QUERIES_PATH}"
self.base_job_url = f"https://{base_url}{job_api_version}/jobs"
self.base_job_url = f"https://{base_url}{JOB_API_VERSION}/jobs"
self.jobs_list_url = f"{self.base_job_url}/list"
self.jobs_run_list_url = f"{self.base_job_url}/runs/list"
self.headers = {
"Authorization": f"Bearer {auth_token}",
"Content-Type": "application/json",
}
self.api_timeout = self.config.connectionTimeout or 120
self.job_table_lineage: dict[str, list[dict[str, str]]] = {}
self.job_column_lineage: dict[
str, dict[Tuple[str, str], list[Tuple[str, str]]]
] = {}
self.engine = engine
self.client = requests
def test_query_api_access(self) -> None:
res = self.client.get(
self.base_query_url, headers=self.headers, timeout=API_TIMEOUT
self.base_query_url, headers=self.headers, timeout=self.api_timeout
)
if res.status_code != 200:
raise APIError(res.json)
def test_lineage_query(self) -> None:
try:
with self.engine.connect() as connection:
test_table_lineage = connection.execute(
text(DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB + " LIMIT 1")
)
test_column_lineage = connection.execute(
text(DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB + " LIMIT 1")
)
# Check that the queries execute successfully by fetching a row from each
test_table_lineage.fetchone()
test_column_lineage.fetchone()
logger.info("Lineage queries executed successfully")
except Exception as exc:
logger.debug(f"Error testing lineage queries: {traceback.format_exc()}")
raise DatabricksClientException(
f"Failed to test lineage queries Make sure you have access"
"to the tables table_lineage and column_lineage: {exc}"
)
def _run_query_paginator(self, data, result, end_time, response):
while True:
if response:
@ -85,7 +124,7 @@ class DatabricksClient:
self.base_query_url,
data=json.dumps(data),
headers=self.headers,
timeout=API_TIMEOUT,
timeout=self.api_timeout,
).json()
yield from response.get("res") or []
@ -117,7 +156,7 @@ class DatabricksClient:
self.base_query_url,
data=json.dumps(data),
headers=self.headers,
timeout=API_TIMEOUT,
timeout=self.api_timeout,
).json()
result = response.get("res") or []
@ -145,7 +184,7 @@ class DatabricksClient:
self.jobs_list_url,
data=json.dumps(data),
headers=self.headers,
timeout=API_TIMEOUT,
timeout=self.api_timeout,
)
if response.status_code != 200:
raise DatabricksClientException(response.text)
@ -154,6 +193,7 @@ class DatabricksClient:
"""
Method to list all the jobs created in a Databricks Workspace
"""
self.cache_lineage()
try:
iteration_count = 1
data = {"limit": PAGE_SIZE, "expand_tasks": True, "offset": 0}
@ -162,7 +202,7 @@ class DatabricksClient:
self.jobs_list_url,
data=json.dumps(data),
headers=self.headers,
timeout=API_TIMEOUT,
timeout=self.api_timeout,
).json()
yield from response.get("jobs") or []
@ -174,7 +214,7 @@ class DatabricksClient:
self.jobs_list_url,
data=json.dumps(data),
headers=self.headers,
timeout=API_TIMEOUT,
timeout=self.api_timeout,
).json()
iteration_count += 1
yield from response.get("jobs") or []
@ -200,7 +240,7 @@ class DatabricksClient:
self.jobs_run_list_url,
params=params,
headers=self.headers,
timeout=API_TIMEOUT,
timeout=self.api_timeout,
).json()
yield from response.get("runs") or []
@ -212,7 +252,7 @@ class DatabricksClient:
self.jobs_run_list_url,
params=params,
headers=self.headers,
timeout=API_TIMEOUT,
timeout=self.api_timeout,
).json()
yield from response.get("runs") or []
@ -220,3 +260,98 @@ class DatabricksClient:
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)
def get_table_lineage(self, job_id: str) -> List[dict[str, str]]:
"""
Method returns table lineage for a job by the specified job_id
"""
try:
return self.job_table_lineage.get(str(job_id)) or []
except Exception as exc:
logger.debug(
f"Error getting table lineage for job {job_id} due to {traceback.format_exc()}"
)
logger.error(exc)
return []
def get_column_lineage(
self, job_id: str, table_key: Tuple[str, str]
) -> List[Tuple[str, str]]:
"""
Method returns column lineage for a job by the specified job_id and table key
"""
try:
return self.job_column_lineage.get(str(job_id), {}).get(table_key) or []
except Exception as exc:
logger.debug(
f"Error getting column lineage for table {TableKey} due to {traceback.format_exc()}"
)
logger.error(exc)
return []
def run_lineage_query(self, query: str) -> List[dict]:
"""
Method runs a lineage query and returns the result
"""
try:
with self.engine.connect() as connection:
result = connection.execute(text(query))
return result.fetchall()
except Exception as exc:
logger.debug(f"Error caching table lineage due to {traceback.format_exc()}")
logger.error(exc)
return []
def cache_lineage(self):
"""
Method caches table and column lineage for all jobs, keyed by job_id
"""
logger.info(f"Caching table lineage")
table_lineage = self.run_lineage_query(DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB)
if table_lineage:
for row in table_lineage:
try:
if row.job_id not in self.job_table_lineage:
self.job_table_lineage[row.job_id] = []
self.job_table_lineage[row.job_id].append(
{
"source_table_full_name": row.source_table_full_name,
"target_table_full_name": row.target_table_full_name,
}
)
except Exception as exc:
logger.debug(
f"Error parsing row: {row} due to {traceback.format_exc()}"
)
continue
# Not every job has column lineage, so only cache column lineage for the jobs that appear in the column_lineage table
logger.info("Caching column lineage")
column_lineage = self.run_lineage_query(DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB)
if column_lineage:
for row in column_lineage:
try:
table_key = (
row.source_table_full_name,
row.target_table_full_name,
)
column_pair = (
row.source_column_name,
row.target_column_name,
)
if row.job_id not in self.job_column_lineage:
self.job_column_lineage[row.job_id] = {}
if table_key not in self.job_column_lineage[row.job_id]:
self.job_column_lineage[row.job_id][table_key] = []
self.job_column_lineage[row.job_id][table_key].append(column_pair)
except Exception as exc:
logger.debug(
f"Error parsing row: {row} due to {traceback.format_exc()}"
)
continue
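For reference, a minimal sketch (with hypothetical job IDs and table/column names) of the cache shapes that cache_lineage builds and that get_table_lineage / get_column_lineage read back:

# Hypothetical example of the structures populated by cache_lineage:
job_table_lineage = {
    "1234": [  # job_id, as returned by system.access.table_lineage
        {
            "source_table_full_name": "catalog.schema.orders_raw",
            "target_table_full_name": "catalog.schema.orders_clean",
        }
    ]
}
job_column_lineage = {
    "1234": {
        # key: (source table, target table); value: list of (source column, target column)
        ("catalog.schema.orders_raw", "catalog.schema.orders_clean"): [
            ("order_id", "order_id"),
            ("amount_usd", "amount"),
        ]
    }
}
# get_table_lineage("1234") returns the list of table pairs, and
# get_column_lineage("1234", table_key=(src, tgt)) returns the column pairs for that edge.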

View File

@ -86,3 +86,29 @@ DATABRICKS_GET_COLUMN_TAGS = textwrap.dedent(
)
DATABRICKS_DDL = "SHOW CREATE TABLE `{table_name}`"
DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB = textwrap.dedent(
"""
SELECT DISTINCT
entity_id as job_id,
entity_type as job_type,
source_table_full_name as source_table_full_name,
target_table_full_name as target_table_full_name
FROM system.access.table_lineage
WHERE entity_type ILIKE 'job'
"""
)
DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB = textwrap.dedent(
"""
SELECT DISTINCT
entity_id as job_id,
entity_type as job_type,
source_table_full_name as source_table_full_name,
source_column_name as source_column_name,
target_table_full_name as target_table_full_name,
target_column_name as target_column_name
FROM system.access.column_lineage
WHERE entity_type ILIKE 'job'
"""
)

View File

@ -24,17 +24,38 @@ from metadata.generated.schema.entity.services.connections.pipeline.databricksPi
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.ingestion.connections.builders import (
create_generic_db_connection,
get_connection_args_common,
init_empty_connection_arguments,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.databricks.client import DatabricksClient
from metadata.utils.constants import THREE_MIN
def get_connection_url(connection: DatabricksPipelineConnection) -> str:
url = f"databricks+connector://token:{connection.token.get_secret_value()}@{connection.hostPort}"
return url
def get_connection(connection: DatabricksPipelineConnection) -> DatabricksClient:
"""
Create connection
"""
return DatabricksClient(connection)
if connection.httpPath:
if not connection.connectionArguments:
connection.connectionArguments = init_empty_connection_arguments()
connection.connectionArguments.root["http_path"] = connection.httpPath
engine = create_generic_db_connection(
connection=connection,
get_connection_url_fn=get_connection_url,
get_connection_args_fn=get_connection_args_common,
)
return DatabricksClient(connection, engine)
def test_connection(
@ -49,7 +70,10 @@ def test_connection(
of a metadata workflow or during an Automation Workflow
"""
test_fn = {"GetPipelines": client.list_jobs_test_connection}
test_fn = {
"GetPipelines": client.list_jobs_test_connection,
"GetLineage": client.test_lineage_query,
}
return test_connection_steps(
metadata=metadata,

View File

@ -14,7 +14,7 @@ Databricks pipeline source to extract metadata
"""
import traceback
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Tuple
from pydantic import ValidationError
@ -27,6 +27,7 @@ from metadata.generated.schema.entity.data.pipeline import (
Task,
TaskStatus,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.pipeline.databricksPipelineConnection import (
DatabricksPipelineConnection,
)
@ -43,8 +44,16 @@ from metadata.generated.schema.type.basic import (
SourceUrl,
Timestamp,
)
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.databrickspipeline.models import (
@ -54,7 +63,6 @@ from metadata.ingestion.source.pipeline.databrickspipeline.models import (
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.time_utils import convert_timestamp_to_milliseconds
logger = ingestion_logger()
@ -200,21 +208,15 @@ class DatabrickspipelineSource(PipelineServiceSource):
executionStatus=STATUS_MAP.get(
run.state.result_state, StatusType.Failed
),
startTime=Timestamp(
convert_timestamp_to_milliseconds(run.start_time)
),
endTime=Timestamp(
convert_timestamp_to_milliseconds(run.end_time)
),
startTime=Timestamp(run.start_time),
endTime=Timestamp(run.end_time) if run.end_time else None,
logLink=run.run_page_url,
)
for task in run.tasks or []
]
pipeline_status = PipelineStatus(
taskStatus=task_status,
timestamp=Timestamp(
convert_timestamp_to_milliseconds(run.start_time)
),
timestamp=Timestamp(run.start_time),
executionStatus=STATUS_MAP.get(
run.state.result_state,
StatusType.Failed,
@ -235,13 +237,173 @@ class DatabrickspipelineSource(PipelineServiceSource):
except Exception as exc:
yield Either(
left=StackTraceError(
name=pipeline_fqn,
name=str(pipeline_details.job_id),
error=f"Failed to yield pipeline status: {exc}",
stackTrace=traceback.format_exc(),
)
)
def _process_and_validate_column_lineage(
self,
column_lineage: List[Tuple[str, str]],
from_entity: Table,
to_entity: Table,
) -> List[ColumnLineage]:
"""
Process and validate column lineage
"""
processed_column_lineage = []
if column_lineage:
for column_tuple in column_lineage or []:
try:
if len(column_tuple) < 2:
logger.debug(f"Skipping invalid column tuple: {column_tuple}")
continue
source_col = column_tuple[0]
target_col = column_tuple[-1]
if not source_col or not target_col:
logger.debug(
f"Skipping column tuple with empty values: source={source_col}, "
f"target={target_col}, to_entity={to_entity.name}"
)
continue
from_column = get_column_fqn(
table_entity=from_entity, column=str(source_col)
)
to_column = get_column_fqn(
table_entity=to_entity,
column=str(target_col),
)
if from_column and to_column:
processed_column_lineage.append(
ColumnLineage(
fromColumns=[from_column],
toColumn=to_column,
)
)
except Exception as err:
logger.warning(
f"Error processing column lineage {column_tuple}: {err}"
)
logger.debug(traceback.format_exc())
continue
if not processed_column_lineage:
logger.warning(
f"No column lineage found for {from_entity.name} to {to_entity.name}"
)
return processed_column_lineage or []
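A dependency-free sketch of the filtering rule above, using a hypothetical helper and plain dicts in place of the generated ColumnLineage model (the real method resolves each column through get_column_fqn on the matched Table entities):

def pair_columns(column_tuples, source_columns, target_columns):
    """Keep only pairs where both columns exist on the matched tables."""
    pairs = []
    for source_col, target_col in column_tuples:
        if source_col in source_columns and target_col in target_columns:
            pairs.append({"fromColumns": [source_col], "toColumn": target_col})
    return pairs

# Mirrors the unit test further down: ("column_3", "column_4") is dropped
# because those columns do not exist on table_1 / table_2.
print(pair_columns(
    [("column_1", "column_2"), ("column_3", "column_4")],
    source_columns={"column_1"},
    target_columns={"column_2"},
))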
def yield_pipeline_lineage_details(
self, pipeline_details: DataBrickPipelineDetails
) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage between pipeline and data sources. Not Implemented."""
try:
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
pipeline_entity = self.metadata.get_by_name(
entity=Pipeline, fqn=pipeline_fqn
)
table_lineage_list = self.client.get_table_lineage(
job_id=pipeline_details.job_id
)
if table_lineage_list:
for table_lineage in table_lineage_list:
source_table_full_name = table_lineage.get("source_table_full_name")
target_table_full_name = table_lineage.get("target_table_full_name")
if source_table_full_name and target_table_full_name:
source = fqn.split_table_name(source_table_full_name)
target = fqn.split_table_name(target_table_full_name)
for dbservicename in self.get_db_service_names() or ["*"]:
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=fqn.build(
metadata=self.metadata,
entity_type=Table,
table_name=source.get("table"),
database_name=source.get("database"),
schema_name=source.get("database_schema"),
service_name=dbservicename,
),
)
if from_entity is None:
continue
to_entity = self.metadata.get_by_name(
entity=Table,
fqn=fqn.build(
metadata=self.metadata,
entity_type=Table,
table_name=target.get("table"),
database_name=target.get("database"),
schema_name=target.get("database_schema"),
service_name=dbservicename,
),
)
if to_entity is None:
continue
processed_column_lineage = (
self._process_and_validate_column_lineage(
column_lineage=self.client.get_column_lineage(
job_id=pipeline_details.job_id,
table_key=(
source_table_full_name,
target_table_full_name,
),
),
from_entity=from_entity,
to_entity=to_entity,
)
)
lineage_details = LineageDetails(
pipeline=EntityReference(
id=pipeline_entity.id.root, type="pipeline"
),
source=LineageSource.PipelineLineage,
columnsLineage=processed_column_lineage,
)
yield Either(
right=AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id,
type="table",
),
toEntity=EntityReference(
id=to_entity.id,
type="table",
),
lineageDetails=lineage_details,
)
)
)
else:
logger.debug(
f"No source or target table full name found for job {pipeline_details.job_id}"
)
else:
logger.debug(
f"No table lineage found for job {pipeline_details.job_id}"
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=str(pipeline_details.job_id),
error=f"Wild error ingesting pipeline lineage {pipeline_details} - {exc}",
stackTrace=traceback.format_exc(),
)
)

View File

@ -13,17 +13,20 @@ Databricks Pipeline utils tests
"""
import json
import uuid
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import (
Pipeline,
PipelineStatus,
Task,
TaskStatus,
)
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
@ -33,8 +36,14 @@ from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.databrickspipeline.metadata import (
DatabrickspipelineSource,
)
@ -67,6 +76,7 @@ mock_databricks_config = {
"type": "DatabricksPipeline",
"token": "random_token",
"hostPort": "localhost:443",
"connectionTimeout": 120,
"connectionArguments": {
"http_path": "sql/1.0/endpoints/path",
},
@ -198,6 +208,36 @@ EXPECTED_PIPELINE_STATUS = [
PIPELINE_LIST = [DataBrickPipelineDetails(**data) for data in mock_data]
EXPECTED_PIPELINE_LINEAGE = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id="cced5342-12e8-45fb-b50a-918529d43ed1", type="table"
),
toEntity=EntityReference(
id="6f5ad342-12e8-45fb-b50a-918529d43ed1", type="table"
),
lineageDetails=LineageDetails(
columnsLineage=[
ColumnLineage(
fromColumns=[
FullyQualifiedEntityName(
root="local_table.dev.table_1.column_1"
)
],
toColumn=FullyQualifiedEntityName(
root="local_table.dev.table_2.column_2"
),
)
],
pipeline=EntityReference(
id="1fa49082-a32c-4e71-ba4a-6a111b489ed6",
type="pipeline",
),
source="PipelineLineage",
),
)
)
class DatabricksPipelineTests(TestCase):
"""
@ -224,6 +264,9 @@ class DatabricksPipelineTests(TestCase):
self.databricks.context.get().__dict__[
"pipeline_service"
] = MOCK_PIPELINE_SERVICE.name.root
self.databricks.metadata = OpenMetadata(
config.workflowConfig.openMetadataServerConfig
)
@patch(
"metadata.ingestion.source.database.databricks.client.DatabricksClient.list_jobs"
@ -256,3 +299,141 @@ class DatabricksPipelineTests(TestCase):
)
]
self.assertEqual(pipeline_status, EXPECTED_PIPELINE_STATUS)
def test_databricks_pipeline_lineage(self):
self.databricks.context.get().__dict__["pipeline"] = "11223344"
self.databricks.context.get().__dict__[
"pipeline_service"
] = "databricks_pipeline_test"
mock_pipeline = Pipeline(
id=uuid.uuid4(),
name="11223344",
fullyQualifiedName="databricks_pipeline_test.11223344",
service=EntityReference(id=uuid.uuid4(), type="pipelineService"),
)
# Create source and target tables
mock_source_table = Table(
id="cced5342-12e8-45fb-b50a-918529d43ed1",
name="table_1",
fullyQualifiedName="local_table.dev.table_1",
database=EntityReference(id=uuid.uuid4(), type="database"),
columns=[
Column(
name="column_1",
fullyQualifiedName="local_table.dev.table_1.column_1",
dataType="VARCHAR",
)
],
databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"),
)
mock_target_table = Table(
id="6f5ad342-12e8-45fb-b50a-918529d43ed1",
name="table_2",
fullyQualifiedName="local_table.dev.table_2",
database=EntityReference(id=uuid.uuid4(), type="database"),
columns=[
Column(
name="column_2",
fullyQualifiedName="local_table.dev.table_2.column_2",
dataType="VARCHAR",
)
],
databaseSchema=EntityReference(id=uuid.uuid4(), type="databaseSchema"),
)
with patch.object(self.databricks.metadata, "get_by_name") as mock_get_by_name:
def get_by_name_side_effect(entity, fqn):
if entity == Pipeline:
if fqn == "databricks_pipeline_test.11223344":
return mock_pipeline
elif entity == Table:
if "table_1" in fqn:
return mock_source_table
elif "table_2" in fqn:
return mock_target_table
return None
mock_get_by_name.side_effect = get_by_name_side_effect
with patch.object(
self.databricks.client, "get_table_lineage"
) as mock_get_table_lineage:
mock_get_table_lineage.return_value = [
{
"source_table_full_name": "local_table.dev.table_1",
"target_table_full_name": "local_table.dev.table_2",
}
]
with patch.object(
self.databricks.client, "get_column_lineage"
) as mock_get_column_lineage:
mock_get_column_lineage.return_value = [
("column_1", "column_2"),
("column_3", "column_4"),
]
lineage_details = list(
self.databricks.yield_pipeline_lineage_details(
DataBrickPipelineDetails(**mock_data[0])
)
)[0].right
self.assertEqual(
lineage_details.edge.fromEntity.id,
EXPECTED_PIPELINE_LINEAGE.edge.fromEntity.id,
)
self.assertEqual(
lineage_details.edge.toEntity.id,
EXPECTED_PIPELINE_LINEAGE.edge.toEntity.id,
)
self.assertEqual(
lineage_details.edge.lineageDetails.columnsLineage,
EXPECTED_PIPELINE_LINEAGE.edge.lineageDetails.columnsLineage,
)
with patch.object(self.databricks.metadata, "get_by_name") as mock_get_by_name:
def get_by_name_side_effect(entity, fqn):
if entity == Pipeline:
if fqn == "databricks_pipeline_test.11223344":
return mock_pipeline
elif entity == Table:
if "table_1" in fqn:
return mock_source_table
elif "table_2" in fqn:
return mock_target_table
return None
mock_get_by_name.side_effect = get_by_name_side_effect
with patch.object(
self.databricks.client, "get_table_lineage"
) as mock_get_table_lineage:
mock_get_table_lineage.return_value = [
{
"source_table_full_name": "local_table.dev.table_1",
"target_table_full_name": "local_table.dev.table_2",
}
]
with patch.object(
self.databricks.client, "get_column_lineage"
) as mock_get_column_lineage:
mock_get_column_lineage.return_value = [] # No column lineage
lineage_details = list(
self.databricks.yield_pipeline_lineage_details(
DataBrickPipelineDetails(**mock_data[0])
)
)[0].right
self.assertEqual(
lineage_details.edge.fromEntity.id,
EXPECTED_PIPELINE_LINEAGE.edge.fromEntity.id,
)
self.assertEqual(
lineage_details.edge.toEntity.id,
EXPECTED_PIPELINE_LINEAGE.edge.toEntity.id,
)
self.assertEqual(
lineage_details.edge.lineageDetails.columnsLineage,
[],
)

View File

@ -9,6 +9,13 @@
"errorMessage": "Failed to fetch pipelines, please validate the credentials or validate if user has access to fetch pipelines",
"shortCircuit": true,
"mandatory": true
},
{
"name": "GetLineage",
"description": "Get lineage for the pipelines",
"errorMessage": "Failed to fetch lineage, please validate the credentials or validate if user has access to system.access.table_lineage and system.access.column_lineage tables",
"shortCircuit": false,
"mandatory": false
}
]
}

View File

@ -33,6 +33,12 @@
"type": "string",
"format": "password"
},
"connectionTimeout": {
"title": "Connection Timeout",
"description": "Connection timeout in seconds.",
"type": "number",
"default": 120
},
"httpPath": {
"title": "Http Path",
"description": "Databricks compute resources URL.",

View File

@ -8,6 +8,16 @@ To learn more about the Databricks Connection Details (`hostPort`,`token`, `http
You can find further information on the Databricks Pipeline connector in the [docs](https://docs.open-metadata.org/connectors/pipeline/databrickspipeline).
## Lineage Requirements
To enable lineage extraction for Databricks pipelines, the user associated with the provided token must have `SELECT` privileges on the following system tables:
- `system.access.table_lineage` - Required for table-level lineage information
- `system.access.column_lineage` - Required for column-level lineage information
Without these permissions, the lineage extraction step will fail with an access error. Ensure your Databricks workspace administrator has granted the necessary permissions to the user whose token is being used for the connection.
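
To confirm access before running ingestion, you can run the same probe the connector's `GetLineage` test step performs. This is a minimal sketch only; the URL form, the `http_path` connection argument, and the placeholder values are assumptions based on how the connector builds its own engine, and the `databricks` SQLAlchemy dialect (from `databricks-sql-connector`) must be installed.

```python
from sqlalchemy import create_engine, text

# Replace the placeholders with your workspace values.
engine = create_engine(
    "databricks+connector://token:<token>@<host>:<port>",
    connect_args={"http_path": "<http_path>"},
)
with engine.connect() as conn:
    # The same system tables the connector queries for lineage
    conn.execute(text("SELECT 1 FROM system.access.table_lineage LIMIT 1"))
    conn.execute(text("SELECT 1 FROM system.access.column_lineage LIMIT 1"))
print("Lineage system tables are accessible")
```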
## Connection Details
$$section
@ -20,6 +30,11 @@ $$section
Generated Token to connect to Databricks. E.g., `dapw488e89a7176f7eb39bbc718617891564`.
$$
$$section
### Connection Timeout $(id="connectionTimeout")
Connection timeout in seconds. The default value is 120 seconds if not specified.
$$
$$section
### HTTP Path $(id="httpPath")
Databricks compute resources URL. E.g., `/sql/1.0/warehouses/xyz123`.

View File

@ -15,6 +15,10 @@
*/
export interface DatabricksPipelineConnection {
connectionArguments?: { [key: string]: any };
/**
* Connection timeout in seconds.
*/
connectionTimeout?: number;
/**
* Host and port of the Databricks service.
*/