Modified snowflake, utils (#4364)

This commit is contained in:
Ayush Shah 2022-04-22 16:43:11 +05:30 committed by GitHub
parent a94bb5fc8e
commit c863fd9d90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 9 deletions

View File

@ -14,6 +14,11 @@ MYSQL="${MYSQL_HOST:-mysql}":"${MYSQL_PORT:-3306}"
while ! wget -O /dev/null -o /dev/null "${MYSQL}";
do echo "Trying to connect to ${MYSQL}"; sleep 5;
done
ELASTICSEARCH="${ELASTICSEARCH_HOST:-elasticsearch}":"${ELASTICSEARCH_PORT:-9200}"
while ! wget -O /dev/null -o /dev/null "${ELASTICSEARCH}";
do echo "Trying to connect to ${ELASTICSEARCH}"; sleep 5;
done
sleep 5
cd /openmetadata-*/
./bootstrap/bootstrap_storage.sh migrate-all
./bin/openmetadata-server-start.sh conf/openmetadata.yaml

View File

@ -14,7 +14,6 @@ from typing import Iterable, Optional
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import dsa, rsa
from snowflake.sqlalchemy.custom_types import VARIANT
from snowflake.sqlalchemy.snowdialect import SnowflakeDialect, ischema_names
from sqlalchemy.engine import reflection
@ -43,6 +42,13 @@ ischema_names["GEOGRAPHY"] = GEOGRAPHY
logger: logging.Logger = logging.getLogger(__name__)
def normalize_names(self, name):
return name
SnowflakeDialect.normalize_name = normalize_names
class SnowflakeSource(SQLSource):
def __init__(self, config, metadata_config):
connection_arguments = (
@ -68,14 +74,12 @@ class SnowflakeSource(SQLSource):
super().__init__(config, metadata_config)
def get_databases(self) -> Iterable[Inspector]:
if self.config.serviceConnection.__root__.config.database != None:
if self.config.serviceConnection.__root__.config.database:
yield from super().get_databases()
else:
query = "SHOW DATABASES"
results = self.connection.execute(query)
for res in results:
row = list(res)
use_db_query = f"USE DATABASE {row[1]}"
self.connection.execute(use_db_query)
@ -88,7 +92,7 @@ class SnowflakeSource(SQLSource):
if not resp_sample_data:
try:
logger.info("Using Table Name with quotes to fetch the data")
query = self.config.query.format(schema, f'"{table}"')
query = self.source_config.sampleDataQuery.format(schema, f'"{table}"')
logger.info(query)
results = self.connection.execute(query)
cols = []
@ -101,6 +105,7 @@ class SnowflakeSource(SQLSource):
return TableData(columns=cols, rows=rows)
except Exception as err:
logger.error(err)
return resp_sample_data
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):

View File

@ -390,7 +390,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
if self.source_config.includeViews:
yield from self.fetch_views(inspector, schema)
if self.source_config.markDeletedTables:
schema_fqdn = f"{self.config.serviceName}.{schema}"
schema_fqdn = f"{self.config.serviceName}.{self.service_connection.database}.{schema}"
yield from self.delete_tables(schema_fqdn)
except Exception as err:
logger.debug(traceback.format_exc())
@ -852,19 +852,22 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
and column["policy_tags"]
):
self.metadata.create_primary_tag(
category_name=self.config.tag_category_name,
category_name=self.service_connection.tagCategoryName,
primary_tag_body=CreateTagRequest(
name=column["policy_tags"],
description="Bigquery Policy Tag",
),
)
except APIError:
if column["policy_tags"] and self.config.enable_policy_tags:
if (
column["policy_tags"]
and self.service_connection.enablePolicyTagImport
):
col_dict.tags = [
TagLabel(
tagFQN=get_fqdn(
Tag,
self.config.tag_category_name,
self.service_connection.tagCategoryName,
column["policy_tags"],
),
labelType="Automated",