Added MetadataServices for creating DataInsight and ElasticSearch Reindex (#8565)

* Added SystemServices for creating DataInsight and ElasticSearch Reindexing Pipeling with Sink config

* Add Amundsen config

* Changed SystemService to MetadataService

* Change to MetadataConnection

* Fixed failing test

* Fixed Failing test

* Added Metadata Service Test

* Bootstrapped Openmetadat service

* checkstyle fix
This commit is contained in:
Mohit Yadav 2022-11-11 13:16:54 +05:30 committed by GitHub
parent 2c430d1c7e
commit 2dbd43a589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 927 additions and 20 deletions

View File

@ -81,3 +81,15 @@ CREATE TABLE IF NOT EXISTS kpi_entity (
PRIMARY KEY (id),
UNIQUE (name)
);
CREATE TABLE IF NOT EXISTS metadata_service_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
serviceType VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.serviceType') NOT NULL,
json JSON NOT NULL,
updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL,
deleted BOOLEAN GENERATED ALWAYS AS (json -> '$.deleted'),
PRIMARY KEY (id),
UNIQUE (name)
);

View File

@ -86,3 +86,15 @@ CREATE TABLE IF NOT EXISTS kpi_entity (
PRIMARY KEY (id),
UNIQUE (name)
);
CREATE TABLE IF NOT EXISTS metadata_service_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL,
serviceType VARCHAR(256) GENERATED ALWAYS AS (json ->> 'serviceType') STORED NOT NULL,
json JSONB NOT NULL,
updatedAt BIGINT GENERATED ALWAYS AS ((json ->> 'updatedAt')::bigint) STORED NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> 'updatedBy') STORED NOT NULL,
deleted BOOLEAN GENERATED ALWAYS AS ((json ->> 'deleted')::boolean) STORED,
PRIMARY KEY (id),
UNIQUE (name)
);

View File

@ -73,6 +73,7 @@ public final class Entity {
public static final String PIPELINE_SERVICE = "pipelineService";
public static final String STORAGE_SERVICE = "storageService";
public static final String MLMODEL_SERVICE = "mlmodelService";
public static final String METADATA_SERVICE = "metadataService";
//
// Data asset entities

View File

@ -72,6 +72,7 @@ import org.openmetadata.schema.entity.policies.Policy;
import org.openmetadata.schema.entity.services.DashboardService;
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.entity.services.MessagingService;
import org.openmetadata.schema.entity.services.MetadataService;
import org.openmetadata.schema.entity.services.MlModelService;
import org.openmetadata.schema.entity.services.PipelineService;
import org.openmetadata.schema.entity.services.StorageService;
@ -188,6 +189,9 @@ public interface CollectionDAO {
@CreateSqlObject
DatabaseServiceDAO dbServiceDAO();
@CreateSqlObject
MetadataServiceDAO metadataServiceDAO();
@CreateSqlObject
PipelineServiceDAO pipelineServiceDAO();
@ -330,6 +334,23 @@ public interface CollectionDAO {
}
}
interface MetadataServiceDAO extends EntityDAO<MetadataService> {
@Override
default String getTableName() {
return "metadata_service_entity";
}
@Override
default Class<MetadataService> getEntityClass() {
return MetadataService.class;
}
@Override
default String getNameColumn() {
return "name";
}
}
interface StorageServiceDAO extends EntityDAO<StorageService> {
@Override
default String getTableName() {

View File

@ -0,0 +1,19 @@
package org.openmetadata.service.jdbi3;
import org.openmetadata.schema.entity.services.MetadataConnection;
import org.openmetadata.schema.entity.services.MetadataService;
import org.openmetadata.schema.entity.services.ServiceType;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.services.database.DatabaseServiceResource;
public class MetadataServiceRepository extends ServiceEntityRepository<MetadataService, MetadataConnection> {
public MetadataServiceRepository(CollectionDAO dao) {
super(
DatabaseServiceResource.COLLECTION_PATH,
Entity.METADATA_SERVICE,
dao,
dao.metadataServiceDAO(),
MetadataConnection.class,
ServiceType.METADATA);
}
}

View File

@ -77,7 +77,7 @@ import org.openmetadata.service.util.ResultList;
@Api(value = "Bot collection", tags = "Bot collection")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "bots")
@Collection(name = "bots", order = 8) // initialize after user resource
public class BotResource extends EntityResource<Bot, BotRepository> {
public static final String COLLECTION_PATH = "/v1/bots/";

View File

@ -33,7 +33,18 @@ import javax.json.JsonPatch;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.ws.rs.*;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.PATCH;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

View File

@ -0,0 +1,413 @@
package org.openmetadata.service.resources.services.metadata;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.api.services.CreateMetadataService;
import org.openmetadata.schema.entity.services.MetadataConnection;
import org.openmetadata.schema.entity.services.MetadataService;
import org.openmetadata.schema.entity.services.ServiceType;
import org.openmetadata.schema.services.connections.metadata.ComponentConfig;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.schema.services.connections.metadata.Sink;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.MetadataServiceRepository;
import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.resources.services.ServiceEntityResource;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.OpenMetadataServerConnectionBuilder;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
@Slf4j
@Path("/v1/services/metadataServices")
@Api(value = "Metadata service collection", tags = "MetadataServices -> Metadata service collection")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "metadataServices")
public class MetadataServiceResource
extends ServiceEntityResource<MetadataService, MetadataServiceRepository, MetadataConnection> {
public static final String COLLECTION_PATH = "v1/services/metadataServices/";
public static final String FIELDS = "pipelines,owner";
public void initialize(OpenMetadataApplicationConfig config) throws IOException {
registerMetadataServices(config);
}
private void registerMetadataServices(OpenMetadataApplicationConfig config) {
try {
if (config.getElasticSearchConfiguration() != null) {
OpenMetadataServerConnection openMetadataServerConnection =
new OpenMetadataServerConnectionBuilder(config)
.build()
.withElasticsSearch(getElasticSearchConnectionSink(config.getElasticSearchConfiguration()));
MetadataConnection metadataConnection = new MetadataConnection().withConfig(openMetadataServerConnection);
List<MetadataService> servicesList = dao.getEntitiesFromSeedData(".*json/data/metadataService/.*\\.json$");
servicesList.forEach(
(service) -> {
try {
// populate values for the Metadata Service
service.setConnection(metadataConnection);
dao.initializeEntity(service);
} catch (IOException e) {
LOG.error(
"[MetadataService] Failed to initialize a Metadata Service {}", service.getFullyQualifiedName(), e);
}
});
} else {
LOG.error("[MetadataService] Missing Elastic Search Config");
}
} catch (Exception ex) {
LOG.error("[MetadataService] Error in creating Metadata Services");
}
}
@Override
public MetadataService addHref(UriInfo uriInfo, MetadataService service) {
service.setHref(RestUtil.getHref(uriInfo, COLLECTION_PATH, service.getId()));
Entity.withHref(uriInfo, service.getOwner());
Entity.withHref(uriInfo, service.getPipelines());
return service;
}
public MetadataServiceResource(CollectionDAO dao, Authorizer authorizer) {
super(MetadataService.class, new MetadataServiceRepository(dao), authorizer, ServiceType.METADATA);
}
public static class MetadataServiceList extends ResultList<MetadataService> {
@SuppressWarnings("unused") /* Required for tests */
public MetadataServiceList() {}
}
@GET
@Operation(
operationId = "listMetadataServices",
summary = "List metadata services",
tags = "metadataService",
description = "Get a list of metadata services.",
responses = {
@ApiResponse(
responseCode = "200",
description = "List of Metadata Service instances",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = MetadataServiceResource.MetadataServiceList.class)))
})
public ResultList<MetadataService> list(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(
description = "Fields requested in the returned resource",
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields")
String fieldsParam,
@DefaultValue("10") @Min(0) @Max(1000000) @QueryParam("limit") int limitParam,
@Parameter(
description = "Returns list of metadata services before this cursor",
schema = @Schema(type = "string"))
@QueryParam("before")
String before,
@Parameter(description = "Returns list of metadata services after this cursor", schema = @Schema(type = "string"))
@QueryParam("after")
String after,
@Parameter(
description = "Include all, deleted, or non-deleted entities.",
schema = @Schema(implementation = Include.class))
@QueryParam("include")
@DefaultValue("non-deleted")
Include include)
throws IOException {
RestUtil.validateCursors(before, after);
EntityUtil.Fields fields = getFields(fieldsParam);
ResultList<MetadataService> metadataServices;
ListFilter filter = new ListFilter(include);
if (before != null) {
metadataServices = dao.listBefore(uriInfo, fields, filter, limitParam, before);
} else {
metadataServices = dao.listAfter(uriInfo, fields, filter, limitParam, after);
}
return addHref(uriInfo, decryptOrNullify(securityContext, metadataServices));
}
@GET
@Path("/{id}")
@Operation(
operationId = "getMetadataServiceByID",
summary = "Get a Metadata Service",
tags = "metadataService",
description = "Get a Metadata Service by `id`.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Metadata Service instance",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = MetadataService.class))),
@ApiResponse(responseCode = "404", description = "Metadata Service for instance {id} is not found")
})
public MetadataService get(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@PathParam("id") UUID id,
@Parameter(
description = "Fields requested in the returned resource",
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields")
String fieldsParam,
@Parameter(
description = "Include all, deleted, or non-deleted entities.",
schema = @Schema(implementation = Include.class))
@QueryParam("include")
@DefaultValue("non-deleted")
Include include)
throws IOException {
MetadataService metadataService = getInternal(uriInfo, securityContext, id, fieldsParam, include);
return decryptOrNullify(securityContext, metadataService);
}
@GET
@Path("/name/{name}")
@Operation(
operationId = "getMetadataServiceByFQN",
summary = "Get Metadata Service by name",
tags = "metadataService",
description = "Get a Metadata Service by the service `name`.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Metadata Service instance",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = MetadataService.class))),
@ApiResponse(responseCode = "404", description = "Metadata Service for instance {id} is not found")
})
public MetadataService getByName(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@PathParam("name") String name,
@Parameter(
description = "Fields requested in the returned resource",
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields")
String fieldsParam,
@Parameter(
description = "Include all, deleted, or non-deleted entities.",
schema = @Schema(implementation = Include.class))
@QueryParam("include")
@DefaultValue("non-deleted")
Include include)
throws IOException {
MetadataService metadataService = getByNameInternal(uriInfo, securityContext, name, fieldsParam, include);
return decryptOrNullify(securityContext, metadataService);
}
@GET
@Path("/{id}/versions")
@Operation(
operationId = "listAllMetadataServiceVersion",
summary = "List Metadata Service versions",
tags = "metadataService",
description = "Get a list of all the versions of a Metadata Service identified by `id`",
responses = {
@ApiResponse(
responseCode = "200",
description = "List of Metadata Service versions",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = EntityHistory.class)))
})
public EntityHistory listVersions(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Metadata Service Id", schema = @Schema(type = "string")) @PathParam("id") UUID id)
throws IOException {
EntityHistory entityHistory = super.listVersionsInternal(securityContext, id);
List<Object> versions =
entityHistory.getVersions().stream()
.map(
json -> {
try {
MetadataService MetadataService = JsonUtils.readValue((String) json, MetadataService.class);
return JsonUtils.pojoToJson(decryptOrNullify(securityContext, MetadataService));
} catch (IOException e) {
return json;
}
})
.collect(Collectors.toList());
entityHistory.setVersions(versions);
return entityHistory;
}
@GET
@Path("/{id}/versions/{version}")
@Operation(
operationId = "getSpecificMetadataServiceVersion",
summary = "Get a version of the Metadata Service",
tags = "metadataService",
description = "Get a version of the Metadata Service by given `id`",
responses = {
@ApiResponse(
responseCode = "200",
description = "Metadata Service",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = MetadataService.class))),
@ApiResponse(
responseCode = "404",
description = "Metadata Service for instance {id} and version " + "{version} is not found")
})
public MetadataService getVersion(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Metadata Service Id", schema = @Schema(type = "string")) @PathParam("id") UUID id,
@Parameter(
description = "Metadata Service version number in the form `major`" + ".`minor`",
schema = @Schema(type = "string", example = "0.1 or 1.1"))
@PathParam("version")
String version)
throws IOException {
MetadataService metadataService = super.getVersionInternal(securityContext, id, version);
return decryptOrNullify(securityContext, metadataService);
}
@POST
@Operation(
operationId = "createMetadataService",
summary = "Create Metadata Service",
tags = "metadataService",
description = "Create a new Metadata Service.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Metadata Service instance",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = MetadataService.class))),
@ApiResponse(responseCode = "400", description = "Bad request")
})
public Response create(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateMetadataService create)
throws IOException {
MetadataService service = getMetadataService(create, securityContext.getUserPrincipal().getName());
Response response = create(uriInfo, securityContext, service);
decryptOrNullify(securityContext, (MetadataService) response.getEntity());
return response;
}
@PUT
@Operation(
operationId = "createOrUpdateMetadataService",
summary = "Update Metadata Service",
tags = "metadataService",
description = "Update an existing or create a new Metadata Service.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Metadata Service instance",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = MetadataService.class))),
@ApiResponse(responseCode = "400", description = "Bad request")
})
public Response createOrUpdate(
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateMetadataService update)
throws IOException {
MetadataService service = getMetadataService(update, securityContext.getUserPrincipal().getName());
Response response = createOrUpdate(uriInfo, securityContext, service);
decryptOrNullify(securityContext, (MetadataService) response.getEntity());
return response;
}
@DELETE
@Path("/{id}")
@Operation(
operationId = "deleteMetadataService",
summary = "Delete a Metadata Service",
tags = "metadataService",
description = "Delete a metadata services. If some service belong the service, it can't be " + "deleted.",
responses = {
@ApiResponse(responseCode = "200", description = "OK"),
@ApiResponse(responseCode = "404", description = "MetadataService service for instance {id} " + "is not found")
})
public Response delete(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Recursively delete this entity and it's children. (Default `false`)")
@DefaultValue("false")
@QueryParam("recursive")
boolean recursive,
@Parameter(description = "Hard delete the entity. (Default = `false`)")
@QueryParam("hardDelete")
@DefaultValue("false")
boolean hardDelete,
@Parameter(description = "Id of the Metadata Service", schema = @Schema(type = "string")) @PathParam("id")
UUID id)
throws IOException {
return delete(uriInfo, securityContext, id, recursive, hardDelete);
}
private MetadataService getMetadataService(CreateMetadataService create, String user) throws IOException {
return copy(new MetadataService(), create, user)
.withServiceType(create.getServiceType())
.withConnection(create.getConnection());
}
@Override
protected MetadataService nullifyConnection(MetadataService service) {
return service.withConnection(null);
}
@Override
protected String extractServiceType(MetadataService service) {
return service.getServiceType().value();
}
private Sink getElasticSearchConnectionSink(ElasticSearchConfiguration esConfig) {
if (Objects.nonNull(esConfig)) {
Sink sink = new Sink();
ComponentConfig componentConfig = new ComponentConfig();
sink.withType("elasticsearch")
.withConfig(
componentConfig
.withAdditionalProperty("es_host", esConfig.getHost())
.withAdditionalProperty("es_port", esConfig.getPort().toString())
.withAdditionalProperty("es_username", esConfig.getUsername())
.withAdditionalProperty("es_password", esConfig.getPassword())
.withAdditionalProperty("scheme", esConfig.getScheme()));
return sink;
} else {
throw new RuntimeException("Elastic Search Configuration Missing");
}
}
}

View File

@ -122,7 +122,7 @@ import org.openmetadata.service.util.ResultList;
@Api(value = "User collection", tags = "User collection")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "users", order = 8) // Initialize user resource before bot resource (at default order 9)
@Collection(name = "users", order = 7) // Initialize user resource before bot resource (at default order 9)
public class UserResource extends EntityResource<User, UserRepository> {
public static final String COLLECTION_PATH = "v1/users/";
public static final String USER_PROTECTED_FIELDS = "authenticationMechanism";

View File

@ -80,7 +80,7 @@ public class NoopSecretsManager extends SecretsManager {
Method getPasswordMethod = clazz.getMethod("get" + field);
Method setPasswordMethod = clazz.getMethod("set" + field, String.class);
String password = (String) getPasswordMethod.invoke(connConfig);
if (password != null) {
if (password != null && !password.equals("")) {
if (!Fernet.isTokenized(password) && encrypt) {
password = fernet.encrypt(password);
} else if (Fernet.isTokenized(password) && !encrypt) {

View File

@ -468,5 +468,17 @@
"ViewAll",
"EditDescription"
]
},
{
"name" : "metadataService",
"operations" : [
"Create",
"Delete",
"ViewAll",
"EditAll",
"EditDescription",
"EditDisplayName",
"EditCustomFields"
]
}
]

View File

@ -0,0 +1,6 @@
{
"name": "Openmetadata",
"displayName": "Openmetadata Service",
"description": "Service Used for creating Openmetadata Ingestion Pipelines.",
"serviceType": "OpenMetadataServer"
}

View File

@ -144,6 +144,7 @@ import org.openmetadata.service.resources.policies.PolicyResourceTest;
import org.openmetadata.service.resources.services.DashboardServiceResourceTest;
import org.openmetadata.service.resources.services.DatabaseServiceResourceTest;
import org.openmetadata.service.resources.services.MessagingServiceResourceTest;
import org.openmetadata.service.resources.services.MetadataServiceResourceTest;
import org.openmetadata.service.resources.services.MlModelServiceResourceTest;
import org.openmetadata.service.resources.services.PipelineServiceResourceTest;
import org.openmetadata.service.resources.services.StorageServiceResourceTest;
@ -227,6 +228,9 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
public static EntityReference AWS_STORAGE_SERVICE_REFERENCE;
public static EntityReference GCP_STORAGE_SERVICE_REFERENCE;
public static EntityReference AMUNDSEN_SERVICE_REFERENCE;
public static EntityReference ATLAS_SERVICE_REFERENCE;
public static TagLabel USER_ADDRESS_TAG_LABEL;
public static TagLabel PERSONAL_DATA_TAG_LABEL;
public static TagLabel PII_SENSITIVE_TAG_LABEL;
@ -333,6 +337,7 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
new StorageServiceResourceTest().setupStorageServices();
new DashboardServiceResourceTest().setupDashboardServices(test);
new MlModelServiceResourceTest().setupMlModelServices(test);
new MetadataServiceResourceTest().setupMetadataServices();
new TableResourceTest().setupDatabaseSchemas(test);
new TestSuiteResourceTest().setupTestSuites(test);
new TestDefinitionResourceTest().setupTestDefinitions(test);

View File

@ -0,0 +1,234 @@
package org.openmetadata.service.resources.services;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.openmetadata.service.util.EntityUtil.fieldAdded;
import static org.openmetadata.service.util.EntityUtil.fieldUpdated;
import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS;
import static org.openmetadata.service.util.TestUtils.AMUNDSEN_CONNECTION;
import static org.openmetadata.service.util.TestUtils.assertResponse;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.openmetadata.schema.api.services.CreateMetadataService;
import org.openmetadata.schema.entity.services.MetadataConnection;
import org.openmetadata.schema.entity.services.MetadataService;
import org.openmetadata.schema.services.connections.metadata.AmundsenConnection;
import org.openmetadata.schema.services.connections.metadata.AtlasConnection;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.EntityResourceTest;
import org.openmetadata.service.resources.services.metadata.MetadataServiceResource;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.TestUtils;
@Slf4j
public class MetadataServiceResourceTest extends EntityResourceTest<MetadataService, CreateMetadataService> {
public MetadataServiceResourceTest() {
super(
Entity.METADATA_SERVICE,
MetadataService.class,
MetadataServiceResource.MetadataServiceList.class,
"services/metadataServices",
MetadataServiceResource.FIELDS);
supportsPatch = false;
supportsAuthorizedMetadataOperations = false;
}
public void setupMetadataServices() throws HttpResponseException {
// Create Amundsen service
MetadataServiceResourceTest metadataServiceResourceTest = new MetadataServiceResourceTest();
CreateMetadataService createMetadata =
new CreateMetadataService()
.withName("amundsen")
.withServiceType(CreateMetadataService.MetadataServiceType.Amundsen)
.withConnection(TestUtils.AMUNDSEN_CONNECTION);
MetadataService metadataService = metadataServiceResourceTest.createEntity(createMetadata, ADMIN_AUTH_HEADERS);
AMUNDSEN_SERVICE_REFERENCE = metadataService.getEntityReference();
// Create Atlas Service
createMetadata
.withName("atlas")
.withServiceType(CreateMetadataService.MetadataServiceType.Atlas)
.withConnection(TestUtils.ATLAS_CONNECTION);
metadataService = metadataServiceResourceTest.createEntity(createMetadata, ADMIN_AUTH_HEADERS);
ATLAS_SERVICE_REFERENCE = metadataService.getEntityReference();
}
@Test
void post_withoutRequiredFields_400_badRequest(TestInfo test) {
// Create metadata with mandatory serviceType field empty
assertResponse(
() -> createEntity(createRequest(test).withServiceType(null), ADMIN_AUTH_HEADERS),
BAD_REQUEST,
"[serviceType must not be null]");
// Create metadata with mandatory brokers field empty
assertResponse(
() -> createEntity(createRequest(test).withConnection(null), ADMIN_AUTH_HEADERS),
BAD_REQUEST,
"[connection must not be null]");
}
@Test
void post_validService_as_admin_200_ok(TestInfo test) throws IOException, URISyntaxException {
// Create metadata service with different optional fields
createAndCheckEntity(createRequest(test, 1).withDescription(null), ADMIN_AUTH_HEADERS);
createAndCheckEntity(createRequest(test, 2).withDescription("description"), ADMIN_AUTH_HEADERS);
createAndCheckEntity(
createRequest(test, 3)
.withConnection(
new MetadataConnection()
.withConfig(
new AmundsenConnection()
.withHostPort(new URI("localhost:9092"))
.withUsername("admin")
.withPassword("admin"))),
ADMIN_AUTH_HEADERS);
}
@Test
void put_updateService_as_admin_2xx(TestInfo test) throws IOException, URISyntaxException {
MetadataService service =
createAndCheckEntity(
createRequest(test)
.withDescription(null)
.withConnection(
new MetadataConnection()
.withConfig(
new AmundsenConnection()
.withHostPort(new URI("localhost:9092"))
.withUsername("admin")
.withPassword("admin"))),
ADMIN_AUTH_HEADERS);
MetadataConnection metadataConnection =
new MetadataConnection()
.withConfig(
new AmundsenConnection()
.withHostPort(new URI("localhost:9092"))
.withUsername("admin")
.withPassword("admin"));
// Update metadata description
CreateMetadataService update =
createRequest(test).withDescription("description1").withConnection(metadataConnection);
ChangeDescription change = getChangeDescription(service.getVersion());
fieldAdded(change, "description", "description1");
service = updateAndCheckEntity(update, OK, ADMIN_AUTH_HEADERS, TestUtils.UpdateType.MINOR_UPDATE, change);
// Update connection
MetadataConnection metadataConnection1 =
new MetadataConnection()
.withConfig(
new AmundsenConnection()
.withHostPort(new URI("localhost:9094"))
.withUsername("admin1")
.withPassword("admin1"));
change = getChangeDescription(service.getVersion());
fieldUpdated(change, "connection", metadataConnection, metadataConnection1);
update.withConnection(metadataConnection1);
service = updateAndCheckEntity(update, OK, ADMIN_AUTH_HEADERS, TestUtils.UpdateType.MINOR_UPDATE, change);
// Update description and connection
MetadataConnection metadataConnection2 =
new MetadataConnection()
.withConfig(
new AmundsenConnection()
.withHostPort(new URI("localhost:9095"))
.withUsername("admin2")
.withPassword("admin2"));
update.withConnection(metadataConnection2);
change = getChangeDescription(service.getVersion());
fieldUpdated(change, "connection", metadataConnection1, metadataConnection2);
update.setConnection(metadataConnection2);
updateAndCheckEntity(update, OK, ADMIN_AUTH_HEADERS, TestUtils.UpdateType.MINOR_UPDATE, change);
}
@Override
public CreateMetadataService createRequest(String name) {
return new CreateMetadataService()
.withName(name)
.withServiceType(CreateMetadataService.MetadataServiceType.Amundsen)
.withConnection(AMUNDSEN_CONNECTION);
}
@Override
public void validateCreatedEntity(
MetadataService service, CreateMetadataService createRequest, Map<String, String> authHeaders) {
MetadataConnection expectedMetadataConnection = createRequest.getConnection();
MetadataConnection actualMetadataConnection = service.getConnection();
validateConnection(expectedMetadataConnection, actualMetadataConnection, service.getServiceType());
}
@Override
public void compareEntities(MetadataService expected, MetadataService updated, Map<String, String> authHeaders) {
// PATCH operation is not supported by this entity
}
@Override
public MetadataService validateGetWithDifferentFields(MetadataService service, boolean byName)
throws HttpResponseException {
String fields = "";
service =
byName
? getEntityByName(service.getFullyQualifiedName(), null, fields, ADMIN_AUTH_HEADERS)
: getEntity(service.getId(), fields, ADMIN_AUTH_HEADERS);
TestUtils.assertListNull(service.getOwner());
fields = "owner";
service =
byName
? getEntityByName(service.getFullyQualifiedName(), null, fields, ADMIN_AUTH_HEADERS)
: getEntity(service.getId(), fields, ADMIN_AUTH_HEADERS);
// Checks for other owner, tags, and followers is done in the base class
return service;
}
@Override
public void assertFieldChange(String fieldName, Object expected, Object actual) throws IOException {
if ("connection".equals(fieldName)) {
assertTrue(((String) actual).contains("-encrypted-value"));
} else {
super.assertCommonFieldChange(fieldName, expected, actual);
}
}
private void validateConnection(
MetadataConnection expectedConnection,
MetadataConnection actualConnection,
CreateMetadataService.MetadataServiceType metadataServiceType) {
if (expectedConnection != null
&& actualConnection != null
&& expectedConnection.getConfig() != null
&& actualConnection.getConfig() != null) {
if (metadataServiceType == CreateMetadataService.MetadataServiceType.Atlas) {
AtlasConnection expectedAtlasConnection = (AtlasConnection) expectedConnection.getConfig();
AtlasConnection actualAtlasConnection;
if (actualConnection.getConfig() instanceof AtlasConnection) {
actualAtlasConnection = (AtlasConnection) actualConnection.getConfig();
} else {
actualAtlasConnection = JsonUtils.convertValue(actualConnection.getConfig(), AtlasConnection.class);
}
assertEquals(expectedAtlasConnection.getHostPort(), actualAtlasConnection.getHostPort());
} else if (metadataServiceType == CreateMetadataService.MetadataServiceType.Amundsen) {
AmundsenConnection expectedAmundsenConnection = (AmundsenConnection) expectedConnection.getConfig();
AmundsenConnection actualAmundsenConnection;
if (actualConnection.getConfig() instanceof AmundsenConnection) {
actualAmundsenConnection = (AmundsenConnection) actualConnection.getConfig();
} else {
actualAmundsenConnection = JsonUtils.convertValue(actualConnection.getConfig(), AmundsenConnection.class);
}
assertEquals(expectedAmundsenConnection.getHostPort(), actualAmundsenConnection.getHostPort());
}
}
}
}

View File

@ -49,6 +49,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.function.Executable;
import org.openmetadata.schema.api.services.DatabaseConnection;
import org.openmetadata.schema.entity.data.GlossaryTerm;
import org.openmetadata.schema.entity.services.MetadataConnection;
import org.openmetadata.schema.entity.tags.Tag;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.entity.type.CustomProperty;
@ -59,6 +60,8 @@ import org.openmetadata.schema.services.connections.database.MysqlConnection;
import org.openmetadata.schema.services.connections.database.RedshiftConnection;
import org.openmetadata.schema.services.connections.database.SnowflakeConnection;
import org.openmetadata.schema.services.connections.messaging.KafkaConnection;
import org.openmetadata.schema.services.connections.metadata.AmundsenConnection;
import org.openmetadata.schema.services.connections.metadata.AtlasConnection;
import org.openmetadata.schema.services.connections.mlmodel.MlflowConnection;
import org.openmetadata.schema.services.connections.pipeline.AirflowConnection;
import org.openmetadata.schema.services.connections.pipeline.GluePipelineConnection;
@ -103,6 +106,8 @@ public final class TestUtils {
public static DashboardConnection SUPERSET_CONNECTION;
public static final MlModelConnection MLFLOW_CONNECTION;
public static MetadataConnection AMUNDSEN_CONNECTION;
public static MetadataConnection ATLAS_CONNECTION;
public static URI PIPELINE_URL;
@ -205,6 +210,36 @@ public final class TestUtils {
}
}
static {
try {
AMUNDSEN_CONNECTION =
new MetadataConnection()
.withConfig(
new AmundsenConnection()
.withHostPort(new URI("http://localhost:8080"))
.withUsername("admin")
.withPassword("admin"));
} catch (URISyntaxException e) {
AMUNDSEN_CONNECTION = null;
e.printStackTrace();
}
}
static {
try {
ATLAS_CONNECTION =
new MetadataConnection()
.withConfig(
new AtlasConnection()
.withHostPort(new URI("http://localhost:8080"))
.withUsername("admin")
.withPassword("admin"));
} catch (URISyntaxException e) {
ATLAS_CONNECTION = null;
e.printStackTrace();
}
}
private TestUtils() {}
public static void readResponseError(Response response) throws HttpResponseException {

View File

@ -0,0 +1,35 @@
{
"$id": "https://open-metadata.org/schema/api/services/createMetadataService.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "CreateMetadataServiceRequest",
"description": "Create Metadata Service entity request",
"type": "object",
"javaType": "org.openmetadata.schema.api.services.CreateMetadataService",
"javaInterfaces": ["org.openmetadata.schema.CreateEntity"],
"properties": {
"name": {
"description": "Name that identifies the this entity instance uniquely",
"$ref": "../../type/basic.json#/definitions/entityName"
},
"displayName": {
"description": "Display Name that identifies this Metadata service.",
"type": "string"
},
"description": {
"description": "Description of Metadata entity.",
"$ref": "../../type/basic.json#/definitions/markdown"
},
"serviceType": {
"$ref": "../../entity/services/metadataService.json#/definitions/metadataServiceType"
},
"connection": {
"$ref": "../../entity/services/metadataService.json#/definitions/metadataConnection"
},
"owner": {
"description": "Owner of this Metadata service.",
"$ref": "../../type/entityReference.json"
}
},
"required": ["name", "serviceType", "connection"],
"additionalProperties": false
}

View File

@ -49,6 +49,16 @@
"description": "For Database Services using SQLAlchemy, True to enable running a comment for all queries run from OpenMetadata.",
"type": "boolean",
"default": true
},
"supportsMetadataToElasticSearchExtraction": {
"description": "Support Metadata To Elastic Search",
"type": "boolean",
"default": true
},
"supportsElasticSearchReindexingExtraction": {
"description": "Support Elastic Search Reindexing",
"type": "boolean",
"default": true
}
}
}

View File

@ -29,8 +29,11 @@
"format": "password"
},
"hostPort": {
"expose": true,
"title": "Host and Port",
"description": "Host and port of the Amundsen Neo4j Connection.",
"type": "string"
"type": "string",
"format": "uri"
},
"maxConnectionLifeTime": {
"description": "Maximum connection lifetime for the Amundsen Neo4j Connection.",

View File

@ -30,7 +30,10 @@
},
"hostPort": {
"description": "Host and port of the data source.",
"type": "string"
"title": "Host and Port",
"type": "string",
"format": "uri",
"expose": true
},
"entityTypes": {

View File

@ -1,7 +1,7 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/metadata/openMetadataConnection.json",
"$id": "https://open-metadata.org/schema/entity/services/connections/metadata/openMetadataServerConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "OpenMetadataConnection",
"title": "OpenMetadataServerConnection",
"description": "OpenMetadata Connection Config",
"type": "object",
"javaType": "org.openmetadata.schema.services.connections.metadata.OpenMetadataServerConnection",
@ -9,8 +9,8 @@
"openmetadataType": {
"description": "OpenMetadata service type",
"type": "string",
"enum": ["OpenMetadata"],
"default": "OpenMetadata"
"enum": ["OpenMetadataServer"],
"default": "OpenMetadataServer"
}
},
"properties": {
@ -22,7 +22,7 @@
"type": {
"description": "Service Type",
"$ref": "#/definitions/openmetadataType",
"default": "OpenMetadata"
"default": "OpenMetadataServer"
},
"hostPort": {
"description": "OpenMetadata Server Config. Must include API end point ex: http://localhost:8585/api",
@ -181,8 +181,14 @@
"type": "boolean",
"default": false
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
"elasticsSearch": {
"$ref": "../../../../metadataIngestion/workflow.json#/definitions/sink"
},
"supportMetadataToElasticSearchExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataToElasticSearchExtraction"
},
"supportsElasticSearchReindexingExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsElasticSearchReindexingExtraction"
}
},
"additionalProperties": false,

View File

@ -25,6 +25,9 @@
},
{
"$ref": "../mlmodelService.json#/definitions/mlModelConnection"
},
{
"$ref": "../metadataService.json#/definitions/metadataConnection"
}
]
}

View File

@ -11,7 +11,7 @@
"description": "Type of Pipeline - metadata, usage",
"type": "string",
"javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType",
"enum": ["metadata", "usage", "lineage", "profiler", "TestSuite", "dataInsight"]
"enum": ["metadata", "usage", "lineage", "profiler", "TestSuite", "MetadataToElasticSearch"]
},
"pipelineStatus": {
"type": "object",
@ -151,7 +151,7 @@
"$ref": "../../../metadataIngestion/workflow.json#/definitions/sourceConfig"
},
"openMetadataServerConnection": {
"$ref": "../connections/metadata/openMetadataConnection.json"
"$ref": "../connections/metadata/openMetadataServerConnection.json"
},
"airflowConfig": {
"$ref": "#/definitions/airflowConfig"

View File

@ -3,12 +3,18 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Metadata Service",
"description": "This schema defines the Metadata Service entity, such as Amundsen, Atlas etc.",
"javaType": "org.openmetadata.schema.entity.services.MetadataService",
"javaInterfaces": [
"org.openmetadata.schema.EntityInterface",
"org.openmetadata.schema.ServiceEntityInterface"
],
"type": "object",
"definitions": {
"metadataServiceType": {
"description": "Type of database service such as Amundsen, Atlas...",
"javaInterfaces": ["org.openmetadata.schema.EnumInterface"],
"type": "string",
"enum": ["Amundsen", "MetadataES", "OpenMetadata", "Atlas"],
"enum": ["Amundsen", "MetadataES", "OpenMetadataServer", "Atlas"],
"javaEnums": [
{
"name": "Amundsen"
@ -17,7 +23,7 @@
"name": "MetadataES"
},
{
"name": "OpenMetadata"
"name": "OpenMetadataServer"
},
{
"name": "Atlas"
@ -26,6 +32,10 @@
},
"metadataConnection": {
"type": "object",
"javaType": "org.openmetadata.schema.entity.services.MetadataConnection",
"javaInterfaces": [
"org.openmetadata.schema.ServiceConnectionEntityInterface"
],
"description": "Metadata Service Connection.",
"properties": {
"config": {
@ -38,7 +48,7 @@
"$ref": "./connections/metadata/metadataESConnection.json"
},
{
"$ref": "./connections/metadata/openMetadataConnection.json"
"$ref": "./connections/metadata/openMetadataServerConnection.json"
},
{
"$ref": "./connections/metadata/atlasConnection.json"
@ -109,6 +119,11 @@
"description": "When `true` indicates the entity has been soft deleted.",
"type": "boolean",
"default": false
},
"allowServiceCreation": {
"description": "When `true` indicates the metadata service can be created",
"type": "boolean",
"default": true
}
},
"required": ["id", "name", "serviceType"],

View File

@ -0,0 +1,51 @@
{
"$id": "https://open-metadata.org/schema/metadataIngestion/metadataToElasticSearchPipeline.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MetadataToElasticSearchPipeline",
"description": "Data Insight Pipeline Configuration.",
"definitions": {
"metadataToESConfigType": {
"description": "Pipeline Source Config Metadata Pipeline type",
"type": "string",
"enum": ["MetadataToElasticSearch"],
"default": "MetadataToElasticSearch"
}
},
"properties": {
"type": {
"description": "Pipeline type",
"$ref": "#/definitions/metadataToESConfigType",
"default": "MetadataToElasticSearch"
},
"useSSL": {
"description": "Indicates whether to use SSL",
"type": "boolean",
"default": false
},
"verifyCerts": {
"description": "Indicates whether to verify certificates",
"type": "boolean",
"default": false
},
"timeout": {
"description": "Connection Timeout",
"type": "integer",
"default": 30
},
"caCerts": {
"description": "Certificate path to be added in configuration",
"type": "string"
},
"useAwsCredentials": {
"description": "Indicates whether to use aws credentials",
"type": "boolean",
"default": false
},
"regionName": {
"description": "Region name in case of useAwsCredentials",
"type": "string"
}
},
"required": ["type"],
"additionalProperties": false
}

View File

@ -40,7 +40,7 @@
"$ref": "testSuitePipeline.json"
},
{
"$ref": "dataInsightPipeline.json"
"$ref": "metadataToElasticSearchPipeline.json"
}
]
}
@ -153,7 +153,7 @@
"default": "INFO"
},
"openMetadataServerConfig": {
"$ref": "../entity/services/connections/metadata/openMetadataConnection.json"
"$ref": "../entity/services/connections/metadata/openMetadataServerConnection.json"
},
"config": {
"$ref": "#/definitions/componentConfig"