diff --git a/ingestion/src/metadata/ingestion/source/snowflake.py b/ingestion/src/metadata/ingestion/source/snowflake.py
index fc7f1bbfa2d..45be570e585 100644
--- a/ingestion/src/metadata/ingestion/source/snowflake.py
+++ b/ingestion/src/metadata/ingestion/source/snowflake.py
@@ -8,12 +8,14 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
-
+import logging
+import traceback
 from typing import Optional
 
 from snowflake.sqlalchemy.custom_types import VARIANT
 from snowflake.sqlalchemy.snowdialect import ischema_names
 
+from metadata.generated.schema.entity.data.table import TableData
 from metadata.generated.schema.entity.services.databaseService import (
     DatabaseServiceType,
 )
@@ -26,6 +28,8 @@ GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY")
 ischema_names["VARIANT"] = VARIANT
 ischema_names["GEOGRAPHY"] = GEOGRAPHY
+logger: logging.Logger = logging.getLogger(__name__)
+
 
 
 class SnowflakeConfig(SQLConnectionConfig):
     scheme = "snowflake"
@@ -53,6 +57,25 @@ class SnowflakeSource(SQLSource):
     def __init__(self, config, metadata_config, ctx):
         super().__init__(config, metadata_config, ctx)
 
+    def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]:
+        resp_sample_data = super().fetch_sample_data(schema, table)
+        if not resp_sample_data:
+            try:
+                logger.info("Using Table Name with quotes to fetch the data")
+                query = self.config.query.format(schema, f'"{table}"')
+                logger.info(query)
+                results = self.connection.execute(query)
+                cols = []
+                for col in results.keys():
+                    cols.append(col.replace(".", "_DOT_"))
+                rows = []
+                for res in results:
+                    row = list(res)
+                    rows.append(row)
+                return TableData(columns=cols, rows=rows)
+            except Exception as err:
+                logger.error(err)
+
     @classmethod
     def create(cls, config_dict, metadata_config_dict, ctx):
         config = SnowflakeConfig.parse_obj(config_dict)
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index d51ad7e86de..44502d864c0 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -29,6 +29,7 @@ from metadata.generated.schema.entity.data.table import (
     Constraint,
     ConstraintType,
     DataModel,
+    DataType,
     ModelType,
     Table,
     TableConstraint,
@@ -397,6 +398,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                 )
                 model_fqdn = f"{schema}.{model_name}"
             except Exception as err:
+                logger.debug(traceback.print_exc())
                 logger.error(err)
 
         self.data_models[model_fqdn] = model
@@ -562,8 +564,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                        col_type, column["type"]
                    )
                    if col_type == "NULL" or col_type is None:
-                        col_type = "VARCHAR"
-                        data_type_display = "varchar"
+                        col_type = DataType.VARCHAR.name
+                        data_type_display = col_type.lower()
                        logger.warning(
                            "Unknown type {} mapped to VARCHAR: {}".format(
                                repr(column["type"]), column["name"]
@@ -579,10 +581,10 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                    col_data_length = (
                        1 if col_data_length is None else col_data_length
                    )
-                    if col_type == "ARRAY":
-                        if arr_data_type is None:
-                            arr_data_type = "VARCHAR"
-                        dataTypeDisplay = col_type + "<" + arr_data_type + ">"
+                    if col_type == "ARRAY" and arr_data_type is None:
+                        arr_data_type = DataType.VARCHAR.name
+                        dataTypeDisplay = f"array<{arr_data_type}>"
+
                    om_column = Column(
                        name=column["name"],
                        description=column.get("comment", None),
@@ -640,7 +642,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                        ]
                    except Exception as err:
                        logger.debug(traceback.print_exc())
-                        logger.debug(err)
+                        logger.error(err)
+                        om_column = col_dict
                except Exception as err:
                    logger.debug(traceback.print_exc())
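
Note on the SnowflakeSource.fetch_sample_data fallback above: Snowflake folds unquoted identifiers to upper case, so a table created with a case-sensitive name is only reachable when the identifier is wrapped in double quotes. The sketch below is illustrative only and not part of the diff; the query template and the schema/table names are assumptions standing in for whatever SQLConnectionConfig.query is configured to.

# Minimal sketch of the quoted-identifier fallback, under assumed values.
query_template = "select * from {}.{} limit 50"   # assumption: the real template comes from self.config.query
schema, table = "PUBLIC", "MixedCaseTable"        # hypothetical names

# First attempt, as the parent SQLSource would issue it; Snowflake upper-cases the bare identifier.
print(query_template.format(schema, table))         # select * from PUBLIC.MixedCaseTable limit 50

# Fallback added in the diff: quoting preserves the exact case of the stored identifier.
print(query_template.format(schema, f'"{table}"'))  # select * from PUBLIC."MixedCaseTable" limit 50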