feat(ingest): Add config option to set Bigquery credential in source config (#3786)

Tamas Nemeth 2021-12-27 14:48:45 +01:00 committed by GitHub
parent 0f8458ad74
commit 5df5150e51
5 changed files with 250 additions and 25 deletions

View File

@@ -19,4 +19,4 @@ source:
 sink:
   type: "datahub-rest"
   config:
-    server: "https://autotrader.acryl.io/gms"
+    server: "http://localhost:8080"

View File

@@ -0,0 +1,47 @@
---
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/bigquery for complete documentation
source:
  type: "bigquery"
  config:
    ## Coordinates
    project_id: project-id-1234567

    ## Credentials
    ## If the GOOGLE_APPLICATION_CREDENTIALS environment variable is not set, you can specify credentials here
    #credential:
    #  project_id: project-id-1234567
    #  private_key_id: "d0121d0000882411234e11166c6aaa23ed5d74e0"
    #  private_key: "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----\n"
    #  client_email: "test@suppproject-id-1234567.iam.gserviceaccount.com"
    #  client_id: "123456678890"

    #include_tables: true
    #include_views: true
    #include_table_lineage: true

    #start_time: 2021-12-15T20:08:23.091Z
    #end_time: 2023-12-15T20:08:23.091Z

    #profiling:
    #  enabled: true
    #  turn_off_expensive_profiling_metrics: false
    #  query_combiner_enabled: true
    #  max_number_of_fields_to_profile: 8
    #  profile_table_level_only: false
    #  include_field_null_count: true
    #  include_field_min_value: true
    #  include_field_max_value: true
    #  include_field_mean_value: true
    #  include_field_median_value: true
    #  include_field_stddev_value: false
    #  include_field_quantiles: false
    #  include_field_distinct_value_frequencies: false
    #  include_field_histogram: false
    #  include_field_sample_values: false

    #profile_pattern:
    #  allow:
    #    - "schema.table.column"
    #  deny:
    #    - "*.*.*"

## see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

View File

@@ -6,6 +6,64 @@ For context on getting started with ingestion, check out our [metadata ingestion
To install this plugin, run `pip install 'acryl-datahub[bigquery]'`.

## Prerequisites
### Create a datahub profile in GCP:
1. Create a custom role for datahub (https://cloud.google.com/iam/docs/creating-custom-roles#creating_a_custom_role)
2. Grant the following permissions to this role:
```
bigquery.datasets.get
bigquery.datasets.getIamPolicy
bigquery.jobs.create
bigquery.jobs.list
bigquery.jobs.listAll
bigquery.models.getMetadata
bigquery.models.list
bigquery.routines.get
bigquery.routines.list
bigquery.tables.create # Needed for profiling
bigquery.tables.get
bigquery.tables.getData # Needed for profiling
bigquery.tables.list
logging.logEntries.list # Needed for lineage generation
resourcemanager.projects.get
```
### Create a service account:
1. Set up a service account (https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console)
   and assign the previously created role to this service account.
2. Download a service account JSON keyfile.
   Example credential file:
```json
{
  "type": "service_account",
  "project_id": "project-id-1234567",
  "private_key_id": "d0121d0000882411234e11166c6aaa23ed5d74e0",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----",
  "client_email": "test@suppproject-id-1234567.iam.gserviceaccount.com",
  "client_id": "113545814931671546333",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test%40suppproject-id-1234567.iam.gserviceaccount.com"
}
```
3. To provide credentials to the source, you can either:

   Set an environment variable:

       $ export GOOGLE_APPLICATION_CREDENTIALS="/path/to/keyfile.json"

   *or*

   Set the credential config in your source based on the credential json file. For example:

```yml
credential:
  project_id: project-id-1234567
  private_key_id: "d0121d0000882411234e11166c6aaa23ed5d74e0"
  private_key: "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----\n"
  client_email: "test@suppproject-id-1234567.iam.gserviceaccount.com"
  client_id: "123456678890"
```
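A small aside that is not part of the original docs: because the `credential` block mirrors the downloaded keyfile, it can be generated from the keyfile instead of copied by hand. Below is a sketch of such a helper, assuming PyYAML is installed and a keyfile path of `keyfile.json` (both are illustrative choices, not DataHub requirements):

```python
# Hypothetical helper, not shipped with DataHub: print a `credential:` block for
# the recipe from a downloaded service-account keyfile. Assumes PyYAML is installed.
import json

import yaml

# Only these fields are required by the source config; the remaining keyfile
# fields (auth_uri, token_uri, ...) are filled in with defaults by the source.
FIELDS = ["project_id", "private_key_id", "private_key", "client_email", "client_id"]

with open("keyfile.json") as f:
    keyfile = json.load(f)

print(yaml.safe_dump({"credential": {k: keyfile[k] for k in FIELDS}}, sort_keys=False))
```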
## Capabilities

This plugin extracts the following:
@@ -44,30 +102,34 @@ Note that a `.` is used to denote nested fields in the YAML recipe.

As a SQL-based service, the BigQuery integration is also supported by our SQL profiler. See [here](./sql_profiles.md) for more details on configuration.

| Field | Required | Default | Description |
| --- | --- | --- | --- |
| `project_id` | | Autodetected | Project ID to ingest from. If not specified, will infer from environment. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `credential.project_id` | Required if GOOGLE_APPLICATION_CREDENTIALS environment variable is not set | | Project ID from the service account credential file. |
| `credential.private_key_id` | Required if GOOGLE_APPLICATION_CREDENTIALS environment variable is not set | | Private key ID from the service account credential file. |
| `credential.private_key` | Required if GOOGLE_APPLICATION_CREDENTIALS environment variable is not set | | Private key from the service account credential file. |
| `credential.client_email` | Required if GOOGLE_APPLICATION_CREDENTIALS environment variable is not set | | Client email from the service account credential file. |
| `credential.client_id` | Required if GOOGLE_APPLICATION_CREDENTIALS environment variable is not set | | Client ID from the service account credential file. |
| `options.<option>` | | | Any options specified here will be passed to SQLAlchemy's `create_engine` as kwargs.<br />See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details. |
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude from ingestion. |
| `table_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `schema_pattern.allow` | | | List of regex patterns for schemas to include in ingestion. |
| `schema_pattern.deny` | | | List of regex patterns for schemas to exclude from ingestion. |
| `schema_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `view_pattern.allow` | | | List of regex patterns for views to include in ingestion. |
| `view_pattern.deny` | | | List of regex patterns for views to exclude from ingestion. |
| `view_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `include_tables` | | `True` | Whether tables should be ingested. |
| `include_views` | | `True` | Whether views should be ingested. |
| `include_table_lineage` | | `True` | Whether table level lineage should be ingested and processed. |
| `max_query_duration` | | `15` | A time buffer in minutes to adjust start_time and end_time while querying BigQuery audit logs. |
| `start_time` | | Start of last full day in UTC (or hour, depending on `bucket_duration`) | Earliest time of lineage data to consider. |
| `end_time` | | End of last full day in UTC (or hour, depending on `bucket_duration`) | Latest time of lineage data to consider. |
| `extra_client_options` | | | Additional options to pass to `google.cloud.logging_v2.client.Client`. |
| `use_exported_bigquery_audit_metadata` | | `False` | When configured, use `BigQueryAuditMetadata` in `bigquery_audit_metadata_datasets` to compute lineage information. |
| `use_date_sharded_audit_log_tables` | | `False` | Whether to read date sharded tables or time partitioned tables when extracting lineage from exported audit logs. |
| `bigquery_audit_metadata_datasets` | | None | A list of datasets that contain a table named `cloudaudit_googleapis_com_data_access` which contain BigQuery audit logs, specifically, those containing `BigQueryAuditMetadata`. It is recommended that the project of the dataset is also specified, for example, `projectA.datasetB`. |

View File

@ -1,6 +1,9 @@
import collections import collections
import functools import functools
import json
import logging import logging
import os
import tempfile
import textwrap import textwrap
from datetime import timedelta from datetime import timedelta
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
@@ -14,6 +17,7 @@ from google.cloud.bigquery import Client as BigQueryClient
 from google.cloud.logging_v2.client import Client as GCPLoggingClient
 from sqlalchemy.engine.reflection import Inspector
+from datahub.configuration import ConfigModel
 from datahub.configuration.time_window_config import BaseTimeWindowConfig
 from datahub.emitter import mce_builder
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -140,20 +144,59 @@ register_custom_type(GEOGRAPHY)
 assert pybigquery.sqlalchemy_bigquery._type_map

+class BigQueryCredential(ConfigModel):
+    project_id: str
+    private_key_id: str
+    private_key: str
+    client_email: str
+    client_id: str
+    auth_uri: str = "https://accounts.google.com/o/oauth2/auth"
+    token_uri: str = "https://oauth2.googleapis.com/token"
+    auth_provider_x509_cert_url: str = "https://www.googleapis.com/oauth2/v1/certs"
+    type: str = "service_account"
+    client_x509_cert_url: Optional[str]
+
+    def __init__(self, **data: Any):
+        super().__init__(**data)  # type: ignore
+        if not self.client_x509_cert_url:
+            self.client_x509_cert_url = (
+                f"https://www.googleapis.com/robot/v1/metadata/x509/{self.client_email}"
+            )
+
+
+def create_credential_temp_file(credential: BigQueryCredential) -> str:
+    with tempfile.NamedTemporaryFile(delete=False) as fp:
+        cred_json = json.dumps(credential.dict(), indent=4, separators=(",", ": "))
+        fp.write(cred_json.encode())
+        return fp.name
+
+
 class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig):
     scheme: str = "bigquery"
     project_id: Optional[str] = None
     log_page_size: Optional[pydantic.PositiveInt] = 1000
+    credential: Optional[BigQueryCredential]
     # extra_client_options, include_table_lineage and max_query_duration are relevant only when computing the lineage.
     extra_client_options: Dict[str, Any] = {}
     include_table_lineage: Optional[bool] = True
     max_query_duration: timedelta = timedelta(minutes=15)
+    credentials_path: Optional[str] = None
     bigquery_audit_metadata_datasets: Optional[List[str]] = None
     use_exported_bigquery_audit_metadata: bool = False
     use_date_sharded_audit_log_tables: bool = False

+    def __init__(self, **data: Any):
+        super().__init__(**data)
+
+        if self.credential:
+            self.credentials_path = create_credential_temp_file(self.credential)
+            logger.debug(
+                f"Creating temporary credential file at {self.credentials_path}"
+            )
+            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.credentials_path
+
     def get_sql_alchemy_url(self):
         if self.project_id:
             return f"{self.scheme}://{self.project_id}"
@@ -479,3 +522,11 @@ class BigQuerySource(SQLAlchemySource):
         if segments[0] != schema:
             raise ValueError(f"schema {schema} does not match table {entity}")
         return segments[0], segments[1]
+
+    # We can't use close as it is not called if the ingestion is not successful
+    def __del__(self):
+        if self.config.credentials_path:
+            logger.debug(
+                f"Deleting temporary credential file at {self.config.credentials_path}"
+            )
+            os.unlink(self.config.credentials_path)
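To illustrate what the new option does end to end, here is a short sketch (not part of the commit) that builds a `BigQueryConfig` with an inline credential; the credential values are placeholders, while `parse_obj`, `credentials_path`, and the `GOOGLE_APPLICATION_CREDENTIALS` behaviour come from the code above:

```python
# Sketch of the behaviour added above; the credential values are placeholders.
import json
import os

from datahub.ingestion.source.sql.bigquery import BigQueryConfig

config = BigQueryConfig.parse_obj(
    {
        "project_id": "project-id-1234567",
        "credential": {
            "project_id": "project-id-1234567",
            "private_key_id": "d0121d0000882411234e11166c6aaa23ed5d74e0",
            "private_key": "-----BEGIN PRIVATE KEY-----\nplaceholder\n-----END PRIVATE KEY-----\n",
            "client_email": "test@suppproject-id-1234567.iam.gserviceaccount.com",
            "client_id": "123456678890",
        },
    }
)

# The config wrote the credential to a temporary keyfile and pointed the standard
# Google environment variable at it, so the BigQuery and logging clients pick it up.
assert config.credentials_path is not None
assert os.environ["GOOGLE_APPLICATION_CREDENTIALS"] == config.credentials_path

with open(config.credentials_path) as fp:
    keyfile = json.load(fp)
print(keyfile["client_x509_cert_url"])  # derived from client_email by BigQueryCredential
```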

View File

@@ -0,0 +1,65 @@
import json
import os

import pytest


@pytest.mark.integration
def test_bigquery_uri():
    from datahub.ingestion.source.sql.bigquery import BigQueryConfig

    config = BigQueryConfig.parse_obj(
        {
            "project_id": "test-project",
        }
    )
    assert config.get_sql_alchemy_url() == "bigquery://test-project"


@pytest.mark.integration
def test_bigquery_uri_with_credential():
    from datahub.ingestion.source.sql.bigquery import BigQueryConfig

    expected_credential_json = {
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "client_email": "test@acryl.io",
        "client_id": "test_client-id",
        "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test@acryl.io",
        "private_key": "random_private_key",
        "private_key_id": "test-private-key",
        "project_id": "test-project",
        "token_uri": "https://oauth2.googleapis.com/token",
        "type": "service_account",
    }

    config = BigQueryConfig.parse_obj(
        {
            "project_id": "test-project",
            "credential": {
                "project_id": "test-project",
                "private_key_id": "test-private-key",
                "private_key": "random_private_key",
                "client_email": "test@acryl.io",
                "client_id": "test_client-id",
            },
        }
    )

    try:
        assert config.get_sql_alchemy_url() == "bigquery://test-project"
        assert config.credentials_path

        with open(config.credentials_path) as jsonFile:
            json_credential = json.load(jsonFile)
            jsonFile.close()

        credential = json.dumps(json_credential, sort_keys=True)
        expected_credential = json.dumps(expected_credential_json, sort_keys=True)
        assert expected_credential == credential
    except AssertionError as e:
        if config.credentials_path:
            os.unlink(str(config.credentials_path))
        raise e