diff --git a/.gitignore b/.gitignore index a04779c42ce..e9bf4baf800 100644 --- a/.gitignore +++ b/.gitignore @@ -48,6 +48,7 @@ logs .idea/modules.xml .idea/runConfigurations.xml .idea/workspace.xml +.idea/uiDesigner.xml # Package Files *.jar *.war diff --git a/bootstrap/sql/mysql/v003__create_db_connection_info.sql b/bootstrap/sql/mysql/v003__create_db_connection_info.sql index 241062b81ec..3792b0a7bd2 100644 --- a/bootstrap/sql/mysql/v003__create_db_connection_info.sql +++ b/bootstrap/sql/mysql/v003__create_db_connection_info.sql @@ -29,6 +29,8 @@ DROP INDEX updatedAt; UPDATE dbservice_entity SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ')), '$.deleted', FALSE); +UPDATE dbservice_entity +SET json = JSON_REMOVE(json, '$.jdbc'); ALTER TABLE dbservice_entity ADD COLUMN updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL AFTER json, @@ -260,9 +262,10 @@ ADD INDEX(updatedAt), ADD COLUMN deleted BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(json, '$.deleted')), ADD INDEX (deleted); +-- Update entity extension data where we store versions of entities which will have updatedAt in old format. UPDATE entity_extension -SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ'))); - +SET json = JSON_SET(json, '$.updatedAt', UNIX_TIMESTAMP(STR_TO_DATE(json ->> '$.updatedAt', '%Y-%m-%dT%T.%fZ'))) +where extension like '%.version.%'; ALTER TABLE ingestion_entity DROP COLUMN updatedAt, diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseServiceRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseServiceRepository.java index 031aff63f6c..cb7bf96c2b3 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseServiceRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/DatabaseServiceRepository.java @@ -13,9 +13,13 @@ package org.openmetadata.catalog.jdbi3; +import static org.openmetadata.catalog.util.EntityUtil.toBoolean; + import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.services.DatabaseService; @@ -44,10 +48,33 @@ public class DatabaseServiceRepository extends EntityRepository } @Override - public DatabaseService setFields(DatabaseService entity, Fields fields) { + public DatabaseService setFields(DatabaseService entity, Fields fields) throws IOException { + entity.setAirflowPipelines(fields.contains("airflowPipeline") ? getAirflowPipelines(entity) : null); return entity; } + private List getAirflowPipelines(DatabaseService databaseService) throws IOException { + if (databaseService == null) { + return null; + } + String databaseServiceId = databaseService.getId().toString(); + List airflowPipelineIds = + daoCollection + .relationshipDAO() + .findTo( + databaseServiceId, + Entity.DATABASE_SERVICE, + Relationship.CONTAINS.ordinal(), + Entity.AIRFLOW_PIPELINE, + toBoolean(toInclude(databaseService))); + List airflowPipelines = new ArrayList<>(); + for (String airflowPipelineId : airflowPipelineIds) { + airflowPipelines.add( + daoCollection.airflowPipelineDAO().findEntityReferenceById(UUID.fromString(airflowPipelineId))); + } + return airflowPipelines; + } + @Override public void restorePatchAttributes(DatabaseService original, DatabaseService updated) { /* Nothing to do */ diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResource.java index 72517d6f3ab..f68e78011f9 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResource.java @@ -150,6 +150,11 @@ public class AirflowPipelineResource { schema = @Schema(type = "string", example = FIELDS)) @QueryParam("fields") String fieldsParam, + @Parameter( + description = "Filter airflow pipelines by service fully qualified name", + schema = @Schema(type = "string", example = "snowflakeWestCoast")) + @QueryParam("service") + String serviceParam, @Parameter(description = "Limit the number ingestion returned. (1 to 1000000, " + "default = 10)") @DefaultValue("10") @Min(1) @@ -174,9 +179,10 @@ public class AirflowPipelineResource { ResultList airflowPipelines; if (before != null) { // Reverse paging - airflowPipelines = dao.listBefore(uriInfo, fields, null, limitParam, before, include); // Ask for one extra entry + airflowPipelines = + dao.listBefore(uriInfo, fields, serviceParam, limitParam, before, include); // Ask for one extra entry } else { // Forward paging or first page - airflowPipelines = dao.listAfter(uriInfo, fields, null, limitParam, after, include); + airflowPipelines = dao.listAfter(uriInfo, fields, serviceParam, limitParam, after, include); } if (fieldsParam != null && fieldsParam.contains("status")) { addStatus(airflowPipelines.getData()); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/database/DatabaseServiceResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/database/DatabaseServiceResource.java index 1e5ca95ea75..c6799614860 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/database/DatabaseServiceResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/database/DatabaseServiceResource.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.security.GeneralSecurityException; import java.text.ParseException; +import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.UUID; @@ -53,6 +54,7 @@ import org.openmetadata.catalog.security.Authorizer; import org.openmetadata.catalog.security.SecurityUtil; import org.openmetadata.catalog.type.EntityHistory; import org.openmetadata.catalog.type.Include; +import org.openmetadata.catalog.util.EntityUtil; import org.openmetadata.catalog.util.RestUtil; import org.openmetadata.catalog.util.RestUtil.PutResponse; import org.openmetadata.catalog.util.ResultList; @@ -67,6 +69,9 @@ public class DatabaseServiceResource { private final DatabaseServiceRepository dao; private final Authorizer authorizer; + static final String FIELDS = "airflowPipeline"; + public static final List FIELD_LIST = Arrays.asList(FIELDS.replace(" ", "").split(",")); + public DatabaseServiceResource(CollectionDAO dao, Authorizer authorizer) { Objects.requireNonNull(dao, "DatabaseServiceRepository must not be null"); this.dao = new DatabaseServiceRepository(dao); @@ -97,6 +102,11 @@ public class DatabaseServiceResource { }) public ResultList list( @Context UriInfo uriInfo, + @Parameter( + description = "Fields requested in the returned resource", + schema = @Schema(type = "string", example = FIELDS)) + @QueryParam("fields") + String fieldsParam, @DefaultValue("10") @Min(1) @Max(1000000) @QueryParam("limit") int limitParam, @Parameter( description = "Returns list of database services before this cursor", @@ -114,11 +124,11 @@ public class DatabaseServiceResource { Include include) throws IOException, GeneralSecurityException, ParseException { RestUtil.validateCursors(before, after); - + EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, fieldsParam); if (before != null) { - return dao.listBefore(uriInfo, null, null, limitParam, before, include); + return dao.listBefore(uriInfo, fields, null, limitParam, before, include); } - return dao.listAfter(uriInfo, null, null, limitParam, after, include); + return dao.listAfter(uriInfo, fields, null, limitParam, after, include); } @GET @@ -139,6 +149,11 @@ public class DatabaseServiceResource { @Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("id") String id, + @Parameter( + description = "Fields requested in the returned resource", + schema = @Schema(type = "string", example = FIELDS)) + @QueryParam("fields") + String fieldsParam, @Parameter( description = "Include all, deleted, or non-deleted entities.", schema = @Schema(implementation = Include.class)) @@ -146,7 +161,8 @@ public class DatabaseServiceResource { @DefaultValue("non-deleted") Include include) throws IOException, ParseException { - return dao.get(uriInfo, id, null, include); + EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, fieldsParam); + return dao.get(uriInfo, id, fields, include); } @GET @@ -167,6 +183,11 @@ public class DatabaseServiceResource { @Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("name") String name, + @Parameter( + description = "Fields requested in the returned resource", + schema = @Schema(type = "string", example = FIELDS)) + @QueryParam("fields") + String fieldsParam, @Parameter( description = "Include all, deleted, or non-deleted entities.", schema = @Schema(implementation = Include.class)) @@ -174,7 +195,8 @@ public class DatabaseServiceResource { @DefaultValue("non-deleted") Include include) throws IOException, ParseException { - return dao.getByName(uriInfo, name, null, include); + EntityUtil.Fields fields = new EntityUtil.Fields(FIELD_LIST, fieldsParam); + return dao.getByName(uriInfo, name, fields, include); } @GET diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java index bb9843e6a8f..9a73c01f2cd 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java @@ -384,7 +384,7 @@ public final class EntityUtil { private final List fieldList; public Fields(List validFields, String fieldsParam) { - if (fieldsParam == null) { + if (fieldsParam == null || fieldsParam.isEmpty()) { fieldList = Collections.emptyList(); return; } diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json index bbbb4a3e9e5..b6245d3006a 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json @@ -132,6 +132,10 @@ "databaseConnection": { "$ref": "#/definitions/databaseConnection" }, + "airflowPipelines": { + "description": "References to airflow pipelines deployed for this database service.", + "$ref": "../../type/entityReference.json#/definitions/entityReferenceList" + }, "version" : { "description": "Metadata version of the entity.", "$ref": "../../type/entityHistory.json#/definitions/entityVersion" diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResourceTest.java index aa3221ce784..41a768883d6 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/operations/AirflowPipelineResourceTest.java @@ -19,6 +19,7 @@ import static javax.ws.rs.core.Response.Status.OK; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.openmetadata.catalog.Entity.DATABASE_SERVICE; import static org.openmetadata.catalog.Entity.helper; import static org.openmetadata.catalog.airflow.AirflowUtils.INGESTION_CONNECTION_ARGS; @@ -39,9 +40,11 @@ import java.net.URISyntaxException; import java.text.ParseException; import java.util.Arrays; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.function.Predicate; import javax.ws.rs.core.Response.Status; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.HttpResponseException; @@ -59,6 +62,7 @@ import org.openmetadata.catalog.airflow.models.OpenMetadataIngestionConfig; import org.openmetadata.catalog.airflow.models.OpenMetadataIngestionTask; import org.openmetadata.catalog.api.operations.pipelines.CreateAirflowPipeline; import org.openmetadata.catalog.api.operations.pipelines.PipelineConfig; +import org.openmetadata.catalog.api.services.CreateDatabaseService; import org.openmetadata.catalog.entity.services.DatabaseService; import org.openmetadata.catalog.jdbi3.AirflowPipelineRepository; import org.openmetadata.catalog.operations.pipelines.AirflowPipeline; @@ -409,6 +413,84 @@ public class AirflowPipelineResourceTest extends EntityOperationsResourceTest queryParams = + new HashMap<>() { + { + put("service", bigqueryRef.getName()); + } + }; + Predicate isPipelineBigquery1 = p -> p.getId().equals(pipelineBigquery1.getId()); + Predicate isPipelineBigquery2 = u -> u.getId().equals(pipelineBigquery2.getId()); + Predicate isPipelineBigquery3 = u -> u.getId().equals(airflowPipeline3.getId()); + List actualBigqueryPipelines = listEntities(queryParams, adminAuthHeaders()).getData(); + assertEquals(2, actualBigqueryPipelines.size()); + assertTrue(actualBigqueryPipelines.stream().anyMatch(isPipelineBigquery1)); + assertTrue(actualBigqueryPipelines.stream().anyMatch(isPipelineBigquery2)); + queryParams = + new HashMap<>() { + { + put("service", snowflakeRef.getName()); + } + }; + List actualSnowflakePipelines = listEntities(queryParams, adminAuthHeaders()).getData(); + assertEquals(1, actualSnowflakePipelines.size()); + assertTrue(actualSnowflakePipelines.stream().anyMatch(isPipelineBigquery3)); + } + @Test void put_AirflowPipelineUpdate_200(TestInfo test) throws IOException { CreateAirflowPipeline request = create(test).withService(BIGQUERY_REFERENCE).withDescription(null).withOwner(null); @@ -453,7 +535,7 @@ public class AirflowPipelineResourceTest extends EntityOperationsResourceTest authHeaders = adminAuthHeaders(); diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/TeamResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/TeamResourceTest.java index 424ba1c4ec4..f8c960c79e3 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/TeamResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/TeamResourceTest.java @@ -128,8 +128,8 @@ public class TeamResourceTest extends EntityResourceTest { // Empty query field .../teams?fields= HttpResponseException exception = - assertThrows(HttpResponseException.class, () -> getTeam(team.getId(), "", adminAuthHeaders())); - assertResponse(exception, BAD_REQUEST, CatalogExceptionMessage.invalidField("")); + assertThrows(HttpResponseException.class, () -> getTeam(team.getId(), "test", adminAuthHeaders())); + assertResponse(exception, BAD_REQUEST, CatalogExceptionMessage.invalidField("test")); // .../teams?fields=invalidField exception = diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/UserResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/UserResourceTest.java index 65760e6ae97..c9653887789 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/UserResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/teams/UserResourceTest.java @@ -264,7 +264,7 @@ public class UserResourceTest extends EntityResourceTest { // Empty query field .../users?fields= HttpResponseException exception = - assertThrows(HttpResponseException.class, () -> getEntity(user.getId(), "", adminAuthHeaders())); + assertThrows(HttpResponseException.class, () -> getEntity(user.getId(), "test", adminAuthHeaders())); TestUtils.assertResponseContains(exception, BAD_REQUEST, "Invalid field name"); // .../users?fields=invalidField diff --git a/ingestion-core/src/metadata/_version.py b/ingestion-core/src/metadata/_version.py index b3de04d2be4..770eb3d064b 100644 --- a/ingestion-core/src/metadata/_version.py +++ b/ingestion-core/src/metadata/_version.py @@ -7,5 +7,5 @@ Provides metadata version information. from incremental import Version -__version__ = Version("metadata", 0, 8, 0, dev=9) +__version__ = Version("metadata", 0, 8, 0, dev=10) __all__ = ["__version__"] diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index cdd9d927b19..13f31651948 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -355,7 +355,7 @@ class ElasticsearchSink(Sink[Entity]): chart_descriptions.append(chart.description) if len(chart.tags) > 0: for col_tag in chart.tags: - tags.add(col_tag.tagFQN) + tags.add(col_tag.tagFQN.__root__) change_descriptions = self._get_change_descriptions( Dashboard, dashboard.id.__root__ )