diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index 1bef815bf84..640490e144c 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -186,16 +186,22 @@ eventHandlerConfiguration: - "org.openmetadata.service.events.ChangeEventHandler" - "org.openmetadata.service.events.WebAnalyticEventHandler" -airflowConfiguration: - apiEndpoint: ${AIRFLOW_HOST:-http://localhost:8080} - hostIp: ${AIRFLOW_HOST_IP:-""} - username: ${AIRFLOW_USERNAME:-admin} - password: ${AIRFLOW_PASSWORD:-admin} +pipelineServiceClientConfiguration: + # If we don't need this, set "org.openmetadata.service.clients.pipeline.noop.NoopClient" + className: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"} + apiEndpoint: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://localhost:8080} metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api} - verifySSL: ${AIRFLOW_VERIFY_SSL:-"no-ssl"} # Possible values are "no-ssl", "ignore", "validate" + hostIp: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""} + verifySSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} # Possible values are "no-ssl", "ignore", "validate" sslConfig: validate: - certificatePath: ${AIRFLOW_SSL_CERT_PATH:-""} # Local path for Airflow + certificatePath: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} # Local path for the Pipeline Service Client + + # Default required parameters for Airflow as Pipeline Service Client + parameters: + username: ${AIRFLOW_USERNAME:-admin} + password: ${AIRFLOW_PASSWORD:-admin} + timeout: ${AIRFLOW_TIMEOUT:-10} # no_encryption_at_rest is the default value, and it does what it says. Please read the manual on how # to secure your instance of OpenMetadata with TLS and encryption at rest. diff --git a/docker/local-metadata/docker-compose-postgres.yml b/docker/local-metadata/docker-compose-postgres.yml index abf29806051..d1b6de61875 100644 --- a/docker/local-metadata/docker-compose-postgres.yml +++ b/docker/local-metadata/docker-compose-postgres.yml @@ -86,9 +86,11 @@ services: RSA_PRIVATE_KEY_FILE_PATH: ${RSA_PRIVATE_KEY_FILE_PATH:-"./conf/private_key.der"} JWT_ISSUER: ${JWT_ISSUER:-"open-metadata.org"} JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"} - # OpenMetadata Server Airflow Configuration - AIRFLOW_HOST: ${AIRFLOW_HOST:-http://ingestion:8080} + # OpenMetadata Server Pipeline Service Client Configuration + PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080} SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api} + PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} + PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} # Database configuration for Postgres DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-org.postgresql.Driver} DB_SCHEME: ${DB_SCHEME:-postgresql} @@ -98,9 +100,6 @@ services: DB_HOST: ${DB_HOST:-postgresql} DB_PORT: ${DB_PORT:-5432} OM_DATABASE: ${OM_DATABASE:-openmetadata_db} - # Airflow SSL Configurations - AIRFLOW_VERIFY_SSL: ${AIRFLOW_VERIFY_SSL:-"no-ssl"} - AIRFLOW_SSL_CERT_PATH: ${AIRFLOW_SSL_CERT_PATH:-""} # ElasticSearch Configurations ELASTICSEARCH_HOST: ${ELASTICSEARCH_HOST:- elasticsearch} ELASTICSEARCH_PORT: ${ELASTICSEARCH_PORT:-9200} diff --git a/docker/local-metadata/docker-compose.yml b/docker/local-metadata/docker-compose.yml index dcc0355fdaf..dc8914b8ee2 100644 --- a/docker/local-metadata/docker-compose.yml +++ b/docker/local-metadata/docker-compose.yml @@ -86,9 +86,11 @@ services: RSA_PRIVATE_KEY_FILE_PATH: ${RSA_PRIVATE_KEY_FILE_PATH:-"./conf/private_key.der"} JWT_ISSUER: ${JWT_ISSUER:-"open-metadata.org"} JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"} - # OpenMetadata Server Airflow Configuration - AIRFLOW_HOST: ${AIRFLOW_HOST:-http://ingestion:8080} + # OpenMetadata Server Pipeline Service Client Configuration + PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080} SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api} + PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} + PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} # Database configuration for MySQL DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-com.mysql.cj.jdbc.Driver} DB_SCHEME: ${DB_SCHEME:-mysql} @@ -98,9 +100,6 @@ services: DB_HOST: ${DB_HOST:-mysql} DB_PORT: ${DB_PORT:-3306} OM_DATABASE: ${OM_DATABASE:-openmetadata_db} - # Airflow SSL Configurations - AIRFLOW_VERIFY_SSL: ${AIRFLOW_VERIFY_SSL:-"no-ssl"} - AIRFLOW_SSL_CERT_PATH: ${AIRFLOW_SSL_CERT_PATH:-""} # ElasticSearch Configurations ELASTICSEARCH_HOST: ${ELASTICSEARCH_HOST:- elasticsearch} ELASTICSEARCH_PORT: ${ELASTICSEARCH_PORT:-9200} diff --git a/docker/metadata/docker-compose-postgres.yml b/docker/metadata/docker-compose-postgres.yml index 5bf429c3f41..cfe87a57350 100644 --- a/docker/metadata/docker-compose-postgres.yml +++ b/docker/metadata/docker-compose-postgres.yml @@ -79,9 +79,11 @@ services: RSA_PRIVATE_KEY_FILE_PATH: ${RSA_PRIVATE_KEY_FILE_PATH:-"./conf/private_key.der"} JWT_ISSUER: ${JWT_ISSUER:-"open-metadata.org"} JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"} - # OpenMetadata Server Airflow Configuration - AIRFLOW_HOST: ${AIRFLOW_HOST:-http://ingestion:8080} + # OpenMetadata Server Pipeline Service Client Configuration + PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080} SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api} + PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} + PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} #Database configuration for postgresql DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-org.postgresql.Driver} DB_SCHEME: ${DB_SCHEME:-postgresql} @@ -91,9 +93,6 @@ services: DB_HOST: ${DB_HOST:-postgresql} DB_PORT: ${DB_PORT:-5432} OM_DATABASE: ${OM_DATABASE:-openmetadata_db} - # Airflow SSL Configurations - AIRFLOW_VERIFY_SSL: ${AIRFLOW_VERIFY_SSL:-"no-ssl"} - AIRFLOW_SSL_CERT_PATH: ${AIRFLOW_SSL_CERT_PATH:-""} # ElasticSearch Configurations ELASTICSEARCH_HOST: ${ELASTICSEARCH_HOST:- elasticsearch} ELASTICSEARCH_PORT: ${ELASTICSEARCH_PORT:-9200} diff --git a/docker/metadata/docker-compose.yml b/docker/metadata/docker-compose.yml index 9b4394b1eac..9cff7a3c12d 100644 --- a/docker/metadata/docker-compose.yml +++ b/docker/metadata/docker-compose.yml @@ -77,9 +77,11 @@ services: RSA_PRIVATE_KEY_FILE_PATH: ${RSA_PRIVATE_KEY_FILE_PATH:-"./conf/private_key.der"} JWT_ISSUER: ${JWT_ISSUER:-"open-metadata.org"} JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"} - # OpenMetadata Server Airflow Configuration - AIRFLOW_HOST: ${AIRFLOW_HOST:-http://ingestion:8080} + # OpenMetadata Server Pipeline Service Client Configuration + PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080} SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api} + PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} + PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} # Database configuration for MySQL DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-com.mysql.cj.jdbc.Driver} DB_SCHEME: ${DB_SCHEME:-mysql} @@ -89,9 +91,6 @@ services: DB_HOST: ${DB_HOST:-mysql} DB_PORT: ${DB_PORT:-3306} OM_DATABASE: ${OM_DATABASE:-openmetadata_db} - # Airflow SSL Configurations - AIRFLOW_VERIFY_SSL: ${AIRFLOW_VERIFY_SSL:-"no-ssl"} - AIRFLOW_SSL_CERT_PATH: ${AIRFLOW_SSL_CERT_PATH:-""} # ElasticSearch Configurations ELASTICSEARCH_HOST: ${ELASTICSEARCH_HOST:- elasticsearch} ELASTICSEARCH_PORT: ${ELASTICSEARCH_PORT:-9200} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplicationConfig.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplicationConfig.java index 2f9cc839806..5680693c218 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplicationConfig.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplicationConfig.java @@ -23,8 +23,8 @@ import javax.validation.constraints.NotNull; import lombok.Getter; import lombok.Setter; import org.openmetadata.schema.api.configuration.LoginConfiguration; -import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration; import org.openmetadata.schema.api.configuration.events.EventHandlerConfiguration; +import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration; import org.openmetadata.schema.api.fernet.FernetConfiguration; import org.openmetadata.schema.api.security.AuthenticationConfiguration; import org.openmetadata.schema.api.security.AuthorizerConfiguration; @@ -62,10 +62,8 @@ public class OpenMetadataApplicationConfig extends Configuration { @JsonProperty("eventHandlerConfiguration") private EventHandlerConfiguration eventHandlerConfiguration; - @NotNull - @Valid - @JsonProperty("airflowConfiguration") - private AirflowConfiguration airflowConfiguration; + @JsonProperty("pipelineServiceClientConfiguration") + private PipelineServiceClientConfiguration pipelineServiceClientConfiguration; @JsonProperty("migrationConfiguration") @NotNull diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowConfigurationForAPI.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceAPIClientConfig.java similarity index 88% rename from openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowConfigurationForAPI.java rename to openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceAPIClientConfig.java index cb30babed0c..358dfb6f5dd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowConfigurationForAPI.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceAPIClientConfig.java @@ -11,13 +11,13 @@ * limitations under the License. */ -package org.openmetadata.service.airflow; +package org.openmetadata.service.clients.pipeline; import javax.validation.constraints.NotEmpty; import lombok.Getter; import lombok.Setter; -public class AirflowConfigurationForAPI { +public class PipelineServiceAPIClientConfig { @NotEmpty @Getter @Setter private String apiEndpoint; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java new file mode 100644 index 00000000000..35b5a94d24e --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.clients.pipeline; + +import java.lang.reflect.InvocationTargetException; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration; +import org.openmetadata.sdk.PipelineServiceClient; + +@Slf4j +public class PipelineServiceClientFactory { + + @Getter private static PipelineServiceClient pipelineServiceClient; + + public static PipelineServiceClient createPipelineServiceClient(PipelineServiceClientConfiguration config) { + if (pipelineServiceClient != null) { + return pipelineServiceClient; + } + + String pipelineServiceClientClass = config.getClassName(); + LOG.info("Registering PipelineServiceClient: {}", pipelineServiceClientClass); + + try { + pipelineServiceClient = + Class.forName(pipelineServiceClientClass) + .asSubclass(PipelineServiceClient.class) + .getConstructor(PipelineServiceClientConfiguration.class) + .newInstance(config); + return pipelineServiceClient; + } catch (ClassNotFoundException + | NoSuchMethodException + | InvocationTargetException + | InstantiationException + | IllegalAccessException e) { + throw new RuntimeException( + String.format( + "Error trying to load PipelineServiceClient %s: %s", pipelineServiceClientClass, e.getMessage())); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java similarity index 85% rename from openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java rename to openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java index ec524cda448..b42bd9aa818 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java @@ -11,19 +11,22 @@ * limitations under the License. */ -package org.openmetadata.service.airflow; +package org.openmetadata.service.clients.pipeline.airflow; import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; import java.net.URI; +import java.net.URL; +import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.time.Duration; import java.util.List; import java.util.Map; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; import org.json.JSONObject; -import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration; +import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration; import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; @@ -35,6 +38,15 @@ import org.openmetadata.service.util.JsonUtils; @Slf4j public class AirflowRESTClient extends PipelineServiceClient { + + private static final String USERNAME_KEY = "username"; + private static final String PASSWORD_KEY = "password"; + private static final String TIMEOUT_KEY = "timeout"; + + protected final String username; + protected final String password; + protected final HttpClient client; + protected final URL serviceURL; private static final String API_ENDPOINT = "api/v1/openmetadata"; private static final String DAG_ID = "dag_id"; @@ -57,13 +69,35 @@ public class AirflowRESTClient extends PipelineServiceClient { PipelineType.ELASTIC_SEARCH_REINDEX.toString(), "elasticsearch_reindex_task"); - public AirflowRESTClient(AirflowConfiguration airflowConfig) { - super( - airflowConfig.getUsername(), - airflowConfig.getPassword(), - airflowConfig.getApiEndpoint(), - airflowConfig.getHostIp(), - airflowConfig.getTimeout()); + public AirflowRESTClient(PipelineServiceClientConfiguration config) { + + super(config); + + this.username = (String) config.getParameters().getAdditionalProperties().get(USERNAME_KEY); + this.password = (String) config.getParameters().getAdditionalProperties().get(PASSWORD_KEY); + this.serviceURL = validateServiceURL(config.getApiEndpoint()); + this.client = + HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .connectTimeout( + Duration.ofSeconds((Integer) config.getParameters().getAdditionalProperties().get(TIMEOUT_KEY))) + .build(); + } + + public final HttpResponse post(String endpoint, String payload, boolean authenticate) + throws IOException, InterruptedException { + HttpRequest.Builder requestBuilder = + HttpRequest.newBuilder(URI.create(endpoint)) + .header(CONTENT_HEADER, CONTENT_TYPE) + .POST(HttpRequest.BodyPublishers.ofString(payload)); + if (authenticate) { + requestBuilder.header(AUTH_HEADER, getBasicAuthenticationHeader(username, password)); + } + return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); + } + + public final HttpResponse post(String endpoint, String payload) throws IOException, InterruptedException { + return post(endpoint, payload, true); } @Override @@ -89,7 +123,8 @@ public class AirflowRESTClient extends PipelineServiceClient { } @Override - public String deletePipeline(String pipelineName) { + public String deletePipeline(IngestionPipeline ingestionPipeline) { + String pipelineName = ingestionPipeline.getName(); try { String deleteEndpoint = "%s/%s/delete?dag_id=%s"; HttpResponse response = @@ -102,7 +137,8 @@ public class AirflowRESTClient extends PipelineServiceClient { } @Override - public String runPipeline(String pipelineName) { + public String runPipeline(IngestionPipeline ingestionPipeline) { + String pipelineName = ingestionPipeline.getName(); HttpResponse response; try { String triggerEndPoint = "%s/%s/trigger"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java new file mode 100644 index 00000000000..ffeedc4caf3 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/noop/NoopClient.java @@ -0,0 +1,84 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.openmetadata.service.clients.pipeline.noop; + +import java.net.http.HttpResponse; +import java.util.List; +import java.util.Map; +import javax.ws.rs.core.Response; +import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration; +import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection; +import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; +import org.openmetadata.sdk.PipelineServiceClient; +import org.openmetadata.sdk.exception.PipelineServiceClientException; + +public class NoopClient extends PipelineServiceClient { + + String EXCEPTION_MSG = "The NoopClient does not implement the %s method"; + + public NoopClient(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) { + super(pipelineServiceClientConfiguration); + } + + @Override + public Response getServiceStatus() { + return null; + } + + @Override + public HttpResponse testConnection(TestServiceConnection testServiceConnection) { + return null; + } + + @Override + public String deployPipeline(IngestionPipeline ingestionPipeline) { + throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "deploy")); + } + + @Override + public String runPipeline(IngestionPipeline ingestionPipeline) { + throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "run")); + } + + @Override + public String deletePipeline(IngestionPipeline ingestionPipeline) { + throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "delete")); + } + + @Override + public List getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) { + return null; + } + + @Override + public IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline) { + throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "toggle")); + } + + @Override + public Map getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after) { + return null; + } + + @Override + public HttpResponse killIngestion(IngestionPipeline ingestionPipeline) { + throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "kill")); + } + + @Override + public Map requestGetHostIp() { + return null; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java index af8227acf9d..2ff9e1fb146 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java @@ -119,7 +119,7 @@ public class IngestionPipelineRepository extends EntityRepository post(String endpoint, String payload) throws IOException, InterruptedException { - return post(endpoint, payload, true); + public final URL validateServiceURL(String serviceURL) { + try { + return new URL(serviceURL); + } catch (MalformedURLException e) { + throw new PipelineServiceClientException(serviceURL + " Malformed."); + } } public final String getBasicAuthenticationHeader(String username, String password) { @@ -83,18 +82,6 @@ public abstract class PipelineServiceClient { return "Basic " + Base64.getEncoder().encodeToString(valueToEncode.getBytes()); } - public final HttpResponse post(String endpoint, String payload, boolean authenticate) - throws IOException, InterruptedException { - HttpRequest.Builder requestBuilder = - HttpRequest.newBuilder(URI.create(endpoint)) - .header(CONTENT_HEADER, CONTENT_TYPE) - .POST(HttpRequest.BodyPublishers.ofString(payload)); - if (authenticate) { - requestBuilder.header(AUTH_HEADER, getBasicAuthenticationHeader(username, password)); - } - return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); - } - public static String getServerVersion() throws IOException { InputStream fileInput = PipelineServiceClient.class.getResourceAsStream("/catalog/VERSION"); Properties props = new Properties(); @@ -129,7 +116,7 @@ public abstract class PipelineServiceClient { return Map.of( "ip", "Failed to find the IP of Airflow Container. Please make sure https://api.ipify.org, " - + "https://api.my-ip.io/ip reachable from your network."); + + "https://api.my-ip.io/ip reachable from your network or that the `hostIp` setting is configured."); } } @@ -143,10 +130,10 @@ public abstract class PipelineServiceClient { public abstract String deployPipeline(IngestionPipeline ingestionPipeline); /* Deploy run the pipeline at the pipeline service */ - public abstract String runPipeline(String pipelineName); + public abstract String runPipeline(IngestionPipeline ingestionPipeline); /* Stop and delete a pipeline at the pipeline service */ - public abstract String deletePipeline(String pipelineName); + public abstract String deletePipeline(IngestionPipeline ingestionPipeline); /* Get the status of a deployed pipeline */ public abstract List getQueuedPipelineStatus(IngestionPipeline ingestionPipeline); diff --git a/openmetadata-spec/src/main/resources/json/schema/configuration/airflowConfiguration.json b/openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json similarity index 50% rename from openmetadata-spec/src/main/resources/json/schema/configuration/airflowConfiguration.json rename to openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json index 98af8d44b0c..0945d70e8e3 100644 --- a/openmetadata-spec/src/main/resources/json/schema/configuration/airflowConfiguration.json +++ b/openmetadata-spec/src/main/resources/json/schema/configuration/pipelineServiceClientConfiguration.json @@ -1,44 +1,27 @@ { - "$id": "https://open-metadata.org/schema/entity/configuration/airflowConfiguration.json", + "$id": "https://open-metadata.org/schema/entity/configuration/pipelineServiceClientConfiguration.json", "$schema": "http://json-schema.org/draft-07/schema#", - "title": "AirflowConfiguration", - "description": "This schema defines the AirFlow Configuration", + "title": "PipelineServiceClientConfiguration", + "description": "This schema defines the Pipeline Service Client Configuration", "type": "object", - "javaType": "org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration", + "javaType": "org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration", "properties": { + "className": { + "description": "Class Name for the Pipeline Service Client.", + "type": "string" + }, "apiEndpoint": { - "description": "API host endpoint for Airflow", + "description": "External API root to interact with the Pipeline Service Client", "type": "string" }, "hostIp": { - "description": "Airflow host IP that will be used to connect to the sources.", - "type": "string" - }, - "username": { - "description": "Username for Login", - "type": "string" - }, - "password": { - "description": "Password for Login", + "description": "Pipeline Service Client host IP that will be used to connect to the sources.", "type": "string" }, "metadataApiEndpoint": { - "description": "Metadata api endpoint", + "description": "Metadata api endpoint, e.g., `http://localhost:8585/api`", "type": "string" }, - "authProvider": { - "description": "Auth Provider like no-auth, azure , google, okta, auth0, customOidc, openmetadata", - "type": "string" - }, - "timeout": { - "description": "Timeout", - "type": "integer", - "default": 10 - }, - "authConfig": { - "description": "Auth Provider Configuration.", - "$ref": "authConfig.json" - }, "verifySSL": { "description": "Client SSL verification policy: no-ssl, ignore, validate.", "type": "string", @@ -47,12 +30,27 @@ "sslConfig": { "description": "OpenMetadata Client SSL configuration.", "$ref": "sslConfig.json" + }, + "authProvider": { + "description": "Auth Provider like no-auth, azure , google, okta, auth0, customOidc, openmetadata", + "type": "string" + }, + "authConfig": { + "description": "Auth Provider Configuration.", + "$ref": "authConfig.json" + }, + "parameters": { + "javaType": "org.openmetadata.schema.api.configuration.pipelineServiceClient.Parameters", + "description": "Additional parameters to initialize the PipelineServiceClient.", + "type": "object", + "additionalProperties": { + ".{1,}": { "type": "string" } + } } }, "required": [ + "className", "apiEndpoint", - "username", - "password", "metadataApiEndpoint" ], "additionalProperties": false diff --git a/openmetadata-spec/src/main/resources/json/schema/settings/settings.json b/openmetadata-spec/src/main/resources/json/schema/settings/settings.json index c0467becd78..359a4c45c1b 100644 --- a/openmetadata-spec/src/main/resources/json/schema/settings/settings.json +++ b/openmetadata-spec/src/main/resources/json/schema/settings/settings.json @@ -35,7 +35,7 @@ "config_value": { "oneOf": [ { - "$ref": "../configuration/airflowConfiguration.json" + "$ref": "../configuration/pipelineServiceClientConfiguration.json" }, { "$ref": "../configuration/authenticationConfiguration.json" diff --git a/openmetadata-ui/src/main/resources/ui/cypress/common/common.js b/openmetadata-ui/src/main/resources/ui/cypress/common/common.js index 087a5558892..b1605a90529 100644 --- a/openmetadata-ui/src/main/resources/ui/cypress/common/common.js +++ b/openmetadata-ui/src/main/resources/ui/cypress/common/common.js @@ -337,7 +337,11 @@ export const editOwnerforCreatedService = ( 'waitForIngestion' ); - interceptURL('GET', '/api/v1/system/config/airflow', 'airflow'); + interceptURL( + 'GET', + '/api/v1/system/config/pipeline-service-client', + 'airflow' + ); // click on created service cy.get(`[data-testid="service-name-${service_Name}"]`) .should('exist') @@ -985,7 +989,11 @@ export const updateDescriptionForIngestedTables = ( `/api/v1/services/ingestionPipelines?fields=owner,pipelineStatuses&service=${serviceName}`, 'getSelectedService' ); - interceptURL('GET', '/api/v1/system/config/airflow', 'airflow'); + interceptURL( + 'GET', + '/api/v1/system/config/pipeline-service-client', + 'airflow' + ); // click on created service cy.get(`[data-testid="service-name-${serviceName}"]`) diff --git a/openmetadata-ui/src/main/resources/ui/cypress/e2e/AddNewService/postgres.spec.js b/openmetadata-ui/src/main/resources/ui/cypress/e2e/AddNewService/postgres.spec.js index b1f0ab4ec41..4764f289a07 100644 --- a/openmetadata-ui/src/main/resources/ui/cypress/e2e/AddNewService/postgres.spec.js +++ b/openmetadata-ui/src/main/resources/ui/cypress/e2e/AddNewService/postgres.spec.js @@ -114,7 +114,11 @@ describe('Postgres Ingestion', () => { verifyResponseStatusCode('@getServices', 200); cy.intercept('/api/v1/services/ingestionPipelines?*').as('ingestionData'); - interceptURL('GET', '/api/v1/system/config/airflow', 'airflow'); + interceptURL( + 'GET', + '/api/v1/system/config/pipeline-service-client', + 'airflow' + ); cy.get(`[data-testid="service-name-${serviceName}"]`) .should('exist') .click(); diff --git a/openmetadata-ui/src/main/resources/ui/cypress/e2e/AddNewService/redshiftWithDBT.spec.js b/openmetadata-ui/src/main/resources/ui/cypress/e2e/AddNewService/redshiftWithDBT.spec.js index 3f120baa4cd..499bddadaa0 100644 --- a/openmetadata-ui/src/main/resources/ui/cypress/e2e/AddNewService/redshiftWithDBT.spec.js +++ b/openmetadata-ui/src/main/resources/ui/cypress/e2e/AddNewService/redshiftWithDBT.spec.js @@ -110,7 +110,11 @@ describe('RedShift Ingestion', () => { verifyResponseStatusCode('@getServices', 200); cy.intercept('/api/v1/services/ingestionPipelines?*').as('ingestionData'); - interceptURL('GET', '/api/v1/system/config/airflow', 'airflow'); + interceptURL( + 'GET', + '/api/v1/system/config/pipeline-service-client', + 'airflow' + ); cy.get(`[data-testid="service-name-${REDSHIFT.serviceName}"]`) .should('exist') .click(); diff --git a/openmetadata-ui/src/main/resources/ui/cypress/e2e/Pages/DataQualityAndProfiler.spec.js b/openmetadata-ui/src/main/resources/ui/cypress/e2e/Pages/DataQualityAndProfiler.spec.js index 8da74c11495..5241db29a50 100644 --- a/openmetadata-ui/src/main/resources/ui/cypress/e2e/Pages/DataQualityAndProfiler.spec.js +++ b/openmetadata-ui/src/main/resources/ui/cypress/e2e/Pages/DataQualityAndProfiler.spec.js @@ -103,7 +103,11 @@ describe('Data Quality and Profiler should work properly', () => { .should('be.visible') .click(); cy.intercept('/api/v1/services/ingestionPipelines?*').as('ingestionData'); - interceptURL('GET', '/api/v1/system/config/airflow', 'airflow'); + interceptURL( + 'GET', + '/api/v1/system/config/pipeline-service-client', + 'airflow' + ); cy.get(`[data-testid="service-name-${serviceName}"]`) .should('exist') .click(); diff --git a/openmetadata-ui/src/main/resources/ui/cypress/e2e/Pages/Service.spec.js b/openmetadata-ui/src/main/resources/ui/cypress/e2e/Pages/Service.spec.js index 2fc6ad8a0d1..d07bf3b57bf 100644 --- a/openmetadata-ui/src/main/resources/ui/cypress/e2e/Pages/Service.spec.js +++ b/openmetadata-ui/src/main/resources/ui/cypress/e2e/Pages/Service.spec.js @@ -32,7 +32,11 @@ describe('Services page should work properly', () => { }); it('Update service description', () => { - interceptURL('GET', '/api/v1/system/config/airflow', 'getService'); + interceptURL( + 'GET', + '/api/v1/system/config/pipeline-service-client', + 'getService' + ); cy.get(`[data-testid="service-name-${service.name}"]`) .should('be.visible') .click(); @@ -52,7 +56,11 @@ describe('Services page should work properly', () => { }); it('Update owner and check description', () => { - interceptURL('GET', '/api/v1/system/config/airflow', 'getService'); + interceptURL( + 'GET', + '/api/v1/system/config/pipeline-service-client', + 'getService' + ); cy.get(`[data-testid="service-name-${service.name}"]`) .should('be.visible') .click(); diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/miscAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/miscAPI.ts index 573bb415ff8..8253f89ee99 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/miscAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/miscAPI.ts @@ -16,8 +16,8 @@ import { Edge } from 'components/EntityLineage/EntityLineage.interface'; import { ExploreSearchIndex } from 'components/Explore/explore.interface'; import { AuthorizerConfiguration } from 'generated/configuration/authorizerConfiguration'; import { SearchIndex } from '../enums/search.enum'; -import { AirflowConfiguration } from '../generated/configuration/airflowConfiguration'; import { AuthenticationConfiguration } from '../generated/configuration/authenticationConfiguration'; +import { PipelineServiceClientConfiguration } from '../generated/configuration/pipelineServiceClientConfiguration'; import { EntitiesCount } from '../generated/entity/utils/entitiesCount'; import { Paging } from '../generated/type/paging'; import { @@ -93,8 +93,8 @@ export const fetchSlackConfig = (): Promise => { }; export const fetchAirflowConfig = async () => { - const response = await APIClient.get( - '/system/config/airflow' + const response = await APIClient.get( + '/system/config/pipeline-service-client' ); return response.data;