diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py index 63731473b72..83c49c0724a 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py @@ -22,6 +22,61 @@ REQUIRED_MANIFEST_KEYS = ["name", "schema", "resource_type"] # Based on https://schemas.getdbt.com/dbt/catalog/v1.json REQUIRED_CATALOG_KEYS = ["name", "type", "index"] +REQUIRED_RESULTS_KEYS = { + "status", + "timing", + "thread_id", + "execution_time", + "message", + "adapter_response", + "unique_id", +} + +REQUIRED_NODE_KEYS = { + "schema_", + "schema", + "freshness", + "name", + "resource_type", + "path", + "unique_id", + "source_name", + "source_description", + "source_meta", + "loader", + "identifier", + "relation_name", + "fqn", + "alias", + "checksum", + "config", + "column_name", + "test_metadata", + "original_file_path", + "root_path", + "database", + "tags", + "description", + "columns", + "meta", + "owner", + "created_at", + "group", + "sources", + "compiled", + "docs", + "version", + "latest_version", + "package_name", + "depends_on", + "compiled_code", + "compiled_sql", + "raw_code", + "raw_sql", + "language", +} + + NONE_KEYWORDS_LIST = ["none", "null"] DBT_CATALOG_FILE_NAME = "catalog.json" diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py index 44307b6df75..aa2d65f4e2c 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py @@ -13,7 +13,7 @@ DBT service Topology. """ from abc import ABC, abstractmethod -from typing import Iterable +from typing import Iterable, List from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results from pydantic import Field @@ -37,6 +37,10 @@ from metadata.ingestion.models.topology import ( TopologyNode, ) from metadata.ingestion.source.database.database_service import DataModelLink +from metadata.ingestion.source.database.dbt.constants import ( + REQUIRED_NODE_KEYS, + REQUIRED_RESULTS_KEYS, +) from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details from metadata.ingestion.source.database.dbt.models import ( DbtFiles, @@ -169,51 +173,27 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC): } ) - required_nodes_keys = { - "schema_", - "schema", - "name", - "resource_type", - "path", - "unique_id", - "fqn", - "alias", - "checksum", - "config", - "column_name", - "test_metadata", - "original_file_path", - "root_path", - "database", - "tags", - "description", - "columns", - "meta", - "owner", - "created_at", - "group", - "sources", - "compiled", - "docs", - "version", - "latest_version", - "package_name", - "depends_on", - "compiled_code", - "compiled_sql", - "raw_code", - "raw_sql", - "language", - } + for field in ["nodes", "sources"]: + for node, value in manifest_dict.get( # pylint: disable=unused-variable + field + ).items(): + keys_to_delete = [ + key for key in value if key.lower() not in REQUIRED_NODE_KEYS + ] + for key in keys_to_delete: + del value[key] - for node, value in manifest_dict.get( # pylint: disable=unused-variable - "nodes" - ).items(): - keys_to_delete = [ - key for key in value if key.lower() not in required_nodes_keys - ] - for key in keys_to_delete: - del value[key] + def remove_run_result_non_required_keys(self, run_results: List[dict]): + """ + Method to remove the non required keys from run results file + """ + for run_result in run_results: + for result in run_result.get("results"): + keys_to_delete = [ + key for key in result if key.lower() not in REQUIRED_RESULTS_KEYS + ] + for key in keys_to_delete: + del result[key] def get_dbt_files(self) -> Iterable[DbtFiles]: dbt_files = get_dbt_details(self.source_config.dbtConfigSource) @@ -225,6 +205,10 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC): self.remove_manifest_non_required_keys( manifest_dict=self.context.get().dbt_file.dbt_manifest ) + if self.context.get().dbt_file.dbt_run_results: + self.remove_run_result_non_required_keys( + run_results=self.context.get().dbt_file.dbt_run_results + ) dbt_objects = DbtObjects( dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog) if self.context.get().dbt_file.dbt_catalog diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py index 94dc58f7c56..66ef26f2721 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py @@ -25,6 +25,7 @@ from metadata.generated.schema.entity.data.pipeline import ( Task, TaskStatus, ) +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.pipeline.gluePipelineConnection import ( GluePipelineConnection, ) @@ -40,14 +41,25 @@ from metadata.generated.schema.type.basic import ( SourceUrl, Timestamp, ) +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.pipeline.gluepipeline.models import ( + AmazonRedshift, + CatalogSource, + JDBCSource, + JobNodeResponse, + S3Source, + S3Target, +) from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils import fqn from metadata.utils.logger import ingestion_logger -from metadata.utils.time_utils import convert_timestamp_to_milliseconds +from metadata.utils.time_utils import datetime_to_timestamp logger = ingestion_logger() @@ -63,6 +75,28 @@ STATUS_MAP = { "incomplete": StatusType.Failed, "pending": StatusType.Pending, } +TABLE_MODEL_MAP = { + "AmazonRedshiftSource": AmazonRedshift, + "AmazonRedshiftTarget": AmazonRedshift, + "AthenaConnectorSource": JDBCSource, + "JDBCConnectorSource": JDBCSource, + "JDBCConnectorTarget": JDBCSource, + "DirectJDBCSource": CatalogSource, + "RedshiftSource": CatalogSource, + "RedshiftTarget": CatalogSource, + "DirectJDBC": CatalogSource, +} +STORAGE_MODEL_MAP = { + "S3CsvSource": S3Source, + "S3JsonSource": S3Source, + "S3ParquetSource": S3Source, + "S3HudiSource": S3Source, + "S3DeltaSource": S3Source, + "S3DirectTarget": S3Target, + "S3DeltaDirectTarget": S3Target, + "S3GlueParquetTarget": S3Target, + "S3HudiDirectTarget": S3Target, +} class GluepipelineSource(PipelineServiceSource): @@ -145,9 +179,88 @@ class GluepipelineSource(PipelineServiceSource): downstream_tasks.append(self.task_id_mapping[edges["DestinationId"]]) return downstream_tasks + def get_lineage_details(self, job) -> Optional[dict]: + """ + Get the Lineage Details of the pipeline + """ + lineage_details = {"sources": [], "targets": []} + try: + job_details = JobNodeResponse.model_validate( + self.glue.get_job(JobName=job) + ).Job + if job_details and job_details.config_nodes: + nodes = job_details.config_nodes + for _, node in nodes.items(): + for key, entity in node.items(): + table_model, storage_model = None, None + if key in TABLE_MODEL_MAP: + table_model = TABLE_MODEL_MAP[key].model_validate(entity) + elif "Catalog" in key: + table_model = CatalogSource.model_validate(entity) + elif key in STORAGE_MODEL_MAP: + storage_model = STORAGE_MODEL_MAP[key].model_validate( + entity + ) + if table_model: + for db_service_name in self.get_db_service_names(): + table_entity = self.metadata.get_entity_reference( + entity=Table, + fqn=fqn.build( + metadata=self.metadata, + entity_type=Table, + table_name=table_model.table_name, + database_name=table_model.database_name, + schema_name=table_model.schema_name, + service_name=db_service_name, + ), + ) + if table_entity: + if key.endswith("Source"): + lineage_details["sources"].append(table_entity) + else: + lineage_details["targets"].append(table_entity) + break + if storage_model: + for path in storage_model.Paths or [storage_model.Path]: + container = self.metadata.es_search_container_by_path( + full_path=path + ) + if container and container[0]: + storage_entity = EntityReference( + id=container[0].id, + type="container", + name=container[0].name.root, + fullyQualifiedName=container[ + 0 + ].fullyQualifiedName.root, + ) + if storage_entity: + if key.endswith("Source"): + lineage_details["sources"].append( + storage_entity + ) + else: + lineage_details["targets"].append( + storage_entity + ) + break + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Failed to get lineage details for job : {job} due to : {exc}" + ) + return lineage_details + def yield_pipeline_status( self, pipeline_details: Any ) -> Iterable[Either[OMetaPipelineStatus]]: + pipeline_fqn = fqn.build( + metadata=self.metadata, + entity_type=Pipeline, + service_name=self.context.get().pipeline_service, + pipeline_name=self.context.get().pipeline, + ) for job in self.job_name_list: try: runs = self.glue.get_job_runs(JobName=job) @@ -161,13 +274,13 @@ class GluepipelineSource(PipelineServiceSource): attempt["JobRunState"].lower(), StatusType.Pending ).value, startTime=Timestamp( - convert_timestamp_to_milliseconds( - attempt["StartedOn"].timestamp() + datetime_to_timestamp( + attempt["StartedOn"], milliseconds=True ) ), endTime=Timestamp( - convert_timestamp_to_milliseconds( - attempt["CompletedOn"].timestamp() + datetime_to_timestamp( + attempt["CompletedOn"], milliseconds=True ) ), ) @@ -175,20 +288,14 @@ class GluepipelineSource(PipelineServiceSource): pipeline_status = PipelineStatus( taskStatus=task_status, timestamp=Timestamp( - convert_timestamp_to_milliseconds( - attempt["StartedOn"].timestamp() + datetime_to_timestamp( + attempt["StartedOn"], milliseconds=True ) ), executionStatus=STATUS_MAP.get( attempt["JobRunState"].lower(), StatusType.Pending ).value, ) - pipeline_fqn = fqn.build( - metadata=self.metadata, - entity_type=Pipeline, - service_name=self.context.get().pipeline_service, - pipeline_name=self.context.get().pipeline, - ) yield Either( right=OMetaPipelineStatus( pipeline_fqn=pipeline_fqn, @@ -199,7 +306,7 @@ class GluepipelineSource(PipelineServiceSource): yield Either( left=StackTraceError( name=pipeline_fqn, - error=f"Failed to yield pipeline status: {exc}", + error=f"Failed to yield pipeline status for job {job}: {exc}", stackTrace=traceback.format_exc(), ) ) @@ -210,3 +317,42 @@ class GluepipelineSource(PipelineServiceSource): """ Get lineage between pipeline and data sources """ + try: + pipeline_fqn = fqn.build( + metadata=self.metadata, + entity_type=Pipeline, + service_name=self.context.get().pipeline_service, + pipeline_name=self.context.get().pipeline, + ) + + pipeline_entity = self.metadata.get_by_name( + entity=Pipeline, fqn=pipeline_fqn + ) + + lineage_details = LineageDetails( + pipeline=EntityReference(id=pipeline_entity.id.root, type="pipeline"), + source=LineageSource.PipelineLineage, + ) + + for job in self.job_name_list: + lineage_enities = self.get_lineage_details(job) + for source in lineage_enities.get("sources"): + for target in lineage_enities.get("targets"): + yield Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=source, + toEntity=target, + lineageDetails=lineage_details, + ) + ) + ) + + except Exception as exc: + yield Either( + left=StackTraceError( + name=pipeline_details.get(NAME), + error=f"Wild error ingesting pipeline lineage {pipeline_details} - {exc}", + stackTrace=traceback.format_exc(), + ) + ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py new file mode 100644 index 00000000000..84090b1febe --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py @@ -0,0 +1,78 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Glue Pipeline Source Model module +""" + +from typing import List, Optional + +from pydantic import BaseModel, Field + + +class EntityDetails(BaseModel): + Value: str + + +class SourceDetails(BaseModel): + schema_details: EntityDetails = Field(alias="Schema") + table_details: EntityDetails = Field(alias="Table") + + +class AmazonRedshift(BaseModel): + Name: str + Data: SourceDetails + database_name: Optional[str] = None + + @property + def table_name(self): + if self.Data: + return self.Data.table_details.Value + return None + + @property + def schema_name(self): + if self.Data: + return self.Data.schema_details.Value + return None + + +class CatalogSource(BaseModel): + Name: str + database_name: str = Field(alias="Database") + schema_name: Optional[str] = None + table_name: str = Field(alias="Table") + + +class JDBCSource(BaseModel): + Name: str + schema_name: Optional[str] = Field(default=None, alias="SchemaName") + database_name: Optional[str] = None + table_name: str = Field(alias="ConnectionTable") + + +class S3Source(BaseModel): + Name: str + Paths: List[str] + + +class S3Target(BaseModel): + Name: str + Path: str + Paths: Optional[str] = None + + +class JobNodes(BaseModel): + config_nodes: Optional[dict] = Field(alias="CodeGenConfigurationNodes") + + +class JobNodeResponse(BaseModel): + Job: Optional[JobNodes] = None diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index 40449f08bc1..3df3712277a 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -285,6 +285,16 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): else [] ) + def get_storage_service_names(self) -> List[str]: + """ + Get the list of storage service names + """ + return ( + self.source_config.lineageInformation.storageServiceNames or [] + if self.source_config.lineageInformation + else [] + ) + def prepare(self): """ Method to implement any required logic before starting the ingestion process diff --git a/ingestion/tests/unit/topology/pipeline/test_gluepipeline.py b/ingestion/tests/unit/topology/pipeline/test_gluepipeline.py new file mode 100644 index 00000000000..428d6e90025 --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_gluepipeline.py @@ -0,0 +1,341 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test dbt cloud using the topology +""" +import json +from unittest import TestCase +from unittest.mock import patch + +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.entity.data.pipeline import Pipeline, Task +from metadata.generated.schema.entity.services.pipelineService import ( + PipelineConnection, + PipelineService, + PipelineServiceType, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.basic import ( + EntityName, + FullyQualifiedEntityName, + Markdown, + SourceUrl, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.source.pipeline.gluepipeline.metadata import GluepipelineSource + +mock_glue_config = { + "source": { + "type": "gluepipeline", + "serviceName": "local_gluepipeline", + "serviceConnection": { + "config": { + "type": "GluePipeline", + "awsConfig": { + "awsAccessKeyId": "aws_access_key_id", + "awsSecretAccessKey": "aws_secret_access_key", + "awsRegion": "us-east-2", + "endPointURL": "https://endpoint.com/", + }, + }, + }, + "sourceConfig": {"config": {"type": "PipelineMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} + +EXPECTED_JOB_DETAILS = json.loads( + """ +{ + "Name": "redshift workflow", + "Description": "redshift workflow description", + "DefaultRunProperties": {}, + "CreatedOn": "2024-09-20 15:46:36.668000", + "LastModifiedOn": "2024-09-20 15:46:36.668000", + "LastRun": { + "Name": "redshift workflow", + "WorkflowRunId": "wr_6db99d3ea932db0739f03ba5ae56e4b635b7878261f75af062e1223a7272c50e", + "WorkflowRunProperties": {}, + "StartedOn": "2024-09-30 17:07:24.032000", + "CompletedOn": "2024-09-30 17:08:24.032000", + "Status": "COMPLETED", + "Statistics": { + "TotalActions": 1, + "TimeoutActions": 0, + "FailedActions": 1, + "StoppedActions": 0, + "SucceededActions": 0, + "RunningActions": 0, + "ErroredActions": 0, + "WaitingActions": 0 + }, + "Graph": { + "Nodes": [ + { + "Type": "TRIGGER", + "Name": "redshift_event", + "UniqueId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee", + "TriggerDetails": { + "Trigger": { + "Name": "redshift_event", + "WorkflowName": "redshift workflow", + "Type": "ON_DEMAND", + "State": "CREATED", + "Actions": [ + { + "JobName": "Redshift DBT Job" + } + ] + } + } + }, + { + "Type": "JOB", + "Name": "Redshift DBT Job", + "UniqueId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123", + "JobDetails": { + "JobRuns": [ + { + "Id": "jr_108804857dd29cb1857c92d3e8bf0b48f7685c246e56125b713eb6ea7ebfe4e2", + "Attempt": 0, + "TriggerName": "redshift_event", + "JobName": "Redshift DBT Job", + "JobMode": "VISUAL", + "JobRunQueuingEnabled": false, + "StartedOn": "2024-09-30 17:07:59.185000", + "LastModifiedOn": "2024-09-30 17:08:03.003000", + "CompletedOn": "2024-09-30 17:08:03.003000", + "JobRunState": "FAILED", + "ErrorMessage": "Error Message", + "PredecessorRuns": [], + "AllocatedCapacity": 10, + "ExecutionTime": 0, + "Timeout": 2880, + "MaxCapacity": 10.0, + "WorkerType": "G.1X", + "NumberOfWorkers": 10, + "LogGroupName": "/aws-glue/jobs", + "GlueVersion": "4.0", + "ExecutionClass": "STANDARD" + } + ] + } + } + ], + "Edges": [ + { + "SourceId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee", + "DestinationId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123" + } + ] + } + }, + "Graph": { + "Nodes": [ + { + "Type": "TRIGGER", + "Name": "redshift_event", + "UniqueId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee", + "TriggerDetails": { + "Trigger": { + "Name": "redshift_event", + "WorkflowName": "redshift workflow", + "Type": "ON_DEMAND", + "State": "CREATED", + "Actions": [ + { + "JobName": "Redshift DBT Job" + } + ] + } + } + }, + { + "Type": "JOB", + "Name": "Redshift DBT Job", + "UniqueId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123", + "JobDetails": {} + } + ], + "Edges": [ + { + "SourceId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee", + "DestinationId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123" + } + ] + } +} +""" +) + +EXPECTED_CREATED_PIPELINES = CreatePipelineRequest( + name=EntityName(root="redshift workflow"), + displayName="redshift workflow", + description=None, + dataProducts=None, + sourceUrl=SourceUrl( + root="https://us-east-2.console.aws.amazon.com/glue/home?region=us-east-2#/v2/etl-configuration/workflows/view/redshift workflow" + ), + concurrency=None, + pipelineLocation=None, + startDate=None, + tasks=[ + Task( + name="redshift_event", + displayName="redshift_event", + fullyQualifiedName=None, + description=None, + sourceUrl=None, + downstreamTasks=["Redshift DBT Job"], + taskType="TRIGGER", + taskSQL=None, + startDate=None, + endDate=None, + tags=None, + owners=None, + ), + Task( + name="Redshift DBT Job", + displayName="Redshift DBT Job", + fullyQualifiedName=None, + description=None, + sourceUrl=None, + downstreamTasks=[], + taskType="JOB", + taskSQL=None, + startDate=None, + endDate=None, + tags=None, + owners=None, + ), + ], + tags=None, + owners=None, + service=FullyQualifiedEntityName(root="gluepipeline_test"), + extension=None, + scheduleInterval=None, + domain=None, + lifeCycle=None, + sourceHash=None, +) + +MOCK_PIPELINE_SERVICE = PipelineService( + id="85811038-099a-11ed-861d-0242ac120002", + name="gluepipeline_test", + fullyQualifiedName=FullyQualifiedEntityName("gluepipeline_test"), + connection=PipelineConnection(), + serviceType=PipelineServiceType.DBTCloud, +) + +MOCK_PIPELINE = Pipeline( + id="2aaa012e-099a-11ed-861d-0242ac120002", + name=EntityName(root="redshift workflow"), + fullyQualifiedName="gluepipeline_test.redshift workflow", + displayName="OpenMetadata DBTCloud Workflow", + description=Markdown(root="Example Job Description"), + dataProducts=None, + sourceUrl=SourceUrl( + root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/jobs/70403103936332" + ), + concurrency=None, + pipelineLocation=None, + startDate=None, + tasks=[ + Task( + name="70403110257794", + displayName=None, + fullyQualifiedName=None, + description=None, + sourceUrl=SourceUrl( + root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/runs/70403110257794/" + ), + downstreamTasks=None, + taskType=None, + taskSQL=None, + startDate="2024-05-27 10:42:20.621788+00:00", + endDate="2024-05-28 10:42:52.622408+00:00", + tags=None, + owners=None, + ), + Task( + name="70403111615088", + displayName=None, + fullyQualifiedName=None, + description=None, + sourceUrl=SourceUrl( + root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/runs/70403111615088/" + ), + downstreamTasks=None, + taskType=None, + taskSQL=None, + startDate="None", + endDate="None", + tags=None, + owners=None, + ), + ], + tags=None, + owners=None, + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService" + ), + extension=None, + scheduleInterval="6 */12 * * 0,1,2,3,4,5,6", + domain=None, + lifeCycle=None, + sourceHash=None, +) + +EXPECTED_PIPELINE_NAME = "redshift workflow" + + +class GluePipelineUnitTest(TestCase): + """ + DBTCloud unit tests + """ + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection" + ) + def __init__(self, methodName, test_connection) -> None: + super().__init__(methodName) + test_connection.return_value = False + + config = OpenMetadataWorkflowConfig.model_validate(mock_glue_config) + self.gluepipeline = GluepipelineSource.create( + mock_glue_config["source"], + config.workflowConfig.openMetadataServerConfig, + ) + self.gluepipeline.context.get().__dict__["pipeline"] = MOCK_PIPELINE.name.root + self.gluepipeline.context.get().__dict__[ + "pipeline_service" + ] = MOCK_PIPELINE_SERVICE.name.root + + def test_pipeline_name(self): + assert ( + self.gluepipeline.get_pipeline_name(EXPECTED_JOB_DETAILS) + == EXPECTED_PIPELINE_NAME + ) + + def test_pipelines(self): + pipeline = list(self.gluepipeline.yield_pipeline(EXPECTED_JOB_DETAILS))[0].right + assert pipeline == EXPECTED_CREATED_PIPELINES diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index ee3b29a1a8f..b9149c998a8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -175,6 +175,8 @@ public interface SearchClient { void deleteEntityByFields(List indexName, List> fieldAndValue); + void deleteEntityByFQNPrefix(String indexName, String fqnPrefix); + void softDeleteOrRestoreEntity(String indexName, String docId, String scriptTxt); void softDeleteOrRestoreChildren( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 653221d4148..511ec91cc14 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -554,6 +554,25 @@ public class SearchRepository { } } + public void deleteEntityByFQNPrefix(EntityInterface entity) { + if (entity != null) { + String entityType = entity.getEntityReference().getType(); + String fqn = entity.getFullyQualifiedName(); + IndexMapping indexMapping = entityIndexMap.get(entityType); + try { + searchClient.deleteEntityByFQNPrefix(indexMapping.getIndexName(clusterAlias), fqn); + } catch (Exception ie) { + LOG.error( + "Issue in Deleting the search document for entityFQN [{}] and entityType [{}]. Reason[{}], Cause[{}], Stack [{}]", + fqn, + entityType, + ie.getMessage(), + ie.getCause(), + ExceptionUtils.getStackTrace(ie)); + } + } + } + public void deleteTimeSeriesEntityById(EntityTimeSeriesInterface entity) { if (entity != null) { String entityId = entity.getId().toString(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index bb94153cdf5..12857e56628 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -71,6 +71,7 @@ import es.org.elasticsearch.index.query.BoolQueryBuilder; import es.org.elasticsearch.index.query.MatchQueryBuilder; import es.org.elasticsearch.index.query.MultiMatchQueryBuilder; import es.org.elasticsearch.index.query.Operator; +import es.org.elasticsearch.index.query.PrefixQueryBuilder; import es.org.elasticsearch.index.query.QueryBuilder; import es.org.elasticsearch.index.query.QueryBuilders; import es.org.elasticsearch.index.query.QueryStringQueryBuilder; @@ -1619,6 +1620,16 @@ public class ElasticSearchClient implements SearchClient { } } + @Override + public void deleteEntityByFQNPrefix(String indexName, String fqnPrefix) { + if (isClientAvailable) { + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName); + deleteByQueryRequest.setQuery( + new PrefixQueryBuilder("fullyQualifiedName.keyword", fqnPrefix.toLowerCase())); + deleteEntityFromElasticSearchByQuery(deleteByQueryRequest); + } + } + @Override public void softDeleteOrRestoreEntity(String indexName, String docId, String scriptTxt) { if (isClientAvailable) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index 92073335d97..1d63d82cc9e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -155,6 +155,7 @@ import os.org.opensearch.index.query.BoolQueryBuilder; import os.org.opensearch.index.query.MatchQueryBuilder; import os.org.opensearch.index.query.MultiMatchQueryBuilder; import os.org.opensearch.index.query.Operator; +import os.org.opensearch.index.query.PrefixQueryBuilder; import os.org.opensearch.index.query.QueryBuilder; import os.org.opensearch.index.query.QueryBuilders; import os.org.opensearch.index.query.QueryStringQueryBuilder; @@ -1589,6 +1590,16 @@ public class OpenSearchClient implements SearchClient { } } + @Override + public void deleteEntityByFQNPrefix(String indexName, String fqnPrefix) { + if (isClientAvailable) { + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName); + deleteByQueryRequest.setQuery( + new PrefixQueryBuilder("fullyQualifiedName.keyword", fqnPrefix.toLowerCase())); + deleteEntityFromOpenSearchByQuery(deleteByQueryRequest); + } + } + @Override public void deleteEntityByFields( List indexName, List> fieldAndValue) { diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AppContainer/app-container.less b/openmetadata-ui/src/main/resources/ui/src/components/AppContainer/app-container.less index e0d632e2ab7..e8eb7eda389 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AppContainer/app-container.less +++ b/openmetadata-ui/src/main/resources/ui/src/components/AppContainer/app-container.less @@ -24,6 +24,7 @@ background-color: white; .ant-layout-header { line-height: inherit; + background: @white; } .navbar-container { border-bottom: 1px solid @border-color; diff --git a/openmetadata-ui/src/main/resources/ui/src/components/MyData/LeftSidebar/left-sidebar.less b/openmetadata-ui/src/main/resources/ui/src/components/MyData/LeftSidebar/left-sidebar.less index 09318aee17b..716c3962a13 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/MyData/LeftSidebar/left-sidebar.less +++ b/openmetadata-ui/src/main/resources/ui/src/components/MyData/LeftSidebar/left-sidebar.less @@ -261,6 +261,12 @@ color: initial; } } + + .left-sidebar-menu.ant-menu > .ant-menu-item { + &:first-child { + margin-top: 0; + } + } } .left-panel-item.active,