Fix #2355: Upgrade scripts: handle entity_extension updatedAt and dbservice jdbc field; Fix #2353: Add API to filter AirflowPipelines by service (#2357)

* Fix #2340: Ingestion pipeline deployment refactor - update ingestion

* Fix #2355: Upgrade scripts: handle entity_extension updatedAt and dbservice jdbc field; Fix #2353: Add API to filter AirflowPipelines by service

Sriharsha Chintalapani 2022-01-22 21:12:25 -08:00 committed by GitHub
parent 73a657ef3f
commit e5fdf77d8e
13 changed files with 229 additions and 17 deletions

.gitignore (vendored)

@@ -48,6 +48,7 @@ logs
.idea/modules.xml
.idea/runConfigurations.xml
.idea/workspace.xml
.idea/uiDesigner.xml
# Package Files
*.jar
*.war

MySQL upgrade script

@@ -29,6 +29,8 @@ DROP INDEX updatedAt;
UPDATE dbservice_entity
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')), '$.deleted', FALSE);
UPDATE dbservice_entity
SET json = JSON_REMOVE(json, '$.jdbc');
ALTER TABLE dbservice_entity
ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json,
@@ -260,9 +262,10 @@ ADD INDEX(updatedAt),
ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')),
ADD INDEX (deleted);
-- Update entity extension data where we store versions of entities which will have updatedAt in old format.
UPDATE entity_extension
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')));
SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')))
where extension like '%.version.%';
ALTER TABLE ingestion_entity
DROP COLUMN updatedAt,

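For reference, the upgrade statements above rewrite each stored updatedAt from an ISO-8601 string into epoch seconds. A minimal Java sketch of the same conversion, useful for spot-checking migrated rows; the sample value is illustrative, and the MySQL expression matches it only when the session time zone is UTC, since UNIX_TIMESTAMP interprets the parsed datetime in the session time zone:

import java.time.Instant;

public class UpdatedAtConversionCheck {
  public static void main(String[] args) {
    // Old JSON value: ISO-8601 with milliseconds, the shape matched by '%Y-%m-%dT%T.%fZ'
    String oldUpdatedAt = "2022-01-22T21:12:25.000Z";
    // New JSON value: epoch seconds, as produced by UNIX_TIMESTAMP(STR_TO_DATE(...)) under UTC
    long epochSeconds = Instant.parse(oldUpdatedAt).getEpochSecond();
    System.out.println(epochSeconds); // 1642885945
  }
}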
DatabaseServiceRepository.java

@@ -13,9 +13,13 @@
package org.openmetadata.catalog.jdbi3;
import static org.openmetadata.catalog.util.EntityUtil.toBoolean;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.services.DatabaseService;
@@ -44,10 +48,33 @@ public class DatabaseServiceRepository extends EntityRepository<DatabaseService>
}
@Override
public DatabaseService setFields(DatabaseService entity, Fields fields) {
public DatabaseService setFields(DatabaseService entity, Fields fields) throws IOException {
entity.setAirflowPipelines(fields.contains("airflowPipeline") ? getAirflowPipelines(entity) : null);
return entity;
}
private List<EntityReference> getAirflowPipelines(DatabaseService databaseService) throws IOException {
if (databaseService == null) {
return null;
}
String databaseServiceId = databaseService.getId().toString();
List<String> airflowPipelineIds =
daoCollection
.relationshipDAO()
.findTo(
databaseServiceId,
Entity.DATABASE_SERVICE,
Relationship.CONTAINS.ordinal(),
Entity.AIRFLOW_PIPELINE,
toBoolean(toInclude(databaseService)));
List<EntityReference> airflowPipelines = new ArrayList<>();
for (String airflowPipelineId : airflowPipelineIds) {
airflowPipelines.add(
daoCollection.airflowPipelineDAO().findEntityReferenceById(UUID.fromString(airflowPipelineId)));
}
return airflowPipelines;
}
@Override
public void restorePatchAttributes(DatabaseService original, DatabaseService updated) {
/* Nothing to do */

AirflowPipelineResource.java

@@ -150,6 +150,11 @@ public class AirflowPipelineResource {
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields")
String fieldsParam,
@Parameter(
description = "Filter airflow pipelines by service fully qualified name",
schema = @Schema(type = "string", example = "snowflakeWestCoast"))
@QueryParam("service")
String serviceParam,
@Parameter(description = "Limit the number ingestion returned. (1 to 1000000, " + "default = 10)")
@DefaultValue("10")
@Min(1)
@@ -174,9 +179,10 @@ public class AirflowPipelineResource {
ResultList<AirflowPipeline> airflowPipelines;
if (before != null) { // Reverse paging
airflowPipelines = dao.listBefore(uriInfo, fields, null, limitParam, before, include); // Ask for one extra entry
airflowPipelines =
dao.listBefore(uriInfo, fields, serviceParam, limitParam, before, include); // Ask for one extra entry
} else { // Forward paging or first page
airflowPipelines = dao.listAfter(uriInfo, fields, null, limitParam, after, include);
airflowPipelines = dao.listAfter(uriInfo, fields, serviceParam, limitParam, after, include);
}
if (fieldsParam != null && fieldsParam.contains("status")) {
addStatus(airflowPipelines.getData());

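A sketch of how a client could exercise the new service filter. The endpoint path and the 8585 port are assumptions made for illustration, and snowflakeWestCoast is the example name from the parameter description above:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListPipelinesByService {
  public static void main(String[] args) throws Exception {
    // List only the Airflow pipelines that belong to one database service
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8585/api/v1/airflowPipeline?service=snowflakeWestCoast"))
        .GET()
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // Expect a ResultList whose entries all reference the requested service
    System.out.println(response.body());
  }
}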
DatabaseServiceResource.java

@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@@ -53,6 +54,7 @@ import org.openmetadata.catalog.security.Authorizer;
import org.openmetadata.catalog.security.SecurityUtil;
import org.openmetadata.catalog.type.EntityHistory;
import org.openmetadata.catalog.type.Include;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.RestUtil.PutResponse;
import org.openmetadata.catalog.util.ResultList;
@@ -67,6 +69,9 @@ public class DatabaseServiceResource {
private final DatabaseServiceRepository dao;
private final Authorizer authorizer;
static final String FIELDS = "airflowPipeline";
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replace(" ", "").split(","));
public DatabaseServiceResource(CollectionDAO dao, Authorizer authorizer) {
Objects.requireNonNull(dao, "DatabaseServiceRepository must not be null");
this.dao = new DatabaseServiceRepository(dao);
@@ -97,6 +102,11 @@
})
public ResultList<DatabaseService> list(
@Context UriInfo uriInfo,
@Parameter(
description = "Fields requested in the returned resource",
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields")
String fieldsParam,
@DefaultValue("10") @Min(1) @Max(1000000) @QueryParam("limit") int limitParam,
@Parameter(
description = "Returns list of database services before this cursor",
@@ -114,11 +124,11 @@
Include include)
throws IOException, GeneralSecurityException, ParseException {
RestUtil.validateCursors(before, after);
EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, fieldsParam);
if (before != null) {
return dao.listBefore(uriInfo, null, null, limitParam, before, include);
return dao.listBefore(uriInfo, fields, null, limitParam, before, include);
}
return dao.listAfter(uriInfo, null, null, limitParam, after, include);
return dao.listAfter(uriInfo, fields, null, limitParam, after, include);
}
@GET
@@ -139,6 +149,11 @@
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@PathParam("id") String id,
@Parameter(
description = "Fields requested in the returned resource",
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields")
String fieldsParam,
@Parameter(
description = "Include all, deleted, or non-deleted entities.",
schema = @Schema(implementation = Include.class))
@@ -146,7 +161,8 @@
@DefaultValue("non-deleted")
Include include)
throws IOException, ParseException {
return dao.get(uriInfo, id, null, include);
EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, fieldsParam);
return dao.get(uriInfo, id, fields, include);
}
@GET
@@ -167,6 +183,11 @@
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@PathParam("name") String name,
@Parameter(
description = "Fields requested in the returned resource",
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields")
String fieldsParam,
@Parameter(
description = "Include all, deleted, or non-deleted entities.",
schema = @Schema(implementation = Include.class))
@@ -174,7 +195,8 @@
@DefaultValue("non-deleted")
Include include)
throws IOException, ParseException {
return dao.getByName(uriInfo, name, null, include);
EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, fieldsParam);
return dao.getByName(uriInfo, name, fields, include);
}
@GET

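With fields now wired through the databaseServices endpoints, a client can ask for a service's pipelines inline. A companion sketch under the same host and path assumptions as the previous one:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GetServiceWithPipelines {
  public static void main(String[] args) throws Exception {
    // fields=airflowPipeline makes setFields() populate the airflowPipelines references
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8585/api/v1/services/databaseServices/name/snowflakeWestCoast"
            + "?fields=airflowPipeline"))
        .GET()
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // The response JSON should now include an "airflowPipelines" reference list
    System.out.println(response.body());
  }
}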
EntityUtil.java

@@ -384,7 +384,7 @@ public final class EntityUtil {
private final List<String> fieldList;
public Fields(List<String> validFields, String fieldsParam) {
if (fieldsParam == null) {
if (fieldsParam == null || fieldsParam.isEmpty()) {
fieldList = Collections.emptyList();
return;
}

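The one-line change above makes an empty fields= query parameter behave like an absent one instead of tripping the invalid-field error; the TeamResourceTest and UserResourceTest updates further down follow from it. A simplified, self-contained sketch of the parsing rule (not the actual EntityUtil.Fields implementation):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FieldsSketch {
  private final List<String> fieldList;

  public FieldsSketch(List<String> validFields, String fieldsParam) {
    // Null and "" both mean "no extra fields requested"
    if (fieldsParam == null || fieldsParam.isEmpty()) {
      fieldList = Collections.emptyList();
      return;
    }
    fieldList = Arrays.asList(fieldsParam.replace(" ", "").split(","));
    for (String field : fieldList) {
      if (!validFields.contains(field)) {
        // Anything not in the resource's FIELD_LIST is still rejected
        throw new IllegalArgumentException("Invalid field name " + field);
      }
    }
  }

  public boolean contains(String field) {
    return fieldList.contains(field);
  }
}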
databaseService.json (entity schema)

@@ -132,6 +132,10 @@
"databaseConnection": {
"$ref": "#/definitions/databaseConnection"
},
"airflowPipelines": {
"description": "References to airflow pipelines deployed for this database service.",
"$ref": "../../type/entityReference.json#/definitions/entityReferenceList"
},
"version" : {
"description": "Metadata version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"

AirflowPipelineResourceTest.java

@@ -19,6 +19,7 @@ import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.openmetadata.catalog.Entity.DATABASE_SERVICE;
import static org.openmetadata.catalog.Entity.helper;
import static org.openmetadata.catalog.airflow.AirflowUtils.INGESTION_CONNECTION_ARGS;
@@ -39,9 +40,11 @@ import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;
import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpResponseException;
@@ -59,6 +62,7 @@ import org.openmetadata.catalog.airflow.models.OpenMetadataIngestionConfig;
import org.openmetadata.catalog.airflow.models.OpenMetadataIngestionTask;
import org.openmetadata.catalog.api.operations.pipelines.CreateAirflowPipeline;
import org.openmetadata.catalog.api.operations.pipelines.PipelineConfig;
import org.openmetadata.catalog.api.services.CreateDatabaseService;
import org.openmetadata.catalog.entity.services.DatabaseService;
import org.openmetadata.catalog.jdbi3.AirflowPipelineRepository;
import org.openmetadata.catalog.operations.pipelines.AirflowPipeline;
@@ -409,6 +413,84 @@ public class AirflowPipelineResourceTest extends EntityOperationsResourceTest<AirflowPipeline> {
validateGeneratedAirflowPipelineConfig(airflowPipeline);
}
@Test
void list_AirflowPipelinesList_200(TestInfo test) throws IOException {
DatabaseServiceResourceTest databaseServiceResourceTest = new DatabaseServiceResourceTest();
CreateDatabaseService createSnowflakeService =
new CreateDatabaseService()
.withName("snowflake_test_list")
.withServiceType(CreateDatabaseService.DatabaseServiceType.Snowflake)
.withDatabaseConnection(TestUtils.DATABASE_CONNECTION);
DatabaseService snowflakeDatabaseService =
databaseServiceResourceTest.createEntity(createSnowflakeService, adminAuthHeaders());
EntityReference snowflakeRef =
new EntityReference()
.withName(snowflakeDatabaseService.getName())
.withId(snowflakeDatabaseService.getId())
.withType(Entity.DATABASE_SERVICE);
CreateDatabaseService createBigQueryService =
new CreateDatabaseService()
.withName("bigquery_test_list")
.withServiceType(CreateDatabaseService.DatabaseServiceType.BigQuery)
.withDatabaseConnection(TestUtils.DATABASE_CONNECTION);
DatabaseService databaseService =
databaseServiceResourceTest.createEntity(createBigQueryService, adminAuthHeaders());
EntityReference bigqueryRef =
new EntityReference()
.withName(databaseService.getName())
.withId(databaseService.getId())
.withType(Entity.DATABASE_SERVICE);
CreateAirflowPipeline requestPipeline_1 =
create(test)
.withName("ingestion_1")
.withPipelineType(PipelineType.METADATA)
.withService(bigqueryRef)
.withDescription("description")
.withScheduleInterval("5 * * * *");
AirflowPipeline pipelineBigquery1 = createAndCheckEntity(requestPipeline_1, adminAuthHeaders());
CreateAirflowPipeline requestPipeline_2 =
create(test)
.withName("ingestion_2")
.withPipelineType(PipelineType.METADATA)
.withService(bigqueryRef)
.withDescription("description")
.withScheduleInterval("5 * * * *");
AirflowPipeline pipelineBigquery2 = createAndCheckEntity(requestPipeline_2, adminAuthHeaders());
CreateAirflowPipeline requestPipeline_3 =
create(test)
.withName("ingestion_2")
.withPipelineType(PipelineType.METADATA)
.withService(snowflakeRef)
.withDescription("description")
.withScheduleInterval("5 * * * *");
AirflowPipeline pipelineSnowflake = createAndCheckEntity(requestPipeline_3, adminAuthHeaders());
// List pipelines filtered on service name and ensure the right pipelines are in the response
Map<String, String> queryParams =
new HashMap<>() {
{
put("service", bigqueryRef.getName());
}
};
Predicate<AirflowPipeline> isPipelineBigquery1 = p -> p.getId().equals(pipelineBigquery1.getId());
Predicate<AirflowPipeline> isPipelineBigquery2 = u -> u.getId().equals(pipelineBigquery2.getId());
Predicate<AirflowPipeline> isPipelineBigquery3 = u -> u.getId().equals(airflowPipeline3.getId());
List<AirflowPipeline> actualBigqueryPipelines = listEntities(queryParams, adminAuthHeaders()).getData();
assertEquals(2, actualBigqueryPipelines.size());
assertTrue(actualBigqueryPipelines.stream().anyMatch(isPipelineBigquery1));
assertTrue(actualBigqueryPipelines.stream().anyMatch(isPipelineBigquery2));
queryParams =
new HashMap<>() {
{
put("service", snowflakeRef.getName());
}
};
List<AirflowPipeline> actualSnowflakePipelines = listEntities(queryParams, adminAuthHeaders()).getData();
assertEquals(1, actualSnowflakePipelines.size());
assertTrue(actualSnowflakePipelines.stream().anyMatch(isPipelineSnowflake));
}
@Test
void put_AirflowPipelineUpdate_200(TestInfo test) throws IOException {
CreateAirflowPipeline request = create(test).withService(BIGQUERY_REFERENCE).withDescription(null).withOwner(null);
@@ -453,7 +535,7 @@ public class AirflowPipelineResourceTest extends EntityOperationsResourceTest<AirflowPipeline> {
assertListNotNull(ingestion.getOwner(), ingestion.getService());
}
private CreateAirflowPipeline create(TestInfo test) {
public CreateAirflowPipeline create(TestInfo test) {
return create(getEntityName(test));
}
@@ -519,4 +601,9 @@ public class AirflowPipelineResourceTest extends EntityOperationsResourceTest<AirflowPipeline> {
source.getConfig().get(INGESTION_OPTIONS));
}
}
private void listDatabaseServicePipelines(AirflowPipeline airflowPipeline) throws IOException, ParseException {
DatabaseService databaseService = helper(airflowPipeline).findEntity("service", DATABASE_SERVICE);
DatabaseServiceResourceTest.getResource("services/databaseServices");
}
}

DatabaseServiceResourceTest.java

@@ -18,6 +18,7 @@ import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.openmetadata.catalog.Entity.helper;
import static org.openmetadata.catalog.security.SecurityUtil.authHeaders;
import static org.openmetadata.catalog.util.TestUtils.UpdateType.MINOR_UPDATE;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
@@ -25,6 +26,7 @@ import static org.openmetadata.catalog.util.TestUtils.getPrincipal;
import java.io.IOException;
import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Map;
import javax.ws.rs.core.Response;
@@ -33,11 +35,17 @@ import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.operations.pipelines.CreateAirflowPipeline;
import org.openmetadata.catalog.api.operations.pipelines.PipelineConfig;
import org.openmetadata.catalog.api.services.CreateDatabaseService;
import org.openmetadata.catalog.api.services.CreateDatabaseService.DatabaseServiceType;
import org.openmetadata.catalog.entity.services.DatabaseService;
import org.openmetadata.catalog.jdbi3.DatabaseServiceRepository.DatabaseServiceEntityInterface;
import org.openmetadata.catalog.operations.pipelines.AirflowPipeline;
import org.openmetadata.catalog.operations.pipelines.DatabaseServiceMetadataPipeline;
import org.openmetadata.catalog.operations.pipelines.FilterPattern;
import org.openmetadata.catalog.resources.EntityResourceTest;
import org.openmetadata.catalog.resources.operations.AirflowPipelineResourceTest;
import org.openmetadata.catalog.resources.services.database.DatabaseServiceResource.DatabaseServiceList;
import org.openmetadata.catalog.type.ChangeDescription;
import org.openmetadata.catalog.type.ConnectionArguments;
@@ -127,6 +135,60 @@ public class DatabaseServiceResourceTest extends EntityResourceTest<DatabaseService> {
assertEquals(databaseConnection, service.getDatabaseConnection());
}
@Test
void put_addIngestion_as_admin_2xx(TestInfo test) throws IOException, ParseException {
DatabaseService service = createAndCheckEntity(create(test).withDescription(null), adminAuthHeaders());
// Update the description and database connection, which start out null
CreateDatabaseService update = create(test).withDescription("description1");
ChangeDescription change = getChangeDescription(service.getVersion());
change.getFieldsAdded().add(new FieldChange().withName("description").withNewValue("description1"));
updateAndCheckEntity(update, OK, adminAuthHeaders(), UpdateType.MINOR_UPDATE, change);
DatabaseConnection databaseConnection =
new DatabaseConnection()
.withDatabase("test")
.withHostPort("host:9000")
.withPassword("password")
.withUsername("username");
update.withDatabaseConnection(databaseConnection);
service = updateEntity(update, OK, adminAuthHeaders());
assertEquals(databaseConnection, service.getDatabaseConnection());
ConnectionArguments connectionArguments =
new ConnectionArguments()
.withAdditionalProperty("credentials", "/tmp/creds.json")
.withAdditionalProperty("client_email", "ingestion-bot@domain.com");
ConnectionOptions connectionOptions =
new ConnectionOptions().withAdditionalProperty("key1", "value1").withAdditionalProperty("key2", "value2");
databaseConnection.withConnectionArguments(connectionArguments).withConnectionOptions(connectionOptions);
update.withDatabaseConnection(databaseConnection);
service = updateEntity(update, OK, adminAuthHeaders());
assertEquals(databaseConnection, service.getDatabaseConnection());
AirflowPipelineResourceTest airflowPipelineResourceTest = new AirflowPipelineResourceTest();
CreateAirflowPipeline createAirflowPipeline =
airflowPipelineResourceTest.create(test).withService(helper(service).toEntityReference());
DatabaseServiceMetadataPipeline databaseServiceMetadataPipeline =
new DatabaseServiceMetadataPipeline()
.withMarkDeletedTables(true)
.withIncludeViews(true)
.withSchemaFilterPattern(new FilterPattern().withExcludes(Arrays.asList("information_schema.*", "test.*")))
.withTableFilterPattern(new FilterPattern().withIncludes(Arrays.asList("sales.*", "users.*")));
PipelineConfig pipelineConfig =
new PipelineConfig()
.withSchema(PipelineConfig.Schema.DATABASE_SERVICE_METADATA_PIPELINE)
.withConfig(databaseServiceMetadataPipeline);
createAirflowPipeline.withPipelineConfig(pipelineConfig);
AirflowPipeline airflowPipeline =
airflowPipelineResourceTest.createEntity(createAirflowPipeline, adminAuthHeaders());
DatabaseService updatedService = getEntity(service.getId(), "airflowPipeline", adminAuthHeaders());
assertEquals(1, updatedService.getAirflowPipelines().size());
EntityReference actualPipeline = updatedService.getAirflowPipelines().get(0);
assertEquals(airflowPipeline.getId(), actualPipeline.getId());
assertEquals(airflowPipeline.getFullyQualifiedName(), actualPipeline.getName());
}
@Test
void put_update_as_non_admin_401(TestInfo test) throws IOException {
Map<String, String> authHeaders = adminAuthHeaders();

TeamResourceTest.java

@@ -128,8 +128,8 @@ public class TeamResourceTest extends EntityResourceTest<Team> {
// Empty query field .../teams?fields=
HttpResponseException exception =
assertThrows(HttpResponseException.class, () -> getTeam(team.getId(), "", adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, CatalogExceptionMessage.invalidField(""));
assertThrows(HttpResponseException.class, () -> getTeam(team.getId(), "test", adminAuthHeaders()));
assertResponse(exception, BAD_REQUEST, CatalogExceptionMessage.invalidField("test"));
// .../teams?fields=invalidField
exception =

UserResourceTest.java

@@ -264,7 +264,7 @@ public class UserResourceTest extends EntityResourceTest<User> {
// Empty query field .../users?fields=
HttpResponseException exception =
assertThrows(HttpResponseException.class, () -> getEntity(user.getId(), "", adminAuthHeaders()));
assertThrows(HttpResponseException.class, () -> getEntity(user.getId(), "test", adminAuthHeaders()));
TestUtils.assertResponseContains(exception, BAD_REQUEST, "Invalid field name");
// .../users?fields=invalidField

ingestion package version module (Python)

@@ -7,5 +7,5 @@ Provides metadata version information.
from incremental import Version
__version__ = Version("metadata", 0, 8, 0, dev=9)
__version__ = Version("metadata", 0, 8, 0, dev=10)
__all__ = ["__version__"]

Elasticsearch sink (Python ingestion)

@@ -355,7 +355,7 @@ class ElasticsearchSink(Sink[Entity]):
chart_descriptions.append(chart.description)
if len(chart.tags) > 0:
for col_tag in chart.tags:
tags.add(col_tag.tagFQN)
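# tagFQN is a pydantic wrapper model here; adding its __root__ string keeps plain tag names in the index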
tags.add(col_tag.tagFQN.__root__)
change_descriptions = self._get_change_descriptions(
Dashboard, dashboard.id.__root__
)