From a9135e8db5ea784a25336280cc188cb5d9dd0ea4 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Mon, 26 Jun 2023 11:47:39 +0200 Subject: [PATCH] Mask queries, Table sample data and Topic sample data (#12139) * Filter out PII queries * Mask topic PII * Mask topic PII * Update sample data test * Format * Moved logic to repository or PIIMasker * Fix test * chore(ui): update sample data api endpoint for topic --------- Co-authored-by: Sachin Chaurasiya --- .../examples/sample_data/topics/topics.json | 6 +- .../ingestion/source/database/sample_data.py | 21 ++- .../pipeline/airflow/AirflowRESTClient.java | 10 +- .../service/jdbi3/TableRepository.java | 12 +- .../service/jdbi3/TopicRepository.java | 38 +++- .../resources/databases/TableResource.java | 7 +- .../resources/dqtests/TestCaseResource.java | 41 +---- .../resources/query/QueryResource.java | 5 +- .../resources/topics/TopicResource.java | 29 ++- .../service/security/mask/PIIMasker.java | 168 ++++++++++++++++-- .../resources/query/QueryResourceTest.java | 43 +++++ .../resources/topics/TopicResourceTest.java | 9 +- .../SampleDataTopic/SampleDataTopic.test.tsx | 13 +- .../SampleDataTopic/SampleDataTopic.tsx | 10 +- .../TopicDetails/TopicDetails.component.tsx | 2 +- .../main/resources/ui/src/rest/topicsAPI.ts | 6 + 16 files changed, 329 insertions(+), 91 deletions(-) diff --git a/ingestion/examples/sample_data/topics/topics.json b/ingestion/examples/sample_data/topics/topics.json index def10f07e99..cdb7287bd8b 100644 --- a/ingestion/examples/sample_data/topics/topics.json +++ b/ingestion/examples/sample_data/topics/topics.json @@ -9,7 +9,11 @@ "maximumMessageSize":167, "cleanupPolicies": ["delete"], "schemaType": "Avro", - "schemaText": "{\"namespace\":\"openmetadata.kafka\",\"name\":\"Customer\",\"type\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"address_line_1\",\"type\":\"string\"},{\"name\":\"address_line_2\",\"type\":\"string\"},{\"name\":\"post_code\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"}]}" + "schemaText": "{\"namespace\":\"openmetadata.kafka\",\"name\":\"Customer\",\"type\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"address_line_1\",\"type\":\"string\"},{\"name\":\"address_line_2\",\"type\":\"string\"},{\"name\":\"post_code\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"}]}", + "sampleData": [ + "{\"id\": 1, \"first_name\": \"Lévy\", \"country\": \"ES\"}", + "{\"id\": 2, \"first_name\": \"Lima\", \"country\": \"ES\"}" + ] }, { "name": "product_events", diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py index 07174824c50..9d63ea4a8ac 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py @@ -63,6 +63,7 @@ from metadata.generated.schema.entity.data.table import ( TableData, TableProfile, ) +from metadata.generated.schema.entity.data.topic import Topic, TopicSampleData from metadata.generated.schema.entity.policies.policy import Policy from metadata.generated.schema.entity.services.connections.database.customDatabaseConnection import ( CustomDatabaseConnection, @@ -86,7 +87,7 @@ from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameter from metadata.generated.schema.tests.testSuite import TestSuite from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.schema import Topic +from metadata.generated.schema.type.schema import Topic as TopicSchema from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, Source from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus @@ -679,7 +680,7 @@ class SampleDataSource( ) schema_fields = load_parser_fn(topic["name"], topic["schemaText"]) - create_topic.messageSchema = Topic( + create_topic.messageSchema = TopicSchema( schemaText=topic["schemaText"], schemaType=topic["schemaType"], schemaFields=schema_fields, @@ -688,6 +689,22 @@ class SampleDataSource( self.status.scanned(f"Topic Scanned: {create_topic.name.__root__}") yield create_topic + if topic.get("sampleData"): + + topic_fqn = fqn.build( + self.metadata, + entity_type=Topic, + service_name=self.kafka_service.name.__root__, + topic_name=topic["name"], + ) + + topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn) + + self.metadata.ingest_topic_sample_data( + topic=topic_entity, + sample_data=TopicSampleData(messages=topic["sampleData"]), + ) + def ingest_looker(self) -> Iterable[Entity]: """ Looker sample data diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java index b7908d41e99..c1f697e80b9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java @@ -22,6 +22,7 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.security.KeyStoreException; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Map; import javax.net.ssl.SSLContext; @@ -219,10 +220,11 @@ public class AirflowRESTClient extends PipelineServiceClient { } catch (Exception e) { throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage()); } - throw PipelineServiceClientException.byMessage( - ingestionPipeline.getName(), - "Failed to fetch ingestion pipeline runs", - Response.Status.fromStatusCode(response.statusCode())); + // Return an empty list. We'll just show the stored status from the Ingestion Pipeline + LOG.error( + String.format( + "Got status code [%s] trying to get queued statuses: [%s]", response.statusCode(), response.body())); + return new ArrayList<>(); } /** diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index a53e86f42c7..aeba581c752 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -78,6 +78,7 @@ import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.resources.databases.DatabaseUtil; import org.openmetadata.service.resources.databases.TableResource; import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; +import org.openmetadata.service.security.mask.PIIMasker; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.EntityUtil.Fields; import org.openmetadata.service.util.FullyQualifiedName; @@ -243,7 +244,11 @@ public class TableRepository extends EntityRepository { setFieldsInternal(table, Fields.EMPTY_FIELDS); // Set the column tags. Will be used to mask the sample data - if (!authorizePII) getColumnTags(true, table.getColumns()); + if (!authorizePII) { + getColumnTags(true, table.getColumns()); + table.setTags(getTags(table.getFullyQualifiedName())); + return PIIMasker.getSampleData(table); + } return table; } @@ -472,7 +477,10 @@ public class TableRepository extends EntityRepository
{ setColumnProfile(table.getColumns()); // Set the column tags. Will be used to hide the data - if (!authorizePII) getColumnTags(true, table.getColumns()); + if (!authorizePII) { + getColumnTags(true, table.getColumns()); + return PIIMasker.getTableProfile(table); + } return table; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java index d2dae5d6eb8..d12f0f88624 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java @@ -21,8 +21,10 @@ import static org.openmetadata.service.util.EntityUtil.getSchemaField; import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.function.BiPredicate; import java.util.function.Function; @@ -40,6 +42,7 @@ import org.openmetadata.schema.type.topic.CleanupPolicy; import org.openmetadata.schema.type.topic.TopicSampleData; import org.openmetadata.service.Entity; import org.openmetadata.service.resources.topics.TopicResource; +import org.openmetadata.service.security.mask.PIIMasker; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.EntityUtil.Fields; import org.openmetadata.service.util.FullyQualifiedName; @@ -119,7 +122,6 @@ public class TopicRepository extends EntityRepository { public Topic setFields(Topic topic, Fields fields) throws IOException { topic.setService(getContainer(topic.getId())); topic.setFollowers(fields.contains(FIELD_FOLLOWERS) ? getFollowers(topic) : null); - topic.setSampleData(fields.contains("sampleData") ? getSampleData(topic) : null); if (topic.getMessageSchema() != null) { getFieldTags(fields.contains(FIELD_TAGS), topic.getMessageSchema().getSchemaFields()); } @@ -138,10 +140,25 @@ public class TopicRepository extends EntityRepository { } } - private TopicSampleData getSampleData(Topic topic) throws IOException { - return JsonUtils.readValue( - daoCollection.entityExtensionDAO().getExtension(topic.getId().toString(), "topic.sampleData"), - TopicSampleData.class); + public Topic getSampleData(UUID topicId, boolean authorizePII) throws IOException { + // Validate the request content + Topic topic = dao.findEntityById(topicId); + + TopicSampleData sampleData = + JsonUtils.readValue( + daoCollection.entityExtensionDAO().getExtension(topic.getId().toString(), "topic.sampleData"), + TopicSampleData.class); + topic.setSampleData(sampleData); + setFieldsInternal(topic, Fields.EMPTY_FIELDS); + + // Set the fields tags. Will be used to mask the sample data + if (!authorizePII) { + getFieldTags(true, topic.getMessageSchema().getSchemaFields()); + topic.setTags(getTags(topic.getFullyQualifiedName())); + return PIIMasker.getSampleData(topic); + } + + return topic; } @Transaction @@ -249,6 +266,17 @@ public class TopicRepository extends EntityRepository { return allTags; } + public static Set getAllFieldTags(Field field) { + Set tags = new HashSet<>(); + if (!listOrEmpty(field.getTags()).isEmpty()) { + tags.addAll(field.getTags()); + } + for (Field c : listOrEmpty(field.getChildren())) { + tags.addAll(getAllFieldTags(c)); + } + return tags; + } + public class TopicUpdater extends EntityUpdater { public static final String FIELD_DATA_TYPE_DISPLAY = "dataTypeDisplay"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java index 7a8bd16c452..81a1c0c14ea 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java @@ -73,7 +73,6 @@ import org.openmetadata.service.jdbi3.TableRepository; import org.openmetadata.service.resources.Collection; import org.openmetadata.service.resources.EntityResource; import org.openmetadata.service.security.Authorizer; -import org.openmetadata.service.security.mask.PIIMasker; import org.openmetadata.service.security.policyevaluator.OperationContext; import org.openmetadata.service.security.policyevaluator.ResourceContext; import org.openmetadata.service.util.EntityUtil.Fields; @@ -553,8 +552,8 @@ public class TableResource extends EntityResource { authorizer.authorize(securityContext, operationContext, resourceContext); boolean authorizePII = authorizer.authorizePII(securityContext, resourceContext.getOwner()); - Table maskedTable = PIIMasker.getSampleData(repository.getSampleData(id, authorizePII), authorizePII); - return addHref(uriInfo, maskedTable); + Table table = repository.getSampleData(id, authorizePII); + return addHref(uriInfo, table); } @DELETE @@ -673,7 +672,7 @@ public class TableResource extends EntityResource { authorizer.authorize(securityContext, operationContext, resourceContext); boolean authorizePII = authorizer.authorizePII(securityContext, resourceContext.getOwner()); - return PIIMasker.getTableProfile(repository.getLatestTableProfile(fqn, authorizePII), authorizePII); + return repository.getLatestTableProfile(fqn, authorizePII); } @GET diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResource.java index b0c137d45b0..b125932fffe 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dqtests/TestCaseResource.java @@ -11,9 +11,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import java.io.IOException; import java.util.List; -import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; import javax.json.JsonPatch; import javax.validation.Valid; import javax.validation.constraints.Max; @@ -39,12 +37,10 @@ import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.api.data.RestoreEntity; import org.openmetadata.schema.api.tests.CreateLogicalTestCases; import org.openmetadata.schema.api.tests.CreateTestCase; -import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.tests.TestCase; import org.openmetadata.schema.tests.TestSuite; import org.openmetadata.schema.tests.type.TestCaseResult; import org.openmetadata.schema.tests.type.TestSummary; -import org.openmetadata.schema.type.Column; import org.openmetadata.schema.type.EntityHistory; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.MetadataOperation; @@ -55,7 +51,6 @@ import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.TestCaseRepository; import org.openmetadata.service.resources.Collection; import org.openmetadata.service.resources.EntityResource; -import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.resources.feeds.MessageParser.EntityLink; import org.openmetadata.service.security.Authorizer; import org.openmetadata.service.security.mask.PIIMasker; @@ -189,41 +184,7 @@ public class TestCaseResource extends EntityResource tests = super.listInternal( uriInfo, securityContext, fields, filter, limitParam, before, after, operationContext, resourceContext); - - List maskedTests = - tests.getData().stream() - .map( - testCase -> { - try { - EntityLink testCaseLink = MessageParser.EntityLink.parse(testCase.getEntityLink()); - Table table = - Entity.getEntityByName( - Entity.TABLE, testCaseLink.getEntityFQN(), "owner,tags", Include.NON_DELETED); - - // Ignore table tests - if (testCaseLink.getFieldName() == null) return testCase; - - Optional referencedColumn = - table.getColumns().stream() - .filter( - col -> testCaseLink.getFullyQualifiedFieldValue().equals(col.getFullyQualifiedName())) - .findFirst(); - - if (referencedColumn.isPresent()) { - Column col = referencedColumn.get(); - // We need the table owner to know if we can authorize the access - boolean authorizePII = authorizer.authorizePII(securityContext, table.getOwner()); - return PIIMasker.getTestCase(col, testCase, authorizePII); - } - return testCase; - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .collect(Collectors.toList()); - - tests.setData(maskedTests); - return tests; + return PIIMasker.getTestCases(tests, authorizer, securityContext); } @GET diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java index f4c4b248b17..9c8a2f80ea6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/query/QueryResource.java @@ -54,6 +54,7 @@ import org.openmetadata.service.jdbi3.QueryRepository; import org.openmetadata.service.resources.Collection; import org.openmetadata.service.resources.EntityResource; import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.mask.PIIMasker; import org.openmetadata.service.security.policyevaluator.OperationContext; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.ResultList; @@ -143,7 +144,9 @@ public class QueryResource extends EntityResource { if (!CommonUtil.nullOrEmpty(entityId)) { filter.addQueryParam("entityId", entityId.toString()); } - return super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after); + ResultList queries = + super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after); + return PIIMasker.getQueries(queries, authorizer, securityContext); } @GET diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/topics/TopicResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/topics/TopicResource.java index 2d86649f465..5d3601c3734 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/topics/TopicResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/topics/TopicResource.java @@ -63,6 +63,7 @@ import org.openmetadata.service.resources.Collection; import org.openmetadata.service.resources.EntityResource; import org.openmetadata.service.security.Authorizer; import org.openmetadata.service.security.policyevaluator.OperationContext; +import org.openmetadata.service.security.policyevaluator.ResourceContext; import org.openmetadata.service.util.ResultList; @Path("/v1/topics") @@ -76,7 +77,7 @@ import org.openmetadata.service.util.ResultList; @Collection(name = "topics") public class TopicResource extends EntityResource { public static final String COLLECTION_PATH = "v1/topics/"; - static final String FIELDS = "owner,followers,tags,sampleData,extension"; + static final String FIELDS = "owner,followers,tags,extension"; @Override public Topic addHref(UriInfo uriInfo, Topic topic) { @@ -347,6 +348,32 @@ public class TopicResource extends EntityResource { return addHref(uriInfo, topic); } + @GET + @Path("/{id}/sampleData") + @Operation( + operationId = "getSampleData", + summary = "Get sample data", + description = "Get sample data from the topic.", + responses = { + @ApiResponse( + responseCode = "200", + description = "Successfully obtained the Topic", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = Topic.class))) + }) + public Topic getSampleData( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "Id of the topic", schema = @Schema(type = "UUID")) @PathParam("id") UUID id) + throws IOException { + OperationContext operationContext = new OperationContext(entityType, MetadataOperation.VIEW_SAMPLE_DATA); + ResourceContext resourceContext = getResourceContextById(id); + authorizer.authorize(securityContext, operationContext, resourceContext); + boolean authorizePII = authorizer.authorizePII(securityContext, resourceContext.getOwner()); + + Topic topic = repository.getSampleData(id, authorizePII); + return addHref(uriInfo, topic); + } + @PUT @Path("/{id}/followers") @Operation( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/security/mask/PIIMasker.java b/openmetadata-service/src/main/java/org/openmetadata/service/security/mask/PIIMasker.java index e2d6c56501c..66237a5daa0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/security/mask/PIIMasker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/security/mask/PIIMasker.java @@ -1,14 +1,33 @@ package org.openmetadata.service.security.mask; +import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; +import static org.openmetadata.service.jdbi3.TopicRepository.getAllFieldTags; + +import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.ws.rs.core.SecurityContext; +import org.openmetadata.schema.entity.data.Query; import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.entity.data.Topic; import org.openmetadata.schema.tests.TestCase; import org.openmetadata.schema.type.Column; +import org.openmetadata.schema.type.Field; +import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.TableData; +import org.openmetadata.schema.type.TagLabel; +import org.openmetadata.schema.type.topic.TopicSampleData; +import org.openmetadata.service.Entity; import org.openmetadata.service.jdbi3.ColumnUtil; +import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.util.ResultList; public class PIIMasker { @@ -16,19 +35,30 @@ public class PIIMasker { public static final String MASKED_VALUE = "********"; public static final String MASKED_NAME = "[MASKED]"; - public static Table getSampleData(Table table, boolean authorized) { - if (authorized) return table; - + public static Table getSampleData(Table table) { TableData sampleData = table.getSampleData(); - // get the list of positions to be masked - List columnsPositionToBeMasked = - table.getColumns().stream() - .collect(Collectors.toMap(Function.identity(), c -> sampleData.getColumns().indexOf(c.getName()))) - .entrySet() - .stream() - .filter(entry -> hasPiiSensitiveTag(entry.getKey())) - .map(Map.Entry::getValue) - .collect(Collectors.toList()); + + // If we don't have sample data, there's nothing to do + if (sampleData == null) { + return table; + } + + List columnsPositionToBeMasked; + + // If the table itself is marked as PII, mask all the sample data + if (hasPiiSensitiveTag(table)) { + columnsPositionToBeMasked = IntStream.range(0, table.getColumns().size()).boxed().collect(Collectors.toList()); + } else { + // Otherwise, mask only the PII columns + columnsPositionToBeMasked = + table.getColumns().stream() + .collect(Collectors.toMap(Function.identity(), c -> sampleData.getColumns().indexOf(c.getName()))) + .entrySet() + .stream() + .filter(entry -> hasPiiSensitiveTag(entry.getKey())) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + } // Mask rows sampleData.setRows( @@ -46,8 +76,28 @@ public class PIIMasker { return table; } - public static Table getTableProfile(Table table, boolean authorized) { - if (authorized) return table; + /* + If the topic or any of its fields are flagged as PII, we will + mask the full TopicSampleData list of messages, since we cannot + easily pick up the specific key containing the sample data. + */ + public static Topic getSampleData(Topic topic) { + TopicSampleData sampleData = topic.getSampleData(); + + // If we don't have sample data, there's nothing to do + if (sampleData == null) { + return topic; + } + + if (hasPiiSensitiveTag(topic)) { + sampleData.setMessages(List.of(MASKED_VALUE)); + topic.setSampleData(sampleData); + } + + return topic; + } + + public static Table getTableProfile(Table table) { for (Column column : table.getColumns()) { if (hasPiiSensitiveTag(column)) { column.setProfile(null); @@ -57,8 +107,8 @@ public class PIIMasker { return table; } - public static TestCase getTestCase(Column column, TestCase testCase, boolean authorized) { - if (authorized || !hasPiiSensitiveTag(column)) return testCase; + private static TestCase getTestCase(Column column, TestCase testCase) { + if (!hasPiiSensitiveTag(column)) return testCase; testCase.setTestCaseResult(null); testCase.setParameterValues(null); @@ -68,10 +118,96 @@ public class PIIMasker { return testCase; } + public static ResultList getTestCases( + ResultList testCases, Authorizer authorizer, SecurityContext securityContext) { + List maskedTests = + testCases.getData().stream() + .map( + testCase -> { + try { + MessageParser.EntityLink testCaseLink = MessageParser.EntityLink.parse(testCase.getEntityLink()); + Table table = + Entity.getEntityByName( + Entity.TABLE, testCaseLink.getEntityFQN(), "owner,tags", Include.NON_DELETED); + + // Ignore table tests + if (testCaseLink.getFieldName() == null) return testCase; + + Optional referencedColumn = + table.getColumns().stream() + .filter( + col -> testCaseLink.getFullyQualifiedFieldValue().equals(col.getFullyQualifiedName())) + .findFirst(); + + if (referencedColumn.isPresent()) { + Column col = referencedColumn.get(); + // We need the table owner to know if we can authorize the access + boolean authorizePII = authorizer.authorizePII(securityContext, table.getOwner()); + if (!authorizePII) return PIIMasker.getTestCase(col, testCase); + return testCase; + } + return testCase; + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + + testCases.setData(maskedTests); + return testCases; + } + + /* + Either return the query if user has permissions, or hide it completely. + */ + private static Query getQuery(Query query) { + if (!hasPiiSensitiveTag(query)) return query; + query.setQuery(MASKED_VALUE); + return query; + } + + public static ResultList getQueries( + ResultList queries, Authorizer authorizer, SecurityContext securityContext) { + List maskedQueries = + queries.getData().stream() + .map( + query -> { + boolean authorizePII = authorizer.authorizePII(securityContext, query.getOwner()); + if (!authorizePII) return PIIMasker.getQuery(query); + return query; + }) + .collect(Collectors.toList()); + queries.setData(maskedQueries); + return queries; + } + + private static boolean hasPiiSensitiveTag(Query query) { + return query.getTags().stream().map(TagLabel::getTagFQN).anyMatch(SENSITIVE_PII_TAG::equals); + } + private static boolean hasPiiSensitiveTag(Column column) { return ColumnUtil.getAllTags(column).stream().anyMatch(SENSITIVE_PII_TAG::equals); } + private static boolean hasPiiSensitiveTag(Table table) { + return table.getTags().stream().map(TagLabel::getTagFQN).anyMatch(SENSITIVE_PII_TAG::equals); + } + + /* + Check if the Topic is flagged as PII or any of its fields + */ + private static boolean hasPiiSensitiveTag(Topic topic) { + if (topic.getTags().stream().map(TagLabel::getTagFQN).anyMatch(SENSITIVE_PII_TAG::equals)) return true; + + Set fieldTags = new HashSet<>(); + List schemaFields = topic.getMessageSchema() != null ? topic.getMessageSchema().getSchemaFields() : null; + for (Field schemaField : listOrEmpty(schemaFields)) { + fieldTags.addAll(getAllFieldTags(schemaField)); + } + + return fieldTags.stream().map(TagLabel::getTagFQN).anyMatch(SENSITIVE_PII_TAG::equals); + } + private static List maskSampleDataRow(List row, List columnsPositionToBeMasked) { columnsPositionToBeMasked.forEach(position -> row.set(position, MASKED_VALUE)); return row; diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java index 843c6cc46d8..408c8e2df29 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/query/QueryResourceTest.java @@ -4,6 +4,7 @@ import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.OK; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.openmetadata.service.security.SecurityUtil.authHeaders; import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS; import static org.openmetadata.service.util.TestUtils.LONG_ENTITY_NAME; import static org.openmetadata.service.util.TestUtils.assertListNotNull; @@ -30,11 +31,13 @@ import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.Column; import org.openmetadata.schema.type.ColumnDataType; import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.TagLabel; import org.openmetadata.service.Entity; import org.openmetadata.service.resources.EntityResourceTest; import org.openmetadata.service.resources.databases.TableResourceTest; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.JsonUtils; +import org.openmetadata.service.util.ResultList; import org.openmetadata.service.util.TestUtils; @Slf4j @@ -174,4 +177,44 @@ public class QueryResourceTest extends EntityResourceTest { assertResponse( () -> createEntity(request2, ADMIN_AUTH_HEADERS), BAD_REQUEST, TestUtils.getEntityNameLengthError(entityClass)); } + + @Test + void test_sensitivePIIQuery() throws IOException { + CreateQuery create = createRequest("sensitiveQuery"); + create.withTags(List.of(PII_SENSITIVE_TAG_LABEL)); + createAndCheckEntity(create, ADMIN_AUTH_HEADERS); + + // Owner (USER1_REF) can see the results + ResultList queries = getQueries(100, "*", false, authHeaders(USER1_REF.getName())); + queries + .getData() + .forEach( + query -> { + assertEquals(query.getQuery(), QUERY); + }); + + // Another user won't see the PII query body + ResultList maskedQueries = getQueries(100, "*", false, authHeaders(USER2_REF.getName())); + maskedQueries + .getData() + .forEach( + query -> { + if (query.getTags().stream().map(TagLabel::getTagFQN).anyMatch("PII.Sensitive"::equals)) { + assertEquals(query.getQuery(), "********"); + } else { + assertEquals(query.getQuery(), QUERY); + } + }); + } + + public ResultList getQueries(Integer limit, String fields, Boolean includeAll, Map authHeaders) + throws HttpResponseException { + WebTarget target = getCollection(); + target = limit != null ? target.queryParam("limit", limit) : target; + target = target.queryParam("fields", fields); + if (includeAll) { + target = target.queryParam("include", "all"); + } + return TestUtils.get(target, QueryResource.QueryList.class, authHeaders); + } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/topics/TopicResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/topics/TopicResourceTest.java index 1067144b04a..695c5e6b4ea 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/topics/TopicResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/topics/TopicResourceTest.java @@ -312,7 +312,7 @@ public class TopicResourceTest extends EntityResourceTest { Topic putResponse = putSampleData(topic.getId(), topicSampleData, ADMIN_AUTH_HEADERS); assertEquals(topicSampleData, putResponse.getSampleData()); - topic = getEntity(topic.getId(), "sampleData", ADMIN_AUTH_HEADERS); + topic = getSampleData(topic.getId(), ADMIN_AUTH_HEADERS); assertEquals(topicSampleData, topic.getSampleData()); messages = Arrays.asList( @@ -321,7 +321,7 @@ public class TopicResourceTest extends EntityResourceTest { topicSampleData.withMessages(messages); putResponse = putSampleData(topic.getId(), topicSampleData, ADMIN_AUTH_HEADERS); assertEquals(topicSampleData, putResponse.getSampleData()); - topic = getEntity(topic.getId(), "sampleData", ADMIN_AUTH_HEADERS); + topic = getSampleData(topic.getId(), ADMIN_AUTH_HEADERS); assertEquals(topicSampleData, topic.getSampleData()); } @@ -413,6 +413,11 @@ public class TopicResourceTest extends EntityResourceTest { return TestUtils.put(target, data, Topic.class, OK, authHeaders); } + public Topic getSampleData(UUID topicId, Map authHeaders) throws HttpResponseException { + WebTarget target = getResource(topicId).path("/sampleData"); + return TestUtils.get(target, Topic.class, authHeaders); + } + private static Field getField(String name, FieldDataType fieldDataType, TagLabel tag) { List tags = tag == null ? new ArrayList<>() : singletonList(tag); return new Field().withName(name).withDataType(fieldDataType).withDescription(name).withTags(tags); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/SampleDataTopic/SampleDataTopic.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/SampleDataTopic/SampleDataTopic.test.tsx index 10630bfc5b3..903a96ae8d8 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/SampleDataTopic/SampleDataTopic.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/SampleDataTopic/SampleDataTopic.test.tsx @@ -24,16 +24,19 @@ jest.mock('react-router-dom', () => ({ Link: jest.fn().mockImplementation(({ children }) =>
{children}
), })); jest.mock('rest/topicsAPI', () => ({ - getTopicByFqn: jest + getSampleDataByTopicId: jest .fn() .mockImplementation(() => ({ sampleData: mockSampleData })), })); describe('Test SampleData Component', () => { it('Should render message cards', async () => { - const { findAllByTestId } = render(, { - wrapper: MemoryRouter, - }); + const { findAllByTestId } = render( + , + { + wrapper: MemoryRouter, + } + ); expect(await findAllByTestId('message-card')).toHaveLength( mockSampleData.messages.length @@ -42,7 +45,7 @@ describe('Test SampleData Component', () => { it('Should render no data placeholder if no data available', () => { act(() => { - const { getByTestId } = render(, { + const { getByTestId } = render(, { wrapper: MemoryRouter, }); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/SampleDataTopic/SampleDataTopic.tsx b/openmetadata-ui/src/main/resources/ui/src/components/SampleDataTopic/SampleDataTopic.tsx index 27d3f80a1e9..4e6393dbaaf 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/SampleDataTopic/SampleDataTopic.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/SampleDataTopic/SampleDataTopic.tsx @@ -15,11 +15,10 @@ import { DownOutlined, UpOutlined } from '@ant-design/icons'; import { Typography } from 'antd'; import { AxiosError } from 'axios'; import Loader from 'components/Loader/Loader'; -import { TabSpecificField } from 'enums/entity.enum'; import { isUndefined } from 'lodash'; import React, { FC, useEffect, useState } from 'react'; import { useTranslation } from 'react-i18next'; -import { getTopicByFqn } from 'rest/topicsAPI'; +import { getSampleDataByTopicId } from 'rest/topicsAPI'; import { Transi18next } from 'utils/CommonUtils'; import { showErrorToast } from 'utils/ToastUtils'; import { WORKFLOWS_METADATA_DOCS } from '../../constants/docs.constants'; @@ -75,7 +74,7 @@ const MessageCard = ({ message }: { message: string }) => { ); }; -const SampleDataTopic: FC<{ topicFQN: string }> = ({ topicFQN }) => { +const SampleDataTopic: FC<{ topicId: string }> = ({ topicId }) => { const { t } = useTranslation(); const [data, setData] = useState(); const [loading, setLoading] = useState(false); @@ -83,10 +82,7 @@ const SampleDataTopic: FC<{ topicFQN: string }> = ({ topicFQN }) => { const fetchTopicSampleData = async () => { setLoading(true); try { - const { sampleData } = await getTopicByFqn( - topicFQN, - TabSpecificField.SAMPLE_DATA - ); + const { sampleData } = await getSampleDataByTopicId(topicId); setData(sampleData); } catch (error) { diff --git a/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicDetails.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicDetails.component.tsx index 5f6da320896..0ccfa4300c7 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicDetails.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicDetails.component.tsx @@ -400,7 +400,7 @@ const TopicDetails: React.FC = ({ /> ), key: EntityTabs.SAMPLE_DATA, - children: , + children: , }, { label: , diff --git a/openmetadata-ui/src/main/resources/ui/src/rest/topicsAPI.ts b/openmetadata-ui/src/main/resources/ui/src/rest/topicsAPI.ts index d0a334d15ab..102955432e4 100644 --- a/openmetadata-ui/src/main/resources/ui/src/rest/topicsAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/rest/topicsAPI.ts @@ -138,3 +138,9 @@ export const restoreTopic = async (id: string) => { return response.data; }; + +export const getSampleDataByTopicId = async (id: string) => { + const response = await APIClient.get(`/topics/${id}/sampleData`); + + return response.data; +};