diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py
index 22369513faa..3fe8a6b30b6 100644
--- a/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py
+++ b/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py
@@ -16,9 +16,13 @@ To be used by OpenMetadata class
 
 from __future__ import annotations
 
-from typing import Optional
+from typing import List, Optional
 
+from metadata.generated.schema.analytics.basic import WebAnalyticEventType
 from metadata.generated.schema.analytics.reportData import ReportData
+from metadata.generated.schema.analytics.webAnalyticEventData import (
+    WebAnalyticEventData,
+)
 from metadata.generated.schema.api.dataInsight.kpi.createKpiRequest import (
     CreateKpiRequest,
 )
@@ -56,6 +60,16 @@ class DataInisghtMixin:
 
         return resp
 
+    def add_web_analytic_events(
+        self,
+        event_data: WebAnalyticEventData,
+    ) -> List[WebAnalyticEventData]:  # NOTE(review): the body returns the raw client response - confirm it matches this annotation
+        """PUT one web analytic event (serialised with .json()) to the collect endpoint"""
+
+        resp = self.client.put("/analytics/webAnalyticEvent/collect", event_data.json())
+
+        return resp  # returned as-is, not parsed into WebAnalyticEventData
+
     def get_data_insight_report_data(
         self, start_ts: int, end_ts: int, report_data_type: str
     ) -> dict[str, list[ReportData]]:
@@ -135,3 +149,16 @@ class DataInisghtMixin:
         resp = self.client.post("/kpi", create.json())
 
         return Kpi.parse_obj(resp)
+
+    def get_web_analytic_events(
+        self, event_type: WebAnalyticEventType, start_ts: int, end_ts: int
+    ) -> List[WebAnalyticEventData]:
+        """GET web analytic events for event_type, sending start_ts/end_ts as startTs/endTs"""
+
+        event_type_value = event_type.value  # the enum's value is what goes into the query params
+
+        params = {"eventType": event_type_value, "startTs": start_ts, "endTs": end_ts}
+
+        resp = self.client.get("/analytics/webAnalyticEvent/collect", params)
+
+        return [WebAnalyticEventData(**data) for data in resp["data"]]  # one model per entry of resp["data"]
diff --git a/ingestion/tests/integration/data_insight/test_data_insight_workflow.py b/ingestion/tests/integration/data_insight/test_data_insight_workflow.py
index f177987de19..d4b507fd46c 100644
--- a/ingestion/tests/integration/data_insight/test_data_insight_workflow.py
+++ b/ingestion/tests/integration/data_insight/test_data_insight_workflow.py
@@ -16,8 +16,9 @@ Validate workflow configs and filters
 from __future__ import annotations
 
 import unittest
+import uuid
 from copy import deepcopy
-from datetime import datetime, time
+from datetime import datetime, time, timedelta
 from time import sleep
 
 import pytest
@@ -25,7 +26,14 @@ import requests
 
 from metadata.data_insight.api.workflow import DataInsightWorkflow
 from metadata.data_insight.helper.data_insight_es_index import DataInsightEsIndex
+from metadata.generated.schema.analytics.basic import WebAnalyticEventType
 from metadata.generated.schema.analytics.reportData import ReportDataType
+from metadata.generated.schema.analytics.webAnalyticEventData import (
+    WebAnalyticEventData,
+)
+from metadata.generated.schema.analytics.webAnalyticEventType.pageViewEvent import (
+    PageViewData,
+)
 from metadata.generated.schema.api.dataInsight.kpi.createKpiRequest import (
     CreateKpiRequest,
 )
@@ -36,6 +44,10 @@ from metadata.generated.schema.dataInsight.dataInsightChartResult import (
 )
 from metadata.generated.schema.dataInsight.kpi.basic import KpiResult, KpiTarget
 from metadata.generated.schema.dataInsight.kpi.kpi import Kpi
+from metadata.generated.schema.dataInsight.type.dailyActiveUsers import DailyActiveUsers
+from metadata.generated.schema.dataInsight.type.pageViewsByEntities import (
+    PageViewsByEntities,
+)
 from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithDescriptionByType import (
     PercentageOfEntitiesWithDescriptionByType,
 )
@@ -58,7 +70,7 @@ data_insight_config = {
     "processor": {"type": "data-insight-processor", "config": {}},
     "sink": {
         "type": "elasticsearch",
-        "config": {"es_host": "localhost", "es_port": 9200, "recreate_indexes": True},
+        "config": {"es_host": "localhost", "es_port": 9200, "recreate_indexes": False},
     },
     "workflowConfig": {
         "openMetadataServerConfig": {
@@ -71,6 +83,41 @@ data_insight_config = {
     },
 }
 
+WEB_EVENT_DATA = [  # two PageView fixtures, built at import time, each with fresh user/session UUIDs
+    WebAnalyticEventData(
+        eventId=None,
+        timestamp=int((datetime.utcnow() - timedelta(days=1)).timestamp() * 1000),  # NOTE(review): utcnow() is naive and .timestamp() reads it as local time - confirm this matches the start_ts/end_ts window
+        eventType=WebAnalyticEventType.PageView,
+        eventData=PageViewData(
+            fullUrl="http://localhost:8585/table/sample_data.ecommerce_db.shopify.%22dim.shop%22",
+            url="/table/sample_data.ecommerce_db.shopify.%22dim.shop%22",
+            hostname="localhost",
+            language="en-US",
+            screenSize="1280x720",
+            userId=uuid.uuid4(),
+            sessionId=uuid.uuid4(),
+            pageLoadTime=0.0,
+            referrer="",
+        ),
+    ),
+    WebAnalyticEventData(
+        eventId=None,
+        timestamp=int((datetime.utcnow() - timedelta(days=1)).timestamp() * 1000),
+        eventType=WebAnalyticEventType.PageView,
+        eventData=PageViewData(
+            fullUrl="http://localhost:8585/table/mysql.default.airflow_db.dag_run/profiler",
+            url="/table/mysql.default.airflow_db.dag_run/profiler",
+            hostname="localhost",
+            language="en-US",
+            screenSize="1280x720",
+            userId=uuid.uuid4(),
+            sessionId=uuid.uuid4(),
+            pageLoadTime=0.0,
+            referrer="",
+        ),
+    ),
+]
+
 
 class DataInsightWorkflowTests(unittest.TestCase):
     """Test class for data insight workflow validation"""
@@ -111,6 +158,9 @@ class DataInsightWorkflowTests(unittest.TestCase):
 
         cls.metadata.create_kpi(create)
 
+        for event in WEB_EVENT_DATA:  # one add_web_analytic_events call per fixture event
+            cls.metadata.add_web_analytic_events(event)
+
     def test_create_method(self):
         """Test validation of the workflow config is properly happening"""
         DataInsightWorkflow.create(data_insight_config)
@@ -128,21 +178,32 @@ class DataInsightWorkflowTests(unittest.TestCase):
         sleep(1)  # wait for data to be available
 
         # Test the indexes have been created as expected and the data have been loaded
-        entity_report_indexes = requests.get(
+        entity_report_docs = requests.get(
             "http://localhost:9200/entity_report_data_index/_search", timeout=30
         )
-        requests.get(
-            "http://localhost:9200/entity_report_data_index/_search", timeout=30
-        )
-        requests.get(
-            "http://localhost:9200/web_analytic_entity_view_report_data/_search",
+        web_analytic_user_activity_report_data_docs = requests.get(
+            "http://localhost:9200/web_analytic_user_activity_report_data_index/_search",
+            timeout=30,
+        )
+        web_analytic_entity_view_report_data_docs = requests.get(
+            "http://localhost:9200/web_analytic_entity_view_report_data_index/_search",
             timeout=30,
         )
-        assert (
-            entity_report_indexes.json()["hits"]["total"]["value"] > 0
-        )  # check data have been correctly indexed in ES
 
-        # test report endpoint is returning data
+        # check data have been correctly indexed in ES
+        # --------------------------------------------
+        assert entity_report_docs.json()["hits"]["total"]["value"] > 0
+        assert (
+            web_analytic_user_activity_report_data_docs.json()["hits"]["total"]["value"]
+            > 0
+        )
+        assert (
+            web_analytic_entity_view_report_data_docs.json()["hits"]["total"]["value"]
+            > 0
+        )
+
+        # test report endpoints are returning data
+        # --------------------------------------
         report_data = self.metadata.get_data_insight_report_data(
             self.start_ts,
             self.end_ts,
@@ -150,7 +211,22 @@ class DataInsightWorkflowTests(unittest.TestCase):
         )
         assert report_data.get("data")
 
-        # test data insight aggregation endpoint is returning data
+        web_entity_analytics = self.metadata.get_data_insight_report_data(
+            self.start_ts,
+            self.end_ts,
+            ReportDataType.WebAnalyticEntityViewReportData.value,
+        )
+        assert web_entity_analytics.get("data")
+
+        web_user_analytics = self.metadata.get_data_insight_report_data(
+            self.start_ts,
+            self.end_ts,
+            ReportDataType.WebAnalyticUserActivityReportData.value,
+        )
+        assert web_user_analytics.get("data")
+
+        # test data insight aggregation endpoints are returning data
+        # ----------------------------------------------------------
         resp = self.metadata.get_aggregated_data_insight_results(
             start_ts=self.start_ts,
             end_ts=self.end_ts,
@@ -172,6 +248,26 @@ class DataInsightWorkflowTests(unittest.TestCase):
         assert resp.data
         assert isinstance(resp.data[0], PercentageOfEntitiesWithOwnerByType)
 
+        resp = self.metadata.get_aggregated_data_insight_results(
+            start_ts=self.start_ts,
+            end_ts=self.end_ts,
+            data_insight_chart_nane=DataInsightChartType.DailyActiveUsers.value,  # NOTE(review): "nane" looks like a typo of "name" - it must match the client method's keyword, so confirm before renaming
+            data_report_index=DataInsightEsIndex.WebAnalyticUserActivityReportData.value,
+        )
+
+        assert resp.data
+        assert isinstance(resp.data[0], DailyActiveUsers)
+
+        resp = self.metadata.get_aggregated_data_insight_results(
+            start_ts=self.start_ts,
+            end_ts=self.end_ts,
+            data_insight_chart_nane=DataInsightChartType.PageViewsByEntities.value,
+            data_report_index=DataInsightEsIndex.WebAnalyticEntityViewReportData.value,
+        )
+
+        assert resp.data
+        assert isinstance(resp.data[0], PageViewsByEntities)
+
     def test_get_kpis(self):
         """test Kpis are returned as expected"""
         # TO DO: Add KPI creation step and deletion (setUp + tearDown)
diff --git a/ingestion/tests/integration/data_insight/test_web_analytic_events.py b/ingestion/tests/integration/data_insight/test_web_analytic_events.py
new file mode 100644
index 00000000000..af05f586445
--- /dev/null
+++ b/ingestion/tests/integration/data_insight/test_web_analytic_events.py
@@ -0,0 +1,116 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Test web analytics events
+"""
+
+from __future__ import annotations
+
+import unittest
+import uuid
+from datetime import datetime, timedelta
+
+from metadata.generated.schema.analytics.basic import WebAnalyticEventType
+from metadata.generated.schema.analytics.webAnalyticEventData import (
+    WebAnalyticEventData,
+)
+from metadata.generated.schema.analytics.webAnalyticEventType.pageViewEvent import (
+    PageViewData,
+)
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.utils.time_utils import (
+    get_beginning_of_day_timestamp_mill,
+    get_end_of_day_timestamp_mill,
+)
+
+data_insight_config = {  # only workflowConfig.openMetadataServerConfig is read in this module
+    "source": {
+        "type": "dataInsight",
+        "serviceName": "dataInsightWorkflow",
+        "sourceConfig": {"config": {"type": "dataInsight"}},
+    },
+    "processor": {"type": "data-insight-processor", "config": {}},
+    "sink": {
+        "type": "elasticsearch",
+        "config": {},
+    },
+    "workflowConfig": {
+        "openMetadataServerConfig": {
+            "hostPort": "http://localhost:8585/api",
+            "authProvider": "openmetadata",
+            "securityConfig": {
+                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"  # pylint: disable=line-too-long
+            },
+        }
+    },
+}
+
+
+class WebAnalyticsEndpointsTests(unittest.TestCase):
+    """Test class for the web analytic event client methods (add / get)"""
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        """Set up the om client and the timestamps shared by the tests"""
+
+        cls.metadata = OpenMetadata(
+            OpenMetadataConnection.parse_obj(
+                data_insight_config["workflowConfig"]["openMetadataServerConfig"]
+            )
+        )
+
+        cls.start_ts = get_beginning_of_day_timestamp_mill(days=1)
+        cls.end_ts = get_end_of_day_timestamp_mill(days=1)
+        cls.yesterday = int((datetime.utcnow() - timedelta(days=1)).timestamp() * 1000)  # NOTE(review): naive utcnow() is read as local time by .timestamp() - confirm it lands inside start_ts/end_ts
+
+    def test_web_analytic_events(self):
+        """Add one PageView event, then check get_web_analytic_events returns it"""
+        user_id = uuid.uuid4()  # random id, used below to pick our event out of the results
+        session_id = uuid.uuid4()
+
+        event_data = WebAnalyticEventData(
+            eventId=None,
+            timestamp=self.yesterday,
+            eventType=WebAnalyticEventType.PageView,
+            eventData=PageViewData(
+                fullUrl="http://localhost:8585/table/sample_data.ecommerce_db.shopify.%22dim.shop%22",
+                url="/table/sample_data.ecommerce_db.shopify.dim.foo",  # NOTE(review): path differs from fullUrl above - confirm this is intentional
+                hostname="localhost",
+                language="en-US",
+                screenSize="1280x720",
+                userId=user_id,
+                sessionId=session_id,
+                pageLoadTime=0.0,
+                referrer="",
+            ),
+        )
+        self.metadata.add_web_analytic_events(
+            event_data,
+        )
+
+        web_events = self.metadata.get_web_analytic_events(
+            WebAnalyticEventType.PageView, self.start_ts, self.end_ts
+        )
+
+        test_event = next(
+            (
+                web_event
+                for web_event in web_events
+                if web_event.eventData.userId.__root__ == user_id  # first returned event carrying our user id
+            ),
+            None,  # default when no event matches, so the assert below fails instead of StopIteration
+        )
+
+        assert test_event