Fix #5196: defaultRoles not present in results of team_search_index, Fix #5195: ES - Need teams as EntityReference in user_search_index, Fix #5111: Add ElasticSearch index & APIs for MlModel (#5308)

This commit is contained in:
Sriharsha Chintalapani 2022-06-05 21:36:56 -07:00 committed by GitHub
parent a056ccd9a0
commit 0119f7fc38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1229 additions and 197 deletions

View File

@ -45,6 +45,7 @@ import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.Database;
import org.openmetadata.catalog.entity.data.DatabaseSchema;
import org.openmetadata.catalog.entity.data.GlossaryTerm;
import org.openmetadata.catalog.entity.data.MlModel;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
@ -127,6 +128,9 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
case Entity.PIPELINE_SERVICE:
updatePipelineService(event);
break;
case Entity.MLMODEL:
updateMlModel(event);
break;
default:
LOG.warn("Ignoring Entity Type {}", entityType);
}
@ -446,6 +450,43 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
}
}
private void updateMlModel(ChangeEvent event) throws IOException {
MlModelESIndex mlModelESIndex = null;
if (event.getEntity() != null
&& event.getEventType() != EventType.ENTITY_SOFT_DELETED
&& event.getEventType() != EventType.ENTITY_DELETED) {
MlModel mlModel = (MlModel) event.getEntity();
mlModelESIndex = MlModelESIndex.builder(mlModel, event.getEventType()).build();
}
UpdateRequest updateRequest =
new UpdateRequest(ElasticSearchIndexType.MLMODEL_SEARCH_INDEX.indexName, event.getEntityId().toString());
switch (event.getEventType()) {
case ENTITY_CREATED:
String json = JsonUtils.pojoToJson(mlModelESIndex);
updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true);
updateElasticSearch(updateRequest);
break;
case ENTITY_UPDATED:
if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
updateRequest = applyChangeEvent(event);
} else {
scriptedUpsert(mlModelESIndex, updateRequest);
}
updateElasticSearch(updateRequest);
break;
case ENTITY_SOFT_DELETED:
softDeleteEntity(updateRequest);
updateElasticSearch(updateRequest);
break;
case ENTITY_DELETED:
DeleteRequest deleteRequest =
new DeleteRequest(ElasticSearchIndexType.MLMODEL_SEARCH_INDEX.indexName, event.getEntityId().toString());
deleteEntityFromElasticSearch(deleteRequest);
break;
}
}
private void updateDatabase(ChangeEvent event) throws IOException {
if (event.getEventType() == EventType.ENTITY_DELETED) {
Database database = (Database) event.getEntity();
@ -515,12 +556,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
}
private void scriptedUserUpsert(Object index, UpdateRequest updateRequest) {
String scriptTxt =
"for (k in params.keySet()) {if (k == 'teams') "
+ "{ ctx._source.teams.addAll(params.teams) } "
+ "else if (k == 'roles') "
+ " { ctx._source.roles.addAll(params.roles) }"
+ "else { ctx._source.put(k, params.get(k)) }}";
String scriptTxt = "for (k in params.keySet()) {ctx._source.put(k, params.get(k)) }";
Map<String, Object> doc = JsonUtils.getMap(index);
Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptTxt, doc);
updateRequest.script(script);
@ -528,12 +564,7 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
}
private void scriptedTeamUpsert(Object index, UpdateRequest updateRequest) {
String scriptTxt =
"for (k in params.keySet()) {if (k == 'users') "
+ "{ ctx._source.users.addAll(params.users) } "
+ "else if (k == 'owns') "
+ " { ctx._source.owns.addAll(params.owns) }"
+ "else { ctx._source.put(k, params.get(k)) }}";
String scriptTxt = "for (k in params.keySet()) { ctx._source.put(k, params.get(k)) }";
Map<String, Object> doc = JsonUtils.getMap(index);
Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptTxt, doc);
updateRequest.script(script);

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.GlossaryTerm;
import org.openmetadata.catalog.entity.data.MlModel;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
@ -39,6 +40,8 @@ import org.openmetadata.catalog.entity.teams.User;
import org.openmetadata.catalog.type.Column;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.EventType;
import org.openmetadata.catalog.type.MlFeature;
import org.openmetadata.catalog.type.MlHyperParameter;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.type.Task;
import org.openmetadata.catalog.util.FullyQualifiedName;
@ -69,7 +72,8 @@ public class ElasticSearchIndexDefinition {
PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json"),
USER_SEARCH_INDEX("user_search_index", "/elasticsearch/user_index_mapping.json"),
TEAM_SEARCH_INDEX("team_search_index", "/elasticsearch/team_index_mapping.json"),
GLOSSARY_SEARCH_INDEX("glossary_search_index", "/elasticsearch/glossary_index_mapping.json");
GLOSSARY_SEARCH_INDEX("glossary_search_index", "/elasticsearch/glossary_index_mapping.json"),
MLMODEL_SEARCH_INDEX("mlmodel_search_index", "/elasticsearch/mlmodel_index_mapping.json");
public final String indexName;
public final String indexMappingFile;
@ -190,6 +194,8 @@ public class ElasticSearchIndexDefinition {
return ElasticSearchIndexType.TEAM_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(Entity.GLOSSARY)) {
return ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(Entity.MLMODEL)) {
return ElasticSearchIndexType.MLMODEL_SEARCH_INDEX;
}
throw new RuntimeException("Failed to find index doc for type " + type);
}
@ -216,7 +222,7 @@ class ElasticSearchIndex {
String displayName;
String fqdn;
String service;
EntityReference service;
Boolean deleted;
@JsonProperty("service_type")
@ -361,7 +367,6 @@ class TableESIndex extends ElasticSearchIndex {
.fqdn(table.getFullyQualifiedName())
.suggest(suggest)
.entityType("table")
.serviceCategory("databaseService")
.columnNames(columnNames)
.columnDescriptions(columnDescriptions)
.tableType(tableType)
@ -377,7 +382,7 @@ class TableESIndex extends ElasticSearchIndex {
}
if (table.getService() != null) {
tableESIndexBuilder.service(table.getService().getName());
tableESIndexBuilder.service(table.getService());
tableESIndexBuilder.serviceType(table.getServiceType().toString());
}
@ -469,7 +474,7 @@ class TopicESIndex extends ElasticSearchIndex {
.tier(parseTags.tierTag);
if (topic.getService() != null) {
topicESIndexBuilder.service(topic.getService().getName());
topicESIndexBuilder.service(topic.getService());
topicESIndexBuilder.serviceType(topic.getServiceType().toString());
}
if (topic.getFollowers() != null) {
@ -559,7 +564,7 @@ class DashboardESIndex extends ElasticSearchIndex {
.tier(parseTags.tierTag);
if (dashboard.getService() != null) {
dashboardESIndexBuilder.service(dashboard.getService().getName());
dashboardESIndexBuilder.service(dashboard.getService());
dashboardESIndexBuilder.serviceType(dashboard.getServiceType().toString());
}
if (dashboard.getUsageSummary() != null) {
@ -642,7 +647,7 @@ class PipelineESIndex extends ElasticSearchIndex {
.tier(parseTags.tierTag);
if (pipeline.getService() != null) {
pipelineESIndexBuilder.service(pipeline.getService().getName());
pipelineESIndexBuilder.service(pipeline.getService());
pipelineESIndexBuilder.serviceType(pipeline.getServiceType().toString());
}
if (pipeline.getFollowers() != null) {
@ -680,10 +685,10 @@ class UserESIndex {
String entityType;
@JsonProperty("teams")
List<String> teams;
List<EntityReference> teams;
@JsonProperty("roles")
List<String> roles;
List<EntityReference> roles;
@JsonProperty("last_updated_timestamp")
@Builder.Default
@ -694,21 +699,19 @@ class UserESIndex {
Boolean deleted;
public static UserESIndexBuilder builder(User user) {
List<String> teams = new ArrayList<>();
List<String> roles = new ArrayList<>();
List<EntityReference> teams = new ArrayList<>();
List<EntityReference> roles = new ArrayList<>();
List<ElasticSearchSuggest> suggest = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(user.getName()).weight(5).build());
suggest.add(ElasticSearchSuggest.builder().input(user.getDisplayName()).weight(10).build());
Long updatedTimestamp = user.getUpdatedAt();
String displayName = user.getDisplayName() != null ? user.getDisplayName() : "";
if (user.getTeams() != null) {
for (EntityReference team : user.getTeams()) {
teams.add(team.getId().toString());
}
teams.addAll(user.getTeams());
}
for (EntityReference role : user.getRoles()) {
roles.add(role.getId().toString());
if (user.getRoles() != null) {
roles.addAll(user.getRoles());
}
return internalBuilder()
@ -743,11 +746,14 @@ class TeamESIndex {
String entityType;
@JsonProperty("users")
List<String> users;
List<EntityReference> users;
@JsonProperty("owns")
List<String> owns;
@JsonProperty("default_roles")
List<EntityReference> defaultRoles;
@JsonProperty("last_updated_timestamp")
@Builder.Default
Long lastUpdatedTimestamp = System.currentTimeMillis();
@ -757,7 +763,8 @@ class TeamESIndex {
Boolean deleted;
public static TeamESIndexBuilder builder(Team team) {
List<String> users = new ArrayList<>();
List<EntityReference> users = new ArrayList<>();
List<EntityReference> defaultRoles = new ArrayList<>();
List<String> owns = new ArrayList<>();
List<ElasticSearchSuggest> suggest = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(team.getName()).weight(5).build());
@ -765,15 +772,16 @@ class TeamESIndex {
Long updatedTimestamp = team.getUpdatedAt();
String displayName = team.getDisplayName() != null ? team.getDisplayName() : "";
if (team.getUsers() != null) {
for (EntityReference user : team.getUsers()) {
users.add(user.getId().toString());
}
users.addAll(team.getUsers());
}
if (team.getOwns() != null) {
for (EntityReference own : team.getOwns()) {
owns.add(own.getId().toString());
}
}
if (team.getDefaultRoles() != null) {
defaultRoles.addAll(team.getDefaultRoles());
}
return internalBuilder()
.teamId(team.getId().toString())
@ -785,7 +793,8 @@ class TeamESIndex {
.entityType("team")
.suggest(suggest)
.owns(owns)
.users(users);
.users(users)
.defaultRoles(defaultRoles);
}
}
@ -842,3 +851,72 @@ class GlossaryTermESIndex extends ElasticSearchIndex {
.tags(parseTags.tags);
}
}
@EqualsAndHashCode(callSuper = true)
@Getter
@SuperBuilder(builderMethodName = "internalBuilder")
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class MlModelESIndex extends ElasticSearchIndex {
@JsonProperty("ml_model_id")
String mlModelId;
@JsonProperty("display_name")
String displayName;
@JsonProperty("entity_type")
String entityType;
@JsonProperty("alogrithm")
String algorithm;
@JsonProperty("ml_features")
List<String> mlFeatures;
@JsonProperty("ml_hyper_parameters")
List<String> mlHyperParameters;
public static MlModelESIndexBuilder builder(MlModel mlModel, EventType eventType) {
List<String> tags = new ArrayList<>();
List<String> mlHyperParameters = new ArrayList<>();
List<String> mlFeatures = new ArrayList<>();
List<ElasticSearchSuggest> suggest = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(mlModel.getName()).weight(5).build());
suggest.add(ElasticSearchSuggest.builder().input(mlModel.getDisplayName()).weight(10).build());
if (mlModel.getTags() != null) {
mlModel.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
if (mlModel.getMlHyperParameters() != null) {
for (MlHyperParameter mlHyperParameter : mlModel.getMlHyperParameters()) {
mlHyperParameters.add(mlHyperParameter.getName());
}
}
if (mlModel.getMlFeatures() != null) {
for (MlFeature mlFeature : mlModel.getMlFeatures()) {
mlFeatures.add(mlFeature.getName());
}
}
Long updatedTimestamp = mlModel.getUpdatedAt();
ParseTags parseTags = new ParseTags(tags);
String description = mlModel.getDescription() != null ? mlModel.getDescription() : "";
String displayName = mlModel.getDisplayName() != null ? mlModel.getDisplayName() : "";
return internalBuilder()
.mlModelId(mlModel.getId().toString())
.name(mlModel.getName())
.displayName(displayName)
.description(description)
.fqdn(mlModel.getFullyQualifiedName())
.algorithm(mlModel.getAlgorithm())
.mlFeatures(mlFeatures)
.mlHyperParameters(mlHyperParameters)
.lastUpdatedTimestamp(updatedTimestamp)
.entityType("glossaryTerm")
.suggest(suggest)
.deleted(mlModel.getDeleted())
.tags(parseTags.tags);
}
}

View File

@ -347,8 +347,9 @@ public class SearchResource {
private SearchSourceBuilder addAggregation(SearchSourceBuilder builder) {
builder
.aggregation(AggregationBuilders.terms("Service").field("service_type").size(MAX_AGGREGATE_SIZE))
.aggregation(AggregationBuilders.terms("ServiceName").field("service").size(MAX_AGGREGATE_SIZE))
.aggregation(AggregationBuilders.terms("ServiceCategory").field("service_category").size(MAX_AGGREGATE_SIZE))
.aggregation(AggregationBuilders.terms("ServiceName").field("service.name.keyword").size(MAX_AGGREGATE_SIZE))
.aggregation(
AggregationBuilders.terms("ServiceCategory").field("service.type.keyword").size(MAX_AGGREGATE_SIZE))
.aggregation(AggregationBuilders.terms("EntityType").field("entity_type"))
.aggregation(AggregationBuilders.terms("Tier").field("tier"))
.aggregation(AggregationBuilders.terms("Tags").field("tags").size(MAX_AGGREGATE_SIZE));

View File

@ -13,7 +13,7 @@
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
"ignore_above": 128
}
}
},
@ -72,14 +72,45 @@
"type": "keyword"
},
"service": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},

View File

@ -13,7 +13,7 @@
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
"ignore_above": 128
}
}
},

View File

@ -0,0 +1,102 @@
{
"properties": {
"name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"fqdn": {
"type": "keyword"
},
"algorithm": {
"type": "keyword"
},
"ml_features": {
"type": "keyword"
},
"ml_hyper_parameters": {
"type": "keyword"
},
"deleted": {
"type": "boolean"
},
"owner": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 128
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"followers": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"tier": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"suggest": {
"type": "completion"
},
"monthly_stats":{
"type": "long"
},
"monthly_percentile_rank":{
"type": "long"
},
"weekly_stats":{
"type": "long"
},
"weekly_percentile_rank":{
"type": "long"
},
"daily_percentile_rank": {
"type": "long"
},
"daily_stats": {
"type": "long"
}
}
}

View File

@ -13,7 +13,7 @@
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
"ignore_above": 128
}
}
},
@ -72,14 +72,45 @@
"type": "keyword"
},
"service": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},

View File

@ -13,7 +13,7 @@
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
"ignore_above": 128
}
}
},
@ -72,14 +72,45 @@
"type": "keyword"
},
"service": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "keyword"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},

View File

@ -17,11 +17,82 @@
"type": "boolean"
},
"users": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"owns": {
"type": "keyword"
},
"default_roles": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"suggest": {
"type": "completion"
}

View File

@ -13,7 +13,7 @@
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
"ignore_above": 128
}
}
},
@ -66,14 +66,45 @@
"type": "keyword"
},
"service": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},

View File

@ -17,10 +17,78 @@
"type": "keyword"
},
"teams": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"roles": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"deleted": {
"type": "boolean"

View File

@ -39,6 +39,11 @@
"type": "boolean",
"default": "true"
},
"includeMlModels": {
"description": "Include MlModels for Indexing",
"type": "boolean",
"default": "true"
},
"includeUsers": {
"description": "Include Users for Indexing",
"type": "boolean",

View File

@ -86,6 +86,11 @@
"type": "boolean",
"default": true
},
"includeMlModels": {
"description": "Include MlModels for Indexing",
"type": "boolean",
"default": true
},
"includeUsers": {
"description": "Include Users for Indexing",
"type": "boolean",

View File

@ -24,12 +24,20 @@ class DeleteTable(BaseModel):
table: Table
class ChangeDescription(BaseModel):
updatedBy: str
updatedAt: int
fieldsAdded: Optional[str]
fieldsDeleted: Optional[str]
fieldsUpdated: Optional[str]
class ESEntityReference(BaseModel):
"""JsonSchema genereated pydantic contains many unnecessary fields its not one-to-one representation of JsonSchema
Example all the "__root__" fields. This will not index into ES elegnatly hence we are creating special class
for EntityReference
"""
id: str
name: str
displayName: str
description: str = ""
type: str
fullyQualifiedName: str
deleted: bool
href: str
class TableESDocument(BaseModel):
@ -39,9 +47,8 @@ class TableESDocument(BaseModel):
deleted: bool
database: str
database_schema: str
service: str
service: ESEntityReference
service_type: str
service_category: str
entity_type: str = "table"
name: str
suggest: List[dict]
@ -61,7 +68,6 @@ class TableESDocument(BaseModel):
tier: Optional[str] = None
owner: EntityReference = None
followers: List[str]
change_descriptions: Optional[List[ChangeDescription]] = None
doc_as_upsert: bool = True
@ -70,9 +76,8 @@ class TopicESDocument(BaseModel):
topic_id: str
deleted: bool
service: str
service: ESEntityReference
service_type: str
service_category: str
entity_type: str = "topic"
name: str
suggest: List[dict]
@ -83,7 +88,6 @@ class TopicESDocument(BaseModel):
tier: Optional[str] = None
owner: EntityReference = None
followers: List[str]
change_descriptions: Optional[List[ChangeDescription]] = None
doc_as_upsert: bool = True
@ -92,9 +96,8 @@ class DashboardESDocument(BaseModel):
dashboard_id: str
deleted: bool
service: str
service: EntityReference
service_type: str
service_category: str
entity_type: str = "dashboard"
name: str
suggest: List[dict]
@ -105,7 +108,7 @@ class DashboardESDocument(BaseModel):
tags: List[str]
fqdn: str
tier: Optional[str] = None
owner: EntityReference = None
owner: ESEntityReference = None
followers: List[str]
monthly_stats: int
monthly_percentile_rank: int
@ -113,7 +116,6 @@ class DashboardESDocument(BaseModel):
weekly_percentile_rank: int
daily_stats: int
daily_percentile_rank: int
change_descriptions: Optional[List[ChangeDescription]] = None
doc_as_upsert: bool = True
@ -122,9 +124,8 @@ class PipelineESDocument(BaseModel):
pipeline_id: str
deleted: bool
service: str
service: ESEntityReference
service_type: str
service_category: str
entity_type: str = "pipeline"
name: str
suggest: List[dict]
@ -135,9 +136,29 @@ class PipelineESDocument(BaseModel):
tags: List[str]
fqdn: str
tier: Optional[str] = None
owner: EntityReference = None
owner: ESEntityReference = None
followers: List[str]
doc_as_upsert: bool = True
class MlModelESDocument(BaseModel):
"""Elastic Search Mapping doc for MlModels"""
ml_model_id: str
deleted: bool
entity_type: str = "mlmodel"
name: str
suggest: List[dict]
description: Optional[str] = None
last_updated_timestamp: Optional[int]
algorithm: str
ml_features: List[str]
ml_hyper_parameters: List[str]
tags: List[str]
fqdn: str
tier: Optional[str] = None
owner: ESEntityReference = None
followers: List[str]
change_descriptions: Optional[List[ChangeDescription]] = None
doc_as_upsert: bool = True
@ -152,8 +173,8 @@ class UserESDocument(BaseModel):
email: str
suggest: List[dict]
last_updated_timestamp: Optional[int]
teams: List[str]
roles: List[str]
teams: List[ESEntityReference]
roles: List[ESEntityReference]
doc_as_upsert: bool = True
@ -167,7 +188,8 @@ class TeamESDocument(BaseModel):
display_name: str
suggest: List[dict]
last_updated_timestamp: Optional[int]
users: List[str]
users: List[ESEntityReference]
default_roles: List[ESEntityReference]
owns: List[str]
doc_as_upsert: bool = True

View File

@ -36,7 +36,7 @@ class ESMixin(Generic[T]):
client: REST
search_from_service_url = "/search/query?q=service:{service} AND {filters}&from={from_}&size={size}&index={index}"
search_from_service_url = "/search/query?q=service.name:{service} AND {filters}&from={from_}&size={size}&index={index}"
def _search_es_entity(
self, entity_type: Type[T], query_string: str

View File

@ -11,6 +11,7 @@
import json
import ssl
import sys
import traceback
from datetime import datetime
from typing import List, Optional
@ -24,6 +25,7 @@ from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.data.topic import Topic
@ -41,7 +43,9 @@ from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.table_metadata import (
DashboardESDocument,
ESEntityReference,
GlossaryTermESDocument,
MlModelESDocument,
PipelineESDocument,
TableESDocument,
TeamESDocument,
@ -52,6 +56,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.sink.elasticsearch_constants import (
DASHBOARD_ELASTICSEARCH_INDEX_MAPPING,
GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING,
MLMODEL_ELASTICSEARCH_INDEX_MAPPING,
PIPELINE_ELASTICSEARCH_INDEX_MAPPING,
TABLE_ELASTICSEARCH_INDEX_MAPPING,
TEAM_ELASTICSEARCH_INDEX_MAPPING,
@ -78,6 +83,7 @@ class ElasticSearchConfig(ConfigModel):
index_pipelines: Optional[bool] = True
index_users: Optional[bool] = True
index_teams: Optional[bool] = True
index_mlmodels: Optional[bool] = True
index_glossary_terms: Optional[bool] = True
table_index_name: str = "table_search_index"
topic_index_name: str = "topic_search_index"
@ -86,6 +92,7 @@ class ElasticSearchConfig(ConfigModel):
user_index_name: str = "user_search_index"
team_index_name: str = "team_search_index"
glossary_term_index_name: str = "glossary_search_index"
mlmodel_index_name: str = "mlmodel_search_index"
scheme: str = "http"
use_ssl: bool = False
verify_certs: bool = False
@ -172,6 +179,12 @@ class ElasticsearchSink(Sink[Entity]):
GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING,
)
if self.config.index_mlmodels:
self._check_or_create_index(
self.config.mlmodel_index_name,
MLMODEL_ELASTICSEARCH_INDEX_MAPPING,
)
def _check_or_create_index(self, index_name: str, es_mapping: str):
"""
Retrieve all indices that currently have {elasticsearch_alias} alias
@ -201,9 +214,10 @@ class ElasticsearchSink(Sink[Entity]):
+ "The index doesn't exist for a newly created ES. It's OK on first run."
)
# create new index with mapping
self.elasticsearch_client.indices.delete(
index=index_name, request_timeout=self.config.timeout
)
if self.elasticsearch_client.indices.exists(index=index_name):
self.elasticsearch_client.indices.delete(
index=index_name, request_timeout=self.config.timeout
)
self.elasticsearch_client.indices.create(
index=index_name, body=es_mapping, request_timeout=self.config.timeout
)
@ -270,10 +284,20 @@ class ElasticsearchSink(Sink[Entity]):
request_timeout=self.config.timeout,
)
if isinstance(record, MlModel):
ml_model_doc = self._create_ml_model_es_doc(record)
self.elasticsearch_client.index(
index=self.config.mlmodel_index_name,
id=str(ml_model_doc.ml_model_id),
body=ml_model_doc.json(),
request_timeout=self.config.timeout,
)
self.status.records_written(record.name.__root__)
except Exception as e:
logger.error(f"Failed to index entity {record} due to {e}")
logger.debug(traceback.format_exc())
print(traceback.format_exc())
print(sys.exc_info()[2])
def _create_table_es_doc(self, table: Table):
table_fqn = table.fullyQualifiedName.__root__
@ -303,8 +327,19 @@ class ElasticsearchSink(Sink[Entity]):
database_schema_entity = self.metadata.get_by_id(
entity=DatabaseSchema, entity_id=str(table.databaseSchema.id.__root__)
)
service_entity = self.metadata.get_by_id(
entity=DatabaseService, entity_id=str(database_entity.service.id.__root__)
service_entity = ESEntityReference(
id=str(database_entity.service.id.__root__),
name=database_entity.service.name,
displayName=database_entity.service.displayName
if database_entity.service.displayName
else "",
description=database_entity.service.description.__root__
if database_entity.service.description
else "",
type=database_entity.service.type,
fullyQualifiedName=database_entity.service.fullyQualifiedName,
deleted=database_entity.service.deleted,
href=database_entity.service.href.__root__,
)
table_followers = []
if table.followers:
@ -317,13 +352,12 @@ class ElasticsearchSink(Sink[Entity]):
table_id=str(table.id.__root__),
deleted=table.deleted,
database=str(database_entity.name.__root__),
service=service_entity.name.__root__,
service_type=service_entity.serviceType.name,
service_category="databaseService",
service=service_entity,
service_type=str(table.serviceType.name),
name=table.name.__root__,
suggest=suggest,
database_schema=str(database_schema_entity.name.__root__),
description=str(table.description.__root__),
description=table.description.__root__ if table.description else "",
table_type=table_type,
last_updated_timestamp=timestamp,
column_names=column_names,
@ -351,9 +385,6 @@ class ElasticsearchSink(Sink[Entity]):
]
tags = set()
timestamp = topic.updatedAt.__root__
service_entity = self.metadata.get_by_id(
entity=MessagingService, entity_id=str(topic.service.id.__root__)
)
topic_followers = []
if topic.followers:
for follower in topic.followers.__root__:
@ -364,16 +395,26 @@ class ElasticsearchSink(Sink[Entity]):
tier = topic_tag.tagFQN.__root__
else:
tags.add(topic_tag.tagFQN.__root__)
service_entity = ESEntityReference(
id=str(topic.service.id.__root__),
name=topic.service.name,
displayName=topic.service.displayName if topic.service.displayName else "",
description=topic.service.description.__root__
if topic.service.description
else "",
type=topic.service.type,
fullyQualifiedName=topic.service.fullyQualifiedName,
deleted=topic.service.deleted,
href=topic.service.href.__root__,
)
topic_doc = TopicESDocument(
topic_id=str(topic.id.__root__),
deleted=topic.deleted,
service=service_entity.name.__root__,
service_type=service_entity.serviceType.name,
service_category="messagingService",
service=service_entity,
service_type=str(topic.serviceType.name),
name=topic.name.__root__,
suggest=suggest,
description=topic.description.__root__,
description=topic.description.__root__ if topic.description else "",
last_updated_timestamp=timestamp,
tier=tier,
tags=list(tags),
@ -388,9 +429,6 @@ class ElasticsearchSink(Sink[Entity]):
suggest = [{"input": [dashboard.displayName], "weight": 10}]
tags = set()
timestamp = dashboard.updatedAt.__root__
service_entity = self.metadata.get_by_id(
entity=DashboardService, entity_id=str(dashboard.service.id.__root__)
)
dashboard_followers = []
if dashboard.followers:
for follower in dashboard.followers.__root__:
@ -412,17 +450,30 @@ class ElasticsearchSink(Sink[Entity]):
for col_tag in chart.tags:
tags.add(col_tag.tagFQN.__root__)
service_entity = ESEntityReference(
id=str(dashboard.service.id.__root__),
name=dashboard.service.name,
displayName=dashboard.service.displayName
if dashboard.service.displayName
else "",
description=dashboard.service.description.__root__
if dashboard.service.description
else "",
type=dashboard.service.type,
fullyQualifiedName=dashboard.service.fullyQualifiedName,
deleted=dashboard.service.deleted,
href=dashboard.service.href.__root__,
)
dashboard_doc = DashboardESDocument(
dashboard_id=str(dashboard.id.__root__),
deleted=dashboard.deleted,
service=service_entity.name.__root__,
service_type=service_entity.serviceType.name,
service_category="dashboardService",
service=service_entity,
service_type=str(dashboard.serviceType.name),
name=dashboard.displayName,
chart_names=chart_names,
chart_descriptions=chart_descriptions,
suggest=suggest,
description=dashboard.description.__root__,
description=dashboard.description.__root__ if dashboard.description else "",
last_updated_timestamp=timestamp,
tier=tier,
tags=list(tags),
@ -444,9 +495,21 @@ class ElasticsearchSink(Sink[Entity]):
suggest = [{"input": [pipeline.displayName], "weight": 10}]
tags = set()
timestamp = pipeline.updatedAt.__root__
service_entity = self.metadata.get_by_id(
entity=PipelineService, entity_id=str(pipeline.service.id.__root__)
service_entity = ESEntityReference(
id=str(pipeline.service.id.__root__),
name=pipeline.service.name,
displayName=pipeline.service.displayName
if pipeline.service.displayName
else "",
description=pipeline.service.description.__root__
if pipeline.service.description
else "",
type=pipeline.service.type,
fullyQualifiedName=pipeline.service.fullyQualifiedName,
deleted=pipeline.service.deleted,
href=pipeline.service.href.__root__,
)
pipeline_followers = []
if pipeline.followers:
for follower in pipeline.followers.__root__:
@ -471,14 +534,13 @@ class ElasticsearchSink(Sink[Entity]):
pipeline_doc = PipelineESDocument(
pipeline_id=str(pipeline.id.__root__),
deleted=pipeline.deleted,
service=service_entity.name.__root__,
service_type=service_entity.serviceType.name,
service_category="pipelineService",
service=service_entity,
service_type=str(pipeline.serviceType.name),
name=pipeline.displayName,
task_names=task_names,
task_descriptions=task_descriptions,
suggest=suggest,
description=pipeline.description.__root__,
description=pipeline.description.__root__ if pipeline.description else "",
last_updated_timestamp=timestamp,
tier=tier,
tags=list(tags),
@ -489,6 +551,49 @@ class ElasticsearchSink(Sink[Entity]):
return pipeline_doc
def _create_ml_model_es_doc(self, ml_model: MlModel):
ml_model_fqn = ml_model.fullyQualifiedName.__root__
suggest = [{"input": [ml_model.displayName], "weight": 10}]
tags = set()
timestamp = ml_model.updatedAt.__root__
ml_model_followers = []
ml_model_hyper_parameters = []
ml_features = []
if ml_model.followers:
for follower in ml_model.followers.__root__:
ml_model_followers.append(str(follower.id.__root__))
tier = None
for ml_model_tag in ml_model.tags:
if "Tier" in ml_model_tag.tagFQN.__root__:
tier = ml_model_tag.tagFQN.__root__
else:
tags.add(ml_model_tag.tagFQN.__root__)
for ml_model_feature in ml_model.mlFeatures:
ml_features.append(ml_model_feature.name.__root__)
for ml_model_hyper_parameter in ml_model.mlHyperParameters:
ml_model_hyper_parameters.append(ml_model_hyper_parameter.name)
ml_model_doc = MlModelESDocument(
ml_model_id=str(ml_model.id.__root__),
deleted=ml_model.deleted,
name=ml_model.displayName,
algorithm=ml_model.algorithm,
ml_features=ml_features,
ml_hyper_parameters=ml_model_hyper_parameters,
suggest=suggest,
description=ml_model.description.__root__ if ml_model.description else "",
last_updated_timestamp=timestamp,
tier=tier,
tags=list(tags),
fqdn=ml_model_fqn,
owner=ml_model.owner,
followers=ml_model_followers,
)
return ml_model_doc
def _create_user_es_doc(self, user: User):
display_name = user.displayName if user.displayName else user.name.__root__
suggest = [
@ -500,7 +605,17 @@ class ElasticsearchSink(Sink[Entity]):
roles = []
if user.teams:
for team in user.teams.__root__:
teams.append(str(team.id.__root__))
teams.append(
ESEntityReference(
id=str(team.id.__root__),
name=team.name,
displayName=team.displayName if team.displayName else "",
type=team.type,
fullyQualifiedName=team.fullyQualifiedName,
deleted=team.deleted,
href=team.href.__root__,
)
)
if user.roles:
for role in user.roles.__root__:
@ -528,14 +643,43 @@ class ElasticsearchSink(Sink[Entity]):
timestamp = team.updatedAt.__root__
users = []
owns = []
default_roles = []
if team.users:
for user in team.users.__root__:
users.append(user.name)
users.append(
ESEntityReference(
id=str(user.id.__root__),
name=user.name,
displayName=user.displayName if user.displayName else "",
description=user.description.__root__
if user.description
else "",
type=user.type,
fullyQualifiedName=user.fullyQualifiedName,
deleted=user.deleted,
href=user.href.__root__,
)
)
if team.owns:
for own in team.owns.__root__:
owns.append(str(own.id.__root__))
if team.defaultRoles:
for role in team.defaultRoles:
default_roles.append(
ESEntityReference(
id=str(role.id.__root__),
name=role.name,
displayName=role.displayName if role.displayName else "",
description=role.description.__root if role.description else "",
type=role.type,
fullyQualifiedName=role.fullyQualifiedName,
deleted=role.deleted,
href=role.href.__root__,
)
)
team_doc = TeamESDocument(
team_id=str(team.id.__root__),
deleted=team.deleted,
@ -545,6 +689,7 @@ class ElasticsearchSink(Sink[Entity]):
last_updated_timestamp=timestamp,
users=list(users),
owns=list(owns),
default_roles=list(default_roles),
)
return team_doc

View File

@ -88,14 +88,45 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"type": "keyword"
},
"service": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
@ -203,14 +234,45 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"type": "keyword"
},
"service": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
@ -300,14 +362,45 @@ DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"type": "keyword"
},
"service": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
@ -415,14 +508,45 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"type": "keyword"
},
"service": {
"type": "keyword"
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
@ -437,76 +561,216 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
USER_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"""
{
"mappings":{
"properties": {
"name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"email": {
"type": "text"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"entity_type": {
"type": "keyword"
},
"teams": {
"type": "keyword"
},
"roles": {
"type": "keyword"
},
"deleted": {
"type": "boolean"
},
"suggest": {
"type": "completion"
}
}
}
}
{
"mappings": {
"properties": {
"name": {
"type": "text"
},
"display_name": {
"type": "text"
},
"email": {
"type": "text"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"entity_type": {
"type": "keyword"
},
"teams": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"deleted": {
"type": "boolean"
},
"suggest": {
"type": "completion"
},
"roles": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
}
}
}
}
"""
)
TEAM_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"""
{
"mappings":{
"properties": {
"name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"entity_type": {
"type": "keyword"
},
"deleted": {
"type": "boolean"
},
"users": {
"type": "keyword"
},
"owns": {
"type": "keyword"
},
"suggest": {
"type": "completion"
}
}
}
}
"mappings": {
"properties": {
"name": {
"type": "text"
},
"display_name": {
"type": "text"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"entity_type": {
"type": "keyword"
},
"deleted": {
"type": "boolean"
},
"users": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"owns": {
"type": "keyword"
},
"default_roles": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"suggest": {
"type": "completion"
}
}
}
}
"""
)
@ -594,3 +858,113 @@ GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
}
"""
)
MLMODEL_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"""
{
"mappings": {
"properties": {
"name": {
"type": "text"
},
"display_name": {
"type": "text"
},
"fqdn": {
"type": "keyword"
},
"algorithm": {
"type": "keyword"
},
"ml_features": {
"type": "keyword"
},
"ml_hyper_parameters": {
"type": "keyword"
},
"deleted": {
"type": "boolean"
},
"owner": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 128
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"followers": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"tier": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"suggest": {
"type": "completion"
},
"monthly_stats": {
"type": "long"
},
"monthly_percentile_rank": {
"type": "long"
},
"weekly_stats": {
"type": "long"
},
"weekly_percentile_rank": {
"type": "long"
},
"daily_percentile_rank": {
"type": "long"
},
"daily_stats": {
"type": "long"
}
}
}
}
"""
)

View File

@ -16,6 +16,7 @@ from typing import Iterable, List
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.glossary import Glossary
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
@ -154,6 +155,11 @@ class MetadataSource(Source[Entity]):
entity_class=Pipeline,
fields=["owner", "tags", "followers", "tasks"],
)
if self.service_connection.includeMlModels:
yield from self.fetch_entities(
entity_class=MlModel,
fields=["owner", "tags", "followers"],
)
if self.service_connection.includeUsers:
yield from self.fetch_entities(
entity_class=User,