diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java index bc3627ef58f..b9fcb2a552d 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/IngestionPipelineRepository.java @@ -21,8 +21,8 @@ import org.json.JSONObject; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig; import org.openmetadata.catalog.entity.services.ingestionPipelines.IngestionPipeline; -import org.openmetadata.catalog.entity.services.ingestionPipelines.Source; import org.openmetadata.catalog.metadataIngestion.LogLevels; +import org.openmetadata.catalog.metadataIngestion.SourceConfig; import org.openmetadata.catalog.resources.services.ingestionpipelines.IngestionPipelineResource; import org.openmetadata.catalog.services.connections.metadata.OpenMetadataServerConnection; import org.openmetadata.catalog.type.EntityReference; @@ -33,8 +33,8 @@ import org.openmetadata.catalog.util.JsonUtils; import org.openmetadata.catalog.util.PipelineServiceClient; public class IngestionPipelineRepository extends EntityRepository { - private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,source,airflowConfig,loggerLevel"; - private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,source,airflowConfig,loggerLevel"; + private static final String INGESTION_PIPELINE_UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel"; + private static final String INGESTION_PIPELINE_PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel"; private static PipelineServiceClient pipelineServiceClient; public IngestionPipelineRepository(CollectionDAO dao) { @@ -107,16 +107,6 @@ public class IngestionPipelineRepository extends EntityRepository 
ingestionPipelines) { listOrEmpty(ingestionPipelines).forEach(this::addStatus); } diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json index 3d19ff89c4a..90993f1f33f 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json @@ -42,11 +42,6 @@ "type": "object", "javaType": "org.openmetadata.catalog.entity.services.ingestionPipelines.AirflowConfig", "properties": { - "forceDeploy": { - "description": "Deploy the pipeline by overwriting existing pipeline with the same name.", - "type": "boolean", - "default": false - }, "pausePipeline": { "description": "pause the pipeline from running once the deploy is finished successfully.", "type": "boolean", @@ -147,8 +142,8 @@ "description": "Name that uniquely identifies a Pipeline.", "$ref": "../../../type/basic.json#/definitions/fullyQualifiedEntityName" }, - "source": { - "$ref": "../../../metadataIngestion/workflow.json#/definitions/source" + "sourceConfig": { + "$ref": "../../../metadataIngestion/workflow.json#/definitions/sourceConfig" }, "openMetadataServerConnection": { "$ref": "../connections/metadata/openMetadataConnection.json" @@ -205,7 +200,7 @@ "required": [ "name", "pipelineType", - "source", + "sourceConfig", "openMetadataServerConnection", "airflowConfig" ], diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java index f9a8c2dea62..f806fd41eda 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java +++ 
b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/services/DatabaseServiceResourceTest.java @@ -238,13 +238,6 @@ public class DatabaseServiceResourceTest extends EntityResourceTest authHeaders) throws HttpResponseException { assertEquals(createRequest.getAirflowConfig().getConcurrency(), ingestion.getAirflowConfig().getConcurrency()); - validateSourceConfig(createRequest.getSourceConfig(), ingestion.getSource().getSourceConfig(), ingestion); + validateSourceConfig(createRequest.getSourceConfig(), ingestion.getSourceConfig(), ingestion); } @Override @@ -147,7 +147,7 @@ public class IngestionPipelineResourceTest extends EntityResourceTest WorkflowSource: + """ + Use the service EntityReference to build the Source. + Building the source dynamically helps us to not store any + sensitive info. + :param ingestion_pipeline: With the service ref + :return: WorkflowSource + """ + + metadata = OpenMetadata(config=ingestion_pipeline.openMetadataServerConnection) + + service_type = ingestion_pipeline.service.type + service: Optional[ + Union[DatabaseService, MessagingService, PipelineService, DashboardService] + ] = None + + if service_type == "databaseService": + service: DatabaseService = metadata.get_by_name( + entity=DatabaseService, fqn=ingestion_pipeline.service.name + ) + elif service_type == "pipelineService": + service: PipelineService = metadata.get_by_name( + entity=PipelineService, fqn=ingestion_pipeline.service.name + ) + elif service_type == "dashboardService": + service: DashboardService = metadata.get_by_name( + entity=DashboardService, fqn=ingestion_pipeline.service.name + ) + elif service_type == "messagingService": + service: MessagingService = metadata.get_by_name( + entity=MessagingService, fqn=ingestion_pipeline.service.name + ) + + if not service: + raise ValueError(f"Could not get service from type {service_type}") + + return WorkflowSource( + type=service.serviceType.value.lower(), + serviceName=service.name.__root__, + 
serviceConnection=service.connection, + sourceConfig=ingestion_pipeline.sourceConfig, + ) + + def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig): """ Task that creates and runs the ingestion workflow. diff --git a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py index 5758fa89333..18105dd7d4d 100644 --- a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py +++ b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/metadata.py @@ -15,6 +15,7 @@ Metadata DAG function builder from airflow import DAG from openmetadata.workflows.ingestion.common import ( build_dag, + build_source, build_workflow_config_property, metadata_ingestion_workflow, ) @@ -39,8 +40,9 @@ def build_metadata_workflow_config( """ Given an airflow_pipeline, prepare the workflow config JSON """ + workflow_config = OpenMetadataWorkflowConfig( - source=ingestion_pipeline.source, + source=build_source(ingestion_pipeline), sink=Sink( type="metadata-rest", config={}, diff --git a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/profiler.py b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/profiler.py index 862da17f7d7..bbb9d9a90b7 100644 --- a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/profiler.py +++ b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/profiler.py @@ -13,7 +13,11 @@ Profiler DAG function builder """ from airflow import DAG -from openmetadata.workflows.ingestion.common import build_dag, profiler_workflow +from openmetadata.workflows.ingestion.common import ( + build_dag, + build_source, + profiler_workflow, +) try: from airflow.operators.python import PythonOperator @@ -38,7 +42,7 @@ def build_profiler_workflow_config( Given an airflow_pipeline, prepare the workflow config JSON """ workflow_config = OpenMetadataWorkflowConfig( - source=ingestion_pipeline.source, + 
source=build_source(ingestion_pipeline), sink=Sink( type="metadata-rest", config={}, diff --git a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py index ccda526359f..05ee9ab4c85 100644 --- a/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py +++ b/openmetadata-airflow-apis/src/openmetadata/workflows/ingestion/usage.py @@ -16,6 +16,7 @@ from pathlib import Path from airflow import DAG from openmetadata.workflows.ingestion.common import ( build_dag, + build_source, build_workflow_config_property, metadata_ingestion_workflow, ) @@ -49,11 +50,12 @@ def build_usage_config_from_file( :param filename: staging location file :return: OpenMetadataWorkflowConfig """ - usage_source = ingestion_pipeline.source - usage_source.type = f"{usage_source.type}-usage" # Mark the source as usage + + source = build_source(ingestion_pipeline) + source.type = f"{source.type}-usage" # Mark the source as usage return OpenMetadataWorkflowConfig( - source=usage_source, + source=source, processor=Processor(type="query-parser", config={"filter": ""}), stage=Stage( type="table-usage", @@ -73,7 +75,7 @@ def build_usage_workflow_config( """ Given an airflow_pipeline, prepare the workflow config JSON """ - location = ingestion_pipeline.source.sourceConfig.config.stageFileLocation + location = ingestion_pipeline.sourceConfig.config.stageFileLocation if not location: with tempfile.NamedTemporaryFile() as tmp_file: