feat(ingest): allow relative start time config (#8562)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Mayuri Nehate 2023-08-15 06:18:06 +05:30 committed by GitHub
parent b3251577e1
commit ddcd5109dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 186 additions and 34 deletions

View File

@ -43,24 +43,28 @@ def parse_user_datetime(input: str) -> datetime:
# Then try parsing as a relative time.
with contextlib.suppress(humanfriendly.InvalidTimespan):
delta = _parse_relative_timespan(input)
delta = parse_relative_timespan(input)
return datetime.now(tz=timezone.utc) + delta
# Finally, try parsing as an absolute time.
with contextlib.suppress(dateutil.parser.ParserError):
dt = dateutil.parser.parse(input)
if dt.tzinfo is None:
# Assume that the user meant to specify a time in UTC.
dt = dt.replace(tzinfo=timezone.utc)
else:
# Convert to UTC.
dt = dt.astimezone(timezone.utc)
return dt
return parse_absolute_time(input)
raise ValueError(f"Could not parse {input} as a datetime or relative time.")
def _parse_relative_timespan(input: str) -> timedelta:
def parse_absolute_time(input: str) -> datetime:
dt = dateutil.parser.parse(input)
if dt.tzinfo is None:
# Assume that the user meant to specify a time in UTC.
dt = dt.replace(tzinfo=timezone.utc)
else:
# Convert to UTC.
dt = dt.astimezone(timezone.utc)
return dt
def parse_relative_timespan(input: str) -> timedelta:
neg = False
input = input.strip()

View File

@ -2,10 +2,12 @@ import enum
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List
import humanfriendly
import pydantic
from pydantic.fields import Field
from datahub.configuration.common import ConfigModel
from datahub.configuration.datetimes import parse_absolute_time, parse_relative_timespan
from datahub.metadata.schema_classes import CalendarIntervalClass
@ -42,25 +44,42 @@ class BaseTimeWindowConfig(ConfigModel):
# if those fields are not set by the user.
end_time: datetime = Field(
default_factory=lambda: datetime.now(tz=timezone.utc),
description="Latest date of usage to consider. Default: Current time in UTC",
description="Latest date of lineage/usage to consider. Default: Current time in UTC",
)
start_time: datetime = Field(default=None, description="Earliest date of usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`)") # type: ignore
start_time: datetime = Field(default=None, description="Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'.") # type: ignore
@pydantic.validator("start_time", pre=True, always=True)
def default_start_time(
cls, v: Any, *, values: Dict[str, Any], **kwargs: Any
cls, v: Any, values: Dict[str, Any], **kwargs: Any
) -> datetime:
return v or get_time_bucket(
values["end_time"] - get_bucket_duration_delta(values["bucket_duration"]),
values["bucket_duration"],
)
if v is None:
return get_time_bucket(
values["end_time"]
- get_bucket_duration_delta(values["bucket_duration"]),
values["bucket_duration"],
)
elif isinstance(v, str):
# This is where start_time str is resolved to datetime
try:
delta = parse_relative_timespan(v)
assert delta < timedelta(
0
), "Relative start time should start with minus sign (-) e.g. '-2 days'."
assert abs(delta) > get_bucket_duration_delta(
values["bucket_duration"]
), "Relative start time should be in terms of configured bucket duration. e.g '-2 days' or '-2 hours'."
return values["end_time"] + delta
except humanfriendly.InvalidTimespan:
return parse_absolute_time(v)
return v
@pydantic.validator("start_time", "end_time")
def ensure_timestamps_in_utc(cls, v: datetime) -> datetime:
if v.tzinfo != timezone.utc:
raise ValueError(
'timezone is not UTC; try adding a "Z" to the value e.g. "2021-07-20T00:00:00Z"'
)
assert (
v.tzinfo == timezone.utc
), 'timezone is not UTC; try adding a "Z" to the value e.g. "2021-07-20T00:00:00Z"'
return v
def buckets(self) -> List[datetime]:

View File

@ -343,7 +343,9 @@ class RedshiftLineageExtractor:
if self.config.table_lineage_mode == LineageMode.STL_SCAN_BASED:
# Populate table level lineage by getting upstream tables from stl_scan redshift table
query = RedshiftQuery.stl_scan_based_lineage_query(
self.config.database, self.config.start_time, self.config.end_time
self.config.database,
self.config.start_time,
self.config.end_time,
)
populate_calls.append((query, LineageCollectorType.QUERY_SCAN))
elif self.config.table_lineage_mode == LineageMode.SQL_BASED:

View File

@ -213,6 +213,19 @@ class BaseUsageConfig(BaseTimeWindowConfig):
)
return v
@pydantic.validator("start_time")
def ensure_start_time_aligns_with_bucket_start_time(
cls, v: datetime, values: dict
) -> datetime:
if get_time_bucket(v, values["bucket_duration"]) != v:
new_start_time = get_time_bucket(v, values["bucket_duration"])
logger.warning(
f"`start_time` will be changed to {new_start_time}, although the input `start_time` is {v}."
"This is necessary to record correct usage for the configured bucket duration."
)
return new_start_time
return v
class UsageAggregator(Generic[ResourceType]):
# TODO: Move over other connectors to use this class

View File

@ -144,7 +144,7 @@ def default_query_results( # noqa: C901
}
]
elif query == snowflake_query.SnowflakeQuery.operational_data_for_time_window(
1654499820000,
1654473600000,
1654586220000,
):
return [
@ -257,7 +257,7 @@ def default_query_results( # noqa: C901
elif (
query
== snowflake_query.SnowflakeQuery.usage_per_object_per_time_bucket_for_time_window(
1654499820000,
1654473600000,
1654586220000,
use_base_objects=False,
top_n_queries=10,
@ -268,11 +268,11 @@ def default_query_results( # noqa: C901
return []
elif query in (
snowflake_query.SnowflakeQuery.table_to_table_lineage_history(
1654499820000,
1654473600000,
1654586220000,
),
snowflake_query.SnowflakeQuery.table_to_table_lineage_history(
1654499820000, 1654586220000, False
1654473600000, 1654586220000, False
),
):
return [
@ -331,7 +331,7 @@ def default_query_results( # noqa: C901
]
elif query in (
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654499820000,
start_time_millis=1654473600000,
end_time_millis=1654586220000,
include_view_lineage=True,
include_column_lineage=True,
@ -403,7 +403,7 @@ def default_query_results( # noqa: C901
]
elif query in (
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654499820000,
start_time_millis=1654473600000,
end_time_millis=1654586220000,
include_view_lineage=False,
include_column_lineage=False,
@ -435,7 +435,7 @@ def default_query_results( # noqa: C901
for op_idx in range(1, num_ops + 1)
]
elif query == snowflake_query.SnowflakeQuery.external_table_lineage_history(
1654499820000,
1654473600000,
1654586220000,
):
return []
@ -470,11 +470,11 @@ def default_query_results( # noqa: C901
]
elif query in [
snowflake_query.SnowflakeQuery.view_lineage_history(
1654499820000,
1654473600000,
1654586220000,
),
snowflake_query.SnowflakeQuery.view_lineage_history(
1654499820000, 1654586220000, False
1654473600000, 1654586220000, False
),
]:
return [
@ -510,7 +510,7 @@ def default_query_results( # noqa: C901
]
elif query in [
snowflake_query.SnowflakeQuery.external_table_lineage_history(
1654499820000,
1654473600000,
1654586220000,
),
snowflake_query.SnowflakeQuery.view_dependencies_v2(),

View File

@ -228,7 +228,7 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure(
default_query_results,
[
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654499820000,
start_time_millis=1654473600000,
end_time_millis=1654586220000,
include_view_lineage=False,
include_column_lineage=True,

View File

@ -228,7 +228,7 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure(
default_query_results,
[
snowflake_query.SnowflakeQuery.table_to_table_lineage_history(
1654499820000, 1654586220000, True
1654473600000, 1654586220000, True
),
],
"Database 'SNOWFLAKE' does not exist or not authorized.",

View File

@ -0,0 +1,34 @@
from datetime import datetime, timezone
from freezegun import freeze_time
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
FROZEN_TIME = "2023-08-03 09:00:00"
FROZEN_TIME2 = "2023-08-03 09:10:00"
@freeze_time(FROZEN_TIME)
def test_relative_start_time_aligns_with_bucket_start_time():
config = BaseUsageConfig.parse_obj(
{"start_time": "-2 days", "end_time": "2023-07-07T09:00:00Z"}
)
assert config.start_time == datetime(2023, 7, 5, 0, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 7, 7, 9, tzinfo=timezone.utc)
config = BaseUsageConfig.parse_obj(
{"start_time": "-2 days", "end_time": "2023-07-07T09:00:00Z"}
)
assert config.start_time == datetime(2023, 7, 5, 0, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 7, 7, 9, tzinfo=timezone.utc)
@freeze_time(FROZEN_TIME)
def test_absolute_start_time_aligns_with_bucket_start_time():
config = BaseUsageConfig.parse_obj({"start_time": "2023-07-01T00:00:00Z"})
assert config.start_time == datetime(2023, 7, 1, 0, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc)
config = BaseUsageConfig.parse_obj({"start_time": "2023-07-01T09:00:00Z"})
assert config.start_time == datetime(2023, 7, 1, 0, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc)

View File

@ -0,0 +1,80 @@
from datetime import datetime, timezone
import pytest
from freezegun import freeze_time
from datahub.configuration.time_window_config import BaseTimeWindowConfig
FROZEN_TIME = "2023-08-03 09:00:00"
FROZEN_TIME2 = "2023-08-03 09:10:00"
@freeze_time(FROZEN_TIME)
def test_default_start_end_time():
config = BaseTimeWindowConfig.parse_obj({})
assert config.start_time == datetime(2023, 8, 2, 0, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc)
@freeze_time(FROZEN_TIME2)
def test_default_start_end_time_hour_bucket_duration():
config = BaseTimeWindowConfig.parse_obj({"bucket_duration": "HOUR"})
assert config.start_time == datetime(2023, 8, 3, 8, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 8, 3, 9, 10, tzinfo=timezone.utc)
@freeze_time(FROZEN_TIME)
def test_relative_start_time():
config = BaseTimeWindowConfig.parse_obj({"start_time": "-2 days"})
assert config.start_time == datetime(2023, 8, 1, 9, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc)
config = BaseTimeWindowConfig.parse_obj({"start_time": "-2d"})
assert config.start_time == datetime(2023, 8, 1, 9, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc)
config = BaseTimeWindowConfig.parse_obj(
{"start_time": "-2 days", "end_time": "2023-07-07T09:00:00Z"}
)
assert config.start_time == datetime(2023, 7, 5, 9, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 7, 7, 9, tzinfo=timezone.utc)
config = BaseTimeWindowConfig.parse_obj(
{"start_time": "-2 days", "end_time": "2023-07-07T09:00:00Z"}
)
assert config.start_time == datetime(2023, 7, 5, 9, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 7, 7, 9, tzinfo=timezone.utc)
@freeze_time(FROZEN_TIME)
def test_absolute_start_time():
config = BaseTimeWindowConfig.parse_obj({"start_time": "2023-07-01T00:00:00Z"})
assert config.start_time == datetime(2023, 7, 1, 0, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc)
config = BaseTimeWindowConfig.parse_obj({"start_time": "2023-07-01T09:00:00Z"})
assert config.start_time == datetime(2023, 7, 1, 9, tzinfo=timezone.utc)
assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc)
@freeze_time(FROZEN_TIME)
def test_invalid_relative_start_time():
with pytest.raises(ValueError, match="Unknown string format"):
BaseTimeWindowConfig.parse_obj({"start_time": "-2 das"})
with pytest.raises(
ValueError,
match="Relative start time should be in terms of configured bucket duration",
):
BaseTimeWindowConfig.parse_obj({"start_time": "-2"})
with pytest.raises(
ValueError, match="Relative start time should start with minus sign"
):
BaseTimeWindowConfig.parse_obj({"start_time": "2d"})
with pytest.raises(
ValueError,
match="Relative start time should be in terms of configured bucket duration",
):
BaseTimeWindowConfig.parse_obj({"start_time": "-2m"})