diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index 2cd88e54acd..45a4e08e732 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -21,6 +21,7 @@ from metadata.generated.schema.type.entityLineage import ( EntitiesEdge, LineageDetails, ) +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.lineage.models import Dialect @@ -223,6 +224,7 @@ def _build_table_lineage( to_table_raw_name: str, query: str, column_lineage_map: dict, + lineage_source: LineageSource = LineageSource.QueryLineage, ) -> Iterable[Either[AddLineageRequest]]: """ Prepare the lineage request generator @@ -234,7 +236,7 @@ def _build_table_lineage( from_table_raw_name=str(from_table_raw_name), column_lineage_map=column_lineage_map, ) - lineage_details = LineageDetails(sqlQuery=query) + lineage_details = LineageDetails(sqlQuery=query, source=lineage_source) if col_lineage: lineage_details.columnsLineage = col_lineage if from_entity and to_entity: @@ -265,6 +267,7 @@ def _create_lineage_by_table_name( schema_name: Optional[str], query: str, column_lineage_map: dict, + lineage_source: LineageSource = LineageSource.QueryLineage, ) -> Iterable[Either[AddLineageRequest]]: """ This method is to create a lineage between two tables @@ -296,6 +299,7 @@ def _create_lineage_by_table_name( from_table_raw_name=from_table, query=query, column_lineage_map=column_lineage_map, + lineage_source=lineage_source, ) except Exception as exc: @@ -347,6 +351,7 @@ def get_lineage_by_query( query: str, dialect: Dialect, timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, + lineage_source: LineageSource = LineageSource.QueryLineage, ) -> Iterable[Either[AddLineageRequest]]: """ This method parses the query to get source, target and intermediate table names to create lineage, @@ -372,6 +377,7 @@ def get_lineage_by_query( schema_name=schema_name, query=query, column_lineage_map=column_lineage, + lineage_source=lineage_source, ) for target_table in lineage_parser.target_tables: yield from _create_lineage_by_table_name( @@ -383,6 +389,7 @@ def get_lineage_by_query( schema_name=schema_name, query=query, column_lineage_map=column_lineage, + lineage_source=lineage_source, ) if not lineage_parser.intermediate_tables: for target_table in lineage_parser.target_tables: @@ -396,6 +403,7 @@ def get_lineage_by_query( schema_name=schema_name, query=query, column_lineage_map=column_lineage, + lineage_source=lineage_source, ) except Exception as exc: yield Either( @@ -416,6 +424,7 @@ def get_lineage_via_table_entity( query: str, dialect: Dialect, timeout_seconds: int = LINEAGE_PARSING_TIMEOUT, + lineage_source: LineageSource = LineageSource.QueryLineage, ) -> Iterable[Either[AddLineageRequest]]: """Get lineage from table entity""" column_lineage = {} @@ -435,6 +444,7 @@ def get_lineage_via_table_entity( schema_name=schema_name, query=query, column_lineage_map=column_lineage, + lineage_source=lineage_source, ) or [] except Exception as exc: # pylint: disable=broad-except Either( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 213ecef43fa..ff73a177b7a 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -44,7 +44,8 @@ from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipelin from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.delete import delete_entity_from_source @@ -419,6 +420,9 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): id=to_entity.id.__root__, type=LINEAGE_MAP[type(to_entity)], ), + lineageDetails=LineageDetails( + source=LineageSource.DashboardLineage + ), ) ) ) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index dc560b93be3..e5e8351b845 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -68,7 +68,8 @@ from metadata.generated.schema.security.credentials.bitbucketCredentials import from metadata.generated.schema.security.credentials.githubCredentials import ( GitHubCredentials, ) -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.models import Either, StackTraceError @@ -656,6 +657,9 @@ class LookerSource(DashboardServiceSource): id=self.context.dashboard.id.__root__, type="dashboard", ), + lineageDetails=LineageDetails( + source=LineageSource.DashboardLineage + ), ) ) ) diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 846d1e9cd60..de365accef8 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -50,7 +50,8 @@ from metadata.generated.schema.tests.testDefinition import ( TestPlatform, ) from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Timestamp -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper @@ -615,6 +616,9 @@ class DbtSource(DbtServiceSource): id=to_entity.id.__root__, type="table", ), + lineageDetails=LineageDetails( + source=LineageSource.DbtLineage + ), ) ) @@ -656,6 +660,7 @@ class DbtSource(DbtServiceSource): schema_name=source_elements[2], dialect=dialect, timeout_seconds=self.source_config.parsingTimeoutLimit, + lineage_source=LineageSource.DbtLineage, ) for lineage_request in lineages or []: yield lineage_request diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py index 57ca7c24f1a..d98de23f1a5 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py @@ -36,6 +36,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException @@ -218,7 +219,8 @@ class AirbyteSource(PipelineServiceSource): lineage_details = LineageDetails( pipeline=EntityReference( id=self.context.pipeline.id.__root__, type="pipeline" - ) + ), + source=LineageSource.PipelineLineage, ) yield Either( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py index 580ca40be04..964527df50a 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py @@ -40,6 +40,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.api.steps import InvalidSourceException @@ -417,7 +418,8 @@ class AirflowSource(PipelineServiceSource): lineage_details = LineageDetails( pipeline=EntityReference( id=self.context.pipeline.id.__root__, type="pipeline" - ) + ), + source=LineageSource.PipelineLineage, ) xlets = get_xlets_from_dag(dag=pipeline_details) if pipeline_details else [] diff --git a/ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py index b43cfeb97c9..ad7d727ffa4 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py @@ -32,6 +32,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException @@ -164,7 +165,8 @@ class FivetranSource(PipelineServiceSource): lineage_details = LineageDetails( pipeline=EntityReference( id=self.context.pipeline.id.__root__, type="pipeline" - ) + ), + source=LineageSource.PipelineLineage, ) yield Either( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/spline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/spline/metadata.py index b8717519d2c..3ef04e07b93 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/spline/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/spline/metadata.py @@ -28,6 +28,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException @@ -193,7 +194,8 @@ class SplineSource(PipelineServiceSource): pipeline=EntityReference( id=self.context.pipeline.id.__root__, type="pipeline", - ) + ), + source=LineageSource.PipelineLineage, ), fromEntity=EntityReference( id=from_table.id, type="table" diff --git a/ingestion/src/metadata/utils/db_utils.py b/ingestion/src/metadata/utils/db_utils.py index afb837b2439..ef3a7c8d608 100644 --- a/ingestion/src/metadata/utils/db_utils.py +++ b/ingestion/src/metadata/utils/db_utils.py @@ -18,6 +18,7 @@ from typing import Iterable from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.ingestion.api.models import Either from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT, LineageParser @@ -83,6 +84,7 @@ def get_view_lineage( schema_name=schema_name, dialect=dialect, timeout_seconds=timeout_seconds, + lineage_source=LineageSource.ViewLineage, ) or [] else: @@ -95,6 +97,7 @@ def get_view_lineage( query=view_definition, dialect=dialect, timeout_seconds=timeout_seconds, + lineage_source=LineageSource.ViewLineage, ) or [] except Exception as exc: logger.debug(traceback.format_exc()) diff --git a/ingestion/tests/unit/topology/dashboard/test_looker.py b/ingestion/tests/unit/topology/dashboard/test_looker.py index bc2bbfb9913..46fcec39bea 100644 --- a/ingestion/tests/unit/topology/dashboard/test_looker.py +++ b/ingestion/tests/unit/topology/dashboard/test_looker.py @@ -41,7 +41,8 @@ from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.generated.schema.type.basic import FullyQualifiedEntityName -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.usageDetails import UsageDetails, UsageStats from metadata.generated.schema.type.usageRequest import UsageRequest @@ -354,6 +355,9 @@ class LookerUnitTest(TestCase): toEntity=EntityReference( id=to_entity.id.__root__, type="dashboard" ), + lineageDetails=LineageDetails( + source=LineageSource.DashboardLineage + ), ) ), ) diff --git a/ingestion/tests/unit/topology/dashboard/test_metabase.py b/ingestion/tests/unit/topology/dashboard/test_metabase.py index b72527b233b..547a62907a2 100644 --- a/ingestion/tests/unit/topology/dashboard/test_metabase.py +++ b/ingestion/tests/unit/topology/dashboard/test_metabase.py @@ -34,7 +34,8 @@ from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.generated.schema.type.basic import FullyQualifiedEntityName -from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -151,6 +152,7 @@ EXPECTED_LINEAGE = AddLineageRequest( id="7b3766b1-7eb4-4ad4-b7c8-15a8b16edfdd", type="dashboard", ), + lineageDetails=LineageDetails(source=LineageSource.DashboardLineage), ) ) diff --git a/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json b/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json index 91dbf1977ed..5079d452f31 100644 --- a/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json +++ b/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json @@ -48,6 +48,12 @@ "description" :{ "description": "description of lineage", "type": "string" + }, + "source": { + "description": "Lineage type describes how a lineage was created.", + "type": "string", + "enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage"], + "default": "Manual" } } },