From 137f86c00dba1a7fd15f8c6227d1ead87bb6dade Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Thu, 6 Mar 2025 18:19:42 +0530 Subject: [PATCH] Day1 - Pipeline Usage (#19960) --- .../ingestion/ometa/mixins/pipeline_mixin.py | 18 ++ .../metadata/ingestion/sink/metadata_rest.py | 15 +- .../source/pipeline/airflow/metadata.py | 25 ++ .../source/pipeline/airflow/models.py | 1 + .../source/pipeline/openlineage/metadata.py | 1 + .../source/pipeline/pipeline_service.py | 121 +++++++++- .../unit/topology/pipeline/test_dbtcloud.py | 223 ++++++++++++++++++ .../service/jdbi3/PipelineRepository.java | 8 + .../service/jdbi3/UsageRepository.java | 18 ++ .../resources/pipelines/PipelineMapper.java | 1 + .../json/schema/api/data/createPipeline.json | 5 + .../json/schema/entity/data/pipeline.json | 23 ++ .../resources/ui/src/enums/entity.enum.ts | 1 + .../src/generated/api/data/createPipeline.ts | 14 ++ .../ui/src/generated/entity/data/pipeline.ts | 63 +++++ .../ServiceDetailsPage/ServiceDetailsPage.tsx | 2 +- 16 files changed, 535 insertions(+), 4 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/pipeline_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/pipeline_mixin.py index 0e239321bb0..3be9b555d44 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/pipeline_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/pipeline_mixin.py @@ -21,6 +21,7 @@ from metadata.generated.schema.entity.data.pipeline import ( PipelineStatus, Task, ) +from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.ometa.client import REST from metadata.utils.logger import ometa_logger @@ -83,6 +84,7 @@ class OMetaPipelineMixin: sourceUrl=pipeline.sourceUrl, concurrency=pipeline.concurrency, pipelineLocation=pipeline.pipelineLocation, + state=pipeline.state, startDate=pipeline.startDate, service=pipeline.service.fullyQualifiedName, tasks=all_tasks, @@ -110,6 +112,7 @@ class OMetaPipelineMixin: sourceUrl=pipeline.sourceUrl, concurrency=pipeline.concurrency, pipelineLocation=pipeline.pipelineLocation, + state=pipeline.state, startDate=pipeline.startDate, service=pipeline.service.fullyQualifiedName, tasks=[task for task in pipeline.tasks if task.name in task_ids], @@ -118,3 +121,18 @@ class OMetaPipelineMixin: ) return self.create_or_update(updated_pipeline) + + def publish_pipeline_usage( + self, pipeline: Pipeline, pipeline_usage_request: UsageRequest + ) -> None: + """ + POST usage details for a Pipeline + + :param pipeline: Pipeline Entity to update + :param pipeline_usage_request: Usage data to add + """ + resp = self.client.put( + f"/usage/pipeline/{pipeline.id.root}", + data=pipeline_usage_request.model_dump_json(), + ) + logger.debug("Published pipeline usage %s", resp) diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index ad8d2eefe4e..6fc020067bf 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -34,7 +34,7 @@ from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteR from metadata.generated.schema.dataInsight.kpi.basic import KpiResult from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.data.dashboard import Dashboard -from metadata.generated.schema.entity.data.pipeline import PipelineStatus +from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus from 
metadata.generated.schema.entity.data.searchIndex import ( SearchIndex, SearchIndexSampleData, @@ -83,6 +83,7 @@ from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage from metadata.ingestion.source.database.database_service import DataModelLink +from metadata.ingestion.source.pipeline.pipeline_service import PipelineUsage from metadata.profiler.api.models import ProfilerResponse from metadata.sampler.models import SamplerResponse from metadata.utils.execution_time_tracker import calculate_execution_time @@ -625,6 +626,18 @@ class MetadataRestSink(Sink): # pylint: disable=too-many-public-methods return Either(right=record) + @_run_dispatch.register + def write_pipeline_usage(self, pipeline_usage: PipelineUsage) -> Either[Pipeline]: + """ + Send a UsageRequest update to a pipeline entity + :param pipeline_usage: pipeline entity and usage request + """ + self.metadata.publish_pipeline_usage( + pipeline=pipeline_usage.pipeline, + pipeline_usage_request=pipeline_usage.usage, + ) + return Either(right=pipeline_usage.pipeline) + def close(self): """ We don't have anything to close since we are using the given metadata client diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py index 1aae7862c60..4211cb3528d 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py @@ -29,6 +29,7 @@ from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.pipeline import ( Pipeline, + PipelineState, PipelineStatus, StatusType, Task, @@ -122,6 +123,7 @@ class AirflowSource(PipelineServiceSource): metadata: OpenMetadata, ): super().__init__(config, metadata) + self.today = datetime.now().strftime("%Y-%m-%d") self._session = None @classmethod @@ -367,6 +369,16 @@ class AirflowSource(PipelineServiceSource): break for serialized_dag in results: try: + dag_model = ( + self.session.query(DagModel) + .filter(DagModel.dag_id == serialized_dag[0]) + .one_or_none() + ) + pipeline_state = ( + PipelineState.Active.value + if dag_model and not dag_model.is_paused + else PipelineState.Inactive.value + ) data = serialized_dag[1]["dag"] dag = AirflowDagDetails( dag_id=serialized_dag[0], @@ -375,6 +387,7 @@ class AirflowSource(PipelineServiceSource): max_active_runs=data.get("max_active_runs", None), description=data.get("_description", None), start_date=data.get("start_date", None), + state=pipeline_state, tasks=list( map(self._extract_serialized_task, data.get("tasks", [])) ), @@ -425,6 +438,14 @@ class AirflowSource(PipelineServiceSource): """ return pipeline_details.dag_id + def get_pipeline_state( + self, pipeline_details: AirflowDagDetails + ) -> Optional[PipelineState]: + """ + Return the state of the DAG + """ + return PipelineState[pipeline_details.state] + def get_tasks_from_dag(self, dag: AirflowDagDetails, host_port: str) -> List[Task]: """ Obtain the tasks from a SerializedDAG @@ -480,12 +501,15 @@ class AirflowSource(PipelineServiceSource): try: # Airflow uses /dags/dag_id/grid to show pipeline / dag source_url = f"{clean_uri(self.service_connection.hostPort)}/dags/{pipeline_details.dag_id}/grid" + pipeline_state = 
self.get_pipeline_state(pipeline_details) + pipeline_request = CreatePipelineRequest( name=EntityName(pipeline_details.dag_id), description=Markdown(pipeline_details.description) if pipeline_details.description else None, sourceUrl=SourceUrl(source_url), + state=pipeline_state, concurrency=pipeline_details.max_active_runs, pipelineLocation=pipeline_details.fileloc, startDate=pipeline_details.start_date.isoformat() @@ -614,4 +638,5 @@ class AirflowSource(PipelineServiceSource): ) def close(self): + self.metadata.compute_percentile(Pipeline, self.today) self.session.close() diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py index 6f5e14b4b80..2c5121c6f6f 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/models.py @@ -68,4 +68,5 @@ class AirflowDagDetails(AirflowBaseModel): start_date: Optional[datetime] = None tasks: List[AirflowTask] owner: Optional[str] = None + state: Optional[str] = None schedule_interval: Optional[str] = None diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index e890c480fcc..a3590603ae6 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -93,6 +93,7 @@ class OpenlineageSource(PipelineServiceSource): """Nothing to prepare""" def close(self) -> None: + self.metadata.compute_percentile(Pipeline, self.today) self.metadata.close() @classmethod diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index 6c0ed62ed4c..c9e1276bd7f 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -11,16 +11,21 @@ """ Base class for ingesting database services """ +import traceback from abc import ABC, abstractmethod +from datetime import datetime from typing import Any, Iterable, List, Optional, Set -from pydantic import Field +from pydantic import BaseModel, Field from typing_extensions import Annotated from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineState from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.ingestionPipelines.status import ( + StackTraceError, +) from metadata.generated.schema.entity.services.pipelineService import ( PipelineConnection, PipelineService, @@ -31,6 +36,7 @@ from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.delete import delete_entity_from_source from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import Source @@ -60,6 +66,15 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +class PipelineUsage(BaseModel): + """ + Wrapper to handle type at the sink + 
""" + + pipeline: Pipeline + usage: UsageRequest + + class PipelineServiceTopology(ServiceTopology): """ Defines the hierarchy in Pipeline Services. @@ -115,6 +130,12 @@ class PipelineServiceTopology(ServiceTopology): consumer=["pipeline_service"], nullable=True, ), + NodeStage( + type_=UsageRequest, + processor="yield_pipeline_usage", + consumer=["pipeline_service"], + nullable=True, + ), ], ) @@ -143,6 +164,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): super().__init__() self.config = config self.metadata = metadata + self.today = datetime.now().strftime("%Y-%m-%d") self.service_connection = self.config.serviceConnection.root.config self.source_config: PipelineServiceMetadataPipeline = ( self.config.sourceConfig.config @@ -184,6 +206,100 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): ) -> Iterable[Either[OMetaPipelineStatus]]: """Get Pipeline Status""" + def get_pipeline_state(self, pipeline_details: Any) -> Optional[PipelineState]: + """Get Pipeline State""" + + def yield_pipeline_usage( + self, pipeline_details: Any + ) -> Iterable[Either[PipelineUsage]]: + """ + Yield the usage of the pipeline + we will check the usage of the pipeline + by checking the tasks that have run today or are running today or ends today + we get the count of these tasks and compare it with the usageSummary + if the usageSummary is not present or the date is not today + we yield the fresh usage + """ + try: + pipeline_fqn = fqn.build( + metadata=self.metadata, + entity_type=Pipeline, + service_name=self.context.get().pipeline_service, + pipeline_name=self.context.get().pipeline, + ) + + pipeline: Pipeline = self.metadata.get_by_name( + entity=Pipeline, + fqn=pipeline_fqn, + fields=["tasks", "usageSummary"], + ) + + if pipeline.tasks: + current_task_usage = sum( + 1 + for task in pipeline.tasks + if task.startDate + and task.startDate.startswith(self.today) + or task.endDate + and task.endDate.startswith(self.today) + ) + if not current_task_usage: + logger.debug( + f"No usage to report for {pipeline.fullyQualifiedName.root}" + ) + + if not pipeline.usageSummary: + logger.info( + f"Yielding fresh usage for {pipeline.fullyQualifiedName.root}" + ) + yield Either( + right=PipelineUsage( + pipeline=pipeline, + usage=UsageRequest( + date=self.today, count=current_task_usage + ), + ) + ) + + elif ( + str(pipeline.usageSummary.date.root) != self.today + or not pipeline.usageSummary.dailyStats.count + ): + latest_usage = pipeline.usageSummary.dailyStats.count + + new_usage = current_task_usage - latest_usage + if new_usage < 0: + raise ValueError( + f"Wrong computation of usage difference. Got new_usage={new_usage}." + ) + + logger.info( + f"Yielding new usage for {pipeline.fullyQualifiedName.root}" + ) + yield Either( + right=PipelineUsage( + pipeline=pipeline, + usage=UsageRequest(date=self.today, count=new_usage), + ) + ) + + else: + logger.debug( + f"Latest usage {pipeline.usageSummary} vs. today {self.today}. Nothing to compute." 
+ ) + logger.info( + f"Usage already informed for {pipeline.fullyQualifiedName.root}" + ) + + except Exception as exc: + yield Either( + left=StackTraceError( + name="Pipeline Usage", + error=f"Exception computing pipeline usage for {pipeline_details}: {exc}", + stackTrace=traceback.format_exc(), + ) + ) + def yield_pipeline_lineage( self, pipeline_details: Any ) -> Iterable[Either[OMetaLineageRequest]]: @@ -231,6 +347,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): def close(self): """Method to implement any required logic after the ingestion process is completed""" + self.metadata.compute_percentile(Pipeline, self.today) def get_services(self) -> Iterable[WorkflowSource]: yield self.config diff --git a/ingestion/tests/unit/topology/pipeline/test_dbtcloud.py b/ingestion/tests/unit/topology/pipeline/test_dbtcloud.py index e1cfead56a6..d8fcd518602 100644 --- a/ingestion/tests/unit/topology/pipeline/test_dbtcloud.py +++ b/ingestion/tests/unit/topology/pipeline/test_dbtcloud.py @@ -12,6 +12,8 @@ Test dbt cloud using the topology """ import json +import uuid +from datetime import datetime, timedelta from unittest import TestCase from unittest.mock import patch @@ -32,12 +34,16 @@ from metadata.generated.schema.type.basic import ( SourceUrl, ) from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.usageDetails import UsageDetails, UsageStats +from metadata.generated.schema.type.usageRequest import UsageRequest +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.dbtcloud.metadata import DbtcloudSource from metadata.ingestion.source.pipeline.dbtcloud.models import ( DBTJob, DBTJobList, DBTSchedule, ) +from metadata.ingestion.source.pipeline.pipeline_service import PipelineUsage MOCK_JOB_RESULT = json.loads( """ @@ -539,6 +545,9 @@ class DBTCloudUnitTest(TestCase): self.dbtcloud.context.get().__dict__[ "pipeline_service" ] = MOCK_PIPELINE_SERVICE.name.root + self.dbtcloud.metadata = OpenMetadata( + config.workflowConfig.openMetadataServerConfig + ) @patch("metadata.ingestion.source.pipeline.dbtcloud.client.DBTCloudClient.get_jobs") def test_get_pipelines_list(self, get_jobs): @@ -559,3 +568,217 @@ class DBTCloudUnitTest(TestCase): def test_pipelines(self): pipeline = list(self.dbtcloud.yield_pipeline(EXPECTED_JOB_DETAILS))[0].right assert pipeline == EXPECTED_CREATED_PIPELINES + + def test_yield_pipeline_usage(self): + """ + Validate the logic for existing or new usage + """ + + self.dbtcloud.context.get().__dict__["pipeline"] = "pipeline_name" + + # Start checking pipeline without usage + # and a view count + return_value = Pipeline( + id=uuid.uuid4(), + name="pipeline_name", + fullyQualifiedName="pipeline_service.pipeline_name", + service=EntityReference(id=uuid.uuid4(), type="pipelineService"), + tasks=[ + Task( + name="task1", + startDate=self.dbtcloud.today, + endDate="2025-02-19 11:09:36.920915+00:00", + ), + Task( + name="task2", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate=self.dbtcloud.today, + ), + Task( + name="task3", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate="2025-02-19 11:09:36.920915+00:00", + ), + ], + ) + with patch.object(OpenMetadata, "get_by_name", return_value=return_value): + got_usage = next( + self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS) + ).right + self.assertEqual( + got_usage, + PipelineUsage( + pipeline=return_value, + usage=UsageRequest(date=self.dbtcloud.today, count=2), + ), + ) + + # Now check what 
happens if we already have some summary data for today + return_value = Pipeline( + id=uuid.uuid4(), + name="pipeline_name", + fullyQualifiedName="pipeline_service.pipeline_name", + service=EntityReference(id=uuid.uuid4(), type="pipelineService"), + tasks=[ + Task( + name="task1", + startDate=self.dbtcloud.today, + endDate="2025-02-19 11:09:36.920915+00:00", + ), + Task( + name="task2", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate=self.dbtcloud.today, + ), + Task( + name="task3", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate="2025-02-19 11:09:36.920915+00:00", + ), + ], + usageSummary=UsageDetails( + dailyStats=UsageStats(count=10), date=self.dbtcloud.today + ), + ) + with patch.object(OpenMetadata, "get_by_name", return_value=return_value): + # Nothing is returned + self.assertEqual( + len(list(self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS))), 0 + ) + + # But if we have usage for today but the count is 0, we'll return the details + return_value = Pipeline( + id=uuid.uuid4(), + name="pipeline_name", + fullyQualifiedName="pipeline_service.pipeline_name", + service=EntityReference(id=uuid.uuid4(), type="pipelineService"), + tasks=[ + Task( + name="task1", + startDate=self.dbtcloud.today, + endDate="2025-02-19 11:09:36.920915+00:00", + ), + Task( + name="task2", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate=self.dbtcloud.today, + ), + Task( + name="task3", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate="2025-02-19 11:09:36.920915+00:00", + ), + ], + usageSummary=UsageDetails( + dailyStats=UsageStats(count=0), date=self.dbtcloud.today + ), + ) + with patch.object(OpenMetadata, "get_by_name", return_value=return_value): + got_usage = next( + self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS) + ).right + self.assertEqual( + next(self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS)).right, + PipelineUsage( + pipeline=return_value, + usage=UsageRequest(date=self.dbtcloud.today, count=2), + ), + ) + + # But if we have usage for another day, then we do the difference + return_value = Pipeline( + id=uuid.uuid4(), + name="pipeline_name", + fullyQualifiedName="pipeline_service.pipeline_name", + service=EntityReference(id=uuid.uuid4(), type="pipelineService"), + tasks=[ + Task( + name="task1", + startDate=self.dbtcloud.today, + endDate="2025-02-19 11:09:36.920915+00:00", + ), + Task( + name="task2", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate=self.dbtcloud.today, + ), + Task( + name="task3", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate=self.dbtcloud.today, + ), + Task( + name="task4", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate=self.dbtcloud.today, + ), + Task( + name="task5", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate=self.dbtcloud.today, + ), + Task( + name="task6", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate=self.dbtcloud.today, + ), + Task( + name="task7", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate="2025-02-19 11:09:36.920915+00:00", + ), + ], + usageSummary=UsageDetails( + dailyStats=UsageStats(count=5), + date=datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"), + ), + ) + with patch.object(OpenMetadata, "get_by_name", return_value=return_value): + got_usage = next( + self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS) + ).right + self.assertEqual( + next(self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS)).right, + PipelineUsage( + pipeline=return_value, + usage=UsageRequest(date=self.dbtcloud.today, count=1), + ), + ) + + # If 
the past usage is higher than what we have today, something weird is going on + # we don't return usage but don't explode + return_value = Pipeline( + id=uuid.uuid4(), + name="pipeline_name", + fullyQualifiedName="pipeline_service.pipeline_name", + service=EntityReference(id=uuid.uuid4(), type="pipelineService"), + tasks=[ + Task( + name="task1", + startDate=self.dbtcloud.today, + endDate="2025-02-19 11:09:36.920915+00:00", + ), + Task( + name="task2", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate=self.dbtcloud.today, + ), + Task( + name="task3", + startDate="2025-02-19 11:08:24.326771+00:00", + endDate="2025-02-19 11:09:36.920915+00:00", + ), + ], + usageSummary=UsageDetails( + dailyStats=UsageStats(count=1000), + date=datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"), + ), + ) + with patch.object(OpenMetadata, "get_by_name", return_value=return_value): + self.assertEqual( + len(list(self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS))), 1 + ) + + self.assertIsNotNone( + list(self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS))[0].left + ) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java index 79191562656..c54b09c0aee 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java @@ -149,6 +149,12 @@ public class PipelineRepository extends EntityRepository { fields.contains("pipelineStatus") ? getPipelineStatus(pipeline) : pipeline.getPipelineStatus()); + if (pipeline.getUsageSummary() == null) { + pipeline.withUsageSummary( + fields.contains("usageSummary") + ? EntityUtil.getLatestUsage(daoCollection.usageDAO(), pipeline.getId()) + : null); + } } @Override @@ -156,6 +162,7 @@ public class PipelineRepository extends EntityRepository { pipeline.withTasks(fields.contains(TASKS_FIELD) ? pipeline.getTasks() : null); pipeline.withPipelineStatus( fields.contains("pipelineStatus") ? pipeline.getPipelineStatus() : null); + pipeline.withUsageSummary(fields.contains("usageSummary") ? 
pipeline.getUsageSummary() : null); } @Override @@ -485,6 +492,7 @@ public class PipelineRepository extends EntityRepository { @Override public void entitySpecificUpdate(boolean consolidatingChanges) { updateTasks(original, updated); + recordChange("state", original.getState(), updated.getState()); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); recordChange("concurrency", original.getConcurrency(), updated.getConcurrency()); recordChange( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UsageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UsageRepository.java index f0c3253c574..57934c66ee0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UsageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UsageRepository.java @@ -19,6 +19,7 @@ import static org.openmetadata.service.Entity.CHART; import static org.openmetadata.service.Entity.DASHBOARD; import static org.openmetadata.service.Entity.FIELD_USAGE_SUMMARY; import static org.openmetadata.service.Entity.MLMODEL; +import static org.openmetadata.service.Entity.PIPELINE; import static org.openmetadata.service.Entity.TABLE; import static org.openmetadata.service.util.EntityUtil.fieldUpdated; @@ -35,6 +36,7 @@ import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.data.Chart; import org.openmetadata.schema.entity.data.Dashboard; import org.openmetadata.schema.entity.data.MlModel; +import org.openmetadata.schema.entity.data.Pipeline; import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.type.ChangeDescription; import org.openmetadata.schema.type.ChangeEvent; @@ -116,6 +118,8 @@ public class UsageRepository { switch (type) { case TABLE: return tableEntityUsage(method, fields, entityId, entityType, usage); + case PIPELINE: + return pipelineEntityUsage(method, fields, entityId, entityType, usage); case DASHBOARD: return dashboardEntityUsage(method, fields, entityId, entityType, usage); case CHART: @@ -168,6 +172,20 @@ public class UsageRepository { return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, ENTITY_FIELDS_CHANGED); } + private RestUtil.PutResponse pipelineEntityUsage( + String method, String fields, UUID entityId, String entityType, DailyCount usage) { + Pipeline pipeline = Entity.getEntity(Entity.PIPELINE, entityId, fields, Include.ALL); + insertToUsageRepository(method, entityId, entityType, usage); + Pipeline updated = Entity.getEntity(Entity.PIPELINE, entityId, fields, Include.ALL); + + ChangeDescription change = + getChangeDescription( + pipeline.getVersion(), updated.getUsageSummary(), pipeline.getUsageSummary()); + ChangeEvent changeEvent = getChangeEvent(updated, change, entityType, pipeline.getVersion()); + + return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, ENTITY_FIELDS_CHANGED); + } + private RestUtil.PutResponse chartEntityUsage( String method, String fields, UUID entityId, String entityType, DailyCount usage) { Chart chart = Entity.getEntity(Entity.CHART, entityId, fields, Include.ALL); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/pipelines/PipelineMapper.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/pipelines/PipelineMapper.java index 78143bafdb0..224ee015735 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/pipelines/PipelineMapper.java +++ 
b/openmetadata-service/src/main/java/org/openmetadata/service/resources/pipelines/PipelineMapper.java @@ -12,6 +12,7 @@ public class PipelineMapper implements EntityMapper { public Pipeline createToEntity(CreatePipeline create, String user) { return copy(new Pipeline(), create, user) .withService(getEntityReference(Entity.PIPELINE_SERVICE, create.getService())) + .withState(create.getState()) .withTasks(create.getTasks()) .withSourceUrl(create.getSourceUrl()) .withConcurrency(create.getConcurrency()) diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createPipeline.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createPipeline.json index 7f6a653020e..2dd3bfc9e05 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createPipeline.json @@ -51,6 +51,11 @@ }, "default": null }, + "state": { + "description": "State of the pipeline.", + "$ref": "../../entity/data/pipeline.json#/definitions/pipelineState", + "default": null + }, "tags": { "description": "Tags for this Pipeline.", "type": "array", diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/pipeline.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/pipeline.json index 16577ceca6f..6d397ee8b6b 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/pipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/pipeline.json @@ -28,6 +28,19 @@ } ] }, + "pipelineState": { + "description": "Enum defining the possible Pipeline State.", + "type": "string", + "enum": ["Active", "Inactive"], + "javaEnums": [ + { + "name": "Active" + }, + { + "name": "Inactive" + } + ] + }, "taskStatus": { "type": "object", "javaType": "org.openmetadata.schema.type.Status", @@ -213,6 +226,11 @@ "$ref": "#/definitions/pipelineStatus", "default": null }, + "state": { + "description": "State of the Pipeline.", + "$ref": "#/definitions/pipelineState", + "default": null + }, "followers": { "description": "Followers of this Pipeline.", "$ref": "../../type/entityReferenceList.json" @@ -241,6 +259,11 @@ "description": "Service type where this pipeline is hosted in.", "$ref": "../services/pipelineService.json#/definitions/pipelineServiceType" }, + "usageSummary": { + "description": "Latest usage information for this pipeline.", + "$ref": "../../type/usageDetails.json", + "default": null + }, "changeDescription": { "description": "Change that lead to this version of the entity.", "$ref": "../../type/entityHistory.json#/definitions/changeDescription" diff --git a/openmetadata-ui/src/main/resources/ui/src/enums/entity.enum.ts b/openmetadata-ui/src/main/resources/ui/src/enums/entity.enum.ts index 465ed5deb04..6e59df486b2 100644 --- a/openmetadata-ui/src/main/resources/ui/src/enums/entity.enum.ts +++ b/openmetadata-ui/src/main/resources/ui/src/enums/entity.enum.ts @@ -99,6 +99,7 @@ export enum TabSpecificField { FOLLOWERS = 'followers', JOINS = 'joins', TAGS = 'tags', + STATE = 'state', OWNERS = 'owners', DATAMODEL = 'dataModel', CHARTS = 'charts', diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createPipeline.ts index 9236beff567..47948e7f5c1 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createPipeline.ts @@ -75,6 +75,10 @@ export interface 
CreatePipeline { * Start date of the workflow */ startDate?: Date; + /** + * State of the pipeline. + */ + state?: PipelineState; /** * Tags for this Pipeline. */ @@ -187,6 +191,16 @@ export interface EntityReference { type: string; } +/** + * State of the pipeline. + * + * Enum defining the possible Pipeline State. + */ +export enum PipelineState { + Active = "Active", + Inactive = "Inactive", +} + /** * This schema defines the type for labeling an entity with a Tag. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/pipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/pipeline.ts index 1abba11642c..6f2c7439594 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/pipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/pipeline.ts @@ -115,6 +115,10 @@ export interface Pipeline { * Start date of the workflow. */ startDate?: Date; + /** + * State of the Pipeline. + */ + state?: PipelineState; /** * Tags for this Pipeline. */ @@ -132,6 +136,10 @@ export interface Pipeline { * User who made the update. */ updatedBy?: string; + /** + * Latest usage information for this pipeline. + */ + usageSummary?: UsageDetails; /** * Metadata version of the entity. */ @@ -476,6 +484,16 @@ export enum PipelineServiceType { Stitch = "Stitch", } +/** + * State of the Pipeline. + * + * Enum defining the possible Pipeline State. + */ +export enum PipelineState { + Active = "Active", + Inactive = "Inactive", +} + export interface Task { /** * Description of this Task. @@ -529,6 +547,51 @@ export interface Task { taskType?: string; } +/** + * Latest usage information for this pipeline. + * + * This schema defines the type for usage details. Daily, weekly, and monthly aggregation of + * usage is computed along with the percentile rank based on the usage for a given day. + */ +export interface UsageDetails { + /** + * Daily usage stats of a data asset on the start date. + */ + dailyStats: UsageStats; + /** + * Date in UTC. + */ + date: Date; + /** + * Monthly (last 30 days) rolling usage stats of a data asset on the start date. + */ + monthlyStats?: UsageStats; + /** + * Weekly (last 7 days) rolling usage stats of a data asset on the start date. + */ + weeklyStats?: UsageStats; +} + +/** + * Daily usage stats of a data asset on the start date. + * + * Type used to return usage statistics. + * + * Monthly (last 30 days) rolling usage stats of a data asset on the start date. + * + * Weekly (last 7 days) rolling usage stats of a data asset on the start date. + */ +export interface UsageStats { + /** + * Usage count of a data asset on the start date. + */ + count: number; + /** + * Optional daily percentile rank data asset use when relevant. + */ + percentileRank?: number; +} + /** * Votes on the entity. 
* diff --git a/openmetadata-ui/src/main/resources/ui/src/pages/ServiceDetailsPage/ServiceDetailsPage.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/ServiceDetailsPage/ServiceDetailsPage.tsx index 87f938d86a4..deb6f7ae248 100644 --- a/openmetadata-ui/src/main/resources/ui/src/pages/ServiceDetailsPage/ServiceDetailsPage.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/pages/ServiceDetailsPage/ServiceDetailsPage.tsx @@ -437,7 +437,7 @@ const ServiceDetailsPage: FunctionComponent = () => { async (paging?: PagingWithoutTotal) => { const { data, paging: resPaging } = await getPipelines( decodedServiceFQN, - `${TabSpecificField.OWNERS},${TabSpecificField.TAGS}`, + `${TabSpecificField.OWNERS},${TabSpecificField.TAGS},${TabSpecificField.STATE},${TabSpecificField.USAGE_SUMMARY}`, paging, include );
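Illustrative note (not part of the patch): the sketch below strings together the pieces this change introduces end to end: fetching a pipeline with its new `usageSummary` field, counting tasks that ran today the same way `yield_pipeline_usage` does, posting a `UsageRequest` through the new `publish_pipeline_usage` mixin, and rolling up percentiles as the sources now do in `close()`. The `OpenMetadata` client setup and the pipeline FQN are placeholders, and the helper function name is hypothetical; treat this as a minimal sketch of the flow added here, not a definitive or shipped API.

```python
# Minimal sketch of the usage-publishing flow added in this patch.
# Assumes an already-configured OpenMetadata client and a known pipeline FQN.
from datetime import datetime

from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata


def publish_todays_usage(metadata: OpenMetadata, pipeline_fqn: str) -> None:
    # Hypothetical helper mirroring the source-side logic in yield_pipeline_usage
    today = datetime.now().strftime("%Y-%m-%d")

    # Fetch the pipeline with the fields the new usage step relies on
    pipeline: Pipeline = metadata.get_by_name(
        entity=Pipeline,
        fqn=pipeline_fqn,
        fields=["tasks", "usageSummary"],
    )

    # Count tasks that started or ended today, as the ingestion source does
    count = sum(
        1
        for task in pipeline.tasks or []
        if (task.startDate and task.startDate.startswith(today))
        or (task.endDate and task.endDate.startswith(today))
    )

    # PUT the usage through the new OMetaPipelineMixin method,
    # which hits the /usage/pipeline/{id} endpoint added server-side
    metadata.publish_pipeline_usage(
        pipeline=pipeline,
        pipeline_usage_request=UsageRequest(date=today, count=count),
    )

    # Roll up daily/weekly/monthly percentiles, as the sources now call in close()
    metadata.compute_percentile(Pipeline, today)
```

The `UsageRequest` date/count semantics match what the other usage endpoints already accept, so the new `pipelineEntityUsage` branch in `UsageRepository` handles the payload the same way table and dashboard usage is handled.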