Snowflake usage fix (#3991)

* Snowflke Usage Fix

* Snowflke Usage Fix
This commit is contained in:
Mayur Singal 2022-04-09 21:43:02 +05:30 committed by GitHub
parent 0b44bbdde1
commit bb90df0e60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 25 deletions

View File

@ -1,16 +1,22 @@
{ {
"source": { "source": {
"type": "snowflake-usage", "type": "snowflake-usage",
"config": { "serviceName": "snowflake",
"host_port": "account.region.service.snowflakecomputing.com", "serviceConnection": {
"username": "username", "config": {
"password": "strong_password", "type": "Snowflake",
"database": "SNOWFLAKE_SAMPLE_DATA", "hostPort": "account.region.service.snowflakecomputing.com",
"account": "account_name", "username": "username",
"warehouse": "COMPUTE_WH", "password": "strong_password",
"service_name": "snowflake", "database": "SNOWFLAKE_SAMPLE_DATA",
"result_limit": 1000, "warehouse": "COMPUTE_WH",
"duration": 2 "account": "account_name"
}
},
"sourceConfig": {
"config": {
"resultLimit":1000
}
} }
}, },
"processor": { "processor": {
@ -25,17 +31,16 @@
"filename": "/tmp/snowflake_usage" "filename": "/tmp/snowflake_usage"
} }
}, },
"bulk_sink": { "bulkSink": {
"type": "metadata-usage", "type": "metadata-usage",
"config": { "config": {
"filename": "/tmp/snowflake_usage" "filename": "/tmp/snowflake_usage"
} }
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@ -16,17 +16,22 @@ import traceback
from datetime import timedelta from datetime import timedelta
from typing import Any, Dict, Iterable, Iterator, Union from typing import Any, Dict, Iterable, Iterator, Union
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
)
from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType, DatabaseServiceType,
) )
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.ingestion.api.source import Source, SourceStatus from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
# This import verifies that the dependencies are available. # This import verifies that the dependencies are available.
from metadata.ingestion.models.table_queries import TableQuery from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.snowflake import SnowflakeConfig
from metadata.ingestion.source.sql_alchemy_helper import ( from metadata.ingestion.source.sql_alchemy_helper import (
SQLAlchemyHelper, SQLAlchemyHelper,
SQLSourceStatus, SQLSourceStatus,
@ -75,18 +80,23 @@ class SnowflakeUsageSource(Source[TableQuery]):
SERVICE_TYPE = DatabaseServiceType.Snowflake.value SERVICE_TYPE = DatabaseServiceType.Snowflake.value
DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()" DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()"
def __init__(self, config, metadata_config): def __init__(
self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig
):
super().__init__() super().__init__()
self.config = config self.config = config
start, end = get_start_and_end(config.duration) self.service_connection = config.serviceConnection.__root__.config
start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
end = end + timedelta(days=1) end = end + timedelta(days=1)
self.analysis_date = start self.analysis_date = start
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format( self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format(
start_date=start, end_date=end, result_limit=self.config.result_limit start_date=start,
end_date=end,
result_limit=self.config.sourceConfig.config.resultLimit,
) )
self.alchemy_helper = SQLAlchemyHelper( self.alchemy_helper = SQLAlchemyHelper(
config, metadata_config, "Snowflake", self.sql_stmt self.service_connection, metadata_config, "Snowflake", self.sql_stmt
) )
self._extract_iter: Union[None, Iterator] = None self._extract_iter: Union[None, Iterator] = None
self._database = "Snowflake" self._database = "Snowflake"
@ -94,7 +104,12 @@ class SnowflakeUsageSource(Source[TableQuery]):
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
config = SnowflakeConfig.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: SnowflakeConnection = config.serviceConnection.__root__.config
if not isinstance(connection, SnowflakeConnection):
raise InvalidSourceException(
f"Expected SnowflakeConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):
@ -126,7 +141,7 @@ class SnowflakeUsageSource(Source[TableQuery]):
aborted="1969" in str(row["end_time"]), aborted="1969" in str(row["end_time"]),
database=row["database_name"], database=row["database_name"],
sql=row["query_text"], sql=row["query_text"],
service_name=self.config.service_name, service_name=self.config.serviceName,
) )
logger.debug(f"Parsed Query: {row['query_text']}") logger.debug(f"Parsed Query: {row['query_text']}")
if row["schema_name"] is not None: if row["schema_name"] is not None:
@ -138,7 +153,7 @@ class SnowflakeUsageSource(Source[TableQuery]):
"sql": table_query.sql, "sql": table_query.sql,
"from_type": "table", "from_type": "table",
"to_type": "table", "to_type": "table",
"service_name": self.config.service_name, "service_name": self.config.serviceName,
} }
ingest_lineage(query_info, self.metadata_config) ingest_lineage(query_info, self.metadata_config)
except Exception as err: except Exception as err: