diff --git a/metadata-ingestion/src/datahub/configuration/datetimes.py b/metadata-ingestion/src/datahub/configuration/datetimes.py index 55f5c6fbd6..41af756559 100644 --- a/metadata-ingestion/src/datahub/configuration/datetimes.py +++ b/metadata-ingestion/src/datahub/configuration/datetimes.py @@ -43,24 +43,28 @@ def parse_user_datetime(input: str) -> datetime: # Then try parsing as a relative time. with contextlib.suppress(humanfriendly.InvalidTimespan): - delta = _parse_relative_timespan(input) + delta = parse_relative_timespan(input) return datetime.now(tz=timezone.utc) + delta # Finally, try parsing as an absolute time. with contextlib.suppress(dateutil.parser.ParserError): - dt = dateutil.parser.parse(input) - if dt.tzinfo is None: - # Assume that the user meant to specify a time in UTC. - dt = dt.replace(tzinfo=timezone.utc) - else: - # Convert to UTC. - dt = dt.astimezone(timezone.utc) - return dt + return parse_absolute_time(input) raise ValueError(f"Could not parse {input} as a datetime or relative time.") -def _parse_relative_timespan(input: str) -> timedelta: +def parse_absolute_time(input: str) -> datetime: + dt = dateutil.parser.parse(input) + if dt.tzinfo is None: + # Assume that the user meant to specify a time in UTC. + dt = dt.replace(tzinfo=timezone.utc) + else: + # Convert to UTC. 
+ dt = dt.astimezone(timezone.utc) + return dt + + +def parse_relative_timespan(input: str) -> timedelta: neg = False input = input.strip() diff --git a/metadata-ingestion/src/datahub/configuration/time_window_config.py b/metadata-ingestion/src/datahub/configuration/time_window_config.py index a4b451f0cd..1bf9929527 100644 --- a/metadata-ingestion/src/datahub/configuration/time_window_config.py +++ b/metadata-ingestion/src/datahub/configuration/time_window_config.py @@ -2,10 +2,12 @@ import enum from datetime import datetime, timedelta, timezone from typing import Any, Dict, List +import humanfriendly import pydantic from pydantic.fields import Field from datahub.configuration.common import ConfigModel +from datahub.configuration.datetimes import parse_absolute_time, parse_relative_timespan from datahub.metadata.schema_classes import CalendarIntervalClass @@ -42,25 +44,42 @@ class BaseTimeWindowConfig(ConfigModel): # if those fields are not set by the user. end_time: datetime = Field( default_factory=lambda: datetime.now(tz=timezone.utc), - description="Latest date of usage to consider. Default: Current time in UTC", + description="Latest date of lineage/usage to consider. Default: Current time in UTC", ) - start_time: datetime = Field(default=None, description="Earliest date of usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`)") # type: ignore + start_time: datetime = Field(default=None, description="Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`). 
You can also specify relative time with respect to end_time such as '-7 days' or '-7d'.") # type: ignore @pydantic.validator("start_time", pre=True, always=True) def default_start_time( - cls, v: Any, *, values: Dict[str, Any], **kwargs: Any + cls, v: Any, values: Dict[str, Any], **kwargs: Any ) -> datetime: - return v or get_time_bucket( - values["end_time"] - get_bucket_duration_delta(values["bucket_duration"]), - values["bucket_duration"], - ) + if v is None: + return get_time_bucket( + values["end_time"] + - get_bucket_duration_delta(values["bucket_duration"]), + values["bucket_duration"], + ) + elif isinstance(v, str): + # This is where start_time str is resolved to datetime + try: + delta = parse_relative_timespan(v) + assert delta < timedelta( + 0 + ), "Relative start time should start with minus sign (-) e.g. '-2 days'." + assert abs(delta) > get_bucket_duration_delta( + values["bucket_duration"] + ), "Relative start time should be in terms of configured bucket duration. e.g. '-2 days' or '-2 hours'." + return values["end_time"] + delta + except humanfriendly.InvalidTimespan: + return parse_absolute_time(v) + + return v @pydantic.validator("start_time", "end_time") def ensure_timestamps_in_utc(cls, v: datetime) -> datetime: - if v.tzinfo != timezone.utc: - raise ValueError( - 'timezone is not UTC; try adding a "Z" to the value e.g. "2021-07-20T00:00:00Z"' - ) + assert ( + v.tzinfo == timezone.utc + ), 'timezone is not UTC; try adding a "Z" to the value e.g. 
"2021-07-20T00:00:00Z"' + return v def buckets(self) -> List[datetime]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index cf4e3a5b01..268de58325 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -343,7 +343,9 @@ class RedshiftLineageExtractor: if self.config.table_lineage_mode == LineageMode.STL_SCAN_BASED: # Populate table level lineage by getting upstream tables from stl_scan redshift table query = RedshiftQuery.stl_scan_based_lineage_query( - self.config.database, self.config.start_time, self.config.end_time + self.config.database, + self.config.start_time, + self.config.end_time, ) populate_calls.append((query, LineageCollectorType.QUERY_SCAN)) elif self.config.table_lineage_mode == LineageMode.SQL_BASED: diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py index 92f8223f34..8d4ac37f49 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py @@ -213,6 +213,19 @@ class BaseUsageConfig(BaseTimeWindowConfig): ) return v + @pydantic.validator("start_time") + def ensure_start_time_aligns_with_bucket_start_time( + cls, v: datetime, values: dict + ) -> datetime: + if get_time_bucket(v, values["bucket_duration"]) != v: + new_start_time = get_time_bucket(v, values["bucket_duration"]) + logger.warning( + f"`start_time` will be changed to {new_start_time}, although the input `start_time` is {v}. " + "This is necessary to record correct usage for the configured bucket duration." 
+ ) + return new_start_time + return v + class UsageAggregator(Generic[ResourceType]): # TODO: Move over other connectors to use this class diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index b08a14d080..43f5e04fbc 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -144,7 +144,7 @@ def default_query_results( # noqa: C901 } ] elif query == snowflake_query.SnowflakeQuery.operational_data_for_time_window( - 1654499820000, + 1654473600000, 1654586220000, ): return [ @@ -257,7 +257,7 @@ def default_query_results( # noqa: C901 elif ( query == snowflake_query.SnowflakeQuery.usage_per_object_per_time_bucket_for_time_window( - 1654499820000, + 1654473600000, 1654586220000, use_base_objects=False, top_n_queries=10, @@ -268,11 +268,11 @@ def default_query_results( # noqa: C901 return [] elif query in ( snowflake_query.SnowflakeQuery.table_to_table_lineage_history( - 1654499820000, + 1654473600000, 1654586220000, ), snowflake_query.SnowflakeQuery.table_to_table_lineage_history( - 1654499820000, 1654586220000, False + 1654473600000, 1654586220000, False ), ): return [ @@ -331,7 +331,7 @@ def default_query_results( # noqa: C901 ] elif query in ( snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2( - start_time_millis=1654499820000, + start_time_millis=1654473600000, end_time_millis=1654586220000, include_view_lineage=True, include_column_lineage=True, @@ -403,7 +403,7 @@ def default_query_results( # noqa: C901 ] elif query in ( snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2( - start_time_millis=1654499820000, + start_time_millis=1654473600000, end_time_millis=1654586220000, include_view_lineage=False, include_column_lineage=False, @@ -435,7 +435,7 @@ def default_query_results( # noqa: C901 for op_idx in range(1, num_ops + 1) ] elif query == 
snowflake_query.SnowflakeQuery.external_table_lineage_history( - 1654499820000, + 1654473600000, 1654586220000, ): return [] @@ -470,11 +470,11 @@ def default_query_results( # noqa: C901 ] elif query in [ snowflake_query.SnowflakeQuery.view_lineage_history( - 1654499820000, + 1654473600000, 1654586220000, ), snowflake_query.SnowflakeQuery.view_lineage_history( - 1654499820000, 1654586220000, False + 1654473600000, 1654586220000, False ), ]: return [ @@ -510,7 +510,7 @@ def default_query_results( # noqa: C901 ] elif query in [ snowflake_query.SnowflakeQuery.external_table_lineage_history( - 1654499820000, + 1654473600000, 1654586220000, ), snowflake_query.SnowflakeQuery.view_dependencies_v2(), diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py index ed3bea49f0..73a261bb3c 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py @@ -228,7 +228,7 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure( default_query_results, [ snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2( - start_time_millis=1654499820000, + start_time_millis=1654473600000, end_time_millis=1654586220000, include_view_lineage=False, include_column_lineage=True, diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures_legacy_lineage.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures_legacy_lineage.py index 18779bd564..a5993793e5 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures_legacy_lineage.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures_legacy_lineage.py @@ -228,7 +228,7 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure( default_query_results, [ 
snowflake_query.SnowflakeQuery.table_to_table_lineage_history( - 1654499820000, 1654586220000, True + 1654473600000, 1654586220000, True ), ], "Database 'SNOWFLAKE' does not exist or not authorized.", diff --git a/metadata-ingestion/tests/unit/test_base_usage_config.py b/metadata-ingestion/tests/unit/test_base_usage_config.py new file mode 100644 index 0000000000..008dcf25e3 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_base_usage_config.py @@ -0,0 +1,34 @@ +from datetime import datetime, timezone + +from freezegun import freeze_time + +from datahub.ingestion.source.usage.usage_common import BaseUsageConfig + +FROZEN_TIME = "2023-08-03 09:00:00" +FROZEN_TIME2 = "2023-08-03 09:10:00" + + +@freeze_time(FROZEN_TIME) +def test_relative_start_time_aligns_with_bucket_start_time(): + config = BaseUsageConfig.parse_obj( + {"start_time": "-2 days", "end_time": "2023-07-07T09:00:00Z"} + ) + assert config.start_time == datetime(2023, 7, 5, 0, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 7, 7, 9, tzinfo=timezone.utc) + + config = BaseUsageConfig.parse_obj( + {"start_time": "-2 days", "end_time": "2023-07-07T09:00:00Z"} + ) + assert config.start_time == datetime(2023, 7, 5, 0, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 7, 7, 9, tzinfo=timezone.utc) + + +@freeze_time(FROZEN_TIME) +def test_absolute_start_time_aligns_with_bucket_start_time(): + config = BaseUsageConfig.parse_obj({"start_time": "2023-07-01T00:00:00Z"}) + assert config.start_time == datetime(2023, 7, 1, 0, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc) + + config = BaseUsageConfig.parse_obj({"start_time": "2023-07-01T09:00:00Z"}) + assert config.start_time == datetime(2023, 7, 1, 0, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc) diff --git a/metadata-ingestion/tests/unit/test_time_window_config.py b/metadata-ingestion/tests/unit/test_time_window_config.py new file mode 
100644 index 0000000000..127dc179c2 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_time_window_config.py @@ -0,0 +1,80 @@ +from datetime import datetime, timezone + +import pytest +from freezegun import freeze_time + +from datahub.configuration.time_window_config import BaseTimeWindowConfig + +FROZEN_TIME = "2023-08-03 09:00:00" +FROZEN_TIME2 = "2023-08-03 09:10:00" + + +@freeze_time(FROZEN_TIME) +def test_default_start_end_time(): + config = BaseTimeWindowConfig.parse_obj({}) + assert config.start_time == datetime(2023, 8, 2, 0, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc) + + +@freeze_time(FROZEN_TIME2) +def test_default_start_end_time_hour_bucket_duration(): + config = BaseTimeWindowConfig.parse_obj({"bucket_duration": "HOUR"}) + assert config.start_time == datetime(2023, 8, 3, 8, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 8, 3, 9, 10, tzinfo=timezone.utc) + + +@freeze_time(FROZEN_TIME) +def test_relative_start_time(): + config = BaseTimeWindowConfig.parse_obj({"start_time": "-2 days"}) + assert config.start_time == datetime(2023, 8, 1, 9, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc) + + config = BaseTimeWindowConfig.parse_obj({"start_time": "-2d"}) + assert config.start_time == datetime(2023, 8, 1, 9, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc) + + config = BaseTimeWindowConfig.parse_obj( + {"start_time": "-2 days", "end_time": "2023-07-07T09:00:00Z"} + ) + assert config.start_time == datetime(2023, 7, 5, 9, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 7, 7, 9, tzinfo=timezone.utc) + + config = BaseTimeWindowConfig.parse_obj( + {"start_time": "-2 days", "end_time": "2023-07-07T09:00:00Z"} + ) + assert config.start_time == datetime(2023, 7, 5, 9, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 7, 7, 9, tzinfo=timezone.utc) + + +@freeze_time(FROZEN_TIME) 
+def test_absolute_start_time(): + config = BaseTimeWindowConfig.parse_obj({"start_time": "2023-07-01T00:00:00Z"}) + assert config.start_time == datetime(2023, 7, 1, 0, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc) + + config = BaseTimeWindowConfig.parse_obj({"start_time": "2023-07-01T09:00:00Z"}) + assert config.start_time == datetime(2023, 7, 1, 9, tzinfo=timezone.utc) + assert config.end_time == datetime(2023, 8, 3, 9, tzinfo=timezone.utc) + + +@freeze_time(FROZEN_TIME) +def test_invalid_relative_start_time(): + with pytest.raises(ValueError, match="Unknown string format"): + BaseTimeWindowConfig.parse_obj({"start_time": "-2 das"}) + + with pytest.raises( + ValueError, + match="Relative start time should be in terms of configured bucket duration", + ): + BaseTimeWindowConfig.parse_obj({"start_time": "-2"}) + + with pytest.raises( + ValueError, match="Relative start time should start with minus sign" + ): + BaseTimeWindowConfig.parse_obj({"start_time": "2d"}) + + with pytest.raises( + ValueError, + match="Relative start time should be in terms of configured bucket duration", + ): + BaseTimeWindowConfig.parse_obj({"start_time": "-2m"})