Fix #9207: Add source in lineage details (#13046)

This commit is contained in:
Mayur Singal 2023-09-04 11:05:56 +05:30 committed by GitHub
parent 111950bd81
commit 029786d773
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 56 additions and 10 deletions

View File

@ -21,6 +21,7 @@ from metadata.generated.schema.type.entityLineage import (
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.lineage.models import Dialect
@ -223,6 +224,7 @@ def _build_table_lineage(
to_table_raw_name: str,
query: str,
column_lineage_map: dict,
lineage_source: LineageSource = LineageSource.QueryLineage,
) -> Iterable[Either[AddLineageRequest]]:
"""
Prepare the lineage request generator
@ -234,7 +236,7 @@ def _build_table_lineage(
from_table_raw_name=str(from_table_raw_name),
column_lineage_map=column_lineage_map,
)
lineage_details = LineageDetails(sqlQuery=query)
lineage_details = LineageDetails(sqlQuery=query, source=lineage_source)
if col_lineage:
lineage_details.columnsLineage = col_lineage
if from_entity and to_entity:
@ -265,6 +267,7 @@ def _create_lineage_by_table_name(
schema_name: Optional[str],
query: str,
column_lineage_map: dict,
lineage_source: LineageSource = LineageSource.QueryLineage,
) -> Iterable[Either[AddLineageRequest]]:
"""
This method is to create a lineage between two tables
@ -296,6 +299,7 @@ def _create_lineage_by_table_name(
from_table_raw_name=from_table,
query=query,
column_lineage_map=column_lineage_map,
lineage_source=lineage_source,
)
except Exception as exc:
@ -347,6 +351,7 @@ def get_lineage_by_query(
query: str,
dialect: Dialect,
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
lineage_source: LineageSource = LineageSource.QueryLineage,
) -> Iterable[Either[AddLineageRequest]]:
"""
This method parses the query to get source, target and intermediate table names to create lineage,
@ -372,6 +377,7 @@ def get_lineage_by_query(
schema_name=schema_name,
query=query,
column_lineage_map=column_lineage,
lineage_source=lineage_source,
)
for target_table in lineage_parser.target_tables:
yield from _create_lineage_by_table_name(
@ -383,6 +389,7 @@ def get_lineage_by_query(
schema_name=schema_name,
query=query,
column_lineage_map=column_lineage,
lineage_source=lineage_source,
)
if not lineage_parser.intermediate_tables:
for target_table in lineage_parser.target_tables:
@ -396,6 +403,7 @@ def get_lineage_by_query(
schema_name=schema_name,
query=query,
column_lineage_map=column_lineage,
lineage_source=lineage_source,
)
except Exception as exc:
yield Either(
@ -416,6 +424,7 @@ def get_lineage_via_table_entity(
query: str,
dialect: Dialect,
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
lineage_source: LineageSource = LineageSource.QueryLineage,
) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage from table entity"""
column_lineage = {}
@ -435,6 +444,7 @@ def get_lineage_via_table_entity(
schema_name=schema_name,
query=query,
column_lineage_map=column_lineage,
lineage_source=lineage_source,
) or []
except Exception as exc: # pylint: disable=broad-except
Either(

View File

@ -44,7 +44,8 @@ from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipelin
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.delete import delete_entity_from_source
@ -419,6 +420,9 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
id=to_entity.id.__root__,
type=LINEAGE_MAP[type(to_entity)],
),
lineageDetails=LineageDetails(
source=LineageSource.DashboardLineage
),
)
)
)

View File

@ -68,7 +68,8 @@ from metadata.generated.schema.security.credentials.bitbucketCredentials import
from metadata.generated.schema.security.credentials.githubCredentials import (
GitHubCredentials,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.models import Either, StackTraceError
@ -656,6 +657,9 @@ class LookerSource(DashboardServiceSource):
id=self.context.dashboard.id.__root__,
type="dashboard",
),
lineageDetails=LineageDetails(
source=LineageSource.DashboardLineage
),
)
)
)

View File

@ -50,7 +50,8 @@ from metadata.generated.schema.tests.testDefinition import (
TestPlatform,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Timestamp
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
@ -615,6 +616,9 @@ class DbtSource(DbtServiceSource):
id=to_entity.id.__root__,
type="table",
),
lineageDetails=LineageDetails(
source=LineageSource.DbtLineage
),
)
)
@ -656,6 +660,7 @@ class DbtSource(DbtServiceSource):
schema_name=source_elements[2],
dialect=dialect,
timeout_seconds=self.source_config.parsingTimeoutLimit,
lineage_source=LineageSource.DbtLineage,
)
for lineage_request in lineages or []:
yield lineage_request

View File

@ -36,6 +36,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
@ -218,7 +219,8 @@ class AirbyteSource(PipelineServiceSource):
lineage_details = LineageDetails(
pipeline=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
)
),
source=LineageSource.PipelineLineage,
)
yield Either(

View File

@ -40,6 +40,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.steps import InvalidSourceException
@ -417,7 +418,8 @@ class AirflowSource(PipelineServiceSource):
lineage_details = LineageDetails(
pipeline=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
)
),
source=LineageSource.PipelineLineage,
)
xlets = get_xlets_from_dag(dag=pipeline_details) if pipeline_details else []

View File

@ -32,6 +32,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
@ -164,7 +165,8 @@ class FivetranSource(PipelineServiceSource):
lineage_details = LineageDetails(
pipeline=EntityReference(
id=self.context.pipeline.id.__root__, type="pipeline"
)
),
source=LineageSource.PipelineLineage,
)
yield Either(

View File

@ -28,6 +28,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
@ -193,7 +194,8 @@ class SplineSource(PipelineServiceSource):
pipeline=EntityReference(
id=self.context.pipeline.id.__root__,
type="pipeline",
)
),
source=LineageSource.PipelineLineage,
),
fromEntity=EntityReference(
id=from_table.id, type="table"

View File

@ -18,6 +18,7 @@ from typing import Iterable
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT, LineageParser
@ -83,6 +84,7 @@ def get_view_lineage(
schema_name=schema_name,
dialect=dialect,
timeout_seconds=timeout_seconds,
lineage_source=LineageSource.ViewLineage,
) or []
else:
@ -95,6 +97,7 @@ def get_view_lineage(
query=view_definition,
dialect=dialect,
timeout_seconds=timeout_seconds,
lineage_source=LineageSource.ViewLineage,
) or []
except Exception as exc:
logger.debug(traceback.format_exc())

View File

@ -41,7 +41,8 @@ from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.usageDetails import UsageDetails, UsageStats
from metadata.generated.schema.type.usageRequest import UsageRequest
@ -354,6 +355,9 @@ class LookerUnitTest(TestCase):
toEntity=EntityReference(
id=to_entity.id.__root__, type="dashboard"
),
lineageDetails=LineageDetails(
source=LineageSource.DashboardLineage
),
)
),
)

View File

@ -34,7 +34,8 @@ from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -151,6 +152,7 @@ EXPECTED_LINEAGE = AddLineageRequest(
id="7b3766b1-7eb4-4ad4-b7c8-15a8b16edfdd",
type="dashboard",
),
lineageDetails=LineageDetails(source=LineageSource.DashboardLineage),
)
)

View File

@ -48,6 +48,12 @@
"description" :{
"description": "description of lineage",
"type": "string"
},
"source": {
"description": "Source of the lineage; describes how the lineage was created.",
"type": "string",
"enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage"],
"default": "Manual"
}
}
},