Fix #681: Add support for Pipeline Entity handling in eventHandlers (#682)

This commit is contained in:
Sriharsha Chintalapani 2021-10-05 17:59:30 -07:00 committed by GitHub
parent 2b263041d9
commit bd8b5b56bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 3 deletions

View File

@ -27,6 +27,7 @@ import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.ElasticSearchConfiguration;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
import org.openmetadata.catalog.type.Column;
@ -73,15 +74,19 @@ public class ElasticSearchEventHandler implements EventHandler {
if (responseContext.getEntity() != null) {
Object entity = responseContext.getEntity();
UpdateRequest updateRequest = null;
if (entity.getClass().toString().toLowerCase().endsWith(Entity.TABLE.toLowerCase())) {
String entityClass = entity.getClass().toString();
if (entityClass.toLowerCase().endsWith(Entity.TABLE.toLowerCase())) {
Table instance = (Table) entity;
updateRequest = updateTable(instance);
} else if (entity.getClass().toString().toLowerCase().endsWith(Entity.DASHBOARD.toLowerCase())) {
} else if (entityClass.toLowerCase().endsWith(Entity.DASHBOARD.toLowerCase())) {
Dashboard instance = (Dashboard) entity;
updateRequest = updateDashboard(instance);
} else if (entity.getClass().toString().toLowerCase().endsWith(Entity.TOPIC.toLowerCase())) {
} else if (entityClass.toLowerCase().endsWith(Entity.TOPIC.toLowerCase())) {
Topic instance = (Topic) entity;
updateRequest = updateTopic(instance);
} else if (entityClass.toLowerCase().endsWith(Entity.PIPELINE.toLowerCase())) {
Pipeline instance = (Pipeline) entity;
updateRequest = updatePipeline(instance);
}
if (updateRequest != null) {
client.updateAsync(updateRequest, RequestOptions.DEFAULT, listener);
@ -222,6 +227,46 @@ public class ElasticSearchEventHandler implements EventHandler {
return updateRequest;
}
private UpdateRequest updatePipeline(Pipeline instance) {
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("description", instance.getDescription());
Set<String> tags = new HashSet<>();
if (instance.getTags() != null) {
instance.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
if (!tags.isEmpty()) {
List<String> tagsList = new ArrayList<>(tags);
String tierTag = null;
for (String tag: tagsList) {
if (tag.toLowerCase().matches("(.*)tier(.*)")) {
tierTag = tag;
break;
}
}
if (tierTag != null) {
tagsList.remove(tierTag);
jsonMap.put("tier", tierTag);
}
jsonMap.put("tags", tagsList);
}
if(instance.getOwner() != null) {
jsonMap.put("owner", instance.getOwner().getId().toString());
}
if (instance.getFollowers() != null) {
List<String> followers = new ArrayList<>();
for(EntityReference follower: instance.getFollowers()) {
followers.add(follower.getId().toString());
}
jsonMap.put("followers", followers);
}
jsonMap.put("last_updated_timestamp", System.currentTimeMillis());
UpdateRequest updateRequest = new UpdateRequest("pipeline_search_index", instance.getId().toString());
updateRequest.doc(jsonMap);
return updateRequest;
}
public void close() {
try {
this.client.close();

View File

@ -21,6 +21,7 @@ import org.openmetadata.catalog.entity.data.Chart;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.Database;
import org.openmetadata.catalog.entity.data.Metrics;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Report;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Task;
@ -377,12 +378,18 @@ public final class EntityUtil {
} else if (clazz.toString().toLowerCase().endsWith(Entity.TASK.toLowerCase())) {
Task instance = (Task) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.PIPELINE.toLowerCase())) {
Pipeline instance = (Pipeline) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.MESSAGING_SERVICE.toLowerCase())) {
MessagingService instance = (MessagingService) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.DASHBOARD_SERVICE.toLowerCase())) {
DashboardService instance = (DashboardService) entity;
return getEntityReference(instance);
} else if (clazz.toString().toLowerCase().endsWith(Entity.PIPELINE_SERVICE.toLowerCase())) {
PipelineService instance = (PipelineService) entity;
return getEntityReference(instance);
}
throw EntityNotFoundException.byMessage(CatalogExceptionMessage.entityTypeNotFound(
String.format("Failed to find entity class %s", clazz.toString())));
@ -451,6 +458,11 @@ public final class EntityUtil {
return details;
}
public static EntityReference getEntityReference(Pipeline pipeline) {
return new EntityReference().withDescription(pipeline.getDescription()).withId(pipeline.getId())
.withName(pipeline.getFullyQualifiedName()).withType(Entity.PIPELINE);
}
public static EntityReference getEntityReference(Task task) {
return new EntityReference().withDescription(task.getDescription()).withId(task.getId())
.withName(task.getFullyQualifiedName()).withType(Entity.TASK);