Fix airbyte pipeline lineage extraction (#21151)

Mohit Tilala 2025-05-19 10:14:33 +05:30 committed by GitHub
parent f33cf42290
commit 4c0ce77756
13 changed files with 385 additions and 56 deletions


@@ -97,7 +97,7 @@ class ESMixin(Generic[T]):
     fqdn_search = (
         "/search/fieldQuery?fieldName={field_name}&fieldValue={field_value}&from={from_}"
-        "&size={size}&index={index}"
+        "&size={size}&index={index}&deleted=false"
    )

    # sort_field needs to be unique for the pagination to work, so we can use the FQN
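For illustration, here is roughly what the template expands to once formatted (the values below are made up):

# Illustrative expansion of the search template (hypothetical values):
url = (
    "/search/fieldQuery?fieldName={field_name}&fieldValue={field_value}&from={from_}"
    "&size={size}&index={index}&deleted=false"
).format(
    field_name="fullyQualifiedName",
    field_value="my_service.*",
    from_=0,
    size=10,
    index="table_search_index",
)
# /search/fieldQuery?fieldName=fullyQualifiedName&fieldValue=my_service.*&from=0&size=10&index=table_search_index&deleted=false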
@@ -125,14 +125,20 @@ class ESMixin(Generic[T]):
         if response:
             if fields:
                 fields = fields.split(",")
-            return [
-                self.get_by_name(
-                    entity=entity_type,
-                    fqn=hit["_source"]["fullyQualifiedName"],
-                    fields=fields,
-                )
-                for hit in response["hits"]["hits"]
-            ] or None
+            entities = []
+            for hit in response["hits"]["hits"]:
+                entity = self.get_by_name(
+                    entity=entity_type,
+                    fqn=hit["_source"]["fullyQualifiedName"],
+                    fields=fields,
+                )
+                if entity is None:
+                    continue
+                entities.append(entity)
+            return entities or None
         return None
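The list comprehension this replaces could return a list containing None whenever get_by_name missed a hit; the explicit loop filters those misses out, so callers never receive None entries.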


@@ -0,0 +1,29 @@
+# Copyright 2025 Collate
+# Licensed under the Collate Community License, Version 1.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Constants for Airbyte
+"""
+from enum import Enum
+
+
+class AirbyteSource(Enum):
+    MYSQL = "MySQL"
+    POSTGRES = "Postgres"
+    MSSQL = "Microsoft SQL Server (MSSQL)"
+    MONGODB = "MongoDb"
+
+
+class AirbyteDestination(Enum):
+    MYSQL = "MySQL"
+    POSTGRES = "Postgres"
+    MSSQL = "MS SQL Server"


@@ -29,7 +29,6 @@ from metadata.generated.schema.entity.data.table import Table
 from metadata.generated.schema.entity.services.connections.pipeline.airbyteConnection import (
     AirbyteConnection,
 )
-from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
@@ -46,12 +45,16 @@ from metadata.ingestion.api.models import Either
 from metadata.ingestion.api.steps import InvalidSourceException
 from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.pipeline.openlineage.models import TableDetails
+from metadata.ingestion.source.pipeline.openlineage.utils import FQNNotFoundException
 from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
 from metadata.utils import fqn
 from metadata.utils.helpers import clean_uri
 from metadata.utils.logger import ingestion_logger
 from metadata.utils.time_utils import convert_timestamp_to_milliseconds

+from .utils import get_destination_table_details, get_source_table_details
+
 logger = ingestion_logger()
@@ -189,6 +192,26 @@ class AirbyteSource(PipelineServiceSource):
             )
         )

+    def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]:
+        """
+        Get the FQN of the table
+        """
+        try:
+            if self.get_db_service_names():
+                return self._get_table_fqn_from_om(table_details)
+            return fqn.build(
+                metadata=self.metadata,
+                entity_type=Table,
+                service_name="*",
+                database_name=table_details.database,
+                schema_name=table_details.schema,
+                table_name=table_details.name,
+            )
+        except FQNNotFoundException:
+            return None
+
+    # pylint: disable=too-many-locals
     def yield_pipeline_lineage_details(
         self, pipeline_details: AirbytePipelineDetails
     ) -> Iterable[Either[AddLineageRequest]]:
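The fallback branch matters when no upstream database services are configured: fqn.build is then called with a wildcard service name, resolving the table purely through Elasticsearch. A rough sketch of the two paths (the TableDetails values are hypothetical):

from metadata.ingestion.source.pipeline.openlineage.models import TableDetails

details = TableDetails(name="orders", schema="public", database="shop")
# 1. db service names configured -> exact lookup via _get_table_fqn_from_om
# 2. otherwise -> fqn.build(..., service_name="*") searches ES across all services;
#    a miss raises FQNNotFoundException, which _get_table_fqn converts to None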
@@ -197,47 +220,65 @@ class AirbyteSource(PipelineServiceSource):
         :param pipeline_details: pipeline_details object from airbyte
         :return: Lineage from inlets and outlets
         """
+        pipeline_name = pipeline_details.connection.get("name")
         source_connection = self.client.get_source(
             pipeline_details.connection.get("sourceId")
         )
         destination_connection = self.client.get_destination(
             pipeline_details.connection.get("destinationId")
         )
-        source_service = self.metadata.get_by_name(
-            entity=DatabaseService, fqn=source_connection.get("name")
-        )
-        destination_service = self.metadata.get_by_name(
-            entity=DatabaseService, fqn=destination_connection.get("name")
-        )
-        if not source_service or not destination_service:
-            return
+        source_name = source_connection.get("sourceName")
+        destination_name = destination_connection.get("destinationName")

         for task in (
             pipeline_details.connection.get("syncCatalog", {}).get("streams") or []
         ):
             stream = task.get("stream")
-            from_fqn = fqn.build(
-                self.metadata,
-                Table,
-                table_name=stream.get("name"),
-                database_name=None,
-                schema_name=stream.get("namespace"),
-                service_name=source_connection.get("name"),
-            )
-            to_fqn = fqn.build(
-                self.metadata,
-                Table,
-                table_name=stream.get("name"),
-                database_name=None,
-                schema_name=stream.get("namespace"),
-                service_name=destination_connection.get("name"),
-            )
+            source_table_details = get_source_table_details(stream, source_connection)
+            destination_table_details = get_destination_table_details(
+                stream, destination_connection
+            )
+            if not source_table_details or not destination_table_details:
+                continue
+
+            from_fqn = self._get_table_fqn(source_table_details)
+            to_fqn = self._get_table_fqn(destination_table_details)
+            if not from_fqn:
+                logger.warning(
+                    f"While extracting lineage: [{pipeline_name}],"
+                    f" source table: [{source_table_details.database or '*'}]"
+                    f".[{source_table_details.schema}].[{source_table_details.name}]"
+                    f" (type: {source_name}) not found in openmetadata"
+                )
+                continue
+            if not to_fqn:
+                logger.warning(
+                    f"While extracting lineage: [{pipeline_name}],"
+                    f" destination table: [{destination_table_details.database or '*'}]"
+                    f".[{destination_table_details.schema}].[{destination_table_details.name}]"
+                    f" (type: {destination_name}) not found in openmetadata"
+                )
+                continue

             from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn)
             to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn)
-            if not from_entity and not to_entity:
+            if not from_entity:
+                logger.warning(
+                    f"While extracting lineage: [{pipeline_name}],"
+                    f" source table (fqn: [{from_fqn}], type: {source_name}) not found"
+                    " in openmetadata"
+                )
+                continue
+            if not to_entity:
+                logger.warning(
+                    f"While extracting lineage: [{pipeline_name}],"
+                    f" destination table (fqn: [{to_fqn}], type: {destination_name}) not found"
+                    " in openmetadata"
+                )
                 continue

             pipeline_fqn = fqn.build(
@@ -279,4 +320,4 @@ class AirbyteSource(PipelineServiceSource):
         """
         Get Pipeline Name
         """
-        return pipeline_details.connection.get("connectionId")
+        return pipeline_details.connection.get("name")


@@ -0,0 +1,99 @@
+# Copyright 2025 Collate
+# Licensed under the Collate Community License, Version 1.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Utils for Airbyte
+"""
+from metadata.ingestion.source.pipeline.openlineage.models import TableDetails
+from metadata.utils.logger import ingestion_logger
+
+from .constants import AirbyteDestination, AirbyteSource
+
+logger = ingestion_logger()
+
+
+def get_source_table_details(stream: dict, source_connection: dict) -> TableDetails:
+    """
+    Get the source table details
+    """
+    source_name = source_connection.get("sourceName")
+    source_database = source_connection.get("connectionConfiguration", {}).get(
+        "database"
+    )
+    source_schema = stream.get("namespace")
+
+    # Check if source is supported
+    if source_name not in [
+        AirbyteSource.POSTGRES.value,
+        AirbyteSource.MSSQL.value,
+        AirbyteSource.MYSQL.value,
+        AirbyteSource.MONGODB.value,
+    ]:
+        logger.warning(
+            f"Lineage of airbyte pipeline with source [{source_name}] is not supported yet"
+        )
+        return None
+
+    # Handle specific database configurations
+    if source_name == AirbyteSource.MYSQL.value:
+        source_schema = source_database
+        source_database = None
+    elif source_name == AirbyteSource.MONGODB.value:
+        source_schema = (
+            source_connection.get("connectionConfiguration", {})
+            .get("database_config", {})
+            .get("database")
+        )
+        source_database = None
+
+    return TableDetails(
+        name=stream["name"],
+        schema=source_schema,
+        database=source_database,
+    )
+
+
+def get_destination_table_details(
+    stream: dict, destination_connection: dict
+) -> TableDetails:
+    """
+    Get the destination table details
+    """
+    destination_name = destination_connection.get("destinationName")
+    destination_database = destination_connection.get(
+        "connectionConfiguration", {}
+    ).get("database")
+    destination_schema = destination_connection.get("connectionConfiguration", {}).get(
+        "schema"
+    )
+
+    # Check if destination is supported
+    if destination_name not in [
+        AirbyteDestination.POSTGRES.value,
+        AirbyteDestination.MSSQL.value,
+        AirbyteDestination.MYSQL.value,
+    ]:
+        logger.warning(
+            f"Lineage of airbyte pipeline with destination [{destination_name}] is not supported yet"
+        )
+        return None
+
+    # Handle specific database configurations
+    if destination_name == AirbyteDestination.MYSQL.value:
+        destination_schema = destination_database
+        destination_database = None
+
+    return TableDetails(
+        name=stream["name"],
+        schema=destination_schema,
+        database=destination_database,
+    )
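A small usage sketch with hypothetical Airbyte payloads (the import path is inferred from the relative import above):

from metadata.ingestion.source.pipeline.airbyte.utils import get_source_table_details

stream = {"name": "orders", "namespace": "public", "jsonSchema": {}}
postgres_source = {
    "sourceName": "Postgres",
    "connectionConfiguration": {"database": "shop"},
}
mysql_source = {
    "sourceName": "MySQL",
    "connectionConfiguration": {"database": "shop"},
}

print(get_source_table_details(stream, postgres_source))
# TableDetails(name='orders', schema='public', database='shop')
print(get_source_table_details(stream, mysql_source))
# TableDetails(name='orders', schema='shop', database=None)
# MySQL has no separate schema layer, so the database is folded into the schema slot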


@@ -14,7 +14,7 @@ Openlineage Source Model module
 from dataclasses import dataclass
 from enum import Enum
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional


 @dataclass
@@ -76,8 +76,9 @@ class TableDetails:
     Minimal table information.
     """

-    schema: str
     name: str
+    schema: str
+    database: Optional[str] = None


 class EventType(str, Enum):
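With database now optional, both of these forms construct fine:

from metadata.ingestion.source.pipeline.openlineage.models import TableDetails

TableDetails(name="orders", schema="public")                   # database defaults to None
TableDetails(name="orders", schema="public", database="shop")  # fully qualified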


@@ -319,8 +319,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
     def _get_table_fqn_from_om(self, table_details: TableDetails) -> Optional[str]:
         """
         Based on partial schema and table names look for matching table object in open metadata.
-        :param schema: schema name
-        :param table: table name
+        :param table_details: TableDetails object containing table name, schema, database information
         :return: fully qualified name of a Table in Open Metadata
         """
         result = None
@@ -330,7 +329,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
                 metadata=self.metadata,
                 entity_type=Table,
                 service_name=db_service,
-                database_name=None,
+                database_name=table_details.database,
                 schema_name=table_details.schema,
                 table_name=table_details.name,
             )


@@ -151,7 +151,7 @@ def build(
 def _(
     metadata: Optional[OpenMetadata],
     *,
-    service_name: str,
+    service_name: Optional[str],
     database_name: Optional[str],
     schema_name: Optional[str],
     table_name: str,
@@ -161,7 +161,7 @@ def _(
     """
     Building logic for tables
     :param metadata: OMeta client
-    :param service_name: Service Name to filter
+    :param service_name: Service Name to filter or None
     :param database_name: DB name or None
     :param schema_name: Schema name or None
     :param table_name: Table name
@@ -180,8 +180,8 @@ def _(
         service_name=service_name,
     )
-    # if entity not found in ES proceed to build FQN with database_name and schema_name
-    if not entity and database_name and schema_name:
+    # if entity not found in ES proceed to build FQN with service_name, database_name and schema_name
+    if not entity and service_name and database_name and schema_name:
         fqn = _build(service_name, database_name, schema_name, table_name)
         return [fqn] if fetch_multiple_entities else fqn
     if entity and fetch_multiple_entities:
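Because service_name may now be None, the string fallback only fires when every part is known. A toy stand-in for that guard, for illustration only:

def build_table_fqn(service, database, schema, table):
    """Toy version of the fallback path; not the library's _build."""
    if not (service and database and schema):
        return None  # wildcard/partial lookups cannot be assembled deterministically
    return ".".join([service, database, schema, table])

print(build_table_fqn("pg_prod", "shop", "public", "orders"))  # pg_prod.shop.public.orders
print(build_table_fqn(None, "shop", "public", "orders"))       # None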


@@ -18,6 +18,7 @@ from unittest import TestCase
 from unittest.mock import patch

 from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
+from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 from metadata.generated.schema.entity.data.pipeline import (
     Pipeline,
     PipelineStatus,
@@ -25,6 +26,7 @@ from metadata.generated.schema.entity.data.pipeline import (
     Task,
     TaskStatus,
 )
+from metadata.generated.schema.entity.data.table import Table
 from metadata.generated.schema.entity.services.pipelineService import (
     PipelineConnection,
     PipelineService,
@@ -34,6 +36,8 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
 )
 from metadata.generated.schema.type.basic import FullyQualifiedEntityName
+from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
+from metadata.generated.schema.type.entityLineage import Source as LineageSource
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
 from metadata.ingestion.source.pipeline.airbyte.metadata import (
@@ -70,7 +74,7 @@ mock_airbyte_config = {
 }

-EXPECTED_ARIBYTE_DETAILS = AirbytePipelineDetails(
+EXPECTED_AIRBYTE_DETAILS = AirbytePipelineDetails(
     workspace=mock_data["workspace"][0], connection=mock_data["connection"][0]
 )
@@ -156,8 +160,79 @@ MOCK_PIPELINE = Pipeline(
     ),
 )

+# Mock data for lineage testing
+MOCK_POSTGRES_SOURCE_TABLE = Table(
+    id="69fc8906-4a4a-45ab-9a54-9cc2d399e10e",
+    name="mock_table_name",
+    fullyQualifiedName="mock_source_service.mock_source_db.mock_source_schema.mock_table_name",
+    columns=[{"name": "id", "dataType": "INT"}, {"name": "name", "dataType": "STRING"}],
+)
+
+MOCK_POSTGRES_DESTINATION_TABLE = Table(
+    id="59fc8906-4a4a-45ab-9a54-9cc2d399e10e",
+    name="mock_table_name",
+    fullyQualifiedName=(
+        "mock_destination_service.mock_destination_db"
+        ".mock_destination_schema.mock_table_name"
+    ),
+    columns=[{"name": "id", "dataType": "INT"}, {"name": "name", "dataType": "STRING"}],
+)
+
+EXPECTED_LINEAGE = AddLineageRequest(
+    edge=EntitiesEdge(
+        fromEntity=EntityReference(
+            id="69fc8906-4a4a-45ab-9a54-9cc2d399e10e", type="table"
+        ),
+        toEntity=EntityReference(
+            id="59fc8906-4a4a-45ab-9a54-9cc2d399e10e", type="table"
+        ),
+        lineageDetails=LineageDetails(
+            pipeline=EntityReference(
+                id="2aaa012e-099a-11ed-861d-0242ac120002", type="pipeline"
+            ),
+            source=LineageSource.PipelineLineage,
+        ),
+    )
+)
+
+MOCK_SOURCE_TABLE_FQN = (
+    "mock_source_service.mock_source_db.mock_source_schema.mock_table_name"
+)
+MOCK_DESTINATION_TABLE_FQN = "mock_destination_service.mock_destination_db.mock_destination_schema.mock_table_name"
+
+
+# Configure mock for _get_table_fqn to return FQNs for source and destination tables
+def mock_get_table_fqn(self, table_details):  # pylint: disable=unused-argument
+    if table_details.name != "mock_table_name":
+        return None
+    if table_details.schema == "mock_source_schema":
+        return MOCK_SOURCE_TABLE_FQN
+    if table_details.schema == "mock_destination_schema":
+        return MOCK_DESTINATION_TABLE_FQN
+    if table_details.schema == "mock_source_db":
+        return MOCK_SOURCE_TABLE_FQN
+    if table_details.schema == "mock_destination_db":
+        return MOCK_DESTINATION_TABLE_FQN
+    return None
+
+
+# Configure the mock to return our test tables and pipeline
+def mock_get_by_name(entity, fqn):
+    if entity == Table:
+        if fqn == MOCK_SOURCE_TABLE_FQN:
+            return MOCK_POSTGRES_SOURCE_TABLE
+        if fqn == MOCK_DESTINATION_TABLE_FQN:
+            return MOCK_POSTGRES_DESTINATION_TABLE
+    if entity == Pipeline:
+        return MOCK_PIPELINE
+    return None
+

 class AirbyteUnitTest(TestCase):
+    """Test class for Airbyte source module."""
+
     @patch(
         "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
     )
@@ -180,20 +255,86 @@ class AirbyteUnitTest(TestCase):
         self.client.list_connections.return_value = mock_data.get("connection")

     def test_pipeline_list(self):
-        assert list(self.airbyte.get_pipelines_list())[0] == EXPECTED_ARIBYTE_DETAILS
+        assert list(self.airbyte.get_pipelines_list())[0] == EXPECTED_AIRBYTE_DETAILS

     def test_pipeline_name(self):
         assert self.airbyte.get_pipeline_name(
-            EXPECTED_ARIBYTE_DETAILS
-        ) == mock_data.get("connection")[0].get("connectionId")
+            EXPECTED_AIRBYTE_DETAILS
+        ) == mock_data.get("connection")[0].get("name")

     def test_pipelines(self):
-        pipline = list(self.airbyte.yield_pipeline(EXPECTED_ARIBYTE_DETAILS))[0].right
-        assert pipline == EXPECTED_CREATED_PIPELINES
+        pipeline = list(self.airbyte.yield_pipeline(EXPECTED_AIRBYTE_DETAILS))[0].right
+        assert pipeline == EXPECTED_CREATED_PIPELINES

     def test_pipeline_status(self):
         status = [
             either.right
-            for either in self.airbyte.yield_pipeline_status(EXPECTED_ARIBYTE_DETAILS)
+            for either in self.airbyte.yield_pipeline_status(EXPECTED_AIRBYTE_DETAILS)
         ]
         assert status == EXPECTED_PIPELINE_STATUS

+    @patch.object(AirbyteSource, "_get_table_fqn", mock_get_table_fqn)
+    def test_yield_pipeline_lineage_details(self):
+        """Test the Airbyte lineage generation functionality."""
+        # Mock the client methods needed for lineage with supported source and destination types
+        self.client.get_source.return_value = {
+            "sourceName": "Postgres",
+            "connectionConfiguration": {
+                "database": "mock_source_db",
+                "schema": "mock_source_schema",
+            },
+        }
+        self.client.get_destination.return_value = {
+            "destinationName": "Postgres",
+            "connectionConfiguration": {
+                "database": "mock_destination_db",
+                "schema": "mock_destination_schema",
+            },
+        }
+
+        # Mock connection with stream data for lineage test
+        test_connection = {
+            "connectionId": "test-connection-id",
+            "sourceId": "test-source-id",
+            "destinationId": "test-destination-id",
+            "name": "Test Connection",
+            "syncCatalog": {
+                "streams": [
+                    {
+                        "stream": {
+                            "name": "mock_table_name",
+                            "namespace": "mock_source_schema",
+                            "jsonSchema": {},
+                        }
+                    }
+                ]
+            },
+        }
+        test_workspace = {"workspaceId": "test-workspace-id"}
+        test_pipeline_details = AirbytePipelineDetails(
+            workspace=test_workspace, connection=test_connection
+        )
+
+        # Mock the metadata object directly in the Airbyte source
+        with patch.object(self.airbyte, "metadata") as mock_metadata:
+            mock_metadata.get_by_name.side_effect = mock_get_by_name
+
+            # Test yield_pipeline_lineage_details
+            lineage_results = list(
+                self.airbyte.yield_pipeline_lineage_details(test_pipeline_details)
+            )
+
+            # Check that we get at least one lineage result
+            assert len(lineage_results) > 0
+
+            # Extract the lineage details
+            lineage = lineage_results[0].right
+
+            # Verify the lineage structure
+            assert lineage.edge.fromEntity.id == MOCK_POSTGRES_SOURCE_TABLE.id
+            assert lineage.edge.toEntity.id == MOCK_POSTGRES_DESTINATION_TABLE.id
+
+            # Compare just the UUID string value from both sides
+            assert lineage.edge.lineageDetails.pipeline.id.root == MOCK_PIPELINE.id.root
+            assert lineage.edge.lineageDetails.source == LineageSource.PipelineLineage


@@ -409,10 +409,14 @@ public class SearchResource {
           @Parameter(description = "Search Index name, defaults to table_search_index")
               @DefaultValue("table_search_index")
               @QueryParam("index")
-              String index)
+              String index,
+          @Parameter(description = "Filter documents by deleted param. By default deleted is false")
+              @DefaultValue("false")
+              @QueryParam("deleted")
+              boolean deleted)
       throws IOException {
-    return searchRepository.searchByField(fieldName, fieldValue, index);
+    return searchRepository.searchByField(fieldName, fieldValue, index, deleted);
   }

   @GET
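Assuming the default OpenMetadata server layout, the endpoint can be exercised roughly like this (the URL and FQN are hypothetical):

import requests

resp = requests.get(
    "http://localhost:8585/api/v1/search/fieldQuery",  # hypothetical local server
    params={
        "fieldName": "fullyQualifiedName",
        "fieldValue": "pg_prod.shop.public.orders",
        "index": "table_search_index",
        "deleted": "false",  # new param; omitting it defaults to false
    },
)
print(resp.status_code)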


@@ -257,7 +257,8 @@ public interface SearchClient {
         Response.Status.NOT_IMPLEMENTED, NOT_IMPLEMENTED_ERROR_TYPE, NOT_IMPLEMENTED_METHOD);
   }

-  Response searchByField(String fieldName, String fieldValue, String index) throws IOException;
+  Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
+      throws IOException;

   Response aggregate(AggregationRequest request) throws IOException;


@@ -1160,9 +1160,9 @@ public class SearchRepository {
             .withIsConnectedVia(isConnectedVia(entityType)));
   }

-  public Response searchByField(String fieldName, String fieldValue, String index)
+  public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
       throws IOException {
-    return searchClient.searchByField(fieldName, fieldValue, index);
+    return searchClient.searchByField(fieldName, fieldValue, index, deleted);
   }

   public Response aggregate(AggregationRequest request) throws IOException {


@@ -1310,13 +1310,17 @@ public class ElasticSearchClient implements SearchClient {
   }

   @Override
-  public Response searchByField(String fieldName, String fieldValue, String index)
+  public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
       throws IOException {
     es.org.elasticsearch.action.search.SearchRequest searchRequest =
         new es.org.elasticsearch.action.search.SearchRequest(
             Entity.getSearchRepository().getIndexOrAliasName(index));
     SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-    searchSourceBuilder.query(QueryBuilders.wildcardQuery(fieldName, fieldValue));
+    BoolQueryBuilder query =
+        QueryBuilders.boolQuery()
+            .must(QueryBuilders.wildcardQuery(fieldName, fieldValue))
+            .filter(QueryBuilders.termQuery("deleted", deleted));
+    searchSourceBuilder.query(query);
     searchRequest.source(searchSourceBuilder);
     String response = client.search(searchRequest, RequestOptions.DEFAULT).toString();
     return Response.status(OK).entity(response).build();
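The bool query built above serializes to roughly this request body (a sketch of the Elasticsearch DSL, with hypothetical field values):

query_body = {
    "query": {
        "bool": {
            # wildcard match on the requested field stays mandatory
            "must": [{"wildcard": {"fullyQualifiedName": {"value": "pg_prod.*"}}}],
            # new filter: only documents whose deleted flag matches the param
            "filter": [{"term": {"deleted": {"value": False}}}],
        }
    }
}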


@@ -1381,13 +1381,17 @@ public class OpenSearchClient implements SearchClient {
   }

   @Override
-  public Response searchByField(String fieldName, String fieldValue, String index)
+  public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted)
       throws IOException {
     os.org.opensearch.action.search.SearchRequest searchRequest =
         new os.org.opensearch.action.search.SearchRequest(
             Entity.getSearchRepository().getIndexOrAliasName(index));
     SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-    searchSourceBuilder.query(QueryBuilders.wildcardQuery(fieldName, fieldValue));
+    BoolQueryBuilder query =
+        QueryBuilders.boolQuery()
+            .must(QueryBuilders.wildcardQuery(fieldName, fieldValue))
+            .filter(QueryBuilders.termQuery("deleted", deleted));
+    searchSourceBuilder.query(query);
     searchRequest.source(searchSourceBuilder);
     String response = client.search(searchRequest, RequestOptions.DEFAULT).toString();
     return Response.status(OK).entity(response).build();