feat(ingest): bigquery - external url support and a small profiling filter fix (#6714)

This commit is contained in:
Tamas Nemeth 2022-12-13 01:25:32 +01:00 committed by GitHub
parent 54230a8d81
commit 5658fd5a54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 4 deletions

View File

@ -10,6 +10,7 @@ from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, cast
from google.cloud import bigquery
from google.cloud.bigquery.table import TableListItem
from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import (
make_container_urn,
make_data_platform_urn,
@ -54,7 +55,11 @@ from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigqueryTable,
BigqueryView,
)
from datahub.ingestion.source.bigquery_v2.common import get_bigquery_client
from datahub.ingestion.source.bigquery_v2.common import (
BQ_EXTERNAL_DATASET_URL_TEMPLATE,
BQ_EXTERNAL_TABLE_URL_TEMPLATE,
get_bigquery_client,
)
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
@ -459,6 +464,11 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
dataset,
["Dataset"],
database_container_key,
external_url=BQ_EXTERNAL_DATASET_URL_TEMPLATE.format(
project=project_id, dataset=dataset
)
if self.config.include_external_url
else None,
)
self.stale_entity_removal_handler.add_entity_to_state(
@ -570,8 +580,12 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
bigquery_project.datasets
)
for bigquery_dataset in bigquery_project.datasets:
if not self.config.dataset_pattern.allowed(bigquery_dataset.name):
if not is_schema_allowed(
self.config.dataset_pattern,
bigquery_dataset.name,
project_id,
self.config.match_fully_qualified_names,
):
self.report.report_dropped(f"{bigquery_dataset.name}.*")
continue
try:
@ -854,6 +868,13 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
else None,
lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000))
if table.last_altered is not None
else TimeStamp(time=int(table.created.timestamp() * 1000))
if table.created is not None
else None,
externalUrl=BQ_EXTERNAL_TABLE_URL_TEMPLATE.format(
project=project_id, dataset=dataset_name, table=table.name
)
if self.config.include_external_url
else None,
)
if custom_properties:

View File

@ -48,6 +48,16 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'",
)
# Controls what `dataset_pattern` is matched against: when True, the fully
# qualified `<project_id>.<dataset_name>`; the default is False.
# This flag is forwarded to `is_schema_allowed` by the source when filtering datasets.
match_fully_qualified_names: bool = Field(
default=False,
description="Whether `dataset_pattern` is matched against fully qualified dataset name `<project_id>.<dataset_name>`.",
)
# Gates whether the source attaches a BigQuery console link (external URL) to
# dataset containers and table properties; enabled by default.
include_external_url: bool = Field(
default=True,
description="Whether to populate BigQuery Console url to Datasets/Tables",
)
debug_include_full_payloads: bool = Field(
default=False,
description="Include full payload into events. It is only for debugging and internal use.",
@ -128,6 +138,20 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
logging.warning(
"schema_pattern will be ignored in favour of dataset_pattern. schema_pattern will be deprecated, please use dataset_pattern only."
)
match_fully_qualified_names = values.get("match_fully_qualified_names")
if (
dataset_pattern is not None
and dataset_pattern != AllowDenyPattern.allow_all()
and match_fully_qualified_names is not None
and not match_fully_qualified_names
):
logger.warning(
"Please update `dataset_pattern` to match against fully qualified schema name `<project_id>.<dataset_name>` and set config `match_fully_qualified_names : True`."
"Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. "
"The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
)
return values
def get_table_pattern(self, pattern: List[str]) -> str:

View File

@ -8,6 +8,9 @@ from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Confi
# strftime-style pattern: date and time separated by "T", with a literal trailing "Z".
BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# strftime-style pattern for a compact YYYYMMDD date.
# NOTE(review): presumably the suffix of date-sharded tables — confirm against callers.
BQ_DATE_SHARD_FORMAT = "%Y%m%d"
# BigQuery console deep-link templates, filled via str.format():
#   table link   -> project, dataset, table
#   dataset link -> project, dataset
# NOTE(review): the `ws=!1m...` query value looks like opaque console state;
# keep it byte-for-byte unless the console URL scheme is re-verified.
BQ_EXTERNAL_TABLE_URL_TEMPLATE = "https://console.cloud.google.com/bigquery?project={project}&ws=!1m5!1m4!4m3!1s{project}!2s{dataset}!3s{table}"
BQ_EXTERNAL_DATASET_URL_TEMPLATE = "https://console.cloud.google.com/bigquery?project={project}&ws=!1m4!1m3!3m2!1s{project}!2s{dataset}"
def _make_gcp_logging_client(
project_id: Optional[str] = None, extra_client_options: Dict[str, Any] = {}

View File

@ -172,9 +172,14 @@ WHERE
word in column.data_type.lower()
for word in ["array", "struct", "geography", "json"]
):
normalized_table_name = BigqueryTableIdentifier(
project_id=project, dataset=dataset, table=table.name
).get_table_name()
self.config.profile_pattern.deny.append(
f"^{project}.{dataset}.{table.name}.{column.field_path}$"
f"^{normalized_table_name}.{column.field_path}$"
)
# Emit the profile work unit
profile_request = self.get_bigquery_profile_request(
project=project, dataset=dataset, table=table