mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-12 08:52:38 +00:00
postgres-selecting-multiple-databases-per-service (#4736)
This commit is contained in:
parent
2b1fca377d
commit
ee1dda5366
@ -135,6 +135,10 @@
|
|||||||
"description": "Regex exclude tables or databases that matches the pattern.",
|
"description": "Regex exclude tables or databases that matches the pattern.",
|
||||||
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
|
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
|
||||||
},
|
},
|
||||||
|
"databaseFilterPattern": {
|
||||||
|
"description": "Regex to only fetch databases that matches the pattern.",
|
||||||
|
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
|
||||||
|
},
|
||||||
"dbtConfigSource": {
|
"dbtConfigSource": {
|
||||||
"title": "DBT Configuration Source",
|
"title": "DBT Configuration Source",
|
||||||
"description": "Available sources to fetch DBT catalog and manifest files.",
|
"description": "Available sources to fetch DBT catalog and manifest files.",
|
||||||
|
|||||||
@ -32,6 +32,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
|
|||||||
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
|
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
|
||||||
from metadata.ingestion.source.sql_source import SQLSource
|
from metadata.ingestion.source.sql_source import SQLSource
|
||||||
from metadata.utils.connections import get_connection
|
from metadata.utils.connections import get_connection
|
||||||
|
from metadata.utils.filters import filter_by_database
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
TableKey = namedtuple("TableKey", ["schema", "table_name"])
|
TableKey = namedtuple("TableKey", ["schema", "table_name"])
|
||||||
@ -64,6 +65,11 @@ class PostgresSource(SQLSource):
|
|||||||
for res in results:
|
for res in results:
|
||||||
row = list(res)
|
row = list(res)
|
||||||
try:
|
try:
|
||||||
|
if filter_by_database(
|
||||||
|
self.source_config.databaseFilterPattern, database_name=row[0]
|
||||||
|
):
|
||||||
|
self.status.filter(row[0], "Database pattern not allowed")
|
||||||
|
continue
|
||||||
logger.info(f"Ingesting from database: {row[0]}")
|
logger.info(f"Ingesting from database: {row[0]}")
|
||||||
self.service_connection.database = row[0]
|
self.service_connection.database = row[0]
|
||||||
self.engine = get_connection(
|
self.engine = get_connection(
|
||||||
|
|||||||
@ -160,3 +160,18 @@ def filter_by_fqn(fqn_filter_pattern: Optional[FilterPattern], fqn: str) -> bool
|
|||||||
:return: True for filtering, False otherwise
|
:return: True for filtering, False otherwise
|
||||||
"""
|
"""
|
||||||
return _filter(fqn_filter_pattern, fqn)
|
return _filter(fqn_filter_pattern, fqn)
|
||||||
|
|
||||||
|
|
||||||
|
def filter_by_database(
|
||||||
|
database_filter_pattern: Optional[FilterPattern], database_name: str
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Return True if the schema needs to be filtered, False otherwise
|
||||||
|
|
||||||
|
Include takes precedence over exclude
|
||||||
|
|
||||||
|
:param database_filter_pattern: Model defining database filtering logic
|
||||||
|
:param database_name: table database name
|
||||||
|
:return: True for filtering, False otherwise
|
||||||
|
"""
|
||||||
|
return _filter(database_filter_pattern, database_name)
|
||||||
|
|||||||
@ -29,6 +29,7 @@
|
|||||||
"sampleDataQuery": "select * from {}.{} limit 50",
|
"sampleDataQuery": "select * from {}.{} limit 50",
|
||||||
"enableDataProfiler": false,
|
"enableDataProfiler": false,
|
||||||
"schemaFilterPattern": null,
|
"schemaFilterPattern": null,
|
||||||
|
"databaseFilterPattern":null,
|
||||||
"tableFilterPattern": null,
|
"tableFilterPattern": null,
|
||||||
"dbtCatalogFilePath": null,
|
"dbtCatalogFilePath": null,
|
||||||
"dbtManifestFilePath": null
|
"dbtManifestFilePath": null
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user