diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
index e576bf6423..cfded7b468 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -357,6 +357,7 @@ class BigQuerySource(SQLAlchemySource):
         logger.info(
             f"Built lineage map containing {len(self.lineage_metadata)} entries."
         )
+        logger.debug(f"Lineage metadata is {self.lineage_metadata}")
 
     def _compute_bigquery_lineage_via_gcp_logging(
         self, lineage_client_project_id: Optional[str]
@@ -631,9 +632,6 @@ class BigQuerySource(SQLAlchemySource):
                     lineage_map[destination_table_str] = new_lineage_str
                 if not (has_table or has_view):
                     self.report.num_skipped_lineage_entries_other += 1
-
-        if self.config.upstream_lineage_in_report:
-            self.report.upstream_lineage = lineage_map
         return lineage_map
 
     def get_latest_partition(
@@ -823,7 +821,7 @@ WHERE
         assert self.lineage_metadata
         for ref_table in self.lineage_metadata[str(bq_table)]:
             upstream_table = BigQueryTableRef.from_string_name(ref_table)
-            if upstream_table.is_temporary_table():
+            if upstream_table.is_temporary_table(self.config.temp_table_dataset_prefix):
                 # making sure we don't process a table twice and not get into a recursive loop
                 if ref_table in tables_seen:
                     logger.debug(
@@ -843,9 +841,11 @@ WHERE
         self, dataset_urn: str
     ) -> Optional[MetadataChangeProposalWrapper]:
         if self.lineage_metadata is None:
+            logger.debug("No lineage metadata, skipping lineage MCP generation")
             return None
         dataset_key: Optional[DatasetKey] = mce_builder.dataset_urn_to_key(dataset_urn)
         if dataset_key is None:
+            logger.debug(f"No dataset_key for {dataset_urn}, skipping lineage MCP generation")
             return None
         project_id, dataset_name, tablename = dataset_key.name.split(".")
         bq_table = BigQueryTableRef(project_id, dataset_name, tablename)
@@ -869,6 +869,12 @@ WHERE
                     ),
                     DatasetLineageTypeClass.TRANSFORMED,
                 )
+                if self.config.upstream_lineage_in_report:
+                    current_lineage_map: Set = self.report.upstream_lineage.get(
+                        str(bq_table), set()
+                    )
+                    current_lineage_map.add(str(upstream_table))
+                    self.report.upstream_lineage[str(bq_table)] = current_lineage_map
                 upstream_list.append(upstream_table_class)
 
         if upstream_list:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py
index 41a4f03bb1..ae05deea34 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py
@@ -260,9 +260,9 @@ class BigQueryTableRef:
             raise ValueError(f"invalid BigQuery table reference: {ref}")
         return cls(parts[1], parts[3], parts[5])
 
-    def is_temporary_table(self) -> bool:
-        # Temporary tables will have a dataset that begins with an underscore.
-        return self.dataset.startswith("_")
+    def is_temporary_table(self, prefix: str) -> bool:
+        # Temporary tables have a dataset that begins with the configured prefix.
+        return self.dataset.startswith(prefix)
 
     def remove_extras(self) -> "BigQueryTableRef":
         # Handle partitioned and sharded tables.
@@ -971,7 +971,8 @@ class BigQueryUsageSource(Source):
     ) -> Iterable[Union[ReadEvent, QueryEvent, MetadataWorkUnit]]:
         self.report.num_read_events = 0
         self.report.num_query_events = 0
-        self.report.num_filtered_events = 0
+        self.report.num_filtered_read_events = 0
+        self.report.num_filtered_query_events = 0
 
         for entry in entries:
             event: Optional[Union[ReadEvent, QueryEvent]] = None
@@ -979,7 +980,7 @@
             if missing_read_entry is None:
                 event = ReadEvent.from_entry(entry)
                 if not self._is_table_allowed(event.resource):
-                    self.report.num_filtered_events += 1
+                    self.report.num_filtered_read_events += 1
                     continue
                 self.report.num_read_events += 1
 
@@ -987,7 +988,7 @@
             if event is None and missing_query_entry is None:
                 event = QueryEvent.from_entry(entry)
                 if not self._is_table_allowed(event.destinationTable):
-                    self.report.num_filtered_events += 1
+                    self.report.num_filtered_query_events += 1
                     continue
                 self.report.num_query_events += 1
                 wu = self._create_operation_aspect_work_unit(event)
@@ -999,7 +1000,7 @@
             if event is None and missing_query_entry_v2 is None:
                 event = QueryEvent.from_entry_v2(entry)
                 if not self._is_table_allowed(event.destinationTable):
-                    self.report.num_filtered_events += 1
+                    self.report.num_filtered_query_events += 1
                     continue
                 self.report.num_query_events += 1
                 wu = self._create_operation_aspect_work_unit(event)
@@ -1013,6 +1014,7 @@
-                    f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}",
+                    f"Unable to parse {type(entry)} missing read {missing_read_entry}, missing query {missing_query_entry}, missing v2 {missing_query_entry_v2} for {entry}",
                 )
             else:
+                logger.debug(f"Yielding {event} from log entries")
                 yield event
 
         logger.info(
@@ -1138,7 +1140,7 @@
                 logger.warning(f"Failed to process event {str(event.resource)}", e)
                 continue
 
-            if resource.is_temporary_table():
+            if resource.is_temporary_table(self.config.temp_table_dataset_prefix):
                 logger.debug(f"Dropping temporary table {resource}")
                 self.report.report_dropped(str(resource))
                 continue
diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py
index 1929f99021..80e77fe946 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py
@@ -56,6 +56,10 @@ class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig):
         description="Whether to read date sharded tables or time partitioned tables when extracting usage from exported audit logs.",
     )
     _credentials_path: Optional[str] = pydantic.PrivateAttr(None)
+    temp_table_dataset_prefix: str = pydantic.Field(
+        default="_",
+        description="If you are creating temp tables in a dataset with a particular prefix, you can use this config to set that prefix. This supports workflows that predate BigQuery's native temp tables. Defaults to `_`, since datasets that begin with an underscore are hidden by default (https://cloud.google.com/bigquery/docs/datasets#dataset-naming).",
+    )
     use_v2_audit_metadata: Optional[bool] = pydantic.Field(
         default=False, description="Whether to ingest logs using the v2 format."
     )
diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py
index 8ff61674e2..960eed94e1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py
@@ -116,6 +116,10 @@ class BigQueryUsageConfig(DatasetSourceConfigBase, BaseUsageConfig):
         description="Bigquery credential. Required if GOOGLE_APPLICATION_CREDENTIALS enviroment variable is not set. See this example recipe for details",
     )
     _credentials_path: Optional[str] = pydantic.PrivateAttr(None)
+    temp_table_dataset_prefix: str = pydantic.Field(
+        default="_",
+        description="If you are creating temp tables in a dataset with a particular prefix, you can use this config to set that prefix. This supports workflows that predate BigQuery's native temp tables. Defaults to `_`, since datasets that begin with an underscore are hidden by default (https://cloud.google.com/bigquery/docs/datasets#dataset-naming).",
+    )
 
     def __init__(self, **data: Any):
         super().__init__(**data)
diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source_report/usage/bigquery_usage.py
index cde4594666..89cb27fb93 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_report/usage/bigquery_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_report/usage/bigquery_usage.py
@@ -11,7 +11,8 @@ class BigQueryUsageSourceReport(SourceReport):
     dropped_table: Counter[str] = dataclasses.field(default_factory=collections.Counter)
    total_log_entries: Optional[int] = None
     num_read_events: Optional[int] = None
-    num_filtered_events: Optional[int] = None
+    num_filtered_read_events: Optional[int] = None
+    num_filtered_query_events: Optional[int] = None
     num_query_events: Optional[int] = None
     use_v2_audit_metadata: Optional[bool] = None
     log_page_size: Optional[int] = None
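
Aside: a minimal, runnable sketch of the prefix check this patch makes configurable. The stripped-down BigQueryTableRef below only mirrors the shape of the real class, and the project/dataset names are made up for illustration.

from dataclasses import dataclass

@dataclass(frozen=True)
class BigQueryTableRef:
    project: str
    dataset: str
    table: str

    def is_temporary_table(self, prefix: str) -> bool:
        # A table counts as temporary when its dataset starts with the
        # configured prefix; "_" matches BigQuery's hidden datasets.
        return self.dataset.startswith(prefix)

# The default prefix "_" keeps the old behaviour: tables in hidden
# datasets are treated as temporary and dropped from lineage/usage.
assert BigQueryTableRef("proj", "_session", "t").is_temporary_table("_")

# A custom prefix (temp_table_dataset_prefix in the new config) supports
# workflows that stage data in, e.g., "scratch_*" datasets.
assert BigQueryTableRef("proj", "scratch_etl", "t").is_temporary_table("scratch_")
assert not BigQueryTableRef("proj", "prod", "t").is_temporary_table("scratch_")

Because the default stays "_", existing recipes keep their current behaviour with no config changes.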
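Likewise, a small sketch of the reporting change: instead of dumping the whole lineage map into the report up front, upstreams are now accumulated per table as each lineage MCP is emitted. The plain dict below stands in for report.upstream_lineage, whose actual declaration lives outside this diff; record_upstream is a hypothetical helper wrapping the block added in get_lineage_mcp.

from typing import Dict, Set

# Assumed shape: destination table name -> set of upstream table names.
upstream_lineage: Dict[str, Set[str]] = {}

def record_upstream(bq_table: str, upstream_table: str) -> None:
    # Mirrors the added block: fetch the current set (or an empty one),
    # add the upstream, and write it back.
    current_lineage_map: Set[str] = upstream_lineage.get(bq_table, set())
    current_lineage_map.add(upstream_table)
    upstream_lineage[bq_table] = current_lineage_map

record_upstream("proj.ds.orders", "proj.ds.raw_orders")
record_upstream("proj.ds.orders", "proj.ds.customers")
assert upstream_lineage["proj.ds.orders"] == {"proj.ds.raw_orders", "proj.ds.customers"}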