Update query task for activity feed (#10895)

* solved change event query update for activity feed

* working on url config

* testing

* fix bug in slack alerts

* fixing alerts url

* fixed space issue url issue and edited the testcases for changeeventparser

* fixed email markers bug

* default value added

* removed my gmail config

* fix

* addressing comments

---------

Co-authored-by: Himank Mehta <himankmehta@Himanks-MacBook-Air.local>
Co-authored-by: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com>
This commit is contained in:
07Himank 2023-04-04 17:02:08 +05:30 committed by GitHub
parent f74a190e1f
commit 4eb1ec81e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 157 additions and 37 deletions

View File

@ -306,3 +306,6 @@ applicationConfig:
accessBlockTime: ${OM_LOGIN_ACCESS_BLOCK_TIME:-600}
jwtTokenExpiryTime: ${OM_JWT_EXPIRY_TIME:-3600}
changeEventConfig:
omUri: ${OM_URI:- "http://localhost:8585"} #openmetadata in om uri for eg http://loclhost:8585

View File

@ -0,0 +1,20 @@
package org.openmetadata.service;
import org.openmetadata.api.configuration.ChangeEventConfiguration;
public class ChangeEventConfig {
private static ChangeEventConfiguration INSTANCE;
private static volatile boolean INITIALIZED = false;
public static void initialize(OpenMetadataApplicationConfig config) {
if (!INITIALIZED) {
INSTANCE = config.getChangeEventConfiguration();
INITIALIZED = true;
}
}
public static ChangeEventConfiguration getInstance() {
return INSTANCE;
}
}

View File

@ -120,6 +120,8 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
// init email Util for handling
EmailUtil.initialize(catalogConfig);
ChangeEventConfig.initialize(catalogConfig);
final Jdbi jdbi = createAndSetupJDBI(environment, catalogConfig.getDataSourceFactory());
// init Secret Manager

View File

@ -23,6 +23,7 @@ import javax.validation.constraints.NotNull;
import lombok.Getter;
import lombok.Setter;
import org.openmetadata.api.configuration.ApplicationConfiguration;
import org.openmetadata.api.configuration.ChangeEventConfiguration;
import org.openmetadata.schema.api.configuration.events.EventHandlerConfiguration;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.schema.api.fernet.FernetConfiguration;
@ -95,6 +96,9 @@ public class OpenMetadataApplicationConfig extends Configuration {
@JsonProperty("email")
private SmtpSettings smtpSettings;
@JsonProperty("changeEventConfig")
private ChangeEventConfiguration changeEventConfiguration;
@Override
public String toString() {
return "catalogConfig{"

View File

@ -8,6 +8,7 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import lombok.SneakyThrows;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.data.Query;
@ -120,8 +121,8 @@ public class QueryRepository extends EntityRepository<Query> {
return new QueryUpdater(original, updated, operation);
}
public RestUtil.PutResponse<?> addQueryUsage(String updatedBy, UUID queryId, List<EntityReference> entityIds)
throws IOException {
public RestUtil.PutResponse<?> addQueryUsage(
UriInfo uriInfo, String updatedBy, UUID queryId, List<EntityReference> entityIds) throws IOException {
Query query = Entity.getEntity(Entity.QUERY, queryId, "queryUsedIn", Include.NON_DELETED);
List<EntityReference> oldValue = query.getQueryUsedIn();
// Create Relationships
@ -131,12 +132,14 @@ public class QueryRepository extends EntityRepository<Query> {
// Populate Fields
setFieldsInternal(query, new EntityUtil.Fields(allowedFields, "queryUsedIn"));
ChangeEvent changeEvent = getQueryChangeEvent(updatedBy, "queryUsedIn", oldValue, query.getQueryUsedIn(), query);
Entity.withHref(uriInfo, query.getQueryUsedIn());
ChangeEvent changeEvent =
getQueryChangeEvent(updatedBy, "queryUsedIn", oldValue, query.getQueryUsedIn(), withHref(uriInfo, query));
return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED);
}
public RestUtil.PutResponse<?> removeQueryUsedIn(String updatedBy, UUID queryId, List<EntityReference> entityIds)
throws IOException {
public RestUtil.PutResponse<?> removeQueryUsedIn(
UriInfo uriInfo, String updatedBy, UUID queryId, List<EntityReference> entityIds) throws IOException {
Query query = Entity.getEntity(Entity.QUERY, queryId, "queryUsedIn", Include.NON_DELETED);
List<EntityReference> oldValue = query.getQueryUsedIn();
@ -146,7 +149,9 @@ public class QueryRepository extends EntityRepository<Query> {
// Populate Fields
setFieldsInternal(query, new EntityUtil.Fields(allowedFields, "queryUsedIn"));
ChangeEvent changeEvent = getQueryChangeEvent(updatedBy, "queryUsedIn", oldValue, query.getQueryUsedIn(), query);
Entity.withHref(uriInfo, query.getQueryUsedIn());
ChangeEvent changeEvent =
getQueryChangeEvent(updatedBy, "queryUsedIn", oldValue, query.getQueryUsedIn(), withHref(uriInfo, query));
return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED);
}

View File

@ -408,7 +408,7 @@ public class QueryResource extends EntityResource<Query, QueryRepository> {
throws IOException {
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextById(id));
return dao.addQueryUsage(securityContext.getUserPrincipal().getName(), id, entityIds).toResponse();
return dao.addQueryUsage(uriInfo, securityContext.getUserPrincipal().getName(), id, entityIds).toResponse();
}
@DELETE
@ -431,7 +431,7 @@ public class QueryResource extends EntityResource<Query, QueryRepository> {
throws IOException {
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextById(id));
return dao.removeQueryUsedIn(securityContext.getUserPrincipal().getName(), id, entityIds).toResponse();
return dao.removeQueryUsedIn(uriInfo, securityContext.getUserPrincipal().getName(), id, entityIds).toResponse();
}
@PUT

View File

@ -21,7 +21,6 @@ import static org.openmetadata.service.Entity.INGESTION_PIPELINE;
import static org.openmetadata.service.Entity.KPI;
import static org.openmetadata.service.Entity.TEST_CASE;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
@ -32,7 +31,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -48,6 +46,7 @@ import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.dataInsight.kpi.Kpi;
import org.openmetadata.schema.dataInsight.type.KpiResult;
import org.openmetadata.schema.dataInsight.type.KpiTarget;
import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.type.TestCaseResult;
@ -55,6 +54,7 @@ import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.service.ChangeEventConfig;
import org.openmetadata.service.Entity;
import org.openmetadata.service.events.subscription.emailAlert.EmailMessage;
import org.openmetadata.service.events.subscription.gchat.GChatMessage;
@ -74,6 +74,8 @@ public final class ChangeEventParser {
public static final String FEED_LINE_BREAK = " <br/> ";
public static final String SLACK_LINE_BREAK = "\n";
private static volatile boolean INITIALIZED = false;
private ChangeEventParser() {}
public enum CHANGE_TYPE {
@ -99,6 +101,7 @@ public final class ChangeEventParser {
case SLACK:
return SLACK_BOLD;
case GCHAT:
case EMAIL:
return "<b>%s</b>";
default:
return "INVALID";
@ -110,6 +113,7 @@ public final class ChangeEventParser {
case FEED:
case TEAMS:
case GCHAT:
case EMAIL:
// TEAMS, GCHAT, FEED linebreak formatting are same
return FEED_LINE_BREAK;
case SLACK:
@ -128,6 +132,7 @@ public final class ChangeEventParser {
case SLACK:
return "*";
case GCHAT:
case EMAIL:
return "<b>";
default:
return "INVALID";
@ -143,6 +148,7 @@ public final class ChangeEventParser {
case SLACK:
return "*";
case GCHAT:
case EMAIL:
return "</b>";
default:
return "INVALID";
@ -158,6 +164,7 @@ public final class ChangeEventParser {
case SLACK:
return "~";
case GCHAT:
case EMAIL:
return "<s>";
default:
return "INVALID";
@ -173,6 +180,7 @@ public final class ChangeEventParser {
case SLACK:
return "~";
case GCHAT:
case EMAIL:
return "</s>";
default:
return "INVALID";
@ -183,7 +191,6 @@ public final class ChangeEventParser {
String fqn;
String entityType;
EntityInterface entity = (EntityInterface) event.getEntity();
URI urlInstance = entity.getHref();
if (entity instanceof TestCase) {
fqn = ((TestCase) entity).getTestSuite().getFullyQualifiedName();
entityType = "test-suites";
@ -191,17 +198,18 @@ public final class ChangeEventParser {
fqn = event.getEntityFullyQualifiedName();
entityType = event.getEntityType();
}
if (Objects.nonNull(urlInstance)) {
String scheme = urlInstance.getScheme();
String host = urlInstance.getHost();
if (publishTo == PUBLISH_TO.SLACK || publishTo == PUBLISH_TO.GCHAT) {
return String.format("<%s://%s/%s/%s|%s>", scheme, host, entityType, fqn, fqn);
} else if (publishTo == PUBLISH_TO.TEAMS) {
return String.format("[%s](%s://%s/%s/%s)", fqn, scheme, host, entityType, fqn);
} else if (publishTo == PUBLISH_TO.EMAIL) {
return String.format("%s://%s/%s/%s", scheme, host, entityType, fqn);
}
if (publishTo == PUBLISH_TO.SLACK || publishTo == PUBLISH_TO.GCHAT) {
return String.format(
"<%s/%s/%s|%s>", ChangeEventConfig.getInstance().getOmUri(), entityType, fqn.trim(), fqn.trim());
} else if (publishTo == PUBLISH_TO.TEAMS) {
return String.format(
"[%s](/%s/%s)", fqn.trim(), ChangeEventConfig.getInstance().getOmUri(), entityType, fqn.trim());
} else if (publishTo == PUBLISH_TO.EMAIL) {
return String.format(
"<a href = '%s/%s/%s'>%s</a>",
ChangeEventConfig.getInstance().getOmUri(), entityType, fqn.trim(), fqn.trim());
}
// }
return "";
}
@ -215,8 +223,15 @@ public final class ChangeEventParser {
} else {
eventType = event.getEntityType();
}
String headerTxt = "%s posted on " + eventType + " %s";
String headerText = String.format(headerTxt, event.getUserName(), getEntityUrl(PUBLISH_TO.SLACK, event));
String headerTxt;
String headerText;
if (eventType.equals(Entity.QUERY)) {
headerTxt = "%s posted on " + eventType;
headerText = String.format(headerTxt, event.getUserName());
} else {
headerTxt = "%s posted on " + eventType + " %s";
headerText = String.format(headerTxt, event.getUserName(), getEntityUrl(PUBLISH_TO.SLACK, event));
}
slackMessage.setText(headerText);
}
Map<EntityLink, String> messages =
@ -239,10 +254,14 @@ public final class ChangeEventParser {
emailMessage.setUserName(event.getUserName());
if (event.getEntity() != null) {
emailMessage.setUpdatedBy(event.getUserName());
emailMessage.setEntityUrl(getEntityUrl(PUBLISH_TO.EMAIL, event));
if (event.getEntityType().equals(Entity.QUERY)) {
emailMessage.setEntityUrl(Entity.QUERY);
} else {
emailMessage.setEntityUrl(getEntityUrl(PUBLISH_TO.EMAIL, event));
}
}
Map<EntityLink, String> messages =
getFormattedMessages(PUBLISH_TO.SLACK, event.getChangeDescription(), (EntityInterface) event.getEntity());
getFormattedMessages(PUBLISH_TO.EMAIL, event.getChangeDescription(), (EntityInterface) event.getEntity());
List<String> changeMessage = new ArrayList<>();
for (Entry<EntityLink, String> entry : messages.entrySet()) {
changeMessage.add(entry.getValue());
@ -333,9 +352,18 @@ public final class ChangeEventParser {
for (FieldChange field : fields) {
// if field name has dots, then it is an array field
String fieldName = field.getName();
String newFieldValue = getFieldValue(field.getNewValue());
String oldFieldValue = getFieldValue(field.getOldValue());
String newFieldValue;
String oldFieldValue;
EntityLink link = getEntityLink(fieldName, entity);
if (entity.getEntityReference().getType().equals(Entity.QUERY) && fieldName.equals("queryUsedIn")) {
String message =
handleQueryUsage(field.getNewValue(), field.getOldValue(), entity, publishTo, changeType, link);
messages.put(link, message);
return messages;
} else {
newFieldValue = getFieldValue(field.getNewValue());
oldFieldValue = getFieldValue(field.getOldValue());
}
if (link.getEntityType().equals(TEST_CASE) && link.getFieldName().equals("testCaseResult")) {
String message = handleTestCaseResult(publishTo, entity, link, field.getOldValue(), field.getNewValue());
messages.put(link, message);
@ -359,7 +387,6 @@ public final class ChangeEventParser {
return StringUtils.EMPTY;
}
try {
// Check if field value is a json string
JsonValue json = JsonUtils.readJson(fieldValue.toString());
if (json.getValueType() == ValueType.ARRAY) {
JsonArray jsonArray = json.asJsonArray();
@ -402,6 +429,51 @@ public final class ChangeEventParser {
return fieldValue.toString();
}
private static String handleQueryUsage(
Object newValue,
Object oldValue,
EntityInterface entity,
PUBLISH_TO publishTo,
CHANGE_TYPE changeType,
EntityLink link) {
String fieldName = "queryUsage";
String newVal = getFieldValueForQuery(newValue, entity, publishTo);
String oldVal = getFieldValueForQuery(oldValue, entity, publishTo);
String message = createMessageForField(publishTo, link, changeType, fieldName, oldVal, newVal);
return message;
}
private static String getFieldValueForQuery(Object fieldValue, EntityInterface entity, PUBLISH_TO publishTo) {
Query query = (Query) entity;
StringBuilder field = new StringBuilder();
List<EntityReference> queryUsedIn = (List<EntityReference>) fieldValue;
field.append("for ").append("'" + query.getQuery() + "'").append(", ").append(getLineBreak(publishTo));
field.append("Query Used in :- ");
int i = 1;
for (EntityReference queryUsage : queryUsedIn) {
field.append(getQueryUsageUrl(publishTo, queryUsage.getFullyQualifiedName(), queryUsage.getType()));
if (i < queryUsedIn.size()) {
field.append(", ");
}
i++;
}
return field.toString();
}
private static String getQueryUsageUrl(PUBLISH_TO publishTo, String fqn, String entityType) {
if (publishTo == PUBLISH_TO.SLACK || publishTo == PUBLISH_TO.GCHAT) {
return String.format(
"<%s/%s/%s|%s>", ChangeEventConfig.getInstance().getOmUri(), entityType, fqn.trim(), fqn.trim());
} else if (publishTo == PUBLISH_TO.TEAMS) {
return String.format("[%s](/%s/%s)", fqn, ChangeEventConfig.getInstance().getOmUri(), entityType, fqn.trim());
} else if (publishTo == PUBLISH_TO.EMAIL) {
return String.format(
"<a href = '%s/%s/%s'>%s</a>",
ChangeEventConfig.getInstance().getOmUri(), entityType, fqn.trim(), fqn.trim());
}
return String.format("[%s](/%s/%s)", fqn, entityType, fqn.trim());
}
/** Tries to merge additions and deletions into updates and returns a map of formatted messages. */
private static Map<EntityLink, String> mergeAddtionsDeletion(
PUBLISH_TO publishTo, EntityInterface entity, List<FieldChange> addedFields, List<FieldChange> deletedFields) {
@ -669,13 +741,13 @@ public final class ChangeEventParser {
for (DiffMatchPatch.Diff d : diffs) {
if (DiffMatchPatch.Operation.EQUAL.equals(d.operation)) {
// merging equal values of both string
outputStr.append(d.text.trim());
outputStr.append(d.text.trim()).append(" ");
} else if (DiffMatchPatch.Operation.INSERT.equals(d.operation)) {
// merging added values with addMarker before and after of new values added
outputStr.append(addMarker).append(d.text.trim()).append(addMarker).append(" ");
} else {
// merging deleted values with removeMarker before and after of old value removed ..
outputStr.append(" ").append(removeMarker).append(d.text.trim()).append(removeMarker).append(" ");
outputStr.append(removeMarker).append(d.text.trim()).append(removeMarker).append(" ");
}
}
String diff = outputStr.toString().trim();

View File

@ -49,7 +49,7 @@
<br />
<br />
<span style="font-weight: 400;">
<strong>${changeMessage} </span>
${changeMessage} </span>
<span style="font-weight: 400;">
<p class="tw-signoff" style="margin: 45px 0 5px; font-size: 16px; mso-line-height-rule: exactly; line-height: 24px;"> Happy Exploring! <br />Thanks </p>
</td>

View File

@ -179,7 +179,7 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
assertEquals(1, messages.size());
assertEquals(
"Updated **columns.lo_orderpriority**: <br/> name: \"lo_order<span class=\"diff-added\">priority</span> \" <br/> displayName: \"lo_order<span class=\"diff-added\">priority</span> \" <br/> fullyQualifiedName: \"local_mysql.sample_db.lineorder.lo_order<span class=\"diff-added\">priority</span> \"",
"Updated **columns.lo_orderpriority**: <br/> name: \"lo_order <span class=\"diff-added\">priority</span> \" <br/> displayName: \"lo_order <span class=\"diff-added\">priority</span> \" <br/> fullyQualifiedName: \"local_mysql.sample_db.lineorder.lo_order <span class=\"diff-added\">priority</span> \"",
messages.values().iterator().next());
// Simulate a change of datatype change in column
@ -215,7 +215,7 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
assertEquals(1, messages.size());
assertEquals(
"Updated **columns** : lo_orderpriority<span class=\"diff-added\">, newColumn</span>",
"Updated **columns** : lo_orderpriority <span class=\"diff-added\">, newColumn</span>",
messages.values().iterator().next());
}
@ -239,9 +239,9 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
assertEquals(
"Updated *columns.lo_orderpriority*:\n"
+ "name: \"lo_order*priority* \"\n"
+ "displayName: \"lo_order*priority* \"\n"
+ "fullyQualifiedName: \"local_mysql.sample_db.lineorder.lo_order*priority* \"",
+ "name: \"lo_order *priority* \"\n"
+ "displayName: \"lo_order *priority* \"\n"
+ "fullyQualifiedName: \"local_mysql.sample_db.lineorder.lo_order *priority* \"",
messages.values().iterator().next());
// Simulate a change of datatype change in column
@ -278,6 +278,6 @@ class ChangeEventParserResourceTest extends OpenMetadataApplicationTest {
messages = ChangeEventParser.getFormattedMessages(ChangeEventParser.PUBLISH_TO.SLACK, changeDescription, TABLE);
assertEquals(1, messages.size());
assertEquals("Updated *columns* : lo_orderpriority*, newColumn*", messages.values().iterator().next());
assertEquals("Updated *columns* : lo_orderpriority *, newColumn*", messages.values().iterator().next());
}
}

View File

@ -0,0 +1,14 @@
{
"$id": "https://open-metadata.org/schema/entity/configuration/changeEventConfiguration.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ChangeEventConfiguration",
"description": "This schema defines the ChangeEvent Configuration.",
"type": "object",
"javaType": "org.openmetadata.api.configuration.ChangeEventConfiguration",
"properties": {
"omUri" : {
"type": "string"
}
},
"additionalProperties": false
}