diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 63b2e9eb3e1..5005684b547 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -52,7 +52,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.models import Either from metadata.ingestion.connections.session import create_and_bind_thread_safe_session -from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection @@ -540,21 +539,27 @@ class CommonDbSourceService( Search the referred table for foreign constraints and get referred column fqn """ + supports_database = hasattr(self.service_connection, "supportsDatabase") foreign_constraints = [] for column in foreign_columns: referred_column_fqns = [] - referred_table = fqn.search_table_from_es( + if supports_database: + database_name = column.get("referred_database") + else: + database_name = self.context.get().database + referred_table_fqn = fqn.build( metadata=self.metadata, + entity_type=Table, table_name=column.get("referred_table"), schema_name=column.get("referred_schema"), - database_name=None, + database_name=database_name, service_name=self.context.get().database_service, ) - if referred_table: + if referred_table_fqn: for referred_column in column.get("referred_columns"): - col_fqn = get_column_fqn( - table_entity=referred_table, column=referred_column + col_fqn = fqn._build( + referred_table_fqn, referred_column, quote=False ) if col_fqn: referred_column_fqns.append(col_fqn) diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index a06e0781a66..8554e00a356 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -56,6 +56,7 @@ from metadata.ingestion.source.database.postgres.utils import ( get_column_info, get_columns, get_etable_owner, + get_foreign_keys, get_table_comment, get_table_owner, get_view_definition, @@ -125,6 +126,8 @@ PGDialect.ischema_names = ischema_names Inspector.get_table_owner = get_etable_owner +PGDialect.get_foreign_keys = get_foreign_keys + class PostgresSource(CommonDbSourceService, MultiDBSource): """ diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py index 79f8f9470ab..5eaa675454b 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py @@ -188,3 +188,19 @@ POSTGRES_SQL_COLUMNS = """ POSTGRES_GET_SERVER_VERSION = """ show server_version """ + +POSTGRES_FETCH_FK = """ + SELECT r.conname, + pg_catalog.pg_get_constraintdef(r.oid, true) as condef, + n.nspname as conschema, + d.datname AS con_db_name + FROM pg_catalog.pg_constraint r, + pg_namespace n, + pg_class c + JOIN pg_database d ON d.datname = current_database() + WHERE r.conrelid = :table AND + r.contype = 'f' AND + c.oid = confrelid AND + n.oid = c.relnamespace + ORDER BY 1 +""" diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/utils.py b/ingestion/src/metadata/ingestion/source/database/postgres/utils.py index 79b867f6fa0..13920e9b9ac 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/utils.py @@ -25,6 +25,7 @@ from sqlalchemy.sql import sqltypes from metadata.ingestion.source.database.postgres.queries import ( POSTGRES_COL_IDENTITY, + POSTGRES_FETCH_FK, POSTGRES_GET_SERVER_VERSION, POSTGRES_SQL_COLUMNS, POSTGRES_TABLE_COMMENTS, @@ -62,6 +63,103 @@ def get_etable_owner( ) +@reflection.cache +def get_foreign_keys( + self, connection, table_name, schema=None, postgresql_ignore_search_path=False, **kw +): + preparer = self.identifier_preparer + table_oid = self.get_table_oid( + connection, table_name, schema, info_cache=kw.get("info_cache") + ) + + # https://www.postgresql.org/docs/9.0/static/sql-createtable.html + FK_REGEX = re.compile( + r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)" + r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?" + r"[\s]?(ON UPDATE " + r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" + r"[\s]?(ON DELETE " + r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" + r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?" + r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?" + ) + + t = sql.text(POSTGRES_FETCH_FK).columns( + conname=sqltypes.Unicode, condef=sqltypes.Unicode, con_db_name=sqltypes.Unicode + ) + c = connection.execute(t, dict(table=table_oid)) + fkeys = [] + for conname, condef, conschema, con_db_name in c.fetchall(): + m = re.search(FK_REGEX, condef).groups() + + ( + constrained_columns, + referred_schema, + referred_table, + referred_columns, + _, + match, + _, + onupdate, + _, + ondelete, + deferrable, + _, + initially, + ) = m + + if deferrable is not None: + deferrable = True if deferrable == "DEFERRABLE" else False + constrained_columns = tuple(re.split(r"\s*,\s*", constrained_columns)) + constrained_columns = [ + preparer._unquote_identifier(x) for x in constrained_columns + ] + + if postgresql_ignore_search_path: + # when ignoring search path, we use the actual schema + # provided it isn't the "default" schema + if conschema != self.default_schema_name: + referred_schema = conschema + else: + referred_schema = schema + elif referred_schema: + # referred_schema is the schema that we regexp'ed from + # pg_get_constraintdef(). If the schema is in the search + # path, pg_get_constraintdef() will give us None. + referred_schema = preparer._unquote_identifier(referred_schema) + elif schema is not None and schema == conschema: + # If the actual schema matches the schema of the table + # we're reflecting, then we will use that. + referred_schema = schema + + referred_table = preparer._unquote_identifier(referred_table) + referred_columns = tuple(re.split(r"\s*,\s", referred_columns)) + referred_columns = [preparer._unquote_identifier(x) for x in referred_columns] + options = { + k: v + for k, v in [ + ("onupdate", onupdate), + ("ondelete", ondelete), + ("initially", initially), + ("deferrable", deferrable), + ("match", match), + ] + if v is not None and v != "NO ACTION" + } + referred_database = con_db_name if con_db_name else "" + fkey_d = { + "name": conname, + "constrained_columns": constrained_columns, + "referred_schema": referred_schema, + "referred_table": referred_table, + "referred_columns": referred_columns, + "options": options, + "referred_database": referred_database, + } + fkeys.append(fkey_d) + return fkeys + + @reflection.cache def get_table_owner( self, connection, table_name, schema=None, **kw diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index e001c06f712..79bd881f605 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -90,6 +90,7 @@ from metadata.ingestion.source.database.snowflake.utils import ( get_foreign_keys, get_pk_constraint, get_schema_columns, + get_schema_foreign_keys, get_table_comment, get_table_names, get_table_names_reflection, @@ -136,6 +137,7 @@ SnowflakeDialect._current_database_schema = ( # pylint: disable=protected-acces SnowflakeDialect.get_pk_constraint = get_pk_constraint SnowflakeDialect.get_foreign_keys = get_foreign_keys SnowflakeDialect.get_columns = get_columns +SnowflakeDialect._get_schema_foreign_keys = get_schema_foreign_keys class SnowflakeSource( diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py index b2fb0351b87..ca525ad2b32 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/utils.py @@ -380,6 +380,65 @@ def get_foreign_keys(self, connection, table_name, schema=None, **kw): return foreign_key_map.get(table_name, []) +@reflection.cache +def get_schema_foreign_keys(self, connection, schema, **kw): + current_database, current_schema = self._current_database_schema(connection, **kw) + result = connection.execute( + text( + f"SHOW /* sqlalchemy:_get_schema_foreign_keys */ IMPORTED KEYS IN SCHEMA {schema}" + ) + ) + foreign_key_map = {} + for row in result: + name = self.normalize_name(row._mapping["fk_name"]) + if name not in foreign_key_map: + referred_schema = self.normalize_name(row._mapping["pk_schema_name"]) + foreign_key_map[name] = { + "constrained_columns": [ + self.normalize_name(row._mapping["fk_column_name"]) + ], + # referred schema should be None in context where it doesn't need to be specified + # https://docs.sqlalchemy.org/en/14/core/reflection.html#reflection-schema-qualified-interaction + "referred_schema": ( + referred_schema + if referred_schema not in (self.default_schema_name, current_schema) + else None + ), + "referred_table": self.normalize_name(row._mapping["pk_table_name"]), + "referred_columns": [ + self.normalize_name(row._mapping["pk_column_name"]) + ], + "referred_database": self.normalize_name( + row._mapping["pk_database_name"] + ), + "name": name, + "table_name": self.normalize_name(row._mapping["fk_table_name"]), + } + options = {} + if self.normalize_name(row._mapping["delete_rule"]) != "NO ACTION": + options["ondelete"] = self.normalize_name(row._mapping["delete_rule"]) + if self.normalize_name(row._mapping["update_rule"]) != "NO ACTION": + options["onupdate"] = self.normalize_name(row._mapping["update_rule"]) + foreign_key_map[name]["options"] = options + else: + foreign_key_map[name]["constrained_columns"].append( + self.normalize_name(row._mapping["fk_column_name"]) + ) + foreign_key_map[name]["referred_columns"].append( + self.normalize_name(row._mapping["pk_column_name"]) + ) + + ans = {} + + for _, v in foreign_key_map.items(): + if v["table_name"] not in ans: + ans[v["table_name"]] = [] + ans[v["table_name"]].append( + {k2: v2 for k2, v2 in v.items() if k2 != "table_name"} + ) + return ans + + @reflection.cache def get_unique_constraints(self, connection, table_name, schema, **kw): schema = schema or self.default_schema_name diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index eef99579f07..cc01b930203 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -51,7 +51,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException -from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser @@ -388,18 +387,17 @@ class UnitycatalogSource( ref_table_fqn = column.parent_table table_fqn_list = fqn.split(ref_table_fqn) - referred_table = fqn.search_table_from_es( - metadata=self.metadata, + referred_table_fqn = fqn.build( + self.metadata, + entity_type=Table, table_name=table_fqn_list[2], schema_name=table_fqn_list[1], database_name=table_fqn_list[0], service_name=self.context.get().database_service, ) - if referred_table: + if referred_table_fqn: for parent_column in column.parent_columns: - col_fqn = get_column_fqn( - table_entity=referred_table, column=parent_column - ) + col_fqn = fqn._build(referred_table_fqn, parent_column, quote=False) if col_fqn: referred_column_fqns.append(col_fqn) else: