Fix #6141: Ingestion Pipeline Status Updates (#8216)

* Fix #6141: Ingestion Pipeline Status Updates

* List Pipeline Status API & Improvements

* Rename State field to PipelineState in UI

* Convert Pipeline Status array to single object

* fix broken UI

* Rebase Fixes

* Profiler, TestSuite & DataInsights Pipeline

* py_format

* fix logs page not loading
add pipelineStatus endpoint

* fix recent run changes

* Fix Tests

* address review comments for ui

* fix failing checks

* fix unit tests

* fix cypress test

* remove loader test as it's not used

* wait for API when we click on the tab

* fix cypress waitFor api

* fix failed cypress tests

Co-authored-by: Chirag Madlani <12962843+chirag-madlani@users.noreply.github.com>
Mayur Singal 2022-11-03 14:37:26 +05:30 committed by GitHub
parent 8273197b48
commit 1386b43607
39 changed files with 1645 additions and 1047 deletions

View File

@ -34,6 +34,9 @@ from metadata.generated.schema.analytics.reportData import ReportDataType
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
Sink,
@ -42,6 +45,9 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import data_insight_logger
from metadata.utils.workflow_helper import (
set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
)
from metadata.utils.workflow_output_handler import print_data_insight_status
logger = data_insight_logger()
@ -60,6 +66,7 @@ class DataInsightWorkflow:
self.config.workflowConfig.openMetadataServerConfig
)
self.metadata = OpenMetadata(self.metadata_config)
self.set_ingestion_pipeline_status(state=PipelineState.running)
self.status = ProcessorStatus()
self.data_processor: Optional[
@ -155,3 +162,22 @@ class DataInsightWorkflow:
):
return 1
return 0
def stop(self):
"""
Close all connections
"""
self.set_ingestion_pipeline_status(PipelineState.success)
self.metadata.close()
def set_ingestion_pipeline_status(self, state: PipelineState):
"""
Method to set the pipeline status of current ingestion pipeline
"""
pipeline_run_id = set_ingestion_pipeline_status_helper(
state=state,
ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
pipeline_run_id=self.config.pipelineRunId,
metadata=self.metadata,
)
self.config.pipelineRunId = pipeline_run_id

View File

@ -11,16 +11,20 @@
"""
Workflow definition for metadata related ingestions: metadata, lineage and usage.
"""
# module building strings read better with .format instead of f-strings
# pylint: disable=consider-using-f-string
import importlib
import traceback
# module building strings read better with .format instead of f-strings
# pylint: disable=consider-using-f-string
from typing import Optional, Type, TypeVar
from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseMetadataConfigType,
@ -41,6 +45,9 @@ from metadata.utils.class_helper import (
get_service_type_from_source_type,
)
from metadata.utils.logger import ingestion_logger, set_loggers_level
from metadata.utils.workflow_helper import (
set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
)
from metadata.utils.workflow_output_handler import print_status
logger = ingestion_logger()
@ -49,6 +56,8 @@ T = TypeVar("T")
SAMPLE_SOURCE = {"sample-data", "sample-usage"}
SUCCESS_THRESHOLD_VALUE = 90
class InvalidWorkflowJSONException(Exception):
"""
@ -92,9 +101,13 @@ class Workflow:
self.config.workflowConfig.openMetadataServerConfig
)
self._retrieve_service_connection_if_needed(metadata_config, service_type)
self.metadata = OpenMetadata(config=metadata_config)
self._retrieve_dbt_config_source_if_needed(metadata_config, service_type)
self.set_ingestion_pipeline_status(state=PipelineState.running)
self._retrieve_service_connection_if_needed(service_type)
self._retrieve_dbt_config_source_if_needed(service_type)
logger.info(f"Service type:{service_type},{source_type} configured")
@ -217,10 +230,27 @@ class Workflow:
self.bulk_sink.close()
if hasattr(self, "sink"):
self.sink.close()
pipeline_state = PipelineState.success
if (
self._get_source_success() >= SUCCESS_THRESHOLD_VALUE
and self._get_source_success() < 100
):
pipeline_state = PipelineState.partialSuccess
self.set_ingestion_pipeline_status(pipeline_state)
self.source.close()
def _get_source_success(self):
return self.source.get_status().calculate_success()
def raise_from_status(self, raise_warnings=False):
if self.source.get_status().failures:
"""
Method to raise error if failed execution
"""
if (
self.source.get_status().failures
and self._get_source_success() < SUCCESS_THRESHOLD_VALUE
):
raise WorkflowExecutionError(
"Source reported errors", self.source.get_status()
)
@ -258,15 +288,24 @@ class Workflow:
return 0
def _retrieve_service_connection_if_needed(
self, metadata_config: OpenMetadataConnection, service_type: ServiceType
) -> None:
def set_ingestion_pipeline_status(self, state: PipelineState):
"""
Method to set the pipeline status of current ingestion pipeline
"""
pipeline_run_id = set_ingestion_pipeline_status_helper(
state=state,
ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
pipeline_run_id=self.config.pipelineRunId,
metadata=self.metadata,
)
self.config.pipelineRunId = pipeline_run_id
def _retrieve_service_connection_if_needed(self, service_type: ServiceType) -> None:
"""
We override the current `serviceConnection` source config object if source workflow service already exists
in OM. When secrets' manager is configured, we retrieve the service connection from the secrets' manager.
Otherwise, we get the service connection from the service object itself through the default `SecretsManager`.
:param metadata_config: OpenMetadata connection config
:param service_type: source workflow service type
:return:
"""
@ -274,28 +313,23 @@ class Workflow:
self.config.source.type
):
service_name = self.config.source.serviceName
metadata = OpenMetadata(config=metadata_config)
try:
service = metadata.get_by_name(
service = self.metadata.get_by_name(
get_service_class_from_service_type(service_type),
service_name,
)
if service:
self.config.source.serviceConnection = (
metadata.secrets_manager_client.retrieve_service_connection(
service, service_type.name.lower()
)
self.config.source.serviceConnection = self.metadata.secrets_manager_client.retrieve_service_connection( # pylint: disable=line-too-long
service, service_type.name.lower()
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Error getting dbtConfigSource for service name [{service_name}]"
f" using the secrets manager provider [{metadata.config.secretsManagerProvider}]: {exc}"
f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
)
def _retrieve_dbt_config_source_if_needed(
self, metadata_config: OpenMetadataConnection, service_type: ServiceType
) -> None:
def _retrieve_dbt_config_source_if_needed(self, service_type: ServiceType) -> None:
"""
We override the current `config` source config object if it is a metadata ingestion type. When secrets' manager
is configured, we retrieve the config from the secrets' manager. Otherwise, we get the config from the source
@ -309,10 +343,9 @@ class Workflow:
and config
and config.type == DatabaseMetadataConfigType.DatabaseMetadata
):
metadata = OpenMetadata(config=metadata_config)
try:
dbt_config_source: object = (
metadata.secrets_manager_client.retrieve_dbt_source_config(
self.metadata.secrets_manager_client.retrieve_dbt_source_config(
self.config.source.sourceConfig,
self.config.source.serviceName,
)
@ -327,7 +360,7 @@ class Workflow:
logger.debug(traceback.format_exc())
logger.error(
f"Error getting dbtConfigSource for config [{config}] using the secrets manager"
f" provider [{metadata.config.secretsManagerProvider}]: {exc}"
f" provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
)
@staticmethod

View File

@ -0,0 +1,67 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Mixin class containing ingestion pipeline specific methods
To be used by OpenMetadata class
"""
from typing import Optional
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineStatus,
)
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
logger = ometa_logger()
class OMetaIngestionPipelineMixin:
"""
OpenMetadata API methods related to ingestion pipeline.
To be inherited by OpenMetadata
"""
client: REST
def create_or_update_pipeline_status(
self, ingestion_pipeline_fqn: str, pipeline_status: PipelineStatus
):
"""
PUT create or update pipeline status
:param ingestion_pipeline_fqn: Ingestion Pipeline FQN
:param pipeline_status: Pipeline Status data to add
"""
resp = self.client.put(
f"/services/ingestionPipelines/{ingestion_pipeline_fqn}/pipelineStatus",
data=pipeline_status.json(),
)
logger.debug(
f"Created Pipeline Status for pipeline {ingestion_pipeline_fqn}: {resp}"
)
return resp
def get_pipeline_status(
self, ingestion_pipeline_fqn: str, pipeline_status_run_id: str
) -> Optional[PipelineStatus]:
"""
GET pipeline status
:param ingestion_pipeline_fqn: Ingestion Pipeline FQN
:param pipeline_status_run_id: Pipeline Status run id
"""
resp = self.client.get(
f"/services/ingestionPipelines/{ingestion_pipeline_fqn}/pipelineStatus/{pipeline_status_run_id}"
)
if resp:
return PipelineStatus(**resp)
return None
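
For orientation, a minimal usage sketch of these two mixin methods through the OpenMetadata client; the hostPort, pipeline FQN, and run id below are placeholders:

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineState,
    PipelineStatus,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata

# Placeholder connection; real configs typically also carry auth settings.
metadata = OpenMetadata(OpenMetadataConnection(hostPort="http://localhost:8585/api"))

# PUT a status for one run of a hypothetical pipeline FQN...
status = PipelineStatus(
    runId="7e369da9-4d0e-4887-b99b-1a35cfab551e",
    pipelineState=PipelineState.running,
    startDate=1667307722000,
    timestamp=1667307722000,
)
metadata.create_or_update_pipeline_status("my_service.my_pipeline", status)

# ...then read the same run back by its id.
latest = metadata.get_pipeline_status(
    "my_service.my_pipeline", "7e369da9-4d0e-4887-b99b-1a35cfab551e"
)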

View File

@ -67,6 +67,9 @@ from metadata.ingestion.ometa.mixins.dashboard_mixin import OMetaDashboardMixin
from metadata.ingestion.ometa.mixins.data_insight_mixin import DataInisghtMixin
from metadata.ingestion.ometa.mixins.es_mixin import ESMixin
from metadata.ingestion.ometa.mixins.glossary_mixin import GlossaryMixin
from metadata.ingestion.ometa.mixins.ingestion_pipeline_mixin import (
OMetaIngestionPipelineMixin,
)
from metadata.ingestion.ometa.mixins.mlmodel_mixin import OMetaMlModelMixin
from metadata.ingestion.ometa.mixins.patch_mixin import OMetaPatchMixin
from metadata.ingestion.ometa.mixins.pipeline_mixin import OMetaPipelineMixin
@ -147,6 +150,7 @@ class OpenMetadata(
OMetaPatchMixin,
OMetaTestsMixin,
DataInisghtMixin,
OMetaIngestionPipelineMixin,
Generic[T, C],
):
"""

View File

@ -38,6 +38,9 @@ from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
@ -69,6 +72,9 @@ from metadata.utils.class_helper import (
)
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.logger import profiler_logger
from metadata.utils.workflow_helper import (
set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
)
from metadata.utils.workflow_output_handler import print_profiler_status
logger = profiler_logger()
@ -101,6 +107,8 @@ class ProfilerWorkflow:
self._retrieve_service_connection_if_needed()
self.set_ingestion_pipeline_status(state=PipelineState.running)
# Init and type the source config
self.source_config: DatabaseServiceProfilerPipeline = cast(
DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
@ -516,6 +524,7 @@ class ProfilerWorkflow:
"""
Close all connections
"""
self.set_ingestion_pipeline_status(PipelineState.success)
self.metadata.close()
def _retrieve_service_connection_if_needed(self) -> None:
@ -544,3 +553,15 @@ class ProfilerWorkflow:
@staticmethod
def _is_sample_source(service_type):
return service_type in {"sample-data", "sample-usage"}
def set_ingestion_pipeline_status(self, state: PipelineState):
"""
Method to set the pipeline status of current ingestion pipeline
"""
pipeline_run_id = set_ingestion_pipeline_status_helper(
state=state,
ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
pipeline_run_id=self.config.pipelineRunId,
metadata=self.metadata,
)
self.config.pipelineRunId = pipeline_run_id

View File

@ -36,6 +36,9 @@ from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
@ -55,6 +58,9 @@ from metadata.test_suite.api.models import TestCaseDefinition, TestSuiteProcesso
from metadata.test_suite.runner.core import DataTestsRunner
from metadata.utils import entity_link
from metadata.utils.logger import test_suite_logger
from metadata.utils.workflow_helper import (
set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper,
)
from metadata.utils.workflow_output_handler import print_test_suite_status
logger: Logger = test_suite_logger()
@ -88,6 +94,8 @@ class TestSuiteWorkflow:
)
self.metadata = OpenMetadata(self.metadata_config)
self.set_ingestion_pipeline_status(state=PipelineState.running)
self.status = ProcessorStatus()
if self.config.sink:
@ -485,4 +493,17 @@ class TestSuiteWorkflow:
"""
Close all connections
"""
self.set_ingestion_pipeline_status(PipelineState.success)
self.metadata.close()
def set_ingestion_pipeline_status(self, state: PipelineState):
"""
Method to set the pipeline status of current ingestion pipeline
"""
pipeline_run_id = set_ingestion_pipeline_status_helper(
state=state,
ingestion_pipeline_fqn=self.config.ingestionPipelineFQN,
pipeline_run_id=self.config.pipelineRunId,
metadata=self.metadata,
)
self.config.pipelineRunId = pipeline_run_id

View File

@ -0,0 +1,58 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Modules contains helper methods for different workflows
"""
import uuid
from datetime import datetime
from typing import Optional
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
PipelineStatus,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
def set_ingestion_pipeline_status(
state: PipelineState,
ingestion_pipeline_fqn: str,
pipeline_run_id: str,
metadata: OpenMetadata,
) -> str:
"""
Method to set the pipeline status of current ingestion pipeline
"""
if ingestion_pipeline_fqn:
# if ingestion pipeline fqn is not set then setting pipeline status is avoided
pipeline_status: Optional[PipelineStatus] = None
if state in (PipelineState.queued, PipelineState.running):
pipeline_run_id = pipeline_run_id or str(uuid.uuid4())
pipeline_status = PipelineStatus(
runId=pipeline_run_id,
pipelineState=state,
startDate=datetime.now().timestamp() * 1000,
timestamp=datetime.now().timestamp() * 1000,
)
elif pipeline_run_id:
pipeline_status = metadata.get_pipeline_status(
ingestion_pipeline_fqn, pipeline_run_id
)
# if workflow is ended then update the end date in status
pipeline_status.endDate = datetime.now().timestamp() * 1000
pipeline_status.pipelineState = state
metadata.create_or_update_pipeline_status(
ingestion_pipeline_fqn, pipeline_status
)
return pipeline_run_id
return None
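
Every workflow touched in this PR wires the helper the same way; a condensed lifecycle sketch, assuming a workflow whose parsed config exposes ingestionPipelineFQN and pipelineRunId:

from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineState,
)
from metadata.utils.workflow_helper import set_ingestion_pipeline_status

def run_with_status(workflow):
    """Sketch: report running -> success/failed around a workflow run."""
    config, metadata = workflow.config, workflow.metadata
    # On start the helper mints a run id if the config has none yet.
    config.pipelineRunId = set_ingestion_pipeline_status(
        state=PipelineState.running,
        ingestion_pipeline_fqn=config.ingestionPipelineFQN,
        pipeline_run_id=config.pipelineRunId,
        metadata=metadata,
    )
    final_state = PipelineState.failed
    try:
        workflow.execute()
        final_state = PipelineState.success
    finally:
        # Reusing the run id makes the helper close the same status
        # entry with an end date instead of opening a new one.
        set_ingestion_pipeline_status(
            state=final_state,
            ingestion_pipeline_fqn=config.ingestionPipelineFQN,
            pipeline_run_id=config.pipelineRunId,
            metadata=metadata,
        )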

View File

@ -74,7 +74,7 @@ class ResponseFormat:
Build the pipeline status
"""
return PipelineStatus(
state=dag_run.get_state(),
pipelineState=dag_run.get_state(),
runId=dag_run.run_id,
startDate=datetime_to_ts(dag_run.start_date),
endDate=datetime_to_ts(dag_run.end_date),

View File

@ -44,6 +44,7 @@ from openmetadata_managed_apis.workflows.ingestion.credentials_builder import (
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
PipelineState,
)
from metadata.generated.schema.metadataIngestion.workflow import (
LogLevels,
@ -156,14 +157,16 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
This is the callable used to create the PythonOperator
"""
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = Workflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
try:
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
except Exception as err:
workflow.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -179,12 +182,15 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = ProfilerWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
try:
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
except Exception as err:
workflow.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -200,12 +206,16 @@ def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = TestSuiteWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
try:
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
except Exception as err:
workflow.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -223,9 +233,14 @@ def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = DataInsightWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
try:
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
except Exception as err:
workflow.set_ingestion_pipeline_status(PipelineState.failed)
raise err
def date_to_datetime(

View File

@ -48,6 +48,7 @@ def build_data_insight_workflow_config(
loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO,
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__,
)
return workflow_config

View File

@ -51,6 +51,7 @@ def build_lineage_workflow_config(
config={},
),
workflowConfig=build_workflow_config_property(ingestion_pipeline),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__,
)
return workflow_config

View File

@ -48,6 +48,7 @@ def build_metadata_workflow_config(
config={},
),
workflowConfig=build_workflow_config_property(ingestion_pipeline),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__,
)
return workflow_config

View File

@ -56,6 +56,7 @@ def build_profiler_workflow_config(
loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO,
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__,
)
return workflow_config

View File

@ -52,6 +52,7 @@ def build_test_suite_workflow_config(
loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO,
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__,
)
return workflow_config

View File

@ -66,6 +66,7 @@ def build_usage_config_from_file(
config={"filename": filename},
),
workflowConfig=build_workflow_config_property(ingestion_pipeline),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__,
)

View File

@ -159,7 +159,7 @@ class TestAirflowOps(TestCase):
res = status(dag_id="dag_status")
self.assertEqual(res.status_code, 200)
self.assertEqual(res.json[0].get("state"), "running")
self.assertEqual(res.json[0].get("pipelineState"), "running")
self.dag.create_dagrun(
run_type=DagRunType.MANUAL,
@ -182,7 +182,7 @@ class TestAirflowOps(TestCase):
self.assertEqual(res.status_code, 200)
res_status = {elem.get("state") for elem in res.json}
res_status = {elem.get("pipelineState") for elem in res.json}
self.assertEqual(res_status, {"failed", "success"})
def test_dag_state(self):
@ -227,6 +227,7 @@ class TestAirflowOps(TestCase):
id=uuid.uuid4(),
pipelineType=PipelineType.metadata,
name="my_new_dag",
fullyQualifiedName="test-service-ops.my_new_dag",
sourceConfig=SourceConfig(config=DatabaseServiceMetadataPipeline()),
openMetadataServerConnection=self.conn,
airflowConfig=AirflowConfig(),

View File

@ -15,6 +15,7 @@ Validate metadata ingestion workflow generation
import json
import uuid
from unittest import TestCase
from unittest.mock import patch
from openmetadata_managed_apis.workflows.ingestion.lineage import (
build_lineage_workflow_config,
@ -73,6 +74,10 @@ from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.test_suite.api.workflow import TestSuiteWorkflow
def mock_set_ingestion_pipeline_status(self, state):
return True
class OMetaServiceTest(TestCase):
"""
Run this integration test with the local API available
@ -181,6 +186,9 @@ class OMetaServiceTest(TestCase):
hard_delete=True,
)
@patch.object(
Workflow, "set_ingestion_pipeline_status", mock_set_ingestion_pipeline_status
)
def test_ingestion_workflow(self):
"""
Validate that the ingestionPipeline can be parsed
@ -209,6 +217,9 @@ class OMetaServiceTest(TestCase):
Workflow.create(config)
@patch.object(
Workflow, "set_ingestion_pipeline_status", mock_set_ingestion_pipeline_status
)
def test_usage_workflow(self):
"""
Validate that the ingestionPipeline can be parsed
@ -239,6 +250,9 @@ class OMetaServiceTest(TestCase):
Workflow.create(config)
@patch.object(
Workflow, "set_ingestion_pipeline_status", mock_set_ingestion_pipeline_status
)
def test_lineage_workflow(self):
"""
Validate that the ingestionPipeline can be parsed
@ -269,6 +283,11 @@ class OMetaServiceTest(TestCase):
Workflow.create(config)
@patch.object(
ProfilerWorkflow,
"set_ingestion_pipeline_status",
mock_set_ingestion_pipeline_status,
)
def test_profiler_workflow(self):
"""
Validate that the ingestionPipeline can be parsed
@ -297,6 +316,11 @@ class OMetaServiceTest(TestCase):
ProfilerWorkflow.create(config)
@patch.object(
TestSuiteWorkflow,
"set_ingestion_pipeline_status",
mock_set_ingestion_pipeline_status,
)
def test_test_suite_workflow(self):
"""
Validate that the ingestionPipeline can be parsed

View File

@ -59,7 +59,7 @@ public final class Entity {
public static final String FIELD_FOLLOWERS = "followers";
public static final String FIELD_TAGS = "tags";
public static final String FIELD_DELETED = "deleted";
public static final String FIELD_PIPELINE_STATUSES = "pipelineStatuses";
public static final String FIELD_PIPELINE_STATUS = "pipelineStatus";
public static final String FIELD_DISPLAY_NAME = "displayName";
public static final String FIELD_EXTENSION = "extension";
public static final String FIELD_USAGE_SUMMARY = "usageSummary";

View File

@ -18,7 +18,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
@ -26,7 +25,6 @@ import org.json.JSONObject;
import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration;
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.service.exception.IngestionPipelineDeploymentException;
import org.openmetadata.service.exception.PipelineServiceClientException;
@ -64,6 +62,7 @@ public class AirflowRESTClient extends PipelineServiceClient {
String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline);
response = post(deployUrl, pipelinePayload);
if (response.statusCode() == 200) {
ingestionPipeline.setDeployed(true);
return response.body();
}
} catch (Exception e) {
@ -159,8 +158,6 @@ public class AirflowRESTClient extends PipelineServiceClient {
response =
getRequestAuthenticatedForJsonContent(statusEndPoint, serviceURL, API_ENDPOINT, ingestionPipeline.getName());
if (response.statusCode() == 200) {
List<PipelineStatus> statuses = JsonUtils.readObjects(response.body(), PipelineStatus.class);
ingestionPipeline.setPipelineStatuses(statuses);
ingestionPipeline.setDeployed(true);
return ingestionPipeline;
} else if (response.statusCode() == 404) {

View File

@ -3027,6 +3027,23 @@ public interface CollectionDAO {
void deleteAtTimestamp(
@Bind("entityFQN") String entityFQN, @Bind("extension") String extension, @Bind("timestamp") Long timestamp);
@SqlQuery("SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN and jsonSchema = :jsonSchema")
List<String> listByFQN(@Bind("entityFQN") String entityFQN, @Bind("jsonSchema") String jsonSchema);
@SqlQuery(
"SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND jsonSchema = :jsonSchema "
+ "ORDER BY timestamp DESC LIMIT 1")
String getLatestExtensionByFQN(@Bind("entityFQN") String entityFQN, @Bind("jsonSchema") String jsonSchema);
@SqlQuery(
"SELECT json FROM entity_extension_time_series where entityFQN = :entityFQN and jsonSchema = :jsonSchema "
+ " AND timestamp >= :startTs and timestamp <= :endTs ORDER BY timestamp DESC")
List<String> listBetweenTimestampsByFQN(
@Bind("entityFQN") String entityFQN,
@Bind("jsonSchema") String jsonSchema,
@Bind("startTs") Long startTs,
@Bind("endTs") long endTs);
@SqlQuery(
"SELECT json FROM entity_extension_time_series where entityFQN = :entityFQN and extension = :extension "
+ " AND timestamp >= :startTs and timestamp <= :endTs ORDER BY timestamp DESC")

View File

@ -17,27 +17,30 @@ import static org.openmetadata.service.Entity.FIELD_OWNER;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.json.JSONObject;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.services.ingestionPipelines.*;
import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.LogLevels;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.*;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.services.ingestionpipelines.IngestionPipelineResource;
import org.openmetadata.service.secrets.SecretsManager;
import org.openmetadata.service.secrets.SecretsManagerFactory;
import org.openmetadata.service.util.*;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.PipelineServiceClient;
public class IngestionPipelineRepository extends EntityRepository<IngestionPipeline> {
private static final String UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled";
private static final String PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled";
private static final String PIPELINE_STATUS_JSON_SCHEMA = "pipelineStatus";
private static PipelineServiceClient pipelineServiceClient;
public IngestionPipelineRepository(CollectionDAO dao) {
@ -134,6 +137,101 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
pipelineServiceClient = client;
}
private ChangeEvent getChangeEvent(
EntityInterface updated, ChangeDescription change, String entityType, Double prevVersion) {
return new ChangeEvent()
.withEntity(updated)
.withChangeDescription(change)
.withEventType(EventType.ENTITY_UPDATED)
.withEntityType(entityType)
.withEntityId(updated.getId())
.withEntityFullyQualifiedName(updated.getFullyQualifiedName())
.withUserName(updated.getUpdatedBy())
.withTimestamp(System.currentTimeMillis())
.withCurrentVersion(updated.getVersion())
.withPreviousVersion(prevVersion);
}
private ChangeDescription addPipelineStatusChangeDescription(Double version, Object newValue, Object oldValue) {
FieldChange fieldChange =
new FieldChange().withName("testCaseResult").withNewValue(newValue).withOldValue(oldValue);
ChangeDescription change = new ChangeDescription().withPreviousVersion(version);
change.getFieldsUpdated().add(fieldChange);
return change;
}
@Transaction
public RestUtil.PutResponse<?> addPipelineStatus(UriInfo uriInfo, String fqn, PipelineStatus pipelineStatus)
throws IOException {
// Validate the request content
IngestionPipeline ingestionPipeline = dao.findEntityByName(fqn);
PipelineStatus storedPipelineStatus =
JsonUtils.readValue(
daoCollection
.entityExtensionTimeSeriesDao()
.getLatestExtension(ingestionPipeline.getFullyQualifiedName(), pipelineStatus.getRunId()),
PipelineStatus.class);
if (storedPipelineStatus != null) {
daoCollection
.entityExtensionTimeSeriesDao()
.update(
ingestionPipeline.getFullyQualifiedName(),
pipelineStatus.getRunId(),
JsonUtils.pojoToJson(pipelineStatus),
pipelineStatus.getTimestamp());
} else {
daoCollection
.entityExtensionTimeSeriesDao()
.insert(
ingestionPipeline.getFullyQualifiedName(),
pipelineStatus.getRunId(),
"pipelineStatus",
JsonUtils.pojoToJson(pipelineStatus));
}
ChangeDescription change =
addPipelineStatusChangeDescription(ingestionPipeline.getVersion(), pipelineStatus, storedPipelineStatus);
ChangeEvent changeEvent =
getChangeEvent(withHref(uriInfo, ingestionPipeline), change, entityType, ingestionPipeline.getVersion());
return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED);
}
public ResultList<PipelineStatus> listPipelineStatus(String ingestionPipelineFQN, Long startTs, Long endTs)
throws IOException {
IngestionPipeline ingestionPipeline = dao.findEntityByName(ingestionPipelineFQN);
List<PipelineStatus> pipelineStatusList =
JsonUtils.readObjects(
daoCollection
.entityExtensionTimeSeriesDao()
.listBetweenTimestampsByFQN(
ingestionPipeline.getFullyQualifiedName(), PIPELINE_STATUS_JSON_SCHEMA, startTs, endTs),
PipelineStatus.class);
return new ResultList<>(
pipelineStatusList, String.valueOf(startTs), String.valueOf(endTs), pipelineStatusList.size());
}
public PipelineStatus getLatestPipelineStatus(IngestionPipeline ingestionPipeline) throws IOException {
PipelineStatus pipelineStatus =
JsonUtils.readValue(
daoCollection
.entityExtensionTimeSeriesDao()
.getLatestExtensionByFQN(ingestionPipeline.getFullyQualifiedName(), PIPELINE_STATUS_JSON_SCHEMA),
PipelineStatus.class);
return pipelineStatus;
}
public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, UUID pipelineStatusRunId) throws IOException {
IngestionPipeline ingestionPipeline = dao.findEntityByName(ingestionPipelineFQN);
PipelineStatus pipelineStatus =
JsonUtils.readValue(
daoCollection
.entityExtensionTimeSeriesDao()
.getExtension(ingestionPipeline.getFullyQualifiedName(), pipelineStatusRunId.toString()),
PipelineStatus.class);
return pipelineStatus;
}
/** Handles entity updated from PUT and POST operation. */
public class IngestionPipelineUpdater extends EntityUpdater {
public IngestionPipelineUpdater(IngestionPipeline original, IngestionPipeline updated, Operation operation) {

View File

@ -14,8 +14,7 @@
package org.openmetadata.service.resources.services.ingestionpipelines;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import static org.openmetadata.service.Entity.FIELD_PIPELINE_STATUSES;
import static org.openmetadata.service.Entity.*;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.ExternalDocumentation;
@ -28,33 +27,24 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.json.JsonPatch;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.PATCH;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.Include;
@ -167,11 +157,13 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
ListFilter filter = new ListFilter(include).addQueryParam("service", serviceParam);
ResultList<IngestionPipeline> ingestionPipelines =
super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after);
if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUSES)) {
addStatus(ingestionPipelines.getData());
for (IngestionPipeline ingestionPipeline : listOrEmpty(ingestionPipelines.getData())) {
if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUS)) {
ingestionPipeline.setPipelineStatuses(dao.getLatestPipelineStatus(ingestionPipeline));
}
decryptOrNullify(securityContext, ingestionPipeline);
}
listOrEmpty(ingestionPipelines.getData())
.forEach(ingestionPipeline -> decryptOrNullify(securityContext, ingestionPipeline));
return ingestionPipelines;
}
@ -228,8 +220,8 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
Include include)
throws IOException {
IngestionPipeline ingestionPipeline = getInternal(uriInfo, securityContext, id, fieldsParam, include);
if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUSES)) {
ingestionPipeline = addStatus(ingestionPipeline);
if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUS)) {
ingestionPipeline.setPipelineStatuses(dao.getLatestPipelineStatus(ingestionPipeline));
}
return decryptOrNullify(securityContext, ingestionPipeline);
}
@ -296,8 +288,8 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
Include include)
throws IOException {
IngestionPipeline ingestionPipeline = getByNameInternal(uriInfo, securityContext, fqn, fieldsParam, include);
if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUSES)) {
ingestionPipeline = addStatus(ingestionPipeline);
if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUS)) {
ingestionPipeline.setPipelineStatuses(dao.getLatestPipelineStatus(ingestionPipeline));
}
return decryptOrNullify(securityContext, ingestionPipeline);
}
@ -397,6 +389,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
ingestionPipeline.setOpenMetadataServerConnection(
new OpenMetadataServerConnectionBuilder(openMetadataApplicationConfig).build());
pipelineServiceClient.deployPipeline(ingestionPipeline);
createOrUpdate(uriInfo, securityContext, ingestionPipeline);
decryptOrNullify(securityContext, ingestionPipeline);
return addHref(uriInfo, ingestionPipeline);
}
@ -446,6 +439,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
IngestionPipeline pipeline = dao.get(uriInfo, id, fields);
// This call updates the state in Airflow as well as the `enabled` field on the IngestionPipeline
pipelineServiceClient.toggleIngestion(pipeline);
createOrUpdate(uriInfo, securityContext, pipeline);
return createOrUpdate(uriInfo, securityContext, pipeline);
}
@ -499,23 +493,6 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
return Response.status(200, response.body()).build();
}
@GET
@Path("/status")
@Operation(
operationId = "checkRestAirflowStatus",
summary = "Check the Airflow REST status",
tags = "IngestionPipelines",
description = "Check that the Airflow REST endpoint is reachable and up and running",
responses = {
@ApiResponse(
responseCode = "200",
description = "Status message",
content = @Content(mediaType = "application/json"))
})
public Response getRESTStatus(@Context UriInfo uriInfo, @Context SecurityContext securityContext) {
return pipelineServiceClient.getServiceStatus();
}
@GET
@Path("/ip")
@Operation(
@ -534,6 +511,23 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
return Response.ok(hostIp, MediaType.APPLICATION_JSON_TYPE).build();
}
@GET
@Path("/status")
@Operation(
operationId = "checkRestAirflowStatus",
summary = "Check the Airflow REST status",
tags = "IngestionPipelines",
description = "Check that the Airflow REST endpoint is reachable and up and running",
responses = {
@ApiResponse(
responseCode = "200",
description = "Status message",
content = @Content(mediaType = "application/json"))
})
public Response getRESTStatus(@Context UriInfo uriInfo, @Context SecurityContext securityContext) {
return pipelineServiceClient.getServiceStatus();
}
@DELETE
@Path("/{id}")
@Operation(
@ -583,6 +577,99 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
return Response.ok(lastIngestionLogs, MediaType.APPLICATION_JSON_TYPE).build();
}
@PUT
@Path("/{fqn}/pipelineStatus")
@Operation(
operationId = "addPipelineStatus",
summary = "Add pipeline status",
tags = "IngestionPipelines",
description = "Add pipeline status of ingestion pipeline.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Successfully updated the PipelineStatus. ",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = IngestionPipeline.class)))
})
public Response addPipelineStatus(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Encoded
@Parameter(description = "fqn of the ingestion pipeline", schema = @Schema(type = "string"))
@PathParam("fqn")
String fqn,
@Valid PipelineStatus pipelineStatus)
throws IOException {
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn));
return dao.addPipelineStatus(uriInfo, fqn, pipelineStatus).toResponse();
}
@GET
@Path("/{fqn}/pipelineStatus")
@Operation(
operationId = "listPipelineStatuses",
summary = "List of pipeline status",
tags = "IngestionPipelines",
description =
"Get a list of pipeline statuses for the given ingestion pipeline FQN, "
+ "optionally filtered by `startTs` and `endTs`.",
responses = {
@ApiResponse(
responseCode = "200",
description = "List of pipeline status",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = IngestionPipeline.class)))
})
public ResultList<PipelineStatus> listPipelineStatuses(
@Context SecurityContext securityContext,
@Parameter(description = "fqn of the ingestion pipeline", schema = @Schema(type = "string")) @PathParam("fqn")
String fqn,
@Parameter(
description = "Filter pipeline status after the given start timestamp",
schema = @Schema(type = "number"))
@NonNull
@QueryParam("startTs")
Long startTs,
@Parameter(
description = "Filter pipeline status before the given end timestamp",
schema = @Schema(type = "number"))
@NonNull
@QueryParam("endTs")
Long endTs)
throws IOException {
return dao.listPipelineStatus(fqn, startTs, endTs);
}
@GET
@Path("/{fqn}/pipelineStatus/{id}")
@Operation(
operationId = "getPipelineStatus",
summary = "get pipeline status",
tags = "IngestionPipelines",
description = "Get pipeline status of ingestion pipeline",
responses = {
@ApiResponse(
responseCode = "200",
description = "Successfully updated state of the PipelineStatus.",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = IngestionPipeline.class)))
})
public PipelineStatus getPipelineStatus(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Encoded
@Parameter(description = "fqn of the ingestion pipeline", schema = @Schema(type = "string"))
@PathParam("fqn")
String fqn,
@Parameter(description = "Pipeline Status Run Id", schema = @Schema(type = "string")) @PathParam("id") UUID runId)
throws IOException {
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn));
return dao.getPipelineStatus(fqn, runId);
}
private IngestionPipeline getIngestionPipeline(CreateIngestionPipeline create, String user) throws IOException {
OpenMetadataServerConnection openMetadataServerConnection =
new OpenMetadataServerConnectionBuilder(openMetadataApplicationConfig).build();
@ -595,19 +682,6 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
.withService(create.getService());
}
public void addStatus(List<IngestionPipeline> ingestionPipelines) {
listOrEmpty(ingestionPipelines).forEach(this::addStatus);
}
private IngestionPipeline addStatus(IngestionPipeline ingestionPipeline) {
try {
ingestionPipeline = pipelineServiceClient.getPipelineStatus(ingestionPipeline);
} catch (Exception e) {
LOG.error("Failed to fetch status for {} due to {}", ingestionPipeline.getName(), e);
}
return ingestionPipeline;
}
private IngestionPipeline decryptOrNullify(SecurityContext securityContext, IngestionPipeline ingestionPipeline) {
SecretsManager secretsManager = SecretsManagerFactory.getSecretsManager();
try {
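
Seen from a client, the three endpoints above compose like this; a sketch using Python's requests, where the host, token, pipeline FQN, and run id are placeholders:

import requests

BASE = "http://localhost:8585/api/v1/services/ingestionPipelines"
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder auth
FQN = "my_service.my_pipeline"  # placeholder pipeline FQN
RUN_ID = "7e369da9-4d0e-4887-b99b-1a35cfab551e"

# PUT: add or update the status of one run
requests.put(
    f"{BASE}/{FQN}/pipelineStatus",
    headers=HEADERS,
    json={
        "runId": RUN_ID,
        "pipelineState": "running",
        "startDate": 1667307722000,
        "timestamp": 1667307722000,
    },
)

# GET: list statuses in a time window (startTs and endTs are required)
runs = requests.get(
    f"{BASE}/{FQN}/pipelineStatus",
    headers=HEADERS,
    params={"startTs": 1667200000000, "endTs": 1667400000000},
).json()

# GET: fetch a single status by run id
one = requests.get(f"{BASE}/{FQN}/pipelineStatus/{RUN_ID}", headers=HEADERS).json()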

View File

@ -22,9 +22,11 @@
"description": "Pipeline unique run ID.",
"type": "string"
},
"state": {
"pipelineState": {
"description": "Pipeline status denotes if its failed or succeeded.",
"type": "string"
"type": "string",
"javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType",
"enum": ["queued","success","failed","running","partialSuccess"]
},
"startDate": {
"description": "startDate of the pipeline run for this particular execution.",
@ -159,12 +161,8 @@
"$ref": "../../../type/entityReference.json"
},
"pipelineStatuses": {
"description": "List of executions and status for the Pipeline.",
"type": "array",
"items": {
"$ref": "#/definitions/pipelineStatus"
},
"default": null
"description": "Last of executions and status for the Pipeline.",
"$ref": "#/definitions/pipelineStatus"
},
"loggerLevel": {
"description": "Set the logging level for the workflow.",

View File

@ -193,6 +193,14 @@
},
"workflowConfig": {
"$ref": "#/definitions/workflowConfig"
},
"ingestionPipelineFQN": {
"description": "Fully qualified name of ingestion pipeline, used to identify the current ingestion pipeline",
"type": "string"
},
"pipelineRunId": {
"description": "Unique identifier of pipeline run, used to identify the current pipleine run",
"$ref": "../type/basic.json#/definitions/uuid"
}
},
"required": ["source", "workflowConfig"],

File diff suppressed because it is too large

View File

@ -13,7 +13,9 @@
import { AxiosResponse } from 'axios';
import { Operation } from 'fast-json-patch';
import { PagingResponse } from 'Models';
import { CreateIngestionPipeline } from '../generated/api/services/ingestionPipelines/createIngestionPipeline';
import { PipelineStatus } from '../generated/entity/data/pipeline';
import { IngestionPipeline } from '../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { Paging } from '../generated/type/paging';
import { IngestionPipelineLogByIdInterface } from '../pages/LogsViewer/LogsViewer.interfaces';
@ -160,8 +162,22 @@ export const getIngestionPipelineLogById = (id: string, after?: string) => {
);
};
export const postkillIngestionPipelineById = (
export const postKillIngestionPipelineById = (
id: string
): Promise<AxiosResponse> => {
return APIClient.post(`/services/ingestionPipelines/kill/${id}`);
};
export const getRunHistoryForPipeline = async (
id: string,
params: { startTs: number; endTs: number }
) => {
const response = await APIClient.get<PagingResponse<PipelineStatus[]>>(
`/services/ingestionPipelines/${id}/pipelineStatus`,
{
params,
}
);
return response.data;
};

View File

@ -19,7 +19,7 @@ import cronstrue from 'cronstrue';
import { useTranslation } from 'react-i18next';
import { ColumnsType } from 'antd/lib/table';
import { capitalize, isNil, lowerCase, startCase } from 'lodash';
import { isNil, lowerCase, startCase } from 'lodash';
import React, { Fragment, useCallback, useMemo, useState } from 'react';
import { useHistory } from 'react-router-dom';
import { PAGE_SIZE } from '../../constants/constants';
@ -48,6 +48,7 @@ import Loader from '../Loader/Loader';
import EntityDeleteModal from '../Modals/EntityDeleteModal/EntityDeleteModal';
import KillIngestionModal from '../Modals/KillIngestionPipelineModal/KillIngestionPipelineModal';
import { IngestionProps } from './ingestion.interface';
import { IngestionRecentRuns } from './IngestionRecentRun/IngestionRecentRuns.component';
const Ingestion: React.FC<IngestionProps> = ({
airflowEndpoint,
@ -300,67 +301,6 @@ const Ingestion: React.FC<IngestionProps> = ({
<span className="tw-inline-block tw-text-gray-400 tw-self-center">|</span>
);
const getStatuses = (ingestion: IngestionPipeline) => {
const lastFiveIngestions = ingestion.pipelineStatuses
?.sort((a, b) => {
// Turn your strings into millis, and then subtract them
// to get a value that is either negative, positive, or zero.
const date1 = new Date(a.startDate || '');
const date2 = new Date(b.startDate || '');
return date1.getTime() - date2.getTime();
})
.slice(Math.max(ingestion.pipelineStatuses.length - 5, 0));
return lastFiveIngestions?.map((r, i) => {
const status =
i === lastFiveIngestions.length - 1 ? (
<p
className={`tw-h-5 tw-w-16 tw-rounded-sm tw-bg-status-${r.state} tw-mr-1 tw-px-1 tw-text-white tw-text-center`}
key={i}>
{capitalize(r.state)}
</p>
) : (
<p
className={`tw-w-4 tw-h-5 tw-rounded-sm tw-bg-status-${r.state} tw-mr-1`}
key={i}
/>
);
return r?.endDate || r?.startDate || r?.timestamp ? (
<PopOver
html={
<div className="tw-text-left">
{r.timestamp ? (
<p>
{t('label.execution-date')} :{' '}
{new Date(r.timestamp).toUTCString()}
</p>
) : null}
{r.startDate ? (
<p>
{t('label.start-date')}: {new Date(r.startDate).toUTCString()}
</p>
) : null}
{r.endDate ? (
<p>
{t('label.end-date')} : {new Date(r.endDate).toUTCString()}
</p>
) : null}
</div>
}
key={i}
position="bottom"
theme="light"
trigger="mouseenter">
{status}
</PopOver>
) : (
status
);
});
};
const getTriggerDeployButton = (ingestion: IngestionPipeline) => {
if (ingestion.deployed) {
return (
@ -467,8 +407,9 @@ const Ingestion: React.FC<IngestionProps> = ({
title: t('label.recent-runs'),
dataIndex: 'recentRuns',
key: 'recentRuns',
width: 180,
render: (_, record) => (
<div className="tw-flex">{getStatuses(record)}</div>
<IngestionRecentRuns classNames="align-middle" ingestion={record} />
),
},
{
@ -573,7 +514,6 @@ const Ingestion: React.FC<IngestionProps> = ({
NO_PERMISSION_TO_VIEW,
permissions,
airflowEndpoint,
getStatuses,
getTriggerDeployButton,
isRequiredDetailsAvailable,
handleEnableDisableIngestion,
@ -592,10 +532,8 @@ const Ingestion: React.FC<IngestionProps> = ({
const getIngestionTab = () => {
return (
<div
className="tw-px-4 tw-mt-4"
data-testid="ingestion-details-container">
<div className="tw-flex">
<div className="mt-4" data-testid="ingestion-details-container">
<div className="d-flex">
{!isRequiredDetailsAvailable && (
<div className="tw-rounded tw-bg-error-lite tw-text-error tw-font-medium tw-px-4 tw-py-1 tw-mb-4 tw-flex tw-items-center tw-gap-1">
<FontAwesomeIcon icon={faExclamationCircle} />

View File

@ -107,6 +107,12 @@ jest.mock(
}
);
jest.mock('./IngestionRecentRun/IngestionRecentRuns.component', () => ({
IngestionRecentRuns: jest
.fn()
.mockImplementation(() => <p>IngestionRecentRuns</p>),
}));
describe('Test Ingestion page', () => {
it('Page Should render', async () => {
const { container } = render(

View File

@ -0,0 +1,80 @@
/*
* Copyright 2022 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { act, render, screen } from '@testing-library/react';
import React from 'react';
import { getRunHistoryForPipeline } from '../../../axiosAPIs/ingestionPipelineAPI';
import { IngestionPipeline } from '../../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { IngestionRecentRuns } from './IngestionRecentRuns.component';
jest.mock('../../../axiosAPIs/ingestionPipelineAPI', () => ({
getRunHistoryForPipeline: jest.fn().mockImplementation(() =>
Promise.resolve({
data: [
{
runId: '7e369da9-4d0e-4887-b99b-1a35cfab551e',
pipelineState: 'success',
startDate: 1667307722,
timestamp: 1667307722,
endDate: 1667307725,
},
{
runId: 'c95cc97b-9ea2-465c-9b5a-255401674324',
pipelineState: 'success',
startDate: 1667304123,
timestamp: 1667304123,
endDate: 1667304126,
},
{
runId: '60b3e15c-3865-4c81-a1ee-36ff85d2be8e',
pipelineState: 'success',
startDate: 1667301533,
timestamp: 1667301533,
endDate: 1667301536,
},
{
runId: 'a2c6fbf9-952f-4ddd-9b01-c203bf54f0fe',
pipelineState: 'success',
startDate: 1667297370,
timestamp: 1667297370,
endDate: 1667297373,
},
],
paging: { total: 4 },
})
),
}));
const mockIngestion = { fullyQualifiedName: 'test' } as IngestionPipeline;
describe('Test IngestionRecentRun component', () => {
it('should call getRunHistoryForPipeline to fetch all the status', async () => {
act(() => {
render(<IngestionRecentRuns ingestion={mockIngestion} />);
});
expect(getRunHistoryForPipeline).toBeCalledWith('test', expect.anything());
});
it('should render runs when API returns runs', async () => {
await act(async () => {
render(<IngestionRecentRuns ingestion={mockIngestion} />);
});
const runs = await screen.findAllByTestId('pipeline-status');
const successRun = await screen.findByText(/Success/);
expect(successRun).toBeInTheDocument();
expect(runs).toHaveLength(4);
});
});

View File

@ -0,0 +1,135 @@
/*
* Copyright 2022 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Popover, Skeleton, Space } from 'antd';
import { capitalize } from 'lodash';
import React, {
FunctionComponent,
useCallback,
useEffect,
useState,
} from 'react';
import { useTranslation } from 'react-i18next';
import { getRunHistoryForPipeline } from '../../../axiosAPIs/ingestionPipelineAPI';
import {
IngestionPipeline,
PipelineStatus,
} from '../../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import {
getCurrentDateTimeMillis,
getDateTimeFromMilliSeconds,
getPastDaysDateTimeMillis,
} from '../../../utils/TimeUtils';
interface Props {
ingestion: IngestionPipeline;
classNames?: string;
}
const queryParams = {
startTs: getPastDaysDateTimeMillis(1),
endTs: getCurrentDateTimeMillis(),
};
export const IngestionRecentRuns: FunctionComponent<Props> = ({
ingestion,
classNames,
}: Props) => {
const { t } = useTranslation();
const [recentRunStatus, setRecentRunStatus] = useState<PipelineStatus[]>();
const [loading, setLoading] = useState(true);
const fetchPipelineStatus = useCallback(async () => {
setLoading(true);
try {
const response = await getRunHistoryForPipeline(
ingestion.fullyQualifiedName || '',
queryParams
);
const runs = response.data.splice(0, 5).reverse() ?? [];
setRecentRunStatus(
runs.length === 0
? [ingestion.pipelineStatuses as PipelineStatus]
: runs
);
} finally {
setLoading(false);
}
}, [ingestion.fullyQualifiedName]);
useEffect(() => {
if (ingestion.fullyQualifiedName) {
fetchPipelineStatus();
}
}, [ingestion.fullyQualifiedName]);
return (
<Space className={classNames} size={2}>
{loading ? (
<Skeleton.Input size="small" />
) : (
recentRunStatus?.map((r, i) => {
const status =
i === recentRunStatus.length - 1 ? (
<p
className={`tw-h-5 tw-w-16 tw-rounded-sm tw-bg-status-${r?.pipelineState} tw-px-1 tw-text-white tw-text-center`}
data-testid="pipeline-status"
key={i}>
{capitalize(r?.pipelineState)}
</p>
) : (
<p
className={`tw-w-4 tw-h-5 tw-rounded-sm tw-bg-status-${r?.pipelineState} `}
data-testid="pipeline-status"
key={i}
/>
);
const showTooltip = r?.endDate || r?.startDate || r?.timestamp;
return showTooltip ? (
<Popover
key={i}
title={
<div className="tw-text-left">
{r.timestamp && (
<p>
{t('label.execution-date')} :{' '}
{getDateTimeFromMilliSeconds(r.timestamp)}
</p>
)}
{r.startDate && (
<p>
{t('label.start-date')}:{' '}
{getDateTimeFromMilliSeconds(r.startDate)}
</p>
)}
{r.endDate && (
<p>
{t('label.end-date')} :{' '}
{getDateTimeFromMilliSeconds(r.endDate)}
</p>
)}
</div>
}>
{status}
</Popover>
) : (
status
);
}) ?? 'Queued'
)}
</Space>
);
};

View File

@ -11,10 +11,9 @@
* limitations under the License.
*/
import { IngestionType, ServiceCategory } from '../../enums/service.enum';
import { ServiceCategory } from '../../enums/service.enum';
import { DatabaseService } from '../../generated/entity/services/databaseService';
import { IngestionPipeline } from '../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { EntityReference } from '../../generated/type/entityReference';
import { Paging } from '../../generated/type/paging';
import { ServicesType } from '../../interface/service.interface';
import { OperationPermission } from '../PermissionProvider/PermissionProvider.interface';
@ -30,24 +29,6 @@ export interface ConnectorConfig {
excludeDataProfiler?: boolean;
enableDataProfiler?: boolean;
}
export interface IngestionData {
id?: string;
name: string;
displayName: string;
ingestionType: IngestionType;
service: EntityReference;
scheduleInterval: string;
ingestionStatuses?: Array<{
state: string;
startDate: string;
endDate: string;
}>;
nextExecutionDate?: string;
connectorConfig?: ConnectorConfig;
owner?: { id: string; name?: string; type: string };
startDate?: string;
endDate?: string;
}
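With the hand-rolled IngestionData shape removed, call sites can type against the generated model directly. A small sketch, assuming pipelineStatuses is now the single latest status object carrying pipelineState (as the new component above treats it); describePipeline and the 'unknown' fallback are illustrative only.

import { IngestionPipeline } from '../../generated/entity/services/ingestionPipelines/ingestionPipeline';

// Summarizes a pipeline from generated fields only; 'unknown' is an
// illustrative fallback, not part of the generated type.
const describePipeline = (pipeline: IngestionPipeline): string =>
  `${pipeline.name}: ${pipeline.pipelineStatuses?.pipelineState ?? 'unknown'}`;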
export interface IngestionProps {
airflowEndpoint: string;

View File

@ -13,7 +13,7 @@
import { act, fireEvent, render, screen } from '@testing-library/react';
import React from 'react';
import { postkillIngestionPipelineById } from '../../../axiosAPIs/ingestionPipelineAPI';
import { postKillIngestionPipelineById } from '../../../axiosAPIs/ingestionPipelineAPI';
import KillIngestionModal from './KillIngestionPipelineModal';
const mockHandleClose = jest.fn();
@ -28,7 +28,7 @@ const mockProps = {
};
jest.mock('../../../axiosAPIs/ingestionPipelineAPI', () => ({
postkillIngestionPipelineById: jest
postKillIngestionPipelineById: jest
.fn()
.mockImplementation(() => Promise.resolve()),
}));
@ -74,7 +74,7 @@ describe('Test Kill Ingestion Modal component', () => {
fireEvent.click(confirmButton);
expect(postkillIngestionPipelineById).toHaveBeenCalledWith(
expect(postKillIngestionPipelineById).toHaveBeenCalledWith(
mockProps.pipelineId
);
});

View File

@ -15,7 +15,7 @@ import { Modal, Typography } from 'antd';
import { AxiosError } from 'axios';
import React, { FC, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { postkillIngestionPipelineById } from '../../../axiosAPIs/ingestionPipelineAPI';
import { postKillIngestionPipelineById } from '../../../axiosAPIs/ingestionPipelineAPI';
import { showErrorToast, showSuccessToast } from '../../../utils/ToastUtils';
interface KillIngestionModalProps {
@ -39,7 +39,7 @@ const KillIngestionModal: FC<KillIngestionModalProps> = ({
const handleConfirm = async () => {
setIsLoading(true);
try {
const response = await postkillIngestionPipelineById(pipelineId);
const response = await postKillIngestionPipelineById(pipelineId);
const status = response.status;
if (status === 200) {
onClose();

View File

@ -35,10 +35,7 @@ import {
import { Operation } from '../../generated/entity/policies/policy';
import { IngestionPipeline } from '../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import jsonData from '../../jsons/en';
import {
getIngestionStatuses,
getLoadingStatus,
} from '../../utils/CommonUtils';
import { getLoadingStatus } from '../../utils/CommonUtils';
import { checkPermission, userPermissions } from '../../utils/PermissionsUtils';
import {
getLogsViewerPath,
@ -48,6 +45,7 @@ import SVGIcons, { Icons } from '../../utils/SvgUtils';
import { showErrorToast, showSuccessToast } from '../../utils/ToastUtils';
import ErrorPlaceHolderIngestion from '../common/error-with-placeholder/ErrorPlaceHolderIngestion';
import PopOver from '../common/popover/PopOver';
import { IngestionRecentRuns } from '../Ingestion/IngestionRecentRun/IngestionRecentRuns.component';
import Loader from '../Loader/Loader';
import EntityDeleteModal from '../Modals/EntityDeleteModal/EntityDeleteModal';
import KillIngestionModal from '../Modals/KillIngestionPipelineModal/KillIngestionPipelineModal';
@ -381,7 +379,9 @@ const TestSuitePipelineTab = () => {
dataIndex: 'pipelineStatuses',
key: 'recentRuns',
render: (_, record) => (
<Row align="middle">{getIngestionStatuses(record)}</Row>
<Row align="middle">
<IngestionRecentRuns ingestion={record} />
</Row>
),
},
{

View File

@ -193,8 +193,11 @@
"pause": "Pause",
"metadata-ingestion": "Metadata Ingestion",
"unpause": "UnPause",
"end-date": "End Date",
"execution-date": "Execution Date",
"start-date": "Start Date",
"view-dag": "View Dag",
"search-team": "Search Team",
"end-date": "End Date",
"show-deleted": "Show deleted",
"add-bot": "Add Bot",

View File

@ -41,6 +41,15 @@ jest.mock('../../axiosAPIs/ingestionPipelineAPI', () => ({
.mockImplementation(() => Promise.resolve(mockIngestionPipeline)),
}));
jest.mock(
'../../components/Ingestion/IngestionRecentRun/IngestionRecentRuns.component',
() => ({
IngestionRecentRuns: jest
.fn()
.mockImplementation(() => <p>IngestionRecentRuns</p>),
})
);
describe('LogsViewer.component', () => {
it('On initial, component should render', async () => {
await act(async () => {

View File

@ -30,11 +30,11 @@ import {
} from '../../axiosAPIs/ingestionPipelineAPI';
import { CopyToClipboardButton } from '../../components/buttons/CopyToClipboardButton/CopyToClipboardButton';
import TitleBreadcrumb from '../../components/common/title-breadcrumb/title-breadcrumb.component';
import { IngestionRecentRuns } from '../../components/Ingestion/IngestionRecentRun/IngestionRecentRuns.component';
import Loader from '../../components/Loader/Loader';
import { PipelineType } from '../../generated/api/services/ingestionPipelines/createIngestionPipeline';
import { IngestionPipeline } from '../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { Paging } from '../../generated/type/paging';
import { getIngestionStatuses } from '../../utils/CommonUtils';
import { getLogBreadCrumbs } from '../../utils/LogsViewer.utils';
import { showErrorToast } from '../../utils/ToastUtils';
import LogViewerSkeleton from './LogsViewer-skeleton.component';
@ -197,10 +197,11 @@ const LogsViewer = () => {
return {
Type: ingestionDetails?.pipelineType || '--',
Schedule: ingestionDetails?.airflowConfig.scheduleInterval || '--',
['Recent Runs']:
getIngestionStatuses(
ingestionDetails ? ingestionDetails : ({} as IngestionPipeline)
) || '--',
['Recent Runs']: ingestionDetails?.fullyQualifiedName ? (
<IngestionRecentRuns ingestion={ingestionDetails} />
) : (
'--'
),
};
}, [ingestionDetails]);

View File

@ -47,7 +47,6 @@ import {
getHourCron,
} from '../components/common/CronEditor/CronEditor.constant';
import ErrorPlaceHolder from '../components/common/error-with-placeholder/ErrorPlaceHolder';
import PopOver from '../components/common/popover/PopOver';
import Loader from '../components/Loader/Loader';
import { FQN_SEPARATOR_CHAR } from '../constants/char.constants';
import {
@ -75,10 +74,7 @@ import { Topic } from '../generated/entity/data/topic';
import { Webhook } from '../generated/entity/events/webhook';
import { ThreadTaskStatus, ThreadType } from '../generated/entity/feed/thread';
import { Policy } from '../generated/entity/policies/policy';
import {
IngestionPipeline,
PipelineType,
} from '../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { PipelineType } from '../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { Role } from '../generated/entity/teams/role';
import { Team } from '../generated/entity/teams/team';
import { EntityReference, User } from '../generated/entity/teams/user';
@ -822,60 +818,6 @@ export const getTeamsUser = (
return;
};
export const getIngestionStatuses = (ingestion: IngestionPipeline) => {
const lastFiveIngestions = ingestion.pipelineStatuses
?.sort((a, b) => {
// Turn your strings into millis, and then subtract them
// to get a value that is either negative, positive, or zero.
const date1 = new Date(a.startDate || '');
const date2 = new Date(b.startDate || '');
return date1.getTime() - date2.getTime();
})
.slice(Math.max(ingestion.pipelineStatuses.length - 5, 0));
return lastFiveIngestions?.map((r, i) => {
const status =
i === lastFiveIngestions.length - 1 ? (
<p
className={`tw-h-5 tw-w-16 tw-rounded-sm tw-bg-status-${r.state} tw-mr-1 tw-px-1 tw-text-white tw-text-center`}
key={i}>
{capitalize(r.state)}
</p>
) : (
<p
className={`tw-w-4 tw-h-5 tw-rounded-sm tw-bg-status-${r.state} tw-mr-1`}
key={i}
/>
);
return r?.endDate || r?.startDate || r?.timestamp ? (
<PopOver
html={
<div className="tw-text-left">
{r.timestamp ? (
<p>Execution Date: {new Date(r.timestamp).toUTCString()}</p>
) : null}
{r.startDate ? (
<p>Start Date: {new Date(r.startDate).toUTCString()}</p>
) : null}
{r.endDate ? (
<p>End Date: {new Date(r.endDate).toUTCString()}</p>
) : null}
</div>
}
key={i}
position="bottom"
theme="light"
trigger="mouseenter">
{status}
</PopOver>
) : (
status
);
});
};
export const getDiffArray = (
compareWith: string[],
toCompare: string[]

View File

@ -283,6 +283,19 @@ export const getPastDatesTimeStampFromCurrentDate = (pastDayCount: number) =>
*/
export const getCurrentDateTimeStamp = () => DateTime.now().toUnixInteger();
/**
* Get the current date and time in milliseconds.
*/
export const getCurrentDateTimeMillis = () => DateTime.now().toMillis();
/**
* It returns the number of milliseconds since the Unix Epoch for the date that is `days` days before
* the current date.
* @param {number} days - The number of days you want to go back from the current date.
*/
export const getPastDaysDateTimeMillis = (days: number) =>
DateTime.now().minus({ days }).toMillis();
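Composed together, the two new helpers yield the epoch-millis windows the run-history queries expect. A quick sketch, inline with the helpers above; the seven-day span is illustrative, e.g. for a wider query than the one-day default used by IngestionRecentRuns.

// Illustrative seven-day window in epoch millis: [now - 7 days, now].
const lastWeekWindow = {
  startTs: getPastDaysDateTimeMillis(7),
  endTs: getCurrentDateTimeMillis(),
};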
/**
* It takes a timestamp in seconds and returns a formatted date string
* @param {number} timeStamp - The timeStamp in seconds.