From ab3042e8eee4408d68c073b9e6919f9ffb18371f Mon Sep 17 00:00:00 2001
From: Teddy
Date: Fri, 1 Sep 2023 18:01:20 +0200
Subject: [PATCH] Issue 12297 bis -- Delete/Insert logic in the DI workflow
 (#13058)

* fix: updated ingestion to delete existing data for the ingestion day
* fix: added delete endpoint for the report data
* fix: added migration to delete duplicate data + wrapped table creation & data deletion logic in transactions
* fix: made deletion SQL-engine aware
* fix: added ES deletion logic back while we work on the DI event publisher
---
 .../mysql/postDataMigrationSQLScript.sql      | 50 ++++++++++++
 .../native/1.1.3/mysql/schemaChanges.sql      | 10 ++-
 .../postgres/postDataMigrationSQLScript.sql   | 53 ++++++++++++
 .../native/1.1.3/postgres/schemaChanges.sql   |  9 ++-
 .../src/metadata/data_insight/api/workflow.py | 58 ++++++++++++-
 .../ometa/mixins/data_insight_mixin.py        | 13 ++-
 .../test_data_insight_workflow.py             | 26 ++++++
 .../service/jdbi3/CollectionDAO.java          | 12 +++
 .../service/jdbi3/ReportDataRepository.java   |  6 ++
 .../analytics/ReportDataResource.java         | 49 ++++++++---
 .../ElasticSearchClientImpl.java              |  2 +-
 .../PaginatedDataInsightSource.java           |  6 +-
 .../analytics/ReportDataResourceTest.java     | 81 ++++++++++++++++++-
 13 files changed, 357 insertions(+), 18 deletions(-)

diff --git a/bootstrap/sql/migrations/native/1.1.3/mysql/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.1.3/mysql/postDataMigrationSQLScript.sql
index e69de29bb2d..2ce1654285b 100644
--- a/bootstrap/sql/migrations/native/1.1.3/mysql/postDataMigrationSQLScript.sql
+++ b/bootstrap/sql/migrations/native/1.1.3/mysql/postDataMigrationSQLScript.sql
@@ -0,0 +1,50 @@
+START TRANSACTION;
+-- We'll rank all the runs (timestamps) for every day, and delete all the data but the most recent one.
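+-- Illustration (hypothetical values): if 2023-08-31 has three runs at timestamps t1 < t2 < t3,
+-- DENSE_RANK() OVER(PARTITION BY `date` ORDER BY `timestamp` DESC) assigns rank 1 to t3 and
+-- ranks 2 and 3 to t2 and t1, so the deletes below keep only the t3 rows for that day.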
+DELETE FROM report_data_time_series WHERE JSON_EXTRACT(json, '$.id') IN ( + select ids FROM ( + SELECT + (json ->> '$.id') AS ids, + DENSE_RANK() OVER(PARTITION BY `date` ORDER BY `timestamp` DESC) as denseRank + FROM ( + SELECT + * + FROM report_data_time_series rdts + WHERE json ->> '$.reportDataType' = 'WebAnalyticEntityViewReportData' + ) duplicates + ORDER BY `date` DESC, `timestamp` DESC + ) as dense_ranked + WHERE denseRank != 1 +); + +DELETE FROM report_data_time_series WHERE JSON_EXTRACT(json, '$.id') IN ( + select ids FROM ( + SELECT + (json ->> '$.id') AS ids, + DENSE_RANK() OVER(PARTITION BY `date` ORDER BY `timestamp` DESC) as denseRank + FROM ( + SELECT + * + FROM report_data_time_series rdts + WHERE json ->> '$.reportDataType' = 'EntityReportData' + ) duplicates + ORDER BY `date` DESC, `timestamp` DESC + ) as dense_ranked + WHERE denseRank != 1 +); + +DELETE FROM report_data_time_series WHERE JSON_EXTRACT(json, '$.id') IN ( + select ids FROM ( + SELECT + (json ->> '$.id') AS ids, + DENSE_RANK() OVER(PARTITION BY `date` ORDER BY `timestamp` DESC) as denseRank + FROM ( + SELECT + * + FROM report_data_time_series rdts + WHERE json ->> '$.reportDataType' = 'WebAnalyticUserActivityReportData' + ) duplicates + ORDER BY `date` DESC, `timestamp` DESC + ) as dense_ranked + WHERE denseRank != 1 +); +COMMIT; \ No newline at end of file diff --git a/bootstrap/sql/migrations/native/1.1.3/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.1.3/mysql/schemaChanges.sql index bb02f03fa69..4dd882e361d 100644 --- a/bootstrap/sql/migrations/native/1.1.3/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.1.3/mysql/schemaChanges.sql @@ -9,6 +9,7 @@ WHERE extension in ('table.tableProfile', 'table.columnProfile'); ; +START TRANSACTION; -- Create report data time series table and move data from entity_extension_time_series CREATE TABLE IF NOT EXISTS report_data_time_series ( entityFQNHash VARCHAR(768) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, @@ -16,7 +17,9 @@ CREATE TABLE IF NOT EXISTS report_data_time_series ( jsonSchema VARCHAR(256) NOT NULL, json JSON NOT NULL, timestamp BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.timestamp') NOT NULL, - INDEX report_data_time_series_point_ts (timestamp) + date DATE GENERATED ALWAYS AS (FROM_UNIXTIME((json ->> '$.timestamp') DIV 1000)) NOT NULL, + INDEX report_data_time_series_point_ts (timestamp), + INDEX report_data_time_series_date (date) ); INSERT INTO report_data_time_series (entityFQNHash,extension,jsonSchema,json) @@ -25,7 +28,9 @@ FROM entity_extension_time_series WHERE extension = 'reportData.reportDataResult DELETE FROM entity_extension_time_series WHERE extension = 'reportData.reportDataResult'; +COMMIT; +START TRANSACTION; -- Create profiler data time series table and move data from entity_extension_time_series CREATE TABLE IF NOT EXISTS profiler_data_time_series ( entityFQNHash VARCHAR(768) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, @@ -45,7 +50,9 @@ WHERE extension IN ('table.columnProfile', 'table.tableProfile', 'table.systemPr DELETE FROM entity_extension_time_series WHERE extension IN ('table.columnProfile', 'table.tableProfile', 'table.systemProfile'); +COMMIT; +START TRANSACTION; -- Create data quality data time series table and move data from entity_extension_time_series CREATE TABLE IF NOT EXISTS data_quality_data_time_series ( entityFQNHash VARCHAR(768) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, @@ -64,6 +71,7 @@ WHERE extension = 'testCase.testCaseResult'; DELETE FROM entity_extension_time_series 
WHERE extension = 'testCase.testCaseResult'; +COMMIT; ALTER TABLE automations_workflow MODIFY COLUMN nameHash VARCHAR(256) COLLATE ascii_bin,MODIFY COLUMN workflowType VARCHAR(256) COLLATE ascii_bin, MODIFY COLUMN status VARCHAR(256) COLLATE ascii_bin; ALTER TABLE entity_extension MODIFY COLUMN extension VARCHAR(256) COLLATE ascii_bin; diff --git a/bootstrap/sql/migrations/native/1.1.3/postgres/postDataMigrationSQLScript.sql b/bootstrap/sql/migrations/native/1.1.3/postgres/postDataMigrationSQLScript.sql index e69de29bb2d..948f4fb754c 100644 --- a/bootstrap/sql/migrations/native/1.1.3/postgres/postDataMigrationSQLScript.sql +++ b/bootstrap/sql/migrations/native/1.1.3/postgres/postDataMigrationSQLScript.sql @@ -0,0 +1,53 @@ +BEGIN; +-- We'll rank all the runs (timestamps) for every day, and delete all the data but the most recent one. +DELETE FROM report_data_time_series WHERE (json ->> 'id') IN ( + select ids FROM ( + SELECT + (json ->> 'id') AS ids, + DENSE_RANK() OVER(PARTITION BY date ORDER BY timestamp DESC) as denseRank + FROM ( + SELECT + *, + DATE(TO_TIMESTAMP((json ->> 'timestamp')::bigint/1000)) as date + FROM report_data_time_series rdts + WHERE json ->> 'reportDataType' = 'WebAnalyticEntityViewReportData' + ) duplicates + ORDER BY date DESC, timestamp DESC + ) as dense_ranked + WHERE denseRank != 1 +); + +DELETE FROM report_data_time_series WHERE (json ->> 'id') IN ( + select ids FROM ( + SELECT + (json ->> 'id') AS ids, + DENSE_RANK() OVER(PARTITION BY date ORDER BY timestamp DESC) as denseRank + FROM ( + SELECT + *, + DATE(TO_TIMESTAMP((json ->> 'timestamp')::bigint/1000)) as date + FROM report_data_time_series rdts + WHERE json ->> 'reportDataType' = 'EntityReportData' + ) duplicates + ORDER BY date DESC, timestamp DESC + ) as dense_ranked + WHERE denseRank != 1 +); + +DELETE FROM report_data_time_series WHERE (json ->> 'id') IN ( + select ids FROM ( + SELECT + (json ->> 'id') AS ids, + DENSE_RANK() OVER(PARTITION BY date ORDER BY timestamp DESC) as denseRank + FROM ( + SELECT + *, + DATE(TO_TIMESTAMP((json ->> 'timestamp')::bigint/1000)) as date + FROM report_data_time_series rdts + WHERE json ->> 'reportDataType' = 'WebAnalyticUserActivityReportData' + ) duplicates + ORDER BY date DESC, timestamp DESC + ) as dense_ranked + WHERE denseRank != 1 +); +COMMIT; \ No newline at end of file diff --git a/bootstrap/sql/migrations/native/1.1.3/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.1.3/postgres/schemaChanges.sql index 1cde8fa6427..e85f40f87b6 100644 --- a/bootstrap/sql/migrations/native/1.1.3/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.1.3/postgres/schemaChanges.sql @@ -9,6 +9,8 @@ WHERE extension in ('table.tableProfile', 'table.columnProfile'); ; +BEGIN; +-- Run the following SQL to update the schema in a transaction -- Create report data time series table and move data from entity_extension_time_series CREATE TABLE IF NOT EXISTS report_data_time_series ( entityFQNHash VARCHAR(768), @@ -17,16 +19,18 @@ CREATE TABLE IF NOT EXISTS report_data_time_series ( json JSONB NOT NULL, timestamp BIGINT CHECK (timestamp > 0) GENERATED ALWAYS AS ((json ->> 'timestamp')::bigint) STORED NOT NULL ); - CREATE INDEX IF NOT EXISTS report_data_time_series_point_ts ON report_data_time_series (timestamp); INSERT INTO report_data_time_series (entityFQNHash,extension,jsonSchema,json) + SELECT entityFQNHash, extension, jsonSchema, json FROM entity_extension_time_series WHERE extension = 'reportData.reportDataResult'; DELETE FROM entity_extension_time_series 
WHERE extension = 'reportData.reportDataResult';
+COMMIT;
+
+BEGIN;
 -- Create profiler data time series table and move data from entity_extension_time_series
 CREATE TABLE IF NOT EXISTS profiler_data_time_series (
     entityFQNHash VARCHAR(768),
@@ -47,7 +51,9 @@ WHERE extension IN ('table.columnProfile', 'table.tableProfile', 'table.systemPr
 DELETE FROM entity_extension_time_series
 WHERE extension IN ('table.columnProfile', 'table.tableProfile', 'table.systemProfile');
+COMMIT;
+
+BEGIN;
 -- Create data quality data time series table and move data from entity_extension_time_series
 CREATE TABLE IF NOT EXISTS data_quality_data_time_series (
     entityFQNHash VARCHAR(768),
@@ -67,6 +73,7 @@ WHERE extension = 'testCase.testCaseResult';
 DELETE FROM entity_extension_time_series
 WHERE extension = 'testCase.testCaseResult';
+COMMIT;
 
 ALTER TABLE entity_extension_time_series ALTER COLUMN entityFQNHash TYPE VARCHAR(768), ALTER COLUMN jsonSchema TYPE VARCHAR(50) , ALTER COLUMN extension TYPE VARCHAR(100) ,
 ADD CONSTRAINT entity_extension_time_series_constraint UNIQUE (entityFQNHash, extension, timestamp);
diff --git a/ingestion/src/metadata/data_insight/api/workflow.py b/ingestion/src/metadata/data_insight/api/workflow.py
index 747916e2c81..32ae74deebd 100644
--- a/ingestion/src/metadata/data_insight/api/workflow.py
+++ b/ingestion/src/metadata/data_insight/api/workflow.py
@@ -27,6 +27,7 @@ from typing import Optional, Union, cast
 from pydantic import ValidationError
 
 from metadata.config.common import WorkflowExecutionError
+from metadata.data_insight.helper.data_insight_es_index import DataInsightEsIndex
 from metadata.data_insight.processor.data_processor import DataProcessor
 from metadata.data_insight.processor.entity_report_data_processor import (
     EntityReportDataProcessor,
@@ -57,7 +58,10 @@ from metadata.ingestion.sink.elasticsearch import ElasticsearchSink
 from metadata.timer.repeated_timer import RepeatedTimer
 from metadata.utils.importer import get_sink
 from metadata.utils.logger import data_insight_logger, set_loggers_level
-from metadata.utils.time_utils import get_beginning_of_day_timestamp_mill
+from metadata.utils.time_utils import (
+    get_beginning_of_day_timestamp_mill,
+    get_end_of_day_timestamp_mill,
+)
 from metadata.workflow.base import REPORTS_INTERVAL_SECONDS
 from metadata.workflow.workflow_output_handler import (
     get_ingestion_status_timer,
@@ -81,6 +85,7 @@ class DataInsightWorkflow(WorkflowStatusMixin):
     def __init__(self, config: OpenMetadataWorkflowConfig) -> None:
         self.config = config
         self._timer: Optional[RepeatedTimer] = None
+        self.date = datetime.utcnow().strftime("%Y-%m-%d")
 
         set_loggers_level(config.workflowConfig.loggerLevel.value)
 
@@ -168,10 +173,56 @@ class DataInsightWorkflow(WorkflowStatusMixin):
 
         return [kpi for kpi in kpis.entities if self._is_kpi_active(kpi)]
 
+    def _check_and_handle_existing_es_data(self, index: str) -> None:
+        """Handle the scenario where data has already been ingested for the execution date.
+        If we find data for the execution date, we should delete those documents before
+        re-indexing the new ones.
+
+        !IMPORTANT! This should be deprecated, and the logic should be handled on the event
+        publisher side, once we have the event publisher handling DI indexing.
+        """
+        gte = get_beginning_of_day_timestamp_mill()
+        lte = get_end_of_day_timestamp_mill()
+        query = {
+            "size": 1000,
+            "query": {
+                "range": {
+                    "timestamp": {
+                        "gte": gte,
+                        "lte": lte,
+                    }
+                }
+            },
+        }
+        data = self.es_sink.read_records(index, query)
+        try:
+            hit_total = data["hits"]["total"]["value"]
+            documents = data["hits"]["hits"]
+        except KeyError as exc:
+            logger.error(exc)
+        else:
+            if hit_total > 0:
+                body = [
+                    {"delete": {"_index": document["_index"], "_id": document["_id"]}}
+                    for document in documents
+                ]
+                try:
+                    self.es_sink.bulk_operation(body)
+                except Exception as exc:
+                    logger.debug(traceback.format_exc())
+                    logger.error(f"Could not delete existing data - {exc}")
+                    raise RuntimeError
+            return None
+        return None
+
     def _execute_data_processor(self):
         """Data processor method to refine raw data into report data and ingest it in ES"""
         for report_data_type in ReportDataType:
             logger.info(f"Processing data for report type {report_data_type}")
+            # we delete the report data for the current date to avoid duplicate
+            # entries in the database.
+            self.metadata.delete_report_data(report_data_type, self.date)
+            has_checked_and_handled_existing_es_data = False
             try:
                 self.source = DataProcessor.create(
                     _data_processor_type=report_data_type.value, metadata=self.metadata
@@ -180,6 +231,11 @@ class DataInsightWorkflow(WorkflowStatusMixin):
                 if hasattr(self, "sink"):
                     self.sink.write_record(record)
                 if hasattr(self, "es_sink"):
+                    if not has_checked_and_handled_existing_es_data:
+                        self._check_and_handle_existing_es_data(
+                            DataInsightEsIndex[record.data.__class__.__name__].value
+                        )
+                        has_checked_and_handled_existing_es_data = True
                     self.es_sink.write_record(record)
                 else:
                     logger.warning(
diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py
index 9046ec580cd..09a52f46a91 100644
--- a/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py
+++ b/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 from typing import List, Optional
 
 from metadata.generated.schema.analytics.basic import WebAnalyticEventType
-from metadata.generated.schema.analytics.reportData import ReportData
+from metadata.generated.schema.analytics.reportData import ReportData, ReportDataType
 from metadata.generated.schema.analytics.webAnalyticEventData import (
     WebAnalyticEventData,
 )
@@ -174,3 +174,14 @@ class DataInsightMixin:
         """
         event_type_value = event_type.value
         self.client.delete(f"/analytics/web/events/{event_type_value}/{tmsp}/collect")
+
+    def delete_report_data(self, report_data_type: ReportDataType, date: str) -> None:
+        """Delete report data at a specific date for a specific report data type
+
+        Args:
+            report_data_type (ReportDataType): report data type to delete
+            date (str): date for which to delete the report data
+        """
+        self.client.delete(
+            f"/analytics/dataInsights/data/{report_data_type.value}/{date}"
+        )
diff --git a/ingestion/tests/integration/data_insight/test_data_insight_workflow.py b/ingestion/tests/integration/data_insight/test_data_insight_workflow.py
index 448589f6694..9373d845ae0 100644
--- a/ingestion/tests/integration/data_insight/test_data_insight_workflow.py
+++ b/ingestion/tests/integration/data_insight/test_data_insight_workflow.py
@@ -359,6 +359,32 @@ class DataInsightWorkflowTests(unittest.TestCase):
 
         assert kpi_result
 
+    def test_multiple_execution(self) -> None:
+        """test that multiple executions of the workflow do
not yield duplicate entries"""
+        data = {}
+
+        workflow: DataInsightWorkflow = DataInsightWorkflow.create(data_insight_config)
+        workflow.execute()
+        workflow.stop()
+        sleep(2)  # we'll wait for 2 seconds
+        new_workflow: DataInsightWorkflow = DataInsightWorkflow.create(
+            data_insight_config
+        )
+        new_workflow.execute()
+        new_workflow.stop()
+
+        for report_data_type in ReportDataType:
+            data[report_data_type] = self.metadata.get_data_insight_report_data(
+                self.start_ts,
+                self.end_ts,
+                report_data_type.value,
+            )
+
+        for _, values in data.items():
+            timestamp = [value.get("timestamp") for value in values.get("data")]
+            # we'll check we only have 1 execution timestamp
+            assert len(set(timestamp)) == 1
+
     @classmethod
     def tearDownClass(cls) -> None:
         kpis: list[Kpi] = cls.metadata.list_entities(
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java
index 6a7f1694fb9..83eb11bb12a 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java
@@ -3167,6 +3167,18 @@ public interface CollectionDAO {
     default String getTimeSeriesTableName() {
       return "report_data_time_series";
     }
+
+    @SqlQuery("SELECT json FROM report_data_time_series WHERE entityFQNHash = :reportDataType and date = :date")
+    List<String> listReportDataAtDate(@BindFQN("reportDataType") String reportDataType, @Bind("date") String date);
+
+    @ConnectionAwareSqlUpdate(
+        value = "DELETE FROM report_data_time_series WHERE entityFQNHash = :reportDataType and date = :date",
+        connectionType = MYSQL)
+    @ConnectionAwareSqlUpdate(
+        value =
+            "DELETE FROM report_data_time_series WHERE entityFQNHash = :reportDataType and DATE(TO_TIMESTAMP((json ->> 'timestamp')::bigint/1000)) = DATE(:date)",
+        connectionType = POSTGRES)
+    void deleteReportDataTypeAtDate(@BindFQN("reportDataType") String reportDataType, @Bind("date") String date);
   }
 
   interface ProfilerDataTimeSeriesDAO extends EntityTimeSeriesDAO {
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ReportDataRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ReportDataRepository.java
index c91e947f0db..9d2e1bce23f 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ReportDataRepository.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ReportDataRepository.java
@@ -1,5 +1,6 @@
 package org.openmetadata.service.jdbi3;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import javax.ws.rs.core.Response;
@@ -43,4 +44,9 @@ public class ReportDataRepository {
 
     return new ResultList<>(reportData, String.valueOf(startTs), String.valueOf(endTs), reportData.size());
   }
+
+  public void deleteReportDataAtDate(ReportDataType reportDataType, String date) throws IOException {
+    // delete all report data rows of the given type recorded on the given date
+    daoCollection.reportDataTimeSeriesDao().deleteReportDataTypeAtDate(reportDataType.value(), date);
+  }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/analytics/ReportDataResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/analytics/ReportDataResource.java
index 087cdf84770..5d09b599c5e 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/analytics/ReportDataResource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/analytics/ReportDataResource.java
@@ -10,9 +10,11 @@ import io.swagger.v3.oas.annotations.tags.Tag;
 import java.io.IOException;
 import javax.validation.Valid;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
@@ -45,12 +47,12 @@ import org.openmetadata.service.util.ResultList;
 @Collection(name = "analytics")
 public class ReportDataResource {
   public static final String COLLECTION_PATH = "v1/analytics/dataInsights/data";
-  @Getter protected final ReportDataRepository dao;
+  @Getter protected final ReportDataRepository repository;
   protected final Authorizer authorizer;
 
-  public ReportDataResource(CollectionDAO dao, Authorizer authorizer) {
+  public ReportDataResource(CollectionDAO repository, Authorizer authorizer) {
     this.authorizer = authorizer;
-    this.dao = new ReportDataRepository(dao);
+    this.repository = new ReportDataRepository(repository);
   }
 
   public static class ReportDataResultList extends ResultList<ReportData> {
@@ -89,12 +91,11 @@ public class ReportDataResource {
               schema = @Schema(type = "number"))
           @NonNull
           @QueryParam("endTs")
-          Long endTs)
-      throws IOException {
+          Long endTs) {
     OperationContext operationContext = new OperationContext(Entity.DATA_INSIGHT_CHART, MetadataOperation.VIEW_ALL);
     ResourceContextInterface resourceContext = ReportDataContext.builder().build();
     authorizer.authorize(securityContext, operationContext, resourceContext);
-    return dao.getReportData(reportDataType, startTs, endTs);
+    return repository.getReportData(reportDataType, startTs, endTs);
   }
 
   @POST
@@ -109,11 +110,41 @@ public class ReportDataResource {
             content = @Content(mediaType = "application/json", schema = @Schema(implementation = ReportData.class)))
       })
   public Response addReportData(
-      @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid ReportData reportData)
-      throws IOException {
+      @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid ReportData reportData) {
     OperationContext operationContext = new OperationContext(Entity.DATA_INSIGHT_CHART, MetadataOperation.CREATE);
     ResourceContextInterface resourceContext = ReportDataContext.builder().build();
     authorizer.authorize(securityContext, operationContext, resourceContext);
-    return dao.addReportData(reportData);
+    return repository.addReportData(reportData);
+  }
+
+  @DELETE
+  @Path("/{reportDataType}/{date}")
+  @Operation(
+      operationId = "deleteReportData",
+      summary = "Delete report data for a given report data type and date",
+      description = "Delete report data for a given report data type and date.",
+      responses = {
+        @ApiResponse(
+            responseCode = "200",
+            description = "Successfully deleted report data.",
+            content = @Content(mediaType = "application/json", schema = @Schema(implementation = ReportData.class)))
+      })
+  public Response deleteReportData(
+      @Context UriInfo uriInfo,
+      @Context SecurityContext securityContext,
+      @Parameter(description = "report data type", schema = @Schema(implementation = ReportDataType.class))
+          @NonNull
+          @PathParam("reportDataType")
+          ReportDataType reportDataType,
+      @Parameter(description = "date in format YYYY-MM-DD", schema = @Schema(type = "String"))
+          @NonNull
+          @PathParam("date")
+          String date)
+      throws IOException {
+    OperationContext operationContext = new OperationContext(Entity.DATA_INSIGHT_CHART,
MetadataOperation.DELETE);
+    ResourceContextInterface resourceContext = ReportDataContext.builder().build();
+    authorizer.authorize(securityContext, operationContext, resourceContext);
+    repository.deleteReportDataAtDate(reportDataType, date);
+    return Response.ok().build();
+  }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticSearch/ElasticSearchClientImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticSearch/ElasticSearchClientImpl.java
index 3d1c7b064c5..8cc2d1ec54b 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticSearch/ElasticSearchClientImpl.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticSearch/ElasticSearchClientImpl.java
@@ -1690,7 +1690,7 @@ public class ElasticSearchClientImpl implements SearchClient {
     DateHistogramAggregationBuilder dateHistogramAggregationBuilder =
         AggregationBuilders.dateHistogram(DataInsightChartRepository.TIMESTAMP)
             .field(DataInsightChartRepository.TIMESTAMP)
-            .calendarInterval(DateHistogramInterval.minutes(1));
+            .calendarInterval(DateHistogramInterval.DAY);
 
     TermsAggregationBuilder termsAggregationBuilder;
     SumAggregationBuilder sumAggregationBuilder;
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java
index 1777518f50a..8a000ab3a37 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/workflows/searchIndex/PaginatedDataInsightSource.java
@@ -42,7 +42,7 @@ public class PaginatedDataInsightSource implements Source
     this.dao = dao;
     this.entityType = entityType;
     this.batchSize = batchSize;
-    stats.setTotalRecords(dao.entityExtensionTimeSeriesDao().listCount(entityType));
+    stats.setTotalRecords(dao.reportDataTimeSeriesDao().listCount(entityType));
   }
 
   @Override
@@ -93,9 +93,9 @@ public class PaginatedDataInsightSource implements Source
   public ResultList<ReportData> getReportDataPagination(String entityFQN, int limit, String after) {
     // workaround. Should be fixed in https://github.com/open-metadata/OpenMetadata/issues/12298
     String upperCaseFQN = StringUtils.capitalize(entityFQN);
-    int reportDataCount = dao.entityExtensionTimeSeriesDao().listCount(upperCaseFQN);
+    int reportDataCount = dao.reportDataTimeSeriesDao().listCount(upperCaseFQN);
     List reportDataList =
-        dao.entityExtensionTimeSeriesDao()
+        dao.reportDataTimeSeriesDao()
            .getAfterExtension(upperCaseFQN, limit + 1, after == null ?
"0" : RestUtil.decodeCursor(after)); return getAfterExtensionList(reportDataList, after, limit, reportDataCount); } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/analytics/ReportDataResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/analytics/ReportDataResourceTest.java index 1739fb7e5ef..3899350149e 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/analytics/ReportDataResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/analytics/ReportDataResourceTest.java @@ -1,6 +1,8 @@ package org.openmetadata.service.resources.analytics; import static javax.ws.rs.core.Response.Status.FORBIDDEN; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.openmetadata.service.exception.CatalogExceptionMessage.permissionNotAllowed; import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS; import static org.openmetadata.service.util.TestUtils.INGESTION_BOT_AUTH_HEADERS; @@ -8,8 +10,10 @@ import static org.openmetadata.service.util.TestUtils.TEST_AUTH_HEADERS; import static org.openmetadata.service.util.TestUtils.TEST_USER_NAME; import java.text.ParseException; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; import javax.ws.rs.client.WebTarget; import org.apache.http.client.HttpResponseException; import org.junit.jupiter.api.Test; @@ -17,13 +21,14 @@ import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; import org.openmetadata.schema.analytics.EntityReportData; import org.openmetadata.schema.analytics.ReportData; +import org.openmetadata.schema.analytics.WebAnalyticUserActivityReportData; import org.openmetadata.schema.type.MetadataOperation; import org.openmetadata.service.OpenMetadataApplicationTest; import org.openmetadata.service.resources.analytics.ReportDataResource.ReportDataResultList; import org.openmetadata.service.util.ResultList; import org.openmetadata.service.util.TestUtils; -public class ReportDataResourceTest extends OpenMetadataApplicationTest { +class ReportDataResourceTest extends OpenMetadataApplicationTest { private final String collectionName = "analytics/dataInsights/data"; @@ -47,6 +52,8 @@ public class ReportDataResourceTest extends OpenMetadataApplicationTest { ResultList reportDataList = getReportData("2022-10-10", "2022-10-12", ReportData.ReportDataType.ENTITY_REPORT_DATA, ADMIN_AUTH_HEADERS); + + assertNotEquals(0, reportDataList.getData().size()); } @Test @@ -92,6 +99,71 @@ public class ReportDataResourceTest extends OpenMetadataApplicationTest { ResultList reportDataList = getReportData( "2022-10-10", "2022-10-12", ReportData.ReportDataType.ENTITY_REPORT_DATA, INGESTION_BOT_AUTH_HEADERS); + + assertNotEquals(0, reportDataList.getData().size()); + } + + @Test + @Execution(ExecutionMode.CONCURRENT) + void delete_endpoint_200() throws HttpResponseException, ParseException { + List createReportDataList = new ArrayList<>(); + + // create some entity report data + EntityReportData entityReportData = + new EntityReportData() + .withEntityType("table") + .withEntityTier("Tier.Tier1") + .withCompletedDescriptions(1) + .withEntityCount(11); + ReportData reportData1 = + new ReportData() + .withTimestamp(TestUtils.dateToTimestamp("2022-10-15")) + .withReportDataType(ReportData.ReportDataType.ENTITY_REPORT_DATA) + .withData(entityReportData); + 
+    // create some web analytic user activity report data
+    WebAnalyticUserActivityReportData webAnalyticUserActivityReportData =
+        new WebAnalyticUserActivityReportData()
+            .withUserId(UUID.randomUUID())
+            .withUserName("testUser")
+            .withLastSession(TestUtils.dateToTimestamp("2022-10-13"));
+    ReportData reportData2 =
+        new ReportData()
+            .withTimestamp(TestUtils.dateToTimestamp("2022-10-15"))
+            .withReportDataType(ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)
+            .withData(webAnalyticUserActivityReportData);
+
+    createReportDataList.add(reportData1);
+    createReportDataList.add(reportData2);
+
+    for (ReportData reportData : createReportDataList) {
+      postReportData(reportData, INGESTION_BOT_AUTH_HEADERS);
+    }
+
+    // check we have our data
+    ResultList<ReportData> entityReportDataList =
+        getReportData("2022-10-15", "2022-10-15", ReportData.ReportDataType.ENTITY_REPORT_DATA, ADMIN_AUTH_HEADERS);
+    ResultList<ReportData> webAnalyticsReportDataList =
+        getReportData(
+            "2022-10-15",
+            "2022-10-15",
+            ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA,
+            ADMIN_AUTH_HEADERS);
+    assertNotEquals(0, entityReportDataList.getData().size());
+    assertNotEquals(0, webAnalyticsReportDataList.getData().size());
+
+    // delete the entity report data and check that it has been deleted
+    deleteReportData(ReportData.ReportDataType.ENTITY_REPORT_DATA.value(), "2022-10-15", ADMIN_AUTH_HEADERS);
+    entityReportDataList =
+        getReportData("2022-10-14", "2022-10-16", ReportData.ReportDataType.ENTITY_REPORT_DATA, ADMIN_AUTH_HEADERS);
+    assertEquals(0, entityReportDataList.getData().size());
+    webAnalyticsReportDataList =
+        getReportData(
+            "2022-10-14",
+            "2022-10-16",
+            ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA,
+            ADMIN_AUTH_HEADERS);
+    assertNotEquals(0, webAnalyticsReportDataList.getData().size());
+  }
 
   public void postReportData(ReportData reportData, Map<String, String> authHeader) throws HttpResponseException {
@@ -108,4 +180,11 @@
     target = target.queryParam("reportDataType", reportDataType);
     return TestUtils.get(target, ReportDataResultList.class, authHeader);
   }
+
+  private void deleteReportData(String reportDataType, String date, Map<String, String> authHeader)
+      throws HttpResponseException {
+    String path = String.format("/%s/%s", reportDataType, date);
+    WebTarget target = getResource(collectionName).path(path);
+    TestUtils.delete(target, authHeader);
+  }
 }
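
Usage note: a minimal sketch of how the new delete path can be exercised from the ingestion side, via the `delete_report_data` mixin added above. It assumes a reachable OpenMetadata instance and a valid JWT token (both placeholder values here); the call hits the new DELETE /v1/analytics/dataInsights/data/{reportDataType}/{date} endpoint.

    from metadata.generated.schema.analytics.reportData import ReportDataType
    from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
        OpenMetadataConnection,
    )
    from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
        OpenMetadataJWTClientConfig,
    )
    from metadata.ingestion.ometa.ometa_api import OpenMetadata

    # placeholder connection details -- adjust hostPort and jwtToken to your deployment
    server_config = OpenMetadataConnection(
        hostPort="http://localhost:8585/api",
        authProvider="openmetadata",
        securityConfig=OpenMetadataJWTClientConfig(jwtToken="<jwt-token>"),
    )
    metadata = OpenMetadata(server_config)

    # drop any EntityReportData already recorded for the day before re-ingesting it,
    # mirroring what DataInsightWorkflow._execute_data_processor now does per report type
    metadata.delete_report_data(ReportDataType.EntityReportData, "2023-09-01")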