Day1 - Pipeline Usage (#19960)

This commit is contained in:
Suman Maharana 2025-03-06 18:19:42 +05:30 committed by GitHub
parent 0cd35015d7
commit 137f86c00d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 535 additions and 4 deletions

View File

@ -21,6 +21,7 @@ from metadata.generated.schema.entity.data.pipeline import (
PipelineStatus, PipelineStatus,
Task, Task,
) )
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.ometa.client import REST from metadata.ingestion.ometa.client import REST
from metadata.utils.logger import ometa_logger from metadata.utils.logger import ometa_logger
@ -83,6 +84,7 @@ class OMetaPipelineMixin:
sourceUrl=pipeline.sourceUrl, sourceUrl=pipeline.sourceUrl,
concurrency=pipeline.concurrency, concurrency=pipeline.concurrency,
pipelineLocation=pipeline.pipelineLocation, pipelineLocation=pipeline.pipelineLocation,
state=pipeline.state,
startDate=pipeline.startDate, startDate=pipeline.startDate,
service=pipeline.service.fullyQualifiedName, service=pipeline.service.fullyQualifiedName,
tasks=all_tasks, tasks=all_tasks,
@ -110,6 +112,7 @@ class OMetaPipelineMixin:
sourceUrl=pipeline.sourceUrl, sourceUrl=pipeline.sourceUrl,
concurrency=pipeline.concurrency, concurrency=pipeline.concurrency,
pipelineLocation=pipeline.pipelineLocation, pipelineLocation=pipeline.pipelineLocation,
state=pipeline.state,
startDate=pipeline.startDate, startDate=pipeline.startDate,
service=pipeline.service.fullyQualifiedName, service=pipeline.service.fullyQualifiedName,
tasks=[task for task in pipeline.tasks if task.name in task_ids], tasks=[task for task in pipeline.tasks if task.name in task_ids],
@ -118,3 +121,18 @@ class OMetaPipelineMixin:
) )
return self.create_or_update(updated_pipeline) return self.create_or_update(updated_pipeline)
def publish_pipeline_usage(
self, pipeline: Pipeline, pipeline_usage_request: UsageRequest
) -> None:
"""
POST usage details for a Pipeline
:param pipeline: Pipeline Entity to update
:param pipeline_usage_request: Usage data to add
"""
resp = self.client.put(
f"/usage/pipeline/{pipeline.id.root}",
data=pipeline_usage_request.model_dump_json(),
)
logger.debug("Published pipeline usage %s", resp)

View File

@ -34,7 +34,7 @@ from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteR
from metadata.generated.schema.dataInsight.kpi.basic import KpiResult from metadata.generated.schema.dataInsight.kpi.basic import KpiResult
from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.pipeline import PipelineStatus from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus
from metadata.generated.schema.entity.data.searchIndex import ( from metadata.generated.schema.entity.data.searchIndex import (
SearchIndex, SearchIndex,
SearchIndexSampleData, SearchIndexSampleData,
@ -83,6 +83,7 @@ from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage
from metadata.ingestion.source.database.database_service import DataModelLink from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.pipeline.pipeline_service import PipelineUsage
from metadata.profiler.api.models import ProfilerResponse from metadata.profiler.api.models import ProfilerResponse
from metadata.sampler.models import SamplerResponse from metadata.sampler.models import SamplerResponse
from metadata.utils.execution_time_tracker import calculate_execution_time from metadata.utils.execution_time_tracker import calculate_execution_time
@ -625,6 +626,18 @@ class MetadataRestSink(Sink): # pylint: disable=too-many-public-methods
return Either(right=record) return Either(right=record)
@_run_dispatch.register
def write_pipeline_usage(self, pipeline_usage: PipelineUsage) -> Either[Pipeline]:
"""
Send a UsageRequest update to a pipeline entity
:param pipeline_usage: pipeline entity and usage request
"""
self.metadata.publish_pipeline_usage(
pipeline=pipeline_usage.pipeline,
pipeline_usage_request=pipeline_usage.usage,
)
return Either(right=pipeline_usage.pipeline)
def close(self): def close(self):
""" """
We don't have anything to close since we are using the given metadata client We don't have anything to close since we are using the given metadata client

View File

@ -29,6 +29,7 @@ from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequ
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import ( from metadata.generated.schema.entity.data.pipeline import (
Pipeline, Pipeline,
PipelineState,
PipelineStatus, PipelineStatus,
StatusType, StatusType,
Task, Task,
@ -122,6 +123,7 @@ class AirflowSource(PipelineServiceSource):
metadata: OpenMetadata, metadata: OpenMetadata,
): ):
super().__init__(config, metadata) super().__init__(config, metadata)
self.today = datetime.now().strftime("%Y-%m-%d")
self._session = None self._session = None
@classmethod @classmethod
@ -367,6 +369,16 @@ class AirflowSource(PipelineServiceSource):
break break
for serialized_dag in results: for serialized_dag in results:
try: try:
dag_model = (
self.session.query(DagModel)
.filter(DagModel.dag_id == serialized_dag[0])
.one_or_none()
)
pipeline_state = (
PipelineState.Active.value
if dag_model and not dag_model.is_paused
else PipelineState.Inactive.value
)
data = serialized_dag[1]["dag"] data = serialized_dag[1]["dag"]
dag = AirflowDagDetails( dag = AirflowDagDetails(
dag_id=serialized_dag[0], dag_id=serialized_dag[0],
@ -375,6 +387,7 @@ class AirflowSource(PipelineServiceSource):
max_active_runs=data.get("max_active_runs", None), max_active_runs=data.get("max_active_runs", None),
description=data.get("_description", None), description=data.get("_description", None),
start_date=data.get("start_date", None), start_date=data.get("start_date", None),
state=pipeline_state,
tasks=list( tasks=list(
map(self._extract_serialized_task, data.get("tasks", [])) map(self._extract_serialized_task, data.get("tasks", []))
), ),
@ -425,6 +438,14 @@ class AirflowSource(PipelineServiceSource):
""" """
return pipeline_details.dag_id return pipeline_details.dag_id
def get_pipeline_state(
self, pipeline_details: AirflowDagDetails
) -> Optional[PipelineState]:
"""
Return the state of the DAG
"""
return PipelineState[pipeline_details.state]
def get_tasks_from_dag(self, dag: AirflowDagDetails, host_port: str) -> List[Task]: def get_tasks_from_dag(self, dag: AirflowDagDetails, host_port: str) -> List[Task]:
""" """
Obtain the tasks from a SerializedDAG Obtain the tasks from a SerializedDAG
@ -480,12 +501,15 @@ class AirflowSource(PipelineServiceSource):
try: try:
# Airflow uses /dags/dag_id/grid to show pipeline / dag # Airflow uses /dags/dag_id/grid to show pipeline / dag
source_url = f"{clean_uri(self.service_connection.hostPort)}/dags/{pipeline_details.dag_id}/grid" source_url = f"{clean_uri(self.service_connection.hostPort)}/dags/{pipeline_details.dag_id}/grid"
pipeline_state = self.get_pipeline_state(pipeline_details)
pipeline_request = CreatePipelineRequest( pipeline_request = CreatePipelineRequest(
name=EntityName(pipeline_details.dag_id), name=EntityName(pipeline_details.dag_id),
description=Markdown(pipeline_details.description) description=Markdown(pipeline_details.description)
if pipeline_details.description if pipeline_details.description
else None, else None,
sourceUrl=SourceUrl(source_url), sourceUrl=SourceUrl(source_url),
state=pipeline_state,
concurrency=pipeline_details.max_active_runs, concurrency=pipeline_details.max_active_runs,
pipelineLocation=pipeline_details.fileloc, pipelineLocation=pipeline_details.fileloc,
startDate=pipeline_details.start_date.isoformat() startDate=pipeline_details.start_date.isoformat()
@ -614,4 +638,5 @@ class AirflowSource(PipelineServiceSource):
) )
def close(self): def close(self):
self.metadata.compute_percentile(Pipeline, self.today)
self.session.close() self.session.close()

View File

@ -68,4 +68,5 @@ class AirflowDagDetails(AirflowBaseModel):
start_date: Optional[datetime] = None start_date: Optional[datetime] = None
tasks: List[AirflowTask] tasks: List[AirflowTask]
owner: Optional[str] = None owner: Optional[str] = None
state: Optional[str] = None
schedule_interval: Optional[str] = None schedule_interval: Optional[str] = None

View File

@ -93,6 +93,7 @@ class OpenlineageSource(PipelineServiceSource):
"""Nothing to prepare""" """Nothing to prepare"""
def close(self) -> None: def close(self) -> None:
self.metadata.compute_percentile(Pipeline, self.today)
self.metadata.close() self.metadata.close()
@classmethod @classmethod

View File

@ -11,16 +11,21 @@
""" """
Base class for ingesting database services Base class for ingesting database services
""" """
import traceback
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Iterable, List, Optional, Set from typing import Any, Iterable, List, Optional, Set
from pydantic import Field from pydantic import BaseModel, Field
from typing_extensions import Annotated from typing_extensions import Annotated
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineState
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.entity.services.pipelineService import ( from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection, PipelineConnection,
PipelineService, PipelineService,
@ -31,6 +36,7 @@ from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.delete import delete_entity_from_source from metadata.ingestion.api.delete import delete_entity_from_source
from metadata.ingestion.api.models import Either from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import Source from metadata.ingestion.api.steps import Source
@ -60,6 +66,15 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
class PipelineUsage(BaseModel):
"""
Wrapper to handle type at the sink
"""
pipeline: Pipeline
usage: UsageRequest
class PipelineServiceTopology(ServiceTopology): class PipelineServiceTopology(ServiceTopology):
""" """
Defines the hierarchy in Pipeline Services. Defines the hierarchy in Pipeline Services.
@ -115,6 +130,12 @@ class PipelineServiceTopology(ServiceTopology):
consumer=["pipeline_service"], consumer=["pipeline_service"],
nullable=True, nullable=True,
), ),
NodeStage(
type_=UsageRequest,
processor="yield_pipeline_usage",
consumer=["pipeline_service"],
nullable=True,
),
], ],
) )
@ -143,6 +164,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
super().__init__() super().__init__()
self.config = config self.config = config
self.metadata = metadata self.metadata = metadata
self.today = datetime.now().strftime("%Y-%m-%d")
self.service_connection = self.config.serviceConnection.root.config self.service_connection = self.config.serviceConnection.root.config
self.source_config: PipelineServiceMetadataPipeline = ( self.source_config: PipelineServiceMetadataPipeline = (
self.config.sourceConfig.config self.config.sourceConfig.config
@ -184,6 +206,100 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
) -> Iterable[Either[OMetaPipelineStatus]]: ) -> Iterable[Either[OMetaPipelineStatus]]:
"""Get Pipeline Status""" """Get Pipeline Status"""
def get_pipeline_state(self, pipeline_details: Any) -> Optional[PipelineState]:
"""Get Pipeline State"""
def yield_pipeline_usage(
self, pipeline_details: Any
) -> Iterable[Either[PipelineUsage]]:
"""
Yield the usage of the pipeline
we will check the usage of the pipeline
by checking the tasks that have run today or are running today or ends today
we get the count of these tasks and compare it with the usageSummary
if the usageSummary is not present or the date is not today
we yield the fresh usage
"""
try:
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
pipeline: Pipeline = self.metadata.get_by_name(
entity=Pipeline,
fqn=pipeline_fqn,
fields=["tasks", "usageSummary"],
)
if pipeline.tasks:
current_task_usage = sum(
1
for task in pipeline.tasks
if task.startDate
and task.startDate.startswith(self.today)
or task.endDate
and task.endDate.startswith(self.today)
)
if not current_task_usage:
logger.debug(
f"No usage to report for {pipeline.fullyQualifiedName.root}"
)
if not pipeline.usageSummary:
logger.info(
f"Yielding fresh usage for {pipeline.fullyQualifiedName.root}"
)
yield Either(
right=PipelineUsage(
pipeline=pipeline,
usage=UsageRequest(
date=self.today, count=current_task_usage
),
)
)
elif (
str(pipeline.usageSummary.date.root) != self.today
or not pipeline.usageSummary.dailyStats.count
):
latest_usage = pipeline.usageSummary.dailyStats.count
new_usage = current_task_usage - latest_usage
if new_usage < 0:
raise ValueError(
f"Wrong computation of usage difference. Got new_usage={new_usage}."
)
logger.info(
f"Yielding new usage for {pipeline.fullyQualifiedName.root}"
)
yield Either(
right=PipelineUsage(
pipeline=pipeline,
usage=UsageRequest(date=self.today, count=new_usage),
)
)
else:
logger.debug(
f"Latest usage {pipeline.usageSummary} vs. today {self.today}. Nothing to compute."
)
logger.info(
f"Usage already informed for {pipeline.fullyQualifiedName.root}"
)
except Exception as exc:
yield Either(
left=StackTraceError(
name="Pipeline Usage",
error=f"Exception computing pipeline usage for {pipeline_details}: {exc}",
stackTrace=traceback.format_exc(),
)
)
def yield_pipeline_lineage( def yield_pipeline_lineage(
self, pipeline_details: Any self, pipeline_details: Any
) -> Iterable[Either[OMetaLineageRequest]]: ) -> Iterable[Either[OMetaLineageRequest]]:
@ -231,6 +347,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
def close(self): def close(self):
"""Method to implement any required logic after the ingestion process is completed""" """Method to implement any required logic after the ingestion process is completed"""
self.metadata.compute_percentile(Pipeline, self.today)
def get_services(self) -> Iterable[WorkflowSource]: def get_services(self) -> Iterable[WorkflowSource]:
yield self.config yield self.config

View File

@ -12,6 +12,8 @@
Test dbt cloud using the topology Test dbt cloud using the topology
""" """
import json import json
import uuid
from datetime import datetime, timedelta
from unittest import TestCase from unittest import TestCase
from unittest.mock import patch from unittest.mock import patch
@ -32,12 +34,16 @@ from metadata.generated.schema.type.basic import (
SourceUrl, SourceUrl,
) )
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.usageDetails import UsageDetails, UsageStats
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.dbtcloud.metadata import DbtcloudSource from metadata.ingestion.source.pipeline.dbtcloud.metadata import DbtcloudSource
from metadata.ingestion.source.pipeline.dbtcloud.models import ( from metadata.ingestion.source.pipeline.dbtcloud.models import (
DBTJob, DBTJob,
DBTJobList, DBTJobList,
DBTSchedule, DBTSchedule,
) )
from metadata.ingestion.source.pipeline.pipeline_service import PipelineUsage
MOCK_JOB_RESULT = json.loads( MOCK_JOB_RESULT = json.loads(
""" """
@ -539,6 +545,9 @@ class DBTCloudUnitTest(TestCase):
self.dbtcloud.context.get().__dict__[ self.dbtcloud.context.get().__dict__[
"pipeline_service" "pipeline_service"
] = MOCK_PIPELINE_SERVICE.name.root ] = MOCK_PIPELINE_SERVICE.name.root
self.dbtcloud.metadata = OpenMetadata(
config.workflowConfig.openMetadataServerConfig
)
@patch("metadata.ingestion.source.pipeline.dbtcloud.client.DBTCloudClient.get_jobs") @patch("metadata.ingestion.source.pipeline.dbtcloud.client.DBTCloudClient.get_jobs")
def test_get_pipelines_list(self, get_jobs): def test_get_pipelines_list(self, get_jobs):
@ -559,3 +568,217 @@ class DBTCloudUnitTest(TestCase):
def test_pipelines(self): def test_pipelines(self):
pipeline = list(self.dbtcloud.yield_pipeline(EXPECTED_JOB_DETAILS))[0].right pipeline = list(self.dbtcloud.yield_pipeline(EXPECTED_JOB_DETAILS))[0].right
assert pipeline == EXPECTED_CREATED_PIPELINES assert pipeline == EXPECTED_CREATED_PIPELINES
def test_yield_pipeline_usage(self):
"""
Validate the logic for existing or new usage
"""
self.dbtcloud.context.get().__dict__["pipeline"] = "pipeline_name"
# Start checking pipeline without usage
# and a view count
return_value = Pipeline(
id=uuid.uuid4(),
name="pipeline_name",
fullyQualifiedName="pipeline_service.pipeline_name",
service=EntityReference(id=uuid.uuid4(), type="pipelineService"),
tasks=[
Task(
name="task1",
startDate=self.dbtcloud.today,
endDate="2025-02-19 11:09:36.920915+00:00",
),
Task(
name="task2",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate=self.dbtcloud.today,
),
Task(
name="task3",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate="2025-02-19 11:09:36.920915+00:00",
),
],
)
with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
got_usage = next(
self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS)
).right
self.assertEqual(
got_usage,
PipelineUsage(
pipeline=return_value,
usage=UsageRequest(date=self.dbtcloud.today, count=2),
),
)
# Now check what happens if we already have some summary data for today
return_value = Pipeline(
id=uuid.uuid4(),
name="pipeline_name",
fullyQualifiedName="pipeline_service.pipeline_name",
service=EntityReference(id=uuid.uuid4(), type="pipelineService"),
tasks=[
Task(
name="task1",
startDate=self.dbtcloud.today,
endDate="2025-02-19 11:09:36.920915+00:00",
),
Task(
name="task2",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate=self.dbtcloud.today,
),
Task(
name="task3",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate="2025-02-19 11:09:36.920915+00:00",
),
],
usageSummary=UsageDetails(
dailyStats=UsageStats(count=10), date=self.dbtcloud.today
),
)
with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
# Nothing is returned
self.assertEqual(
len(list(self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS))), 0
)
# But if we have usage for today but the count is 0, we'll return the details
return_value = Pipeline(
id=uuid.uuid4(),
name="pipeline_name",
fullyQualifiedName="pipeline_service.pipeline_name",
service=EntityReference(id=uuid.uuid4(), type="pipelineService"),
tasks=[
Task(
name="task1",
startDate=self.dbtcloud.today,
endDate="2025-02-19 11:09:36.920915+00:00",
),
Task(
name="task2",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate=self.dbtcloud.today,
),
Task(
name="task3",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate="2025-02-19 11:09:36.920915+00:00",
),
],
usageSummary=UsageDetails(
dailyStats=UsageStats(count=0), date=self.dbtcloud.today
),
)
with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
got_usage = next(
self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS)
).right
self.assertEqual(
next(self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS)).right,
PipelineUsage(
pipeline=return_value,
usage=UsageRequest(date=self.dbtcloud.today, count=2),
),
)
# But if we have usage for another day, then we do the difference
return_value = Pipeline(
id=uuid.uuid4(),
name="pipeline_name",
fullyQualifiedName="pipeline_service.pipeline_name",
service=EntityReference(id=uuid.uuid4(), type="pipelineService"),
tasks=[
Task(
name="task1",
startDate=self.dbtcloud.today,
endDate="2025-02-19 11:09:36.920915+00:00",
),
Task(
name="task2",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate=self.dbtcloud.today,
),
Task(
name="task3",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate=self.dbtcloud.today,
),
Task(
name="task4",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate=self.dbtcloud.today,
),
Task(
name="task5",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate=self.dbtcloud.today,
),
Task(
name="task6",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate=self.dbtcloud.today,
),
Task(
name="task7",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate="2025-02-19 11:09:36.920915+00:00",
),
],
usageSummary=UsageDetails(
dailyStats=UsageStats(count=5),
date=datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"),
),
)
with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
got_usage = next(
self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS)
).right
self.assertEqual(
next(self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS)).right,
PipelineUsage(
pipeline=return_value,
usage=UsageRequest(date=self.dbtcloud.today, count=1),
),
)
# If the past usage is higher than what we have today, something weird is going on
# we don't return usage but don't explode
return_value = Pipeline(
id=uuid.uuid4(),
name="pipeline_name",
fullyQualifiedName="pipeline_service.pipeline_name",
service=EntityReference(id=uuid.uuid4(), type="pipelineService"),
tasks=[
Task(
name="task1",
startDate=self.dbtcloud.today,
endDate="2025-02-19 11:09:36.920915+00:00",
),
Task(
name="task2",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate=self.dbtcloud.today,
),
Task(
name="task3",
startDate="2025-02-19 11:08:24.326771+00:00",
endDate="2025-02-19 11:09:36.920915+00:00",
),
],
usageSummary=UsageDetails(
dailyStats=UsageStats(count=1000),
date=datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d"),
),
)
with patch.object(OpenMetadata, "get_by_name", return_value=return_value):
self.assertEqual(
len(list(self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS))), 1
)
self.assertIsNotNone(
list(self.dbtcloud.yield_pipeline_usage(EXPECTED_JOB_DETAILS))[0].left
)

View File

@ -149,6 +149,12 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
fields.contains("pipelineStatus") fields.contains("pipelineStatus")
? getPipelineStatus(pipeline) ? getPipelineStatus(pipeline)
: pipeline.getPipelineStatus()); : pipeline.getPipelineStatus());
if (pipeline.getUsageSummary() == null) {
pipeline.withUsageSummary(
fields.contains("usageSummary")
? EntityUtil.getLatestUsage(daoCollection.usageDAO(), pipeline.getId())
: null);
}
} }
@Override @Override
@ -156,6 +162,7 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
pipeline.withTasks(fields.contains(TASKS_FIELD) ? pipeline.getTasks() : null); pipeline.withTasks(fields.contains(TASKS_FIELD) ? pipeline.getTasks() : null);
pipeline.withPipelineStatus( pipeline.withPipelineStatus(
fields.contains("pipelineStatus") ? pipeline.getPipelineStatus() : null); fields.contains("pipelineStatus") ? pipeline.getPipelineStatus() : null);
pipeline.withUsageSummary(fields.contains("usageSummary") ? pipeline.getUsageSummary() : null);
} }
@Override @Override
@ -485,6 +492,7 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
@Override @Override
public void entitySpecificUpdate(boolean consolidatingChanges) { public void entitySpecificUpdate(boolean consolidatingChanges) {
updateTasks(original, updated); updateTasks(original, updated);
recordChange("state", original.getState(), updated.getState());
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("concurrency", original.getConcurrency(), updated.getConcurrency()); recordChange("concurrency", original.getConcurrency(), updated.getConcurrency());
recordChange( recordChange(

View File

@ -19,6 +19,7 @@ import static org.openmetadata.service.Entity.CHART;
import static org.openmetadata.service.Entity.DASHBOARD; import static org.openmetadata.service.Entity.DASHBOARD;
import static org.openmetadata.service.Entity.FIELD_USAGE_SUMMARY; import static org.openmetadata.service.Entity.FIELD_USAGE_SUMMARY;
import static org.openmetadata.service.Entity.MLMODEL; import static org.openmetadata.service.Entity.MLMODEL;
import static org.openmetadata.service.Entity.PIPELINE;
import static org.openmetadata.service.Entity.TABLE; import static org.openmetadata.service.Entity.TABLE;
import static org.openmetadata.service.util.EntityUtil.fieldUpdated; import static org.openmetadata.service.util.EntityUtil.fieldUpdated;
@ -35,6 +36,7 @@ import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.Chart; import org.openmetadata.schema.entity.data.Chart;
import org.openmetadata.schema.entity.data.Dashboard; import org.openmetadata.schema.entity.data.Dashboard;
import org.openmetadata.schema.entity.data.MlModel; import org.openmetadata.schema.entity.data.MlModel;
import org.openmetadata.schema.entity.data.Pipeline;
import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.type.ChangeDescription; import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.ChangeEvent;
@ -116,6 +118,8 @@ public class UsageRepository {
switch (type) { switch (type) {
case TABLE: case TABLE:
return tableEntityUsage(method, fields, entityId, entityType, usage); return tableEntityUsage(method, fields, entityId, entityType, usage);
case PIPELINE:
return pipelineEntityUsage(method, fields, entityId, entityType, usage);
case DASHBOARD: case DASHBOARD:
return dashboardEntityUsage(method, fields, entityId, entityType, usage); return dashboardEntityUsage(method, fields, entityId, entityType, usage);
case CHART: case CHART:
@ -168,6 +172,20 @@ public class UsageRepository {
return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, ENTITY_FIELDS_CHANGED); return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, ENTITY_FIELDS_CHANGED);
} }
private RestUtil.PutResponse<?> pipelineEntityUsage(
String method, String fields, UUID entityId, String entityType, DailyCount usage) {
Pipeline pipeline = Entity.getEntity(Entity.PIPELINE, entityId, fields, Include.ALL);
insertToUsageRepository(method, entityId, entityType, usage);
Pipeline updated = Entity.getEntity(Entity.PIPELINE, entityId, fields, Include.ALL);
ChangeDescription change =
getChangeDescription(
pipeline.getVersion(), updated.getUsageSummary(), pipeline.getUsageSummary());
ChangeEvent changeEvent = getChangeEvent(updated, change, entityType, pipeline.getVersion());
return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, ENTITY_FIELDS_CHANGED);
}
private RestUtil.PutResponse<?> chartEntityUsage( private RestUtil.PutResponse<?> chartEntityUsage(
String method, String fields, UUID entityId, String entityType, DailyCount usage) { String method, String fields, UUID entityId, String entityType, DailyCount usage) {
Chart chart = Entity.getEntity(Entity.CHART, entityId, fields, Include.ALL); Chart chart = Entity.getEntity(Entity.CHART, entityId, fields, Include.ALL);

View File

@ -12,6 +12,7 @@ public class PipelineMapper implements EntityMapper<Pipeline, CreatePipeline> {
public Pipeline createToEntity(CreatePipeline create, String user) { public Pipeline createToEntity(CreatePipeline create, String user) {
return copy(new Pipeline(), create, user) return copy(new Pipeline(), create, user)
.withService(getEntityReference(Entity.PIPELINE_SERVICE, create.getService())) .withService(getEntityReference(Entity.PIPELINE_SERVICE, create.getService()))
.withState(create.getState())
.withTasks(create.getTasks()) .withTasks(create.getTasks())
.withSourceUrl(create.getSourceUrl()) .withSourceUrl(create.getSourceUrl())
.withConcurrency(create.getConcurrency()) .withConcurrency(create.getConcurrency())

View File

@ -51,6 +51,11 @@
}, },
"default": null "default": null
}, },
"state": {
"description": "State of the pipeline.",
"$ref": "../../entity/data/pipeline.json#/definitions/pipelineState",
"default": null
},
"tags": { "tags": {
"description": "Tags for this Pipeline.", "description": "Tags for this Pipeline.",
"type": "array", "type": "array",

View File

@ -28,6 +28,19 @@
} }
] ]
}, },
"pipelineState": {
"description": "Enum defining the possible Pipeline State.",
"type": "string",
"enum": ["Active", "Inactive"],
"javaEnums": [
{
"name": "Active"
},
{
"name": "Inactive"
}
]
},
"taskStatus": { "taskStatus": {
"type": "object", "type": "object",
"javaType": "org.openmetadata.schema.type.Status", "javaType": "org.openmetadata.schema.type.Status",
@ -213,6 +226,11 @@
"$ref": "#/definitions/pipelineStatus", "$ref": "#/definitions/pipelineStatus",
"default": null "default": null
}, },
"state": {
"description": "State of the Pipeline.",
"$ref": "#/definitions/pipelineState",
"default": null
},
"followers": { "followers": {
"description": "Followers of this Pipeline.", "description": "Followers of this Pipeline.",
"$ref": "../../type/entityReferenceList.json" "$ref": "../../type/entityReferenceList.json"
@ -241,6 +259,11 @@
"description": "Service type where this pipeline is hosted in.", "description": "Service type where this pipeline is hosted in.",
"$ref": "../services/pipelineService.json#/definitions/pipelineServiceType" "$ref": "../services/pipelineService.json#/definitions/pipelineServiceType"
}, },
"usageSummary": {
"description": "Latest usage information for this pipeline.",
"$ref": "../../type/usageDetails.json",
"default": null
},
"changeDescription": { "changeDescription": {
"description": "Change that lead to this version of the entity.", "description": "Change that lead to this version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/changeDescription" "$ref": "../../type/entityHistory.json#/definitions/changeDescription"

View File

@ -99,6 +99,7 @@ export enum TabSpecificField {
FOLLOWERS = 'followers', FOLLOWERS = 'followers',
JOINS = 'joins', JOINS = 'joins',
TAGS = 'tags', TAGS = 'tags',
STATE = 'state',
OWNERS = 'owners', OWNERS = 'owners',
DATAMODEL = 'dataModel', DATAMODEL = 'dataModel',
CHARTS = 'charts', CHARTS = 'charts',

View File

@ -75,6 +75,10 @@ export interface CreatePipeline {
* Start date of the workflow * Start date of the workflow
*/ */
startDate?: Date; startDate?: Date;
/**
* State of the pipeline.
*/
state?: PipelineState;
/** /**
* Tags for this Pipeline. * Tags for this Pipeline.
*/ */
@ -187,6 +191,16 @@ export interface EntityReference {
type: string; type: string;
} }
/**
* State of the pipeline.
*
* Enum defining the possible Pipeline State.
*/
export enum PipelineState {
Active = "Active",
Inactive = "Inactive",
}
/** /**
* This schema defines the type for labeling an entity with a Tag. * This schema defines the type for labeling an entity with a Tag.
*/ */

View File

@ -115,6 +115,10 @@ export interface Pipeline {
* Start date of the workflow. * Start date of the workflow.
*/ */
startDate?: Date; startDate?: Date;
/**
* State of the Pipeline.
*/
state?: PipelineState;
/** /**
* Tags for this Pipeline. * Tags for this Pipeline.
*/ */
@ -132,6 +136,10 @@ export interface Pipeline {
* User who made the update. * User who made the update.
*/ */
updatedBy?: string; updatedBy?: string;
/**
* Latest usage information for this pipeline.
*/
usageSummary?: UsageDetails;
/** /**
* Metadata version of the entity. * Metadata version of the entity.
*/ */
@ -476,6 +484,16 @@ export enum PipelineServiceType {
Stitch = "Stitch", Stitch = "Stitch",
} }
/**
* State of the Pipeline.
*
* Enum defining the possible Pipeline State.
*/
export enum PipelineState {
Active = "Active",
Inactive = "Inactive",
}
export interface Task { export interface Task {
/** /**
* Description of this Task. * Description of this Task.
@ -529,6 +547,51 @@ export interface Task {
taskType?: string; taskType?: string;
} }
/**
* Latest usage information for this pipeline.
*
* This schema defines the type for usage details. Daily, weekly, and monthly aggregation of
* usage is computed along with the percentile rank based on the usage for a given day.
*/
export interface UsageDetails {
/**
* Daily usage stats of a data asset on the start date.
*/
dailyStats: UsageStats;
/**
* Date in UTC.
*/
date: Date;
/**
* Monthly (last 30 days) rolling usage stats of a data asset on the start date.
*/
monthlyStats?: UsageStats;
/**
* Weekly (last 7 days) rolling usage stats of a data asset on the start date.
*/
weeklyStats?: UsageStats;
}
/**
* Daily usage stats of a data asset on the start date.
*
* Type used to return usage statistics.
*
* Monthly (last 30 days) rolling usage stats of a data asset on the start date.
*
* Weekly (last 7 days) rolling usage stats of a data asset on the start date.
*/
export interface UsageStats {
/**
* Usage count of a data asset on the start date.
*/
count: number;
/**
* Optional daily percentile rank data asset use when relevant.
*/
percentileRank?: number;
}
/** /**
* Votes on the entity. * Votes on the entity.
* *

View File

@ -437,7 +437,7 @@ const ServiceDetailsPage: FunctionComponent = () => {
async (paging?: PagingWithoutTotal) => { async (paging?: PagingWithoutTotal) => {
const { data, paging: resPaging } = await getPipelines( const { data, paging: resPaging } = await getPipelines(
decodedServiceFQN, decodedServiceFQN,
`${TabSpecificField.OWNERS},${TabSpecificField.TAGS}`, `${TabSpecificField.OWNERS},${TabSpecificField.TAGS},${TabSpecificField.STATE},${TabSpecificField.USAGE_SUMMARY}`,
paging, paging,
include include
); );