Fix domo pipeline test connection (#9863)

This commit is contained in:
Mayur Singal 2023-01-24 12:23:47 +05:30 committed by GitHub
parent d15d96ef1a
commit d0191da8ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 16 deletions

View File

@ -14,6 +14,7 @@ Source connection handler
"""
from pydomo import Domo
from metadata.clients.domo_client import DomoClient
from metadata.generated.schema.entity.services.connections.pipeline.domoPipelineConnection import (
DomoPipelineConnection,
)
@ -25,23 +26,18 @@ def get_connection(connection: DomoPipelineConnection) -> Domo:
Create connection
"""
try:
domo = Domo(
connection.clientId,
connection.secretToken.get_secret_value(),
api_host=connection.apiHost,
)
return domo
return DomoClient(connection)
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg)
def test_connection(domo: Domo) -> None:
def test_connection(connection: Domo) -> None:
"""
Test connection
"""
try:
domo.streams.list()
connection.get_pipelines()
except Exception as exc:
msg = f"Unknown error connecting with {domo}: {exc}."
msg = f"Unknown error while extracting pipeline from domo: {exc}."
raise SourceConnectionException(msg)

View File

@ -17,7 +17,6 @@ from typing import Dict, Iterable, Optional
from pydantic import ValidationError
from metadata.clients.domo_client import DomoClient
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import (
@ -58,10 +57,6 @@ class DomopipelineSource(PipelineServiceSource):
config: WorkflowSource
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.domo_client = DomoClient(self.service_connection)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config = WorkflowSource.parse_obj(config_dict)
@ -76,7 +71,7 @@ class DomopipelineSource(PipelineServiceSource):
return pipeline_details["name"]
def get_pipelines_list(self) -> Dict:
results = self.domo_client.get_pipelines()
results = self.connection.get_pipelines()
for result in results:
yield result
@ -131,7 +126,7 @@ class DomopipelineSource(PipelineServiceSource):
)
return None
runs = self.domo_client.get_runs(pipeline_id)
runs = self.connection.get_runs(pipeline_id)
try:
for run in runs or []: