[InsightReport] DataInsight Manual trigger Job (#11538)

* Add Manual Trigger for Insight Reports

* unrequ change

* add time diff comment
This commit is contained in:
Mohit Yadav 2023-05-10 23:49:50 +05:30 committed by GitHub
parent c027799928
commit 5bbd49cc60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 78 additions and 5 deletions

View File

@ -31,7 +31,6 @@ import java.io.IOException;
import java.text.ParseException; import java.text.ParseException;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -54,6 +53,7 @@ import org.openmetadata.schema.dataInsight.type.PercentageOfEntitiesWithOwnerByT
import org.openmetadata.schema.dataInsight.type.TotalEntitiesByTier; import org.openmetadata.schema.dataInsight.type.TotalEntitiesByTier;
import org.openmetadata.schema.dataInsight.type.TotalEntitiesByType; import org.openmetadata.schema.dataInsight.type.TotalEntitiesByType;
import org.openmetadata.schema.entity.events.EventSubscription; import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.TriggerConfig;
import org.openmetadata.schema.entity.teams.Team; import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity; import org.openmetadata.service.Entity;
@ -81,10 +81,9 @@ public class DataInsightsReportJob implements Job {
(RestHighLevelClient) jobExecutionContext.getJobDetail().getJobDataMap().get(ES_REST_CLIENT); (RestHighLevelClient) jobExecutionContext.getJobDetail().getJobDataMap().get(ES_REST_CLIENT);
EventSubscription dataReport = EventSubscription dataReport =
(EventSubscription) jobExecutionContext.getJobDetail().getJobDataMap().get(EVENT_SUBSCRIPTION); (EventSubscription) jobExecutionContext.getJobDetail().getJobDataMap().get(EVENT_SUBSCRIPTION);
Date nextFireTime = jobExecutionContext.getTrigger().getNextFireTime(); // Calculate time diff
Long currentTime = Instant.now().toEpochMilli(); long currentTime = Instant.now().toEpochMilli();
Long timeDifference = nextFireTime.getTime() - currentTime; long scheduleTime = currentTime - getTimeFromSchedule(dataReport.getTrigger());
Long scheduleTime = currentTime - timeDifference;
int numberOfDaysChange = getNumberOfDays(dataReport.getTrigger()); int numberOfDaysChange = getNumberOfDays(dataReport.getTrigger());
try { try {
sendReportsToTeams(repository, client, scheduleTime, currentTime, numberOfDaysChange); sendReportsToTeams(repository, client, scheduleTime, currentTime, numberOfDaysChange);
@ -507,4 +506,19 @@ public class DataInsightsReportJob implements Job {
} }
return dateWithDataMap; return dateWithDataMap;
} }
private long getTimeFromSchedule(TriggerConfig config) {
if (config.getTriggerType() == TriggerConfig.TriggerType.SCHEDULED) {
TriggerConfig.ScheduleInfo scheduleInfo = config.getScheduleInfo();
switch (scheduleInfo) {
case DAILY:
return 86400000L;
case WEEKLY:
return 604800000L;
case MONTHLY:
return 2592000000L;
}
}
throw new IllegalArgumentException("Invalid Trigger Type, Can only be Scheduled.");
}
} }

View File

@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.schema.entity.events.EventSubscription; import org.openmetadata.schema.entity.events.EventSubscription;
@ -130,4 +132,17 @@ public class ReportsHandler {
INSTANCE.reportScheduler.shutdown(); INSTANCE.reportScheduler.shutdown();
} }
} }
public Response triggerExistingDataInsightJob(EventSubscription dataReport) throws SchedulerException {
JobDetail jobDetail = getJobKey(dataReport.getId());
if (jobDetail != null) {
JobDataMap dataMap = new JobDataMap();
dataMap.put(JOB_CONTEXT_CHART_REPO, this.chartRepository);
dataMap.put(ES_REST_CLIENT, restHighLevelClient);
dataMap.put(EVENT_SUBSCRIPTION, dataReport);
reportScheduler.triggerJob(jobDetail.getKey(), dataMap);
return Response.status(Response.Status.OK).entity("Job Triggered Successfully.").build();
}
throw new BadRequestException("Job with given Id does not exist");
}
} }

View File

@ -146,4 +146,12 @@ public class DataInsightDescriptionAndOwnerTemplate {
public void setTierMap(Map<String, Double> tierMap) { public void setTierMap(Map<String, Double> tierMap) {
this.tierMap = tierMap; this.tierMap = tierMap;
} }
public int getNumberOfDaysChange() {
return numberOfDaysChange;
}
public void setNumberOfDaysChange(int numberOfDaysChange) {
this.numberOfDaysChange = numberOfDaysChange;
}
} }

View File

@ -17,6 +17,7 @@ public class DataInsightTotalAssetTemplate {
private Double totalDataAssets; private Double totalDataAssets;
private Double percentChangeTotalAssets; private Double percentChangeTotalAssets;
private String completeMessage; private String completeMessage;
private int numberOfDaysChange; private int numberOfDaysChange;
public DataInsightTotalAssetTemplate( public DataInsightTotalAssetTemplate(
@ -57,4 +58,12 @@ public class DataInsightTotalAssetTemplate {
public void setCompleteMessage(String completeMessage) { public void setCompleteMessage(String completeMessage) {
this.completeMessage = completeMessage; this.completeMessage = completeMessage;
} }
public int getNumberOfDaysChange() {
return numberOfDaysChange;
}
public void setNumberOfDaysChange(int numberOfDaysChange) {
this.numberOfDaysChange = numberOfDaysChange;
}
} }

View File

@ -346,6 +346,33 @@ public class EventSubscriptionResource extends EntityResource<EventSubscription,
return response; return response;
} }
@PUT
@Path("/trigger/{id}")
@Operation(
operationId = "triggerDataInsightJob",
summary = "Trigger a existing Data Insight Report Job to run",
description = "Trigger a existing Data Insight Report Job to run",
responses = {
@ApiResponse(
responseCode = "200",
description = "create Event Subscription",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CreateEventSubscription.class))),
@ApiResponse(responseCode = "400", description = "Bad request")
})
public Response triggerDataInsightJob(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the event Subscription", schema = @Schema(type = "UUID")) @PathParam("id")
UUID id)
throws IOException, SchedulerException {
// authorizer.authorizeAdmin(securityContext);
EventSubscription eventSub = dao.get(null, id, dao.getFields("id,name"));
return ReportsHandler.getInstance().triggerExistingDataInsightJob(eventSub);
}
@PATCH @PATCH
@Path("/{id}") @Path("/{id}")
@Operation( @Operation(