diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py
index 94dc58f7c56..66ef26f2721 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py
@@ -25,6 +25,7 @@ from metadata.generated.schema.entity.data.pipeline import (
     Task,
     TaskStatus,
 )
+from metadata.generated.schema.entity.data.table import Table
 from metadata.generated.schema.entity.services.connections.pipeline.gluePipelineConnection import (
     GluePipelineConnection,
 )
@@ -40,14 +41,25 @@ from metadata.generated.schema.type.basic import (
     SourceUrl,
     Timestamp,
 )
+from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
+from metadata.generated.schema.type.entityLineage import Source as LineageSource
+from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.models import Either
 from metadata.ingestion.api.steps import InvalidSourceException
 from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.pipeline.gluepipeline.models import (
+    AmazonRedshift,
+    CatalogSource,
+    JDBCSource,
+    JobNodeResponse,
+    S3Source,
+    S3Target,
+)
 from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
 from metadata.utils import fqn
 from metadata.utils.logger import ingestion_logger
-from metadata.utils.time_utils import convert_timestamp_to_milliseconds
+from metadata.utils.time_utils import datetime_to_timestamp
 
 logger = ingestion_logger()
 
@@ -63,6 +75,28 @@ STATUS_MAP = {
     "incomplete": StatusType.Failed,
     "pending": StatusType.Pending,
 }
+TABLE_MODEL_MAP = {
+    "AmazonRedshiftSource": AmazonRedshift,
+    "AmazonRedshiftTarget": AmazonRedshift,
+    "AthenaConnectorSource": JDBCSource,
+    "JDBCConnectorSource": JDBCSource,
+    "JDBCConnectorTarget": JDBCSource,
+    "DirectJDBCSource": CatalogSource,
+    "RedshiftSource": CatalogSource,
+    "RedshiftTarget": CatalogSource,
+    "DirectJDBC": CatalogSource,
+}
+STORAGE_MODEL_MAP = {
+    "S3CsvSource": S3Source,
+    "S3JsonSource": S3Source,
+    "S3ParquetSource": S3Source,
+    "S3HudiSource": S3Source,
+    "S3DeltaSource": S3Source,
+    "S3DirectTarget": S3Target,
+    "S3DeltaDirectTarget": S3Target,
+    "S3GlueParquetTarget": S3Target,
+    "S3HudiDirectTarget": S3Target,
+}
 
 
 class GluepipelineSource(PipelineServiceSource):
@@ -145,9 +179,88 @@ class GluepipelineSource(PipelineServiceSource):
                     downstream_tasks.append(self.task_id_mapping[edges["DestinationId"]])
         return downstream_tasks
 
+    def get_lineage_details(self, job) -> Optional[dict]:
+        """
+        Get the Lineage Details of the pipeline
+        """
+        lineage_details = {"sources": [], "targets": []}
+        try:
+            job_details = JobNodeResponse.model_validate(
+                self.glue.get_job(JobName=job)
+            ).Job
+            if job_details and job_details.config_nodes:
+                nodes = job_details.config_nodes
+                for _, node in nodes.items():
+                    for key, entity in node.items():
+                        table_model, storage_model = None, None
+                        if key in TABLE_MODEL_MAP:
+                            table_model = TABLE_MODEL_MAP[key].model_validate(entity)
+                        elif "Catalog" in key:
+                            table_model = CatalogSource.model_validate(entity)
+                        elif key in STORAGE_MODEL_MAP:
+                            storage_model = STORAGE_MODEL_MAP[key].model_validate(
+                                entity
+                            )
+                        if table_model:
+                            for db_service_name in self.get_db_service_names():
+                                table_entity = self.metadata.get_entity_reference(
+                                    entity=Table,
+                                    fqn=fqn.build(
+                                        metadata=self.metadata,
+                                        entity_type=Table,
+                                        table_name=table_model.table_name,
+                                        database_name=table_model.database_name,
+                                        schema_name=table_model.schema_name,
+                                        service_name=db_service_name,
+                                    ),
+                                )
+                                if table_entity:
+                                    if key.endswith("Source"):
+                                        lineage_details["sources"].append(table_entity)
+                                    else:
+                                        lineage_details["targets"].append(table_entity)
+                                    break
+                        if storage_model:
+                            for path in storage_model.Paths or [storage_model.Path]:
+                                container = self.metadata.es_search_container_by_path(
+                                    full_path=path
+                                )
+                                if container and container[0]:
+                                    storage_entity = EntityReference(
+                                        id=container[0].id,
+                                        type="container",
+                                        name=container[0].name.root,
+                                        fullyQualifiedName=container[
+                                            0
+                                        ].fullyQualifiedName.root,
+                                    )
+                                    if storage_entity:
+                                        if key.endswith("Source"):
+                                            lineage_details["sources"].append(
+                                                storage_entity
+                                            )
+                                        else:
+                                            lineage_details["targets"].append(
+                                                storage_entity
+                                            )
+                                        break
+
+        except Exception as exc:
+            logger.debug(traceback.format_exc())
+            logger.warning(
+                f"Failed to get lineage details for job {job} due to: {exc}"
+            )
+        return lineage_details
+
     def yield_pipeline_status(
         self, pipeline_details: Any
     ) -> Iterable[Either[OMetaPipelineStatus]]:
+        pipeline_fqn = fqn.build(
+            metadata=self.metadata,
+            entity_type=Pipeline,
+            service_name=self.context.get().pipeline_service,
+            pipeline_name=self.context.get().pipeline,
+        )
         for job in self.job_name_list:
             try:
                 runs = self.glue.get_job_runs(JobName=job)
@@ -161,13 +274,13 @@ class GluepipelineSource(PipelineServiceSource):
                             attempt["JobRunState"].lower(), StatusType.Pending
                         ).value,
                         startTime=Timestamp(
-                            convert_timestamp_to_milliseconds(
-                                attempt["StartedOn"].timestamp()
+                            datetime_to_timestamp(
+                                attempt["StartedOn"], milliseconds=True
                             )
                         ),
                         endTime=Timestamp(
-                            convert_timestamp_to_milliseconds(
-                                attempt["CompletedOn"].timestamp()
+                            datetime_to_timestamp(
+                                attempt["CompletedOn"], milliseconds=True
                             )
                         ),
                     )
@@ -175,20 +288,14 @@ class GluepipelineSource(PipelineServiceSource):
                 pipeline_status = PipelineStatus(
                     taskStatus=task_status,
                     timestamp=Timestamp(
-                        convert_timestamp_to_milliseconds(
-                            attempt["StartedOn"].timestamp()
+                        datetime_to_timestamp(
+                            attempt["StartedOn"], milliseconds=True
                         )
                     ),
                     executionStatus=STATUS_MAP.get(
                         attempt["JobRunState"].lower(), StatusType.Pending
                     ).value,
                 )
-                pipeline_fqn = fqn.build(
-                    metadata=self.metadata,
-                    entity_type=Pipeline,
-                    service_name=self.context.get().pipeline_service,
-                    pipeline_name=self.context.get().pipeline,
-                )
                 yield Either(
                     right=OMetaPipelineStatus(
                         pipeline_fqn=pipeline_fqn,
@@ -199,7 +306,7 @@ class GluepipelineSource(PipelineServiceSource):
             yield Either(
                 left=StackTraceError(
                     name=pipeline_fqn,
-                    error=f"Failed to yield pipeline status: {exc}",
+                    error=f"Failed to yield pipeline status for job {job}: {exc}",
                     stackTrace=traceback.format_exc(),
                 )
             )
@@ -210,3 +317,42 @@ class GluepipelineSource(PipelineServiceSource):
         """
         Get lineage between pipeline and data sources
         """
+        try:
+            pipeline_fqn = fqn.build(
+                metadata=self.metadata,
+                entity_type=Pipeline,
+                service_name=self.context.get().pipeline_service,
+                pipeline_name=self.context.get().pipeline,
+            )
+
+            pipeline_entity = self.metadata.get_by_name(
+                entity=Pipeline, fqn=pipeline_fqn
+            )
+
+            lineage_details = LineageDetails(
+                pipeline=EntityReference(id=pipeline_entity.id.root, type="pipeline"),
+                source=LineageSource.PipelineLineage,
+            )
+
+            for job in self.job_name_list:
+                lineage_entities = self.get_lineage_details(job)
+                for source in lineage_entities.get("sources"):
+                    for target in lineage_entities.get("targets"):
+                        yield Either(
+                            right=AddLineageRequest(
+                                edge=EntitiesEdge(
+                                    fromEntity=source,
+                                    toEntity=target,
+                                    lineageDetails=lineage_details,
+                                )
+                            )
+                        )
+
+        except Exception as exc:
+            yield Either(
+                left=StackTraceError(
+                    name=pipeline_details.get(NAME),
+                    error=f"Wild error ingesting pipeline lineage {pipeline_details} - {exc}",
+                    stackTrace=traceback.format_exc(),
+                )
+            )
diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py
new file mode 100644
index 00000000000..84090b1febe
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py
@@ -0,0 +1,78 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Glue Pipeline Source Model module
+"""
+
+from typing import List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class EntityDetails(BaseModel):
+    Value: str
+
+
+class SourceDetails(BaseModel):
+    schema_details: EntityDetails = Field(alias="Schema")
+    table_details: EntityDetails = Field(alias="Table")
+
+
+class AmazonRedshift(BaseModel):
+    Name: str
+    Data: SourceDetails
+    database_name: Optional[str] = None
+
+    @property
+    def table_name(self):
+        if self.Data:
+            return self.Data.table_details.Value
+        return None
+
+    @property
+    def schema_name(self):
+        if self.Data:
+            return self.Data.schema_details.Value
+        return None
+
+
+class CatalogSource(BaseModel):
+    Name: str
+    database_name: str = Field(alias="Database")
+    schema_name: Optional[str] = None
+    table_name: str = Field(alias="Table")
+
+
+class JDBCSource(BaseModel):
+    Name: str
+    schema_name: Optional[str] = Field(default=None, alias="SchemaName")
+    database_name: Optional[str] = None
+    table_name: str = Field(alias="ConnectionTable")
+
+
+class S3Source(BaseModel):
+    Name: str
+    Paths: List[str]
+
+
+class S3Target(BaseModel):
+    Name: str
+    Path: str
+    Paths: Optional[str] = None
+
+
+class JobNodes(BaseModel):
+    config_nodes: Optional[dict] = Field(alias="CodeGenConfigurationNodes")
+
+
+class JobNodeResponse(BaseModel):
+    Job: Optional[JobNodes] = None
diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py
index 40449f08bc1..3df3712277a 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py
@@ -285,6 +285,16 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
             else []
         )
 
+    def get_storage_service_names(self) -> List[str]:
+        """
+        Get the list of storage service names
+        """
+        return (
+            self.source_config.lineageInformation.storageServiceNames or []
+            if self.source_config.lineageInformation
+            else []
+        )
+
     def prepare(self):
         """
         Method to implement any required logic before starting the ingestion process
diff --git a/ingestion/tests/unit/topology/pipeline/test_gluepipeline.py b/ingestion/tests/unit/topology/pipeline/test_gluepipeline.py
new file mode 100644
index 00000000000..428d6e90025
--- /dev/null
+++ b/ingestion/tests/unit/topology/pipeline/test_gluepipeline.py
@@ -0,0 +1,341 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Test Glue Pipeline using the topology
+"""
+import json
+from unittest import TestCase
+from unittest.mock import patch
+
+from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
+from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
+from metadata.generated.schema.entity.services.pipelineService import (
+    PipelineConnection,
+    PipelineService,
+    PipelineServiceType,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.generated.schema.type.basic import (
+    EntityName,
+    FullyQualifiedEntityName,
+    Markdown,
+    SourceUrl,
+)
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.source.pipeline.gluepipeline.metadata import GluepipelineSource
+
+mock_glue_config = {
+    "source": {
+        "type": "gluepipeline",
+        "serviceName": "local_gluepipeline",
+        "serviceConnection": {
+            "config": {
+                "type": "GluePipeline",
+                "awsConfig": {
+                    "awsAccessKeyId": "aws_access_key_id",
+                    "awsSecretAccessKey": "aws_secret_access_key",
+                    "awsRegion": "us-east-2",
+                    "endPointURL": "https://endpoint.com/",
+                },
+            },
+        },
+        "sourceConfig": {"config": {"type": "PipelineMetadata"}},
+    },
+    "sink": {"type": "metadata-rest", "config": {}},
+    "workflowConfig": {
+        "openMetadataServerConfig": {
+            "hostPort": "http://localhost:8585/api",
+            "authProvider": "openmetadata",
+            "securityConfig": {
+                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
+            },
+        }
+    },
+}
+
+EXPECTED_JOB_DETAILS = json.loads(
+    """
+{
+    "Name": "redshift workflow",
+    "Description": "redshift workflow description",
+    "DefaultRunProperties": {},
+    "CreatedOn": "2024-09-20 15:46:36.668000",
+    "LastModifiedOn": "2024-09-20 15:46:36.668000",
+    "LastRun": {
+        "Name": "redshift workflow",
+        "WorkflowRunId": "wr_6db99d3ea932db0739f03ba5ae56e4b635b7878261f75af062e1223a7272c50e",
+        "WorkflowRunProperties": {},
+        "StartedOn": "2024-09-30 17:07:24.032000",
+        "CompletedOn": "2024-09-30 17:08:24.032000",
+        "Status": "COMPLETED",
+        "Statistics": {
+            "TotalActions": 1,
+            "TimeoutActions": 0,
+            "FailedActions": 1,
+            "StoppedActions": 0,
+            "SucceededActions": 0,
+            "RunningActions": 0,
+            "ErroredActions": 0,
"WaitingActions": 0 + }, + "Graph": { + "Nodes": [ + { + "Type": "TRIGGER", + "Name": "redshift_event", + "UniqueId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee", + "TriggerDetails": { + "Trigger": { + "Name": "redshift_event", + "WorkflowName": "redshift workflow", + "Type": "ON_DEMAND", + "State": "CREATED", + "Actions": [ + { + "JobName": "Redshift DBT Job" + } + ] + } + } + }, + { + "Type": "JOB", + "Name": "Redshift DBT Job", + "UniqueId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123", + "JobDetails": { + "JobRuns": [ + { + "Id": "jr_108804857dd29cb1857c92d3e8bf0b48f7685c246e56125b713eb6ea7ebfe4e2", + "Attempt": 0, + "TriggerName": "redshift_event", + "JobName": "Redshift DBT Job", + "JobMode": "VISUAL", + "JobRunQueuingEnabled": false, + "StartedOn": "2024-09-30 17:07:59.185000", + "LastModifiedOn": "2024-09-30 17:08:03.003000", + "CompletedOn": "2024-09-30 17:08:03.003000", + "JobRunState": "FAILED", + "ErrorMessage": "Error Message", + "PredecessorRuns": [], + "AllocatedCapacity": 10, + "ExecutionTime": 0, + "Timeout": 2880, + "MaxCapacity": 10.0, + "WorkerType": "G.1X", + "NumberOfWorkers": 10, + "LogGroupName": "/aws-glue/jobs", + "GlueVersion": "4.0", + "ExecutionClass": "STANDARD" + } + ] + } + } + ], + "Edges": [ + { + "SourceId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee", + "DestinationId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123" + } + ] + } + }, + "Graph": { + "Nodes": [ + { + "Type": "TRIGGER", + "Name": "redshift_event", + "UniqueId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee", + "TriggerDetails": { + "Trigger": { + "Name": "redshift_event", + "WorkflowName": "redshift workflow", + "Type": "ON_DEMAND", + "State": "CREATED", + "Actions": [ + { + "JobName": "Redshift DBT Job" + } + ] + } + } + }, + { + "Type": "JOB", + "Name": "Redshift DBT Job", + "UniqueId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123", + "JobDetails": {} + } + ], + "Edges": [ + { + "SourceId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee", + "DestinationId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123" + } + ] + } +} +""" +) + +EXPECTED_CREATED_PIPELINES = CreatePipelineRequest( + name=EntityName(root="redshift workflow"), + displayName="redshift workflow", + description=None, + dataProducts=None, + sourceUrl=SourceUrl( + root="https://us-east-2.console.aws.amazon.com/glue/home?region=us-east-2#/v2/etl-configuration/workflows/view/redshift workflow" + ), + concurrency=None, + pipelineLocation=None, + startDate=None, + tasks=[ + Task( + name="redshift_event", + displayName="redshift_event", + fullyQualifiedName=None, + description=None, + sourceUrl=None, + downstreamTasks=["Redshift DBT Job"], + taskType="TRIGGER", + taskSQL=None, + startDate=None, + endDate=None, + tags=None, + owners=None, + ), + Task( + name="Redshift DBT Job", + displayName="Redshift DBT Job", + fullyQualifiedName=None, + description=None, + sourceUrl=None, + downstreamTasks=[], + taskType="JOB", + taskSQL=None, + startDate=None, + endDate=None, + tags=None, + owners=None, + ), + ], + tags=None, + owners=None, + service=FullyQualifiedEntityName(root="gluepipeline_test"), + extension=None, + scheduleInterval=None, + domain=None, + lifeCycle=None, + sourceHash=None, +) + +MOCK_PIPELINE_SERVICE = PipelineService( + id="85811038-099a-11ed-861d-0242ac120002", + name="gluepipeline_test", + 
+    fullyQualifiedName=FullyQualifiedEntityName("gluepipeline_test"),
+    connection=PipelineConnection(),
+    serviceType=PipelineServiceType.GluePipeline,
+)
+
+MOCK_PIPELINE = Pipeline(
+    id="2aaa012e-099a-11ed-861d-0242ac120002",
+    name=EntityName(root="redshift workflow"),
+    fullyQualifiedName="gluepipeline_test.redshift workflow",
+    displayName="OpenMetadata DBTCloud Workflow",
+    description=Markdown(root="Example Job Description"),
+    dataProducts=None,
+    sourceUrl=SourceUrl(
+        root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/jobs/70403103936332"
+    ),
+    concurrency=None,
+    pipelineLocation=None,
+    startDate=None,
+    tasks=[
+        Task(
+            name="70403110257794",
+            displayName=None,
+            fullyQualifiedName=None,
+            description=None,
+            sourceUrl=SourceUrl(
+                root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/runs/70403110257794/"
+            ),
+            downstreamTasks=None,
+            taskType=None,
+            taskSQL=None,
+            startDate="2024-05-27 10:42:20.621788+00:00",
+            endDate="2024-05-28 10:42:52.622408+00:00",
+            tags=None,
+            owners=None,
+        ),
+        Task(
+            name="70403111615088",
+            displayName=None,
+            fullyQualifiedName=None,
+            description=None,
+            sourceUrl=SourceUrl(
+                root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/runs/70403111615088/"
+            ),
+            downstreamTasks=None,
+            taskType=None,
+            taskSQL=None,
+            startDate="None",
+            endDate="None",
+            tags=None,
+            owners=None,
+        ),
+    ],
+    tags=None,
+    owners=None,
+    service=EntityReference(
+        id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
+    ),
+    extension=None,
+    scheduleInterval="6 */12 * * 0,1,2,3,4,5,6",
+    domain=None,
+    lifeCycle=None,
+    sourceHash=None,
+)
+
+EXPECTED_PIPELINE_NAME = "redshift workflow"
+
+
+class GluePipelineUnitTest(TestCase):
+    """
+    Glue Pipeline unit tests
+    """
+
+    @patch(
+        "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
+    )
+    def __init__(self, methodName, test_connection) -> None:
+        super().__init__(methodName)
+        test_connection.return_value = False
+
+        config = OpenMetadataWorkflowConfig.model_validate(mock_glue_config)
+        self.gluepipeline = GluepipelineSource.create(
+            mock_glue_config["source"],
+            config.workflowConfig.openMetadataServerConfig,
+        )
+        self.gluepipeline.context.get().__dict__["pipeline"] = MOCK_PIPELINE.name.root
+        self.gluepipeline.context.get().__dict__[
+            "pipeline_service"
+        ] = MOCK_PIPELINE_SERVICE.name.root
+
+    def test_pipeline_name(self):
+        assert (
+            self.gluepipeline.get_pipeline_name(EXPECTED_JOB_DETAILS)
+            == EXPECTED_PIPELINE_NAME
+        )
+
+    def test_pipelines(self):
+        pipeline = list(self.gluepipeline.yield_pipeline(EXPECTED_JOB_DETAILS))[0].right
+        assert pipeline == EXPECTED_CREATED_PIPELINES