chore: reduce constraints db calls for redshift (#24336)

Teddy 2025-11-14 08:25:47 +01:00 committed by GitHub
parent 587c4058ee
commit 678b054fa5
3 changed files with 183 additions and 45 deletions

ingestion/src/metadata/ingestion/api/topology_runner.py

@@ -16,7 +16,6 @@ import math
 import time
 import traceback
 from collections import defaultdict
-from concurrent.futures import ThreadPoolExecutor
 from functools import singledispatchmethod
 from typing import Any, Generic, Iterable, List, Type, TypeVar
@@ -41,6 +40,7 @@ from metadata.ingestion.models.topology import (
 )
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.ometa.utils import model_str
+from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor
 from metadata.utils.execution_time_tracker import ExecutionTimeTrackerContextMap
 from metadata.utils.logger import ingestion_logger
 from metadata.utils.source_hash import generate_source_hash
@@ -93,38 +93,37 @@ class TopologyRunnerMixin(Generic[C]):
             return
         else:
             chunksize = int(math.ceil(node_entities_length / threads))
-            chunks = [
+            chunks: list[list[Entity]] = [
                 node_entities[i : i + chunksize]
                 for i in range(0, node_entities_length, chunksize)
             ]
-            thread_pool = ThreadPoolExecutor(max_workers=threads)
-            futures = [
-                thread_pool.submit(
-                    self._multithread_process_entity,
-                    node,
-                    chunk,
-                    child_nodes,
-                    self.context.get_current_thread_id(),
-                )
-                for chunk in chunks
-            ]
-            while True:
-                if self.queue.has_tasks():
-                    yield from self.queue.process()
-                else:
-                    if not futures:
-                        break
-                    for i, future in enumerate(futures):
-                        if future.done():
-                            future.result()
-                            futures.pop(i)
-                    time.sleep(0.01)
+            with CustomThreadPoolExecutor(max_workers=threads) as pool:
+                futures = [
+                    pool.submit(
+                        self._multithread_process_entity,
+                        node,
+                        chunk,
+                        child_nodes,
+                        self.context.get_current_thread_id(),
+                    )
+                    for chunk in chunks
+                ]
+                while True:
+                    if self.queue.has_tasks():
+                        yield from self.queue.process()
+                    else:
+                        if not futures:
+                            break
+                        for i, future in enumerate(futures):
+                            if future.done():
+                                future.result()
+                                futures.pop(i)
+                        time.sleep(0.01)

     def _process_node(self, node: TopologyNode) -> Iterable[Entity]:
         """Processing of a Node in a single thread."""

ingestion/src/metadata/ingestion/source/database/redshift/metadata.py

@@ -18,7 +18,12 @@ from typing import Iterable, List, Optional, Tuple
 from sqlalchemy import sql
 from sqlalchemy.dialects.postgresql.base import PGDialect
 from sqlalchemy.engine.reflection import Inspector
-from sqlalchemy_redshift.dialect import RedshiftDialect, RedshiftDialectMixin
+from sqlalchemy_redshift.dialect import (
+    FOREIGN_KEY_RE,
+    SQL_IDENTIFIER_RE,
+    RedshiftDialect,
+    RedshiftDialectMixin,
+)

 from metadata.generated.schema.api.data.createStoredProcedure import (
     CreateStoredProcedureRequest,
@@ -72,6 +77,7 @@ from metadata.ingestion.source.database.redshift.incremental_table_processor import (
 from metadata.ingestion.source.database.redshift.models import RedshiftStoredProcedure
 from metadata.ingestion.source.database.redshift.queries import (
     REDSHIFT_EXTERNAL_TABLE_LOCATION,
+    REDSHIFT_GET_ALL_CONSTRAINTS,
     REDSHIFT_GET_ALL_RELATION_INFO,
     REDSHIFT_GET_DATABASE_NAMES,
     REDSHIFT_GET_STORED_PROCEDURES,
@@ -94,6 +100,7 @@ from metadata.utils.execution_time_tracker import (
     calculate_execution_time_generator,
 )
 from metadata.utils.filters import filter_by_database
+from metadata.utils.helpers import clean_up_starting_ending_double_quotes_in_string
 from metadata.utils.logger import ingestion_logger
 from metadata.utils.sqlalchemy_utils import (
     get_all_table_comments,
@@ -108,6 +115,7 @@ STANDARD_TABLE_TYPES = {
     "r": TableType.Regular,
     "e": TableType.External,
     "v": TableType.View,
+    "m": TableType.MaterializedView,
 }

 # pylint: disable=protected-access
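With "m" in the map, materialized views returned by the broadened relkind filter (see the queries change further down) get typed correctly. A small illustration of the lookup, assuming unknown kinds fall back to TableType.Regular — the fallback is an assumption made for this sketch, not something shown in this diff:

from metadata.generated.schema.entity.data.table import TableType

# Illustrative lookup over the mapping defined above; the Regular
# fallback is assumed, not taken from this commit.
def table_type_for(relkind: str) -> TableType:
    return STANDARD_TABLE_TYPES.get(relkind, TableType.Regular)

assert table_type_for("m") == TableType.MaterializedView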
@@ -142,6 +150,9 @@ class RedshiftSource(
     ):
         super().__init__(config, metadata)
         self.partition_details = {}
+        self.constraint_details: dict[
+            str, dict[str, set[str] | list[dict[str, str]]]
+        ] = {}
         self.life_cycle_query = REDSHIFT_LIFE_CYCLE_QUERY
         self.context.get_global().deleted_tables = []
         self.incremental = incremental_configuration
@@ -185,7 +196,9 @@ class RedshiftSource(
         """
         try:
            self.partition_details.clear()
-            results = self.connection.execute(REDSHIFT_PARTITION_DETAILS).fetchall()
+            results = self.connection.execute(
+                statement=REDSHIFT_PARTITION_DETAILS
+            ).fetchall()
             for row in results:
                 self.partition_details[f"{row.schema}.{row.table}"] = row.diststyle
         except Exception as exe:
@@ -198,6 +211,7 @@ class RedshiftSource(
         """
         Handle custom table types
         """
+        self._set_constraint_details(schema_name)
         result = self.connection.execute(
             sql.text(REDSHIFT_GET_ALL_RELATION_INFO),
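This inserted call is where the round trips disappear: _set_constraint_details (added further down) runs one constraints query per schema and caches the rows keyed by "schema.table", so the later _get_columns_with_constraints calls become dictionary lookups instead of per-table database hits. After processing a schema, self.constraint_details would look roughly like this hypothetical example (all table and column names are made up):

{
    "sales.orders": {
        "pkey": {"order_id"},
        "fkey": [
            {
                "name": ...,  # raw conkey value from pg_constraint
                "constrained_columns": ["customer_id"],
                "referred_schema": "sales",
                "referred_table": "customers",
                "referred_columns": ["id"],
                "referred_database": "dev",
            }
        ],
    },
    "sales.customers": {"pkey": {"id"}, "ukey": {"email"}},
}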
@@ -232,23 +246,7 @@ class RedshiftSource(
         This is useful for sources where we need fine-grained
         logic on how to handle table types, e.g., materialized views, ...
         """
-        result = self.inspector.get_view_names(schema_name) or []
-
-        if self.incremental.enabled:
-            result = [
-                name
-                for name in result
-                if name
-                in self.incremental_table_processor.get_not_deleted(
-                    schema_name=schema_name
-                )
-            ]
-
-        return [
-            TableNameAndType(name=table_name, type_=TableType.View)
-            for table_name in result
-        ]
+        return []

     def get_configured_database(self) -> Optional[str]:
         if not self.service_connection.ingestAllDatabases:
@@ -456,3 +454,115 @@ class RedshiftSource(
             )
         else:
             yield from super().mark_tables_as_deleted()
+
+    def _get_columns_with_constraints(
+        self,
+        schema_name: str,
+        table_name: str,
+        *args,
+        **kwargs,
+    ) -> tuple[list[str], list[str], list[str]]:
+        """Read the constraints for a specific schema and table from the
+        per-schema cache.
+
+        Args:
+            schema_name (str): schema name
+            table_name (str): table name
+        Returns:
+            tuple[list, list, list]: primary key columns, unique-constraint
+            column groups and foreign key constraint dicts
+        """
+        constraints = self.constraint_details.get(f"{schema_name}.{table_name}", {})
+        if not constraints:
+            return [], [], []
+
+        pkeys = [
+            clean_up_starting_ending_double_quotes_in_string(p)
+            for p in constraints.get("pkey", set())
+        ]
+        ukeys = [
+            clean_up_starting_ending_double_quotes_in_string(p)
+            for p in constraints.get("ukey", set())
+        ]
+        fkeys = []
+        fkey_constraints: list[dict[str, str]] = constraints.get("fkey", [])
+        for fkey_constraint in fkey_constraints:
+            fkey_constraint.update(
+                {
+                    "constrained_columns": [
+                        clean_up_starting_ending_double_quotes_in_string(column)
+                        for column in fkey_constraint.get("constrained_columns")
+                    ],
+                    "referred_columns": [
+                        clean_up_starting_ending_double_quotes_in_string(column)
+                        for column in fkey_constraint.get("referred_columns")
+                    ],
+                }
+            )
+            fkeys.append(fkey_constraint)
+        # wrap ukeys so the result is a list of unique-constraint column lists
+        return pkeys, [ukeys], fkeys
+
+    def _set_constraint_details(self, schema_name: str):
+        """Fetch and cache all the column constraints of a given schema.
+
+        Args:
+            schema_name (str): schema name
+        """
+        # reset constraint_details dict when fetching for a new schema
+        self.constraint_details = {}
+        rows = self.connection.execute(
+            sql.text(REDSHIFT_GET_ALL_CONSTRAINTS),
+            {"schema": schema_name},
+        )
+        for row in rows or []:
+            schema_table_name = f"{row.schema}.{row.table_name}"
+            schema_table_constraints = self.constraint_details.setdefault(
+                schema_table_name, {}
+            )
+            if row.constraint_type == "p":
+                pkey = schema_table_constraints.setdefault("pkey", set())
+                pkey.add(row.column_name)
+            if row.constraint_type == "f":
+                fkey_constraint = {
+                    "key": row.conkey,
+                    "condef": row.condef,
+                    "database": self.connection.engine.url.database,
+                }
+                extracted_fkey = self._extract_fkeys(fkey_constraint)
+                fkey: list[dict[str, str]] = schema_table_constraints.setdefault(
+                    "fkey", []
+                )
+                fkey.extend(extracted_fkey)
+            if row.constraint_type == "u":
+                ukey = schema_table_constraints.setdefault("ukey", set())
+                ukey.add(row.column_name)
+
+    def _extract_fkeys(self, fkey_constraint: dict) -> list[dict[str, str]]:
+        """Extract foreign key details from a constraint definition.
+
+        Args:
+            fkey_constraint (dict): row data with "key" (conkey), "condef"
+            (constraint definition) and "database" entries
+        """
+        fkeys = []
+        m = FOREIGN_KEY_RE.match(fkey_constraint["condef"])
+        colstring = m.group("referred_columns")
+        referred_columns = SQL_IDENTIFIER_RE.findall(colstring)
+        referred_table = m.group("referred_table")
+        referred_schema = m.group("referred_schema")
+        colstring = m.group("columns")
+        constrained_columns = SQL_IDENTIFIER_RE.findall(colstring)
+        fkey_d = {
+            "name": fkey_constraint["key"],
+            "constrained_columns": constrained_columns,
+            "referred_schema": referred_schema,
+            "referred_table": referred_table,
+            "referred_columns": referred_columns,
+            "referred_database": fkey_constraint["database"],
+        }
+        fkeys.append(fkey_d)
+        return fkeys
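_extract_fkeys reuses the FOREIGN_KEY_RE and SQL_IDENTIFIER_RE patterns that sqlalchemy_redshift already ships for parsing pg_get_constraintdef output, rather than reimplementing that parsing. A quick illustration of what the named groups yield (the condef string below is made up, but mimics the shape pg_get_constraintdef returns for a foreign key):

from sqlalchemy_redshift.dialect import FOREIGN_KEY_RE, SQL_IDENTIFIER_RE

condef = "FOREIGN KEY (customer_id) REFERENCES sales.customers(id)"
m = FOREIGN_KEY_RE.match(condef)

print(m.group("columns"))           # expected: customer_id
print(m.group("referred_schema"))   # expected: sales
print(m.group("referred_table"))    # expected: customers
print(m.group("referred_columns"))  # expected: id

# Identifiers may come back wrapped in double quotes, which is why the
# caller strips them with clean_up_starting_ending_double_quotes_in_string.
constrained_columns = SQL_IDENTIFIER_RE.findall(m.group("columns"))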

ingestion/src/metadata/ingestion/source/database/redshift/queries.py

@@ -97,7 +97,7 @@ REDSHIFT_GET_ALL_RELATION_INFO = textwrap.dedent(
         c.relkind
     FROM pg_catalog.pg_class c
     LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
-    WHERE c.relkind = 'r'
+    WHERE c.relkind IN ('r', 'v', 'm', 'S', 'f')
       AND n.nspname = :schema
     UNION
     SELECT
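For reference, the pg_class.relkind codes now accepted are 'r' (ordinary table), 'v' (view), 'm' (materialized view), 'S' (sequence) and 'f' (foreign table). Because views and materialized views now come back from this single relation-info query, the separate inspector.get_view_names pass was dropped and query_view_names_and_types above returns an empty list.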
@@ -240,6 +240,35 @@ SELECT
 REDSHIFT_TEST_PARTITION_DETAILS = "select * from SVV_TABLE_INFO limit 1"

+REDSHIFT_GET_ALL_CONSTRAINTS = """
+    SELECT
+        n.nspname AS "schema",
+        c.relname AS "table_name",
+        t.contype AS "constraint_type",
+        t.conkey,
+        pg_catalog.pg_get_constraintdef(t.oid, true)::varchar(512) AS condef,
+        a.attname AS "column_name"
+    FROM pg_catalog.pg_class c
+    LEFT JOIN pg_catalog.pg_namespace n
+        ON n.oid = c.relnamespace
+    JOIN pg_catalog.pg_constraint t
+        ON t.conrelid = c.oid
+    JOIN pg_catalog.pg_attribute a
+        ON t.conrelid = a.attrelid AND a.attnum = ANY(t.conkey)
+    WHERE n.nspname !~ '^pg_'
+      AND n.nspname = :schema
+    UNION
+    SELECT
+        s.schemaname AS "schema",
+        c.tablename AS "table_name",
+        'p' AS "constraint_type",
+        null AS conkey,
+        null AS condef,
+        c.columnname AS "column_name"
+    FROM svv_external_columns c
+    JOIN svv_external_schemas s ON s.schemaname = c.schemaname
+    WHERE s.schemaname = :schema;
+"""
# Redshift view definitions only contain the select query,
# hence we are appending "create view <schema>.<table> as " to the select query