Simplify Pipeline Service Client Response and prep test connection for ingestion base (#10334)

* Simplify Response and prep test connection

* Format
This commit is contained in:
Pere Miquel Brull 2023-02-26 00:52:14 +01:00 committed by GitHub
parent bbf54afaf5
commit 203e2b5fff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 129 additions and 20 deletions

View File

@ -58,7 +58,7 @@ ENV LD_LIBRARY_PATH=/instantclient
WORKDIR ingestion/ WORKDIR ingestion/
# Required for Airflow DockerOperator, as we need to run the workflows from a `python main.py` command in the container. # Required for Airflow DockerOperator, as we need to run the workflows from a `python main.py` command in the container.
COPY ingestion/operators/docker/main.py . COPY ingestion/operators/docker/*.py .
RUN pip install --upgrade pip RUN pip install --upgrade pip

View File

@ -68,7 +68,7 @@ COPY ingestion/setup.* ./
COPY ingestion/README.md . COPY ingestion/README.md .
# Required for Airflow DockerOperator, as we need to run the workflows from a `python main.py` command in the container. # Required for Airflow DockerOperator, as we need to run the workflows from a `python main.py` command in the container.
COPY ingestion/operators/docker/main.py . COPY ingestion/operators/docker/*.py .
RUN pip install --upgrade pip RUN pip install --upgrade pip

View File

@ -1,7 +1,21 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Main ingestion entrypoint to run OM workflows
"""
import os import os
import yaml import yaml
from metadata.data_insight.api.workflow import DataInsightWorkflow
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType, PipelineType,
) )
@ -15,10 +29,50 @@ WORKFLOW_MAP = {
PipelineType.lineage.value: Workflow, PipelineType.lineage.value: Workflow,
PipelineType.profiler.value: ProfilerWorkflow, PipelineType.profiler.value: ProfilerWorkflow,
PipelineType.TestSuite.value: TestSuiteWorkflow, PipelineType.TestSuite.value: TestSuiteWorkflow,
PipelineType.dataInsight.value: DataInsightWorkflow,
PipelineType.elasticSearchReindex.value: Workflow,
PipelineType.dbt.value: Workflow,
} }
def main(): def main():
"""
Ingestion entrypoint. Get the right Workflow class
and execute the ingestion.
This image is expected to be used and run in environments
such as Airflow's KubernetesPodOperator:
```
config = '''
source:
type: ...
serviceName: ...
serviceConnection:
...
sourceConfig:
...
sink:
...
workflowConfig:
...
'''
KubernetesPodOperator(
task_id="ingest",
name="ingest",
cmds=["python", "main.py"],
image="openmetadata/ingestion-base:0.13.2",
namespace='default',
env_vars={"config": config, "pipelineType": "metadata"},
dag=dag,
)
```
Note how we are expecting the env variables to be sent, with the `config` being the str
representation of the ingestion YAML.
"""
# DockerOperator expects an env var called config # DockerOperator expects an env var called config
config = os.environ["config"] config = os.environ["config"]
pipeline_type = os.environ["pipelineType"] pipeline_type = os.environ["pipelineType"]

View File

@ -0,0 +1,61 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Entrypoint to test the connection to a source
"""
import os
import yaml
from metadata.generated.schema.api.services.ingestionPipelines.testServiceConnection import (
TestServiceConnectionRequest,
)
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory
def main():
    """
    Test Connection entrypoint.

    Reads the `config` env var as the YAML representation of a
    `TestServiceConnectionRequest`, e.g.,
    ```
    connection:
      config:
        type: Mysql
        scheme: mysql+pymysql
        username: openmetadata_user
        password: openmetadata_password
        hostPort: 'localhost:3306'
    connectionType: Database
    ```
    and runs the source-specific connection test, raising on failure.
    """
    # DockerOperator expects an env var called config
    raw_request = yaml.safe_load(os.environ["config"])
    request = TestServiceConnectionRequest.parse_obj(raw_request)

    # Instantiate the secrets manager in case secrets are passed
    SecretsManagerFactory(request.secretsManagerProvider, None)

    service_config = request.connection.config
    connection = get_connection(service_config)

    # Deliberately no try/except here: if the connection fails we want
    # the SourceConnectionException to propagate as it comes.
    get_test_connection_fn(service_config)(connection)


if __name__ == "__main__":
    main()

View File

@ -230,7 +230,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
} }
@Override @Override
public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) { public Response testConnection(TestServiceConnection testServiceConnection) {
HttpResponse<String> response; HttpResponse<String> response;
try { try {
String statusEndPoint = "%s/%s/test_connection"; String statusEndPoint = "%s/%s/test_connection";
@ -238,7 +238,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
String connectionPayload = JsonUtils.pojoToJson(testServiceConnection); String connectionPayload = JsonUtils.pojoToJson(testServiceConnection);
response = post(statusUrl, connectionPayload); response = post(statusUrl, connectionPayload);
if (response.statusCode() == 200) { if (response.statusCode() == 200) {
return response; return Response.status(200, response.body()).build();
} }
} catch (Exception e) { } catch (Exception e) {
throw PipelineServiceClientException.byMessage("Failed to test connection.", e.getMessage()); throw PipelineServiceClientException.byMessage("Failed to test connection.", e.getMessage());
@ -247,7 +247,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
} }
@Override @Override
public HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline) { public Response killIngestion(IngestionPipeline ingestionPipeline) {
HttpResponse<String> response; HttpResponse<String> response;
try { try {
String killEndPoint = "%s/%s/kill"; String killEndPoint = "%s/%s/kill";
@ -256,7 +256,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
requestPayload.put(DAG_ID, ingestionPipeline.getName()); requestPayload.put(DAG_ID, ingestionPipeline.getName());
response = post(killUrl, requestPayload.toString()); response = post(killUrl, requestPayload.toString());
if (response.statusCode() == 200) { if (response.statusCode() == 200) {
return response; return Response.status(200, response.body()).build();
} }
} catch (Exception e) { } catch (Exception e) {
throw PipelineServiceClientException.byMessage("Failed to kill running workflows", e.getMessage()); throw PipelineServiceClientException.byMessage("Failed to kill running workflows", e.getMessage());

View File

@ -13,7 +13,6 @@
package org.openmetadata.service.clients.pipeline.noop; package org.openmetadata.service.clients.pipeline.noop;
import java.net.http.HttpResponse;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
@ -39,7 +38,7 @@ public class NoopClient extends PipelineServiceClient {
} }
@Override @Override
public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) { public Response testConnection(TestServiceConnection testServiceConnection) {
return null; return null;
} }
@ -74,7 +73,7 @@ public class NoopClient extends PipelineServiceClient {
} }
@Override @Override
public HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline) { public Response killIngestion(IngestionPipeline ingestionPipeline) {
throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "kill")); throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "kill"));
} }

View File

@ -26,7 +26,6 @@ import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException; import java.io.IOException;
import java.net.http.HttpResponse;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import javax.json.JsonPatch; import javax.json.JsonPatch;
@ -501,8 +500,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
throws IOException { throws IOException {
IngestionPipeline ingestionPipeline = getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED); IngestionPipeline ingestionPipeline = getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED);
decryptOrNullify(securityContext, ingestionPipeline); decryptOrNullify(securityContext, ingestionPipeline);
HttpResponse<String> response = pipelineServiceClient.killIngestion(ingestionPipeline); return pipelineServiceClient.killIngestion(ingestionPipeline);
return Response.status(200, response.body()).build();
} }
@POST @POST
@ -525,8 +523,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
testServiceConnection = testServiceConnection =
testServiceConnection.withSecretsManagerProvider( testServiceConnection.withSecretsManagerProvider(
SecretsManagerFactory.getSecretsManager().getSecretsManagerProvider()); SecretsManagerFactory.getSecretsManager().getSecretsManagerProvider());
HttpResponse<String> response = pipelineServiceClient.testConnection(testServiceConnection); return pipelineServiceClient.testConnection(testServiceConnection);
return Response.status(200, response.body()).build();
} }
@GET @GET

View File

@ -1,6 +1,5 @@
package org.openmetadata.service.pipelineService; package org.openmetadata.service.pipelineService;
import java.net.http.HttpResponse;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
@ -23,7 +22,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient {
} }
@Override @Override
public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) { public Response testConnection(TestServiceConnection testServiceConnection) {
return null; return null;
} }
@ -58,7 +57,7 @@ public class MockPipelineServiceClient extends PipelineServiceClient {
} }
@Override @Override
public HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline) { public Response killIngestion(IngestionPipeline ingestionPipeline) {
return null; return null;
} }

View File

@ -17,7 +17,6 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.net.http.HttpResponse;
import java.util.Base64; import java.util.Base64;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -145,7 +144,7 @@ public abstract class PipelineServiceClient {
public abstract Response getServiceStatus(); public abstract Response getServiceStatus();
/* Test the connection to the service such as database service a pipeline depends on. */ /* Test the connection to the service such as database service a pipeline depends on. */
public abstract HttpResponse<String> testConnection(TestServiceConnection testServiceConnection); public abstract Response testConnection(TestServiceConnection testServiceConnection);
/* Deploy a pipeline to the pipeline service */ /* Deploy a pipeline to the pipeline service */
public abstract String deployPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface service); public abstract String deployPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface service);
@ -166,7 +165,7 @@ public abstract class PipelineServiceClient {
public abstract Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after); public abstract Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after);
/* Get all the last run logs of a deployed pipeline */ /* Get all the last run logs of a deployed pipeline */
public abstract HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline); public abstract Response killIngestion(IngestionPipeline ingestionPipeline);
/* /*
Get the Pipeline Service host IP to whitelist in source systems Get the Pipeline Service host IP to whitelist in source systems