mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-20 23:18:01 +00:00
Register Ingestion pipelines just from YAML (#13501)
* Register Ingestion pipelines just from YAML * Format
This commit is contained in:
parent
f6a87ee02a
commit
d3da2d1b9f
@ -244,6 +244,7 @@ eventHandlerConfiguration:
|
|||||||
- "org.openmetadata.service.events.WebAnalyticEventHandler"
|
- "org.openmetadata.service.events.WebAnalyticEventHandler"
|
||||||
|
|
||||||
pipelineServiceClientConfiguration:
|
pipelineServiceClientConfiguration:
|
||||||
|
enabled: ${PIPELINE_SERVICE_CLIENT_ENABLED:-true}
|
||||||
# If we don't need this, set "org.openmetadata.service.clients.pipeline.noop.NoopClient"
|
# If we don't need this, set "org.openmetadata.service.clients.pipeline.noop.NoopClient"
|
||||||
className: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"}
|
className: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"}
|
||||||
apiEndpoint: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://localhost:8080}
|
apiEndpoint: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://localhost:8080}
|
||||||
|
@ -45,7 +45,7 @@ class OMetaIngestionPipelineMixin:
|
|||||||
:param pipeline_status: Pipeline Status data to add
|
:param pipeline_status: Pipeline Status data to add
|
||||||
"""
|
"""
|
||||||
resp = self.client.put(
|
resp = self.client.put(
|
||||||
f"/services/ingestionPipelines/{ingestion_pipeline_fqn}/pipelineStatus",
|
f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus",
|
||||||
data=pipeline_status.json(),
|
data=pipeline_status.json(),
|
||||||
)
|
)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
@ -63,7 +63,7 @@ class OMetaIngestionPipelineMixin:
|
|||||||
:param pipeline_status_run_id: Pipeline Status run id
|
:param pipeline_status_run_id: Pipeline Status run id
|
||||||
"""
|
"""
|
||||||
resp = self.client.get(
|
resp = self.client.get(
|
||||||
f"/services/ingestionPipelines/{ingestion_pipeline_fqn}/pipelineStatus/{pipeline_status_run_id}"
|
f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus/{pipeline_status_run_id}"
|
||||||
)
|
)
|
||||||
if resp:
|
if resp:
|
||||||
return PipelineStatus(**resp)
|
return PipelineStatus(**resp)
|
||||||
@ -76,7 +76,7 @@ class OMetaIngestionPipelineMixin:
|
|||||||
ingestion_pipeline_id (str): ingestion pipeline uuid
|
ingestion_pipeline_id (str): ingestion pipeline uuid
|
||||||
"""
|
"""
|
||||||
resp = self.client.post(
|
resp = self.client.post(
|
||||||
f"/services/ingestionPipelines/trigger/{ingestion_pipeline_id}"
|
f"{self.get_suffix(IngestionPipeline)}/trigger/{ingestion_pipeline_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
return IngestionPipeline.parse_obj(resp)
|
return IngestionPipeline.parse_obj(resp)
|
||||||
@ -98,7 +98,7 @@ class OMetaIngestionPipelineMixin:
|
|||||||
params = {"startTs": start_ts, "endTs": end_ts}
|
params = {"startTs": start_ts, "endTs": end_ts}
|
||||||
|
|
||||||
resp = self.client.get(
|
resp = self.client.get(
|
||||||
f"/services/ingestionPipelines/{ingestion_pipeline_fqn}/pipelineStatus",
|
f"{self.get_suffix(IngestionPipeline)}/{ingestion_pipeline_fqn}/pipelineStatus",
|
||||||
data=params,
|
data=params,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -120,7 +120,7 @@ class OMetaIngestionPipelineMixin:
|
|||||||
"""
|
"""
|
||||||
fields_str = "?fields=" + ",".join(fields) if fields else ""
|
fields_str = "?fields=" + ",".join(fields) if fields else ""
|
||||||
resp = self.client.get(
|
resp = self.client.get(
|
||||||
f"/services/ingestionPipelines{fields_str}",
|
f"{self.get_suffix(IngestionPipeline)}{fields_str}",
|
||||||
data=params,
|
data=params,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -20,6 +20,9 @@ from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from requests.utils import quote
|
from requests.utils import quote
|
||||||
|
|
||||||
|
from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
|
||||||
|
CreateIngestionPipelineRequest,
|
||||||
|
)
|
||||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||||
OpenMetadataConnection,
|
OpenMetadataConnection,
|
||||||
)
|
)
|
||||||
@ -185,6 +188,8 @@ class OpenMetadata(
|
|||||||
Based on the entity, return the module path
|
Based on the entity, return the module path
|
||||||
it is found inside generated
|
it is found inside generated
|
||||||
"""
|
"""
|
||||||
|
if issubclass(entity, CreateIngestionPipelineRequest):
|
||||||
|
return "services.ingestionPipelines"
|
||||||
return entity.__module__.split(".")[-2]
|
return entity.__module__.split(".")[-2]
|
||||||
|
|
||||||
def get_create_entity_type(self, entity: Type[T]) -> Type[C]:
|
def get_create_entity_type(self, entity: Type[T]) -> Type[C]:
|
||||||
@ -233,6 +238,7 @@ class OpenMetadata(
|
|||||||
.replace("testcase", "testCase")
|
.replace("testcase", "testCase")
|
||||||
.replace("searchindex", "searchIndex")
|
.replace("searchindex", "searchIndex")
|
||||||
.replace("storedprocedure", "storedProcedure")
|
.replace("storedprocedure", "storedProcedure")
|
||||||
|
.replace("ingestionpipeline", "ingestionPipeline")
|
||||||
)
|
)
|
||||||
class_path = ".".join(
|
class_path = ".".join(
|
||||||
filter(
|
filter(
|
||||||
|
@ -72,6 +72,9 @@ from metadata.generated.schema.api.services.createSearchService import (
|
|||||||
from metadata.generated.schema.api.services.createStorageService import (
|
from metadata.generated.schema.api.services.createStorageService import (
|
||||||
CreateStorageServiceRequest,
|
CreateStorageServiceRequest,
|
||||||
)
|
)
|
||||||
|
from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
|
||||||
|
CreateIngestionPipelineRequest,
|
||||||
|
)
|
||||||
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
|
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
|
||||||
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
|
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
|
||||||
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
|
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
|
||||||
@ -198,6 +201,7 @@ ROUTES = {
|
|||||||
SearchService.__name__: "/services/searchServices",
|
SearchService.__name__: "/services/searchServices",
|
||||||
CreateSearchServiceRequest.__name__: "/services/searchServices",
|
CreateSearchServiceRequest.__name__: "/services/searchServices",
|
||||||
IngestionPipeline.__name__: "/services/ingestionPipelines",
|
IngestionPipeline.__name__: "/services/ingestionPipelines",
|
||||||
|
CreateIngestionPipelineRequest.__name__: "/services/ingestionPipelines",
|
||||||
TestConnectionDefinition.__name__: "/services/testConnectionDefinitions",
|
TestConnectionDefinition.__name__: "/services/testConnectionDefinitions",
|
||||||
# Data Quality
|
# Data Quality
|
||||||
TestDefinition.__name__: "/dataQuality/testDefinitions",
|
TestDefinition.__name__: "/dataQuality/testDefinitions",
|
||||||
|
@ -18,7 +18,79 @@ from typing import Type
|
|||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
|
||||||
|
PipelineType,
|
||||||
|
)
|
||||||
from metadata.generated.schema.entity.services.serviceType import ServiceType
|
from metadata.generated.schema.entity.services.serviceType import ServiceType
|
||||||
|
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
|
||||||
|
DashboardServiceMetadataPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
|
||||||
|
DatabaseServiceMetadataPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
|
||||||
|
DatabaseServiceProfilerPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import (
|
||||||
|
DatabaseServiceQueryLineagePipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import (
|
||||||
|
DatabaseServiceQueryUsagePipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.dataInsightPipeline import (
|
||||||
|
DataInsightPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.dbtPipeline import DbtPipeline
|
||||||
|
from metadata.generated.schema.metadataIngestion.messagingServiceMetadataPipeline import (
|
||||||
|
MessagingServiceMetadataPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.metadataToElasticSearchPipeline import (
|
||||||
|
MetadataToElasticSearchPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.mlmodelServiceMetadataPipeline import (
|
||||||
|
MlModelServiceMetadataPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
|
||||||
|
PipelineServiceMetadataPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.searchServiceMetadataPipeline import (
|
||||||
|
SearchServiceMetadataPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.storageServiceMetadataPipeline import (
|
||||||
|
StorageServiceMetadataPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
|
||||||
|
TestSuitePipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig
|
||||||
|
|
||||||
|
SERVICE_TYPE_REF = {
|
||||||
|
ServiceType.Database.value: "databaseService",
|
||||||
|
ServiceType.Dashboard.value: "dashboardService",
|
||||||
|
ServiceType.Pipeline.value: "pipelineService",
|
||||||
|
ServiceType.Messaging.value: "messagingService",
|
||||||
|
ServiceType.MlModel.value: "mlmodelService",
|
||||||
|
ServiceType.Metadata.value: "metadataService",
|
||||||
|
ServiceType.Search.value: "searchService",
|
||||||
|
ServiceType.Storage.value: "storageService",
|
||||||
|
}
|
||||||
|
|
||||||
|
SOURCE_CONFIG_TYPE_INGESTION = {
|
||||||
|
DatabaseServiceMetadataPipeline.__name__: PipelineType.metadata,
|
||||||
|
DatabaseServiceQueryUsagePipeline.__name__: PipelineType.usage,
|
||||||
|
DatabaseServiceQueryLineagePipeline.__name__: PipelineType.lineage,
|
||||||
|
DatabaseServiceProfilerPipeline.__name__: PipelineType.profiler,
|
||||||
|
DashboardServiceMetadataPipeline.__name__: PipelineType.metadata,
|
||||||
|
MessagingServiceMetadataPipeline.__name__: PipelineType.metadata,
|
||||||
|
PipelineServiceMetadataPipeline.__name__: PipelineType.metadata,
|
||||||
|
MlModelServiceMetadataPipeline.__name__: PipelineType.metadata,
|
||||||
|
StorageServiceMetadataPipeline.__name__: PipelineType.metadata,
|
||||||
|
SearchServiceMetadataPipeline.__name__: PipelineType.metadata,
|
||||||
|
TestSuitePipeline.__name__: PipelineType.TestSuite,
|
||||||
|
MetadataToElasticSearchPipeline.__name__: PipelineType.elasticSearchReindex,
|
||||||
|
DataInsightPipeline.__name__: PipelineType.dataInsight,
|
||||||
|
DbtPipeline.__name__: PipelineType.dbt,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def _clean(source_type: str):
|
def _clean(source_type: str):
|
||||||
@ -31,6 +103,20 @@ def _clean(source_type: str):
|
|||||||
return source_type
|
return source_type
|
||||||
|
|
||||||
|
|
||||||
|
def get_pipeline_type_from_source_config(
|
||||||
|
source_config_type: SourceConfig.__fields__["config"].type_,
|
||||||
|
) -> PipelineType:
|
||||||
|
"""From the YAML serviceType, get the Ingestion Pipeline Type"""
|
||||||
|
pipeline_type = SOURCE_CONFIG_TYPE_INGESTION.get(
|
||||||
|
source_config_type.__class__.__name__
|
||||||
|
)
|
||||||
|
if not pipeline_type:
|
||||||
|
raise ValueError(
|
||||||
|
f"Cannot find Pipeline Type for SourceConfig {source_config_type}"
|
||||||
|
)
|
||||||
|
return pipeline_type
|
||||||
|
|
||||||
|
|
||||||
def _get_service_type_from( # pylint: disable=inconsistent-return-statements
|
def _get_service_type_from( # pylint: disable=inconsistent-return-statements
|
||||||
service_subtype: str,
|
service_subtype: str,
|
||||||
) -> ServiceType:
|
) -> ServiceType:
|
||||||
@ -53,6 +139,16 @@ def get_service_type_from_source_type(source_type: str) -> ServiceType:
|
|||||||
return _get_service_type_from(_clean(source_type))
|
return _get_service_type_from(_clean(source_type))
|
||||||
|
|
||||||
|
|
||||||
|
def get_reference_type_from_service_type(service_type: ServiceType) -> str:
|
||||||
|
"""Get the type to build the EntityReference from the service type"""
|
||||||
|
service_reference = SERVICE_TYPE_REF.get(service_type.value)
|
||||||
|
if not service_type:
|
||||||
|
raise ValueError(
|
||||||
|
f"Cannot find Service Type reference for service {service_type}"
|
||||||
|
)
|
||||||
|
return service_reference
|
||||||
|
|
||||||
|
|
||||||
def get_service_class_from_service_type(service_type: ServiceType) -> Type[BaseModel]:
|
def get_service_class_from_service_type(service_type: ServiceType) -> Type[BaseModel]:
|
||||||
"""
|
"""
|
||||||
Method to get service class from service type
|
Method to get service class from service type
|
||||||
|
@ -24,6 +24,9 @@ import uuid
|
|||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from typing import Optional, Tuple, TypeVar, cast
|
from typing import Optional, Tuple, TypeVar, cast
|
||||||
|
|
||||||
|
from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
|
||||||
|
CreateIngestionPipelineRequest,
|
||||||
|
)
|
||||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||||
OpenMetadataConnection,
|
OpenMetadataConnection,
|
||||||
)
|
)
|
||||||
@ -31,12 +34,15 @@ from metadata.generated.schema.entity.services.connections.serviceConnection imp
|
|||||||
ServiceConnection,
|
ServiceConnection,
|
||||||
)
|
)
|
||||||
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
|
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
|
||||||
|
AirflowConfig,
|
||||||
|
IngestionPipeline,
|
||||||
PipelineState,
|
PipelineState,
|
||||||
)
|
)
|
||||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
OpenMetadataWorkflowConfig,
|
OpenMetadataWorkflowConfig,
|
||||||
)
|
)
|
||||||
from metadata.generated.schema.tests.testSuite import ServiceType
|
from metadata.generated.schema.tests.testSuite import ServiceType
|
||||||
|
from metadata.generated.schema.type.entityReference import EntityReference
|
||||||
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
|
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
|
||||||
from metadata.ingestion.api.step import Step
|
from metadata.ingestion.api.step import Step
|
||||||
from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
|
from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
|
||||||
@ -44,7 +50,10 @@ from metadata.ingestion.models.custom_types import ServiceWithConnectionType
|
|||||||
from metadata.ingestion.ometa.client_utils import create_ometa_client
|
from metadata.ingestion.ometa.client_utils import create_ometa_client
|
||||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||||
from metadata.timer.repeated_timer import RepeatedTimer
|
from metadata.timer.repeated_timer import RepeatedTimer
|
||||||
|
from metadata.utils import fqn
|
||||||
from metadata.utils.class_helper import (
|
from metadata.utils.class_helper import (
|
||||||
|
get_pipeline_type_from_source_config,
|
||||||
|
get_reference_type_from_service_type,
|
||||||
get_service_class_from_service_type,
|
get_service_class_from_service_type,
|
||||||
get_service_type_from_source_type,
|
get_service_type_from_source_type,
|
||||||
)
|
)
|
||||||
@ -87,6 +96,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
|
|||||||
"""
|
"""
|
||||||
self.config = config
|
self.config = config
|
||||||
self._timer: Optional[RepeatedTimer] = None
|
self._timer: Optional[RepeatedTimer] = None
|
||||||
|
self._ingestion_pipeline: Optional[IngestionPipeline] = None
|
||||||
|
|
||||||
set_loggers_level(config.workflowConfig.loggerLevel.value)
|
set_loggers_level(config.workflowConfig.loggerLevel.value)
|
||||||
|
|
||||||
@ -109,6 +119,14 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
|
|||||||
# Informs the `source` and the rest of `steps` to execute
|
# Informs the `source` and the rest of `steps` to execute
|
||||||
self.set_steps()
|
self.set_steps()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def ingestion_pipeline(self):
|
||||||
|
"""Get or create the Ingestion Pipeline from the configuration"""
|
||||||
|
if not self._ingestion_pipeline:
|
||||||
|
self._ingestion_pipeline = self.get_or_create_ingestion_pipeline()
|
||||||
|
|
||||||
|
return self._ingestion_pipeline
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def set_steps(self):
|
def set_steps(self):
|
||||||
"""
|
"""
|
||||||
@ -251,3 +269,48 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
|
|||||||
f"Unknown error getting service connection for service name [{service_name}]"
|
f"Unknown error getting service connection for service name [{service_name}]"
|
||||||
f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
|
f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_or_create_ingestion_pipeline(self) -> Optional[IngestionPipeline]:
|
||||||
|
"""
|
||||||
|
If we get the `ingestionPipelineFqn` from the `workflowConfig`, it means we want to
|
||||||
|
keep track of the status.
|
||||||
|
- During the UI deployment, the IngestionPipeline is already created from the UI.
|
||||||
|
- From external deployments, we might need to create the Ingestion Pipeline the first time
|
||||||
|
the YAML is executed.
|
||||||
|
If the Ingestion Pipeline is not created, create it now to update the status.
|
||||||
|
|
||||||
|
Note that during the very first run, the service might not even be created yet. In that case,
|
||||||
|
we won't be able to flag the RUNNING status. We'll wait until the metadata ingestion
|
||||||
|
workflow has prepared the necessary components, and we will update the SUCCESS/FAILED
|
||||||
|
status at the end of the flow.
|
||||||
|
"""
|
||||||
|
maybe_pipeline: Optional[IngestionPipeline] = self.metadata.get_by_name(
|
||||||
|
entity=IngestionPipeline, fqn=self.config.ingestionPipelineFQN
|
||||||
|
)
|
||||||
|
|
||||||
|
_, pipeline_name = fqn.split(
|
||||||
|
self.config.ingestionPipelineFQN
|
||||||
|
) # Get the name from <service>.<name>
|
||||||
|
service = self.metadata.get_by_name(
|
||||||
|
entity=get_service_class_from_service_type(self.service_type),
|
||||||
|
fqn=self.config.source.serviceName,
|
||||||
|
)
|
||||||
|
|
||||||
|
if maybe_pipeline is None and service is not None:
|
||||||
|
|
||||||
|
return self.metadata.create_or_update(
|
||||||
|
CreateIngestionPipelineRequest(
|
||||||
|
name=pipeline_name,
|
||||||
|
service=EntityReference(
|
||||||
|
id=service.id,
|
||||||
|
type=get_reference_type_from_service_type(self.service_type),
|
||||||
|
),
|
||||||
|
pipelineType=get_pipeline_type_from_source_config(
|
||||||
|
self.config.source.sourceConfig.config
|
||||||
|
),
|
||||||
|
sourceConfig=self.config.source.sourceConfig,
|
||||||
|
airflowConfig=AirflowConfig(),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return maybe_pipeline
|
||||||
|
@ -92,7 +92,7 @@ class WorkflowStatusMixin:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
# if we don't have a related Ingestion Pipeline FQN, no status is set.
|
# if we don't have a related Ingestion Pipeline FQN, no status is set.
|
||||||
if self.config.ingestionPipelineFQN:
|
if self.config.ingestionPipelineFQN and self.ingestion_pipeline:
|
||||||
if state in (PipelineState.queued, PipelineState.running):
|
if state in (PipelineState.queued, PipelineState.running):
|
||||||
pipeline_status = PipelineStatus(
|
pipeline_status = PipelineStatus(
|
||||||
runId=self.run_id,
|
runId=self.run_id,
|
||||||
|
@ -18,6 +18,9 @@ from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
|
|||||||
from metadata.generated.schema.api.services.createDatabaseService import (
|
from metadata.generated.schema.api.services.createDatabaseService import (
|
||||||
CreateDatabaseServiceRequest,
|
CreateDatabaseServiceRequest,
|
||||||
)
|
)
|
||||||
|
from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
|
||||||
|
CreateIngestionPipelineRequest,
|
||||||
|
)
|
||||||
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
|
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
|
||||||
from metadata.generated.schema.entity.data.chart import Chart
|
from metadata.generated.schema.entity.data.chart import Chart
|
||||||
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
||||||
@ -34,6 +37,9 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
|
|||||||
)
|
)
|
||||||
from metadata.generated.schema.entity.services.dashboardService import DashboardService
|
from metadata.generated.schema.entity.services.dashboardService import DashboardService
|
||||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||||
|
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
|
||||||
|
IngestionPipeline,
|
||||||
|
)
|
||||||
from metadata.generated.schema.entity.services.messagingService import MessagingService
|
from metadata.generated.schema.entity.services.messagingService import MessagingService
|
||||||
from metadata.generated.schema.entity.services.pipelineService import PipelineService
|
from metadata.generated.schema.entity.services.pipelineService import PipelineService
|
||||||
from metadata.generated.schema.entity.teams.user import User
|
from metadata.generated.schema.entity.teams.user import User
|
||||||
@ -118,3 +124,6 @@ class OMetaEndpointTest(TestCase):
|
|||||||
|
|
||||||
entity = self.metadata.get_entity_from_create(CreateUserRequest)
|
entity = self.metadata.get_entity_from_create(CreateUserRequest)
|
||||||
assert issubclass(entity, User)
|
assert issubclass(entity, User)
|
||||||
|
|
||||||
|
entity = self.metadata.get_entity_from_create(CreateIngestionPipelineRequest)
|
||||||
|
assert issubclass(entity, IngestionPipeline)
|
||||||
|
@ -232,7 +232,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
|
public List<PipelineStatus> getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) {
|
||||||
HttpResponse<String> response;
|
HttpResponse<String> response;
|
||||||
try {
|
try {
|
||||||
String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true";
|
String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true";
|
||||||
@ -256,7 +256,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
|||||||
* Auth failed when accessing Airflow APIs 3. Different versions between server and client
|
* Auth failed when accessing Airflow APIs 3. Different versions between server and client
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public PipelineServiceClientResponse getServiceStatus() {
|
public PipelineServiceClientResponse getServiceStatusInternal() {
|
||||||
HttpResponse<String> response;
|
HttpResponse<String> response;
|
||||||
try {
|
try {
|
||||||
response = getRequestAuthenticatedForJsonContent("%s/%s/health-auth", serviceURL, API_ENDPOINT);
|
response = getRequestAuthenticatedForJsonContent("%s/%s/health-auth", serviceURL, API_ENDPOINT);
|
||||||
|
@ -33,7 +33,7 @@ public class NoopClient extends PipelineServiceClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PipelineServiceClientResponse getServiceStatus() {
|
public PipelineServiceClientResponse getServiceStatusInternal() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,7 +60,7 @@ public class NoopClient extends PipelineServiceClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
|
public List<PipelineStatus> getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ public class PipelineServiceStatusJobHandler {
|
|||||||
public static final String JOB_CONTEXT_METER_REGISTRY = "meterRegistry";
|
public static final String JOB_CONTEXT_METER_REGISTRY = "meterRegistry";
|
||||||
public static final String JOB_CONTEXT_CLUSTER_NAME = "clusterName";
|
public static final String JOB_CONTEXT_CLUSTER_NAME = "clusterName";
|
||||||
|
|
||||||
|
private final PipelineServiceClientConfiguration config;
|
||||||
private final PipelineServiceClient pipelineServiceClient;
|
private final PipelineServiceClient pipelineServiceClient;
|
||||||
private final PrometheusMeterRegistry meterRegistry;
|
private final PrometheusMeterRegistry meterRegistry;
|
||||||
private final String clusterName;
|
private final String clusterName;
|
||||||
@ -36,6 +37,7 @@ public class PipelineServiceStatusJobHandler {
|
|||||||
|
|
||||||
private PipelineServiceStatusJobHandler(PipelineServiceClientConfiguration config, String clusterName)
|
private PipelineServiceStatusJobHandler(PipelineServiceClientConfiguration config, String clusterName)
|
||||||
throws SchedulerException {
|
throws SchedulerException {
|
||||||
|
this.config = config;
|
||||||
this.pipelineServiceClient = PipelineServiceClientFactory.createPipelineServiceClient(config);
|
this.pipelineServiceClient = PipelineServiceClientFactory.createPipelineServiceClient(config);
|
||||||
this.meterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry;
|
this.meterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry;
|
||||||
this.clusterName = clusterName;
|
this.clusterName = clusterName;
|
||||||
@ -77,6 +79,8 @@ public class PipelineServiceStatusJobHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void addPipelineServiceStatusJob() {
|
public void addPipelineServiceStatusJob() {
|
||||||
|
// Only register the job to listen to the status if the Pipeline Service Client is indeed enabled
|
||||||
|
if (config.getEnabled().equals(Boolean.TRUE)) {
|
||||||
try {
|
try {
|
||||||
JobDetail jobDetail = jobBuilder();
|
JobDetail jobDetail = jobBuilder();
|
||||||
Trigger trigger = getTrigger();
|
Trigger trigger = getTrigger();
|
||||||
@ -85,4 +89,5 @@ public class PipelineServiceStatusJobHandler {
|
|||||||
LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex);
|
LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PipelineServiceClientResponse getServiceStatus() {
|
public PipelineServiceClientResponse getServiceStatusInternal() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -44,7 +44,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
|
public List<PipelineStatus> getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ import java.io.InputStream;
|
|||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -56,6 +57,7 @@ import org.openmetadata.sdk.exception.PipelineServiceVersionException;
|
|||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class PipelineServiceClient {
|
public abstract class PipelineServiceClient {
|
||||||
|
protected final boolean pipelineServiceClientEnabled;
|
||||||
protected final String hostIp;
|
protected final String hostIp;
|
||||||
|
|
||||||
protected final boolean ingestionIpInfoEnabled;
|
protected final boolean ingestionIpInfoEnabled;
|
||||||
@ -67,6 +69,7 @@ public abstract class PipelineServiceClient {
|
|||||||
protected static final String CONTENT_TYPE = "application/json";
|
protected static final String CONTENT_TYPE = "application/json";
|
||||||
private static final Integer MAX_ATTEMPTS = 3;
|
private static final Integer MAX_ATTEMPTS = 3;
|
||||||
private static final Integer BACKOFF_TIME_SECONDS = 5;
|
private static final Integer BACKOFF_TIME_SECONDS = 5;
|
||||||
|
private static final String DISABLED_STATUS = "disabled";
|
||||||
public static final String HEALTHY_STATUS = "healthy";
|
public static final String HEALTHY_STATUS = "healthy";
|
||||||
public static final String UNHEALTHY_STATUS = "unhealthy";
|
public static final String UNHEALTHY_STATUS = "unhealthy";
|
||||||
public static final String STATUS_KEY = "status";
|
public static final String STATUS_KEY = "status";
|
||||||
@ -103,6 +106,7 @@ public abstract class PipelineServiceClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public PipelineServiceClient(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) {
|
public PipelineServiceClient(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) {
|
||||||
|
this.pipelineServiceClientEnabled = pipelineServiceClientConfiguration.getEnabled();
|
||||||
this.hostIp = pipelineServiceClientConfiguration.getHostIp();
|
this.hostIp = pipelineServiceClientConfiguration.getHostIp();
|
||||||
this.ingestionIpInfoEnabled = pipelineServiceClientConfiguration.getIngestionIpInfoEnabled();
|
this.ingestionIpInfoEnabled = pipelineServiceClientConfiguration.getIngestionIpInfoEnabled();
|
||||||
}
|
}
|
||||||
@ -223,7 +227,21 @@ public abstract class PipelineServiceClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Check the status of pipeline service to ensure it is healthy */
|
/* Check the status of pipeline service to ensure it is healthy */
|
||||||
public abstract PipelineServiceClientResponse getServiceStatus();
|
public PipelineServiceClientResponse getServiceStatus() {
|
||||||
|
if (pipelineServiceClientEnabled) {
|
||||||
|
return getServiceStatusInternal();
|
||||||
|
}
|
||||||
|
return buildHealthyStatus(DISABLED_STATUS).withPlatform(DISABLED_STATUS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
|
||||||
|
if (pipelineServiceClientEnabled) {
|
||||||
|
return getQueuedPipelineStatusInternal(ingestionPipeline);
|
||||||
|
}
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract PipelineServiceClientResponse getServiceStatusInternal();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This workflow can be used to execute any necessary async automations from the pipeline service. This will be the
|
* This workflow can be used to execute any necessary async automations from the pipeline service. This will be the
|
||||||
@ -244,7 +262,7 @@ public abstract class PipelineServiceClient {
|
|||||||
public abstract PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionPipeline);
|
public abstract PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionPipeline);
|
||||||
|
|
||||||
/* Get the status of a deployed pipeline */
|
/* Get the status of a deployed pipeline */
|
||||||
public abstract List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline);
|
public abstract List<PipelineStatus> getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline);
|
||||||
|
|
||||||
/* Toggle the state of an Ingestion Pipeline as enabled/disabled */
|
/* Toggle the state of an Ingestion Pipeline as enabled/disabled */
|
||||||
public abstract PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline);
|
public abstract PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline);
|
||||||
|
@ -6,6 +6,11 @@
|
|||||||
"type": "object",
|
"type": "object",
|
||||||
"javaType": "org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration",
|
"javaType": "org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration",
|
||||||
"properties": {
|
"properties": {
|
||||||
|
"enabled": {
|
||||||
|
"description": "Flags if the ingestion from the OpenMetadata UI is enabled. If ingesting externally, we can set this value to false to not check the Pipeline Service Client component health.",
|
||||||
|
"type": "boolean",
|
||||||
|
"default": true
|
||||||
|
},
|
||||||
"className": {
|
"className": {
|
||||||
"description": "Class Name for the Pipeline Service Client.",
|
"description": "Class Name for the Pipeline Service Client.",
|
||||||
"type": "string"
|
"type": "string"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user