fix(ingest): fix deps for fivetran (#13385)

Harshal Sheth 2025-05-02 12:31:07 -07:00 committed by GitHub
parent 854ec614b9
commit b7ef234bc7
14 changed files with 127 additions and 106 deletions
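
In short, the commit extracts BigQueryConnectionConfig from bigquery_v2/bigquery_config.py into a new bigquery_v2/bigquery_connection.py, and extracts make_sqlalchemy_uri from sql/sql_config.py into a new sql/sqlalchemy_uri.py, updating the import sites (judging by the hunks below: the fivetran config, the snowflake connection helpers, and several SQL sources). The apparent goal is that importing these connection helpers no longer pulls in the heavier config modules. Minimal usage sketches follow each of the two new files below.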

View File

@@ -2,10 +2,8 @@ import logging
 import os
 import re
 from datetime import timedelta
-from typing import Any, Dict, List, Optional, Union
+from typing import Dict, List, Optional, Union

-from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
-from google.cloud.logging_v2.client import Client as GCPLoggingClient
 from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator

 from datahub.configuration.common import AllowDenyPattern, ConfigModel
@@ -18,7 +16,9 @@ from datahub.configuration.validate_field_removal import pydantic_removed_field
 from datahub.ingestion.glossary.classification_mixin import (
     ClassificationSourceConfigMixin,
 )
-from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential
+from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
+    BigQueryConnectionConfig,
+)
 from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
 from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig
 from datahub.ingestion.source.state.stateful_ingestion_base import (
@@ -105,64 +105,6 @@ class BigQueryUsageConfig(BaseUsageConfig):
     )

-class BigQueryConnectionConfig(ConfigModel):
-    credential: Optional[GCPCredential] = Field(
-        default=None, description="BigQuery credential informations"
-    )
-
-    _credentials_path: Optional[str] = PrivateAttr(None)
-
-    extra_client_options: Dict[str, Any] = Field(
-        default={},
-        description="Additional options to pass to google.cloud.logging_v2.client.Client.",
-    )
-
-    project_on_behalf: Optional[str] = Field(
-        default=None,
-        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
-    )
-
-    def __init__(self, **data: Any):
-        super().__init__(**data)
-
-        if self.credential:
-            self._credentials_path = self.credential.create_credential_temp_file()
-            logger.debug(
-                f"Creating temporary credential file at {self._credentials_path}"
-            )
-            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path
-
-    def get_bigquery_client(self) -> bigquery.Client:
-        client_options = self.extra_client_options
-        return bigquery.Client(self.project_on_behalf, **client_options)
-
-    def get_projects_client(self) -> resourcemanager_v3.ProjectsClient:
-        return resourcemanager_v3.ProjectsClient()
-
-    def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
-        return datacatalog_v1.PolicyTagManagerClient()
-
-    def make_gcp_logging_client(
-        self, project_id: Optional[str] = None
-    ) -> GCPLoggingClient:
-        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
-        # why we disable gRPC here.
-        client_options = self.extra_client_options.copy()
-        client_options["_use_grpc"] = False
-        if project_id is not None:
-            return GCPLoggingClient(**client_options, project=project_id)
-        else:
-            return GCPLoggingClient(**client_options)
-
-    def get_sql_alchemy_url(self) -> str:
-        if self.project_on_behalf:
-            return f"bigquery://{self.project_on_behalf}"
-        # When project_id is not set, we will attempt to detect the project ID
-        # based on the credentials or environment variables.
-        # See https://github.com/mxmzdlv/pybigquery#authentication.
-        return "bigquery://"
-
 class GcsLineageProviderConfig(ConfigModel):
     """
     Any source that produces gcs lineage from/to Datasets should inherit this class.

View File

@@ -0,0 +1,70 @@
+import logging
+import os
+from typing import Any, Dict, Optional
+
+from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
+from google.cloud.logging_v2.client import Client as GCPLoggingClient
+from pydantic import Field, PrivateAttr
+
+from datahub.configuration.common import ConfigModel
+from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential
+
+logger = logging.getLogger(__name__)
+
+
+class BigQueryConnectionConfig(ConfigModel):
+    credential: Optional[GCPCredential] = Field(
+        default=None, description="BigQuery credential informations"
+    )
+
+    _credentials_path: Optional[str] = PrivateAttr(None)
+
+    extra_client_options: Dict[str, Any] = Field(
+        default={},
+        description="Additional options to pass to google.cloud.logging_v2.client.Client.",
+    )
+
+    project_on_behalf: Optional[str] = Field(
+        default=None,
+        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
+    )
+
+    def __init__(self, **data: Any):
+        super().__init__(**data)
+
+        if self.credential:
+            self._credentials_path = self.credential.create_credential_temp_file()
+            logger.debug(
+                f"Creating temporary credential file at {self._credentials_path}"
+            )
+            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path
+
+    def get_bigquery_client(self) -> bigquery.Client:
+        client_options = self.extra_client_options
+        return bigquery.Client(self.project_on_behalf, **client_options)
+
+    def get_projects_client(self) -> resourcemanager_v3.ProjectsClient:
+        return resourcemanager_v3.ProjectsClient()
+
+    def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient:
+        return datacatalog_v1.PolicyTagManagerClient()
+
+    def make_gcp_logging_client(
+        self, project_id: Optional[str] = None
+    ) -> GCPLoggingClient:
+        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
+        # why we disable gRPC here.
+        client_options = self.extra_client_options.copy()
+        client_options["_use_grpc"] = False
+        if project_id is not None:
+            return GCPLoggingClient(**client_options, project=project_id)
+        else:
+            return GCPLoggingClient(**client_options)
+
+    def get_sql_alchemy_url(self) -> str:
+        if self.project_on_behalf:
+            return f"bigquery://{self.project_on_behalf}"
+        # When project_id is not set, we will attempt to detect the project ID
+        # based on the credentials or environment variables.
+        # See https://github.com/mxmzdlv/pybigquery#authentication.
+        return "bigquery://"

View File

@@ -10,10 +10,12 @@ from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.bigquery_v2.bigquery_config import (
-    BigQueryConnectionConfig,
     BigQueryFilterConfig,
     BigQueryIdentifierConfig,
 )
+from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
+    BigQueryConnectionConfig,
+)
 from datahub.ingestion.source.bigquery_v2.bigquery_report import (
     BigQueryQueriesExtractorReport,
     BigQuerySchemaApiPerfReport,

View File

@@ -16,7 +16,7 @@ from datahub.configuration.source_common import DatasetSourceConfigMixin
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.emitter.mce_builder import DEFAULT_ENV
 from datahub.ingestion.api.report import Report
-from datahub.ingestion.source.bigquery_v2.bigquery_config import (
+from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
     BigQueryConnectionConfig,
 )
 from datahub.ingestion.source.snowflake.snowflake_connection import (

View File

@@ -28,7 +28,7 @@ from datahub.ingestion.source.snowflake.oauth_config import (
     OAuthIdentityProvider,
 )
 from datahub.ingestion.source.snowflake.oauth_generator import OAuthTokenGenerator
-from datahub.ingestion.source.sql.sql_config import make_sqlalchemy_uri
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.utilities.config_clean import (
     remove_protocol,
     remove_suffix,

View File

@@ -35,13 +35,14 @@ from datahub.ingestion.source.sql.sql_common import (
     SQLAlchemySource,
     register_custom_type,
 )
-from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
+from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
 from datahub.ingestion.source.sql.sql_report import SQLSourceReport
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
     gen_database_container,
     gen_database_key,
 )
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
 from datahub.metadata.schema_classes import MapTypeClass, RecordTypeClass
 from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column

View File

@@ -36,7 +36,6 @@ from datahub.ingestion.source.sql.sql_common import (
 from datahub.ingestion.source.sql.sql_config import (
     BasicSQLAlchemyConfig,
     SQLCommonConfig,
-    make_sqlalchemy_uri,
 )
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
@@ -46,6 +45,7 @@ from datahub.ingestion.source.sql.sql_utils import (
     gen_schema_key,
     get_domain_wu,
 )
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.state.stateful_ingestion_base import JobId
 from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot

View File

@@ -44,9 +44,9 @@ from datahub.ingestion.source.sql.sql_common import (
 )
 from datahub.ingestion.source.sql.sql_config import (
     BasicSQLAlchemyConfig,
-    make_sqlalchemy_uri,
 )
 from datahub.ingestion.source.sql.sql_report import SQLSourceReport
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.sql.stored_procedures.base import (
     generate_procedure_lineage,
 )

View File

@@ -4,7 +4,6 @@ from typing import Any, Dict, Optional

 import pydantic
 from pydantic import Field
-from sqlalchemy.engine import URL

 from datahub.configuration.common import AllowDenyPattern, ConfigModel
 from datahub.configuration.source_common import (
@@ -20,6 +19,7 @@ from datahub.ingestion.glossary.classification_mixin import (
     ClassificationSourceConfigMixin,
 )
 from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StatefulStaleMetadataRemovalConfig,
 )
@@ -184,36 +184,3 @@ class SQLAlchemyConnectionConfig(ConfigModel):

 class BasicSQLAlchemyConfig(SQLAlchemyConnectionConfig, SQLCommonConfig):
     pass
-
-
-def make_sqlalchemy_uri(
-    scheme: str,
-    username: Optional[str],
-    password: Optional[str],
-    at: Optional[str],
-    db: Optional[str],
-    uri_opts: Optional[Dict[str, Any]] = None,
-) -> str:
-    host: Optional[str] = None
-    port: Optional[int] = None
-    if at:
-        try:
-            host, port_str = at.rsplit(":", 1)
-            port = int(port_str)
-        except ValueError:
-            host = at
-            port = None
-    if uri_opts:
-        uri_opts = {k: v for k, v in uri_opts.items() if v is not None}
-    return str(
-        URL.create(
-            drivername=scheme,
-            username=username,
-            password=password,
-            host=host,
-            port=port,
-            database=db,
-            query=uri_opts or {},
-        )
-    )

View File

@@ -0,0 +1,36 @@
+from typing import Any, Dict, Optional
+
+from sqlalchemy.engine import URL
+
+
+def make_sqlalchemy_uri(
+    scheme: str,
+    username: Optional[str],
+    password: Optional[str],
+    at: Optional[str],
+    db: Optional[str],
+    uri_opts: Optional[Dict[str, Any]] = None,
+) -> str:
+    host: Optional[str] = None
+    port: Optional[int] = None
+    if at:
+        try:
+            host, port_str = at.rsplit(":", 1)
+            port = int(port_str)
+        except ValueError:
+            host = at
+            port = None
+    if uri_opts:
+        uri_opts = {k: v for k, v in uri_opts.items() if v is not None}
+    return str(
+        URL.create(
+            drivername=scheme,
+            username=username,
+            password=password,
+            host=host,
+            port=port,
+            database=db,
+            query=uri_opts or {},
+        )
+    )
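
A quick sketch of how the relocated helper behaves (the connection values are illustrative, not from the commit): the at argument is split on its last colon into host and port, and None-valued uri_opts entries are dropped before being rendered as query parameters.

from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri

uri = make_sqlalchemy_uri(
    scheme="postgresql+psycopg2",  # illustrative dialect
    username="datahub",
    password="secret",
    at="localhost:5432",  # split on the last ":" into host and port
    db="metadata",
    uri_opts={"sslmode": "require", "connect_timeout": None},  # None is dropped
)
# uri == "postgresql+psycopg2://datahub:secret@localhost:5432/metadata?sslmode=require"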

View File

@@ -14,12 +14,12 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.sql.sql_common import SQLAlchemySource, logger
 from datahub.ingestion.source.sql.sql_config import (
     BasicSQLAlchemyConfig,
-    make_sqlalchemy_uri,
 )
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
     gen_database_key,
 )
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri


 class TwoTierSQLAlchemyConfig(BasicSQLAlchemyConfig):

View File

@@ -17,7 +17,8 @@ from datahub.configuration.validate_field_removal import pydantic_removed_field
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.ingestion.source.ge_data_profiler import DATABRICKS
 from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
-from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
+from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StatefulStaleMetadataRemovalConfig,
 )

View File

@@ -14,7 +14,7 @@ from performance.data_model import ColumnType, Container, Table, View
 from performance.databricks.unity_proxy_mock import _convert_column_type
 from sqlalchemy import create_engine

-from datahub.ingestion.source.sql.sql_config import make_sqlalchemy_uri
+from datahub.ingestion.source.sql.sqlalchemy_uri import make_sqlalchemy_uri

 logger = logging.getLogger(__name__)

 T = TypeVar("T")

View File

@@ -20,9 +20,11 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     BigQueryTableRef,
 )
 from datahub.ingestion.source.bigquery_v2.bigquery_config import (
-    BigQueryConnectionConfig,
     BigQueryV2Config,
 )
+from datahub.ingestion.source.bigquery_v2.bigquery_connection import (
+    BigQueryConnectionConfig,
+)
 from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
 from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
     BigqueryDataset,