Remove duplicates from entity_extension_time_series and add the constraint if missing (#13626)

* Remove duplicates from entity_extension_time_series and add the constraint if missing

* Add sort buffer and work mem

* Revert "Add sort buffer and work mem"

This reverts commit fcfff5feb60c9212bb7c1cad34b524dc8c03bfc5.
Pere Miquel Brull 2023-10-19 12:15:02 +02:00 committed by GitHub
parent accbc59d76
commit 255bfb95b1
6 changed files with 33 additions and 8 deletions


@@ -248,3 +248,16 @@ CREATE TABLE IF NOT EXISTS apps_extension_time_series (
     json JSON NOT NULL,
     timestamp BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.timestamp') NOT NULL
 );
+-- Adding back the COLLATE queries from 1.1.5 to keep the correct VARCHAR length
+ALTER TABLE glossary_term_entity MODIFY fqnHash VARCHAR(756) COLLATE ascii_bin;
+-- We don't have an ID, so we'll create a temp SERIAL number and use it for deletion
+ALTER TABLE entity_extension_time_series ADD COLUMN temp SERIAL;
+WITH CTE AS (
+  SELECT temp, ROW_NUMBER() OVER (PARTITION BY entityFQNHash, extension, timestamp ORDER BY entityFQNHash) RN FROM entity_extension_time_series)
+DELETE FROM entity_extension_time_series WHERE temp in (SELECT temp FROM CTE WHERE RN > 1);
+ALTER TABLE entity_extension_time_series DROP COLUMN temp;
+ALTER TABLE entity_extension_time_series MODIFY COLUMN entityFQNHash VARCHAR(768) COLLATE ascii_bin, MODIFY COLUMN jsonSchema VARCHAR(256) COLLATE ascii_bin, MODIFY COLUMN extension VARCHAR(256) COLLATE ascii_bin,
+    ADD CONSTRAINT entity_extension_time_series_constraint UNIQUE (entityFQNHash, extension, timestamp);
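The additions above deduplicate on (entityFQNHash, extension, timestamp) with a throwaway SERIAL column before the UNIQUE constraint is created. A quick sanity check, not part of the migration and only a hedged sketch against the table above, that should return zero rows once the cleanup has run:

-- Count any remaining duplicate (entityFQNHash, extension, timestamp) groups.
SELECT entityFQNHash, extension, timestamp, COUNT(*) AS copies
FROM entity_extension_time_series
GROUP BY entityFQNHash, extension, timestamp
HAVING COUNT(*) > 1;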


@@ -264,3 +264,15 @@ CREATE TABLE IF NOT EXISTS apps_extension_time_series (
     json JSONB NOT NULL,
     timestamp BIGINT GENERATED ALWAYS AS ((json ->> 'timestamp')::bigint) STORED NOT NULL
 );
+-- Adding back the PK queries from 1.1.5 to keep the correct VARCHAR length
+-- We don't have an ID, so we'll create a temp SERIAL number and use it for deletion
+ALTER TABLE entity_extension_time_series ADD COLUMN temp SERIAL;
+WITH CTE AS (
+  SELECT temp, ROW_NUMBER() OVER (PARTITION BY entityFQNHash, extension, timestamp ORDER BY entityFQNHash) RN FROM entity_extension_time_series)
+DELETE FROM entity_extension_time_series WHERE temp in (SELECT temp FROM CTE WHERE RN > 1);
+ALTER TABLE entity_extension_time_series DROP COLUMN temp;
+ALTER TABLE entity_extension_time_series ALTER COLUMN entityFQNHash TYPE VARCHAR(768), ALTER COLUMN jsonSchema TYPE VARCHAR(256), ALTER COLUMN extension TYPE VARCHAR(256),
+    ADD CONSTRAINT entity_extension_time_series_constraint UNIQUE (entityFQNHash, extension, timestamp);
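Purely for illustration, and not part of this change: once the UNIQUE constraint exists, PostgreSQL can treat a re-write of the same (entityFQNHash, extension, timestamp) as an upsert instead of silently creating a second row. The literal values below are hypothetical placeholders; only the column names come from the migration.

-- Illustrative upsert only; the literal values are hypothetical placeholders.
INSERT INTO entity_extension_time_series (entityFQNHash, extension, jsonSchema, json)
VALUES ('some-fqn-hash', 'some.extension', 'someSchema', '{"timestamp": 1697673600000}')
ON CONFLICT (entityFQNHash, extension, timestamp)
DO UPDATE SET json = EXCLUDED.json;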


@@ -113,7 +113,7 @@ def add_status(
     ]
     updated_status = PipelineStatus(
-        timestamp=execution_date,
+        timestamp=datetime_to_ts(execution_date),
         executionStatus=get_dag_status(
             all_tasks=dag.task_ids,
             task_status=updated_task_status,


@@ -222,7 +222,7 @@ class AirflowSource(PipelineServiceSource):
                 executionStatus=STATUS_MAP.get(
                     dag_run.state, StatusType.Pending.value
                 ),
-                timestamp=dag_run.execution_date.timestamp(),
+                timestamp=datetime_to_ts(dag_run.execution_date),
             )
             yield Either(
                 right=OMetaPipelineStatus(
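Both status hunks above replace a raw datetime (or float seconds) with datetime_to_ts(...). A minimal sketch of the conversion this relies on; the real helper lives in metadata.utils.helpers, so the body below is an assumption for illustration, not the project code:

from datetime import datetime, timezone

def datetime_to_ts(dt: datetime) -> int:
    # Assumed behaviour: epoch milliseconds, matching the BIGINT timestamp
    # column generated from json ->> 'timestamp' in the migrations above.
    return int(dt.timestamp() * 1000)

# Example: datetime_to_ts(datetime(2023, 10, 19, tzinfo=timezone.utc)) == 1697673600000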


@@ -16,6 +16,8 @@ Time utility functions
 from datetime import datetime, time, timedelta, timezone
 from typing import Union
+from metadata.utils.helpers import datetime_to_ts
 def datetime_to_timestamp(datetime_value, milliseconds=False) -> int:
     """Convert a datetime object to timestamp integer
@@ -63,9 +65,8 @@ def get_beginning_of_day_timestamp_mill(
         microseconds=microseconds,
         milliseconds=milliseconds,
     )
-    return datetime_to_timestamp(
+    return datetime_to_ts(
         datetime.combine(now_utc - delta, time.min, tzinfo=timezone.utc),
-        milliseconds=True,
     )
@@ -96,9 +97,8 @@ def get_end_of_day_timestamp_mill(
         microseconds=microseconds,
         milliseconds=milliseconds,
     )
-    return datetime_to_timestamp(
+    return datetime_to_ts(
         datetime.combine(now_utc - delta, time.max, tzinfo=timezone.utc),
-        milliseconds=True,
     )
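The two helpers above now share the same millisecond conversion. A standalone sketch of the pattern they implement, using only the standard library and the epoch-millisecond assumption noted earlier:

from datetime import datetime, time, timedelta, timezone

now_utc = datetime.now(timezone.utc)
delta = timedelta(days=1)

# Start and end of the target day, expressed as epoch milliseconds.
start_ms = int(datetime.combine(now_utc - delta, time.min, tzinfo=timezone.utc).timestamp() * 1000)
end_ms = int(datetime.combine(now_utc - delta, time.max, tzinfo=timezone.utc).timestamp() * 1000)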


@@ -31,8 +31,8 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
     OpenMetadataConnection,
 )
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.utils.helpers import datetime_to_ts
 from metadata.utils.time_utils import (
-    datetime_to_timestamp,
     get_beginning_of_day_timestamp_mill,
     get_end_of_day_timestamp_mill,
 )
@@ -122,7 +122,7 @@ class WebAnalyticsEndpointsTests(unittest.TestCase):
         for delta in range(7):
             delta = timedelta(days=delta, milliseconds=randint(100, 999))
-            tmsp = datetime_to_timestamp(datetime.utcnow() - delta, milliseconds=True)
+            tmsp = datetime_to_ts(datetime.utcnow() - delta)
             user_id = uuid.uuid4()
             session_id = uuid.uuid4()