fix(ingest/bigquery): Add a way to change the API's batch size on schema init (#10255)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
Tamas Nemeth 2024-04-11 22:09:54 +02:00 committed by GitHub
parent 3d94388edf
commit e19b1fef62
5 changed files with 53 additions and 5 deletions


@@ -1092,7 +1092,11 @@ class DataHubGraph(DatahubRestEmitter):
)
def initialize_schema_resolver_from_datahub(
self, platform: str, platform_instance: Optional[str], env: str
self,
platform: str,
platform_instance: Optional[str],
env: str,
batch_size: int = 100,
) -> "SchemaResolver":
logger.info("Initializing schema resolver")
schema_resolver = self._make_schema_resolver(
@@ -1106,6 +1110,7 @@ class DataHubGraph(DatahubRestEmitter):
platform=platform,
platform_instance=platform_instance,
env=env,
batch_size=batch_size,
):
try:
schema_resolver.add_graphql_schema_metadata(urn, schema_info)
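For orientation, here is a minimal sketch (not part of the diff) of how a caller could pass the new batch_size through to the schema resolver; the server URL and the value 250 are placeholders, and the import path is assumed from the datahub package layout:

from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
schema_resolver = graph.initialize_schema_resolver_from_datahub(
    platform="bigquery",
    platform_instance=None,
    env="PROD",
    batch_size=250,  # placeholder; the new parameter defaults to 100
)

Larger batches mean fewer round trips to the DataHub API when pre-populating the resolver, at the cost of larger responses per request.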


@@ -489,6 +489,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
batch_size=self.config.schema_resolution_batch_size,
)
else:
logger.warning(
@@ -1367,6 +1368,22 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
table=table.table_id,
)
if table.table_type == "VIEW":
if (
not self.config.include_views
or not self.config.view_pattern.allowed(
table_identifier.raw_table_name()
)
):
self.report.report_dropped(table_identifier.raw_table_name())
continue
else:
if not self.config.table_pattern.allowed(
table_identifier.raw_table_name()
):
self.report.report_dropped(table_identifier.raw_table_name())
continue
_, shard = BigqueryTableIdentifier.get_table_and_shard(
table_identifier.table
)
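The view/table filtering added above follows the source's usual allow/deny checks; a rough, self-contained illustration of the same decision using DataHub's AllowDenyPattern (the deny rule and table name below are made up):

from datahub.configuration.common import AllowDenyPattern

include_views = True
view_pattern = AllowDenyPattern(deny=[".*_staging$"])  # hypothetical deny rule
raw_name = "my-project.my_dataset.sales_staging"

if not include_views or not view_pattern.allowed(raw_name):
    # in the source this is self.report.report_dropped(raw_name) followed by `continue`
    print(f"dropped view: {raw_name}")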
@@ -1403,6 +1420,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
continue
table_items[table.table_id] = table
# Add the maximum (latest) shard of each sharded table to the list of tables
table_items.update({value.table_id: value for value in sharded_tables.values()})
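A toy illustration (not from the commit) of what that update accomplishes, assuming, as the comment suggests, that sharded_tables holds one entry per sharded-table prefix, namely the shard with the highest suffix; SimpleNamespace stands in for BigqueryTable:

from types import SimpleNamespace

sharded_tables = {"events": SimpleNamespace(table_id="events_20240111")}  # latest shard per prefix
table_items = {"users": SimpleNamespace(table_id="users")}

table_items.update({value.table_id: value for value in sharded_tables.values()})
print(sorted(table_items))  # ['events_20240111', 'users'] -- exactly one shard per sharded table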


@@ -280,6 +280,12 @@ class BigQueryV2Config(
description="Option to exclude empty projects from being ingested.",
)
schema_resolution_batch_size: int = Field(
default=100,
description="The number of tables to process in a batch when resolving schema from DataHub.",
hidden_from_schema=True,
)
@root_validator(skip_on_failure=True)
def profile_default_settings(cls, values: Dict) -> Dict:
# Extra default SQLAlchemy option for better connection pooling and threading.
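Because hidden_from_schema keeps the option out of the generated docs, a hedged example of where it would be set; this is the Python-dict equivalent of a recipe's BigQuery source config, and all other values are placeholders:

bigquery_source_config = {
    "project_ids": ["my-gcp-project"],       # placeholder project
    "include_views": True,
    "schema_resolution_batch_size": 250,     # option introduced by this commit; defaults to 100
}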


@@ -159,7 +159,7 @@ WHERE
def get_workunits(
self, project_id: str, tables: Dict[str, List[BigqueryTable]]
) -> Iterable[MetadataWorkUnit]:
profile_requests = []
profile_requests: List[TableProfilerRequest] = []
for dataset in tables:
for table in tables[dataset]:
@@ -174,10 +174,17 @@
)
# Emit the profile work unit
logger.debug(
f"Creating profile request for table {normalized_table_name}"
)
profile_request = self.get_profile_request(table, dataset, project_id)
if profile_request is not None:
self.report.report_entity_profiled(profile_request.pretty_name)
profile_requests.append(profile_request)
else:
logger.debug(
f"Table {normalized_table_name} was not eliagible for profiling."
)
if len(profile_requests) == 0:
return
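The profiling change above is a collect-then-bail pattern: build typed requests, log tables that yield no request, and return early when nothing is left. A compact sketch with stand-in types (a plain str replaces TableProfilerRequest, and the eligibility rule is invented):

from typing import List, Optional

def build_requests(candidates: List[str]) -> List[str]:
    profile_requests: List[str] = []
    for name in candidates:
        # stand-in for self.get_profile_request(...), which may return None
        request: Optional[str] = None if name.startswith("tmp_") else name
        if request is not None:
            profile_requests.append(request)
        else:
            print(f"Table {name} was not eligible for profiling.")
    return profile_requests

if len(build_requests(["sales", "tmp_scratch"])) == 0:
    print("nothing to profile")  # mirrors the early return in get_workunits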


@@ -158,6 +158,9 @@ class GenericProfiler:
size_in_bytes=table.size_in_bytes,
rows_count=table.rows_count,
):
logger.debug(
f"Dataset {dataset_name} was not eliagable for profiling due to last_altered, size in bytes or count of rows limit"
)
# Profile only table level if dataset is filtered from profiling
# due to size limits alone
if self.is_dataset_eligible_for_profiling(
@@ -245,6 +248,9 @@
)
if not self.config.table_pattern.allowed(dataset_name):
logger.debug(
f"Table {dataset_name} is not allowed for profiling due to table pattern"
)
return False
last_profiled: Optional[int] = None
@@ -267,14 +273,14 @@
self.config.profiling.profile_if_updated_since_days
)
if not self.config.profile_pattern.allowed(dataset_name):
return False
schema_name = dataset_name.rsplit(".", 1)[0]
if (threshold_time is not None) and (
last_altered is not None and last_altered < threshold_time
):
self.report.profiling_skipped_not_updated[schema_name] += 1
logger.debug(
f"Table {dataset_name} was skipped because it was not updated recently enough"
)
return False
if self.config.profiling.profile_table_size_limit is not None and (
@@ -283,6 +289,9 @@
> self.config.profiling.profile_table_size_limit
):
self.report.profiling_skipped_size_limit[schema_name] += 1
logger.debug(
f"Table {dataset_name} is not allowed for profiling due to size limit"
)
return False
if self.config.profiling.profile_table_row_limit is not None and (
@@ -290,6 +299,9 @@
and rows_count > self.config.profiling.profile_table_row_limit
):
self.report.profiling_skipped_row_limit[schema_name] += 1
logger.debug(
f"Table {dataset_name} is not allowed for profiling due to row limit"
)
return False
return True
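Taken together, the hunks in this file only add debug logs around the existing eligibility checks; a hedged, self-contained sketch of those checks follows (names, units, and default limits are illustrative, not the real config model):

from datetime import datetime, timedelta, timezone
from typing import Optional

def is_eligible_for_profiling(
    dataset_name: str,
    last_altered: Optional[datetime],
    size_in_bytes: Optional[int],
    rows_count: Optional[int],
    profile_if_updated_since_days: Optional[float] = 7,
    size_limit_gb: Optional[float] = 5,
    row_limit: Optional[int] = 5_000_000,
) -> bool:
    threshold_time = (
        datetime.now(timezone.utc) - timedelta(days=profile_if_updated_since_days)
        if profile_if_updated_since_days is not None
        else None
    )
    if threshold_time and last_altered and last_altered < threshold_time:
        print(f"Table {dataset_name} was skipped because it was not updated recently enough")
        return False
    if size_limit_gb is not None and size_in_bytes and size_in_bytes / 2**30 > size_limit_gb:
        print(f"Table {dataset_name} is not allowed for profiling due to size limit")
        return False
    if row_limit is not None and rows_count and rows_count > row_limit:
        print(f"Table {dataset_name} is not allowed for profiling due to row limit")
        return False
    return True

# Example: a table last altered 30 days ago fails the 7-day freshness window.
print(is_eligible_for_profiling(
    "proj.ds.events",
    last_altered=datetime.now(timezone.utc) - timedelta(days=30),
    size_in_bytes=10**9,
    rows_count=1_000,
))  # False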