fix(ingest): bigquery-beta - eliminate the need for data.read permission for table schema (#6146)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
This commit is contained in:
Tamas Nemeth 2022-10-07 19:19:16 +02:00 committed by GitHub
parent 2e2ef536d8
commit 0f04cd6431
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 233 additions and 97 deletions

View File

@ -1,59 +1,53 @@
### Prerequisites
To understand how BigQuery ingestion needs to be set up, first familiarize yourself with the concepts in the diagram below:
<p align="center">
<img width="70%" src="https://github.com/datahub-project/static-assets/raw/main/imgs/integrations/bigquery/source-bigquery-setup.png"/>
</p>
There are two important concepts to understand and identify:
- *Extractor Project*: This is the project associated with a service-account, whose credentials you will be configuring in the connector. The connector uses this service-account to run jobs (including queries) within the project.
- *Bigquery Projects* are the projects from which table metadata, lineage, usage, and profiling data need to be collected. By default, the extractor project is included in the list of projects that DataHub collects metadata from, but you can control that by passing in a specific list of project ids that you want to collect metadata from. Read the configuration section below to understand how to limit the list of projects that DataHub extracts metadata from.
#### Create a datahub profile in GCP
1. Create a custom role for datahub as per [BigQuery docs](https://cloud.google.com/iam/docs/creating-custom-roles#creating_a_custom_role).
2. Grant the following permissions to this role:
2. Follow the sections below to grant permissions to this role on this project and other projects.
##### Basic Requirements (needed for metadata ingestion)
1. Identify your Extractor Project where the service account will run queries to extract metadata.
| permission                       | Description                                                                                                                         | Capability                                                               |
|----------------------------------|-------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------|
| `bigquery.jobs.create`           | Run jobs (e.g. queries) within the project. *This only needs for the extractor project where the service account belongs*           |                                                                                                               |
| `bigquery.jobs.list`             | Manage the queries that the service account has sent. *This only needs for the extractor project where the service account belongs* |                                                                                                               |
| `bigquery.readsessions.create`   | Create a session for streaming large results. *This only needs for the extractor project where the service account belongs*         |                                                                                                               |
| `bigquery.readsessions.getData` | Get data from the read session. *This only needs for the extractor project where the service account belongs*                       |
| `bigquery.tables.create`         | Create temporary tables when profiling tables. Tip: Use the `profiling.bigquery_temp_table_schema` to ensure that all temp tables (across multiple projects) are created in this project under a specific dataset.                 | Profiling                           |                                                                                                                 |
| `bigquery.tables.delete`         | Delete temporary tables when profiling tables. Tip: Use the `profiling.bigquery_temp_table_schema` to ensure that all temp tables (across multiple projects) are created in this project under a specific dataset.                   | Profiling                           |                                                                                                                 |
2. Grant the following permissions to the Service Account on every project where you would like to extract metadata from
:::info
If you have multiple projects in your BigQuery setup, the role should be granted these permissions in each of the projects.
:::
| permission                       | Description                                                                                                 | Capability               | Default GCP role which contains this permission                                                                 |
|----------------------------------|--------------------------------------------------------------------------------------------------------------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------|
| `bigquery.datasets.get`         | Retrieve metadata about a dataset.                                                                           | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions.                                                                           | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.list`           | List BigQuery tables.                                                                                       | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.get`           | Retrieve metadata for a table.                                                                               | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `resourcemanager.projects.get`   | Retrieve project names and metadata.                                                                         | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.jobs.listAll`         | List all jobs (queries) submitted by any user. Needs for Lineage extraction.                                 | Lineage Extraction/Usage extraction | [roles/bigquery.resourceViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.resourceViewer) |
| `logging.logEntries.list`       | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
| `logging.privateLogEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
| `bigquery.tables.getData`       | Access table data to extract storage size, last updated at, data profiles etc. | Profiling                           |                                                                                                                 |
| `bigquery.tables.create`         | [Optional] Only needed if not using the `profiling.bigquery_temp_table_schema` config option. | Profiling                           |                                                                                                                 |
| `bigquery.tables.delete`         | [Optional] Only needed if not using the `profiling.bigquery_temp_table_schema` config option. | Profiling                           |                                                                                                                 |
##### Basic Requirements (needs for metadata ingestion)
| permission | Description |
| -------------------------------- | ----------------------------------------------------- |
| `bigquery.datasets.get` | Retrieve metadata about a dataset. |
| `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions. |
| `bigquery.jobs.create` | Run jobs (e.g. queries) within the project. |
| `bigquery.jobs.list` | Manage the queries that the service account has sent. |
| `bigquery.tables.list` | List BigQuery tables. |
| `bigquery.tables.get` | Retrieve metadata for a table. |
| `bigquery.readsessions.create` | Create a session for streaming large results. |
| `bigquery.readsessions.getData` | Get data from the read session. |
| `resourcemanager.projects.get` | Retrieve project names and metadata. |
You can use the following predefined IAM role which has all the needed permissions as well:
- [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer)
##### Lineage/usage generation requirements
Additional requirements needed on the top of the basic requirements.
If you want to get lineage from multiple projects you have to grant this permission
for each of them.
| permission | Description |
| -------------------------------- | ------------------------------------------------------------------------------------------------------------ |
| `bigquery.jobs.listAll` | List all jobs (queries) submitted by any user. |
| `logging.logEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. |
| `logging.privateLogEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. |
##### Profiling requirements
Additional requirements needed on the top of the basic requirements.
| permission | Description |
| ------------------------- | ----------------------------------------------------------------------------------------- |
| `bigquery.tables.getData` | Access table data to do the profiling. |
| `bigquery.tables.create` | Create temporary tables when profiling partitioned/sharded tables. See below for details. |
| `bigquery.tables.delete` | Delete temporary tables when profiling partitioned/sharded tables. See below for details. |
Profiler creates temporary tables to profile partitioned/sharded tables and that is why it needs table create/delete privilege.
The profiler creates temporary tables to profile partitioned/sharded tables and that is why it needs table create/delete privilege.
Use `profiling.bigquery_temp_table_schema` to restrict to one specific dataset the create/delete permission
#### Create a service account
#### Create a service account in the Extractor Project
1. Setup a ServiceAccount as per [BigQuery docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console)
and assign the previously created role to this service account.
@ -62,16 +56,16 @@ Use `profiling.bigquery_temp_table_schema` to restrict to one specific dataset t
```json
{
"type": "service_account",
"project_id": "project-id-1234567",
"private_key_id": "d0121d0000882411234e11166c6aaa23ed5d74e0",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----",
"client_email": "test@suppproject-id-1234567.iam.gserviceaccount.com",
"client_id": "113545814931671546333",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test%suppproject-id-1234567.iam.gserviceaccount.com"
"type": "service_account",
"project_id": "project-id-1234567",
"private_key_id": "d0121d0000882411234e11166c6aaa23ed5d74e0",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----",
"client_email": "test@suppproject-id-1234567.iam.gserviceaccount.com",
"client_id": "113545814931671546333",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test%suppproject-id-1234567.iam.gserviceaccount.com"
}
```
@ -115,8 +109,8 @@ Temporary tables are removed after profiling.
```yaml
profiling:
enabled: true
bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created
enabled: true
bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created
```
:::note

View File

@ -233,6 +233,44 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
else:
return CapabilityReport(capable=True)
@staticmethod
def metada_read_capability_test(
project_ids: List[str], profiling_enabled: bool
) -> CapabilityReport:
for project_id in project_ids:
try:
logger.info((f"Metadata read capability test for project {project_id}"))
client: bigquery.Client = bigquery.Client(project_id)
assert client
result = BigQueryDataDictionary.get_datasets_for_project_id(
client, project_id, 10
)
if len(result) == 0:
return CapabilityReport(
capable=False,
failure_reason=f"Dataset query returned empty dataset. It is either empty or no dataset in project {project_id}",
)
tables = BigQueryDataDictionary.get_tables_for_dataset(
conn=client,
project_id=project_id,
dataset_name=result[0].name,
tables={},
with_data_read_permission=profiling_enabled,
)
if len(tables) == 0:
return CapabilityReport(
capable=False,
failure_reason=f"Tables query did not return any table. It is either empty or no tables in project {project_id}.{result[0].name}",
)
except Exception as e:
return CapabilityReport(
capable=False,
failure_reason=f"Dataset query failed with error: {e}",
)
return CapabilityReport(capable=True)
@staticmethod
def lineage_capability_test(
connection_conf: BigQueryV2Config,
@ -262,7 +300,13 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
for project_id in project_ids:
try:
logger.info((f"Usage capability test for project {project_id}"))
failures_before_test = len(report.failures)
usage_extractor.test_capability(project_id)
if failures_before_test != len(report.failures):
return CapabilityReport(
capable=False,
failure_reason="Usage capability test failed. Check the logs for further info",
)
except Exception as e:
return CapabilityReport(
capable=False,
@ -296,17 +340,25 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
if connection_conf.project_id_pattern.allowed(project.project_id):
project_ids.append(project.project_id)
lineage_capability = BigqueryV2Source.lineage_capability_test(
connection_conf, project_ids, report
metada_read_capability = BigqueryV2Source.metada_read_capability_test(
project_ids, connection_conf.profiling.enabled
)
if SourceCapability.LINEAGE_COARSE not in _report:
_report[SourceCapability.LINEAGE_COARSE] = lineage_capability
if SourceCapability.SCHEMA_METADATA not in _report:
_report[SourceCapability.SCHEMA_METADATA] = metada_read_capability
usage_capability = BigqueryV2Source.usage_capability_test(
connection_conf, project_ids, report
)
if SourceCapability.USAGE_STATS not in _report:
_report[SourceCapability.USAGE_STATS] = usage_capability
if connection_conf.include_table_lineage:
lineage_capability = BigqueryV2Source.lineage_capability_test(
connection_conf, project_ids, report
)
if SourceCapability.LINEAGE_COARSE not in _report:
_report[SourceCapability.LINEAGE_COARSE] = lineage_capability
if connection_conf.include_usage_statistics:
usage_capability = BigqueryV2Source.usage_capability_test(
connection_conf, project_ids, report
)
if SourceCapability.USAGE_STATS not in _report:
_report[SourceCapability.USAGE_STATS] = usage_capability
test_report.capability_report = _report
return test_report
@ -982,7 +1034,11 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
):
bigquery_tables.extend(
BigQueryDataDictionary.get_tables_for_dataset(
conn, project_id, dataset_name, table_items
conn,
project_id,
dataset_name,
table_items,
with_data_read_permission=self.config.profiling.enabled,
)
)
table_items.clear()
@ -1000,7 +1056,11 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
if table_items:
bigquery_tables.extend(
BigQueryDataDictionary.get_tables_for_dataset(
conn, project_id, dataset_name, table_items
conn,
project_id,
dataset_name,
table_items,
with_data_read_permission=self.config.profiling.enabled,
)
)
@ -1032,7 +1092,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
# falling back to get views for schema
if not views:
return BigQueryDataDictionary.get_views_for_dataset(
conn, project_id, dataset_name
conn, project_id, dataset_name, self.config.profiling.enabled
)
# Some schema may not have any table

View File

@ -28,8 +28,8 @@ class BigqueryTable:
name: str
created: datetime
last_altered: Optional[datetime]
size_in_bytes: int
rows_count: int
size_in_bytes: Optional[int]
rows_count: Optional[int]
expires: Optional[datetime]
clustering_fields: Optional[List[str]]
labels: Optional[Dict[str, str]]
@ -137,6 +137,33 @@ FROM
WHERE
table_type in ('BASE TABLE', 'EXTERNAL TABLE')
{table_filter}
order by
table_schema ASC,
table_base ASC,
table_suffix DESC
"""
tables_for_dataset_without_partition_data = """
SELECT
t.table_catalog as table_catalog,
t.table_schema as table_schema,
t.table_name as table_name,
t.table_type as table_type,
t.creation_time as created,
tos.OPTION_VALUE as comment,
is_insertable_into,
ddl,
REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix,
REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base
FROM
`{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLES t
left join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
and t.TABLE_NAME = tos.TABLE_NAME
and tos.OPTION_NAME = "description"
WHERE
table_type in ('BASE TABLE', 'EXTERNAL TABLE')
{table_filter}
order by
table_schema ASC,
table_base ASC,
@ -169,6 +196,28 @@ order by
table_name ASC
"""
views_for_dataset_without_data_read: str = """
SELECT
t.table_catalog as table_catalog,
t.table_schema as table_schema,
t.table_name as table_name,
t.table_type as table_type,
t.creation_time as created,
tos.OPTION_VALUE as comment,
is_insertable_into,
ddl as view_definition
FROM
`{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLES t
left join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
and t.TABLE_NAME = tos.TABLE_NAME
and tos.OPTION_NAME = "description"
WHERE
table_type in ('VIEW MATERIALIZED', 'VIEW')
order by
table_schema ASC,
table_name ASC
"""
columns_for_dataset: str = """
select
c.table_catalog as table_catalog,
@ -229,9 +278,9 @@ class BigQueryDataDictionary:
@staticmethod
def get_datasets_for_project_id(
conn: bigquery.Client, project_id: str
conn: bigquery.Client, project_id: str, maxResults: Optional[int] = None
) -> List[BigqueryDataset]:
datasets = conn.list_datasets(project_id)
datasets = conn.list_datasets(project_id, max_results=maxResults)
return [BigqueryDataset(name=d.dataset_id) for d in datasets]
@ -261,20 +310,33 @@ class BigQueryDataDictionary:
project_id: str,
dataset_name: str,
tables: Dict[str, TableListItem],
with_data_read_permission: bool = False,
) -> List[BigqueryTable]:
filter: str = ", ".join(f"'{table}'" for table in tables.keys())
# Tables are ordered by name and table suffix to make sure we always process the latest sharded table
# and skip the others. Sharded tables are tables with suffix _20220102
cur = BigQueryDataDictionary.get_query_result(
conn,
BigqueryQuery.tables_for_dataset.format(
project_id=project_id,
dataset_name=dataset_name,
table_filter=f" and t.table_name in ({filter})" if filter else "",
),
)
if with_data_read_permission:
# Tables are ordered by name and table suffix to make sure we always process the latest sharded table
# and skip the others. Sharded tables are tables with suffix _20220102
cur = BigQueryDataDictionary.get_query_result(
conn,
BigqueryQuery.tables_for_dataset.format(
project_id=project_id,
dataset_name=dataset_name,
table_filter=f" and t.table_name in ({filter})" if filter else "",
),
)
else:
# Tables are ordered by name and table suffix to make sure we always process the latest sharded table
# and skip the others. Sharded tables are tables with suffix _20220102
cur = BigQueryDataDictionary.get_query_result(
conn,
BigqueryQuery.tables_for_dataset_without_partition_data.format(
project_id=project_id,
dataset_name=dataset_name,
table_filter=f" and t.table_name in ({filter})" if filter else "",
),
)
# Some property we want to capture only available from the TableListItem we get from an earlier query of
# the list of tables.
@ -285,10 +347,10 @@ class BigQueryDataDictionary:
last_altered=datetime.fromtimestamp(
table.last_altered / 1000, tz=timezone.utc
)
if table.last_altered
if "last_altered" in table
else None,
size_in_bytes=table.bytes,
rows_count=table.row_count,
size_in_bytes=table.bytes if "bytes" in table else None,
rows_count=table.row_count if "row_count" in table else None,
comment=table.comment,
ddl=table.ddl,
expires=tables[table.table_name].expires if tables else None,
@ -299,36 +361,56 @@ class BigQueryDataDictionary:
clustering_fields=tables[table.table_name].clustering_fields
if tables
else None,
max_partition_id=table.max_partition_id,
max_partition_id=table.max_partition_id
if "max_partition_id" in table
else None,
max_shard_id=BigqueryTableIdentifier.get_table_and_shard(
table.table_name
)[1]
if len(BigqueryTableIdentifier.get_table_and_shard(table.table_name))
== 2
else None,
num_partitions=table.num_partitions,
active_billable_bytes=table.active_billable_bytes,
long_term_billable_bytes=table.long_term_billable_bytes,
num_partitions=table.num_partitions
if "num_partitions" in table
else None,
active_billable_bytes=table.active_billable_bytes
if "active_billable_bytes" in table
else None,
long_term_billable_bytes=table.long_term_billable_bytes
if "long_term_billable_bytes" in table
else None,
)
for table in cur
]
@staticmethod
def get_views_for_dataset(
conn: bigquery.Client, project_id: str, dataset_name: str
conn: bigquery.Client,
project_id: str,
dataset_name: str,
has_data_read: bool,
) -> List[BigqueryView]:
cur = BigQueryDataDictionary.get_query_result(
conn,
BigqueryQuery.views_for_dataset.format(
project_id=project_id, dataset_name=dataset_name
),
)
if has_data_read:
cur = BigQueryDataDictionary.get_query_result(
conn,
BigqueryQuery.views_for_dataset.format(
project_id=project_id, dataset_name=dataset_name
),
)
else:
cur = BigQueryDataDictionary.get_query_result(
conn,
BigqueryQuery.views_for_dataset_without_data_read.format(
project_id=project_id, dataset_name=dataset_name
),
)
return [
BigqueryView(
name=table.table_name,
created=table.created,
last_altered=table.last_altered,
last_altered=table.last_altered if "last_altered" in table else None,
comment=table.comment,
ddl=table.view_definition,
)

View File

@ -268,7 +268,7 @@ WHERE
self,
dataset_name: str,
last_altered: Optional[datetime.datetime],
size_in_bytes: int,
size_in_bytes: Optional[int],
rows_count: Optional[int],
) -> bool:
threshold_time: Optional[datetime.datetime] = None