mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-24 14:08:45 +00:00
Mask queries, Table sample data and Topic sample data (#12139)
* Filter out PII queries * Mask topic PII * Mask topic PII * Update sample data test * Format * Moved logic to repository or PIIMasker * Fix test * chore(ui): update sample data api endpoint for topic --------- Co-authored-by: Sachin Chaurasiya <sachinchaurasiyachotey87@gmail.com>
This commit is contained in:
parent
0894d339ac
commit
a9135e8db5
@ -9,7 +9,11 @@
|
||||
"maximumMessageSize":167,
|
||||
"cleanupPolicies": ["delete"],
|
||||
"schemaType": "Avro",
|
||||
"schemaText": "{\"namespace\":\"openmetadata.kafka\",\"name\":\"Customer\",\"type\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"address_line_1\",\"type\":\"string\"},{\"name\":\"address_line_2\",\"type\":\"string\"},{\"name\":\"post_code\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"}]}"
|
||||
"schemaText": "{\"namespace\":\"openmetadata.kafka\",\"name\":\"Customer\",\"type\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"address_line_1\",\"type\":\"string\"},{\"name\":\"address_line_2\",\"type\":\"string\"},{\"name\":\"post_code\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"}]}",
|
||||
"sampleData": [
|
||||
"{\"id\": 1, \"first_name\": \"Lévy\", \"country\": \"ES\"}",
|
||||
"{\"id\": 2, \"first_name\": \"Lima\", \"country\": \"ES\"}"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "product_events",
|
||||
|
||||
@ -63,6 +63,7 @@ from metadata.generated.schema.entity.data.table import (
|
||||
TableData,
|
||||
TableProfile,
|
||||
)
|
||||
from metadata.generated.schema.entity.data.topic import Topic, TopicSampleData
|
||||
from metadata.generated.schema.entity.policies.policy import Policy
|
||||
from metadata.generated.schema.entity.services.connections.database.customDatabaseConnection import (
|
||||
CustomDatabaseConnection,
|
||||
@ -86,7 +87,7 @@ from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameter
|
||||
from metadata.generated.schema.tests.testSuite import TestSuite
|
||||
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.generated.schema.type.schema import Topic
|
||||
from metadata.generated.schema.type.schema import Topic as TopicSchema
|
||||
from metadata.ingestion.api.common import Entity
|
||||
from metadata.ingestion.api.source import InvalidSourceException, Source
|
||||
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
||||
@ -679,7 +680,7 @@ class SampleDataSource(
|
||||
)
|
||||
schema_fields = load_parser_fn(topic["name"], topic["schemaText"])
|
||||
|
||||
create_topic.messageSchema = Topic(
|
||||
create_topic.messageSchema = TopicSchema(
|
||||
schemaText=topic["schemaText"],
|
||||
schemaType=topic["schemaType"],
|
||||
schemaFields=schema_fields,
|
||||
@ -688,6 +689,22 @@ class SampleDataSource(
|
||||
self.status.scanned(f"Topic Scanned: {create_topic.name.__root__}")
|
||||
yield create_topic
|
||||
|
||||
if topic.get("sampleData"):
|
||||
|
||||
topic_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=Topic,
|
||||
service_name=self.kafka_service.name.__root__,
|
||||
topic_name=topic["name"],
|
||||
)
|
||||
|
||||
topic_entity = self.metadata.get_by_name(entity=Topic, fqn=topic_fqn)
|
||||
|
||||
self.metadata.ingest_topic_sample_data(
|
||||
topic=topic_entity,
|
||||
sample_data=TopicSampleData(messages=topic["sampleData"]),
|
||||
)
|
||||
|
||||
def ingest_looker(self) -> Iterable[Entity]:
|
||||
"""
|
||||
Looker sample data
|
||||
|
||||
@ -22,6 +22,7 @@ import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.security.KeyStoreException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.net.ssl.SSLContext;
|
||||
@ -219,10 +220,11 @@ public class AirflowRESTClient extends PipelineServiceClient {
|
||||
} catch (Exception e) {
|
||||
throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage());
|
||||
}
|
||||
throw PipelineServiceClientException.byMessage(
|
||||
ingestionPipeline.getName(),
|
||||
"Failed to fetch ingestion pipeline runs",
|
||||
Response.Status.fromStatusCode(response.statusCode()));
|
||||
// Return an empty list. We'll just show the stored status from the Ingestion Pipeline
|
||||
LOG.error(
|
||||
String.format(
|
||||
"Got status code [%s] trying to get queued statuses: [%s]", response.statusCode(), response.body()));
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -78,6 +78,7 @@ import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.resources.databases.DatabaseUtil;
|
||||
import org.openmetadata.service.resources.databases.TableResource;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.security.mask.PIIMasker;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
@ -243,7 +244,11 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
setFieldsInternal(table, Fields.EMPTY_FIELDS);
|
||||
|
||||
// Set the column tags. Will be used to mask the sample data
|
||||
if (!authorizePII) getColumnTags(true, table.getColumns());
|
||||
if (!authorizePII) {
|
||||
getColumnTags(true, table.getColumns());
|
||||
table.setTags(getTags(table.getFullyQualifiedName()));
|
||||
return PIIMasker.getSampleData(table);
|
||||
}
|
||||
|
||||
return table;
|
||||
}
|
||||
@ -472,7 +477,10 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
setColumnProfile(table.getColumns());
|
||||
|
||||
// Set the column tags. Will be used to hide the data
|
||||
if (!authorizePII) getColumnTags(true, table.getColumns());
|
||||
if (!authorizePII) {
|
||||
getColumnTags(true, table.getColumns());
|
||||
return PIIMasker.getTableProfile(table);
|
||||
}
|
||||
|
||||
return table;
|
||||
}
|
||||
|
||||
@ -21,8 +21,10 @@ import static org.openmetadata.service.util.EntityUtil.getSchemaField;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.function.Function;
|
||||
@ -40,6 +42,7 @@ import org.openmetadata.schema.type.topic.CleanupPolicy;
|
||||
import org.openmetadata.schema.type.topic.TopicSampleData;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.resources.topics.TopicResource;
|
||||
import org.openmetadata.service.security.mask.PIIMasker;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
@ -119,7 +122,6 @@ public class TopicRepository extends EntityRepository<Topic> {
|
||||
public Topic setFields(Topic topic, Fields fields) throws IOException {
|
||||
topic.setService(getContainer(topic.getId()));
|
||||
topic.setFollowers(fields.contains(FIELD_FOLLOWERS) ? getFollowers(topic) : null);
|
||||
topic.setSampleData(fields.contains("sampleData") ? getSampleData(topic) : null);
|
||||
if (topic.getMessageSchema() != null) {
|
||||
getFieldTags(fields.contains(FIELD_TAGS), topic.getMessageSchema().getSchemaFields());
|
||||
}
|
||||
@ -138,10 +140,25 @@ public class TopicRepository extends EntityRepository<Topic> {
|
||||
}
|
||||
}
|
||||
|
||||
private TopicSampleData getSampleData(Topic topic) throws IOException {
|
||||
return JsonUtils.readValue(
|
||||
daoCollection.entityExtensionDAO().getExtension(topic.getId().toString(), "topic.sampleData"),
|
||||
TopicSampleData.class);
|
||||
public Topic getSampleData(UUID topicId, boolean authorizePII) throws IOException {
|
||||
// Validate the request content
|
||||
Topic topic = dao.findEntityById(topicId);
|
||||
|
||||
TopicSampleData sampleData =
|
||||
JsonUtils.readValue(
|
||||
daoCollection.entityExtensionDAO().getExtension(topic.getId().toString(), "topic.sampleData"),
|
||||
TopicSampleData.class);
|
||||
topic.setSampleData(sampleData);
|
||||
setFieldsInternal(topic, Fields.EMPTY_FIELDS);
|
||||
|
||||
// Set the fields tags. Will be used to mask the sample data
|
||||
if (!authorizePII) {
|
||||
getFieldTags(true, topic.getMessageSchema().getSchemaFields());
|
||||
topic.setTags(getTags(topic.getFullyQualifiedName()));
|
||||
return PIIMasker.getSampleData(topic);
|
||||
}
|
||||
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Transaction
|
||||
@ -249,6 +266,17 @@ public class TopicRepository extends EntityRepository<Topic> {
|
||||
return allTags;
|
||||
}
|
||||
|
||||
public static Set<TagLabel> getAllFieldTags(Field field) {
|
||||
Set<TagLabel> tags = new HashSet<>();
|
||||
if (!listOrEmpty(field.getTags()).isEmpty()) {
|
||||
tags.addAll(field.getTags());
|
||||
}
|
||||
for (Field c : listOrEmpty(field.getChildren())) {
|
||||
tags.addAll(getAllFieldTags(c));
|
||||
}
|
||||
return tags;
|
||||
}
|
||||
|
||||
public class TopicUpdater extends EntityUpdater {
|
||||
public static final String FIELD_DATA_TYPE_DISPLAY = "dataTypeDisplay";
|
||||
|
||||
|
||||
@ -73,7 +73,6 @@ import org.openmetadata.service.jdbi3.TableRepository;
|
||||
import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.resources.EntityResource;
|
||||
import org.openmetadata.service.security.Authorizer;
|
||||
import org.openmetadata.service.security.mask.PIIMasker;
|
||||
import org.openmetadata.service.security.policyevaluator.OperationContext;
|
||||
import org.openmetadata.service.security.policyevaluator.ResourceContext;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
@ -553,8 +552,8 @@ public class TableResource extends EntityResource<Table, TableRepository> {
|
||||
authorizer.authorize(securityContext, operationContext, resourceContext);
|
||||
boolean authorizePII = authorizer.authorizePII(securityContext, resourceContext.getOwner());
|
||||
|
||||
Table maskedTable = PIIMasker.getSampleData(repository.getSampleData(id, authorizePII), authorizePII);
|
||||
return addHref(uriInfo, maskedTable);
|
||||
Table table = repository.getSampleData(id, authorizePII);
|
||||
return addHref(uriInfo, table);
|
||||
}
|
||||
|
||||
@DELETE
|
||||
@ -673,7 +672,7 @@ public class TableResource extends EntityResource<Table, TableRepository> {
|
||||
authorizer.authorize(securityContext, operationContext, resourceContext);
|
||||
boolean authorizePII = authorizer.authorizePII(securityContext, resourceContext.getOwner());
|
||||
|
||||
return PIIMasker.getTableProfile(repository.getLatestTableProfile(fqn, authorizePII), authorizePII);
|
||||
return repository.getLatestTableProfile(fqn, authorizePII);
|
||||
}
|
||||
|
||||
@GET
|
||||
|
||||
@ -11,9 +11,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.json.JsonPatch;
|
||||
import javax.validation.Valid;
|
||||
import javax.validation.constraints.Max;
|
||||
@ -39,12 +37,10 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.api.data.RestoreEntity;
|
||||
import org.openmetadata.schema.api.tests.CreateLogicalTestCases;
|
||||
import org.openmetadata.schema.api.tests.CreateTestCase;
|
||||
import org.openmetadata.schema.entity.data.Table;
|
||||
import org.openmetadata.schema.tests.TestCase;
|
||||
import org.openmetadata.schema.tests.TestSuite;
|
||||
import org.openmetadata.schema.tests.type.TestCaseResult;
|
||||
import org.openmetadata.schema.tests.type.TestSummary;
|
||||
import org.openmetadata.schema.type.Column;
|
||||
import org.openmetadata.schema.type.EntityHistory;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.MetadataOperation;
|
||||
@ -55,7 +51,6 @@ import org.openmetadata.service.jdbi3.ListFilter;
|
||||
import org.openmetadata.service.jdbi3.TestCaseRepository;
|
||||
import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.resources.EntityResource;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
import org.openmetadata.service.security.Authorizer;
|
||||
import org.openmetadata.service.security.mask.PIIMasker;
|
||||
@ -189,41 +184,7 @@ public class TestCaseResource extends EntityResource<TestCase, TestCaseRepositor
|
||||
ResultList<TestCase> tests =
|
||||
super.listInternal(
|
||||
uriInfo, securityContext, fields, filter, limitParam, before, after, operationContext, resourceContext);
|
||||
|
||||
List<TestCase> maskedTests =
|
||||
tests.getData().stream()
|
||||
.map(
|
||||
testCase -> {
|
||||
try {
|
||||
EntityLink testCaseLink = MessageParser.EntityLink.parse(testCase.getEntityLink());
|
||||
Table table =
|
||||
Entity.getEntityByName(
|
||||
Entity.TABLE, testCaseLink.getEntityFQN(), "owner,tags", Include.NON_DELETED);
|
||||
|
||||
// Ignore table tests
|
||||
if (testCaseLink.getFieldName() == null) return testCase;
|
||||
|
||||
Optional<Column> referencedColumn =
|
||||
table.getColumns().stream()
|
||||
.filter(
|
||||
col -> testCaseLink.getFullyQualifiedFieldValue().equals(col.getFullyQualifiedName()))
|
||||
.findFirst();
|
||||
|
||||
if (referencedColumn.isPresent()) {
|
||||
Column col = referencedColumn.get();
|
||||
// We need the table owner to know if we can authorize the access
|
||||
boolean authorizePII = authorizer.authorizePII(securityContext, table.getOwner());
|
||||
return PIIMasker.getTestCase(col, testCase, authorizePII);
|
||||
}
|
||||
return testCase;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
tests.setData(maskedTests);
|
||||
return tests;
|
||||
return PIIMasker.getTestCases(tests, authorizer, securityContext);
|
||||
}
|
||||
|
||||
@GET
|
||||
|
||||
@ -54,6 +54,7 @@ import org.openmetadata.service.jdbi3.QueryRepository;
|
||||
import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.resources.EntityResource;
|
||||
import org.openmetadata.service.security.Authorizer;
|
||||
import org.openmetadata.service.security.mask.PIIMasker;
|
||||
import org.openmetadata.service.security.policyevaluator.OperationContext;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
@ -143,7 +144,9 @@ public class QueryResource extends EntityResource<Query, QueryRepository> {
|
||||
if (!CommonUtil.nullOrEmpty(entityId)) {
|
||||
filter.addQueryParam("entityId", entityId.toString());
|
||||
}
|
||||
return super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after);
|
||||
ResultList<Query> queries =
|
||||
super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after);
|
||||
return PIIMasker.getQueries(queries, authorizer, securityContext);
|
||||
}
|
||||
|
||||
@GET
|
||||
|
||||
@ -63,6 +63,7 @@ import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.resources.EntityResource;
|
||||
import org.openmetadata.service.security.Authorizer;
|
||||
import org.openmetadata.service.security.policyevaluator.OperationContext;
|
||||
import org.openmetadata.service.security.policyevaluator.ResourceContext;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
|
||||
@Path("/v1/topics")
|
||||
@ -76,7 +77,7 @@ import org.openmetadata.service.util.ResultList;
|
||||
@Collection(name = "topics")
|
||||
public class TopicResource extends EntityResource<Topic, TopicRepository> {
|
||||
public static final String COLLECTION_PATH = "v1/topics/";
|
||||
static final String FIELDS = "owner,followers,tags,sampleData,extension";
|
||||
static final String FIELDS = "owner,followers,tags,extension";
|
||||
|
||||
@Override
|
||||
public Topic addHref(UriInfo uriInfo, Topic topic) {
|
||||
@ -347,6 +348,32 @@ public class TopicResource extends EntityResource<Topic, TopicRepository> {
|
||||
return addHref(uriInfo, topic);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/{id}/sampleData")
|
||||
@Operation(
|
||||
operationId = "getSampleData",
|
||||
summary = "Get sample data",
|
||||
description = "Get sample data from the topic.",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "Successfully obtained the Topic",
|
||||
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Topic.class)))
|
||||
})
|
||||
public Topic getSampleData(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "Id of the topic", schema = @Schema(type = "UUID")) @PathParam("id") UUID id)
|
||||
throws IOException {
|
||||
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.VIEW_SAMPLE_DATA);
|
||||
ResourceContext resourceContext = getResourceContextById(id);
|
||||
authorizer.authorize(securityContext, operationContext, resourceContext);
|
||||
boolean authorizePII = authorizer.authorizePII(securityContext, resourceContext.getOwner());
|
||||
|
||||
Topic topic = repository.getSampleData(id, authorizePII);
|
||||
return addHref(uriInfo, topic);
|
||||
}
|
||||
|
||||
@PUT
|
||||
@Path("/{id}/followers")
|
||||
@Operation(
|
||||
|
||||
@ -1,14 +1,33 @@
|
||||
package org.openmetadata.service.security.mask;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||
import static org.openmetadata.service.jdbi3.TopicRepository.getAllFieldTags;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import javax.ws.rs.core.SecurityContext;
|
||||
import org.openmetadata.schema.entity.data.Query;
|
||||
import org.openmetadata.schema.entity.data.Table;
|
||||
import org.openmetadata.schema.entity.data.Topic;
|
||||
import org.openmetadata.schema.tests.TestCase;
|
||||
import org.openmetadata.schema.type.Column;
|
||||
import org.openmetadata.schema.type.Field;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.TableData;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.topic.TopicSampleData;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.jdbi3.ColumnUtil;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.security.Authorizer;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
|
||||
public class PIIMasker {
|
||||
|
||||
@ -16,19 +35,30 @@ public class PIIMasker {
|
||||
public static final String MASKED_VALUE = "********";
|
||||
public static final String MASKED_NAME = "[MASKED]";
|
||||
|
||||
public static Table getSampleData(Table table, boolean authorized) {
|
||||
if (authorized) return table;
|
||||
|
||||
public static Table getSampleData(Table table) {
|
||||
TableData sampleData = table.getSampleData();
|
||||
// get the list of positions to be masked
|
||||
List<Integer> columnsPositionToBeMasked =
|
||||
table.getColumns().stream()
|
||||
.collect(Collectors.toMap(Function.identity(), c -> sampleData.getColumns().indexOf(c.getName())))
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> hasPiiSensitiveTag(entry.getKey()))
|
||||
.map(Map.Entry::getValue)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// If we don't have sample data, there's nothing to do
|
||||
if (sampleData == null) {
|
||||
return table;
|
||||
}
|
||||
|
||||
List<Integer> columnsPositionToBeMasked;
|
||||
|
||||
// If the table itself is marked as PII, mask all the sample data
|
||||
if (hasPiiSensitiveTag(table)) {
|
||||
columnsPositionToBeMasked = IntStream.range(0, table.getColumns().size()).boxed().collect(Collectors.toList());
|
||||
} else {
|
||||
// Otherwise, mask only the PII columns
|
||||
columnsPositionToBeMasked =
|
||||
table.getColumns().stream()
|
||||
.collect(Collectors.toMap(Function.identity(), c -> sampleData.getColumns().indexOf(c.getName())))
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> hasPiiSensitiveTag(entry.getKey()))
|
||||
.map(Map.Entry::getValue)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
// Mask rows
|
||||
sampleData.setRows(
|
||||
@ -46,8 +76,28 @@ public class PIIMasker {
|
||||
return table;
|
||||
}
|
||||
|
||||
public static Table getTableProfile(Table table, boolean authorized) {
|
||||
if (authorized) return table;
|
||||
/*
|
||||
If the topic or any of its fields are flagged as PII, we will
|
||||
mask the full TopicSampleData list of messages, since we cannot
|
||||
easily pick up the specific key containing the sample data.
|
||||
*/
|
||||
public static Topic getSampleData(Topic topic) {
|
||||
TopicSampleData sampleData = topic.getSampleData();
|
||||
|
||||
// If we don't have sample data, there's nothing to do
|
||||
if (sampleData == null) {
|
||||
return topic;
|
||||
}
|
||||
|
||||
if (hasPiiSensitiveTag(topic)) {
|
||||
sampleData.setMessages(List.of(MASKED_VALUE));
|
||||
topic.setSampleData(sampleData);
|
||||
}
|
||||
|
||||
return topic;
|
||||
}
|
||||
|
||||
public static Table getTableProfile(Table table) {
|
||||
for (Column column : table.getColumns()) {
|
||||
if (hasPiiSensitiveTag(column)) {
|
||||
column.setProfile(null);
|
||||
@ -57,8 +107,8 @@ public class PIIMasker {
|
||||
return table;
|
||||
}
|
||||
|
||||
public static TestCase getTestCase(Column column, TestCase testCase, boolean authorized) {
|
||||
if (authorized || !hasPiiSensitiveTag(column)) return testCase;
|
||||
private static TestCase getTestCase(Column column, TestCase testCase) {
|
||||
if (!hasPiiSensitiveTag(column)) return testCase;
|
||||
|
||||
testCase.setTestCaseResult(null);
|
||||
testCase.setParameterValues(null);
|
||||
@ -68,10 +118,96 @@ public class PIIMasker {
|
||||
return testCase;
|
||||
}
|
||||
|
||||
public static ResultList<TestCase> getTestCases(
|
||||
ResultList<TestCase> testCases, Authorizer authorizer, SecurityContext securityContext) {
|
||||
List<TestCase> maskedTests =
|
||||
testCases.getData().stream()
|
||||
.map(
|
||||
testCase -> {
|
||||
try {
|
||||
MessageParser.EntityLink testCaseLink = MessageParser.EntityLink.parse(testCase.getEntityLink());
|
||||
Table table =
|
||||
Entity.getEntityByName(
|
||||
Entity.TABLE, testCaseLink.getEntityFQN(), "owner,tags", Include.NON_DELETED);
|
||||
|
||||
// Ignore table tests
|
||||
if (testCaseLink.getFieldName() == null) return testCase;
|
||||
|
||||
Optional<Column> referencedColumn =
|
||||
table.getColumns().stream()
|
||||
.filter(
|
||||
col -> testCaseLink.getFullyQualifiedFieldValue().equals(col.getFullyQualifiedName()))
|
||||
.findFirst();
|
||||
|
||||
if (referencedColumn.isPresent()) {
|
||||
Column col = referencedColumn.get();
|
||||
// We need the table owner to know if we can authorize the access
|
||||
boolean authorizePII = authorizer.authorizePII(securityContext, table.getOwner());
|
||||
if (!authorizePII) return PIIMasker.getTestCase(col, testCase);
|
||||
return testCase;
|
||||
}
|
||||
return testCase;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
testCases.setData(maskedTests);
|
||||
return testCases;
|
||||
}
|
||||
|
||||
/*
|
||||
Either return the query if user has permissions, or hide it completely.
|
||||
*/
|
||||
private static Query getQuery(Query query) {
|
||||
if (!hasPiiSensitiveTag(query)) return query;
|
||||
query.setQuery(MASKED_VALUE);
|
||||
return query;
|
||||
}
|
||||
|
||||
public static ResultList<Query> getQueries(
|
||||
ResultList<Query> queries, Authorizer authorizer, SecurityContext securityContext) {
|
||||
List<Query> maskedQueries =
|
||||
queries.getData().stream()
|
||||
.map(
|
||||
query -> {
|
||||
boolean authorizePII = authorizer.authorizePII(securityContext, query.getOwner());
|
||||
if (!authorizePII) return PIIMasker.getQuery(query);
|
||||
return query;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
queries.setData(maskedQueries);
|
||||
return queries;
|
||||
}
|
||||
|
||||
private static boolean hasPiiSensitiveTag(Query query) {
|
||||
return query.getTags().stream().map(TagLabel::getTagFQN).anyMatch(SENSITIVE_PII_TAG::equals);
|
||||
}
|
||||
|
||||
private static boolean hasPiiSensitiveTag(Column column) {
|
||||
return ColumnUtil.getAllTags(column).stream().anyMatch(SENSITIVE_PII_TAG::equals);
|
||||
}
|
||||
|
||||
private static boolean hasPiiSensitiveTag(Table table) {
|
||||
return table.getTags().stream().map(TagLabel::getTagFQN).anyMatch(SENSITIVE_PII_TAG::equals);
|
||||
}
|
||||
|
||||
/*
|
||||
Check if the Topic is flagged as PII or any of its fields
|
||||
*/
|
||||
private static boolean hasPiiSensitiveTag(Topic topic) {
|
||||
if (topic.getTags().stream().map(TagLabel::getTagFQN).anyMatch(SENSITIVE_PII_TAG::equals)) return true;
|
||||
|
||||
Set<TagLabel> fieldTags = new HashSet<>();
|
||||
List<Field> schemaFields = topic.getMessageSchema() != null ? topic.getMessageSchema().getSchemaFields() : null;
|
||||
for (Field schemaField : listOrEmpty(schemaFields)) {
|
||||
fieldTags.addAll(getAllFieldTags(schemaField));
|
||||
}
|
||||
|
||||
return fieldTags.stream().map(TagLabel::getTagFQN).anyMatch(SENSITIVE_PII_TAG::equals);
|
||||
}
|
||||
|
||||
private static List<Object> maskSampleDataRow(List<Object> row, List<Integer> columnsPositionToBeMasked) {
|
||||
columnsPositionToBeMasked.forEach(position -> row.set(position, MASKED_VALUE));
|
||||
return row;
|
||||
|
||||
@ -4,6 +4,7 @@ import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
|
||||
import static javax.ws.rs.core.Response.Status.OK;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.openmetadata.service.security.SecurityUtil.authHeaders;
|
||||
import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS;
|
||||
import static org.openmetadata.service.util.TestUtils.LONG_ENTITY_NAME;
|
||||
import static org.openmetadata.service.util.TestUtils.assertListNotNull;
|
||||
@ -30,11 +31,13 @@ import org.openmetadata.schema.type.ChangeEvent;
|
||||
import org.openmetadata.schema.type.Column;
|
||||
import org.openmetadata.schema.type.ColumnDataType;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.resources.EntityResourceTest;
|
||||
import org.openmetadata.service.resources.databases.TableResourceTest;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
import org.openmetadata.service.util.TestUtils;
|
||||
|
||||
@Slf4j
|
||||
@ -174,4 +177,44 @@ public class QueryResourceTest extends EntityResourceTest<Query, CreateQuery> {
|
||||
assertResponse(
|
||||
() -> createEntity(request2, ADMIN_AUTH_HEADERS), BAD_REQUEST, TestUtils.getEntityNameLengthError(entityClass));
|
||||
}
|
||||
|
||||
@Test
|
||||
void test_sensitivePIIQuery() throws IOException {
|
||||
CreateQuery create = createRequest("sensitiveQuery");
|
||||
create.withTags(List.of(PII_SENSITIVE_TAG_LABEL));
|
||||
createAndCheckEntity(create, ADMIN_AUTH_HEADERS);
|
||||
|
||||
// Owner (USER1_REF) can see the results
|
||||
ResultList<Query> queries = getQueries(100, "*", false, authHeaders(USER1_REF.getName()));
|
||||
queries
|
||||
.getData()
|
||||
.forEach(
|
||||
query -> {
|
||||
assertEquals(query.getQuery(), QUERY);
|
||||
});
|
||||
|
||||
// Another user won't see the PII query body
|
||||
ResultList<Query> maskedQueries = getQueries(100, "*", false, authHeaders(USER2_REF.getName()));
|
||||
maskedQueries
|
||||
.getData()
|
||||
.forEach(
|
||||
query -> {
|
||||
if (query.getTags().stream().map(TagLabel::getTagFQN).anyMatch("PII.Sensitive"::equals)) {
|
||||
assertEquals(query.getQuery(), "********");
|
||||
} else {
|
||||
assertEquals(query.getQuery(), QUERY);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public ResultList<Query> getQueries(Integer limit, String fields, Boolean includeAll, Map<String, String> authHeaders)
|
||||
throws HttpResponseException {
|
||||
WebTarget target = getCollection();
|
||||
target = limit != null ? target.queryParam("limit", limit) : target;
|
||||
target = target.queryParam("fields", fields);
|
||||
if (includeAll) {
|
||||
target = target.queryParam("include", "all");
|
||||
}
|
||||
return TestUtils.get(target, QueryResource.QueryList.class, authHeaders);
|
||||
}
|
||||
}
|
||||
|
||||
@ -312,7 +312,7 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
|
||||
Topic putResponse = putSampleData(topic.getId(), topicSampleData, ADMIN_AUTH_HEADERS);
|
||||
assertEquals(topicSampleData, putResponse.getSampleData());
|
||||
|
||||
topic = getEntity(topic.getId(), "sampleData", ADMIN_AUTH_HEADERS);
|
||||
topic = getSampleData(topic.getId(), ADMIN_AUTH_HEADERS);
|
||||
assertEquals(topicSampleData, topic.getSampleData());
|
||||
messages =
|
||||
Arrays.asList(
|
||||
@ -321,7 +321,7 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
|
||||
topicSampleData.withMessages(messages);
|
||||
putResponse = putSampleData(topic.getId(), topicSampleData, ADMIN_AUTH_HEADERS);
|
||||
assertEquals(topicSampleData, putResponse.getSampleData());
|
||||
topic = getEntity(topic.getId(), "sampleData", ADMIN_AUTH_HEADERS);
|
||||
topic = getSampleData(topic.getId(), ADMIN_AUTH_HEADERS);
|
||||
assertEquals(topicSampleData, topic.getSampleData());
|
||||
}
|
||||
|
||||
@ -413,6 +413,11 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
|
||||
return TestUtils.put(target, data, Topic.class, OK, authHeaders);
|
||||
}
|
||||
|
||||
public Topic getSampleData(UUID topicId, Map<String, String> authHeaders) throws HttpResponseException {
|
||||
WebTarget target = getResource(topicId).path("/sampleData");
|
||||
return TestUtils.get(target, Topic.class, authHeaders);
|
||||
}
|
||||
|
||||
private static Field getField(String name, FieldDataType fieldDataType, TagLabel tag) {
|
||||
List<TagLabel> tags = tag == null ? new ArrayList<>() : singletonList(tag);
|
||||
return new Field().withName(name).withDataType(fieldDataType).withDescription(name).withTags(tags);
|
||||
|
||||
@ -24,16 +24,19 @@ jest.mock('react-router-dom', () => ({
|
||||
Link: jest.fn().mockImplementation(({ children }) => <div>{children}</div>),
|
||||
}));
|
||||
jest.mock('rest/topicsAPI', () => ({
|
||||
getTopicByFqn: jest
|
||||
getSampleDataByTopicId: jest
|
||||
.fn()
|
||||
.mockImplementation(() => ({ sampleData: mockSampleData })),
|
||||
}));
|
||||
|
||||
describe('Test SampleData Component', () => {
|
||||
it('Should render message cards', async () => {
|
||||
const { findAllByTestId } = render(<SampleDataTopic topicFQN="" />, {
|
||||
wrapper: MemoryRouter,
|
||||
});
|
||||
const { findAllByTestId } = render(
|
||||
<SampleDataTopic topicId="f8e260ac-4db1-4bf3-92ad-f28bdb7dc041" />,
|
||||
{
|
||||
wrapper: MemoryRouter,
|
||||
}
|
||||
);
|
||||
|
||||
expect(await findAllByTestId('message-card')).toHaveLength(
|
||||
mockSampleData.messages.length
|
||||
@ -42,7 +45,7 @@ describe('Test SampleData Component', () => {
|
||||
|
||||
it('Should render no data placeholder if no data available', () => {
|
||||
act(() => {
|
||||
const { getByTestId } = render(<SampleDataTopic topicFQN="" />, {
|
||||
const { getByTestId } = render(<SampleDataTopic topicId="" />, {
|
||||
wrapper: MemoryRouter,
|
||||
});
|
||||
|
||||
|
||||
@ -15,11 +15,10 @@ import { DownOutlined, UpOutlined } from '@ant-design/icons';
|
||||
import { Typography } from 'antd';
|
||||
import { AxiosError } from 'axios';
|
||||
import Loader from 'components/Loader/Loader';
|
||||
import { TabSpecificField } from 'enums/entity.enum';
|
||||
import { isUndefined } from 'lodash';
|
||||
import React, { FC, useEffect, useState } from 'react';
|
||||
import { useTranslation } from 'react-i18next';
|
||||
import { getTopicByFqn } from 'rest/topicsAPI';
|
||||
import { getSampleDataByTopicId } from 'rest/topicsAPI';
|
||||
import { Transi18next } from 'utils/CommonUtils';
|
||||
import { showErrorToast } from 'utils/ToastUtils';
|
||||
import { WORKFLOWS_METADATA_DOCS } from '../../constants/docs.constants';
|
||||
@ -75,7 +74,7 @@ const MessageCard = ({ message }: { message: string }) => {
|
||||
);
|
||||
};
|
||||
|
||||
const SampleDataTopic: FC<{ topicFQN: string }> = ({ topicFQN }) => {
|
||||
const SampleDataTopic: FC<{ topicId: string }> = ({ topicId }) => {
|
||||
const { t } = useTranslation();
|
||||
const [data, setData] = useState<TopicSampleData>();
|
||||
const [loading, setLoading] = useState(false);
|
||||
@ -83,10 +82,7 @@ const SampleDataTopic: FC<{ topicFQN: string }> = ({ topicFQN }) => {
|
||||
const fetchTopicSampleData = async () => {
|
||||
setLoading(true);
|
||||
try {
|
||||
const { sampleData } = await getTopicByFqn(
|
||||
topicFQN,
|
||||
TabSpecificField.SAMPLE_DATA
|
||||
);
|
||||
const { sampleData } = await getSampleDataByTopicId(topicId);
|
||||
|
||||
setData(sampleData);
|
||||
} catch (error) {
|
||||
|
||||
@ -400,7 +400,7 @@ const TopicDetails: React.FC<TopicDetailsProps> = ({
|
||||
/>
|
||||
),
|
||||
key: EntityTabs.SAMPLE_DATA,
|
||||
children: <SampleDataTopic topicFQN={topicFQN} />,
|
||||
children: <SampleDataTopic topicId={topicDetails.id} />,
|
||||
},
|
||||
{
|
||||
label: <TabsLabel id={EntityTabs.CONFIG} name={t('label.config')} />,
|
||||
|
||||
@ -138,3 +138,9 @@ export const restoreTopic = async (id: string) => {
|
||||
|
||||
return response.data;
|
||||
};
|
||||
|
||||
export const getSampleDataByTopicId = async (id: string) => {
|
||||
const response = await APIClient.get<Topic>(`/topics/${id}/sampleData`);
|
||||
|
||||
return response.data;
|
||||
};
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user