diff --git a/ingestion/src/metadata/clients/connection_clients.py b/ingestion/src/metadata/clients/connection_clients.py
index 85a464e9250..38e1b4d49a6 100644
--- a/ingestion/src/metadata/clients/connection_clients.py
+++ b/ingestion/src/metadata/clients/connection_clients.py
@@ -129,7 +129,7 @@ class DagsterClient:
 
 
 @dataclass
-class DomoDashboardClient:
+class DomoClient:
     def __init__(self, client) -> None:
         self.client = client
 
diff --git a/ingestion/src/metadata/clients/domodashboard_client.py b/ingestion/src/metadata/clients/domo_client.py
similarity index 59%
rename from ingestion/src/metadata/clients/domodashboard_client.py
rename to ingestion/src/metadata/clients/domo_client.py
index 16bc0838b1c..f843ca258ac 100644
--- a/ingestion/src/metadata/clients/domodashboard_client.py
+++ b/ingestion/src/metadata/clients/domo_client.py
@@ -8,38 +8,42 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 """
-DomoDashboard Client source to extract DOMO metadata
+DomoClient source to extract data from DOMO
 """
-from typing import List
+from typing import List, Union
 
 from metadata.generated.schema.entity.services.connections.dashboard.domodashboardConnection import (
     DomoDashboardConnection,
 )
+from metadata.generated.schema.entity.services.connections.pipeline.domopipelineConnection import (
+    DomoPipelineConnection,
+)
 from metadata.ingestion.ometa.client import REST, ClientConfig
 
 CARDS_URL = (
-    "cards?includeV4PageLayouts=true&parts=metadata,datasources,library,drillPathURNs"
-    ",owners,certification,dateInfo,subscriptions,slicers"
+    "cards?includeV4PageLayouts=true&parts=metadata"
+    ",datasources,library,drillPathURNs,owners,certification,dateInfo,subscriptions,slicers"
 )
-
 HEADERS = {"Content-Type": "application/json"}
+WORKFLOW_URL = "dataprocessing/v1/dataflows"
 
 
-class DomoDashboardClient:
+class DomoClient:
     """
     Implements the necessary methods to extract
-    Dashboard metadata from Domo's metadata db
+    DOMO metadata from Domo's metadata db
     """
 
-    def __init__(self, config: DomoDashboardConnection):
+    def __init__(self, config: Union[DomoDashboardConnection, DomoPipelineConnection]):
         self.config = config
         HEADERS.update({"X-DOMO-Developer-Token": self.config.accessToken})
         client_config: ClientConfig = ClientConfig(
             base_url=self.config.sandboxDomain,
-            api_version="api/content/v3/",
+            api_version="api/",
             auth_header="Authorization",
             auth_token=lambda: ("no_token", 0),
         )
@@ -47,7 +51,7 @@ class DomoDashboardClient:
 
     def get_chart_details(self, page_id) -> List[dict]:
         url = (
-            f"stacks/{page_id}/"
+            f"content/v3/stacks/{page_id}/"
             f"{CARDS_URL}"
             f"&stackLoadContext=Page&stackLoadContextId={page_id}&stackLoadTrigger=page-view"
         )
@@ -55,3 +59,16 @@ class DomoDashboardClient:
             method="GET", path=url, headers=HEADERS
         )
         return response
+
+    def get_pipelines(self):
+        response = self.client._request(  # pylint: disable=protected-access
+            method="GET", path=WORKFLOW_URL, headers=HEADERS
+        )
+        return response
+
+    def get_runs(self, workflow_id):
+        url = f"dataprocessing/v1/dataflows/{workflow_id}/executions?limit=100&offset=0"
+        response = self.client._request(  # pylint: disable=protected-access
+            method="GET", path=url, headers=HEADERS
+        )
+        return response
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard.py b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard.py
index 75a594c46ce..eaffbb9ce95 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/domodashboard.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/domodashboard.py
@@ -15,7 +15,7 @@ DomoDashboard source to extract metadata
 import traceback
 from typing import Any, Iterable, List, Optional
 
-from metadata.clients.domodashboard_client import DomoDashboardClient
+from metadata.clients.domo_client import DomoClient
 from metadata.generated.schema.api.data.createChart import CreateChartRequest
 from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@@ -51,7 +51,7 @@ class DomodashboardSource(DashboardServiceSource):
     def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
         super().__init__(config, metadata_config)
         self.domo_client = self.connection.client
-        self.client = DomoDashboardClient(self.service_connection)
+        self.client = DomoClient(self.service_connection)
 
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@@ -102,7 +102,7 @@ class DomodashboardSource(DashboardServiceSource):
             try:
                 chart_url = (
                     f"{self.service_connection.sandboxDomain}/page/"
-                    f"{charts['id']}/{chart['type']}/details/{chart['id']}"
+                    f"{charts['id']}/kpis/details/{chart['id']}"
                 )
 
                 if filter_by_chart(
diff --git a/ingestion/src/metadata/ingestion/source/pipeline/domopipeline.py b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline.py
new file mode 100644
index 00000000000..8342d802b16
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/pipeline/domopipeline.py
@@ -0,0 +1,122 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +""" +Domo Pipeline source to extract metadata +""" + +from typing import Dict, Iterable, List, Optional + +from metadata.clients.domo_client import DomoClient +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.pipeline import ( + PipelineStatus, + StatusType, + Task, + TaskStatus, +) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.connections.pipeline.domopipelineConnection import ( + DomoPipelineConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.source.pipeline.dagster import STATUS_MAP +from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource + + +class DomopipelineSource(PipelineServiceSource): + """ + Implements the necessary methods to extract + Pipeline metadata from Domo's metadata db + """ + + config: WorkflowSource + + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): + super().__init__(config, metadata_config) + self.domo_client = self.connection.client + self.client = DomoClient(self.service_connection) + + @classmethod + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + config = WorkflowSource.parse_obj(config_dict) + connection: DomoPipelineConnection = config.serviceConnection.__root__.config + if not isinstance(connection, DomoPipelineConnection): + raise InvalidSourceException( + f"Expected DomoPipelineConnection, but got {connection}" + ) + return cls(config, metadata_config) + + def get_pipeline_name(self, pipeline_details) -> str: + return pipeline_details["name"] + + def get_pipelines_list(self) -> Dict: + results = self.client.get_pipelines() + for result in results: + yield result + + def yield_pipeline(self, pipeline_details) -> Iterable[CreatePipelineRequest]: + task_list: List[Task] = [] + task = Task( + name=pipeline_details["name"], + displayName=pipeline_details["name"], + description=pipeline_details.get("description", ""), + ) + task_list.append(task) + + pipeline_yield = CreatePipelineRequest( + name=pipeline_details["name"], + description=pipeline_details.get("description", ""), + tasks=task_list, + service=EntityReference( + id=self.context.pipeline_service.id.__root__, type="pipelineService" + ), + startDate=pipeline_details["created"], + ) + yield pipeline_yield + + def yield_pipeline_lineage_details( + self, pipeline_details + ) -> Optional[Iterable[AddLineageRequest]]: + return + + def yield_pipeline_status(self, pipeline_details) -> OMetaPipelineStatus: + runs = self.client.get_runs(pipeline_details["id"]) + for run in runs: + task_status = TaskStatus( + name=pipeline_details["name"], + executionStatus=STATUS_MAP.get( + run["state"].lower(), StatusType.Pending.value + ), + startTime=run["beginTime"] // 1000, + endTime=run["endTime"] // 1000, + ) + + pipeline_status = PipelineStatus( + taskStatus=[task_status], + executionStatus=STATUS_MAP.get( + run["state"].lower(), StatusType.Pending.value + ), + timestamp=run["endTime"] // 1000, + ) + + yield OMetaPipelineStatus( + 
+                pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__,
+                pipeline_status=pipeline_status,
+            )
diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py
index aee079e5846..34c10493a80 100644
--- a/ingestion/src/metadata/utils/connections.py
+++ b/ingestion/src/metadata/utils/connections.py
@@ -1,4 +1,4 @@
-# Copyright 2021 Collate
+# Copyright 2021 Collate #pylint: disable=too-many-lines
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
@@ -34,7 +34,7 @@ from metadata.clients.connection_clients import (
     DagsterClient,
     DatalakeClient,
     DeltaLakeClient,
-    DomoDashboardClient,
+    DomoClient,
     DynamoClient,
     FivetranClient,
     GlueDBClient,
@@ -126,6 +126,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.backendConne
 from metadata.generated.schema.entity.services.connections.pipeline.dagsterConnection import (
     DagsterConnection,
 )
+from metadata.generated.schema.entity.services.connections.pipeline.domopipelineConnection import (
+    DomoPipelineConnection,
+)
 from metadata.generated.schema.entity.services.connections.pipeline.fivetranConnection import (
     FivetranConnection,
 )
@@ -987,11 +990,36 @@ def _(connection: DomoDashboardConnection) -> None:
     except Exception as exc:
         msg = f"Unknown error connecting with {connection}: {exc}."
         raise SourceConnectionException(msg)
-    return DomoDashboardClient(domo)
+    return DomoClient(domo)
 
 
 @test_connection.register
-def _(connection: DomoDashboardClient) -> None:
+def _(connection: DomoClient) -> None:
+    try:
+        connection.client.page_list()
+    except Exception as exc:
+        msg = f"Unknown error connecting with {connection}: {exc}."
+        raise SourceConnectionException(msg)
+
+
+@get_connection.register
+def _(connection: DomoPipelineConnection) -> None:
+    from pydomo import Domo
+
+    try:
+        domo = Domo(
+            connection.clientId,
+            connection.secretToken.get_secret_value(),
+            api_host=connection.apiHost,
+        )
+    except Exception as exc:
+        msg = f"Unknown error connecting with {connection}: {exc}."
+        raise SourceConnectionException(msg)
+    return DomoClient(domo)
+
+
+@test_connection.register
+def _(connection: DomoClient) -> None:
     try:
         connection.client.page_list()
     except Exception as exc:
         msg = f"Unknown error connecting with {connection}: {exc}."
         raise SourceConnectionException(msg)
diff --git a/ingestion/tests/unit/resources/datasets/domopipeline_dataset.json b/ingestion/tests/unit/resources/datasets/domopipeline_dataset.json
new file mode 100644
index 00000000000..814d904fc62
--- /dev/null
+++ b/ingestion/tests/unit/resources/datasets/domopipeline_dataset.json
@@ -0,0 +1,119 @@
+[
+  {
+    "id": 3,
+    "onboardFlowId": 1,
+    "previewRows": 0,
+    "dapDataFlowExecutionId": "9cc03a9d-124b-491e-8579-657fc51d497e",
+    "beginTime": 1665476783000,
+    "endTime": 1665476792000,
+    "lastUpdated": 1665476792000,
+    "failed": false,
+    "state": "SUCCESS",
+    "activationType": "MANUAL",
+    "dataProcessor": "MAGIC",
+    "inputDataSources": [
+      {
+        "dataSourceId": "2e41e76b-fc02-480d-a932-91bdbea40fe5",
+        "dataVersionId": 4,
+        "rowsRead": 100,
+        "bytesRead": 3200,
+        "downloadTime": 1053,
+        "onlyLoadNewVersions": false
+      }
+    ],
+    "outputDataSources": [
+      {
+        "dataSourceId": "dedf9fe6-2544-44b6-9129-d5c313b0ec67",
+        "dataVersionId": 3,
+        "rowsWritten": 100,
+        "bytesWritten": 3200,
+        "partsProcessed": null,
+        "versionChainType": null
+      }
+    ],
+    "totalBytesWritten": 3200,
+    "dataFlowVersion": 2,
+    "totalRowsRead": 100,
+    "totalBytesRead": 3200,
+    "meanDownloadRateKbps": 2.9677113010446345,
+    "totalRowsWritten": 100
+  },
+  {
+    "id": 2,
+    "onboardFlowId": 1,
+    "previewRows": 0,
+    "dapDataFlowExecutionId": "6982708f-a5a6-4535-93b0-44bf812c672a",
+    "beginTime": 1665470244000,
+    "endTime": 1665470252000,
+    "lastUpdated": 1665470252000,
+    "failed": false,
+    "state": "SUCCESS",
+    "activationType": "MANUAL",
+    "dataProcessor": "MAGIC",
+    "inputDataSources": [
+      {
+        "dataSourceId": "2e41e76b-fc02-480d-a932-91bdbea40fe5",
+        "dataVersionId": 4,
+        "rowsRead": 100,
+        "bytesRead": 3200,
+        "downloadTime": 869,
+        "onlyLoadNewVersions": false
+      }
+    ],
+    "outputDataSources": [
+      {
+        "dataSourceId": "dedf9fe6-2544-44b6-9129-d5c313b0ec67",
+        "dataVersionId": 2,
+        "rowsWritten": 100,
+        "bytesWritten": 3200,
+        "partsProcessed": null,
+        "versionChainType": null
+      }
+    ],
+    "totalBytesWritten": 3200,
+    "dataFlowVersion": 1,
+    "totalRowsRead": 100,
+    "totalBytesRead": 3200,
+    "meanDownloadRateKbps": 3.5960874568469507,
+    "totalRowsWritten": 100
+  },
+  {
+    "id": 1,
+    "onboardFlowId": 1,
+    "previewRows": 0,
+    "dapDataFlowExecutionId": "36bd19a5-2adb-47e8-8949-ad69fc389b30",
+    "beginTime": 1665148818000,
+    "endTime": 1665148827000,
+    "lastUpdated": 1665148827000,
+    "failed": false,
+    "state": "SUCCESS",
+    "activationType": "MANUAL",
+    "dataProcessor": "MAGIC",
+    "inputDataSources": [
+      {
+        "dataSourceId": "2e41e76b-fc02-480d-a932-91bdbea40fe5",
+        "dataVersionId": 1,
+        "rowsRead": 100,
+        "bytesRead": 3200,
+        "downloadTime": 1406,
+        "onlyLoadNewVersions": false
+      }
+    ],
+    "outputDataSources": [
+      {
+        "dataSourceId": "dedf9fe6-2544-44b6-9129-d5c313b0ec67",
+        "dataVersionId": 1,
+        "rowsWritten": 100,
+        "bytesWritten": 3200,
+        "partsProcessed": null,
+        "versionChainType": null
+      }
+    ],
+    "totalBytesWritten": 3200,
+    "dataFlowVersion": 1,
+    "totalRowsRead": 100,
+    "totalBytesRead": 3200,
+    "meanDownloadRateKbps": 2.2226173541963012,
+    "totalRowsWritten": 100
+  }
+]
\ No newline at end of file
diff --git a/ingestion/tests/unit/topology/dashboard/test_domodashboard.py b/ingestion/tests/unit/topology/dashboard/test_domodashboard.py
index 915a8bef661..12bf7f53830 100644
--- a/ingestion/tests/unit/topology/dashboard/test_domodashboard.py
+++ b/ingestion/tests/unit/topology/dashboard/test_domodashboard.py
@@ -103,7 +103,7 @@ EXPECTED_DASHBOARD = CreateDashboardRequest(
     extension=None,
 )
 
-EXPECTED_PIPELINES = [
+EXPECTED_DASHBOARDS = [
     CreateChartRequest(
         name="Top Salespeople",
         displayName="Top Salespeople",
@@ -112,7 +112,7 @@ EXPECTED_PIPELINES = [
             " Identify over-performers and understand the secrets to their success."
         ),
         chartType="Other",
-        chartUrl="https://domain.domo.com/page/552315335/kpi/details/1108771657",
+        chartUrl="https://domain.domo.com/page/552315335/kpis/details/1108771657",
         tables=None,
         tags=None,
         owner=None,
@@ -132,7 +132,7 @@ EXPECTED_PIPELINES = [
         displayName="Milan Datasets",
         description="",
         chartType="Other",
-        chartUrl="https://domain.domo.com/page/552315335/kpi/details/1985861713",
+        chartUrl="https://domain.domo.com/page/552315335/kpis/details/1985861713",
         tables=None,
         tags=None,
         owner=None,
@@ -152,7 +152,7 @@ EXPECTED_PIPELINES = [
         displayName="Page Fans",
         description="",
         chartType="Other",
-        chartUrl="https://domain.domo.com/page/552315335/kpi/details/2025899139",
+        chartUrl="https://domain.domo.com/page/552315335/kpis/details/2025899139",
         tables=None,
         tags=None,
         owner=None,
@@ -205,9 +205,7 @@ class DomoDashboardUnitTest(TestCase):
             self.domodashboard.get_dashboard_name(MOCK_DASHBOARD) == mock_data["title"]
         )
 
-    @patch(
-        "metadata.clients.domodashboard_client.DomoDashboardClient.get_chart_details"
-    )
+    @patch("metadata.clients.domo_client.DomoClient.get_chart_details")
     def test_chart(self, get_chart_details):
         get_chart_details.return_value = mock_data
         results = self.domodashboard.yield_dashboard_chart(MOCK_DASHBOARD)
@@ -215,5 +213,5 @@ class DomoDashboardUnitTest(TestCase):
         for result in results:
             if isinstance(result, CreateChartRequest):
                 chart_list.append(result)
-        for _, (expected, original) in enumerate(zip(EXPECTED_PIPELINES, chart_list)):
+        for _, (expected, original) in enumerate(zip(EXPECTED_DASHBOARDS, chart_list)):
             self.assertEqual(expected, original)
diff --git a/ingestion/tests/unit/topology/pipeline/test_domopipeline.py b/ingestion/tests/unit/topology/pipeline/test_domopipeline.py
new file mode 100644
index 00000000000..a8ea8be522f
--- /dev/null
+++ b/ingestion/tests/unit/topology/pipeline/test_domopipeline.py
@@ -0,0 +1,291 @@
+"""
+Test Domo Pipeline using the topology
+"""
+
+import json
+from datetime import datetime, timezone
+from pathlib import Path
+from unittest import TestCase
+from unittest.mock import patch
+
+from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
+from metadata.generated.schema.entity.data.pipeline import (
+    Pipeline,
+    PipelineStatus,
+    Task,
+    TaskStatus,
+)
+from metadata.generated.schema.entity.services.pipelineService import (
+    PipelineConnection,
+    PipelineService,
+    PipelineServiceType,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
+from metadata.ingestion.source.pipeline.domopipeline import DomopipelineSource
+
+mock_file_path = (
+    Path(__file__).parent.parent.parent / "resources/datasets/domopipeline_dataset.json"
+)
+with open(mock_file_path, encoding="UTF-8") as file:
+    mock_data: dict = json.load(file)
+
+MOCK_PIPELINE_SERVICE = PipelineService(
+    id="86ff3c40-7c51-4ff5-9727-738cead28d9a",
+    name="domopipeline_source_test",
+    connection=PipelineConnection(),
+    serviceType=PipelineServiceType.DomoPipeline,
+)
+
+MOCK_PIPELINE = Pipeline(
+    id="a58b1856-729c-493b-bc87-6d2269b43ec0",
+    name="do_it_all_with_default_config",
+    fullyQualifiedName="local_domo_pipeline.Nihar Dataflows",
+    displayName="do_it_all_with_default_config",
+    description="",
+    tasks=[
+        Task(
+            name="a58b1856-729c-493b-bc87-6d2269b43ec0",
+            displayName="do_it_all_with_default_config",
+            description="",
+            taskUrl="",
+        )
+    ],
+    service=EntityReference(
+        id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
+    ),
+)
+
+mock_domopipeline_config = {
+    "source": {
+        "type": "domopipeline",
+        "serviceName": "test2",
+        "serviceConnection": {
+            "config": {
+                "type": "DomoPipeline",
+                "clientId": "00000",
+                "secretToken": "abcdefg",
+                "accessToken": "accessTpokem",
+                "apiHost": "api.domo.com",
+                "sandboxDomain": "https://domain.domo.com",
+            }
+        },
+        "sourceConfig": {
+            "config": {"dashboardFilterPattern": {}, "chartFilterPattern": {}}
+        },
+    },
+    "sink": {"type": "metadata-rest", "config": {}},
+    "workflowConfig": {
+        "openMetadataServerConfig": {
+            "hostPort": "http://localhost:8585/api",
+            "authProvider": "openmetadata",
+            "securityConfig": {
+                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
+                "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
+                "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
+                "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
+                "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
+                "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
+            },
+        }
+    },
+}
+
+
+EXPECTED_PIPELINE_STATUS = [
+    OMetaPipelineStatus(
+        pipeline_fqn="local_domo_pipeline.Nihar Dataflows",
+        pipeline_status=PipelineStatus(
+            timestamp=1665476792,
+            executionStatus="Successful",
+            taskStatus=[
+                TaskStatus(
+                    name="Nihar Dataflows",
+                    executionStatus="Successful",
+                    startTime=1665476783,
+                    endTime=1665476792,
+                    logLink=None,
+                )
+            ],
+        ),
+    ),
+    OMetaPipelineStatus(
+        pipeline_fqn="local_domo_pipeline.Nihar Dataflows",
+        pipeline_status=PipelineStatus(
+            timestamp=1665470252,
+            executionStatus="Successful",
+            taskStatus=[
+                TaskStatus(
+                    name="Nihar Dataflows",
+                    executionStatus="Successful",
+                    startTime=1665470244,
+                    endTime=1665470252,
+                    logLink=None,
+                )
+            ],
+        ),
+    ),
+    OMetaPipelineStatus(
+        pipeline_fqn="local_domo_pipeline.Nihar Dataflows",
+        pipeline_status=PipelineStatus(
+            timestamp=1665148827,
+            executionStatus="Successful",
+            taskStatus=[
+                TaskStatus(
+                    name="Nihar Dataflows",
+                    executionStatus="Successful",
+                    startTime=1665148818,
+                    endTime=1665148827,
+                    logLink=None,
+                )
+            ],
+        ),
+    ),
+]
+
+EXPECTED_PIPELINE = [
+    CreatePipelineRequest(
+        name="Nihar Dataflows",
+        displayName=None,
+        description="This is description for Nihar dataflow",
+        pipelineUrl=None,
+        concurrency=None,
+        pipelineLocation=None,
+        startDate=datetime(2022, 10, 7, 13, 20, 16, tzinfo=timezone.utc),
+        tasks=[
+            Task(
+                name="Nihar Dataflows",
+                displayName="Nihar Dataflows",
+                fullyQualifiedName=None,
+                description="This is description for Nihar dataflow",
+                taskUrl=None,
+                downstreamTasks=None,
+                taskType=None,
+                taskSQL=None,
+                startDate=None,
+                endDate=None,
+                tags=None,
+            )
+        ],
+        tags=None,
+        owner=None,
+        service=EntityReference(
+            id="86ff3c40-7c51-4ff5-9727-738cead28d9a",
+            type="pipelineService",
+            name=None,
+            fullyQualifiedName=None,
+            description=None,
+            displayName=None,
+            deleted=None,
+            href=None,
+        ),
+        extension=None,
+    )
+]
+
+MOCK_PIPELINE_DETAILS = {
+    "id": 1,
+    "name": "Nihar Dataflows",
+    "description": "This is description for Nihar dataflow",
+    "dapDataFlowId": "06996c5f-20ec-4814-8309-2aeb8028875f",
+    "responsibleUserId": 1027954122,
+    "runState": "ENABLED",
+    "lastExecution": {
+        "id": 3,
+        "onboardFlowId": 1,
+        "previewRows": 0,
+        "dapDataFlowExecutionId": "9cc03a9d-124b-491e-8579-657fc51d497e",
+        "beginTime": 1665476783000,
+        "endTime": 1665476792000,
+        "lastUpdated": 1665476792000,
+        "failed": False,
+        "state": "SUCCESS",
+        "dataFlowVersion": 0,
+    },
+    "created": 1665148816000,
+    "modified": 1665470784000,
+    "engineProperties": {"kettle.mode": "STRICT"},
+    "inputs": [
+        {
+            "dataSourceId": "2e41e76b-fc02-480d-a932-91bdbea40fe5",
+            "executeFlowWhenUpdated": False,
+            "dataSourceName": "Milan Datasets",
+        }
+    ],
+    "outputs": [
+        {
+            "onboardFlowId": None,
+            "dataSourceId": "dedf9fe6-2544-44b6-9129-d5c313b0ec67",
+            "dataSourceName": "Milan Output",
+            "versionChainType": "REPLACE",
+        }
+    ],
+    "executionCount": 3,
+    "executionSuccessCount": 3,
+    "hydrationState": "DEHYDRATED",
+    "useLegacyTriggerBehavior": False,
+    "passwordProtected": False,
+    "deleted": False,
+    "abandoned": False,
+    "neverAbandon": False,
+    "paused": False,
+    "magic": True,
+    "restricted": False,
+    "editable": True,
+    "enabled": True,
+    "container": False,
+    "databaseType": "MAGIC",
+    "triggeredByInput": False,
+    "draft": False,
+    "numInputs": 1,
+    "numOutputs": 1,
+}
+
+
+class DomoPipelineUnitTest(TestCase):
+    """
+    Implements the necessary tests to validate
+    the Domo Pipeline source
+    """
+
+    @patch("metadata.ingestion.source.pipeline.pipeline_service.test_connection")
+    @patch("pydomo.Domo")
+    def __init__(self, methodName, domo_client, test_connection) -> None:
+        super().__init__(methodName)
+        test_connection.return_value = False
+        domo_client.return_value = False
+        self.config = OpenMetadataWorkflowConfig.parse_obj(mock_domopipeline_config)
+        self.domopipeline = DomopipelineSource.create(
+            mock_domopipeline_config["source"],
+            self.config.workflowConfig.openMetadataServerConfig,
+        )
+        self.domopipeline.context.__dict__["pipeline"] = MOCK_PIPELINE
+        self.domopipeline.context.__dict__["pipeline_service"] = MOCK_PIPELINE_SERVICE
+
+    @patch("metadata.clients.domo_client.DomoClient.get_runs")
+    def test_pipeline(self, get_runs):
+        get_runs.return_value = mock_data
+        results = self.domopipeline.yield_pipeline(MOCK_PIPELINE_DETAILS)
+        pipeline_list = []
+        for result in results:
+            if isinstance(result, CreatePipelineRequest):
+                pipeline_list.append(result)
+        for _, (expected, original) in enumerate(zip(EXPECTED_PIPELINE, pipeline_list)):
+            self.assertEqual(expected, original)
+
+    @patch("metadata.clients.domo_client.DomoClient.get_runs")
+    def test_yield_pipeline_status(self, get_runs):
+        get_runs.return_value = mock_data
+        pipeline_status_list = []
+        results = self.domopipeline.yield_pipeline_status(MOCK_PIPELINE_DETAILS)
+        for result in results:
+            if isinstance(result, OMetaPipelineStatus):
+                pipeline_status_list.append(result)
+
+        for _, (expected, original) in enumerate(
+            zip(EXPECTED_PIPELINE_STATUS, pipeline_status_list)
+        ):
+            self.assertEqual(expected, original)
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/domodashboardConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/domodashboardConnection.json
index 3833f48c679..f32736b3485 100644
--- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/domodashboardConnection.json
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/dashboard/domodashboardConnection.json
@@ -2,7 +2,7 @@
   "$id": "https://open-metadata.org/schema/entity/services/connections/dashboard/domodashboardConnection.json",
   "$schema": "http://json-schema.org/draft-07/schema#",
   "title": "DomoDashboardConnection",
-  "description": "DomoDasboard Connection Config",
+  "description": "Domo Dashboard Connection Config",
   "type": "object",
   "javaType": "org.openmetadata.schema.services.connections.dashboard.DomoDashboardConnection",
   "definitions": {
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/domopipelineConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/domopipelineConnection.json
new file mode 100644
index 00000000000..49314d0e3dc
--- /dev/null
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/domopipelineConnection.json
@@ -0,0 +1,61 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/domopipelineConnection.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "DomoPipelineConnection",
+  "description": "Domo Pipeline Connection Config",
+  "type": "object",
+  "javaType": "org.openmetadata.schema.services.connections.pipeline.DomoPipelineConnection",
+  "definitions": {
+    "DomoPipelineType": {
+      "description": "Service type.",
+      "type": "string",
+      "enum": ["DomoPipeline"],
+      "default": "DomoPipeline"
+    }
+  },
+  "properties": {
+    "type": {
+      "title": "Service Type",
+      "description": "Service Type",
+      "$ref": "#/definitions/DomoPipelineType",
+      "default": "DomoPipeline"
+    },
+    "clientId": {
+      "title": "Client ID",
+      "description": "Client ID for DOMO",
+      "type": "string"
+    },
+    "secretToken": {
+      "title": "Secret Token",
+      "description": "Secret Token to connect to DOMO",
+      "type": "string",
+      "format": "password"
+    },
+    "accessToken": {
+      "title": "Access Token",
+      "description": "Access token to connect to DOMO",
+      "type": "string"
+    },
+    "apiHost": {
+      "expose": true,
+      "title": "API Host",
+      "description": "API Host to connect to DOMO instance",
+      "default": "api.domo.com",
+      "type": "string",
+      "format": "string"
+    },
+    "sandboxDomain": {
+      "title": "Sandbox Domain",
+      "description": "Connect to Sandbox Domain",
+      "type": "string",
+      "format": "uri"
+    },
+    "supportsMetadataExtraction": {
+      "title": "Supports Metadata Extraction",
+      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
+    }
+  },
+  "additionalProperties": false,
+  "required": ["clientId", "secretToken", "sandboxDomain"]
+}
\ No newline at end of file
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json
index 81c0f1c0fe3..ec3b4782cfd 100644
--- a/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json
@@ -21,6 +21,7 @@
         "Fivetran",
         "Dagster",
         "Nifi",
+        "DomoPipeline",
         "CustomPipeline"
       ],
       "javaEnums": [
@@ -42,6 +43,9 @@
         {
           "name": "Nifi"
         },
+        {
+          "name": "DomoPipeline"
+        },
         {
           "name": "CustomPipeline"
         }
@@ -76,6 +80,9 @@
     {
       "$ref": "./connections/pipeline/nifiConnection.json"
     },
+    {
+      "$ref": "./connections/pipeline/domopipelineConnection.json"
+    },
     {
       "$ref": "./connections/pipeline/customPipelineConnection.json"
     }
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts
index e5f886d9b45..1566910a588 100644
--- a/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts
@@ -18,6 +18,7 @@ import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/a
 import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json';
 import customPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/customPipelineConnection.json';
 import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
+import domopipelineConnection from '../jsons/connectionSchemas/connections/pipeline/domopipelineConnection.json';
 import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
 import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
 import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
@@ -57,6 +58,11 @@ export const getPipelineConfig = (type: PipelineServiceType) => {
       break;
     }
 
+    case PipelineServiceType.DomoPipeline: {
+      schema = domopipelineConnection;
+
+      break;
+    }
     case PipelineServiceType.CustomPipeline: {
       schema = customPipelineConnection;