GEN-895: Added Glue Pipeline Lineage (#18063)

Suman Maharana 2024-10-14 13:08:17 +05:30 committed by GitHub
parent 142a506120
commit dd08bc9ffd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 589 additions and 14 deletions

View File

@@ -25,6 +25,7 @@ from metadata.generated.schema.entity.data.pipeline import (
    Task,
    TaskStatus,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.pipeline.gluePipelineConnection import (
    GluePipelineConnection,
)
@@ -40,14 +41,25 @@ from metadata.generated.schema.type.basic import (
    SourceUrl,
    Timestamp,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.gluepipeline.models import (
    AmazonRedshift,
    CatalogSource,
    JDBCSource,
    JobNodeResponse,
    S3Source,
    S3Target,
)
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
-from metadata.utils.time_utils import convert_timestamp_to_milliseconds
+from metadata.utils.time_utils import datetime_to_timestamp

logger = ingestion_logger()
@@ -63,6 +75,28 @@ STATUS_MAP = {
    "incomplete": StatusType.Failed,
    "pending": StatusType.Pending,
}

TABLE_MODEL_MAP = {
    "AmazonRedshiftSource": AmazonRedshift,
    "AmazonRedshiftTarget": AmazonRedshift,
    "AthenaConnectorSource": JDBCSource,
    "JDBCConnectorSource": JDBCSource,
    "JDBCConnectorTarget": JDBCSource,
    "DirectJDBCSource": CatalogSource,
    "RedshiftSource": CatalogSource,
    "RedshiftTarget": CatalogSource,
    "DirectJDBC": CatalogSource,
}

STORAGE_MODEL_MAP = {
    "S3CsvSource": S3Source,
    "S3JsonSource": S3Source,
    "S3ParquetSource": S3Source,
    "S3HudiSource": S3Source,
    "S3DeltaSource": S3Source,
    "S3DirectTarget": S3Target,
    "S3DeltaDirectTarget": S3Target,
    "S3GlueParquetTarget": S3Target,
    "S3HudiDirectTarget": S3Target,
}

class GluepipelineSource(PipelineServiceSource):
@@ -145,9 +179,88 @@ class GluepipelineSource(PipelineServiceSource):
                downstream_tasks.append(self.task_id_mapping[edges["DestinationId"]])
        return downstream_tasks

    def get_lineage_details(self, job) -> Optional[dict]:
        """
        Get the Lineage Details of the pipeline
        """
        lineage_details = {"sources": [], "targets": []}
        try:
            job_details = JobNodeResponse.model_validate(
                self.glue.get_job(JobName=job)
            ).Job
            if job_details and job_details.config_nodes:
                nodes = job_details.config_nodes
                for _, node in nodes.items():
                    for key, entity in node.items():
                        table_model, storage_model = None, None
                        if key in TABLE_MODEL_MAP:
                            table_model = TABLE_MODEL_MAP[key].model_validate(entity)
                        elif "Catalog" in key:
                            table_model = CatalogSource.model_validate(entity)
                        elif key in STORAGE_MODEL_MAP:
                            storage_model = STORAGE_MODEL_MAP[key].model_validate(
                                entity
                            )
                        if table_model:
                            for db_service_name in self.get_db_service_names():
                                table_entity = self.metadata.get_entity_reference(
                                    entity=Table,
                                    fqn=fqn.build(
                                        metadata=self.metadata,
                                        entity_type=Table,
                                        table_name=table_model.table_name,
                                        database_name=table_model.database_name,
                                        schema_name=table_model.schema_name,
                                        service_name=db_service_name,
                                    ),
                                )
                                if table_entity:
                                    if key.endswith("Source"):
                                        lineage_details["sources"].append(table_entity)
                                    else:
                                        lineage_details["targets"].append(table_entity)
                                    break
                        if storage_model:
                            for path in storage_model.Paths or [storage_model.Path]:
                                container = self.metadata.es_search_container_by_path(
                                    full_path=path
                                )
                                if container and container[0]:
                                    storage_entity = EntityReference(
                                        id=container[0].id,
                                        type="container",
                                        name=container[0].name.root,
                                        fullyQualifiedName=container[
                                            0
                                        ].fullyQualifiedName.root,
                                    )
                                    if storage_entity:
                                        if key.endswith("Source"):
                                            lineage_details["sources"].append(
                                                storage_entity
                                            )
                                        else:
                                            lineage_details["targets"].append(
                                                storage_entity
                                            )
                                    break
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Failed to get lineage details for job : {job} due to : {exc}"
            )
        return lineage_details

    def yield_pipeline_status(
        self, pipeline_details: Any
    ) -> Iterable[Either[OMetaPipelineStatus]]:
        pipeline_fqn = fqn.build(
            metadata=self.metadata,
            entity_type=Pipeline,
            service_name=self.context.get().pipeline_service,
            pipeline_name=self.context.get().pipeline,
        )
        for job in self.job_name_list:
            try:
                runs = self.glue.get_job_runs(JobName=job)
@@ -161,13 +274,13 @@ class GluepipelineSource(PipelineServiceSource):
                            attempt["JobRunState"].lower(), StatusType.Pending
                        ).value,
                        startTime=Timestamp(
-                           convert_timestamp_to_milliseconds(
-                               attempt["StartedOn"].timestamp()
+                           datetime_to_timestamp(
+                               attempt["StartedOn"], milliseconds=True
                            )
                        ),
                        endTime=Timestamp(
-                           convert_timestamp_to_milliseconds(
-                               attempt["CompletedOn"].timestamp()
+                           datetime_to_timestamp(
+                               attempt["CompletedOn"], milliseconds=True
                            )
                        ),
                    )
@@ -175,20 +288,14 @@ class GluepipelineSource(PipelineServiceSource):
                    pipeline_status = PipelineStatus(
                        taskStatus=task_status,
                        timestamp=Timestamp(
-                           convert_timestamp_to_milliseconds(
-                               attempt["StartedOn"].timestamp()
+                           datetime_to_timestamp(
+                               attempt["StartedOn"], milliseconds=True
                            )
                        ),
                        executionStatus=STATUS_MAP.get(
                            attempt["JobRunState"].lower(), StatusType.Pending
                        ).value,
                    )
-                   pipeline_fqn = fqn.build(
-                       metadata=self.metadata,
-                       entity_type=Pipeline,
-                       service_name=self.context.get().pipeline_service,
-                       pipeline_name=self.context.get().pipeline,
-                   )
                    yield Either(
                        right=OMetaPipelineStatus(
                            pipeline_fqn=pipeline_fqn,
@@ -199,7 +306,7 @@ class GluepipelineSource(PipelineServiceSource):
                yield Either(
                    left=StackTraceError(
                        name=pipeline_fqn,
-                       error=f"Failed to yield pipeline status: {exc}",
+                       error=f"Failed to yield pipeline status for job {job}: {exc}",
                        stackTrace=traceback.format_exc(),
                    )
                )
@@ -210,3 +317,42 @@ class GluepipelineSource(PipelineServiceSource):
        """
        Get lineage between pipeline and data sources
        """
        try:
            pipeline_fqn = fqn.build(
                metadata=self.metadata,
                entity_type=Pipeline,
                service_name=self.context.get().pipeline_service,
                pipeline_name=self.context.get().pipeline,
            )
            pipeline_entity = self.metadata.get_by_name(
                entity=Pipeline, fqn=pipeline_fqn
            )
            lineage_details = LineageDetails(
                pipeline=EntityReference(id=pipeline_entity.id.root, type="pipeline"),
                source=LineageSource.PipelineLineage,
            )
            for job in self.job_name_list:
                lineage_entities = self.get_lineage_details(job)
                for source in lineage_entities.get("sources"):
                    for target in lineage_entities.get("targets"):
                        yield Either(
                            right=AddLineageRequest(
                                edge=EntitiesEdge(
                                    fromEntity=source,
                                    toEntity=target,
                                    lineageDetails=lineage_details,
                                )
                            )
                        )
        except Exception as exc:
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.get(NAME),
                    error=f"Wild error ingesting pipeline lineage {pipeline_details} - {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )
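For orientation, the lineage walk above is driven by the CodeGenConfigurationNodes map that AWS Glue returns from get_job for visual jobs. The sketch below, built around an invented job payload, shows how the key-based dispatch through TABLE_MODEL_MAP and STORAGE_MODEL_MAP surfaces the names that later feed fqn.build and es_search_container_by_path; it is illustrative only and not part of this change.

from metadata.ingestion.source.pipeline.gluepipeline.metadata import (
    STORAGE_MODEL_MAP,
    TABLE_MODEL_MAP,
)
from metadata.ingestion.source.pipeline.gluepipeline.models import JobNodeResponse

# Invented, trimmed-down shape of glue.get_job(JobName=...) for a visual job.
sample_job = {
    "Job": {
        "CodeGenConfigurationNodes": {
            "node-1": {
                "S3CsvSource": {"Name": "raw files", "Paths": ["s3://bucket/raw/"]}
            },
            "node-2": {
                "RedshiftTarget": {
                    "Name": "orders target",
                    "Database": "dev",
                    "Table": "orders",
                }
            },
        }
    }
}

job_details = JobNodeResponse.model_validate(sample_job).Job
for _, node in job_details.config_nodes.items():
    for key, entity in node.items():
        if key in TABLE_MODEL_MAP:
            table = TABLE_MODEL_MAP[key].model_validate(entity)
            # names used to build the Table FQN against each configured db service
            print(key, table.database_name, table.schema_name, table.table_name)
        elif key in STORAGE_MODEL_MAP:
            storage = STORAGE_MODEL_MAP[key].model_validate(entity)
            # paths used to look up the backing Container via Elasticsearch
            print(key, storage.Paths)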

View File

@@ -0,0 +1,78 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Glue Pipeline Source Model module
"""
from typing import List, Optional

from pydantic import BaseModel, Field


class EntityDetails(BaseModel):
    Value: str


class SourceDetails(BaseModel):
    schema_details: EntityDetails = Field(alias="Schema")
    table_details: EntityDetails = Field(alias="Table")


class AmazonRedshift(BaseModel):
    Name: str
    Data: SourceDetails
    database_name: Optional[str] = None

    @property
    def table_name(self):
        if self.Data:
            return self.Data.table_details.Value
        return None

    @property
    def schema_name(self):
        if self.Data:
            return self.Data.schema_details.Value
        return None


class CatalogSource(BaseModel):
    Name: str
    database_name: str = Field(alias="Database")
    schema_name: Optional[str] = None
    table_name: str = Field(alias="Table")


class JDBCSource(BaseModel):
    Name: str
    schema_name: Optional[str] = Field(default=None, alias="SchemaName")
    database_name: Optional[str] = None
    table_name: str = Field(alias="ConnectionTable")


class S3Source(BaseModel):
    Name: str
    Paths: List[str]


class S3Target(BaseModel):
    Name: str
    Path: str
    Paths: Optional[str] = None


class JobNodes(BaseModel):
    config_nodes: Optional[dict] = Field(alias="CodeGenConfigurationNodes")


class JobNodeResponse(BaseModel):
    Job: Optional[JobNodes] = None
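As a quick sanity check of the alias handling above, the hedged snippet below validates an invented AmazonRedshiftSource-style payload; the nested Data.Schema / Data.Table layout follows the SourceDetails aliases defined in this module.

from metadata.ingestion.source.pipeline.gluepipeline.models import AmazonRedshift

# Invented payload shaped like the body of an "AmazonRedshiftSource" node.
node_body = {
    "Name": "redshift orders source",
    "Data": {
        "Schema": {"Value": "public"},
        "Table": {"Value": "orders"},
    },
}

model = AmazonRedshift.model_validate(node_body)
assert model.schema_name == "public"  # resolved through Data -> Schema -> Value
assert model.table_name == "orders"   # resolved through Data -> Table -> Value
assert model.database_name is None    # no database field on these nodes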

View File

@@ -285,6 +285,16 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
            else []
        )

    def get_storage_service_names(self) -> List[str]:
        """
        Get the list of storage service names
        """
        return (
            self.source_config.lineageInformation.storageServiceNames or []
            if self.source_config.lineageInformation
            else []
        )

    def prepare(self):
        """
        Method to implement any required logic before starting the ingestion process
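get_storage_service_names mirrors the existing get_db_service_names and reads the same lineageInformation block of the workflow source config. A hypothetical sourceConfig carrying both lists might look like the sketch below (service names are placeholders and must match services that are already ingested):

# Hypothetical sourceConfig snippet; not part of this diff.
source_config = {
    "config": {
        "type": "PipelineMetadata",
        "lineageInformation": {
            "dbServiceNames": ["my_redshift_service"],
            "storageServiceNames": ["my_s3_service"],
        },
    }
}
# With this config, get_db_service_names() returns ["my_redshift_service"]
# and get_storage_service_names() returns ["my_s3_service"].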

View File

@@ -0,0 +1,341 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Glue Pipeline using the topology
"""
import json
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import (
EntityName,
FullyQualifiedEntityName,
Markdown,
SourceUrl,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.pipeline.gluepipeline.metadata import GluepipelineSource
mock_glue_config = {
"source": {
"type": "gluepipeline",
"serviceName": "local_gluepipeline",
"serviceConnection": {
"config": {
"type": "GluePipeline",
"awsConfig": {
"awsAccessKeyId": "aws_access_key_id",
"awsSecretAccessKey": "aws_secret_access_key",
"awsRegion": "us-east-2",
"endPointURL": "https://endpoint.com/",
},
},
},
"sourceConfig": {"config": {"type": "PipelineMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
EXPECTED_JOB_DETAILS = json.loads(
"""
{
"Name": "redshift workflow",
"Description": "redshift workflow description",
"DefaultRunProperties": {},
"CreatedOn": "2024-09-20 15:46:36.668000",
"LastModifiedOn": "2024-09-20 15:46:36.668000",
"LastRun": {
"Name": "redshift workflow",
"WorkflowRunId": "wr_6db99d3ea932db0739f03ba5ae56e4b635b7878261f75af062e1223a7272c50e",
"WorkflowRunProperties": {},
"StartedOn": "2024-09-30 17:07:24.032000",
"CompletedOn": "2024-09-30 17:08:24.032000",
"Status": "COMPLETED",
"Statistics": {
"TotalActions": 1,
"TimeoutActions": 0,
"FailedActions": 1,
"StoppedActions": 0,
"SucceededActions": 0,
"RunningActions": 0,
"ErroredActions": 0,
"WaitingActions": 0
},
"Graph": {
"Nodes": [
{
"Type": "TRIGGER",
"Name": "redshift_event",
"UniqueId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee",
"TriggerDetails": {
"Trigger": {
"Name": "redshift_event",
"WorkflowName": "redshift workflow",
"Type": "ON_DEMAND",
"State": "CREATED",
"Actions": [
{
"JobName": "Redshift DBT Job"
}
]
}
}
},
{
"Type": "JOB",
"Name": "Redshift DBT Job",
"UniqueId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123",
"JobDetails": {
"JobRuns": [
{
"Id": "jr_108804857dd29cb1857c92d3e8bf0b48f7685c246e56125b713eb6ea7ebfe4e2",
"Attempt": 0,
"TriggerName": "redshift_event",
"JobName": "Redshift DBT Job",
"JobMode": "VISUAL",
"JobRunQueuingEnabled": false,
"StartedOn": "2024-09-30 17:07:59.185000",
"LastModifiedOn": "2024-09-30 17:08:03.003000",
"CompletedOn": "2024-09-30 17:08:03.003000",
"JobRunState": "FAILED",
"ErrorMessage": "Error Message",
"PredecessorRuns": [],
"AllocatedCapacity": 10,
"ExecutionTime": 0,
"Timeout": 2880,
"MaxCapacity": 10.0,
"WorkerType": "G.1X",
"NumberOfWorkers": 10,
"LogGroupName": "/aws-glue/jobs",
"GlueVersion": "4.0",
"ExecutionClass": "STANDARD"
}
]
}
}
],
"Edges": [
{
"SourceId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee",
"DestinationId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123"
}
]
}
},
"Graph": {
"Nodes": [
{
"Type": "TRIGGER",
"Name": "redshift_event",
"UniqueId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee",
"TriggerDetails": {
"Trigger": {
"Name": "redshift_event",
"WorkflowName": "redshift workflow",
"Type": "ON_DEMAND",
"State": "CREATED",
"Actions": [
{
"JobName": "Redshift DBT Job"
}
]
}
}
},
{
"Type": "JOB",
"Name": "Redshift DBT Job",
"UniqueId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123",
"JobDetails": {}
}
],
"Edges": [
{
"SourceId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee",
"DestinationId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123"
}
]
}
}
"""
)
EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
name=EntityName(root="redshift workflow"),
displayName="redshift workflow",
description=None,
dataProducts=None,
sourceUrl=SourceUrl(
root="https://us-east-2.console.aws.amazon.com/glue/home?region=us-east-2#/v2/etl-configuration/workflows/view/redshift workflow"
),
concurrency=None,
pipelineLocation=None,
startDate=None,
tasks=[
Task(
name="redshift_event",
displayName="redshift_event",
fullyQualifiedName=None,
description=None,
sourceUrl=None,
downstreamTasks=["Redshift DBT Job"],
taskType="TRIGGER",
taskSQL=None,
startDate=None,
endDate=None,
tags=None,
owners=None,
),
Task(
name="Redshift DBT Job",
displayName="Redshift DBT Job",
fullyQualifiedName=None,
description=None,
sourceUrl=None,
downstreamTasks=[],
taskType="JOB",
taskSQL=None,
startDate=None,
endDate=None,
tags=None,
owners=None,
),
],
tags=None,
owners=None,
service=FullyQualifiedEntityName(root="gluepipeline_test"),
extension=None,
scheduleInterval=None,
domain=None,
lifeCycle=None,
sourceHash=None,
)
MOCK_PIPELINE_SERVICE = PipelineService(
id="85811038-099a-11ed-861d-0242ac120002",
name="gluepipeline_test",
fullyQualifiedName=FullyQualifiedEntityName("gluepipeline_test"),
connection=PipelineConnection(),
serviceType=PipelineServiceType.DBTCloud,
)
MOCK_PIPELINE = Pipeline(
id="2aaa012e-099a-11ed-861d-0242ac120002",
name=EntityName(root="redshift workflow"),
fullyQualifiedName="gluepipeline_test.redshift workflow",
displayName="OpenMetadata DBTCloud Workflow",
description=Markdown(root="Example Job Description"),
dataProducts=None,
sourceUrl=SourceUrl(
root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/jobs/70403103936332"
),
concurrency=None,
pipelineLocation=None,
startDate=None,
tasks=[
Task(
name="70403110257794",
displayName=None,
fullyQualifiedName=None,
description=None,
sourceUrl=SourceUrl(
root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/runs/70403110257794/"
),
downstreamTasks=None,
taskType=None,
taskSQL=None,
startDate="2024-05-27 10:42:20.621788+00:00",
endDate="2024-05-28 10:42:52.622408+00:00",
tags=None,
owners=None,
),
Task(
name="70403111615088",
displayName=None,
fullyQualifiedName=None,
description=None,
sourceUrl=SourceUrl(
root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/runs/70403111615088/"
),
downstreamTasks=None,
taskType=None,
taskSQL=None,
startDate="None",
endDate="None",
tags=None,
owners=None,
),
],
tags=None,
owners=None,
service=EntityReference(
id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
),
extension=None,
scheduleInterval="6 */12 * * 0,1,2,3,4,5,6",
domain=None,
lifeCycle=None,
sourceHash=None,
)
EXPECTED_PIPELINE_NAME = "redshift workflow"
class GluePipelineUnitTest(TestCase):
    """
    Glue Pipeline unit tests
    """

    @patch(
        "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
    )
    def __init__(self, methodName, test_connection) -> None:
        super().__init__(methodName)
        test_connection.return_value = False
        config = OpenMetadataWorkflowConfig.model_validate(mock_glue_config)
        self.gluepipeline = GluepipelineSource.create(
            mock_glue_config["source"],
            config.workflowConfig.openMetadataServerConfig,
        )
        self.gluepipeline.context.get().__dict__["pipeline"] = MOCK_PIPELINE.name.root
        self.gluepipeline.context.get().__dict__[
            "pipeline_service"
        ] = MOCK_PIPELINE_SERVICE.name.root

    def test_pipeline_name(self):
        assert (
            self.gluepipeline.get_pipeline_name(EXPECTED_JOB_DETAILS)
            == EXPECTED_PIPELINE_NAME
        )

    def test_pipelines(self):
        pipeline = list(self.gluepipeline.yield_pipeline(EXPECTED_JOB_DETAILS))[0].right
        assert pipeline == EXPECTED_CREATED_PIPELINES