Fix #4021 - Pipeline status sample data (#4023)

* Update connection arguments

* Pipeline status sample data
This commit is contained in:
Pere Miquel Brull 2022-04-11 13:27:06 +02:00 committed by GitHub
parent 9c92424f74
commit 2732906059
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 173 additions and 8 deletions

View File

@ -7,18 +7,18 @@
"connectionOptions": {
"javaType": "org.openmetadata.catalog.services.connections.database.ConnectionOptions",
"description": "Additional connection options that can be sent to service during the connection.",
"type": "object"
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"connectionArguments": {
"javaType": "org.openmetadata.catalog.services.connections.database.ConnectionArguments",
"description": "Additional connection arguments such as security or protocol configs that can be sent to service during connection.",
"type": "object",
"properties": {
"http_path": {
"description": "HTTP path of databricks cluster",
"type": "string"
}
"additionalProperties": {
"type": "string"
}
}
}
}
}

View File

@ -0,0 +1,105 @@
[
{
"pipeline": "sample_airflow.dim_address_etl",
"pipelineStatus": [
{
"executionDate": 1649582444,
"executionStatus": "Failed",
"taskStatus": [
{
"executionStatus": "Failed",
"name": "dim_address_task"
},
{
"executionStatus": "Failed",
"name": "assert_table_exists"
}
]
},
{
"executionDate": 1649669589,
"executionStatus": "Successful",
"taskStatus": [
{
"executionStatus": "Successful",
"name": "dim_address_task"
},
{
"executionStatus": "Successful",
"name": "assert_table_exists"
}
]
},
{
"executionDate": 1649669394,
"executionStatus": "Failed",
"taskStatus": [
{
"executionStatus": "Successful",
"name": "dim_address_task"
},
{
"executionStatus": "Failed",
"name": "assert_table_exists"
}
]
},
{
"executionDate": 1649669174,
"executionStatus": "Failed",
"taskStatus": [
{
"executionStatus": "Failed",
"name": "dim_address_task"
},
{
"executionStatus": "Successful",
"name": "assert_table_exists"
}
]
},
{
"executionDate": 1649669274,
"executionStatus": "Pending",
"taskStatus": [
{
"executionStatus": "Pending",
"name": "dim_address_task"
},
{
"executionStatus": "Successful",
"name": "assert_table_exists"
}
]
},
{
"executionDate": 1649669374,
"executionStatus": "Pending",
"taskStatus": [
{
"executionStatus": "Failed",
"name": "dim_address_task"
},
{
"executionStatus": "Pending",
"name": "assert_table_exists"
}
]
},
{
"executionDate": 1649669474,
"executionStatus": "Pending",
"taskStatus": [
{
"executionStatus": "Pending",
"name": "dim_address_task"
},
{
"executionStatus": "Pending",
"name": "assert_table_exists"
}
]
}
]
}
]

View File

@ -0,0 +1,22 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Model required to ingest pipeline status data
from the sample data
"""
from pydantic import BaseModel
from metadata.generated.schema.entity.data.pipeline import PipelineStatus
class OMetaPipelineStatus(BaseModel):
    """
    Wrapper model pairing a pipeline's fully qualified name with the
    PipelineStatus payload that should be ingested for it.
    """

    # Fully qualified name of the target Pipeline entity.
    pipeline_fqdn: str
    # Status data (execution runs and per-task statuses) to attach to the pipeline.
    pipeline_status: PipelineStatus

View File

@ -46,6 +46,7 @@ from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.ometa_policy import OMetaPolicy
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DeleteTable
from metadata.ingestion.models.table_tests import OMetaTableTest
from metadata.ingestion.models.user import OMetaUserProfile
@ -128,6 +129,8 @@ class MetadataRestSink(Sink[Entity]):
self.delete_table(record)
elif isinstance(record, OMetaTableTest):
self.write_table_tests(record)
elif isinstance(record, OMetaPipelineStatus):
self.write_pipeline_status(record)
else:
logging.info(
f"Ignoring the record due to unknown Record type {type(record)}"
@ -528,6 +531,24 @@ class MetadataRestSink(Sink[Entity]):
logger.debug(traceback.print_exc())
logger.error(err)
def write_pipeline_status(self, record: OMetaPipelineStatus) -> None:
    """
    Use the /status endpoint to add PipelineStatus
    data to a Pipeline Entity.

    :param record: wrapper carrying the pipeline FQDN and the
        PipelineStatus payload to attach to it
    """
    try:
        pipeline = self.metadata.get_by_name(
            entity=Pipeline, fqdn=record.pipeline_fqdn
        )
        # get_by_name returns None when no entity matches the FQDN;
        # surface a clear message instead of an obscure attribute error
        # swallowed by the broad except below.
        if pipeline is None:
            logger.warning(
                f"Pipeline {record.pipeline_fqdn} not found. Skipping status update."
            )
            return
        self.metadata.add_pipeline_status(
            pipeline=pipeline, status=record.pipeline_status
        )
        self.status.records_written(f"Pipeline Status: {record.pipeline_fqdn}")
    except Exception as err:
        logger.debug(traceback.format_exc())
        logger.error(err)
def get_status(self):
    """Return the sink's accumulated ingestion status (records written / failures)."""
    return self.status

View File

@ -33,7 +33,7 @@ from metadata.generated.schema.api.tests.createTableTest import CreateTableTestR
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.location import Location, LocationType
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
@ -53,6 +53,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.table_tests import OMetaTableTest
from metadata.ingestion.models.user import OMetaUserProfile
@ -303,6 +304,9 @@ class SampleDataSource(Source[Entity]):
self.table_tests = json.load(
open(self.config.sample_data_folder + "/datasets/tableTests.json", "r")
)
self.pipeline_status = json.load(
open(self.config.sample_data_folder + "/pipelines/pipelineStatus.json", "r")
)
@classmethod
def create(cls, config_dict, metadata_config):
@ -324,6 +328,7 @@ class SampleDataSource(Source[Entity]):
yield from self.ingest_dashboards()
yield from self.ingest_pipelines()
yield from self.ingest_lineage()
yield from self.ingest_pipeline_status()
yield from self.ingest_mlmodels()
def ingest_locations(self) -> Iterable[Location]:
@ -498,6 +503,18 @@ class SampleDataSource(Source[Entity]):
)
yield lineage
def ingest_pipeline_status(self) -> Iterable[OMetaPipelineStatus]:
    """
    Emit one OMetaPipelineStatus per execution run found in the
    sample pipeline status JSON, tagged with its pipeline's FQDN.
    """
    for entry in self.pipeline_status:
        fqdn = entry["pipeline"]
        yield from (
            OMetaPipelineStatus(
                pipeline_fqdn=fqdn,
                pipeline_status=PipelineStatus(**run),
            )
            for run in entry["pipelineStatus"]
        )
def ingest_mlmodels(self) -> Iterable[CreateMlModelRequest]:
"""
Convert sample model data into a Model Entity