diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py
index 5ab3fbafba..859b150757 100644
--- a/metadata-ingestion/src/datahub/ingestion/graph/client.py
+++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -1092,7 +1092,11 @@ class DataHubGraph(DatahubRestEmitter):
         )
 
     def initialize_schema_resolver_from_datahub(
-        self, platform: str, platform_instance: Optional[str], env: str
+        self,
+        platform: str,
+        platform_instance: Optional[str],
+        env: str,
+        batch_size: int = 100,
     ) -> "SchemaResolver":
         logger.info("Initializing schema resolver")
         schema_resolver = self._make_schema_resolver(
@@ -1106,6 +1110,7 @@ class DataHubGraph(DatahubRestEmitter):
             platform=platform,
             platform_instance=platform_instance,
             env=env,
+            batch_size=batch_size,
         ):
             try:
                 schema_resolver.add_graphql_schema_metadata(urn, schema_info)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index 65a17dcb68..7b79c83db9 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -489,6 +489,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
                 platform=self.platform,
                 platform_instance=self.config.platform_instance,
                 env=self.config.env,
+                batch_size=self.config.schema_resolution_batch_size,
             )
         else:
             logger.warning(
@@ -1367,6 +1368,22 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
                     table=table.table_id,
                 )
 
+            if table.table_type == "VIEW":
+                if (
+                    not self.config.include_views
+                    or not self.config.view_pattern.allowed(
+                        table_identifier.raw_table_name()
+                    )
+                ):
+                    self.report.report_dropped(table_identifier.raw_table_name())
+                    continue
+            else:
+                if not self.config.table_pattern.allowed(
+                    table_identifier.raw_table_name()
+                ):
+                    self.report.report_dropped(table_identifier.raw_table_name())
+                    continue
+
             _, shard = BigqueryTableIdentifier.get_table_and_shard(
                 table_identifier.table
             )
@@ -1403,6 +1420,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
                 continue
 
             table_items[table.table_id] = table
 
+        # Add the maximum (most recent) shard of each sharded table to the list of tables
         table_items.update({value.table_id: value for value in sharded_tables.values()})
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
index b27fd4c798..21cca01a2d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -280,6 +280,12 @@ class BigQueryV2Config(
         description="Option to exclude empty projects from being ingested.",
     )
 
+    schema_resolution_batch_size: int = Field(
+        default=100,
+        description="The number of tables to process in a batch when resolving schemas from DataHub.",
+        hidden_from_schema=True,
+    )
+
     @root_validator(skip_on_failure=True)
     def profile_default_settings(cls, values: Dict) -> Dict:
         # Extra default SQLAlchemy option for better connection pooling and threading.
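Note (not part of the diff): a minimal sketch of how the new batch_size parameter and the schema_resolution_batch_size config option are expected to be used. The server URL and the override value of 200 below are illustrative assumptions, not values taken from this change.

# Illustrative sketch only; server URL and batch size are assumed values.
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))  # hypothetical endpoint
schema_resolver = graph.initialize_schema_resolver_from_datahub(
    platform="bigquery",
    platform_instance=None,
    env="PROD",
    batch_size=200,  # overrides the new default of 100 introduced above
)

In a BigQuery ingestion recipe, the same value would presumably be set via schema_resolution_batch_size under the source config, which the source then forwards as shown in the bigquery.py hunk above.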
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
index dbaf28fabc..8c393d1e8a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
@@ -159,7 +159,7 @@ WHERE
     def get_workunits(
         self, project_id: str, tables: Dict[str, List[BigqueryTable]]
     ) -> Iterable[MetadataWorkUnit]:
-        profile_requests = []
+        profile_requests: List[TableProfilerRequest] = []
 
         for dataset in tables:
             for table in tables[dataset]:
@@ -174,10 +174,17 @@ WHERE
                 )
 
                 # Emit the profile work unit
+                logger.debug(
+                    f"Creating profile request for table {normalized_table_name}"
+                )
                 profile_request = self.get_profile_request(table, dataset, project_id)
                 if profile_request is not None:
                     self.report.report_entity_profiled(profile_request.pretty_name)
                     profile_requests.append(profile_request)
+                else:
+                    logger.debug(
+                        f"Table {normalized_table_name} was not eligible for profiling."
+                    )
 
         if len(profile_requests) == 0:
             return
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
index f051dc9e5b..4708836d3d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
@@ -158,6 +158,9 @@ class GenericProfiler:
             size_in_bytes=table.size_in_bytes,
             rows_count=table.rows_count,
         ):
+            logger.debug(
+                f"Dataset {dataset_name} was not eligible for profiling due to last_altered, size in bytes, or row count limits"
+            )
             # Profile only table level if dataset is filtered from profiling
             # due to size limits alone
             if self.is_dataset_eligible_for_profiling(
@@ -245,6 +248,9 @@ class GenericProfiler:
         )
 
         if not self.config.table_pattern.allowed(dataset_name):
+            logger.debug(
+                f"Table {dataset_name} is not allowed for profiling due to table pattern"
+            )
             return False
 
         last_profiled: Optional[int] = None
@@ -267,14 +273,14 @@ class GenericProfiler:
                 self.config.profiling.profile_if_updated_since_days
             )
 
-        if not self.config.profile_pattern.allowed(dataset_name):
-            return False
-
         schema_name = dataset_name.rsplit(".", 1)[0]
         if (threshold_time is not None) and (
             last_altered is not None and last_altered < threshold_time
         ):
             self.report.profiling_skipped_not_updated[schema_name] += 1
+            logger.debug(
+                f"Table {dataset_name} was skipped because it was not updated recently enough"
+            )
             return False
 
         if self.config.profiling.profile_table_size_limit is not None and (
@@ -283,6 +289,9 @@ class GenericProfiler:
             > self.config.profiling.profile_table_size_limit
         ):
             self.report.profiling_skipped_size_limit[schema_name] += 1
+            logger.debug(
+                f"Table {dataset_name} is not allowed for profiling due to size limit"
+            )
             return False
 
         if self.config.profiling.profile_table_row_limit is not None and (
@@ -290,6 +299,9 @@ class GenericProfiler:
             and rows_count > self.config.profiling.profile_table_row_limit
         ):
             self.report.profiling_skipped_row_limit[schema_name] += 1
+            logger.debug(
+                f"Table {dataset_name} is not allowed for profiling due to row limit"
+            )
             return False
 
         return True
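Note (not part of the diff): the skip-reason messages added above are emitted at DEBUG level, so they are hidden at the default log level. A minimal sketch of how an ingestion run could surface them, assuming the loggers are named after the modules touched here (the usual logging.getLogger(__name__) convention); the setup itself is illustrative.

# Illustrative only: raise verbosity for the profiler modules touched above
# so the new skip-reason debug messages become visible.
import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("datahub.ingestion.source.sql.sql_generic_profiler").setLevel(logging.DEBUG)
logging.getLogger("datahub.ingestion.source.bigquery_v2.profiler").setLevel(logging.DEBUG)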