diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py index 4cb00ebf158..164305e5f1c 100644 --- a/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/metadata.py @@ -14,7 +14,7 @@ Domo Database source to extract metadata """ import traceback -from typing import Any, Iterable, List, Optional, Tuple, Union +from typing import Any, Iterable, Optional, Tuple, Union from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( @@ -57,6 +57,7 @@ from metadata.ingestion.source.database.database_service import DatabaseServiceS from metadata.ingestion.source.database.domodatabase.models import ( OutputDataset, Owner, + Schema, SchemaColumn, User, ) @@ -191,11 +192,7 @@ class DomodatabaseSource(DatabaseServiceSource): try: table_constraints = None table_object = OutputDataset(**self.domo_client.datasets.get(table_id)) - columns = ( - self.get_columns(table_object.schemas.columns) - if table_object.columns - else [] - ) + columns = self.get_columns(table_object) table_request = CreateTableRequest( name=EntityName(table_object.name), displayName=table_object.name, @@ -228,19 +225,57 @@ class DomodatabaseSource(DatabaseServiceSource): ) ) - def get_columns(self, table_object: List[SchemaColumn]): + def get_columns_from_federated_dataset(self, table_name: str, dataset_id: str): + """ + Method to retrieve the column metadata from federated datasets + """ + try: + # SQL query to get all columns without fetching any rows + sql_query = f'SELECT * FROM "{table_name}" LIMIT 1' + schema_columns = [] + response = self.domo_client.datasets.query(dataset_id, sql_query) + if response: + for i, column_name in enumerate(response["columns"] or []): + schema_column = SchemaColumn( + name=column_name, type=response["metadata"][i]["type"] + ) + schema_columns.append(schema_column) + if schema_columns: + return Schema(columns=schema_columns) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error while fetching columns from federated dataset {table_name} - {exc}" + ) + return None + + def get_columns(self, table_object: OutputDataset): + """ + Method to get domo table columns + """ row_order = 1 columns = [] - for column in table_object: - columns.append( - Column( - name=ColumnName(column.name), - description=column.description, - dataType=column.type, - ordinalPosition=row_order, - ) + if not table_object.schemas or not table_object.schemas.columns: + table_object.schemas = self.get_columns_from_federated_dataset( + table_name=table_object.name, dataset_id=table_object.id ) - row_order += 1 + + for column in table_object.schemas.columns or []: + try: + columns.append( + Column( + name=ColumnName(column.name), + description=column.description, + dataType=column.type, + ordinalPosition=row_order, + ) + ) + row_order += 1 + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error while fetching details of column {column} - {exc}" + ) return columns def yield_tag( diff --git a/ingestion/src/metadata/ingestion/source/database/domodatabase/models.py b/ingestion/src/metadata/ingestion/source/database/domodatabase/models.py index ef4ce4e968d..f350cceeb01 100644 --- a/ingestion/src/metadata/ingestion/source/database/domodatabase/models.py +++ b/ingestion/src/metadata/ingestion/source/database/domodatabase/models.py @@ -26,6 +26,7 @@ class DomoDatabaseBaseModel(BaseModel): class User(DomoDatabaseBaseModel): + id: int email: str role: str