Fix Redshift Usage (#3624)

This commit is contained in:
Ayush Shah 2022-03-24 10:37:25 +05:30 committed by GitHub
parent f104ab632b
commit a87ed206c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 840 additions and 840 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -90,7 +90,7 @@ class RedshiftUsageSource(Source[TableQuery]):
starttime=str(row["starttime"]),
endtime=str(row["endtime"]),
analysis_date=str(self.analysis_date),
database=row["database"],
database=row["schema"],
aborted=row["aborted"],
sql=row["querytxt"],
service_name=self.config.service_name,

View File

@@ -27,7 +27,7 @@ from metadata.ingestion.stage.file import FileStageConfig
logger = logging.getLogger(__name__)
def get_table_column_join(table, table_aliases, joins):
def get_table_column_join(table, table_aliases, joins, database):
table_column = None
joined_with = []
for join in joins:
@@ -35,7 +35,11 @@ def get_table_column_join(table, table_aliases, joins):
if "." not in join:
continue
jtable, column = join.split(".")[-2:]
if table == jtable or jtable in table_aliases:
if (
table == jtable
or table == f"{database}.{jtable}"
or jtable in table_aliases
):
table_column = TableColumn(
table=table_aliases[jtable] if jtable in table_aliases else jtable,
column=column,
@@ -93,14 +97,20 @@ class TableUsageStage(Stage[QueryParserData]):
if record.columns.get("join") is not None:
table_usage_count.joins.append(
get_table_column_join(
table, record.tables_aliases, record.columns["join"]
table,
record.tables_aliases,
record.columns["join"],
record.database,
)
)
else:
joins = []
if record.columns.get("join") is not None:
tbl_column_join = get_table_column_join(
table, record.tables_aliases, record.columns["join"]
table,
record.tables_aliases,
record.columns["join"],
record.database,
)
if tbl_column_join is not None:
joins.append(tbl_column_join)
@@ -114,12 +124,8 @@ class TableUsageStage(Stage[QueryParserData]):
)
except Exception as exc:
logger.error("Error in staging record {}".format(exc))
self.status.failures(
f"Table: {table}", "Error in staging record {}".format(exc)
)
self.table_usage[table] = table_usage_count
logger.info(f"Successfully record staged for {table}")
self.status.records_status(f"Table: {table}")
def get_status(self):
return self.status