diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
index 1d9098ff72..9b9cb4cb53 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
@@ -76,7 +76,7 @@ class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig):
         )
         include_table_lineage = values.get("include_table_lineage")

-        # TODO: Allow lineage extraction irrespective of basic schema extraction,
+        # TODO: Allow lineage extraction and profiling irrespective of basic schema extraction,
         # as it seems possible with some refractor
         if not include_technical_schema and any(
             [include_profiles, delete_detection_enabled, include_table_lineage]
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py
index d7cebdae94..195316db16 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py
@@ -46,7 +46,6 @@ class SnowflakeProfiler(SnowflakeCommonMixin):
             "max_overflow", self.config.profiling.max_workers
         )

-        # Otherwise, if column level profiling is enabled, use GE profiler.
         for db in databases:
             if not self.config.database_pattern.allowed(db.name):
                 continue
@@ -236,6 +235,7 @@ class SnowflakeProfiler(SnowflakeCommonMixin):
         if len(ge_profile_requests) == 0:
             return

+        # Otherwise, if column level profiling is enabled, use GE profiler.
        ge_profiler = self.get_profiler_instance(db_name)
        yield from ge_profiler.generate_profiles(
            ge_profile_requests, max_workers, platform, profiler_args
        )
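For context on the snowflake_config.py hunk above: the TODO sits directly over a validator guard that rejects configurations enabling profiling, stale-entity deletion, or lineage while `include_technical_schema` is off. Below is a minimal, runnable sketch of that guard pattern in pydantic v1 style (matching the codebase of the time); the flattened field names are simplified stand-ins for illustration, not the exact `SnowflakeV2Config` fields.

```python
from typing import Dict

from pydantic import BaseModel, root_validator


class ExampleSnowflakeConfig(BaseModel):
    # Simplified stand-in fields; the real config nests these under the
    # `profiling` and `stateful_ingestion` sub-configs.
    include_technical_schema: bool = True
    include_table_lineage: bool = True
    profiling_enabled: bool = False

    @root_validator(pre=False)
    def validate_unsupported_configs(cls, values: Dict) -> Dict:
        # Profiling and lineage both build on the extracted schema, so they
        # cannot be enabled without basic schema extraction.
        if not values.get("include_technical_schema") and any(
            [values.get("profiling_enabled"), values.get("include_table_lineage")]
        ):
            raise ValueError(
                "Cannot perform profiling or lineage extraction without "
                "extracting the Snowflake technical schema."
            )
        return values
```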
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py
index 6759839628..60e0b3874d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py
@@ -92,7 +92,9 @@ class SnowflakeUsageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
         self.report: SnowflakeV2Report = report
         self.logger = logger

-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+    def get_workunits(
+        self, discovered_datasets: List[str]
+    ) -> Iterable[MetadataWorkUnit]:
         conn = self.config.get_connection()
         logger.info("Checking usage date ranges")

@@ -107,18 +109,20 @@ class SnowflakeUsageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):

         # Now, we report the usage as well as operation metadata even if user email is absent
         if self.config.include_usage_stats:
-            yield from self.get_usage_workunits(conn)
+            yield from self.get_usage_workunits(conn, discovered_datasets)

         if self.config.include_operational_stats:
             # Generate the operation workunits.
             access_events = self._get_snowflake_history(conn)
             for event in access_events:
-                yield from self._get_operation_aspect_work_unit(event)
+                yield from self._get_operation_aspect_work_unit(
+                    event, discovered_datasets
+                )

         conn.close()

     def get_usage_workunits(
-        self, conn: SnowflakeConnection
+        self, conn: SnowflakeConnection, discovered_datasets: List[str]
     ) -> Iterable[MetadataWorkUnit]:

         with PerfTimer() as timer:
@@ -144,6 +148,15 @@ class SnowflakeUsageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
                 ):
                     continue

+                dataset_identifier = self.get_dataset_identifier_from_qualified_name(
+                    row["OBJECT_NAME"]
+                )
+                if dataset_identifier not in discovered_datasets:
+                    logger.debug(
+                        f"Skipping usage for table {dataset_identifier}, as table schema is not accessible"
+                    )
+                    continue
+
                 stats = DatasetUsageStatistics(
                     timestampMillis=int(row["BUCKET_START_TIME"].timestamp() * 1000),
                     eventGranularity=TimeWindowSize(
@@ -161,7 +174,7 @@ class SnowflakeUsageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
                 )
                 dataset_urn = make_dataset_urn_with_platform_instance(
                     self.platform,
-                    self.get_dataset_identifier_from_qualified_name(row["OBJECT_NAME"]),
+                    dataset_identifier,
                     self.config.platform_instance,
                     self.config.env,
                 )
@@ -276,7 +289,7 @@ class SnowflakeUsageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
         )

     def _get_operation_aspect_work_unit(
-        self, event: SnowflakeJoinedAccessEvent
+        self, event: SnowflakeJoinedAccessEvent, discovered_datasets: List[str]
     ) -> Iterable[MetadataWorkUnit]:
         if event.query_start_time and event.query_type in OPERATION_STATEMENT_TYPES:
             start_time = event.query_start_time
@@ -292,9 +305,20 @@ class SnowflakeUsageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):

             for obj in event.objects_modified:
                 resource = obj.objectName
+
+                dataset_identifier = self.get_dataset_identifier_from_qualified_name(
+                    resource
+                )
+
+                if dataset_identifier not in discovered_datasets:
+                    logger.debug(
+                        f"Skipping operations for table {dataset_identifier}, as table schema is not accessible"
+                    )
+                    continue
+
                 dataset_urn = make_dataset_urn_with_platform_instance(
                     self.platform,
-                    self.get_dataset_identifier_from_qualified_name(resource),
+                    dataset_identifier,
                     self.config.platform_instance,
                     self.config.env,
                 )
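The recurring pattern in the snowflake_usage_v2.py hunks above is a membership test against the datasets discovered during schema extraction, applied before any usage or operation aspect is emitted. Here is a self-contained sketch of that filter under stated assumptions: `filter_usage_objects` and the bare lowercasing step are illustrative stand-ins, not the extractor's real helpers (the real code calls `get_dataset_identifier_from_qualified_name`, which normalizes names according to the source config).

```python
import logging
from typing import Iterable, List

logger = logging.getLogger(__name__)


def filter_usage_objects(
    object_names: Iterable[str], discovered_datasets: List[str]
) -> Iterable[str]:
    """Yield only fully qualified names whose schema was actually extracted."""
    for qualified_name in object_names:
        # Illustrative normalization; stands in for
        # get_dataset_identifier_from_qualified_name.
        dataset_identifier = qualified_name.lower()
        if dataset_identifier not in discovered_datasets:
            logger.debug(
                f"Skipping usage for table {dataset_identifier}, "
                "as table schema is not accessible"
            )
            continue
        yield dataset_identifier
```

One design note: because `discovered_datasets` is a `List[str]`, each `in` test is a linear scan; for sources with many access events, converting it to a `set` once would make the lookups constant-time.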
self.report.report_dropped(f"{snowflake_db.name}.*") - continue + databases = self.data_dictionary.get_databases(conn) + for snowflake_db in databases: + self.report.report_entity_scanned(snowflake_db.name, "database") - yield from self._process_database(conn, snowflake_db) + if not self.config.database_pattern.allowed(snowflake_db.name): + self.report.report_dropped(f"{snowflake_db.name}.*") + continue - conn.close() - # Emit Stale entity workunits - yield from self.stale_entity_removal_handler.gen_removed_entity_workunits() + yield from self._process_database(conn, snowflake_db) - if self.config.profiling.enabled and len(databases) != 0: - yield from self.profiler.get_workunits(databases) + conn.close() + # Emit Stale entity workunits + yield from self.stale_entity_removal_handler.gen_removed_entity_workunits() + + if self.config.profiling.enabled and len(databases) != 0: + yield from self.profiler.get_workunits(databases) if self.config.include_usage_stats or self.config.include_operational_stats: if self.redundant_run_skip_handler.should_skip_this_run( @@ -462,14 +460,27 @@ class SnowflakeV2Source( start_time_millis=datetime_to_ts_millis(self.config.start_time), end_time_millis=datetime_to_ts_millis(self.config.end_time), ) - yield from self.usage_extractor.get_workunits() + + discovered_datasets: List[str] = [ + self.get_dataset_identifier(table.name, schema.name, db.name) + for db in databases + for schema in db.schemas + for table in schema.tables + ] + [ + self.get_dataset_identifier(table.name, schema.name, db.name) + for db in databases + for schema in db.schemas + for table in schema.views + ] + yield from self.usage_extractor.get_workunits(discovered_datasets) def _process_database( self, conn: SnowflakeConnection, snowflake_db: SnowflakeDatabase ) -> Iterable[MetadataWorkUnit]: db_name = snowflake_db.name - yield from self.gen_database_containers(snowflake_db) + if self.config.include_technical_schema: + yield from self.gen_database_containers(snowflake_db) # Use database and extract metadata from its information_schema # If this query fails, it means, user does not have usage access on database @@ -501,23 +512,26 @@ class SnowflakeV2Source( self, conn: SnowflakeConnection, snowflake_schema: SnowflakeSchema, db_name: str ) -> Iterable[MetadataWorkUnit]: schema_name = snowflake_schema.name - yield from self.gen_schema_containers(snowflake_schema, db_name) + if self.config.include_technical_schema: + yield from self.gen_schema_containers(snowflake_schema, db_name) if self.config.include_tables: snowflake_schema.tables = self.get_tables_for_schema( conn, schema_name, db_name ) - for table in snowflake_schema.tables: - yield from self._process_table(conn, table, schema_name, db_name) + if self.config.include_technical_schema: + for table in snowflake_schema.tables: + yield from self._process_table(conn, table, schema_name, db_name) if self.config.include_views: snowflake_schema.views = self.get_views_for_schema( conn, schema_name, db_name ) - for view in snowflake_schema.views: - yield from self._process_view(conn, view, schema_name, db_name) + if self.config.include_technical_schema: + for view in snowflake_schema.views: + yield from self._process_view(conn, view, schema_name, db_name) def _process_table( self,