From 9d594efcb588af5c0e6cd634306e7d0276cfefda Mon Sep 17 00:00:00 2001
From: Teddy
Date: Fri, 11 Nov 2022 06:35:33 +0100
Subject: [PATCH] Fixes #8473 - Implements KPI Workflow (#8657)

* Added KPI workflow logic

* Update ingestion/src/metadata/data_insight/runner/run_result_registry.py

* ran py format

Co-authored-by: Pere Miquel Brull
---
 .../src/metadata/data_insight/api/workflow.py | 112 +++++++--
 .../data_insight/runner/kpi_runner.py         | 111 +++++++++
 .../runner/run_result_registry.py             | 162 +++++++++++++
 .../data_insight/sink/metadata_rest.py        |  47 ++--
 .../ometa/mixins/data_insight_mixin.py        |  81 +++++++
 .../src/metadata/ingestion/ometa/ometa_api.py |  11 +
 .../test_data_insight_workflow.py             | 149 +++++++++++-
 .../kpi/test_registry_functions.py            | 217 ++++++++++++++++++
 .../json/schema/dataInsight/kpi/basic.json    |   8 +
 9 files changed, 858 insertions(+), 40 deletions(-)
 create mode 100644 ingestion/src/metadata/data_insight/runner/kpi_runner.py
 create mode 100644 ingestion/src/metadata/data_insight/runner/run_result_registry.py
 create mode 100644 ingestion/tests/unit/data_insight/kpi/test_registry_functions.py

diff --git a/ingestion/src/metadata/data_insight/api/workflow.py b/ingestion/src/metadata/data_insight/api/workflow.py
index f6801eeda2c..ea4b8cfbdd1 100644
--- a/ingestion/src/metadata/data_insight/api/workflow.py
+++ b/ingestion/src/metadata/data_insight/api/workflow.py
@@ -20,6 +20,7 @@ Workflow definition for the ORM Profiler.
 from __future__ import annotations
 
 import traceback
+from datetime import datetime
 from typing import Optional, Union, cast
 
 from pydantic import ValidationError
@@ -34,7 +35,9 @@ from metadata.data_insight.processor.web_analytic_report_data_processor import (
     WebAnalyticEntityViewReportDataProcessor,
     WebAnalyticUserActivityReportDataProcessor,
 )
+from metadata.data_insight.runner.kpi_runner import KpiRunner
 from metadata.generated.schema.analytics.reportData import ReportDataType
+from metadata.generated.schema.dataInsight.kpi.kpi import Kpi
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
 )
@@ -47,7 +50,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 from metadata.ingestion.api.parser import parse_workflow_config_gracefully
 from metadata.ingestion.api.processor import ProcessorStatus
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.ometa.ometa_api import EntityList, OpenMetadata
 from metadata.utils.logger import data_insight_logger
 from metadata.utils.workflow_helper import (
     set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
@@ -56,6 +59,8 @@ from metadata.utils.workflow_output_handler import print_data_insight_status
 
 logger = data_insight_logger()
 
+NOW = datetime.utcnow().timestamp() * 1000
+
 
 class DataInsightWorkflow:
     """
@@ -82,6 +87,8 @@ class DataInsightWorkflow:
             ]
         ] = None
 
+        self.kpi_runner: Optional[KpiRunner] = None
+
         if self.config.sink:
             self.sink = get_sink(
                 sink_type="metadata-rest",
@@ -97,6 +104,84 @@ class DataInsightWorkflow:
             _from="ingestion",
         )
 
+    @staticmethod
+    def _is_kpi_active(entity: Kpi) -> bool:
+        """Check if a KPI is active
+
+        Args:
+            entity (Kpi): KPI entity
+
+        Returns:
+            bool: True if the KPI is currently active, False otherwise
+        """
+
+        start_date = entity.startDate.__root__
+        end_date = entity.endDate.__root__
+
+        if not start_date or not end_date:
+            logger.warning(
+                f"Start date or End date was not defined.\n\t-startDate: {start_date}\n\t-end_date: {end_date}\n"
+                "We won't be running the KPI validation"
+            )
+            return False
+
+        if start_date <= NOW <= end_date:
+            return True
+
+        return False
+
+    def _get_kpis(self) -> list[Kpi]:
+        """Get the list of KPIs and return the active ones
+
+        Returns:
+            list[Kpi]: list of active KPIs
+        """
+
+        kpis: EntityList[Kpi] = self.metadata.list_entities(
+            entity=Kpi, fields="*"  # type: ignore
+        )
+
+        return [kpi for kpi in kpis.entities if self._is_kpi_active(kpi)]
+
+    def _execute_data_processor(self):
+        """Data processor method to refine raw data into report data and ingest it in ES"""
+        for report_data_type in ReportDataType:
+            logger.info(f"Processing data for report type {report_data_type}")
+            try:
+                self.data_processor = DataProcessor.create(
+                    _data_processor_type=report_data_type.value, metadata=self.metadata
+                )
+                for record in self.data_processor.process():
+                    if hasattr(self, "sink"):
+                        self.sink.write_record(record)
+                        self.es_sink.write_record(record)
+                    else:
+                        logger.warning(
+                            "No sink attribute found, skipping ingestion of report data"
+                        )
+
+            except Exception as exc:
+                logger.error(
+                    f"Error while executing data insight workflow for report type {report_data_type} -- {exc}"
+                )
+                logger.debug(traceback.format_exc())
+                self.status.failure(
+                    f"Error while executing data insight workflow for report type {report_data_type} -- {exc}"
+                )
+
+    def _execute_kpi_runner(self):
+        """KPI runner method to run KPI definitions against the platform's latest metrics"""
+        kpis = self._get_kpis()
+        self.kpi_runner = KpiRunner(kpis, self.metadata)
+
+        for kpi_result in self.kpi_runner.run():
+            if hasattr(self, "sink"):
+                self.sink.write_record(kpi_result)
+            else:
+                logger.warning(
+                    "No sink attribute found, skipping ingestion of KPI result"
+                )
+
     @classmethod
     def create(cls, config_dict: dict) -> DataInsightWorkflow:
         """instantiate a class object
@@ -122,24 +207,15 @@ class DataInsightWorkflow:
             raise err
 
     def execute(self):
-        for report_data_type in ReportDataType:
-            try:
-                self.data_processor = DataProcessor.create(
-                    _data_processor_type=report_data_type.value, metadata=self.metadata
-                )
-                for record in self.data_processor.process():
-                    if hasattr(self, "sink"):
-                        self.sink.write_record(record)
-                        self.es_sink.write_record(record)
+        """Execute workflow"""
+        logger.info("Starting data processor execution")
+        self._execute_data_processor()
+        logger.info("Data processor finished running")
-
-            except Exception as exc:
-                logger.error(
-                    f"Error while executing data insight workflow for report type {report_data_type} -- {exc}"
-                )
-                logger.debug(traceback.format_exc())
-                self.status.failure(
-                    f"Error while executing data insight workflow for report type {report_data_type} -- {exc}"
-                )
+        logger.info("Sleeping for 1 second. Waiting for ES data to be indexed.")
+        logger.info("Starting KPI runner")
+        self._execute_kpi_runner()
+        logger.info("KPI runner finished running")
 
     def raise_from_status(self, raise_warnings=False):
         if self.data_processor and self.data_processor.get_status().failures:
diff --git a/ingestion/src/metadata/data_insight/runner/kpi_runner.py b/ingestion/src/metadata/data_insight/runner/kpi_runner.py
new file mode 100644
index 00000000000..697ea3146fe
--- /dev/null
+++ b/ingestion/src/metadata/data_insight/runner/kpi_runner.py
@@ -0,0 +1,111 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Runner class used to check KPI status +""" + +from __future__ import annotations + +import time as tme +from datetime import datetime, time +from typing import Iterator, Optional + +from metadata.data_insight.runner.run_result_registry import run_result_registry +from metadata.generated.schema.dataInsight.dataInsightChart import DataInsightChart +from metadata.generated.schema.dataInsight.dataInsightChartResult import ( + DataInsightChartResult, +) +from metadata.generated.schema.dataInsight.kpi.basic import KpiResult, KpiTarget +from metadata.generated.schema.dataInsight.kpi.kpi import Kpi +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.source import SourceStatus +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.logger import data_insight_logger + +logger = data_insight_logger() + +TIMEOUT = 30 + + +class KpiRunner: + """KPI runner class + + Attrs: + kpis: list[Kpi] + metadata: OpenMetadata + processor_status: SourceStatus + """ + + def __init__(self, kpis: list[Kpi], metadata: OpenMetadata) -> None: + self.kpis = kpis + + self.metadata = metadata + self.datetime = datetime.utcnow() + self.processor_status = SourceStatus() + + def _get_data_insight_chart_result( + self, data_insight_chart: EntityReference + ) -> DataInsightChartResult: + """get data insight result for a specific chart + + Args: + data_insight_chart (EntityReference): _description_ + """ + results = None + data_insight_chart_entity: Optional[ + DataInsightChart + ] = self.metadata.get_by_name( + entity=DataInsightChart, + fqn=data_insight_chart.fullyQualifiedName, + fields="*", + ) + + if not data_insight_chart_entity: + logger.warning( + f"No entity returned for dataInsightChart {data_insight_chart.name}" + ) + return None + + timeout = tme.time() + TIMEOUT + while True: + results = self.metadata.get_aggregated_data_insight_results( + start_ts=int( + datetime.combine(self.datetime, time.min).timestamp() * 1000 + ), + end_ts=int( + datetime.combine(self.datetime, time.max).timestamp() * 1000 + ), + data_insight_chart_nane=data_insight_chart_entity.name.__root__, + data_report_index=data_insight_chart_entity.dataIndexType.value, + ) + if results.data or tme.time() > timeout: + break + + return results + + def run(self) -> Iterator[KpiResult]: + """Method to run the KPI status check""" + for kpi in self.kpis: + kpi_target: list[KpiTarget] = kpi.targetDefinition + data_insight_chart_result: DataInsightChartResult = ( + self._get_data_insight_chart_result(kpi.dataInsightChart) + ) + kpi_result = run_result_registry.registry[kpi.dataInsightChart.name]( + kpi_target, + data_insight_chart_result.data, + kpi.fullyQualifiedName, + int(self.datetime.timestamp() * 1000), + ) + + yield kpi_result + + def get_status(self): + return self.processor_status diff --git a/ingestion/src/metadata/data_insight/runner/run_result_registry.py b/ingestion/src/metadata/data_insight/runner/run_result_registry.py new file mode 100644 index 00000000000..5c9ffbee220 --- /dev/null +++ b/ingestion/src/metadata/data_insight/runner/run_result_registry.py @@ -0,0 
+1,162 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +function to compute kpi +""" + +from __future__ import annotations + +import ast +import traceback + +from metadata.generated.schema.dataInsight.kpi.basic import KpiResult, KpiTarget +from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithDescriptionByType import ( + PercentageOfEntitiesWithDescriptionByType, +) +from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithOwnerByType import ( + PercentageOfEntitiesWithOwnerByType, +) +from metadata.generated.schema.type.basic import FullyQualifiedEntityName +from metadata.utils.dispatch import enum_register +from metadata.utils.logger import sqa_interface_registry_logger + +logger = sqa_interface_registry_logger() + + +def percentage_of_entities_with_description_kpi_result( + kpi_target: list[KpiTarget], + results: list[PercentageOfEntitiesWithDescriptionByType], + kpi_fqn: FullyQualifiedEntityName, + timestamp: int, +) -> KpiResult: + """Validate percentage of entities with description kpi + + Args: + kpi_target (list[KpiTarget]): + results (list[PercentageOfEntitiesWithDescriptionByType]): + + Raises: + ValueError: + RuntimeError: + + Returns: + KpiResult: + """ + if not results: + raise ValueError("Cannot compute KPI. No results found") + + latest_chart_result = {} + + try: + total_entity_with_description = sum( + result.completedDescription for result in results + ) + total_entity_count = sum(result.entityCount for result in results) + latest_chart_result["completedDescriptionFraction"] = ( + total_entity_with_description / total_entity_count + ) + except (TypeError, ZeroDivisionError) as exc: + logger.debug(traceback.format_exc()) + logger.error(f"Could not run KPI result - {exc}") + raise RuntimeError(exc) + + latest_chart_result["completedDescription"] = total_entity_with_description + + target_results = [] + + for target in kpi_target: + try: + value = latest_chart_result[target.name] + except KeyError as exc: + logger.warning(f"Could not compute KPI result for {target.name} - {exc}") + else: + target_results.append( + KpiTarget( + name=target.name, + value=value, + targetMet=value > ast.literal_eval(target.value), + ) + ) + + return KpiResult( + timestamp=timestamp, + targetResult=target_results, + kpiFqn=kpi_fqn, + ) + + +def percentage_of_entities_with_owner_kpi_result( + kpi_target: list[KpiTarget], + results: list[PercentageOfEntitiesWithOwnerByType], + kpi_fqn: FullyQualifiedEntityName, + timestamp: int, +) -> KpiResult: + """_summary_ + + Args: + kpi_target (list[KpiTarget]): _description_ + data_insight_chart_result (list[PercentageOfEntitiesWithOwnerByType]): _description_ + + Raises: + RuntimeError: _description_ + + Returns: + KpiResult: _description_ + """ + + if not results: + raise ValueError("Cannot compute KPI. 
No results found") + + latest_chart_result = {} + + try: + total_entity_with_owner = sum(result.hasOwner for result in results) + total_entity_count = sum(result.entityCount for result in results) + latest_chart_result["hasOwnerFraction"] = ( + total_entity_with_owner / total_entity_count + ) + except (TypeError, ZeroDivisionError) as exc: + logger.debug(traceback.format_exc()) + logger.error(f"Could not run KPI result - {exc}") + raise RuntimeError(exc) + + latest_chart_result["hasOwner"] = total_entity_with_owner + + target_results = [] + + for target in kpi_target: + try: + value = latest_chart_result[target.name] + except KeyError as exc: + logger.warning(f"Could not compute KPI result for {target.name} - {exc}") + else: + target_results.append( + KpiTarget( + name=target.name, + value=value, + targetMet=value > ast.literal_eval(target.value), + ) + ) + + return KpiResult( + timestamp=timestamp, + targetResult=target_results, + kpiFqn=kpi_fqn, + ) + + +run_result_registry = enum_register() +run_result_registry.add("PercentageOfEntitiesWithDescriptionByType")( + percentage_of_entities_with_description_kpi_result +) +run_result_registry.add("PercentageOfEntitiesWithOwnerByType")( + percentage_of_entities_with_owner_kpi_result +) diff --git a/ingestion/src/metadata/data_insight/sink/metadata_rest.py b/ingestion/src/metadata/data_insight/sink/metadata_rest.py index a083e99f899..a795b11fc6c 100644 --- a/ingestion/src/metadata/data_insight/sink/metadata_rest.py +++ b/ingestion/src/metadata/data_insight/sink/metadata_rest.py @@ -14,10 +14,11 @@ OpenMetadata REST Sink implementation for the Data Insight Profiler results """ import traceback -from typing import Optional +from typing import Optional, Union from metadata.config.common import ConfigModel from metadata.generated.schema.analytics.reportData import ReportData +from metadata.generated.schema.dataInsight.kpi.basic import KpiResult from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -66,21 +67,33 @@ class MetadataRestSink(Sink[Entity]): def close(self) -> None: self.metadata.close() - def write_record(self, record: ReportData) -> None: + def write_record(self, record: Union[ReportData, KpiResult]) -> None: try: - self.metadata.add_data_insight_report_data(record) - logger.info( - f"Successfully ingested data insight for {record.data.__class__.__name__ if record.data else 'Unknown'}" - ) - self.status.records_written( - f"Data Insight: {record.data.__class__.__name__ if record.data else 'Unknown'}" - ) + if isinstance(record, ReportData): + self.metadata.add_data_insight_report_data(record) + logger.info( + "Successfully ingested data insight for" + f"{record.data.__class__.__name__ if record.data else 'Unknown'}" + ) + self.status.records_written( + f"Data Insight: {record.data.__class__.__name__ if record.data else 'Unknown'}" + ) + if isinstance(record, KpiResult): + self.metadata.add_kpi_result(fqn=record.kpiFqn.__root__, record=record) + logger.info(f"Successfully ingested KPI for {record.kpiFqn}") + self.status.records_written(f"Data Insight: {record.kpiFqn}") + except APIError as err: - logger.error( - "Failed to sink data insight data for " - f"{record.data.__class__.__name__ if record.data else 'Unknown'} - {err}" - ) - logger.debug(traceback.format_exc()) - self.status.failure( - f"Data Insight: {record.data.__class__.__name__ if record.data else 'Unknown'}" - ) + if isinstance(record, ReportData): + logger.debug(traceback.format_exc()) + logger.error( 
+                    "Failed to sink data insight data for "
+                    f"{record.data.__class__.__name__ if record.data else 'Unknown'} - {err}"
+                )
+                self.status.failure(
+                    f"Data Insight: {record.data.__class__.__name__ if record.data else 'Unknown'}"
+                )
+            if isinstance(record, KpiResult):
+                logger.debug(traceback.format_exc())
+                logger.error(f"Failed to sink KPI results for {record.kpiFqn} - {err}")
+                self.status.failure(f"KPI Result: {record.kpiFqn}")
diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py
index 8cc0043514b..22369513faa 100644
--- a/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py
+++ b/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py
@@ -16,7 +16,17 @@ To be used by OpenMetadata class
 
 from __future__ import annotations
 
+from typing import Optional
+
 from metadata.generated.schema.analytics.reportData import ReportData
+from metadata.generated.schema.api.dataInsight.kpi.createKpiRequest import (
+    CreateKpiRequest,
+)
+from metadata.generated.schema.dataInsight.dataInsightChartResult import (
+    DataInsightChartResult,
+)
+from metadata.generated.schema.dataInsight.kpi.basic import KpiResult
+from metadata.generated.schema.dataInsight.kpi.kpi import Kpi
 
 
 class DataInisghtMixin:
@@ -34,6 +44,18 @@ class DataInisghtMixin:
 
         return resp
 
+    def add_kpi_result(self, fqn: str, record: KpiResult) -> KpiResult:
+        """Given a KpiResult object, convert it to a JSON payload
+        and send a PUT request to the KPI result endpoint
+
+        Args:
+            record (KpiResult): KPI result for the given KPI fqn
+        """
+
+        resp = self.client.put(f"/kpi/{fqn}/kpiResult", record.json())
+
+        return resp
+
     def get_data_insight_report_data(
         self, start_ts: int, end_ts: int, report_data_type: str
     ) -> dict[str, list[ReportData]]:
@@ -54,3 +76,62 @@ class DataInisghtMixin:
         )
 
         return resp
+
+    def get_aggregated_data_insight_results(
+        self,
+        start_ts: int,
+        end_ts: int,
+        data_insight_chart_nane: str,
+        data_report_index: str,
+        params: Optional[dict] = None,
+    ) -> DataInsightChartResult:
+        """Get the aggregated results of a data insight chart for a time window
+
+        Args:
+            start_ts (int): start timestamp in milliseconds
+            end_ts (int): end timestamp in milliseconds
+            data_insight_chart_nane (str): name of the data insight chart to aggregate
+            data_report_index (str): ES index holding the report data
+            params (Optional[dict], optional): extra query parameters. Defaults to None.
+ + Returns: + DataInsightChartResult: _description_ + """ + + request_params = { + "startTs": start_ts, + "endTs": end_ts, + "dataInsightChartName": data_insight_chart_nane, + "dataReportIndex": data_report_index, + } + + if params: + request_params = {**request_params, **params} + + resp = self.client.get( + "/dataInsight/aggregate", + request_params, + ) + + return DataInsightChartResult.parse_obj(resp) + + def get_kpi_result(self, fqn: str, start_ts, end_ts) -> list[KpiResult]: + """Given FQN return KPI results + + Args: + fqn (str): fullyQualifiedName + """ + + params = {"startTs": start_ts, "endTs": end_ts} + + resp = self.client.get( + f"/kpi/{fqn}/kpiResult", + params, + ) + + return [KpiResult(**data) for data in resp["data"]] + + def create_kpi(self, create: CreateKpiRequest) -> Kpi: + resp = self.client.post("/kpi", create.json()) + + return Kpi.parse_obj(resp) diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index 69bf9c65c56..affa59859e2 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -29,6 +29,8 @@ from metadata.generated.schema.analytics.webAnalyticEventData import ( WebAnalyticEventData, ) from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.dataInsight.dataInsightChart import DataInsightChart +from metadata.generated.schema.dataInsight.kpi.kpi import Kpi from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.database import Database @@ -402,6 +404,15 @@ class OpenMetadata( if issubclass(entity, WebAnalyticEventData): return "/analytics/webAnalyticEvent/collect" + if issubclass(entity, DataInsightChart): + return "/dataInsight" + + if issubclass( + entity, + Kpi, + ): + return "/kpi" + raise MissingEntityTypeException( f"Missing {entity} type when generating suffixes" ) diff --git a/ingestion/tests/integration/data_insight/test_data_insight_workflow.py b/ingestion/tests/integration/data_insight/test_data_insight_workflow.py index d19ee7174cc..5d4ccc1996f 100644 --- a/ingestion/tests/integration/data_insight/test_data_insight_workflow.py +++ b/ingestion/tests/integration/data_insight/test_data_insight_workflow.py @@ -13,19 +13,39 @@ Validate workflow configs and filters """ -import time +from __future__ import annotations + import unittest from copy import deepcopy -from datetime import datetime, timedelta +from datetime import datetime, time +from time import sleep import pytest import requests from metadata.data_insight.api.workflow import DataInsightWorkflow +from metadata.data_insight.helper.data_insight_es_index import DataInsightEsIndex from metadata.generated.schema.analytics.reportData import ReportDataType +from metadata.generated.schema.api.dataInsight.kpi.createKpiRequest import ( + CreateKpiRequest, +) +from metadata.generated.schema.dataInsight.dataInsightChart import DataInsightChart +from metadata.generated.schema.dataInsight.dataInsightChartResult import ( + DataInsightChartResult, + DataInsightChartType, +) +from metadata.generated.schema.dataInsight.kpi.basic import KpiResult, KpiTarget +from metadata.generated.schema.dataInsight.kpi.kpi import Kpi +from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithDescriptionByType import ( + PercentageOfEntitiesWithDescriptionByType, +) +from 
metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithOwnerByType import ( + PercentageOfEntitiesWithOwnerByType, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.parser import ParsingConfigurationError from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -65,6 +85,32 @@ class DataInsightWorkflowTests(unittest.TestCase): ) ) + cls.start_ts = int( + datetime.combine(datetime.utcnow(), time.min).timestamp() * 1000 + ) + cls.end_ts = int( + datetime.combine(datetime.utcnow(), time.max).timestamp() * 1000 + ) + + completed_description_chart = cls.metadata.get_by_name( + DataInsightChart, "PercentageOfEntitiesWithDescriptionByType", fields="*" + ) + create = CreateKpiRequest( + name="CompletedDescription", + dataInsightChart=EntityReference( + type="dataInsightChart", id=completed_description_chart.id + ), + description="foo", + startDate=cls.start_ts, + endDate=cls.end_ts, + targetDefinition=[ + KpiTarget(name="completedDescriptionFraction", value="0.63") + ], + metricType="PERCENTAGE", + ) + + cls.metadata.create_kpi(create) + def test_create_method(self): """Test validation of the workflow config is properly happening""" DataInsightWorkflow.create(data_insight_config) @@ -79,7 +125,9 @@ class DataInsightWorkflowTests(unittest.TestCase): workflow: DataInsightWorkflow = DataInsightWorkflow.create(data_insight_config) workflow.execute() - time.sleep(1) # wait for data to be available + sleep(1) # wait for data to be available + + # Test the indexes have been created as expected and the data have been loaded entity_report_indexes = requests.get( "http://localhost:9200/entity_report_data_index/_search", timeout=30 ) @@ -94,9 +142,100 @@ class DataInsightWorkflowTests(unittest.TestCase): entity_report_indexes.json()["hits"]["total"]["value"] > 0 ) # check data have been correctly indexed in ES + # test report endpoint is returning data report_data = self.metadata.get_data_insight_report_data( - int((datetime.now() - timedelta(days=1)).timestamp() * 1000), - int((datetime.now() + timedelta(days=1)).timestamp() * 1000), + self.start_ts, + self.end_ts, ReportDataType.EntityReportData.value, ) assert report_data.get("data") + + # test data insight aggregation endpoint is returning data + resp = self.metadata.get_aggregated_data_insight_results( + start_ts=self.start_ts, + end_ts=self.end_ts, + data_insight_chart_nane=DataInsightChartType.PercentageOfEntitiesWithDescriptionByType.value, + data_report_index=DataInsightEsIndex.EntityReportData.value, + ) + + assert isinstance(resp, DataInsightChartResult) + assert resp.data + assert isinstance(resp.data[0], PercentageOfEntitiesWithDescriptionByType) + + resp = self.metadata.get_aggregated_data_insight_results( + start_ts=self.start_ts, + end_ts=self.end_ts, + data_insight_chart_nane=DataInsightChartType.PercentageOfEntitiesWithOwnerByType.value, + data_report_index=DataInsightEsIndex.EntityReportData.value, + ) + + assert resp.data + assert isinstance(resp.data[0], PercentageOfEntitiesWithOwnerByType) + + def test_get_kpis(self): + """test Kpis are returned as expected""" + # TO DO: Add KPI creation step and deletion (setUp + tearDown) + + workflow: DataInsightWorkflow = DataInsightWorkflow.create(data_insight_config) + + kpis = workflow._get_kpis() + + assert kpis + + def test_write_kpi_result(self): + """test write kpi result""" + fqn = 
"CompletedDescription" + self.metadata.add_kpi_result( + fqn, + KpiResult( + timestamp=int(datetime.utcnow().timestamp() * 1000), + kpiFqn="CompletedDescription", + targetResult=[ + KpiTarget( + name="completedDescriptionFraction", + value="0.56", + targetMet=False, + ) + ], + ), + ) + + kpi_result = self.metadata.get_kpi_result(fqn, self.start_ts, self.end_ts) + + assert kpi_result + + def test_create_kpi(self): + completed_description_chart = self.metadata.get_by_name( + DataInsightChart, "PercentageOfEntitiesWithDescriptionByType", fields="*" + ) + create = CreateKpiRequest( + name="myKpi", + dataInsightChart=EntityReference( + type="dataInsightChart", id=completed_description_chart.id + ), + description="foo", + startDate=self.start_ts, + endDate=self.end_ts, + targetDefinition=[ + KpiTarget(name="completedDescriptionFraction", value="0.63") + ], + metricType="PERCENTAGE", + ) + + kpi = self.metadata.create_kpi(create) + assert kpi + assert isinstance(kpi, Kpi) + + @classmethod + def tearDownClass(cls) -> None: + kpis: list[Kpi] = cls.metadata.list_entities( + entity=Kpi, fields="*" # type: ignore + ).entities + + for kpi in kpis: + cls.metadata.delete( + entity=Kpi, + entity_id=kpi.id, + hard_delete=True, + recursive=True, + ) diff --git a/ingestion/tests/unit/data_insight/kpi/test_registry_functions.py b/ingestion/tests/unit/data_insight/kpi/test_registry_functions.py new file mode 100644 index 00000000000..121bf62e8b3 --- /dev/null +++ b/ingestion/tests/unit/data_insight/kpi/test_registry_functions.py @@ -0,0 +1,217 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +function to compute kpi +""" + + +from metadata.data_insight.runner.run_result_registry import ( + percentage_of_entities_with_description_kpi_result, + percentage_of_entities_with_owner_kpi_result, +) +from metadata.generated.schema.dataInsight.kpi.basic import KpiTarget +from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithDescriptionByType import ( + PercentageOfEntitiesWithDescriptionByType, +) +from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithOwnerByType import ( + PercentageOfEntitiesWithOwnerByType, +) +from metadata.generated.schema.type.basic import Timestamp + + +def test_percentage_of_entities_with_description_kpi_result(): + """test regustry function""" + kpi_target = [ + KpiTarget(name="completedDescriptionFraction", value="0.55", targetMet=None), + KpiTarget(name="completedDescription", value="63", targetMet=None), + ] + + results = [ + PercentageOfEntitiesWithDescriptionByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="User", + completedDescriptionFraction=0.0, + completedDescription=0.0, + entityCount=11.0, + ), + PercentageOfEntitiesWithDescriptionByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Chart", + completedDescriptionFraction=1.0, + completedDescription=12.0, + entityCount=12.0, + ), + PercentageOfEntitiesWithDescriptionByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Dashboard", + completedDescriptionFraction=1.0, + completedDescription=12.0, + entityCount=12.0, + ), + PercentageOfEntitiesWithDescriptionByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Database", + completedDescriptionFraction=1.0, + completedDescription=1.0, + entityCount=1.0, + ), + PercentageOfEntitiesWithDescriptionByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="DatabaseSchema", + completedDescriptionFraction=1.0, + completedDescription=1.0, + entityCount=1.0, + ), + PercentageOfEntitiesWithDescriptionByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="MlModel", + completedDescriptionFraction=1.0, + completedDescription=3.0, + entityCount=3.0, + ), + PercentageOfEntitiesWithDescriptionByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Pipeline", + completedDescriptionFraction=1.0, + completedDescription=8.0, + entityCount=8.0, + ), + PercentageOfEntitiesWithDescriptionByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Table", + completedDescriptionFraction=0.6111111111111112, + completedDescription=11.0, + entityCount=18.0, + ), + PercentageOfEntitiesWithDescriptionByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="TestSuite", + completedDescriptionFraction=1.0, + completedDescription=3.0, + entityCount=3.0, + ), + PercentageOfEntitiesWithDescriptionByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Topic", + completedDescriptionFraction=1.0, + completedDescription=6.0, + entityCount=6.0, + ), + ] + + kpi_result = percentage_of_entities_with_description_kpi_result( + kpi_target, results, "completedDescription", 1668083253659 + ) + + assert kpi_result.targetResult + for result in kpi_result.targetResult: + if result.name == "completedDescriptionFraction": + assert result.value == "0.76" + assert result.targetMet is True + if result.name == "completedDescription": + assert result.value == "57.0" + assert result.targetMet is False + + +def test_percentage_of_entities_with_owner_kpi_result(): + """test regustry function""" + kpi_target = [ + 
KpiTarget(name="hasOwnerFraction", value="0.46", targetMet=None), + KpiTarget(name="hasOwner", value="63", targetMet=None), + ] + + results = [ + PercentageOfEntitiesWithOwnerByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="User", + hasOwnerFraction=1.0, + hasOwner=12.0, + entityCount=12.0, + ), + PercentageOfEntitiesWithOwnerByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Chart", + hasOwnerFraction=0.0, + hasOwner=0.0, + entityCount=12.0, + ), + PercentageOfEntitiesWithOwnerByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Dashboard", + hasOwnerFraction=1.0, + hasOwner=12.0, + entityCount=12.0, + ), + PercentageOfEntitiesWithOwnerByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Database", + hasOwnerFraction=0.0, + hasOwner=0.0, + entityCount=1.0, + ), + PercentageOfEntitiesWithOwnerByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="DatabaseSchema", + hasOwnerFraction=1.0, + hasOwner=1.0, + entityCount=1.0, + ), + PercentageOfEntitiesWithOwnerByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="MlModel", + hasOwnerFraction=0.0, + hasOwner=0.0, + entityCount=3.0, + ), + PercentageOfEntitiesWithOwnerByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Pipeline", + hasOwnerFraction=0.0, + hasOwner=0.0, + entityCount=8.0, + ), + PercentageOfEntitiesWithOwnerByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Table", + hasOwnerFraction=1.0, + hasOwner=10.0, + entityCount=18.0, + ), + PercentageOfEntitiesWithOwnerByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="TestSuite", + hasOwnerFraction=0.0, + hasOwner=0.0, + entityCount=3.0, + ), + PercentageOfEntitiesWithOwnerByType( + timestamp=Timestamp(__root__=1668038400000), + entityType="Topic", + hasOwnerFraction=0.0, + hasOwner=0.0, + entityCount=6.0, + ), + ] + + kpi_result = percentage_of_entities_with_owner_kpi_result( + kpi_target, results, "completedOwner", 1668083253659 + ) + + print(kpi_result) + + assert kpi_result.targetResult + for result in kpi_result.targetResult: + if result.name == "hasOwnerFraction": + assert result.value == "0.4605263157894737" + assert result.targetMet is True + if result.name == "completedDescription": + assert result.value == "35.0" + assert result.targetMet is False diff --git a/openmetadata-spec/src/main/resources/json/schema/dataInsight/kpi/basic.json b/openmetadata-spec/src/main/resources/json/schema/dataInsight/kpi/basic.json index f3a0d6bd438..65bec12fd61 100644 --- a/openmetadata-spec/src/main/resources/json/schema/dataInsight/kpi/basic.json +++ b/openmetadata-spec/src/main/resources/json/schema/dataInsight/kpi/basic.json @@ -25,6 +25,10 @@ "value": { "description": "value to be passed for the Parameters. These are input from Users. We capture this in in string and convert during the runtime.", "type": "string" + }, + "targetMet": { + "description": "whether the target value was met or not.", + "type": "boolean" } }, "required": ["name", "value"], @@ -39,6 +43,10 @@ "description": "Data one which result is updated", "$ref": "../../type/basic.json#/definitions/timestamp" }, + "kpiFqn": { + "description": "KPI FQN", + "$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName" + }, "targetResult": { "description": "Metric and their corresponding current results", "type": "array",