mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-11 15:28:00 +00:00
Group airflowConfiguration into pipelineService config (#10120)
Group airflowConfiguration into pipelineService config (#10120)
This commit is contained in:
parent
b8e15bd969
commit
1835de8bf2
@ -186,16 +186,22 @@ eventHandlerConfiguration:
|
||||
- "org.openmetadata.service.events.ChangeEventHandler"
|
||||
- "org.openmetadata.service.events.WebAnalyticEventHandler"
|
||||
|
||||
airflowConfiguration:
|
||||
apiEndpoint: ${AIRFLOW_HOST:-http://localhost:8080}
|
||||
hostIp: ${AIRFLOW_HOST_IP:-""}
|
||||
username: ${AIRFLOW_USERNAME:-admin}
|
||||
password: ${AIRFLOW_PASSWORD:-admin}
|
||||
pipelineServiceClientConfiguration:
|
||||
# If we don't need this, set "org.openmetadata.service.clients.pipeline.noop.NoopClient"
|
||||
className: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"}
|
||||
apiEndpoint: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://localhost:8080}
|
||||
metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api}
|
||||
verifySSL: ${AIRFLOW_VERIFY_SSL:-"no-ssl"} # Possible values are "no-ssl", "ignore", "validate"
|
||||
hostIp: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""}
|
||||
verifySSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} # Possible values are "no-ssl", "ignore", "validate"
|
||||
sslConfig:
|
||||
validate:
|
||||
certificatePath: ${AIRFLOW_SSL_CERT_PATH:-""} # Local path for Airflow
|
||||
certificatePath: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} # Local path for the Pipeline Service Client
|
||||
|
||||
# Default required parameters for Airflow as Pipeline Service Client
|
||||
parameters:
|
||||
username: ${AIRFLOW_USERNAME:-admin}
|
||||
password: ${AIRFLOW_PASSWORD:-admin}
|
||||
timeout: ${AIRFLOW_TIMEOUT:-10}
|
||||
|
||||
# no_encryption_at_rest is the default value, and it does what it says. Please read the manual on how
|
||||
# to secure your instance of OpenMetadata with TLS and encryption at rest.
|
||||
|
||||
@ -86,9 +86,11 @@ services:
|
||||
RSA_PRIVATE_KEY_FILE_PATH: ${RSA_PRIVATE_KEY_FILE_PATH:-"./conf/private_key.der"}
|
||||
JWT_ISSUER: ${JWT_ISSUER:-"open-metadata.org"}
|
||||
JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}
|
||||
# OpenMetadata Server Airflow Configuration
|
||||
AIRFLOW_HOST: ${AIRFLOW_HOST:-http://ingestion:8080}
|
||||
# OpenMetadata Server Pipeline Service Client Configuration
|
||||
PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080}
|
||||
SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api}
|
||||
PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"}
|
||||
PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""}
|
||||
# Database configuration for Postgres
|
||||
DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-org.postgresql.Driver}
|
||||
DB_SCHEME: ${DB_SCHEME:-postgresql}
|
||||
@ -98,9 +100,6 @@ services:
|
||||
DB_HOST: ${DB_HOST:-postgresql}
|
||||
DB_PORT: ${DB_PORT:-5432}
|
||||
OM_DATABASE: ${OM_DATABASE:-openmetadata_db}
|
||||
# Airflow SSL Configurations
|
||||
AIRFLOW_VERIFY_SSL: ${AIRFLOW_VERIFY_SSL:-"no-ssl"}
|
||||
AIRFLOW_SSL_CERT_PATH: ${AIRFLOW_SSL_CERT_PATH:-""}
|
||||
# ElasticSearch Configurations
|
||||
ELASTICSEARCH_HOST: ${ELASTICSEARCH_HOST:- elasticsearch}
|
||||
ELASTICSEARCH_PORT: ${ELASTICSEARCH_PORT:-9200}
|
||||
|
||||
@ -86,9 +86,11 @@ services:
|
||||
RSA_PRIVATE_KEY_FILE_PATH: ${RSA_PRIVATE_KEY_FILE_PATH:-"./conf/private_key.der"}
|
||||
JWT_ISSUER: ${JWT_ISSUER:-"open-metadata.org"}
|
||||
JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}
|
||||
# OpenMetadata Server Airflow Configuration
|
||||
AIRFLOW_HOST: ${AIRFLOW_HOST:-http://ingestion:8080}
|
||||
# OpenMetadata Server Pipeline Service Client Configuration
|
||||
PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080}
|
||||
SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api}
|
||||
PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"}
|
||||
PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""}
|
||||
# Database configuration for MySQL
|
||||
DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-com.mysql.cj.jdbc.Driver}
|
||||
DB_SCHEME: ${DB_SCHEME:-mysql}
|
||||
@ -98,9 +100,6 @@ services:
|
||||
DB_HOST: ${DB_HOST:-mysql}
|
||||
DB_PORT: ${DB_PORT:-3306}
|
||||
OM_DATABASE: ${OM_DATABASE:-openmetadata_db}
|
||||
# Airflow SSL Configurations
|
||||
AIRFLOW_VERIFY_SSL: ${AIRFLOW_VERIFY_SSL:-"no-ssl"}
|
||||
AIRFLOW_SSL_CERT_PATH: ${AIRFLOW_SSL_CERT_PATH:-""}
|
||||
# ElasticSearch Configurations
|
||||
ELASTICSEARCH_HOST: ${ELASTICSEARCH_HOST:- elasticsearch}
|
||||
ELASTICSEARCH_PORT: ${ELASTICSEARCH_PORT:-9200}
|
||||
|
||||
@ -79,9 +79,11 @@ services:
|
||||
RSA_PRIVATE_KEY_FILE_PATH: ${RSA_PRIVATE_KEY_FILE_PATH:-"./conf/private_key.der"}
|
||||
JWT_ISSUER: ${JWT_ISSUER:-"open-metadata.org"}
|
||||
JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}
|
||||
# OpenMetadata Server Airflow Configuration
|
||||
AIRFLOW_HOST: ${AIRFLOW_HOST:-http://ingestion:8080}
|
||||
# OpenMetadata Server Pipeline Service Client Configuration
|
||||
PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080}
|
||||
SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api}
|
||||
PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"}
|
||||
PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""}
|
||||
#Database configuration for postgresql
|
||||
DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-org.postgresql.Driver}
|
||||
DB_SCHEME: ${DB_SCHEME:-postgresql}
|
||||
@ -91,9 +93,6 @@ services:
|
||||
DB_HOST: ${DB_HOST:-postgresql}
|
||||
DB_PORT: ${DB_PORT:-5432}
|
||||
OM_DATABASE: ${OM_DATABASE:-openmetadata_db}
|
||||
# Airflow SSL Configurations
|
||||
AIRFLOW_VERIFY_SSL: ${AIRFLOW_VERIFY_SSL:-"no-ssl"}
|
||||
AIRFLOW_SSL_CERT_PATH: ${AIRFLOW_SSL_CERT_PATH:-""}
|
||||
# ElasticSearch Configurations
|
||||
ELASTICSEARCH_HOST: ${ELASTICSEARCH_HOST:- elasticsearch}
|
||||
ELASTICSEARCH_PORT: ${ELASTICSEARCH_PORT:-9200}
|
||||
|
||||
@ -77,9 +77,11 @@ services:
|
||||
RSA_PRIVATE_KEY_FILE_PATH: ${RSA_PRIVATE_KEY_FILE_PATH:-"./conf/private_key.der"}
|
||||
JWT_ISSUER: ${JWT_ISSUER:-"open-metadata.org"}
|
||||
JWT_KEY_ID: ${JWT_KEY_ID:-"Gb389a-9f76-gdjs-a92j-0242bk94356"}
|
||||
# OpenMetadata Server Airflow Configuration
|
||||
AIRFLOW_HOST: ${AIRFLOW_HOST:-http://ingestion:8080}
|
||||
# OpenMetadata Server Pipeline Service Client Configuration
|
||||
PIPELINE_SERVICE_CLIENT_ENDPOINT: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://ingestion:8080}
|
||||
SERVER_HOST_API_URL: ${SERVER_HOST_API_URL:-http://openmetadata-server:8585/api}
|
||||
PIPELINE_SERVICE_CLIENT_VERIFY_SSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"}
|
||||
PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""}
|
||||
# Database configuration for MySQL
|
||||
DB_DRIVER_CLASS: ${DB_DRIVER_CLASS:-com.mysql.cj.jdbc.Driver}
|
||||
DB_SCHEME: ${DB_SCHEME:-mysql}
|
||||
@ -89,9 +91,6 @@ services:
|
||||
DB_HOST: ${DB_HOST:-mysql}
|
||||
DB_PORT: ${DB_PORT:-3306}
|
||||
OM_DATABASE: ${OM_DATABASE:-openmetadata_db}
|
||||
# Airflow SSL Configurations
|
||||
AIRFLOW_VERIFY_SSL: ${AIRFLOW_VERIFY_SSL:-"no-ssl"}
|
||||
AIRFLOW_SSL_CERT_PATH: ${AIRFLOW_SSL_CERT_PATH:-""}
|
||||
# ElasticSearch Configurations
|
||||
ELASTICSEARCH_HOST: ${ELASTICSEARCH_HOST:- elasticsearch}
|
||||
ELASTICSEARCH_PORT: ${ELASTICSEARCH_PORT:-9200}
|
||||
|
||||
@ -23,8 +23,8 @@ import javax.validation.constraints.NotNull;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.openmetadata.schema.api.configuration.LoginConfiguration;
|
||||
import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.schema.api.configuration.events.EventHandlerConfiguration;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.schema.api.fernet.FernetConfiguration;
|
||||
import org.openmetadata.schema.api.security.AuthenticationConfiguration;
|
||||
import org.openmetadata.schema.api.security.AuthorizerConfiguration;
|
||||
@ -62,10 +62,8 @@ public class OpenMetadataApplicationConfig extends Configuration {
|
||||
@JsonProperty("eventHandlerConfiguration")
|
||||
private EventHandlerConfiguration eventHandlerConfiguration;
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
@JsonProperty("airflowConfiguration")
|
||||
private AirflowConfiguration airflowConfiguration;
|
||||
@JsonProperty("pipelineServiceClientConfiguration")
|
||||
private PipelineServiceClientConfiguration pipelineServiceClientConfiguration;
|
||||
|
||||
@JsonProperty("migrationConfiguration")
|
||||
@NotNull
|
||||
|
||||
@ -11,13 +11,13 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.airflow;
|
||||
package org.openmetadata.service.clients.pipeline;
|
||||
|
||||
import javax.validation.constraints.NotEmpty;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
public class AirflowConfigurationForAPI {
|
||||
public class PipelineServiceAPIClientConfig {
|
||||
|
||||
@NotEmpty @Getter @Setter private String apiEndpoint;
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.clients.pipeline;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.sdk.PipelineServiceClient;
|
||||
|
||||
@Slf4j
|
||||
public class PipelineServiceClientFactory {
|
||||
|
||||
@Getter private static PipelineServiceClient pipelineServiceClient;
|
||||
|
||||
public static PipelineServiceClient createPipelineServiceClient(PipelineServiceClientConfiguration config) {
|
||||
if (pipelineServiceClient != null) {
|
||||
return pipelineServiceClient;
|
||||
}
|
||||
|
||||
String pipelineServiceClientClass = config.getClassName();
|
||||
LOG.info("Registering PipelineServiceClient: {}", pipelineServiceClientClass);
|
||||
|
||||
try {
|
||||
pipelineServiceClient =
|
||||
Class.forName(pipelineServiceClientClass)
|
||||
.asSubclass(PipelineServiceClient.class)
|
||||
.getConstructor(PipelineServiceClientConfiguration.class)
|
||||
.newInstance(config);
|
||||
return pipelineServiceClient;
|
||||
} catch (ClassNotFoundException
|
||||
| NoSuchMethodException
|
||||
| InvocationTargetException
|
||||
| InstantiationException
|
||||
| IllegalAccessException e) {
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"Error trying to load PipelineServiceClient %s: %s", pipelineServiceClientClass, e.getMessage()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -11,19 +11,22 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.airflow;
|
||||
package org.openmetadata.service.clients.pipeline.airflow;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.ws.rs.core.Response;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.json.JSONObject;
|
||||
import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
|
||||
@ -35,6 +38,15 @@ import org.openmetadata.service.util.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
public class AirflowRESTClient extends PipelineServiceClient {
|
||||
|
||||
private static final String USERNAME_KEY = "username";
|
||||
private static final String PASSWORD_KEY = "password";
|
||||
private static final String TIMEOUT_KEY = "timeout";
|
||||
|
||||
protected final String username;
|
||||
protected final String password;
|
||||
protected final HttpClient client;
|
||||
protected final URL serviceURL;
|
||||
private static final String API_ENDPOINT = "api/v1/openmetadata";
|
||||
private static final String DAG_ID = "dag_id";
|
||||
|
||||
@ -57,13 +69,35 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
||||
PipelineType.ELASTIC_SEARCH_REINDEX.toString(),
|
||||
"elasticsearch_reindex_task");
|
||||
|
||||
public AirflowRESTClient(AirflowConfiguration airflowConfig) {
|
||||
super(
|
||||
airflowConfig.getUsername(),
|
||||
airflowConfig.getPassword(),
|
||||
airflowConfig.getApiEndpoint(),
|
||||
airflowConfig.getHostIp(),
|
||||
airflowConfig.getTimeout());
|
||||
public AirflowRESTClient(PipelineServiceClientConfiguration config) {
|
||||
|
||||
super(config);
|
||||
|
||||
this.username = (String) config.getParameters().getAdditionalProperties().get(USERNAME_KEY);
|
||||
this.password = (String) config.getParameters().getAdditionalProperties().get(PASSWORD_KEY);
|
||||
this.serviceURL = validateServiceURL(config.getApiEndpoint());
|
||||
this.client =
|
||||
HttpClient.newBuilder()
|
||||
.version(HttpClient.Version.HTTP_1_1)
|
||||
.connectTimeout(
|
||||
Duration.ofSeconds((Integer) config.getParameters().getAdditionalProperties().get(TIMEOUT_KEY)))
|
||||
.build();
|
||||
}
|
||||
|
||||
public final HttpResponse<String> post(String endpoint, String payload, boolean authenticate)
|
||||
throws IOException, InterruptedException {
|
||||
HttpRequest.Builder requestBuilder =
|
||||
HttpRequest.newBuilder(URI.create(endpoint))
|
||||
.header(CONTENT_HEADER, CONTENT_TYPE)
|
||||
.POST(HttpRequest.BodyPublishers.ofString(payload));
|
||||
if (authenticate) {
|
||||
requestBuilder.header(AUTH_HEADER, getBasicAuthenticationHeader(username, password));
|
||||
}
|
||||
return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
|
||||
}
|
||||
|
||||
public final HttpResponse<String> post(String endpoint, String payload) throws IOException, InterruptedException {
|
||||
return post(endpoint, payload, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -89,7 +123,8 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String deletePipeline(String pipelineName) {
|
||||
public String deletePipeline(IngestionPipeline ingestionPipeline) {
|
||||
String pipelineName = ingestionPipeline.getName();
|
||||
try {
|
||||
String deleteEndpoint = "%s/%s/delete?dag_id=%s";
|
||||
HttpResponse<String> response =
|
||||
@ -102,7 +137,8 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String runPipeline(String pipelineName) {
|
||||
public String runPipeline(IngestionPipeline ingestionPipeline) {
|
||||
String pipelineName = ingestionPipeline.getName();
|
||||
HttpResponse<String> response;
|
||||
try {
|
||||
String triggerEndPoint = "%s/%s/trigger";
|
||||
@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.clients.pipeline.noop;
|
||||
|
||||
import java.net.http.HttpResponse;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.ws.rs.core.Response;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
|
||||
import org.openmetadata.sdk.PipelineServiceClient;
|
||||
import org.openmetadata.sdk.exception.PipelineServiceClientException;
|
||||
|
||||
public class NoopClient extends PipelineServiceClient {
|
||||
|
||||
String EXCEPTION_MSG = "The NoopClient does not implement the %s method";
|
||||
|
||||
public NoopClient(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) {
|
||||
super(pipelineServiceClientConfiguration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response getServiceStatus() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String deployPipeline(IngestionPipeline ingestionPipeline) {
|
||||
throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "deploy"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String runPipeline(IngestionPipeline ingestionPipeline) {
|
||||
throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "run"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String deletePipeline(IngestionPipeline ingestionPipeline) {
|
||||
throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "delete"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline) {
|
||||
throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "toggle"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline) {
|
||||
throw new PipelineServiceClientException(String.format(EXCEPTION_MSG, "kill"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> requestGetHostIp() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -119,7 +119,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
|
||||
|
||||
@Override
|
||||
protected void postDelete(IngestionPipeline entity) {
|
||||
pipelineServiceClient.deletePipeline(entity.getName());
|
||||
pipelineServiceClient.deletePipeline(entity);
|
||||
}
|
||||
|
||||
public void setPipelineServiceClient(PipelineServiceClient client) {
|
||||
|
||||
@ -63,7 +63,7 @@ import org.openmetadata.schema.type.MetadataOperation;
|
||||
import org.openmetadata.sdk.PipelineServiceClient;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.airflow.AirflowRESTClient;
|
||||
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
|
||||
import org.openmetadata.service.jdbi3.ListFilter;
|
||||
@ -103,7 +103,9 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
|
||||
@Override
|
||||
public void initialize(OpenMetadataApplicationConfig config) {
|
||||
this.openMetadataApplicationConfig = config;
|
||||
this.pipelineServiceClient = new AirflowRESTClient(openMetadataApplicationConfig.getAirflowConfiguration());
|
||||
|
||||
this.pipelineServiceClient =
|
||||
PipelineServiceClientFactory.createPipelineServiceClient(config.getPipelineServiceClientConfiguration());
|
||||
dao.setPipelineServiceClient(pipelineServiceClient);
|
||||
}
|
||||
|
||||
@ -438,7 +440,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
|
||||
throws IOException {
|
||||
Fields fields = getFields(FIELD_OWNER);
|
||||
IngestionPipeline ingestionPipeline = dao.get(uriInfo, id, fields);
|
||||
pipelineServiceClient.runPipeline(ingestionPipeline.getName());
|
||||
pipelineServiceClient.runPipeline(ingestionPipeline);
|
||||
decryptOrNullify(securityContext, ingestionPipeline);
|
||||
return addHref(uriInfo, ingestionPipeline);
|
||||
}
|
||||
|
||||
@ -26,7 +26,7 @@ import org.openmetadata.schema.api.security.AuthenticationConfiguration;
|
||||
import org.openmetadata.schema.api.security.AuthorizerConfiguration;
|
||||
import org.openmetadata.schema.api.slackChat.SlackChatConfiguration;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.airflow.AirflowConfigurationForAPI;
|
||||
import org.openmetadata.service.clients.pipeline.PipelineServiceAPIClientConfig;
|
||||
import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.sandbox.SandboxConfiguration;
|
||||
import org.openmetadata.service.security.jwt.JWKSResponse;
|
||||
@ -139,7 +139,7 @@ public class ConfigResource {
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path(("/airflow"))
|
||||
@Path(("/pipeline-service-client"))
|
||||
@Operation(
|
||||
operationId = "getAirflowConfiguration",
|
||||
summary = "Get airflow configuration",
|
||||
@ -151,15 +151,15 @@ public class ConfigResource {
|
||||
content =
|
||||
@Content(
|
||||
mediaType = "application/json",
|
||||
schema = @Schema(implementation = AirflowConfigurationForAPI.class)))
|
||||
schema = @Schema(implementation = PipelineServiceAPIClientConfig.class)))
|
||||
})
|
||||
public AirflowConfigurationForAPI getAirflowConfig() {
|
||||
AirflowConfigurationForAPI airflowConfigurationForAPI = new AirflowConfigurationForAPI();
|
||||
if (openMetadataApplicationConfig.getAirflowConfiguration() != null) {
|
||||
airflowConfigurationForAPI.setApiEndpoint(
|
||||
openMetadataApplicationConfig.getAirflowConfiguration().getApiEndpoint());
|
||||
public PipelineServiceAPIClientConfig getPipelineServiceConfig() {
|
||||
PipelineServiceAPIClientConfig pipelineServiceClientConfigForAPI = new PipelineServiceAPIClientConfig();
|
||||
if (openMetadataApplicationConfig.getPipelineServiceClientConfiguration() != null) {
|
||||
pipelineServiceClientConfigForAPI.setApiEndpoint(
|
||||
openMetadataApplicationConfig.getPipelineServiceClientConfiguration().getApiEndpoint());
|
||||
}
|
||||
return airflowConfigurationForAPI;
|
||||
return pipelineServiceClientConfigForAPI;
|
||||
}
|
||||
|
||||
@GET
|
||||
|
||||
@ -4,7 +4,7 @@ import java.io.IOException;
|
||||
import java.util.List;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.api.configuration.airflow.SSLConfig;
|
||||
import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.schema.auth.JWTAuthMechanism;
|
||||
import org.openmetadata.schema.auth.SSOAuthMechanism;
|
||||
import org.openmetadata.schema.entity.Bot;
|
||||
@ -52,14 +52,17 @@ public class OpenMetadataConnectionBuilder {
|
||||
authProvider = extractAuthProvider(botUser);
|
||||
}
|
||||
|
||||
AirflowConfiguration airflowConfiguration = openMetadataApplicationConfig.getAirflowConfiguration();
|
||||
openMetadataURL = airflowConfiguration.getMetadataApiEndpoint();
|
||||
clusterName = openMetadataApplicationConfig.getClusterName();
|
||||
secretsManagerProvider = SecretsManagerFactory.getSecretsManager().getSecretsManagerProvider();
|
||||
verifySSL = VerifySSL.fromValue(airflowConfiguration.getVerifySSL());
|
||||
PipelineServiceClientConfiguration pipelineServiceClientConfiguration =
|
||||
openMetadataApplicationConfig.getPipelineServiceClientConfiguration();
|
||||
openMetadataURL = pipelineServiceClientConfiguration.getMetadataApiEndpoint();
|
||||
verifySSL = VerifySSL.fromValue(pipelineServiceClientConfiguration.getVerifySSL());
|
||||
airflowSSLConfig =
|
||||
getAirflowSSLConfig(
|
||||
VerifySSL.fromValue(airflowConfiguration.getVerifySSL()), airflowConfiguration.getSslConfig());
|
||||
VerifySSL.fromValue(pipelineServiceClientConfiguration.getVerifySSL()),
|
||||
pipelineServiceClientConfiguration.getSslConfig());
|
||||
|
||||
clusterName = openMetadataApplicationConfig.getClusterName();
|
||||
secretsManagerProvider = SecretsManagerFactory.getSecretsManager().getSecretsManagerProvider();
|
||||
}
|
||||
|
||||
private OpenMetadataConnection.AuthProvider extractAuthProvider(User botUser) {
|
||||
|
||||
@ -20,7 +20,7 @@ import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.api.configuration.airflow.AuthConfiguration;
|
||||
import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.schema.auth.BasicAuthMechanism;
|
||||
import org.openmetadata.schema.auth.JWTAuthMechanism;
|
||||
import org.openmetadata.schema.auth.JWTTokenExpiry;
|
||||
@ -143,14 +143,16 @@ public final class UserUtil {
|
||||
*/
|
||||
public static User addOrUpdateBotUser(User user, OpenMetadataApplicationConfig openMetadataApplicationConfig) {
|
||||
User originalUser = retrieveWithAuthMechanism(user);
|
||||
AirflowConfiguration airflowConfig = openMetadataApplicationConfig.getAirflowConfiguration();
|
||||
PipelineServiceClientConfiguration pipelineServiceClientConfiguration =
|
||||
openMetadataApplicationConfig.getPipelineServiceClientConfiguration();
|
||||
AuthenticationMechanism authMechanism = originalUser != null ? originalUser.getAuthenticationMechanism() : null;
|
||||
// the user did not have an auth mechanism and auth config is present
|
||||
if (authConfigPresent(airflowConfig) && authMechanism == null) {
|
||||
AuthConfiguration authConfig = airflowConfig.getAuthConfig();
|
||||
if (authConfigPresent(pipelineServiceClientConfiguration) && authMechanism == null) {
|
||||
AuthConfiguration authConfig = pipelineServiceClientConfiguration.getAuthConfig();
|
||||
String currentAuthProvider = openMetadataApplicationConfig.getAuthenticationConfiguration().getProvider();
|
||||
// if the auth provider is "openmetadata" in the configuration set JWT as auth mechanism
|
||||
if ("openmetadata".equals(airflowConfig.getAuthProvider()) && !"basic".equals(currentAuthProvider)) {
|
||||
if ("openmetadata".equals(pipelineServiceClientConfiguration.getAuthProvider())
|
||||
&& !"basic".equals(currentAuthProvider)) {
|
||||
OpenMetadataJWTClientConfig jwtClientConfig = authConfig.getOpenmetadata();
|
||||
authMechanism = buildAuthMechanism(JWT, buildJWTAuthMechanism(jwtClientConfig, user));
|
||||
// TODO: https://github.com/open-metadata/OpenMetadata/issues/7712
|
||||
@ -192,8 +194,8 @@ public final class UserUtil {
|
||||
return addOrUpdateUser(user);
|
||||
}
|
||||
|
||||
private static boolean authConfigPresent(AirflowConfiguration airflowConfig) {
|
||||
return airflowConfig != null && airflowConfig.getAuthConfig() != null;
|
||||
private static boolean authConfigPresent(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) {
|
||||
return pipelineServiceClientConfiguration != null && pipelineServiceClientConfiguration.getAuthConfig() != null;
|
||||
}
|
||||
|
||||
private static JWTAuthMechanism buildJWTAuthMechanism(OpenMetadataJWTClientConfig jwtClientConfig, User user) {
|
||||
|
||||
@ -4,6 +4,7 @@ import java.net.http.HttpResponse;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.ws.rs.core.Response;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
|
||||
@ -11,9 +12,8 @@ import org.openmetadata.sdk.PipelineServiceClient;
|
||||
|
||||
public class MockPipelineServiceClient extends PipelineServiceClient {
|
||||
|
||||
public MockPipelineServiceClient(
|
||||
String userName, String password, String apiEndpoint, String hostIp, int apiTimeout) {
|
||||
super(userName, password, apiEndpoint, hostIp, apiTimeout);
|
||||
public MockPipelineServiceClient(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) {
|
||||
super(pipelineServiceClientConfiguration);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -32,12 +32,12 @@ public class MockPipelineServiceClient extends PipelineServiceClient {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String runPipeline(String pipelineName) {
|
||||
public String runPipeline(IngestionPipeline ingestionPipeline) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String deletePipeline(String pipelineName) {
|
||||
public String deletePipeline(IngestionPipeline ingestionPipeline) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@ -4,12 +4,17 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.sdk.exception.PipelineServiceVersionException;
|
||||
|
||||
public class PipelineServiceClientTest {
|
||||
|
||||
final MockPipelineServiceClient mockPipelineServiceClient =
|
||||
new MockPipelineServiceClient("user", "password", "https://endpoint.com", "111.11.11.1", 10);
|
||||
new MockPipelineServiceClient(
|
||||
new PipelineServiceClientConfiguration()
|
||||
.withClassName("")
|
||||
.withMetadataApiEndpoint("http://openmetadata-server:8585/api")
|
||||
.withApiEndpoint("http://ingestion:8080"));
|
||||
|
||||
@Test
|
||||
public void testGetVersionFromString() {
|
||||
|
||||
@ -19,19 +19,19 @@ import static org.openmetadata.service.resources.services.ingestionpipelines.Ing
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import lombok.SneakyThrows;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.Parameters;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
|
||||
import org.openmetadata.sdk.exception.PipelineServiceClientException;
|
||||
import org.openmetadata.service.airflow.AirflowRESTClient;
|
||||
import org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class AirflowRESTClientIntegrationTest {
|
||||
@ -53,8 +53,19 @@ class AirflowRESTClientIntegrationTest {
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
AirflowConfiguration airflowConfiguration = createDefaultAirflowConfiguration();
|
||||
airflowRESTClient = new AirflowRESTClient(airflowConfiguration);
|
||||
|
||||
PipelineServiceClientConfiguration pipelineServiceClientConfiguration = new PipelineServiceClientConfiguration();
|
||||
pipelineServiceClientConfiguration.setHostIp("111.11.11.1");
|
||||
pipelineServiceClientConfiguration.setMetadataApiEndpoint("http://openmetadata-server:8585/api");
|
||||
|
||||
Parameters params = new Parameters();
|
||||
params.setAdditionalProperty("username", "user");
|
||||
params.setAdditionalProperty("password", "pass");
|
||||
params.setAdditionalProperty("apiEndpoint", "");
|
||||
|
||||
pipelineServiceClientConfiguration.setParameters(params);
|
||||
|
||||
airflowRESTClient = new AirflowRESTClient(pipelineServiceClientConfiguration);
|
||||
httpServerExtension.unregisterHandler();
|
||||
}
|
||||
|
||||
@ -97,16 +108,6 @@ class AirflowRESTClientIntegrationTest {
|
||||
assertEquals(expectedMessage, actualMessage);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private AirflowConfiguration createDefaultAirflowConfiguration() {
|
||||
AirflowConfiguration airflowConfiguration = new AirflowConfiguration();
|
||||
airflowConfiguration.setApiEndpoint(HttpServerExtension.getUriFor("").toString());
|
||||
airflowConfiguration.setUsername("user");
|
||||
airflowConfiguration.setPassword("pass");
|
||||
airflowConfiguration.setTimeout(60);
|
||||
return airflowConfiguration;
|
||||
}
|
||||
|
||||
private void registerMockedEndpoints(int lastDagLogStatusCode) {
|
||||
String jsonResponse = "{ \"key1\": \"value1\", \"key2\": \"value2\" }";
|
||||
|
||||
|
||||
@ -43,7 +43,6 @@ import org.junit.jupiter.api.MethodOrderer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInfo;
|
||||
import org.junit.jupiter.api.TestMethodOrder;
|
||||
import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration;
|
||||
import org.openmetadata.schema.api.services.CreateDatabaseService;
|
||||
import org.openmetadata.schema.api.services.DatabaseConnection;
|
||||
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
|
||||
@ -79,7 +78,6 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
|
||||
public static SourceConfig DATABASE_METADATA_CONFIG;
|
||||
public static SourceConfig DASHBOARD_METADATA_CONFIG;
|
||||
public static SourceConfig MESSAGING_METADATA_CONFIG;
|
||||
public static AirflowConfiguration AIRFLOW_CONFIG;
|
||||
public static DatabaseServiceResourceTest DATABASE_SERVICE_RESOURCE_TEST;
|
||||
public static Date START_DATE;
|
||||
|
||||
@ -110,10 +108,6 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
|
||||
DATABASE_METADATA_CONFIG = new SourceConfig().withConfig(databaseServiceMetadataPipeline);
|
||||
DASHBOARD_METADATA_CONFIG = new SourceConfig().withConfig(dashboardServiceMetadataPipeline);
|
||||
MESSAGING_METADATA_CONFIG = new SourceConfig().withConfig(messagingServiceMetadataPipeline);
|
||||
AIRFLOW_CONFIG = new AirflowConfiguration();
|
||||
AIRFLOW_CONFIG.setApiEndpoint("http://localhost:8080");
|
||||
AIRFLOW_CONFIG.setUsername("admin");
|
||||
AIRFLOW_CONFIG.setPassword("admin");
|
||||
DATABASE_SERVICE_RESOURCE_TEST = new DatabaseServiceResourceTest();
|
||||
START_DATE = new DateTime("2022-06-10T15:06:47+00:00").toDate();
|
||||
}
|
||||
|
||||
@ -34,7 +34,7 @@ import org.openmetadata.schema.api.security.AuthorizerConfiguration;
|
||||
import org.openmetadata.schema.api.slackChat.SlackChatConfiguration;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.OpenMetadataApplicationTest;
|
||||
import org.openmetadata.service.airflow.AirflowConfigurationForAPI;
|
||||
import org.openmetadata.service.clients.pipeline.PipelineServiceAPIClientConfig;
|
||||
import org.openmetadata.service.security.jwt.JWKSKey;
|
||||
import org.openmetadata.service.security.jwt.JWKSResponse;
|
||||
import org.openmetadata.service.util.TestUtils;
|
||||
@ -81,9 +81,10 @@ class ConfigResourceTest extends OpenMetadataApplicationTest {
|
||||
|
||||
@Test
|
||||
void get_airflow_configs_200_OK() throws IOException {
|
||||
WebTarget target = getConfigResource("airflow");
|
||||
AirflowConfigurationForAPI auth = TestUtils.get(target, AirflowConfigurationForAPI.class, TEST_AUTH_HEADERS);
|
||||
assertEquals(config.getAirflowConfiguration().getApiEndpoint(), auth.getApiEndpoint());
|
||||
WebTarget target = getConfigResource("pipeline-service-client");
|
||||
PipelineServiceAPIClientConfig auth =
|
||||
TestUtils.get(target, PipelineServiceAPIClientConfig.class, TEST_AUTH_HEADERS);
|
||||
assertEquals(config.getPipelineServiceClientConfiguration().getApiEndpoint(), auth.getApiEndpoint());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@ -166,13 +166,19 @@ eventHandlerConfiguration:
|
||||
- "org.openmetadata.service.events.AuditEventHandler"
|
||||
- "org.openmetadata.service.events.ChangeEventHandler"
|
||||
|
||||
airflowConfiguration:
|
||||
apiEndpoint: "http://localhost:8080"
|
||||
username: "admin"
|
||||
password: "admin"
|
||||
metadataApiEndpoint: "http://localhost:8585/api"
|
||||
pipelineServiceClientConfiguration:
|
||||
className: "org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"
|
||||
metadataApiEndpoint: http://localhost:8585/api
|
||||
apiEndpoint: http://localhost:8080
|
||||
hostIp: ""
|
||||
verifySSL: "no-ssl"
|
||||
authProvider: "openmetadata"
|
||||
|
||||
parameters:
|
||||
username: admin
|
||||
password: admin
|
||||
timeout: 10
|
||||
|
||||
email:
|
||||
enableSmtpServer : false
|
||||
emailingEntity: ""
|
||||
|
||||
@ -1,14 +1,23 @@
|
||||
/*
|
||||
* Copyright 2021 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.sdk;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.time.Duration;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -17,6 +26,7 @@ import java.util.regex.Pattern;
|
||||
import javax.ws.rs.core.Response;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
|
||||
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
|
||||
@ -37,11 +47,8 @@ import org.openmetadata.sdk.exception.PipelineServiceVersionException;
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class PipelineServiceClient {
|
||||
protected final URL serviceURL;
|
||||
protected final String username;
|
||||
protected final String password;
|
||||
protected final String hostIp;
|
||||
protected final HttpClient client;
|
||||
|
||||
protected static final String AUTH_HEADER = "Authorization";
|
||||
protected static final String CONTENT_HEADER = "Content-Type";
|
||||
protected static final String CONTENT_TYPE = "application/json";
|
||||
@ -58,24 +65,16 @@ public abstract class PipelineServiceClient {
|
||||
SERVER_VERSION = rawServerVersion;
|
||||
}
|
||||
|
||||
public PipelineServiceClient(String userName, String password, String apiEndpoint, String hostIp, int apiTimeout) {
|
||||
try {
|
||||
this.serviceURL = new URL(apiEndpoint);
|
||||
} catch (MalformedURLException e) {
|
||||
throw new PipelineServiceClientException(apiEndpoint + " Malformed.");
|
||||
}
|
||||
this.username = userName;
|
||||
this.password = password;
|
||||
this.hostIp = hostIp;
|
||||
this.client =
|
||||
HttpClient.newBuilder()
|
||||
.version(HttpClient.Version.HTTP_1_1)
|
||||
.connectTimeout(Duration.ofSeconds(apiTimeout))
|
||||
.build();
|
||||
public PipelineServiceClient(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) {
|
||||
this.hostIp = pipelineServiceClientConfiguration.getHostIp();
|
||||
}
|
||||
|
||||
public final HttpResponse<String> post(String endpoint, String payload) throws IOException, InterruptedException {
|
||||
return post(endpoint, payload, true);
|
||||
public final URL validateServiceURL(String serviceURL) {
|
||||
try {
|
||||
return new URL(serviceURL);
|
||||
} catch (MalformedURLException e) {
|
||||
throw new PipelineServiceClientException(serviceURL + " Malformed.");
|
||||
}
|
||||
}
|
||||
|
||||
public final String getBasicAuthenticationHeader(String username, String password) {
|
||||
@ -83,18 +82,6 @@ public abstract class PipelineServiceClient {
|
||||
return "Basic " + Base64.getEncoder().encodeToString(valueToEncode.getBytes());
|
||||
}
|
||||
|
||||
public final HttpResponse<String> post(String endpoint, String payload, boolean authenticate)
|
||||
throws IOException, InterruptedException {
|
||||
HttpRequest.Builder requestBuilder =
|
||||
HttpRequest.newBuilder(URI.create(endpoint))
|
||||
.header(CONTENT_HEADER, CONTENT_TYPE)
|
||||
.POST(HttpRequest.BodyPublishers.ofString(payload));
|
||||
if (authenticate) {
|
||||
requestBuilder.header(AUTH_HEADER, getBasicAuthenticationHeader(username, password));
|
||||
}
|
||||
return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
|
||||
}
|
||||
|
||||
public static String getServerVersion() throws IOException {
|
||||
InputStream fileInput = PipelineServiceClient.class.getResourceAsStream("/catalog/VERSION");
|
||||
Properties props = new Properties();
|
||||
@ -129,7 +116,7 @@ public abstract class PipelineServiceClient {
|
||||
return Map.of(
|
||||
"ip",
|
||||
"Failed to find the IP of Airflow Container. Please make sure https://api.ipify.org, "
|
||||
+ "https://api.my-ip.io/ip reachable from your network.");
|
||||
+ "https://api.my-ip.io/ip reachable from your network or that the `hostIp` setting is configured.");
|
||||
}
|
||||
}
|
||||
|
||||
@ -143,10 +130,10 @@ public abstract class PipelineServiceClient {
|
||||
public abstract String deployPipeline(IngestionPipeline ingestionPipeline);
|
||||
|
||||
/* Deploy run the pipeline at the pipeline service */
|
||||
public abstract String runPipeline(String pipelineName);
|
||||
public abstract String runPipeline(IngestionPipeline ingestionPipeline);
|
||||
|
||||
/* Stop and delete a pipeline at the pipeline service */
|
||||
public abstract String deletePipeline(String pipelineName);
|
||||
public abstract String deletePipeline(IngestionPipeline ingestionPipeline);
|
||||
|
||||
/* Get the status of a deployed pipeline */
|
||||
public abstract List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline);
|
||||
|
||||
@ -1,44 +1,27 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/entity/configuration/airflowConfiguration.json",
|
||||
"$id": "https://open-metadata.org/schema/entity/configuration/pipelineServiceClientConfiguration.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "AirflowConfiguration",
|
||||
"description": "This schema defines the AirFlow Configuration",
|
||||
"title": "PipelineServiceClientConfiguration",
|
||||
"description": "This schema defines the Pipeline Service Client Configuration",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration",
|
||||
"javaType": "org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration",
|
||||
"properties": {
|
||||
"className": {
|
||||
"description": "Class Name for the Pipeline Service Client.",
|
||||
"type": "string"
|
||||
},
|
||||
"apiEndpoint": {
|
||||
"description": "API host endpoint for Airflow",
|
||||
"description": "External API root to interact with the Pipeline Service Client",
|
||||
"type": "string"
|
||||
},
|
||||
"hostIp": {
|
||||
"description": "Airflow host IP that will be used to connect to the sources.",
|
||||
"type": "string"
|
||||
},
|
||||
"username": {
|
||||
"description": "Username for Login",
|
||||
"type": "string"
|
||||
},
|
||||
"password": {
|
||||
"description": "Password for Login",
|
||||
"description": "Pipeline Service Client host IP that will be used to connect to the sources.",
|
||||
"type": "string"
|
||||
},
|
||||
"metadataApiEndpoint": {
|
||||
"description": "Metadata api endpoint",
|
||||
"description": "Metadata api endpoint, e.g., `http://localhost:8585/api`",
|
||||
"type": "string"
|
||||
},
|
||||
"authProvider": {
|
||||
"description": "Auth Provider like no-auth, azure , google, okta, auth0, customOidc, openmetadata",
|
||||
"type": "string"
|
||||
},
|
||||
"timeout": {
|
||||
"description": "Timeout",
|
||||
"type": "integer",
|
||||
"default": 10
|
||||
},
|
||||
"authConfig": {
|
||||
"description": "Auth Provider Configuration.",
|
||||
"$ref": "authConfig.json"
|
||||
},
|
||||
"verifySSL": {
|
||||
"description": "Client SSL verification policy: no-ssl, ignore, validate.",
|
||||
"type": "string",
|
||||
@ -47,12 +30,27 @@
|
||||
"sslConfig": {
|
||||
"description": "OpenMetadata Client SSL configuration.",
|
||||
"$ref": "sslConfig.json"
|
||||
},
|
||||
"authProvider": {
|
||||
"description": "Auth Provider like no-auth, azure , google, okta, auth0, customOidc, openmetadata",
|
||||
"type": "string"
|
||||
},
|
||||
"authConfig": {
|
||||
"description": "Auth Provider Configuration.",
|
||||
"$ref": "authConfig.json"
|
||||
},
|
||||
"parameters": {
|
||||
"javaType": "org.openmetadata.schema.api.configuration.pipelineServiceClient.Parameters",
|
||||
"description": "Additional parameters to initialize the PipelineServiceClient.",
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
".{1,}": { "type": "string" }
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"className",
|
||||
"apiEndpoint",
|
||||
"username",
|
||||
"password",
|
||||
"metadataApiEndpoint"
|
||||
],
|
||||
"additionalProperties": false
|
||||
@ -35,7 +35,7 @@
|
||||
"config_value": {
|
||||
"oneOf": [
|
||||
{
|
||||
"$ref": "../configuration/airflowConfiguration.json"
|
||||
"$ref": "../configuration/pipelineServiceClientConfiguration.json"
|
||||
},
|
||||
{
|
||||
"$ref": "../configuration/authenticationConfiguration.json"
|
||||
|
||||
@ -337,7 +337,11 @@ export const editOwnerforCreatedService = (
|
||||
'waitForIngestion'
|
||||
);
|
||||
|
||||
interceptURL('GET', '/api/v1/system/config/airflow', 'airflow');
|
||||
interceptURL(
|
||||
'GET',
|
||||
'/api/v1/system/config/pipeline-service-client',
|
||||
'airflow'
|
||||
);
|
||||
// click on created service
|
||||
cy.get(`[data-testid="service-name-${service_Name}"]`)
|
||||
.should('exist')
|
||||
@ -985,7 +989,11 @@ export const updateDescriptionForIngestedTables = (
|
||||
`/api/v1/services/ingestionPipelines?fields=owner,pipelineStatuses&service=${serviceName}`,
|
||||
'getSelectedService'
|
||||
);
|
||||
interceptURL('GET', '/api/v1/system/config/airflow', 'airflow');
|
||||
interceptURL(
|
||||
'GET',
|
||||
'/api/v1/system/config/pipeline-service-client',
|
||||
'airflow'
|
||||
);
|
||||
|
||||
// click on created service
|
||||
cy.get(`[data-testid="service-name-${serviceName}"]`)
|
||||
|
||||
@ -114,7 +114,11 @@ describe('Postgres Ingestion', () => {
|
||||
|
||||
verifyResponseStatusCode('@getServices', 200);
|
||||
cy.intercept('/api/v1/services/ingestionPipelines?*').as('ingestionData');
|
||||
interceptURL('GET', '/api/v1/system/config/airflow', 'airflow');
|
||||
interceptURL(
|
||||
'GET',
|
||||
'/api/v1/system/config/pipeline-service-client',
|
||||
'airflow'
|
||||
);
|
||||
cy.get(`[data-testid="service-name-${serviceName}"]`)
|
||||
.should('exist')
|
||||
.click();
|
||||
|
||||
@ -110,7 +110,11 @@ describe('RedShift Ingestion', () => {
|
||||
|
||||
verifyResponseStatusCode('@getServices', 200);
|
||||
cy.intercept('/api/v1/services/ingestionPipelines?*').as('ingestionData');
|
||||
interceptURL('GET', '/api/v1/system/config/airflow', 'airflow');
|
||||
interceptURL(
|
||||
'GET',
|
||||
'/api/v1/system/config/pipeline-service-client',
|
||||
'airflow'
|
||||
);
|
||||
cy.get(`[data-testid="service-name-${REDSHIFT.serviceName}"]`)
|
||||
.should('exist')
|
||||
.click();
|
||||
|
||||
@ -103,7 +103,11 @@ describe('Data Quality and Profiler should work properly', () => {
|
||||
.should('be.visible')
|
||||
.click();
|
||||
cy.intercept('/api/v1/services/ingestionPipelines?*').as('ingestionData');
|
||||
interceptURL('GET', '/api/v1/system/config/airflow', 'airflow');
|
||||
interceptURL(
|
||||
'GET',
|
||||
'/api/v1/system/config/pipeline-service-client',
|
||||
'airflow'
|
||||
);
|
||||
cy.get(`[data-testid="service-name-${serviceName}"]`)
|
||||
.should('exist')
|
||||
.click();
|
||||
|
||||
@ -32,7 +32,11 @@ describe('Services page should work properly', () => {
|
||||
});
|
||||
|
||||
it('Update service description', () => {
|
||||
interceptURL('GET', '/api/v1/system/config/airflow', 'getService');
|
||||
interceptURL(
|
||||
'GET',
|
||||
'/api/v1/system/config/pipeline-service-client',
|
||||
'getService'
|
||||
);
|
||||
cy.get(`[data-testid="service-name-${service.name}"]`)
|
||||
.should('be.visible')
|
||||
.click();
|
||||
@ -52,7 +56,11 @@ describe('Services page should work properly', () => {
|
||||
});
|
||||
|
||||
it('Update owner and check description', () => {
|
||||
interceptURL('GET', '/api/v1/system/config/airflow', 'getService');
|
||||
interceptURL(
|
||||
'GET',
|
||||
'/api/v1/system/config/pipeline-service-client',
|
||||
'getService'
|
||||
);
|
||||
cy.get(`[data-testid="service-name-${service.name}"]`)
|
||||
.should('be.visible')
|
||||
.click();
|
||||
|
||||
@ -16,8 +16,8 @@ import { Edge } from 'components/EntityLineage/EntityLineage.interface';
|
||||
import { ExploreSearchIndex } from 'components/Explore/explore.interface';
|
||||
import { AuthorizerConfiguration } from 'generated/configuration/authorizerConfiguration';
|
||||
import { SearchIndex } from '../enums/search.enum';
|
||||
import { AirflowConfiguration } from '../generated/configuration/airflowConfiguration';
|
||||
import { AuthenticationConfiguration } from '../generated/configuration/authenticationConfiguration';
|
||||
import { PipelineServiceClientConfiguration } from '../generated/configuration/pipelineServiceClientConfiguration';
|
||||
import { EntitiesCount } from '../generated/entity/utils/entitiesCount';
|
||||
import { Paging } from '../generated/type/paging';
|
||||
import {
|
||||
@ -93,8 +93,8 @@ export const fetchSlackConfig = (): Promise<AxiosResponse> => {
|
||||
};
|
||||
|
||||
export const fetchAirflowConfig = async () => {
|
||||
const response = await APIClient.get<AirflowConfiguration>(
|
||||
'/system/config/airflow'
|
||||
const response = await APIClient.get<PipelineServiceClientConfiguration>(
|
||||
'/system/config/pipeline-service-client'
|
||||
);
|
||||
|
||||
return response.data;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user