Converted and fixed pipelinestatus timestamps to milliseconds (#13670)

* fixed pipelinestatus timestamps in mills

* Added migrations
This commit is contained in:
Onkar Ravgan 2023-10-20 22:09:24 +05:30 committed by GitHub
parent eb9c7f681d
commit 0f0bccdd45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 77 additions and 36 deletions

View File

@ -272,5 +272,5 @@ SET ts.json = JSON_INSERT(
JSON_EXTRACT(ts.json, '$.timestamp') * 1000
)
WHERE ts.extension = 'pipeline.pipelineStatus'
AND JSON_EXTRACT(p.json, '$.serviceType') = 'Airflow'
AND JSON_EXTRACT(p.json, '$.serviceType') in ('Airflow', 'GluePipeline', 'Airbyte', 'Dagster', 'DomoPipeline')
;

View File

@ -287,5 +287,5 @@ SET json = jsonb_set(
FROM pipeline_entity p
WHERE ts.entityFQNHash = p.fqnHash
and ts.extension = 'pipeline.pipelineStatus'
AND p.json #>> '{serviceType}' = 'Airflow'
AND p.json #>> '{serviceType}' in ('Airflow', 'GluePipeline', 'Airbyte', 'Dagster', 'DomoPipeline')
;

View File

@ -43,6 +43,7 @@ from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceS
from metadata.utils import fqn
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger
from metadata.utils.time_utils import convert_timestamp_to_milliseconds
logger = ingestion_logger()
@ -138,14 +139,24 @@ class AirbyteSource(PipelineServiceSource):
if not job or not job.get("attempts"):
continue
for attempt in job["attempts"]:
created_at = (
convert_timestamp_to_milliseconds(attempt["createdAt"])
if attempt.get("createdAt")
else None
)
ended_at = (
convert_timestamp_to_milliseconds(attempt["endedAt"])
if attempt.get("endedAt")
else None
)
task_status = [
TaskStatus(
name=str(pipeline_details.connection.get("connectionId")),
executionStatus=STATUS_MAP.get(
attempt["status"].lower(), StatusType.Pending
).value,
startTime=attempt.get("createdAt"),
endTime=attempt.get("endedAt"),
startTime=created_at,
endTime=ended_at,
logLink=log_link,
)
]
@ -154,7 +165,7 @@ class AirbyteSource(PipelineServiceSource):
attempt["status"].lower(), StatusType.Pending
).value,
taskStatus=task_status,
timestamp=attempt["createdAt"],
timestamp=created_at,
)
yield Either(
right=OMetaPipelineStatus(

View File

@ -43,6 +43,7 @@ from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceS
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
from metadata.utils.time_utils import convert_timestamp_to_milliseconds
logger = ingestion_logger()
@ -161,8 +162,12 @@ class DagsterSource(PipelineServiceSource):
executionStatus=STATUS_MAP.get(
run.status.lower(), StatusType.Pending.value
),
startTime=round(run.startTime) if run.startTime else None,
endTime=round(run.endTime) if run.endTime else None,
startTime=round(convert_timestamp_to_milliseconds(run.startTime))
if run.startTime
else None,
endTime=round(convert_timestamp_to_milliseconds(run.endTime))
if run.endTime
else None,
)
pipeline_status = PipelineStatus(
@ -170,7 +175,9 @@ class DagsterSource(PipelineServiceSource):
executionStatus=STATUS_MAP.get(
run.status.lower(), StatusType.Pending.value
),
timestamp=round(run.endTime) if run.endTime else None,
timestamp=round(convert_timestamp_to_milliseconds(run.endTime))
if run.endTime
else None,
)
pipeline_status_yield = OMetaPipelineStatus(
pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__,

View File

@ -38,6 +38,7 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.logger import ingestion_logger
from metadata.utils.time_utils import convert_timestamp_to_milliseconds
logger = ingestion_logger()
@ -199,14 +200,20 @@ class DatabrickspipelineSource(PipelineServiceSource):
task_run["state"].get("result_state"),
StatusType.Failed,
),
startTime=task_run["start_time"],
endTime=task_run["end_time"],
startTime=convert_timestamp_to_milliseconds(
task_run["start_time"]
),
endTime=convert_timestamp_to_milliseconds(
task_run["end_time"]
),
logLink=task_run["run_page_url"],
)
)
pipeline_status = PipelineStatus(
taskStatus=task_status,
timestamp=attempt["start_time"],
timestamp=convert_timestamp_to_milliseconds(
attempt["start_time"]
),
executionStatus=STATUS_MAP.get(
attempt["state"].get("result_state"),
StatusType.Failed,

View File

@ -36,6 +36,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger
from metadata.utils.time_utils import convert_timestamp_to_milliseconds
logger = ingestion_logger()
@ -127,8 +128,16 @@ class DomopipelineSource(PipelineServiceSource):
runs = self.connection.get_runs(pipeline_id)
try:
for run in runs or []:
start_time = run["beginTime"] // 1000 if run.get("beginTime") else None
end_time = run["endTime"] // 1000 if run.get("endTime") else None
start_time = (
convert_timestamp_to_milliseconds(run["beginTime"])
if run.get("beginTime")
else None
)
end_time = (
convert_timestamp_to_milliseconds(run["endTime"])
if run.get("endTime")
else None
)
run_state = run.get("state", "Pending")
task_status = TaskStatus(

View File

@ -36,6 +36,7 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.logger import ingestion_logger
from metadata.utils.time_utils import convert_timestamp_to_milliseconds
logger = ingestion_logger()
@ -146,13 +147,19 @@ class GluepipelineSource(PipelineServiceSource):
executionStatus=STATUS_MAP.get(
attempt["JobRunState"].lower(), StatusType.Pending
).value,
startTime=attempt["StartedOn"].timestamp(),
endTime=attempt["CompletedOn"].timestamp(),
startTime=convert_timestamp_to_milliseconds(
attempt["StartedOn"].timestamp()
),
endTime=convert_timestamp_to_milliseconds(
attempt["CompletedOn"].timestamp()
),
)
)
pipeline_status = PipelineStatus(
taskStatus=task_status,
timestamp=attempt["StartedOn"].timestamp(),
timestamp=convert_timestamp_to_milliseconds(
attempt["StartedOn"].timestamp()
),
executionStatus=STATUS_MAP.get(
attempt["JobRunState"].lower(), StatusType.Pending
).value,

View File

@ -121,6 +121,6 @@ def convert_timestamp_to_milliseconds(timestamp: int) -> int:
Retunrs:
int
"""
if len(str(timestamp)) == 13:
if len(str(round(timestamp))) == 13:
return timestamp
return timestamp * 1000

View File

@ -89,12 +89,12 @@ EXPECTED_PIPELINE_STATUS = [
TaskStatus(
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
executionStatus=StatusType.Pending.value,
startTime=1655482894,
startTime=1655482894000,
endTime=None,
logLink=f"{MOCK_CONNECTION_URI_PATH}/status",
)
],
timestamp=1655482894,
timestamp=1655482894000,
),
),
OMetaPipelineStatus(
@ -105,12 +105,12 @@ EXPECTED_PIPELINE_STATUS = [
TaskStatus(
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
executionStatus=StatusType.Successful.value,
startTime=1655393914,
endTime=1655394054,
startTime=1655393914000,
endTime=1655394054000,
logLink=f"{MOCK_CONNECTION_URI_PATH}/status",
)
],
timestamp=1655393914,
timestamp=1655393914000,
),
),
]

View File

@ -216,12 +216,12 @@ EXPECTED_PIPELINE_STATUS = [
TaskStatus(
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
executionStatus=StatusType.Successful.value,
startTime=1655393914,
endTime=1655394054,
startTime=1655393914000,
endTime=1655394054000,
logLink=f"{MOCK_LOG_URL}/status",
)
],
timestamp=1655393914,
timestamp=1655393914000,
),
),
]

View File

@ -96,14 +96,14 @@ EXPECTED_PIPELINE_STATUS = [
OMetaPipelineStatus(
pipeline_fqn="local_domo_pipeline.1",
pipeline_status=PipelineStatus(
timestamp=1665476792,
timestamp=1665476792000,
executionStatus="Successful",
taskStatus=[
TaskStatus(
name="1",
executionStatus="Successful",
startTime=1665476783,
endTime=1665476792,
startTime=1665476783000,
endTime=1665476792000,
logLink=None,
)
],
@ -112,14 +112,14 @@ EXPECTED_PIPELINE_STATUS = [
OMetaPipelineStatus(
pipeline_fqn="local_domo_pipeline.1",
pipeline_status=PipelineStatus(
timestamp=1665470252,
timestamp=1665470252000,
executionStatus="Successful",
taskStatus=[
TaskStatus(
name="1",
executionStatus="Successful",
startTime=1665470244,
endTime=1665470252,
startTime=1665470244000,
endTime=1665470252000,
logLink=None,
)
],
@ -128,14 +128,14 @@ EXPECTED_PIPELINE_STATUS = [
OMetaPipelineStatus(
pipeline_fqn="local_domo_pipeline.1",
pipeline_status=PipelineStatus(
timestamp=1665148827,
timestamp=1665148827000,
executionStatus="Successful",
taskStatus=[
TaskStatus(
name="1",
executionStatus="Successful",
startTime=1665148818,
endTime=1665148827,
startTime=1665148818000,
endTime=1665148827000,
logLink=None,
)
],
@ -272,8 +272,8 @@ class DomoPipelineUnitTest(TestCase):
pipeline_status_list = []
results = self.domopipeline.yield_pipeline_status(MOCK_PIPELINE_DETAILS)
for result in results:
if isinstance(result, OMetaPipelineStatus):
pipeline_status_list.append(result)
if isinstance(result.right, OMetaPipelineStatus):
pipeline_status_list.append(result.right)
for _, (expected, original) in enumerate(
zip(EXPECTED_PIPELINE_STATUS, pipeline_status_list)