diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index f8b1c6dd93..02eb096b24 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -161,6 +161,10 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin default=AllowDenyPattern.allow_all(), description="Regex patterns for connectors to filter in ingestion.", ) + destination_patterns: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns for destinations to filter in ingestion.", + ) include_column_lineage: bool = Field( default=True, description="Populates table->table column lineage.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index c899fe04d2..b459b47deb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -283,6 +283,7 @@ class FivetranSource(StatefulIngestionSourceBase): logger.info("Fivetran plugin execution is started") connectors = self.audit_log.get_allowed_connectors_list( self.config.connector_patterns, + self.config.destination_patterns, self.report, self.config.history_sync_lookback_period, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index d8ce68e834..31c1613906 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -251,6 +251,7 @@ class FivetranLogAPI: def get_allowed_connectors_list( self, connector_patterns: AllowDenyPattern, + destination_patterns: AllowDenyPattern, report: FivetranSourceReport, syncs_interval: int, ) -> List[Connector]: @@ -261,6 +262,9 @@ class FivetranLogAPI: if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]): report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) continue + if not destination_patterns.allowed(connector[Constant.DESTINATION_ID]): + report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) + continue connectors.append( Connector( connector_id=connector[Constant.CONNECTOR_ID], diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 5e0e20234c..0f5d098ee3 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -205,6 +205,11 @@ def test_fivetran_with_snowflake_dest(pytestconfig, tmp_path): "postgres", ] }, + "destination_patterns": { + "allow": [ + "interval_unconstitutional", + ] + }, "sources_to_database": { "calendar_elected": "postgres_db", }, @@ -291,6 +296,11 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_ "postgres", ] }, + "destination_patterns": { + "allow": [ + "interval_unconstitutional", + ] + }, "sources_to_database": { "calendar_elected": "postgres_db", },