Cost Analysis - Data Insights Reports and Aggregation (#13379)

* Added reports and indexes

* Clean code

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Onkar Ravgan 2023-09-29 15:20:43 +05:30 committed by GitHub
parent 162b181b4f
commit 855790924e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 596 additions and 7 deletions

View File

@ -27,6 +27,10 @@ from typing import Optional, Union, cast
from pydantic import ValidationError
from metadata.config.common import WorkflowExecutionError
from metadata.data_insight.processor.cost_analysis_report_data_processor import (
AggregatedCostAnalysisReportDataProcessor,
RawCostAnalysisReportDataProcessor,
)
from metadata.data_insight.processor.data_processor import DataProcessor
from metadata.data_insight.processor.entity_report_data_processor import (
EntityReportDataProcessor,
@ -97,6 +101,8 @@ class DataInsightWorkflow(WorkflowStatusMixin):
EntityReportDataProcessor,
WebAnalyticEntityViewReportDataProcessor,
WebAnalyticUserActivityReportDataProcessor,
RawCostAnalysisReportDataProcessor,
AggregatedCostAnalysisReportDataProcessor,
]
] = None

View File

@ -0,0 +1,49 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base Cost Analysis Report
"""
import traceback
from abc import abstractmethod
from typing import Iterable
from metadata.data_insight.processor.data_processor import DataProcessor
from metadata.generated.schema.analytics.reportData import ReportData
from metadata.generated.schema.entity.data import table
from metadata.utils.logger import data_insight_logger
logger = data_insight_logger()
class BaseCostAnalysisReportDataProcessor(DataProcessor):
    """Shared base for the cost analysis report data processors.

    It supplies the table listing used by every cost analysis report and
    leaves the construction of the report entries to ``refine``, which each
    subclass has to implement.
    """

    def fetch_data(self) -> Iterable[table.Table]:
        """Yield the tables returned by ``self.metadata.list_all_entities``.

        The listing is requested with ``limit=1000`` and all fields
        (``"*"``). Any exception raised while listing is logged and ends the
        iteration instead of being propagated to the caller.
        """
        try:
            all_tables = self.metadata.list_all_entities(
                table.Table, limit=1000, fields=["*"]
            )
            yield from all_tables
        except Exception as err:
            logger.error(f"Error trying to fetch entity -- {err}")
            logger.debug(traceback.format_exc())

    @abstractmethod
    def refine(self) -> Iterable[ReportData]:
        """Build the report entries; must be provided by subclasses."""
        raise NotImplementedError()

    def process(self) -> Iterable[ReportData]:
        """Yield every report entry produced by ``refine``."""
        yield from self.refine()

    def get_status(self):
        """Return the processor status object held by this processor."""
        return self.processor_status

View File

@ -0,0 +1,206 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor class used to compute refined report data
"""
from __future__ import annotations
import traceback
from collections import defaultdict
from typing import Iterable
from metadata.data_insight.processor.cost_analysis_base import (
BaseCostAnalysisReportDataProcessor,
)
from metadata.generated.schema.analytics.reportData import ReportData, ReportDataType
from metadata.generated.schema.analytics.reportDataType.aggregatedCostAnalysisReportData import (
AggregatedCostAnalysisReportData,
)
from metadata.generated.schema.analytics.reportDataType.rawCostAnalysisReportData import (
RawCostAnalysisReportData,
)
from metadata.generated.schema.type.lifeCycle import LifeCycle
from metadata.utils.logger import data_insight_logger
from metadata.utils.time_utils import get_end_of_day_timestamp_mill
logger = data_insight_logger()
# Keys of the aggregated per-service record.
UNUSED_DATA_ASSETS = "unusedDataAssets"
FREQUENTLY_USED_DATA_ASSETS = "frequentlyUsedDataAssets"
TOTAL_SIZE = "totalSize"

# Keys of the per-window counters stored under the two data asset entries.
THREE_DAYS = "threeDays"
SEVEN_DAYS = "sevenDays"
FOURTEEN_DAYS = "fourteenDays"
THIRTY_DAYS = "thirtyDays"
SIXTY_DAYS = "sixtyDays"

# (window length in days, counter key) pairs, shortest window first.
DAYS = [
    (3, THREE_DAYS),
    (7, SEVEN_DAYS),
    (14, FOURTEEN_DAYS),
    (30, THIRTY_DAYS),
    (60, SIXTY_DAYS),
]
class RawCostAnalysisReportDataProcessor(BaseCostAnalysisReportDataProcessor):
    """Processor emitting one raw cost analysis report entry per table."""

    _data_processor_type = "RawCostAnalysisReportData"

    def refine(self) -> Iterable[ReportData]:
        """Yield a report entry for every table with life cycle or size data.

        For each table returned by ``fetch_data`` the entity reference, the
        life cycle (when set) and the size taken from the latest table
        profile (when available) are collected. Tables for which neither a
        life cycle nor a truthy size is found produce no entry. A failure on
        one table is logged and does not interrupt the remaining tables.

        Returns:
            Iterable[ReportData]: entries of type ``RawCostAnalysisReportData``
            stamped with ``self.timestamp``.
        """
        for entity in self.fetch_data():
            try:
                cost_analysis_data = RawCostAnalysisReportData(
                    entity=self.metadata.get_entity_reference(
                        entity=type(entity), fqn=entity.fullyQualifiedName
                    )
                )
                if entity.lifeCycle:
                    cost_analysis_data.lifeCycle = entity.lifeCycle

                table_profile = self.metadata.get_latest_table_profile(
                    fqn=entity.fullyQualifiedName
                )
                # The lookup may return nothing at all; without this guard the
                # resulting AttributeError would discard a table that still
                # has life cycle data worth reporting.
                if table_profile is not None and table_profile.profile:
                    cost_analysis_data.sizeInByte = table_profile.profile.sizeInByte

                if cost_analysis_data.lifeCycle or cost_analysis_data.sizeInByte:
                    yield ReportData(
                        timestamp=self.timestamp,
                        reportDataType=ReportDataType.RawCostAnalysisReportData.value,
                        data=cost_analysis_data,
                    )

                self.processor_status.scanned(entity.name.__root__)
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.error(f"Error trying fetch cost analysis data -- {err}")
class AggregatedCostAnalysisReportDataProcessor(BaseCostAnalysisReportDataProcessor):
    """Processor emitting cost analysis totals aggregated per service."""

    _data_processor_type = "AggregatedCostAnalysisReportData"

    def refine(self) -> Iterable[ReportData]:
        """Aggregate table sizes and usage counters and yield them as reports.

        Every table returned by ``fetch_data`` that has a life cycle or a
        truthy size is added to a bucket keyed by entity class name, service
        type and service name. Each bucket accumulates the total size and the
        used/unused counters per time window. A failure on one table is
        logged and does not interrupt the remaining tables.

        Returns:
            Iterable[ReportData]: one entry of type
            ``AggregatedCostAnalysisReportData`` per bucket.
        """
        refined_data = defaultdict(lambda: defaultdict(dict))

        for entity in self.fetch_data():
            try:
                life_cycle = entity.lifeCycle or None

                size = None
                table_profile = self.metadata.get_latest_table_profile(
                    fqn=entity.fullyQualifiedName
                )
                # The lookup may return nothing at all; guard it so a table
                # with life cycle data is still aggregated.
                if table_profile is not None and table_profile.profile:
                    size = table_profile.profile.sizeInByte

                if life_cycle or size:
                    entity_type = str(entity.__class__.__name__)
                    service_type = str(entity.serviceType.name)
                    service_name = str(entity.service.name)

                    services = refined_data[entity_type][service_type]
                    if service_name not in services:
                        services[service_name] = self._new_service_bucket()
                    service_data = services[service_name]

                    service_data[TOTAL_SIZE] += size or 0
                    self._get_data_assets_dict(
                        life_cycle=life_cycle,
                        data=service_data,
                    )

                self.processor_status.scanned(entity.name.__root__)
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.error(f"Error trying fetch cost analysis data -- {err}")

        yield from self._flatten_results(refined_data)

    @staticmethod
    def _new_service_bucket() -> dict:
        """Return an empty aggregation record with every counter at zero."""
        return {
            TOTAL_SIZE: 0,
            UNUSED_DATA_ASSETS: {key: 0 for _, key in DAYS},
            FREQUENTLY_USED_DATA_ASSETS: {key: 0 for _, key in DAYS},
        }

    def _flatten_results(self, refined_data):
        """Turn the nested aggregation mapping into ``ReportData`` entries.

        Args:
            refined_data: mapping of entity type -> service type ->
                service name -> aggregation record, as built by ``refine``.

        Yields:
            ReportData: one entry per service record. If building an entry
            fails, the error is logged and no further entries are produced.
        """
        try:
            for entity_type, entity_item in refined_data.items():
                for service_type, service_item in entity_item.items():
                    for service_name, service_data in service_item.items():
                        aggregated_data = AggregatedCostAnalysisReportData(
                            entityType=str(entity_type),
                            serviceType=str(service_type),
                            serviceName=str(service_name),
                            **service_data,
                        )
                        yield ReportData(
                            timestamp=self.timestamp,
                            reportDataType=ReportDataType.AggregatedCostAnalysisReportData.value,
                            data=aggregated_data,
                        )
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.error(f"Unable to yield report data -- {err}")

    @staticmethod
    def _get_data_assets_dict(life_cycle: LifeCycle | None, data: dict):
        """Update the used/unused counters of ``data`` for one data asset.

        Args:
            life_cycle: life cycle of the asset; when it is missing or has no
                access information the counters are left untouched.
            data: aggregation record whose ``unusedDataAssets`` and
                ``frequentlyUsedDataAssets`` counters are incremented in
                place.
        """
        try:
            # Without access information there is nothing to classify. This
            # is an expected case, so it must not end up in the error log.
            if not life_cycle or not life_cycle.accessed:
                return

            last_accessed = life_cycle.accessed.timestamp.__root__

            # Iterate over the different time periods and update the data
            for days, key in DAYS:
                # presumably the end-of-day timestamp (ms) from `days` days
                # ago -- confirm against metadata.utils.time_utils
                days_before_timestamp = get_end_of_day_timestamp_mill(days=days)
                if last_accessed <= days_before_timestamp:
                    data[UNUSED_DATA_ASSETS][key] += 1
                else:
                    data[FREQUENTLY_USED_DATA_ASSETS][key] += 1
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.error(f"Error calculating data -- {err}")

View File

@ -151,7 +151,7 @@ class EntityReportDataProcessor(DataProcessor):
logger.debug(traceback.format_exc())
def refine(self) -> dict:
"""Aggegate data. We'll return a dictionary of the following shape
"""Aggregate data. We'll return a dictionary of the following shape
{
"entity_class": {
@ -159,7 +159,7 @@ class EntityReportDataProcessor(DataProcessor):
"tier": {
"missingDescription": <int>,
"missingOwner": <int>,
"hasOnwer": <int>,
"hasOwner": <int>,
"completedDescription": <int>,
}
}

View File

@ -167,6 +167,8 @@ public final class Entity {
public static final String ENTITY_REPORT_DATA = "EntityReportData";
public static final String WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA = "WebAnalyticEntityViewReportData";
public static final String WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA = "WebAnalyticUserActivityReportData";
public static final String RAW_COST_ANALYSIS_REPORT_DATA = "RawCostAnalysisReportData";
public static final String AGGREGATED_COST_ANALYSIS_REPORT_DATA = "AggregatedCostAnalysisReportData";
//
// Reserved names in OpenMetadata

View File

@ -33,6 +33,8 @@ public class IndexUtil {
public static final String ENTITY_REPORT_DATA = "entityReportData";
public static final String WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA = "webAnalyticEntityViewReportData";
public static final String WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA = "webAnalyticUserActivityReportData";
public static final String RAW_COST_ANALYSIS_REPORT_DATA = "rawCostAnalysisReportData";
// NOTE(review): the sibling report-data constants declared next to this one use
// lower camelCase values, while this value starts with an upper-case "A". The
// lookup further down in this class compares with equalsIgnoreCase, so it still
// matches there -- confirm that no other usage relies on a case-sensitive match.
public static final String AGGREGATED_COST_ANALYSIS_REPORT_DATA = "AggregatedCostAnalysisReportData";
public static final String REPORT_DATA = "reportData";
public static final Map<String, String> ENTITY_TYPE_TO_INDEX_MAP;
@ -125,6 +127,10 @@ public class IndexUtil {
return SearchIndexDefinition.ElasticSearchIndexType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA_INDEX;
} else if (type.equalsIgnoreCase(WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)) {
return SearchIndexDefinition.ElasticSearchIndexType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA_INDEX;
} else if (type.equalsIgnoreCase(RAW_COST_ANALYSIS_REPORT_DATA)) {
return SearchIndexDefinition.ElasticSearchIndexType.RAW_COST_ANALYSIS_REPORT_DATA_INDEX;
} else if (type.equalsIgnoreCase(AGGREGATED_COST_ANALYSIS_REPORT_DATA)) {
return SearchIndexDefinition.ElasticSearchIndexType.AGGREGATED_COST_ANALYSIS_REPORT_DATA_INDEX;
} else if (type.equalsIgnoreCase(Entity.CONTAINER)) {
return SearchIndexDefinition.ElasticSearchIndexType.CONTAINER_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(Entity.QUERY)) {

View File

@ -25,6 +25,8 @@ public class SearchIndexDefinition {
public static final String ENTITY_REPORT_DATA = "entityReportData";
public static final String WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA = "webAnalyticEntityViewReportData";
public static final String WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA = "webAnalyticUserActivityReportData";
public static final String RAW_COST_ANALYSIS_REPORT_DATA = "rawCostAnalysisReportData";
public static final String AGGREGATED_COST_ANALYSIS_REPORT_DATA = "aggregatedCostAnalysisReportData";
final EnumMap<ElasticSearchIndexType, ElasticSearchIndexStatus> elasticSearchIndexes =
new EnumMap<>(ElasticSearchIndexType.class);
public static final Map<String, Object> ENTITY_TO_MAPPING_SCHEMA_MAP = new HashMap<>();
@ -160,7 +162,15 @@ public class SearchIndexDefinition {
WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA_INDEX(
WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA,
"web_analytic_user_activity_report_data_index",
"/elasticsearch/web_analytic_user_activity_report_data_index.json");
"/elasticsearch/web_analytic_user_activity_report_data_index.json"),
RAW_COST_ANALYSIS_REPORT_DATA_INDEX(
RAW_COST_ANALYSIS_REPORT_DATA,
"raw_cost_analysis_report_data_index",
"/elasticsearch/raw_cost_analysis_report_data_index.json"),
AGGREGATED_COST_ANALYSIS_REPORT_DATA_INDEX(
AGGREGATED_COST_ANALYSIS_REPORT_DATA,
"aggregated_cost_analysis_report_data_index",
"/elasticsearch/aggregated_cost_analysis_report_data_index.json");
public final String indexName;
public final String indexMappingFile;

View File

@ -32,6 +32,7 @@ import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.schema.tests.TestSuite;
import org.openmetadata.service.Entity;
import org.openmetadata.service.search.indexes.AggregatedCostAnalysisReportDataIndex;
import org.openmetadata.service.search.indexes.ChartIndex;
import org.openmetadata.service.search.indexes.ClassificationIndex;
import org.openmetadata.service.search.indexes.ContainerIndex;
@ -53,6 +54,7 @@ import org.openmetadata.service.search.indexes.MlModelServiceIndex;
import org.openmetadata.service.search.indexes.PipelineIndex;
import org.openmetadata.service.search.indexes.PipelineServiceIndex;
import org.openmetadata.service.search.indexes.QueryIndex;
import org.openmetadata.service.search.indexes.RawCostAnalysisReportDataIndex;
import org.openmetadata.service.search.indexes.SearchEntityIndex;
import org.openmetadata.service.search.indexes.SearchServiceIndex;
import org.openmetadata.service.search.indexes.StorageServiceIndex;
@ -139,6 +141,10 @@ public class SearchIndexFactory {
return new WebAnalyticEntityViewReportDataIndex((ReportData) entity);
case Entity.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA:
return new WebAnalyticUserActivityReportDataIndex((ReportData) entity);
case Entity.RAW_COST_ANALYSIS_REPORT_DATA:
return new RawCostAnalysisReportDataIndex((ReportData) entity);
case Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA:
return new AggregatedCostAnalysisReportDataIndex((ReportData) entity);
default:
LOG.warn("Ignoring Entity Type {}", entityType);
}

View File

@ -0,0 +1,20 @@
package org.openmetadata.service.search.indexes;
import java.util.Map;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.service.util.JsonUtils;
/** Search index document builder for aggregated cost analysis report data. */
public class AggregatedCostAnalysisReportDataIndex implements ElasticSearchIndex {
  /** Document field that carries the entity type. */
  private static final String ENTITY_TYPE_FIELD = "entityType";

  /** Entity type value written into every document built by this class. */
  private static final String ENTITY_TYPE_VALUE = "aggregatedCostAnalysisReportData";

  private final ReportData source;

  /**
   * @param reportData report whose content becomes the search document
   */
  public AggregatedCostAnalysisReportDataIndex(ReportData reportData) {
    this.source = reportData;
  }

  /**
   * Converts the wrapped report into a map via {@code JsonUtils.getMap} and sets its {@code entityType} entry.
   *
   * @return the map to be indexed
   */
  @Override
  public Map<String, Object> buildESDoc() {
    final Map<String, Object> esDoc = JsonUtils.getMap(source);
    esDoc.put(ENTITY_TYPE_FIELD, ENTITY_TYPE_VALUE);
    return esDoc;
  }
}

View File

@ -0,0 +1,20 @@
package org.openmetadata.service.search.indexes;
import java.util.Map;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.service.util.JsonUtils;
/** Search index document builder for raw cost analysis report data. */
public class RawCostAnalysisReportDataIndex implements ElasticSearchIndex {
  /** Document field that carries the entity type. */
  private static final String ENTITY_TYPE_FIELD = "entityType";

  /** Entity type value written into every document built by this class. */
  private static final String ENTITY_TYPE_VALUE = "rawCostAnalysisReportData";

  private final ReportData source;

  /**
   * @param reportData report whose content becomes the search document
   */
  public RawCostAnalysisReportDataIndex(ReportData reportData) {
    this.source = reportData;
  }

  /**
   * Converts the wrapped report into a map via {@code JsonUtils.getMap} and sets its {@code entityType} entry.
   *
   * @return the map to be indexed
   */
  @Override
  public Map<String, Object> buildESDoc() {
    final Map<String, Object> esDoc = JsonUtils.getMap(source);
    esDoc.put(ENTITY_TYPE_FIELD, ENTITY_TYPE_VALUE);
    return esDoc;
  }
}

View File

@ -0,0 +1,72 @@
{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"data": {
"properties": {
"totalSize": {
"type": "double"
},
"entityType": {
"type": "keyword"
},
"serviceName": {
"type": "keyword"
},
"serviceType": {
"type": "keyword"
},
"serviceOwner": {
"type": "keyword"
},
"unusedDataAssets": {
"properties": {
"sevenDays": {
"type": "double"
},
"sixtyDays": {
"type": "double"
},
"threeDays": {
"type": "double"
},
"thirtyDays": {
"type": "double"
},
"fourteenDays": {
"type": "double"
}
}
},
"frequentlyUsedDataAssets": {
"properties": {
"sevenDays": {
"type": "double"
},
"sixtyDays": {
"type": "double"
},
"threeDays": {
"type": "double"
},
"thirtyDays": {
"type": "double"
},
"fourteenDays": {
"type": "double"
}
}
}
}
},
"timestamp": {
"type": "date"
},
"reportDataType": {
"type": "keyword"
}
}
}
}

View File

@ -0,0 +1,93 @@
{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"data": {
"properties": {
"entity": {
"properties": {
"id": {
"type": "keyword"
},
"href": {
"type": "keyword"
},
"name": {
"type": "text"
},
"type": {
"type": "keyword"
},
"deleted": {
"type": "boolean"
},
"description": {
"type": "text"
},
"displayName": {
"type": "text"
},
"fullyQualifiedName": {
"type": "keyword"
}
}
},
"lifeCycle": {
"properties": {
"created": {
"properties": {
"timestamp": {
"type": "date"
},
"accessedBy": {
"type": "keyword"
},
"accessedByAProcess": {
"type": "keyword"
}
}
},
"updated": {
"properties": {
"timestamp": {
"type": "date"
},
"accessedBy": {
"type": "keyword"
},
"accessedByAProcess": {
"type": "keyword"
}
}
},
"accessed": {
"properties": {
"timestamp": {
"type": "date"
},
"accessedBy": {
"type": "keyword"
},
"accessedByAProcess": {
"type": "keyword"
}
}
}
}
},
"sizeInByte": {
"type": "double"
}
}
},
"timestamp": {
"type": "date"
},
"reportDataType": {
"type": "keyword"
}
}
}
}

View File

@ -23,7 +23,9 @@
"enum": [
"EntityReportData",
"WebAnalyticUserActivityReportData",
"WebAnalyticEntityViewReportData"
"WebAnalyticEntityViewReportData",
"RawCostAnalysisReportData",
"AggregatedCostAnalysisReportData"
]
},
"data": {
@ -31,7 +33,9 @@
"oneOf": [
{"$ref": "reportDataType/entityReportData.json"},
{"$ref": "reportDataType/webAnalyticUserActivityReportData.json"},
{"$ref": "reportDataType/webAnalyticEntityViewReportData.json"}
{"$ref": "reportDataType/webAnalyticEntityViewReportData.json"},
{"$ref": "reportDataType/rawCostAnalysisReportData.json"},
{"$ref": "reportDataType/aggregatedCostAnalysisReportData.json"}
]
}
},

View File

@ -0,0 +1,70 @@
{
  "$id": "https://open-metadata.org/schema/analytics/aggregatedCostAnalysisReportData.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "aggregatedCostAnalysisReportData",
  "type": "object",
  "javaType": "org.openmetadata.schema.analytics.AggregatedCostAnalysisReportData.json",
  "description": "Aggregated data for Cost Analysis Report.",
  "definitions": {
    "dataAssetCount": {
      "description": "Count of Data Assets over a time period",
      "type": "object",
      "properties": {
        "threeDays": {
          "description": "Data Asset Count for 3 days",
          "type": "number"
        },
        "sevenDays": {
          "description": "Data Asset Count for 7 days",
          "type": "number"
        },
        "fourteenDays": {
          "description": "Data Asset Count for 14 days",
          "type": "number"
        },
        "thirtyDays": {
          "description": "Data Asset Count for 30 days",
          "type": "number"
        },
        "sixtyDays": {
          "description": "Data Asset Count for 60 days",
          "type": "number"
        }
      },
      "additionalProperties": false
    }
  },
  "properties": {
    "unusedDataAssets": {
      "description": "Number of unused Data Assets over a period of time",
      "$ref": "#/definitions/dataAssetCount"
    },
    "frequentlyUsedDataAssets": {
      "description": "Number of frequently used Data Assets over a period of time",
      "$ref": "#/definitions/dataAssetCount"
    },
    "totalSize": {
      "description": "Total Size based in Bytes",
      "type": "number"
    },
    "serviceName": {
      "type": "string",
      "description": "Name of the service"
    },
    "serviceType": {
      "description": "Type of the service",
      "type": "string"
    },
    "entityType": {
      "type": "string",
      "description": "Type of the entity"
    },
    "serviceOwner": {
      "description": "Name of the service owner",
      "type": "string"
    }
  },
  "additionalProperties": false
}

View File

@ -0,0 +1,24 @@
{
"$id": "https://open-metadata.org/schema/analytics/rawCostAnalysisReportData.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "rawCostAnalysisReportData",
"type": "object",
"javaType": "org.openmetadata.schema.analytics.RawCostAnalysisReportData.json",
"description": "Raw data for Cost Analysis Report.",
"properties": {
"entity": {
"description": "Entity of the life cycle data",
"$ref": "../../type/entityReference.json"
},
"lifeCycle": {
"$ref": "../../type/lifeCycle.json",
"description": "Life Cycle data related to the entity"
},
"sizeInByte": {
"description": "Entity size in bytes",
"type": "number"
}
},
"required": ["entity"],
"additionalProperties": false
}

View File

@ -51,7 +51,9 @@
"enum": [
"entity_report_data_index",
"web_analytic_entity_view_report_data_index",
"web_analytic_user_activity_report_data_index"
"web_analytic_user_activity_report_data_index",
"raw_cost_analysis_report_data_index",
"aggregated_cost_analysis_report_data_index"
]
}
},

View File

@ -1627,4 +1627,3 @@ export const updateTableFieldDescription = (
verifyResponseStatusCode('@updateDescription', 200);
};