diff --git a/ingestion/src/metadata/data_insight/api/workflow.py b/ingestion/src/metadata/data_insight/api/workflow.py index 8a4ed6cab3a..f2f925a19b6 100644 --- a/ingestion/src/metadata/data_insight/api/workflow.py +++ b/ingestion/src/metadata/data_insight/api/workflow.py @@ -34,6 +34,9 @@ from metadata.generated.schema.analytics.reportData import ReportDataType from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineState, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, Sink, @@ -42,6 +45,9 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.api.processor import ProcessorStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.logger import data_insight_logger +from metadata.utils.workflow_helper import ( + set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper, +) from metadata.utils.workflow_output_handler import print_data_insight_status logger = data_insight_logger() @@ -60,6 +66,7 @@ class DataInsightWorkflow: self.config.workflowConfig.openMetadataServerConfig ) self.metadata = OpenMetadata(self.metadata_config) + self.set_ingestion_pipeline_status(state=PipelineState.running) self.status = ProcessorStatus() self.data_processor: Optional[ @@ -155,3 +162,22 @@ class DataInsightWorkflow: ): return 1 return 0 + + def stop(self): + """ + Close all connections + """ + self.set_ingestion_pipeline_status(PipelineState.success) + self.metadata.close() + + def set_ingestion_pipeline_status(self, state: PipelineState): + """ + Method to set the pipeline status of current ingestion pipeline + """ + pipeline_run_id = set_ingestion_pipeline_status_helper( + state=state, + ingestion_pipeline_fqn=self.config.ingestionPipelineFQN, + pipeline_run_id=self.config.pipelineRunId, + metadata=self.metadata, + ) + self.config.pipelineRunId = pipeline_run_id diff --git a/ingestion/src/metadata/ingestion/api/workflow.py b/ingestion/src/metadata/ingestion/api/workflow.py index 3d58ce28fd6..e5f053c496b 100644 --- a/ingestion/src/metadata/ingestion/api/workflow.py +++ b/ingestion/src/metadata/ingestion/api/workflow.py @@ -11,16 +11,20 @@ """ Workflow definition for metadata related ingestions: metadata, lineage and usage. """ -# module building strings read better with .format instead of f-strings -# pylint: disable=consider-using-f-string import importlib import traceback + +# module building strings read better with .format instead of f-strings +# pylint: disable=consider-using-f-string from typing import Optional, Type, TypeVar from metadata.config.common import WorkflowExecutionError from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineState, +) from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( DatabaseMetadataConfigType, @@ -41,6 +45,9 @@ from metadata.utils.class_helper import ( get_service_type_from_source_type, ) from metadata.utils.logger import ingestion_logger, set_loggers_level +from metadata.utils.workflow_helper import ( + set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper, +) from metadata.utils.workflow_output_handler import print_status logger = ingestion_logger() @@ -49,6 +56,8 @@ T = TypeVar("T") SAMPLE_SOURCE = {"sample-data", "sample-usage"} +SUCCESS_THRESHOLD_VALUE = 90 + class InvalidWorkflowJSONException(Exception): """ @@ -92,9 +101,13 @@ class Workflow: self.config.workflowConfig.openMetadataServerConfig ) - self._retrieve_service_connection_if_needed(metadata_config, service_type) + self.metadata = OpenMetadata(config=metadata_config) - self._retrieve_dbt_config_source_if_needed(metadata_config, service_type) + self.set_ingestion_pipeline_status(state=PipelineState.running) + + self._retrieve_service_connection_if_needed(service_type) + + self._retrieve_dbt_config_source_if_needed(service_type) logger.info(f"Service type:{service_type},{source_type} configured") @@ -217,10 +230,27 @@ class Workflow: self.bulk_sink.close() if hasattr(self, "sink"): self.sink.close() + + pipeline_state = PipelineState.success + if ( + self._get_source_success() >= SUCCESS_THRESHOLD_VALUE + and self._get_source_success() < 100 + ): + pipeline_state = PipelineState.partialSuccess + self.set_ingestion_pipeline_status(pipeline_state) self.source.close() + def _get_source_success(self): + return self.source.get_status().calculate_success() + def raise_from_status(self, raise_warnings=False): - if self.source.get_status().failures: + """ + Method to raise error if failed execution + """ + if ( + self.source.get_status().failures + and self._get_source_success() < SUCCESS_THRESHOLD_VALUE + ): raise WorkflowExecutionError( "Source reported errors", self.source.get_status() ) @@ -258,15 +288,24 @@ class Workflow: return 0 - def _retrieve_service_connection_if_needed( - self, metadata_config: OpenMetadataConnection, service_type: ServiceType - ) -> None: + def set_ingestion_pipeline_status(self, state: PipelineState): + """ + Method to set the pipeline status of current ingestion pipeline + """ + pipeline_run_id = set_ingestion_pipeline_status_helper( + state=state, + ingestion_pipeline_fqn=self.config.ingestionPipelineFQN, + pipeline_run_id=self.config.pipelineRunId, + metadata=self.metadata, + ) + self.config.pipelineRunId = pipeline_run_id + + def _retrieve_service_connection_if_needed(self, service_type: ServiceType) -> None: """ We override the current `serviceConnection` source config object if source workflow service already exists in OM. When secrets' manager is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get the service connection from the service object itself through the default `SecretsManager`. - :param metadata_config: OpenMetadata connection config :param service_type: source workflow service type :return: """ @@ -274,28 +313,23 @@ class Workflow: self.config.source.type ): service_name = self.config.source.serviceName - metadata = OpenMetadata(config=metadata_config) try: - service = metadata.get_by_name( + service = self.metadata.get_by_name( get_service_class_from_service_type(service_type), service_name, ) if service: - self.config.source.serviceConnection = ( - metadata.secrets_manager_client.retrieve_service_connection( - service, service_type.name.lower() - ) + self.config.source.serviceConnection = self.metadata.secrets_manager_client.retrieve_service_connection( # pylint: disable=line-too-long + service, service_type.name.lower() ) except Exception as exc: logger.debug(traceback.format_exc()) logger.error( f"Error getting dbtConfigSource for service name [{service_name}]" - f" using the secrets manager provider [{metadata.config.secretsManagerProvider}]: {exc}" + f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" ) - def _retrieve_dbt_config_source_if_needed( - self, metadata_config: OpenMetadataConnection, service_type: ServiceType - ) -> None: + def _retrieve_dbt_config_source_if_needed(self, service_type: ServiceType) -> None: """ We override the current `config` source config object if it is a metadata ingestion type. When secrets' manager is configured, we retrieve the config from the secrets' manager. Otherwise, we get the config from the source @@ -309,10 +343,9 @@ class Workflow: and config and config.type == DatabaseMetadataConfigType.DatabaseMetadata ): - metadata = OpenMetadata(config=metadata_config) try: dbt_config_source: object = ( - metadata.secrets_manager_client.retrieve_dbt_source_config( + self.metadata.secrets_manager_client.retrieve_dbt_source_config( self.config.source.sourceConfig, self.config.source.serviceName, ) @@ -327,7 +360,7 @@ class Workflow: logger.debug(traceback.format_exc()) logger.error( f"Error getting dbtConfigSource for config [{config}] using the secrets manager" - f" provider [{metadata.config.secretsManagerProvider}]: {exc}" + f" provider [{self.metadata.config.secretsManagerProvider}]: {exc}" ) @staticmethod diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py new file mode 100644 index 00000000000..947e369c985 --- /dev/null +++ b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py @@ -0,0 +1,67 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Mixin class containing ingestion pipeline specific methods + +To be used by OpenMetadata class +""" + +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineStatus, +) +from metadata.ingestion.ometa.client import REST +from metadata.ingestion.ometa.utils import ometa_logger + +logger = ometa_logger() + + +class OMetaIngestionPipelineMixin: + """ + OpenMetadata API methods related to ingestion pipeline. + + To be inherited by OpenMetadata + """ + + client: REST + + def create_or_update_pipeline_status( + self, ingestion_pipeline_fqn: str, pipeline_status: PipelineStatus + ) -> None: + """ + PUT create or update pipeline status + + :param ingestion_pipeline_fqn: Ingestion Pipeline FQN + :param pipeline_status: Pipeline Status data to add + """ + resp = self.client.put( + f"/services/ingestionPipelines/{ingestion_pipeline_fqn}/pipelineStatus", + data=pipeline_status.json(), + ) + logger.debug( + f"Created Pipeline Status for pipeline {ingestion_pipeline_fqn}: {resp}" + ) + return resp + + def get_pipeline_status( + self, ingestion_pipeline_fqn: str, pipeline_status_run_id: str + ) -> None: + """ + GET pipeline status + + :param ingestion_pipeline_fqn: Ingestion Pipeline FQN + :param pipeline_status_run_id: Pipeline Status run id + """ + resp = self.client.get( + f"/services/ingestionPipelines/{ingestion_pipeline_fqn}/pipelineStatus/{pipeline_status_run_id}" + ) + if resp: + return PipelineStatus(**resp) + return None diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index 9c772e70e34..0a0bde4e6a1 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -67,6 +67,9 @@ from metadata.ingestion.ometa.mixins.dashboard_mixin import OMetaDashboardMixin from metadata.ingestion.ometa.mixins.data_insight_mixin import DataInisghtMixin from metadata.ingestion.ometa.mixins.es_mixin import ESMixin from metadata.ingestion.ometa.mixins.glossary_mixin import GlossaryMixin +from metadata.ingestion.ometa.mixins.ingestion_pipeline_mixin import ( + OMetaIngestionPipelineMixin, +) from metadata.ingestion.ometa.mixins.mlmodel_mixin import OMetaMlModelMixin from metadata.ingestion.ometa.mixins.patch_mixin import OMetaPatchMixin from metadata.ingestion.ometa.mixins.pipeline_mixin import OMetaPipelineMixin @@ -147,6 +150,7 @@ class OpenMetadata( OMetaPatchMixin, OMetaTestsMixin, DataInisghtMixin, + OMetaIngestionPipelineMixin, Generic[T, C], ): """ diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index bbc2668f0f9..1d8b5fe35e1 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -38,6 +38,9 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseService, DatabaseServiceType, ) +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineState, +) from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( DatabaseServiceProfilerPipeline, @@ -69,6 +72,9 @@ from metadata.utils.class_helper import ( ) from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table from metadata.utils.logger import profiler_logger +from metadata.utils.workflow_helper import ( + set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper, +) from metadata.utils.workflow_output_handler import print_profiler_status logger = profiler_logger() @@ -101,6 +107,8 @@ class ProfilerWorkflow: self._retrieve_service_connection_if_needed() + self.set_ingestion_pipeline_status(state=PipelineState.running) + # Init and type the source config self.source_config: DatabaseServiceProfilerPipeline = cast( DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config @@ -516,6 +524,7 @@ class ProfilerWorkflow: """ Close all connections """ + self.set_ingestion_pipeline_status(PipelineState.success) self.metadata.close() def _retrieve_service_connection_if_needed(self) -> None: @@ -544,3 +553,15 @@ class ProfilerWorkflow: @staticmethod def _is_sample_source(service_type): return service_type in {"sample-data", "sample-usage"} + + def set_ingestion_pipeline_status(self, state: PipelineState): + """ + Method to set the pipeline status of current ingestion pipeline + """ + pipeline_run_id = set_ingestion_pipeline_status_helper( + state=state, + ingestion_pipeline_fqn=self.config.ingestionPipelineFQN, + pipeline_run_id=self.config.pipelineRunId, + metadata=self.metadata, + ) + self.config.pipelineRunId = pipeline_run_id diff --git a/ingestion/src/metadata/test_suite/api/workflow.py b/ingestion/src/metadata/test_suite/api/workflow.py index 4c941703a42..77254427aad 100644 --- a/ingestion/src/metadata/test_suite/api/workflow.py +++ b/ingestion/src/metadata/test_suite/api/workflow.py @@ -36,6 +36,9 @@ from metadata.generated.schema.entity.services.databaseService import ( DatabaseService, DatabaseServiceType, ) +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineState, +) from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuitePipeline, ) @@ -55,6 +58,9 @@ from metadata.test_suite.api.models import TestCaseDefinition, TestSuiteProcesso from metadata.test_suite.runner.core import DataTestsRunner from metadata.utils import entity_link from metadata.utils.logger import test_suite_logger +from metadata.utils.workflow_helper import ( + set_ingestion_pipeline_status as set_ingestion_pipeline_status_helper, +) from metadata.utils.workflow_output_handler import print_test_suite_status logger: Logger = test_suite_logger() @@ -88,6 +94,8 @@ class TestSuiteWorkflow: ) self.metadata = OpenMetadata(self.metadata_config) + self.set_ingestion_pipeline_status(state=PipelineState.running) + self.status = ProcessorStatus() if self.config.sink: @@ -485,4 +493,17 @@ class TestSuiteWorkflow: """ Close all connections """ + self.set_ingestion_pipeline_status(PipelineState.success) self.metadata.close() + + def set_ingestion_pipeline_status(self, state: PipelineState): + """ + Method to set the pipeline status of current ingestion pipeline + """ + pipeline_run_id = set_ingestion_pipeline_status_helper( + state=state, + ingestion_pipeline_fqn=self.config.ingestionPipelineFQN, + pipeline_run_id=self.config.pipelineRunId, + metadata=self.metadata, + ) + self.config.pipelineRunId = pipeline_run_id diff --git a/ingestion/src/metadata/utils/workflow_helper.py b/ingestion/src/metadata/utils/workflow_helper.py new file mode 100644 index 00000000000..b6c3269cebf --- /dev/null +++ b/ingestion/src/metadata/utils/workflow_helper.py @@ -0,0 +1,58 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Modules contains helper methods for different workflows +""" +import uuid +from datetime import datetime + +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineState, + PipelineStatus, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata + + +def set_ingestion_pipeline_status( + state: PipelineState, + ingestion_pipeline_fqn: str, + pipeline_run_id: str, + metadata: OpenMetadata, +) -> str: + """ + Method to set the pipeline status of current ingestion pipeline + """ + + if ingestion_pipeline_fqn: + # if ingestion pipeline fqn is not set then setting pipeline status is avoided + pipeline_status: PipelineStatus = None + + if state in (PipelineState.queued, PipelineState.running): + pipeline_run_id = pipeline_run_id or str(uuid.uuid4()) + pipeline_status = PipelineStatus( + runId=pipeline_run_id, + pipelineState=state, + startDate=datetime.now().timestamp() * 1000, + timestamp=datetime.now().timestamp() * 1000, + ) + elif pipeline_run_id: + pipeline_status = metadata.get_pipeline_status( + ingestion_pipeline_fqn, pipeline_run_id + ) + # if workflow is ended then update the end date in status + pipeline_status.endDate = datetime.now().timestamp() * 1000 + pipeline_status.pipelineState = state + metadata.create_or_update_pipeline_status( + ingestion_pipeline_fqn, pipeline_status + ) + return pipeline_run_id + return None diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/api/response.py b/openmetadata-airflow-apis/openmetadata_managed_apis/api/response.py index 496851d342a..3015100b408 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/api/response.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/api/response.py @@ -74,7 +74,7 @@ class ResponseFormat: Build the pipeline status """ return PipelineStatus( - state=dag_run.get_state(), + pipelineState=dag_run.get_state(), runId=dag_run.run_id, startDate=datetime_to_ts(dag_run.start_date), endDate=datetime_to_ts(dag_run.end_date), diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index b31500b3b97..cca8c00646e 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -44,6 +44,7 @@ from openmetadata_managed_apis.workflows.ingestion.credentials_builder import ( from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, + PipelineState, ) from metadata.generated.schema.metadataIngestion.workflow import ( LogLevels, @@ -156,14 +157,16 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig): This is the callable used to create the PythonOperator """ set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) - config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) - workflow = Workflow.create(config) - workflow.execute() - workflow.raise_from_status() - workflow.print_status() - workflow.stop() + try: + workflow.execute() + workflow.raise_from_status() + workflow.print_status() + workflow.stop() + except Exception as err: + workflow.set_ingestion_pipeline_status(PipelineState.failed) + raise err def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -179,12 +182,15 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) - workflow = ProfilerWorkflow.create(config) - workflow.execute() - workflow.raise_from_status() - workflow.print_status() - workflow.stop() + try: + workflow.execute() + workflow.raise_from_status() + workflow.print_status() + workflow.stop() + except Exception as err: + workflow.set_ingestion_pipeline_status(PipelineState.failed) + raise err def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -200,12 +206,16 @@ def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig): set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) - workflow = TestSuiteWorkflow.create(config) - workflow.execute() - workflow.raise_from_status() - workflow.print_status() - workflow.stop() + + try: + workflow.execute() + workflow.raise_from_status() + workflow.print_status() + workflow.stop() + except Exception as err: + workflow.set_ingestion_pipeline_status(PipelineState.failed) + raise err def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -223,9 +233,14 @@ def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig): config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) workflow = DataInsightWorkflow.create(config) - workflow.execute() - workflow.raise_from_status() - workflow.print_status() + try: + workflow.execute() + workflow.raise_from_status() + workflow.print_status() + workflow.stop() + except Exception as err: + workflow.set_ingestion_pipeline_status(PipelineState.failed) + raise err def date_to_datetime( diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py index d4b7a155f59..aa6717a6e68 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py @@ -48,6 +48,7 @@ def build_data_insight_workflow_config( loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO, openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection, ), + ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__, ) return workflow_config diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/lineage.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/lineage.py index dae2ce01b40..c99dcba21ff 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/lineage.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/lineage.py @@ -51,6 +51,7 @@ def build_lineage_workflow_config( config={}, ), workflowConfig=build_workflow_config_property(ingestion_pipeline), + ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__, ) return workflow_config diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/metadata.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/metadata.py index b2442e7bc0f..4852d8cb360 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/metadata.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/metadata.py @@ -48,6 +48,7 @@ def build_metadata_workflow_config( config={}, ), workflowConfig=build_workflow_config_property(ingestion_pipeline), + ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__, ) return workflow_config diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py index 37f8485cd23..090d70cdef1 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py @@ -56,6 +56,7 @@ def build_profiler_workflow_config( loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO, openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection, ), + ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__, ) return workflow_config diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py index 7661310f9a2..025858d1bf2 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py @@ -52,6 +52,7 @@ def build_test_suite_workflow_config( loggerLevel=ingestion_pipeline.loggerLevel or LogLevels.INFO, openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection, ), + ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__, ) return workflow_config diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py index ee18bc29c12..5add390bba3 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py @@ -66,6 +66,7 @@ def build_usage_config_from_file( config={"filename": filename}, ), workflowConfig=build_workflow_config_property(ingestion_pipeline), + ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__, ) diff --git a/openmetadata-airflow-apis/tests/integration/operations/test_airflow_ops.py b/openmetadata-airflow-apis/tests/integration/operations/test_airflow_ops.py index 42ad41d66a0..3e3c4c1066d 100644 --- a/openmetadata-airflow-apis/tests/integration/operations/test_airflow_ops.py +++ b/openmetadata-airflow-apis/tests/integration/operations/test_airflow_ops.py @@ -159,7 +159,7 @@ class TestAirflowOps(TestCase): res = status(dag_id="dag_status") self.assertEqual(res.status_code, 200) - self.assertEqual(res.json[0].get("state"), "running") + self.assertEqual(res.json[0].get("pipelineState"), "running") self.dag.create_dagrun( run_type=DagRunType.MANUAL, @@ -182,7 +182,7 @@ class TestAirflowOps(TestCase): self.assertEqual(res.status_code, 200) - res_status = {elem.get("state") for elem in res.json} + res_status = {elem.get("pipelineState") for elem in res.json} self.assertEqual(res_status, {"failed", "success"}) def test_dag_state(self): @@ -227,6 +227,7 @@ class TestAirflowOps(TestCase): id=uuid.uuid4(), pipelineType=PipelineType.metadata, name="my_new_dag", + fullyQualifiedName="test-service-ops.my_new_dag", sourceConfig=SourceConfig(config=DatabaseServiceMetadataPipeline()), openMetadataServerConnection=self.conn, airflowConfig=AirflowConfig(), diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index fb69861f01d..8e9ce0684a0 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -15,6 +15,7 @@ Validate metadata ingestion workflow generation import json import uuid from unittest import TestCase +from unittest.mock import patch from openmetadata_managed_apis.workflows.ingestion.lineage import ( build_lineage_workflow_config, @@ -73,6 +74,10 @@ from metadata.orm_profiler.api.workflow import ProfilerWorkflow from metadata.test_suite.api.workflow import TestSuiteWorkflow +def mock_set_ingestion_pipeline_status(self, state): + return True + + class OMetaServiceTest(TestCase): """ Run this integration test with the local API available @@ -181,6 +186,9 @@ class OMetaServiceTest(TestCase): hard_delete=True, ) + @patch.object( + Workflow, "set_ingestion_pipeline_status", mock_set_ingestion_pipeline_status + ) def test_ingestion_workflow(self): """ Validate that the ingestionPipeline can be parsed @@ -209,6 +217,9 @@ class OMetaServiceTest(TestCase): Workflow.create(config) + @patch.object( + Workflow, "set_ingestion_pipeline_status", mock_set_ingestion_pipeline_status + ) def test_usage_workflow(self): """ Validate that the ingestionPipeline can be parsed @@ -239,6 +250,9 @@ class OMetaServiceTest(TestCase): Workflow.create(config) + @patch.object( + Workflow, "set_ingestion_pipeline_status", mock_set_ingestion_pipeline_status + ) def test_lineage_workflow(self): """ Validate that the ingestionPipeline can be parsed @@ -269,6 +283,11 @@ class OMetaServiceTest(TestCase): Workflow.create(config) + @patch.object( + ProfilerWorkflow, + "set_ingestion_pipeline_status", + mock_set_ingestion_pipeline_status, + ) def test_profiler_workflow(self): """ Validate that the ingestionPipeline can be parsed @@ -297,6 +316,11 @@ class OMetaServiceTest(TestCase): ProfilerWorkflow.create(config) + @patch.object( + TestSuiteWorkflow, + "set_ingestion_pipeline_status", + mock_set_ingestion_pipeline_status, + ) def test_test_suite_workflow(self): """ Validate that the ingestionPipeline can be parsed diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java index e4dce36f7cc..00225344687 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java @@ -59,7 +59,7 @@ public final class Entity { public static final String FIELD_FOLLOWERS = "followers"; public static final String FIELD_TAGS = "tags"; public static final String FIELD_DELETED = "deleted"; - public static final String FIELD_PIPELINE_STATUSES = "pipelineStatuses"; + public static final String FIELD_PIPELINE_STATUS = "pipelineStatus"; public static final String FIELD_DISPLAY_NAME = "displayName"; public static final String FIELD_EXTENSION = "extension"; public static final String FIELD_USAGE_SUMMARY = "usageSummary"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java index bc73d89e666..9371838c07e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/airflow/AirflowRESTClient.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.net.URI; import java.net.http.HttpRequest; import java.net.http.HttpResponse; -import java.util.List; import java.util.Map; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; @@ -26,7 +25,6 @@ import org.json.JSONObject; import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration; import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; -import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType; import org.openmetadata.service.exception.IngestionPipelineDeploymentException; import org.openmetadata.service.exception.PipelineServiceClientException; @@ -64,6 +62,7 @@ public class AirflowRESTClient extends PipelineServiceClient { String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline); response = post(deployUrl, pipelinePayload); if (response.statusCode() == 200) { + ingestionPipeline.setDeployed(true); return response.body(); } } catch (Exception e) { @@ -159,8 +158,6 @@ public class AirflowRESTClient extends PipelineServiceClient { response = getRequestAuthenticatedForJsonContent(statusEndPoint, serviceURL, API_ENDPOINT, ingestionPipeline.getName()); if (response.statusCode() == 200) { - List statuses = JsonUtils.readObjects(response.body(), PipelineStatus.class); - ingestionPipeline.setPipelineStatuses(statuses); ingestionPipeline.setDeployed(true); return ingestionPipeline; } else if (response.statusCode() == 404) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 8324be869ee..3705111ae5f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -3027,6 +3027,23 @@ public interface CollectionDAO { void deleteAtTimestamp( @Bind("entityFQN") String entityFQN, @Bind("extension") String extension, @Bind("timestamp") Long timestamp); + @SqlQuery("SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN and jsonSchema = :jsonSchema") + List listByFQN(@Bind("entityFQN") String entityFQN, @Bind("jsonSchema") String jsonSchema); + + @SqlQuery( + "SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND jsonSchema = :jsonSchema " + + "ORDER BY timestamp DESC LIMIT 1") + String getLatestExtensionByFQN(@Bind("entityFQN") String entityFQN, @Bind("jsonSchema") String jsonSchema); + + @SqlQuery( + "SELECT json FROM entity_extension_time_series where entityFQN = :entityFQN and jsonSchema = :jsonSchema " + + " AND timestamp >= :startTs and timestamp <= :endTs ORDER BY timestamp DESC") + List listBetweenTimestampsByFQN( + @Bind("entityFQN") String entityFQN, + @Bind("jsonSchema") String jsonSchema, + @Bind("startTs") Long startTs, + @Bind("endTs") long endTs); + @SqlQuery( "SELECT json FROM entity_extension_time_series where entityFQN = :entityFQN and extension = :extension " + " AND timestamp >= :startTs and timestamp <= :endTs ORDER BY timestamp DESC") diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java index 8ad9f0560ee..3890f07e150 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java @@ -17,27 +17,30 @@ import static org.openmetadata.service.Entity.FIELD_OWNER; import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; +import java.util.List; +import java.util.UUID; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; +import org.jdbi.v3.sqlobject.transaction.Transaction; import org.json.JSONObject; -import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig; -import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; -import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.services.ingestionPipelines.*; import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline; import org.openmetadata.schema.metadataIngestion.LogLevels; import org.openmetadata.schema.services.connections.metadata.OpenMetadataServerConnection; -import org.openmetadata.schema.type.EntityReference; -import org.openmetadata.schema.type.Relationship; +import org.openmetadata.schema.type.*; import org.openmetadata.service.Entity; import org.openmetadata.service.resources.services.ingestionpipelines.IngestionPipelineResource; import org.openmetadata.service.secrets.SecretsManager; import org.openmetadata.service.secrets.SecretsManagerFactory; +import org.openmetadata.service.util.*; import org.openmetadata.service.util.EntityUtil.Fields; -import org.openmetadata.service.util.FullyQualifiedName; -import org.openmetadata.service.util.JsonUtils; -import org.openmetadata.service.util.PipelineServiceClient; public class IngestionPipelineRepository extends EntityRepository { private static final String UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled"; private static final String PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled"; + + private static final String PIPELINE_STATUS_JSON_SCHEMA = "pipelineStatus"; private static PipelineServiceClient pipelineServiceClient; public IngestionPipelineRepository(CollectionDAO dao) { @@ -134,6 +137,101 @@ public class IngestionPipelineRepository extends EntityRepository addPipelineStatus(UriInfo uriInfo, String fqn, PipelineStatus pipelineStatus) + throws IOException { + // Validate the request content + IngestionPipeline ingestionPipeline = dao.findEntityByName(fqn); + + PipelineStatus storedPipelineStatus = + JsonUtils.readValue( + daoCollection + .entityExtensionTimeSeriesDao() + .getLatestExtension(ingestionPipeline.getFullyQualifiedName(), pipelineStatus.getRunId()), + PipelineStatus.class); + if (storedPipelineStatus != null) { + daoCollection + .entityExtensionTimeSeriesDao() + .update( + ingestionPipeline.getFullyQualifiedName(), + pipelineStatus.getRunId(), + JsonUtils.pojoToJson(pipelineStatus), + pipelineStatus.getTimestamp()); + } else { + daoCollection + .entityExtensionTimeSeriesDao() + .insert( + ingestionPipeline.getFullyQualifiedName(), + pipelineStatus.getRunId(), + "pipelineStatus", + JsonUtils.pojoToJson(pipelineStatus)); + } + ChangeDescription change = + addPipelineStatusChangeDescription(ingestionPipeline.getVersion(), pipelineStatus, storedPipelineStatus); + ChangeEvent changeEvent = + getChangeEvent(withHref(uriInfo, ingestionPipeline), change, entityType, ingestionPipeline.getVersion()); + + return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED); + } + + public ResultList listPipelineStatus(String ingestionPipelineFQN, Long startTs, Long endTs) + throws IOException { + IngestionPipeline ingestionPipeline = dao.findEntityByName(ingestionPipelineFQN); + List pipelineStatusList = + JsonUtils.readObjects( + daoCollection + .entityExtensionTimeSeriesDao() + .listBetweenTimestampsByFQN( + ingestionPipeline.getFullyQualifiedName(), PIPELINE_STATUS_JSON_SCHEMA, startTs, endTs), + PipelineStatus.class); + return new ResultList<>( + pipelineStatusList, String.valueOf(startTs), String.valueOf(endTs), pipelineStatusList.size()); + } + + public PipelineStatus getLatestPipelineStatus(IngestionPipeline ingestionPipeline) throws IOException { + PipelineStatus pipelineStatus = + JsonUtils.readValue( + daoCollection + .entityExtensionTimeSeriesDao() + .getLatestExtensionByFQN(ingestionPipeline.getFullyQualifiedName(), PIPELINE_STATUS_JSON_SCHEMA), + PipelineStatus.class); + return pipelineStatus; + } + + public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, UUID pipelineStatusRunId) throws IOException { + IngestionPipeline ingestionPipeline = dao.findEntityByName(ingestionPipelineFQN); + PipelineStatus pipelineStatus = + JsonUtils.readValue( + daoCollection + .entityExtensionTimeSeriesDao() + .getExtension(ingestionPipeline.getFullyQualifiedName(), pipelineStatusRunId.toString()), + PipelineStatus.class); + return pipelineStatus; + } + /** Handles entity updated from PUT and POST operation. */ public class IngestionPipelineUpdater extends EntityUpdater { public IngestionPipelineUpdater(IngestionPipeline original, IngestionPipeline updated, Operation operation) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java index 32dbe88ca12..1bba939809e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java @@ -14,8 +14,7 @@ package org.openmetadata.service.resources.services.ingestionpipelines; import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; -import static org.openmetadata.service.Entity.FIELD_OWNER; -import static org.openmetadata.service.Entity.FIELD_PIPELINE_STATUSES; +import static org.openmetadata.service.Entity.*; import io.swagger.annotations.Api; import io.swagger.v3.oas.annotations.ExternalDocumentation; @@ -28,33 +27,24 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.responses.ApiResponse; import java.io.IOException; import java.net.http.HttpResponse; -import java.util.List; import java.util.Map; import java.util.UUID; import javax.json.JsonPatch; import javax.validation.Valid; import javax.validation.constraints.Max; import javax.validation.constraints.Min; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.PATCH; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; +import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; import javax.ws.rs.core.UriInfo; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline; import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; import org.openmetadata.schema.services.connections.metadata.OpenMetadataServerConnection; import org.openmetadata.schema.type.EntityHistory; import org.openmetadata.schema.type.Include; @@ -167,11 +157,13 @@ public class IngestionPipelineResource extends EntityResource ingestionPipelines = super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after); - if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUSES)) { - addStatus(ingestionPipelines.getData()); + + for (IngestionPipeline ingestionPipeline : listOrEmpty(ingestionPipelines.getData())) { + if (fieldsParam != null && fieldsParam.contains(FIELD_PIPELINE_STATUS)) { + ingestionPipeline.setPipelineStatuses(dao.getLatestPipelineStatus(ingestionPipeline)); + } + decryptOrNullify(securityContext, ingestionPipeline); } - listOrEmpty(ingestionPipelines.getData()) - .forEach(ingestionPipeline -> decryptOrNullify(securityContext, ingestionPipeline)); return ingestionPipelines; } @@ -228,8 +220,8 @@ public class IngestionPipelineResource extends EntityResource listPipelineStatuses( + @Context SecurityContext securityContext, + @Parameter(description = "fqn of the ingestion pipeline", schema = @Schema(type = "string")) @PathParam("fqn") + String fqn, + @Parameter( + description = "Filter pipeline status after the given start timestamp", + schema = @Schema(type = "number")) + @NonNull + @QueryParam("startTs") + Long startTs, + @Parameter( + description = "Filter pipeline status before the given end timestamp", + schema = @Schema(type = "number")) + @NonNull + @QueryParam("endTs") + Long endTs) + throws IOException { + return dao.listPipelineStatus(fqn, startTs, endTs); + } + + @GET + @Path("/{fqn}/pipelineStatus/{id}") + @Operation( + operationId = "getPipelineStatus", + summary = "get pipeline status", + tags = "IngestionPipelines", + description = "Get pipeline status of ingestion pipeline", + responses = { + @ApiResponse( + responseCode = "200", + description = "Successfully updated state of the PipelineStatus.", + content = + @Content(mediaType = "application/json", schema = @Schema(implementation = IngestionPipeline.class))) + }) + public PipelineStatus getPipelineStatus( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Encoded + @Parameter(description = "fqn of the ingestion pipeline", schema = @Schema(type = "string")) + @PathParam("fqn") + String fqn, + @Parameter(description = "Pipeline Status Run Id", schema = @Schema(type = "string")) @PathParam("id") UUID runId) + throws IOException { + OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL); + authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn)); + return dao.getPipelineStatus(fqn, runId); + } + private IngestionPipeline getIngestionPipeline(CreateIngestionPipeline create, String user) throws IOException { OpenMetadataServerConnection openMetadataServerConnection = new OpenMetadataServerConnectionBuilder(openMetadataApplicationConfig).build(); @@ -595,19 +682,6 @@ public class IngestionPipelineResource extends EntityResource ingestionPipelines) { - listOrEmpty(ingestionPipelines).forEach(this::addStatus); - } - - private IngestionPipeline addStatus(IngestionPipeline ingestionPipeline) { - try { - ingestionPipeline = pipelineServiceClient.getPipelineStatus(ingestionPipeline); - } catch (Exception e) { - LOG.error("Failed to fetch status for {} due to {}", ingestionPipeline.getName(), e); - } - return ingestionPipeline; - } - private IngestionPipeline decryptOrNullify(SecurityContext securityContext, IngestionPipeline ingestionPipeline) { SecretsManager secretsManager = SecretsManagerFactory.getSecretsManager(); try { diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json index a4afd382dde..05bc865f8d5 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/ingestionPipelines/ingestionPipeline.json @@ -22,9 +22,11 @@ "description": "Pipeline unique run ID.", "type": "string" }, - "state": { + "pipelineState": { "description": "Pipeline status denotes if its failed or succeeded.", - "type": "string" + "type": "string", + "javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType", + "enum": ["queued","success","failed","running","partialSuccess"] }, "startDate": { "description": "startDate of the pipeline run for this particular execution.", @@ -159,12 +161,8 @@ "$ref": "../../../type/entityReference.json" }, "pipelineStatuses": { - "description": "List of executions and status for the Pipeline.", - "type": "array", - "items": { - "$ref": "#/definitions/pipelineStatus" - }, - "default": null + "description": "Last of executions and status for the Pipeline.", + "$ref": "#/definitions/pipelineStatus" }, "loggerLevel": { "description": "Set the logging level for the workflow.", diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json index 7bf396f7a4e..15d70d94771 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json @@ -193,6 +193,14 @@ }, "workflowConfig": { "$ref": "#/definitions/workflowConfig" + }, + "ingestionPipelineFQN": { + "description": "Fully qualified name of ingestion pipeline, used to identify the current ingestion pipeline", + "type": "string" + }, + "pipelineRunId": { + "description": "Unique identifier of pipeline run, used to identify the current pipleine run", + "$ref": "../type/basic.json#/definitions/uuid" } }, "required": ["source", "workflowConfig"], diff --git a/openmetadata-ui/src/main/resources/ui/cypress/common/common.js b/openmetadata-ui/src/main/resources/ui/cypress/common/common.js index 0a70493f3ed..c18627b7e1e 100644 --- a/openmetadata-ui/src/main/resources/ui/cypress/common/common.js +++ b/openmetadata-ui/src/main/resources/ui/cypress/common/common.js @@ -16,7 +16,7 @@ import { SEARCH_INDEX } from '../constants/constants'; export const descriptionBox = - '.toastui-editor-md-container > .toastui-editor > .ProseMirror'; + '.toastui-editor-md-container > .toastui-editor > .ProseMirror'; export const uuid = () => Cypress._.random(0, 1e6); const RETRY_TIMES = 30; @@ -27,257 +27,263 @@ const TEAM_TYPES = ['BusinessUnit', 'Department', 'Division', 'Group']; const isDatabaseService = (type) => type === 'database'; const checkTeamTypeOptions = () => { - for (const teamType of TEAM_TYPES) { - cy.get(`.ant-select-dropdown [title="${teamType}"]`) - .should('exist') - .should('be.visible'); - } + for (const teamType of TEAM_TYPES) { + cy.get(`.ant-select-dropdown [title="${teamType}"]`) + .should('exist') + .should('be.visible'); + } }; //intercepting URL with cy.intercept export const interceptURL = (method, url, alias) => { - cy.intercept({ method: method, url: url }).as(alias); + cy.intercept({ method: method, url: url }).as(alias); }; //waiting for response and validating the response status code export const verifyResponseStatusCode = (alias, responseCode) => { - cy.wait(alias).its('response.statusCode').should('eq', responseCode); + cy.wait(alias).its('response.statusCode').should('eq', responseCode); }; export const handleIngestionRetry = ( - type, - testIngestionButton, - count = 0, - ingestionType = 'metadata' + type, + testIngestionButton, + count = 0, + ingestionType = 'metadata' ) => { - const rowIndex = ingestionType === 'metadata' ? 1 : 2; - // ingestions page + const rowIndex = ingestionType === 'metadata' ? 1 : 2; - let retryCount = count; - const testIngestionsTab = () => { - cy.get('[data-testid="Ingestions"]').should('be.visible'); - cy.get('[data-testid="Ingestions"] >> [data-testid="filter-count"]').should( - 'have.text', - rowIndex - ); - // click on the tab only for the first time - if (retryCount === 0) { - cy.get('[data-testid="Ingestions"]').click(); - } - if (isDatabaseService(type) && testIngestionButton) { - cy.get('[data-testid="add-new-ingestion-button"]').should('be.visible'); - } - }; - const checkSuccessState = () => { - testIngestionsTab(); - retryCount++; - // the latest run should be success - cy.get(`.ant-table-tbody > :nth-child(${rowIndex}) > :nth-child(4)`).then( - ($ingestionStatus) => { - if ( - ($ingestionStatus.text() === 'Running' || - $ingestionStatus.text() === 'Queued') && - retryCount <= RETRY_TIMES - ) { - // retry after waiting for 20 seconds - cy.wait(20000); - cy.reload(); - checkSuccessState(); - } else { - cy.get( - `.ant-table-tbody > :nth-child(${rowIndex}) > :nth-child(4)` - ).should('have.text', 'Success'); + interceptURL('GET', '/api/v1/services/ingestionPipelines/*/pipelineStatus?*', 'pipelineStatuses') + + // ingestions page + let retryCount = count; + const testIngestionsTab = () => { + + cy.get('[data-testid="Ingestions"]').should('be.visible'); + cy.get('[data-testid="Ingestions"] >> [data-testid="filter-count"]').should( + 'have.text', + rowIndex + ); + // click on the tab only for the first time + if (retryCount === 0) { + // Wait for pipeline status to be loaded + cy.get('[data-testid="Ingestions"]').click(); } - } - ); - }; + verifyResponseStatusCode('@pipelineStatuses', 200) + if (isDatabaseService(type) && testIngestionButton) { + cy.get('[data-testid="add-new-ingestion-button"]').should('be.visible'); + } + }; + const checkSuccessState = () => { + testIngestionsTab(); + retryCount++; + // the latest run should be success + cy.get(`.ant-table-tbody > :nth-child(${rowIndex}) > :nth-child(4)`).then( + ($ingestionStatus) => { + if ( + ($ingestionStatus.text() === 'Running' || + $ingestionStatus.text() === 'Queued') && + retryCount <= RETRY_TIMES + ) { + // retry after waiting for 20 seconds + cy.wait(20000); + cy.reload(); + checkSuccessState(); + } else { + cy.get(`.ant-table-tbody > :nth-child(${rowIndex}) > :nth-child(4)`).should( + 'have.text', + 'Success' + ); + } + } + ); + }; - checkSuccessState(); + checkSuccessState(); }; export const scheduleIngestion = () => { - // Schedule & Deploy - cy.contains('Schedule for Ingestion').should('be.visible'); - cy.get('[data-testid="cron-type"]').should('be.visible').select('hour'); - cy.get('[data-testid="deploy-button"]').should('be.visible').click(); + // Schedule & Deploy + cy.contains('Schedule for Ingestion').should('be.visible'); + cy.get('[data-testid="cron-type"]').should('be.visible').select('hour'); + cy.get('[data-testid="deploy-button"]').should('be.visible').click(); - // check success - cy.get('[data-testid="success-line"]', { timeout: 15000 }).should( - 'be.visible' - ); - cy.contains('has been created and deployed successfully').should( - 'be.visible' - ); + // check success + cy.get('[data-testid="success-line"]', { timeout: 15000 }).should( + 'be.visible' + ); + cy.contains('has been created and deployed successfully').should( + 'be.visible' + ); }; //Storing the created service name and the type of service for later use export const testServiceCreationAndIngestion = ( - serviceType, - connectionInput, - addIngestionInput, - serviceName, - type = 'database', - testIngestionButton = true, - configureDBT + serviceType, + connectionInput, + addIngestionInput, + serviceName, + type = 'database', + testIngestionButton = true, + configureDBT ) => { - //Storing the created service name and the type of service - // Select Service in step 1 - cy.get(`[data-testid="${serviceType}"]`).should('exist').click(); - cy.get('[data-testid="next-button"]').should('exist').click(); + //Storing the created service name and the type of service + // Select Service in step 1 + cy.get(`[data-testid="${serviceType}"]`).should('exist').click(); + cy.get('[data-testid="next-button"]').should('exist').click(); - // Enter service name in step 2 - cy.get('[data-testid="service-name"]').should('exist').type(serviceName); - cy.get('[data-testid="next-button"]').click(); + // Enter service name in step 2 + cy.get('[data-testid="service-name"]').should('exist').type(serviceName); + cy.get('[data-testid="next-button"]').click(); - // Connection Details in step 3 - cy.get('[data-testid="add-new-service-container"]') - .parent() - .parent() - .scrollTo('top', { - ensureScrollable: false, - }); - cy.contains('Connection Details').scrollIntoView().should('be.visible'); + // Connection Details in step 3 + cy.get('[data-testid="add-new-service-container"]') + .parent() + .parent() + .scrollTo('top', { + ensureScrollable: false, + }); + cy.contains('Connection Details').scrollIntoView().should('be.visible'); - connectionInput(); + connectionInput(); - // check for the ip-address widget - cy.get('[data-testid="ip-address"]').should('exist'); + // check for the ip-address widget + cy.get('[data-testid="ip-address"]').should('exist'); - // Test the connection - interceptURL( - 'POST', - '/api/v1/services/ingestionPipelines/testConnection', - 'testConnection' - ); - cy.get('[data-testid="test-connection-btn"]').should('exist'); - cy.get('[data-testid="test-connection-btn"]').click(); - verifyResponseStatusCode('@testConnection', 200); - cy.contains('Connection test was successful').should('exist'); - cy.get('[data-testid="submit-btn"]').should('exist').click(); + // Test the connection + interceptURL( + 'POST', + '/api/v1/services/ingestionPipelines/testConnection', + 'testConnection' + ); + cy.get('[data-testid="test-connection-btn"]').should('exist'); + cy.get('[data-testid="test-connection-btn"]').click(); + verifyResponseStatusCode('@testConnection', 200); + cy.contains('Connection test was successful').should('exist'); + cy.get('[data-testid="submit-btn"]').should('exist').click(); - // check success - cy.get('[data-testid="success-line"]').should('be.visible'); - cy.contains(`"${serviceName}"`).should('be.visible'); - cy.contains('has been created successfully').should('be.visible'); + // check success + cy.get('[data-testid="success-line"]').should('be.visible'); + cy.contains(`"${serviceName}"`).should('be.visible'); + cy.contains('has been created successfully').should('be.visible'); - cy.get('[data-testid="add-ingestion-button"]').should('be.visible'); - cy.get('[data-testid="add-ingestion-button"]').click(); + cy.get('[data-testid="add-ingestion-button"]').should('be.visible'); + cy.get('[data-testid="add-ingestion-button"]').click(); - // Add ingestion page - cy.get('[data-testid="add-ingestion-container"]').should('be.visible'); + // Add ingestion page + cy.get('[data-testid="add-ingestion-container"]').should('be.visible'); - if (isDatabaseService(type)) { - cy.get('[data-testid="schema-filter-pattern-checkbox"]').should( - 'be.visible' + if (isDatabaseService(type)) { + cy.get('[data-testid="schema-filter-pattern-checkbox"]').should( + 'be.visible' + ); + + // Set mark-deleted slider to off to disable it. + cy.get('[data-testid="toggle-button-mark-deleted"]') + .should('exist') + .click(); + } + + addIngestionInput(); + + cy.get('[data-testid="next-button"]').should('exist').click(); + + // Configure DBT Model + if (isDatabaseService(type)) { + cy.contains('Configure DBT Model').should('be.visible'); + configureDBT && configureDBT(); + cy.get('[data-testid="submit-btn"]').should('be.visible').click(); + } + + scheduleIngestion(); + + cy.contains(`${serviceName}_metadata`).should('be.visible'); + // On the Right panel + cy.contains('Metadata Ingestion Added & Deployed Successfully').should( + 'be.visible' ); - // Set mark-deleted slider to off to disable it. - cy.get('[data-testid="toggle-button-mark-deleted"]') - .should('exist') - .click(); - } + // wait for ingestion to run + cy.clock(); + cy.wait(10000); - addIngestionInput(); + cy.get('[data-testid="view-service-button"]').should('be.visible'); + cy.get('[data-testid="view-service-button"]').click(); - cy.get('[data-testid="next-button"]').should('exist').click(); - - // Configure DBT Model - if (isDatabaseService(type)) { - cy.contains('Configure DBT Model').should('be.visible'); - configureDBT && configureDBT(); - cy.get('[data-testid="submit-btn"]').should('be.visible').click(); - } - - scheduleIngestion(); - - cy.contains(`${serviceName}_metadata`).should('be.visible'); - // On the Right panel - cy.contains('Metadata Ingestion Added & Deployed Successfully').should( - 'be.visible' - ); - - // wait for ingestion to run - cy.clock(); - cy.wait(10000); - - cy.get('[data-testid="view-service-button"]').should('be.visible'); - cy.get('[data-testid="view-service-button"]').click(); - - handleIngestionRetry(type, testIngestionButton); + handleIngestionRetry(type, testIngestionButton); }; export const deleteCreatedService = (typeOfService, service_Name) => { - //Click on settings page - cy.get('[data-testid="appbar-item-settings"]').should('be.visible').click(); + //Click on settings page + cy.get('[data-testid="appbar-item-settings"]').should('be.visible').click(); - // Services page - interceptURL('GET', '/api/v1/services/*', 'getServices'); + // Services page + interceptURL('GET', '/api/v1/services/*', 'getServices'); - cy.get('.ant-menu-title-content') - .contains(typeOfService) - .should('be.visible') - .click(); + cy.get('.ant-menu-title-content') + .contains(typeOfService) + .should('be.visible') + .click(); - verifyResponseStatusCode('@getServices', 200); + verifyResponseStatusCode('@getServices', 200); - //click on created service - cy.get(`[data-testid="service-name-${service_Name}"]`) - .should('exist') - .should('be.visible') - .click(); + //click on created service + cy.get(`[data-testid="service-name-${service_Name}"]`) + .should('exist') + .should('be.visible') + .click(); - cy.get(`[data-testid="inactive-link"]`) - .should('exist') - .should('be.visible') - .invoke('text') - .then((text) => { - expect(text).to.equal(service_Name); - }); + cy.get(`[data-testid="inactive-link"]`) + .should('exist') + .should('be.visible') + .invoke('text') + .then((text) => { + expect(text).to.equal(service_Name); + }); - verifyResponseStatusCode('@getServices', 200); + verifyResponseStatusCode('@getServices', 200); - cy.get('[data-testid="service-delete"]') - .should('exist') - .should('be.visible') - .click(); + cy.get('[data-testid="service-delete"]') + .should('exist') + .should('be.visible') + .click(); - //Clicking on permanent delete radio button and checking the service name - cy.get('[data-testid="hard-delete-option"]') - .contains(service_Name) - .should('be.visible') - .click(); + //Clicking on permanent delete radio button and checking the service name + cy.get('[data-testid="hard-delete-option"]') + .contains(service_Name) + .should('be.visible') + .click(); - cy.get('[data-testid="confirmation-text-input"]') - .should('be.visible') - .type('DELETE'); - interceptURL( - 'DELETE', - '/api/v1/services/*/*?hardDelete=true&recursive=true', - 'deleteService' - ); - interceptURL( - 'GET', - '/api/v1/services/*/name/*?fields=owner', - 'serviceDetails' - ); + cy.get('[data-testid="confirmation-text-input"]') + .should('be.visible') + .type('DELETE'); + interceptURL( + 'DELETE', + '/api/v1/services/*/*?hardDelete=true&recursive=true', + 'deleteService' + ); + interceptURL( + 'GET', + '/api/v1/services/*/name/*?fields=owner', + 'serviceDetails' + ); - cy.get('[data-testid="confirm-button"]').should('be.visible').click(); - verifyResponseStatusCode('@deleteService', 200); - cy.reload(); - verifyResponseStatusCode('@serviceDetails', 404); - cy.contains(`instance for ${service_Name} not found`); - //Checking if the service got deleted successfully - //Click on settings page - cy.get('[data-testid="appbar-item-settings"]').should('be.visible').click(); + cy.get('[data-testid="confirm-button"]').should('be.visible').click(); + verifyResponseStatusCode('@deleteService', 200); + cy.reload(); + verifyResponseStatusCode('@serviceDetails', 404); + cy.contains(`instance for ${service_Name} not found`); + //Checking if the service got deleted successfully + //Click on settings page + cy.get('[data-testid="appbar-item-settings"]').should('be.visible').click(); - // Services page - cy.get('.ant-menu-title-content') - .contains(typeOfService) - .should('be.visible') - .click(); + // Services page + cy.get('.ant-menu-title-content') + .contains(typeOfService) + .should('be.visible') + .click(); - cy.get(`[data-testid="service-name-${service_Name}"]`).should('not.exist'); + cy.get(`[data-testid="service-name-${service_Name}"]`).should('not.exist'); }; export const editOwnerforCreatedService = ( @@ -288,11 +294,11 @@ export const editOwnerforCreatedService = ( //Click on settings page cy.get('[data-testid="appbar-item-settings"]').should('be.visible').click(); - // Services page - cy.get('.ant-menu-title-content') - .contains(service_type) - .should('be.visible') - .click(); + // Services page + cy.get('.ant-menu-title-content') + .contains(service_type) + .should('be.visible') + .click(); interceptURL( 'GET', @@ -315,11 +321,11 @@ export const editOwnerforCreatedService = ( verifyResponseStatusCode('@getSelectedService', 200); verifyResponseStatusCode('@waitForIngestion', 200); - interceptURL( - 'GET', - '/api/v1/search/query?q=*%20AND%20teamType:Group&from=0&size=10&index=team_search_index', - 'waitForTeams' - ); + interceptURL( + 'GET', + '/api/v1/search/query?q=*%20AND%20teamType:Group&from=0&size=10&index=team_search_index', + 'waitForTeams' + ); //Click on edit owner button cy.get('[data-testid="edit-Owner-icon"]') @@ -328,76 +334,76 @@ export const editOwnerforCreatedService = ( .trigger('mouseover') .click(); - verifyResponseStatusCode('@waitForTeams', 200); + verifyResponseStatusCode('@waitForTeams', 200); - //Clicking on users tab - cy.get('[data-testid="dropdown-tab"]') - .contains('Users') - .should('exist') - .should('be.visible') - .click(); + //Clicking on users tab + cy.get('[data-testid="dropdown-tab"]') + .contains('Users') + .should('exist') + .should('be.visible') + .click(); - //Selecting the user - cy.get('[data-testid="list-item"]') - .first() - .should('exist') - .should('be.visible') - .click(); + //Selecting the user + cy.get('[data-testid="list-item"]') + .first() + .should('exist') + .should('be.visible') + .click(); - cy.get('[data-testid="owner-dropdown"]') - .invoke('text') - .then((text) => { - expect(text).equal(ADMIN); - }); + cy.get('[data-testid="owner-dropdown"]') + .invoke('text') + .then((text) => { + expect(text).equal(ADMIN); + }); }; export const goToAddNewServicePage = (service_type) => { - cy.get('[data-testid="tables"]').should('be.visible'); - //Click on settings page - cy.get('[data-testid="appbar-item-settings"]').should('be.visible').click(); + cy.get('[data-testid="tables"]').should('be.visible'); + //Click on settings page + cy.get('[data-testid="appbar-item-settings"]').should('be.visible').click(); - // Services page - cy.get('.ant-menu-title-content') - .contains(service_type) - .should('be.visible') - .click(); - - cy.wait(500); - - cy.get('.ant-card').then(($serviceCount) => { - if ($serviceCount.length > 0) { - cy.get('[data-testid="add-new-service-button"]') + // Services page + cy.get('.ant-menu-title-content') + .contains(service_type) .should('be.visible') .click(); - } else { - cy.get('[data-testid="add-service-button"]').should('be.visible').click(); - } - }); - // Add new service page - cy.url().should('include', '/add-service'); - cy.get('[data-testid="header"]').should('be.visible'); - cy.contains('Add New Service').should('be.visible'); - cy.get('[data-testid="service-category"]').should('be.visible'); + cy.wait(500); + + cy.get('.ant-card').then(($serviceCount) => { + if ($serviceCount.length > 0) { + cy.get('[data-testid="add-new-service-button"]') + .should('be.visible') + .click(); + } else { + cy.get('[data-testid="add-service-button"]').should('be.visible').click(); + } + }); + + // Add new service page + cy.url().should('include', '/add-service'); + cy.get('[data-testid="header"]').should('be.visible'); + cy.contains('Add New Service').should('be.visible'); + cy.get('[data-testid="service-category"]').should('be.visible'); }; export const testServiceSampleData = (database, schema, table) => { - cy.get('[data-testid="Databases"]').click(); - cy.get('[data-testid="column"] > :nth-child(1)') - .should('be.visible') - .contains(database); + cy.get('[data-testid="Databases"]').click(); + cy.get('[data-testid="column"] > :nth-child(1)') + .should('be.visible') + .contains(database); - cy.get('[data-testid="column"] > :nth-child(1) > a').click(); - cy.get('[data-testid="table-column"] > :nth-child(1)') - .should('be.visible') - .contains(schema); + cy.get('[data-testid="column"] > :nth-child(1) > a').click(); + cy.get('[data-testid="table-column"] > :nth-child(1)') + .should('be.visible') + .contains(schema); - cy.get('[data-testid="table-column"] > :nth-child(1) > a').click(); - cy.get('.odd-row > :nth-child(1) > a').should('be.visible').contains(table); + cy.get('[data-testid="table-column"] > :nth-child(1) > a').click(); + cy.get('.odd-row > :nth-child(1) > a').should('be.visible').contains(table); - cy.get('.odd-row > :nth-child(1) > a').click(); - cy.get('[data-testid="inactive-link"]').should('be.visible').contains(table); - cy.get('[data-testid="Schema"]').should('be.visible'); + cy.get('.odd-row > :nth-child(1) > a').click(); + cy.get('[data-testid="inactive-link"]').should('be.visible').contains(table); + cy.get('[data-testid="Schema"]').should('be.visible'); }; /** @@ -405,640 +411,640 @@ export const testServiceSampleData = (database, schema, table) => { * @param {string} id -> data-testid is required */ export const visitEntityTab = (id) => { - cy.get(`[data-testid="${id}"]`).click(); - cy.get(`[data-testid="${id}-tab"]`).should('be.visible'); + cy.get(`[data-testid="${id}"]`).click(); + cy.get(`[data-testid="${id}-tab"]`).should('be.visible'); }; /** * Search for entities through the search bar * @param {string} term Entity name */ export const searchEntity = (term, suggestionOverly = true) => { - cy.get('[data-testid="searchBox"]').scrollIntoView().should('be.visible'); - cy.get('[data-testid="searchBox"]').type(`${term}{enter}`); - if (suggestionOverly) { - cy.get('[data-testid="suggestion-overlay"]').click(1, 1); - } + cy.get('[data-testid="searchBox"]').scrollIntoView().should('be.visible'); + cy.get('[data-testid="searchBox"]').type(`${term}{enter}`); + if (suggestionOverly) { + cy.get('[data-testid="suggestion-overlay"]').click(1, 1); + } }; export const visitEntityDetailsPage = (term, serviceName, entity) => { - interceptURL('GET', '/api/v1/*/name/*', 'getEntityDetails'); - interceptURL( - 'GET', - `/api/v1/search/query?q=*&index=${SEARCH_INDEX[entity]}&from=*&size=**`, - 'explorePageTabSearch' - ); - interceptURL('GET', `/api/v1/search/suggest?q=*&index=*`, 'searchQuery'); - interceptURL('GET', `/api/v1/search/*`, 'explorePageSearch'); + interceptURL('GET', '/api/v1/*/name/*', 'getEntityDetails'); + interceptURL( + 'GET', + `/api/v1/search/query?q=*&index=${SEARCH_INDEX[entity]}&from=*&size=**`, + 'explorePageTabSearch' + ); + interceptURL('GET', `/api/v1/search/suggest?q=*&index=*`, 'searchQuery'); + interceptURL('GET', `/api/v1/search/*`, 'explorePageSearch'); - // searching term in search box - cy.get('[data-testid="searchBox"]').scrollIntoView().should('be.visible'); - cy.get('[data-testid="searchBox"]').type(term); - cy.get('[data-testid="suggestion-overlay"]').should('exist'); - verifyResponseStatusCode('@searchQuery', 200); - cy.get('body').then(($body) => { - // checking if requested term is available in search suggestion - if ( - $body.find( - `[data-testid="${serviceName}-${term}"] [data-testid="data-name"]` - ).length - ) { - // if term is available in search suggestion, redirecting to entity details page - cy.get(`[data-testid="${serviceName}-${term}"] [data-testid="data-name"]`) - .should('be.visible') - .first() - .click(); - } else { - // if term is not available in search suggestion, hitting enter to search box so it will redirect to explore page - cy.get('body').click(1, 1); - cy.get('[data-testid="searchBox"]').type('{enter}'); - verifyResponseStatusCode('@explorePageSearch', 200); + // searching term in search box + cy.get('[data-testid="searchBox"]').scrollIntoView().should('be.visible'); + cy.get('[data-testid="searchBox"]').type(term); + cy.get('[data-testid="suggestion-overlay"]').should('exist'); + verifyResponseStatusCode('@searchQuery', 200); + cy.get('body').then(($body) => { + // checking if requested term is available in search suggestion + if ( + $body.find( + `[data-testid="${serviceName}-${term}"] [data-testid="data-name"]` + ).length + ) { + // if term is available in search suggestion, redirecting to entity details page + cy.get(`[data-testid="${serviceName}-${term}"] [data-testid="data-name"]`) + .should('be.visible') + .first() + .click(); + } else { + // if term is not available in search suggestion, hitting enter to search box so it will redirect to explore page + cy.get('body').click(1, 1); + cy.get('[data-testid="searchBox"]').type('{enter}'); + verifyResponseStatusCode('@explorePageSearch', 200); - cy.get(`[data-testid="${entity}-tab"]`).should('be.visible').click(); - cy.get(`[data-testid="${entity}-tab"]`).should('be.visible'); - verifyResponseStatusCode('@explorePageTabSearch', 200); + cy.get(`[data-testid="${entity}-tab"]`).should('be.visible').click(); + cy.get(`[data-testid="${entity}-tab"]`).should('be.visible'); + verifyResponseStatusCode('@explorePageTabSearch', 200); - cy.get(`[data-testid="${serviceName}-${term}"]`) - .scrollIntoView() - .should('be.visible') - .click(); - } - }); + cy.get(`[data-testid="${serviceName}-${term}"]`) + .scrollIntoView() + .should('be.visible') + .click(); + } + }); - verifyResponseStatusCode('@getEntityDetails', 200); - cy.get('body').then(($body) => { - if ($body.find('[data-testid="suggestion-overlay"]').length) { - cy.get('[data-testid="suggestion-overlay"]').click(1, 1); - } - }); - cy.get('body').click(1, 1); - cy.get('[data-testid="searchBox"]').clear(); + verifyResponseStatusCode('@getEntityDetails', 200); + cy.get('body').then(($body) => { + if ($body.find('[data-testid="suggestion-overlay"]').length) { + cy.get('[data-testid="suggestion-overlay"]').click(1, 1); + } + }); + cy.get('body').click(1, 1); + cy.get('[data-testid="searchBox"]').clear(); }; // add new tag to entity and its table export const addNewTagToEntity = (entityObj, term) => { - visitEntityDetailsPage( - entityObj.term, - entityObj.serviceName, - entityObj.entity - ); - cy.wait(500); - cy.get('[data-testid="tags"] > [data-testid="add-tag"]') - .eq(0) - .should('be.visible') - .scrollIntoView() - .click(); + visitEntityDetailsPage( + entityObj.term, + entityObj.serviceName, + entityObj.entity + ); + cy.wait(500); + cy.get('[data-testid="tags"] > [data-testid="add-tag"]') + .eq(0) + .should('be.visible') + .scrollIntoView() + .click(); - cy.get('[class*="-control"]').should('be.visible').type(term); - cy.wait(500); - cy.get('[id*="-option-0"]').should('be.visible').click(); - cy.get( - '[data-testid="tags-wrapper"] > [data-testid="tag-container"]' - ).contains(term); - cy.get('[data-testid="saveAssociatedTag"]').should('be.visible').click(); - cy.get('[data-testid="entity-tags"]') - .scrollIntoView() - .should('be.visible') - .contains(term); + cy.get('[class*="-control"]').should('be.visible').type(term); + cy.wait(500); + cy.get('[id*="-option-0"]').should('be.visible').click(); + cy.get( + '[data-testid="tags-wrapper"] > [data-testid="tag-container"]' + ).contains(term); + cy.get('[data-testid="saveAssociatedTag"]').should('be.visible').click(); + cy.get('[data-testid="entity-tags"]') + .scrollIntoView() + .should('be.visible') + .contains(term); - cy.get(':nth-child(1) > :nth-child(4) [data-testid="tag-container"]') - .contains('Tags') - .should('be.visible') - .click(); + cy.get(':nth-child(1) > :nth-child(4) [data-testid="tag-container"]') + .contains('Tags') + .should('be.visible') + .click(); - cy.get('[class*="-control"]') - .scrollIntoView() - .should('be.visible') - .type(term); - cy.wait(500); - cy.get('[id*="-option-0"]').should('be.visible').click(); - cy.get('[data-testid="saveAssociatedTag"]') - .scrollIntoView() - .should('be.visible') - .click(); - cy.get(':nth-child(1) > :nth-child(4) [data-testid="tag-container"]') - .scrollIntoView() - .contains(term) - .should('exist'); + cy.get('[class*="-control"]') + .scrollIntoView() + .should('be.visible') + .type(term); + cy.wait(500); + cy.get('[id*="-option-0"]').should('be.visible').click(); + cy.get('[data-testid="saveAssociatedTag"]') + .scrollIntoView() + .should('be.visible') + .click(); + cy.get(':nth-child(1) > :nth-child(4) [data-testid="tag-container"]') + .scrollIntoView() + .contains(term) + .should('exist'); }; export const addUser = (username, email) => { - cy.get('[data-testid="email"]') - .scrollIntoView() - .should('exist') - .should('be.visible') - .type(email); - cy.get('[data-testid="displayName"]') - .should('exist') - .should('be.visible') - .type(username); - cy.get(descriptionBox) - .should('exist') - .should('be.visible') - .type('Adding user'); - interceptURL('GET', ' /api/v1/users/generateRandomPwd', 'generatePassword'); - cy.get('[data-testid="password-generator"]').should('be.visible').click(); - verifyResponseStatusCode('@generatePassword', 200); - cy.wait(1000); - interceptURL('POST', ' /api/v1/users', 'add-user'); - cy.get('[data-testid="save-user"]').scrollIntoView().click(); + cy.get('[data-testid="email"]') + .scrollIntoView() + .should('exist') + .should('be.visible') + .type(email); + cy.get('[data-testid="displayName"]') + .should('exist') + .should('be.visible') + .type(username); + cy.get(descriptionBox) + .should('exist') + .should('be.visible') + .type('Adding user'); + interceptURL('GET', ' /api/v1/users/generateRandomPwd', 'generatePassword'); + cy.get('[data-testid="password-generator"]').should('be.visible').click(); + verifyResponseStatusCode('@generatePassword', 200); + cy.wait(1000); + interceptURL('POST', ' /api/v1/users', 'add-user'); + cy.get('[data-testid="save-user"]').scrollIntoView().click(); }; export const softDeleteUser = (username) => { - //Search the created user - interceptURL('GET', '/api/v1/search/query*', 'searchUser'); - cy.get('[data-testid="searchbar"]') - .should('exist') - .should('be.visible') - .type(username); + //Search the created user + interceptURL('GET', '/api/v1/search/query*', 'searchUser'); + cy.get('[data-testid="searchbar"]') + .should('exist') + .should('be.visible') + .type(username); - verifyResponseStatusCode('@searchUser', 200); + verifyResponseStatusCode('@searchUser', 200); - //Click on delete button - cy.get(`[data-testid="delete-user-btn-${username}"]`) - .should('exist') - .should('be.visible') - .click(); + //Click on delete button + cy.get(`[data-testid="delete-user-btn-${username}"]`) + .should('exist') + .should('be.visible') + .click(); - //Soft deleting the user - cy.get('[data-testid="soft-delete"]').click(); - cy.get('[data-testid="confirmation-text-input"]').type('DELETE'); + //Soft deleting the user + cy.get('[data-testid="soft-delete"]').click(); + cy.get('[data-testid="confirmation-text-input"]').type('DELETE'); - interceptURL('DELETE', '/api/v1/users/*', 'softdeleteUser'); - interceptURL('GET', '/api/v1/users*', 'userDeleted'); - cy.get('[data-testid="confirm-button"]') - .should('exist') - .should('be.visible') - .click(); - verifyResponseStatusCode('@softdeleteUser', 200); - verifyResponseStatusCode('@userDeleted', 200); + interceptURL('DELETE', '/api/v1/users/*', 'softdeleteUser'); + interceptURL('GET', '/api/v1/users*', 'userDeleted'); + cy.get('[data-testid="confirm-button"]') + .should('exist') + .should('be.visible') + .click(); + verifyResponseStatusCode('@softdeleteUser', 200); + verifyResponseStatusCode('@userDeleted', 200); - cy.get('.Toastify__toast-body > :nth-child(2)').should( - 'have.text', - 'User deleted successfully!' - ); + cy.get('.Toastify__toast-body > :nth-child(2)').should( + 'have.text', + 'User deleted successfully!' + ); - //Closing the toast message - cy.get('.Toastify__close-button > svg') - .should('exist') - .should('be.visible') - .click(); + //Closing the toast message + cy.get('.Toastify__close-button > svg') + .should('exist') + .should('be.visible') + .click(); - interceptURL('GET', '/api/v1/search/query*', 'searchUser'); + interceptURL('GET', '/api/v1/search/query*', 'searchUser'); - //Verifying the deleted user - cy.get('[data-testid="searchbar"]') - .should('exist') - .should('be.visible') - .clear() - .type(username); + //Verifying the deleted user + cy.get('[data-testid="searchbar"]') + .should('exist') + .should('be.visible') + .clear() + .type(username); - verifyResponseStatusCode('@searchUser', 200); - cy.get('.ant-table-placeholder > .ant-table-cell').should( - 'not.contain', - username - ); + verifyResponseStatusCode('@searchUser', 200); + cy.get('.ant-table-placeholder > .ant-table-cell').should( + 'not.contain', + username + ); }; export const restoreUser = (username) => { - //Click on deleted user toggle - interceptURL('GET', '/api/v1/users*', 'deletedUser'); - cy.get('.ant-switch-handle').should('exist').should('be.visible').click(); - verifyResponseStatusCode('@deletedUser', 200); + //Click on deleted user toggle + interceptURL('GET', '/api/v1/users*', 'deletedUser'); + cy.get('.ant-switch-handle').should('exist').should('be.visible').click(); + verifyResponseStatusCode('@deletedUser', 200); - cy.get(`[data-testid="restore-user-btn-${username}"]`) - .should('exist') - .should('be.visible') - .click(); - cy.get('.ant-modal-body > p').should( - 'contain', - `Are you sure you want to restore ${username}?` - ); - interceptURL('PUT', '/api/v1/users', 'restoreUser'); - cy.get('.ant-modal-footer > .ant-btn-primary') - .should('exist') - .should('be.visible') - .click(); - verifyResponseStatusCode('@restoreUser', 200); - cy.get('.Toastify__toast-body > :nth-child(2)').should( - 'contain', - 'User restored successfully!' - ); + cy.get(`[data-testid="restore-user-btn-${username}"]`) + .should('exist') + .should('be.visible') + .click(); + cy.get('.ant-modal-body > p').should( + 'contain', + `Are you sure you want to restore ${username}?` + ); + interceptURL('PUT', '/api/v1/users', 'restoreUser'); + cy.get('.ant-modal-footer > .ant-btn-primary') + .should('exist') + .should('be.visible') + .click(); + verifyResponseStatusCode('@restoreUser', 200); + cy.get('.Toastify__toast-body > :nth-child(2)').should( + 'contain', + 'User restored successfully!' + ); - //Closing toast message - cy.get('.Toastify__close-button > svg') - .should('exist') - .should('be.visible') - .click(); + //Closing toast message + cy.get('.Toastify__close-button > svg') + .should('exist') + .should('be.visible') + .click(); - //Verifying the restored user - cy.get('.ant-switch').should('exist').should('be.visible').click(); + //Verifying the restored user + cy.get('.ant-switch').should('exist').should('be.visible').click(); - interceptURL('GET', '/api/v1/search/query*', 'searchUser'); - cy.get('[data-testid="searchbar"]') - .should('exist') - .should('be.visible') - .type(username); - verifyResponseStatusCode('@searchUser', 200); + interceptURL('GET', '/api/v1/search/query*', 'searchUser'); + cy.get('[data-testid="searchbar"]') + .should('exist') + .should('be.visible') + .type(username); + verifyResponseStatusCode('@searchUser', 200); - cy.get('.ant-table-row > :nth-child(1)').should('contain', username); + cy.get('.ant-table-row > :nth-child(1)').should('contain', username); }; export const deleteSoftDeletedUser = (username) => { - cy.get('.ant-switch-handle').should('exist').should('be.visible').click(); + cy.get('.ant-switch-handle').should('exist').should('be.visible').click(); - cy.wait(1000); + cy.wait(1000); - cy.get(`[data-testid="delete-user-btn-${username}"]`) - .should('exist') - .should('be.visible') - .click(); - cy.get('[data-testid="confirmation-text-input"]').type('DELETE'); - cy.get('[data-testid="confirm-button"]') - .should('exist') - .should('be.visible') - .click(); + cy.get(`[data-testid="delete-user-btn-${username}"]`) + .should('exist') + .should('be.visible') + .click(); + cy.get('[data-testid="confirmation-text-input"]').type('DELETE'); + cy.get('[data-testid="confirm-button"]') + .should('exist') + .should('be.visible') + .click(); - cy.wait(1000); + cy.wait(1000); - cy.get('.Toastify__toast-body > :nth-child(2)').should( - 'have.text', - 'User deleted successfully!' - ); + cy.get('.Toastify__toast-body > :nth-child(2)').should( + 'have.text', + 'User deleted successfully!' + ); - //Closing toast message - cy.get('.Toastify__close-button > svg') - .should('exist') - .should('be.visible') - .click(); + //Closing toast message + cy.get('.Toastify__close-button > svg') + .should('exist') + .should('be.visible') + .click(); - cy.get('[data-testid="searchbar"]') - .should('exist') - .should('be.visible') - .type(username); + cy.get('[data-testid="searchbar"]') + .should('exist') + .should('be.visible') + .type(username); - cy.wait(1000); + cy.wait(1000); - cy.get('.ant-table-placeholder > .ant-table-cell').should( - 'not.contain', - username - ); + cy.get('.ant-table-placeholder > .ant-table-cell').should( + 'not.contain', + username + ); }; export const toastNotification = (msg) => { - cy.get('.Toastify__toast-body').should('be.visible').contains(msg); - cy.wait(200); - cy.get('.Toastify__close-button').should('be.visible').click(); + cy.get('.Toastify__toast-body').should('be.visible').contains(msg); + cy.wait(200); + cy.get('.Toastify__close-button').should('be.visible').click(); }; export const addCustomPropertiesForEntity = ( - entityType, - customType, - value, - entityObj + entityType, + customType, + value, + entityObj ) => { - const propertyName = `entity${entityType.name}test${uuid()}`; + const propertyName = `entity${entityType.name}test${uuid()}`; - //Add Custom property for selected entity - cy.get('[data-testid="add-field-button"]') - .should('exist') - .should('be.visible') - .click(); - cy.get('[data-testid="name"]').should('be.visible').type(propertyName); - cy.get('select').select(customType); - cy.get('.toastui-editor-md-container > .toastui-editor > .ProseMirror') - .should('be.visible') - .type(entityType.description); - //Check if the property got added - cy.intercept('/api/v1/metadata/types/name/*?fields=customProperties').as( - 'customProperties' - ); - cy.get('[data-testid="create-custom-field"]').scrollIntoView().click(); - - cy.wait('@customProperties'); - cy.get('.ant-table-row').should('contain', propertyName); - - //Navigating to home page - cy.clickOnLogo(); - - //Checking the added property in Entity - - visitEntityDetailsPage( - entityObj.term, - entityObj.serviceName, - entityObj.entity - ); - - cy.get('[data-testid="Custom Properties"]') - .should('exist') - .should('be.visible') - .click(); - cy.get('tbody').should('contain', propertyName); - - //Adding value for the custom property - - //Navigating through the created custom property for adding value - cy.get('tbody') - .contains(propertyName) - .scrollIntoView() - .next('td') - .contains('No data') - .next('[data-testid="edit-icon"]') - .as('editbutton'); - - cy.wait(1000); - - cy.get('@editbutton').should('exist').should('be.visible').click(); - - //Checking for value text box or markdown box - cy.get('body').then(($body) => { - if ($body.find('[data-testid="value-input"]').length > 0) { - cy.get('[data-testid="value-input"]').should('be.visible').type(value); - cy.get('[data-testid="save-value"]').click(); - } else if ( - $body.find( - '.toastui-editor-md-container > .toastui-editor > .ProseMirror' - ) - ) { - cy.get('.toastui-editor-md-container > .toastui-editor > .ProseMirror') + //Add Custom property for selected entity + cy.get('[data-testid="add-field-button"]') + .should('exist') .should('be.visible') - .type(value); - cy.get('[data-testid="save"]').click(); - } - }); + .click(); + cy.get('[data-testid="name"]').should('be.visible').type(propertyName); + cy.get('select').select(customType); + cy.get('.toastui-editor-md-container > .toastui-editor > .ProseMirror') + .should('be.visible') + .type(entityType.description); + //Check if the property got added + cy.intercept('/api/v1/metadata/types/name/*?fields=customProperties').as( + 'customProperties' + ); + cy.get('[data-testid="create-custom-field"]').scrollIntoView().click(); - //Checking the added value to the property - cy.get('tbody') - .contains(propertyName) - .scrollIntoView() - .next('td') - .as('value'); + cy.wait('@customProperties'); + cy.get('.ant-table-row').should('contain', propertyName); - cy.get('tbody') - .contains(propertyName) - .scrollIntoView() - .next('td') - .should('contain', value); + //Navigating to home page + cy.clickOnLogo(); - //returning the property name since it needs to be deleted and updated - return propertyName; + //Checking the added property in Entity + + visitEntityDetailsPage( + entityObj.term, + entityObj.serviceName, + entityObj.entity + ); + + cy.get('[data-testid="Custom Properties"]') + .should('exist') + .should('be.visible') + .click(); + cy.get('tbody').should('contain', propertyName); + + //Adding value for the custom property + + //Navigating through the created custom property for adding value + cy.get('tbody') + .contains(propertyName) + .scrollIntoView() + .next('td') + .contains('No data') + .next('[data-testid="edit-icon"]') + .as('editbutton'); + + cy.wait(1000); + + cy.get('@editbutton').should('exist').should('be.visible').click(); + + //Checking for value text box or markdown box + cy.get('body').then(($body) => { + if ($body.find('[data-testid="value-input"]').length > 0) { + cy.get('[data-testid="value-input"]').should('be.visible').type(value); + cy.get('[data-testid="save-value"]').click(); + } else if ( + $body.find( + '.toastui-editor-md-container > .toastui-editor > .ProseMirror' + ) + ) { + cy.get('.toastui-editor-md-container > .toastui-editor > .ProseMirror') + .should('be.visible') + .type(value); + cy.get('[data-testid="save"]').click(); + } + }); + + //Checking the added value to the property + cy.get('tbody') + .contains(propertyName) + .scrollIntoView() + .next('td') + .as('value'); + + cy.get('tbody') + .contains(propertyName) + .scrollIntoView() + .next('td') + .should('contain', value); + + //returning the property name since it needs to be deleted and updated + return propertyName; }; export const editCreatedProperty = (propertyName) => { - //Fetching for edit button - cy.get('tbody') - .children() - .contains(propertyName) - .scrollIntoView() - .nextUntil('button') - .find('[data-testid="edit-button"]') - .as('editbutton'); + //Fetching for edit button + cy.get('tbody') + .children() + .contains(propertyName) + .scrollIntoView() + .nextUntil('button') + .find('[data-testid="edit-button"]') + .as('editbutton'); - cy.get('@editbutton').click(); + cy.get('@editbutton').click(); - cy.get(descriptionBox) - .should('be.visible') - .clear() - .type('This is new description'); + cy.get(descriptionBox) + .should('be.visible') + .clear() + .type('This is new description'); - interceptURL('PATCH', '/api/v1/metadata/types/*', 'checkPatchForDescription'); + interceptURL('PATCH', '/api/v1/metadata/types/*', 'checkPatchForDescription'); - cy.get('[data-testid="save"]').should('be.visible').click(); + cy.get('[data-testid="save"]').should('be.visible').click(); - verifyResponseStatusCode('@checkPatchForDescription', 200); + verifyResponseStatusCode('@checkPatchForDescription', 200); - cy.get('.tw-modal-container').should('not.exist'); + cy.get('.tw-modal-container').should('not.exist'); - //Fetching for updated descriptions for the created custom property - cy.get('tbody') - .children() - .contains(propertyName) - .nextUntil('div') - .find('[data-testid="viewer-container"]') - .should('contain', 'This is new description'); + //Fetching for updated descriptions for the created custom property + cy.get('tbody') + .children() + .contains(propertyName) + .nextUntil('div') + .find('[data-testid="viewer-container"]') + .should('contain', 'This is new description'); }; export const deleteCreatedProperty = (propertyName) => { - //Fetching for delete button + //Fetching for delete button - cy.get('tbody') - .children() - .contains(propertyName) - .nextUntil('button') - .find('[data-testid="delete-button"]') - .as('deletebutton'); + cy.get('tbody') + .children() + .contains(propertyName) + .nextUntil('button') + .find('[data-testid="delete-button"]') + .as('deletebutton'); - cy.get('@deletebutton').click(); + cy.get('@deletebutton').click(); - //Checking property name is present on the delete pop-up - cy.get('[data-testid="body-text"] > p').should('contain', propertyName); + //Checking property name is present on the delete pop-up + cy.get('[data-testid="body-text"] > p').should('contain', propertyName); - cy.get('[data-testid="save-button"]').should('be.visible').click(); + cy.get('[data-testid="save-button"]').should('be.visible').click(); - //Checking if property got deleted successfully - cy.get('[data-testid="add-field-button"]') - .scrollIntoView() - .should('be.visible'); + //Checking if property got deleted successfully + cy.get('[data-testid="add-field-button"]') + .scrollIntoView() + .should('be.visible'); }; export const updateOwner = () => { - cy.get('[data-testid="avatar"]').should('be.visible').click(); - cy.get('[data-testid="user-name"]') - .should('exist') - .invoke('text') - .then((text) => { - cy.get('[data-testid="hiden-layer"]').should('exist').click(); - interceptURL( - 'GET', - '/api/v1/search/query?q=*%20AND%20teamType:Group&from=0&size=10&index=team_search_index', - 'getTeams' - ); - //Clicking on edit owner button - cy.get('[data-testid="edit-Owner-icon"]').should('be.visible').click(); - - verifyResponseStatusCode('@getTeams', 200); - - //Clicking on users tab - cy.get('button[data-testid="dropdown-tab"]') + cy.get('[data-testid="avatar"]').should('be.visible').click(); + cy.get('[data-testid="user-name"]') .should('exist') - .should('be.visible') - .contains('Users') - .click(); + .invoke('text') + .then((text) => { + cy.get('[data-testid="hiden-layer"]').should('exist').click(); + interceptURL( + 'GET', + '/api/v1/search/query?q=*%20AND%20teamType:Group&from=0&size=10&index=team_search_index', + 'getTeams' + ); + //Clicking on edit owner button + cy.get('[data-testid="edit-Owner-icon"]').should('be.visible').click(); - cy.get('[data-testid="list-item"]') - .first() - .should('contain', text.trim()) - .click(); + verifyResponseStatusCode('@getTeams', 200); - //Asserting the added name - cy.get('[data-testid="owner-link"]').should('contain', text.trim()); - }); + //Clicking on users tab + cy.get('button[data-testid="dropdown-tab"]') + .should('exist') + .should('be.visible') + .contains('Users') + .click(); + + cy.get('[data-testid="list-item"]') + .first() + .should('contain', text.trim()) + .click(); + + //Asserting the added name + cy.get('[data-testid="owner-link"]').should('contain', text.trim()); + }); }; export const mySqlConnectionInput = () => { - cy.get('#root_username').type(Cypress.env('mysqlUsername')); - cy.get('#root_password').type(Cypress.env('mysqlPassword')); - cy.get('#root_hostPort').type(Cypress.env('mysqlHostPort')); - cy.get('#root_databaseSchema').type(Cypress.env('mysqlDatabaseSchema')); + cy.get('#root_username').type(Cypress.env('mysqlUsername')); + cy.get('#root_password').type(Cypress.env('mysqlPassword')); + cy.get('#root_hostPort').type(Cypress.env('mysqlHostPort')); + cy.get('#root_databaseSchema').type(Cypress.env('mysqlDatabaseSchema')); }; export const login = (username, password) => { - cy.visit('/'); - cy.get('[id="email"]').should('be.visible').clear().type(username); - cy.get('[id="password"]').should('be.visible').clear().type(password); - cy.get('.ant-btn').contains('Login').should('be.visible').click(); + cy.visit('/'); + cy.get('[id="email"]').should('be.visible').clear().type(username); + cy.get('[id="password"]').should('be.visible').clear().type(password); + cy.get('.ant-btn').contains('Login').should('be.visible').click(); }; export const addTeam = (TEAM_DETAILS) => { - interceptURL('GET', '/api/v1/teams*', 'addTeam'); - //Fetching the add button and clicking on it - cy.get('[data-testid="add-team"]').should('be.visible').click(); + interceptURL('GET', '/api/v1/teams*', 'addTeam'); + //Fetching the add button and clicking on it + cy.get('[data-testid="add-team"]').should('be.visible').click(); - verifyResponseStatusCode('@addTeam', 200); + verifyResponseStatusCode('@addTeam', 200); - //Entering team details - cy.get('[data-testid="name"]') - .should('exist') - .should('be.visible') - .type(TEAM_DETAILS.name); + //Entering team details + cy.get('[data-testid="name"]') + .should('exist') + .should('be.visible') + .type(TEAM_DETAILS.name); - cy.get('[data-testid="display-name"]') - .should('exist') - .should('be.visible') - .type(TEAM_DETAILS.displayName); + cy.get('[data-testid="display-name"]') + .should('exist') + .should('be.visible') + .type(TEAM_DETAILS.displayName); - cy.get('[data-testid="team-selector"]') - .should('exist') - .should('be.visible') - .click(); + cy.get('[data-testid="team-selector"]') + .should('exist') + .should('be.visible') + .click(); - checkTeamTypeOptions(); + checkTeamTypeOptions(); - cy.get(`.ant-select-dropdown [title="${TEAM_DETAILS.teamType}"]`) - .should('exist') - .should('be.visible') - .click(); + cy.get(`.ant-select-dropdown [title="${TEAM_DETAILS.teamType}"]`) + .should('exist') + .should('be.visible') + .click(); - cy.get(descriptionBox) - .should('exist') - .should('be.visible') - .type(TEAM_DETAILS.description); + cy.get(descriptionBox) + .should('exist') + .should('be.visible') + .type(TEAM_DETAILS.description); - interceptURL('POST', '/api/v1/teams', 'saveTeam'); - interceptURL('GET', '/api/v1/team*', 'createTeam'); + interceptURL('POST', '/api/v1/teams', 'saveTeam'); + interceptURL('GET', '/api/v1/team*', 'createTeam'); - //Saving the created team - cy.get('[form="add-team-form"]') - .scrollIntoView() - .should('be.visible') - .click(); + //Saving the created team + cy.get('[form="add-team-form"]') + .scrollIntoView() + .should('be.visible') + .click(); - verifyResponseStatusCode('@saveTeam', 201); - verifyResponseStatusCode('@createTeam', 200); + verifyResponseStatusCode('@saveTeam', 201); + verifyResponseStatusCode('@createTeam', 200); }; export const retryIngestionRun = () => { - let retryCount = 0; + let retryCount = 0; - const testIngestionsTab = () => { - cy.get('[data-testid="Ingestions"]').should('be.visible'); - cy.get('[data-testid="Ingestions"] >> [data-testid="filter-count"]').should( - 'have.text', - '1' - ); - if (retryCount === 0) { - cy.wait(1000); - cy.get('[data-testid="Ingestions"]').should('be.visible'); - } - }; - - const checkSuccessState = () => { - testIngestionsTab(); - retryCount++; - // the latest run should be success - cy.get('[aria-describedby*="tippy-tooltip"] > .tw-h-5').then( - ($ingestionStatus) => { - if ( - ($ingestionStatus.text() === 'Running' || - $ingestionStatus.text() === 'Queued') && - retryCount <= RETRY_TIMES - ) { - // retry after waiting for 20 seconds - cy.wait(20000); - cy.reload(); - checkSuccessState(); - } else { - cy.get('[aria-describedby*="tippy-tooltip"] > .tw-h-5').should( + const testIngestionsTab = () => { + cy.get('[data-testid="Ingestions"]').should('be.visible'); + cy.get('[data-testid="Ingestions"] >> [data-testid="filter-count"]').should( 'have.text', - 'Success' - ); + '1' + ); + if (retryCount === 0) { + cy.wait(1000); + cy.get('[data-testid="Ingestions"]').should('be.visible'); } - } - ); - }; + }; - checkSuccessState(); + const checkSuccessState = () => { + testIngestionsTab(); + retryCount++; + // the latest run should be success + cy.get('[data-testid="pipeline-status"]').then( + ($ingestionStatus) => { + if ( + ($ingestionStatus.text() === 'Running' || + $ingestionStatus.text() === 'Queued') && + retryCount <= RETRY_TIMES + ) { + // retry after waiting for 20 seconds + cy.wait(20000); + cy.reload(); + checkSuccessState(); + } else { + cy.get('[data-testid="pipeline-status"]').should( + 'have.text', + 'Success' + ); + } + } + ); + }; + + checkSuccessState(); }; export const updateDescriptionForIngestedTables = ( - serviceName, - tableName, - description, - type, - entity + serviceName, + tableName, + description, + type, + entity ) => { - //Navigate to ingested table + //Navigate to ingested table - visitEntityDetailsPage(tableName, serviceName, entity); + visitEntityDetailsPage(tableName, serviceName, entity); - //update description - cy.get('[data-testid="edit-description"]') - .should('be.visible') - .click({ force: true }); - cy.get(descriptionBox).should('be.visible').click().clear().type(description); - interceptURL('PATCH', '/api/v1/*/*', 'updateEntity'); - cy.get('[data-testid="save"]').click(); - verifyResponseStatusCode('@updateEntity', 200); + //update description + cy.get('[data-testid="edit-description"]') + .should('be.visible') + .click({ force: true }); + cy.get(descriptionBox).should('be.visible').click().clear().type(description); + interceptURL('PATCH', '/api/v1/*/*', 'updateEntity'); + cy.get('[data-testid="save"]').click(); + verifyResponseStatusCode('@updateEntity', 200); - //re-run ingestion flow + //re-run ingestion flow - cy.get('[data-testid="appbar-item-settings"]').should('be.visible').click(); + cy.get('[data-testid="appbar-item-settings"]').should('be.visible').click(); - // Services page - cy.get('.ant-menu-title-content').contains(type).should('be.visible').click(); + // Services page + cy.get('.ant-menu-title-content').contains(type).should('be.visible').click(); - interceptURL( - 'GET', - `/api/v1/services/ingestionPipelines?fields=owner,pipelineStatuses&service=${serviceName}`, - 'getSelectedService' - ); + interceptURL( + 'GET', + `/api/v1/services/ingestionPipelines?fields=owner,pipelineStatuses&service=${serviceName}`, + 'getSelectedService' + ); - //click on created service - cy.get(`[data-testid="service-name-${serviceName}"]`) - .should('exist') - .should('be.visible') - .click(); + //click on created service + cy.get(`[data-testid="service-name-${serviceName}"]`) + .should('exist') + .should('be.visible') + .click(); - verifyResponseStatusCode('@getSelectedService', 200); + verifyResponseStatusCode('@getSelectedService', 200); - cy.get('[data-testid="Ingestions"]').should('be.visible').click(); - interceptURL( - 'POST', - '/api/v1/services/ingestionPipelines/trigger/*', - 'checkRun' - ); - cy.get('[data-testid="run"]').should('be.visible').click(); - verifyResponseStatusCode('@checkRun', 200); + cy.get('[data-testid="Ingestions"]').should('be.visible').click(); + interceptURL( + 'POST', + '/api/v1/services/ingestionPipelines/trigger/*', + 'checkRun' + ); + cy.get('[data-testid="run"]').should('be.visible').click(); + verifyResponseStatusCode('@checkRun', 200); - //Close the toast message - cy.get('.Toastify__close-button').should('be.visible').click(); + //Close the toast message + cy.get('.Toastify__close-button').should('be.visible').click(); - //Wait for success - retryIngestionRun(); + //Wait for success + retryIngestionRun(); - //Navigate to table name - visitEntityDetailsPage(tableName, serviceName, entity); - cy.get('[data-testid="markdown-parser"]') - .first() - .invoke('text') - .should('contain', description); + //Navigate to table name + visitEntityDetailsPage(tableName, serviceName, entity); + cy.get('[data-testid="markdown-parser"]') + .first() + .invoke('text') + .should('contain', description); }; \ No newline at end of file diff --git a/openmetadata-ui/src/main/resources/ui/src/axiosAPIs/ingestionPipelineAPI.ts b/openmetadata-ui/src/main/resources/ui/src/axiosAPIs/ingestionPipelineAPI.ts index 189fa3fc2a0..3683239a8f3 100644 --- a/openmetadata-ui/src/main/resources/ui/src/axiosAPIs/ingestionPipelineAPI.ts +++ b/openmetadata-ui/src/main/resources/ui/src/axiosAPIs/ingestionPipelineAPI.ts @@ -13,7 +13,9 @@ import { AxiosResponse } from 'axios'; import { Operation } from 'fast-json-patch'; +import { PagingResponse } from 'Models'; import { CreateIngestionPipeline } from '../generated/api/services/ingestionPipelines/createIngestionPipeline'; +import { PipelineStatus } from '../generated/entity/data/pipeline'; import { IngestionPipeline } from '../generated/entity/services/ingestionPipelines/ingestionPipeline'; import { Paging } from '../generated/type/paging'; import { IngestionPipelineLogByIdInterface } from '../pages/LogsViewer/LogsViewer.interfaces'; @@ -160,8 +162,22 @@ export const getIngestionPipelineLogById = (id: string, after?: string) => { ); }; -export const postkillIngestionPipelineById = ( +export const postKillIngestionPipelineById = ( id: string ): Promise => { return APIClient.post(`/services/ingestionPipelines/kill/${id}`); }; + +export const getRunHistoryForPipeline = async ( + id: string, + params: { startTs: number; endTs: number } +) => { + const response = await APIClient.get>( + `/services/ingestionPipelines/${id}/pipelineStatus`, + { + params, + } + ); + + return response.data; +}; diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/Ingestion.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/Ingestion.component.tsx index fd07ebcb932..43689871dd6 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/Ingestion.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/Ingestion.component.tsx @@ -19,7 +19,7 @@ import cronstrue from 'cronstrue'; import { useTranslation } from 'react-i18next'; import { ColumnsType } from 'antd/lib/table'; -import { capitalize, isNil, lowerCase, startCase } from 'lodash'; +import { isNil, lowerCase, startCase } from 'lodash'; import React, { Fragment, useCallback, useMemo, useState } from 'react'; import { useHistory } from 'react-router-dom'; import { PAGE_SIZE } from '../../constants/constants'; @@ -48,6 +48,7 @@ import Loader from '../Loader/Loader'; import EntityDeleteModal from '../Modals/EntityDeleteModal/EntityDeleteModal'; import KillIngestionModal from '../Modals/KillIngestionPipelineModal/KillIngestionPipelineModal'; import { IngestionProps } from './ingestion.interface'; +import { IngestionRecentRuns } from './IngestionRecentRun/IngestionRecentRuns.component'; const Ingestion: React.FC = ({ airflowEndpoint, @@ -300,67 +301,6 @@ const Ingestion: React.FC = ({ | ); - const getStatuses = (ingestion: IngestionPipeline) => { - const lastFiveIngestions = ingestion.pipelineStatuses - ?.sort((a, b) => { - // Turn your strings into millis, and then subtract them - // to get a value that is either negative, positive, or zero. - const date1 = new Date(a.startDate || ''); - const date2 = new Date(b.startDate || ''); - - return date1.getTime() - date2.getTime(); - }) - .slice(Math.max(ingestion.pipelineStatuses.length - 5, 0)); - - return lastFiveIngestions?.map((r, i) => { - const status = - i === lastFiveIngestions.length - 1 ? ( -

- {capitalize(r.state)} -

- ) : ( -

- ); - - return r?.endDate || r?.startDate || r?.timestamp ? ( - - {r.timestamp ? ( -

- {t('label.execution-date')} :{' '} - {new Date(r.timestamp).toUTCString()} -

- ) : null} - {r.startDate ? ( -

- {t('label.start-date')}: {new Date(r.startDate).toUTCString()} -

- ) : null} - {r.endDate ? ( -

- {t('label.end-date')} : {new Date(r.endDate).toUTCString()} -

- ) : null} - - } - key={i} - position="bottom" - theme="light" - trigger="mouseenter"> - {status} - - ) : ( - status - ); - }); - }; - const getTriggerDeployButton = (ingestion: IngestionPipeline) => { if (ingestion.deployed) { return ( @@ -467,8 +407,9 @@ const Ingestion: React.FC = ({ title: t('label.recent-runs'), dataIndex: 'recentRuns', key: 'recentRuns', + width: 180, render: (_, record) => ( -
{getStatuses(record)}
+ ), }, { @@ -573,7 +514,6 @@ const Ingestion: React.FC = ({ NO_PERMISSION_TO_VIEW, permissions, airflowEndpoint, - getStatuses, getTriggerDeployButton, isRequiredDetailsAvailable, handleEnableDisableIngestion, @@ -592,10 +532,8 @@ const Ingestion: React.FC = ({ const getIngestionTab = () => { return ( -
-
+
+
{!isRequiredDetailsAvailable && (
diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/Ingestion.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/Ingestion.test.tsx index 4e20b4bed0a..60bc6ec39ef 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/Ingestion.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/Ingestion.test.tsx @@ -107,6 +107,12 @@ jest.mock( } ); +jest.mock('./IngestionRecentRun/IngestionRecentRuns.component', () => ({ + IngestionRecentRuns: jest + .fn() + .mockImplementation(() =>

IngestionRecentRuns

), +})); + describe('Test Ingestion page', () => { it('Page Should render', async () => { const { container } = render( diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionRecentRun/IngestionRecentRun.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionRecentRun/IngestionRecentRun.test.tsx new file mode 100644 index 00000000000..9417fff55fe --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionRecentRun/IngestionRecentRun.test.tsx @@ -0,0 +1,80 @@ +/* + * Copyright 2022 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { act, render, screen } from '@testing-library/react'; +import React from 'react'; +import { getRunHistoryForPipeline } from '../../../axiosAPIs/ingestionPipelineAPI'; +import { IngestionPipeline } from '../../../generated/entity/services/ingestionPipelines/ingestionPipeline'; +import { IngestionRecentRuns } from './IngestionRecentRuns.component'; + +jest.mock('../../../axiosAPIs/ingestionPipelineAPI', () => ({ + getRunHistoryForPipeline: jest.fn().mockImplementation(() => + Promise.resolve({ + data: [ + { + runId: '7e369da9-4d0e-4887-b99b-1a35cfab551e', + pipelineState: 'success', + startDate: 1667307722, + timestamp: 1667307722, + endDate: 1667307725, + }, + { + runId: 'c95cc97b-9ea2-465c-9b5a-255401674324', + pipelineState: 'success', + startDate: 1667304123, + timestamp: 1667304123, + endDate: 1667304126, + }, + { + runId: '60b3e15c-3865-4c81-a1ee-36ff85d2be8e', + pipelineState: 'success', + startDate: 1667301533, + timestamp: 1667301533, + endDate: 1667301536, + }, + { + runId: 'a2c6fbf9-952f-4ddd-9b01-c203bf54f0fe', + pipelineState: 'success', + startDate: 1667297370, + timestamp: 1667297370, + endDate: 1667297373, + }, + ], + paging: { total: 4 }, + }) + ), +})); + +const mockIngestion = { fullyQualifiedName: 'test' } as IngestionPipeline; + +describe('Test IngestionRecentRun component', () => { + it('should call getRunHistoryForPipeline to fetch all the status', async () => { + act(() => { + render(); + }); + + expect(getRunHistoryForPipeline).toBeCalledWith('test', expect.anything()); + }); + + it('should render runs when API returns runs', async () => { + await act(async () => { + render(); + }); + + const runs = await screen.findAllByTestId('pipeline-status'); + const successRun = await screen.findByText(/Success/); + + expect(successRun).toBeInTheDocument(); + expect(runs).toHaveLength(4); + }); +}); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionRecentRun/IngestionRecentRuns.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionRecentRun/IngestionRecentRuns.component.tsx new file mode 100644 index 00000000000..412c6c9e17a --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/IngestionRecentRun/IngestionRecentRuns.component.tsx @@ -0,0 +1,135 @@ +/* + * Copyright 2022 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Popover, Skeleton, Space } from 'antd'; +import { capitalize } from 'lodash'; +import React, { + FunctionComponent, + useCallback, + useEffect, + useState, +} from 'react'; +import { useTranslation } from 'react-i18next'; +import { getRunHistoryForPipeline } from '../../../axiosAPIs/ingestionPipelineAPI'; +import { + IngestionPipeline, + PipelineStatus, +} from '../../../generated/entity/services/ingestionPipelines/ingestionPipeline'; +import { + getCurrentDateTimeMillis, + getDateTimeFromMilliSeconds, + getPastDaysDateTimeMillis, +} from '../../../utils/TimeUtils'; + +interface Props { + ingestion: IngestionPipeline; + classNames?: string; +} +const queryParams = { + startTs: getPastDaysDateTimeMillis(1), + endTs: getCurrentDateTimeMillis(), +}; + +export const IngestionRecentRuns: FunctionComponent = ({ + ingestion, + classNames, +}: Props) => { + const { t } = useTranslation(); + const [recentRunStatus, setRecentRunStatus] = useState(); + const [loading, setLoading] = useState(true); + + const fetchPipelineStatus = useCallback(async () => { + setLoading(true); + try { + const response = await getRunHistoryForPipeline( + ingestion.fullyQualifiedName || '', + queryParams + ); + + const runs = response.data.splice(0, 5).reverse() ?? []; + + setRecentRunStatus( + runs.length === 0 + ? [ingestion.pipelineStatuses as PipelineStatus] + : runs + ); + } finally { + setLoading(false); + } + }, [ingestion.fullyQualifiedName]); + + useEffect(() => { + if (ingestion.fullyQualifiedName) { + fetchPipelineStatus(); + } + }, [ingestion.fullyQualifiedName]); + + return ( + + {loading ? ( + + ) : ( + recentRunStatus?.map((r, i) => { + const status = + i === recentRunStatus.length - 1 ? ( +

+ {capitalize(r?.pipelineState)} +

+ ) : ( +

+ ); + + const showTooltip = r?.endDate || r?.startDate || r?.timestamp; + + return showTooltip ? ( + + {r.timestamp && ( +

+ {t('label.execution-date')} :{' '} + {getDateTimeFromMilliSeconds(r.timestamp)} +

+ )} + {r.startDate && ( +

+ {t('label.start-date')}:{' '} + {getDateTimeFromMilliSeconds(r.startDate)} +

+ )} + {r.endDate && ( +

+ {t('label.end-date')} :{' '} + {getDateTimeFromMilliSeconds(r.endDate)} +

+ )} +
+ }> + {status} + + ) : ( + status + ); + }) ?? 'Queued' + )} + + ); +}; diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/ingestion.interface.ts b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/ingestion.interface.ts index 1f639c8366a..496f88275c1 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/ingestion.interface.ts +++ b/openmetadata-ui/src/main/resources/ui/src/components/Ingestion/ingestion.interface.ts @@ -11,10 +11,9 @@ * limitations under the License. */ -import { IngestionType, ServiceCategory } from '../../enums/service.enum'; +import { ServiceCategory } from '../../enums/service.enum'; import { DatabaseService } from '../../generated/entity/services/databaseService'; import { IngestionPipeline } from '../../generated/entity/services/ingestionPipelines/ingestionPipeline'; -import { EntityReference } from '../../generated/type/entityReference'; import { Paging } from '../../generated/type/paging'; import { ServicesType } from '../../interface/service.interface'; import { OperationPermission } from '../PermissionProvider/PermissionProvider.interface'; @@ -30,24 +29,6 @@ export interface ConnectorConfig { excludeDataProfiler?: boolean; enableDataProfiler?: boolean; } -export interface IngestionData { - id?: string; - name: string; - displayName: string; - ingestionType: IngestionType; - service: EntityReference; - scheduleInterval: string; - ingestionStatuses?: Array<{ - state: string; - startDate: string; - endDate: string; - }>; - nextExecutionDate?: string; - connectorConfig?: ConnectorConfig; - owner?: { id: string; name?: string; type: string }; - startDate?: string; - endDate?: string; -} export interface IngestionProps { airflowEndpoint: string; diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Modals/KillIngestionPipelineModal/KillIngestionModal.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Modals/KillIngestionPipelineModal/KillIngestionModal.test.tsx index 46fd2628586..1b05fcb3591 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Modals/KillIngestionPipelineModal/KillIngestionModal.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Modals/KillIngestionPipelineModal/KillIngestionModal.test.tsx @@ -13,7 +13,7 @@ import { act, fireEvent, render, screen } from '@testing-library/react'; import React from 'react'; -import { postkillIngestionPipelineById } from '../../../axiosAPIs/ingestionPipelineAPI'; +import { postKillIngestionPipelineById } from '../../../axiosAPIs/ingestionPipelineAPI'; import KillIngestionModal from './KillIngestionPipelineModal'; const mockHandleClose = jest.fn(); @@ -28,7 +28,7 @@ const mockProps = { }; jest.mock('../../../axiosAPIs/ingestionPipelineAPI', () => ({ - postkillIngestionPipelineById: jest + postKillIngestionPipelineById: jest .fn() .mockImplementation(() => Promise.resolve()), })); @@ -74,7 +74,7 @@ describe('Test Kill Ingestion Modal component', () => { fireEvent.click(confirmButton); - expect(postkillIngestionPipelineById).toHaveBeenCalledWith( + expect(postKillIngestionPipelineById).toHaveBeenCalledWith( mockProps.pipelineId ); }); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Modals/KillIngestionPipelineModal/KillIngestionPipelineModal.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Modals/KillIngestionPipelineModal/KillIngestionPipelineModal.tsx index c57eda32af9..3b654c19843 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Modals/KillIngestionPipelineModal/KillIngestionPipelineModal.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Modals/KillIngestionPipelineModal/KillIngestionPipelineModal.tsx @@ -15,7 +15,7 @@ import { Modal, Typography } from 'antd'; import { AxiosError } from 'axios'; import React, { FC, useState } from 'react'; import { useTranslation } from 'react-i18next'; -import { postkillIngestionPipelineById } from '../../../axiosAPIs/ingestionPipelineAPI'; +import { postKillIngestionPipelineById } from '../../../axiosAPIs/ingestionPipelineAPI'; import { showErrorToast, showSuccessToast } from '../../../utils/ToastUtils'; interface KillIngestionModalProps { @@ -39,7 +39,7 @@ const KillIngestionModal: FC = ({ const handleConfirm = async () => { setIsLoading(true); try { - const response = await postkillIngestionPipelineById(pipelineId); + const response = await postKillIngestionPipelineById(pipelineId); const status = response.status; if (status === 200) { onClose(); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/TestSuitePipelineTab/TestSuitePipelineTab.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/TestSuitePipelineTab/TestSuitePipelineTab.component.tsx index 8d769565714..1189a89d172 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/TestSuitePipelineTab/TestSuitePipelineTab.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/TestSuitePipelineTab/TestSuitePipelineTab.component.tsx @@ -35,10 +35,7 @@ import { import { Operation } from '../../generated/entity/policies/policy'; import { IngestionPipeline } from '../../generated/entity/services/ingestionPipelines/ingestionPipeline'; import jsonData from '../../jsons/en'; -import { - getIngestionStatuses, - getLoadingStatus, -} from '../../utils/CommonUtils'; +import { getLoadingStatus } from '../../utils/CommonUtils'; import { checkPermission, userPermissions } from '../../utils/PermissionsUtils'; import { getLogsViewerPath, @@ -48,6 +45,7 @@ import SVGIcons, { Icons } from '../../utils/SvgUtils'; import { showErrorToast, showSuccessToast } from '../../utils/ToastUtils'; import ErrorPlaceHolderIngestion from '../common/error-with-placeholder/ErrorPlaceHolderIngestion'; import PopOver from '../common/popover/PopOver'; +import { IngestionRecentRuns } from '../Ingestion/IngestionRecentRun/IngestionRecentRuns.component'; import Loader from '../Loader/Loader'; import EntityDeleteModal from '../Modals/EntityDeleteModal/EntityDeleteModal'; import KillIngestionModal from '../Modals/KillIngestionPipelineModal/KillIngestionPipelineModal'; @@ -381,7 +379,9 @@ const TestSuitePipelineTab = () => { dataIndex: 'pipelineStatuses', key: 'recentRuns', render: (_, record) => ( - {getIngestionStatuses(record)} + + + ), }, { diff --git a/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json b/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json index 65c142eafb9..423bfaa797d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json +++ b/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json @@ -193,8 +193,11 @@ "pause": "Pause", "metadata-ingestion": "Metadata Ingestion", "unpause": "UnPause", + "end-date": "End Date", "execution-date": "Execution Date", "start-date": "Start Date", + "view-dag": "View Dag", + "search-team": "Search Team", "end-date": "End Date", "show-deleted": "Show deleted", "add-bot": "Add Bot", diff --git a/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewer/LogsViewer.component.test.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewer/LogsViewer.component.test.tsx index f8f9deea792..2171f9bb3a2 100644 --- a/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewer/LogsViewer.component.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewer/LogsViewer.component.test.tsx @@ -41,6 +41,15 @@ jest.mock('../../axiosAPIs/ingestionPipelineAPI', () => ({ .mockImplementation(() => Promise.resolve(mockIngestionPipeline)), })); +jest.mock( + '../../components/Ingestion/IngestionRecentRun/IngestionRecentRuns.component', + () => ({ + IngestionRecentRuns: jest + .fn() + .mockImplementation(() =>

IngestionRecentRuns

), + }) +); + describe('LogsViewer.component', () => { it('On initial, component should render', async () => { await act(async () => { diff --git a/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewer/LogsViewer.component.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewer/LogsViewer.component.tsx index 97e654f3d6a..56ed0efabea 100644 --- a/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewer/LogsViewer.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/pages/LogsViewer/LogsViewer.component.tsx @@ -30,11 +30,11 @@ import { } from '../../axiosAPIs/ingestionPipelineAPI'; import { CopyToClipboardButton } from '../../components/buttons/CopyToClipboardButton/CopyToClipboardButton'; import TitleBreadcrumb from '../../components/common/title-breadcrumb/title-breadcrumb.component'; +import { IngestionRecentRuns } from '../../components/Ingestion/IngestionRecentRun/IngestionRecentRuns.component'; import Loader from '../../components/Loader/Loader'; import { PipelineType } from '../../generated/api/services/ingestionPipelines/createIngestionPipeline'; import { IngestionPipeline } from '../../generated/entity/services/ingestionPipelines/ingestionPipeline'; import { Paging } from '../../generated/type/paging'; -import { getIngestionStatuses } from '../../utils/CommonUtils'; import { getLogBreadCrumbs } from '../../utils/LogsViewer.utils'; import { showErrorToast } from '../../utils/ToastUtils'; import LogViewerSkeleton from './LogsViewer-skeleton.component'; @@ -197,10 +197,11 @@ const LogsViewer = () => { return { Type: ingestionDetails?.pipelineType || '--', Schedule: ingestionDetails?.airflowConfig.scheduleInterval || '--', - ['Recent Runs']: - getIngestionStatuses( - ingestionDetails ? ingestionDetails : ({} as IngestionPipeline) - ) || '--', + ['Recent Runs']: ingestionDetails?.fullyQualifiedName ? ( + + ) : ( + '--' + ), }; }, [ingestionDetails]); diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/CommonUtils.tsx b/openmetadata-ui/src/main/resources/ui/src/utils/CommonUtils.tsx index 410166d5572..0fa0c2638cd 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/CommonUtils.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/utils/CommonUtils.tsx @@ -47,7 +47,6 @@ import { getHourCron, } from '../components/common/CronEditor/CronEditor.constant'; import ErrorPlaceHolder from '../components/common/error-with-placeholder/ErrorPlaceHolder'; -import PopOver from '../components/common/popover/PopOver'; import Loader from '../components/Loader/Loader'; import { FQN_SEPARATOR_CHAR } from '../constants/char.constants'; import { @@ -75,10 +74,7 @@ import { Topic } from '../generated/entity/data/topic'; import { Webhook } from '../generated/entity/events/webhook'; import { ThreadTaskStatus, ThreadType } from '../generated/entity/feed/thread'; import { Policy } from '../generated/entity/policies/policy'; -import { - IngestionPipeline, - PipelineType, -} from '../generated/entity/services/ingestionPipelines/ingestionPipeline'; +import { PipelineType } from '../generated/entity/services/ingestionPipelines/ingestionPipeline'; import { Role } from '../generated/entity/teams/role'; import { Team } from '../generated/entity/teams/team'; import { EntityReference, User } from '../generated/entity/teams/user'; @@ -822,60 +818,6 @@ export const getTeamsUser = ( return; }; -export const getIngestionStatuses = (ingestion: IngestionPipeline) => { - const lastFiveIngestions = ingestion.pipelineStatuses - ?.sort((a, b) => { - // Turn your strings into millis, and then subtract them - // to get a value that is either negative, positive, or zero. - const date1 = new Date(a.startDate || ''); - const date2 = new Date(b.startDate || ''); - - return date1.getTime() - date2.getTime(); - }) - .slice(Math.max(ingestion.pipelineStatuses.length - 5, 0)); - - return lastFiveIngestions?.map((r, i) => { - const status = - i === lastFiveIngestions.length - 1 ? ( -

- {capitalize(r.state)} -

- ) : ( -

- ); - - return r?.endDate || r?.startDate || r?.timestamp ? ( - - {r.timestamp ? ( -

Execution Date: {new Date(r.timestamp).toUTCString()}

- ) : null} - {r.startDate ? ( -

Start Date: {new Date(r.startDate).toUTCString()}

- ) : null} - {r.endDate ? ( -

End Date: {new Date(r.endDate).toUTCString()}

- ) : null} -
- } - key={i} - position="bottom" - theme="light" - trigger="mouseenter"> - {status} - - ) : ( - status - ); - }); -}; - export const getDiffArray = ( compareWith: string[], toCompare: string[] diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/TimeUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/TimeUtils.ts index 30a911a3d87..72a11dae279 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/TimeUtils.ts +++ b/openmetadata-ui/src/main/resources/ui/src/utils/TimeUtils.ts @@ -283,6 +283,19 @@ export const getPastDatesTimeStampFromCurrentDate = (pastDayCount: number) => */ export const getCurrentDateTimeStamp = () => DateTime.now().toUnixInteger(); +/** + * Get the current date and time in milliseconds. + */ +export const getCurrentDateTimeMillis = () => DateTime.now().toMillis(); + +/** + * It returns the number of milliseconds since the Unix Epoch for a date that is pastDayCount days before + * the current date + * @param {number} days - The number of days you want to go back from the current date. + */ +export const getPastDaysDateTimeMillis = (days: number) => + DateTime.now().minus({ days }).toMillis(); + /** * It takes a timestamp in seconds and returns a formatted date string * @param {number} timeStamp - The timeStamp in seconds.