diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json index 1f658bda434..1cb68ba1aad 100644 --- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json @@ -135,6 +135,10 @@ "description": "Regex exclude tables or databases that matches the pattern.", "$ref": "../type/filterPattern.json#/definitions/filterPattern" }, + "databaseFilterPattern": { + "description": "Regex to only fetch databases that matches the pattern.", + "$ref": "../type/filterPattern.json#/definitions/filterPattern" + }, "dbtConfigSource": { "title": "DBT Configuration Source", "description": "Available sources to fetch DBT catalog and manifest files.", diff --git a/ingestion/src/metadata/ingestion/source/postgres.py b/ingestion/src/metadata/ingestion/source/postgres.py index e7b4144a612..c1acd1bca2e 100644 --- a/ingestion/src/metadata/ingestion/source/postgres.py +++ b/ingestion/src/metadata/ingestion/source/postgres.py @@ -32,6 +32,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException, SourceStatus from metadata.ingestion.source.sql_source import SQLSource from metadata.utils.connections import get_connection +from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger TableKey = namedtuple("TableKey", ["schema", "table_name"]) @@ -64,6 +65,11 @@ class PostgresSource(SQLSource): for res in results: row = list(res) try: + if filter_by_database( + self.source_config.databaseFilterPattern, database_name=row[0] + ): + self.status.filter(row[0], "Database pattern not allowed") + continue logger.info(f"Ingesting from database: 
{row[0]}") self.service_connection.database = row[0] self.engine = get_connection( diff --git a/ingestion/src/metadata/utils/filters.py b/ingestion/src/metadata/utils/filters.py index b10a30fe6bc..36d92dbd0d1 100644 --- a/ingestion/src/metadata/utils/filters.py +++ b/ingestion/src/metadata/utils/filters.py @@ -160,3 +160,18 @@ def filter_by_fqn(fqn_filter_pattern: Optional[FilterPattern], fqn: str) -> bool :return: True for filtering, False otherwise """ return _filter(fqn_filter_pattern, fqn) + + +def filter_by_database( + database_filter_pattern: Optional[FilterPattern], database_name: str +) -> bool: + """ + Return True if the database needs to be filtered, False otherwise + + Include takes precedence over exclude + + :param database_filter_pattern: Model defining database filtering logic + :param database_name: database name + :return: True for filtering, False otherwise + """ + return _filter(database_filter_pattern, database_name) diff --git a/openmetadata-airflow-apis/examples/deploy_dag.json b/openmetadata-airflow-apis/examples/deploy_dag.json index 0702b4ef0d1..376c1f2f05b 100644 --- a/openmetadata-airflow-apis/examples/deploy_dag.json +++ b/openmetadata-airflow-apis/examples/deploy_dag.json @@ -29,6 +29,7 @@ "sampleDataQuery": "select * from {}.{} limit 50", "enableDataProfiler": false, "schemaFilterPattern": null, + "databaseFilterPattern":null, "tableFilterPattern": null, "dbtCatalogFilePath": null, "dbtManifestFilePath": null