feat(ingestion): Support for Server-less Redshift (#9998)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
skrydal 2024-03-12 09:32:20 +01:00 committed by GitHub
parent 5937472998
commit 2265ae9257
8 changed files with 683 additions and 234 deletions

View File

@ -175,7 +175,7 @@ redshift_common = {
# sqlalchemy-redshift 0.8.3 adds support for SQLAlchemy 1.4.x
"sqlalchemy-redshift>=0.8.3",
"GeoAlchemy2",
"redshift-connector",
"redshift-connector>=2.1.0",
*sqllineage_lib,
*path_spec_common,
}

View File

@ -94,6 +94,11 @@ class RedshiftConfig(
description="The default schema to use if the sql parser fails to parse the schema with `sql_based` lineage collector",
)
is_serverless: bool = Field(
default=False,
description="Whether target Redshift instance is serverless (alternative is provisioned cluster)",
)
use_lineage_v2: bool = Field(
default=False,
description="Whether to use the new SQL-based lineage collector.",

View File

@ -18,7 +18,11 @@ from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.aws.s3_util import strip_s3_prefix
from datahub.ingestion.source.redshift.config import LineageMode, RedshiftConfig
from datahub.ingestion.source.redshift.query import RedshiftQuery
from datahub.ingestion.source.redshift.query import (
RedshiftCommonQuery,
RedshiftProvisionedQuery,
RedshiftServerlessQuery,
)
from datahub.ingestion.source.redshift.redshift_schema import (
LineageRow,
RedshiftDataDictionary,
@ -158,6 +162,10 @@ class RedshiftLineageExtractor:
self.context = context
self._lineage_map: Dict[str, LineageItem] = defaultdict()
self.queries: RedshiftCommonQuery = RedshiftProvisionedQuery()
if self.config.is_serverless:
self.queries = RedshiftServerlessQuery()
self.redundant_run_skip_handler = redundant_run_skip_handler
self.start_time, self.end_time = (
self.report.lineage_start_time,
@ -659,7 +667,7 @@ class RedshiftLineageExtractor:
LineageMode.MIXED,
}:
# Populate table level lineage by getting upstream tables from stl_scan redshift table
query = RedshiftQuery.stl_scan_based_lineage_query(
query = self.queries.stl_scan_based_lineage_query(
self.config.database,
self.start_time,
self.end_time,
@ -670,7 +678,7 @@ class RedshiftLineageExtractor:
LineageMode.MIXED,
}:
# Populate table level lineage by parsing table creating sqls
query = RedshiftQuery.list_insert_create_queries_sql(
query = self.queries.list_insert_create_queries_sql(
db_name=database,
start_time=self.start_time,
end_time=self.end_time,
@ -679,15 +687,15 @@ class RedshiftLineageExtractor:
if self.config.include_views and self.config.include_view_lineage:
# Populate table level lineage for views
query = RedshiftQuery.view_lineage_query()
query = self.queries.view_lineage_query()
populate_calls.append((query, LineageCollectorType.VIEW))
# Populate table level lineage for late binding views
query = RedshiftQuery.list_late_view_ddls_query()
query = self.queries.list_late_view_ddls_query()
populate_calls.append((query, LineageCollectorType.VIEW_DDL_SQL_PARSING))
if self.config.include_copy_lineage:
query = RedshiftQuery.list_copy_commands_sql(
query = self.queries.list_copy_commands_sql(
db_name=database,
start_time=self.start_time,
end_time=self.end_time,
@ -695,7 +703,7 @@ class RedshiftLineageExtractor:
populate_calls.append((query, LineageCollectorType.COPY))
if self.config.include_unload_lineage:
query = RedshiftQuery.list_unload_commands_sql(
query = self.queries.list_unload_commands_sql(
db_name=database,
start_time=self.start_time,
end_time=self.end_time,
@ -831,7 +839,7 @@ class RedshiftLineageExtractor:
# new urn -> prev urn
table_renames: Dict[str, str] = {}
query = RedshiftQuery.alter_table_rename_query(
query = self.queries.alter_table_rename_query(
db_name=database,
start_time=self.start_time,
end_time=self.end_time,
@ -869,7 +877,7 @@ class RedshiftLineageExtractor:
def get_temp_tables(
self, connection: redshift_connector.Connection
) -> List[TempTableRow]:
ddl_query: str = RedshiftQuery.temp_table_ddl_query(
ddl_query: str = self.queries.temp_table_ddl_query(
start_time=self.config.start_time,
end_time=self.config.end_time,
)
@ -892,9 +900,9 @@ class RedshiftLineageExtractor:
matched_temp_tables: List[TempTableRow] = []
for table_name in temp_table_names:
prefixes = RedshiftQuery.get_temp_table_clause(table_name)
prefixes = self.queries.get_temp_table_clause(table_name)
prefixes.extend(
RedshiftQuery.get_temp_table_clause(table_name.split(".")[-1])
self.queries.get_temp_table_clause(table_name.split(".")[-1])
)
for row in temp_table_rows:

View File

@ -13,7 +13,11 @@ from datahub.ingestion.source.redshift.lineage import (
LineageCollectorType,
RedshiftLineageExtractor,
)
from datahub.ingestion.source.redshift.query import RedshiftQuery
from datahub.ingestion.source.redshift.query import (
RedshiftCommonQuery,
RedshiftProvisionedQuery,
RedshiftServerlessQuery,
)
from datahub.ingestion.source.redshift.redshift_schema import (
LineageRow,
RedshiftDataDictionary,
@ -64,6 +68,10 @@ class RedshiftSqlLineageV2:
)
self.report.sql_aggregator = self.aggregator.report
self.queries: RedshiftCommonQuery = RedshiftProvisionedQuery()
if self.config.is_serverless:
self.queries = RedshiftServerlessQuery()
self._lineage_v1 = RedshiftLineageExtractor(
config=config,
report=report,
@ -131,7 +139,7 @@ class RedshiftSqlLineageV2:
LineageMode.MIXED,
}:
# Populate lineage by parsing table creating sqls
query = RedshiftQuery.list_insert_create_queries_sql(
query = self.queries.list_insert_create_queries_sql(
db_name=self.database,
start_time=self.start_time,
end_time=self.end_time,
@ -148,7 +156,7 @@ class RedshiftSqlLineageV2:
LineageMode.MIXED,
}:
# Populate lineage by getting upstream tables from stl_scan redshift table
query = RedshiftQuery.stl_scan_based_lineage_query(
query = self.queries.stl_scan_based_lineage_query(
self.database,
self.start_time,
self.end_time,
@ -159,13 +167,13 @@ class RedshiftSqlLineageV2:
if self.config.include_views and self.config.include_view_lineage:
# Populate lineage for views
query = RedshiftQuery.view_lineage_query()
query = self.queries.view_lineage_query()
populate_calls.append(
(LineageCollectorType.VIEW, query, self._process_view_lineage)
)
# Populate lineage for late binding views
query = RedshiftQuery.list_late_view_ddls_query()
query = self.queries.list_late_view_ddls_query()
populate_calls.append(
(
LineageCollectorType.VIEW_DDL_SQL_PARSING,
@ -176,7 +184,7 @@ class RedshiftSqlLineageV2:
if self.config.include_copy_lineage:
# Populate lineage for copy commands.
query = RedshiftQuery.list_copy_commands_sql(
query = self.queries.list_copy_commands_sql(
db_name=self.database,
start_time=self.start_time,
end_time=self.end_time,
@ -187,7 +195,7 @@ class RedshiftSqlLineageV2:
if self.config.include_unload_lineage:
# Populate lineage for unload commands.
query = RedshiftQuery.list_unload_commands_sql(
query = self.queries.list_unload_commands_sql(
db_name=self.database,
start_time=self.start_time,
end_time=self.end_time,

View File

@ -4,11 +4,19 @@ from typing import List
redshift_datetime_format = "%Y-%m-%d %H:%M:%S"
class RedshiftQuery:
class RedshiftCommonQuery:
CREATE_TEMP_TABLE_CLAUSE = "create temp table"
CREATE_TEMPORARY_TABLE_CLAUSE = "create temporary table"
CREATE_TABLE_CLAUSE = "create table"
@staticmethod
def get_temp_table_clause(table_name: str) -> List[str]:
return [
f"{RedshiftCommonQuery.CREATE_TABLE_CLAUSE} {table_name}",
f"{RedshiftCommonQuery.CREATE_TEMP_TABLE_CLAUSE} {table_name}",
f"{RedshiftCommonQuery.CREATE_TEMPORARY_TABLE_CLAUSE} {table_name}",
]
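For illustration, the clause prefixes produced above are matched against normalized query text, roughly like this (the DDL string is made up):

from datahub.ingestion.source.redshift.query import RedshiftCommonQuery

prefixes = RedshiftCommonQuery.get_temp_table_clause("staging.tmp_orders")
# ["create table staging.tmp_orders",
#  "create temp table staging.tmp_orders",
#  "create temporary table staging.tmp_orders"]

ddl = "create temp table staging.tmp_orders as select * from public.orders"
assert any(ddl.lower().startswith(prefix) for prefix in prefixes)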
list_databases: str = """SELECT datname FROM pg_database
WHERE (datname <> ('padb_harvest')::name)
AND (datname <> ('template0')::name)
@ -182,37 +190,163 @@ SELECT schemaname as schema_name,
ORDER BY "schema", "table_name", "attnum"
"""
@staticmethod
def view_lineage_query() -> str:
return """
select
distinct
srcnsp.nspname as source_schema
,
srcobj.relname as source_table
,
tgtnsp.nspname as target_schema
,
tgtobj.relname as target_table
from
pg_catalog.pg_class as srcobj
inner join
pg_catalog.pg_depend as srcdep
on
srcobj.oid = srcdep.refobjid
inner join
pg_catalog.pg_depend as tgtdep
on
srcdep.objid = tgtdep.objid
join
pg_catalog.pg_class as tgtobj
on
tgtdep.refobjid = tgtobj.oid
and srcobj.oid <> tgtobj.oid
left outer join
pg_catalog.pg_namespace as srcnsp
on
srcobj.relnamespace = srcnsp.oid
left outer join
pg_catalog.pg_namespace tgtnsp
on
tgtobj.relnamespace = tgtnsp.oid
where
tgtdep.deptype = 'i'
--dependency_internal
and tgtobj.relkind = 'v'
--i=index, v=view, s=sequence
and tgtnsp.nspname not in ('pg_catalog', 'information_schema')
order by target_schema, target_table asc
"""
@staticmethod
def list_late_view_ddls_query() -> str:
return """
SELECT
n.nspname AS target_schema
,c.relname AS target_table
, COALESCE(pg_get_viewdef(c.oid, TRUE), '') AS ddl
FROM
pg_catalog.pg_class AS c
INNER JOIN
pg_catalog.pg_namespace AS n
ON c.relnamespace = n.oid
WHERE relkind = 'v'
and
n.nspname not in ('pg_catalog', 'information_schema')
"""
@staticmethod
def alter_table_rename_query(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
start_time_str: str = start_time.strftime(redshift_datetime_format)
end_time_str: str = end_time.strftime(redshift_datetime_format)
return f"""
SELECT transaction_id,
session_id,
start_time,
query_text
FROM sys_query_history SYS
WHERE SYS.status = 'success'
AND SYS.query_type = 'DDL'
AND SYS.database_name = '{db_name}'
AND SYS.start_time >= '{start_time_str}'
AND SYS.end_time < '{end_time_str}'
AND SYS.query_text ILIKE '%alter table % rename to %'
"""
@staticmethod
def additional_table_metadata_query() -> str:
raise NotImplementedError
@staticmethod
def usage_query(start_time: str, end_time: str, database: str) -> str:
raise NotImplementedError
@staticmethod
def operation_aspect_query(start_time: str, end_time: str) -> str:
raise NotImplementedError
@staticmethod
def stl_scan_based_lineage_query(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
raise NotImplementedError
@staticmethod
def list_unload_commands_sql(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
raise NotImplementedError
@staticmethod
def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
raise NotImplementedError
@staticmethod
def list_insert_create_queries_sql(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
raise NotImplementedError
@staticmethod
def list_copy_commands_sql(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
raise NotImplementedError
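The base class above exposes every deployment-specific query as an overridable static method that raises NotImplementedError until a subclass provides it. A minimal standalone sketch of the same dispatch pattern (illustrative names, not the real classes):

class CommonQuery:
    @staticmethod
    def usage_query(start_time: str, end_time: str, database: str) -> str:
        raise NotImplementedError


class ProvisionedQuery(CommonQuery):
    @staticmethod
    def usage_query(start_time: str, end_time: str, database: str) -> str:
        return f"SELECT ... FROM stl_scan WHERE starttime >= '{start_time}' ..."


class ServerlessQuery(CommonQuery):
    @staticmethod
    def usage_query(start_time: str, end_time: str, database: str) -> str:
        return f"SELECT ... FROM sys_query_history WHERE start_time >= '{start_time}' ..."


def pick_queries(is_serverless: bool) -> CommonQuery:
    # mirrors the "self.queries = ..." dispatch used throughout this change
    return ServerlessQuery() if is_serverless else ProvisionedQuery()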
class RedshiftProvisionedQuery(RedshiftCommonQuery):
@staticmethod
def additional_table_metadata_query() -> str:
return """
select
tbl,
max(endtime) as last_accessed
ti.database,
ti.schema,
"table",
size,
tbl_rows,
estimated_visible_rows,
skew_rows,
last_accessed,
case
when smi.name is not null then 1
else 0
end as is_materialized
from
pg_catalog.stl_insert
group by
tbl) as la on
(la.tbl = ti.table_id)
left join stv_mv_info smi on
smi.db_name = ti.database
and smi.schema = ti.schema
and smi.name = ti.table
;
"""
pg_catalog.svv_table_info as ti
left join (
select
tbl,
max(endtime) as last_accessed
from
pg_catalog.stl_insert
group by
tbl) as la on
(la.tbl = ti.table_id)
left join stv_mv_info smi on
smi.db_name = ti.database
and smi.schema = ti.schema
and smi.name = ti.table
;
"""
@staticmethod
def stl_scan_based_lineage_query(
@ -284,67 +418,6 @@ SELECT schemaname as schema_name,
end_time=end_time.strftime(redshift_datetime_format),
)
@staticmethod
def view_lineage_query() -> str:
return """
select
distinct
srcnsp.nspname as source_schema
,
srcobj.relname as source_table
,
tgtnsp.nspname as target_schema
,
tgtobj.relname as target_table
from
pg_catalog.pg_class as srcobj
inner join
pg_catalog.pg_depend as srcdep
on
srcobj.oid = srcdep.refobjid
inner join
pg_catalog.pg_depend as tgtdep
on
srcdep.objid = tgtdep.objid
join
pg_catalog.pg_class as tgtobj
on
tgtdep.refobjid = tgtobj.oid
and srcobj.oid <> tgtobj.oid
left outer join
pg_catalog.pg_namespace as srcnsp
on
srcobj.relnamespace = srcnsp.oid
left outer join
pg_catalog.pg_namespace tgtnsp
on
tgtobj.relnamespace = tgtnsp.oid
where
tgtdep.deptype = 'i'
--dependency_internal
and tgtobj.relkind = 'v'
--i=index, v=view, s=sequence
and tgtnsp.nspname not in ('pg_catalog', 'information_schema')
order by target_schema, target_table asc
"""
@staticmethod
def list_late_view_ddls_query() -> str:
return """
SELECT
n.nspname AS target_schema
,c.relname AS target_table
, COALESCE(pg_get_viewdef(c.oid, TRUE), '') AS ddl
FROM
pg_catalog.pg_class AS c
INNER JOIN
pg_catalog.pg_namespace AS n
ON c.relnamespace = n.oid
WHERE relkind = 'v'
and
n.nspname not in ('pg_catalog', 'information_schema')
"""
@staticmethod
def list_unload_commands_sql(
db_name: str, start_time: datetime, end_time: datetime
@ -459,14 +532,6 @@ SELECT schemaname as schema_name,
end_time=end_time.strftime(redshift_datetime_format),
)
@staticmethod
def get_temp_table_clause(table_name: str) -> List[str]:
return [
f"{RedshiftQuery.CREATE_TABLE_CLAUSE} {table_name}",
f"{RedshiftQuery.CREATE_TEMP_TABLE_CLAUSE} {table_name}",
f"{RedshiftQuery.CREATE_TEMPORARY_TABLE_CLAUSE} {table_name}",
]
@staticmethod
def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
start_time_str: str = start_time.strftime(redshift_datetime_format)
@ -553,23 +618,450 @@ SELECT schemaname as schema_name,
rn = 1;
"""
# Add this join to the sql query for more metrics on completed queries
# LEFT JOIN svl_query_metrics_summary sqms ON ss.query = sqms.query
# Reference: https://docs.aws.amazon.com/redshift/latest/dg/r_SVL_QUERY_METRICS_SUMMARY.html
# this sql query joins stl_scan over table info,
# querytext, and user info to get usage stats
# using non-LEFT joins here to limit the results to
# queries run by the user on user-defined tables.
@staticmethod
def usage_query(start_time: str, end_time: str, database: str) -> str:
return f"""
SELECT DISTINCT ss.userid as userid,
ss.query as query,
sui.usename as username,
ss.tbl as tbl,
sq.querytxt as querytxt,
sti.database as database,
sti.schema as schema,
sti.table as table,
sq.starttime as starttime,
sq.endtime as endtime
FROM stl_scan ss
JOIN svv_table_info sti ON ss.tbl = sti.table_id
JOIN stl_query sq ON ss.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE ss.starttime >= '{start_time}'
AND ss.starttime < '{end_time}'
AND sti.database = '{database}'
AND sq.aborted = 0
ORDER BY ss.endtime DESC;
""".strip()
@staticmethod
def operation_aspect_query(start_time: str, end_time: str) -> str:
return f"""
(SELECT
DISTINCT si.userid AS userid,
si.query AS query,
si.rows AS rows,
sui.usename AS username,
si.tbl AS tbl,
sq.querytxt AS querytxt,
sti.database AS database,
sti.schema AS schema,
sti.table AS table,
sq.starttime AS starttime,
sq.endtime AS endtime,
'insert' AS operation_type
FROM
(select userid, query, sum(rows) as rows, tbl
from stl_insert si
where si.rows > 0
AND si.starttime >= '{start_time}'
AND si.starttime < '{end_time}'
group by userid, query, tbl
) as si
JOIN svv_table_info sti ON si.tbl = sti.table_id
JOIN stl_query sq ON si.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE
sq.aborted = 0)
UNION
(SELECT
DISTINCT sd.userid AS userid,
sd.query AS query,
sd.rows AS ROWS,
sui.usename AS username,
sd.tbl AS tbl,
sq.querytxt AS querytxt,
sti.database AS database,
sti.schema AS schema,
sti.table AS table,
sq.starttime AS starttime,
sq.endtime AS endtime,
'delete' AS operation_type
FROM
(select userid, query, sum(rows) as rows, tbl
from stl_delete sd
where sd.rows > 0
AND sd.starttime >= '{start_time}'
AND sd.starttime < '{end_time}'
group by userid, query, tbl
) as sd
JOIN svv_table_info sti ON sd.tbl = sti.table_id
JOIN stl_query sq ON sd.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE
sq.aborted = 0)
ORDER BY
endtime DESC
""".strip()
class RedshiftServerlessQuery(RedshiftCommonQuery):
# stl_insert -> SYS_QUERY_DETAIL - showing less accesses
# stv_mv_info -> SVV_MV_INFO - not tested, seems to be fine
@staticmethod
def additional_table_metadata_query() -> str:
return """
select
ti.database,
ti.schema,
"table",
size,
tbl_rows,
estimated_visible_rows,
skew_rows,
last_accessed,
case
when smi.name is not null then 1
else 0
end as is_materialized
from
pg_catalog.svv_table_info as ti
left join (
SELECT
table_id as tbl,
max(end_time) as last_accessed
FROM
SYS_QUERY_DETAIL
GROUP BY
table_id) as la on
(la.tbl = ti.table_id)
left join SVV_MV_INFO smi on
smi.database_name = ti.database
and smi.schema_name = ti.schema
and smi.name = ti.table
;
"""
@staticmethod
def stl_scan_based_lineage_query(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
return """
SELECT
distinct cluster,
target_schema,
target_table,
username,
source_schema,
source_table,
query_text AS ddl,
start_time AS timestamp
FROM
(
SELECT
sti.schema AS target_schema,
sti.table AS target_table,
sti.database AS cluster,
qi.table_id AS target_table_id,
qi.query_id AS query_id,
qi.start_time AS start_time
FROM
SYS_QUERY_DETAIL qi
JOIN
SVV_TABLE_INFO sti on sti.table_id = qi.table_id
WHERE
start_time >= '{start_time}' and
start_time < '{end_time}' and
cluster = '{db_name}' and
step_name = 'insert'
) AS target_tables
JOIN
(
SELECT
sti.schema AS source_schema,
sti.table AS source_table,
qs.table_id AS source_table_id,
qs.query_id AS query_id,
sui.user_name AS username,
LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text
FROM
SYS_QUERY_DETAIL qs
JOIN
SVV_TABLE_INFO sti ON sti.table_id = qs.table_id
LEFT JOIN
SYS_QUERY_TEXT qt ON qt.query_id = qs.query_id
LEFT JOIN
SVV_USER_INFO sui ON qs.user_id = sui.user_id
WHERE
qs.step_name = 'scan' AND
qs.source = 'Redshift(local)' AND
qt.sequence < 320 AND -- See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext
sti.database = '{db_name}' AND -- required to avoid retrieving some internal redshift tables; try removing to see what happens
sui.user_name <> 'rdsdb' -- not entirely sure about this filter
GROUP BY sti.schema, sti.table, qs.table_id, qs.query_id, sui.user_name
) AS source_tables ON target_tables.query_id = source_tables.query_id
WHERE source_tables.source_table_id <> target_tables.target_table_id
ORDER BY cluster, target_schema, target_table, start_time ASC
""".format(
# We need the original database name for filtering
db_name=db_name,
start_time=start_time.strftime(redshift_datetime_format),
end_time=end_time.strftime(redshift_datetime_format),
)
@staticmethod
def list_unload_commands_sql(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
return """
SELECT
DISTINCT
sti.database as cluster,
sti.schema as source_schema,
sti."table" as source_table,
unl.file_name as filename
FROM SYS_UNLOAD_DETAIL unl
JOIN SYS_QUERY_DETAIL qd ON
unl.query_id = qd.query_id
JOIN SVV_TABLE_INFO sti ON
sti.table_id = qd.table_id
WHERE
qd.step_name = 'scan' AND
unl.start_time >= '{start_time}' AND
unl.start_time < '{end_time}' AND
sti.database = '{db_name}'
ORDER BY cluster, source_schema, source_table, filename, unl.start_time ASC
""".format(
# We need the original database name for filtering
db_name=db_name,
start_time=start_time.strftime(redshift_datetime_format),
end_time=end_time.strftime(redshift_datetime_format),
)
# the differences vs. the old query are:
# * we additionally get queries like "insert into <table> values (...)" (to be confirmed that this is not a problem)
# * querytxt does not contain newlines (to be confirmed that this is not a problem)
@staticmethod
def list_insert_create_queries_sql(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
return """
SELECT
DISTINCT
cluster,
target_schema,
target_table,
username,
query_id,
LISTAGG(CASE WHEN LEN(RTRIM(querytxt)) = 0 THEN querytxt ELSE RTRIM(querytxt) END) WITHIN GROUP (ORDER BY sequence) AS ddl,
ANY_VALUE(session_id) AS session_id,
starttime AS timestamp
FROM
(
SELECT
DISTINCT
qd.table_id AS target_table_id,
sti.schema AS target_schema,
sti.table AS target_table,
sti.database AS cluster,
sui.user_name AS username,
qt."text" AS querytxt,
qd.query_id AS query_id,
qd.start_time AS starttime,
qt.sequence AS sequence,
qt.session_id AS session_id
FROM
SYS_QUERY_DETAIL qd
JOIN SVV_TABLE_INFO sti ON sti.table_id = qd.table_id
LEFT JOIN SVV_USER_INFO sui ON sui.user_id = qd.user_id
LEFT JOIN SYS_QUERY_TEXT qt ON qt.query_id = qd.query_id
LEFT JOIN SYS_LOAD_DETAIL ld ON ld.query_id = qd.query_id
WHERE
qd.step_name = 'insert' AND
sui.user_name <> 'rdsdb' AND
cluster = '{db_name}' AND
qd.start_time >= '{start_time}' AND
qd.start_time < '{end_time}' AND
qt.sequence < 320 AND -- See https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl-statementtext
ld.query_id IS NULL -- filter out queries which are also stored in SYS_LOAD_DETAIL
ORDER BY target_table ASC
)
GROUP BY cluster, query_id, target_schema, target_table, username, starttime
ORDER BY cluster, query_id, target_schema, target_table, starttime ASC
;
""".format(
# We need the original database name for filtering
db_name=db_name,
start_time=start_time.strftime(redshift_datetime_format),
end_time=end_time.strftime(redshift_datetime_format),
)
# when loading from s3 using a prefix with a single file, this produces 2 rows (one for the file and one for just the directory) -
# the old query behaves the same way
@staticmethod
def list_copy_commands_sql(
db_name: str, start_time: datetime, end_time: datetime
) -> str:
return """
select
distinct
"schema" as target_schema,
"table" as target_table,
c.file_name
from
SYS_QUERY_DETAIL as si
join SYS_LOAD_DETAIL as c on
si.query_id = c.query_id
join SVV_TABLE_INFO sti on
sti.table_id = si.table_id
where
database = '{db_name}'
and si.start_time >= '{start_time}'
and si.start_time < '{end_time}'
order by target_schema, target_table, si.start_time asc
""".format(
# We need the original database name for filtering
db_name=db_name,
start_time=start_time.strftime(redshift_datetime_format),
end_time=end_time.strftime(redshift_datetime_format),
)
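Given the single-file-prefix duplication noted above, a consumer could collapse directory-prefix rows after fetching; a hypothetical post-processing helper (not part of this change):

from typing import List, Tuple

def drop_directory_prefix_rows(
    rows: List[Tuple[str, str, str]]
) -> List[Tuple[str, str, str]]:
    """rows are (target_schema, target_table, file_name) results of the query above.
    Drops a row whose file_name is just a directory prefix of a more specific row."""
    kept = []
    for schema, table, name in rows:
        is_prefix_of_other = any(
            s == schema and t == table and n != name and n.startswith(name)
            for s, t, n in rows
        )
        if not is_prefix_of_other:
            kept.append((schema, table, name))
    return kept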
# handles "create table IF ..." statements wrong probably - "create command" field contains only "create table if" in such cases
# also similar happens if for example table name contains special characters quoted with " i.e. "test-table1"
# it is also worth noting that "query_type" field from SYS_QUERY_HISTORY could be probably used to improve many
# of complicated queries in this file
@staticmethod
def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
start_time_str: str = start_time.strftime(redshift_datetime_format)
end_time_str: str = end_time.strftime(redshift_datetime_format)
return rf"""-- DataHub Redshift Source temp table DDL query
SELECT
*
FROM
(
SELECT
session_id,
transaction_id,
start_time,
userid,
query_text,
REGEXP_REPLACE(REGEXP_SUBSTR(REGEXP_REPLACE(query_text,'\\\\n','\\n'), '(CREATE(?:[\\n\\s\\t]+(?:temp|temporary))?(?:[\\n\\s\\t]+)table(?:[\\n\\s\\t]+)[^\\n\\s\\t()-]+)', 0, 1, 'ipe'),'[\\n\\s\\t]+',' ',1,'p') AS create_command,
ROW_NUMBER() OVER (
PARTITION BY query_text
ORDER BY start_time DESC
) rn
FROM
(
SELECT
qh.session_id AS session_id,
qh.transaction_id AS transaction_id,
qh.start_time AS start_time,
qh.user_id AS userid,
LISTAGG(qt."text") WITHIN GROUP (ORDER BY sequence) AS query_text
FROM
SYS_QUERY_HISTORY qh
LEFT JOIN SYS_QUERY_TEXT qt on qt.query_id = qh.query_id
WHERE
query_type IN ('DDL', 'CTAS', 'OTHER', 'COMMAND')
AND qh.start_time >= '{start_time_str}'
AND qh.start_time < '{end_time_str}'
AND qt.sequence < 320
GROUP BY qh.start_time, qh.session_id, qh.transaction_id, qh.user_id
ORDER BY qh.start_time, qh.session_id, qh.transaction_id, qh.user_id ASC
)
WHERE
(
create_command ILIKE '%create temp table %'
OR create_command ILIKE '%create temporary table %'
-- we want all the create table statements, not just temp tables, since a non-temp table may be created and dropped in the same transaction
OR create_command ILIKE '%create table %'
)
-- Redshift creates temp tables with the following names: volt_tt_%. We need to filter them out.
AND query_text NOT ILIKE '%CREATE TEMP TABLE volt_tt_%'
AND create_command NOT ILIKE '%CREATE TEMP TABLE volt_tt_%'
)
WHERE
rn = 1
;
"""
# the new approach does not include "COPY" commands
@staticmethod
def usage_query(start_time: str, end_time: str, database: str) -> str:
return f"""
SELECT
DISTINCT
qh.user_id as userid,
qh.query_id as query,
sui.user_name as username,
qd.table_id as tbl,
qh.query_text as querytxt, -- truncated to 4k characters, join with SYS_QUERY_TEXT to build full query using "sequence" number field
sti.database as database,
sti.schema as schema,
sti.table as table,
qh.start_time as starttime,
qh.end_time as endtime
FROM
SYS_QUERY_DETAIL qd
JOIN SVV_TABLE_INFO sti ON qd.table_id = sti.table_id
JOIN SVV_USER_INFO sui ON sui.user_id = qd.user_id
JOIN SYS_QUERY_HISTORY qh ON qh.query_id = qd.query_id
WHERE
qd.step_name = 'scan'
AND qh.start_time >= '{start_time}'
AND qh.start_time < '{end_time}'
AND sti.database = '{database}'
AND qh.status = 'success'
ORDER BY qh.end_time DESC
;
""".strip()
@staticmethod
def operation_aspect_query(start_time: str, end_time: str) -> str:
return f"""
SELECT
DISTINCT
qd.user_id AS userid,
qd.query_id AS query,
qd.rows AS rows,
sui.user_name AS username,
qd.table_id AS tbl,
qh.query_text AS querytxt, -- truncated to 4k characters, join with SYS_QUERY_TEXT to build full query using "sequence" number field
sti.database AS database,
sti.schema AS schema,
sti.table AS table,
qh.start_time AS starttime,
qh.end_time AS endtime,
qd.step_name as operation_type
FROM
(
SELECT
qd.user_id,
qd.query_id,
sum(qd.output_rows) as rows,
qd.table_id,
qd.step_name
FROM
SYS_QUERY_DETAIL qd
WHERE
qd.step_name in ('insert', 'delete')
AND qd.start_time >= '{start_time}'
AND qd.start_time < '{end_time}'
GROUP BY qd.user_id, qd.query_id, qd.table_id, qd.step_name
) qd
JOIN SVV_TABLE_INFO sti ON qd.table_id = sti.table_id
JOIN SVV_USER_INFO sui ON sui.user_id = qd.user_id
JOIN SYS_QUERY_HISTORY qh ON qh.query_id = qd.query_id
WHERE
qh.status = 'success'
ORDER BY
endtime DESC
""".strip()

View File

@ -340,6 +340,10 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
run_id=self.ctx.run_id,
)
self.data_dictionary = RedshiftDataDictionary(
is_serverless=self.config.is_serverless
)
self.db_tables: Dict[str, Dict[str, List[RedshiftTable]]] = {}
self.db_views: Dict[str, Dict[str, List[RedshiftView]]] = {}
self.db_schemas: Dict[str, Dict[str, RedshiftSchema]] = {}
@ -469,7 +473,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
yield from profiler.get_workunits(self.db_tables)
def process_schemas(self, connection, database):
for schema in RedshiftDataDictionary.get_schemas(
for schema in self.data_dictionary.get_schemas(
conn=connection, database=database
):
if not is_schema_allowed(
@ -520,7 +524,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
)
schema_columns: Dict[str, Dict[str, List[RedshiftColumn]]] = {}
schema_columns[schema.name] = RedshiftDataDictionary.get_columns_for_schema(
schema_columns[schema.name] = self.data_dictionary.get_columns_for_schema(
conn=connection, schema=schema
)
@ -700,7 +704,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
data_type = resolve_postgres_modified_type(col.data_type.lower())
if any(type in col.data_type.lower() for type in ["struct", "array"]):
fields = RedshiftDataDictionary.get_schema_fields_for_column(col)
fields = self.data_dictionary.get_schema_fields_for_column(col)
schema_fields.extend(fields)
else:
field = SchemaField(
@ -829,7 +833,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
)
def cache_tables_and_views(self, connection, database):
tables, views = RedshiftDataDictionary.get_tables_and_views(conn=connection)
tables, views = self.data_dictionary.get_tables_and_views(conn=connection)
for schema in tables:
if not is_schema_allowed(
self.config.schema_pattern,

View File

@ -5,7 +5,11 @@ from typing import Dict, Iterable, List, Optional, Tuple
import redshift_connector
from datahub.ingestion.source.redshift.query import RedshiftQuery
from datahub.ingestion.source.redshift.query import (
RedshiftCommonQuery,
RedshiftProvisionedQuery,
RedshiftServerlessQuery,
)
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult
@ -105,6 +109,11 @@ class AlterTableRow:
# this class is a proxy for querying Redshift
class RedshiftDataDictionary:
def __init__(self, is_serverless):
self.queries: RedshiftCommonQuery = RedshiftProvisionedQuery()
if is_serverless:
self.queries = RedshiftServerlessQuery()
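A sketch of the resulting behavior (values assumed):

data_dictionary = RedshiftDataDictionary(is_serverless=True)
# instance methods such as enrich_tables() now render their SQL via the
# serverless variants, e.g. SYS_QUERY_DETAIL instead of stl_insert
sql = data_dictionary.queries.additional_table_metadata_query()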
@staticmethod
def get_query_result(
conn: redshift_connector.Connection, query: str
@ -119,7 +128,7 @@ class RedshiftDataDictionary:
def get_databases(conn: redshift_connector.Connection) -> List[str]:
cursor = RedshiftDataDictionary.get_query_result(
conn,
RedshiftQuery.list_databases,
RedshiftCommonQuery.list_databases,
)
dbs = cursor.fetchall()
@ -132,7 +141,7 @@ class RedshiftDataDictionary:
) -> List[RedshiftSchema]:
cursor = RedshiftDataDictionary.get_query_result(
conn,
RedshiftQuery.list_schemas.format(database_name=database),
RedshiftCommonQuery.list_schemas.format(database_name=database),
)
schemas = cursor.fetchall()
@ -150,12 +159,12 @@ class RedshiftDataDictionary:
for schema in schemas
]
@staticmethod
def enrich_tables(
self,
conn: redshift_connector.Connection,
) -> Dict[str, Dict[str, RedshiftExtraTableMeta]]:
cur = RedshiftDataDictionary.get_query_result(
conn, RedshiftQuery.additional_table_metadata
conn, self.queries.additional_table_metadata_query()
)
field_names = [i[0] for i in cur.description]
db_table_metadata = cur.fetchall()
@ -182,8 +191,8 @@ class RedshiftDataDictionary:
return table_enrich
@staticmethod
def get_tables_and_views(
self,
conn: redshift_connector.Connection,
) -> Tuple[Dict[str, List[RedshiftTable]], Dict[str, List[RedshiftView]]]:
tables: Dict[str, List[RedshiftTable]] = {}
@ -191,9 +200,11 @@ class RedshiftDataDictionary:
# This query needs to run separately, as we can't join it with the main query because it uses
# driver-only functions.
enriched_table = RedshiftDataDictionary.enrich_tables(conn)
enriched_table = self.enrich_tables(conn)
cur = RedshiftDataDictionary.get_query_result(conn, RedshiftQuery.list_tables)
cur = RedshiftDataDictionary.get_query_result(
conn, RedshiftCommonQuery.list_tables
)
field_names = [i[0] for i in cur.description]
db_tables = cur.fetchall()
logger.info(f"Fetched {len(db_tables)} tables/views from Redshift")
@ -328,7 +339,7 @@ class RedshiftDataDictionary:
) -> Dict[str, List[RedshiftColumn]]:
cursor = RedshiftDataDictionary.get_query_result(
conn,
RedshiftQuery.list_columns.format(schema_name=schema.name),
RedshiftCommonQuery.list_columns.format(schema_name=schema.name),
)
table_columns: Dict[str, List[RedshiftColumn]] = {}

View File

@ -19,6 +19,11 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.query import (
RedshiftCommonQuery,
RedshiftProvisionedQuery,
RedshiftServerlessQuery,
)
from datahub.ingestion.source.redshift.redshift_schema import (
RedshiftTable,
RedshiftView,
@ -40,94 +45,6 @@ logger = logging.getLogger(__name__)
REDSHIFT_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# Add this join to the sql query for more metrics on completed queries
# LEFT JOIN svl_query_metrics_summary sqms ON ss.query = sqms.query
# Reference: https://docs.aws.amazon.com/redshift/latest/dg/r_SVL_QUERY_METRICS_SUMMARY.html
# this sql query joins stl_scan over table info,
# querytext, and user info to get usage stats
# using non-LEFT joins here to limit the results to
# queries run by the user on user-defined tables.
REDSHIFT_USAGE_QUERY_TEMPLATE: str = """
SELECT DISTINCT ss.userid as userid,
ss.query as query,
sui.usename as username,
ss.tbl as tbl,
sq.querytxt as querytxt,
sti.database as database,
sti.schema as schema,
sti.table as table,
sq.starttime as starttime,
sq.endtime as endtime
FROM stl_scan ss
JOIN svv_table_info sti ON ss.tbl = sti.table_id
JOIN stl_query sq ON ss.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE ss.starttime >= '{start_time}'
AND ss.starttime < '{end_time}'
AND sti.database = '{database}'
AND sq.aborted = 0
ORDER BY ss.endtime DESC;
""".strip()
REDSHIFT_OPERATION_ASPECT_QUERY_TEMPLATE: str = """
(SELECT
DISTINCT si.userid AS userid,
si.query AS query,
si.rows AS rows,
sui.usename AS username,
si.tbl AS tbl,
sq.querytxt AS querytxt,
sti.database AS database,
sti.schema AS schema,
sti.table AS table,
sq.starttime AS starttime,
sq.endtime AS endtime,
'insert' AS operation_type
FROM
(select userid, query, sum(rows) as rows, tbl
from stl_insert si
where si.rows > 0
AND si.starttime >= '{start_time}'
AND si.starttime < '{end_time}'
group by userid, query, tbl
) as si
JOIN svv_table_info sti ON si.tbl = sti.table_id
JOIN stl_query sq ON si.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE
sq.aborted = 0)
UNION
(SELECT
DISTINCT sd.userid AS userid,
sd.query AS query,
sd.rows AS ROWS,
sui.usename AS username,
sd.tbl AS tbl,
sq.querytxt AS querytxt,
sti.database AS database,
sti.schema AS schema,
sti.table AS table,
sq.starttime AS starttime,
sq.endtime AS endtime,
'delete' AS operation_type
FROM
(select userid, query, sum(rows) as rows, tbl
from stl_delete sd
where sd.rows > 0
AND sd.starttime >= '{start_time}'
AND sd.starttime < '{end_time}'
group by userid, query, tbl
) as sd
JOIN svv_table_info sti ON sd.tbl = sti.table_id
JOIN stl_query sq ON sd.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE
sq.aborted = 0)
ORDER BY
endtime DESC
""".strip()
RedshiftTableRef = str
AggregatedDataset = GenericAggregatedDataset[RedshiftTableRef]
AggregatedAccessEvents = Dict[datetime, Dict[RedshiftTableRef, AggregatedDataset]]
@ -200,6 +117,10 @@ class RedshiftUsageExtractor:
self.report.usage_end_time,
) = self.get_time_window()
self.queries: RedshiftCommonQuery = RedshiftProvisionedQuery()
if self.config.is_serverless:
self.queries = RedshiftServerlessQuery()
def get_time_window(self) -> Tuple[datetime, datetime]:
if self.redundant_run_skip_handler:
return self.redundant_run_skip_handler.suggest_run_time_window(
@ -273,7 +194,7 @@ class RedshiftUsageExtractor:
# Generate aggregate events
self.report.report_ingestion_stage_start(USAGE_EXTRACTION_USAGE_AGGREGATION)
query: str = REDSHIFT_USAGE_QUERY_TEMPLATE.format(
query: str = self.queries.usage_query(
start_time=self.start_time.strftime(REDSHIFT_DATETIME_FORMAT),
end_time=self.end_time.strftime(REDSHIFT_DATETIME_FORMAT),
database=self.config.database,
@ -300,7 +221,7 @@ class RedshiftUsageExtractor:
all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]],
) -> Iterable[MetadataWorkUnit]:
# Generate access events
query: str = REDSHIFT_OPERATION_ASPECT_QUERY_TEMPLATE.format(
query: str = self.queries.operation_aspect_query(
start_time=self.start_time.strftime(REDSHIFT_DATETIME_FORMAT),
end_time=self.end_time.strftime(REDSHIFT_DATETIME_FORMAT),
)
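To make the dispatch concrete, the same call for a serverless instance renders via RedshiftServerlessQuery (times and database are made-up values):

from datahub.ingestion.source.redshift.query import RedshiftServerlessQuery

queries = RedshiftServerlessQuery()
sql = queries.usage_query(
    start_time="2024-03-01 00:00:00",
    end_time="2024-03-12 00:00:00",
    database="dev",
)
# the rendered SQL reads from SYS_QUERY_DETAIL / SYS_QUERY_HISTORY instead of stl_scan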