Optimize and fix snowflake + usage (#4862)

Optimize and fix snowflake + usage (#4862)
This commit is contained in:
Ayush Shah 2022-05-11 17:35:35 +05:30 committed by GitHub
parent 9a5c3c1fcc
commit ed0d65efd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 38 additions and 15 deletions

View File

@ -38,7 +38,7 @@ T = TypeVar("T", bound=BaseModel) # pylint: disable=invalid-name
class ESMixin(Generic[T]):
client: REST
es_url: str = "/search/query?q=service:{} {}&from={}&size={}&index={}"
es_url: str = "/search/query?q=service:{} AND {}&from={}&size={}&index={}"
def search_entities_using_es(
self, service_name, table_obj, search_index, from_count: int = 0, size: int = 10

View File

@ -42,7 +42,7 @@ from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.source.sql_source import SQLSource
from metadata.utils.column_type_parser import create_sqlalchemy_type
from metadata.utils.connections import get_connection
from metadata.utils.filters import filter_by_table
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.fqdn_generator import get_fqdn
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_queries import (
@ -69,6 +69,11 @@ class SnowflakeSource(SQLSource):
results = self.connection.execute(query)
for res in results:
row = list(res)
if filter_by_database(
self.source_config.databaseFilterPattern, database_name=row[1]
):
self.status.filter(row[1], "Database pattern not allowed")
continue
use_db_query = f"USE DATABASE {row[1]}"
self.connection.execute(use_db_query)
logger.info(f"Ingesting from database: {row[1]}")
@ -77,7 +82,6 @@ class SnowflakeSource(SQLSource):
yield inspect(self.engine)
def fetch_tags(self, schema, table_name: str, column_name: str = ""):
self.connection.execute(f"USE {self.service_connection.database}.{schema}")
try:
result = self.connection.execute(
FETCH_SNOWFLAKE_ALL_TAGS.format(table_name)
@ -143,7 +147,19 @@ class SnowflakeSource(SQLSource):
def next_record(self) -> Iterable[Entity]:
for inspector in self.get_databases():
yield from self.fetch_tables(inspector=inspector, schema="")
for schema in inspector.get_schema_names():
if filter_by_schema(
self.source_config.schemaFilterPattern, schema_name=schema
):
self.status.filter(
f"{self.config.serviceName}.{self.service_connection.database}.{schema}",
"{} pattern not allowed".format("Schema"),
)
continue
self.connection.execute(
f"USE {self.service_connection.database}.{schema}"
)
yield from self.fetch_tables(inspector=inspector, schema=schema)
def add_tags_to_table(self, schema: str, table_name: str, table_entity):
tag_category_list = self.fetch_tags(schema=schema, table_name=table_name)
@ -168,17 +184,19 @@ class SnowflakeSource(SQLSource):
inspector: Inspector,
schema: str,
) -> Iterable[Union[OMetaDatabaseAndTable, OMetaTagAndCategory]]:
entities = inspector.get_table_names()
for db, schema, table_name, entity_type, comment in entities:
entities = inspector.get_table_names(schema)
for table_name, entity_type, comment in entities:
try:
if filter_by_table(
self.source_config.tableFilterPattern, table_name=table_name
):
self.status.filter(
f"{self.config.serviceName}.{db}.{schema}.{table_name}",
f"{self.config.serviceName}.{self.service_connection.database}.{schema}.{table_name}",
"{} pattern not allowed".format(entity_type),
)
continue
if entity_type == "VIEW" and not self.source_config.includeViews:
continue
table_columns = self._get_columns(schema, table_name, inspector)
view_definition = inspector.get_view_definition(table_name, schema)
view_definition = (
@ -199,7 +217,7 @@ class SnowflakeSource(SQLSource):
table_data = self.fetch_sample_data(schema, table_name)
table_entity.sampleData = table_data
if self.source_config.enableDataProfiler:
profile = self.run_profiler(table=table_name, schema=schema)
profile = self.run_profiler(table=table_entity, schema=schema)
table_entity.tableProfile = [profile] if profile else None
database = self._get_database(self.service_connection.database)
table_schema_and_db = OMetaDatabaseAndTable(
@ -214,8 +232,8 @@ class SnowflakeSource(SQLSource):
logger.error(err)
def get_table_names(self, connection, schema=None, **kw):
result = connection.execute(FETCH_SNOWFLAKE_METADATA)
def get_table_names(self, connection, schema, **kw):
result = connection.execute(FETCH_SNOWFLAKE_METADATA.format(schema))
return result.fetchall()

View File

@ -21,6 +21,9 @@ from sqlalchemy import inspect
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
@ -55,7 +58,7 @@ class SnowflakeUsageSource(UsageSource):
SERVICE_TYPE = DatabaseServiceType.Snowflake.value
DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()"
def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
end = end + timedelta(days=1)
@ -91,7 +94,9 @@ class SnowflakeUsageSource(UsageSource):
logger.info(f"Ingesting from database: {row[1]}")
self.config.serviceConnection.__root__.config.database = row[1]
self.engine = get_connection(self.connection)
yield inspect(self.engine)
rows = self.engine.execute(self.sql_stmt)
for row in rows:
yield row
def next_record(self) -> Iterable[TableQuery]:
"""
@ -112,8 +117,8 @@ class SnowflakeUsageSource(UsageSource):
sql=row["query_text"],
service_name=self.config.serviceName,
)
if not row["database_name"] and self.service_connection.database:
TableQuery.database = self.service_connection.database
if not row["database_name"] and self.connection.database:
TableQuery.database = self.connection.database
logger.debug(f"Parsed Query: {row['query_text']}")
if row["schema_name"] is not None:
self.report.scanned(f"{row['database_name']}.{row['schema_name']}")

View File

@ -311,5 +311,5 @@ FETCH_SNOWFLAKE_ALL_TAGS = (
)
FETCH_SNOWFLAKE_METADATA = """
select TABLE_CATALOG,TABLE_SCHEMA,TABLE_NAME,TABLE_TYPE,COMMENT from information_schema.tables
select TABLE_NAME,TABLE_TYPE,COMMENT from information_schema.tables where TABLE_SCHEMA = '{}'
"""