diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py
index fc046f2bc7..56774b7f33 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py
@@ -54,7 +54,7 @@ class FivetranLogAPI:
                         snowflake_destination_config.database,
                     )
                 )
-                fivetran_log_query.set_db(
+                fivetran_log_query.set_schema(
                     snowflake_destination_config.log_schema,
                 )
                 fivetran_log_database = snowflake_destination_config.database
@@ -66,8 +66,12 @@ class FivetranLogAPI:
                 engine = create_engine(
                     bigquery_destination_config.get_sql_alchemy_url(),
                 )
-                fivetran_log_query.set_db(bigquery_destination_config.dataset)
-                fivetran_log_database = bigquery_destination_config.dataset
+                fivetran_log_query.set_schema(bigquery_destination_config.dataset)
+
+                # The "database" should be the BigQuery project name.
+                fivetran_log_database = engine.execute(
+                    "SELECT @@project_id"
+                ).fetchone()[0]
         else:
             raise ConfigurationError(
                 f"Destination platform '{destination_platform}' is not yet supported."
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
index 65378928b2..48e7da9ddd 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
@@ -12,14 +12,14 @@ class FivetranLogQuery:
     def __init__(self) -> None:
         # Select query db clause
-        self.db_clause: str = ""
-
-    def set_db(self, db_name: str) -> None:
-        self.db_clause = f"{db_name}."
+        self.schema_clause: str = ""
 
     def use_database(self, db_name: str) -> str:
         return f"use database {db_name}"
 
+    def set_schema(self, schema_name: str) -> None:
+        self.schema_clause = f"{schema_name}."
+
     def get_connectors_query(self) -> str:
         return f"""\
 SELECT
@@ -30,7 +30,7 @@ SELECT
   paused,
   sync_frequency,
   destination_id
-FROM {self.db_clause}connector
+FROM {self.schema_clause}connector
 WHERE
   _fivetran_deleted = FALSE
 QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY _fivetran_synced DESC) = 1
@@ -42,7 +42,7 @@ SELECT id as user_id,
   given_name,
   family_name,
   email
-FROM {self.db_clause}user
+FROM {self.schema_clause}user
 """
 
     def get_sync_logs_query(
@@ -62,7 +62,7 @@ WITH ranked_syncs AS (
         MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time,
         MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data,
         ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn
-    FROM {self.db_clause}log
+    FROM {self.schema_clause}log
     WHERE message_event in ('sync_start', 'sync_end')
         AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'
         AND connector_id IN ({formatted_connector_ids})
@@ -99,11 +99,11 @@ FROM (
         dsm.name as destination_schema_name,
         tl.created_at as created_at,
         ROW_NUMBER() OVER (PARTITION BY stm.connector_id, stm.id, dtm.id ORDER BY tl.created_at DESC) as table_combo_rn
-    FROM {self.db_clause}table_lineage as tl
-    JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
-    JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
-    JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
-    JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
+    FROM {self.schema_clause}table_lineage as tl
+    JOIN {self.schema_clause}source_table_metadata as stm on tl.source_table_id = stm.id
+    JOIN {self.schema_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
+    JOIN {self.schema_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
+    JOIN {self.schema_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
     WHERE stm.connector_id IN ({formatted_connector_ids})
 )
 -- Ensure that we only get back one entry per source and destination pair.
@@ -131,13 +131,13 @@ FROM (
         dcm.name as destination_column_name,
         cl.created_at as created_at,
         ROW_NUMBER() OVER (PARTITION BY stm.connector_id, cl.source_column_id, cl.destination_column_id ORDER BY cl.created_at DESC) as column_combo_rn
-    FROM {self.db_clause}column_lineage as cl
-    JOIN {self.db_clause}source_column_metadata as scm
+    FROM {self.schema_clause}column_lineage as cl
+    JOIN {self.schema_clause}source_column_metadata as scm
         ON cl.source_column_id = scm.id
-    JOIN {self.db_clause}destination_column_metadata as dcm
+    JOIN {self.schema_clause}destination_column_metadata as dcm
         ON cl.destination_column_id = dcm.id
     -- Only joining source_table_metadata to get the connector_id.
-    JOIN {self.db_clause}source_table_metadata as stm
+    JOIN {self.schema_clause}source_table_metadata as stm
         ON scm.table_id = stm.id
     WHERE stm.connector_id IN ({formatted_connector_ids})
 )
diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py
index d4170f671f..9b0b6ffa83 100644
--- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py
+++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py
@@ -48,7 +48,7 @@ def default_query_results(
     query, connector_query_results=default_connector_query_results
 ):
     fivetran_log_query = FivetranLogQuery()
-    fivetran_log_query.set_db("test")
+    fivetran_log_query.set_schema("test")
    if query == fivetran_log_query.use_database("test_database"):
         return []
     elif query == fivetran_log_query.get_connectors_query():
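Note (not part of the patch): a minimal usage sketch of the renamed query-builder API, based only on the methods shown above. It assumes a datahub build that contains this module; the schema name below is illustrative.

    from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery

    query_builder = FivetranLogQuery()
    query_builder.set_schema("fivetran_log")  # hypothetical Fivetran log schema name

    # After set_schema(), generated SQL is schema-qualified rather than
    # database-qualified, e.g. "FROM fivetran_log.connector ...".
    print(query_builder.get_connectors_query())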
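Note (not part of the patch): the BigQuery branch reads the "database" back from the engine because BigQuery has no separate database level; the GCP project plays that role and is exposed through the @@project_id system variable. A standalone sketch under assumptions not stated in the diff: the sqlalchemy-bigquery dialect, SQLAlchemy 1.x implicit execution, and valid Google credentials; the project name is illustrative.

    from sqlalchemy import create_engine

    engine = create_engine("bigquery://my-gcp-project")  # hypothetical project
    # Same call chain as the patch: returns the active project id as a string.
    project_id = engine.execute("SELECT @@project_id").fetchone()[0]
    print(project_id)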