fix(ingest/teradata): small teradata improvements (#9953)

This commit is contained in:
Tamas Nemeth 2024-03-12 14:22:17 +01:00 committed by GitHub
parent 2265ae9257
commit 28f16aabb3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -81,6 +81,9 @@ register_custom_type(custom_types.MBR, BytesTypeClass)
register_custom_type(custom_types.GEOMETRY, BytesTypeClass) register_custom_type(custom_types.GEOMETRY, BytesTypeClass)
register_custom_type(custom_types.TDUDT, BytesTypeClass) register_custom_type(custom_types.TDUDT, BytesTypeClass)
register_custom_type(custom_types.XML, BytesTypeClass) register_custom_type(custom_types.XML, BytesTypeClass)
register_custom_type(custom_types.PERIOD_TIME, TimeTypeClass)
register_custom_type(custom_types.PERIOD_DATE, TimeTypeClass)
register_custom_type(custom_types.PERIOD_TIMESTAMP, TimeTypeClass)
@dataclass @dataclass
@ -458,7 +461,7 @@ class TeradataSource(TwoTierSQLAlchemySource):
LINEAGE_QUERY_DATABASE_FILTER: str = """and default_database IN ({databases})""" LINEAGE_QUERY_DATABASE_FILTER: str = """and default_database IN ({databases})"""
LINEAGE_TIMESTAMP_BOUND_QUERY: str = """ LINEAGE_TIMESTAMP_BOUND_QUERY: str = """
SELECT MIN(CollectTimeStamp) as "min_ts", MAX(CollectTimeStamp) as "max_ts" from DBC.DBQLogTbl SELECT MIN(CollectTimeStamp) as "min_ts", MAX(CollectTimeStamp) as "max_ts" from DBC.QryLogV
""".strip() """.strip()
QUERY_TEXT_QUERY: str = """ QUERY_TEXT_QUERY: str = """
@ -469,8 +472,8 @@ class TeradataSource(TwoTierSQLAlchemySource):
DefaultDatabase as default_database, DefaultDatabase as default_database,
s.SqlTextInfo as "query_text", s.SqlTextInfo as "query_text",
s.SqlRowNo as "row_no" s.SqlRowNo as "row_no"
FROM "DBC".DBQLogTbl as l FROM "DBC".QryLogV as l
JOIN "DBC".DBQLSqlTbl as s on s.QueryID = l.QueryID JOIN "DBC".QryLogSqlV as s on s.QueryID = l.QueryID
WHERE WHERE
l.ErrorCode = 0 l.ErrorCode = 0
AND l.statementtype not in ( AND l.statementtype not in (
@ -499,7 +502,7 @@ class TeradataSource(TwoTierSQLAlchemySource):
TABLES_AND_VIEWS_QUERY: str = """ TABLES_AND_VIEWS_QUERY: str = """
SELECT SELECT
t.DatabaseName, t.DataBaseName,
t.TableName as name, t.TableName as name,
t.CommentString as description, t.CommentString as description,
CASE t.TableKind CASE t.TableKind
@ -514,8 +517,8 @@ SELECT
t.LastAlterName, t.LastAlterName,
t.LastAlterTimeStamp, t.LastAlterTimeStamp,
t.RequestText t.RequestText
FROM dbc.Tables t FROM dbc.TablesV t
WHERE DatabaseName NOT IN ( WHERE DataBaseName NOT IN (
'All', 'All',
'Crashdumps', 'Crashdumps',
'Default', 'Default',
@ -561,7 +564,7 @@ WHERE DatabaseName NOT IN (
'dbc' 'dbc'
) )
AND t.TableKind in ('T', 'V', 'Q', 'O') AND t.TableKind in ('T', 'V', 'Q', 'O')
ORDER by DatabaseName, TableName; ORDER by DataBaseName, TableName;
""".strip() """.strip()
_tables_cache: MutableMapping[str, List[TeradataTable]] = defaultdict(list) _tables_cache: MutableMapping[str, List[TeradataTable]] = defaultdict(list)
@ -631,13 +634,14 @@ ORDER by DatabaseName, TableName;
), ),
) )
setattr( # noqa: B010 # Disabling the below because the cached view definition is not the view definition the column in tablesv actually holds the last statement executed against the object... not necessarily the view definition
TeradataDialect, # setattr( # noqa: B010
"get_view_definition", # TeradataDialect,
lambda self, connection, view_name, schema=None, **kw: optimized_get_view_definition( # "get_view_definition",
self, connection, view_name, schema, tables_cache=tables_cache, **kw # lambda self, connection, view_name, schema=None, **kw: optimized_get_view_definition(
), # self, connection, view_name, schema, tables_cache=tables_cache, **kw
) # ),
# )
setattr( # noqa: B010 setattr( # noqa: B010
TeradataDialect, TeradataDialect,
@ -775,7 +779,7 @@ ORDER by DatabaseName, TableName;
engine = self.get_metadata_engine() engine = self.get_metadata_engine()
for entry in engine.execute(self.TABLES_AND_VIEWS_QUERY): for entry in engine.execute(self.TABLES_AND_VIEWS_QUERY):
table = TeradataTable( table = TeradataTable(
database=entry.DatabaseName.strip(), database=entry.DataBaseName.strip(),
name=entry.name.strip(), name=entry.name.strip(),
description=entry.description.strip() if entry.description else None, description=entry.description.strip() if entry.description else None,
object_type=entry.object_type, object_type=entry.object_type,