Mirror of https://github.com/datahub-project/datahub.git (synced 2025-11-13 01:38:35 +00:00)
fix(ingestion): bigquery - extract temp table prefix as config, fix reporting, logging (#4766)
parent 040e72af6b
commit bddfd89002

@@ -357,6 +357,7 @@ class BigQuerySource(SQLAlchemySource):
         logger.info(
             f"Built lineage map containing {len(self.lineage_metadata)} entries."
         )
+        logger.debug(f"lineage metadata is {self.lineage_metadata}")

     def _compute_bigquery_lineage_via_gcp_logging(
         self, lineage_client_project_id: Optional[str]
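
The added debug line dumps the entire lineage map, which can be large. As a general Python logging note (not part of this commit; the map shape below is an assumption based on the surrounding code), the f-string can be guarded so it is only built when debug output is actually enabled:

import logging
from typing import Dict, Set

logger = logging.getLogger(__name__)

# Assumed shape of the lineage map: destination table ref -> upstream table refs.
lineage_metadata: Dict[str, Set[str]] = {
    "projects/p/datasets/d/tables/dest": {"projects/p/datasets/d/tables/src"},
}

logger.info(f"Built lineage map containing {len(lineage_metadata)} entries.")
if logger.isEnabledFor(logging.DEBUG):
    # Only format the (potentially huge) map when debug output will actually be emitted.
    logger.debug(f"lineage metadata is {lineage_metadata}")
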
@@ -631,9 +632,6 @@ class BigQuerySource(SQLAlchemySource):
                 lineage_map[destination_table_str] = new_lineage_str
             if not (has_table or has_view):
                 self.report.num_skipped_lineage_entries_other += 1
-
-        if self.config.upstream_lineage_in_report:
-            self.report.upstream_lineage = lineage_map
         return lineage_map

     def get_latest_partition(
@@ -823,7 +821,7 @@ WHERE
         assert self.lineage_metadata
         for ref_table in self.lineage_metadata[str(bq_table)]:
             upstream_table = BigQueryTableRef.from_string_name(ref_table)
-            if upstream_table.is_temporary_table():
+            if upstream_table.is_temporary_table(self.config.temp_table_dataset_prefix):
                 # making sure we don't process a table twice and not get into a recursive loop
                 if ref_table in tables_seen:
                     logger.debug(
@@ -843,9 +841,11 @@ WHERE
         self, dataset_urn: str
     ) -> Optional[MetadataChangeProposalWrapper]:
         if self.lineage_metadata is None:
+            logger.debug("No lineage metadata so skipping getting mcp")
             return None
         dataset_key: Optional[DatasetKey] = mce_builder.dataset_urn_to_key(dataset_urn)
         if dataset_key is None:
+            logger.debug(f"No dataset_key for {dataset_urn} so skipping getting mcp")
             return None
         project_id, dataset_name, tablename = dataset_key.name.split(".")
         bq_table = BigQueryTableRef(project_id, dataset_name, tablename)
@@ -869,6 +869,12 @@ WHERE
                     ),
                     DatasetLineageTypeClass.TRANSFORMED,
                 )
+                if self.config.upstream_lineage_in_report:
+                    current_lineage_map: Set = self.report.upstream_lineage.get(
+                        str(bq_table), set()
+                    )
+                    current_lineage_map.add(str(upstream_table))
+                    self.report.upstream_lineage[str(bq_table)] = current_lineage_map
                 upstream_list.append(upstream_table_class)

         if upstream_list:
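
For reference, a minimal sketch of the per-table reporting pattern introduced above, with simplified stand-ins (LineageReportSketch, record_upstream, and the plain-string table refs are not DataHub classes): upstream refs are accumulated into the report's map one destination table at a time, gated by the config flag, instead of assigning the whole lineage map to the report at the end.

from dataclasses import dataclass, field
from typing import Dict, Set

@dataclass
class LineageReportSketch:
    # destination table ref -> set of upstream table refs observed so far
    upstream_lineage: Dict[str, Set[str]] = field(default_factory=dict)

def record_upstream(report: LineageReportSketch, enabled: bool, bq_table: str, upstream_table: str) -> None:
    if not enabled:  # mirrors the upstream_lineage_in_report config flag
        return
    current: Set[str] = report.upstream_lineage.get(bq_table, set())
    current.add(upstream_table)
    report.upstream_lineage[bq_table] = current

report = LineageReportSketch()
record_upstream(report, True, "proj.ds.dest", "proj.ds.src_a")
record_upstream(report, True, "proj.ds.dest", "proj.ds.src_b")
# report.upstream_lineage == {"proj.ds.dest": {"proj.ds.src_a", "proj.ds.src_b"}}
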
@@ -260,9 +260,9 @@ class BigQueryTableRef:
             raise ValueError(f"invalid BigQuery table reference: {ref}")
         return cls(parts[1], parts[3], parts[5])

-    def is_temporary_table(self) -> bool:
+    def is_temporary_table(self, prefix: str) -> bool:
         # Temporary tables will have a dataset that begins with an underscore.
-        return self.dataset.startswith("_")
+        return self.dataset.startswith(prefix)

     def remove_extras(self) -> "BigQueryTableRef":
         # Handle partitioned and sharded tables.
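
A self-contained sketch of the parameterized check (TableRefSketch is a simplified stand-in for BigQueryTableRef): the caller now decides which dataset prefix marks a table as temporary instead of relying on the hard-coded underscore.

from dataclasses import dataclass

@dataclass(frozen=True)
class TableRefSketch:
    project: str
    dataset: str
    table: str

    def is_temporary_table(self, prefix: str) -> bool:
        # A dataset whose name starts with the given prefix is treated as temporary.
        return self.dataset.startswith(prefix)

ref = TableRefSketch("my-project", "_scratch", "anon_query_result")
print(ref.is_temporary_table("_"))     # True  with the default prefix
print(ref.is_temporary_table("tmp_"))  # False with a custom prefix
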
@@ -971,7 +971,8 @@ class BigQueryUsageSource(Source):
     ) -> Iterable[Union[ReadEvent, QueryEvent, MetadataWorkUnit]]:
         self.report.num_read_events = 0
         self.report.num_query_events = 0
-        self.report.num_filtered_events = 0
+        self.report.num_filtered_read_events = 0
+        self.report.num_filtered_query_events = 0
         for entry in entries:
             event: Optional[Union[ReadEvent, QueryEvent]] = None

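
A sketch of the counter split, using a hypothetical entry shape and a stand-in report class: filtered read events and filtered query events are now tallied separately rather than sharing a single num_filtered_events counter.

from dataclasses import dataclass
from typing import Iterable, Set

@dataclass
class UsageReportSketch:
    num_read_events: int = 0
    num_query_events: int = 0
    num_filtered_read_events: int = 0
    num_filtered_query_events: int = 0

def tally(report: UsageReportSketch, entries: Iterable[dict], allowed_tables: Set[str]) -> None:
    for entry in entries:
        kind, table = entry["kind"], entry["table"]  # hypothetical entry shape
        if kind == "read":
            if table not in allowed_tables:
                report.num_filtered_read_events += 1  # was num_filtered_events
                continue
            report.num_read_events += 1
        elif kind == "query":
            if table not in allowed_tables:
                report.num_filtered_query_events += 1  # was num_filtered_events
                continue
            report.num_query_events += 1

report = UsageReportSketch()
tally(report, [{"kind": "read", "table": "a"}, {"kind": "query", "table": "b"}], {"a"})
# report -> num_read_events=1, num_filtered_query_events=1
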
@@ -979,7 +980,7 @@
             if missing_read_entry is None:
                 event = ReadEvent.from_entry(entry)
                 if not self._is_table_allowed(event.resource):
-                    self.report.num_filtered_events += 1
+                    self.report.num_filtered_read_events += 1
                     continue
                 self.report.num_read_events += 1

@@ -987,7 +988,7 @@
             if event is None and missing_query_entry is None:
                 event = QueryEvent.from_entry(entry)
                 if not self._is_table_allowed(event.destinationTable):
-                    self.report.num_filtered_events += 1
+                    self.report.num_filtered_query_events += 1
                     continue
                 self.report.num_query_events += 1
                 wu = self._create_operation_aspect_work_unit(event)
@@ -999,7 +1000,7 @@
             if event is None and missing_query_entry_v2 is None:
                 event = QueryEvent.from_entry_v2(entry)
                 if not self._is_table_allowed(event.destinationTable):
-                    self.report.num_filtered_events += 1
+                    self.report.num_filtered_query_events += 1
                     continue
                 self.report.num_query_events += 1
                 wu = self._create_operation_aspect_work_unit(event)
@@ -1013,6 +1014,7 @@
                     f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}",
                 )
             else:
+                logger.debug(f"Yielding {event} from log entries")
                 yield event

         logger.info(
@@ -1138,7 +1140,7 @@
                 logger.warning(f"Failed to process event {str(event.resource)}", e)
                 continue

-            if resource.is_temporary_table():
+            if resource.is_temporary_table(self.config.temp_table_dataset_prefix):
                 logger.debug(f"Dropping temporary table {resource}")
                 self.report.report_dropped(str(resource))
                 continue
@@ -56,6 +56,10 @@ class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig):
         description="Whether to read date sharded tables or time partitioned tables when extracting usage from exported audit logs.",
     )
     _credentials_path: Optional[str] = pydantic.PrivateAttr(None)
+    temp_table_dataset_prefix: str = pydantic.Field(
+        default="_",
+        description="If you are creating temp tables in a dataset with a particular prefix, you can use this config to set that dataset prefix. This supports workflows that predate BigQuery's native temp tables. The default is `_` because datasets that begin with an underscore are hidden by default: https://cloud.google.com/bigquery/docs/datasets#dataset-naming.",
+    )
     use_v2_audit_metadata: Optional[bool] = pydantic.Field(
         default=False, description="Whether to ingest logs using the v2 format."
     )
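
A minimal sketch of declaring such a field and threading it through to the temp-table check (SourceConfig and is_temp_dataset are stand-ins for illustration, not the actual DataHub classes):

import pydantic

class SourceConfig(pydantic.BaseModel):
    # Datasets whose names start with this prefix are treated as temporary and skipped.
    temp_table_dataset_prefix: str = pydantic.Field(
        default="_",
        description="Prefix of dataset names that hold temporary tables.",
    )

def is_temp_dataset(dataset: str, config: SourceConfig) -> bool:
    # Mirrors passing the configured prefix into BigQueryTableRef.is_temporary_table(prefix).
    return dataset.startswith(config.temp_table_dataset_prefix)

default_config = SourceConfig()                                  # keeps the "_" default
custom_config = SourceConfig(temp_table_dataset_prefix="tmp_")   # recipe override
print(is_temp_dataset("_scratch", default_config))  # True
print(is_temp_dataset("_scratch", custom_config))   # False
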
@@ -116,6 +116,10 @@ class BigQueryUsageConfig(DatasetSourceConfigBase, BaseUsageConfig):
         description="BigQuery credential. Required if the GOOGLE_APPLICATION_CREDENTIALS environment variable is not set. See this example recipe for details",
     )
     _credentials_path: Optional[str] = pydantic.PrivateAttr(None)
+    temp_table_dataset_prefix: str = pydantic.Field(
+        default="_",
+        description="If you are creating temp tables in a dataset with a particular prefix, you can use this config to set that dataset prefix. This supports workflows that predate BigQuery's native temp tables. The default is `_` because datasets that begin with an underscore are hidden by default: https://cloud.google.com/bigquery/docs/datasets#dataset-naming.",
+    )

     def __init__(self, **data: Any):
         super().__init__(**data)
@@ -11,7 +11,8 @@ class BigQueryUsageSourceReport(SourceReport):
     dropped_table: Counter[str] = dataclasses.field(default_factory=collections.Counter)
     total_log_entries: Optional[int] = None
     num_read_events: Optional[int] = None
-    num_filtered_events: Optional[int] = None
+    num_filtered_read_events: Optional[int] = None
+    num_filtered_query_events: Optional[int] = None
     num_query_events: Optional[int] = None
     use_v2_audit_metadata: Optional[bool] = None
     log_page_size: Optional[int] = None
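
For completeness, a sketch of how a report of this shape can track dropped temporary tables alongside the new counters (UsageSourceReportSketch and its report_dropped body are assumptions for illustration; field names mirror the diff):

import collections
import dataclasses
from typing import Counter, Optional

@dataclasses.dataclass
class UsageSourceReportSketch:
    dropped_table: Counter[str] = dataclasses.field(default_factory=collections.Counter)
    num_filtered_read_events: Optional[int] = None
    num_filtered_query_events: Optional[int] = None

    def report_dropped(self, table_ref: str) -> None:
        # Count how many audit-log events referenced each dropped (temporary) table.
        self.dropped_table[table_ref] += 1

report = UsageSourceReportSketch()
report.report_dropped("projects/p/datasets/_scratch/tables/t1")
report.report_dropped("projects/p/datasets/_scratch/tables/t1")
# report.dropped_table.most_common(1) == [("projects/p/datasets/_scratch/tables/t1", 2)]
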