diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java index 5339407ee4f..61bc3aaa52b 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java @@ -45,6 +45,7 @@ import org.openmetadata.catalog.entity.data.Dashboard; import org.openmetadata.catalog.entity.data.Database; import org.openmetadata.catalog.entity.data.DatabaseSchema; import org.openmetadata.catalog.entity.data.GlossaryTerm; +import org.openmetadata.catalog.entity.data.MlModel; import org.openmetadata.catalog.entity.data.Pipeline; import org.openmetadata.catalog.entity.data.Table; import org.openmetadata.catalog.entity.data.Topic; @@ -127,6 +128,9 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { case Entity.PIPELINE_SERVICE: updatePipelineService(event); break; + case Entity.MLMODEL: + updateMlModel(event); + break; default: LOG.warn("Ignoring Entity Type {}", entityType); } @@ -446,6 +450,43 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { } } + private void updateMlModel(ChangeEvent event) throws IOException { + MlModelESIndex mlModelESIndex = null; + if (event.getEntity() != null + && event.getEventType() != EventType.ENTITY_SOFT_DELETED + && event.getEventType() != EventType.ENTITY_DELETED) { + MlModel mlModel = (MlModel) event.getEntity(); + mlModelESIndex = MlModelESIndex.builder(mlModel, event.getEventType()).build(); + } + UpdateRequest updateRequest = + new UpdateRequest(ElasticSearchIndexType.MLMODEL_SEARCH_INDEX.indexName, event.getEntityId().toString()); + switch (event.getEventType()) { + case ENTITY_CREATED: + String json = JsonUtils.pojoToJson(mlModelESIndex); + updateRequest.doc(json, XContentType.JSON); + updateRequest.docAsUpsert(true); + updateElasticSearch(updateRequest); + break; + case ENTITY_UPDATED: + if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) { + updateRequest = applyChangeEvent(event); + } else { + scriptedUpsert(mlModelESIndex, updateRequest); + } + updateElasticSearch(updateRequest); + break; + case ENTITY_SOFT_DELETED: + softDeleteEntity(updateRequest); + updateElasticSearch(updateRequest); + break; + case ENTITY_DELETED: + DeleteRequest deleteRequest = + new DeleteRequest(ElasticSearchIndexType.MLMODEL_SEARCH_INDEX.indexName, event.getEntityId().toString()); + deleteEntityFromElasticSearch(deleteRequest); + break; + } + } + private void updateDatabase(ChangeEvent event) throws IOException { if (event.getEventType() == EventType.ENTITY_DELETED) { Database database = (Database) event.getEntity(); @@ -515,12 +556,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { } private void scriptedUserUpsert(Object index, UpdateRequest updateRequest) { - String scriptTxt = - "for (k in params.keySet()) {if (k == 'teams') " - + "{ ctx._source.teams.addAll(params.teams) } " - + "else if (k == 'roles') " - + " { ctx._source.roles.addAll(params.roles) }" - + "else { ctx._source.put(k, params.get(k)) }}"; + String scriptTxt = "for (k in params.keySet()) {ctx._source.put(k, params.get(k)) }"; Map doc = JsonUtils.getMap(index); Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptTxt, doc); updateRequest.script(script); @@ -528,12 +564,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { } private void scriptedTeamUpsert(Object index, UpdateRequest updateRequest) { - String scriptTxt = - "for (k in params.keySet()) {if (k == 'users') " - + "{ ctx._source.users.addAll(params.users) } " - + "else if (k == 'owns') " - + " { ctx._source.owns.addAll(params.owns) }" - + "else { ctx._source.put(k, params.get(k)) }}"; + String scriptTxt = "for (k in params.keySet()) { ctx._source.put(k, params.get(k)) }"; Map doc = JsonUtils.getMap(index); Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptTxt, doc); updateRequest.script(script); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java index 05b90408de0..f8877311911 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.data.Dashboard; import org.openmetadata.catalog.entity.data.GlossaryTerm; +import org.openmetadata.catalog.entity.data.MlModel; import org.openmetadata.catalog.entity.data.Pipeline; import org.openmetadata.catalog.entity.data.Table; import org.openmetadata.catalog.entity.data.Topic; @@ -39,6 +40,8 @@ import org.openmetadata.catalog.entity.teams.User; import org.openmetadata.catalog.type.Column; import org.openmetadata.catalog.type.EntityReference; import org.openmetadata.catalog.type.EventType; +import org.openmetadata.catalog.type.MlFeature; +import org.openmetadata.catalog.type.MlHyperParameter; import org.openmetadata.catalog.type.TagLabel; import org.openmetadata.catalog.type.Task; import org.openmetadata.catalog.util.FullyQualifiedName; @@ -69,7 +72,8 @@ public class ElasticSearchIndexDefinition { PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json"), USER_SEARCH_INDEX("user_search_index", "/elasticsearch/user_index_mapping.json"), TEAM_SEARCH_INDEX("team_search_index", "/elasticsearch/team_index_mapping.json"), - GLOSSARY_SEARCH_INDEX("glossary_search_index", "/elasticsearch/glossary_index_mapping.json"); + GLOSSARY_SEARCH_INDEX("glossary_search_index", "/elasticsearch/glossary_index_mapping.json"), + MLMODEL_SEARCH_INDEX("mlmodel_search_index", "/elasticsearch/mlmodel_index_mapping.json"); public final String indexName; public final String indexMappingFile; @@ -190,6 +194,8 @@ public class ElasticSearchIndexDefinition { return ElasticSearchIndexType.TEAM_SEARCH_INDEX; } else if (type.equalsIgnoreCase(Entity.GLOSSARY)) { return ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX; + } else if (type.equalsIgnoreCase(Entity.MLMODEL)) { + return ElasticSearchIndexType.MLMODEL_SEARCH_INDEX; } throw new RuntimeException("Failed to find index doc for type " + type); } @@ -216,7 +222,7 @@ class ElasticSearchIndex { String displayName; String fqdn; - String service; + EntityReference service; Boolean deleted; @JsonProperty("service_type") @@ -361,7 +367,6 @@ class TableESIndex extends ElasticSearchIndex { .fqdn(table.getFullyQualifiedName()) .suggest(suggest) .entityType("table") - .serviceCategory("databaseService") .columnNames(columnNames) .columnDescriptions(columnDescriptions) .tableType(tableType) @@ -377,7 +382,7 @@ class TableESIndex extends ElasticSearchIndex { } if (table.getService() != null) { - tableESIndexBuilder.service(table.getService().getName()); + tableESIndexBuilder.service(table.getService()); tableESIndexBuilder.serviceType(table.getServiceType().toString()); } @@ -469,7 +474,7 @@ class TopicESIndex extends ElasticSearchIndex { .tier(parseTags.tierTag); if (topic.getService() != null) { - topicESIndexBuilder.service(topic.getService().getName()); + topicESIndexBuilder.service(topic.getService()); topicESIndexBuilder.serviceType(topic.getServiceType().toString()); } if (topic.getFollowers() != null) { @@ -559,7 +564,7 @@ class DashboardESIndex extends ElasticSearchIndex { .tier(parseTags.tierTag); if (dashboard.getService() != null) { - dashboardESIndexBuilder.service(dashboard.getService().getName()); + dashboardESIndexBuilder.service(dashboard.getService()); dashboardESIndexBuilder.serviceType(dashboard.getServiceType().toString()); } if (dashboard.getUsageSummary() != null) { @@ -642,7 +647,7 @@ class PipelineESIndex extends ElasticSearchIndex { .tier(parseTags.tierTag); if (pipeline.getService() != null) { - pipelineESIndexBuilder.service(pipeline.getService().getName()); + pipelineESIndexBuilder.service(pipeline.getService()); pipelineESIndexBuilder.serviceType(pipeline.getServiceType().toString()); } if (pipeline.getFollowers() != null) { @@ -680,10 +685,10 @@ class UserESIndex { String entityType; @JsonProperty("teams") - List teams; + List teams; @JsonProperty("roles") - List roles; + List roles; @JsonProperty("last_updated_timestamp") @Builder.Default @@ -694,21 +699,19 @@ class UserESIndex { Boolean deleted; public static UserESIndexBuilder builder(User user) { - List teams = new ArrayList<>(); - List roles = new ArrayList<>(); + List teams = new ArrayList<>(); + List roles = new ArrayList<>(); List suggest = new ArrayList<>(); suggest.add(ElasticSearchSuggest.builder().input(user.getName()).weight(5).build()); suggest.add(ElasticSearchSuggest.builder().input(user.getDisplayName()).weight(10).build()); Long updatedTimestamp = user.getUpdatedAt(); String displayName = user.getDisplayName() != null ? user.getDisplayName() : ""; if (user.getTeams() != null) { - for (EntityReference team : user.getTeams()) { - teams.add(team.getId().toString()); - } + teams.addAll(user.getTeams()); } - for (EntityReference role : user.getRoles()) { - roles.add(role.getId().toString()); + if (user.getRoles() != null) { + roles.addAll(user.getRoles()); } return internalBuilder() @@ -743,11 +746,14 @@ class TeamESIndex { String entityType; @JsonProperty("users") - List users; + List users; @JsonProperty("owns") List owns; + @JsonProperty("default_roles") + List defaultRoles; + @JsonProperty("last_updated_timestamp") @Builder.Default Long lastUpdatedTimestamp = System.currentTimeMillis(); @@ -757,7 +763,8 @@ class TeamESIndex { Boolean deleted; public static TeamESIndexBuilder builder(Team team) { - List users = new ArrayList<>(); + List users = new ArrayList<>(); + List defaultRoles = new ArrayList<>(); List owns = new ArrayList<>(); List suggest = new ArrayList<>(); suggest.add(ElasticSearchSuggest.builder().input(team.getName()).weight(5).build()); @@ -765,15 +772,16 @@ class TeamESIndex { Long updatedTimestamp = team.getUpdatedAt(); String displayName = team.getDisplayName() != null ? team.getDisplayName() : ""; if (team.getUsers() != null) { - for (EntityReference user : team.getUsers()) { - users.add(user.getId().toString()); - } + users.addAll(team.getUsers()); } if (team.getOwns() != null) { for (EntityReference own : team.getOwns()) { owns.add(own.getId().toString()); } } + if (team.getDefaultRoles() != null) { + defaultRoles.addAll(team.getDefaultRoles()); + } return internalBuilder() .teamId(team.getId().toString()) @@ -785,7 +793,8 @@ class TeamESIndex { .entityType("team") .suggest(suggest) .owns(owns) - .users(users); + .users(users) + .defaultRoles(defaultRoles); } } @@ -842,3 +851,72 @@ class GlossaryTermESIndex extends ElasticSearchIndex { .tags(parseTags.tags); } } + +@EqualsAndHashCode(callSuper = true) +@Getter +@SuperBuilder(builderMethodName = "internalBuilder") +@Value +@JsonInclude(JsonInclude.Include.NON_NULL) +class MlModelESIndex extends ElasticSearchIndex { + @JsonProperty("ml_model_id") + String mlModelId; + + @JsonProperty("display_name") + String displayName; + + @JsonProperty("entity_type") + String entityType; + + @JsonProperty("alogrithm") + String algorithm; + + @JsonProperty("ml_features") + List mlFeatures; + + @JsonProperty("ml_hyper_parameters") + List mlHyperParameters; + + public static MlModelESIndexBuilder builder(MlModel mlModel, EventType eventType) { + List tags = new ArrayList<>(); + List mlHyperParameters = new ArrayList<>(); + List mlFeatures = new ArrayList<>(); + List suggest = new ArrayList<>(); + suggest.add(ElasticSearchSuggest.builder().input(mlModel.getName()).weight(5).build()); + suggest.add(ElasticSearchSuggest.builder().input(mlModel.getDisplayName()).weight(10).build()); + + if (mlModel.getTags() != null) { + mlModel.getTags().forEach(tag -> tags.add(tag.getTagFQN())); + } + + if (mlModel.getMlHyperParameters() != null) { + for (MlHyperParameter mlHyperParameter : mlModel.getMlHyperParameters()) { + mlHyperParameters.add(mlHyperParameter.getName()); + } + } + + if (mlModel.getMlFeatures() != null) { + for (MlFeature mlFeature : mlModel.getMlFeatures()) { + mlFeatures.add(mlFeature.getName()); + } + } + + Long updatedTimestamp = mlModel.getUpdatedAt(); + ParseTags parseTags = new ParseTags(tags); + String description = mlModel.getDescription() != null ? mlModel.getDescription() : ""; + String displayName = mlModel.getDisplayName() != null ? mlModel.getDisplayName() : ""; + return internalBuilder() + .mlModelId(mlModel.getId().toString()) + .name(mlModel.getName()) + .displayName(displayName) + .description(description) + .fqdn(mlModel.getFullyQualifiedName()) + .algorithm(mlModel.getAlgorithm()) + .mlFeatures(mlFeatures) + .mlHyperParameters(mlHyperParameters) + .lastUpdatedTimestamp(updatedTimestamp) + .entityType("glossaryTerm") + .suggest(suggest) + .deleted(mlModel.getDeleted()) + .tags(parseTags.tags); + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java index 6b1012e43ef..6d1087c4f5e 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java @@ -347,8 +347,9 @@ public class SearchResource { private SearchSourceBuilder addAggregation(SearchSourceBuilder builder) { builder .aggregation(AggregationBuilders.terms("Service").field("service_type").size(MAX_AGGREGATE_SIZE)) - .aggregation(AggregationBuilders.terms("ServiceName").field("service").size(MAX_AGGREGATE_SIZE)) - .aggregation(AggregationBuilders.terms("ServiceCategory").field("service_category").size(MAX_AGGREGATE_SIZE)) + .aggregation(AggregationBuilders.terms("ServiceName").field("service.name.keyword").size(MAX_AGGREGATE_SIZE)) + .aggregation( + AggregationBuilders.terms("ServiceCategory").field("service.type.keyword").size(MAX_AGGREGATE_SIZE)) .aggregation(AggregationBuilders.terms("EntityType").field("entity_type")) .aggregation(AggregationBuilders.terms("Tier").field("tier")) .aggregation(AggregationBuilders.terms("Tags").field("tags").size(MAX_AGGREGATE_SIZE)); diff --git a/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json index 348991314cb..8376ba76fc6 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/dashboard_index_mapping.json @@ -13,7 +13,7 @@ "fields": { "keyword": { "type": "keyword", - "ignore_above": 36 + "ignore_above": 128 } } }, @@ -72,14 +72,45 @@ "type": "keyword" }, "service": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "service_type": { "type": "keyword" }, - "service_category": { - "type": "keyword" - }, "entity_type": { "type": "keyword" }, diff --git a/catalog-rest-service/src/main/resources/elasticsearch/glossary_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/glossary_index_mapping.json index 2a0abe41b86..9425ae61ce1 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/glossary_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/glossary_index_mapping.json @@ -13,7 +13,7 @@ "fields": { "keyword": { "type": "keyword", - "ignore_above": 36 + "ignore_above": 128 } } }, diff --git a/catalog-rest-service/src/main/resources/elasticsearch/mlmodel_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/mlmodel_index_mapping.json new file mode 100644 index 00000000000..f4a91bb26a0 --- /dev/null +++ b/catalog-rest-service/src/main/resources/elasticsearch/mlmodel_index_mapping.json @@ -0,0 +1,102 @@ +{ + "properties": { + "name": { + "type":"text" + }, + "display_name": { + "type": "text" + }, + "fqdn": { + "type": "keyword" + }, + "algorithm": { + "type": "keyword" + }, + "ml_features": { + "type": "keyword" + }, + "ml_hyper_parameters": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "owner": { + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 128 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } + }, + "followers": { + "type": "keyword" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "description": { + "type": "text" + }, + "tier": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "entity_type": { + "type": "keyword" + }, + "suggest": { + "type": "completion" + }, + "monthly_stats":{ + "type": "long" + }, + "monthly_percentile_rank":{ + "type": "long" + }, + "weekly_stats":{ + "type": "long" + }, + "weekly_percentile_rank":{ + "type": "long" + }, + "daily_percentile_rank": { + "type": "long" + }, + "daily_stats": { + "type": "long" + } + } +} \ No newline at end of file diff --git a/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json index 3cf17774a0a..c8174e80595 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/pipeline_index_mapping.json @@ -13,7 +13,7 @@ "fields": { "keyword": { "type": "keyword", - "ignore_above": 36 + "ignore_above": 128 } } }, @@ -72,14 +72,45 @@ "type": "keyword" }, "service": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "service_type": { "type": "keyword" }, - "service_category": { - "type": "keyword" - }, "entity_type": { "type": "keyword" }, diff --git a/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json index 0ce6ebded36..5b0a7b14937 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/table_index_mapping.json @@ -13,7 +13,7 @@ "fields": { "keyword": { "type": "keyword", - "ignore_above": 36 + "ignore_above": 128 } } }, @@ -72,14 +72,45 @@ "type": "keyword" }, "service": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "type": { + "type": "keyword" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "keyword" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "service_type": { "type": "keyword" }, - "service_category": { - "type": "keyword" - }, "entity_type": { "type": "keyword" }, diff --git a/catalog-rest-service/src/main/resources/elasticsearch/team_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/team_index_mapping.json index 54da92a4d20..6cd896416a3 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/team_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/team_index_mapping.json @@ -17,11 +17,82 @@ "type": "boolean" }, "users": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "owns": { "type": "keyword" }, + "default_roles": { + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } + }, "suggest": { "type": "completion" } diff --git a/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json index bbb9a6c605d..8956c7dc328 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/topic_index_mapping.json @@ -13,7 +13,7 @@ "fields": { "keyword": { "type": "keyword", - "ignore_above": 36 + "ignore_above": 128 } } }, @@ -66,14 +66,45 @@ "type": "keyword" }, "service": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "service_type": { "type": "keyword" }, - "service_category": { - "type": "keyword" - }, "entity_type": { "type": "keyword" }, diff --git a/catalog-rest-service/src/main/resources/elasticsearch/user_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/user_index_mapping.json index 5d831db628d..b763c12c095 100644 --- a/catalog-rest-service/src/main/resources/elasticsearch/user_index_mapping.json +++ b/catalog-rest-service/src/main/resources/elasticsearch/user_index_mapping.json @@ -17,10 +17,78 @@ "type": "keyword" }, "teams": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "roles": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "deleted": { "type": "boolean" diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json index 387da693b82..f2fc7efd533 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json @@ -39,6 +39,11 @@ "type": "boolean", "default": "true" }, + "includeMlModels": { + "description": "Include MlModels for Indexing", + "type": "boolean", + "default": "true" + }, "includeUsers": { "description": "Include Users for Indexing", "type": "boolean", diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/openMetadataConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/openMetadataConnection.json index 66285e827c7..d4f6eaadbf8 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/openMetadataConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/openMetadataConnection.json @@ -86,6 +86,11 @@ "type": "boolean", "default": true }, + "includeMlModels": { + "description": "Include MlModels for Indexing", + "type": "boolean", + "default": true + }, "includeUsers": { "description": "Include Users for Indexing", "type": "boolean", diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py index d05155e2084..0702da7dae5 100644 --- a/ingestion/src/metadata/ingestion/models/table_metadata.py +++ b/ingestion/src/metadata/ingestion/models/table_metadata.py @@ -24,12 +24,20 @@ class DeleteTable(BaseModel): table: Table -class ChangeDescription(BaseModel): - updatedBy: str - updatedAt: int - fieldsAdded: Optional[str] - fieldsDeleted: Optional[str] - fieldsUpdated: Optional[str] +class ESEntityReference(BaseModel): + """JsonSchema genereated pydantic contains many unnecessary fields its not one-to-one representation of JsonSchema + Example all the "__root__" fields. This will not index into ES elegnatly hence we are creating special class + for EntityReference + """ + + id: str + name: str + displayName: str + description: str = "" + type: str + fullyQualifiedName: str + deleted: bool + href: str class TableESDocument(BaseModel): @@ -39,9 +47,8 @@ class TableESDocument(BaseModel): deleted: bool database: str database_schema: str - service: str + service: ESEntityReference service_type: str - service_category: str entity_type: str = "table" name: str suggest: List[dict] @@ -61,7 +68,6 @@ class TableESDocument(BaseModel): tier: Optional[str] = None owner: EntityReference = None followers: List[str] - change_descriptions: Optional[List[ChangeDescription]] = None doc_as_upsert: bool = True @@ -70,9 +76,8 @@ class TopicESDocument(BaseModel): topic_id: str deleted: bool - service: str + service: ESEntityReference service_type: str - service_category: str entity_type: str = "topic" name: str suggest: List[dict] @@ -83,7 +88,6 @@ class TopicESDocument(BaseModel): tier: Optional[str] = None owner: EntityReference = None followers: List[str] - change_descriptions: Optional[List[ChangeDescription]] = None doc_as_upsert: bool = True @@ -92,9 +96,8 @@ class DashboardESDocument(BaseModel): dashboard_id: str deleted: bool - service: str + service: EntityReference service_type: str - service_category: str entity_type: str = "dashboard" name: str suggest: List[dict] @@ -105,7 +108,7 @@ class DashboardESDocument(BaseModel): tags: List[str] fqdn: str tier: Optional[str] = None - owner: EntityReference = None + owner: ESEntityReference = None followers: List[str] monthly_stats: int monthly_percentile_rank: int @@ -113,7 +116,6 @@ class DashboardESDocument(BaseModel): weekly_percentile_rank: int daily_stats: int daily_percentile_rank: int - change_descriptions: Optional[List[ChangeDescription]] = None doc_as_upsert: bool = True @@ -122,9 +124,8 @@ class PipelineESDocument(BaseModel): pipeline_id: str deleted: bool - service: str + service: ESEntityReference service_type: str - service_category: str entity_type: str = "pipeline" name: str suggest: List[dict] @@ -135,9 +136,29 @@ class PipelineESDocument(BaseModel): tags: List[str] fqdn: str tier: Optional[str] = None - owner: EntityReference = None + owner: ESEntityReference = None + followers: List[str] + doc_as_upsert: bool = True + + +class MlModelESDocument(BaseModel): + """Elastic Search Mapping doc for MlModels""" + + ml_model_id: str + deleted: bool + entity_type: str = "mlmodel" + name: str + suggest: List[dict] + description: Optional[str] = None + last_updated_timestamp: Optional[int] + algorithm: str + ml_features: List[str] + ml_hyper_parameters: List[str] + tags: List[str] + fqdn: str + tier: Optional[str] = None + owner: ESEntityReference = None followers: List[str] - change_descriptions: Optional[List[ChangeDescription]] = None doc_as_upsert: bool = True @@ -152,8 +173,8 @@ class UserESDocument(BaseModel): email: str suggest: List[dict] last_updated_timestamp: Optional[int] - teams: List[str] - roles: List[str] + teams: List[ESEntityReference] + roles: List[ESEntityReference] doc_as_upsert: bool = True @@ -167,7 +188,8 @@ class TeamESDocument(BaseModel): display_name: str suggest: List[dict] last_updated_timestamp: Optional[int] - users: List[str] + users: List[ESEntityReference] + default_roles: List[ESEntityReference] owns: List[str] doc_as_upsert: bool = True diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 8a65e2eb515..a4185107c6a 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -36,7 +36,7 @@ class ESMixin(Generic[T]): client: REST - search_from_service_url = "/search/query?q=service:{service} AND {filters}&from={from_}&size={size}&index={index}" + search_from_service_url = "/search/query?q=service.name:{service} AND {filters}&from={from_}&size={size}&index={index}" def _search_es_entity( self, entity_type: Type[T], query_string: str diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index d3fae125b45..cce6aae6b20 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -11,6 +11,7 @@ import json import ssl +import sys import traceback from datetime import datetime from typing import List, Optional @@ -24,6 +25,7 @@ from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm +from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.pipeline import Pipeline, Task from metadata.generated.schema.entity.data.table import Column, Table from metadata.generated.schema.entity.data.topic import Topic @@ -41,7 +43,9 @@ from metadata.ingestion.api.common import Entity from metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.models.table_metadata import ( DashboardESDocument, + ESEntityReference, GlossaryTermESDocument, + MlModelESDocument, PipelineESDocument, TableESDocument, TeamESDocument, @@ -52,6 +56,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.sink.elasticsearch_constants import ( DASHBOARD_ELASTICSEARCH_INDEX_MAPPING, GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING, + MLMODEL_ELASTICSEARCH_INDEX_MAPPING, PIPELINE_ELASTICSEARCH_INDEX_MAPPING, TABLE_ELASTICSEARCH_INDEX_MAPPING, TEAM_ELASTICSEARCH_INDEX_MAPPING, @@ -78,6 +83,7 @@ class ElasticSearchConfig(ConfigModel): index_pipelines: Optional[bool] = True index_users: Optional[bool] = True index_teams: Optional[bool] = True + index_mlmodels: Optional[bool] = True index_glossary_terms: Optional[bool] = True table_index_name: str = "table_search_index" topic_index_name: str = "topic_search_index" @@ -86,6 +92,7 @@ class ElasticSearchConfig(ConfigModel): user_index_name: str = "user_search_index" team_index_name: str = "team_search_index" glossary_term_index_name: str = "glossary_search_index" + mlmodel_index_name: str = "mlmodel_search_index" scheme: str = "http" use_ssl: bool = False verify_certs: bool = False @@ -172,6 +179,12 @@ class ElasticsearchSink(Sink[Entity]): GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING, ) + if self.config.index_mlmodels: + self._check_or_create_index( + self.config.mlmodel_index_name, + MLMODEL_ELASTICSEARCH_INDEX_MAPPING, + ) + def _check_or_create_index(self, index_name: str, es_mapping: str): """ Retrieve all indices that currently have {elasticsearch_alias} alias @@ -201,9 +214,10 @@ class ElasticsearchSink(Sink[Entity]): + "The index doesn't exist for a newly created ES. It's OK on first run." ) # create new index with mapping - self.elasticsearch_client.indices.delete( - index=index_name, request_timeout=self.config.timeout - ) + if self.elasticsearch_client.indices.exists(index=index_name): + self.elasticsearch_client.indices.delete( + index=index_name, request_timeout=self.config.timeout + ) self.elasticsearch_client.indices.create( index=index_name, body=es_mapping, request_timeout=self.config.timeout ) @@ -270,10 +284,20 @@ class ElasticsearchSink(Sink[Entity]): request_timeout=self.config.timeout, ) + if isinstance(record, MlModel): + ml_model_doc = self._create_ml_model_es_doc(record) + self.elasticsearch_client.index( + index=self.config.mlmodel_index_name, + id=str(ml_model_doc.ml_model_id), + body=ml_model_doc.json(), + request_timeout=self.config.timeout, + ) + self.status.records_written(record.name.__root__) except Exception as e: logger.error(f"Failed to index entity {record} due to {e}") - logger.debug(traceback.format_exc()) + print(traceback.format_exc()) + print(sys.exc_info()[2]) def _create_table_es_doc(self, table: Table): table_fqn = table.fullyQualifiedName.__root__ @@ -303,8 +327,19 @@ class ElasticsearchSink(Sink[Entity]): database_schema_entity = self.metadata.get_by_id( entity=DatabaseSchema, entity_id=str(table.databaseSchema.id.__root__) ) - service_entity = self.metadata.get_by_id( - entity=DatabaseService, entity_id=str(database_entity.service.id.__root__) + service_entity = ESEntityReference( + id=str(database_entity.service.id.__root__), + name=database_entity.service.name, + displayName=database_entity.service.displayName + if database_entity.service.displayName + else "", + description=database_entity.service.description.__root__ + if database_entity.service.description + else "", + type=database_entity.service.type, + fullyQualifiedName=database_entity.service.fullyQualifiedName, + deleted=database_entity.service.deleted, + href=database_entity.service.href.__root__, ) table_followers = [] if table.followers: @@ -317,13 +352,12 @@ class ElasticsearchSink(Sink[Entity]): table_id=str(table.id.__root__), deleted=table.deleted, database=str(database_entity.name.__root__), - service=service_entity.name.__root__, - service_type=service_entity.serviceType.name, - service_category="databaseService", + service=service_entity, + service_type=str(table.serviceType.name), name=table.name.__root__, suggest=suggest, database_schema=str(database_schema_entity.name.__root__), - description=str(table.description.__root__), + description=table.description.__root__ if table.description else "", table_type=table_type, last_updated_timestamp=timestamp, column_names=column_names, @@ -351,9 +385,6 @@ class ElasticsearchSink(Sink[Entity]): ] tags = set() timestamp = topic.updatedAt.__root__ - service_entity = self.metadata.get_by_id( - entity=MessagingService, entity_id=str(topic.service.id.__root__) - ) topic_followers = [] if topic.followers: for follower in topic.followers.__root__: @@ -364,16 +395,26 @@ class ElasticsearchSink(Sink[Entity]): tier = topic_tag.tagFQN.__root__ else: tags.add(topic_tag.tagFQN.__root__) - + service_entity = ESEntityReference( + id=str(topic.service.id.__root__), + name=topic.service.name, + displayName=topic.service.displayName if topic.service.displayName else "", + description=topic.service.description.__root__ + if topic.service.description + else "", + type=topic.service.type, + fullyQualifiedName=topic.service.fullyQualifiedName, + deleted=topic.service.deleted, + href=topic.service.href.__root__, + ) topic_doc = TopicESDocument( topic_id=str(topic.id.__root__), deleted=topic.deleted, - service=service_entity.name.__root__, - service_type=service_entity.serviceType.name, - service_category="messagingService", + service=service_entity, + service_type=str(topic.serviceType.name), name=topic.name.__root__, suggest=suggest, - description=topic.description.__root__, + description=topic.description.__root__ if topic.description else "", last_updated_timestamp=timestamp, tier=tier, tags=list(tags), @@ -388,9 +429,6 @@ class ElasticsearchSink(Sink[Entity]): suggest = [{"input": [dashboard.displayName], "weight": 10}] tags = set() timestamp = dashboard.updatedAt.__root__ - service_entity = self.metadata.get_by_id( - entity=DashboardService, entity_id=str(dashboard.service.id.__root__) - ) dashboard_followers = [] if dashboard.followers: for follower in dashboard.followers.__root__: @@ -412,17 +450,30 @@ class ElasticsearchSink(Sink[Entity]): for col_tag in chart.tags: tags.add(col_tag.tagFQN.__root__) + service_entity = ESEntityReference( + id=str(dashboard.service.id.__root__), + name=dashboard.service.name, + displayName=dashboard.service.displayName + if dashboard.service.displayName + else "", + description=dashboard.service.description.__root__ + if dashboard.service.description + else "", + type=dashboard.service.type, + fullyQualifiedName=dashboard.service.fullyQualifiedName, + deleted=dashboard.service.deleted, + href=dashboard.service.href.__root__, + ) dashboard_doc = DashboardESDocument( dashboard_id=str(dashboard.id.__root__), deleted=dashboard.deleted, - service=service_entity.name.__root__, - service_type=service_entity.serviceType.name, - service_category="dashboardService", + service=service_entity, + service_type=str(dashboard.serviceType.name), name=dashboard.displayName, chart_names=chart_names, chart_descriptions=chart_descriptions, suggest=suggest, - description=dashboard.description.__root__, + description=dashboard.description.__root__ if dashboard.description else "", last_updated_timestamp=timestamp, tier=tier, tags=list(tags), @@ -444,9 +495,21 @@ class ElasticsearchSink(Sink[Entity]): suggest = [{"input": [pipeline.displayName], "weight": 10}] tags = set() timestamp = pipeline.updatedAt.__root__ - service_entity = self.metadata.get_by_id( - entity=PipelineService, entity_id=str(pipeline.service.id.__root__) + service_entity = ESEntityReference( + id=str(pipeline.service.id.__root__), + name=pipeline.service.name, + displayName=pipeline.service.displayName + if pipeline.service.displayName + else "", + description=pipeline.service.description.__root__ + if pipeline.service.description + else "", + type=pipeline.service.type, + fullyQualifiedName=pipeline.service.fullyQualifiedName, + deleted=pipeline.service.deleted, + href=pipeline.service.href.__root__, ) + pipeline_followers = [] if pipeline.followers: for follower in pipeline.followers.__root__: @@ -471,14 +534,13 @@ class ElasticsearchSink(Sink[Entity]): pipeline_doc = PipelineESDocument( pipeline_id=str(pipeline.id.__root__), deleted=pipeline.deleted, - service=service_entity.name.__root__, - service_type=service_entity.serviceType.name, - service_category="pipelineService", + service=service_entity, + service_type=str(pipeline.serviceType.name), name=pipeline.displayName, task_names=task_names, task_descriptions=task_descriptions, suggest=suggest, - description=pipeline.description.__root__, + description=pipeline.description.__root__ if pipeline.description else "", last_updated_timestamp=timestamp, tier=tier, tags=list(tags), @@ -489,6 +551,49 @@ class ElasticsearchSink(Sink[Entity]): return pipeline_doc + def _create_ml_model_es_doc(self, ml_model: MlModel): + ml_model_fqn = ml_model.fullyQualifiedName.__root__ + suggest = [{"input": [ml_model.displayName], "weight": 10}] + tags = set() + timestamp = ml_model.updatedAt.__root__ + ml_model_followers = [] + ml_model_hyper_parameters = [] + ml_features = [] + if ml_model.followers: + for follower in ml_model.followers.__root__: + ml_model_followers.append(str(follower.id.__root__)) + tier = None + for ml_model_tag in ml_model.tags: + if "Tier" in ml_model_tag.tagFQN.__root__: + tier = ml_model_tag.tagFQN.__root__ + else: + tags.add(ml_model_tag.tagFQN.__root__) + + for ml_model_feature in ml_model.mlFeatures: + ml_features.append(ml_model_feature.name.__root__) + + for ml_model_hyper_parameter in ml_model.mlHyperParameters: + ml_model_hyper_parameters.append(ml_model_hyper_parameter.name) + + ml_model_doc = MlModelESDocument( + ml_model_id=str(ml_model.id.__root__), + deleted=ml_model.deleted, + name=ml_model.displayName, + algorithm=ml_model.algorithm, + ml_features=ml_features, + ml_hyper_parameters=ml_model_hyper_parameters, + suggest=suggest, + description=ml_model.description.__root__ if ml_model.description else "", + last_updated_timestamp=timestamp, + tier=tier, + tags=list(tags), + fqdn=ml_model_fqn, + owner=ml_model.owner, + followers=ml_model_followers, + ) + + return ml_model_doc + def _create_user_es_doc(self, user: User): display_name = user.displayName if user.displayName else user.name.__root__ suggest = [ @@ -500,7 +605,17 @@ class ElasticsearchSink(Sink[Entity]): roles = [] if user.teams: for team in user.teams.__root__: - teams.append(str(team.id.__root__)) + teams.append( + ESEntityReference( + id=str(team.id.__root__), + name=team.name, + displayName=team.displayName if team.displayName else "", + type=team.type, + fullyQualifiedName=team.fullyQualifiedName, + deleted=team.deleted, + href=team.href.__root__, + ) + ) if user.roles: for role in user.roles.__root__: @@ -528,14 +643,43 @@ class ElasticsearchSink(Sink[Entity]): timestamp = team.updatedAt.__root__ users = [] owns = [] + default_roles = [] if team.users: for user in team.users.__root__: - users.append(user.name) + users.append( + ESEntityReference( + id=str(user.id.__root__), + name=user.name, + displayName=user.displayName if user.displayName else "", + description=user.description.__root__ + if user.description + else "", + type=user.type, + fullyQualifiedName=user.fullyQualifiedName, + deleted=user.deleted, + href=user.href.__root__, + ) + ) if team.owns: for own in team.owns.__root__: owns.append(str(own.id.__root__)) + if team.defaultRoles: + for role in team.defaultRoles: + default_roles.append( + ESEntityReference( + id=str(role.id.__root__), + name=role.name, + displayName=role.displayName if role.displayName else "", + description=role.description.__root if role.description else "", + type=role.type, + fullyQualifiedName=role.fullyQualifiedName, + deleted=role.deleted, + href=role.href.__root__, + ) + ) + team_doc = TeamESDocument( team_id=str(team.id.__root__), deleted=team.deleted, @@ -545,6 +689,7 @@ class ElasticsearchSink(Sink[Entity]): last_updated_timestamp=timestamp, users=list(users), owns=list(owns), + default_roles=list(default_roles), ) return team_doc diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py index 8a6cdc29e32..d290acedfa4 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py @@ -88,14 +88,45 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "type": "keyword" }, "service": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "service_type": { "type": "keyword" }, - "service_category": { - "type": "keyword" - }, "entity_type": { "type": "keyword" }, @@ -203,14 +234,45 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "type": "keyword" }, "service": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "service_type": { "type": "keyword" }, - "service_category": { - "type": "keyword" - }, "entity_type": { "type": "keyword" }, @@ -300,14 +362,45 @@ DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "type": "keyword" }, "service": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "service_type": { "type": "keyword" }, - "service_category": { - "type": "keyword" - }, "entity_type": { "type": "keyword" }, @@ -415,14 +508,45 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( "type": "keyword" }, "service": { - "type": "keyword" + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } }, "service_type": { "type": "keyword" }, - "service_category": { - "type": "keyword" - }, "entity_type": { "type": "keyword" }, @@ -437,76 +561,216 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( USER_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( """ - { - "mappings":{ - "properties": { - "name": { - "type":"text" - }, - "display_name": { - "type": "text" - }, - "email": { - "type": "text" - }, - "last_updated_timestamp": { - "type": "date", - "format": "epoch_second" - }, - "entity_type": { - "type": "keyword" - }, - "teams": { - "type": "keyword" - }, - "roles": { - "type": "keyword" - }, - "deleted": { - "type": "boolean" - }, - "suggest": { - "type": "completion" - } - } - } - } + { + "mappings": { + "properties": { + "name": { + "type": "text" + }, + "display_name": { + "type": "text" + }, + "email": { + "type": "text" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "entity_type": { + "type": "keyword" + }, + "teams": { + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } + }, + + "deleted": { + "type": "boolean" + }, + "suggest": { + "type": "completion" + }, + "roles": { + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } + } + } + } + } """ ) TEAM_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( """ { - "mappings":{ - "properties": { - "name": { - "type":"text" - }, - "display_name": { - "type": "text" - }, - "last_updated_timestamp": { - "type": "date", - "format": "epoch_second" - }, - "entity_type": { - "type": "keyword" - }, - "deleted": { - "type": "boolean" - }, - "users": { - "type": "keyword" - }, - "owns": { - "type": "keyword" - }, - "suggest": { - "type": "completion" - } - } - } - } + "mappings": { + "properties": { + "name": { + "type": "text" + }, + "display_name": { + "type": "text" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "entity_type": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "users": { + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } + }, + "owns": { + "type": "keyword" + }, + "default_roles": { + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 36 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } + }, + "suggest": { + "type": "completion" + } + } + } + } """ ) @@ -594,3 +858,113 @@ GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( } """ ) + +MLMODEL_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( + """ + { + "mappings": { + "properties": { + "name": { + "type": "text" + }, + "display_name": { + "type": "text" + }, + "fqdn": { + "type": "keyword" + }, + "algorithm": { + "type": "keyword" + }, + "ml_features": { + "type": "keyword" + }, + "ml_hyper_parameters": { + "type": "keyword" + }, + "deleted": { + "type": "boolean" + }, + "owner": { + "properties": { + "id": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 128 + } + } + }, + "type": { + "type": "text" + }, + "name": { + "type": "keyword", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "fullyQualifiedName": { + "type": "text" + }, + "description": { + "type": "text" + }, + "deleted": { + "type": "boolean" + }, + "href": { + "type": "text" + } + } + }, + "followers": { + "type": "keyword" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "description": { + "type": "text" + }, + "tier": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "entity_type": { + "type": "keyword" + }, + "suggest": { + "type": "completion" + }, + "monthly_stats": { + "type": "long" + }, + "monthly_percentile_rank": { + "type": "long" + }, + "weekly_stats": { + "type": "long" + }, + "weekly_percentile_rank": { + "type": "long" + }, + "daily_percentile_rank": { + "type": "long" + }, + "daily_stats": { + "type": "long" + } + } + } + + } + """ +) diff --git a/ingestion/src/metadata/ingestion/source/metadata/metadata.py b/ingestion/src/metadata/ingestion/source/metadata/metadata.py index d67dc4549f9..e4d6df71a9a 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata/metadata.py @@ -16,6 +16,7 @@ from typing import Iterable, List from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.glossary import Glossary from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm +from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.topic import Topic @@ -154,6 +155,11 @@ class MetadataSource(Source[Entity]): entity_class=Pipeline, fields=["owner", "tags", "followers", "tasks"], ) + if self.service_connection.includeMlModels: + yield from self.fetch_entities( + entity_class=MlModel, + fields=["owner", "tags", "followers"], + ) if self.service_connection.includeUsers: yield from self.fetch_entities( entity_class=User,