mirror of
https://github.com/datahub-project/datahub.git
synced 2025-10-27 08:54:32 +00:00
feat(fivetran/google_sheets): add Google Sheets support and API client integration (#15007)
This commit is contained in:
parent
b1b68b42fb
commit
114186b01b
@ -15,6 +15,7 @@ import elasticsearchLogo from '@images/elasticsearchlogo.png';
|
||||
import feastLogo from '@images/feastlogo.png';
|
||||
import fivetranLogo from '@images/fivetranlogo.png';
|
||||
import glueLogo from '@images/gluelogo.png';
|
||||
import googleSheetsLogo from '@images/google-sheets-logo.png';
|
||||
import grafanaLogo from '@images/grafana.png';
|
||||
import hiveLogo from '@images/hivelogo.png';
|
||||
import kafkaLogo from '@images/kafkalogo.png';
|
||||
@ -152,6 +153,8 @@ export const VERTEX_AI = 'vertexai';
|
||||
export const VERTEXAI_URN = `urn:li:dataPlatform:${VERTEX_AI}`;
|
||||
export const SNAPLOGIC = 'snaplogic';
|
||||
export const SNAPLOGIC_URN = `urn:li:dataPlatform:${SNAPLOGIC}`;
|
||||
export const GOOGLE_SHEETS = 'google_sheets';
|
||||
export const GOOGLE_SHEETS_URN = `urn:li:dataPlatform:${GOOGLE_SHEETS}`;
|
||||
|
||||
export const PLATFORM_URN_TO_LOGO = {
|
||||
[ATHENA_URN]: athenaLogo,
|
||||
@ -200,6 +203,7 @@ export const PLATFORM_URN_TO_LOGO = {
|
||||
[NEO4J_URN]: neo4j,
|
||||
[VERTEXAI_URN]: vertexAI,
|
||||
[SNAPLOGIC_URN]: snaplogic,
|
||||
[GOOGLE_SHEETS_URN]: googleSheetsLogo,
|
||||
};
|
||||
|
||||
export const SOURCE_TO_PLATFORM_URN = {
|
||||
|
||||
BIN
datahub-web-react/src/images/google-sheets-logo.png
Normal file
BIN
datahub-web-react/src/images/google-sheets-logo.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 392 B |
@ -34,6 +34,8 @@ class DatasetSubTypes(StrEnum):
|
||||
API_ENDPOINT = "API Endpoint"
|
||||
SLACK_CHANNEL = "Slack Channel"
|
||||
PROJECTIONS = "Projections"
|
||||
GOOGLE_SHEETS = "Google Sheets"
|
||||
GOOGLE_SHEETS_NAMED_RANGE = "Google Sheets Named Range"
|
||||
|
||||
# TODO: Create separate entity...
|
||||
NOTEBOOK = "Notebook"
|
||||
|
||||
@ -68,14 +68,22 @@ class Constant:
|
||||
SUCCESSFUL = "SUCCESSFUL"
|
||||
FAILURE_WITH_TASK = "FAILURE_WITH_TASK"
|
||||
CANCELED = "CANCELED"
|
||||
GOOGLE_SHEETS_CONNECTOR_TYPE = "google_sheets"
|
||||
|
||||
|
||||
# Key: Connector Type, Value: Platform ID/Name
|
||||
KNOWN_DATA_PLATFORM_MAPPING = {
|
||||
"google_cloud_postgresql": "postgres",
|
||||
"postgres": "postgres",
|
||||
"snowflake": "snowflake",
|
||||
Constant.GOOGLE_SHEETS_CONNECTOR_TYPE: Constant.GOOGLE_SHEETS_CONNECTOR_TYPE,
|
||||
}
|
||||
|
||||
# Note: (As of Oct 2025) Fivetran Platform Connector has stale lineage metadata for Google Sheets column data (deleted/renamed).
|
||||
# Ref: https://fivetran.com/docs/connectors/files/google-sheets#deletingdata
|
||||
# TODO: Remove Google Sheets connector type from DISABLE_COL_LINEAGE_FOR_CONNECTOR_TYPES
|
||||
DISABLE_COL_LINEAGE_FOR_CONNECTOR_TYPES = [Constant.GOOGLE_SHEETS_CONNECTOR_TYPE]
|
||||
|
||||
|
||||
class SnowflakeDestinationConfig(SnowflakeConnectionConfig):
|
||||
database: str = Field(description="The fivetran connector log database.")
|
||||
@ -97,6 +105,17 @@ class DatabricksDestinationConfig(UnityCatalogConnectionConfig):
|
||||
return warehouse_id
|
||||
|
||||
|
||||
class FivetranAPIConfig(ConfigModel):
    """Settings for talking to the Fivetran REST API.

    ``api_key`` and ``api_secret`` form the credential pair for the API,
    ``base_url`` is the API root, and ``request_timeout_sec`` bounds each request.
    """

    api_key: str = Field(description="Fivetran API key")
    api_secret: str = Field(description="Fivetran API secret")
    # Root of the REST API; overridable, defaults to the public Fivetran endpoint.
    base_url: str = Field(
        default="https://api.fivetran.com", description="Fivetran API base URL"
    )
    request_timeout_sec: int = Field(
        default=30, description="Request timeout in seconds"
    )
|
||||
|
||||
|
||||
class FivetranLogConfig(ConfigModel):
|
||||
destination_platform: Literal["snowflake", "bigquery", "databricks"] = (
|
||||
pydantic.Field(
|
||||
@ -163,6 +182,7 @@ class MetadataExtractionPerfReport(Report):
|
||||
@dataclasses.dataclass
|
||||
class FivetranSourceReport(StaleEntityRemovalSourceReport):
|
||||
connectors_scanned: int = 0
|
||||
fivetran_rest_api_call_count: int = 0
|
||||
filtered_connectors: LossyList[str] = dataclasses.field(default_factory=LossyList)
|
||||
metadata_extraction_perf: MetadataExtractionPerfReport = dataclasses.field(
|
||||
default_factory=MetadataExtractionPerfReport
|
||||
@ -174,6 +194,9 @@ class FivetranSourceReport(StaleEntityRemovalSourceReport):
|
||||
def report_connectors_dropped(self, connector: str) -> None:
    """Record ``connector`` in the report's list of filtered connectors."""
    self.filtered_connectors.append(connector)
|
||||
|
||||
def report_fivetran_rest_api_call_count(self) -> None:
    """Increase the counter of Fivetran REST API calls by one."""
    self.fivetran_rest_api_call_count = self.fivetran_rest_api_call_count + 1
|
||||
|
||||
|
||||
class PlatformDetail(ConfigModel):
|
||||
platform: Optional[str] = pydantic.Field(
|
||||
@ -234,6 +257,16 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
|
||||
description="A mapping of destination id to its platform/instance/env details.",
|
||||
)
|
||||
|
||||
"""
|
||||
Use Fivetran REST API to get :
|
||||
- Google Sheets Connector details and emit related entities
|
||||
Fivetran Platform Connector syncs limited information about the Google Sheets Connector.
|
||||
"""
|
||||
api_config: Optional[FivetranAPIConfig] = Field(
|
||||
default=None,
|
||||
description="Fivetran REST API configuration, used to provide wider support for connections.",
|
||||
)
|
||||
|
||||
@pydantic.root_validator(pre=True)
|
||||
def compat_sources_to_database(cls, values: Dict) -> Dict:
|
||||
if "sources_to_database" in values:
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import logging
|
||||
from typing import Dict, Iterable, List, Optional, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import datahub.emitter.mce_builder as builder
|
||||
from datahub.api.entities.datajob import DataJob as DataJobV1
|
||||
@ -22,6 +23,7 @@ from datahub.ingestion.api.source import (
|
||||
StructuredLogCategory,
|
||||
)
|
||||
from datahub.ingestion.api.workunit import MetadataWorkUnit
|
||||
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
|
||||
from datahub.ingestion.source.fivetran.config import (
|
||||
KNOWN_DATA_PLATFORM_MAPPING,
|
||||
Constant,
|
||||
@ -35,24 +37,34 @@ from datahub.ingestion.source.fivetran.fivetran_query import (
|
||||
MAX_JOBS_PER_CONNECTOR,
|
||||
MAX_TABLE_LINEAGE_PER_CONNECTOR,
|
||||
)
|
||||
from datahub.ingestion.source.fivetran.fivetran_rest_api import FivetranAPIClient
|
||||
from datahub.ingestion.source.fivetran.response_models import FivetranConnectionDetails
|
||||
from datahub.ingestion.source.state.stale_entity_removal_handler import (
|
||||
StaleEntityRemovalHandler,
|
||||
)
|
||||
from datahub.ingestion.source.state.stateful_ingestion_base import (
|
||||
StatefulIngestionSourceBase,
|
||||
)
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
|
||||
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
|
||||
FineGrainedLineage,
|
||||
FineGrainedLineageDownstreamType,
|
||||
FineGrainedLineageUpstreamType,
|
||||
UpstreamLineage,
|
||||
)
|
||||
from datahub.metadata.schema_classes import (
|
||||
DatasetLineageTypeClass,
|
||||
UpstreamClass,
|
||||
)
|
||||
from datahub.metadata.urns import CorpUserUrn, DataFlowUrn, DatasetUrn
|
||||
from datahub.sdk.dataflow import DataFlow
|
||||
from datahub.sdk.datajob import DataJob
|
||||
from datahub.sdk.dataset import Dataset
|
||||
from datahub.sdk.entity import Entity
|
||||
|
||||
# Logger instance
|
||||
logger = logging.getLogger(__name__)
|
||||
CORPUSER_DATAHUB = "urn:li:corpuser:datahub"
|
||||
|
||||
|
||||
@platform_name("Fivetran")
|
||||
@ -76,8 +88,12 @@ class FivetranSource(StatefulIngestionSourceBase):
|
||||
super().__init__(config, ctx)
|
||||
self.config = config
|
||||
self.report = FivetranSourceReport()
|
||||
|
||||
self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)
|
||||
self.api_client: Optional[FivetranAPIClient] = None
|
||||
self._connection_details_cache: Dict[str, FivetranConnectionDetails] = {}
|
||||
|
||||
if self.config.api_config:
|
||||
self.api_client = FivetranAPIClient(self.config.api_config)
|
||||
|
||||
def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]:
|
||||
input_dataset_urn_list: List[Union[str, DatasetUrn]] = []
|
||||
@ -131,17 +147,43 @@ class FivetranSource(StatefulIngestionSourceBase):
|
||||
if source_details.include_schema_in_urn
|
||||
else lineage.source_table.split(".", 1)[1]
|
||||
)
|
||||
input_dataset_urn = DatasetUrn.create_from_ids(
|
||||
platform_id=source_details.platform,
|
||||
table_name=(
|
||||
f"{source_details.database.lower()}.{source_table}"
|
||||
if source_details.database
|
||||
else source_table
|
||||
),
|
||||
env=source_details.env,
|
||||
platform_instance=source_details.platform_instance,
|
||||
)
|
||||
input_dataset_urn_list.append(input_dataset_urn)
|
||||
input_dataset_urn: Optional[DatasetUrn] = None
|
||||
# Special Handling for Google Sheets Connectors
|
||||
if connector.connector_type == Constant.GOOGLE_SHEETS_CONNECTOR_TYPE:
|
||||
# Get Google Sheet dataset details from Fivetran API
|
||||
# The result is cached on this source (self._connection_details_cache), so repeated lookups do not hit the API again
|
||||
gsheets_conn_details: Optional[FivetranConnectionDetails] = (
|
||||
self._get_connection_details_by_id(connector.connector_id)
|
||||
)
|
||||
|
||||
if gsheets_conn_details:
|
||||
input_dataset_urn = DatasetUrn.create_from_ids(
|
||||
platform_id=Constant.GOOGLE_SHEETS_CONNECTOR_TYPE,
|
||||
table_name=self._get_gsheet_named_range_dataset_id(
|
||||
gsheets_conn_details
|
||||
),
|
||||
env=source_details.env,
|
||||
)
|
||||
else:
|
||||
self.report.warning(
|
||||
title="Failed to extract lineage for Google Sheets Connector",
|
||||
message="Unable to extract lineage for Google Sheets Connector, as the connector details are not available from Fivetran API.",
|
||||
context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
|
||||
)
|
||||
else:
|
||||
input_dataset_urn = DatasetUrn.create_from_ids(
|
||||
platform_id=source_details.platform,
|
||||
table_name=(
|
||||
f"{source_details.database.lower()}.{source_table}"
|
||||
if source_details.database
|
||||
else source_table
|
||||
),
|
||||
env=source_details.env,
|
||||
platform_instance=source_details.platform_instance,
|
||||
)
|
||||
|
||||
if input_dataset_urn:
|
||||
input_dataset_urn_list.append(input_dataset_urn)
|
||||
|
||||
destination_table = (
|
||||
lineage.destination_table
|
||||
@ -262,6 +304,67 @@ class FivetranSource(StatefulIngestionSourceBase):
|
||||
clone_outlets=True,
|
||||
)
|
||||
|
||||
def _get_connection_details_by_id(
|
||||
self, connection_id: str
|
||||
) -> Optional[FivetranConnectionDetails]:
|
||||
if self.api_client is None:
|
||||
self.report.warning(
|
||||
title="Fivetran API client is not initialized",
|
||||
message="Google Sheets Connector details cannot be extracted, as Fivetran API client is not initialized.",
|
||||
context=f"connector_id: {connection_id}",
|
||||
)
|
||||
return None
|
||||
|
||||
if connection_id in self._connection_details_cache:
|
||||
return self._connection_details_cache[connection_id]
|
||||
|
||||
try:
|
||||
self.report.report_fivetran_rest_api_call_count()
|
||||
conn_details = self.api_client.get_connection_details_by_id(connection_id)
|
||||
# Update Cache
|
||||
if conn_details:
|
||||
self._connection_details_cache[connection_id] = conn_details
|
||||
|
||||
return conn_details
|
||||
except Exception as e:
|
||||
self.report.warning(
|
||||
title="Failed to get connection details for Google Sheets Connector",
|
||||
message=f"Exception occurred while getting connection details from Fivetran API. {e}",
|
||||
context=f"connector_id: {connection_id}",
|
||||
)
|
||||
return None
|
||||
|
||||
def _get_gsheet_sheet_id_from_url(
|
||||
self, gsheets_conn_details: FivetranConnectionDetails
|
||||
) -> str:
|
||||
# Extracting the sheet_id (1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo) from the sheet_id url
|
||||
# "https://docs.google.com/spreadsheets/d/1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo/edit?gid=0#gid=0",
|
||||
try:
|
||||
parsed = urlparse(gsheets_conn_details.config.sheet_id)
|
||||
# Example: https://docs.google.com/spreadsheets/d/<spreadsheetId>/edit
|
||||
parts = parsed.path.split("/")
|
||||
return parts[3] if len(parts) > 2 else ""
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to extract sheet_id from the sheet_id url: {gsheets_conn_details.config.sheet_id}, {e}"
|
||||
)
|
||||
|
||||
return ""
|
||||
|
||||
def _get_gsheet_named_range_dataset_id(
    self, gsheets_conn_details: FivetranConnectionDetails
) -> str:
    """Build the dataset id used for the connector's named range.

    The id is "<sheet_id>.<named_range>" when a sheet id could be extracted
    from the connection config, otherwise just the named range.
    """
    sheet_id = self._get_gsheet_sheet_id_from_url(gsheets_conn_details)
    if sheet_id:
        named_range_id = f"{sheet_id}.{gsheets_conn_details.config.named_range}"
    else:
        named_range_id = gsheets_conn_details.config.named_range

    logger.debug(
        f"Using gsheet_named_range_dataset_id: {named_range_id} for connector: {gsheets_conn_details.id}"
    )
    return named_range_id
|
||||
|
||||
def _get_dpi_workunits(
|
||||
self, job: Job, dpi: DataProcessInstance
|
||||
) -> Iterable[MetadataWorkUnit]:
|
||||
@ -295,6 +398,74 @@ class FivetranSource(StatefulIngestionSourceBase):
|
||||
self, connector: Connector
|
||||
) -> Iterable[Union[MetadataWorkUnit, Entity]]:
|
||||
self.report.report_connectors_scanned()
|
||||
|
||||
"""
|
||||
-------------------------------------------------------
|
||||
Special Handling for Google Sheets Connectors
|
||||
-------------------------------------------------------
|
||||
Google Sheets source is not supported by Datahub yet.
|
||||
As a workaround, we are emitting a dataset entity for the Google Sheet
|
||||
and adding it to the lineage. This workaround needs to be removed once
|
||||
Datahub supports Google Sheets source natively.
|
||||
-------------------------------------------------------
|
||||
"""
|
||||
if connector.connector_type == Constant.GOOGLE_SHEETS_CONNECTOR_TYPE:
|
||||
# Get Google Sheet dataset details from Fivetran API
|
||||
gsheets_conn_details: Optional[FivetranConnectionDetails] = (
|
||||
self._get_connection_details_by_id(connector.connector_id)
|
||||
)
|
||||
|
||||
if gsheets_conn_details:
|
||||
gsheets_dataset = Dataset(
|
||||
name=self._get_gsheet_sheet_id_from_url(gsheets_conn_details),
|
||||
platform=Constant.GOOGLE_SHEETS_CONNECTOR_TYPE,
|
||||
env=self.config.env,
|
||||
display_name=self._get_gsheet_sheet_id_from_url(
|
||||
gsheets_conn_details
|
||||
),
|
||||
external_url=gsheets_conn_details.config.sheet_id,
|
||||
created=gsheets_conn_details.created_at,
|
||||
last_modified=gsheets_conn_details.source_sync_details.last_synced,
|
||||
subtype=DatasetSubTypes.GOOGLE_SHEETS,
|
||||
custom_properties={
|
||||
"ingested_by": "fivetran source",
|
||||
"connector_id": gsheets_conn_details.id,
|
||||
},
|
||||
)
|
||||
gsheets_named_range_dataset = Dataset(
|
||||
name=self._get_gsheet_named_range_dataset_id(gsheets_conn_details),
|
||||
platform=Constant.GOOGLE_SHEETS_CONNECTOR_TYPE,
|
||||
env=self.config.env,
|
||||
display_name=gsheets_conn_details.config.named_range,
|
||||
external_url=gsheets_conn_details.config.sheet_id,
|
||||
created=gsheets_conn_details.created_at,
|
||||
last_modified=gsheets_conn_details.source_sync_details.last_synced,
|
||||
subtype=DatasetSubTypes.GOOGLE_SHEETS_NAMED_RANGE,
|
||||
custom_properties={
|
||||
"ingested_by": "fivetran source",
|
||||
"connector_id": gsheets_conn_details.id,
|
||||
},
|
||||
upstreams=UpstreamLineage(
|
||||
upstreams=[
|
||||
UpstreamClass(
|
||||
dataset=str(gsheets_dataset.urn),
|
||||
type=DatasetLineageTypeClass.VIEW,
|
||||
auditStamp=AuditStamp(
|
||||
time=int(
|
||||
gsheets_conn_details.created_at.timestamp()
|
||||
* 1000
|
||||
),
|
||||
actor=CORPUSER_DATAHUB,
|
||||
),
|
||||
)
|
||||
],
|
||||
fineGrainedLineages=None,
|
||||
),
|
||||
)
|
||||
|
||||
yield gsheets_dataset
|
||||
yield gsheets_named_range_dataset
|
||||
|
||||
# Create dataflow entity with same name as connector name
|
||||
dataflow = self._generate_dataflow_from_connector(connector)
|
||||
yield dataflow
|
||||
|
||||
@ -9,6 +9,7 @@ from sqlalchemy import create_engine
|
||||
|
||||
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
|
||||
from datahub.ingestion.source.fivetran.config import (
|
||||
DISABLE_COL_LINEAGE_FOR_CONNECTOR_TYPES,
|
||||
Constant,
|
||||
FivetranLogConfig,
|
||||
FivetranSourceReport,
|
||||
@ -112,7 +113,11 @@ class FivetranLogAPI:
|
||||
"""
|
||||
Returns dict of column lineage metadata with key as (<SOURCE_TABLE_ID>, <DESTINATION_TABLE_ID>)
|
||||
"""
|
||||
all_column_lineage = defaultdict(list)
|
||||
all_column_lineage: Dict[Tuple[str, str], List] = defaultdict(list)
|
||||
|
||||
if not connector_ids:
|
||||
return dict(all_column_lineage)
|
||||
|
||||
column_lineage_result = self._query(
|
||||
self.fivetran_log_query.get_column_lineage_query(
|
||||
connector_ids=connector_ids
|
||||
@ -130,7 +135,11 @@ class FivetranLogAPI:
|
||||
"""
|
||||
Returns dict of table lineage metadata with key as 'CONNECTOR_ID'
|
||||
"""
|
||||
connectors_table_lineage_metadata = defaultdict(list)
|
||||
connectors_table_lineage_metadata: Dict[str, List] = defaultdict(list)
|
||||
|
||||
if not connector_ids:
|
||||
return dict(connectors_table_lineage_metadata)
|
||||
|
||||
table_lineage_result = self._query(
|
||||
self.fivetran_log_query.get_table_lineage_query(connector_ids=connector_ids)
|
||||
)
|
||||
@ -246,9 +255,15 @@ class FivetranLogAPI:
|
||||
return self._get_users().get(user_id)
|
||||
|
||||
def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
|
||||
connector_ids = [connector.connector_id for connector in connectors]
|
||||
table_lineage_metadata = self._get_table_lineage_metadata(connector_ids)
|
||||
column_lineage_metadata = self._get_column_lineage_metadata(connector_ids)
|
||||
# Create 2 filtered connector_ids lists - one for table lineage and one for column lineage
|
||||
tll_connector_ids: List[str] = []
|
||||
cll_connector_ids: List[str] = []
|
||||
for connector in connectors:
|
||||
tll_connector_ids.append(connector.connector_id)
|
||||
if connector.connector_type not in DISABLE_COL_LINEAGE_FOR_CONNECTOR_TYPES:
|
||||
cll_connector_ids.append(connector.connector_id)
|
||||
table_lineage_metadata = self._get_table_lineage_metadata(tll_connector_ids)
|
||||
column_lineage_metadata = self._get_column_lineage_metadata(cll_connector_ids)
|
||||
for connector in connectors:
|
||||
connector.lineage = self._extract_connector_lineage(
|
||||
table_lineage_result=table_lineage_metadata.get(connector.connector_id),
|
||||
|
||||
@ -0,0 +1,65 @@
|
||||
import logging
|
||||
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util import Retry
|
||||
|
||||
from datahub.ingestion.source.fivetran.config import (
|
||||
FivetranAPIConfig,
|
||||
)
|
||||
from datahub.ingestion.source.fivetran.response_models import FivetranConnectionDetails
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Retry configuration constants
|
||||
RETRY_MAX_TIMES = 3
|
||||
RETRY_STATUS_CODES = [429, 500, 502, 503, 504]
|
||||
RETRY_BACKOFF_FACTOR = 1
|
||||
RETRY_ALLOWED_METHODS = ["GET"]
|
||||
|
||||
|
||||
class FivetranAPIClient:
    """Client for interacting with the Fivetran REST API."""

    def __init__(self, config: FivetranAPIConfig) -> None:
        """Store ``config`` and build the HTTP session used for every request."""
        self.config = config
        self._session = self._create_session()

    def _create_session(self) -> requests.Session:
        """
        Create a session with retry logic and basic authentication
        """
        requests_session = requests.Session()

        # Configure retry strategy for transient failures
        retry_strategy = Retry(
            total=RETRY_MAX_TIMES,
            backoff_factor=RETRY_BACKOFF_FACTOR,
            status_forcelist=RETRY_STATUS_CODES,
            allowed_methods=RETRY_ALLOWED_METHODS,
            raise_on_status=True,
        )

        # The same retrying adapter serves both plain and TLS URLs.
        adapter = HTTPAdapter(max_retries=retry_strategy)
        requests_session.mount("http://", adapter)
        requests_session.mount("https://", adapter)

        # Set up basic authentication
        requests_session.auth = (self.config.api_key, self.config.api_secret)
        requests_session.headers.update(
            {
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
        )
        return requests_session

    def get_connection_details_by_id(
        self, connection_id: str
    ) -> FivetranConnectionDetails:
        """Get details for a specific connection.

        Issues ``GET <base_url>/v1/connections/<connection_id>`` and parses the
        ``data`` section of the JSON body into ``FivetranConnectionDetails``.

        Raises:
            requests.HTTPError: if the API answers with a 4xx/5xx status.
            Other ``requests`` exceptions on transport failures, and a pydantic
            validation error if the payload does not match the model.
        """
        # Tolerate a configured base URL that ends with "/".
        base_url = self.config.base_url.rstrip("/")
        response = self._session.get(
            f"{base_url}/v1/connections/{connection_id}",
            timeout=self.config.request_timeout_sec,
        )
        # Fail with the real HTTP error (e.g. 401/404) instead of a misleading
        # validation error caused by an error body without "data".
        response.raise_for_status()
        # "data" may be absent or null; both are treated as an empty payload.
        payload = response.json().get("data") or {}
        return FivetranConnectionDetails(**payload)
|
||||
@ -0,0 +1,97 @@
|
||||
import datetime
from typing import Dict, List, Optional

from pydantic import BaseModel
|
||||
|
||||
|
||||
class FivetranConnectionWarnings(BaseModel):
    """One entry of the ``warnings`` list inside a connection's ``status``."""

    code: str  # Warning Code
    message: str  # Warning Message
    details: Dict  # Warning Details
|
||||
|
||||
|
||||
class FivetranConnectionStatus(BaseModel):
    """The ``status`` section of a connection details response."""

    setup_state: str  # Setup State
    schema_status: str  # Schema Status
    sync_state: str  # Sync State
    update_state: str  # Update State
    is_historical_sync: bool  # Is Historical Sync
    warnings: List[FivetranConnectionWarnings]  # Warnings
|
||||
|
||||
|
||||
class FivetranConnectionConfig(BaseModel):
    """The ``config`` section of a connection details response.

    Only the fields of a Google Sheets connection are modeled here.
    """

    # Note: Connection Config is different for different connectors
    auth_type: str  # Auth Type
    sheet_id: str  # Sheet ID - URL to the Google Sheet
    named_range: str  # Named Range
|
||||
|
||||
|
||||
class FivetranConnectionSourceSyncDetails(BaseModel):
    """The ``source_sync_details`` section of a connection details response."""

    last_synced: datetime.datetime  # Last Synced
|
||||
|
||||
|
||||
class FivetranConnectionDetails(BaseModel):
    """
    Parsed ``data`` section of a Fivetran connection details response.

    Note: This response class only captures fields that are relevant to the Google Sheets Connector
    """

    id: str  # Source ID
    group_id: str  # Destination ID
    service: str  # Connector Type
    created_at: datetime.datetime
    # Nullable with a default: the API can report null timestamps (the sample
    # response carries "failed_at": null), presumably also here for a connection
    # that has not completed a sync yet - TODO confirm against the API reference.
    succeeded_at: Optional[datetime.datetime] = None
    paused: bool  # Paused Status
    sync_frequency: int  # Sync Frequency (minutes)
    status: FivetranConnectionStatus  # Status
    config: FivetranConnectionConfig  # Connection Config
    source_sync_details: FivetranConnectionSourceSyncDetails  # Source Sync Details
|
||||
|
||||
"""
|
||||
# Sample Response for Google Sheets Connector
|
||||
{
|
||||
"code": "Success",
|
||||
"data": {
|
||||
"id": "dialectical_remindful",
|
||||
"group_id": "empties_classification",
|
||||
"service": "google_sheets",
|
||||
"service_version": 1,
|
||||
"schema": "fivetran_google_sheets.fivetran_google_sheets",
|
||||
"connected_by": "sewn_restrained",
|
||||
"created_at": "2025-10-06T17:53:01.554289Z",
|
||||
"succeeded_at": "2025-10-06T22:55:45.275000Z",
|
||||
"failed_at": null,
|
||||
"paused": true,
|
||||
"pause_after_trial": false,
|
||||
"sync_frequency": 360,
|
||||
"data_delay_threshold": 0,
|
||||
"data_delay_sensitivity": "NORMAL",
|
||||
"private_link_id": null,
|
||||
"networking_method": "Directly",
|
||||
"proxy_agent_id": null,
|
||||
"schedule_type": "auto",
|
||||
"status": {
|
||||
"setup_state": "connected",
|
||||
"schema_status": "ready",
|
||||
"sync_state": "paused",
|
||||
"update_state": "on_schedule",
|
||||
"is_historical_sync": false,
|
||||
"tasks": [],
|
||||
"warnings": [
|
||||
{
|
||||
"code": "snowflake_discontinuing_password_auth",
|
||||
"message": "Snowflake is discontinuing username/password authentication",
|
||||
"details": {}
|
||||
}
|
||||
]
|
||||
},
|
||||
"config": {
|
||||
"auth_type": "ServiceAccount",
|
||||
"sheet_id": "https://docs.google.com/spreadsheets/d/1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo/edit?gid=0#gid=0",
|
||||
"named_range": "Fivetran_Test_Range"
|
||||
},
|
||||
"source_sync_details": {
|
||||
"last_synced": "2025-10-06T22:55:27.371Z"
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
1
metadata-ingestion/tests/unit/fivetran/__init__.py
Normal file
1
metadata-ingestion/tests/unit/fivetran/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
# Fivetran unit tests
|
||||
@ -0,0 +1,203 @@
|
||||
import datetime
|
||||
from unittest import TestCase
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from datahub.ingestion.api.common import PipelineContext
|
||||
from datahub.ingestion.source.fivetran.config import (
|
||||
Constant,
|
||||
FivetranAPIConfig,
|
||||
FivetranLogConfig,
|
||||
FivetranSourceConfig,
|
||||
)
|
||||
from datahub.ingestion.source.fivetran.data_classes import (
|
||||
ColumnLineage,
|
||||
Connector,
|
||||
TableLineage,
|
||||
)
|
||||
from datahub.ingestion.source.fivetran.fivetran import FivetranSource
|
||||
from datahub.ingestion.source.fivetran.response_models import (
|
||||
FivetranConnectionConfig,
|
||||
FivetranConnectionDetails,
|
||||
FivetranConnectionSourceSyncDetails,
|
||||
FivetranConnectionStatus,
|
||||
)
|
||||
|
||||
|
||||
class TestFivetranGoogleSheetsIntegration(TestCase):
    """Test cases for Google Sheets integration in Fivetran source."""

    # Sheet URL shared by the connection-details fixture.
    GSHEETS_URL = "https://docs.google.com/spreadsheets/d/1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo/edit?gid=0#gid=0"

    def setUp(self):
        """Set up test fixtures."""
        self.config = FivetranSourceConfig(
            fivetran_log_config=FivetranLogConfig(
                destination_platform="snowflake",
                snowflake_destination_config={
                    "host_port": "test.snowflakecomputing.com",
                    "username": "test_user",
                    "password": "test_password",
                    "database": "test_db",
                    "warehouse": "test_warehouse",
                    "role": "test_role",
                    "log_schema": "test_log_schema",
                },
            ),
            api_config=FivetranAPIConfig(
                api_key="test_api_key", api_secret="test_api_secret"
            ),
            env="PROD",
            platform_instance="test_instance",
        )
        self.ctx = PipelineContext(run_id="test_run")

        # Mock the FivetranLogAPI to avoid real database connections
        with patch(
            "datahub.ingestion.source.fivetran.fivetran.FivetranLogAPI"
        ) as mock_log_api:
            mock_audit_log = Mock()
            mock_audit_log.get_user_email.return_value = "test@example.com"
            mock_audit_log.fivetran_log_database = "test_db"
            mock_log_api.return_value = mock_audit_log
            self.source = FivetranSource(self.config, self.ctx)

        # Replace the real REST client so no test performs HTTP calls;
        # every test configures this one mock.
        self.mock_api_client = Mock()
        self.source.api_client = self.mock_api_client

    @staticmethod
    def _make_gsheets_connector(lineage: list) -> Connector:
        """Build a Google Sheets connector carrying the given table lineage."""
        return Connector(
            connector_id="test_gsheets_connector",
            connector_name="Google Sheets Test",
            connector_type=Constant.GOOGLE_SHEETS_CONNECTOR_TYPE,
            paused=False,
            sync_frequency=360,
            destination_id="test_destination",
            user_id="test_user",
            lineage=lineage,
            jobs=[],
        )

    def _make_gsheets_connection_details(self) -> FivetranConnectionDetails:
        """Build the connection details the mocked REST API returns."""
        return FivetranConnectionDetails(
            id="test_gsheets_connector",
            group_id="test_group",
            service="google_sheets",
            created_at=datetime.datetime(2025, 1, 1, 0, 0, 0),
            succeeded_at=datetime.datetime(2025, 1, 1, 1, 0, 0),
            paused=False,
            sync_frequency=360,
            status=FivetranConnectionStatus(
                setup_state="connected",
                schema_status="ready",
                sync_state="paused",
                update_state="on_schedule",
                is_historical_sync=False,
                warnings=[],
            ),
            config=FivetranConnectionConfig(
                auth_type="ServiceAccount",
                sheet_id=self.GSHEETS_URL,
                named_range="Test_Range",
            ),
            source_sync_details=FivetranConnectionSourceSyncDetails(
                last_synced=datetime.datetime(2025, 1, 1, 1, 0, 0)
            ),
        )

    def test_google_sheets_connector_detection(self):
        """Test detection of Google Sheets connectors."""
        connector = self._make_gsheets_connector(lineage=[])
        self.mock_api_client.get_connection_details_by_id.return_value = (
            self._make_gsheets_connection_details()
        )

        # Test the connector workunits generation
        workunits = list(self.source._get_connector_workunits(connector))

        # Should generate Google Sheets datasets
        assert len(workunits) >= 2  # At least the two Google Sheets datasets

        # Find the Google Sheets datasets
        gsheets_datasets = [
            wu
            for wu in workunits
            if hasattr(wu, "platform")
            and str(wu.platform)
            == f"urn:li:dataPlatform:{Constant.GOOGLE_SHEETS_CONNECTOR_TYPE}"
        ]
        assert len(gsheets_datasets) == 2

    def test_google_sheets_lineage_generation(self):
        """Test lineage generation for Google Sheets connectors."""
        connector = self._make_gsheets_connector(
            lineage=[
                TableLineage(
                    source_table="source_schema.source_table",
                    destination_table="dest_schema.dest_table",
                    column_lineage=[
                        ColumnLineage(
                            source_column="source_col", destination_column="dest_col"
                        )
                    ],
                )
            ]
        )
        self.mock_api_client.get_connection_details_by_id.return_value = (
            self._make_gsheets_connection_details()
        )

        # Test lineage extension
        datajob = Mock()
        lineage_properties = self.source._extend_lineage(connector, datajob)

        # Check that the lineage properties include Google Sheets information
        assert "source.platform" in lineage_properties
        assert (
            lineage_properties["source.platform"]
            == Constant.GOOGLE_SHEETS_CONNECTOR_TYPE
        )

    def test_google_sheets_connector_type_constant(self):
        """Test that the Google Sheets connector type constant is correct."""
        assert Constant.GOOGLE_SHEETS_CONNECTOR_TYPE == "google_sheets"
|
||||
@ -0,0 +1,91 @@
|
||||
from unittest import TestCase
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from datahub.ingestion.source.fivetran.config import FivetranAPIConfig
|
||||
from datahub.ingestion.source.fivetran.fivetran_rest_api import FivetranAPIClient
|
||||
from datahub.ingestion.source.fivetran.response_models import (
|
||||
FivetranConnectionDetails,
|
||||
)
|
||||
|
||||
|
||||
class TestFivetranAPIClient(TestCase):
    """Test cases for FivetranAPIClient."""

    def setUp(self):
        """Set up test fixtures."""
        self.config = FivetranAPIConfig(
            api_key="test_api_key",
            api_secret="test_api_secret",
            base_url="https://api.fivetran.com",
            request_timeout_sec=30,
        )
        self.client = FivetranAPIClient(self.config)

    def test_init(self):
        """Test FivetranAPIClient initialization."""
        assert self.client.config == self.config
        assert self.client._session is not None
        assert self.client._session.auth == ("test_api_key", "test_api_secret")
        assert self.client._session.headers["Content-Type"] == "application/json"
        assert self.client._session.headers["Accept"] == "application/json"

    @patch("requests.Session.get")
    def test_get_connection_details_by_id_success(self, mock_get):
        """Test successful retrieval of connection details."""
        sheet_url = "https://docs.google.com/spreadsheets/d/1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo/edit?gid=0#gid=0"

        # Mock response data
        mock_response_data = {
            "code": "Success",
            "data": {
                "id": "test_connection_id",
                "group_id": "test_group_id",
                "service": "google_sheets",
                "created_at": "2025-01-01T00:00:00Z",
                "succeeded_at": "2025-01-01T01:00:00Z",
                "paused": False,
                "sync_frequency": 360,
                "status": {
                    "setup_state": "connected",
                    "schema_status": "ready",
                    "sync_state": "paused",
                    "update_state": "on_schedule",
                    "is_historical_sync": False,
                    "warnings": [
                        {
                            "code": "test_warning",
                            "message": "Test warning message",
                            "details": {},
                        }
                    ],
                },
                "config": {
                    "auth_type": "ServiceAccount",
                    "sheet_id": sheet_url,
                    "named_range": "Test_Range",
                },
                "source_sync_details": {"last_synced": "2025-01-01T01:00:00Z"},
            },
        }

        # Mock the response
        mock_response = Mock()
        mock_response.json.return_value = mock_response_data
        mock_get.return_value = mock_response

        # Call the method
        result = self.client.get_connection_details_by_id("test_connection_id")

        # Assertions
        assert isinstance(result, FivetranConnectionDetails)
        assert result.id == "test_connection_id"
        assert result.group_id == "test_group_id"
        assert result.service == "google_sheets"
        assert result.paused is False
        assert result.sync_frequency == 360
        assert result.config.auth_type == "ServiceAccount"
        assert result.config.named_range == "Test_Range"
        # The model keeps the sheet URL as-is; the spreadsheet ID is not derived here.
        assert result.config.sheet_id == sheet_url

        # Verify the API call
        mock_get.assert_called_once_with(
            "https://api.fivetran.com/v1/connections/test_connection_id", timeout=30
        )
|
||||
@ -817,3 +817,13 @@
|
||||
displayName: Informatica
|
||||
type: OTHERS
|
||||
logoUrl: "assets/platforms/informaticalogo.png"
|
||||
- entityUrn: urn:li:dataPlatform:google_sheets
|
||||
entityType: dataPlatform
|
||||
aspectName: dataPlatformInfo
|
||||
changeType: UPSERT
|
||||
aspect:
|
||||
datasetNameDelimiter: "."
|
||||
name: google_sheets
|
||||
displayName: Google Sheets
|
||||
type: OTHERS
|
||||
logoUrl: "assets/platforms/google-sheets-logo.png"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user