issue-15858: reduce es call in ingestion for foreign key processing (#15988)

This commit is contained in:
harshsoni2024 2024-04-29 11:07:32 +05:30 committed by GitHub
parent 4272c5ffee
commit d98a8c5cf1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 194 additions and 13 deletions

View File

@ -52,7 +52,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.connections.session import create_and_bind_thread_safe_session
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
@ -540,21 +539,27 @@ class CommonDbSourceService(
Search the referred table for foreign constraints
and get referred column fqn
"""
supports_database = hasattr(self.service_connection, "supportsDatabase")
foreign_constraints = []
for column in foreign_columns:
referred_column_fqns = []
referred_table = fqn.search_table_from_es(
if supports_database:
database_name = column.get("referred_database")
else:
database_name = self.context.get().database
referred_table_fqn = fqn.build(
metadata=self.metadata,
entity_type=Table,
table_name=column.get("referred_table"),
schema_name=column.get("referred_schema"),
database_name=None,
database_name=database_name,
service_name=self.context.get().database_service,
)
if referred_table:
if referred_table_fqn:
for referred_column in column.get("referred_columns"):
col_fqn = get_column_fqn(
table_entity=referred_table, column=referred_column
col_fqn = fqn._build(
referred_table_fqn, referred_column, quote=False
)
if col_fqn:
referred_column_fqns.append(col_fqn)

View File

@ -56,6 +56,7 @@ from metadata.ingestion.source.database.postgres.utils import (
get_column_info,
get_columns,
get_etable_owner,
get_foreign_keys,
get_table_comment,
get_table_owner,
get_view_definition,
@ -125,6 +126,8 @@ PGDialect.ischema_names = ischema_names
Inspector.get_table_owner = get_etable_owner
PGDialect.get_foreign_keys = get_foreign_keys
class PostgresSource(CommonDbSourceService, MultiDBSource):
"""

View File

@ -188,3 +188,19 @@ POSTGRES_SQL_COLUMNS = """
POSTGRES_GET_SERVER_VERSION = """
show server_version
"""
POSTGRES_FETCH_FK = """
SELECT r.conname,
pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
n.nspname as conschema,
d.datname AS con_db_name
FROM pg_catalog.pg_constraint r,
pg_namespace n,
pg_class c
JOIN pg_database d ON d.datname = current_database()
WHERE r.conrelid = :table AND
r.contype = 'f' AND
c.oid = confrelid AND
n.oid = c.relnamespace
ORDER BY 1
"""

View File

@ -25,6 +25,7 @@ from sqlalchemy.sql import sqltypes
from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_COL_IDENTITY,
POSTGRES_FETCH_FK,
POSTGRES_GET_SERVER_VERSION,
POSTGRES_SQL_COLUMNS,
POSTGRES_TABLE_COMMENTS,
@ -62,6 +63,103 @@ def get_etable_owner(
)
@reflection.cache
def get_foreign_keys(
self, connection, table_name, schema=None, postgresql_ignore_search_path=False, **kw
):
preparer = self.identifier_preparer
table_oid = self.get_table_oid(
connection, table_name, schema, info_cache=kw.get("info_cache")
)
# https://www.postgresql.org/docs/9.0/static/sql-createtable.html
FK_REGEX = re.compile(
r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)"
r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
r"[\s]?(ON UPDATE "
r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
r"[\s]?(ON DELETE "
r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
)
t = sql.text(POSTGRES_FETCH_FK).columns(
conname=sqltypes.Unicode, condef=sqltypes.Unicode, con_db_name=sqltypes.Unicode
)
c = connection.execute(t, dict(table=table_oid))
fkeys = []
for conname, condef, conschema, con_db_name in c.fetchall():
m = re.search(FK_REGEX, condef).groups()
(
constrained_columns,
referred_schema,
referred_table,
referred_columns,
_,
match,
_,
onupdate,
_,
ondelete,
deferrable,
_,
initially,
) = m
if deferrable is not None:
deferrable = True if deferrable == "DEFERRABLE" else False
constrained_columns = tuple(re.split(r"\s*,\s*", constrained_columns))
constrained_columns = [
preparer._unquote_identifier(x) for x in constrained_columns
]
if postgresql_ignore_search_path:
# when ignoring search path, we use the actual schema
# provided it isn't the "default" schema
if conschema != self.default_schema_name:
referred_schema = conschema
else:
referred_schema = schema
elif referred_schema:
# referred_schema is the schema that we regexp'ed from
# pg_get_constraintdef(). If the schema is in the search
# path, pg_get_constraintdef() will give us None.
referred_schema = preparer._unquote_identifier(referred_schema)
elif schema is not None and schema == conschema:
# If the actual schema matches the schema of the table
# we're reflecting, then we will use that.
referred_schema = schema
referred_table = preparer._unquote_identifier(referred_table)
referred_columns = tuple(re.split(r"\s*,\s", referred_columns))
referred_columns = [preparer._unquote_identifier(x) for x in referred_columns]
options = {
k: v
for k, v in [
("onupdate", onupdate),
("ondelete", ondelete),
("initially", initially),
("deferrable", deferrable),
("match", match),
]
if v is not None and v != "NO ACTION"
}
referred_database = con_db_name if con_db_name else ""
fkey_d = {
"name": conname,
"constrained_columns": constrained_columns,
"referred_schema": referred_schema,
"referred_table": referred_table,
"referred_columns": referred_columns,
"options": options,
"referred_database": referred_database,
}
fkeys.append(fkey_d)
return fkeys
@reflection.cache
def get_table_owner(
self, connection, table_name, schema=None, **kw

View File

@ -90,6 +90,7 @@ from metadata.ingestion.source.database.snowflake.utils import (
get_foreign_keys,
get_pk_constraint,
get_schema_columns,
get_schema_foreign_keys,
get_table_comment,
get_table_names,
get_table_names_reflection,
@ -136,6 +137,7 @@ SnowflakeDialect._current_database_schema = ( # pylint: disable=protected-acces
SnowflakeDialect.get_pk_constraint = get_pk_constraint
SnowflakeDialect.get_foreign_keys = get_foreign_keys
SnowflakeDialect.get_columns = get_columns
SnowflakeDialect._get_schema_foreign_keys = get_schema_foreign_keys
class SnowflakeSource(

View File

@ -380,6 +380,65 @@ def get_foreign_keys(self, connection, table_name, schema=None, **kw):
return foreign_key_map.get(table_name, [])
@reflection.cache
def get_schema_foreign_keys(self, connection, schema, **kw):
current_database, current_schema = self._current_database_schema(connection, **kw)
result = connection.execute(
text(
f"SHOW /* sqlalchemy:_get_schema_foreign_keys */ IMPORTED KEYS IN SCHEMA {schema}"
)
)
foreign_key_map = {}
for row in result:
name = self.normalize_name(row._mapping["fk_name"])
if name not in foreign_key_map:
referred_schema = self.normalize_name(row._mapping["pk_schema_name"])
foreign_key_map[name] = {
"constrained_columns": [
self.normalize_name(row._mapping["fk_column_name"])
],
# referred schema should be None in context where it doesn't need to be specified
# https://docs.sqlalchemy.org/en/14/core/reflection.html#reflection-schema-qualified-interaction
"referred_schema": (
referred_schema
if referred_schema not in (self.default_schema_name, current_schema)
else None
),
"referred_table": self.normalize_name(row._mapping["pk_table_name"]),
"referred_columns": [
self.normalize_name(row._mapping["pk_column_name"])
],
"referred_database": self.normalize_name(
row._mapping["pk_database_name"]
),
"name": name,
"table_name": self.normalize_name(row._mapping["fk_table_name"]),
}
options = {}
if self.normalize_name(row._mapping["delete_rule"]) != "NO ACTION":
options["ondelete"] = self.normalize_name(row._mapping["delete_rule"])
if self.normalize_name(row._mapping["update_rule"]) != "NO ACTION":
options["onupdate"] = self.normalize_name(row._mapping["update_rule"])
foreign_key_map[name]["options"] = options
else:
foreign_key_map[name]["constrained_columns"].append(
self.normalize_name(row._mapping["fk_column_name"])
)
foreign_key_map[name]["referred_columns"].append(
self.normalize_name(row._mapping["pk_column_name"])
)
ans = {}
for _, v in foreign_key_map.items():
if v["table_name"] not in ans:
ans[v["table_name"]] = []
ans[v["table_name"]].append(
{k2: v2 for k2, v2 in v.items() if k2 != "table_name"}
)
return ans
@reflection.cache
def get_unique_constraints(self, connection, table_name, schema, **kw):
schema = schema or self.default_schema_name

View File

@ -51,7 +51,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
@ -388,18 +387,17 @@ class UnitycatalogSource(
ref_table_fqn = column.parent_table
table_fqn_list = fqn.split(ref_table_fqn)
referred_table = fqn.search_table_from_es(
metadata=self.metadata,
referred_table_fqn = fqn.build(
self.metadata,
entity_type=Table,
table_name=table_fqn_list[2],
schema_name=table_fqn_list[1],
database_name=table_fqn_list[0],
service_name=self.context.get().database_service,
)
if referred_table:
if referred_table_fqn:
for parent_column in column.parent_columns:
col_fqn = get_column_fqn(
table_entity=referred_table, column=parent_column
)
col_fqn = fqn._build(referred_table_fqn, parent_column, quote=False)
if col_fqn:
referred_column_fqns.append(col_fqn)
else: