feat(ingestion/fivetran): Add fivetran bigquery destination support (#9531)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
Shubham Jagtap 2024-01-11 00:48:36 +05:30 committed by GitHub
parent af866eaf95
commit 0486319bc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 937 additions and 123 deletions

View File

@ -221,7 +221,7 @@
"name": "fivetran",
"displayName": "Fivetran",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/fivetran/",
"recipe": "source:\n type: fivetran\n config:\n # Fivetran log connector destination server configurations\n fivetran_log_config:\n destination_platform: snowflake\n destination_config:\n # Coordinates\n account_id: snowflake_account_id\n warehouse: warehouse_name\n database: snowflake_db\n log_schema: fivetran_log_schema\n\n # Credentials\n username: ${SNOWFLAKE_USER}\n password: ${SNOWFLAKE_PASS}\n role: snowflake_role\n\n # Optional - filter for certain connector names instead of ingesting everything.\n # connector_patterns:\n # allow:\n # - connector_name\n\n # Optional -- This mapping is optional and only required to configure platform-instance for source\n # A mapping of Fivetran connector id to data platform instance\n # sources_to_platform_instance:\n # calendar_elected:\n # platform_instance: cloud_postgres_instance\n # env: DEV\n\n # Optional -- This mapping is optional and only required to configure platform-instance for destination.\n # A mapping of Fivetran destination id to data platform instance\n # destination_to_platform_instance:\n # calendar_elected:\n # platform_instance: cloud_postgres_instance\n # env: DEV"
"recipe": "source:\n type: fivetran\n config:\n # Fivetran log connector destination server configurations\n fivetran_log_config:\n destination_platform: snowflake\n snowflake_destination_config:\n # Coordinates\n account_id: snowflake_account_id\n warehouse: warehouse_name\n database: snowflake_db\n log_schema: fivetran_log_schema\n\n # Credentials\n username: ${SNOWFLAKE_USER}\n password: ${SNOWFLAKE_PASS}\n role: snowflake_role\n\n # Optional - filter for certain connector names instead of ingesting everything.\n # connector_patterns:\n # allow:\n # - connector_name\n\n # Optional -- This mapping is optional and only required to configure platform-instance for source\n # A mapping of Fivetran connector id to data platform instance\n # sources_to_platform_instance:\n # calendar_elected:\n # platform_instance: cloud_postgres_instance\n # env: DEV\n\n # Optional -- This mapping is optional and only required to configure platform-instance for destination.\n # A mapping of Fivetran destination id to data platform instance\n # destination_to_platform_instance:\n # calendar_elected:\n # platform_instance: cloud_postgres_instance\n # env: DEV"
},
{
"urn": "urn:li:dataPlatform:csv-enricher",

View File

@ -26,7 +26,10 @@ Source and destination are mapped to Dataset as an Input and Output of Connector
## Current limitations
Works only for Snowflake destination for now.
Works only for
- Snowflake destination
- BigQuery destination
## Snowflake destination Configuration Guide
1. If your fivetran platform connector destination is snowflake, you need to provide a user and a role with the correct privileges so that metadata can be fetched.
@ -49,6 +52,10 @@ grant select on all tables in SCHEMA "<fivetran-log-database>"."<fivetran-log-sc
grant role fivetran_datahub to user snowflake_user;
```
## BigQuery destination Configuration Guide
1. If your fivetran platform connector destination is bigquery, you need to set up a Service Account as per the [BigQuery docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console) and grant it the BigQuery Data Viewer and BigQuery Job User IAM roles.
2. Create and download a service account JSON keyfile and provide its credentials in the bigquery destination config, as in the sketch below.
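For illustration, a minimal Python sketch of the equivalent configuration (it mirrors the unit test added later in this commit; all values are placeholders taken from the sample recipe):

```python
# Hedged sketch: programmatic equivalent of the bigquery destination recipe block.
from datahub.ingestion.source.fivetran.config import BigQueryDestinationConfig
from datahub.ingestion.source_config.usage.bigquery_usage import BigQueryCredential

bigquery_dest = BigQueryDestinationConfig(
    # The fields below come from the downloaded service account JSON keyfile.
    credential=BigQueryCredential(
        private_key_id="private_key_id",
        project_id="project_id",
        client_email="client_email",
        client_id="client_id",
        private_key="private_key",
    ),
    dataset="fivetran_log_dataset",  # BigQuery dataset holding the Fivetran log tables
)

# Project resolution is left to the credentials/environment, hence the bare URL.
assert bigquery_dest.get_sql_alchemy_url() == "bigquery://"
```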
## Advanced Configurations
### Working with Platform Instances

View File

@ -4,7 +4,8 @@ source:
# Fivetran log connector destination server configurations
fivetran_log_config:
destination_platform: snowflake
destination_config:
# Optional - If destination platform is 'snowflake', provide snowflake configuration.
snowflake_destination_config:
# Coordinates
account_id: "abc48144"
warehouse: "COMPUTE_WH"
@ -15,6 +16,16 @@ source:
username: "${SNOWFLAKE_USER}"
password: "${SNOWFLAKE_PASS}"
role: "snowflake_role"
# Optional - If destination platform is 'bigquery', provide bigquery configuration.
bigquery_destination_config:
# Credentials
credential:
private_key_id: "project_key_id"
project_id: "project_id"
client_email: "client_email"
client_id: "client_id"
private_key: "private_key"
dataset: "fivetran_log_dataset"
# Optional - filter for certain connector names instead of ingesting everything.
# connector_patterns:

View File

@ -160,6 +160,7 @@ bigquery_common = {
"google-cloud-logging<=3.5.0",
"google-cloud-bigquery",
"more-itertools>=8.12.0",
"sqlalchemy-bigquery>=1.4.1",
}
clickhouse_common = {
@ -294,7 +295,6 @@ plugins: Dict[str, Set[str]] = {
| bigquery_common
| {
*sqlglot_lib,
"sqlalchemy-bigquery>=1.4.1",
"google-cloud-datacatalog-lineage==0.2.2",
},
"clickhouse": sql_common | clickhouse_common,
@ -396,7 +396,7 @@ plugins: Dict[str, Set[str]] = {
"unity-catalog": databricks | sql_common | sqllineage_lib,
# databricks is alias for unity-catalog and needs to be kept in sync
"databricks": databricks | sql_common | sqllineage_lib,
"fivetran": snowflake_common,
"fivetran": snowflake_common | bigquery_common,
}
# This is mainly used to exclude plugins from the Docker image.

View File

@ -80,6 +80,14 @@ class BigQueryConnectionConfig(ConfigModel):
else:
return GCPLoggingClient(**client_options)
def get_sql_alchemy_url(self) -> str:
if self.project_on_behalf:
return f"bigquery://{self.project_on_behalf}"
# When project_id is not set, we will attempt to detect the project ID
# based on the credentials or environment variables.
# See https://github.com/mxmzdlv/pybigquery#authentication.
return "bigquery://"
class BigQueryV2Config(
BigQueryConnectionConfig,
@ -356,14 +364,6 @@ class BigQueryV2Config(
def get_table_pattern(self, pattern: List[str]) -> str:
return "|".join(pattern) if pattern else ""
def get_sql_alchemy_url(self) -> str:
if self.project_on_behalf:
return f"bigquery://{self.project_on_behalf}"
# When project_id is not set, we will attempt to detect the project ID
# based on the credentials or environment variables.
# See https://github.com/mxmzdlv/pybigquery#authentication.
return "bigquery://"
platform_instance_not_supported_for_bigquery = pydantic_removed_field(
"platform_instance"
)
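The move matters because the new Fivetran `BigQueryDestinationConfig` subclasses `BigQueryConnectionConfig` and only needs the connection URL, not the full `BigQueryV2Config`. A hedged sketch of the URL behavior, assuming `project_on_behalf` is declared on the connection config as the relocated method implies:

```python
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryConnectionConfig

# When project_on_behalf is set, it is embedded in the URL (and billed for queries).
conn = BigQueryConnectionConfig(project_on_behalf="my-billing-project")
assert conn.get_sql_alchemy_url() == "bigquery://my-billing-project"

# Otherwise the sqlalchemy-bigquery dialect resolves the project from the
# credentials or environment (see the pybigquery authentication docs).
assert BigQueryConnectionConfig().get_sql_alchemy_url() == "bigquery://"
```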

View File

@ -4,9 +4,14 @@ from typing import Dict, List, Optional
import pydantic
from pydantic import Field, root_validator
from typing_extensions import Literal
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
BigQueryConnectionConfig,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
@ -60,29 +65,45 @@ KNOWN_DATA_PLATFORM_MAPPING = {
}
class DestinationConfig(BaseSnowflakeConfig):
class SnowflakeDestinationConfig(BaseSnowflakeConfig):
database: str = Field(description="The fivetran connector log database.")
log_schema: str = Field(description="The fivetran connector log schema.")
class BigQueryDestinationConfig(BigQueryConnectionConfig):
dataset: str = Field(description="The fivetran connector log dataset.")
class FivetranLogConfig(ConfigModel):
destination_platform: str = pydantic.Field(
destination_platform: Literal["snowflake", "bigquery"] = pydantic.Field(
default="snowflake",
description="The destination platform where fivetran connector log tables are dumped.",
)
destination_config: Optional[DestinationConfig] = pydantic.Field(
snowflake_destination_config: Optional[SnowflakeDestinationConfig] = pydantic.Field(
default=None,
description="If destination platform is 'snowflake', provide snowflake configuration.",
)
bigquery_destination_config: Optional[BigQueryDestinationConfig] = pydantic.Field(
default=None,
description="If destination platform is 'bigquery', provide bigquery configuration.",
)
_rename_destination_config = pydantic_renamed_field(
"destination_config", "snowflake_destination_config"
)
@root_validator(pre=True)
def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict:
destination_platform = values["destination_platform"]
if destination_platform == "snowflake":
if "destination_config" not in values:
if "snowflake_destination_config" not in values:
raise ValueError(
"If destination platform is 'snowflake', user must provide snowflake destination configuration in the recipe."
)
elif destination_platform == "bigquery":
if "bigquery_destination_config" not in values:
raise ValueError(
"If destination platform is 'bigquery', user must provide bigquery destination configuration in the recipe."
)
else:
raise ValueError(
f"Destination platform '{destination_platform}' is not yet supported."

View File

@ -119,15 +119,13 @@ class FivetranSource(StatefulIngestionSourceBase):
)
input_dataset_urn_list.append(input_dataset_urn)
output_dataset_urn: Optional[DatasetUrn] = None
if self.audit_log.fivetran_log_database:
output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=self.config.fivetran_log_config.destination_platform,
table_name=f"{self.audit_log.fivetran_log_database.lower()}.{table_lineage.destination_table}",
env=destination_platform_detail.env,
platform_instance=destination_platform_detail.platform_instance,
)
output_dataset_urn_list.append(output_dataset_urn)
output_dataset_urn = DatasetUrn.create_from_ids(
platform_id=self.config.fivetran_log_config.destination_platform,
table_name=f"{self.audit_log.fivetran_log_database.lower()}.{table_lineage.destination_table}",
env=destination_platform_detail.env,
platform_instance=destination_platform_detail.platform_instance,
)
output_dataset_urn_list.append(output_dataset_urn)
if self.config.include_column_lineage:
for column_lineage in table_lineage.column_lineage:
@ -282,11 +280,10 @@ class FivetranSource(StatefulIngestionSourceBase):
Datahub Ingestion framework invokes this method
"""
logger.info("Fivetran plugin execution is started")
connectors = self.audit_log.get_connectors_list()
connectors = self.audit_log.get_allowed_connectors_list(
self.config.connector_patterns, self.report
)
for connector in connectors:
if not self.config.connector_patterns.allowed(connector.connector_name):
self.report.report_connectors_dropped(connector.connector_name)
continue
logger.info(f"Processing connector id: {connector.connector_id}")
yield from self._get_connector_workunits(connector)
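Connector filtering now happens inside `get_allowed_connectors_list`, driven by the recipe's `connector_patterns` (an `AllowDenyPattern`), so dropped connectors are reported where the list is built rather than in the run loop. A small sketch of how the pattern evaluates connector names (the `salesforce` entry is a made-up illustration):

```python
from datahub.configuration.common import AllowDenyPattern

# connector_patterns from the recipe; allow/deny entries are regular expressions.
patterns = AllowDenyPattern(allow=["postgres"], deny=["salesforce.*"])

assert patterns.allowed("postgres")              # ingested
assert not patterns.allowed("salesforce_prod")   # reported as dropped and skipped
```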

View File

@ -1,10 +1,15 @@
import json
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
from sqlalchemy import create_engine
from datahub.ingestion.source.fivetran.config import Constant, FivetranLogConfig
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.ingestion.source.fivetran.config import (
Constant,
FivetranLogConfig,
FivetranSourceReport,
)
from datahub.ingestion.source.fivetran.data_classes import (
ColumnLineage,
Connector,
@ -18,30 +23,57 @@ logger: logging.Logger = logging.getLogger(__name__)
class FivetranLogAPI:
def __init__(self, fivetran_log_config: FivetranLogConfig) -> None:
self.fivetran_log_database: Optional[str] = None
self.fivetran_log_config = fivetran_log_config
self.engine = self._get_log_destination_engine()
(
self.engine,
self.fivetran_log_query,
self.fivetran_log_database,
) = self._initialize_fivetran_variables()
def _get_log_destination_engine(self) -> Any:
def _initialize_fivetran_variables(
self,
) -> Tuple[Any, FivetranLogQuery, str]:
fivetran_log_query = FivetranLogQuery()
destination_platform = self.fivetran_log_config.destination_platform
engine = None
# For every destination, create sqlalchemy engine,
# select the database and schema and set fivetran_log_database class variable
# set db_clause to generate select queries and set fivetran_log_database class variable
if destination_platform == "snowflake":
snowflake_destination_config = self.fivetran_log_config.destination_config
snowflake_destination_config = (
self.fivetran_log_config.snowflake_destination_config
)
if snowflake_destination_config is not None:
engine = create_engine(
snowflake_destination_config.get_sql_alchemy_url(),
**snowflake_destination_config.get_options(),
)
engine.execute(
FivetranLogQuery.use_schema(
fivetran_log_query.use_database(
snowflake_destination_config.database,
snowflake_destination_config.log_schema,
)
)
self.fivetran_log_database = snowflake_destination_config.database
return engine
fivetran_log_query.set_db(
snowflake_destination_config.log_schema,
)
fivetran_log_database = snowflake_destination_config.database
elif destination_platform == "bigquery":
bigquery_destination_config = (
self.fivetran_log_config.bigquery_destination_config
)
if bigquery_destination_config is not None:
engine = create_engine(
bigquery_destination_config.get_sql_alchemy_url(),
)
fivetran_log_query.set_db(bigquery_destination_config.dataset)
fivetran_log_database = bigquery_destination_config.dataset
else:
raise ConfigurationError(
f"Destination platform '{destination_platform}' is not yet supported."
)
return (
engine,
fivetran_log_query,
fivetran_log_database,
)
def _query(self, query: str) -> List[Dict]:
logger.debug("Query : {}".format(query))
@ -50,12 +82,12 @@ class FivetranLogAPI:
def _get_table_lineage(self, connector_id: str) -> List[TableLineage]:
table_lineage_result = self._query(
FivetranLogQuery.get_table_lineage_query(connector_id=connector_id)
self.fivetran_log_query.get_table_lineage_query(connector_id=connector_id)
)
table_lineage_list: List[TableLineage] = []
for table_lineage in table_lineage_result:
column_lineage_result = self._query(
FivetranLogQuery.get_column_lineage_query(
self.fivetran_log_query.get_column_lineage_query(
source_table_id=table_lineage[Constant.SOURCE_TABLE_ID],
destination_table_id=table_lineage[Constant.DESTINATION_TABLE_ID],
)
@ -82,13 +114,17 @@ class FivetranLogAPI:
sync_start_logs = {
row[Constant.SYNC_ID]: row
for row in self._query(
FivetranLogQuery.get_sync_start_logs_query(connector_id=connector_id)
self.fivetran_log_query.get_sync_start_logs_query(
connector_id=connector_id
)
)
}
sync_end_logs = {
row[Constant.SYNC_ID]: row
for row in self._query(
FivetranLogQuery.get_sync_end_logs_query(connector_id=connector_id)
self.fivetran_log_query.get_sync_end_logs_query(
connector_id=connector_id
)
)
}
for sync_id in sync_start_logs.keys():
@ -120,15 +156,22 @@ class FivetranLogAPI:
def _get_user_name(self, user_id: Optional[str]) -> Optional[str]:
if not user_id:
return None
user_details = self._query(FivetranLogQuery.get_user_query(user_id=user_id))[0]
user_details = self._query(
self.fivetran_log_query.get_user_query(user_id=user_id)
)[0]
return (
f"{user_details[Constant.GIVEN_NAME]} {user_details[Constant.FAMILY_NAME]}"
)
def get_connectors_list(self) -> List[Connector]:
def get_allowed_connectors_list(
self, connector_patterns: AllowDenyPattern, report: FivetranSourceReport
) -> List[Connector]:
connectors: List[Connector] = []
connector_list = self._query(FivetranLogQuery.get_connectors_query())
connector_list = self._query(self.fivetran_log_query.get_connectors_query())
for connector in connector_list:
if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
continue
connectors.append(
Connector(
connector_id=connector[Constant.CONNECTOR_ID],

View File

@ -1,76 +1,74 @@
class FivetranLogQuery:
@staticmethod
def use_schema(db_name: str, schema_name: str) -> str:
return f'use schema "{db_name}"."{schema_name}"'
def __init__(self) -> None:
# Select query db clause
self.db_clause: str = ""
@staticmethod
def get_connectors_query() -> str:
return """
SELECT connector_id as "CONNECTOR_ID",
connecting_user_id as "CONNECTING_USER_ID",
connector_type_id as "CONNECTOR_TYPE_ID",
connector_name as "CONNECTOR_NAME",
paused as "PAUSED",
sync_frequency as "SYNC_FREQUENCY",
destination_id as "DESTINATION_ID"
FROM CONNECTOR
def set_db(self, db_name: str) -> None:
self.db_clause = f"{db_name}."
def use_database(self, db_name: str) -> str:
return f"use database {db_name}"
def get_connectors_query(self) -> str:
return f"""
SELECT connector_id,
connecting_user_id,
connector_type_id,
connector_name,
paused,
sync_frequency,
destination_id
FROM {self.db_clause}connector
WHERE _fivetran_deleted = FALSE"""
@staticmethod
def get_user_query(user_id: str) -> str:
def get_user_query(self, user_id: str) -> str:
return f"""
SELECT id as "USER_ID",
given_name as "GIVEN_NAME",
family_name as "FAMILY_NAME"
FROM USER
SELECT id as user_id,
given_name,
family_name
FROM {self.db_clause}user
WHERE id = '{user_id}'"""
@staticmethod
def get_sync_start_logs_query(
connector_id: str,
) -> str:
def get_sync_start_logs_query(self, connector_id: str) -> str:
return f"""
SELECT time_stamp as "TIME_STAMP",
sync_id as "SYNC_ID"
FROM LOG
SELECT time_stamp,
sync_id
FROM {self.db_clause}log
WHERE message_event = 'sync_start'
and connector_id = '{connector_id}' order by time_stamp"""
@staticmethod
def get_sync_end_logs_query(connector_id: str) -> str:
def get_sync_end_logs_query(self, connector_id: str) -> str:
return f"""
SELECT time_stamp as "TIME_STAMP",
sync_id as "SYNC_ID",
message_data as "MESSAGE_DATA"
FROM LOG
SELECT time_stamp,
sync_id,
message_data
FROM {self.db_clause}log
WHERE message_event = 'sync_end'
and connector_id = '{connector_id}' order by time_stamp"""
@staticmethod
def get_table_lineage_query(connector_id: str) -> str:
def get_table_lineage_query(self, connector_id: str) -> str:
return f"""
SELECT stm.id as "SOURCE_TABLE_ID",
stm.name as "SOURCE_TABLE_NAME",
ssm.name as "SOURCE_SCHEMA_NAME",
dtm.id as "DESTINATION_TABLE_ID",
dtm.name as "DESTINATION_TABLE_NAME",
dsm.name as "DESTINATION_SCHEMA_NAME"
FROM table_lineage as tl
JOIN source_table_metadata as stm on tl.source_table_id = stm.id
JOIN destination_table_metadata as dtm on tl.destination_table_id = dtm.id
JOIN source_schema_metadata as ssm on stm.schema_id = ssm.id
JOIN destination_schema_metadata as dsm on dtm.schema_id = dsm.id
SELECT stm.id as source_table_id,
stm.name as source_table_name,
ssm.name as source_schema_name,
dtm.id as destination_table_id,
dtm.name as destination_table_name,
dsm.name as destination_schema_name
FROM {self.db_clause}table_lineage as tl
JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
WHERE stm.connector_id = '{connector_id}'"""
@staticmethod
def get_column_lineage_query(
source_table_id: str, destination_table_id: str
self, source_table_id: str, destination_table_id: str
) -> str:
return f"""
SELECT scm.name as "SOURCE_COLUMN_NAME",
dcm.name as "DESTINATION_COLUMN_NAME"
FROM column_lineage as cl
JOIN source_column_metadata as scm on
SELECT scm.name as source_column_name,
dcm.name as destination_column_name
FROM {self.db_clause}column_lineage as cl
JOIN {self.db_clause}source_column_metadata as scm on
(cl.source_column_id = scm.id and scm.table_id = {source_table_id})
JOIN destination_column_metadata as dcm on
JOIN {self.db_clause}destination_column_metadata as dcm on
(cl.destination_column_id = dcm.id and dcm.table_id = {destination_table_id})"""
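`FivetranLogQuery` is now instantiated per run: `set_db` records the log schema (Snowflake) or log dataset (BigQuery) once, and every query interpolates that prefix, so the same SQL text works against either destination. A short usage sketch based on the class above:

```python
from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery

q = FivetranLogQuery()
q.set_db("fivetran_log")  # log schema on Snowflake, log dataset on BigQuery

print(q.get_connectors_query())
# SELECT connector_id, ... destination_id
# FROM fivetran_log.connector
# WHERE _fivetran_deleted = FALSE
```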

View File

@ -0,0 +1,626 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "postgres"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"paused": "False",
"sync_frequency": "1440",
"destination_id": "'interval_unconstitutional'"
},
"name": "postgres",
"type": {
"string": "COMMAND"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.employee,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.company,PROD)"
],
"inputDatajobs": [],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV),id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.employee,PROD),id)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV),name)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.employee,PROD),name)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV),id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.company,PROD),id)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV),name)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.company,PROD),name)"
],
"confidenceScore": 1.0
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:Shubham Jagtap",
"type": "DEVELOPER",
"source": {
"type": "SERVICE"
}
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "4c9a03d6-eded-4422-a46a-163266e58243",
"type": "BATCH_SCHEDULED",
"created": {
"time": 1695191853000,
"actor": "urn:li:corpuser:datahub"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"upstreamInstances": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.employee,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.company,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1695191853000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"status": "STARTED"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1695191885000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"status": "COMPLETE",
"result": {
"type": "SUCCESS",
"nativeResultType": "fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "f773d1e9-c791-48f4-894f-8cf9b3dfc834",
"type": "BATCH_SCHEDULED",
"created": {
"time": 1696343730000,
"actor": "urn:li:corpuser:datahub"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"upstreamInstances": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.employee,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.company,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696343730000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"status": "STARTED"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696343732000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"status": "COMPLETE",
"result": {
"type": "SKIPPED",
"nativeResultType": "fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "63c2fc85-600b-455f-9ba0-f576522465be",
"type": "BATCH_SCHEDULED",
"created": {
"time": 1696343755000,
"actor": "urn:li:corpuser:datahub"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"upstreamInstances": []
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.employee,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.company,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696343755000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"status": "STARTED"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696343790000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"status": "COMPLETE",
"result": {
"type": "FAILURE",
"nativeResultType": "fivetran"
}
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -5,18 +5,26 @@ from unittest.mock import MagicMock
import pytest
from freezegun import freeze_time
from datahub.configuration.common import ConfigurationWarning
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.fivetran.config import DestinationConfig
from datahub.ingestion.source.fivetran.config import (
BigQueryDestinationConfig,
FivetranSourceConfig,
SnowflakeDestinationConfig,
)
from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery
from datahub.ingestion.source_config.usage.bigquery_usage import BigQueryCredential
from tests.test_helpers import mce_helpers
FROZEN_TIME = "2022-06-07 17:00:00"
def default_query_results(query):
if query == FivetranLogQuery.use_schema("TEST_DATABASE", "TEST_SCHEMA"):
fivetran_log_query = FivetranLogQuery()
fivetran_log_query.set_db("test")
if query == fivetran_log_query.use_database("test_database"):
return []
elif query == FivetranLogQuery.get_connectors_query():
elif query == fivetran_log_query.get_connectors_query():
return [
{
"connector_id": "calendar_elected",
@ -28,7 +36,7 @@ def default_query_results(query):
"destination_id": "interval_unconstitutional",
},
]
elif query == FivetranLogQuery.get_table_lineage_query("calendar_elected"):
elif query == fivetran_log_query.get_table_lineage_query("calendar_elected"):
return [
{
"source_table_id": "10040",
@ -47,9 +55,9 @@ def default_query_results(query):
"destination_schema_name": "postgres_public",
},
]
elif query == FivetranLogQuery.get_column_lineage_query(
elif query == fivetran_log_query.get_column_lineage_query(
"10040", "7779"
) or query == FivetranLogQuery.get_column_lineage_query("10041", "7780"):
) or query == fivetran_log_query.get_column_lineage_query("10041", "7780"):
return [
{
"source_column_name": "id",
@ -60,7 +68,7 @@ def default_query_results(query):
"destination_column_name": "name",
},
]
elif query == FivetranLogQuery.get_user_query("reapply_phone"):
elif query == fivetran_log_query.get_user_query("reapply_phone"):
return [
{
"user_id": "reapply_phone",
@ -68,7 +76,7 @@ def default_query_results(query):
"family_name": "Jagtap",
}
]
elif query == FivetranLogQuery.get_sync_start_logs_query("calendar_elected"):
elif query == fivetran_log_query.get_sync_start_logs_query("calendar_elected"):
return [
{
"time_stamp": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000),
@ -83,7 +91,7 @@ def default_query_results(query):
"sync_id": "63c2fc85-600b-455f-9ba0-f576522465be",
},
]
elif query == FivetranLogQuery.get_sync_end_logs_query("calendar_elected"):
elif query == fivetran_log_query.get_sync_end_logs_query("calendar_elected"):
return [
{
"time_stamp": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000),
@ -107,12 +115,12 @@ def default_query_results(query):
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_fivetran_basic(pytestconfig, tmp_path):
def test_fivetran_with_snowflake_dest(pytestconfig, tmp_path):
test_resources_dir = pytestconfig.rootpath / "tests/integration/fivetran"
# Run the metadata ingestion pipeline.
output_file = tmp_path / "fivetran_test_events.json"
golden_file = test_resources_dir / "fivetran_golden.json"
golden_file = test_resources_dir / "fivetran_snowflake_golden.json"
with mock.patch(
"datahub.ingestion.source.fivetran.fivetran_log_api.create_engine"
@ -130,14 +138,14 @@ def test_fivetran_basic(pytestconfig, tmp_path):
"config": {
"fivetran_log_config": {
"destination_platform": "snowflake",
"destination_config": {
"account_id": "TESTID",
"warehouse": "TEST_WH",
"snowflake_destination_config": {
"account_id": "testid",
"warehouse": "test_wh",
"username": "test",
"password": "test@123",
"database": "TEST_DATABASE",
"role": "TESTROLE",
"log_schema": "TEST_SCHEMA",
"database": "test_database",
"role": "testrole",
"log_schema": "test",
},
},
"connector_patterns": {
@ -166,18 +174,87 @@ def test_fivetran_basic(pytestconfig, tmp_path):
pipeline.run()
pipeline.raise_from_status()
golden_file = "fivetran_golden.json"
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{output_file}",
golden_path=f"{test_resources_dir}/{golden_file}",
golden_path=f"{golden_file}",
)
@freeze_time(FROZEN_TIME)
def test_fivetran_snowflake_destination_config(pytestconfig, tmp_path):
snowflake_dest = DestinationConfig(
@pytest.mark.integration
def test_fivetran_with_bigquery_dest(pytestconfig, tmp_path):
test_resources_dir = pytestconfig.rootpath / "tests/integration/fivetran"
# Run the metadata ingestion pipeline.
output_file = tmp_path / "fivetran_test_events.json"
golden_file = test_resources_dir / "fivetran_bigquery_golden.json"
with mock.patch(
"datahub.ingestion.source.fivetran.fivetran_log_api.create_engine"
) as mock_create_engine:
connection_magic_mock = MagicMock()
connection_magic_mock.execute.side_effect = default_query_results
mock_create_engine.return_value = connection_magic_mock
pipeline = Pipeline.create(
{
"run_id": "powerbi-test",
"source": {
"type": "fivetran",
"config": {
"fivetran_log_config": {
"destination_platform": "bigquery",
"bigquery_destination_config": {
"credential": {
"private_key_id": "testprivatekey",
"project_id": "test-project",
"client_email": "fivetran-connector@test-project.iam.gserviceaccount.com",
"client_id": "1234567",
"private_key": "private-key",
},
"dataset": "test",
},
},
"connector_patterns": {
"allow": [
"postgres",
]
},
"sources_to_database": {
"calendar_elected": "postgres_db",
},
"sources_to_platform_instance": {
"calendar_elected": {
"env": "DEV",
}
},
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{output_file}",
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{output_file}",
golden_path=f"{golden_file}",
)
@freeze_time(FROZEN_TIME)
def test_fivetran_snowflake_destination_config():
snowflake_dest = SnowflakeDestinationConfig(
account_id="TESTID",
warehouse="TEST_WH",
username="test",
@ -190,3 +267,37 @@ def test_fivetran_snowflake_destination_config(pytestconfig, tmp_path):
snowflake_dest.get_sql_alchemy_url()
== "snowflake://test:test%40123@TESTID?application=acryl_datahub&authenticator=SNOWFLAKE&role=TESTROLE&warehouse=TEST_WH"
)
@freeze_time(FROZEN_TIME)
def test_fivetran_bigquery_destination_config():
bigquery_dest = BigQueryDestinationConfig(
credential=BigQueryCredential(
private_key_id="testprivatekey",
project_id="test-project",
client_email="fivetran-connector@test-project.iam.gserviceaccount.com",
client_id="1234567",
private_key="private-key",
),
dataset="test_dataset",
)
assert bigquery_dest.get_sql_alchemy_url() == "bigquery://"
@freeze_time(FROZEN_TIME)
def test_rename_destination_config():
config_dict = {
"fivetran_log_config": {
"destination_platform": "snowflake",
"destination_config": {
"account_id": "testid",
"database": "test_database",
"log_schema": "test",
},
},
}
with pytest.warns(
ConfigurationWarning,
match="destination_config is deprecated, please use snowflake_destination_config instead.",
):
FivetranSourceConfig.parse_obj(config_dict)