From dcd0bbb5667e29be96987d00b801ccff2214eb6f Mon Sep 17 00:00:00 2001 From: Teddy Date: Tue, 8 Nov 2022 17:25:31 +0100 Subject: [PATCH] Fixes 8470 -- Implements ES aggregation for web analytic data (#8566) * Moved webanalytics type in its own folder * Added data insight chart api endpoint * Java formatting * Added resource descriptor * Added metadata entity endpoint * Added aggregation endpoint for dataInsight * Fix tag name * Added logic to ingestion pipeline resource to add ES config info if pipeline type is dataInsight * added domo to test subpackage * cleaned up branch by removing commit from issue-8353 that were not merged in main * Added web analytics data refinement * Added get_status function * Added from __future__ for typing * Added dailyActiveUsers aggregation * Added page views entities aggregation and active users aggregation --- .../helper/data_insight_es_index.py | 2 +- .../web_analytic_report_data_processor.py | 4 +- .../metadata/ingestion/sink/elasticsearch.py | 2 +- ...c_entity_view_report_data_index_mapping.py | 2 +- ...user_activity_report_data_index_mapping.py | 2 +- .../DailyActiveUsersAggregator.java | 39 ++++++++ .../DataInsightAggregatorFactory.java | 8 ++ .../MostActiveUsersAggregator.java | 57 ++++++++++++ .../MostViewedEntitiesAggregator.java | 44 +++++++++ .../PageViewsByEntitiesAggregator.java | 47 ++++++++++ .../jdbi3/DataInsightChartRepository.java | 91 +++++++++++++++++-- .../dataInsight/DataInsightChartResource.java | 6 +- .../data/dataInsight/dailyActiveUsers.json | 12 +++ .../data/dataInsight/mostActiveUsers.json | 16 ++++ .../data/dataInsight/mostViewedEntities.json | 13 +++ .../data/dataInsight/pageViewsByEntities.json | 13 +++ .../schema/dataInsight/dataInsightChart.json | 4 +- .../dataInsight/dataInsightChartResult.json | 14 ++- .../dataInsight/type/dailyActiveUsers.json | 19 ++++ .../dataInsight/type/mostActiveUsers.json | 39 ++++++++ .../dataInsight/type/mostViewedEntities.json | 23 +++++ 
.../dataInsight/type/pageViewsByEntities.json | 23 +++++ 22 files changed, 461 insertions(+), 19 deletions(-) create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/DailyActiveUsersAggregator.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/MostActiveUsersAggregator.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/MostViewedEntitiesAggregator.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/PageViewsByEntitiesAggregator.java create mode 100644 openmetadata-service/src/main/resources/json/data/dataInsight/dailyActiveUsers.json create mode 100644 openmetadata-service/src/main/resources/json/data/dataInsight/mostActiveUsers.json create mode 100644 openmetadata-service/src/main/resources/json/data/dataInsight/mostViewedEntities.json create mode 100644 openmetadata-service/src/main/resources/json/data/dataInsight/pageViewsByEntities.json create mode 100644 openmetadata-spec/src/main/resources/json/schema/dataInsight/type/dailyActiveUsers.json create mode 100644 openmetadata-spec/src/main/resources/json/schema/dataInsight/type/mostActiveUsers.json create mode 100644 openmetadata-spec/src/main/resources/json/schema/dataInsight/type/mostViewedEntities.json create mode 100644 openmetadata-spec/src/main/resources/json/schema/dataInsight/type/pageViewsByEntities.json diff --git a/ingestion/src/metadata/data_insight/helper/data_insight_es_index.py b/ingestion/src/metadata/data_insight/helper/data_insight_es_index.py index bbde00130ae..c1ead4d4622 100644 --- a/ingestion/src/metadata/data_insight/helper/data_insight_es_index.py +++ b/ingestion/src/metadata/data_insight/helper/data_insight_es_index.py @@ -20,4 +20,4 @@ class DataInsightEsIndex(enum.Enum): EntityReportData = "entity_report_data_index" WebAnalyticUserActivityReportData = "web_analytic_user_activity_report_data_index" - 
WebAnalyticEntityViewReportData = "web_analytic_entity_view_report_data" + WebAnalyticEntityViewReportData = "web_analytic_entity_view_report_data_index" diff --git a/ingestion/src/metadata/data_insight/processor/web_analytic_report_data_processor.py b/ingestion/src/metadata/data_insight/processor/web_analytic_report_data_processor.py index 76b45519441..0e73b49a748 100644 --- a/ingestion/src/metadata/data_insight/processor/web_analytic_report_data_processor.py +++ b/ingestion/src/metadata/data_insight/processor/web_analytic_report_data_processor.py @@ -269,9 +269,9 @@ class WebAnalyticUserActivityReportDataProcessor(DataProcessor): if not refined_data.get(user_id): refined_data[user_id] = { - "userName": user_details.get("user_name"), + "userName": user_details[user_id].get("user_name"), "userId": user_id, - "team": user_details.get("team"), + "team": user_details[user_id].get("team"), "sessions": { session_id: [timestamp], }, diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index f0a4ff410bd..136195e73be 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -148,7 +148,7 @@ class ElasticSearchConfig(ConfigModel): "web_analytic_user_activity_report_data_index" ) web_analytic_entity_view_report_data_name: str = ( - "web_analytic_entity_view_report_data" + "web_analytic_entity_view_report_data_index" ) scheme: str = "http" use_ssl: bool = False diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/web_analytic_entity_view_report_data_index_mapping.py b/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/web_analytic_entity_view_report_data_index_mapping.py index cba851a4316..b65268dc386 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/web_analytic_entity_view_report_data_index_mapping.py +++ 
b/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/web_analytic_entity_view_report_data_index_mapping.py @@ -36,7 +36,7 @@ WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA_INDEX_MAPPING = textwrap.dedent( "type": "keyword" }, "owner": { - "type": "text" + "type": "keyword" }, "ownerId": { "type": "text" diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/web_analytic_user_activity_report_data_index_mapping.py b/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/web_analytic_user_activity_report_data_index_mapping.py index 69d6bed8402..2e287010181 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/web_analytic_user_activity_report_data_index_mapping.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/web_analytic_user_activity_report_data_index_mapping.py @@ -27,7 +27,7 @@ WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA_INDEX_MAPPING = textwrap.dedent( "data": { "properties": { "userName": { - "type": "text" + "type": "keyword" }, "userId": { "type": "text" diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/DailyActiveUsersAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/DailyActiveUsersAggregator.java new file mode 100644 index 00000000000..9ff5a72b64a --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/DailyActiveUsersAggregator.java @@ -0,0 +1,39 @@ +package org.openmetadata.service.dataInsight; + +import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.openmetadata.schema.dataInsight.DataInsightChartResult; +import org.openmetadata.schema.datatInsight.type.DailyActiveUsers; + +public class DailyActiveUsersAggregator extends DataInsightAggregatorInterface { + + public DailyActiveUsersAggregator( + Aggregations 
aggregations, DataInsightChartResult.DataInsightChartType dataInsightChartType) { + super(aggregations, dataInsightChartType); + } + + @Override + public DataInsightChartResult process() throws ParseException { + List data = this.aggregate(); + DataInsightChartResult dataInsightChartResult = new DataInsightChartResult(); + return dataInsightChartResult.withData(data).withChartType(this.dataInsightChartType); + } + + @Override + List aggregate() throws ParseException { + Histogram timestampBuckets = this.aggregations.get(TIMESTAMP); + List data = new ArrayList(); + for (Histogram.Bucket timestampBucket : timestampBuckets.getBuckets()) { + String dateTimeString = timestampBucket.getKeyAsString(); + Long timestamp = this.convertDatTimeStringToTimestamp(dateTimeString); + Long activeUsers = timestampBucket.getDocCount(); + + data.add(new DailyActiveUsers().withTimestamp(timestamp).withActiveUsers(activeUsers.intValue())); + } + + return data; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/DataInsightAggregatorFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/DataInsightAggregatorFactory.java index 8b3a896bfe9..393b0f511ae 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/DataInsightAggregatorFactory.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/DataInsightAggregatorFactory.java @@ -19,6 +19,14 @@ public class DataInsightAggregatorFactory { return new TotalEntitiesAggregator(aggregations, dataInsightChartType); case TOTAL_ENTITIES_BY_TIER: return new TotalEntitiesByTierAggregator(aggregations, dataInsightChartType); + case DAILY_ACTIVE_USERS: + return new DailyActiveUsersAggregator(aggregations, dataInsightChartType); + case PAGE_VIEWS_BY_ENTITIES: + return new PageViewsByEntitiesAggregator(aggregations, dataInsightChartType); + case MOST_ACTIVE_USERS: + return new MostActiveUsersAggregator(aggregations, 
dataInsightChartType); + case MOST_VIEWED_ENTITIES: + return new MostViewedEntitiesAggregator(aggregations, dataInsightChartType); default: throw new IllegalArgumentException( String.format("No processor found for chart Type %s ", dataInsightChartType)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/MostActiveUsersAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/MostActiveUsersAggregator.java new file mode 100644 index 00000000000..2bedbb726a5 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/MostActiveUsersAggregator.java @@ -0,0 +1,57 @@ +package org.openmetadata.service.dataInsight; + +import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.metrics.Max; +import org.elasticsearch.search.aggregations.metrics.Sum; +import org.openmetadata.schema.dataInsight.DataInsightChartResult; +import org.openmetadata.schema.datatInsight.type.MostActiveUsers; + +public class MostActiveUsersAggregator extends DataInsightAggregatorInterface { + + public MostActiveUsersAggregator( + Aggregations aggregations, DataInsightChartResult.DataInsightChartType dataInsightChartType) { + super(aggregations, dataInsightChartType); + } + + @Override + public DataInsightChartResult process() throws ParseException { + List data = this.aggregate(); + DataInsightChartResult dataInsightChartResult = new DataInsightChartResult(); + return dataInsightChartResult.withData(data).withChartType(this.dataInsightChartType); + } + + @Override + List aggregate() throws ParseException { + MultiBucketsAggregation userNameBuckets = this.aggregations.get("userName"); + List data = new ArrayList(); + for (MultiBucketsAggregation.Bucket userNameBucket : userNameBuckets.getBuckets()) 
{ + String userName = userNameBucket.getKeyAsString(); + Sum sumSession = userNameBucket.getAggregations().get("sessions"); + Sum sumPageViews = userNameBucket.getAggregations().get("pageViews"); + Sum sumSessionDuration = userNameBucket.getAggregations().get("sessionDuration"); + Max lastSession = userNameBucket.getAggregations().get("lastSession"); + MultiBucketsAggregation teamBucket = userNameBucket.getAggregations().get("team"); + + String team = null; + if (!teamBucket.getBuckets().isEmpty()) { + team = teamBucket.getBuckets().get(0).getKeyAsString(); + } + + data.add( + new MostActiveUsers() + .withUserName(userName) + .withLastSession((long) lastSession.getValue()) + .withPageViews(sumPageViews.getValue()) + .withSessionDuration(sumSessionDuration.getValue()) + .withSessions(sumSession.getValue()) + .withTeam(team) + .withAvgSessionDuration(sumSessionDuration.getValue() / sumSession.getValue())); + } + + return data; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/MostViewedEntitiesAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/MostViewedEntitiesAggregator.java new file mode 100644 index 00000000000..1a5b29719cd --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/MostViewedEntitiesAggregator.java @@ -0,0 +1,44 @@ +package org.openmetadata.service.dataInsight; + +import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.metrics.Sum; +import org.openmetadata.schema.dataInsight.DataInsightChartResult; +import org.openmetadata.schema.datatInsight.type.MostViewedEntities; + +public class MostViewedEntitiesAggregator extends DataInsightAggregatorInterface { + + public MostViewedEntitiesAggregator( + Aggregations aggregations, 
DataInsightChartResult.DataInsightChartType dataInsightChartType) { + super(aggregations, dataInsightChartType); + } + + @Override + public DataInsightChartResult process() throws ParseException { + List data = this.aggregate(); + DataInsightChartResult dataInsightChartResult = new DataInsightChartResult(); + return dataInsightChartResult.withData(data).withChartType(this.dataInsightChartType); + } + + @Override + List aggregate() throws ParseException { + MultiBucketsAggregation entityFqnBuckets = this.aggregations.get("entityFqn"); + List data = new ArrayList(); + for (MultiBucketsAggregation.Bucket entityFqnBucket : entityFqnBuckets.getBuckets()) { + String tableFqn = entityFqnBucket.getKeyAsString(); + Sum sumPageViews = entityFqnBucket.getAggregations().get("pageViews"); + MultiBucketsAggregation ownerBucket = entityFqnBucket.getAggregations().get("owner"); + String owner = null; + if (!ownerBucket.getBuckets().isEmpty()) { + owner = ownerBucket.getBuckets().get(0).getKeyAsString(); + } + + data.add( + new MostViewedEntities().withEntityFqn(tableFqn).withOwner(owner).withPageViews(sumPageViews.getValue())); + } + return data; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/PageViewsByEntitiesAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/PageViewsByEntitiesAggregator.java new file mode 100644 index 00000000000..4d498fe757c --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/dataInsight/PageViewsByEntitiesAggregator.java @@ -0,0 +1,47 @@ +package org.openmetadata.service.dataInsight; + +import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.metrics.Sum; +import 
org.openmetadata.schema.dataInsight.DataInsightChartResult; +import org.openmetadata.schema.datatInsight.type.PageViewsByEntities; + +public class PageViewsByEntitiesAggregator extends DataInsightAggregatorInterface { + public PageViewsByEntitiesAggregator( + Aggregations aggregations, DataInsightChartResult.DataInsightChartType dataInsightChartType) { + super(aggregations, dataInsightChartType); + } + + @Override + public DataInsightChartResult process() throws ParseException { + List data = this.aggregate(); + DataInsightChartResult dataInsightChartResult = new DataInsightChartResult(); + return dataInsightChartResult.withData(data).withChartType(this.dataInsightChartType); + } + + @Override + List aggregate() throws ParseException { + Histogram timestampBuckets = this.aggregations.get(TIMESTAMP); + List data = new ArrayList(); + for (Histogram.Bucket timestampBucket : timestampBuckets.getBuckets()) { + String dateTimeString = timestampBucket.getKeyAsString(); + Long timestamp = this.convertDatTimeStringToTimestamp(dateTimeString); + MultiBucketsAggregation entityTypeBuckets = timestampBucket.getAggregations().get(ENTITY_TYPE); + for (MultiBucketsAggregation.Bucket entityTypeBucket : entityTypeBuckets.getBuckets()) { + String entityType = entityTypeBucket.getKeyAsString(); + Sum sumPageViews = entityTypeBucket.getAggregations().get("pageViews"); + + data.add( + new PageViewsByEntities() + .withEntityType(entityType) + .withTimestamp(timestamp) + .withPageViews(sumPageViews.getValue())); + } + } + return data; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightChartRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightChartRepository.java index 246e39cc3a0..bb1269488b8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightChartRepository.java +++ 
b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataInsightChartRepository.java @@ -11,9 +11,11 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.openmetadata.schema.dataInsight.DataInsightChart; @@ -24,6 +26,7 @@ import org.openmetadata.service.util.EntityUtil; public class DataInsightChartRepository extends EntityRepository { public static final String COLLECTION_PATH = "/v1/dataInsight"; + public static final String LAST_SESSION = "lastSession"; private static final String UPDATE_FIELDS = "owner"; private static final String PATCH_FIELDS = "owner"; private static final String DATA_ENTITY_TYPE = "data.entityType"; @@ -38,6 +41,38 @@ public class DataInsightChartRepository extends EntityRepository SUPPORTS_TEAM_FILTER = + Arrays.asList( + "TotalEntitiesByType", + "TotalEntitiesByTier", + "PercentageOfEntitiesWithDescriptionByType", + "PercentageOfEntitiesWithOwnerByType", + "DailyActiveUsers", + "MostActiveUsers"); + + private static final List SUPPORTS_TIER_FILTER = + Arrays.asList( + "TotalEntitiesByType", + "TotalEntitiesByTier", + "PercentageOfEntitiesWithDescriptionByType", + "PercentageOfEntitiesWithOwnerByType", + "PageViewsByEntities", + "MostViewedEntities"); public 
DataInsightChartRepository(CollectionDAO dao) { super( @@ -76,12 +111,13 @@ public class DataInsightChartRepository extends EntityRepository teamArray = new ArrayList(Arrays.asList(team)); BoolQueryBuilder teamQueryFilter = QueryBuilders.boolQuery(); @@ -89,7 +125,7 @@ public class DataInsightChartRepository extends EntityRepository tierArray = new ArrayList(Arrays.asList(tier)); BoolQueryBuilder tierQueryFilter = QueryBuilders.boolQuery(); @@ -115,7 +151,7 @@ public class DataInsightChartRepository extends EntityRepository