diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index 3d11397693a..1e7e27954b5 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -244,6 +244,7 @@ eventHandlerConfiguration: - "org.openmetadata.service.events.WebAnalyticEventHandler" pipelineServiceClientConfiguration: + enabled: ${PIPELINE_SERVICE_CLIENT_ENABLED:-true} # If we don't need this, set "org.openmetadata.service.clients.pipeline.noop.NoopClient" className: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"} apiEndpoint: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://localhost:8080} diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py index 83921ab19d7..dd025194af9 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py @@ -45,7 +45,7 @@ class OMetaIngestionPipelineMixin: :param pipeline_status: Pipeline Status data to add """ resp = self.client.put( - f"/services/ingestionPipelines/{ingestion_pipeline_fqn}/pipelineStatus", + f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus", data=pipeline_status.json(), ) logger.debug( @@ -63,7 +63,7 @@ class OMetaIngestionPipelineMixin: :param pipeline_status_run_id: Pipeline Status run id """ resp = self.client.get( - f"/services/ingestionPipelines/{ingestion_pipeline_fqn}/pipelineStatus/{pipeline_status_run_id}" + f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus/{pipeline_status_run_id}" ) if resp: return PipelineStatus(**resp) @@ -76,7 +76,7 @@ class OMetaIngestionPipelineMixin: ingestion_pipeline_id (str): ingestion pipeline uuid """ resp = self.client.post( - f"/services/ingestionPipelines/trigger/{ingestion_pipeline_id}" + f"{self.get_suffix(IngestionPipeline)}/trigger/{ingestion_pipeline_id}" ) return 
IngestionPipeline.parse_obj(resp) @@ -98,7 +98,7 @@ class OMetaIngestionPipelineMixin: params = {"startTs": start_ts, "endTs": end_ts} resp = self.client.get( - f"/services/ingestionPipelines/{ingestion_pipeline_fqn}/pipelineStatus", + f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus", data=params, ) @@ -120,7 +120,7 @@ class OMetaIngestionPipelineMixin: """ fields_str = "?fields=" + ",".join(fields) if fields else "" resp = self.client.get( - f"/services/ingestionPipelines{fields_str}", + f"{self.get_suffix(IngestionPipeline)}{fields_str}", data=params, ) diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index f83c07e1e06..8e8df29101c 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -20,6 +20,9 @@ from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union from pydantic import BaseModel from requests.utils import quote +from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import ( + CreateIngestionPipelineRequest, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -185,6 +188,8 @@ class OpenMetadata( Based on the entity, return the module path it is found inside generated """ + if issubclass(entity, CreateIngestionPipelineRequest): + return "services.ingestionPipelines" return entity.__module__.split(".")[-2] def get_create_entity_type(self, entity: Type[T]) -> Type[C]: @@ -233,6 +238,7 @@ class OpenMetadata( .replace("testcase", "testCase") .replace("searchindex", "searchIndex") .replace("storedprocedure", "storedProcedure") + .replace("ingestionpipeline", "ingestionPipeline") ) class_path = ".".join( filter( diff --git a/ingestion/src/metadata/ingestion/ometa/routes.py b/ingestion/src/metadata/ingestion/ometa/routes.py index 5db8ec881d9..c8e6669c841 100644 
--- a/ingestion/src/metadata/ingestion/ometa/routes.py +++ b/ingestion/src/metadata/ingestion/ometa/routes.py @@ -72,6 +72,9 @@ from metadata.generated.schema.api.services.createSearchService import ( from metadata.generated.schema.api.services.createStorageService import ( CreateStorageServiceRequest, ) +from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import ( + CreateIngestionPipelineRequest, +) from metadata.generated.schema.api.teams.createRole import CreateRoleRequest from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest from metadata.generated.schema.api.teams.createUser import CreateUserRequest @@ -198,6 +201,7 @@ ROUTES = { SearchService.__name__: "/services/searchServices", CreateSearchServiceRequest.__name__: "/services/searchServices", IngestionPipeline.__name__: "/services/ingestionPipelines", + CreateIngestionPipelineRequest.__name__: "/services/ingestionPipelines", TestConnectionDefinition.__name__: "/services/testConnectionDefinitions", # Data Quality TestDefinition.__name__: "/dataQuality/testDefinitions", diff --git a/ingestion/src/metadata/utils/class_helper.py b/ingestion/src/metadata/utils/class_helper.py index 871efcdbabf..c0c0990bf17 100644 --- a/ingestion/src/metadata/utils/class_helper.py +++ b/ingestion/src/metadata/utils/class_helper.py @@ -18,7 +18,79 @@ from typing import Type from pydantic import BaseModel +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) from metadata.generated.schema.entity.services.serviceType import ServiceType +from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import ( + DashboardServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( + DatabaseServiceProfilerPipeline, +) 
+from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import ( + DatabaseServiceQueryLineagePipeline, +) +from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import ( + DatabaseServiceQueryUsagePipeline, +) +from metadata.generated.schema.metadataIngestion.dataInsightPipeline import ( + DataInsightPipeline, +) +from metadata.generated.schema.metadataIngestion.dbtPipeline import DbtPipeline +from metadata.generated.schema.metadataIngestion.messagingServiceMetadataPipeline import ( + MessagingServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.metadataToElasticSearchPipeline import ( + MetadataToElasticSearchPipeline, +) +from metadata.generated.schema.metadataIngestion.mlmodelServiceMetadataPipeline import ( + MlModelServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import ( + PipelineServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.searchServiceMetadataPipeline import ( + SearchServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.storageServiceMetadataPipeline import ( + StorageServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( + TestSuitePipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import SourceConfig + +SERVICE_TYPE_REF = { + ServiceType.Database.value: "databaseService", + ServiceType.Dashboard.value: "dashboardService", + ServiceType.Pipeline.value: "pipelineService", + ServiceType.Messaging.value: "messagingService", + ServiceType.MlModel.value: "mlmodelService", + ServiceType.Metadata.value: "metadataService", + ServiceType.Search.value: "searchService", + ServiceType.Storage.value: "storageService", +} + +SOURCE_CONFIG_TYPE_INGESTION = { + DatabaseServiceMetadataPipeline.__name__: PipelineType.metadata, + DatabaseServiceQueryUsagePipeline.__name__: PipelineType.usage, + 
DatabaseServiceQueryLineagePipeline.__name__: PipelineType.lineage, + DatabaseServiceProfilerPipeline.__name__: PipelineType.profiler, + DashboardServiceMetadataPipeline.__name__: PipelineType.metadata, + MessagingServiceMetadataPipeline.__name__: PipelineType.metadata, + PipelineServiceMetadataPipeline.__name__: PipelineType.metadata, + MlModelServiceMetadataPipeline.__name__: PipelineType.metadata, + StorageServiceMetadataPipeline.__name__: PipelineType.metadata, + SearchServiceMetadataPipeline.__name__: PipelineType.metadata, + TestSuitePipeline.__name__: PipelineType.TestSuite, + MetadataToElasticSearchPipeline.__name__: PipelineType.elasticSearchReindex, + DataInsightPipeline.__name__: PipelineType.dataInsight, + DbtPipeline.__name__: PipelineType.dbt, +} def _clean(source_type: str): @@ -31,6 +103,20 @@ def _clean(source_type: str): return source_type +def get_pipeline_type_from_source_config( + source_config_type: SourceConfig.__fields__["config"].type_, +) -> PipelineType: + """From the YAML sourceConfig type, get the Ingestion Pipeline Type""" + pipeline_type = SOURCE_CONFIG_TYPE_INGESTION.get( + source_config_type.__class__.__name__ + ) + if not pipeline_type: + raise ValueError( + f"Cannot find Pipeline Type for SourceConfig {source_config_type}" + ) + return pipeline_type + + def _get_service_type_from( # pylint: disable=inconsistent-return-statements service_subtype: str, ) -> ServiceType: @@ -53,6 +139,16 @@ def get_service_type_from_source_type(source_type: str) -> ServiceType: return _get_service_type_from(_clean(source_type)) +def get_reference_type_from_service_type(service_type: ServiceType) -> str: + """Get the type to build the EntityReference from the service type""" + service_reference = SERVICE_TYPE_REF.get(service_type.value) + if not service_reference: + raise ValueError( + f"Cannot find Service Type reference for service {service_type}" + ) + return service_reference + + def get_service_class_from_service_type(service_type: ServiceType) -> 
Type[BaseModel]: """ Method to get service class from service type diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py index 77e76ed136c..edc90d79453 100644 --- a/ingestion/src/metadata/workflow/base.py +++ b/ingestion/src/metadata/workflow/base.py @@ -24,6 +24,9 @@ import uuid from abc import ABC, abstractmethod from typing import Optional, Tuple, TypeVar, cast +from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import ( + CreateIngestionPipelineRequest, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -31,12 +34,15 @@ from metadata.generated.schema.entity.services.connections.serviceConnection imp ServiceConnection, ) from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + AirflowConfig, + IngestionPipeline, PipelineState, ) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.generated.schema.tests.testSuite import ServiceType +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.api.step import Step from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage @@ -44,7 +50,10 @@ from metadata.ingestion.models.custom_types import ServiceWithConnectionType from metadata.ingestion.ometa.client_utils import create_ometa_client from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.timer.repeated_timer import RepeatedTimer +from metadata.utils import fqn from metadata.utils.class_helper import ( + get_pipeline_type_from_source_config, + get_reference_type_from_service_type, get_service_class_from_service_type, get_service_type_from_source_type, ) @@ -87,6 +96,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin): """ self.config = config self._timer: 
Optional[RepeatedTimer] = None + self._ingestion_pipeline: Optional[IngestionPipeline] = None set_loggers_level(config.workflowConfig.loggerLevel.value) @@ -109,6 +119,14 @@ class BaseWorkflow(ABC, WorkflowStatusMixin): # Informs the `source` and the rest of `steps` to execute self.set_steps() + @property + def ingestion_pipeline(self): + """Get or create the Ingestion Pipeline from the configuration""" + if not self._ingestion_pipeline: + self._ingestion_pipeline = self.get_or_create_ingestion_pipeline() + + return self._ingestion_pipeline + @abstractmethod def set_steps(self): """ @@ -251,3 +269,48 @@ class BaseWorkflow(ABC, WorkflowStatusMixin): f"Unknown error getting service connection for service name [{service_name}]" f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" ) + + def get_or_create_ingestion_pipeline(self) -> Optional[IngestionPipeline]: + """ + If we get the `ingestionPipelineFQN` from the workflow configuration, it means we want to + keep track of the status. + - During the UI deployment, the IngestionPipeline is already created from the UI. + - From external deployments, we might need to create the Ingestion Pipeline the first time + the YAML is executed. + If the Ingestion Pipeline is not created, create it now to update the status. + + Note that during the very first run, the service might not even be created yet. In that case, + we won't be able to flag the RUNNING status. We'll wait until the metadata ingestion + workflow has prepared the necessary components, and we will update the SUCCESS/FAILED + status at the end of the flow. + """ + maybe_pipeline: Optional[IngestionPipeline] = self.metadata.get_by_name( + entity=IngestionPipeline, fqn=self.config.ingestionPipelineFQN + ) + + _, pipeline_name = fqn.split( + self.config.ingestionPipelineFQN + ) # Keep the second part of the FQN as the pipeline name
+ service = self.metadata.get_by_name( + entity=get_service_class_from_service_type(self.service_type), + fqn=self.config.source.serviceName, + ) + + if maybe_pipeline is None and service is not None: + + return self.metadata.create_or_update( + CreateIngestionPipelineRequest( + name=pipeline_name, + service=EntityReference( + id=service.id, + type=get_reference_type_from_service_type(self.service_type), + ), + pipelineType=get_pipeline_type_from_source_config( + self.config.source.sourceConfig.config + ), + sourceConfig=self.config.source.sourceConfig, + airflowConfig=AirflowConfig(), + ) + ) + + return maybe_pipeline diff --git a/ingestion/src/metadata/workflow/workflow_status_mixin.py b/ingestion/src/metadata/workflow/workflow_status_mixin.py index adee5cd0dcc..12056832de5 100644 --- a/ingestion/src/metadata/workflow/workflow_status_mixin.py +++ b/ingestion/src/metadata/workflow/workflow_status_mixin.py @@ -92,7 +92,7 @@ class WorkflowStatusMixin: """ # if we don't have a related Ingestion Pipeline FQN, no status is set. 
- if self.config.ingestionPipelineFQN: + if self.config.ingestionPipelineFQN and self.ingestion_pipeline: if state in (PipelineState.queued, PipelineState.running): pipeline_status = PipelineStatus( runId=self.run_id, diff --git a/ingestion/tests/unit/test_ometa_endpoints.py b/ingestion/tests/unit/test_ometa_endpoints.py index b3cc71b2af1..26cf9581857 100644 --- a/ingestion/tests/unit/test_ometa_endpoints.py +++ b/ingestion/tests/unit/test_ometa_endpoints.py @@ -18,6 +18,9 @@ from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.api.services.createDatabaseService import ( CreateDatabaseServiceRequest, ) +from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import ( + CreateIngestionPipelineRequest, +) from metadata.generated.schema.api.teams.createUser import CreateUserRequest from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.dashboard import Dashboard @@ -34,6 +37,9 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata ) from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + IngestionPipeline, +) from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.entity.teams.user import User @@ -118,3 +124,6 @@ class OMetaEndpointTest(TestCase): entity = self.metadata.get_entity_from_create(CreateUserRequest) assert issubclass(entity, User) + + entity = self.metadata.get_entity_from_create(CreateIngestionPipelineRequest) + assert issubclass(entity, IngestionPipeline) diff --git 
a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java index 8f49100170a..40fdb665285 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java @@ -232,7 +232,7 @@ public class AirflowRESTClient extends PipelineServiceClient { } @Override - public List getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) { + public List getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) { HttpResponse response; try { String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true"; @@ -256,7 +256,7 @@ public class AirflowRESTClient extends PipelineServiceClient { * Auth failed when accessing Airflow APIs 3. Different versions between server and client */ @Override - public PipelineServiceClientResponse getServiceStatus() { + public PipelineServiceClientResponse getServiceStatusInternal() { HttpResponse response; try { response = getRequestAuthenticatedForJsonContent("%s/%s/health-auth", serviceURL, API_ENDPOINT); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java index fd4ab494610..685d3d247db 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java @@ -33,7 +33,7 @@ public class NoopClient extends PipelineServiceClient { } @Override - public PipelineServiceClientResponse getServiceStatus() { + public PipelineServiceClientResponse getServiceStatusInternal() { return null; } @@ -60,7 +60,7 @@ public class 
NoopClient extends PipelineServiceClient { } @Override - public List getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) { + public List getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) { return null; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java index 8a9400d128e..8385d15b3c6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java @@ -26,6 +26,7 @@ public class PipelineServiceStatusJobHandler { public static final String JOB_CONTEXT_METER_REGISTRY = "meterRegistry"; public static final String JOB_CONTEXT_CLUSTER_NAME = "clusterName"; + private final PipelineServiceClientConfiguration config; private final PipelineServiceClient pipelineServiceClient; private final PrometheusMeterRegistry meterRegistry; private final String clusterName; @@ -36,6 +37,7 @@ public class PipelineServiceStatusJobHandler { private PipelineServiceStatusJobHandler(PipelineServiceClientConfiguration config, String clusterName) throws SchedulerException { + this.config = config; this.pipelineServiceClient = PipelineServiceClientFactory.createPipelineServiceClient(config); this.meterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry; this.clusterName = clusterName; @@ -77,12 +79,15 @@ public class PipelineServiceStatusJobHandler { } public void addPipelineServiceStatusJob() { - try { - JobDetail jobDetail = jobBuilder(); - Trigger trigger = getTrigger(); - scheduler.scheduleJob(jobDetail, trigger); - } catch (Exception ex) { - LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex); + // Only register the job to listen to the status if the Pipeline 
Service Client is indeed enabled + if (config.getEnabled().equals(Boolean.TRUE)) { + try { + JobDetail jobDetail = jobBuilder(); + Trigger trigger = getTrigger(); + scheduler.scheduleJob(jobDetail, trigger); + } catch (Exception ex) { + LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex); + } } } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java b/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java index ffb71276dd5..1c3323071df 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java @@ -17,7 +17,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient { } @Override - public PipelineServiceClientResponse getServiceStatus() { + public PipelineServiceClientResponse getServiceStatusInternal() { return null; } @@ -44,7 +44,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient { } @Override - public List getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) { + public List getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) { return null; } diff --git a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClient.java b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClient.java index 449110905ab..6318e2cffec 100644 --- a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClient.java +++ b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClient.java @@ -20,6 +20,7 @@ import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import java.time.Duration; +import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map; @@ -56,6 +57,7 @@ import 
org.openmetadata.sdk.exception.PipelineServiceVersionException; */ @Slf4j public abstract class PipelineServiceClient { + protected final boolean pipelineServiceClientEnabled; protected final String hostIp; protected final boolean ingestionIpInfoEnabled; @@ -67,6 +69,7 @@ public abstract class PipelineServiceClient { protected static final String CONTENT_TYPE = "application/json"; private static final Integer MAX_ATTEMPTS = 3; private static final Integer BACKOFF_TIME_SECONDS = 5; + private static final String DISABLED_STATUS = "disabled"; public static final String HEALTHY_STATUS = "healthy"; public static final String UNHEALTHY_STATUS = "unhealthy"; public static final String STATUS_KEY = "status"; @@ -103,6 +106,7 @@ public abstract class PipelineServiceClient { } public PipelineServiceClient(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) { + this.pipelineServiceClientEnabled = pipelineServiceClientConfiguration.getEnabled(); this.hostIp = pipelineServiceClientConfiguration.getHostIp(); this.ingestionIpInfoEnabled = pipelineServiceClientConfiguration.getIngestionIpInfoEnabled(); } @@ -223,7 +227,21 @@ public abstract class PipelineServiceClient { } /* Check the status of pipeline service to ensure it is healthy */ - public abstract PipelineServiceClientResponse getServiceStatus(); + public PipelineServiceClientResponse getServiceStatus() { + if (pipelineServiceClientEnabled) { + return getServiceStatusInternal(); + } + return buildHealthyStatus(DISABLED_STATUS).withPlatform(DISABLED_STATUS); + } + + public List getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) { + if (pipelineServiceClientEnabled) { + return getQueuedPipelineStatusInternal(ingestionPipeline); + } + return new ArrayList<>(); + } + + public abstract PipelineServiceClientResponse getServiceStatusInternal(); /** * This workflow can be used to execute any necessary async automations from the pipeline service. 
This will be the @@ -244,7 +262,7 @@ public abstract class PipelineServiceClient { public abstract PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionPipeline); /* Get the status of a deployed pipeline */ - public abstract List getQueuedPipelineStatus(IngestionPipeline ingestionPipeline); + public abstract List getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline); /* Toggle the state of an Ingestion Pipeline as enabled/disabled */ public abstract PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline); diff --git a/openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json b/openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json index 18bac3dda65..d8a7dff59a3 100644 --- a/openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json +++ b/openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json @@ -6,6 +6,11 @@ "type": "object", "javaType": "org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration", "properties": { + "enabled": { + "description": "Flags if the ingestion from the OpenMetadata UI is enabled. If ingesting externally, we can set this value to false to not check the Pipeline Service Client component health.", + "type": "boolean", + "default": true + }, "className": { "description": "Class Name for the Pipeline Service Client.", "type": "string"