mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-11 00:42:29 +00:00
fix(ingest/fivetran): use project id by default for bigquery (#13250)
This commit is contained in:
parent
61a71de360
commit
5ba8b7d173
@ -54,7 +54,7 @@ class FivetranLogAPI:
|
|||||||
snowflake_destination_config.database,
|
snowflake_destination_config.database,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
fivetran_log_query.set_db(
|
fivetran_log_query.set_schema(
|
||||||
snowflake_destination_config.log_schema,
|
snowflake_destination_config.log_schema,
|
||||||
)
|
)
|
||||||
fivetran_log_database = snowflake_destination_config.database
|
fivetran_log_database = snowflake_destination_config.database
|
||||||
@ -66,8 +66,12 @@ class FivetranLogAPI:
|
|||||||
engine = create_engine(
|
engine = create_engine(
|
||||||
bigquery_destination_config.get_sql_alchemy_url(),
|
bigquery_destination_config.get_sql_alchemy_url(),
|
||||||
)
|
)
|
||||||
fivetran_log_query.set_db(bigquery_destination_config.dataset)
|
fivetran_log_query.set_schema(bigquery_destination_config.dataset)
|
||||||
fivetran_log_database = bigquery_destination_config.dataset
|
|
||||||
|
# The "database" should be the BigQuery project name.
|
||||||
|
fivetran_log_database = engine.execute(
|
||||||
|
"SELECT @@project_id"
|
||||||
|
).fetchone()[0]
|
||||||
else:
|
else:
|
||||||
raise ConfigurationError(
|
raise ConfigurationError(
|
||||||
f"Destination platform '{destination_platform}' is not yet supported."
|
f"Destination platform '{destination_platform}' is not yet supported."
|
||||||
|
|||||||
@ -12,14 +12,14 @@ class FivetranLogQuery:
|
|||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
# Select query db clause
|
# Select query db clause
|
||||||
self.db_clause: str = ""
|
self.schema_clause: str = ""
|
||||||
|
|
||||||
def set_db(self, db_name: str) -> None:
|
|
||||||
self.db_clause = f"{db_name}."
|
|
||||||
|
|
||||||
def use_database(self, db_name: str) -> str:
|
def use_database(self, db_name: str) -> str:
|
||||||
return f"use database {db_name}"
|
return f"use database {db_name}"
|
||||||
|
|
||||||
|
def set_schema(self, schema_name: str) -> None:
|
||||||
|
self.schema_clause = f"{schema_name}."
|
||||||
|
|
||||||
def get_connectors_query(self) -> str:
|
def get_connectors_query(self) -> str:
|
||||||
return f"""\
|
return f"""\
|
||||||
SELECT
|
SELECT
|
||||||
@ -30,7 +30,7 @@ SELECT
|
|||||||
paused,
|
paused,
|
||||||
sync_frequency,
|
sync_frequency,
|
||||||
destination_id
|
destination_id
|
||||||
FROM {self.db_clause}connector
|
FROM {self.schema_clause}connector
|
||||||
WHERE
|
WHERE
|
||||||
_fivetran_deleted = FALSE
|
_fivetran_deleted = FALSE
|
||||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY _fivetran_synced DESC) = 1
|
QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY _fivetran_synced DESC) = 1
|
||||||
@ -42,7 +42,7 @@ SELECT id as user_id,
|
|||||||
given_name,
|
given_name,
|
||||||
family_name,
|
family_name,
|
||||||
email
|
email
|
||||||
FROM {self.db_clause}user
|
FROM {self.schema_clause}user
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def get_sync_logs_query(
|
def get_sync_logs_query(
|
||||||
@ -62,7 +62,7 @@ WITH ranked_syncs AS (
|
|||||||
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time,
|
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time,
|
||||||
MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data,
|
MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data,
|
||||||
ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn
|
ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn
|
||||||
FROM {self.db_clause}log
|
FROM {self.schema_clause}log
|
||||||
WHERE message_event in ('sync_start', 'sync_end')
|
WHERE message_event in ('sync_start', 'sync_end')
|
||||||
AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'
|
AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'
|
||||||
AND connector_id IN ({formatted_connector_ids})
|
AND connector_id IN ({formatted_connector_ids})
|
||||||
@ -99,11 +99,11 @@ FROM (
|
|||||||
dsm.name as destination_schema_name,
|
dsm.name as destination_schema_name,
|
||||||
tl.created_at as created_at,
|
tl.created_at as created_at,
|
||||||
ROW_NUMBER() OVER (PARTITION BY stm.connector_id, stm.id, dtm.id ORDER BY tl.created_at DESC) as table_combo_rn
|
ROW_NUMBER() OVER (PARTITION BY stm.connector_id, stm.id, dtm.id ORDER BY tl.created_at DESC) as table_combo_rn
|
||||||
FROM {self.db_clause}table_lineage as tl
|
FROM {self.schema_clause}table_lineage as tl
|
||||||
JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
|
JOIN {self.schema_clause}source_table_metadata as stm on tl.source_table_id = stm.id
|
||||||
JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
|
JOIN {self.schema_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
|
||||||
JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
|
JOIN {self.schema_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
|
||||||
JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
|
JOIN {self.schema_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
|
||||||
WHERE stm.connector_id IN ({formatted_connector_ids})
|
WHERE stm.connector_id IN ({formatted_connector_ids})
|
||||||
)
|
)
|
||||||
-- Ensure that we only get back one entry per source and destination pair.
|
-- Ensure that we only get back one entry per source and destination pair.
|
||||||
@ -131,13 +131,13 @@ FROM (
|
|||||||
dcm.name as destination_column_name,
|
dcm.name as destination_column_name,
|
||||||
cl.created_at as created_at,
|
cl.created_at as created_at,
|
||||||
ROW_NUMBER() OVER (PARTITION BY stm.connector_id, cl.source_column_id, cl.destination_column_id ORDER BY cl.created_at DESC) as column_combo_rn
|
ROW_NUMBER() OVER (PARTITION BY stm.connector_id, cl.source_column_id, cl.destination_column_id ORDER BY cl.created_at DESC) as column_combo_rn
|
||||||
FROM {self.db_clause}column_lineage as cl
|
FROM {self.schema_clause}column_lineage as cl
|
||||||
JOIN {self.db_clause}source_column_metadata as scm
|
JOIN {self.schema_clause}source_column_metadata as scm
|
||||||
ON cl.source_column_id = scm.id
|
ON cl.source_column_id = scm.id
|
||||||
JOIN {self.db_clause}destination_column_metadata as dcm
|
JOIN {self.schema_clause}destination_column_metadata as dcm
|
||||||
ON cl.destination_column_id = dcm.id
|
ON cl.destination_column_id = dcm.id
|
||||||
-- Only joining source_table_metadata to get the connector_id.
|
-- Only joining source_table_metadata to get the connector_id.
|
||||||
JOIN {self.db_clause}source_table_metadata as stm
|
JOIN {self.schema_clause}source_table_metadata as stm
|
||||||
ON scm.table_id = stm.id
|
ON scm.table_id = stm.id
|
||||||
WHERE stm.connector_id IN ({formatted_connector_ids})
|
WHERE stm.connector_id IN ({formatted_connector_ids})
|
||||||
)
|
)
|
||||||
|
|||||||
@ -48,7 +48,7 @@ def default_query_results(
|
|||||||
query, connector_query_results=default_connector_query_results
|
query, connector_query_results=default_connector_query_results
|
||||||
):
|
):
|
||||||
fivetran_log_query = FivetranLogQuery()
|
fivetran_log_query = FivetranLogQuery()
|
||||||
fivetran_log_query.set_db("test")
|
fivetran_log_query.set_schema("test")
|
||||||
if query == fivetran_log_query.use_database("test_database"):
|
if query == fivetran_log_query.use_database("test_database"):
|
||||||
return []
|
return []
|
||||||
elif query == fivetran_log_query.get_connectors_query():
|
elif query == fivetran_log_query.get_connectors_query():
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user