From 203e2b5fff94d14bd072d40a91ec5cd6b0f031ed Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Sun, 26 Feb 2023 00:52:14 +0100 Subject: [PATCH] Simplify Pipeline Service Client Response and prep test connection for ingestion base (#10334) * Simplify Response and prep test connection * Format --- ingestion/operators/docker/Dockerfile | 2 +- ingestion/operators/docker/Dockerfile-dev | 2 +- ingestion/operators/docker/main.py | 54 ++++++++++++++++ ingestion/operators/docker/test_connection.py | 61 +++++++++++++++++++ .../pipeline/airflow/AirflowRESTClient.java | 8 +-- .../clients/pipeline/noop/NoopClient.java | 5 +- .../IngestionPipelineResource.java | 7 +-- .../MockPipelineServiceClient.java | 5 +- .../sdk/PipelineServiceClient.java | 5 +- 9 files changed, 129 insertions(+), 20 deletions(-) create mode 100644 ingestion/operators/docker/test_connection.py diff --git a/ingestion/operators/docker/Dockerfile b/ingestion/operators/docker/Dockerfile index 97f65fec044..0fc8e6dd08a 100644 --- a/ingestion/operators/docker/Dockerfile +++ b/ingestion/operators/docker/Dockerfile @@ -58,7 +58,7 @@ ENV LD_LIBRARY_PATH=/instantclient WORKDIR ingestion/ # Required for Airflow DockerOperator, as we need to run the workflows from a `python main.py` command in the container. -COPY ingestion/operators/docker/main.py . +COPY ingestion/operators/docker/*.py . RUN pip install --upgrade pip diff --git a/ingestion/operators/docker/Dockerfile-dev b/ingestion/operators/docker/Dockerfile-dev index cbdc673a011..9dbaa9d44e8 100644 --- a/ingestion/operators/docker/Dockerfile-dev +++ b/ingestion/operators/docker/Dockerfile-dev @@ -68,7 +68,7 @@ COPY ingestion/setup.* ./ COPY ingestion/README.md . # Required for Airflow DockerOperator, as we need to run the workflows from a `python main.py` command in the container. -COPY ingestion/operators/docker/main.py . +COPY ingestion/operators/docker/*.py . RUN pip install --upgrade pip diff --git a/ingestion/operators/docker/main.py b/ingestion/operators/docker/main.py index c277e189216..34885d6dbcb 100644 --- a/ingestion/operators/docker/main.py +++ b/ingestion/operators/docker/main.py @@ -1,7 +1,21 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Main ingestion entrypoint to run OM workflows +""" import os import yaml +from metadata.data_insight.api.workflow import DataInsightWorkflow from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( PipelineType, ) @@ -15,10 +29,50 @@ WORKFLOW_MAP = { PipelineType.lineage.value: Workflow, PipelineType.profiler.value: ProfilerWorkflow, PipelineType.TestSuite.value: TestSuiteWorkflow, + PipelineType.dataInsight.value: DataInsightWorkflow, + PipelineType.elasticSearchReindex.value: Workflow, + PipelineType.dbt.value: Workflow, } def main(): + """ + Ingestion entrypoint. Get the right Workflow class + and execute the ingestion. + + This image is expected to be used and run in environments + such as Airflow's KubernetesPodOperator: + + ``` + config = ''' + source: + type: ... 
+ serviceName: ... + serviceConnection: + ... + sourceConfig: + ... + sink: + ... + workflowConfig: + ... + ''' + + KubernetesPodOperator( + task_id="ingest", + name="ingest", + cmds=["python", "main.py"], + image="openmetadata/ingestion-base:0.13.2", + namespace='default', + env_vars={"config": config, "pipelineType": "metadata"}, + dag=dag, + ) + ``` + + Note how we are expecting the env variables to be sent, with the `config` being the str + representation of the ingestion YAML. + """ + # DockerOperator expects an env var called config config = os.environ["config"] pipeline_type = os.environ["pipelineType"] diff --git a/ingestion/operators/docker/test_connection.py b/ingestion/operators/docker/test_connection.py new file mode 100644 index 00000000000..d799dc73152 --- /dev/null +++ b/ingestion/operators/docker/test_connection.py @@ -0,0 +1,61 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Entrypoint to test the connection to a source +""" +import os + +import yaml + +from metadata.generated.schema.api.services.ingestionPipelines.testServiceConnection import ( + TestServiceConnectionRequest, +) +from metadata.ingestion.source.connections import get_connection, get_test_connection_fn +from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory + + +def main(): + """ + Test Connection entrypoint. + + The expected information is in the shape of `TestServiceConnectionRequest`, e.g., + ``` + connection: + config: + type: Mysql + scheme: mysql+pymysql + username: openmetadata_user + password: openmetadata_password + hostPort: 'localhost:3306' + connectionType: Database + ``` + """ + + # DockerOperator expects an env var called config + test_connection_dict = yaml.safe_load(os.environ["config"]) + test_service_connection = TestServiceConnectionRequest.parse_obj( + test_connection_dict + ) + + # we need to instantiate the secret manager in case secrets are passed + SecretsManagerFactory(test_service_connection.secretsManagerProvider, None) + connection = get_connection(test_service_connection.connection.config) + + # We won't wrap the call in a try/catch. If the connection fails, we want to + # raise the SourceConnectionException as it comes. 
+ test_connection_fn = get_test_connection_fn( + test_service_connection.connection.config + ) + test_connection_fn(connection) + + +if __name__ == "__main__": + main() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java index 2537f518ebf..fa88c7af2c9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java @@ -230,7 +230,7 @@ public class AirflowRESTClient extends PipelineServiceClient { } @Override - public HttpResponse testConnection(TestServiceConnection testServiceConnection) { + public Response testConnection(TestServiceConnection testServiceConnection) { HttpResponse response; try { String statusEndPoint = "%s/%s/test_connection"; @@ -238,7 +238,7 @@ public class AirflowRESTClient extends PipelineServiceClient { String connectionPayload = JsonUtils.pojoToJson(testServiceConnection); response = post(statusUrl, connectionPayload); if (response.statusCode() == 200) { - return response; + return Response.status(200, response.body()).build(); } } catch (Exception e) { throw PipelineServiceClientException.byMessage("Failed to test connection.", e.getMessage()); @@ -247,7 +247,7 @@ public class AirflowRESTClient extends PipelineServiceClient { } @Override - public HttpResponse killIngestion(IngestionPipeline ingestionPipeline) { + public Response killIngestion(IngestionPipeline ingestionPipeline) { HttpResponse response; try { String killEndPoint = "%s/%s/kill"; @@ -256,7 +256,7 @@ public class AirflowRESTClient extends PipelineServiceClient { requestPayload.put(DAG_ID, ingestionPipeline.getName()); response = post(killUrl, requestPayload.toString()); if (response.statusCode() == 200) { - return response; + return Response.status(200, response.body()).build(); } } catch (Exception e) { throw PipelineServiceClientException.byMessage("Failed to kill running workflows", e.getMessage()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java index 557b59b8175..b37a027e53e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java @@ -13,7 +13,6 @@ package org.openmetadata.service.clients.pipeline.noop; -import java.net.http.HttpResponse; import java.util.List; import java.util.Map; import javax.ws.rs.core.Response; @@ -39,7 +38,7 @@ public class NoopClient extends PipelineServiceClient { } @Override - public HttpResponse testConnection(TestServiceConnection testServiceConnection) { + public Response testConnection(TestServiceConnection testServiceConnection) { return null; } @@ -74,7 +73,7 @@ public class NoopClient extends PipelineServiceClient { } @Override - public HttpResponse killIngestion(IngestionPipeline ingestionPipeline) { + public Response killIngestion(IngestionPipeline ingestionPipeline) { throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "kill")); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java 
b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java index 1303df6631b..233ed3a2e7d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java @@ -26,7 +26,6 @@ import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.responses.ApiResponse; import java.io.IOException; -import java.net.http.HttpResponse; import java.util.Map; import java.util.UUID; import javax.json.JsonPatch; @@ -501,8 +500,7 @@ public class IngestionPipelineResource extends EntityResource response = pipelineServiceClient.killIngestion(ingestionPipeline); - return Response.status(200, response.body()).build(); + return pipelineServiceClient.killIngestion(ingestionPipeline); } @POST @@ -525,8 +523,7 @@ public class IngestionPipelineResource extends EntityResource response = pipelineServiceClient.testConnection(testServiceConnection); - return Response.status(200, response.body()).build(); + return pipelineServiceClient.testConnection(testServiceConnection); } @GET diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java b/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java index d2cb665d375..336e9692505 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/pipelineService/MockPipelineServiceClient.java @@ -1,6 +1,5 @@ package org.openmetadata.service.pipelineService; -import java.net.http.HttpResponse; import java.util.List; import java.util.Map; import javax.ws.rs.core.Response; @@ -23,7 +22,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient { } @Override - public HttpResponse testConnection(TestServiceConnection testServiceConnection) { + public Response testConnection(TestServiceConnection testServiceConnection) { return null; } @@ -58,7 +57,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient { } @Override - public HttpResponse killIngestion(IngestionPipeline ingestionPipeline) { + public Response killIngestion(IngestionPipeline ingestionPipeline) { return null; } diff --git a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClient.java b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClient.java index 0926264abf2..41ca4fb4736 100644 --- a/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClient.java +++ b/openmetadata-spec/src/main/java/org/openmetadata/sdk/PipelineServiceClient.java @@ -17,7 +17,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; -import java.net.http.HttpResponse; import java.util.Base64; import java.util.List; import java.util.Map; @@ -145,7 +144,7 @@ public abstract class PipelineServiceClient { public abstract Response getServiceStatus(); /* Test the connection to the service such as database service a pipeline depends on. 
*/ - public abstract HttpResponse testConnection(TestServiceConnection testServiceConnection); + public abstract Response testConnection(TestServiceConnection testServiceConnection); /* Deploy a pipeline to the pipeline service */ public abstract String deployPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface service); @@ -166,7 +165,7 @@ public abstract class PipelineServiceClient { public abstract Map getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after); /* Get the all last run logs of a deployed pipeline */ - public abstract HttpResponse killIngestion(IngestionPipeline ingestionPipeline); + public abstract Response killIngestion(IngestionPipeline ingestionPipeline); /* Get the Pipeline Service host IP to whitelist in source systems
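
Taken together, the patch lets the same `ingestion-base` image expose a second entrypoint: `test_connection.py` is now copied into the image alongside `main.py`, and the backend's `testConnection` surfaces a JAX-RS `Response` directly. Below is a minimal sketch of how an orchestrator might invoke the new entrypoint, mirroring the `KubernetesPodOperator` example already shown in the `main.py` docstring. It is not part of the change: the import path, task names, the `dag` object, and the YAML values are illustrative assumptions; only the config shape (from the `test_connection.py` docstring) and the `env_vars`/`cmds` pattern (from `main.py`) come from the patch.

```python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# Shape of TestServiceConnectionRequest, as shown in test_connection.py's
# docstring. Values are placeholders for a hypothetical MySQL service.
test_connection_config = """
connection:
  config:
    type: Mysql
    scheme: mysql+pymysql
    username: openmetadata_user
    password: openmetadata_password
    hostPort: 'localhost:3306'
connectionType: Database
"""

# Same pattern as the ingestion example in main.py, but pointing at the new
# test_connection.py entrypoint copied into the image by the Dockerfile change
# (COPY ingestion/operators/docker/*.py .). Image tag taken from the docstring
# in the patch; adjust to the release in use.
KubernetesPodOperator(
    task_id="test_connection",
    name="test_connection",
    cmds=["python", "test_connection.py"],
    image="openmetadata/ingestion-base:0.13.2",
    namespace="default",
    env_vars={"config": test_connection_config},
    dag=dag,  # assumes a surrounding DAG definition
)
```

Unlike `main.py`, only the `config` env var is read (there is no `pipelineType`), and per the comment in `test_connection.py` a failed connection is not caught: the `SourceConnectionException` propagates as-is, failing the task.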