fix: python test to remove database race condition (#13307)

This commit is contained in:
Teddy 2023-09-22 15:05:57 +02:00 committed by GitHub
parent 7524b673d0
commit a7dd7012ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -15,6 +15,7 @@ Validate workflow configs and filters
from __future__ import annotations from __future__ import annotations
import random
import unittest import unittest
import uuid import uuid
from copy import deepcopy from copy import deepcopy
@ -88,12 +89,6 @@ data_insight_config = {
WEB_EVENT_DATA = [ WEB_EVENT_DATA = [
WebAnalyticEventData( WebAnalyticEventData(
eventId=None, eventId=None,
timestamp=int(
(
datetime.utcnow() - timedelta(days=1, milliseconds=randint(100, 999))
).timestamp()
* 1000
),
eventType=WebAnalyticEventType.PageView, eventType=WebAnalyticEventType.PageView,
eventData=PageViewData( eventData=PageViewData(
fullUrl='http://localhost:8585/table/sample_data.ecommerce_db.shopify."dim.shop"', fullUrl='http://localhost:8585/table/sample_data.ecommerce_db.shopify."dim.shop"',
@ -109,12 +104,6 @@ WEB_EVENT_DATA = [
), ),
WebAnalyticEventData( WebAnalyticEventData(
eventId=None, eventId=None,
timestamp=int(
(
datetime.utcnow() - timedelta(days=1, milliseconds=randint(100, 999))
).timestamp()
* 1000
),
eventType=WebAnalyticEventType.PageView, eventType=WebAnalyticEventType.PageView,
eventData=PageViewData( eventData=PageViewData(
fullUrl="http://localhost:8585/table/mysql.default.airflow_db.dag_run/profiler", fullUrl="http://localhost:8585/table/mysql.default.airflow_db.dag_run/profiler",
@ -136,56 +125,43 @@ class DataInsightWorkflowTests(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls) -> None: def setUpClass(cls) -> None:
"""Set up om client for the test class"""
cls.metadata = OpenMetadata( cls.metadata = OpenMetadata(
OpenMetadataConnection.parse_obj( OpenMetadataConnection.parse_obj(
data_insight_config["workflowConfig"]["openMetadataServerConfig"] data_insight_config["workflowConfig"]["openMetadataServerConfig"]
) )
) )
# clean up kpis in case we have linguering ones def setUp(self) -> None:
kpis: list[Kpi] = cls.metadata.list_entities( """Set up om client for the test class"""
entity=Kpi, fields="*" # type: ignore self.start_ts = int(
).entities
for kpi in kpis:
cls.metadata.delete(
entity=Kpi,
entity_id=kpi.id,
hard_delete=True,
recursive=True,
)
cls.start_ts = int(
datetime.combine(datetime.utcnow(), time.min).timestamp() * 1000 datetime.combine(datetime.utcnow(), time.min).timestamp() * 1000
) ) - random.randint(1, 999)
cls.end_ts = int( self.end_ts = int(
datetime.combine(datetime.utcnow(), time.max).timestamp() * 1000 datetime.combine(datetime.utcnow(), time.max).timestamp() * 1000
) ) - random.randint(1, 999)
completed_description_chart = cls.metadata.get_by_name( completed_description_chart = self.metadata.get_by_name(
DataInsightChart, "PercentageOfEntitiesWithDescriptionByType", fields="*" DataInsightChart, "PercentageOfEntitiesWithDescriptionByType", fields="*"
) )
create = CreateKpiRequest( create = CreateKpiRequest(
name="CompletedDescription", name=f"CompletedDescription__{self.id().split('.')[-1]}",
dataInsightChart=completed_description_chart.fullyQualifiedName, dataInsightChart=completed_description_chart.fullyQualifiedName,
description="foo", description="foo",
startDate=cls.start_ts, startDate=self.start_ts,
endDate=cls.end_ts, endDate=self.end_ts,
targetDefinition=[ targetDefinition=[
KpiTarget(name="completedDescriptionFraction", value="0.63") KpiTarget(name="completedDescriptionFraction", value="0.63")
], ],
metricType="PERCENTAGE", metricType="PERCENTAGE",
) )
cls.metadata.create_kpi(create) self.kpi = self.metadata.create_kpi(create)
table: Table = cls.metadata.get_by_name( table: Table = self.metadata.get_by_name(
Table, 'sample_data.ecommerce_db.shopify."dim.shop"' Table, 'sample_data.ecommerce_db.shopify."dim.shop"'
) )
user: User = cls.metadata.get_by_name(User, "aaron_johnson0") user: User = self.metadata.get_by_name(User, "aaron_johnson0")
cls.metadata.patch_owner( self.metadata.patch_owner(
entity=Table, entity=Table,
source=table, source=table,
owner=EntityReference( owner=EntityReference(
@ -196,13 +172,24 @@ class DataInsightWorkflowTests(unittest.TestCase):
) )
for event in WEB_EVENT_DATA: for event in WEB_EVENT_DATA:
cls.metadata.add_web_analytic_events(event) event.timestamp = int(
(
datetime.utcnow() - timedelta(days=1, milliseconds=randint(0, 999))
).timestamp()
* 1000
)
self.metadata.add_web_analytic_events(event)
cls.metadata.add_web_analytic_events( # we'll add the user ID
self.metadata.add_web_analytic_events(
WebAnalyticEventData( WebAnalyticEventData(
eventId=None, eventId=None,
timestamp=int( timestamp=int(
(datetime.utcnow() - timedelta(days=1)).timestamp() * 1000 (
datetime.utcnow()
- timedelta(days=1, milliseconds=randint(0, 999))
).timestamp()
* 1000
), ),
eventType=WebAnalyticEventType.PageView, eventType=WebAnalyticEventType.PageView,
eventData=PageViewData( eventData=PageViewData(
@ -338,12 +325,12 @@ class DataInsightWorkflowTests(unittest.TestCase):
def test_write_kpi_result(self): def test_write_kpi_result(self):
"""test write kpi result""" """test write kpi result"""
fqn = "CompletedDescription" fqn = "CompletedDescription__test_write_kpi_result"
self.metadata.add_kpi_result( self.metadata.add_kpi_result(
fqn, fqn,
KpiResult( KpiResult(
timestamp=int(datetime.utcnow().timestamp() * 1000), timestamp=int(datetime.utcnow().timestamp() * 1000),
kpiFqn="CompletedDescription", kpiFqn="CompletedDescription__test_write_kpi_result",
targetResult=[ targetResult=[
KpiTarget( KpiTarget(
name="completedDescriptionFraction", name="completedDescriptionFraction",
@ -384,16 +371,11 @@ class DataInsightWorkflowTests(unittest.TestCase):
# we'll check we only have 1 execution timestamp # we'll check we only have 1 execution timestamp
assert len(set(timestamp)) == 1 assert len(set(timestamp)) == 1
@classmethod def tearDown(self) -> None:
def tearDownClass(cls) -> None: """teardown class"""
kpis: list[Kpi] = cls.metadata.list_entities( self.metadata.delete(
entity=Kpi, fields="*" # type: ignore entity=Kpi,
).entities entity_id=str(self.kpi.id.__root__),
hard_delete=True,
for kpi in kpis: recursive=True,
cls.metadata.delete( )
entity=Kpi,
entity_id=kpi.id,
hard_delete=True,
recursive=True,
)