Added domo federated dataset support (#17061)

This commit is contained in:
Onkar Ravgan 2024-07-22 11:17:28 +05:30 committed by GitHub
parent 8d9ff41761
commit eafa6b8772
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 52 additions and 16 deletions

View File

@ -14,7 +14,7 @@ Domo Database source to extract metadata
""" """
import traceback import traceback
from typing import Any, Iterable, List, Optional, Tuple, Union from typing import Any, Iterable, Optional, Tuple, Union
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import ( from metadata.generated.schema.api.data.createDatabaseSchema import (
@ -57,6 +57,7 @@ from metadata.ingestion.source.database.database_service import DatabaseServiceS
from metadata.ingestion.source.database.domodatabase.models import ( from metadata.ingestion.source.database.domodatabase.models import (
OutputDataset, OutputDataset,
Owner, Owner,
Schema,
SchemaColumn, SchemaColumn,
User, User,
) )
@ -191,11 +192,7 @@ class DomodatabaseSource(DatabaseServiceSource):
try: try:
table_constraints = None table_constraints = None
table_object = OutputDataset(**self.domo_client.datasets.get(table_id)) table_object = OutputDataset(**self.domo_client.datasets.get(table_id))
columns = ( columns = self.get_columns(table_object)
self.get_columns(table_object.schemas.columns)
if table_object.columns
else []
)
table_request = CreateTableRequest( table_request = CreateTableRequest(
name=EntityName(table_object.name), name=EntityName(table_object.name),
displayName=table_object.name, displayName=table_object.name,
@ -228,19 +225,57 @@ class DomodatabaseSource(DatabaseServiceSource):
) )
) )
def get_columns(self, table_object: List[SchemaColumn]): def get_columns_from_federated_dataset(self, table_name: str, dataset_id: str):
"""
Method to retrieve the column metadata from federated datasets
"""
try:
# SQL query to get all columns without fetching any rows
sql_query = f'SELECT * FROM "{table_name}" LIMIT 1'
schema_columns = []
response = self.domo_client.datasets.query(dataset_id, sql_query)
if response:
for i, column_name in enumerate(response["columns"] or []):
schema_column = SchemaColumn(
name=column_name, type=response["metadata"][i]["type"]
)
schema_columns.append(schema_column)
if schema_columns:
return Schema(columns=schema_columns)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error while fetching columns from federated dataset {table_name} - {exc}"
)
return None
def get_columns(self, table_object: OutputDataset):
"""
Method to get domo table columns
"""
row_order = 1 row_order = 1
columns = [] columns = []
for column in table_object: if not table_object.schemas or not table_object.schemas.columns:
columns.append( table_object.schemas = self.get_columns_from_federated_dataset(
Column( table_name=table_object.name, dataset_id=table_object.id
name=ColumnName(column.name),
description=column.description,
dataType=column.type,
ordinalPosition=row_order,
)
) )
row_order += 1
for column in table_object.schemas.columns or []:
try:
columns.append(
Column(
name=ColumnName(column.name),
description=column.description,
dataType=column.type,
ordinalPosition=row_order,
)
)
row_order += 1
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error while fetching details of column {column} - {exc}"
)
return columns return columns
def yield_tag( def yield_tag(

View File

@ -26,6 +26,7 @@ class DomoDatabaseBaseModel(BaseModel):
class User(DomoDatabaseBaseModel): class User(DomoDatabaseBaseModel):
id: int
email: str email: str
role: str role: str