Issue 12297 bis -- Delete Insert logic in the DI workflow (#13058)
* fix: updated ingestion to delete existing data for the ingestion day
* fix: added delete endpoint for the report data
* fix: added migration to delete duplicate data + added table creation & data deletion logic in a transaction
* fix: made deletion SQL engine aware
* fix: added ES deletion logic back while we work on the DI event publisher
This commit is contained in:
parent c3f3a976f8
commit ab3042e8ee
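Before the diff, a minimal sketch of the idempotency pattern this change introduces: for each report data type, the workflow first deletes any report data already stored for the execution date, then writes the freshly processed records, so re-running the workflow on the same day cannot create duplicates. This is illustrative only; the in-memory ReportStore, run_data_insight_day, and the sample records are hypothetical stand-ins for the real OpenMetadata backend, API, and processors shown in the diff below.

from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Tuple

# Hypothetical stand-in for the report_data_time_series backend: rows are keyed by
# (report_data_type, date), so a delete-then-insert per execution date keeps the
# store free of duplicate daily runs.
class ReportStore:
    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str], List[dict]] = defaultdict(list)

    def delete_report_data(self, report_data_type: str, date: str) -> None:
        # Plays the role of the new DELETE /analytics/dataInsights/data/{type}/{date} call
        self._rows.pop((report_data_type, date), None)

    def add_report_data(self, report_data_type: str, date: str, record: dict) -> None:
        self._rows[(report_data_type, date)].append(record)

    def list_report_data(self, report_data_type: str, date: str) -> List[dict]:
        return list(self._rows[(report_data_type, date)])


def run_data_insight_day(store: ReportStore, records_by_type: Dict[str, List[dict]]) -> None:
    """One simulated Data Insight run: delete the execution date's data, then re-insert."""
    date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    for report_data_type, records in records_by_type.items():
        store.delete_report_data(report_data_type, date)  # idempotency step
        for record in records:
            store.add_report_data(report_data_type, date, record)


if __name__ == "__main__":
    store = ReportStore()
    sample = {"EntityReportData": [{"entityType": "table", "entityCount": 11}]}
    run_data_insight_day(store, sample)
    run_data_insight_day(store, sample)  # a second run on the same day
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    assert len(store.list_report_data("EntityReportData", today)) == 1  # no duplicates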
@@ -0,0 +1,50 @@
START TRANSACTION;
-- We'll rank all the runs (timestamps) for every day, and delete all the data but the most recent one.
DELETE FROM report_data_time_series WHERE JSON_EXTRACT(json, '$.id') IN (
    SELECT ids FROM (
        SELECT
            (json ->> '$.id') AS ids,
            DENSE_RANK() OVER(PARTITION BY `date` ORDER BY `timestamp` DESC) as denseRank
        FROM (
            SELECT
                *
            FROM report_data_time_series rdts
            WHERE json ->> '$.reportDataType' = 'WebAnalyticEntityViewReportData'
        ) duplicates
        ORDER BY `date` DESC, `timestamp` DESC
    ) as dense_ranked
    WHERE denseRank != 1
);

DELETE FROM report_data_time_series WHERE JSON_EXTRACT(json, '$.id') IN (
    SELECT ids FROM (
        SELECT
            (json ->> '$.id') AS ids,
            DENSE_RANK() OVER(PARTITION BY `date` ORDER BY `timestamp` DESC) as denseRank
        FROM (
            SELECT
                *
            FROM report_data_time_series rdts
            WHERE json ->> '$.reportDataType' = 'EntityReportData'
        ) duplicates
        ORDER BY `date` DESC, `timestamp` DESC
    ) as dense_ranked
    WHERE denseRank != 1
);

DELETE FROM report_data_time_series WHERE JSON_EXTRACT(json, '$.id') IN (
    SELECT ids FROM (
        SELECT
            (json ->> '$.id') AS ids,
            DENSE_RANK() OVER(PARTITION BY `date` ORDER BY `timestamp` DESC) as denseRank
        FROM (
            SELECT
                *
            FROM report_data_time_series rdts
            WHERE json ->> '$.reportDataType' = 'WebAnalyticUserActivityReportData'
        ) duplicates
        ORDER BY `date` DESC, `timestamp` DESC
    ) as dense_ranked
    WHERE denseRank != 1
);
COMMIT;
@@ -9,6 +9,7 @@ WHERE
    extension in ('table.tableProfile', 'table.columnProfile');
;

START TRANSACTION;
-- Create report data time series table and move data from entity_extension_time_series
CREATE TABLE IF NOT EXISTS report_data_time_series (
    entityFQNHash VARCHAR(768) CHARACTER SET ascii COLLATE ascii_bin NOT NULL,
@@ -16,7 +17,9 @@ CREATE TABLE IF NOT EXISTS report_data_time_series (
    jsonSchema VARCHAR(256) NOT NULL,
    json JSON NOT NULL,
    timestamp BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.timestamp') NOT NULL,
    INDEX report_data_time_series_point_ts (timestamp)
    date DATE GENERATED ALWAYS AS (FROM_UNIXTIME((json ->> '$.timestamp') DIV 1000)) NOT NULL,
    INDEX report_data_time_series_point_ts (timestamp),
    INDEX report_data_time_series_date (date)
);

INSERT INTO report_data_time_series (entityFQNHash,extension,jsonSchema,json)
@@ -25,7 +28,9 @@ FROM entity_extension_time_series WHERE extension = 'reportData.reportDataResult

DELETE FROM entity_extension_time_series
WHERE extension = 'reportData.reportDataResult';
COMMIT;

START TRANSACTION;
-- Create profiler data time series table and move data from entity_extension_time_series
CREATE TABLE IF NOT EXISTS profiler_data_time_series (
    entityFQNHash VARCHAR(768) CHARACTER SET ascii COLLATE ascii_bin NOT NULL,
@@ -45,7 +50,9 @@ WHERE extension IN ('table.columnProfile', 'table.tableProfile', 'table.systemPr

DELETE FROM entity_extension_time_series
WHERE extension IN ('table.columnProfile', 'table.tableProfile', 'table.systemProfile');
COMMIT;

START TRANSACTION;
-- Create data quality data time series table and move data from entity_extension_time_series
CREATE TABLE IF NOT EXISTS data_quality_data_time_series (
    entityFQNHash VARCHAR(768) CHARACTER SET ascii COLLATE ascii_bin NOT NULL,
@@ -64,6 +71,7 @@ WHERE extension = 'testCase.testCaseResult';

DELETE FROM entity_extension_time_series
WHERE extension = 'testCase.testCaseResult';
COMMIT;

ALTER TABLE automations_workflow MODIFY COLUMN nameHash VARCHAR(256) COLLATE ascii_bin,MODIFY COLUMN workflowType VARCHAR(256) COLLATE ascii_bin, MODIFY COLUMN status VARCHAR(256) COLLATE ascii_bin;
ALTER TABLE entity_extension MODIFY COLUMN extension VARCHAR(256) COLLATE ascii_bin;
@@ -0,0 +1,53 @@
BEGIN;
-- We'll rank all the runs (timestamps) for every day, and delete all the data but the most recent one.
DELETE FROM report_data_time_series WHERE (json ->> 'id') IN (
    SELECT ids FROM (
        SELECT
            (json ->> 'id') AS ids,
            DENSE_RANK() OVER(PARTITION BY date ORDER BY timestamp DESC) as denseRank
        FROM (
            SELECT
                *,
                DATE(TO_TIMESTAMP((json ->> 'timestamp')::bigint/1000)) as date
            FROM report_data_time_series rdts
            WHERE json ->> 'reportDataType' = 'WebAnalyticEntityViewReportData'
        ) duplicates
        ORDER BY date DESC, timestamp DESC
    ) as dense_ranked
    WHERE denseRank != 1
);

DELETE FROM report_data_time_series WHERE (json ->> 'id') IN (
    SELECT ids FROM (
        SELECT
            (json ->> 'id') AS ids,
            DENSE_RANK() OVER(PARTITION BY date ORDER BY timestamp DESC) as denseRank
        FROM (
            SELECT
                *,
                DATE(TO_TIMESTAMP((json ->> 'timestamp')::bigint/1000)) as date
            FROM report_data_time_series rdts
            WHERE json ->> 'reportDataType' = 'EntityReportData'
        ) duplicates
        ORDER BY date DESC, timestamp DESC
    ) as dense_ranked
    WHERE denseRank != 1
);

DELETE FROM report_data_time_series WHERE (json ->> 'id') IN (
    SELECT ids FROM (
        SELECT
            (json ->> 'id') AS ids,
            DENSE_RANK() OVER(PARTITION BY date ORDER BY timestamp DESC) as denseRank
        FROM (
            SELECT
                *,
                DATE(TO_TIMESTAMP((json ->> 'timestamp')::bigint/1000)) as date
            FROM report_data_time_series rdts
            WHERE json ->> 'reportDataType' = 'WebAnalyticUserActivityReportData'
        ) duplicates
        ORDER BY date DESC, timestamp DESC
    ) as dense_ranked
    WHERE denseRank != 1
);
COMMIT;
@@ -9,6 +9,8 @@ WHERE
    extension in ('table.tableProfile', 'table.columnProfile');
;

BEGIN;
-- Run the following SQL to update the schema in a transaction
-- Create report data time series table and move data from entity_extension_time_series
CREATE TABLE IF NOT EXISTS report_data_time_series (
    entityFQNHash VARCHAR(768),
@@ -17,16 +19,18 @@ CREATE TABLE IF NOT EXISTS report_data_time_series (
    json JSONB NOT NULL,
    timestamp BIGINT CHECK (timestamp > 0) GENERATED ALWAYS AS ((json ->> 'timestamp')::bigint) STORED NOT NULL
);

CREATE INDEX IF NOT EXISTS report_data_time_series_point_ts ON report_data_time_series (timestamp);

INSERT INTO report_data_time_series (entityFQNHash,extension,jsonSchema,json)

SELECT entityFQNHash, extension, jsonSchema, json
FROM entity_extension_time_series WHERE extension = 'reportData.reportDataResult';

DELETE FROM entity_extension_time_series
WHERE extension = 'reportData.reportDataResult';
COMMIT;

BEGIN;
-- Create profiler data time series table and move data from entity_extension_time_series
CREATE TABLE IF NOT EXISTS profiler_data_time_series (
    entityFQNHash VARCHAR(768),
@@ -47,7 +51,9 @@ WHERE extension IN ('table.columnProfile', 'table.tableProfile', 'table.systemPr

DELETE FROM entity_extension_time_series
WHERE extension IN ('table.columnProfile', 'table.tableProfile', 'table.systemProfile');
COMMIT;

BEGIN;
-- Create profiler data time series table and move data from entity_extension_time_series
CREATE TABLE IF NOT EXISTS data_quality_data_time_series (
    entityFQNHash VARCHAR(768),
@@ -67,6 +73,7 @@ WHERE extension = 'testCase.testCaseResult';

DELETE FROM entity_extension_time_series
WHERE extension = 'testCase.testCaseResult';
COMMIT;

ALTER TABLE entity_extension_time_series ALTER COLUMN entityFQNHash TYPE VARCHAR(768), ALTER COLUMN jsonSchema TYPE VARCHAR(50) , ALTER COLUMN extension TYPE VARCHAR(100) ,
ADD CONSTRAINT entity_extension_time_series_constraint UNIQUE (entityFQNHash, extension, timestamp);
@@ -27,6 +27,7 @@ from typing import Optional, Union, cast
from pydantic import ValidationError

from metadata.config.common import WorkflowExecutionError
from metadata.data_insight.helper.data_insight_es_index import DataInsightEsIndex
from metadata.data_insight.processor.data_processor import DataProcessor
from metadata.data_insight.processor.entity_report_data_processor import (
    EntityReportDataProcessor,
@@ -57,7 +58,10 @@ from metadata.ingestion.sink.elasticsearch import ElasticsearchSink
from metadata.timer.repeated_timer import RepeatedTimer
from metadata.utils.importer import get_sink
from metadata.utils.logger import data_insight_logger, set_loggers_level
from metadata.utils.time_utils import get_beginning_of_day_timestamp_mill
from metadata.utils.time_utils import (
    get_beginning_of_day_timestamp_mill,
    get_end_of_day_timestamp_mill,
)
from metadata.workflow.base import REPORTS_INTERVAL_SECONDS
from metadata.workflow.workflow_output_handler import (
    get_ingestion_status_timer,
@@ -81,6 +85,7 @@ class DataInsightWorkflow(WorkflowStatusMixin):
    def __init__(self, config: OpenMetadataWorkflowConfig) -> None:
        self.config = config
        self._timer: Optional[RepeatedTimer] = None
        self.date = datetime.utcnow().strftime("%Y-%m-%d")

        set_loggers_level(config.workflowConfig.loggerLevel.value)

@@ -168,10 +173,56 @@ class DataInsightWorkflow(WorkflowStatusMixin):

        return [kpi for kpi in kpis.entities if self._is_kpi_active(kpi)]

    def _check_and_handle_existing_es_data(self, index: str) -> None:
        """Handles scenarios where data has already been ingested for the execution date.
        If we find some data for the execution date we should delete those documents before
        re-indexing new documents.

        !IMPORTANT! This should be deprecated and the logic should be handled on the event
        publisher side once we have the event publisher handling DI indexing.
        """
        gte = get_beginning_of_day_timestamp_mill()
        lte = get_end_of_day_timestamp_mill()
        query = {
            "size": 1000,
            "query": {
                "range": {
                    "timestamp": {
                        "gte": gte,
                        "lte": lte,
                    }
                }
            },
        }
        data = self.es_sink.read_records(index, query)
        try:
            hit_total = data["hits"]["total"]["value"]
            documents = data["hits"]["hits"]
        except KeyError as exc:
            logger.error(exc)
        else:
            if hit_total > 0:
                body = [
                    {"delete": {"_index": document["_index"], "_id": document["_id"]}}
                    for document in documents
                ]
                try:
                    self.es_sink.bulk_operation(body)
                except Exception as exc:
                    logger.debug(traceback.format_exc())
                    logger.error(f"Could not delete existing data - {exc}")
                    raise RuntimeError
                return None
        return None

    def _execute_data_processor(self):
        """Data processor method to refine raw data into report data and ingest it in ES"""
        for report_data_type in ReportDataType:
            logger.info(f"Processing data for report type {report_data_type}")
            # we delete the report data for the current date to avoid duplicate
            # entries in the database
            self.metadata.delete_report_data(report_data_type, self.date)
            has_checked_and_handled_existing_es_data = False
            try:
                self.source = DataProcessor.create(
                    _data_processor_type=report_data_type.value, metadata=self.metadata
@@ -180,6 +231,11 @@ class DataInsightWorkflow(WorkflowStatusMixin):
                if hasattr(self, "sink"):
                    self.sink.write_record(record)
                if hasattr(self, "es_sink"):
                    if not has_checked_and_handled_existing_es_data:
                        self._check_and_handle_existing_es_data(
                            DataInsightEsIndex[record.data.__class__.__name__].value
                        )
                        has_checked_and_handled_existing_es_data = True
                    self.es_sink.write_record(record)
                else:
                    logger.warning(
@@ -19,7 +19,7 @@ from __future__ import annotations
from typing import List, Optional

from metadata.generated.schema.analytics.basic import WebAnalyticEventType
from metadata.generated.schema.analytics.reportData import ReportData
from metadata.generated.schema.analytics.reportData import ReportData, ReportDataType
from metadata.generated.schema.analytics.webAnalyticEventData import (
    WebAnalyticEventData,
)
@@ -174,3 +174,14 @@ class DataInsightMixin:
        """
        event_type_value = event_type.value
        self.client.delete(f"/analytics/web/events/{event_type_value}/{tmsp}/collect")

    def delete_report_data(self, report_data_type: ReportDataType, date: str) -> None:
        """Delete report data at a specific date for a specific report data type

        Args:
            report_data_type (ReportDataType): report data type to delete
            date (str): date for which to delete the report data
        """
        self.client.delete(
            f"/analytics/dataInsights/data/{report_data_type.value}/{date}"
        )
@@ -359,6 +359,32 @@ class DataInsightWorkflowTests(unittest.TestCase):

        assert kpi_result

    def test_multiple_execution(self) -> None:
        """test that multiple executions of the workflow do not yield duplicate entries"""
        data = {}

        workflow: DataInsightWorkflow = DataInsightWorkflow.create(data_insight_config)
        workflow.execute()
        workflow.stop()
        sleep(2)  # we'll wait for 2 seconds
        new_workflow: DataInsightWorkflow = DataInsightWorkflow.create(
            data_insight_config
        )
        new_workflow.execute()
        new_workflow.stop()

        for report_data_type in ReportDataType:
            data[report_data_type] = self.metadata.get_data_insight_report_data(
                self.start_ts,
                self.end_ts,
                report_data_type.value,
            )

        for _, values in data.items():
            timestamp = [value.get("timestamp") for value in values.get("data")]
            # we'll check we only have 1 execution timestamp
            assert len(set(timestamp)) == 1

    @classmethod
    def tearDownClass(cls) -> None:
        kpis: list[Kpi] = cls.metadata.list_entities(
@@ -3167,6 +3167,18 @@ public interface CollectionDAO {
    default String getTimeSeriesTableName() {
      return "report_data_time_series";
    }

    @SqlQuery("SELECT json FROM report_data_time_series WHERE entityFQNHash = :reportDataType and date = :date")
    List<String> listReportDataAtDate(@BindFQN("reportDataType") String reportDataType, @Bind("date") String date);

    @ConnectionAwareSqlUpdate(
        value = "DELETE FROM report_data_time_series WHERE entityFQNHash = :reportDataType and date = :date",
        connectionType = MYSQL)
    @ConnectionAwareSqlUpdate(
        value =
            "DELETE FROM report_data_time_series WHERE entityFQNHash = :reportDataType and DATE(TO_TIMESTAMP((json ->> 'timestamp')::bigint/1000)) = DATE(:date)",
        connectionType = POSTGRES)
    void deleteReportDataTypeAtDate(@BindFQN("reportDataType") String reportDataType, @Bind("date") String date);
  }

  interface ProfilerDataTimeSeriesDAO extends EntityTimeSeriesDAO {
@@ -1,5 +1,6 @@
package org.openmetadata.service.jdbi3;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import javax.ws.rs.core.Response;
@@ -43,4 +44,9 @@ public class ReportDataRepository {

    return new ResultList<>(reportData, String.valueOf(startTs), String.valueOf(endTs), reportData.size());
  }

  public void deleteReportDataAtDate(ReportDataType reportDataType, String date) throws IOException {
    // Delete all report data of the given type recorded at the given date
    daoCollection.reportDataTimeSeriesDao().deleteReportDataTypeAtDate(reportDataType.value(), date);
  }
}
@@ -10,9 +10,11 @@ import io.swagger.v3.oas.annotations.tags.Tag;
import java.io.IOException;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
@@ -45,12 +47,12 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "analytics")
public class ReportDataResource {
  public static final String COLLECTION_PATH = "v1/analytics/dataInsights/data";
  @Getter protected final ReportDataRepository dao;
  @Getter protected final ReportDataRepository repository;
  protected final Authorizer authorizer;

  public ReportDataResource(CollectionDAO dao, Authorizer authorizer) {
  public ReportDataResource(CollectionDAO repository, Authorizer authorizer) {
    this.authorizer = authorizer;
    this.dao = new ReportDataRepository(dao);
    this.repository = new ReportDataRepository(repository);
  }

  public static class ReportDataResultList extends ResultList<ReportData> {
@@ -89,12 +91,11 @@ public class ReportDataResource {
              schema = @Schema(type = "number"))
          @NonNull
          @QueryParam("endTs")
          Long endTs)
      throws IOException {
          Long endTs) {
    OperationContext operationContext = new OperationContext(Entity.DATA_INSIGHT_CHART, MetadataOperation.VIEW_ALL);
    ResourceContextInterface resourceContext = ReportDataContext.builder().build();
    authorizer.authorize(securityContext, operationContext, resourceContext);
    return dao.getReportData(reportDataType, startTs, endTs);
    return repository.getReportData(reportDataType, startTs, endTs);
  }

  @POST
@@ -109,11 +110,41 @@ public class ReportDataResource {
            content = @Content(mediaType = "application/json", schema = @Schema(implementation = ReportData.class)))
      })
  public Response addReportData(
      @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid ReportData reportData)
      throws IOException {
      @Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid ReportData reportData) {
    OperationContext operationContext = new OperationContext(Entity.DATA_INSIGHT_CHART, MetadataOperation.CREATE);
    ResourceContextInterface resourceContext = ReportDataContext.builder().build();
    authorizer.authorize(securityContext, operationContext, resourceContext);
    return dao.addReportData(reportData);
    return repository.addReportData(reportData);
  }

  @DELETE
  @Path("/{reportDataType}/{date}")
  @Operation(
      operationId = "deleteReportData",
      summary = "Delete report data for a given report data type and date",
      description = "Delete report data for a given report data type and date.",
      responses = {
        @ApiResponse(
            responseCode = "200",
            description = "Successfully deleted report data.",
            content = @Content(mediaType = "application/json", schema = @Schema(implementation = ReportData.class)))
      })
  public Response deleteReportData(
      @Context UriInfo uriInfo,
      @Context SecurityContext securityContext,
      @Parameter(description = "report data type", schema = @Schema(implementation = ReportDataType.class))
          @NonNull
          @PathParam("reportDataType")
          ReportDataType reportDataType,
      @Parameter(description = "date in format YYYY-MM-DD", schema = @Schema(type = "String"))
          @NonNull
          @PathParam("date")
          String date)
      throws IOException {
    OperationContext operationContext = new OperationContext(Entity.DATA_INSIGHT_CHART, MetadataOperation.DELETE);
    ResourceContextInterface resourceContext = ReportDataContext.builder().build();
    authorizer.authorize(securityContext, operationContext, resourceContext);
    repository.deleteReportDataAtDate(reportDataType, date);
    return Response.ok().build();
  }
}
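For reference, a hedged sketch of how a client might call the delete endpoint added above. The resource path (v1/analytics/dataInsights/data/{reportDataType}/{date}) and the "EntityReportData" value come from the diff; the host, the /api prefix, and the bearer token are assumptions about a typical deployment and should be adjusted to your environment.

import requests

# Assumed deployment details - placeholders, not part of this change.
OM_SERVER = "http://localhost:8585/api"   # host and /api prefix are assumptions
JWT_TOKEN = "<bot-jwt-token>"             # placeholder credential

# Path taken from ReportDataResource: v1/analytics/dataInsights/data/{reportDataType}/{date}
report_data_type = "EntityReportData"
date = "2022-10-15"

response = requests.delete(
    f"{OM_SERVER}/v1/analytics/dataInsights/data/{report_data_type}/{date}",
    headers={"Authorization": f"Bearer {JWT_TOKEN}"},
    timeout=30,
)
response.raise_for_status()  # the endpoint returns 200 on successful deletion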
@@ -1690,7 +1690,7 @@ public class ElasticSearchClientImpl implements SearchClient {
    DateHistogramAggregationBuilder dateHistogramAggregationBuilder =
        AggregationBuilders.dateHistogram(DataInsightChartRepository.TIMESTAMP)
            .field(DataInsightChartRepository.TIMESTAMP)
            .calendarInterval(DateHistogramInterval.minutes(1));
            .calendarInterval(DateHistogramInterval.DAY);

    TermsAggregationBuilder termsAggregationBuilder;
    SumAggregationBuilder sumAggregationBuilder;
@@ -42,7 +42,7 @@ public class PaginatedDataInsightSource implements Source<ResultList<ReportData>
    this.dao = dao;
    this.entityType = entityType;
    this.batchSize = batchSize;
    stats.setTotalRecords(dao.entityExtensionTimeSeriesDao().listCount(entityType));
    stats.setTotalRecords(dao.reportDataTimeSeriesDao().listCount(entityType));
  }

  @Override
@@ -93,9 +93,9 @@ public class PaginatedDataInsightSource implements Source<ResultList<ReportData>
  public ResultList<ReportData> getReportDataPagination(String entityFQN, int limit, String after) {
    // workaround. Should be fixed in https://github.com/open-metadata/OpenMetadata/issues/12298
    String upperCaseFQN = StringUtils.capitalize(entityFQN);
    int reportDataCount = dao.entityExtensionTimeSeriesDao().listCount(upperCaseFQN);
    int reportDataCount = dao.reportDataTimeSeriesDao().listCount(upperCaseFQN);
    List<CollectionDAO.ReportDataRow> reportDataList =
        dao.entityExtensionTimeSeriesDao()
        dao.reportDataTimeSeriesDao()
            .getAfterExtension(upperCaseFQN, limit + 1, after == null ? "0" : RestUtil.decodeCursor(after));
    return getAfterExtensionList(reportDataList, after, limit, reportDataCount);
  }
@@ -1,6 +1,8 @@
package org.openmetadata.service.resources.analytics;

import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.openmetadata.service.exception.CatalogExceptionMessage.permissionNotAllowed;
import static org.openmetadata.service.util.TestUtils.ADMIN_AUTH_HEADERS;
import static org.openmetadata.service.util.TestUtils.INGESTION_BOT_AUTH_HEADERS;
@@ -8,8 +10,10 @@ import static org.openmetadata.service.util.TestUtils.TEST_AUTH_HEADERS;
import static org.openmetadata.service.util.TestUtils.TEST_USER_NAME;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.ws.rs.client.WebTarget;
import org.apache.http.client.HttpResponseException;
import org.junit.jupiter.api.Test;
@@ -17,13 +21,14 @@ import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.openmetadata.schema.analytics.EntityReportData;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.analytics.WebAnalyticUserActivityReportData;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.service.OpenMetadataApplicationTest;
import org.openmetadata.service.resources.analytics.ReportDataResource.ReportDataResultList;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.util.TestUtils;

public class ReportDataResourceTest extends OpenMetadataApplicationTest {
class ReportDataResourceTest extends OpenMetadataApplicationTest {

  private final String collectionName = "analytics/dataInsights/data";

@@ -47,6 +52,8 @@ public class ReportDataResourceTest extends OpenMetadataApplicationTest {

    ResultList<ReportData> reportDataList =
        getReportData("2022-10-10", "2022-10-12", ReportData.ReportDataType.ENTITY_REPORT_DATA, ADMIN_AUTH_HEADERS);

    assertNotEquals(0, reportDataList.getData().size());
  }

  @Test
@@ -92,6 +99,71 @@ public class ReportDataResourceTest extends OpenMetadataApplicationTest {
    ResultList<ReportData> reportDataList =
        getReportData(
            "2022-10-10", "2022-10-12", ReportData.ReportDataType.ENTITY_REPORT_DATA, INGESTION_BOT_AUTH_HEADERS);

    assertNotEquals(0, reportDataList.getData().size());
  }

  @Test
  @Execution(ExecutionMode.CONCURRENT)
  void delete_endpoint_200() throws HttpResponseException, ParseException {
    List<ReportData> createReportDataList = new ArrayList<>();

    // create some entity report data
    EntityReportData entityReportData =
        new EntityReportData()
            .withEntityType("table")
            .withEntityTier("Tier.Tier1")
            .withCompletedDescriptions(1)
            .withEntityCount(11);
    ReportData reportData1 =
        new ReportData()
            .withTimestamp(TestUtils.dateToTimestamp("2022-10-15"))
            .withReportDataType(ReportData.ReportDataType.ENTITY_REPORT_DATA)
            .withData(entityReportData);

    // create some web analytic user activity report data
    WebAnalyticUserActivityReportData webAnalyticUserActivityReportData =
        new WebAnalyticUserActivityReportData()
            .withUserId(UUID.randomUUID())
            .withUserName("testUser")
            .withLastSession(TestUtils.dateToTimestamp("2022-10-13"));
    ReportData reportData2 =
        new ReportData()
            .withTimestamp(TestUtils.dateToTimestamp("2022-10-15"))
            .withReportDataType(ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)
            .withData(webAnalyticUserActivityReportData);

    createReportDataList.add(reportData1);
    createReportDataList.add(reportData2);

    for (ReportData reportData : createReportDataList) {
      postReportData(reportData, INGESTION_BOT_AUTH_HEADERS);
    }

    // check we have our data
    ResultList<ReportData> entityReportDataList =
        getReportData("2022-10-15", "2022-10-15", ReportData.ReportDataType.ENTITY_REPORT_DATA, ADMIN_AUTH_HEADERS);
    ResultList<ReportData> webAnalyticsReportDataList =
        getReportData(
            "2022-10-15",
            "2022-10-15",
            ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA,
            ADMIN_AUTH_HEADERS);
    assertNotEquals(0, entityReportDataList.getData().size());
    assertNotEquals(0, webAnalyticsReportDataList.getData().size());

    // delete the entity report data and check that it has been deleted
    deleteReportData(ReportData.ReportDataType.ENTITY_REPORT_DATA.value(), "2022-10-14", ADMIN_AUTH_HEADERS);
    entityReportDataList =
        getReportData("2022-10-14", "2022-10-16", ReportData.ReportDataType.ENTITY_REPORT_DATA, ADMIN_AUTH_HEADERS);
    assertEquals(0, entityReportDataList.getData().size());
    webAnalyticsReportDataList =
        getReportData(
            "2022-10-14",
            "2022-10-16",
            ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA,
            ADMIN_AUTH_HEADERS);
    assertNotEquals(0, webAnalyticsReportDataList.getData().size());
  }

  public void postReportData(ReportData reportData, Map<String, String> authHeader) throws HttpResponseException {
@@ -108,4 +180,11 @@ public class ReportDataResourceTest extends OpenMetadataApplicationTest {
    target = target.queryParam("reportDataType", reportDataType);
    return TestUtils.get(target, ReportDataResultList.class, authHeader);
  }

  private void deleteReportData(String reportDataType, String date, Map<String, String> authHeader)
      throws HttpResponseException {
    String path = String.format("/%s/%s", reportDataType, date);
    WebTarget target = getResource(collectionName).path(path);
    TestUtils.delete(target, authHeader);
  }
}