Fix #263: Add Tags and Followers to Topic Entity

This commit is contained in:
Suresh Srinivas 2021-08-22 00:57:37 -07:00
parent 3bc71fc65f
commit 0cacece02e
72 changed files with 358 additions and 117 deletions

View File

@ -90,8 +90,7 @@ public class ElasticSearchEventHandler implements EventHandler {
}
}
if (!tags.isEmpty()) {
List<String> tagsList = new ArrayList<>();
tagsList.addAll(tags);
List<String> tagsList = new ArrayList<>(tags);
jsonMap.put("tags", tagsList);
}
if (!columnDescriptions.isEmpty()) {

View File

@ -26,6 +26,7 @@ import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.resources.feeds.FeedUtil;
import org.openmetadata.catalog.resources.feeds.MessageParser;
import org.openmetadata.catalog.resources.feeds.MessageParser.EntityLink;
@ -78,6 +79,9 @@ public abstract class FeedRepository {
@CreateSqlObject
abstract ReportDAO reportDAO();
@CreateSqlObject
abstract TopicDAO topicDAO();
@Transaction
public Thread create(Thread thread) throws IOException {
// Validate user creating thread
@ -87,7 +91,7 @@ public abstract class FeedRepository {
// Validate about data entity is valid
EntityLink about = EntityLink.parse(thread.getAbout());
EntityReference aboutRef = EntityUtil.validateEntityLink(about, userDAO(), teamDAO(), tableDAO(),
databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO());
databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO());
// Get owner for the addressed to Entity
EntityReference owner = EntityUtil.populateOwner(aboutRef.getId(), relationshipDAO(), userDAO(), teamDAO());
@ -166,7 +170,7 @@ public abstract class FeedRepository {
throw new IllegalArgumentException("Only entity links of type <E#/{entityType}/{entityName}> is allowed");
}
EntityReference reference = EntityUtil.validateEntityLink(entityLink, userDAO(), teamDAO(), tableDAO(),
databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO());
databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO());
List<String> threadIds = new ArrayList<>();
List<List<String>> result = fieldRelationshipDAO().listToByPrefix(entityLink.getFullyQualifiedFieldValue(),
entityLink.getFullyQualifiedFieldType(), "thread",

View File

@ -26,6 +26,7 @@ import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.resources.teams.TeamResource;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.EntityUtil;
@ -91,6 +92,9 @@ public abstract class TeamRepository {
@CreateSqlObject
abstract ReportDAO reportDAO();
@CreateSqlObject
abstract TopicDAO topicDAO();
@Transaction
public Team create(Team team, List<UUID> userIds) throws IOException {
// Query 1 - Validate user IDs
@ -231,7 +235,7 @@ public abstract class TeamRepository {
private List<EntityReference> getOwns(String teamId) throws IOException {
// Compile entities owned by the team
return EntityUtil.getEntityReference(relationshipDAO().findTo(teamId, OWNS.ordinal()), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO());
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO());
}
private void addUserRelationship(Team team, User user) {

View File

@ -17,6 +17,7 @@
package org.openmetadata.catalog.jdbi3;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.exception.CatalogExceptionMessage;
@ -27,6 +28,7 @@ import org.openmetadata.catalog.jdbi3.UsageRepository.UsageDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
import org.openmetadata.catalog.resources.topics.TopicResource;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.JsonUtils;
@ -74,6 +76,9 @@ public abstract class TopicRepository {
@CreateSqlObject
abstract MessagingServiceDAO messageServiceDAO();
@CreateSqlObject
abstract TagRepository.TagDAO tagDAO();
@CreateSqlObject
abstract UsageDAO usageDAO();
@ -226,9 +231,19 @@ public abstract class TopicRepository {
private Topic setFields(Topic topic, Fields fields) throws IOException {
topic.setOwner(fields.contains("owner") ? getOwner(topic) : null);
topic.setService(fields.contains("service") ? getService(topic) : null);
topic.setFollowers(fields.contains("followers") ? getFollowers(topic) : null);
topic.setTags(fields.contains("tags") ? getTags(topic.getFullyQualifiedName()) : null);
return topic;
}
private List<EntityReference> getFollowers(Topic topic) throws IOException {
return topic == null ? null : EntityUtil.getFollowers(topic.getId(), relationshipDAO(), userDAO());
}
private List<TagLabel> getTags(String fqn) {
return tagDAO().getTags(fqn);
}
private EntityReference getService(Topic topic) throws IOException {
return topic == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(relationshipDAO(),
topic.getId())));
@ -256,6 +271,19 @@ public abstract class TopicRepository {
}
}
@Transaction
public Status addFollower(String topicId, String userId) throws IOException {
EntityUtil.validate(topicId, topicDAO().findById(topicId), Table.class);
return EntityUtil.addFollower(relationshipDAO(), userDAO(), topicId, Entity.TOPIC, userId, Entity.USER) ?
Status.CREATED : Status.OK;
}
@Transaction
public void deleteFollower(String topicId, String userId) {
EntityUtil.validateUser(userDAO(), userId);
EntityUtil.removeFollower(relationshipDAO(), topicId, userId);
}
public interface TopicDAO {
@SqlUpdate("INSERT INTO topic_entity (json) VALUES (:json)")
void insert(@Bind("json") String json);

View File

@ -20,6 +20,7 @@ import org.openmetadata.catalog.jdbi3.DashboardRepository.DashboardDAO;
import org.openmetadata.catalog.jdbi3.DatabaseRepository.DatabaseDAO;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.type.EntityReference;
@ -43,6 +44,7 @@ import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.UUID;
import static org.openmetadata.catalog.util.EntityUtil.getEntityReference;
@ -67,13 +69,18 @@ public abstract class UsageRepository {
@CreateSqlObject
abstract ReportDAO reportDAO();
@CreateSqlObject
abstract TopicDAO topicDAO();
@CreateSqlObject
abstract EntityRelationshipDAO relationshipDAO();
@Transaction
public EntityUsage get(String entityType, String id, String date, int days) throws IOException {
EntityReference ref = getEntityReference(entityType, id, tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(),
reportDAO());
EntityReference ref = getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(),
metricsDAO(), dashboardDAO(), reportDAO(), topicDAO());
List<UsageDetails> usageDetails = usageDAO().getUsageById(id, date, days - 1);
return new EntityUsage().withUsage(usageDetails).withEntity(ref);
}
@ -81,7 +88,7 @@ public abstract class UsageRepository {
@Transaction
public EntityUsage getByName(String entityType, String fqn, String date, int days) throws IOException {
EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fqn, tableDAO(), databaseDAO(),
metricsDAO(), reportDAO());
metricsDAO(), reportDAO(), topicDAO());
List<UsageDetails> usageDetails = usageDAO().getUsageById(ref.getId().toString(), date, days - 1);
return new EntityUsage().withUsage(usageDetails).withEntity(ref);
}
@ -89,14 +96,15 @@ public abstract class UsageRepository {
@Transaction
public void create(String entityType, String id, DailyCount usage) throws IOException {
// Validate data entity for which usage is being collected
getEntityReference(entityType, id, tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO());
getEntityReference(entityType, UUID.fromString(id), tableDAO(), databaseDAO(), metricsDAO(),
dashboardDAO(), reportDAO(), topicDAO());
addUsage(entityType, id, usage);
}
@Transaction
public void createByName(String entityType, String fullyQualifiedName, DailyCount usage) throws IOException {
EntityReference ref = EntityUtil.getEntityReferenceByName(entityType, fullyQualifiedName, tableDAO(),
databaseDAO(), metricsDAO(), reportDAO());
databaseDAO(), metricsDAO(), reportDAO(), topicDAO());
addUsage(entityType, ref.getId().toString(), usage);
LOG.info("Usage successfully posted by name");
}

View File

@ -25,6 +25,7 @@ import org.openmetadata.catalog.jdbi3.MetricsRepository.MetricsDAO;
import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.resources.teams.UserResource;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.util.EntityUtil;
@ -92,6 +93,9 @@ public abstract class UserRepository {
@CreateSqlObject
abstract ReportDAO reportDAO();
@CreateSqlObject
abstract TopicDAO topicDAO();
@Transaction
public List<User> listAfter(Fields fields, int limitParam, String after) throws IOException,
ParseException, GeneralSecurityException {
@ -210,12 +214,12 @@ public abstract class UserRepository {
}
// Populate details in entity reference
return EntityUtil.getEntityReference(ownedEntities, tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(),
reportDAO());
reportDAO(), topicDAO());
}
private List<EntityReference> getFollows(User user) throws IOException {
return EntityUtil.getEntityReference(relationshipDAO().findTo(user.getId().toString(), FOLLOWS.ordinal()),
tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO());
tableDAO(), databaseDAO(), metricsDAO(), dashboardDAO(), reportDAO(), topicDAO());
}
private void patchTeams(User original, User updated) throws IOException {

View File

@ -28,6 +28,7 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.openmetadata.catalog.api.data.CreateTopic;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
import org.openmetadata.catalog.jdbi3.TopicRepository;
import org.openmetadata.catalog.resources.Collection;
@ -64,6 +65,7 @@ import javax.ws.rs.core.UriInfo;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.GeneralSecurityException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -91,6 +93,7 @@ public class TopicResource {
topic.setHref(RestUtil.getHref(uriInfo, TOPIC_COLLECTION_PATH, topic.getId()));
EntityUtil.addHref(uriInfo, topic.getOwner());
EntityUtil.addHref(uriInfo, topic.getService());
EntityUtil.addHref(uriInfo, topic.getFollowers());
return topic;
}
@ -113,7 +116,7 @@ public class TopicResource {
}
}
static final String FIELDS = "owner,service";
static final String FIELDS = "owner,service,followers,tags";
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "")
.split(","));
@ -186,15 +189,13 @@ public class TopicResource {
schema = @Schema(implementation = Dashboard.class))),
@ApiResponse(responseCode = "404", description = "Topic for instance {id} is not found")
})
public Response get(@Context UriInfo uriInfo, @PathParam("id") String id,
public Topic get(@Context UriInfo uriInfo, @PathParam("id") String id,
@Context SecurityContext securityContext,
@Parameter(description = "Fields requested in the returned resource",
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields") String fieldsParam) throws IOException {
Fields fields = new Fields(FIELD_LIST, fieldsParam);
Topic topic = dao.get(id, fields);
addHref(uriInfo, topic);
return Response.ok(topic).build();
return addHref(uriInfo, dao.get(id, fields));
}
@GET
@ -237,7 +238,8 @@ public class TopicResource {
.withCleanupPolicies(create.getCleanupPolicies())
.withMaximumMessageSize(create.getMaximumMessageSize())
.withMinimumInSyncReplicas(create.getMinimumInSyncReplicas())
.withRetentionSize(create.getRetentionSize()).withRetentionTime(create.getRetentionTime());
.withRetentionSize(create.getRetentionSize()).withRetentionTime(create.getRetentionTime())
.withTags(create.getTags());
topic = addHref(uriInfo, dao.create(topic, create.getService(), create.getOwner()));
return Response.created(topic.getHref()).entity(topic).build();
}
@ -284,12 +286,53 @@ public class TopicResource {
.withCleanupPolicies(create.getCleanupPolicies())
.withMaximumMessageSize(create.getMaximumMessageSize())
.withMinimumInSyncReplicas(create.getMinimumInSyncReplicas())
.withRetentionSize(create.getRetentionSize()).withRetentionTime(create.getRetentionTime());
.withRetentionSize(create.getRetentionSize()).withRetentionTime(create.getRetentionTime())
.withTags(create.getTags());
PutResponse<Topic> response = dao.createOrUpdate(topic, create.getService(), create.getOwner());
topic = addHref(uriInfo, response.getEntity());
return Response.status(response.getStatus()).entity(topic).build();
}
@PUT
@Path("/{id}/followers")
@Operation(summary = "Add a follower", tags = "topics",
description = "Add a user identified by `userId` as followed of this topic",
responses = {
@ApiResponse(responseCode = "200", description = "OK"),
@ApiResponse(responseCode = "404", description = "Topic for instance {id} is not found")
})
public Response addFollower(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the topic", schema = @Schema(type = "string"))
@PathParam("id") String id,
@Parameter(description = "Id of the user to be added as follower",
schema = @Schema(type = "string"))
String userId) throws IOException, ParseException {
Fields fields = new Fields(FIELD_LIST, "followers");
Response.Status status = dao.addFollower(id, userId);
Topic table = dao.get(id, fields);
return Response.status(status).entity(table).build();
}
@DELETE
@Path("/{id}/followers/{userId}")
@Operation(summary = "Remove a follower", tags = "topic",
description = "Remove the user identified `userId` as a follower of the topic.")
public Topic deleteFollower(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the topic",
schema = @Schema(type = "string"))
@PathParam("id") String id,
@Parameter(description = "Id of the user being removed as follower",
schema = @Schema(type = "string"))
@PathParam("userId") String userId) throws IOException, ParseException {
Fields fields = new Fields(FIELD_LIST, "followers");
dao.deleteFollower(id, userId);
Topic topic = dao.get(id, fields);
return addHref(uriInfo, topic);
}
@DELETE
@Path("/{id}")
@Operation(summary = "Delete a topic", tags = "topics",

View File

@ -38,6 +38,7 @@ import org.openmetadata.catalog.jdbi3.ReportRepository.ReportDAO;
import org.openmetadata.catalog.jdbi3.TableRepository.TableDAO;
import org.openmetadata.catalog.jdbi3.TagRepository.TagDAO;
import org.openmetadata.catalog.jdbi3.TeamRepository.TeamDAO;
import org.openmetadata.catalog.jdbi3.TopicRepository.TopicDAO;
import org.openmetadata.catalog.jdbi3.UsageRepository.UsageDAO;
import org.openmetadata.catalog.jdbi3.UserRepository.UserDAO;
import org.openmetadata.catalog.resources.databases.DatabaseResource;
@ -208,16 +209,16 @@ public final class EntityUtil {
public static List<EntityReference> getEntityReference(List<EntityReference> list, TableDAO tableDAO,
DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO) throws IOException {
ReportDAO reportDAO, TopicDAO topicDAO) throws IOException {
for (EntityReference ref : list) {
getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO);
getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO);
}
return list;
}
public static EntityReference getEntityReference(EntityReference ref, TableDAO tableDAO, DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO) throws IOException {
ReportDAO reportDAO, TopicDAO topicDAO) throws IOException {
// Note href to entity reference is not added here
String entity = ref.getType();
String id = ref.getId().toString();
@ -236,20 +237,23 @@ public final class EntityUtil {
} else if (entity.equalsIgnoreCase(Entity.REPORT)) {
Report instance = EntityUtil.validate(id, reportDAO.findById(id), Report.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
} else if (entity.equalsIgnoreCase(Entity.TOPIC)) {
Topic instance = EntityUtil.validate(id, topicDAO.findById(id), Topic.class);
return ref.withDescription(instance.getDescription()).withName(instance.getFullyQualifiedName());
}
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(entity));
}
public static EntityReference getEntityReference(String entity, String id, TableDAO tableDAO, DatabaseDAO databaseDAO,
public static EntityReference getEntityReference(String entity, UUID id, TableDAO tableDAO, DatabaseDAO databaseDAO,
MetricsDAO metricsDAO, DashboardDAO dashboardDAO,
ReportDAO reportDAO) throws IOException {
EntityReference ref = new EntityReference().withId(UUID.fromString(id)).withType(entity);
return getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO);
ReportDAO reportDAO, TopicDAO topicDAO) throws IOException {
EntityReference ref = new EntityReference().withId(id).withType(entity);
return getEntityReference(ref, tableDAO, databaseDAO, metricsDAO, dashboardDAO, reportDAO, topicDAO);
}
public static EntityReference getEntityReferenceByName(String entity, String fqn, TableDAO tableDAO,
DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
ReportDAO reportDAO) throws IOException {
ReportDAO reportDAO, TopicDAO topicDAO) throws IOException {
if (entity.equalsIgnoreCase(Entity.TABLE)) {
Table instance = EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.TABLE)
@ -266,6 +270,10 @@ public final class EntityUtil {
Report instance = EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.REPORT)
.withDescription(instance.getDescription());
} else if (entity.equalsIgnoreCase(Entity.TOPIC)) {
Topic instance = EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class);
return new EntityReference().withId(instance.getId()).withName(instance.getName()).withType(Entity.TOPIC)
.withDescription(instance.getDescription());
}
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entity, fqn));
}
@ -292,6 +300,9 @@ public final class EntityUtil {
} else if (clazz.toString().toLowerCase().endsWith(Entity.USER.toLowerCase())) {
User instance = (User) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.TOPIC.toLowerCase())) {
Topic instance = (Topic) entity;
return getEntityReference(instance);
}
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(
String.format("Failed to find entity class %s", clazz.toString())));
@ -309,25 +320,29 @@ public final class EntityUtil {
public static EntityReference validateEntityLink(EntityLink entityLink, UserDAO userDAO, TeamDAO teamDAO,
TableDAO tableDAO, DatabaseDAO databaseDAO, MetricsDAO metricsDAO,
DashboardDAO dashboardDAO, ReportDAO reportDAO) throws IOException {
DashboardDAO dashboardDAO, ReportDAO reportDAO, TopicDAO topicDAO)
throws IOException {
String entityType = entityLink.getEntityType();
String fqn = entityLink.getEntityId();
if (entityType.equals(Entity.USER)) {
return getEntityReference(EntityUtil.validate(fqn, userDAO.findByName(fqn), User.class));
} else if (entityType.equals(Entity.TEAM)) {
return getEntityReference(EntityUtil.validate(fqn, teamDAO.findByName(fqn), Team.class));
} else if (entityType.equals(Entity.TABLE)) {
return getEntityReference(EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class));
} else if (entityType.equals(Entity.DATABASE)) {
return getEntityReference(EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class));
} else if (entityType.equals(Entity.METRICS)) {
return getEntityReference(EntityUtil.validate(fqn, metricsDAO.findByFQN(fqn), Metrics.class));
} else if (entityType.equals(Entity.DASHBOARD)) {
return getEntityReference(EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class));
} else if (entityType.equals(Entity.REPORT)) {
return getEntityReference(EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class));
} else {
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entityType, fqn));
switch (entityType) {
case Entity.USER:
return getEntityReference(EntityUtil.validate(fqn, userDAO.findByName(fqn), User.class));
case Entity.TEAM:
return getEntityReference(EntityUtil.validate(fqn, teamDAO.findByName(fqn), Team.class));
case Entity.TABLE:
return getEntityReference(EntityUtil.validate(fqn, tableDAO.findByFQN(fqn), Table.class));
case Entity.DATABASE:
return getEntityReference(EntityUtil.validate(fqn, databaseDAO.findByFQN(fqn), Database.class));
case Entity.METRICS:
return getEntityReference(EntityUtil.validate(fqn, metricsDAO.findByFQN(fqn), Metrics.class));
case Entity.DASHBOARD:
return getEntityReference(EntityUtil.validate(fqn, dashboardDAO.findByFQN(fqn), Dashboard.class));
case Entity.REPORT:
return getEntityReference(EntityUtil.validate(fqn, reportDAO.findByFQN(fqn), Report.class));
case Entity.TOPIC:
return getEntityReference(EntityUtil.validate(fqn, topicDAO.findByFQN(fqn), Topic.class));
default:
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityNotFound(entityType, fqn));
}
}
@ -384,6 +399,7 @@ public final class EntityUtil {
.withName(user.getName()).withType(Entity.USER);
}
public static void validateTags(TagDAO tagDAO, List<TagLabel> tagLabels) {
Optional.ofNullable(tagLabels).orElse(Collections.emptyList()).forEach(tagLabel -> {
if (!tagDAO.tagExists(tagLabel.getTagFQN())) {

View File

@ -56,6 +56,14 @@
"owner": {
"description": "Owner of this topic",
"$ref": "../../type/entityReference.json"
},
"tags": {
"description": "Tags for this table",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
},
"default": null
}
},
"required": [

View File

@ -107,6 +107,18 @@
"description": "Owner of this topic.",
"$ref": "../../type/entityReference.json"
},
"followers": {
"description": "Followers of this table.",
"$ref": "../../type/entityReference.json#/definitions/entityReferenceList"
},
"tags": {
"description": "Tags for this table.",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
},
"default": null
},
"href": {
"description": "Link to the resource corresponding to this entity.",
"$ref": "../../type/basic.json#/definitions/href"

View File

@ -26,6 +26,7 @@ import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.api.data.CreateTopic;
import org.openmetadata.catalog.api.services.CreateMessagingService;
import org.openmetadata.catalog.api.services.CreateMessagingService.MessagingServiceType;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
import org.openmetadata.catalog.entity.services.MessagingService;
import org.openmetadata.catalog.entity.teams.Team;
@ -56,17 +57,11 @@ import static javax.ws.rs.core.Response.Status.CREATED;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.*;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.readOnlyAttribute;
import static org.openmetadata.catalog.util.TestUtils.LONG_ENTITY_NAME;
import static org.openmetadata.catalog.util.TestUtils.adminAuthHeaders;
import static org.openmetadata.catalog.util.TestUtils.assertEntityPagination;
import static org.openmetadata.catalog.util.TestUtils.assertResponse;
import static org.openmetadata.catalog.util.TestUtils.authHeaders;
import static org.openmetadata.catalog.util.TestUtils.*;
import static org.openmetadata.catalog.util.TestUtils.NON_EXISTENT_ENTITY;
public class TopicResourceTest extends CatalogApplicationTest {
private static final Logger LOG = LoggerFactory.getLogger(TopicResourceTest.class);
@ -476,6 +471,42 @@ public class TopicResourceTest extends CatalogApplicationTest {
// TODO
}
@Test
public void put_addDeleteFollower_200(TestInfo test) throws HttpResponseException {
Topic topic = createAndCheckTopic(create(test), adminAuthHeaders());
// Add follower to the table
User user1 = UserResourceTest.createUser(UserResourceTest.create(test, 1), userAuthHeaders());
addAndCheckFollower(topic, user1.getId(), CREATED, 1, userAuthHeaders());
// Add the same user as follower and make sure no errors are thrown and return response is OK (and not CREATED)
addAndCheckFollower(topic, user1.getId(), OK, 1, userAuthHeaders());
// Add a new follower to the table
User user2 = UserResourceTest.createUser(UserResourceTest.create(test, 2), userAuthHeaders());
addAndCheckFollower(topic, user2.getId(), CREATED, 2, userAuthHeaders());
// Delete followers and make sure they are deleted
deleteAndCheckFollower(topic, user1.getId(), 1, userAuthHeaders());
deleteAndCheckFollower(topic, user2.getId(), 0, userAuthHeaders());
}
@Test
public void put_addDeleteInvalidFollower_200(TestInfo test) throws HttpResponseException {
Topic topic = createAndCheckTopic(create(test), adminAuthHeaders());
// Add non existent user as follower to the table
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
addAndCheckFollower(topic, NON_EXISTENT_ENTITY, CREATED, 1, adminAuthHeaders()));
assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound("User", NON_EXISTENT_ENTITY));
// Delete non existent user as follower to the table
exception = assertThrows(HttpResponseException.class, () ->
deleteAndCheckFollower(topic, NON_EXISTENT_ENTITY, 1, adminAuthHeaders()));
assertResponse(exception, NOT_FOUND, CatalogExceptionMessage.entityNotFound("User", NON_EXISTENT_ENTITY));
}
@Test
public void delete_nonExistentTopic_404() {
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
@ -666,4 +697,69 @@ public class TopicResourceTest extends CatalogApplicationTest {
public static CreateTopic create(TestInfo test, int index) {
return new CreateTopic().withName(getTopicName(test, index)).withService(KAFKA_REFERENCE).withPartitions(1);
}
public static void addAndCheckFollower(Topic topic, UUID userId, Status status, int totalFollowerCount,
Map<String, String> authHeaders) throws HttpResponseException {
WebTarget target = CatalogApplicationTest.getResource(String.format("topics/%s/followers", topic.getId()));
TestUtils.put(target, userId.toString(), status, authHeaders);
// GET .../topics/{topicId} returns newly added follower
Topic getTopic = getTopic(topic.getId(), "followers", authHeaders);
assertEquals(totalFollowerCount, getTopic.getFollowers().size());
TestUtils.validateEntityReference(getTopic.getFollowers());
boolean followerFound = false;
for (EntityReference followers : getTopic.getFollowers()) {
if (followers.getId().equals(userId)) {
followerFound = true;
break;
}
}
assertTrue(followerFound, "Follower added was not found in table get response");
// GET .../users/{userId} shows user as following table
checkUserFollowing(userId, topic.getId(), true, authHeaders);
}
private static void checkUserFollowing(UUID userId, UUID topicId, boolean expectedFollowing,
Map<String, String> authHeaders) throws HttpResponseException {
// GET .../users/{userId} shows user as following table
boolean following = false;
User user = UserResourceTest.getUser(userId, "follows", authHeaders);
for (EntityReference follows : user.getFollows()) {
TestUtils.validateEntityReference(follows);
if (follows.getId().equals(topicId)) {
following = true;
break;
}
}
assertEquals(expectedFollowing, following, "Follower list for the user is invalid");
}
private void deleteAndCheckFollower(Topic topic, UUID userId, int totalFollowerCount,
Map<String, String> authHeaders) throws HttpResponseException {
WebTarget target = CatalogApplicationTest.getResource(String.format("topics/%s/followers/%s",
topic.getId(), userId));
TestUtils.delete(target, authHeaders);
Topic getTopic = checkFollowerDeleted(topic.getId(), userId, authHeaders);
assertEquals(totalFollowerCount, getTopic.getFollowers().size());
}
public static Topic checkFollowerDeleted(UUID topicId, UUID userId, Map<String, String> authHeaders)
throws HttpResponseException {
Topic getTopic = getTopic(topicId, "followers", authHeaders);
TestUtils.validateEntityReference(getTopic.getFollowers());
boolean followerFound = false;
for (EntityReference followers : getTopic.getFollowers()) {
if (followers.getId().equals(userId)) {
followerFound = true;
break;
}
}
assertFalse(followerFound, "Follower deleted is still found in table get response");
// GET .../users/{userId} shows user as following table
checkUserFollowing(userId, topicId, false, authHeaders);
return getTopic;
}
}

View File

@ -169,7 +169,7 @@ public final class TestUtils {
assertNotNull(ref.getName());
assertNotNull(ref.getType());
// Ensure data entities use fully qualified name
if (List.of("table", "database", "metrics", "dashboard", "pipeline", "report").contains(ref.getName())) {
if (List.of("table", "database", "metrics", "dashboard", "pipeline", "report", "topic").contains(ref.getName())) {
ref.getName().contains("."); // FullyQualifiedName has "." as separator
}
}

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/personalDataTags.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/piiTags.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/tierTags.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/userTags.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/catalogVersion.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDatabase.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTable.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTopic.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations
@ -9,7 +9,7 @@ from typing import Any, List, Optional
from pydantic import BaseModel, Field, conint
from ...entity.data import topic
from ...type import entityReference
from ...type import entityReference, tagLabel
class CreateTopic(BaseModel):
@ -57,3 +57,6 @@ class CreateTopic(BaseModel):
owner: Optional[entityReference.EntityReference] = Field(
None, description='Owner of this topic'
)
tags: Optional[List[tagLabel.TagLabel]] = Field(
None, description='Tags for this table'
)

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/feed/createThread.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDatabaseService.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createMessagingService.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDatabaseService.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateMessagingService.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/setOwner.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTag.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTagCategory.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createTeam.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createUser.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/bots.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/dashboard.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/database.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/metrics.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/pipeline.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/report.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/table.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/topic.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations
@ -9,7 +9,7 @@ from typing import Any, List, Optional
from pydantic import BaseModel, Field, conint, constr
from ...type import basic, entityReference
from ...type import basic, entityReference, tagLabel
class TopicName(BaseModel):
@ -80,6 +80,12 @@ class Topic(BaseModel):
owner: Optional[entityReference.EntityReference] = Field(
None, description='Owner of this topic.'
)
followers: Optional[entityReference.EntityReferenceList] = Field(
None, description='Followers of this table.'
)
tags: Optional[List[tagLabel.TagLabel]] = Field(
None, description='Tags for this table.'
)
href: Optional[basic.Href] = Field(
None, description='Link to the resource corresponding to this entity.'
)

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/feed/thread.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/databaseService.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/messagingService.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/tags/tagCategory.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/team.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/user.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/auditLog.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/basic.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/collectionDescriptor.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/dailyCount.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityReference.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityUsage.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/jdbcConnection.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/profile.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/schedule.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/tagLabel.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/usageDetails.json
# timestamp: 2021-08-22T00:32:25+00:00
# timestamp: 2021-08-22T05:45:37+00:00
from __future__ import annotations

View File

@ -206,3 +206,4 @@ class TopicESDocument(BaseModel):
tier: Optional[str] = None
schema_description: Optional[str] = None
owner: str
followers: List[str]

View File

@ -166,9 +166,18 @@ class ElasticsearchSink(Sink):
suggest = [{'input': [fqdn], 'weight': 5}, {'input': [topic_name], 'weight': 10}]
tags = set()
timestamp = time.time()
tier = None
service_entity = self.rest.get_messaging_service_by_id(str(topic.service.id.__root__))
topic_owner = str(topic.owner.id.__root__) if topic.owner is not None else ""
topic_followers = []
if topic.followers:
for follower in topic.followers.__root__:
topic_followers.append(str(follower.id.__root__))
tier = None
for topic_tag in topic.tags:
if "Tier" in topic_tag.tagFQN:
tier = topic_tag.tagFQN
else:
tags.add(topic_tag.tagFQN)
topic_doc = TopicESDocument(topic_id=str(topic.id.__root__),
service=service_entity.name,
service_type=service_entity.serviceType.name,
@ -179,7 +188,8 @@ class ElasticsearchSink(Sink):
tier=tier,
tags=list(tags),
fqdn=fqdn,
owner=topic_owner)
owner=topic_owner,
followers=topic_followers)
return topic_doc

View File

@ -54,11 +54,10 @@ class MetadataSource(Source):
if self.config.include_topics:
self.topics = self.client.list_topics(
fields="owner,service", offset=0, limit=self.config.limit_records)
fields="owner,service,tags,followers", offset=0, limit=self.config.limit_records)
@classmethod
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
print("WTF")
config = MetadataTablesRestSourceConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)