diff --git a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql index ee2c5125bf5..5b4f96d4bae 100644 --- a/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql @@ -272,5 +272,5 @@ SET ts.json = JSON_INSERT( JSON_EXTRACT(ts.json, '$.timestamp') * 1000 ) WHERE ts.extension = 'pipeline.pipelineStatus' - AND JSON_EXTRACT(p.json, '$.serviceType') = 'Airflow' + AND JSON_EXTRACT(p.json, '$.serviceType') in ('Airflow', 'GluePipeline', 'Airbyte', 'Dagster', 'DomoPipeline') ; diff --git a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql index 6cb0f366722..47238c42270 100644 --- a/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql +++ b/bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql @@ -287,5 +287,5 @@ SET json = jsonb_set( FROM pipeline_entity p WHERE ts.entityFQNHash = p.fqnHash and ts.extension = 'pipeline.pipelineStatus' - AND p.json #>> '{serviceType}' = 'Airflow' + AND p.json #>> '{serviceType}' in ('Airflow', 'GluePipeline', 'Airbyte', 'Dagster', 'DomoPipeline') ; \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py index f66ae205ad4..ff5bb393b42 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py @@ -43,6 +43,7 @@ from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceS from metadata.utils import fqn from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger +from metadata.utils.time_utils import convert_timestamp_to_milliseconds logger = ingestion_logger() @@ -138,14 +139,24 @@ class AirbyteSource(PipelineServiceSource): if not job or not job.get("attempts"): continue for attempt in job["attempts"]: + created_at = ( + convert_timestamp_to_milliseconds(attempt["createdAt"]) + if attempt.get("createdAt") + else None + ) + ended_at = ( + convert_timestamp_to_milliseconds(attempt["endedAt"]) + if attempt.get("endedAt") + else None + ) task_status = [ TaskStatus( name=str(pipeline_details.connection.get("connectionId")), executionStatus=STATUS_MAP.get( attempt["status"].lower(), StatusType.Pending ).value, - startTime=attempt.get("createdAt"), - endTime=attempt.get("endedAt"), + startTime=created_at, + endTime=ended_at, logLink=log_link, ) ] @@ -154,7 +165,7 @@ class AirbyteSource(PipelineServiceSource): attempt["status"].lower(), StatusType.Pending ).value, taskStatus=task_status, - timestamp=attempt["createdAt"], + timestamp=created_at, ) yield Either( right=OMetaPipelineStatus( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dagster/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/dagster/metadata.py index c3a5775399c..ab921954cf7 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/dagster/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/dagster/metadata.py @@ -43,6 +43,7 @@ from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceS from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels +from metadata.utils.time_utils import convert_timestamp_to_milliseconds logger = ingestion_logger() @@ -161,8 +162,12 @@ class DagsterSource(PipelineServiceSource): executionStatus=STATUS_MAP.get( run.status.lower(), StatusType.Pending.value ), - startTime=round(run.startTime) if run.startTime else None, - endTime=round(run.endTime) if run.endTime else None, + startTime=round(convert_timestamp_to_milliseconds(run.startTime)) + if run.startTime + else None, + endTime=round(convert_timestamp_to_milliseconds(run.endTime)) + if run.endTime + else None, ) pipeline_status = PipelineStatus( @@ -170,7 +175,9 @@ class DagsterSource(PipelineServiceSource): executionStatus=STATUS_MAP.get( run.status.lower(), StatusType.Pending.value ), - timestamp=round(run.endTime) if run.endTime else None, + timestamp=round(convert_timestamp_to_milliseconds(run.endTime)) + if run.endTime + else None, ) pipeline_status_yield = OMetaPipelineStatus( pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py index 15269f2f93c..139efa49634 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py @@ -38,6 +38,7 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils.logger import ingestion_logger +from metadata.utils.time_utils import convert_timestamp_to_milliseconds logger = ingestion_logger() @@ -199,14 +200,20 @@ class DatabrickspipelineSource(PipelineServiceSource): task_run["state"].get("result_state"), StatusType.Failed, ), - startTime=task_run["start_time"], - endTime=task_run["end_time"], + startTime=convert_timestamp_to_milliseconds( + task_run["start_time"] + ), + endTime=convert_timestamp_to_milliseconds( + task_run["end_time"] + ), logLink=task_run["run_page_url"], ) ) pipeline_status = PipelineStatus( taskStatus=task_status, - timestamp=attempt["start_time"], + timestamp=convert_timestamp_to_milliseconds( + attempt["start_time"] + ), executionStatus=STATUS_MAP.get( attempt["state"].get("result_state"), StatusType.Failed, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/metadata.py index 6e295cddfe1..f8ac28307d0 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline/metadata.py @@ -36,6 +36,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger +from metadata.utils.time_utils import convert_timestamp_to_milliseconds logger = ingestion_logger() @@ -127,8 +128,16 @@ class DomopipelineSource(PipelineServiceSource): runs = self.connection.get_runs(pipeline_id) try: for run in runs or []: - start_time = run["beginTime"] // 1000 if run.get("beginTime") else None - end_time = run["endTime"] // 1000 if run.get("endTime") else None + start_time = ( + convert_timestamp_to_milliseconds(run["beginTime"]) + if run.get("beginTime") + else None + ) + end_time = ( + convert_timestamp_to_milliseconds(run["endTime"]) + if run.get("endTime") + else None + ) run_state = run.get("state", "Pending") task_status = TaskStatus( diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py index c363ebc31fe..de636632889 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py @@ -36,6 +36,7 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils.logger import ingestion_logger +from metadata.utils.time_utils import convert_timestamp_to_milliseconds logger = ingestion_logger() @@ -146,13 +147,19 @@ class GluepipelineSource(PipelineServiceSource): executionStatus=STATUS_MAP.get( attempt["JobRunState"].lower(), StatusType.Pending ).value, - startTime=attempt["StartedOn"].timestamp(), - endTime=attempt["CompletedOn"].timestamp(), + startTime=convert_timestamp_to_milliseconds( + attempt["StartedOn"].timestamp() + ), + endTime=convert_timestamp_to_milliseconds( + attempt["CompletedOn"].timestamp() + ), ) ) pipeline_status = PipelineStatus( taskStatus=task_status, - timestamp=attempt["StartedOn"].timestamp(), + timestamp=convert_timestamp_to_milliseconds( + attempt["StartedOn"].timestamp() + ), executionStatus=STATUS_MAP.get( attempt["JobRunState"].lower(), StatusType.Pending ).value, diff --git a/ingestion/src/metadata/utils/time_utils.py b/ingestion/src/metadata/utils/time_utils.py index 987fdf979b4..52303fe16c1 100644 --- a/ingestion/src/metadata/utils/time_utils.py +++ b/ingestion/src/metadata/utils/time_utils.py @@ -121,6 +121,6 @@ def convert_timestamp_to_milliseconds(timestamp: int) -> int: Retunrs: int """ - if len(str(timestamp)) == 13: + if len(str(round(timestamp))) == 13: return timestamp return timestamp * 1000 diff --git a/ingestion/tests/unit/topology/pipeline/test_airbyte.py b/ingestion/tests/unit/topology/pipeline/test_airbyte.py index 13c487d7fcd..844231d53bc 100644 --- a/ingestion/tests/unit/topology/pipeline/test_airbyte.py +++ b/ingestion/tests/unit/topology/pipeline/test_airbyte.py @@ -89,12 +89,12 @@ EXPECTED_PIPELINE_STATUS = [ TaskStatus( name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", executionStatus=StatusType.Pending.value, - startTime=1655482894, + startTime=1655482894000, endTime=None, logLink=f"{MOCK_CONNECTION_URI_PATH}/status", ) ], - timestamp=1655482894, + timestamp=1655482894000, ), ), OMetaPipelineStatus( @@ -105,12 +105,12 @@ EXPECTED_PIPELINE_STATUS = [ TaskStatus( name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", executionStatus=StatusType.Successful.value, - startTime=1655393914, - endTime=1655394054, + startTime=1655393914000, + endTime=1655394054000, logLink=f"{MOCK_CONNECTION_URI_PATH}/status", ) ], - timestamp=1655393914, + timestamp=1655393914000, ), ), ] diff --git a/ingestion/tests/unit/topology/pipeline/test_dagster.py b/ingestion/tests/unit/topology/pipeline/test_dagster.py index 4cf99f1316f..1c499b16c38 100644 --- a/ingestion/tests/unit/topology/pipeline/test_dagster.py +++ b/ingestion/tests/unit/topology/pipeline/test_dagster.py @@ -216,12 +216,12 @@ EXPECTED_PIPELINE_STATUS = [ TaskStatus( name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", executionStatus=StatusType.Successful.value, - startTime=1655393914, - endTime=1655394054, + startTime=1655393914000, + endTime=1655394054000, logLink=f"{MOCK_LOG_URL}/status", ) ], - timestamp=1655393914, + timestamp=1655393914000, ), ), ] diff --git a/ingestion/tests/unit/topology/pipeline/test_domopipeline.py b/ingestion/tests/unit/topology/pipeline/test_domopipeline.py index 7af246ed1e4..52527d7a0fe 100644 --- a/ingestion/tests/unit/topology/pipeline/test_domopipeline.py +++ b/ingestion/tests/unit/topology/pipeline/test_domopipeline.py @@ -96,14 +96,14 @@ EXPECTED_PIPELINE_STATUS = [ OMetaPipelineStatus( pipeline_fqn="local_domo_pipeline.1", pipeline_status=PipelineStatus( - timestamp=1665476792, + timestamp=1665476792000, executionStatus="Successful", taskStatus=[ TaskStatus( name="1", executionStatus="Successful", - startTime=1665476783, - endTime=1665476792, + startTime=1665476783000, + endTime=1665476792000, logLink=None, ) ], @@ -112,14 +112,14 @@ EXPECTED_PIPELINE_STATUS = [ OMetaPipelineStatus( pipeline_fqn="local_domo_pipeline.1", pipeline_status=PipelineStatus( - timestamp=1665470252, + timestamp=1665470252000, executionStatus="Successful", taskStatus=[ TaskStatus( name="1", executionStatus="Successful", - startTime=1665470244, - endTime=1665470252, + startTime=1665470244000, + endTime=1665470252000, logLink=None, ) ], @@ -128,14 +128,14 @@ EXPECTED_PIPELINE_STATUS = [ OMetaPipelineStatus( pipeline_fqn="local_domo_pipeline.1", pipeline_status=PipelineStatus( - timestamp=1665148827, + timestamp=1665148827000, executionStatus="Successful", taskStatus=[ TaskStatus( name="1", executionStatus="Successful", - startTime=1665148818, - endTime=1665148827, + startTime=1665148818000, + endTime=1665148827000, logLink=None, ) ], @@ -272,8 +272,8 @@ class DomoPipelineUnitTest(TestCase): pipeline_status_list = [] results = self.domopipeline.yield_pipeline_status(MOCK_PIPELINE_DETAILS) for result in results: - if isinstance(result, OMetaPipelineStatus): - pipeline_status_list.append(result) + if isinstance(result.right, OMetaPipelineStatus): + pipeline_status_list.append(result.right) for _, (expected, original) in enumerate( zip(EXPECTED_PIPELINE_STATUS, pipeline_status_list)