Added DOMO Pipeline (#8133)

This commit is contained in:
NiharDoshi99 2022-10-18 17:52:28 +05:30 committed by GitHub
parent a0f44c5ac2
commit 84a89d7181
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 676 additions and 27 deletions

View File

@ -129,7 +129,7 @@ class DagsterClient:
@dataclass
class DomoDashboardClient:
class DomoClient:
def __init__(self, client) -> None:
self.client = client

View File

@ -8,38 +8,42 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DomoDashboard Client source to extract DOMO metadata
DomoClient source to extract data from DOMO
"""
from typing import List
from typing import List, Union
from metadata.generated.schema.entity.services.connections.dashboard.domodashboardConnection import (
DomoDashboardConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.domopipelineConnection import (
DomoPipelineConnection,
)
from metadata.ingestion.ometa.client import REST, ClientConfig
CARDS_URL = (
"cards?includeV4PageLayouts=true&parts=metadata,datasources,library,drillPathURNs"
",owners,certification,dateInfo,subscriptions,slicers"
"cards?includeV4PageLayouts=true&parts=metadata"
",datasources,library,drillPathURNs,owners,certification,dateInfo,subscriptions,slicers"
)
HEADERS = {"Content-Type": "application/json"}
WORKFLOW_URL = "dataprocessing/v1/dataflows"
class DomoDashboardClient:
class DomoClient:
"""
Implements the necessary methods to extract
Dashboard metadata from Domo's metadata db
DOMO metadata from Domo's metadata db
"""
def __init__(self, config: DomoDashboardConnection):
def __init__(self, config: Union[DomoDashboardConnection, DomoPipelineConnection]):
self.config = config
HEADERS.update({"X-DOMO-Developer-Token": self.config.accessToken})
client_config: ClientConfig = ClientConfig(
base_url=self.config.sandboxDomain,
api_version="api/content/v3/",
api_version="api/",
auth_header="Authorization",
auth_token=lambda: ("no_token", 0),
)
@ -47,7 +51,7 @@ class DomoDashboardClient:
def get_chart_details(self, page_id) -> List[dict]:
url = (
f"stacks/{page_id}/"
f"content/v3/stacks/{page_id}/"
f"{CARDS_URL}"
f"&stackLoadContext=Page&stackLoadContextId={page_id}&stackLoadTrigger=page-view"
)
@ -55,3 +59,16 @@ class DomoDashboardClient:
method="GET", path=url, headers=HEADERS
)
return response
def get_pipelines(self):
response = self.client._request( # pylint: disable=protected-access
method="GET", path=WORKFLOW_URL, headers=HEADERS
)
return response
def get_runs(self, workflow_id):
url = f"dataprocessing/v1/dataflows/{workflow_id}/executions?limit=100&offset=0"
response = self.client._request( # pylint: disable=protected-access
method="GET", path=url, headers=HEADERS
)
return response

View File

@ -15,7 +15,7 @@ DomoDashboard source to extract metadata
import traceback
from typing import Any, Iterable, List, Optional
from metadata.clients.domodashboard_client import DomoDashboardClient
from metadata.clients.domo_client import DomoClient
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -51,7 +51,7 @@ class DomodashboardSource(DashboardServiceSource):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.domo_client = self.connection.client
self.client = DomoDashboardClient(self.service_connection)
self.client = DomoClient(self.service_connection)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -102,7 +102,7 @@ class DomodashboardSource(DashboardServiceSource):
try:
chart_url = (
f"{self.service_connection.sandboxDomain}/page/"
f"{charts['id']}/{chart['type']}/details/{chart['id']}"
f"{charts['id']}/kpis/details/{chart['id']}"
)
if filter_by_chart(

View File

@ -0,0 +1,122 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Domo Pipeline source to extract metadata
"""
from typing import Dict, Iterable, List, Optional
from metadata.clients.domo_client import DomoClient
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import (
PipelineStatus,
StatusType,
Task,
TaskStatus,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.domopipelineConnection import (
DomoPipelineConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.source.pipeline.dagster import STATUS_MAP
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
class DomopipelineSource(PipelineServiceSource):
"""
Implements the necessary methods to extract
Pipeline metadata from Domo's metadata db
"""
config: WorkflowSource
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.domo_client = self.connection.client
self.client = DomoClient(self.service_connection)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config = WorkflowSource.parse_obj(config_dict)
connection: DomoPipelineConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DomoPipelineConnection):
raise InvalidSourceException(
f"Expected DomoPipelineConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_pipeline_name(self, pipeline_details) -> str:
return pipeline_details["name"]
def get_pipelines_list(self) -> Dict:
results = self.client.get_pipelines()
for result in results:
yield result
def yield_pipeline(self, pipeline_details) -> Iterable[CreatePipelineRequest]:
task_list: List[Task] = []
task = Task(
name=pipeline_details["name"],
displayName=pipeline_details["name"],
description=pipeline_details.get("description", ""),
)
task_list.append(task)
pipeline_yield = CreatePipelineRequest(
name=pipeline_details["name"],
description=pipeline_details.get("description", ""),
tasks=task_list,
service=EntityReference(
id=self.context.pipeline_service.id.__root__, type="pipelineService"
),
startDate=pipeline_details["created"],
)
yield pipeline_yield
def yield_pipeline_lineage_details(
self, pipeline_details
) -> Optional[Iterable[AddLineageRequest]]:
return
def yield_pipeline_status(self, pipeline_details) -> OMetaPipelineStatus:
runs = self.client.get_runs(pipeline_details["id"])
for run in runs:
task_status = TaskStatus(
name=pipeline_details["name"],
executionStatus=STATUS_MAP.get(
run["state"].lower(), StatusType.Pending.value
),
startTime=run["beginTime"] // 1000,
endTime=run["endTime"] // 1000,
)
pipeline_status = PipelineStatus(
taskStatus=[task_status],
executionStatus=STATUS_MAP.get(
run["state"].lower(), StatusType.Pending.value
),
timestamp=run["endTime"] // 1000,
)
yield OMetaPipelineStatus(
pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__,
pipeline_status=pipeline_status,
)

View File

@ -1,4 +1,4 @@
# Copyright 2021 Collate
# Copyright 2021 Collate #pylint: disable=too-many-lines
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@ -34,7 +34,7 @@ from metadata.clients.connection_clients import (
DagsterClient,
DatalakeClient,
DeltaLakeClient,
DomoDashboardClient,
DomoClient,
DynamoClient,
FivetranClient,
GlueDBClient,
@ -126,6 +126,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.backendConne
from metadata.generated.schema.entity.services.connections.pipeline.dagsterConnection import (
DagsterConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.domopipelineConnection import (
DomoPipelineConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.fivetranConnection import (
FivetranConnection,
)
@ -987,11 +990,36 @@ def _(connection: DomoDashboardConnection) -> None:
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
return DomoDashboardClient(domo)
return DomoClient(domo)
@test_connection.register
def _(connection: DomoDashboardClient) -> None:
def _(connection: DomoClient) -> None:
try:
connection.client.page_list()
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
@get_connection.register
def _(connection: DomoPipelineConnection) -> None:
from pydomo import Domo
try:
domo = Domo(
connection.clientId,
connection.secretToken.get_secret_value(),
api_host=connection.apiHost,
)
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
return DomoClient(domo)
@test_connection.register
def _(connection: DomoClient) -> None:
try:
connection.client.page_list()
except Exception as exc:

View File

@ -0,0 +1,119 @@
[
{
"id": 3,
"onboardFlowId": 1,
"previewRows": 0,
"dapDataFlowExecutionId": "9cc03a9d-124b-491e-8579-657fc51d497e",
"beginTime": 1665476783000,
"endTime": 1665476792000,
"lastUpdated": 1665476792000,
"failed": false,
"state": "SUCCESS",
"activationType": "MANUAL",
"dataProcessor": "MAGIC",
"inputDataSources": [
{
"dataSourceId": "2e41e76b-fc02-480d-a932-91bdbea40fe5",
"dataVersionId": 4,
"rowsRead": 100,
"bytesRead": 3200,
"downloadTime": 1053,
"onlyLoadNewVersions": false
}
],
"outputDataSources": [
{
"dataSourceId": "dedf9fe6-2544-44b6-9129-d5c313b0ec67",
"dataVersionId": 3,
"rowsWritten": 100,
"bytesWritten": 3200,
"partsProcessed": null,
"versionChainType": null
}
],
"totalBytesWritten": 3200,
"dataFlowVersion": 2,
"totalRowsRead": 100,
"totalBytesRead": 3200,
"meanDownloadRateKbps": 2.9677113010446345,
"totalRowsWritten": 100
},
{
"id": 2,
"onboardFlowId": 1,
"previewRows": 0,
"dapDataFlowExecutionId": "6982708f-a5a6-4535-93b0-44bf812c672a",
"beginTime": 1665470244000,
"endTime": 1665470252000,
"lastUpdated": 1665470252000,
"failed": false,
"state": "SUCCESS",
"activationType": "MANUAL",
"dataProcessor": "MAGIC",
"inputDataSources": [
{
"dataSourceId": "2e41e76b-fc02-480d-a932-91bdbea40fe5",
"dataVersionId": 4,
"rowsRead": 100,
"bytesRead": 3200,
"downloadTime": 869,
"onlyLoadNewVersions": false
}
],
"outputDataSources": [
{
"dataSourceId": "dedf9fe6-2544-44b6-9129-d5c313b0ec67",
"dataVersionId": 2,
"rowsWritten": 100,
"bytesWritten": 3200,
"partsProcessed": null,
"versionChainType": null
}
],
"totalBytesWritten": 3200,
"dataFlowVersion": 1,
"totalRowsRead": 100,
"totalBytesRead": 3200,
"meanDownloadRateKbps": 3.5960874568469507,
"totalRowsWritten": 100
},
{
"id": 1,
"onboardFlowId": 1,
"previewRows": 0,
"dapDataFlowExecutionId": "36bd19a5-2adb-47e8-8949-ad69fc389b30",
"beginTime": 1665148818000,
"endTime": 1665148827000,
"lastUpdated": 1665148827000,
"failed": false,
"state": "SUCCESS",
"activationType": "MANUAL",
"dataProcessor": "MAGIC",
"inputDataSources": [
{
"dataSourceId": "2e41e76b-fc02-480d-a932-91bdbea40fe5",
"dataVersionId": 1,
"rowsRead": 100,
"bytesRead": 3200,
"downloadTime": 1406,
"onlyLoadNewVersions": false
}
],
"outputDataSources": [
{
"dataSourceId": "dedf9fe6-2544-44b6-9129-d5c313b0ec67",
"dataVersionId": 1,
"rowsWritten": 100,
"bytesWritten": 3200,
"partsProcessed": null,
"versionChainType": null
}
],
"totalBytesWritten": 3200,
"dataFlowVersion": 1,
"totalRowsRead": 100,
"totalBytesRead": 3200,
"meanDownloadRateKbps": 2.2226173541963012,
"totalRowsWritten": 100
}
]

View File

@ -103,7 +103,7 @@ EXPECTED_DASHBOARD = CreateDashboardRequest(
extension=None,
)
EXPECTED_PIPELINES = [
EXPECTED_DASHBOARDS = [
CreateChartRequest(
name="Top Salespeople",
displayName="Top Salespeople",
@ -112,7 +112,7 @@ EXPECTED_PIPELINES = [
" Identify over-performers and understand the secrets to their success."
),
chartType="Other",
chartUrl="https://domain.domo.com/page/552315335/kpi/details/1108771657",
chartUrl="https://domain.domo.com/page/552315335/kpis/details/1108771657",
tables=None,
tags=None,
owner=None,
@ -132,7 +132,7 @@ EXPECTED_PIPELINES = [
displayName="Milan Datasets",
description="",
chartType="Other",
chartUrl="https://domain.domo.com/page/552315335/kpi/details/1985861713",
chartUrl="https://domain.domo.com/page/552315335/kpis/details/1985861713",
tables=None,
tags=None,
owner=None,
@ -152,7 +152,7 @@ EXPECTED_PIPELINES = [
displayName="Page Fans",
description="",
chartType="Other",
chartUrl="https://domain.domo.com/page/552315335/kpi/details/2025899139",
chartUrl="https://domain.domo.com/page/552315335/kpis/details/2025899139",
tables=None,
tags=None,
owner=None,
@ -205,9 +205,7 @@ class DomoDashboardUnitTest(TestCase):
self.domodashboard.get_dashboard_name(MOCK_DASHBOARD) == mock_data["title"]
)
@patch(
"metadata.clients.domodashboard_client.DomoDashboardClient.get_chart_details"
)
@patch("metadata.clients.domo_client.DomoClient.get_chart_details")
def test_chart(self, get_chart_details):
get_chart_details.return_value = mock_data
results = self.domodashboard.yield_dashboard_chart(MOCK_DASHBOARD)
@ -215,5 +213,5 @@ class DomoDashboardUnitTest(TestCase):
for result in results:
if isinstance(result, CreateChartRequest):
chart_list.append(result)
for _, (expected, original) in enumerate(zip(EXPECTED_PIPELINES, chart_list)):
for _, (expected, original) in enumerate(zip(EXPECTED_DASHBOARDS, chart_list)):
self.assertEqual(expected, original)

View File

@ -0,0 +1,291 @@
"""
Test Domo Dashboard using the topology
"""
import json
from datetime import datetime, timezone
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.entity.data.pipeline import (
Pipeline,
PipelineStatus,
Task,
TaskStatus,
)
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.source.pipeline.domopipeline import DomopipelineSource
mock_file_path = (
Path(__file__).parent.parent.parent / "resources/datasets/domopipeline_dataset.json"
)
with open(mock_file_path, encoding="UTF-8") as file:
mock_data: dict = json.load(file)
MOCK_PIPELINE_SERVICE = PipelineService(
id="86ff3c40-7c51-4ff5-9727-738cead28d9a",
name="domopipeline_source_test",
connection=PipelineConnection(),
serviceType=PipelineServiceType.DomoPipeline,
)
MOCK_PIPELINE = Pipeline(
id="a58b1856-729c-493b-bc87-6d2269b43ec0",
name="do_it_all_with_default_config",
fullyQualifiedName="local_domo_pipeline.Nihar Dataflows",
displayName="do_it_all_with_default_config",
description="",
tasks=[
Task(
name="a58b1856-729c-493b-bc87-6d2269b43ec0",
displayName="do_it_all_with_default_config",
description="",
taskUrl="",
)
],
service=EntityReference(
id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
),
)
mock_domopipeline_config = {
"source": {
"type": "domopipeline",
"serviceName": "test2",
"serviceConnection": {
"config": {
"type": "DomoPipeline",
"clientId": "00000",
"secretToken": "abcdefg",
"accessToken": "accessTpokem",
"apiHost": "api.domo.com",
"sandboxDomain": "https://domain.domo.com",
}
},
"sourceConfig": {
"config": {"dashboardFilterPattern": {}, "chartFilterPattern": {}}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
"iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
"2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
"iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
"r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
"d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
EXPECTED_PIPELINE_STATUS = [
OMetaPipelineStatus(
pipeline_fqn="local_domo_pipeline.Nihar Dataflows",
pipeline_status=PipelineStatus(
timestamp=1665476792,
executionStatus="Successful",
taskStatus=[
TaskStatus(
name="Nihar Dataflows",
executionStatus="Successful",
startTime=1665476783,
endTime=1665476792,
logLink=None,
)
],
),
),
OMetaPipelineStatus(
pipeline_fqn="local_domo_pipeline.Nihar Dataflows",
pipeline_status=PipelineStatus(
timestamp=1665470252,
executionStatus="Successful",
taskStatus=[
TaskStatus(
name="Nihar Dataflows",
executionStatus="Successful",
startTime=1665470244,
endTime=1665470252,
logLink=None,
)
],
),
),
OMetaPipelineStatus(
pipeline_fqn="local_domo_pipeline.Nihar Dataflows",
pipeline_status=PipelineStatus(
timestamp=1665148827,
executionStatus="Successful",
taskStatus=[
TaskStatus(
name="Nihar Dataflows",
executionStatus="Successful",
startTime=1665148818,
endTime=1665148827,
logLink=None,
)
],
),
),
]
EXPECTED_PIPELINE = [
CreatePipelineRequest(
name="Nihar Dataflows",
displayName=None,
description="THis is description for Nihar dataflow",
pipelineUrl=None,
concurrency=None,
pipelineLocation=None,
startDate=datetime(2022, 10, 7, 13, 20, 16, tzinfo=timezone.utc),
tasks=[
Task(
name="Nihar Dataflows",
displayName="Nihar Dataflows",
fullyQualifiedName=None,
description="THis is description for Nihar dataflow",
taskUrl=None,
downstreamTasks=None,
taskType=None,
taskSQL=None,
startDate=None,
endDate=None,
tags=None,
)
],
tags=None,
owner=None,
service=EntityReference(
id="86ff3c40-7c51-4ff5-9727-738cead28d9a",
type="pipelineService",
name=None,
fullyQualifiedName=None,
description=None,
displayName=None,
deleted=None,
href=None,
),
extension=None,
)
]
MOCK_PIPELINE_DETAILS = {
"id": 1,
"name": "Nihar Dataflows",
"description": "THis is description for Nihar dataflow",
"dapDataFlowId": "06996c5f-20ec-4814-8309-2aeb8028875f",
"responsibleUserId": 1027954122,
"runState": "ENABLED",
"lastExecution": {
"id": 3,
"onboardFlowId": 1,
"previewRows": 0,
"dapDataFlowExecutionId": "9cc03a9d-124b-491e-8579-657fc51d497e",
"beginTime": 1665476783000,
"endTime": 1665476792000,
"lastUpdated": 1665476792000,
"failed": False,
"state": "SUCCESS",
"dataFlowVersion": 0,
},
"created": 1665148816000,
"modified": 1665470784000,
"engineProperties": {"kettle.mode": "STRICT"},
"inputs": [
{
"dataSourceId": "2e41e76b-fc02-480d-a932-91bdbea40fe5",
"executeFlowWhenUpdated": False,
"dataSourceName": "Milan Datasets",
}
],
"outputs": [
{
"onboardFlowId": None,
"dataSourceId": "dedf9fe6-2544-44b6-9129-d5c313b0ec67",
"dataSourceName": "Milan Output",
"versionChainType": "REPLACE",
}
],
"executionCount": 3,
"executionSuccessCount": 3,
"hydrationState": "DEHYDRATED",
"useLegacyTriggerBehavior": False,
"passwordProtected": False,
"deleted": False,
"abandoned": False,
"neverAbandon": False,
"paused": False,
"magic": True,
"restricted": False,
"editable": True,
"enabled": True,
"container": False,
"databaseType": "MAGIC",
"triggeredByInput": False,
"draft": False,
"numInputs": 1,
"numOutputs": 1,
}
class DomoPipelineUnitTest(TestCase):
"""
Implements the necessary methods to extract
Domo Pipeline Unit Test
"""
@patch("metadata.ingestion.source.pipeline.pipeline_service.test_connection")
@patch("pydomo.Domo")
def __init__(self, methodName, domo_client, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
domo_client.return_value = False
self.config = OpenMetadataWorkflowConfig.parse_obj(mock_domopipeline_config)
self.domopipeline = DomopipelineSource.create(
mock_domopipeline_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
)
self.domopipeline.context.__dict__["pipeline"] = MOCK_PIPELINE
self.domopipeline.context.__dict__["pipeline_service"] = MOCK_PIPELINE_SERVICE
@patch("metadata.clients.domo_client.DomoClient.get_runs")
def test_pipeline(self, get_runs):
get_runs.return_value = mock_data
results = self.domopipeline.yield_pipeline(MOCK_PIPELINE_DETAILS)
pipeline_list = []
for result in results:
if isinstance(result, CreatePipelineRequest):
pipeline_list.append(result)
for _, (expected, original) in enumerate(zip(EXPECTED_PIPELINE, pipeline_list)):
self.assertEqual(expected, original)
@patch("metadata.clients.domo_client.DomoClient.get_runs")
def test_yield_pipeline_status(self, get_runs):
get_runs.return_value = mock_data
pipeline_status_list = []
results = self.domopipeline.yield_pipeline_status(MOCK_PIPELINE_DETAILS)
for result in results:
if isinstance(result, OMetaPipelineStatus):
pipeline_status_list.append(result)
for _, (expected, original) in enumerate(
zip(EXPECTED_PIPELINE_STATUS, pipeline_status_list)
):
self.assertEqual(expected, original)

View File

@ -2,7 +2,7 @@
"$id": "https://open-metadata.org/schema/entity/services/connections/dashboard/domodashboardConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DomoDashboardConnection",
"description": "DomoDasboard Connection Config",
"description": "Domo Dasboard Connection Config",
"type": "object",
"javaType": "org.openmetadata.schema.services.connections.dashboard.DomoDashboardConnection",
"definitions": {

View File

@ -0,0 +1,61 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/domopipelineConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DomoPipelineConnection",
"description": "Domo Pipeline Connection Config",
"type": "object",
"javaType": "org.openmetadata.schema.services.connections.pipeline.DomoPipelineConnection",
"definitions": {
"DomoPipelineType": {
"description": "Service type.",
"type": "string",
"enum": ["DomoPipeline"],
"default": "DomoPipeline"
}
},
"properties": {
"type": {
"title": "Service Type",
"description": "Service Type",
"$ref": "#/definitions/DomoPipelineType",
"default": "DomoPipeline"
},
"clientId": {
"title": "Client ID",
"description": "Client ID for DOMO",
"type": "string"
},
"secretToken": {
"title": "Secret Token",
"description": "Secret Token to connect DOMO",
"type": "string",
"format": "password"
},
"accessToken": {
"title": "Access Token",
"description": "Access to connecto to DOMO",
"type": "string"
},
"apiHost": {
"expose": true,
"title": "API Host",
"description": "API Host to connect to DOMO instance",
"default": "api.domo.com",
"type": "string",
"format": "string"
},
"sandboxDomain": {
"title": "Sandbox Domain",
"description": "Connect to Sandbox Domain",
"type": "string",
"format": "uri"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false,
"required": ["clientId","secretToken","sandboxDomain"]
}

View File

@ -21,6 +21,7 @@
"Fivetran",
"Dagster",
"Nifi",
"DomoPipeline",
"CustomPipeline"
],
"javaEnums": [
@ -42,6 +43,9 @@
{
"name": "Nifi"
},
{
"name": "DomoPipeline"
},
{
"name": "CustomPipeline"
}
@ -76,6 +80,9 @@
{
"$ref": "./connections/pipeline/nifiConnection.json"
},
{
"$ref": "./connections/pipeline/domopipelineConnection.json"
},
{
"$ref": "./connections/pipeline/customPipelineConnection.json"
}

View File

@ -18,6 +18,7 @@ import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/a
import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json';
import customPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/customPipelineConnection.json';
import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
import domopipelineConnection from '../jsons/connectionSchemas/connections/pipeline/domopipelineConnection.json';
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
@ -57,6 +58,11 @@ export const getPipelineConfig = (type: PipelineServiceType) => {
break;
}
case PipelineServiceType.DomoPipeline: {
schema = domopipelineConnection;
break;
}
case PipelineServiceType.CustomPipeline: {
schema = customPipelineConnection;