From 114186b01b8656f36e63b70ff1baea9d934115d2 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Thu, 23 Oct 2025 14:32:49 -0700 Subject: [PATCH] feat(fivetran/google_sheets): add Google Sheets support and API client integration (#15007) --- .../app/ingest/source/builder/constants.ts | 4 + .../src/images/google-sheets-logo.png | Bin 0 -> 392 bytes .../ingestion/source/common/subtypes.py | 2 + .../ingestion/source/fivetran/config.py | 33 +++ .../ingestion/source/fivetran/fivetran.py | 195 +++++++++++++++-- .../source/fivetran/fivetran_log_api.py | 25 ++- .../source/fivetran/fivetran_rest_api.py | 65 ++++++ .../source/fivetran/response_models.py | 97 +++++++++ .../tests/unit/fivetran/__init__.py | 1 + ...test_fivetran_google_sheets_integration.py | 203 ++++++++++++++++++ .../unit/fivetran/test_fivetran_rest_api.py | 91 ++++++++ .../bootstrap_mcps/data-platforms.yaml | 10 + 12 files changed, 709 insertions(+), 17 deletions(-) create mode 100644 datahub-web-react/src/images/google-sheets-logo.png create mode 100644 metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_rest_api.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source/fivetran/response_models.py create mode 100644 metadata-ingestion/tests/unit/fivetran/__init__.py create mode 100644 metadata-ingestion/tests/unit/fivetran/test_fivetran_google_sheets_integration.py create mode 100644 metadata-ingestion/tests/unit/fivetran/test_fivetran_rest_api.py diff --git a/datahub-web-react/src/app/ingest/source/builder/constants.ts b/datahub-web-react/src/app/ingest/source/builder/constants.ts index b1b2a1ea98..35e3514dc2 100644 --- a/datahub-web-react/src/app/ingest/source/builder/constants.ts +++ b/datahub-web-react/src/app/ingest/source/builder/constants.ts @@ -15,6 +15,7 @@ import elasticsearchLogo from '@images/elasticsearchlogo.png'; import feastLogo from '@images/feastlogo.png'; import fivetranLogo from '@images/fivetranlogo.png'; import glueLogo from '@images/gluelogo.png'; +import 
googleSheetsLogo from '@images/google-sheets-logo.png'; import grafanaLogo from '@images/grafana.png'; import hiveLogo from '@images/hivelogo.png'; import kafkaLogo from '@images/kafkalogo.png'; @@ -152,6 +153,8 @@ export const VERTEX_AI = 'vertexai'; export const VERTEXAI_URN = `urn:li:dataPlatform:${VERTEX_AI}`; export const SNAPLOGIC = 'snaplogic'; export const SNAPLOGIC_URN = `urn:li:dataPlatform:${SNAPLOGIC}`; +export const GOOGLE_SHEETS = 'google_sheets'; +export const GOOGLE_SHEETS_URN = `urn:li:dataPlatform:${GOOGLE_SHEETS}`; export const PLATFORM_URN_TO_LOGO = { [ATHENA_URN]: athenaLogo, @@ -200,6 +203,7 @@ export const PLATFORM_URN_TO_LOGO = { [NEO4J_URN]: neo4j, [VERTEXAI_URN]: vertexAI, [SNAPLOGIC_URN]: snaplogic, + [GOOGLE_SHEETS_URN]: googleSheetsLogo, }; export const SOURCE_TO_PLATFORM_URN = { diff --git a/datahub-web-react/src/images/google-sheets-logo.png b/datahub-web-react/src/images/google-sheets-logo.png new file mode 100644 index 0000000000000000000000000000000000000000..561ad958b03608b7c936c493cded8f35630fb329 GIT binary patch literal 392 zcmeAS@N?(olHy`uVBq!ia0vp^2_VeD3?#3*wSy$00(?STfwaSZ2b1MNCM$xCR|Eq| z@kVpgRiQwJM1uv8G+7Q3{Qv*|%*!i5T(F4g@(|M%AqMLMOjZV)0yQoJi-V-V=Bx~I zn4hrA^^OS8bh(lszhDNI zic=tfKf{Fa*Mne2Myp5%3q~du4xp-r{gv7qHmsVq_Q0uCQQMhjuMORz{q}9U*gFG_ mKC9!GZz{>^UH{a$QhGyN`{!SIbtyovFnGH9xvX None: self.filtered_connectors.append(connector) + def report_fivetran_rest_api_call_count(self) -> None: + self.fivetran_rest_api_call_count += 1 + class PlatformDetail(ConfigModel): platform: Optional[str] = pydantic.Field( @@ -234,6 +257,16 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin description="A mapping of destination id to its platform/instance/env details.", ) + """ + Use Fivetran REST API to get : + - Google Sheets Connector details and emit related entities + Fivetran Platform Connector syncs limited information about the Google Sheets Connector. 
+ """ + api_config: Optional[FivetranAPIConfig] = Field( + default=None, + description="Fivetran REST API configuration, used to provide wider support for connections.", + ) + @pydantic.root_validator(pre=True) def compat_sources_to_database(cls, values: Dict) -> Dict: if "sources_to_database" in values: diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index fe634f3a0c..b585d2e560 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -1,5 +1,6 @@ import logging from typing import Dict, Iterable, List, Optional, Union +from urllib.parse import urlparse import datahub.emitter.mce_builder as builder from datahub.api.entities.datajob import DataJob as DataJobV1 @@ -22,6 +23,7 @@ from datahub.ingestion.api.source import ( StructuredLogCategory, ) from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.common.subtypes import DatasetSubTypes from datahub.ingestion.source.fivetran.config import ( KNOWN_DATA_PLATFORM_MAPPING, Constant, @@ -35,24 +37,34 @@ from datahub.ingestion.source.fivetran.fivetran_query import ( MAX_JOBS_PER_CONNECTOR, MAX_TABLE_LINEAGE_PER_CONNECTOR, ) +from datahub.ingestion.source.fivetran.fivetran_rest_api import FivetranAPIClient +from datahub.ingestion.source.fivetran.response_models import FivetranConnectionDetails from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, ) from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionSourceBase, ) +from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( FineGrainedLineage, FineGrainedLineageDownstreamType, FineGrainedLineageUpstreamType, + UpstreamLineage, +) +from datahub.metadata.schema_classes import ( + 
DatasetLineageTypeClass, + UpstreamClass, ) from datahub.metadata.urns import CorpUserUrn, DataFlowUrn, DatasetUrn from datahub.sdk.dataflow import DataFlow from datahub.sdk.datajob import DataJob +from datahub.sdk.dataset import Dataset from datahub.sdk.entity import Entity # Logger instance logger = logging.getLogger(__name__) +CORPUSER_DATAHUB = "urn:li:corpuser:datahub" @platform_name("Fivetran") @@ -76,8 +88,12 @@ class FivetranSource(StatefulIngestionSourceBase): super().__init__(config, ctx) self.config = config self.report = FivetranSourceReport() - self.audit_log = FivetranLogAPI(self.config.fivetran_log_config) + self.api_client: Optional[FivetranAPIClient] = None + self._connection_details_cache: Dict[str, FivetranConnectionDetails] = {} + + if self.config.api_config: + self.api_client = FivetranAPIClient(self.config.api_config) def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]: input_dataset_urn_list: List[Union[str, DatasetUrn]] = [] @@ -131,17 +147,43 @@ class FivetranSource(StatefulIngestionSourceBase): if source_details.include_schema_in_urn else lineage.source_table.split(".", 1)[1] ) - input_dataset_urn = DatasetUrn.create_from_ids( - platform_id=source_details.platform, - table_name=( - f"{source_details.database.lower()}.{source_table}" - if source_details.database - else source_table - ), - env=source_details.env, - platform_instance=source_details.platform_instance, - ) - input_dataset_urn_list.append(input_dataset_urn) + input_dataset_urn: Optional[DatasetUrn] = None + # Special Handling for Google Sheets Connectors + if connector.connector_type == Constant.GOOGLE_SHEETS_CONNECTOR_TYPE: + # Get Google Sheet dataset details from Fivetran API + # This is cached in the api_client + gsheets_conn_details: Optional[FivetranConnectionDetails] = ( + self._get_connection_details_by_id(connector.connector_id) + ) + + if gsheets_conn_details: + input_dataset_urn = DatasetUrn.create_from_ids( + 
platform_id=Constant.GOOGLE_SHEETS_CONNECTOR_TYPE, + table_name=self._get_gsheet_named_range_dataset_id( + gsheets_conn_details + ), + env=source_details.env, + ) + else: + self.report.warning( + title="Failed to extract lineage for Google Sheets Connector", + message="Unable to extract lineage for Google Sheets Connector, as the connector details are not available from Fivetran API.", + context=f"{connector.connector_name} (connector_id: {connector.connector_id})", + ) + else: + input_dataset_urn = DatasetUrn.create_from_ids( + platform_id=source_details.platform, + table_name=( + f"{source_details.database.lower()}.{source_table}" + if source_details.database + else source_table + ), + env=source_details.env, + platform_instance=source_details.platform_instance, + ) + + if input_dataset_urn: + input_dataset_urn_list.append(input_dataset_urn) destination_table = ( lineage.destination_table @@ -262,6 +304,67 @@ class FivetranSource(StatefulIngestionSourceBase): clone_outlets=True, ) + def _get_connection_details_by_id( + self, connection_id: str + ) -> Optional[FivetranConnectionDetails]: + if self.api_client is None: + self.report.warning( + title="Fivetran API client is not initialized", + message="Google Sheets Connector details cannot be extracted, as Fivetran API client is not initialized.", + context=f"connector_id: {connection_id}", + ) + return None + + if connection_id in self._connection_details_cache: + return self._connection_details_cache[connection_id] + + try: + self.report.report_fivetran_rest_api_call_count() + conn_details = self.api_client.get_connection_details_by_id(connection_id) + # Update Cache + if conn_details: + self._connection_details_cache[connection_id] = conn_details + + return conn_details + except Exception as e: + self.report.warning( + title="Failed to get connection details for Google Sheets Connector", + message=f"Exception occurred while getting connection details from Fivetran API. 
{e}", + context=f"connector_id: {connection_id}", + ) + return None + + def _get_gsheet_sheet_id_from_url( + self, gsheets_conn_details: FivetranConnectionDetails + ) -> str: + # Extracting the sheet_id (1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo) from the sheet_id url + # "https://docs.google.com/spreadsheets/d/1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo/edit?gid=0#gid=0", + try: + parsed = urlparse(gsheets_conn_details.config.sheet_id) + # Example: https://docs.google.com/spreadsheets/d//edit + parts = parsed.path.split("/") + return parts[3] if len(parts) > 2 else "" + except Exception as e: + logger.warning( + f"Failed to extract sheet_id from the sheet_id url: {gsheets_conn_details.config.sheet_id}, {e}" + ) + + return "" + + def _get_gsheet_named_range_dataset_id( + self, gsheets_conn_details: FivetranConnectionDetails + ) -> str: + sheet_id = self._get_gsheet_sheet_id_from_url(gsheets_conn_details) + named_range_id = ( + f"{sheet_id}.{gsheets_conn_details.config.named_range}" + if sheet_id + else gsheets_conn_details.config.named_range + ) + logger.debug( + f"Using gsheet_named_range_dataset_id: {named_range_id} for connector: {gsheets_conn_details.id}" + ) + return named_range_id + def _get_dpi_workunits( self, job: Job, dpi: DataProcessInstance ) -> Iterable[MetadataWorkUnit]: @@ -295,6 +398,74 @@ class FivetranSource(StatefulIngestionSourceBase): self, connector: Connector ) -> Iterable[Union[MetadataWorkUnit, Entity]]: self.report.report_connectors_scanned() + + """ + ------------------------------------------------------- + Special Handling for Google Sheets Connectors + ------------------------------------------------------- + Google Sheets source is not supported by Datahub yet. + As a workaround, we are emitting a dataset entity for the Google Sheet + and adding it to the lineage. This workaround needs to be removed once + Datahub supports Google Sheets source natively. 
+ ------------------------------------------------------- + """ + if connector.connector_type == Constant.GOOGLE_SHEETS_CONNECTOR_TYPE: + # Get Google Sheet dataset details from Fivetran API + gsheets_conn_details: Optional[FivetranConnectionDetails] = ( + self._get_connection_details_by_id(connector.connector_id) + ) + + if gsheets_conn_details: + gsheets_dataset = Dataset( + name=self._get_gsheet_sheet_id_from_url(gsheets_conn_details), + platform=Constant.GOOGLE_SHEETS_CONNECTOR_TYPE, + env=self.config.env, + display_name=self._get_gsheet_sheet_id_from_url( + gsheets_conn_details + ), + external_url=gsheets_conn_details.config.sheet_id, + created=gsheets_conn_details.created_at, + last_modified=gsheets_conn_details.source_sync_details.last_synced, + subtype=DatasetSubTypes.GOOGLE_SHEETS, + custom_properties={ + "ingested_by": "fivetran source", + "connector_id": gsheets_conn_details.id, + }, + ) + gsheets_named_range_dataset = Dataset( + name=self._get_gsheet_named_range_dataset_id(gsheets_conn_details), + platform=Constant.GOOGLE_SHEETS_CONNECTOR_TYPE, + env=self.config.env, + display_name=gsheets_conn_details.config.named_range, + external_url=gsheets_conn_details.config.sheet_id, + created=gsheets_conn_details.created_at, + last_modified=gsheets_conn_details.source_sync_details.last_synced, + subtype=DatasetSubTypes.GOOGLE_SHEETS_NAMED_RANGE, + custom_properties={ + "ingested_by": "fivetran source", + "connector_id": gsheets_conn_details.id, + }, + upstreams=UpstreamLineage( + upstreams=[ + UpstreamClass( + dataset=str(gsheets_dataset.urn), + type=DatasetLineageTypeClass.VIEW, + auditStamp=AuditStamp( + time=int( + gsheets_conn_details.created_at.timestamp() + * 1000 + ), + actor=CORPUSER_DATAHUB, + ), + ) + ], + fineGrainedLineages=None, + ), + ) + + yield gsheets_dataset + yield gsheets_named_range_dataset + # Create dataflow entity with same name as connector name dataflow = self._generate_dataflow_from_connector(connector) yield dataflow diff --git 
a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index c6fdcb8501..6f5530cee2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -9,6 +9,7 @@ from sqlalchemy import create_engine from datahub.configuration.common import AllowDenyPattern, ConfigurationError from datahub.ingestion.source.fivetran.config import ( + DISABLE_COL_LINEAGE_FOR_CONNECTOR_TYPES, Constant, FivetranLogConfig, FivetranSourceReport, @@ -112,7 +113,11 @@ class FivetranLogAPI: """ Returns dict of column lineage metadata with key as (, ) """ - all_column_lineage = defaultdict(list) + all_column_lineage: Dict[Tuple[str, str], List] = defaultdict(list) + + if not connector_ids: + return dict(all_column_lineage) + column_lineage_result = self._query( self.fivetran_log_query.get_column_lineage_query( connector_ids=connector_ids @@ -130,7 +135,11 @@ class FivetranLogAPI: """ Returns dict of table lineage metadata with key as 'CONNECTOR_ID' """ - connectors_table_lineage_metadata = defaultdict(list) + connectors_table_lineage_metadata: Dict[str, List] = defaultdict(list) + + if not connector_ids: + return dict(connectors_table_lineage_metadata) + table_lineage_result = self._query( self.fivetran_log_query.get_table_lineage_query(connector_ids=connector_ids) ) @@ -246,9 +255,15 @@ class FivetranLogAPI: return self._get_users().get(user_id) def _fill_connectors_lineage(self, connectors: List[Connector]) -> None: - connector_ids = [connector.connector_id for connector in connectors] - table_lineage_metadata = self._get_table_lineage_metadata(connector_ids) - column_lineage_metadata = self._get_column_lineage_metadata(connector_ids) + # Create 2 filtered connector_ids lists - one for table lineage and one for column lineage + tll_connector_ids: List[str] = [] + 
cll_connector_ids: List[str] = [] + for connector in connectors: + tll_connector_ids.append(connector.connector_id) + if connector.connector_type not in DISABLE_COL_LINEAGE_FOR_CONNECTOR_TYPES: + cll_connector_ids.append(connector.connector_id) + table_lineage_metadata = self._get_table_lineage_metadata(tll_connector_ids) + column_lineage_metadata = self._get_column_lineage_metadata(cll_connector_ids) for connector in connectors: connector.lineage = self._extract_connector_lineage( table_lineage_result=table_lineage_metadata.get(connector.connector_id), diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_rest_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_rest_api.py new file mode 100644 index 0000000000..de3df43f19 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_rest_api.py @@ -0,0 +1,65 @@ +import logging + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util import Retry + +from datahub.ingestion.source.fivetran.config import ( + FivetranAPIConfig, +) +from datahub.ingestion.source.fivetran.response_models import FivetranConnectionDetails + +logger = logging.getLogger(__name__) + +# Retry configuration constants +RETRY_MAX_TIMES = 3 +RETRY_STATUS_CODES = [429, 500, 502, 503, 504] +RETRY_BACKOFF_FACTOR = 1 +RETRY_ALLOWED_METHODS = ["GET"] + + +class FivetranAPIClient: + """Client for interacting with the Fivetran REST API.""" + + def __init__(self, config: FivetranAPIConfig) -> None: + self.config = config + self._session = self._create_session() + + def _create_session(self) -> requests.Session: + """ + Create a session with retry logic and basic authentication + """ + requests_session = requests.Session() + + # Configure retry strategy for transient failures + retry_strategy = Retry( + total=RETRY_MAX_TIMES, + backoff_factor=RETRY_BACKOFF_FACTOR, + status_forcelist=RETRY_STATUS_CODES, + allowed_methods=RETRY_ALLOWED_METHODS, + 
raise_on_status=True, + ) + + adapter = HTTPAdapter(max_retries=retry_strategy) + requests_session.mount("http://", adapter) + requests_session.mount("https://", adapter) + + # Set up basic authentication + requests_session.auth = (self.config.api_key, self.config.api_secret) + requests_session.headers.update( + { + "Content-Type": "application/json", + "Accept": "application/json", + } + ) + return requests_session + + def get_connection_details_by_id( + self, connection_id: str + ) -> FivetranConnectionDetails: + """Get details for a specific connection.""" + connection_details = self._session.get( + f"{self.config.base_url}/v1/connections/{connection_id}", + timeout=self.config.request_timeout_sec, + ) + return FivetranConnectionDetails(**connection_details.json().get("data", {})) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/response_models.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/response_models.py new file mode 100644 index 0000000000..92bbb96d1e --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/response_models.py @@ -0,0 +1,97 @@ +import datetime +from typing import Dict, List + +from pydantic import BaseModel + + +class FivetranConnectionWarnings(BaseModel): + code: str # Warning Code + message: str # Warning Message + details: Dict # Warning Details + + +class FivetranConnectionStatus(BaseModel): + setup_state: str # Setup State + schema_status: str # Schema Status + sync_state: str # Sync State + update_state: str # Update State + is_historical_sync: bool # Is Historical Sync + warnings: List[FivetranConnectionWarnings] # Warnings + + +class FivetranConnectionConfig(BaseModel): + # Note: Connection Config is different for different connectors + auth_type: str # Auth Type + sheet_id: str # Sheet ID - URL to the Google Sheet + named_range: str # Named Range + + +class FivetranConnectionSourceSyncDetails(BaseModel): + last_synced: datetime.datetime # Last Synced + + +class 
FivetranConnectionDetails(BaseModel): + """ + Note: This response class only captures fields that are relevant to the Google Sheets Connector + """ + + id: str # Source ID + group_id: str # Destination ID + service: str # Connector Type + created_at: datetime.datetime + succeeded_at: datetime.datetime + paused: bool # Paused Status + sync_frequency: int # Sync Frequency (minutes) + status: FivetranConnectionStatus # Status + config: FivetranConnectionConfig # Connection Config + source_sync_details: FivetranConnectionSourceSyncDetails # Source Sync Details + + """ + # Sample Response for Google Sheets Connector + { + "code": "Success", + "data": { + "id": "dialectical_remindful", + "group_id": "empties_classification", + "service": "google_sheets", + "service_version": 1, + "schema": "fivetran_google_sheets.fivetran_google_sheets", + "connected_by": "sewn_restrained", + "created_at": "2025-10-06T17:53:01.554289Z", + "succeeded_at": "2025-10-06T22:55:45.275000Z", + "failed_at": null, + "paused": true, + "pause_after_trial": false, + "sync_frequency": 360, + "data_delay_threshold": 0, + "data_delay_sensitivity": "NORMAL", + "private_link_id": null, + "networking_method": "Directly", + "proxy_agent_id": null, + "schedule_type": "auto", + "status": { + "setup_state": "connected", + "schema_status": "ready", + "sync_state": "paused", + "update_state": "on_schedule", + "is_historical_sync": false, + "tasks": [], + "warnings": [ + { + "code": "snowflake_discontinuing_password_auth", + "message": "Snowflake is discontinuing username/password authentication", + "details": {} + } + ] + }, + "config": { + "auth_type": "ServiceAccount", + "sheet_id": "https://docs.google.com/spreadsheets/d/1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo/edit?gid=0#gid=0", + "named_range": "Fivetran_Test_Range" + }, + "source_sync_details": { + "last_synced": "2025-10-06T22:55:27.371Z" + } + } + } + """ diff --git a/metadata-ingestion/tests/unit/fivetran/__init__.py 
b/metadata-ingestion/tests/unit/fivetran/__init__.py new file mode 100644 index 0000000000..d11c3c619a --- /dev/null +++ b/metadata-ingestion/tests/unit/fivetran/__init__.py @@ -0,0 +1 @@ +# Fivetran unit tests diff --git a/metadata-ingestion/tests/unit/fivetran/test_fivetran_google_sheets_integration.py b/metadata-ingestion/tests/unit/fivetran/test_fivetran_google_sheets_integration.py new file mode 100644 index 0000000000..49d8d07d8a --- /dev/null +++ b/metadata-ingestion/tests/unit/fivetran/test_fivetran_google_sheets_integration.py @@ -0,0 +1,203 @@ +import datetime +from unittest import TestCase +from unittest.mock import Mock, patch + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.fivetran.config import ( + Constant, + FivetranAPIConfig, + FivetranLogConfig, + FivetranSourceConfig, +) +from datahub.ingestion.source.fivetran.data_classes import ( + ColumnLineage, + Connector, + TableLineage, +) +from datahub.ingestion.source.fivetran.fivetran import FivetranSource +from datahub.ingestion.source.fivetran.response_models import ( + FivetranConnectionConfig, + FivetranConnectionDetails, + FivetranConnectionSourceSyncDetails, + FivetranConnectionStatus, +) + + +class TestFivetranGoogleSheetsIntegration(TestCase): + """Test cases for Google Sheets integration in Fivetran source.""" + + def setUp(self): + """Set up test fixtures.""" + self.config = FivetranSourceConfig( + fivetran_log_config=FivetranLogConfig( + destination_platform="snowflake", + snowflake_destination_config={ + "host_port": "test.snowflakecomputing.com", + "username": "test_user", + "password": "test_password", + "database": "test_db", + "warehouse": "test_warehouse", + "role": "test_role", + "log_schema": "test_log_schema", + }, + ), + api_config=FivetranAPIConfig( + api_key="test_api_key", api_secret="test_api_secret" + ), + env="PROD", + platform_instance="test_instance", + ) + self.ctx = PipelineContext(run_id="test_run") + + # Mock the FivetranLogAPI 
to avoid real database connections + with patch( + "datahub.ingestion.source.fivetran.fivetran.FivetranLogAPI" + ) as mock_log_api: + mock_audit_log = Mock() + mock_audit_log.get_user_email.return_value = "test@example.com" + mock_audit_log.fivetran_log_database = "test_db" + mock_log_api.return_value = mock_audit_log + self.source = FivetranSource(self.config, self.ctx) + + # Mock the API client + self.mock_api_client = Mock() + self.source.api_client = self.mock_api_client + + def test_google_sheets_connector_detection(self): + """Test detection of Google Sheets connectors.""" + connector = Connector( + connector_id="test_gsheets_connector", + connector_name="Google Sheets Test", + connector_type=Constant.GOOGLE_SHEETS_CONNECTOR_TYPE, + paused=False, + sync_frequency=360, + destination_id="test_destination", + user_id="test_user", + lineage=[], + jobs=[], + ) + + # Mock the API client + mock_api_client = Mock() + self.source.api_client = mock_api_client + + # Mock the connection details response + mock_connection_details = FivetranConnectionDetails( + id="test_gsheets_connector", + group_id="test_group", + service="google_sheets", + created_at=datetime.datetime(2025, 1, 1, 0, 0, 0), + succeeded_at=datetime.datetime(2025, 1, 1, 1, 0, 0), + paused=False, + sync_frequency=360, + status=FivetranConnectionStatus( + setup_state="connected", + schema_status="ready", + sync_state="paused", + update_state="on_schedule", + is_historical_sync=False, + warnings=[], + ), + config=FivetranConnectionConfig( + auth_type="ServiceAccount", + sheet_id="https://docs.google.com/spreadsheets/d/1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo/edit?gid=0#gid=0", + named_range="Test_Range", + ), + source_sync_details=FivetranConnectionSourceSyncDetails( + last_synced=datetime.datetime(2025, 1, 1, 1, 0, 0) + ), + ) + + mock_api_client.get_connection_details_by_id.return_value = ( + mock_connection_details + ) + + # Test the connector workunits generation + workunits = 
list(self.source._get_connector_workunits(connector)) + + # Should generate Google Sheets datasets + assert len(workunits) >= 2 # At least the two Google Sheets datasets + + # Find the Google Sheets datasets + gsheets_datasets = [ + wu + for wu in workunits + if hasattr(wu, "platform") + and str(wu.platform) + == f"urn:li:dataPlatform:{Constant.GOOGLE_SHEETS_CONNECTOR_TYPE}" + ] + assert len(gsheets_datasets) == 2 + + def test_google_sheets_lineage_generation(self): + """Test lineage generation for Google Sheets connectors.""" + connector = Connector( + connector_id="test_gsheets_connector", + connector_name="Google Sheets Test", + connector_type=Constant.GOOGLE_SHEETS_CONNECTOR_TYPE, + paused=False, + sync_frequency=360, + destination_id="test_destination", + user_id="test_user", + lineage=[ + TableLineage( + source_table="source_schema.source_table", + destination_table="dest_schema.dest_table", + column_lineage=[ + ColumnLineage( + source_column="source_col", destination_column="dest_col" + ) + ], + ) + ], + jobs=[], + ) + + # Mock the API client + mock_api_client = Mock() + self.source.api_client = mock_api_client + + # Mock the connection details response + mock_connection_details = FivetranConnectionDetails( + id="test_gsheets_connector", + group_id="test_group", + service="google_sheets", + created_at=datetime.datetime(2025, 1, 1, 0, 0, 0), + succeeded_at=datetime.datetime(2025, 1, 1, 1, 0, 0), + paused=False, + sync_frequency=360, + status=FivetranConnectionStatus( + setup_state="connected", + schema_status="ready", + sync_state="paused", + update_state="on_schedule", + is_historical_sync=False, + warnings=[], + ), + config=FivetranConnectionConfig( + auth_type="ServiceAccount", + sheet_id="https://docs.google.com/spreadsheets/d/1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo/edit?gid=0#gid=0", + named_range="Test_Range", + ), + source_sync_details=FivetranConnectionSourceSyncDetails( + last_synced=datetime.datetime(2025, 1, 1, 1, 0, 0) + ), + ) + + 
mock_api_client.get_connection_details_by_id.return_value = ( + mock_connection_details + ) + + # Test lineage extension + datajob = Mock() + lineage_properties = self.source._extend_lineage(connector, datajob) + + # Check that the lineage properties include Google Sheets information + assert "source.platform" in lineage_properties + assert ( + lineage_properties["source.platform"] + == Constant.GOOGLE_SHEETS_CONNECTOR_TYPE + ) + + def test_google_sheets_connector_type_constant(self): + """Test that the Google Sheets connector type constant is correct.""" + assert Constant.GOOGLE_SHEETS_CONNECTOR_TYPE == "google_sheets" diff --git a/metadata-ingestion/tests/unit/fivetran/test_fivetran_rest_api.py b/metadata-ingestion/tests/unit/fivetran/test_fivetran_rest_api.py new file mode 100644 index 0000000000..6f42effb75 --- /dev/null +++ b/metadata-ingestion/tests/unit/fivetran/test_fivetran_rest_api.py @@ -0,0 +1,91 @@ +from unittest import TestCase +from unittest.mock import Mock, patch + +from datahub.ingestion.source.fivetran.config import FivetranAPIConfig +from datahub.ingestion.source.fivetran.fivetran_rest_api import FivetranAPIClient +from datahub.ingestion.source.fivetran.response_models import ( + FivetranConnectionDetails, +) + + +class TestFivetranAPIClient(TestCase): + """Test cases for FivetranAPIClient.""" + + def setUp(self): + """Set up test fixtures.""" + self.config = FivetranAPIConfig( + api_key="test_api_key", + api_secret="test_api_secret", + base_url="https://api.fivetran.com", + request_timeout_sec=30, + ) + self.client = FivetranAPIClient(self.config) + + def test_init(self): + """Test FivetranAPIClient initialization.""" + assert self.client.config == self.config + assert self.client._session is not None + assert self.client._session.auth == ("test_api_key", "test_api_secret") + assert self.client._session.headers["Content-Type"] == "application/json" + assert self.client._session.headers["Accept"] == "application/json" + + 
@patch("requests.Session.get") + def test_get_connection_details_by_id_success(self, mock_get): + """Test successful retrieval of connection details.""" + # Mock response data + mock_response_data = { + "code": "Success", + "data": { + "id": "test_connection_id", + "group_id": "test_group_id", + "service": "google_sheets", + "created_at": "2025-01-01T00:00:00Z", + "succeeded_at": "2025-01-01T01:00:00Z", + "paused": False, + "sync_frequency": 360, + "status": { + "setup_state": "connected", + "schema_status": "ready", + "sync_state": "paused", + "update_state": "on_schedule", + "is_historical_sync": False, + "warnings": [ + { + "code": "test_warning", + "message": "Test warning message", + "details": {}, + } + ], + }, + "config": { + "auth_type": "ServiceAccount", + "sheet_id": "https://docs.google.com/spreadsheets/d/1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo/edit?gid=0#gid=0", + "named_range": "Test_Range", + }, + "source_sync_details": {"last_synced": "2025-01-01T01:00:00Z"}, + }, + } + + # Mock the response + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_get.return_value = mock_response + + # Call the method + result = self.client.get_connection_details_by_id("test_connection_id") + + # Assertions + assert isinstance(result, FivetranConnectionDetails) + assert result.id == "test_connection_id" + assert result.group_id == "test_group_id" + assert result.service == "google_sheets" + assert result.paused is False + assert result.sync_frequency == 360 + assert result.config.auth_type == "ServiceAccount" + assert result.config.named_range == "Test_Range" + # assert result.config.sheet_id_from_url == "1A82PdLAE7NXLLb5JcLPKeIpKUMytXQba5Z-Ei-mbXLo" # Removed - no longer an attribute + + # Verify the API call + mock_get.assert_called_once_with( + "https://api.fivetran.com/v1/connections/test_connection_id", timeout=30 + ) diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml 
b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml index 293763ffd0..3cb6a7a2c0 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml +++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml @@ -817,3 +817,13 @@ displayName: Informatica type: OTHERS logoUrl: "assets/platforms/informaticalogo.png" +- entityUrn: urn:li:dataPlatform:google_sheets + entityType: dataPlatform + aspectName: dataPlatformInfo + changeType: UPSERT + aspect: + datasetNameDelimiter: "." + name: google_sheets + displayName: Google Sheets + type: OTHERS + logoUrl: "assets/platforms/google-sheets-logo.png"