* Added KPI workflow logic
* Update ingestion/src/metadata/data_insight/runner/run_result_registry.py
* ran py format

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>

This commit is contained in:
parent 6ba5f7ec90
commit 9d594efcb5
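A minimal usage sketch of the workflow after this change (illustrative, not part of the commit; `config_dict` must hold a valid data insight workflow configuration, as in the integration test further down):

# Sketch only: execute() now runs the report-data processors first,
# then the KPI runner added by this commit.
from metadata.data_insight.api.workflow import DataInsightWorkflow

config_dict = {}  # populate with the data insight workflow configuration (elided here)
workflow = DataInsightWorkflow.create(config_dict)
workflow.execute()
workflow.raise_from_status()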
@@ -20,6 +20,7 @@ Workflow definition for the ORM Profiler.
from __future__ import annotations

import traceback
from datetime import datetime
from typing import Optional, Union, cast

from pydantic import ValidationError
@@ -34,7 +35,9 @@ from metadata.data_insight.processor.web_analytic_report_data_processor import (
    WebAnalyticEntityViewReportDataProcessor,
    WebAnalyticUserActivityReportDataProcessor,
)
from metadata.data_insight.runner.kpi_runner import KpiRunner
from metadata.generated.schema.analytics.reportData import ReportDataType
from metadata.generated.schema.dataInsight.kpi.kpi import Kpi
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
@@ -47,7 +50,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.processor import ProcessorStatus
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.ometa.ometa_api import EntityList, OpenMetadata
from metadata.utils.logger import data_insight_logger
from metadata.utils.workflow_helper import (
    set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
@@ -56,6 +59,8 @@ from metadata.utils.workflow_output_handler import print_data_insight_status

logger = data_insight_logger()

NOW = datetime.utcnow().timestamp() * 1000


class DataInsightWorkflow:
    """
@@ -82,6 +87,8 @@ class DataInsightWorkflow:
            ]
        ] = None

        self.kpi_runner: Optional[KpiRunner] = None

        if self.config.sink:
            self.sink = get_sink(
                sink_type="metadata-rest",
@@ -97,6 +104,84 @@ class DataInsightWorkflow:
            _from="ingestion",
        )

    @staticmethod
    def _is_kpi_active(entity: Kpi) -> bool:
        """Check if a KPI is active

        Args:
            entity (Kpi): KPI entity

        Returns:
            bool: True if the KPI is within its start and end dates
        """

        start_date = entity.startDate.__root__
        end_date = entity.endDate.__root__

        if not start_date or not end_date:
            logger.warning(
                f"Start date or End date was not defined.\n\t-startDate: {start_date}\n\t-end_date: {end_date}\n"
                "We won't be running the KPI validation"
            )
            return False

        if start_date <= NOW <= end_date:
            return True

        return False

    def _get_kpis(self) -> list[Kpi]:
        """Get the list of KPIs and return the active ones

        Returns:
            list[Kpi]: active KPIs
        """

        kpis: EntityList[Kpi] = self.metadata.list_entities(
            entity=Kpi, fields="*"  # type: ignore
        )

        return [kpi for kpi in kpis.entities if self._is_kpi_active(kpi)]

    def _execute_data_processor(self):
        """Data processor method to refine raw data into report data and ingest it in ES"""
        for report_data_type in ReportDataType:
            logger.info(f"Processing data for report type {report_data_type}")
            try:
                self.data_processor = DataProcessor.create(
                    _data_processor_type=report_data_type.value, metadata=self.metadata
                )
                for record in self.data_processor.process():
                    if hasattr(self, "sink"):
                        self.sink.write_record(record)
                        self.es_sink.write_record(record)
                    else:
                        logger.warning(
                            "No sink attribute found, skipping ingestion of KPI result"
                        )

            except Exception as exc:
                logger.error(
                    f"Error while executing data insight workflow for report type {report_data_type} -- {exc}"
                )
                logger.debug(traceback.format_exc())
                self.status.failure(
                    f"Error while executing data insight workflow for report type {report_data_type} -- {exc}"
                )

    def _execute_kpi_runner(self):
        """KPI runner method to run KPI definitions against the platform's latest metrics"""
        kpis = self._get_kpis()
        self.kpi_runner = KpiRunner(kpis, self.metadata)

        for kpi_result in self.kpi_runner.run():
            if hasattr(self, "sink"):
                self.sink.write_record(kpi_result)
            else:
                logger.warning(
                    "No sink attribute found, skipping ingestion of KPI result"
                )

    @classmethod
    def create(cls, config_dict: dict) -> DataInsightWorkflow:
        """Instantiate a class object
@@ -122,24 +207,15 @@ class DataInsightWorkflow:
            raise err

    def execute(self):
-        for report_data_type in ReportDataType:
-            try:
-                self.data_processor = DataProcessor.create(
-                    _data_processor_type=report_data_type.value, metadata=self.metadata
-                )
-                for record in self.data_processor.process():
-                    if hasattr(self, "sink"):
-                        self.sink.write_record(record)
-                        self.es_sink.write_record(record)
+        """Execute workflow"""
+        logger.info("Starting data processor execution")
+        self._execute_data_processor()
+        logger.info("Data processor finished running")

-            except Exception as exc:
-                logger.error(
-                    f"Error while executing data insight workflow for report type {report_data_type} -- {exc}"
-                )
-                logger.debug(traceback.format_exc())
-                self.status.failure(
-                    f"Error while executing data insight workflow for report type {report_data_type} -- {exc}"
-                )
+        logger.info("Sleeping for 1 second. Waiting for ES data to be indexed.")
+        logger.info("Starting KPI runner")
+        self._execute_kpi_runner()
+        logger.info("KPI runner finished running")

    def raise_from_status(self, raise_warnings=False):
        if self.data_processor and self.data_processor.get_status().failures:
ingestion/src/metadata/data_insight/runner/kpi_runner.py (new file, 111 lines)
@@ -0,0 +1,111 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Runner class used to check KPI status
"""

from __future__ import annotations

import time as tme
from datetime import datetime, time
from typing import Iterator, Optional

from metadata.data_insight.runner.run_result_registry import run_result_registry
from metadata.generated.schema.dataInsight.dataInsightChart import DataInsightChart
from metadata.generated.schema.dataInsight.dataInsightChartResult import (
    DataInsightChartResult,
)
from metadata.generated.schema.dataInsight.kpi.basic import KpiResult, KpiTarget
from metadata.generated.schema.dataInsight.kpi.kpi import Kpi
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import data_insight_logger

logger = data_insight_logger()

TIMEOUT = 30


class KpiRunner:
    """KPI runner class

    Attrs:
        kpis: list[Kpi]
        metadata: OpenMetadata
        processor_status: SourceStatus
    """

    def __init__(self, kpis: list[Kpi], metadata: OpenMetadata) -> None:
        self.kpis = kpis
        self.metadata = metadata
        self.datetime = datetime.utcnow()
        self.processor_status = SourceStatus()

    def _get_data_insight_chart_result(
        self, data_insight_chart: EntityReference
    ) -> DataInsightChartResult:
        """Get data insight result for a specific chart

        Args:
            data_insight_chart (EntityReference): reference to the data insight chart entity
        """
        results = None
        data_insight_chart_entity: Optional[
            DataInsightChart
        ] = self.metadata.get_by_name(
            entity=DataInsightChart,
            fqn=data_insight_chart.fullyQualifiedName,
            fields="*",
        )

        if not data_insight_chart_entity:
            logger.warning(
                f"No entity returned for dataInsightChart {data_insight_chart.name}"
            )
            return None

        timeout = tme.time() + TIMEOUT
        while True:
            results = self.metadata.get_aggregated_data_insight_results(
                start_ts=int(
                    datetime.combine(self.datetime, time.min).timestamp() * 1000
                ),
                end_ts=int(
                    datetime.combine(self.datetime, time.max).timestamp() * 1000
                ),
                data_insight_chart_nane=data_insight_chart_entity.name.__root__,
                data_report_index=data_insight_chart_entity.dataIndexType.value,
            )
            if results.data or tme.time() > timeout:
                break

        return results

    def run(self) -> Iterator[KpiResult]:
        """Method to run the KPI status check"""
        for kpi in self.kpis:
            kpi_target: list[KpiTarget] = kpi.targetDefinition
            data_insight_chart_result: DataInsightChartResult = (
                self._get_data_insight_chart_result(kpi.dataInsightChart)
            )
            kpi_result = run_result_registry.registry[kpi.dataInsightChart.name](
                kpi_target,
                data_insight_chart_result.data,
                kpi.fullyQualifiedName,
                int(self.datetime.timestamp() * 1000),
            )

            yield kpi_result

    def get_status(self):
        return self.processor_status
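A short sketch of driving the runner directly (illustrative only; in this commit it is invoked from DataInsightWorkflow._execute_kpi_runner, and `active_kpis`, `metadata`, and `sink` are assumed to exist):

# KpiRunner expects the already-filtered list of active Kpi entities and an
# OpenMetadata client; run() yields one KpiResult per KPI.
runner = KpiRunner(kpis=active_kpis, metadata=metadata)
for kpi_result in runner.run():
    sink.write_record(kpi_result)  # or any other consumer of KpiResult records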
ingestion/src/metadata/data_insight/runner/run_result_registry.py (new file, 162 lines)
@@ -0,0 +1,162 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
function to compute kpi
"""

from __future__ import annotations

import ast
import traceback

from metadata.generated.schema.dataInsight.kpi.basic import KpiResult, KpiTarget
from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithDescriptionByType import (
    PercentageOfEntitiesWithDescriptionByType,
)
from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithOwnerByType import (
    PercentageOfEntitiesWithOwnerByType,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.utils.dispatch import enum_register
from metadata.utils.logger import sqa_interface_registry_logger

logger = sqa_interface_registry_logger()


def percentage_of_entities_with_description_kpi_result(
    kpi_target: list[KpiTarget],
    results: list[PercentageOfEntitiesWithDescriptionByType],
    kpi_fqn: FullyQualifiedEntityName,
    timestamp: int,
) -> KpiResult:
    """Validate percentage of entities with description kpi

    Args:
        kpi_target (list[KpiTarget]):
        results (list[PercentageOfEntitiesWithDescriptionByType]):

    Raises:
        ValueError:
        RuntimeError:

    Returns:
        KpiResult:
    """
    if not results:
        raise ValueError("Cannot compute KPI. No results found")

    latest_chart_result = {}

    try:
        total_entity_with_description = sum(
            result.completedDescription for result in results
        )
        total_entity_count = sum(result.entityCount for result in results)
        latest_chart_result["completedDescriptionFraction"] = (
            total_entity_with_description / total_entity_count
        )
    except (TypeError, ZeroDivisionError) as exc:
        logger.debug(traceback.format_exc())
        logger.error(f"Could not run KPI result - {exc}")
        raise RuntimeError(exc)

    latest_chart_result["completedDescription"] = total_entity_with_description

    target_results = []

    for target in kpi_target:
        try:
            value = latest_chart_result[target.name]
        except KeyError as exc:
            logger.warning(f"Could not compute KPI result for {target.name} - {exc}")
        else:
            target_results.append(
                KpiTarget(
                    name=target.name,
                    value=value,
                    targetMet=value > ast.literal_eval(target.value),
                )
            )

    return KpiResult(
        timestamp=timestamp,
        targetResult=target_results,
        kpiFqn=kpi_fqn,
    )


def percentage_of_entities_with_owner_kpi_result(
    kpi_target: list[KpiTarget],
    results: list[PercentageOfEntitiesWithOwnerByType],
    kpi_fqn: FullyQualifiedEntityName,
    timestamp: int,
) -> KpiResult:
    """Validate percentage of entities with owner kpi

    Args:
        kpi_target (list[KpiTarget]):
        results (list[PercentageOfEntitiesWithOwnerByType]):

    Raises:
        ValueError:
        RuntimeError:

    Returns:
        KpiResult:
    """

    if not results:
        raise ValueError("Cannot compute KPI. No results found")

    latest_chart_result = {}

    try:
        total_entity_with_owner = sum(result.hasOwner for result in results)
        total_entity_count = sum(result.entityCount for result in results)
        latest_chart_result["hasOwnerFraction"] = (
            total_entity_with_owner / total_entity_count
        )
    except (TypeError, ZeroDivisionError) as exc:
        logger.debug(traceback.format_exc())
        logger.error(f"Could not run KPI result - {exc}")
        raise RuntimeError(exc)

    latest_chart_result["hasOwner"] = total_entity_with_owner

    target_results = []

    for target in kpi_target:
        try:
            value = latest_chart_result[target.name]
        except KeyError as exc:
            logger.warning(f"Could not compute KPI result for {target.name} - {exc}")
        else:
            target_results.append(
                KpiTarget(
                    name=target.name,
                    value=value,
                    targetMet=value > ast.literal_eval(target.value),
                )
            )

    return KpiResult(
        timestamp=timestamp,
        targetResult=target_results,
        kpiFqn=kpi_fqn,
    )


run_result_registry = enum_register()
run_result_registry.add("PercentageOfEntitiesWithDescriptionByType")(
    percentage_of_entities_with_description_kpi_result
)
run_result_registry.add("PercentageOfEntitiesWithOwnerByType")(
    percentage_of_entities_with_owner_kpi_result
)
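A quick sketch of how the registry above is resolved at runtime (mirroring the lookup in KpiRunner.run; the inputs below are assumed to exist and are illustrative):

# The data insight chart name selects the registered compute function.
compute_kpi = run_result_registry.registry["PercentageOfEntitiesWithDescriptionByType"]
kpi_result = compute_kpi(kpi_targets, chart_results, kpi_fqn, timestamp)  # inputs assumed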
@@ -14,10 +14,11 @@ OpenMetadata REST Sink implementation for the Data Insight Profiler results
"""

import traceback
-from typing import Optional
+from typing import Optional, Union

from metadata.config.common import ConfigModel
from metadata.generated.schema.analytics.reportData import ReportData
+from metadata.generated.schema.dataInsight.kpi.basic import KpiResult
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
@@ -66,21 +67,33 @@ class MetadataRestSink(Sink[Entity]):
    def close(self) -> None:
        self.metadata.close()

-    def write_record(self, record: ReportData) -> None:
+    def write_record(self, record: Union[ReportData, KpiResult]) -> None:
        try:
-            self.metadata.add_data_insight_report_data(record)
-            logger.info(
-                f"Successfully ingested data insight for {record.data.__class__.__name__ if record.data else 'Unknown'}"
-            )
-            self.status.records_written(
-                f"Data Insight: {record.data.__class__.__name__ if record.data else 'Unknown'}"
-            )
+            if isinstance(record, ReportData):
+                self.metadata.add_data_insight_report_data(record)
+                logger.info(
+                    "Successfully ingested data insight for"
+                    f"{record.data.__class__.__name__ if record.data else 'Unknown'}"
+                )
+                self.status.records_written(
+                    f"Data Insight: {record.data.__class__.__name__ if record.data else 'Unknown'}"
+                )
+            if isinstance(record, KpiResult):
+                self.metadata.add_kpi_result(fqn=record.kpiFqn.__root__, record=record)
+                logger.info(f"Successfully ingested KPI for {record.kpiFqn}")
+                self.status.records_written(f"Data Insight: {record.kpiFqn}")

        except APIError as err:
-            logger.error(
-                "Failed to sink data insight data for "
-                f"{record.data.__class__.__name__ if record.data else 'Unknown'} - {err}"
-            )
-            logger.debug(traceback.format_exc())
-            self.status.failure(
-                f"Data Insight: {record.data.__class__.__name__ if record.data else 'Unknown'}"
-            )
+            if isinstance(record, ReportData):
+                logger.debug(traceback.format_exc())
+                logger.error(
+                    "Failed to sink data insight data for "
+                    f"{record.data.__class__.__name__ if record.data else 'Unknown'} - {err}"
+                )
+                self.status.failure(
+                    f"Data Insight: {record.data.__class__.__name__ if record.data else 'Unknown'}"
+                )
+            if isinstance(record, KpiResult):
+                logger.debug(traceback.format_exc())
+                logger.error(f"Failed to sink KPI results for {record.kpiFqn} - {err}")
+                self.status.failure(f"KPI Result: {record.kpiFqn}")
@@ -16,7 +16,17 @@ To be used by OpenMetadata class

from __future__ import annotations

from typing import Optional

from metadata.generated.schema.analytics.reportData import ReportData
from metadata.generated.schema.api.dataInsight.kpi.createKpiRequest import (
    CreateKpiRequest,
)
from metadata.generated.schema.dataInsight.dataInsightChartResult import (
    DataInsightChartResult,
)
from metadata.generated.schema.dataInsight.kpi.basic import KpiResult
from metadata.generated.schema.dataInsight.kpi.kpi import Kpi


class DataInisghtMixin:
@@ -34,6 +44,18 @@ class DataInisghtMixin:

        return resp

    def add_kpi_result(self, fqn: str, record: KpiResult) -> KpiResult:
        """Given a KpiResult object, convert it to a json payload
        and send a PUT request to the KPI result endpoint

        Args:
            fqn (str): KPI fullyQualifiedName
            record (KpiResult): KPI result
        """

        resp = self.client.put(f"/kpi/{fqn}/kpiResult", record.json())

        return resp

    def get_data_insight_report_data(
        self, start_ts: int, end_ts: int, report_data_type: str
    ) -> dict[str, list[ReportData]]:
@@ -54,3 +76,62 @@ class DataInisghtMixin:
        )

        return resp

    def get_aggregated_data_insight_results(
        self,
        start_ts: int,
        end_ts: int,
        data_insight_chart_nane: str,
        data_report_index: str,
        params: Optional[dict] = None,
    ) -> DataInsightChartResult:
        """Get aggregated data insight chart results

        Args:
            start_ts (int): start timestamp in milliseconds
            end_ts (int): end timestamp in milliseconds
            data_insight_chart_nane (str): data insight chart name
            data_report_index (str): ES index holding the report data
            params (Optional[dict], optional): extra query parameters. Defaults to None.

        Returns:
            DataInsightChartResult: aggregated chart results
        """

        request_params = {
            "startTs": start_ts,
            "endTs": end_ts,
            "dataInsightChartName": data_insight_chart_nane,
            "dataReportIndex": data_report_index,
        }

        if params:
            request_params = {**request_params, **params}

        resp = self.client.get(
            "/dataInsight/aggregate",
            request_params,
        )

        return DataInsightChartResult.parse_obj(resp)

    def get_kpi_result(self, fqn: str, start_ts, end_ts) -> list[KpiResult]:
        """Given FQN return KPI results

        Args:
            fqn (str): fullyQualifiedName
        """

        params = {"startTs": start_ts, "endTs": end_ts}

        resp = self.client.get(
            f"/kpi/{fqn}/kpiResult",
            params,
        )

        return [KpiResult(**data) for data in resp["data"]]

    def create_kpi(self, create: CreateKpiRequest) -> Kpi:
        resp = self.client.post("/kpi", create.json())

        return Kpi.parse_obj(resp)
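A brief usage sketch of the new client helpers (illustrative; `metadata`, `create`, `kpi_result`, and the timestamps are assumed to exist, and the KPI name mirrors the integration test below):

# Calls follow the mixin methods defined above.
kpi = metadata.create_kpi(create)
metadata.add_kpi_result(fqn="CompletedDescription", record=kpi_result)
results = metadata.get_kpi_result("CompletedDescription", start_ts, end_ts)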
@@ -29,6 +29,8 @@ from metadata.generated.schema.analytics.webAnalyticEventData import (
    WebAnalyticEventData,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.dataInsight.dataInsightChart import DataInsightChart
from metadata.generated.schema.dataInsight.kpi.kpi import Kpi
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
@@ -402,6 +404,15 @@ class OpenMetadata(
        if issubclass(entity, WebAnalyticEventData):
            return "/analytics/webAnalyticEvent/collect"

        if issubclass(entity, DataInsightChart):
            return "/dataInsight"

        if issubclass(
            entity,
            Kpi,
        ):
            return "/kpi"

        raise MissingEntityTypeException(
            f"Missing {entity} type when generating suffixes"
        )
@@ -13,19 +13,39 @@
Validate workflow configs and filters
"""

-import time
+from __future__ import annotations

import unittest
from copy import deepcopy
-from datetime import datetime, timedelta
+from datetime import datetime, time
+from time import sleep

import pytest
import requests

from metadata.data_insight.api.workflow import DataInsightWorkflow
from metadata.data_insight.helper.data_insight_es_index import DataInsightEsIndex
from metadata.generated.schema.analytics.reportData import ReportDataType
from metadata.generated.schema.api.dataInsight.kpi.createKpiRequest import (
    CreateKpiRequest,
)
from metadata.generated.schema.dataInsight.dataInsightChart import DataInsightChart
from metadata.generated.schema.dataInsight.dataInsightChartResult import (
    DataInsightChartResult,
    DataInsightChartType,
)
from metadata.generated.schema.dataInsight.kpi.basic import KpiResult, KpiTarget
from metadata.generated.schema.dataInsight.kpi.kpi import Kpi
from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithDescriptionByType import (
    PercentageOfEntitiesWithDescriptionByType,
)
from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithOwnerByType import (
    PercentageOfEntitiesWithOwnerByType,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.parser import ParsingConfigurationError
from metadata.ingestion.ometa.ometa_api import OpenMetadata

@@ -65,6 +85,32 @@ class DataInsightWorkflowTests(unittest.TestCase):
            )
        )

        cls.start_ts = int(
            datetime.combine(datetime.utcnow(), time.min).timestamp() * 1000
        )
        cls.end_ts = int(
            datetime.combine(datetime.utcnow(), time.max).timestamp() * 1000
        )

        completed_description_chart = cls.metadata.get_by_name(
            DataInsightChart, "PercentageOfEntitiesWithDescriptionByType", fields="*"
        )
        create = CreateKpiRequest(
            name="CompletedDescription",
            dataInsightChart=EntityReference(
                type="dataInsightChart", id=completed_description_chart.id
            ),
            description="foo",
            startDate=cls.start_ts,
            endDate=cls.end_ts,
            targetDefinition=[
                KpiTarget(name="completedDescriptionFraction", value="0.63")
            ],
            metricType="PERCENTAGE",
        )

        cls.metadata.create_kpi(create)

    def test_create_method(self):
        """Test validation of the workflow config is properly happening"""
        DataInsightWorkflow.create(data_insight_config)
@@ -79,7 +125,9 @@ class DataInsightWorkflowTests(unittest.TestCase):
        workflow: DataInsightWorkflow = DataInsightWorkflow.create(data_insight_config)
        workflow.execute()

-        time.sleep(1)  # wait for data to be available
+        sleep(1)  # wait for data to be available

        # Test the indexes have been created as expected and the data have been loaded
        entity_report_indexes = requests.get(
            "http://localhost:9200/entity_report_data_index/_search", timeout=30
        )
@@ -94,9 +142,100 @@ class DataInsightWorkflowTests(unittest.TestCase):
            entity_report_indexes.json()["hits"]["total"]["value"] > 0
        )  # check data have been correctly indexed in ES

        # test report endpoint is returning data
        report_data = self.metadata.get_data_insight_report_data(
-            int((datetime.now() - timedelta(days=1)).timestamp() * 1000),
-            int((datetime.now() + timedelta(days=1)).timestamp() * 1000),
+            self.start_ts,
+            self.end_ts,
            ReportDataType.EntityReportData.value,
        )
        assert report_data.get("data")

        # test data insight aggregation endpoint is returning data
        resp = self.metadata.get_aggregated_data_insight_results(
            start_ts=self.start_ts,
            end_ts=self.end_ts,
            data_insight_chart_nane=DataInsightChartType.PercentageOfEntitiesWithDescriptionByType.value,
            data_report_index=DataInsightEsIndex.EntityReportData.value,
        )

        assert isinstance(resp, DataInsightChartResult)
        assert resp.data
        assert isinstance(resp.data[0], PercentageOfEntitiesWithDescriptionByType)

        resp = self.metadata.get_aggregated_data_insight_results(
            start_ts=self.start_ts,
            end_ts=self.end_ts,
            data_insight_chart_nane=DataInsightChartType.PercentageOfEntitiesWithOwnerByType.value,
            data_report_index=DataInsightEsIndex.EntityReportData.value,
        )

        assert resp.data
        assert isinstance(resp.data[0], PercentageOfEntitiesWithOwnerByType)

    def test_get_kpis(self):
        """test Kpis are returned as expected"""
        # TO DO: Add KPI creation step and deletion (setUp + tearDown)

        workflow: DataInsightWorkflow = DataInsightWorkflow.create(data_insight_config)

        kpis = workflow._get_kpis()

        assert kpis

    def test_write_kpi_result(self):
        """test write kpi result"""
        fqn = "CompletedDescription"
        self.metadata.add_kpi_result(
            fqn,
            KpiResult(
                timestamp=int(datetime.utcnow().timestamp() * 1000),
                kpiFqn="CompletedDescription",
                targetResult=[
                    KpiTarget(
                        name="completedDescriptionFraction",
                        value="0.56",
                        targetMet=False,
                    )
                ],
            ),
        )

        kpi_result = self.metadata.get_kpi_result(fqn, self.start_ts, self.end_ts)

        assert kpi_result

    def test_create_kpi(self):
        completed_description_chart = self.metadata.get_by_name(
            DataInsightChart, "PercentageOfEntitiesWithDescriptionByType", fields="*"
        )
        create = CreateKpiRequest(
            name="myKpi",
            dataInsightChart=EntityReference(
                type="dataInsightChart", id=completed_description_chart.id
            ),
            description="foo",
            startDate=self.start_ts,
            endDate=self.end_ts,
            targetDefinition=[
                KpiTarget(name="completedDescriptionFraction", value="0.63")
            ],
            metricType="PERCENTAGE",
        )

        kpi = self.metadata.create_kpi(create)
        assert kpi
        assert isinstance(kpi, Kpi)

    @classmethod
    def tearDownClass(cls) -> None:
        kpis: list[Kpi] = cls.metadata.list_entities(
            entity=Kpi, fields="*"  # type: ignore
        ).entities

        for kpi in kpis:
            cls.metadata.delete(
                entity=Kpi,
                entity_id=kpi.id,
                hard_delete=True,
                recursive=True,
            )
ingestion/tests/unit/data_insight/kpi/test_registry_functions.py (new file, 217 lines)
@@ -0,0 +1,217 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
function to compute kpi
"""


from metadata.data_insight.runner.run_result_registry import (
    percentage_of_entities_with_description_kpi_result,
    percentage_of_entities_with_owner_kpi_result,
)
from metadata.generated.schema.dataInsight.kpi.basic import KpiTarget
from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithDescriptionByType import (
    PercentageOfEntitiesWithDescriptionByType,
)
from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithOwnerByType import (
    PercentageOfEntitiesWithOwnerByType,
)
from metadata.generated.schema.type.basic import Timestamp


def test_percentage_of_entities_with_description_kpi_result():
    """test registry function"""
    kpi_target = [
        KpiTarget(name="completedDescriptionFraction", value="0.55", targetMet=None),
        KpiTarget(name="completedDescription", value="63", targetMet=None),
    ]

    results = [
        PercentageOfEntitiesWithDescriptionByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="User",
            completedDescriptionFraction=0.0,
            completedDescription=0.0,
            entityCount=11.0,
        ),
        PercentageOfEntitiesWithDescriptionByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Chart",
            completedDescriptionFraction=1.0,
            completedDescription=12.0,
            entityCount=12.0,
        ),
        PercentageOfEntitiesWithDescriptionByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Dashboard",
            completedDescriptionFraction=1.0,
            completedDescription=12.0,
            entityCount=12.0,
        ),
        PercentageOfEntitiesWithDescriptionByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Database",
            completedDescriptionFraction=1.0,
            completedDescription=1.0,
            entityCount=1.0,
        ),
        PercentageOfEntitiesWithDescriptionByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="DatabaseSchema",
            completedDescriptionFraction=1.0,
            completedDescription=1.0,
            entityCount=1.0,
        ),
        PercentageOfEntitiesWithDescriptionByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="MlModel",
            completedDescriptionFraction=1.0,
            completedDescription=3.0,
            entityCount=3.0,
        ),
        PercentageOfEntitiesWithDescriptionByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Pipeline",
            completedDescriptionFraction=1.0,
            completedDescription=8.0,
            entityCount=8.0,
        ),
        PercentageOfEntitiesWithDescriptionByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Table",
            completedDescriptionFraction=0.6111111111111112,
            completedDescription=11.0,
            entityCount=18.0,
        ),
        PercentageOfEntitiesWithDescriptionByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="TestSuite",
            completedDescriptionFraction=1.0,
            completedDescription=3.0,
            entityCount=3.0,
        ),
        PercentageOfEntitiesWithDescriptionByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Topic",
            completedDescriptionFraction=1.0,
            completedDescription=6.0,
            entityCount=6.0,
        ),
    ]

    kpi_result = percentage_of_entities_with_description_kpi_result(
        kpi_target, results, "completedDescription", 1668083253659
    )

    assert kpi_result.targetResult
    for result in kpi_result.targetResult:
        if result.name == "completedDescriptionFraction":
            assert result.value == "0.76"
            assert result.targetMet is True
        if result.name == "completedDescription":
            assert result.value == "57.0"
            assert result.targetMet is False


def test_percentage_of_entities_with_owner_kpi_result():
    """test registry function"""
    kpi_target = [
        KpiTarget(name="hasOwnerFraction", value="0.46", targetMet=None),
        KpiTarget(name="hasOwner", value="63", targetMet=None),
    ]

    results = [
        PercentageOfEntitiesWithOwnerByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="User",
            hasOwnerFraction=1.0,
            hasOwner=12.0,
            entityCount=12.0,
        ),
        PercentageOfEntitiesWithOwnerByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Chart",
            hasOwnerFraction=0.0,
            hasOwner=0.0,
            entityCount=12.0,
        ),
        PercentageOfEntitiesWithOwnerByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Dashboard",
            hasOwnerFraction=1.0,
            hasOwner=12.0,
            entityCount=12.0,
        ),
        PercentageOfEntitiesWithOwnerByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Database",
            hasOwnerFraction=0.0,
            hasOwner=0.0,
            entityCount=1.0,
        ),
        PercentageOfEntitiesWithOwnerByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="DatabaseSchema",
            hasOwnerFraction=1.0,
            hasOwner=1.0,
            entityCount=1.0,
        ),
        PercentageOfEntitiesWithOwnerByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="MlModel",
            hasOwnerFraction=0.0,
            hasOwner=0.0,
            entityCount=3.0,
        ),
        PercentageOfEntitiesWithOwnerByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Pipeline",
            hasOwnerFraction=0.0,
            hasOwner=0.0,
            entityCount=8.0,
        ),
        PercentageOfEntitiesWithOwnerByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Table",
            hasOwnerFraction=1.0,
            hasOwner=10.0,
            entityCount=18.0,
        ),
        PercentageOfEntitiesWithOwnerByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="TestSuite",
            hasOwnerFraction=0.0,
            hasOwner=0.0,
            entityCount=3.0,
        ),
        PercentageOfEntitiesWithOwnerByType(
            timestamp=Timestamp(__root__=1668038400000),
            entityType="Topic",
            hasOwnerFraction=0.0,
            hasOwner=0.0,
            entityCount=6.0,
        ),
    ]

    kpi_result = percentage_of_entities_with_owner_kpi_result(
        kpi_target, results, "completedOwner", 1668083253659
    )

    print(kpi_result)

    assert kpi_result.targetResult
    for result in kpi_result.targetResult:
        if result.name == "hasOwnerFraction":
            assert result.value == "0.4605263157894737"
            assert result.targetMet is True
        if result.name == "hasOwner":
            assert result.value == "35.0"
            assert result.targetMet is False
@@ -25,6 +25,10 @@
        "value": {
          "description": "value to be passed for the Parameters. These are input from Users. We capture this in string and convert during the runtime.",
          "type": "string"
        },
        "targetMet": {
          "description": "whether the target value was met or not.",
          "type": "boolean"
        }
      },
      "required": ["name", "value"],
@@ -39,6 +43,10 @@
      "description": "Date on which the result is updated",
      "$ref": "../../type/basic.json#/definitions/timestamp"
    },
    "kpiFqn": {
      "description": "KPI FQN",
      "$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
    },
    "targetResult": {
      "description": "Metric and their corresponding current results",
      "type": "array",
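An illustrative KpiResult payload shape implied by these schema additions (values mirror the integration test above; this dict is an example, not part of the schema file):

# Python-dict rendering of a KpiResult payload with the new kpiFqn and targetMet fields.
kpi_result_payload = {
    "timestamp": 1668083253659,
    "kpiFqn": "CompletedDescription",
    "targetResult": [
        {"name": "completedDescriptionFraction", "value": "0.56", "targetMet": False}
    ],
}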
Loading…
x
Reference in New Issue
Block a user