chore(tableau): set ingestion stage report and perftimers (#12234)

Sergio Gómez Villamor 2025-01-09 19:04:37 +01:00 committed by GitHub
parent 45450f19a0
commit 9d9a368dea
27 changed files with 617 additions and 491 deletions
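The pattern this commit rolls out across the sources, shown as a minimal hedged sketch (the stage label, key, and sleep are illustrative placeholders, not code from the diff): stage tracking moves from report_ingestion_stage_start()/set_ingestion_stage() calls to the new new_stage() context manager, and round(timer.elapsed_seconds(), 2) call sites become timer.elapsed_seconds(digits=2).

import time

from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.utilities.perf_timer import PerfTimer

report = IngestionStageReport()

# Old pattern (replaced throughout this PR):
#   report.report_ingestion_stage_start("project-1: Profiling")
#   ...work...
#   durations["project-1"] = round(timer.elapsed_seconds(), 2)

# New pattern: the stage context logs the stage and records its duration on exit.
with report.new_stage("project-1: Profiling"):
    with PerfTimer() as timer:
        time.sleep(0.01)  # stand-in for real work
    print(timer.elapsed_seconds(digits=2))

print(report.ingestion_stage_durations)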

View File

@ -334,6 +334,8 @@ class SourceReport(Report):
}
def compute_stats(self) -> None:
super().compute_stats()
duration = datetime.datetime.now() - self.start_time
workunits_produced = self.events_produced
if duration.total_seconds() > 0:

View File

@ -253,14 +253,14 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
for project in projects:
yield from self.bq_schema_extractor.get_project_workunits(project)
self.report.set_ingestion_stage("*", "View and Snapshot Lineage")
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
)
with self.report.new_stage("*: View and Snapshot Lineage"):
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
)
if self.config.use_queries_v2:
# if both usage and lineage are disabled then skip queries extractor piece
@ -270,31 +270,29 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
):
return
self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)
with BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
include_queries=self.config.include_queries,
include_query_usage_statistics=self.config.include_query_usage_statistics,
top_n_queries=self.config.usage.top_n_queries,
region_qualifiers=self.config.region_qualifiers,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
) as queries_extractor:
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"):
with BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
include_queries=self.config.include_queries,
include_query_usage_statistics=self.config.include_query_usage_statistics,
top_n_queries=self.config.usage.top_n_queries,
region_qualifiers=self.config.region_qualifiers,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
) as queries_extractor:
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(

View File

@ -190,6 +190,3 @@ class BigQueryV2Report(
num_skipped_external_table_lineage: int = 0
queries_extractor: Optional[BigQueryQueriesExtractorReport] = None
def set_ingestion_stage(self, project_id: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{project_id}: {stage}")

View File

@ -248,9 +248,9 @@ class BigQuerySchemaGenerator:
def get_project_workunits(
self, project: BigqueryProject
) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage(project.id, METADATA_EXTRACTION)
logger.info(f"Processing project: {project.id}")
yield from self._process_project(project)
with self.report.new_stage(f"{project.id}: {METADATA_EXTRACTION}"):
logger.info(f"Processing project: {project.id}")
yield from self._process_project(project)
def get_dataplatform_instance_aspect(
self, dataset_urn: str, project_id: str
@ -405,11 +405,11 @@ class BigQuerySchemaGenerator:
if self.config.is_profiling_enabled():
logger.info(f"Starting profiling project {project_id}")
self.report.set_ingestion_stage(project_id, PROFILING)
yield from self.profiler.get_workunits(
project_id=project_id,
tables=db_tables,
)
with self.report.new_stage(f"{project_id}: {PROFILING}"):
yield from self.profiler.get_workunits(
project_id=project_id,
tables=db_tables,
)
def _process_project_datasets(
self,
@ -1203,9 +1203,9 @@ class BigQuerySchemaGenerator:
report=self.report,
)
self.report.metadata_extraction_sec[f"{project_id}.{dataset.name}"] = round(
timer.elapsed_seconds(), 2
)
self.report.metadata_extraction_sec[
f"{project_id}.{dataset.name}"
] = timer.elapsed_seconds(digits=2)
def get_core_table_details(
self, dataset_name: str, project_id: str, temp_table_dataset_prefix: str

View File

@ -330,11 +330,11 @@ class BigqueryLineageExtractor:
projects = ["*"] # project_id not used when using exported metadata
for project in projects:
self.report.set_ingestion_stage(project, LINEAGE_EXTRACTION)
yield from self.generate_lineage(
project,
table_refs,
)
with self.report.new_stage(f"{project}: {LINEAGE_EXTRACTION}"):
yield from self.generate_lineage(
project,
table_refs,
)
if self.redundant_run_skip_handler:
# Update the checkpoint state for this run.
@ -368,8 +368,8 @@ class BigqueryLineageExtractor:
self.report.lineage_metadata_entries[project_id] = len(lineage)
logger.info(f"Built lineage map containing {len(lineage)} entries.")
logger.debug(f"lineage metadata is {lineage}")
self.report.lineage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
self.report.lineage_extraction_sec[project_id] = timer.elapsed_seconds(
digits=2
)
self.report.lineage_mem_size[project_id] = humanfriendly.format_size(
memory_footprint.total_size(lineage)

View File

@ -495,62 +495,62 @@ class BigQueryUsageExtractor:
def _generate_operational_workunits(
self, usage_state: BigQueryUsageState, table_refs: Collection[str]
) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS)
for audit_event in usage_state.standalone_events():
try:
operational_wu = self._create_operation_workunit(
audit_event, table_refs
)
if operational_wu:
yield operational_wu
self.report.num_operational_stats_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate operation workunit",
context=f"{audit_event}",
exc=e,
)
with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"):
for audit_event in usage_state.standalone_events():
try:
operational_wu = self._create_operation_workunit(
audit_event, table_refs
)
if operational_wu:
yield operational_wu
self.report.num_operational_stats_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate operation workunit",
context=f"{audit_event}",
exc=e,
)
def _generate_usage_workunits(
self, usage_state: BigQueryUsageState
) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION)
top_n = (
self.config.usage.top_n_queries
if self.config.usage.include_top_n_queries
else 0
)
for entry in usage_state.usage_statistics(top_n=top_n):
try:
query_freq = [
(
self.uuid_to_query.get(
query_hash, usage_state.queries[query_hash]
),
count,
with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"):
top_n = (
self.config.usage.top_n_queries
if self.config.usage.include_top_n_queries
else 0
)
for entry in usage_state.usage_statistics(top_n=top_n):
try:
query_freq = [
(
self.uuid_to_query.get(
query_hash, usage_state.queries[query_hash]
),
count,
)
for query_hash, count in entry.query_freq
]
yield make_usage_workunit(
bucket_start_time=datetime.fromisoformat(entry.timestamp),
resource=BigQueryTableRef.from_string_name(entry.resource),
query_count=entry.query_count,
query_freq=query_freq,
user_freq=entry.user_freq,
column_freq=entry.column_freq,
bucket_duration=self.config.bucket_duration,
resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref,
top_n_queries=self.config.usage.top_n_queries,
format_sql_queries=self.config.usage.format_sql_queries,
queries_character_limit=self.config.usage.queries_character_limit,
)
self.report.num_usage_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate usage statistics workunit",
context=f"{entry.timestamp}, {entry.resource}",
exc=e,
)
for query_hash, count in entry.query_freq
]
yield make_usage_workunit(
bucket_start_time=datetime.fromisoformat(entry.timestamp),
resource=BigQueryTableRef.from_string_name(entry.resource),
query_count=entry.query_count,
query_freq=query_freq,
user_freq=entry.user_freq,
column_freq=entry.column_freq,
bucket_duration=self.config.bucket_duration,
resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref,
top_n_queries=self.config.usage.top_n_queries,
format_sql_queries=self.config.usage.format_sql_queries,
queries_character_limit=self.config.usage.queries_character_limit,
)
self.report.num_usage_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate usage statistics workunit",
context=f"{entry.timestamp}, {entry.resource}",
exc=e,
)
def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
if self.config.use_exported_bigquery_audit_metadata:
@ -559,10 +559,10 @@ class BigQueryUsageExtractor:
for project_id in projects:
with PerfTimer() as timer:
try:
self.report.set_ingestion_stage(
project_id, USAGE_EXTRACTION_INGESTION
)
yield from self._get_parsed_bigquery_log_events(project_id)
with self.report.new_stage(
f"{project_id}: {USAGE_EXTRACTION_INGESTION}"
):
yield from self._get_parsed_bigquery_log_events(project_id)
except Exception as e:
self.report.usage_failed_extraction.append(project_id)
self.report.warning(
@ -572,8 +572,8 @@ class BigQueryUsageExtractor:
)
self.report_status(f"usage-extraction-{project_id}", False)
self.report.usage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
self.report.usage_extraction_sec[project_id] = timer.elapsed_seconds(
digits=2
)
def _store_usage_event(

View File

@ -70,30 +70,30 @@ class CassandraProfiler:
) -> Iterable[MetadataWorkUnit]:
for keyspace_name in cassandra_data.keyspaces:
tables = cassandra_data.tables.get(keyspace_name, [])
self.report.set_ingestion_stage(keyspace_name, PROFILING)
with ThreadPoolExecutor(
max_workers=self.config.profiling.max_workers
) as executor:
future_to_dataset = {
executor.submit(
self.generate_profile,
keyspace_name,
table_name,
cassandra_data.columns.get(table_name, []),
): table_name
for table_name in tables
}
for future in as_completed(future_to_dataset):
table_name = future_to_dataset[future]
try:
yield from future.result()
except Exception as exc:
self.report.profiling_skipped_other[table_name] += 1
self.report.failure(
message="Failed to profile for table",
context=f"{keyspace_name}.{table_name}",
exc=exc,
)
with self.report.new_stage(f"{keyspace_name}: {PROFILING}"):
with ThreadPoolExecutor(
max_workers=self.config.profiling.max_workers
) as executor:
future_to_dataset = {
executor.submit(
self.generate_profile,
keyspace_name,
table_name,
cassandra_data.columns.get(table_name, []),
): table_name
for table_name in tables
}
for future in as_completed(future_to_dataset):
table_name = future_to_dataset[future]
try:
yield from future.result()
except Exception as exc:
self.report.profiling_skipped_other[table_name] += 1
self.report.failure(
message="Failed to profile for table",
context=f"{keyspace_name}.{table_name}",
exc=exc,
)
def generate_profile(
self,

View File

@ -54,9 +54,6 @@ class CassandraSourceReport(StaleEntityRemovalSourceReport, IngestionStageReport
else:
raise KeyError(f"Unknown entity {ent_type}.")
def set_ingestion_stage(self, keyspace: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{keyspace}: {stage}")
# TODO Need to create separate common config for profiling report
profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict)
profiling_skipped_table_profile_pattern: TopKDict[str, int] = field(

View File

@ -45,6 +45,3 @@ class DremioSourceReport(
self.views_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")
def set_ingestion_stage(self, dataset: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{dataset}: {stage}")

View File

@ -472,8 +472,8 @@ class DremioSource(StatefulIngestionSourceBase):
env=self.config.env,
platform_instance=self.config.platform_instance,
)
self.report.set_ingestion_stage(dataset_info.resource_name, PROFILING)
yield from self.profiler.get_workunits(dataset_info, dataset_urn)
with self.report.new_stage(f"{dataset_info.resource_name}: {PROFILING}"):
yield from self.profiler.get_workunits(dataset_info, dataset_urn)
def generate_view_lineage(
self, dataset_urn: str, parents: List[str]

View File

@ -141,40 +141,36 @@ class DataHubGcSource(Source):
) -> Iterable[MetadataWorkUnit]:
if self.config.cleanup_expired_tokens:
try:
self.report.report_ingestion_stage_start("Expired Token Cleanup")
self.revoke_expired_tokens()
with self.report.new_stage("Expired Token Cleanup"):
self.revoke_expired_tokens()
except Exception as e:
self.report.failure("While trying to cleanup expired token ", exc=e)
if self.config.truncate_indices:
try:
self.report.report_ingestion_stage_start("Truncate Indices")
self.truncate_indices()
with self.report.new_stage("Truncate Indices"):
self.truncate_indices()
except Exception as e:
self.report.failure("While trying to truncate indices ", exc=e)
if self.config.soft_deleted_entities_cleanup.enabled:
try:
self.report.report_ingestion_stage_start(
"Soft Deleted Entities Cleanup"
)
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
with self.report.new_stage("Soft Deleted Entities Cleanup"):
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
except Exception as e:
self.report.failure(
"While trying to cleanup soft deleted entities ", exc=e
)
if self.config.dataprocess_cleanup.enabled:
try:
self.report.report_ingestion_stage_start("Data Process Cleanup")
yield from self.dataprocess_cleanup.get_workunits_internal()
with self.report.new_stage("Data Process Cleanup"):
yield from self.dataprocess_cleanup.get_workunits_internal()
except Exception as e:
self.report.failure("While trying to cleanup data process ", exc=e)
if self.config.execution_request_cleanup.enabled:
try:
self.report.report_ingestion_stage_start("Execution request Cleanup")
self.execution_request_cleanup.run()
with self.report.new_stage("Execution request Cleanup"):
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
# Otherwise last stage's duration does not get calculated.
self.report.report_ingestion_stage_start("End")
yield from []
def truncate_indices(self) -> None:

View File

@ -423,10 +423,10 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
database = self.config.database
logger.info(f"Processing db {database}")
self.report.report_ingestion_stage_start(METADATA_EXTRACTION)
self.db_tables[database] = defaultdict()
self.db_views[database] = defaultdict()
self.db_schemas.setdefault(database, {})
with self.report.new_stage(METADATA_EXTRACTION):
self.db_tables[database] = defaultdict()
self.db_views[database] = defaultdict()
self.db_schemas.setdefault(database, {})
# TODO: Ideally, we'd push down exception handling to the place where the connection is used, as opposed to keeping
# this fallback. For now, this gets us broad coverage quickly.
@ -462,12 +462,12 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
self.process_schemas(connection, database)
)
self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION)
yield from self.extract_lineage_v2(
connection=connection,
database=database,
lineage_extractor=lineage_extractor,
)
with self.report.new_stage(LINEAGE_EXTRACTION):
yield from self.extract_lineage_v2(
connection=connection,
database=database,
lineage_extractor=lineage_extractor,
)
all_tables = self.get_all_tables()
else:
@ -480,25 +480,25 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
or self.config.include_view_lineage
or self.config.include_copy_lineage
):
self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION)
yield from self.extract_lineage(
with self.report.new_stage(LINEAGE_EXTRACTION):
yield from self.extract_lineage(
connection=connection, all_tables=all_tables, database=database
)
if self.config.include_usage_statistics:
with self.report.new_stage(USAGE_EXTRACTION_INGESTION):
yield from self.extract_usage(
connection=connection, all_tables=all_tables, database=database
)
self.report.report_ingestion_stage_start(USAGE_EXTRACTION_INGESTION)
if self.config.include_usage_statistics:
yield from self.extract_usage(
connection=connection, all_tables=all_tables, database=database
)
if self.config.is_profiling_enabled():
self.report.report_ingestion_stage_start(PROFILING)
profiler = RedshiftProfiler(
config=self.config,
report=self.report,
state_handler=self.profiling_state_handler,
)
yield from profiler.get_workunits(self.db_tables)
with self.report.new_stage(PROFILING):
profiler = RedshiftProfiler(
config=self.config,
report=self.report,
state_handler=self.profiling_state_handler,
)
yield from profiler.get_workunits(self.db_tables)
def process_schemas(self, connection, database):
for schema in self.data_dictionary.get_schemas(
@ -633,8 +633,8 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
else:
logger.info("View processing disabled, skipping")
self.report.metadata_extraction_sec[report_key] = round(
timer.elapsed_seconds(), 2
self.report.metadata_extraction_sec[report_key] = timer.elapsed_seconds(
digits=2
)
def _process_table(
@ -986,9 +986,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
yield from usage_extractor.get_usage_workunits(all_tables=all_tables)
self.report.usage_extraction_sec[database] = round(
timer.elapsed_seconds(), 2
)
self.report.usage_extraction_sec[database] = timer.elapsed_seconds(digits=2)
def extract_lineage(
self,
@ -1011,8 +1009,8 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
database=database, connection=connection, all_tables=all_tables
)
self.report.lineage_extraction_sec[f"{database}"] = round(
timer.elapsed_seconds(), 2
self.report.lineage_extraction_sec[f"{database}"] = timer.elapsed_seconds(
digits=2
)
yield from self.generate_lineage(
database, lineage_extractor=lineage_extractor
@ -1042,8 +1040,8 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
yield from lineage_extractor.generate()
self.report.lineage_extraction_sec[f"{database}"] = round(
timer.elapsed_seconds(), 2
self.report.lineage_extraction_sec[f"{database}"] = timer.elapsed_seconds(
digits=2
)
if self.redundant_lineage_run_skip_handler:

View File

@ -182,38 +182,38 @@ class RedshiftUsageExtractor:
self.report.num_operational_stats_filtered = 0
if self.config.include_operational_stats:
self.report.report_ingestion_stage_start(USAGE_EXTRACTION_OPERATIONAL_STATS)
with PerfTimer() as timer:
# Generate operation aspect workunits
yield from self._gen_operation_aspect_workunits(
self.connection, all_tables
)
self.report.operational_metadata_extraction_sec[
self.config.database
] = round(timer.elapsed_seconds(), 2)
with self.report.new_stage(USAGE_EXTRACTION_OPERATIONAL_STATS):
with PerfTimer() as timer:
# Generate operation aspect workunits
yield from self._gen_operation_aspect_workunits(
self.connection, all_tables
)
self.report.operational_metadata_extraction_sec[
self.config.database
] = timer.elapsed_seconds(digits=2)
# Generate aggregate events
self.report.report_ingestion_stage_start(USAGE_EXTRACTION_USAGE_AGGREGATION)
query: str = self.queries.usage_query(
start_time=self.start_time.strftime(REDSHIFT_DATETIME_FORMAT),
end_time=self.end_time.strftime(REDSHIFT_DATETIME_FORMAT),
database=self.config.database,
)
access_events_iterable: Iterable[
RedshiftAccessEvent
] = self._gen_access_events_from_history_query(
query, connection=self.connection, all_tables=all_tables
)
with self.report.new_stage(USAGE_EXTRACTION_USAGE_AGGREGATION):
query: str = self.queries.usage_query(
start_time=self.start_time.strftime(REDSHIFT_DATETIME_FORMAT),
end_time=self.end_time.strftime(REDSHIFT_DATETIME_FORMAT),
database=self.config.database,
)
access_events_iterable: Iterable[
RedshiftAccessEvent
] = self._gen_access_events_from_history_query(
query, connection=self.connection, all_tables=all_tables
)
aggregated_events: AggregatedAccessEvents = self._aggregate_access_events(
access_events_iterable
)
# Generate usage workunits from aggregated events.
for time_bucket in aggregated_events.values():
for aggregate in time_bucket.values():
wu: MetadataWorkUnit = self._make_usage_stat(aggregate)
self.report.num_usage_workunits_emitted += 1
yield wu
aggregated_events: AggregatedAccessEvents = self._aggregate_access_events(
access_events_iterable
)
# Generate usage workunits from aggregated events.
for time_bucket in aggregated_events.values():
for aggregate in time_bucket.values():
wu: MetadataWorkUnit = self._make_usage_stat(aggregate)
self.report.num_usage_workunits_emitted += 1
yield wu
def _gen_operation_aspect_workunits(
self,

View File

@ -166,6 +166,3 @@ class SnowflakeV2Report(
def report_tag_processed(self, tag_name: str) -> None:
self._processed_tags.add(tag_name)
def set_ingestion_stage(self, database: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{database}: {stage}")

View File

@ -216,21 +216,23 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
try:
for snowflake_db in self.databases:
self.report.set_ingestion_stage(snowflake_db.name, METADATA_EXTRACTION)
yield from self._process_database(snowflake_db)
with self.report.new_stage(
f"{snowflake_db.name}: {METADATA_EXTRACTION}"
):
yield from self._process_database(snowflake_db)
self.report.set_ingestion_stage("*", EXTERNAL_TABLE_DDL_LINEAGE)
discovered_tables: List[str] = [
self.identifiers.get_dataset_identifier(
table_name, schema.name, db.name
)
for db in self.databases
for schema in db.schemas
for table_name in schema.tables
]
if self.aggregator:
for entry in self._external_tables_ddl_lineage(discovered_tables):
self.aggregator.add(entry)
with self.report.new_stage(f"*: {EXTERNAL_TABLE_DDL_LINEAGE}"):
discovered_tables: List[str] = [
self.identifiers.get_dataset_identifier(
table_name, schema.name, db.name
)
for db in self.databases
for schema in db.schemas
for table_name in schema.tables
]
if self.aggregator:
for entry in self._external_tables_ddl_lineage(discovered_tables):
self.aggregator.add(entry)
except SnowflakePermissionError as e:
self.structured_reporter.failure(
@ -332,8 +334,8 @@ class SnowflakeSchemaGenerator(SnowflakeStructuredReportMixin):
yield from self._process_db_schemas(snowflake_db, db_tables)
if self.profiler and db_tables:
self.report.set_ingestion_stage(snowflake_db.name, PROFILING)
yield from self.profiler.get_workunits(snowflake_db, db_tables)
with self.report.new_stage(f"{snowflake_db.name}: {PROFILING}"):
yield from self.profiler.get_workunits(snowflake_db, db_tables)
def _process_db_schemas(
self,

View File

@ -146,58 +146,57 @@ class SnowflakeUsageExtractor(SnowflakeCommonMixin, Closeable):
if not self._should_ingest_usage():
return
self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION)
if self.report.edition == SnowflakeEdition.STANDARD.value:
logger.info(
"Snowflake Account is Standard Edition. Usage and Operation History Feature is not supported."
)
return
with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"):
if self.report.edition == SnowflakeEdition.STANDARD.value:
logger.info(
"Snowflake Account is Standard Edition. Usage and Operation History Feature is not supported."
)
return
logger.info("Checking usage date ranges")
logger.info("Checking usage date ranges")
self._check_usage_date_ranges()
self._check_usage_date_ranges()
# If permission error, execution returns from here
if (
self.report.min_access_history_time is None
or self.report.max_access_history_time is None
):
return
# If permission error, execution returns from here
if (
self.report.min_access_history_time is None
or self.report.max_access_history_time is None
):
return
# NOTE: In earlier `snowflake-usage` connector, users with no email were not considered in usage counts as well as in operation
# Now, we report the usage as well as operation metadata even if user email is absent
# NOTE: In earlier `snowflake-usage` connector, users with no email were not considered in usage counts as well as in operation
# Now, we report the usage as well as operation metadata even if user email is absent
if self.config.include_usage_stats:
yield from auto_empty_dataset_usage_statistics(
self._get_workunits_internal(discovered_datasets),
config=BaseTimeWindowConfig(
start_time=self.start_time,
end_time=self.end_time,
bucket_duration=self.config.bucket_duration,
),
dataset_urns={
self.identifiers.gen_dataset_urn(dataset_identifier)
for dataset_identifier in discovered_datasets
},
)
self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS)
if self.config.include_operational_stats:
# Generate the operation workunits.
access_events = self._get_snowflake_history()
for event in access_events:
yield from self._get_operation_aspect_work_unit(
event, discovered_datasets
if self.config.include_usage_stats:
yield from auto_empty_dataset_usage_statistics(
self._get_workunits_internal(discovered_datasets),
config=BaseTimeWindowConfig(
start_time=self.start_time,
end_time=self.end_time,
bucket_duration=self.config.bucket_duration,
),
dataset_urns={
self.identifiers.gen_dataset_urn(dataset_identifier)
for dataset_identifier in discovered_datasets
},
)
if self.redundant_run_skip_handler:
# Update the checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
self.config.start_time,
self.config.end_time,
self.config.bucket_duration,
)
with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"):
if self.config.include_operational_stats:
# Generate the operation workunits.
access_events = self._get_snowflake_history()
for event in access_events:
yield from self._get_operation_aspect_work_unit(
event, discovered_datasets
)
if self.redundant_run_skip_handler:
# Update the checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
self.config.start_time,
self.config.end_time,
self.config.bucket_duration,
)
def _get_workunits_internal(
self, discovered_datasets: List[str]
@ -386,7 +385,7 @@ class SnowflakeUsageExtractor(SnowflakeCommonMixin, Closeable):
)
self.report_status(USAGE_EXTRACTION_OPERATIONAL_STATS, False)
return
self.report.access_history_query_secs = round(timer.elapsed_seconds(), 2)
self.report.access_history_query_secs = timer.elapsed_seconds(digits=2)
for row in results:
yield from self._process_snowflake_history_row(row)
@ -434,8 +433,8 @@ class SnowflakeUsageExtractor(SnowflakeCommonMixin, Closeable):
self.report.max_access_history_time = db_row["MAX_TIME"].astimezone(
tz=timezone.utc
)
self.report.access_history_range_query_secs = round(
timer.elapsed_seconds(), 2
self.report.access_history_range_query_secs = timer.elapsed_seconds(
digits=2
)
def _get_operation_aspect_work_unit(

View File

@ -480,8 +480,8 @@ class SnowflakeV2Source(
identifiers=self.identifiers,
)
self.report.set_ingestion_stage("*", METADATA_EXTRACTION)
yield from schema_extractor.get_workunits_internal()
with self.report.new_stage(f"*: {METADATA_EXTRACTION}"):
yield from schema_extractor.get_workunits_internal()
databases = schema_extractor.databases
@ -513,47 +513,46 @@ class SnowflakeV2Source(
discovered_datasets = discovered_tables + discovered_views
if self.config.use_queries_v2:
self.report.set_ingestion_stage("*", VIEW_PARSING)
yield from auto_workunit(self.aggregator.gen_metadata())
with self.report.new_stage(f"*: {VIEW_PARSING}"):
yield from auto_workunit(self.aggregator.gen_metadata())
self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)
with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"):
schema_resolver = self.aggregator._schema_resolver
schema_resolver = self.aggregator._schema_resolver
queries_extractor = SnowflakeQueriesExtractor(
connection=self.connection,
config=SnowflakeQueriesExtractorConfig(
window=self.config,
temporary_tables_pattern=self.config.temporary_tables_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_stats,
include_operations=self.config.include_operational_stats,
include_queries=self.config.include_queries,
include_query_usage_statistics=self.config.include_query_usage_statistics,
user_email_pattern=self.config.user_email_pattern,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=schema_resolver,
discovered_tables=discovered_datasets,
graph=self.ctx.graph,
)
queries_extractor = SnowflakeQueriesExtractor(
connection=self.connection,
config=SnowflakeQueriesExtractorConfig(
window=self.config,
temporary_tables_pattern=self.config.temporary_tables_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_stats,
include_operations=self.config.include_operational_stats,
include_queries=self.config.include_queries,
include_query_usage_statistics=self.config.include_query_usage_statistics,
user_email_pattern=self.config.user_email_pattern,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=schema_resolver,
discovered_tables=discovered_datasets,
graph=self.ctx.graph,
)
# TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs
# but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors,
# it should be pretty straightforward to refactor this and only initialize the aggregator once.
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
queries_extractor.close()
# TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs
# but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors,
# it should be pretty straightforward to refactor this and only initialize the aggregator once.
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
queries_extractor.close()
else:
if self.lineage_extractor:
self.report.set_ingestion_stage("*", LINEAGE_EXTRACTION)
self.lineage_extractor.add_time_based_lineage_to_aggregator(
discovered_tables=discovered_tables,
discovered_views=discovered_views,
)
with self.report.new_stage(f"*: {LINEAGE_EXTRACTION}"):
self.lineage_extractor.add_time_based_lineage_to_aggregator(
discovered_tables=discovered_tables,
discovered_views=discovered_views,
)
# This would emit view and external table ddl lineage
# as well as query lineage via lineage_extractor

View File

@ -878,7 +878,7 @@ ORDER by DataBaseName, TableName;
urns = self.schema_resolver.get_urns()
if self.config.include_table_lineage or self.config.include_usage_statistics:
self.report.report_ingestion_stage_start("audit log extraction")
yield from self.get_audit_log_mcps(urns=urns)
with self.report.new_stage("Audit log extraction"):
yield from self.get_audit_log_mcps(urns=urns)
yield from self.builder.gen_workunits()

View File

@ -118,6 +118,7 @@ from datahub.ingestion.source.tableau.tableau_common import (
)
from datahub.ingestion.source.tableau.tableau_server_wrapper import UserInfo
from datahub.ingestion.source.tableau.tableau_validation import check_user_role
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
@ -170,6 +171,8 @@ from datahub.sql_parsing.sqlglot_lineage import (
create_lineage_sql_parsed_result,
)
from datahub.utilities import config_clean
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.stats_collections import TopKDict
from datahub.utilities.urns.dataset_urn import DatasetUrn
try:
@ -643,12 +646,41 @@ class SiteIdContentUrl:
@dataclass
class TableauSourceReport(StaleEntityRemovalSourceReport):
class TableauSourceReport(
StaleEntityRemovalSourceReport,
IngestionStageReport,
):
get_all_datasources_query_failed: bool = False
num_get_datasource_query_failures: int = 0
num_datasource_field_skipped_no_name: int = 0
num_csql_field_skipped_no_name: int = 0
num_table_field_skipped_no_name: int = 0
# timers
extract_usage_stats_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
fetch_groups_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict)
populate_database_server_hostname_map_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
populate_projects_registry_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
emit_workbooks_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict)
emit_sheets_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict)
emit_dashboards_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict)
emit_embedded_datasources_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
emit_published_datasources_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
emit_custom_sql_datasources_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
emit_upstream_tables_timer: Dict[str, float] = dataclass_field(
default_factory=TopKDict
)
# lineage
num_tables_with_upstream_lineage: int = 0
num_upstream_table_lineage: int = 0
@ -660,6 +692,7 @@ class TableauSourceReport(StaleEntityRemovalSourceReport):
num_upstream_fine_grained_lineage_failed_parse_sql: int = 0
num_hidden_assets_skipped: int = 0
logged_in_user: List[UserInfo] = dataclass_field(default_factory=list)
last_authenticated_at: Optional[datetime] = None
num_expected_tableau_metadata_queries: int = 0
@ -834,6 +867,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
platform=self.platform,
)
yield from site_source.ingest_tableau_site()
except MetadataQueryException as md_exception:
self.report.failure(
title="Failed to Retrieve Tableau Metadata",
@ -3489,33 +3523,87 @@ class TableauSiteSource:
return {"permissions": json.dumps(groups)} if len(groups) > 0 else None
def ingest_tableau_site(self):
# Initialise the dictionary to later look-up for chart and dashboard stat
if self.config.extract_usage_stats:
self._populate_usage_stat_registry()
with self.report.new_stage(
f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}"
):
# Initialise the dictionary to later look-up for chart and dashboard stat
if self.config.extract_usage_stats:
with PerfTimer() as timer:
self._populate_usage_stat_registry()
self.report.extract_usage_stats_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
if self.config.permission_ingestion:
self._fetch_groups()
if self.config.permission_ingestion:
with PerfTimer() as timer:
self._fetch_groups()
self.report.fetch_groups_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
# Populate the map of database names and database hostnames to be used later to map
# databases to platform instances.
if self.config.database_hostname_to_platform_instance_map:
self._populate_database_server_hostname_map()
# Populate the map of database names and database hostnames to be used later to map
# databases to platform instances.
if self.config.database_hostname_to_platform_instance_map:
with PerfTimer() as timer:
self._populate_database_server_hostname_map()
self.report.populate_database_server_hostname_map_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
self._populate_projects_registry()
with PerfTimer() as timer:
self._populate_projects_registry()
self.report.populate_projects_registry_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
if self.config.add_site_container:
yield from self.emit_site_container()
yield from self.emit_project_containers()
yield from self.emit_workbooks()
if self.sheet_ids:
yield from self.emit_sheets()
if self.dashboard_ids:
yield from self.emit_dashboards()
if self.embedded_datasource_ids_being_used:
yield from self.emit_embedded_datasources()
if self.datasource_ids_being_used:
yield from self.emit_published_datasources()
if self.custom_sql_ids_being_used:
yield from self.emit_custom_sql_datasources()
if self.database_tables:
yield from self.emit_upstream_tables()
if self.config.add_site_container:
yield from self.emit_site_container()
yield from self.emit_project_containers()
with PerfTimer() as timer:
yield from self.emit_workbooks()
self.report.emit_workbooks_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
if self.sheet_ids:
with PerfTimer() as timer:
yield from self.emit_sheets()
self.report.emit_sheets_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
if self.dashboard_ids:
with PerfTimer() as timer:
yield from self.emit_dashboards()
self.report.emit_dashboards_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
if self.embedded_datasource_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_embedded_datasources()
self.report.emit_embedded_datasources_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
if self.datasource_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_published_datasources()
self.report.emit_published_datasources_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
if self.custom_sql_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_custom_sql_datasources()
self.report.emit_custom_sql_datasources_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
if self.database_tables:
with PerfTimer() as timer:
yield from self.emit_upstream_tables()
self.report.emit_upstream_tables_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
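A small hedged sketch of the per-site timer bookkeeping added above, using only imports already present in this file; the standalone TopKDict and the key string stand in for the report field and self.site_content_url.

from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.stats_collections import TopKDict

emit_sheets_timer: TopKDict[str, float] = TopKDict()

with PerfTimer() as timer:
    pass  # stand-in for emit_sheets() work
emit_sheets_timer["my-site-content-url"] = timer.elapsed_seconds(digits=2)

print(emit_sheets_timer)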

View File

@ -263,86 +263,86 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
]
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.report_ingestion_stage_start("Ingestion Setup")
wait_on_warehouse = None
if self.config.include_hive_metastore:
self.report.report_ingestion_stage_start("Start warehouse")
# Can take several minutes, so start now and wait later
wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse()
if wait_on_warehouse is None:
self.report.report_failure(
"initialization",
f"SQL warehouse {self.config.profiling.warehouse_id} not found",
)
return
else:
# wait until warehouse is started
wait_on_warehouse.result()
with self.report.new_stage("Ingestion Setup"):
wait_on_warehouse = None
if self.config.include_hive_metastore:
with self.report.new_stage("Start warehouse"):
# Can take several minutes, so start now and wait later
wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse()
if wait_on_warehouse is None:
self.report.report_failure(
"initialization",
f"SQL warehouse {self.config.profiling.warehouse_id} not found",
)
return
else:
# wait until warehouse is started
wait_on_warehouse.result()
if self.config.include_ownership:
self.report.report_ingestion_stage_start("Ingest service principals")
self.build_service_principal_map()
self.build_groups_map()
with self.report.new_stage("Ingest service principals"):
self.build_service_principal_map()
self.build_groups_map()
if self.config.include_notebooks:
self.report.report_ingestion_stage_start("Ingest notebooks")
yield from self.process_notebooks()
with self.report.new_stage("Ingest notebooks"):
yield from self.process_notebooks()
yield from self.process_metastores()
yield from self.get_view_lineage()
if self.config.include_notebooks:
self.report.report_ingestion_stage_start("Notebook lineage")
for notebook in self.notebooks.values():
wu = self._gen_notebook_lineage(notebook)
if wu:
yield wu
with self.report.new_stage("Notebook lineage"):
for notebook in self.notebooks.values():
wu = self._gen_notebook_lineage(notebook)
if wu:
yield wu
if self.config.include_usage_statistics:
self.report.report_ingestion_stage_start("Ingest usage")
usage_extractor = UnityCatalogUsageExtractor(
config=self.config,
report=self.report,
proxy=self.unity_catalog_api_proxy,
table_urn_builder=self.gen_dataset_urn,
user_urn_builder=self.gen_user_urn,
)
yield from usage_extractor.get_usage_workunits(
self.table_refs | self.view_refs
)
with self.report.new_stage("Ingest usage"):
usage_extractor = UnityCatalogUsageExtractor(
config=self.config,
report=self.report,
proxy=self.unity_catalog_api_proxy,
table_urn_builder=self.gen_dataset_urn,
user_urn_builder=self.gen_user_urn,
)
yield from usage_extractor.get_usage_workunits(
self.table_refs | self.view_refs
)
if self.config.is_profiling_enabled():
self.report.report_ingestion_stage_start("Start warehouse")
# Need to start the warehouse again for profiling,
# as it may have been stopped after ingestion might take
# longer time to complete
wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse()
if wait_on_warehouse is None:
self.report.report_failure(
"initialization",
f"SQL warehouse {self.config.profiling.warehouse_id} not found",
)
return
else:
# wait until warehouse is started
wait_on_warehouse.result()
with self.report.new_stage("Start warehouse"):
# Need to start the warehouse again for profiling,
# as it may have been stopped after ingestion might take
# longer time to complete
wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse()
if wait_on_warehouse is None:
self.report.report_failure(
"initialization",
f"SQL warehouse {self.config.profiling.warehouse_id} not found",
)
return
else:
# wait until warehouse is started
wait_on_warehouse.result()
self.report.report_ingestion_stage_start("Profiling")
if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig):
yield from UnityCatalogAnalyzeProfiler(
self.config.profiling,
self.report,
self.unity_catalog_api_proxy,
self.gen_dataset_urn,
).get_workunits(self.table_refs)
elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig):
yield from UnityCatalogGEProfiler(
sql_common_config=self.config,
profiling_config=self.config.profiling,
report=self.report,
).get_workunits(list(self.tables.values()))
else:
raise ValueError("Unknown profiling config method")
with self.report.new_stage("Profiling"):
if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig):
yield from UnityCatalogAnalyzeProfiler(
self.config.profiling,
self.report,
self.unity_catalog_api_proxy,
self.gen_dataset_urn,
).get_workunits(self.table_refs)
elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig):
yield from UnityCatalogGEProfiler(
sql_common_config=self.config,
profiling_config=self.config.profiling,
report=self.report,
).get_workunits(list(self.tables.values()))
else:
raise ValueError("Unknown profiling config method")
def build_service_principal_map(self) -> None:
try:
@ -462,11 +462,11 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self.report.schemas.dropped(schema.id)
continue
self.report.report_ingestion_stage_start(f"Ingest schema {schema.id}")
yield from self.gen_schema_containers(schema)
yield from self.process_tables(schema)
with self.report.new_stage(f"Ingest schema {schema.id}"):
yield from self.gen_schema_containers(schema)
yield from self.process_tables(schema)
self.report.schemas.processed(schema.id)
self.report.schemas.processed(schema.id)
def process_tables(self, schema: Schema) -> Iterable[MetadataWorkUnit]:
for table in self.unity_catalog_api_proxy.tables(schema=schema):

View File

@ -1,7 +1,7 @@
import logging
from contextlib import AbstractContextManager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.stats_collections import TopKDict
@ -22,25 +22,29 @@ PROFILING = "Profiling"
@dataclass
class IngestionStageReport:
ingestion_stage: Optional[str] = None
ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict)
_timer: Optional[PerfTimer] = field(
default=None, init=False, repr=False, compare=False
)
def new_stage(self, stage: str) -> "IngestionStageContext":
return IngestionStageContext(stage, self)
def report_ingestion_stage_start(self, stage: str) -> None:
if self._timer:
elapsed = round(self._timer.elapsed_seconds(), 2)
logger.info(
f"Time spent in stage <{self.ingestion_stage}>: {elapsed} seconds",
stacklevel=2,
)
if self.ingestion_stage:
self.ingestion_stage_durations[self.ingestion_stage] = elapsed
else:
self._timer = PerfTimer()
self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
logger.info(f"Stage started: {self.ingestion_stage}")
@dataclass
class IngestionStageContext(AbstractContextManager):
def __init__(self, stage: str, report: IngestionStageReport):
self._ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
self._timer: PerfTimer = PerfTimer()
self._report = report
def __enter__(self) -> "IngestionStageContext":
logger.info(f"Stage started: {self._ingestion_stage}")
self._timer.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
elapsed = self._timer.elapsed_seconds(digits=2)
logger.info(
f"Time spent in stage <{self._ingestion_stage}>: {elapsed} seconds",
stacklevel=2,
)
self._report.ingestion_stage_durations[self._ingestion_stage] = elapsed
return None
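A hedged sketch of how a source report adopts the new API, mirroring the TableauSourceReport change earlier in this diff: mix IngestionStageReport into the report dataclass and wrap each phase in new_stage(). ExampleSourceReport and its field are hypothetical.

from dataclasses import dataclass

from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport


@dataclass
class ExampleSourceReport(IngestionStageReport):
    # Hypothetical report class; real sources also mix in their own report bases.
    events_produced: int = 0


report = ExampleSourceReport()
with report.new_stage("example-db: Metadata Extraction"):
    report.events_produced += 1

# Each duration key is the stage name suffixed with its UTC start time.
print(report.ingestion_stage_durations)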

View File

@ -57,7 +57,7 @@ class PerfTimer(AbstractContextManager):
self.finish()
return None
def elapsed_seconds(self) -> float:
def elapsed_seconds(self, digits: int = 4) -> float:
"""
Returns the elapsed time in seconds, rounded to `digits` decimal places.
"""
@ -65,11 +65,18 @@ class PerfTimer(AbstractContextManager):
return self._past_active_time
if self.end_time is None:
return (time.perf_counter() - self.start_time) + (self._past_active_time)
elapsed = (time.perf_counter() - self.start_time) + (self._past_active_time)
else:
return (self.end_time - self.start_time) + self._past_active_time
elapsed = (self.end_time - self.start_time) + self._past_active_time
return round(elapsed, digits)
def assert_timer_is_running(self) -> None:
if not self.is_running():
self._error_state = True
logger.warning("Did you forget to start the timer ?")
def is_running(self) -> bool:
"""
Returns true if timer is in running state.
Timer is in NOT in running state if
@ -77,9 +84,7 @@ class PerfTimer(AbstractContextManager):
2. it is in paused state.
3. it had been started and finished in the past but not started again.
"""
if self.start_time is None or self.paused or self.end_time:
self._error_state = True
logger.warning("Did you forget to start the timer ?")
return self.start_time is not None and not self.paused and self.end_time is None
def __repr__(self) -> str:
return repr(self.as_obj())
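A brief sketch of the reworked PerfTimer shown above: elapsed_seconds() now rounds its result (4 digits by default), which is why call sites in this PR drop their round(..., 2) wrappers, and the not-started warning now lives in the separate assert_timer_is_running() helper. The sleep is a stand-in for timed work.

import time

from datahub.utilities.perf_timer import PerfTimer

with PerfTimer() as timer:
    time.sleep(0.05)

print(timer.elapsed_seconds())          # rounded to 4 decimal places by default
print(timer.elapsed_seconds(digits=2))  # what the report fields in this PR store
print(timer.is_running())               # False; checking no longer logs a warning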

View File

@ -26,14 +26,14 @@ from tests.performance.helpers import workunit_sink
def run_test():
report = BigQueryV2Report()
report.set_ingestion_stage("All", "Seed Data Generation")
seed_metadata = generate_data(
num_containers=2000,
num_tables=20000,
num_views=2000,
time_range=timedelta(days=7),
)
all_tables = seed_metadata.all_tables
with report.new_stage("All: Seed Data Generation"):
seed_metadata = generate_data(
num_containers=2000,
num_tables=20000,
num_views=2000,
time_range=timedelta(days=7),
)
all_tables = seed_metadata.all_tables
config = BigQueryV2Config(
start_time=seed_metadata.start_time,
@ -51,42 +51,45 @@ def run_test():
schema_resolver=SchemaResolver(platform="bigquery"),
identifiers=BigQueryIdentifierBuilder(config, report),
)
report.set_ingestion_stage("All", "Event Generation")
with report.new_stage("All: Event Generation"):
num_projects = 100
projects = [f"project-{i}" for i in range(num_projects)]
table_to_project = {table.name: random.choice(projects) for table in all_tables}
table_refs = {
str(ref_from_table(table, table_to_project)) for table in all_tables
}
num_projects = 100
projects = [f"project-{i}" for i in range(num_projects)]
table_to_project = {table.name: random.choice(projects) for table in all_tables}
table_refs = {str(ref_from_table(table, table_to_project)) for table in all_tables}
queries = list(
generate_queries(
seed_metadata,
num_selects=240_000,
num_operations=800_000,
num_unique_queries=50_000,
num_users=2000,
query_length=NormalDistribution(2000, 500),
queries = list(
generate_queries(
seed_metadata,
num_selects=240_000,
num_operations=800_000,
num_unique_queries=50_000,
num_users=2000,
query_length=NormalDistribution(2000, 500),
)
)
)
queries.sort(key=lambda q: q.timestamp)
events = list(generate_events(queries, projects, table_to_project, config=config))
print(f"Events generated: {len(events)}")
pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss
print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}")
queries.sort(key=lambda q: q.timestamp)
events = list(
generate_events(queries, projects, table_to_project, config=config)
)
print(f"Events generated: {len(events)}")
pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss
print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}")
report.set_ingestion_stage("All", "Event Ingestion")
with PerfTimer() as timer:
workunits = usage_extractor._get_workunits_internal(events, table_refs)
num_workunits, peak_memory_usage = workunit_sink(workunits)
report.set_ingestion_stage("All", "Done")
print(f"Workunits Generated: {num_workunits}")
print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds")
with report.new_stage("All: Event Ingestion"):
with PerfTimer() as timer:
workunits = usage_extractor._get_workunits_internal(events, table_refs)
num_workunits, peak_memory_usage = workunit_sink(workunits)
with report.new_stage("All: Done"):
print(f"Workunits Generated: {num_workunits}")
print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds")
print(
f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}"
)
print(f"Disk Used: {report.processing_perf.usage_state_size}")
print(f"Hash collisions: {report.num_usage_query_hash_collisions}")
print(
f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}"
)
print(f"Disk Used: {report.processing_perf.usage_state_size}")
print(f"Hash collisions: {report.num_usage_query_hash_collisions}")
if __name__ == "__main__":

View File

@ -59,7 +59,7 @@ def run_test():
workunits = source.get_workunits()
num_workunits, peak_memory_usage = workunit_sink(workunits)
print(f"Workunits Generated: {num_workunits}")
print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds")
print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds")
print(
f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}"

View File

@ -53,7 +53,7 @@ def run_test():
workunits = source.get_workunits()
num_workunits, peak_memory_usage = workunit_sink(workunits)
logging.info(f"Workunits Generated: {num_workunits}")
logging.info(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds")
logging.info(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds")
logging.info(source.get_report().as_string())
logging.info(

View File

@ -12,12 +12,14 @@ def run_test() -> None:
for i in range(N):
if i % 50 == 0:
print(
f"Running iteration {i}, elapsed time: {timer.elapsed_seconds():.2f} seconds"
f"Running iteration {i}, elapsed time: {timer.elapsed_seconds(digits=2)} seconds"
)
try_format_query.__wrapped__(large_sql_query, platform="snowflake")
print(f"Total time taken for {N} iterations: {timer.elapsed_seconds():.2f} seconds")
print(
f"Total time taken for {N} iterations: {timer.elapsed_seconds(digits=2)} seconds"
)
if __name__ == "__main__":

View File

@ -0,0 +1,42 @@
import time
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
def test_ingestion_stage_context_records_duration():
report = IngestionStageReport()
with report.new_stage(stage="Test Stage"):
pass
assert len(report.ingestion_stage_durations) == 1
assert "Test Stage" in next(iter(report.ingestion_stage_durations.keys()))
def test_ingestion_stage_context_handles_exceptions():
report = IngestionStageReport()
try:
with report.new_stage(stage="Test Stage"):
raise ValueError("Test Exception")
except ValueError:
pass
assert len(report.ingestion_stage_durations) == 1
assert "Test Stage" in next(iter(report.ingestion_stage_durations))
def test_ingestion_stage_context_report_handles_multiple_stages():
report = IngestionStageReport()
with report.new_stage(stage="Test Stage 1"):
time.sleep(0.1)
with report.new_stage(stage="Test Stage 2"):
time.sleep(0.1)
with report.new_stage(stage="Test Stage 3"):
time.sleep(0.1)
assert len(report.ingestion_stage_durations) == 3
assert all(
isinstance(duration, float) and duration > 0.0
for duration in report.ingestion_stage_durations.values()
)
sorted_stages = list(sorted(report.ingestion_stage_durations.keys()))
assert "Test Stage 1" in sorted_stages[0]
assert "Test Stage 2" in sorted_stages[1]
assert "Test Stage 3" in sorted_stages[2]