chore(ingest): cleanup unused fields in bigquery/snowflake (#7787)

Harshal Sheth 2023-04-11 13:19:13 +05:30 committed by GitHub
parent 1312ca9368
commit 905d7fed66
5 changed files with 4 additions and 195 deletions

View File

@@ -5,6 +5,7 @@ from typing import Dict, Optional, cast
 from pydantic import Field, SecretStr, root_validator, validator
 
 from datahub.configuration.common import AllowDenyPattern
+from datahub.configuration.validate_field_removal import pydantic_removed_field
 from datahub.ingestion.glossary.classifier import ClassificationConfig
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulProfilingConfigMixin,
@@ -13,7 +14,6 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
 from datahub.ingestion.source_config.sql.snowflake import (
     BaseSnowflakeConfig,
     SnowflakeConfig,
-    SnowflakeProvisionRoleConfig,
 )
 from datahub.ingestion.source_config.usage.snowflake_usage import SnowflakeUsageConfig
 
@@ -51,14 +51,8 @@ class SnowflakeV2Config(
         description="If enabled, populates the column lineage. Supported only for snowflake table-to-table and view-to-table lineage edge (not supported in table-to-view or view-to-view lineage edge yet). Requires appropriate grants given to the role.",
     )
-    check_role_grants: bool = Field(
-        default=False,
-        description="Not supported",
-    )
-    provision_role: Optional[SnowflakeProvisionRoleConfig] = Field(
-        default=None, description="Not supported"
-    )
+    _check_role_grants_removed = pydantic_removed_field("check_role_grants")
+    _provision_role_removed = pydantic_removed_field("provision_role")
 
     extract_tags: TagOption = Field(
         default=TagOption.skip,
@@ -100,18 +94,6 @@ class SnowflakeV2Config(
 
     @root_validator(pre=False)
     def validate_unsupported_configs(cls, values: Dict) -> Dict:
-        value = values.get("provision_role")
-        if value is not None and value.enabled:
-            raise ValueError(
-                "Provision role is currently not supported. Set `provision_role.enabled` to False."
-            )
-
-        value = values.get("check_role_grants")
-        if value is not None and value:
-            raise ValueError(
-                "Check role grants is not supported. Set `check_role_grants` to False.",
-            )
-
         value = values.get("include_read_operational_stats")
         if value is not None and value:
             raise ValueError(
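The pydantic_removed_field helper that replaces the hard-failing validators above is not shown in this diff. A minimal sketch of the pattern it enables, assuming it registers a pre root validator that drops the removed key with a warning (the removed_field helper below is a hypothetical stand-in, not datahub's implementation):

# Hypothetical approximation of a removed-field shim
# (pydantic v1 style, matching the codebase at the time).
import warnings
from typing import Any, Dict

import pydantic


def removed_field(field_name: str) -> classmethod:
    def _drop(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        if field_name in values:
            warnings.warn(f"Config field `{field_name}` was removed and is now ignored.")
            values.pop(field_name)
        return values

    # pre=True: the key is dropped before normal field validation
    # (and any extra-fields-forbidden check) runs against it.
    return pydantic.root_validator(pre=True, allow_reuse=True)(_drop)


class ExampleConfig(pydantic.BaseModel):
    class Config:
        extra = pydantic.Extra.forbid

    include_tables: bool = True

    _check_role_grants_removed = removed_field("check_role_grants")


# Old recipes that still set the removed key keep working, with a warning,
# instead of raising the ValueError deleted above.
cfg = ExampleConfig.parse_obj({"include_tables": False, "check_role_grants": True})
assert cfg.include_tables is False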

View File

@@ -1378,7 +1378,6 @@ class SnowflakeV2Source(
         if not self.report.ignore_start_time_lineage:
             self.report.lineage_start_time = self.config.start_time
             self.report.lineage_end_time = self.config.end_time
-        self.report.check_role_grants = self.config.check_role_grants
         self.report.include_technical_schema = self.config.include_technical_schema
         self.report.include_usage_stats = self.config.include_usage_stats
         self.report.include_operational_stats = self.config.include_operational_stats

View File

@@ -1,108 +0,0 @@
-import logging
-import os
-from datetime import timedelta
-from typing import Any, Dict, List, Optional
-
-import pydantic
-
-from datahub.configuration.common import ConfigurationError
-from datahub.configuration.time_window_config import BaseTimeWindowConfig
-from datahub.ingestion.source.sql.sql_config import SQLAlchemyConfig
-from datahub.ingestion.source_config.bigquery import BigQueryBaseConfig
-from datahub.ingestion.source_config.usage.bigquery_usage import BigQueryCredential
-
-logger = logging.getLogger(__name__)
-
-
-class BigQueryConfig(BigQueryBaseConfig, BaseTimeWindowConfig, SQLAlchemyConfig):
-    scheme: str = "bigquery"
-    project_id: Optional[str] = pydantic.Field(
-        default=None,
-        description="Project ID where you have rights to run queries and create tables. If `storage_project_id` is not specified then it is assumed this is the same project where data is stored. If not specified, will infer from environment.",
-    )
-    storage_project_id: Optional[str] = pydantic.Field(
-        default=None,
-        description="If your data is stored in a different project where you don't have rights to run jobs and create tables then specify this field. The same service account must have read rights in this GCP project and write rights in `project_id`.",
-    )
-    log_page_size: pydantic.PositiveInt = pydantic.Field(
-        default=1000,
-        description="The number of log items queried per page for lineage collection.",
-    )
-    credential: Optional[BigQueryCredential] = pydantic.Field(
-        default=None, description="BigQuery credential information."
-    )
-    # extra_client_options, include_table_lineage and max_query_duration are relevant only when computing the lineage.
-    extra_client_options: Dict[str, Any] = pydantic.Field(
-        default={},
-        description="Additional options to pass to google.cloud.logging_v2.client.Client.",
-    )
-    include_table_lineage: Optional[bool] = pydantic.Field(
-        default=True,
-        description="Option to enable/disable lineage generation. Is enabled by default.",
-    )
-    max_query_duration: timedelta = pydantic.Field(
-        default=timedelta(minutes=15),
-        description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
-    )
-    bigquery_audit_metadata_datasets: Optional[List[str]] = pydantic.Field(
-        default=None,
-        description="A list of datasets that contain a table named cloudaudit_googleapis_com_data_access which contains BigQuery audit logs, specifically, those containing BigQueryAuditMetadata. It is recommended that the project of the dataset is also specified, for example, projectA.datasetB.",
-    )
-    use_exported_bigquery_audit_metadata: bool = pydantic.Field(
-        default=False,
-        description="When configured, use BigQueryAuditMetadata in bigquery_audit_metadata_datasets to compute lineage information.",
-    )
-    use_date_sharded_audit_log_tables: bool = pydantic.Field(
-        default=False,
-        description="Whether to read date-sharded tables or time-partitioned tables when extracting usage from exported audit logs.",
-    )
-    _credentials_path: Optional[str] = pydantic.PrivateAttr(None)
-    use_v2_audit_metadata: Optional[bool] = pydantic.Field(
-        default=False, description="Whether to ingest logs using the v2 format."
-    )
-    upstream_lineage_in_report: bool = pydantic.Field(
-        default=False,
-        description="Useful for debugging lineage information. Set to True to see the raw lineage created internally.",
-    )
-
-    def __init__(self, **data: Any):
-        super().__init__(**data)
-
-        if self.credential:
-            self._credentials_path = self.credential.create_credential_temp_file()
-            logger.debug(
-                f"Creating temporary credential file at {self._credentials_path}"
-            )
-            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path
-
-    def get_sql_alchemy_url(self, run_on_compute: bool = False) -> str:
-        if self.storage_project_id and not run_on_compute:
-            return f"{self.scheme}://{self.storage_project_id}"
-        if self.project_id:
-            return f"{self.scheme}://{self.project_id}"
-        # When project_id is not set, we will attempt to detect the project ID
-        # based on the credentials or environment variables.
-        # See https://github.com/mxmzdlv/pybigquery#authentication.
-        return f"{self.scheme}://"
-
-    @pydantic.validator("platform_instance")
-    def bigquery_doesnt_need_platform_instance(cls, v):
-        if v is not None:
-            raise ConfigurationError(
-                "BigQuery project ids are globally unique. You do not need to specify a platform instance."
-            )
-
-    @pydantic.root_validator()
-    def validate_that_bigquery_audit_metadata_datasets_is_correctly_configured(
-        cls, values: Dict[str, Any]
-    ) -> Dict[str, Any]:
-        if (
-            values.get("use_exported_bigquery_audit_metadata")
-            and not values.get("use_v2_audit_metadata")
-            and not values.get("bigquery_audit_metadata_datasets")
-        ):
-            raise ConfigurationError(
-                "bigquery_audit_metadata_datasets must be specified if using exported audit metadata. Otherwise set use_v2_audit_metadata to True."
-            )
-        return values
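For reference, the URL precedence implemented by the deleted get_sql_alchemy_url above: storage_project_id wins (unless run_on_compute is set), then project_id, then an empty project so the driver infers it from credentials or the environment. A standalone sketch of the same logic (the bigquery_url function is illustrative only, not datahub code):

from typing import Optional


def bigquery_url(
    project_id: Optional[str] = None,
    storage_project_id: Optional[str] = None,
    run_on_compute: bool = False,
    scheme: str = "bigquery",
) -> str:
    # Prefer the storage project when reading data, unless explicitly asked
    # to run against the compute project.
    if storage_project_id and not run_on_compute:
        return f"{scheme}://{storage_project_id}"
    if project_id:
        return f"{scheme}://{project_id}"
    # No project: the BigQuery driver infers it from credentials/environment.
    return f"{scheme}://"


assert bigquery_url(project_id="compute-proj") == "bigquery://compute-proj"
assert bigquery_url("compute-proj", "data-proj") == "bigquery://data-proj"
assert bigquery_url() == "bigquery://"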

View File

@@ -12,11 +12,7 @@ from snowflake.connector.network import (
     OAUTH_AUTHENTICATOR,
 )
 
-from datahub.configuration.common import (
-    AllowDenyPattern,
-    ConfigModel,
-    OauthConfiguration,
-)
+from datahub.configuration.common import AllowDenyPattern, OauthConfiguration
 from datahub.configuration.time_window_config import BaseTimeWindowConfig
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.ingestion.source.snowflake.constants import (
@@ -48,59 +44,6 @@ VALID_AUTH_TYPES: Dict[str, str] = {
 SNOWFLAKE_HOST_SUFFIX = ".snowflakecomputing.com"
 
-
-class SnowflakeProvisionRoleConfig(ConfigModel):
-    enabled: bool = pydantic.Field(
-        default=False,
-        description="Whether provisioning of Snowflake role (used for ingestion) is enabled or not.",
-    )
-
-    # Can be used by account admin to test what sql statements will be run
-    dry_run: bool = pydantic.Field(
-        default=False,
-        description="If provision_role is enabled, whether to dry run the sql commands for system admins to see what sql grant commands would be run without actually running the grant commands.",
-    )
-
-    # Setting this to True is helpful in case you want a clean role without any extra privileges
-    # Not set to True by default because multiple parallel
-    # snowflake ingestions can be dependent on single role
-    drop_role_if_exists: bool = pydantic.Field(
-        default=False,
-        description="Useful during testing to ensure you have a clean slate role. Not recommended for production use cases.",
-    )
-
-    # When Account admin is testing they might not want to actually do the ingestion
-    # Set this to False in case the account admin would want to
-    # create role
-    # grant role to user in main config
-    # run ingestion as the user in main config
-    run_ingestion: bool = pydantic.Field(
-        default=False,
-        description="If system admins wish to skip actual ingestion of metadata during testing of the provisioning of role.",
-    )
-
-    admin_role: Optional[str] = pydantic.Field(
-        default="accountadmin",
-        description="The Snowflake role of admin user used for provisioning of the role specified by role config. System admins can audit the open source code and decide to use a different role.",
-    )
-
-    admin_username: str = pydantic.Field(
-        description="The username to be used for provisioning of role."
-    )
-
-    admin_password: Optional[pydantic.SecretStr] = pydantic.Field(
-        default=None,
-        exclude=True,
-        description="The password to be used for provisioning of role.",
-    )
-
-    @pydantic.validator("admin_username", always=True)
-    def username_not_empty(cls, v, values, **kwargs):
-        v_str: str = str(v)
-        if not v_str.strip():
-            raise ValueError("username is empty")
-        return v
-
-
 class BaseSnowflakeConfig(BaseTimeWindowConfig):
     # Note: this config model is also used by the snowflake-usage source.
@@ -154,10 +97,6 @@ class BaseSnowflakeConfig(BaseTimeWindowConfig):
         description="Connect args to pass to Snowflake SqlAlchemy driver",
         exclude=True,
     )
-    check_role_grants: bool = pydantic.Field(
-        default=False,
-        description="If set to True then checks role grants at the beginning of the ingestion run. To be used for debugging purposes. If you think everything is working fine then set it to False. In some cases this can take long depending on how many roles you might have.",
-    )
 
     def get_account(self) -> str:
         assert self.account_id
@@ -338,7 +277,6 @@ class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig):
         deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
     )
 
-    provision_role: Optional[SnowflakeProvisionRoleConfig] = None
     ignore_start_time_lineage: bool = False
     upstream_lineage_in_report: bool = False
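Unlike SnowflakeV2Config above, SnowflakeConfig drops provision_role without a pydantic_removed_field shim. Assuming the config base model forbids unknown keys (as datahub's ConfigModel does), recipes that still set it will now fail validation rather than warn. A minimal sketch of that failure mode, using a hypothetical StrictConfig (pydantic v1 style):

import pydantic


class StrictConfig(pydantic.BaseModel):
    class Config:
        extra = pydantic.Extra.forbid  # reject unknown config keys

    username: str = "datahub"


try:
    StrictConfig.parse_obj({"provision_role": {"enabled": True}})
except pydantic.ValidationError as err:
    # provision_role -> "extra fields not permitted"
    print(err)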

View File

@@ -32,8 +32,6 @@ class SnowflakeReport(BaseSnowflakeReport, ProfilingSqlReport):
     default_db: Optional[str] = None
     default_schema: Optional[str] = None
     role: str = ""
-    check_role_grants: Optional[bool] = None
-    role_grants: List[str] = field(default_factory=list)
     profile_if_updated_since: Optional[datetime] = None
     profile_candidates: Dict[str, List[str]] = field(default_factory=dict)