Cost analysis agg (#13408)

* feat: updated DI workflow to inherit from BaseWorkflow + split processor and producer classes

* feat: __init__.py files creation

* feat: updated workflow import classes in code and doc

* feat: moved kpi runner from runner to processor folder

* fix: skip failure on list entities

* feat: deleted unused files

* feat: updated status reporter

* feat: ran linting

* feat: fix test error with typing and fqn

* feat: updated test dependencies

* feat: ran linting

* feat: move execution order up

* feat: updated cost analysis report to align with new workflow

* feat: fix entity already exists for pipeline entity status

* feat: ran python linting

* feat: move skip_on_failure to method

* feat: added unusedReport to DI

* feat: added aggregated unused report

* feat: ran linting

* feat: reverted compose file changes

---------

Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
This commit is contained in:
Teddy 2023-10-03 09:27:18 +02:00 committed by GitHub
parent 744e5279bc
commit 9ef3ff7a58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 575 additions and 59 deletions

View File

@ -15,6 +15,30 @@
{"id": "c8a530a4-e232-4756-9f1b-f8638c6ef2d3", "data": {"team": "Organization", "userId": "b020c337-3534-43c6-afc8-e29a90026774", "userName": "Joe", "lastSession": 1692806224018, "totalPageView": 24, "totalSessions": 4, "totalSessionDuration": 1169}, "reportDataType": "WebAnalyticUserActivityReportData"},
{"id": "703a5f50-6990-4e10-8cfc-fed9faefa99d", "data": {"team": "Organization", "userId": "54d2fbb7-2942-4549-b046-8c09a32d4616", "userName": "Jane", "lastSession": 1692902339074, "totalPageView": 39, "totalSessions": 3, "totalSessionDuration": 2731}, "reportDataType": "WebAnalyticUserActivityReportData"},
{"id": "992bc53e-915a-4c7a-8630-c671e3b3960d", "data": {"team": "Organization", "userId": "b020c337-3534-43c6-afc8-e29a90026774", "userName": "John", "lastSession": 1692982368713, "totalPageView": 16, "totalSessions": 3, "totalSessionDuration": 2329}, "reportDataType": "WebAnalyticUserActivityReportData"}
],
"AggregatedCostAnalysisReportData":[
{"id": "b7f4f28c-df63-4060-9e18-89ed33d3508d","timestamp": 1696232067205,"reportDataType": "AggregatedCostAnalysisReportData","data": {"unusedDataAssets": {"threeDays": 14.0,"sevenDays": 11.0,"fourteenDays": 7.0,"thirtyDays": 3.0,"sixtyDays": 1.0},"frequentlyUsedDataAssets": {"threeDays": 1.0,"sevenDays": 4.0,"fourteenDays": 8.0,"thirtyDays": 12.0,"sixtyDays": 14.0},"totalSize": 16890.0,"serviceName": "sample_data","serviceType": "BigQuery","entityType": "Table","serviceOwner": null},"entityType": "aggregatedCostAnalysisReportData"},
{"id": "b7f4f28c-df63-4060-9e18-89ed33d3508d","timestamp": 1696232067205,"reportDataType": "AggregatedCostAnalysisReportData","data": {"unusedDataAssets": {"threeDays": 3.0,"sevenDays": 8.0,"fourteenDays": 11.0,"thirtyDays": 18.0,"sixtyDays": 10.0},"frequentlyUsedDataAssets": {"threeDays": 2.0,"sevenDays": 5.0,"fourteenDays": 9.0,"thirtyDays": 13.0,"sixtyDays": 15.0},"totalSize": 29548.0,"serviceName": "sample_data","serviceType": "Snowflake","entityType": "Table","serviceOwner": null},"entityType": "aggregatedCostAnalysisReportData"},
{"id": "b7f4f28c-df63-4060-9e18-89ed33d3508d","timestamp": 1696232067205,"reportDataType": "AggregatedCostAnalysisReportData","data": {"unusedDataAssets": {"threeDays": 1.0,"sevenDays": 4.0,"fourteenDays": 8.0,"thirtyDays": 12.0,"sixtyDays": 14.0},"frequentlyUsedDataAssets": {"threeDays": 5.0,"sevenDays": 8.0,"fourteenDays": 12.0,"thirtyDays": 16.0,"sixtyDays": 18.0},"totalSize": 45023.976,"serviceName": "sample_data","serviceType": "AzureSQL","entityType": "Table","serviceOwner": null},"entityType": "aggregatedCostAnalysisReportData"},
{"id": "b7f4f28c-df63-4060-9e18-89ed33d3508d","timestamp": 1696232067205,"reportDataType": "AggregatedCostAnalysisReportData","data": {"unusedDataAssets": {"threeDays": 14.0,"sevenDays": 11.0,"fourteenDays": 7.0,"thirtyDays": 3.0,"sixtyDays": 1.0},"frequentlyUsedDataAssets": {"threeDays": 10.0,"sevenDays": 13.0,"fourteenDays": 17.0,"thirtyDays": 21.0,"sixtyDays": 23.0},"totalSize": 32109.578,"serviceName": "sample_data","serviceType": "Redshift","entityType": "Table","serviceOwner": null},"entityType": "aggregatedCostAnalysisReportData"}
],
"RawCostAnalysisReportData": [
{"id":"e8feffaa-3277-461c-9261-b967d2aee2dd","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"34c63f1b-36f8-43ab-9420-aa1fc550653a","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.fact_session","description":"This fact table contains information about the visitors to your online store. This table has one row per session, where one session can contain many page views. If you use Urchin Traffic Module (UTM) parameters in marketing campaigns, then you can use this table to track how many customers they direct to your store.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/34c63f1b-36f8-43ab-9420-aa1fc550653a"},"lifeCycle":{"created":{"timestamp":1695022318000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1695195118000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1695454318000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":16890.0},"entityType":"rawCostAnalysisReportData"},
{"id":"455b3241-fc98-4d61-bf48-39df9054f89e","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"9a3b6464-3d33-4b0f-a804-930ee28c0402","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.dim_staff","description":"This dimension table contains information about the staff accounts in the store. It contains one row per staff account. Use this table to generate a list of your staff accounts, or join it with the sales, API clients and locations tables to analyze staff performance at Shopify POS locations.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/9a3b6464-3d33-4b0f-a804-930ee28c0402"},"lifeCycle":{"created":{"timestamp":1696059118000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1696059118000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1696059118000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":61580.93255764959},"entityType":"rawCostAnalysisReportData"},
{"id":"455b3241-fc98-4d61-bf48-39df9054f89e","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"9a3b6464-3d33-4b0f-a804-930ee28c0402","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.dim_staff","description":"This dimension table contains information about the staff accounts in the store. It contains one row per staff account. Use this table to generate a list of your staff accounts, or join it with the sales, API clients and locations tables to analyze staff performance at Shopify POS locations.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/9a3b6464-3d33-4b0f-a804-930ee28c0402"},"lifeCycle":{"created":{"timestamp":1696059118000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1696059118000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1696059118000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":47113.348918067684},"entityType":"rawCostAnalysisReportData"},
{"id":"708f9bae-87b9-4626-a945-b84a92428565","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"d55fe3c3-1268-4179-90e4-00dd1c808cc7","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.fact_sale","description":"The fact table captures the value of products sold or returned, as well as the values of other charges such as taxes and shipping costs. The sales table contains one row per order line item, one row per returned line item, and one row per shipping charge. Use this table when you need financial metrics.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/d55fe3c3-1268-4179-90e4-00dd1c808cc7"},"lifeCycle":{"created":{"timestamp":1695281518000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1695281518000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1695367918000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":42431.388637010125},"entityType":"rawCostAnalysisReportData"},
{"id":"d4750f97-6a71-4be9-81ba-7d7de0882bef","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"36a03fd4-2c4d-4c5a-9dff-417cbe19164b","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.fact_order","description":"The orders table contains information about each order in your store. Although this table is good for generating order lists and joining with the dim_customer, use the sales table instead for computing financial or other metrics.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/36a03fd4-2c4d-4c5a-9dff-417cbe19164b"},"lifeCycle":{"created":{"timestamp":1695022318000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1695022318000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1695108718000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":69096.51910918455},"entityType":"rawCostAnalysisReportData"},
{"id":"1fd31e1b-b055-4d3c-ab34-50dc6b1cd6c9","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"a02856c7-8cf6-4b88-8e51-45959d77a365","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.\"dim.product\"","description":"This dimension table contains information about each of the products in your store. This table contains one row per product. This table reflects the current state of products in your Shopify admin.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/a02856c7-8cf6-4b88-8e51-45959d77a365"},"lifeCycle":{"created":{"timestamp":1691911918000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1691911918000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1691998318000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":959364.5073707071},"entityType":"rawCostAnalysisReportData"},
{"id":"89268ae3-34b5-4338-8ce6-498d93cf7a2c","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"1c063243-f60e-4768-b31b-4b2ef0d5e690","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.\"dim.product.variant\"","description":"This dimension table contains current information about each of the product variants in your store. This table contains one row per product variant.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/1c063243-f60e-4768-b31b-4b2ef0d5e690"},"lifeCycle":{"created":{"timestamp":1695799918000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1695886318000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1695972718000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":20671.816891976534},"entityType":"rawCostAnalysisReportData"},
{"id":"89268ae3-34b5-4338-8ce6-498d93cf7b2c","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"1c063243-f60e-4768-b31b-4b2ef0d5e690","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.\"dim.product.variant\"","description":"This dimension table contains current information about each of the product variants in your store. This table contains one row per product variant.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/1c063243-f60e-4768-b31b-4b2ef0d5e690"},"lifeCycle":{"created":{"timestamp":1695799918000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1695886318000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1695972718000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":206411.22164610436},"entityType":"rawCostAnalysisReportData"},
{"id":"8593b4a8-ceae-4afd-be83-2674cec526e2","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"5f78ad7e-e8d4-444e-b6e9-27d4cb4be957","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.\"dim.shop\"","description":"This dimension table contains online shop information. This table contains one shop per row.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/5f78ad7e-e8d4-444e-b6e9-27d4cb4be957"},"lifeCycle":{"created":{"timestamp":1690183918000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1690961518000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1690875118000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":27261.282607083493},"entityType":"rawCostAnalysisReportData"},
{"id":"5edcf4f2-4fdf-4f03-aa8c-58b21f78eba8","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"53b5f66f-234e-4abf-ba3f-65804642b439","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.dim(shop)","description":"This dimension table contains online shop information with weird characters.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/53b5f66f-234e-4abf-ba3f-65804642b439"},"lifeCycle":{"created":{"timestamp":1694071918000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1695022318000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1694158318000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":494660.85729422217},"entityType":"rawCostAnalysisReportData"},
{"id":"e4fe174b-819a-4eed-8d37-a8c91a5bc8ee","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"e5e4e64c-2eba-4e19-8248-5b64f4d93dae","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.marketing","description":"Marketing data","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/e5e4e64c-2eba-4e19-8248-5b64f4d93dae"},"lifeCycle":{"created":{"timestamp":1695367918000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1695367918000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1695540718000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":667097.5608493362},"entityType":"rawCostAnalysisReportData"},
{"id":"c25de105-4d47-43e5-b6b1-3662effcddeb","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"ab52d858-31e2-45a0-b20c-c32c0e5070cd","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.dim_customer","description":"The dimension table contains data about your customers. The customers table contains one row per customer. It includes historical metrics (such as the total amount that each customer has spent in your store) as well as forward-looking metrics (such as the predicted number of days between future orders and the expected order value in the next 30 days). This table also includes columns that segment customers into various categories (such as new, returning, promising, at risk, dormant, and loyal), which you can use to target marketing activities.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/ab52d858-31e2-45a0-b20c-c32c0e5070cd"},"lifeCycle":{"created":{"timestamp":1695627117000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1695713517000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1695713517000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":292479.6493415711},"entityType":"rawCostAnalysisReportData"},
{"id":"6714afb1-712d-4284-99e8-d17c61aa8b92","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"76bff45d-1d71-42bb-b0eb-c8ccc335ce79","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.dim_address_clean","description":"Created from dim_address after a small cleanup.","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/76bff45d-1d71-42bb-b0eb-c8ccc335ce79"},"lifeCycle":{"created":{"timestamp":1691047918000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1691479918000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1691479918000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":null},"entityType":"rawCostAnalysisReportData"},
{"id":"ead10126-6f2f-4bac-9e1e-6909810ec40f","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"6e5ab5eb-b627-4af5-a2a6-4ee0465d95d9","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.raw_order","description":"This is a raw orders table as represented in our online DB. This table contains all the orders by the customers and can be used to buid our dim and fact tables","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/6e5ab5eb-b627-4af5-a2a6-4ee0465d95d9"},"lifeCycle":{"created":{"timestamp":1693899118000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1693985518000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1694071918000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":77437.25870415506},"entityType":"rawCostAnalysisReportData"},
{"id":"9d02102c-884d-45c2-99bd-86c1cc0632e2","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"b5a0e658-a15a-4327-b993-00688ebb7d8b","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.raw_product_catalog","description":"This is a raw product catalog table contains the product listing, price, seller etc.. represented in our online DB. ","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/b5a0e658-a15a-4327-b993-00688ebb7d8b"},"lifeCycle":{"created":{"timestamp":1693639918000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1693639918000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1693726318000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":126461.16459905276},"entityType":"rawCostAnalysisReportData"},
{"id":"b8f1929f-8c58-4e5b-b0b0-df3b5273b893","timestamp":1696232067205,"reportDataType":"RawCostAnalysisReportData","data":{"entity":{"id":"f8d7633c-51e2-4607-8c4e-4f6975cf84d6","type":"table","name":null,"fullyQualifiedName":"sample_data.ecommerce_db.shopify.sales","description":"Sales data","displayName":null,"deleted":null,"href":"http://openmetadata-server:8585/api/v1/tables/f8d7633c-51e2-4607-8c4e-4f6975cf84d6"},"lifeCycle":{"created":{"timestamp":1694503918000,"accessedBy":null,"accessedByAProcess":"Alice"},"updated":{"timestamp":1694590318000,"accessedBy":null,"accessedByAProcess":"Bob"},"accessed":{"timestamp":1694763118000,"accessedBy":null,"accessedByAProcess":"Charlie"}},"sizeInByte":12839.887130541394},"entityType":"rawCostAnalysisReportData"}
]
}
}

View File

@ -23,7 +23,7 @@ from typing import Any, Dict, Iterable, List, Optional, Union
from pydantic import ValidationError
from metadata.generated.schema.analytics.reportData import ReportData
from metadata.generated.schema.analytics.reportData import ReportData, ReportDataType
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
@ -1389,9 +1389,16 @@ class SampleDataSource(
"""Iterate over all the data insights and ingest them"""
data: Dict[str, List] = self.data_insight_data["reports"]
for _, report_data in data.items():
for report_type, report_data in data.items():
i = 0
for report_datum in report_data:
if report_type == ReportDataType.RawCostAnalysisReportData.value:
start_ts = int(
(datetime.utcnow() - timedelta(days=60)).timestamp() * 1000
)
end_ts = int(datetime.utcnow().timestamp() * 1000)
tmstp = random.randint(start_ts, end_ts)
report_datum["data"]["lifeCycle"]["accessed"]["timestamp"] = tmstp
record = OMetaDataInsightSample(
record=ReportData(
id=report_datum["id"],

View File

@ -3,6 +3,7 @@ package org.openmetadata.service.dataInsight;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
@ -16,6 +17,8 @@ public abstract class DataInsightAggregatorInterface {
protected static final String ENTITY_TIER = "entityTier";
protected final Aggregations aggregationsEs;
protected final org.opensearch.search.aggregations.Aggregations aggregationsOs;
protected final org.opensearch.search.SearchHits hitsOs;
protected final SearchHits hitsEs;
protected final DataInsightChartResult.DataInsightChartType dataInsightChartType;
protected DataInsightAggregatorInterface(
@ -23,6 +26,8 @@ public abstract class DataInsightAggregatorInterface {
this.aggregationsEs = aggregations;
this.aggregationsOs = null;
this.dataInsightChartType = dataInsightChartType;
this.hitsOs = null;
this.hitsEs = null;
}
protected DataInsightAggregatorInterface(
@ -31,6 +36,26 @@ public abstract class DataInsightAggregatorInterface {
this.aggregationsEs = null;
this.aggregationsOs = aggregations;
this.dataInsightChartType = dataInsightChartType;
this.hitsOs = null;
this.hitsEs = null;
}
protected DataInsightAggregatorInterface(
SearchHits hits, DataInsightChartResult.DataInsightChartType dataInsightChartType) {
this.aggregationsEs = null;
this.aggregationsOs = null;
this.dataInsightChartType = dataInsightChartType;
this.hitsOs = null;
this.hitsEs = hits;
}
protected DataInsightAggregatorInterface(
org.opensearch.search.SearchHits hits, DataInsightChartResult.DataInsightChartType dataInsightChartType) {
this.aggregationsEs = null;
this.aggregationsOs = null;
this.dataInsightChartType = dataInsightChartType;
this.hitsOs = hits;
this.hitsEs = null;
}
public abstract DataInsightChartResult process() throws ParseException;

View File

@ -5,6 +5,7 @@ import static org.openmetadata.service.Entity.DATA_INSIGHT_CHART;
import java.util.Arrays;
import java.util.List;
import org.openmetadata.schema.dataInsight.DataInsightChart;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.service.util.EntityUtil;
public class DataInsightChartRepository extends EntityRepository<DataInsightChart> {
@ -59,6 +60,9 @@ public class DataInsightChartRepository extends EntityRepository<DataInsightChar
"PageViewsByEntities",
"MostViewedEntities");
public static final List<String> SUPPORTS_NULL_DATE_RANGE =
Arrays.asList(DataInsightChartResult.DataInsightChartType.UNUSED_ASSETS.toString());
public DataInsightChartRepository(CollectionDAO dao) {
super(COLLECTION_PATH, DATA_INSIGHT_CHART, DataInsightChart.class, dao.dataInsightChartDAO(), dao, "", "");
}

View File

@ -406,6 +406,11 @@ public class DataInsightChartResource extends EntityResource<DataInsightChart, D
@NonNull
@QueryParam("dataInsightChartName")
DataInsightChartResult.DataInsightChartType dataInsightChartName,
@Parameter(description = "Query filter for the aggregation") @QueryParam("queryFilter") String queryFilter,
@Parameter(description = "Limit the number of results returned.") @DefaultValue("10") @QueryParam("size")
Integer size,
@Parameter(description = "Offset the results returned. (default = 0)") @DefaultValue("0") @QueryParam("from")
Integer from,
@Parameter(
description = "Specify the elasticsearch index to fetch data from",
schema = @Schema(implementation = DataReportIndex.class))
@ -428,18 +433,16 @@ public class DataInsightChartResource extends EntityResource<DataInsightChart, D
@QueryParam("organization")
String organization,
@Parameter(description = "Filter after the given start timestamp", schema = @Schema(type = "number"))
@NonNull
@QueryParam("startTs")
Long startTs,
@Parameter(description = "Filter before the given end timestamp", schema = @Schema(type = "number"))
@NonNull
@QueryParam("endTs")
Long endTs)
throws IOException, ParseException {
OperationContext operationContext = new OperationContext(Entity.DATA_INSIGHT_CHART, MetadataOperation.VIEW_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContext());
return searchRepository.listDataInsightChartResult(
startTs, endTs, tier, team, dataInsightChartName, dataReportIndex);
startTs, endTs, tier, team, dataInsightChartName, size, from, queryFilter, dataReportIndex);
}
private DataInsightChart getDataInsightChart(CreateDataInsightChart create, String user) {

View File

@ -318,6 +318,9 @@ public interface SearchRepository {
String tier,
String team,
DataInsightChartResult.DataInsightChartType dataInsightChartName,
Integer size,
Integer from,
String queryFilter,
String dataReportIndex)
throws IOException, ParseException;

View File

@ -96,7 +96,6 @@ import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
@ -342,6 +341,10 @@ public class ElasticSearchClientImpl implements SearchRepository {
case "domain_search_index":
searchSourceBuilder = buildDomainsSearch(request.getQuery(), request.getFrom(), request.getSize());
break;
case "raw_cost_analysis_report_data_index":
searchSourceBuilder =
buildRawCostAnalysisReportDataSearch(request.getQuery(), request.getFrom(), request.getSize());
break;
default:
searchSourceBuilder = buildAggregateSearchBuilder(request.getQuery(), request.getFrom(), request.getSize());
break;
@ -376,7 +379,8 @@ public class ElasticSearchClientImpl implements SearchRepository {
/* For backward-compatibility we continue supporting the deleted argument, this should be removed in future versions */
if (request.getIndex().equalsIgnoreCase("domain_search_index")
|| request.getIndex().equalsIgnoreCase("data_products_search_index")
|| request.getIndex().equalsIgnoreCase("query_search_index")) {
|| request.getIndex().equalsIgnoreCase("query_search_index")
|| request.getIndex().equalsIgnoreCase("raw_cost_analysis_report_data_index")) {
searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query()));
} else {
searchSourceBuilder.query(
@ -848,6 +852,11 @@ public class ElasticSearchClientImpl implements SearchRepository {
return searchBuilder(queryBuilder, hb, from, size);
}
private static SearchSourceBuilder buildRawCostAnalysisReportDataSearch(String query, int from, int size) {
QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(query);
return searchBuilder(queryBuilder, null, from, size);
}
private static SearchSourceBuilder buildSearchEntitySearch(String query, int from, int size) {
QueryStringQueryBuilder queryBuilder =
QueryBuilders.queryStringQuery(query)
@ -1252,7 +1261,7 @@ public class ElasticSearchClientImpl implements SearchRepository {
String indexName)
throws IOException, ParseException {
org.elasticsearch.action.search.SearchRequest searchRequestTotalAssets =
buildSearchRequest(scheduleTime, currentTime, null, team, chartType, indexName);
buildSearchRequest(scheduleTime, currentTime, null, team, chartType, null, null, null, indexName);
SearchResponse searchResponseTotalAssets = client.search(searchRequestTotalAssets, RequestOptions.DEFAULT);
DataInsightChartResult processedDataTotalAssets =
processDataInsightChartResult(searchResponseTotalAssets, chartType);
@ -1277,10 +1286,13 @@ public class ElasticSearchClientImpl implements SearchRepository {
String tier,
String team,
DataInsightChartResult.DataInsightChartType dataInsightChartName,
Integer size,
Integer from,
String queryFilter,
String dataReportIndex)
throws IOException, ParseException {
org.elasticsearch.action.search.SearchRequest searchRequest =
buildSearchRequest(startTs, endTs, tier, team, dataInsightChartName, dataReportIndex);
buildSearchRequest(startTs, endTs, tier, team, dataInsightChartName, size, from, queryFilter, dataReportIndex);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
return Response.status(OK).entity(processDataInsightChartResult(searchResponse, dataInsightChartName)).build();
}
@ -1330,35 +1342,38 @@ public class ElasticSearchClientImpl implements SearchRepository {
private static DataInsightChartResult processDataInsightChartResult(
SearchResponse searchResponse, DataInsightChartResult.DataInsightChartType dataInsightChartName)
throws ParseException {
DataInsightAggregatorInterface processor =
createDataAggregator(searchResponse.getAggregations(), dataInsightChartName);
DataInsightAggregatorInterface processor = createDataAggregator(searchResponse, dataInsightChartName);
return processor.process();
}
private static DataInsightAggregatorInterface createDataAggregator(
Aggregations aggregations, DataInsightChartResult.DataInsightChartType dataInsightChartType)
SearchResponse aggregations, DataInsightChartResult.DataInsightChartType dataInsightChartType)
throws IllegalArgumentException {
switch (dataInsightChartType) {
case PERCENTAGE_OF_ENTITIES_WITH_DESCRIPTION_BY_TYPE:
return new EsEntitiesDescriptionAggregator(aggregations, dataInsightChartType);
return new EsEntitiesDescriptionAggregator(aggregations.getAggregations(), dataInsightChartType);
case PERCENTAGE_OF_SERVICES_WITH_DESCRIPTION:
return new EsServicesDescriptionAggregator(aggregations, dataInsightChartType);
return new EsServicesDescriptionAggregator(aggregations.getAggregations(), dataInsightChartType);
case PERCENTAGE_OF_ENTITIES_WITH_OWNER_BY_TYPE:
return new EsEntitiesOwnerAggregator(aggregations, dataInsightChartType);
return new EsEntitiesOwnerAggregator(aggregations.getAggregations(), dataInsightChartType);
case PERCENTAGE_OF_SERVICES_WITH_OWNER:
return new EsServicesOwnerAggregator(aggregations, dataInsightChartType);
return new EsServicesOwnerAggregator(aggregations.getAggregations(), dataInsightChartType);
case TOTAL_ENTITIES_BY_TYPE:
return new EsTotalEntitiesAggregator(aggregations, dataInsightChartType);
return new EsTotalEntitiesAggregator(aggregations.getAggregations(), dataInsightChartType);
case TOTAL_ENTITIES_BY_TIER:
return new EsTotalEntitiesByTierAggregator(aggregations, dataInsightChartType);
return new EsTotalEntitiesByTierAggregator(aggregations.getAggregations(), dataInsightChartType);
case DAILY_ACTIVE_USERS:
return new EsDailyActiveUsersAggregator(aggregations, dataInsightChartType);
return new EsDailyActiveUsersAggregator(aggregations.getAggregations(), dataInsightChartType);
case PAGE_VIEWS_BY_ENTITIES:
return new EsPageViewsByEntitiesAggregator(aggregations, dataInsightChartType);
return new EsPageViewsByEntitiesAggregator(aggregations.getAggregations(), dataInsightChartType);
case MOST_ACTIVE_USERS:
return new EsMostActiveUsersAggregator(aggregations, dataInsightChartType);
return new EsMostActiveUsersAggregator(aggregations.getAggregations(), dataInsightChartType);
case MOST_VIEWED_ENTITIES:
return new EsMostViewedEntitiesAggregator(aggregations, dataInsightChartType);
return new EsMostViewedEntitiesAggregator(aggregations.getAggregations(), dataInsightChartType);
case UNUSED_ASSETS:
return new EsUnusedAssetsAggregator(aggregations.getHits(), dataInsightChartType);
case AGGREGATED_UNUSED_ASSETS:
return new EsAggregatedUnusedAssetsAggregator(aggregations.getAggregations(), dataInsightChartType);
default:
throw new IllegalArgumentException(
String.format("No processor found for chart Type %s ", dataInsightChartType));
@ -1371,12 +1386,25 @@ public class ElasticSearchClientImpl implements SearchRepository {
String tier,
String team,
DataInsightChartResult.DataInsightChartType dataInsightChartName,
Integer size,
Integer from,
String queryFilter,
String dataReportIndex) {
SearchSourceBuilder searchSourceBuilder =
buildQueryFilter(startTs, endTs, tier, team, dataInsightChartName.value());
AggregationBuilder aggregationBuilder = buildQueryAggregation(dataInsightChartName);
searchSourceBuilder.aggregation(aggregationBuilder);
searchSourceBuilder.timeout(new TimeValue(30, TimeUnit.SECONDS));
buildQueryFilter(startTs, endTs, tier, team, queryFilter, dataInsightChartName.value());
if (!dataInsightChartName
.toString()
.equalsIgnoreCase(DataInsightChartResult.DataInsightChartType.UNUSED_ASSETS.toString())) {
AggregationBuilder aggregationBuilder = buildQueryAggregation(dataInsightChartName);
searchSourceBuilder.aggregation(aggregationBuilder);
searchSourceBuilder.timeout(new TimeValue(30, TimeUnit.SECONDS));
} else {
// get raw doc for unused assets
searchSourceBuilder.fetchSource(true);
searchSourceBuilder.from(from);
searchSourceBuilder.size(size);
searchSourceBuilder.sort("data.lifeCycle.accessed.timestamp", SortOrder.DESC);
}
org.elasticsearch.action.search.SearchRequest searchRequest =
new org.elasticsearch.action.search.SearchRequest(dataReportIndex);
@ -1385,11 +1413,12 @@ public class ElasticSearchClientImpl implements SearchRepository {
}
private static SearchSourceBuilder buildQueryFilter(
Long startTs, Long endTs, String tier, String team, String dataInsightChartName) {
Long startTs, Long endTs, String tier, String team, String queryFilter, String dataInsightChartName) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder searchQueryFiler = new BoolQueryBuilder();
// Add team filter
if (team != null && DataInsightChartRepository.SUPPORTS_TEAM_FILTER.contains(dataInsightChartName)) {
List<String> teamArray = Arrays.asList(team.split("\\s*,\\s*"));
@ -1398,6 +1427,7 @@ public class ElasticSearchClientImpl implements SearchRepository {
searchQueryFiler.must(teamQueryFilter);
}
// Add tier filter
if (tier != null && DataInsightChartRepository.SUPPORTS_TIER_FILTER.contains(dataInsightChartName)) {
List<String> tierArray = Arrays.asList(tier.split("\\s*,\\s*"));
@ -1406,11 +1436,34 @@ public class ElasticSearchClientImpl implements SearchRepository {
searchQueryFiler.must(tierQueryFilter);
}
RangeQueryBuilder dateQueryFilter =
QueryBuilders.rangeQuery(DataInsightChartRepository.TIMESTAMP).gte(startTs).lte(endTs);
// Add date range filter
if (!DataInsightChartRepository.SUPPORTS_NULL_DATE_RANGE.contains(dataInsightChartName)) {
if (startTs == null || endTs == null) {
throw new IllegalArgumentException(
String.format("Start and End date are required for chart type %s ", dataInsightChartName));
}
RangeQueryBuilder dateQueryFilter =
QueryBuilders.rangeQuery(DataInsightChartRepository.TIMESTAMP).gte(startTs).lte(endTs);
searchQueryFiler.must(dateQueryFilter);
}
searchQueryFiler.must(dateQueryFilter);
return searchSourceBuilder.query(searchQueryFiler).fetchSource(false);
searchSourceBuilder.query(searchQueryFiler).fetchSource(false);
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
try {
XContentParser filterParser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter);
QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query();
BoolQueryBuilder newQuery = QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
searchSourceBuilder.query(newQuery);
} catch (Exception ex) {
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
}
}
return searchSourceBuilder;
}
private static AggregationBuilder buildQueryAggregation(
@ -1439,6 +1492,23 @@ public class ElasticSearchClientImpl implements SearchRepository {
termsAggregationBuilder
.subAggregation(sumAggregationBuilder)
.subAggregation(sumEntityCountAggregationBuilder));
case AGGREGATED_UNUSED_ASSETS:
SumAggregationBuilder threeDaysAgg =
AggregationBuilders.sum("threeDays").field("data.unusedDataAssets.threeDays");
SumAggregationBuilder sevenDaysAgg =
AggregationBuilders.sum("sevenDays").field("data.unusedDataAssets.sevenDays");
SumAggregationBuilder fourteenDaysAgg =
AggregationBuilders.sum("fourteenDays").field("data.unusedDataAssets.fourteenDays");
SumAggregationBuilder thirtyDaysAgg =
AggregationBuilders.sum("thirtyDays").field("data.unusedDataAssets.thirtyDays");
SumAggregationBuilder sixtyDaysAgg =
AggregationBuilders.sum("sixtyDays").field("data.unusedDataAssets.sixtyDays");
return dateHistogramAggregationBuilder
.subAggregation(threeDaysAgg)
.subAggregation(sevenDaysAgg)
.subAggregation(fourteenDaysAgg)
.subAggregation(thirtyDaysAgg)
.subAggregation(sixtyDaysAgg);
case PERCENTAGE_OF_SERVICES_WITH_DESCRIPTION:
termsAggregationBuilder =
AggregationBuilders.terms(DataInsightChartRepository.SERVICE_NAME)

View File

@ -0,0 +1,49 @@
package org.openmetadata.service.search.elasticsearch;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.type.AggregatedUnusedAssets;
import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface;
public class EsAggregatedUnusedAssetsAggregator extends DataInsightAggregatorInterface {
public EsAggregatedUnusedAssetsAggregator(
Aggregations aggregations, DataInsightChartResult.DataInsightChartType dataInsightChartType) {
super(aggregations, dataInsightChartType);
}
@Override
public DataInsightChartResult process() throws ParseException {
List<Object> data = this.aggregate();
return new DataInsightChartResult().withData(data).withChartType(this.dataInsightChartType);
}
@Override
public List<Object> aggregate() throws ParseException {
Histogram timestampBuckets = this.aggregationsEs.get(TIMESTAMP);
List<Object> data = new ArrayList<>();
for (Histogram.Bucket timestampBucket : timestampBuckets.getBuckets()) {
String dateTimeString = timestampBucket.getKeyAsString();
Long timestamp = this.convertDatTimeStringToTimestamp(dateTimeString);
Sum threeDays = timestampBucket.getAggregations().get("threeDays");
Sum sevenDays = timestampBucket.getAggregations().get("sevenDays");
Sum fourteenDays = timestampBucket.getAggregations().get("fourteenDays");
Sum thirtyDays = timestampBucket.getAggregations().get("thirtyDays");
Sum sixtyDays = timestampBucket.getAggregations().get("sixtyDays");
data.add(
new AggregatedUnusedAssets()
.withTimestamp(timestamp)
.withThreeDays(threeDays.getValue())
.withSevenDays(sevenDays.getValue())
.withFourteenDays(fourteenDays.getValue())
.withThirtyDays(thirtyDays.getValue())
.withSixtyDays(sixtyDays.getValue()));
}
return data;
}
}

View File

@ -0,0 +1,60 @@
package org.openmetadata.service.search.elasticsearch;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.type.UnusedAssets;
import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface;
// TODO: refactor this class and the interface in https://github.com/open-metadata/OpenMetadata/issues/13401
@Slf4j
public class EsUnusedAssetsAggregator extends DataInsightAggregatorInterface {
public EsUnusedAssetsAggregator(SearchHits hits, DataInsightChartResult.DataInsightChartType dataInsightChartType) {
super(hits, dataInsightChartType);
}
@Override
public DataInsightChartResult process() throws ParseException {
List<Object> data = this.aggregate();
Long hits = null;
TotalHits totalHits = this.hitsEs.getTotalHits();
if (totalHits != null) {
hits = totalHits.value;
}
return new DataInsightChartResult()
.withData(data)
.withChartType(this.dataInsightChartType)
.withTotal(hits != null ? hits.intValue() : null);
}
@Override
public List<Object> aggregate() throws ParseException {
List<Object> dataList = new ArrayList<>();
for (SearchHit hit : this.hitsEs) {
try {
HashMap<String, Object> data = (HashMap<String, Object>) hit.getSourceAsMap().get("data");
String fqn = ((HashMap<String, String>) data.get("entity")).get("fullyQualifiedName");
Long lastAccessed =
(Long)
((HashMap<String, Object>) ((HashMap<String, Object>) data.get("lifeCycle")).get("accessed"))
.get("timestamp");
Double sizeInByte = (Double) data.get("sizeInByte");
new UnusedAssets().withFullyQualifiedName(fqn).withLastAccessedAt(lastAccessed).withSizeInBytes(sizeInByte);
dataList.add(
new UnusedAssets()
.withFullyQualifiedName(fqn)
.withLastAccessedAt(lastAccessed)
.withSizeInBytes(sizeInByte));
} catch (Exception e) {
LOG.error("Error while parsing hits for UnusedData chart from ES", e);
}
}
return dataList;
}
}

View File

@ -135,7 +135,6 @@ import org.opensearch.script.ScriptType;
import org.opensearch.search.SearchModule;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.BucketOrder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
@ -337,6 +336,10 @@ public class OpenSearchClientImpl implements SearchRepository {
case "search_entity_index":
searchSourceBuilder = buildSearchEntitySearch(request.getQuery(), request.getFrom(), request.getSize());
break;
case "raw_cost_analysis_report_data_index":
searchSourceBuilder =
buildRawCostAnalysisReportDataSearch(request.getQuery(), request.getFrom(), request.getSize());
break;
default:
searchSourceBuilder = buildAggregateSearchBuilder(request.getQuery(), request.getFrom(), request.getSize());
break;
@ -371,7 +374,8 @@ public class OpenSearchClientImpl implements SearchRepository {
/* For backward-compatibility we continue supporting the deleted argument, this should be removed in future versions */
if (request.getIndex().equalsIgnoreCase("domain_search_index")
|| request.getIndex().equalsIgnoreCase("data_products_search_index")
|| request.getIndex().equalsIgnoreCase("query_search_index")) {
|| request.getIndex().equalsIgnoreCase("query_search_index")
|| request.getIndex().equalsIgnoreCase("raw_cost_analysis_report_data_index")) {
searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query()));
} else {
searchSourceBuilder.query(
@ -840,6 +844,11 @@ public class OpenSearchClientImpl implements SearchRepository {
return addAggregation(searchSourceBuilder);
}
private static SearchSourceBuilder buildRawCostAnalysisReportDataSearch(String query, int from, int size) {
QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(query);
return searchBuilder(queryBuilder, null, from, size);
}
private static SearchSourceBuilder buildDomainsSearch(String query, int from, int size) {
QueryStringQueryBuilder queryBuilder =
QueryBuilders.queryStringQuery(query)
@ -1283,7 +1292,7 @@ public class OpenSearchClientImpl implements SearchRepository {
String indexName)
throws IOException, ParseException {
org.opensearch.action.search.SearchRequest searchRequestTotalAssets =
buildSearchRequest(scheduleTime, currentTime, null, team, chartType, indexName);
buildSearchRequest(scheduleTime, currentTime, null, team, chartType, null, null, null, indexName);
SearchResponse searchResponseTotalAssets = client.search(searchRequestTotalAssets, RequestOptions.DEFAULT);
DataInsightChartResult processedDataTotalAssets =
processDataInsightChartResult(searchResponseTotalAssets, chartType);
@ -1308,10 +1317,13 @@ public class OpenSearchClientImpl implements SearchRepository {
String tier,
String team,
DataInsightChartResult.DataInsightChartType dataInsightChartName,
Integer size,
Integer from,
String queryFilter,
String dataReportIndex)
throws IOException, ParseException {
org.opensearch.action.search.SearchRequest searchRequest =
buildSearchRequest(startTs, endTs, tier, team, dataInsightChartName, dataReportIndex);
buildSearchRequest(startTs, endTs, tier, team, dataInsightChartName, size, from, queryFilter, dataReportIndex);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
return Response.status(OK).entity(processDataInsightChartResult(searchResponse, dataInsightChartName)).build();
}
@ -1324,35 +1336,38 @@ public class OpenSearchClientImpl implements SearchRepository {
private static DataInsightChartResult processDataInsightChartResult(
SearchResponse searchResponse, DataInsightChartResult.DataInsightChartType dataInsightChartName)
throws ParseException {
DataInsightAggregatorInterface processor =
createDataAggregator(searchResponse.getAggregations(), dataInsightChartName);
DataInsightAggregatorInterface processor = createDataAggregator(searchResponse, dataInsightChartName);
return processor.process();
}
private static DataInsightAggregatorInterface createDataAggregator(
Aggregations aggregations, DataInsightChartResult.DataInsightChartType dataInsightChartType)
SearchResponse aggregations, DataInsightChartResult.DataInsightChartType dataInsightChartType)
throws IllegalArgumentException {
switch (dataInsightChartType) {
case PERCENTAGE_OF_ENTITIES_WITH_DESCRIPTION_BY_TYPE:
return new OsEntitiesDescriptionAggregator(aggregations, dataInsightChartType);
return new OsEntitiesDescriptionAggregator(aggregations.getAggregations(), dataInsightChartType);
case PERCENTAGE_OF_SERVICES_WITH_DESCRIPTION:
return new OsServicesDescriptionAggregator(aggregations, dataInsightChartType);
return new OsServicesDescriptionAggregator(aggregations.getAggregations(), dataInsightChartType);
case PERCENTAGE_OF_ENTITIES_WITH_OWNER_BY_TYPE:
return new OsEntitiesOwnerAggregator(aggregations, dataInsightChartType);
return new OsEntitiesOwnerAggregator(aggregations.getAggregations(), dataInsightChartType);
case PERCENTAGE_OF_SERVICES_WITH_OWNER:
return new OsServicesOwnerAggregator(aggregations, dataInsightChartType);
return new OsServicesOwnerAggregator(aggregations.getAggregations(), dataInsightChartType);
case TOTAL_ENTITIES_BY_TYPE:
return new OsTotalEntitiesAggregator(aggregations, dataInsightChartType);
return new OsTotalEntitiesAggregator(aggregations.getAggregations(), dataInsightChartType);
case TOTAL_ENTITIES_BY_TIER:
return new OsTotalEntitiesByTierAggregator(aggregations, dataInsightChartType);
return new OsTotalEntitiesByTierAggregator(aggregations.getAggregations(), dataInsightChartType);
case DAILY_ACTIVE_USERS:
return new OsDailyActiveUsersAggregator(aggregations, dataInsightChartType);
return new OsDailyActiveUsersAggregator(aggregations.getAggregations(), dataInsightChartType);
case PAGE_VIEWS_BY_ENTITIES:
return new OsPageViewsByEntitiesAggregator(aggregations, dataInsightChartType);
return new OsPageViewsByEntitiesAggregator(aggregations.getAggregations(), dataInsightChartType);
case MOST_ACTIVE_USERS:
return new OsMostActiveUsersAggregator(aggregations, dataInsightChartType);
return new OsMostActiveUsersAggregator(aggregations.getAggregations(), dataInsightChartType);
case MOST_VIEWED_ENTITIES:
return new OsMostViewedEntitiesAggregator(aggregations, dataInsightChartType);
return new OsMostViewedEntitiesAggregator(aggregations.getAggregations(), dataInsightChartType);
case UNUSED_ASSETS:
return new OsUnusedAssetsAggregator(aggregations.getHits(), dataInsightChartType);
case AGGREGATED_UNUSED_ASSETS:
return new OsAggregatedUnusedAssetsAggregator(aggregations.getAggregations(), dataInsightChartType);
default:
throw new IllegalArgumentException(
String.format("No processor found for chart Type %s ", dataInsightChartType));
@ -1365,12 +1380,24 @@ public class OpenSearchClientImpl implements SearchRepository {
String tier,
String team,
DataInsightChartResult.DataInsightChartType dataInsightChartName,
Integer size,
Integer from,
String queryFilter,
String dataReportIndex) {
SearchSourceBuilder searchSourceBuilder =
buildQueryFilter(startTs, endTs, tier, team, dataInsightChartName.value());
AggregationBuilder aggregationBuilder = buildQueryAggregation(dataInsightChartName);
searchSourceBuilder.aggregation(aggregationBuilder);
searchSourceBuilder.timeout(new TimeValue(30, TimeUnit.SECONDS));
buildQueryFilter(startTs, endTs, tier, team, queryFilter, dataInsightChartName.value());
if (!dataInsightChartName
.toString()
.equalsIgnoreCase(DataInsightChartResult.DataInsightChartType.UNUSED_ASSETS.toString())) {
AggregationBuilder aggregationBuilder = buildQueryAggregation(dataInsightChartName);
searchSourceBuilder.aggregation(aggregationBuilder);
searchSourceBuilder.timeout(new TimeValue(30, TimeUnit.SECONDS));
} else {
searchSourceBuilder.fetchSource(true);
searchSourceBuilder.from(from);
searchSourceBuilder.size(size);
searchSourceBuilder.sort("data.lifeCycle.accessed.timestamp", SortOrder.DESC);
}
org.opensearch.action.search.SearchRequest searchRequest =
new org.opensearch.action.search.SearchRequest(dataReportIndex);
@ -1379,7 +1406,7 @@ public class OpenSearchClientImpl implements SearchRepository {
}
private static SearchSourceBuilder buildQueryFilter(
Long startTs, Long endTs, String tier, String team, String dataInsightChartName) {
Long startTs, Long endTs, String tier, String team, String queryFilter, String dataInsightChartName) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder searchQueryFiler = new BoolQueryBuilder();
@ -1400,11 +1427,33 @@ public class OpenSearchClientImpl implements SearchRepository {
searchQueryFiler.must(tierQueryFilter);
}
RangeQueryBuilder dateQueryFilter =
QueryBuilders.rangeQuery(DataInsightChartRepository.TIMESTAMP).gte(startTs).lte(endTs);
if (!DataInsightChartRepository.SUPPORTS_NULL_DATE_RANGE.contains(dataInsightChartName)) {
if (startTs == null || endTs == null) {
throw new IllegalArgumentException(
String.format("Start and End date are required for chart type %s ", dataInsightChartName));
}
RangeQueryBuilder dateQueryFilter =
QueryBuilders.rangeQuery(DataInsightChartRepository.TIMESTAMP).gte(startTs).lte(endTs);
searchQueryFiler.must(dateQueryFilter);
}
searchQueryFiler.must(dateQueryFilter);
return searchSourceBuilder.query(searchQueryFiler).fetchSource(false);
searchSourceBuilder.query(searchQueryFiler).fetchSource(false);
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
try {
XContentParser filterParser =
XContentType.JSON
.xContent()
.createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, queryFilter);
QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query();
BoolQueryBuilder newQuery = QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
searchSourceBuilder.query(newQuery);
} catch (Exception ex) {
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
}
}
return searchSourceBuilder;
}
private static AggregationBuilder buildQueryAggregation(
@ -1433,6 +1482,23 @@ public class OpenSearchClientImpl implements SearchRepository {
termsAggregationBuilder
.subAggregation(sumAggregationBuilder)
.subAggregation(sumEntityCountAggregationBuilder));
case AGGREGATED_UNUSED_ASSETS:
SumAggregationBuilder threeDaysAgg =
AggregationBuilders.sum("threeDays").field("data.unusedDataAssets.threeDays");
SumAggregationBuilder sevenDaysAgg =
AggregationBuilders.sum("sevenDays").field("data.unusedDataAssets.sevenDays");
SumAggregationBuilder fourteenDaysAgg =
AggregationBuilders.sum("fourteenDays").field("data.unusedDataAssets.fourteenDays");
SumAggregationBuilder thirtyDaysAgg =
AggregationBuilders.sum("thirtyDays").field("data.unusedDataAssets.thirtyDays");
SumAggregationBuilder sixtyDaysAgg =
AggregationBuilders.sum("sixtyDays").field("data.unusedDataAssets.sixtyDays");
return dateHistogramAggregationBuilder
.subAggregation(threeDaysAgg)
.subAggregation(sevenDaysAgg)
.subAggregation(fourteenDaysAgg)
.subAggregation(thirtyDaysAgg)
.subAggregation(sixtyDaysAgg);
case PERCENTAGE_OF_SERVICES_WITH_DESCRIPTION:
termsAggregationBuilder =
AggregationBuilders.terms(DataInsightChartRepository.SERVICE_NAME)

View File

@ -0,0 +1,49 @@
package org.openmetadata.service.search.opensearch;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.type.AggregatedUnusedAssets;
import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface;
import org.opensearch.search.aggregations.Aggregations;
public class OsAggregatedUnusedAssetsAggregator extends DataInsightAggregatorInterface {
public OsAggregatedUnusedAssetsAggregator(
Aggregations aggregations, DataInsightChartResult.DataInsightChartType dataInsightChartType) {
super(aggregations, dataInsightChartType);
}
@Override
public DataInsightChartResult process() throws ParseException {
List<Object> data = this.aggregate();
return new DataInsightChartResult().withData(data).withChartType(this.dataInsightChartType);
}
@Override
public List<Object> aggregate() throws ParseException {
Histogram timestampBuckets = this.aggregationsEs.get(TIMESTAMP);
List<Object> data = new ArrayList<>();
for (Histogram.Bucket timestampBucket : timestampBuckets.getBuckets()) {
String dateTimeString = timestampBucket.getKeyAsString();
Long timestamp = this.convertDatTimeStringToTimestamp(dateTimeString);
Sum threeDays = timestampBucket.getAggregations().get("threeDays");
Sum sevenDays = timestampBucket.getAggregations().get("sevenDays");
Sum fourteenDays = timestampBucket.getAggregations().get("fourteenDays");
Sum thirtyDays = timestampBucket.getAggregations().get("thirtyDays");
Sum sixtyDays = timestampBucket.getAggregations().get("sixtyDays");
data.add(
new AggregatedUnusedAssets()
.withTimestamp(timestamp)
.withThreeDays(threeDays.getValue())
.withSevenDays(sevenDays.getValue())
.withFourteenDays(fourteenDays.getValue())
.withThirtyDays(thirtyDays.getValue())
.withSixtyDays(sixtyDays.getValue()));
}
return data;
}
}

View File

@ -0,0 +1,59 @@
package org.openmetadata.service.search.opensearch;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.lucene.search.TotalHits;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.dataInsight.type.UnusedAssets;
import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
@Slf4j
public class OsUnusedAssetsAggregator extends DataInsightAggregatorInterface {
public OsUnusedAssetsAggregator(SearchHits hits, DataInsightChartResult.DataInsightChartType dataInsightChartType) {
super(hits, dataInsightChartType);
}
@Override
public DataInsightChartResult process() throws ParseException {
List<Object> data = this.aggregate();
Long hits = null;
TotalHits totalHits = this.hitsEs.getTotalHits();
if (totalHits != null) {
hits = totalHits.value;
}
return new DataInsightChartResult()
.withData(data)
.withChartType(this.dataInsightChartType)
.withTotal(hits != null ? hits.intValue() : null);
}
@Override
public List<Object> aggregate() throws ParseException {
List<Object> dataList = new ArrayList<>();
for (SearchHit hit : this.hitsOs) {
try {
HashMap<String, Object> data = (HashMap<String, Object>) hit.getSourceAsMap().get("data");
String fqn = ((HashMap<String, String>) data.get("entity")).get("fullyQualifiedName");
Long lastAccessed =
(Long)
((HashMap<String, Object>) ((HashMap<String, Object>) data.get("lifeCycle")).get("accessed"))
.get("timestamp");
Double sizeInByte = (Double) data.get("sizeInByte");
new UnusedAssets().withFullyQualifiedName(fqn).withLastAccessedAt(lastAccessed).withSizeInBytes(sizeInByte);
dataList.add(
new UnusedAssets()
.withFullyQualifiedName(fqn)
.withLastAccessedAt(lastAccessed)
.withSizeInBytes(sizeInByte));
} catch (Exception e) {
LOG.error("Error while parsing hits for UnusedData chart from ES", e);
}
}
return dataList;
}
}

View File

@ -0,0 +1,17 @@
{
"name": "AggregatedUnusedAssets",
"fullyQualifiedName": "AggregatedUnusedAssets",
"displayName": "Aggregated Unused Assets",
"description": "Displays the list of unused assets over time",
"dataIndexType": "aggregated_cost_analysis_report_data_index",
"dimensions": [
{"name": "timestamp","chartDataType":"INT"}
],
"metrics": [
{"name": "threeDays", "displayName": "3 Days", "chartDataType": "INT"},
{"name": "sevenDays", "displayName": "7 Days", "chartDataType": "INT"},
{"name": "fourteenDays", "displayName": "14 Days", "chartDataType": "INT"},
{"name": "thirtyDays", "displayName": "30 Days", "chartDataType": "INT"},
{"name": "sixtyDays", "displayName": "60 Days", "chartDataType": "INT"}
]
}

View File

@ -0,0 +1,14 @@
{
"name": "UnusedAssets",
"fullyQualifiedName": "UnusedAssets",
"displayName": "Unused Assets",
"description": "Displays the list of unused assets filtered by last accessed",
"dataIndexType": "raw_cost_analysis_report_data_index",
"dimensions": [
{"name": "fullyQualifiedName","chartDataType":"STRING"}
],
"metrics": [
{"name": "sizeInBytes", "displayName": "Size in Bytes", "chartDataType": "FLOAT"},
{"name": "lastAccessedAt", "displayName": "Last Accessed At", "chartDataType": "INT"}
]
}

View File

@ -19,7 +19,9 @@
"MostViewedEntities",
"PageViewsByEntities",
"PercentageOfServicesWithDescription",
"PercentageOfServicesWithOwner"
"PercentageOfServicesWithOwner",
"UnusedAssets",
"AggregatedUnusedAssets"
]
}
},
@ -28,6 +30,10 @@
"description": "Chart Type that will consume the data. Must match name of dataInsightChart.",
"$ref": "#/definitions/dataInsightChartType"
},
"total": {
"description": "Total number of hits returned by the aggregation.",
"type": "integer"
},
"data": {
"description": "Array of consumable data.",
"type": "array",
@ -42,7 +48,9 @@
{"$ref": "type/mostActiveUsers.json"},
{"$ref": "type/mostViewedEntities.json"},
{"$ref": "type/percentageOfServicesWithDescription.json"},
{"$ref": "type/percentageOfServicesWithOwner.json"}
{"$ref": "type/percentageOfServicesWithOwner.json"},
{"$ref": "type/unusedAssets.json"},
{"$ref": "type/aggregatedUnusedAsserts.json"}
]
}
}

View File

@ -0,0 +1,35 @@
{
"$id": "https://open-metadata.org/schema/dataInsight/type/aggregatedUnusedAssets.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "AggregatedUnusedAssets",
"description": "AggregatedUnusedAssets data blob",
"type": "object",
"javaType": "org.openmetadata.schema.dataInsight.type.AggregatedUnusedAssets",
"properties": {
"timestamp": {
"description": "timestamp",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"threeDays": {
"description": "Number of unused assets in the last 3 days",
"type": "number"
},
"sevenDays": {
"description": "Number of unused assets in the last 7 days",
"type": "number"
},
"fourteenDays": {
"description": "Number of unused assets in the last 14 days",
"type": "number"
},
"thirtyDays": {
"description": "Number of unused assets in the last 30 days",
"type": "number"
},
"sixtyDays": {
"description": "Number of unused assets in the last 60 days",
"type": "number"
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,23 @@
{
"$id": "https://open-metadata.org/schema/dataInsight/type/unusedAssets.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "UnusedAssets",
"description": "UnusedAssets data blob",
"type": "object",
"javaType": "org.openmetadata.schema.dataInsight.type.UnusedAssets",
"properties": {
"fullyQualifiedName": {
"description": "Fully qualified name of the asset",
"$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
},
"sizeInBytes": {
"description": "Size of the asset in bytes",
"type": "number"
},
"lastAccessedAt": {
"description": "timestamp",
"$ref": "../../type/basic.json#/definitions/timestamp"
}
},
"additionalProperties": false
}