diff --git a/ingestion/examples/workflows/snowflake_usage.json b/ingestion/examples/workflows/snowflake_usage.json index da3f9b2115e..5855b98fa53 100644 --- a/ingestion/examples/workflows/snowflake_usage.json +++ b/ingestion/examples/workflows/snowflake_usage.json @@ -1,16 +1,22 @@ { "source": { "type": "snowflake-usage", - "config": { - "host_port": "account.region.service.snowflakecomputing.com", - "username": "username", - "password": "strong_password", - "database": "SNOWFLAKE_SAMPLE_DATA", - "account": "account_name", - "warehouse": "COMPUTE_WH", - "service_name": "snowflake", - "result_limit": 1000, - "duration": 2 + "serviceName": "snowflake", + "serviceConnection": { + "config": { + "type": "Snowflake", + "hostPort": "account.region.service.snowflakecomputing.com", + "username": "username", + "password": "strong_password", + "database": "SNOWFLAKE_SAMPLE_DATA", + "warehouse": "COMPUTE_WH", + "account": "account_name" + } + }, + "sourceConfig": { + "config": { + "resultLimit":1000 + } } }, "processor": { @@ -25,17 +31,16 @@ "filename": "/tmp/snowflake_usage" } }, - "bulk_sink": { + "bulkSink": { "type": "metadata-usage", "config": { "filename": "/tmp/snowflake_usage" } }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } diff --git a/ingestion/src/metadata/ingestion/source/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/snowflake_usage.py index db632d0b74c..992f2c67401 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/snowflake_usage.py @@ -16,17 +16,22 @@ import traceback from datetime import timedelta from typing import Any, Dict, Iterable, Iterator, Union +from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( + SnowflakeConnection, +) from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus # This import verifies that the dependencies are available. from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.source.snowflake import SnowflakeConfig from metadata.ingestion.source.sql_alchemy_helper import ( SQLAlchemyHelper, SQLSourceStatus, @@ -75,18 +80,23 @@ class SnowflakeUsageSource(Source[TableQuery]): SERVICE_TYPE = DatabaseServiceType.Snowflake.value DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()" - def __init__(self, config, metadata_config): + def __init__( + self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig + ): super().__init__() self.config = config - start, end = get_start_and_end(config.duration) + self.service_connection = config.serviceConnection.__root__.config + start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) end = end + timedelta(days=1) self.analysis_date = start self.metadata_config = metadata_config self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format( - start_date=start, end_date=end, result_limit=self.config.result_limit + start_date=start, + end_date=end, + result_limit=self.config.sourceConfig.config.resultLimit, ) self.alchemy_helper = SQLAlchemyHelper( - config, metadata_config, "Snowflake", self.sql_stmt + self.service_connection, metadata_config, "Snowflake", self.sql_stmt ) self._extract_iter: Union[None, Iterator] = None self._database = "Snowflake" @@ -94,7 +104,12 @@ class SnowflakeUsageSource(Source[TableQuery]): @classmethod def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): - config = SnowflakeConfig.parse_obj(config_dict) + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: SnowflakeConnection = config.serviceConnection.__root__.config + if not isinstance(connection, SnowflakeConnection): + raise InvalidSourceException( + f"Expected SnowflakeConnection, but got {connection}" + ) return cls(config, metadata_config) def prepare(self): @@ -126,7 +141,7 @@ class SnowflakeUsageSource(Source[TableQuery]): aborted="1969" in str(row["end_time"]), database=row["database_name"], sql=row["query_text"], - service_name=self.config.service_name, + service_name=self.config.serviceName, ) logger.debug(f"Parsed Query: {row['query_text']}") if row["schema_name"] is not None: @@ -138,7 +153,7 @@ class SnowflakeUsageSource(Source[TableQuery]): "sql": table_query.sql, "from_type": "table", "to_type": "table", - "service_name": self.config.service_name, + "service_name": self.config.serviceName, } ingest_lineage(query_info, self.metadata_config) except Exception as err: