From 2732906059a41df9fc908fe5ce32b4159ca0eeb8 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Mon, 11 Apr 2022 13:27:06 +0200 Subject: [PATCH] Fix #4021 - Pipeline status sample data (#4023) * Update connection arguments * Pipeline status sample data --- .../database/connectionBasicType.json | 14 +-- .../sample_data/pipelines/pipelineStatus.json | 105 ++++++++++++++++++ .../ingestion/models/pipeline_status.py | 22 ++++ .../metadata/ingestion/sink/metadata_rest.py | 21 ++++ .../metadata/ingestion/source/sample_data.py | 19 +++- 5 files changed, 173 insertions(+), 8 deletions(-) create mode 100644 ingestion/examples/sample_data/pipelines/pipelineStatus.json create mode 100644 ingestion/src/metadata/ingestion/models/pipeline_status.py diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/connectionBasicType.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/connectionBasicType.json index e3062bfae70..2ad95bdab15 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/connectionBasicType.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/connectionBasicType.json @@ -7,18 +7,18 @@ "connectionOptions": { "javaType": "org.openmetadata.catalog.services.connections.database.ConnectionOptions", "description": "Additional connection options that can be sent to service during the connection.", - "type": "object" + "type": "object", + "additionalProperties": { + "type": "string" + } }, "connectionArguments": { "javaType": "org.openmetadata.catalog.services.connections.database.ConnectionArguments", "description": "Additional connection arguments such as security or protocol configs that can be sent to service during connection.", "type": "object", - "properties": { - "http_path": { - "description": "HTTP path of databricks cluster", - "type": "string" - } + "additionalProperties": { + "type": "string" } } } -} +} \ No newline at end of file diff --git a/ingestion/examples/sample_data/pipelines/pipelineStatus.json b/ingestion/examples/sample_data/pipelines/pipelineStatus.json new file mode 100644 index 00000000000..8ccbe117bbc --- /dev/null +++ b/ingestion/examples/sample_data/pipelines/pipelineStatus.json @@ -0,0 +1,105 @@ +[ + { + "pipeline": "sample_airflow.dim_address_etl", + "pipelineStatus": [ + { + "executionDate": 1649582444, + "executionStatus": "Failed", + "taskStatus": [ + { + "executionStatus": "Failed", + "name": "dim_address_task" + }, + { + "executionStatus": "Failed", + "name": "assert_table_exists" + } + ] + }, + { + "executionDate": 1649669589, + "executionStatus": "Successful", + "taskStatus": [ + { + "executionStatus": "Successful", + "name": "dim_address_task" + }, + { + "executionStatus": "Successful", + "name": "assert_table_exists" + } + ] + }, + { + "executionDate": 1649669394, + "executionStatus": "Failed", + "taskStatus": [ + { + "executionStatus": "Successful", + "name": "dim_address_task" + }, + { + "executionStatus": "Failed", + "name": "assert_table_exists" + } + ] + }, + { + "executionDate": 1649669174, + "executionStatus": "Failed", + "taskStatus": [ + { + "executionStatus": "Failed", + "name": "dim_address_task" + }, + { + "executionStatus": "Successful", + "name": "assert_table_exists" + } + ] + }, + { + "executionDate": 1649669274, + "executionStatus": "Pending", + "taskStatus": [ + { + "executionStatus": "Pending", + "name": "dim_address_task" + }, + { + "executionStatus": "Successful", + "name": "assert_table_exists" + } + ] + }, + { + "executionDate": 1649669374, + "executionStatus": "Pending", + "taskStatus": [ + { + "executionStatus": "Failed", + "name": "dim_address_task" + }, + { + "executionStatus": "Pending", + "name": "assert_table_exists" + } + ] + }, + { + "executionDate": 1649669474, + "executionStatus": "Pending", + "taskStatus": [ + { + "executionStatus": "Pending", + "name": "dim_address_task" + }, + { + "executionStatus": "Pending", + "name": "assert_table_exists" + } + ] + } + ] + } +] \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/models/pipeline_status.py b/ingestion/src/metadata/ingestion/models/pipeline_status.py new file mode 100644 index 00000000000..5a2eb9d0e6d --- /dev/null +++ b/ingestion/src/metadata/ingestion/models/pipeline_status.py @@ -0,0 +1,22 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Model required to ingest pipeline status data +from the sample data +""" +from pydantic import BaseModel + +from metadata.generated.schema.entity.data.pipeline import PipelineStatus + + +class OMetaPipelineStatus(BaseModel): + pipeline_fqdn: str + pipeline_status: PipelineStatus diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 0bcc7a061af..f3046ccb670 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -46,6 +46,7 @@ from metadata.ingestion.api.common import Entity from metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.models.ometa_policy import OMetaPolicy from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.models.table_metadata import Chart, Dashboard, DeleteTable from metadata.ingestion.models.table_tests import OMetaTableTest from metadata.ingestion.models.user import OMetaUserProfile @@ -128,6 +129,8 @@ class MetadataRestSink(Sink[Entity]): self.delete_table(record) elif isinstance(record, OMetaTableTest): self.write_table_tests(record) + elif isinstance(record, OMetaPipelineStatus): + self.write_pipeline_status(record) else: logging.info( f"Ignoring the record due to unknown Record type {type(record)}" @@ -528,6 +531,24 @@ class MetadataRestSink(Sink[Entity]): logger.debug(traceback.print_exc()) logger.error(err) + def write_pipeline_status(self, record: OMetaPipelineStatus) -> None: + """ + Use the /status endpoint to add PipelineStatus + data to a Pipeline Entity + """ + try: + pipeline = self.metadata.get_by_name( + entity=Pipeline, fqdn=record.pipeline_fqdn + ) + self.metadata.add_pipeline_status( + pipeline=pipeline, status=record.pipeline_status + ) + self.status.records_written(f"Pipeline Status: {record.pipeline_fqdn}") + + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(err) + def get_status(self): return self.status diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py index ea947248d68..2fea4b4e96d 100644 --- a/ingestion/src/metadata/ingestion/source/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/sample_data.py @@ -33,7 +33,7 @@ from metadata.generated.schema.api.tests.createTableTest import CreateTableTestR from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.location import Location, LocationType -from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, @@ -53,6 +53,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.table_tests import OMetaTableTest from metadata.ingestion.models.user import OMetaUserProfile @@ -303,6 +304,9 @@ class SampleDataSource(Source[Entity]): self.table_tests = json.load( open(self.config.sample_data_folder + "/datasets/tableTests.json", "r") ) + self.pipeline_status = json.load( + open(self.config.sample_data_folder + "/pipelines/pipelineStatus.json", "r") + ) @classmethod def create(cls, config_dict, metadata_config): @@ -324,6 +328,7 @@ class SampleDataSource(Source[Entity]): yield from self.ingest_dashboards() yield from self.ingest_pipelines() yield from self.ingest_lineage() + yield from self.ingest_pipeline_status() yield from self.ingest_mlmodels() def ingest_locations(self) -> Iterable[Location]: @@ -498,6 +503,18 @@ class SampleDataSource(Source[Entity]): ) yield lineage + def ingest_pipeline_status(self) -> Iterable[OMetaPipelineStatus]: + """ + Ingest sample pipeline status + """ + for status_data in self.pipeline_status: + pipeline_fqdn = status_data["pipeline"] + for status in status_data["pipelineStatus"]: + yield OMetaPipelineStatus( + pipeline_fqdn=pipeline_fqdn, + pipeline_status=PipelineStatus(**status), + ) + def ingest_mlmodels(self) -> Iterable[CreateMlModelRequest]: """ Convert sample model data into a Model Entity