diff --git a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql
index ad9f0c2baee..1e3b1b4417e 100644
--- a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql
+++ b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql
@@ -248,3 +248,16 @@ CREATE TABLE IF NOT EXISTS apps_extension_time_series (
     json JSON NOT NULL,
     timestamp BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.timestamp') NOT NULL
 );
+
+-- Adding back the COLLATE queries from 1.1.5 to keep the correct VARCHAR length
+ALTER TABLE glossary_term_entity MODIFY fqnHash VARCHAR(756) COLLATE ascii_bin;
+
+-- We don't have an ID, so we'll create a temp SERIAL number and use it for deletion
+ALTER TABLE entity_extension_time_series ADD COLUMN temp SERIAL;
+WITH CTE AS (
+  SELECT temp, ROW_NUMBER() OVER (PARTITION BY entityFQNHash, extension, timestamp ORDER BY entityFQNHash) RN FROM entity_extension_time_series)
+DELETE FROM entity_extension_time_series WHERE temp in (SELECT temp FROM CTE WHERE RN > 1);
+ALTER TABLE entity_extension_time_series DROP COLUMN temp;
+
+ALTER TABLE entity_extension_time_series MODIFY COLUMN entityFQNHash VARCHAR(768) COLLATE ascii_bin, MODIFY COLUMN jsonSchema VARCHAR(256) COLLATE ascii_bin, MODIFY COLUMN extension VARCHAR(256) COLLATE ascii_bin,
+    ADD CONSTRAINT entity_extension_time_series_constraint UNIQUE (entityFQNHash, extension, timestamp);
diff --git a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql
index ffc78f85fe6..58068702d02 100644
--- a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql
+++ b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql
@@ -264,3 +264,15 @@ CREATE TABLE IF NOT EXISTS apps_extension_time_series (
     json JSONB NOT NULL,
     timestamp BIGINT GENERATED ALWAYS AS ((json ->> 'timestamp')::bigint) STORED NOT NULL
 );
+
+
+-- Adding back the PK queries from 1.1.5 to keep the correct VARCHAR length
+-- We don't have an ID, so we'll create a temp SERIAL number and use it for deletion
+ALTER TABLE entity_extension_time_series ADD COLUMN temp SERIAL;
+WITH CTE AS (
+  SELECT temp, ROW_NUMBER() OVER (PARTITION BY entityFQNHash, extension, timestamp ORDER BY entityFQNHash) RN FROM entity_extension_time_series)
+DELETE FROM entity_extension_time_series WHERE temp in (SELECT temp FROM CTE WHERE RN > 1);
+ALTER TABLE entity_extension_time_series DROP COLUMN temp;
+
+ALTER TABLE entity_extension_time_series ALTER COLUMN entityFQNHash TYPE VARCHAR(768), ALTER COLUMN jsonSchema TYPE VARCHAR(256), ALTER COLUMN extension TYPE VARCHAR(256),
+    ADD CONSTRAINT entity_extension_time_series_constraint UNIQUE (entityFQNHash, extension, timestamp);
diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/status.py b/ingestion/src/airflow_provider_openmetadata/lineage/status.py
index d235cf95548..81a7b96cecc 100644
--- a/ingestion/src/airflow_provider_openmetadata/lineage/status.py
+++ b/ingestion/src/airflow_provider_openmetadata/lineage/status.py
@@ -113,7 +113,7 @@ def add_status(
     ]
 
     updated_status = PipelineStatus(
-        timestamp=execution_date,
+        timestamp=datetime_to_ts(execution_date),
        executionStatus=get_dag_status(
             all_tasks=dag.task_ids,
             task_status=updated_task_status,
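The Python hunks in this PR replace a raw `datetime` value (status.py above) and a second-precision `.timestamp()` call (metadata.py below) with `datetime_to_ts`, so every `PipelineStatus.timestamp` is produced by one helper. The diff only imports `datetime_to_ts` from `metadata.utils.helpers` and never shows its body; the sketch below is an assumption of its behavior, inferred from the callers dropping their explicit `milliseconds=True` flag:

```python
# Hypothetical sketch of metadata.utils.helpers.datetime_to_ts -- NOT the
# actual implementation, which is not shown in this diff. Assumption: it
# converts a datetime into epoch milliseconds, passing None through.
from datetime import datetime, timezone
from typing import Optional


def datetime_to_ts(date_time: Optional[datetime]) -> Optional[int]:
    """Convert a datetime to a millisecond epoch timestamp (assumed behavior)."""
    return int(date_time.timestamp() * 1000) if date_time else None


# Usage mirroring the status.py change above:
execution_date = datetime(2023, 10, 1, 12, 30, tzinfo=timezone.utc)
print(datetime_to_ts(execution_date))  # 1696163400000 -- milliseconds, not seconds
```

Centralizing the conversion presumably avoids the mismatch where Airflow's `.timestamp()` yields seconds while the API stores millisecond timestamps.
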
diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
index ffffe03c51c..a41693442be 100644
--- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py
@@ -222,7 +222,7 @@ class AirflowSource(PipelineServiceSource):
                 executionStatus=STATUS_MAP.get(
                     dag_run.state, StatusType.Pending.value
                 ),
-                timestamp=dag_run.execution_date.timestamp(),
+                timestamp=datetime_to_ts(dag_run.execution_date),
             )
             yield Either(
                 right=OMetaPipelineStatus(
diff --git a/ingestion/src/metadata/utils/time_utils.py b/ingestion/src/metadata/utils/time_utils.py
index deb8a8f02df..987fdf979b4 100644
--- a/ingestion/src/metadata/utils/time_utils.py
+++ b/ingestion/src/metadata/utils/time_utils.py
@@ -16,6 +16,8 @@ Time utility functions
 from datetime import datetime, time, timedelta, timezone
 from typing import Union
 
+from metadata.utils.helpers import datetime_to_ts
+
 
 def datetime_to_timestamp(datetime_value, milliseconds=False) -> int:
     """Convert a datetime object to timestamp integer
@@ -63,9 +65,8 @@ def get_beginning_of_day_timestamp_mill(
         microseconds=microseconds,
         milliseconds=milliseconds,
     )
-    return datetime_to_timestamp(
+    return datetime_to_ts(
         datetime.combine(now_utc - delta, time.min, tzinfo=timezone.utc),
-        milliseconds=True,
     )
 
 
@@ -96,9 +97,8 @@ def get_end_of_day_timestamp_mill(
         microseconds=microseconds,
         milliseconds=milliseconds,
     )
-    return datetime_to_timestamp(
+    return datetime_to_ts(
         datetime.combine(now_utc - delta, time.max, tzinfo=timezone.utc),
-        milliseconds=True,
     )
 
 
diff --git a/ingestion/tests/integration/data_insight/test_web_analytic_events.py b/ingestion/tests/integration/data_insight/test_web_analytic_events.py
index 57f9d10a2c2..760efc76fc6 100644
--- a/ingestion/tests/integration/data_insight/test_web_analytic_events.py
+++ b/ingestion/tests/integration/data_insight/test_web_analytic_events.py
@@ -31,8 +31,8 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
     OpenMetadataConnection,
 )
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.utils.helpers import datetime_to_ts
 from metadata.utils.time_utils import (
-    datetime_to_timestamp,
     get_beginning_of_day_timestamp_mill,
     get_end_of_day_timestamp_mill,
 )
@@ -122,7 +122,7 @@ class WebAnalyticsEndpointsTests(unittest.TestCase):
 
         for delta in range(7):
             delta = timedelta(days=delta, milliseconds=randint(100, 999))
-            tmsp = datetime_to_timestamp(datetime.utcnow() - delta, milliseconds=True)
+            tmsp = datetime_to_ts(datetime.utcnow() - delta)
             user_id = uuid.uuid4()
             session_id = uuid.uuid4()
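
The `time_utils.py` helpers above keep their millisecond contract after the refactor: since `datetime_to_ts` is assumed to always emit milliseconds (see the sketch earlier in this diff), the explicit `milliseconds=True` flag became redundant. A quick sanity check, assuming the ingestion package is installed and using only the function names visible in this diff:

```python
# Verify the refactored day-boundary helpers still return millisecond-precision
# epoch values. Function names come from the diff above; this is a usage
# sketch, not part of the PR.
from metadata.utils.time_utils import (
    get_beginning_of_day_timestamp_mill,
    get_end_of_day_timestamp_mill,
)

start_ms = get_beginning_of_day_timestamp_mill()  # today 00:00:00.000 UTC
end_ms = get_end_of_day_timestamp_mill()          # today 23:59:59.999999 UTC

# A UTC day spans just under 86_400_000 ms; a seconds-based regression would
# make this difference ~86_400 instead.
assert 0 < end_ms - start_ms < 86_400_000
print(start_ms, end_ms)
```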