From c863fd9d90f1bd540ef7d991cd67aedffca5d886 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Fri, 22 Apr 2022 16:43:11 +0530 Subject: [PATCH] Modified snowflake, utils (#4364) --- docker/metadata/openmetadata-start.sh | 5 +++++ .../src/metadata/ingestion/source/snowflake.py | 15 ++++++++++----- .../src/metadata/ingestion/source/sql_source.py | 11 +++++++---- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/docker/metadata/openmetadata-start.sh b/docker/metadata/openmetadata-start.sh index 1bd0e857e81..9bb57cabdab 100644 --- a/docker/metadata/openmetadata-start.sh +++ b/docker/metadata/openmetadata-start.sh @@ -14,6 +14,11 @@ MYSQL="${MYSQL_HOST:-mysql}":"${MYSQL_PORT:-3306}" while ! wget -O /dev/null -o /dev/null "${MYSQL}"; do echo "Trying to connect to ${MYSQL}"; sleep 5; done +ELASTICSEARCH="${ELASTICSEARCH_HOST:-elasticsearch}":"${ELASTICSEARCH_PORT:-9200}" +while ! wget -O /dev/null -o /dev/null "${ELASTICSEARCH}"; + do echo "Trying to connect to ${ELASTICSEARCH}"; sleep 5; +done +sleep 5 cd /openmetadata-*/ ./bootstrap/bootstrap_storage.sh migrate-all ./bin/openmetadata-server-start.sh conf/openmetadata.yaml diff --git a/ingestion/src/metadata/ingestion/source/snowflake.py b/ingestion/src/metadata/ingestion/source/snowflake.py index 136ac543df8..71971a555de 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake.py +++ b/ingestion/src/metadata/ingestion/source/snowflake.py @@ -14,7 +14,6 @@ from typing import Iterable, Optional from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import dsa, rsa from snowflake.sqlalchemy.custom_types import VARIANT from snowflake.sqlalchemy.snowdialect import SnowflakeDialect, ischema_names from sqlalchemy.engine import reflection @@ -43,6 +42,13 @@ ischema_names["GEOGRAPHY"] = GEOGRAPHY logger: logging.Logger = logging.getLogger(__name__) +def normalize_names(self, name): + return name + + +SnowflakeDialect.normalize_name = normalize_names + + class SnowflakeSource(SQLSource): def __init__(self, config, metadata_config): connection_arguments = ( @@ -68,14 +74,12 @@ class SnowflakeSource(SQLSource): super().__init__(config, metadata_config) def get_databases(self) -> Iterable[Inspector]: - - if self.config.serviceConnection.__root__.config.database != None: + if self.config.serviceConnection.__root__.config.database: yield from super().get_databases() else: query = "SHOW DATABASES" results = self.connection.execute(query) for res in results: - row = list(res) use_db_query = f"USE DATABASE {row[1]}" self.connection.execute(use_db_query) @@ -88,7 +92,7 @@ class SnowflakeSource(SQLSource): if not resp_sample_data: try: logger.info("Using Table Name with quotes to fetch the data") - query = self.config.query.format(schema, f'"{table}"') + query = self.source_config.sampleDataQuery.format(schema, f'"{table}"') logger.info(query) results = self.connection.execute(query) cols = [] @@ -101,6 +105,7 @@ class SnowflakeSource(SQLSource): return TableData(columns=cols, rows=rows) except Exception as err: logger.error(err) + return resp_sample_data @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index d6fe257e889..9b3011ac7d0 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -390,7 +390,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): if self.source_config.includeViews: yield from self.fetch_views(inspector, schema) if self.source_config.markDeletedTables: - schema_fqdn = f"{self.config.serviceName}.{schema}" + schema_fqdn = f"{self.config.serviceName}.{self.service_connection.database}.{schema}" yield from self.delete_tables(schema_fqdn) except Exception as err: logger.debug(traceback.format_exc()) @@ -852,19 +852,22 @@ class SQLSource(Source[OMetaDatabaseAndTable]): and column["policy_tags"] ): self.metadata.create_primary_tag( - category_name=self.config.tag_category_name, + category_name=self.service_connection.tagCategoryName, primary_tag_body=CreateTagRequest( name=column["policy_tags"], description="Bigquery Policy Tag", ), ) except APIError: - if column["policy_tags"] and self.config.enable_policy_tags: + if ( + column["policy_tags"] + and self.service_connection.enablePolicyTagImport + ): col_dict.tags = [ TagLabel( tagFQN=get_fqdn( Tag, - self.config.tag_category_name, + self.service_connection.tagCategoryName, column["policy_tags"], ), labelType="Automated",