diff --git a/ingestion/src/metadata/data_insight/processor/reports/cost_analysis_report_data_processor.py b/ingestion/src/metadata/data_insight/processor/reports/cost_analysis_report_data_processor.py
index bf951a8d4e4..82641f1f282 100644
--- a/ingestion/src/metadata/data_insight/processor/reports/cost_analysis_report_data_processor.py
+++ b/ingestion/src/metadata/data_insight/processor/reports/cost_analysis_report_data_processor.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 import traceback
 from collections import defaultdict
 from copy import deepcopy
-from typing import Iterable, Optional
+from typing import Dict, Iterable, Optional
 
 from metadata.data_insight.processor.reports.data_processor import DataProcessor
 from metadata.generated.schema.analytics.reportData import ReportData, ReportDataType
@@ -28,9 +28,11 @@ from metadata.generated.schema.analytics.reportDataType.aggregatedCostAnalysisRe
 from metadata.generated.schema.analytics.reportDataType.rawCostAnalysisReportData import (
     RawCostAnalysisReportData,
 )
-from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.generated.schema.type.lifeCycle import LifeCycle
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.ometa.utils import model_str
+from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP
 from metadata.utils.logger import data_insight_logger
 from metadata.utils.time_utils import get_end_of_day_timestamp_mill
 
@@ -98,36 +100,32 @@ class RawCostAnalysisReportDataProcessor(DataProcessor):
                 data=value,
             )  # type: ignore
 
-    def refine(self, entity: Table) -> None:
+    def refine(self, entity: Dict) -> None:
         """Aggregate data
 
         Returns:
             list:
         """
-        try:
-            cost_analysis_data = RawCostAnalysisReportData(
-                entity=self.metadata.get_entity_reference(
-                    entity=type(entity), fqn=entity.fullyQualifiedName
+        for entity_fqn, cost_analysis_report_data in entity.items():
+            try:
+                cost_analysis_data = RawCostAnalysisReportData(
+                    entity=EntityReference(
+                        id=cost_analysis_report_data.entity.id,
+                        fullyQualifiedName=model_str(
+                            cost_analysis_report_data.entity.fullyQualifiedName
+                        ),
+                        type=ENTITY_REFERENCE_TYPE_MAP[
+                            type(cost_analysis_report_data.entity).__name__
+                        ],
+                    ),
+                    lifeCycle=cost_analysis_report_data.life_cycle,
+                    sizeInByte=cost_analysis_report_data.size,
                 )
-            )
-            if entity.lifeCycle:
-                cost_analysis_data.lifeCycle = entity.lifeCycle
-
-            table_profile = self.metadata.get_latest_table_profile(
-                fqn=entity.fullyQualifiedName
-            )
-            if table_profile.profile:
-                cost_analysis_data.sizeInByte = table_profile.profile.sizeInByte
-
-            if cost_analysis_data.lifeCycle or cost_analysis_data.sizeInByte:
-                self._refined_data[
-                    entity.fullyQualifiedName.__root__
-                ] = cost_analysis_data
-
-            self.processor_status.scanned(entity.name.__root__)
-        except Exception as err:
-            logger.debug(traceback.format_exc())
-            logger.error(f"Error trying fetch cost analysis data -- {err}")
+                self._refined_data[entity_fqn] = cost_analysis_data
+                self.processor_status.scanned(entity_fqn)
+            except Exception as err:
+                logger.debug(traceback.format_exc())
+                logger.error(f"Error trying fetch cost analysis data -- {err}")
 
     def get_status(self):
         return self.processor_status
@@ -142,6 +140,7 @@ class AggregatedCostAnalysisReportDataProcessor(DataProcessor):
         super().__init__(metadata)
         self._refined_data = defaultdict(lambda: defaultdict(dict))
         self.post_hook = self._post_hook_fn
+        self.clean_up_cache = True
 
     def yield_refined_data(self) -> Iterable[ReportData]:
         """Yield refined data"""
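
Note on the reworked `refine()` above: it no longer receives a single `Table` and makes per-entity API calls; it now receives a dict keyed by table FQN whose values already bundle the entity, its life cycle, and its size (built once by `CostAnalysisProducer`, shown further down). A minimal sketch of that payload shape, using hypothetical stand-in dataclasses rather than the real OpenMetadata pydantic models:

```python
# Illustrative stand-in for the producer-built payload; the real values are
# CostAnalysisReportData models defined in cost_analysis_producer.py.
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class StubReportData:  # hypothetical stand-in, not the real model
    entity: Any
    life_cycle: Optional[Any] = None
    size: Optional[float] = None


payload: Dict[str, StubReportData] = {
    "mysql_svc.shop.orders": StubReportData(entity="<Table>", size=2048.0),
    "mysql_svc.shop.users": StubReportData(entity="<Table>", life_cycle="<LifeCycle>"),
}

# Mirrors the new refine() loop: everything needed is already in the value,
# so the processor performs no further API round trips per entity.
for entity_fqn, report_data in payload.items():
    print(entity_fqn, report_data.size, report_data.life_cycle)
```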
@@ -152,27 +151,17 @@ class AggregatedCostAnalysisReportDataProcessor(DataProcessor):
                 data=data,
             )  # type: ignore
 
-    def refine(self, entity: Table) -> None:
+    def refine(self, entity: Dict) -> None:
         """Aggregate data
 
         Returns:
             list:
         """
         try:
-            life_cycle = None
-            if entity.lifeCycle:
-                life_cycle = entity.lifeCycle
-            size = None
-            table_profile = self.metadata.get_latest_table_profile(
-                fqn=entity.fullyQualifiedName
-            )
-            if table_profile.profile:
-                size = table_profile.profile.sizeInByte
-
-            if life_cycle or size:
-                entity_type = str(entity.__class__.__name__)
-                service_type = str(entity.serviceType.name)
-                service_name = str(entity.service.name)
+            for entity_fqn, cost_analysis_report_data in entity.items():
+                entity_type = str(cost_analysis_report_data.entity.__class__.__name__)
+                service_type = str(cost_analysis_report_data.entity.serviceType.name)
+                service_name = str(cost_analysis_report_data.entity.service.name)
                 if not self._refined_data[str(entity_type)][service_type].get(
                     service_name
                 ):
@@ -185,18 +174,18 @@ class AggregatedCostAnalysisReportDataProcessor(DataProcessor):
                 else:
                     self._refined_data[entity_type][service_type][service_name][
                         TOTAL_SIZE
-                    ] += (size or 0)
+                    ] += (cost_analysis_report_data.size or 0)
                     self._refined_data[entity_type][service_type][service_name][
                         TOTAL_COUNT
                     ] += 1
                 self._get_data_assets_dict(
-                    life_cycle=life_cycle,
-                    size=size,
+                    life_cycle=cost_analysis_report_data.life_cycle,
+                    size=cost_analysis_report_data.size,
                     data=self._refined_data[entity_type][service_type][service_name],
                 )
 
-            self.processor_status.scanned(entity.name.__root__)
+                self.processor_status.scanned(entity_fqn)
 
         except Exception as err:
             logger.debug(traceback.format_exc())
             logger.error(f"Error trying fetch cost analysis data -- {err}")
@@ -247,7 +236,10 @@ class AggregatedCostAnalysisReportDataProcessor(DataProcessor):
             # Iterate over the different time periods and update the data
             for days, key in DAYS:
                 days_before_timestamp = get_end_of_day_timestamp_mill(days=days)
-                if life_cycle.accessed.timestamp.__root__ <= days_before_timestamp:
+                if (
+                    life_cycle.accessed
+                    and life_cycle.accessed.timestamp.__root__ <= days_before_timestamp
+                ):
                     data[UNUSED_DATA_ASSETS][COUNT][key] += 1
                     data[UNUSED_DATA_ASSETS][SIZE][key] += size or 0
                 else:
diff --git a/ingestion/src/metadata/data_insight/processor/reports/data_processor.py b/ingestion/src/metadata/data_insight/processor/reports/data_processor.py
index eb869844bf4..8a582aba24d 100644
--- a/ingestion/src/metadata/data_insight/processor/reports/data_processor.py
+++ b/ingestion/src/metadata/data_insight/processor/reports/data_processor.py
@@ -47,6 +47,7 @@ class DataProcessor(abc.ABC):
         self._refined_data = {}
         self.post_hook: Optional[Callable] = None
         self.pre_hook: Optional[Callable] = None
+        self.clean_up_cache: bool = False
 
     @classmethod
     def create(cls, _data_processor_type, metadata: OpenMetadata):
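
The last hunk of the processor file also hardens the unused-assets bucketing: before this change, a `LifeCycle` whose `accessed` field was unset raised `AttributeError` inside the `DAYS` loop. A self-contained sketch of the guard's behavior, with simplified stand-in types (the real models are generated pydantic classes where the timestamp sits behind a `.__root__` wrapper):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Accessed:
    timestamp_ms: int  # simplified; the real field is accessed.timestamp.__root__


@dataclass
class LifeCycle:
    accessed: Optional[Accessed] = None


def is_unused_since(life_cycle: LifeCycle, cutoff_ms: int) -> bool:
    # Guard first: `accessed` may legitimately be None, in which case the
    # record is simply not counted as unused instead of crashing the loop.
    return bool(
        life_cycle.accessed and life_cycle.accessed.timestamp_ms <= cutoff_ms
    )


assert is_unused_since(LifeCycle(), cutoff_ms=200) is False
assert is_unused_since(LifeCycle(Accessed(100)), cutoff_ms=200) is True
assert is_unused_since(LifeCycle(Accessed(300)), cutoff_ms=200) is False
```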
diff --git a/ingestion/src/metadata/data_insight/producer/cost_analysis_producer.py b/ingestion/src/metadata/data_insight/producer/cost_analysis_producer.py
index af7381aa781..aaed2d5358d 100644
--- a/ingestion/src/metadata/data_insight/producer/cost_analysis_producer.py
+++ b/ingestion/src/metadata/data_insight/producer/cost_analysis_producer.py
@@ -13,16 +13,31 @@
 Producer class for data insight entity reports
 """
 
 import traceback
-from typing import Iterable
+from typing import Dict, Iterable, Optional
+
+from pydantic import BaseModel
 
 from metadata.data_insight.producer.producer_interface import ProducerInterface
 from metadata.generated.schema.entity.data.table import Table
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.type.lifeCycle import LifeCycle
+from metadata.ingestion.api.models import Entity
+from metadata.ingestion.ometa.utils import model_str
 from metadata.utils.logger import data_insight_logger
 
 logger = data_insight_logger()
 
 
+class CostAnalysisReportData(BaseModel):
+    """
+    Query executed get life cycle
+    """
+
+    entity: Entity
+    life_cycle: Optional[LifeCycle]
+    size: Optional[float]
+
+
 class CostAnalysisProducer(ProducerInterface):
     """entity producer class"""
 
@@ -36,27 +51,76 @@ class CostAnalysisProducer(ProducerInterface):
             and database_service.connection.config.supportsProfiler.__root__
         )
 
+    def _check_life_cycle_and_size_data(
+        self, table: Table
+    ) -> Optional[CostAnalysisReportData]:
+        """
+        Method to check if the valid life cycle and table size data is present for the table
+        """
+        cost_analysis_report_data = CostAnalysisReportData(entity=table)
+        if table.lifeCycle and table.lifeCycle.accessed:
+            cost_analysis_report_data.life_cycle = table.lifeCycle
+
+        table_profile = self.metadata.get_latest_table_profile(
+            fqn=table.fullyQualifiedName
+        )
+        if table_profile.profile:
+            cost_analysis_report_data.size = table_profile.profile.sizeInByte
+
+        if cost_analysis_report_data.life_cycle or cost_analysis_report_data.size:
+            return cost_analysis_report_data
+        return None
+
+    def life_cycle_data_dict(
+        self, entities_cache: Optional[Dict], database_service_fqn: str
+    ) -> Iterable[Dict]:
+        """
+        Cache the required lifecycle data to be used by the processors and return the dict
+        """
+        if entities_cache.get(database_service_fqn):
+            yield entities_cache[database_service_fqn]
+        else:
+            tables = self.metadata.list_all_entities(
+                Table,
+                limit=100,
+                skip_on_failure=True,
+                params={"database": database_service_fqn},
+            )
+            entities_cache[database_service_fqn] = {}
+
+            for table in tables:
+                try:
+                    cost_analysis_data = self._check_life_cycle_and_size_data(
+                        table=table
+                    )
+                    if cost_analysis_data:
+                        entities_cache[database_service_fqn][
+                            model_str(table.fullyQualifiedName)
+                        ] = cost_analysis_data
+                except Exception as err:
+                    logger.error(
+                        f"Error trying to fetch cost analysis data for [{model_str(table.fullyQualifiedName)}] -- {err}"
+                    )
+                    logger.debug(traceback.format_exc())
+
+            yield entities_cache[database_service_fqn]
+
     # pylint: disable=dangerous-default-value
-    def fetch_data(self, limit=100, fields=["*"]) -> Iterable:
+    def fetch_data(
+        self, limit=100, fields=["*"], entities_cache=None
+    ) -> Optional[Iterable[Dict]]:
         database_services = self.metadata.list_all_entities(
             DatabaseService, limit=limit, fields=fields, skip_on_failure=True
         )
-        entities_list = []
         for database_service in database_services or []:
             try:
                 if self._check_profiler_and_usage_support(database_service):
-                    entities_list.extend(
-                        self.metadata.list_all_entities(
-                            Table,
-                            limit=limit,
-                            fields=fields,
-                            skip_on_failure=True,
-                            params={
-                                "database": database_service.fullyQualifiedName.__root__
-                            },
-                        )
+                    yield from self.life_cycle_data_dict(
+                        entities_cache=entities_cache,
+                        database_service_fqn=model_str(
+                            database_service.fullyQualifiedName
+                        ),
                     )
             except Exception as err:
                 logger.error(f"Error trying to fetch entities -- {err}")
                 logger.debug(traceback.format_exc())
-        return entities_list
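
The caching contract introduced by `life_cycle_data_dict()` is: the first caller for a given database service pays the full listing-and-profiling cost, and later callers in the same run (the second cost-analysis processor) are served from `entities_cache`. A simplified, runnable sketch of that contract, where `expensive_listing` is a hypothetical stand-in for the `list_all_entities` plus profile lookups done above:

```python
from typing import Dict, Iterable, Tuple


def expensive_listing(service_fqn: str) -> Iterable[Tuple[str, dict]]:
    # Hypothetical stand-in for listing tables and fetching their profiles.
    yield f"{service_fqn}.db.table_a", {"size": 10.0}


def life_cycle_data_dict(
    entities_cache: Dict[str, Dict[str, dict]], service_fqn: str
) -> Iterable[Dict[str, dict]]:
    if entities_cache.get(service_fqn):
        # Cache hit: reuse the dict built by an earlier producer pass.
        yield entities_cache[service_fqn]
        return
    entities_cache[service_fqn] = {}
    for fqn, data in expensive_listing(service_fqn):
        entities_cache[service_fqn][fqn] = data
    yield entities_cache[service_fqn]


cache: Dict[str, Dict[str, dict]] = {}
first = list(life_cycle_data_dict(cache, "mysql_svc"))
second = list(life_cycle_data_dict(cache, "mysql_svc"))  # no re-listing
assert first == second
```

One subtlety carried over from the real code: a service whose listing produced an empty dict is falsy, so `entities_cache.get(...)` treats it as a miss and re-fetches rather than reusing the empty result.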
diff --git a/ingestion/src/metadata/data_insight/producer/entity_producer.py b/ingestion/src/metadata/data_insight/producer/entity_producer.py
index c7f7c6bdf25..8528ff6e443 100644
--- a/ingestion/src/metadata/data_insight/producer/entity_producer.py
+++ b/ingestion/src/metadata/data_insight/producer/entity_producer.py
@@ -52,7 +52,7 @@ class EntityProducer(ProducerInterface):
         ]
 
     # pylint: disable=dangerous-default-value
-    def fetch_data(self, limit=100, fields=["*"]) -> Iterable:
+    def fetch_data(self, limit=100, fields=["*"], entities_cache=None) -> Iterable:
         for entity in self.entities:
             try:
                 yield from self.metadata.list_all_entities(
diff --git a/ingestion/src/metadata/data_insight/producer/producer_interface.py b/ingestion/src/metadata/data_insight/producer/producer_interface.py
index 5032ba647be..8f20ab3034f 100644
--- a/ingestion/src/metadata/data_insight/producer/producer_interface.py
+++ b/ingestion/src/metadata/data_insight/producer/producer_interface.py
@@ -24,6 +24,6 @@ class ProducerInterface(ABC):
         self.metadata = metadata
 
     @abstractmethod
-    def fetch_data(self, limit, fields):
+    def fetch_data(self, limit, fields, entities_cache=None):
         """fetch data from source"""
         raise NotImplementedError
diff --git a/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py b/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py
index 068b009228d..3cb51b1bc03 100644
--- a/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py
+++ b/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py
@@ -75,7 +75,7 @@ class WebAnalyticsProducer(ProducerInterface):
         CACHED_EVENTS.clear()
 
     def fetch_data(
-        self, limit=100, fields=["*"]
+        self, limit=100, fields=["*"], entities_cache=None
     ):  # pylint: disable=dangerous-default-value
         """fetch data for web analytics event"""
         events = self._get_events(None, limit, fields)
diff --git a/ingestion/src/metadata/data_insight/source/metadata.py b/ingestion/src/metadata/data_insight/source/metadata.py
index 49e01744ad4..47ab829600b 100644
--- a/ingestion/src/metadata/data_insight/source/metadata.py
+++ b/ingestion/src/metadata/data_insight/source/metadata.py
@@ -74,6 +74,7 @@ class DataInsightSource(Source):
         super().__init__()
         self.metadata = metadata
         self.date = datetime.utcnow().strftime("%Y-%m-%d")
+        self.entities_cache = {}
 
         _processors = self._instantiate_processors()
         self._processors: Dict[
@@ -130,11 +131,19 @@ class DataInsightSource(Source):
             processor = cast(DataProcessor, processor)
             processor.pre_hook() if processor.pre_hook else None  # pylint: disable=expression-not-assigned
 
-            for data in producer.fetch_data(fields=["owner", "tags"]):
+            for data in (
+                producer.fetch_data(
+                    fields=["owner", "tags"], entities_cache=self.entities_cache
+                )
+                or []
+            ):
                 processor.refine(data)
 
             processor.post_hook() if processor.post_hook else None  # pylint: disable=expression-not-assigned
 
+            if processor.clean_up_cache:
+                self.entities_cache.clear()
+
             for data in processor.yield_refined_data():
                 yield Either(left=None, right=DataInsightRecord(data=data))
         except KeyError as key_error:
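
Putting the pieces together, the source now threads one `entities_cache` through every producer and lets the processor that consumes the cache last (`AggregatedCostAnalysisReportDataProcessor`, which sets `clean_up_cache = True`) trigger the clear. A toy sketch of that control flow with stand-in classes, not the real OpenMetadata interfaces:

```python
from typing import Dict, Iterable


class ToyProducer:  # stand-in for CostAnalysisProducer
    def fetch_data(self, entities_cache: Dict) -> Iterable[Dict]:
        # Populate the shared cache on first use, reuse it afterwards.
        entities_cache.setdefault("svc", {"svc.db.t1": "report-data"})
        yield entities_cache["svc"]


class ToyRawProcessor:  # stand-in for the raw report processor
    clean_up_cache = False

    def refine(self, data: Dict) -> None:
        print(f"raw: refined {len(data)} entities")


class ToyAggregatedProcessor(ToyRawProcessor):
    clean_up_cache = True  # last consumer of the cache: safe to drop it

    def refine(self, data: Dict) -> None:
        print(f"aggregated: refined {len(data)} entities")


entities_cache: Dict = {}
for processor in (ToyRawProcessor(), ToyAggregatedProcessor()):
    for data in ToyProducer().fetch_data(entities_cache=entities_cache) or []:
        processor.refine(data)
    if processor.clean_up_cache:
        entities_cache.clear()  # mirrors DataInsightSource after post_hook

assert not entities_cache  # the cache does not leak into the next run
```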