From 562d6b39efa84d201315af1c5c01a31616da184d Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Mon, 14 Feb 2022 07:47:07 -0800 Subject: [PATCH] Fix #2704: Add suggestions API to query users and teams like autocomplete (#2742) --- .../ElasticSearchEventPublisher.java | 88 +++++++++++ .../ElasticSearchIndexDefinition.java | 146 +++++++++++++++++- .../resources/search/SearchResource.java | 26 ++++ .../elasticsearch/team_index_mapping.json | 29 ++++ .../elasticsearch/user_index_mapping.json | 32 ++++ .../ingestion/models/table_metadata.py | 31 ++++ .../metadata/ingestion/sink/elasticsearch.py | 99 +++++++++++- .../ingestion/sink/elasticsearch_constants.py | 75 +++++++++ .../src/metadata/ingestion/source/metadata.py | 68 ++++++++ 9 files changed, 591 insertions(+), 3 deletions(-) create mode 100644 catalog-rest-service/src/main/resources/elasticsearch/team_index_mapping.json create mode 100644 catalog-rest-service/src/main/resources/elasticsearch/user_index_mapping.json diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java index c2728055880..b2da938bf22 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java @@ -37,6 +37,8 @@ import org.openmetadata.catalog.entity.data.Dashboard; import org.openmetadata.catalog.entity.data.Pipeline; import org.openmetadata.catalog.entity.data.Table; import org.openmetadata.catalog.entity.data.Topic; +import org.openmetadata.catalog.entity.teams.Team; +import org.openmetadata.catalog.entity.teams.User; import org.openmetadata.catalog.events.AbstractEventPublisher; import org.openmetadata.catalog.events.errors.EventPublisherException; import org.openmetadata.catalog.resources.events.EventResource.ChangeEventList; @@ -84,6 +86,12 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { case Entity.PIPELINE: updateRequest = updatePipeline(event); break; + case Entity.USER: + updateRequest = updateUser(event); + break; + case Entity.TEAM: + updateRequest = updateTeam(event); + break; default: LOG.warn("Ignoring Entity Type {}", entityType); } @@ -291,6 +299,60 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { return updateRequest; } + private UpdateRequest updateUser(ChangeEvent event) throws IOException { + UpdateRequest updateRequest = + new UpdateRequest(ElasticSearchIndexType.USER_SEARCH_INDEX.indexName, event.getEntityId().toString()); + UserESIndex userESIndex = null; + if (event.getEntity() != null && event.getEventType() != EventType.ENTITY_SOFT_DELETED) { + User user = (User) event.getEntity(); + userESIndex = UserESIndex.builder(user, event.getEventType()).build(); + } + switch (event.getEventType()) { + case ENTITY_CREATED: + String json = JsonUtils.pojoToJson(userESIndex); + updateRequest.doc(json, XContentType.JSON); + updateRequest.docAsUpsert(true); + break; + case ENTITY_UPDATED: + scriptedUserUpsert(userESIndex, updateRequest); + break; + case ENTITY_SOFT_DELETED: + softDeleteEntity(updateRequest); + break; + case ENTITY_DELETED: + break; + } + + return updateRequest; + } + + private UpdateRequest updateTeam(ChangeEvent event) throws IOException { + UpdateRequest updateRequest = + new UpdateRequest(ElasticSearchIndexType.TEAM_SEARCH_INDEX.indexName, event.getEntityId().toString()); + TeamESIndex teamESIndex = null; + if (event.getEntity() != null && event.getEventType() != EventType.ENTITY_SOFT_DELETED) { + Team team = (Team) event.getEntity(); + teamESIndex = TeamESIndex.builder(team, event.getEventType()).build(); + } + switch (event.getEventType()) { + case ENTITY_CREATED: + String json = JsonUtils.pojoToJson(teamESIndex); + updateRequest.doc(json, XContentType.JSON); + updateRequest.docAsUpsert(true); + break; + case ENTITY_UPDATED: + scriptedTeamUpsert(teamESIndex, updateRequest); + break; + case ENTITY_SOFT_DELETED: + softDeleteEntity(updateRequest); + break; + case ENTITY_DELETED: + break; + } + + return updateRequest; + } + private void scriptedUpsert(Object index, UpdateRequest updateRequest) { String scriptTxt = "for (k in params.keySet()) {if (k == 'change_descriptions') " @@ -302,6 +364,32 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { updateRequest.scriptedUpsert(true); } + private void scriptedUserUpsert(Object index, UpdateRequest updateRequest) { + String scriptTxt = + "for (k in params.keySet()) {if (k == 'teams') " + + "{ ctx._source.teams.addAll(params.teams) } " + + "else if (k == 'roles') " + + " { ctx._source.roles.addAll(params.roles) }" + + "else { ctx._source.put(k, params.get(k)) }}"; + Map doc = JsonUtils.getMap(index); + Script script = new Script(ScriptType.INLINE, "painless", scriptTxt, doc); + updateRequest.script(script); + updateRequest.scriptedUpsert(true); + } + + private void scriptedTeamUpsert(Object index, UpdateRequest updateRequest) { + String scriptTxt = + "for (k in params.keySet()) {if (k == 'users') " + + "{ ctx._source.users.addAll(params.users) } " + + "else if (k == 'owns') " + + " { ctx._source.owns.addAll(params.owns) }" + + "else { ctx._source.put(k, params.get(k)) }}"; + Map doc = JsonUtils.getMap(index); + Script script = new Script(ScriptType.INLINE, "painless", scriptTxt, doc); + updateRequest.script(script); + updateRequest.scriptedUpsert(true); + } + private void softDeleteEntity(UpdateRequest updateRequest) { String scriptTxt = "ctx._source.deleted=true"; Script script = new Script(ScriptType.INLINE, "painless", scriptTxt, new HashMap<>()); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java index b52a3af168d..7d6d2eb4a5c 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java @@ -34,6 +34,8 @@ import org.openmetadata.catalog.entity.data.Dashboard; import org.openmetadata.catalog.entity.data.Pipeline; import org.openmetadata.catalog.entity.data.Table; import org.openmetadata.catalog.entity.data.Topic; +import org.openmetadata.catalog.entity.teams.Team; +import org.openmetadata.catalog.entity.teams.User; import org.openmetadata.catalog.type.Column; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EventType; @@ -63,7 +65,9 @@ public class ElasticSearchIndexDefinition { TABLE_SEARCH_INDEX("table_search_index", "/elasticsearch/table_index_mapping.json"), TOPIC_SEARCH_INDEX("topic_search_index", "/elasticsearch/topic_index_mapping.json"), DASHBOARD_SEARCH_INDEX("dashboard_search_index", "/elasticsearch/dashboard_index_mapping.json"), - PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json"); + PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json"), + USER_SEARCH_INDEX("user_search_index", "/elasticsearch/user_index_mapping.json"), + TEAM_SEARCH_INDEX("team_search_index", "/elasticsearch/team_index_mapping.json"); public final String indexName; public final String indexMappingFile; @@ -189,6 +193,10 @@ public class ElasticSearchIndexDefinition { return ElasticSearchIndexType.PIPELINE_SEARCH_INDEX; } else if (type.equalsIgnoreCase(Entity.TOPIC)) { return ElasticSearchIndexType.TOPIC_SEARCH_INDEX; + } else if (type.equalsIgnoreCase(Entity.USER)) { + return ElasticSearchIndexType.USER_SEARCH_INDEX; + } else if (type.equalsIgnoreCase(Entity.TEAM)) { + return ElasticSearchIndexType.TEAM_SEARCH_INDEX; } throw new RuntimeException("Failed to find index doc for type " + type); } @@ -708,3 +716,139 @@ class PipelineESIndex extends ElasticSearchIndex { return pipelineESIndexBuilder; } } + +@Getter +@SuperBuilder(builderMethodName = "internalBuilder") +@Value +@JsonInclude(JsonInclude.Include.NON_NULL) +class UserESIndex { + @JsonProperty("user_id") + String userId; + + @JsonProperty("name") + String name; + + @JsonProperty("display_name") + String displayName; + + @JsonProperty("email") + String email; + + @JsonProperty("entity_type") + String entityType; + + @JsonProperty("teams") + List teams; + + @JsonProperty("roles") + List roles; + + @JsonProperty("last_updated_timestamp") + @Builder.Default + Long lastUpdatedTimestamp = System.currentTimeMillis(); + + List suggest; + + Boolean deleted; + + public static UserESIndexBuilder builder(User user, EventType eventType) { + List teams = new ArrayList<>(); + List roles = new ArrayList<>(); + List suggest = new ArrayList<>(); + suggest.add(ElasticSearchSuggest.builder().input(user.getName()).weight(5).build()); + suggest.add(ElasticSearchSuggest.builder().input(user.getDisplayName()).weight(10).build()); + Long updatedTimestamp = user.getUpdatedAt(); + String displayName = user.getDisplayName() != null ? user.getDisplayName() : ""; + if (user.getTeams() != null) { + for (EntityReference team : user.getTeams()) { + teams.add(team.getId().toString()); + } + } + + for (EntityReference role : user.getRoles()) { + roles.add(role.getId().toString()); + } + + UserESIndexBuilder userESIndexBuilder = + internalBuilder() + .userId(user.getId().toString()) + .deleted(user.getDeleted()) + .name(user.getName()) + .email(user.getEmail()) + .displayName(displayName) + .lastUpdatedTimestamp(updatedTimestamp) + .entityType("user") + .suggest(suggest) + .teams(teams) + .roles(roles); + + return userESIndexBuilder; + } +} + +@Getter +@SuperBuilder(builderMethodName = "internalBuilder") +@Value +@JsonInclude(JsonInclude.Include.NON_NULL) +class TeamESIndex { + @JsonProperty("team_id") + String teamId; + + @JsonProperty("name") + String name; + + @JsonProperty("display_name") + String displayName; + + @JsonProperty("entity_type") + String entityType; + + @JsonProperty("users") + List users; + + @JsonProperty("owns") + List owns; + + @JsonProperty("last_updated_timestamp") + @Builder.Default + Long lastUpdatedTimestamp = System.currentTimeMillis(); + + List suggest; + + Boolean deleted; + + public static TeamESIndexBuilder builder(Team team, EventType eventType) { + List users = new ArrayList<>(); + List owns = new ArrayList<>(); + List suggest = new ArrayList<>(); + suggest.add(ElasticSearchSuggest.builder().input(team.getName()).weight(5).build()); + suggest.add(ElasticSearchSuggest.builder().input(team.getDisplayName()).weight(10).build()); + Long updatedTimestamp = team.getUpdatedAt(); + String displayName = team.getDisplayName() != null ? team.getDisplayName() : ""; + if (team.getUsers() != null) { + for (EntityReference user : team.getUsers()) { + users.add(user.getId().toString()); + } + } + if (team.getOwns() != null) { + for (EntityReference own : team.getOwns()) { + owns.add(own.getId().toString()); + } + } + + TeamESIndexBuilder teamESIndexBuilder = + internalBuilder() + .teamId(team.getId().toString()) + .deleted(team.getDeleted()) + .name(team.getName()) // pipeline names can be unique ids from source, hence use displayName for search + // indexing + .displayName(displayName) + .lastUpdatedTimestamp(updatedTimestamp) + .entityType("team") + .suggest(suggest) + .owns(owns) + .users(users); + + return teamESIndexBuilder; + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java index 9df98a1aa09..c9565d72ce3 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java @@ -147,6 +147,12 @@ public class SearchResource { case "table_search_index": searchSourceBuilder = buildTableSearchBuilder(query, from, size); break; + case "user_search_index": + searchSourceBuilder = buildUserSearchBuilder(query, from, size); + break; + case "team_search_index": + searchSourceBuilder = buildTeamSearchBuilder(query, from, size); + break; default: searchSourceBuilder = buildAggregateSearchBuilder(query, from, size); break; @@ -354,4 +360,24 @@ public class SearchResource { return searchSourceBuilder; } + + private SearchSourceBuilder buildUserSearchBuilder(String query, int from, int size) { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder + .query(QueryBuilders.queryStringQuery(query).field("name", 5.0f).field("display_name", 1.0f).lenient(true)) + .from(from) + .size(size); + + return searchSourceBuilder; + } + + private SearchSourceBuilder buildTeamSearchBuilder(String query, int from, int size) { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder + .query(QueryBuilders.queryStringQuery(query).field("name", 5.0f).field("display_name", 3.0f).lenient(true)) + .from(from) + .size(size); + + return searchSourceBuilder; + } } diff --git a/catalog-rest-service/src/main/resources/elasticsearch/team_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/team_index_mapping.json new file mode 100644 index 00000000000..54da92a4d20 --- /dev/null +++ b/catalog-rest-service/src/main/resources/elasticsearch/team_index_mapping.json @@ -0,0 +1,29 @@ +{ + "properties": { + "name": { + "type":"text" + }, + "display_name": { + "type": "text" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "entity_type": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "users": { + "type": "keyword" + }, + "owns": { + "type": "keyword" + }, + "suggest": { + "type": "completion" + } + } +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/elasticsearch/user_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/user_index_mapping.json new file mode 100644 index 00000000000..5d831db628d --- /dev/null +++ b/catalog-rest-service/src/main/resources/elasticsearch/user_index_mapping.json @@ -0,0 +1,32 @@ +{ + "properties": { + "name": { + "type":"text" + }, + "display_name": { + "type": "text" + }, + "email": { + "type": "text" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "entity_type": { + "type": "keyword" + }, + "teams": { + "type": "keyword" + }, + "roles": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "suggest": { + "type": "completion" + } + } +} \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py index 416c226355b..df9ecf36e8b 100644 --- a/ingestion/src/metadata/ingestion/models/table_metadata.py +++ b/ingestion/src/metadata/ingestion/models/table_metadata.py @@ -158,6 +158,37 @@ class PipelineESDocument(BaseModel): doc_as_upsert: bool = True +class UserESDocument(BaseModel): + """Elastic Search Mapping doc for Users""" + + user_id: str + deleted: bool + entity_type: str = "user" + name: str + display_name: str + email: str + suggest: List[dict] + last_updated_timestamp: Optional[int] + teams: List[str] + roles: List[str] + doc_as_upsert: bool = True + + +class TeamESDocument(BaseModel): + """Elastic Search Mapping doc for Teams""" + + team_id: str + deleted: bool + entity_type: str = "team" + name: str + display_name: str + suggest: List[dict] + last_updated_timestamp: Optional[int] + users: List[str] + owns: List[str] + doc_as_upsert: bool = True + + class DashboardOwner(BaseModel): """Dashboard owner""" diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index 13f31651948..5009303c141 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -31,6 +31,8 @@ from metadata.generated.schema.entity.services.dashboardService import Dashboard from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.generated.schema.entity.services.pipelineService import PipelineService +from metadata.generated.schema.entity.teams.team import Team +from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.type import entityReference from metadata.ingestion.api.common import Entity, WorkflowContext from metadata.ingestion.api.sink import Sink, SinkStatus @@ -39,7 +41,9 @@ from metadata.ingestion.models.table_metadata import ( DashboardESDocument, PipelineESDocument, TableESDocument, + TeamESDocument, TopicESDocument, + UserESDocument, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -47,7 +51,9 @@ from metadata.ingestion.sink.elasticsearch_constants import ( DASHBOARD_ELASTICSEARCH_INDEX_MAPPING, PIPELINE_ELASTICSEARCH_INDEX_MAPPING, TABLE_ELASTICSEARCH_INDEX_MAPPING, + TEAM_ELASTICSEARCH_INDEX_MAPPING, TOPIC_ELASTICSEARCH_INDEX_MAPPING, + USER_ELASTICSEARCH_INDEX_MAPPING, ) logger = logging.getLogger(__name__) @@ -66,12 +72,14 @@ class ElasticSearchConfig(ConfigModel): index_topics: Optional[bool] = True index_dashboards: Optional[bool] = True index_pipelines: Optional[bool] = True - index_dbt_models: Optional[bool] = True + index_users: Optional[bool] = True + index_teams: Optional[bool] = True table_index_name: str = "table_search_index" topic_index_name: str = "topic_search_index" dashboard_index_name: str = "dashboard_search_index" pipeline_index_name: str = "pipeline_search_index" - dbt_index_name: str = "dbt_model_search_index" + user_index_name: str = "user_search_index" + team_index_name: str = "team_search_index" scheme: str = "http" use_ssl: bool = False verify_certs: bool = False @@ -144,6 +152,16 @@ class ElasticsearchSink(Sink[Entity]): self.config.pipeline_index_name, PIPELINE_ELASTICSEARCH_INDEX_MAPPING ) + if self.config.index_users: + self._check_or_create_index( + self.config.user_index_name, USER_ELASTICSEARCH_INDEX_MAPPING + ) + + if self.config.index_teams: + self._check_or_create_index( + self.config.team_index_name, TEAM_ELASTICSEARCH_INDEX_MAPPING + ) + def _check_or_create_index(self, index_name: str, es_mapping: str): """ Retrieve all indices that currently have {elasticsearch_alias} alias @@ -208,6 +226,24 @@ class ElasticsearchSink(Sink[Entity]): request_timeout=self.config.timeout, ) + if isinstance(record, User): + user_doc = self._create_user_es_doc(record) + self.elasticsearch_client.index( + index=self.config.user_index_name, + id=str(user_doc.user_id), + body=user_doc.json(), + request_timeout=self.config.timeout, + ) + + if isinstance(record, Team): + team_doc = self._create_team_es_doc(record) + self.elasticsearch_client.index( + index=self.config.team_index_name, + id=str(team_doc.team_id), + body=team_doc.json(), + request_timeout=self.config.timeout, + ) + if hasattr(record.name, "__root__"): self.status.records_written(record.name.__root__) else: @@ -443,6 +479,65 @@ class ElasticsearchSink(Sink[Entity]): return pipeline_doc + def _create_user_es_doc(self, user: User): + suggest = [ + {"input": [user.displayName], "weight": 5}, + {"input": [user.name], "weight": 10}, + ] + timestamp = user.updatedAt.__root__ + teams = [] + roles = [] + if user.teams: + for team in user.teams.__root__: + teams.append(str(team.id.__root__)) + + if user.roles: + for role in user.roles.__root__: + roles.append(str(role.id.__root__)) + + user_doc = UserESDocument( + user_id=str(user.id.__root__), + deleted=user.deleted, + name=user.name.__root__, + display_name=user.displayName, + email=user.email.__root__, + suggest=suggest, + last_updated_timestamp=timestamp, + teams=list(teams), + roles=list(roles), + ) + + return user_doc + + def _create_team_es_doc(self, team: Team): + suggest = [ + {"input": [team.displayName], "weight": 5}, + {"input": [team.name], "weight": 10}, + ] + timestamp = team.updatedAt.__root__ + users = [] + owns = [] + if team.users: + for user in team.users.__root__: + users.append(str(team.id.__root__)) + + if team.owns: + for own in team.owns.__root__: + owns.append(str(own.id.__root__)) + + team_doc = TeamESDocument( + team_id=str(team.id.__root__), + deleted=team.deleted, + name=team.name.__root__, + display_name=team.displayName, + suggest=suggest, + last_updated_timestamp=timestamp, + users=list(users), + owns=list(owns), + ) + + return team_doc + def _get_charts(self, chart_refs: Optional[List[entityReference.EntityReference]]): charts = [] if chart_refs: diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py index b26489f708e..611d158ac95 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py @@ -390,3 +390,78 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( } """ ) + +USER_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( + """ + { + "mappings":{ + "properties": { + "name": { + "type":"text" + }, + "display_name": { + "type": "text" + }, + "email": { + "type": "text" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "entity_type": { + "type": "keyword" + }, + "teams": { + "type": "keyword" + }, + "roles": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "suggest": { + "type": "completion" + }, + } + } + } + """ +) + +TEAM_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( + """ + { + "mappings":{ + "properties": { + "name": { + "type":"text" + }, + "display_name": { + "type": "text" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "entity_type": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "users": { + "type": "keyword" + }, + "owns": { + "type": "keyword" + }, + "suggest": { + "type": "completion" + }, + } + } + } + """ +) diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py index 4d6b3f1ffe2..aa58b86e321 100644 --- a/ingestion/src/metadata/ingestion/source/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata.py @@ -19,6 +19,8 @@ from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.topic import Topic +from metadata.generated.schema.entity.teams.team import Team +from metadata.generated.schema.entity.teams.user import User from metadata.ingestion.api.common import Entity, WorkflowContext from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -34,6 +36,8 @@ class MetadataTablesRestSourceConfig(ConfigModel): include_topics: Optional[bool] = True include_dashboards: Optional[bool] = True include_pipelines: Optional[bool] = True + include_users: Optional[bool] = True + include_teams: Optional[bool] = True limit_records: int = 1000 @@ -78,6 +82,24 @@ class MetadataSourceStatus(SourceStatus): self.success.append(dashboard_name) logger.info("Dashboard Scanned: %s", dashboard_name) + def scanned_team(self, team_name: str) -> None: + """scanned team method + + Args: + team_name (str) + """ + self.success.append(team_name) + logger.info("Team Scanned: %s", team_name) + + def scanned_user(self, user_name: str) -> None: + """scanned user method + + Args: + user_name (str) + """ + self.success.append(user_name) + logger.info("User Scanned: %s", user_name) + # pylint: disable=unused-argument def filtered( self, table_name: str, err: str, dataset_name: str = None, col_type: str = None @@ -145,6 +167,8 @@ class MetadataSource(Source[Entity]): yield from self.fetch_topic() yield from self.fetch_dashboard() yield from self.fetch_pipeline() + yield from self.fetch_users() + yield from self.fetch_teams() def fetch_table(self) -> Table: """Fetch table method @@ -247,6 +271,50 @@ class MetadataSource(Source[Entity]): break after = pipeline_entities.after + def fetch_users(self) -> User: + """fetch users method + + Returns: + User: + """ + if self.config.include_users: + after = None + while True: + user_entities = self.metadata.list_entities( + entity=User, + fields=["teams", "roles"], + after=after, + limit=self.config.limit_records, + ) + for user in user_entities.entities: + self.status.scanned_user(user.name) + yield user + if user_entities.after is None: + break + after = user_entities.after + + def fetch_teams(self) -> Team: + """fetch teams method + + Returns: + Team: + """ + if self.config.include_teams: + after = None + while True: + team_entities = self.metadata.list_entities( + entity=Team, + fields=["users", "owns"], + after=after, + limit=self.config.limit_records, + ) + for team in team_entities.entities: + self.status.scanned_team(team.name) + yield team + if team_entities.after is None: + break + after = team_entities.after + def get_status(self) -> SourceStatus: return self.status