fix(ingest/snowflake): fix TypeError when columns are returned as None in snowflake query (#6404)

This commit is contained in:
Mayuri Nehate 2022-11-11 01:32:49 +05:30 committed by GitHub
parent ae2ea524a4
commit 1b739319f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -95,17 +95,24 @@ class SnowflakeUpstreamTable:
downstreamColumns: List[SnowflakeColumnWithLineage]
@classmethod
def from_dict(cls, dataset, upstreams_columns_dict, downstream_columns_dict):
def from_dict(cls, dataset, upstreams_columns_json, downstream_columns_json):
try:
upstreams_columns_list = []
downstream_columns_list = []
if upstreams_columns_json is not None:
upstreams_columns_list = json.loads(upstreams_columns_json)
if downstream_columns_json is not None:
downstream_columns_list = json.loads(downstream_columns_json)
table_with_upstreams = cls(
dataset,
[
SnowflakeColumnReference.parse_obj(col)
for col in upstreams_columns_dict
for col in upstreams_columns_list
],
[
SnowflakeColumnWithLineage.parse_obj(col)
for col in downstream_columns_dict
for col in downstream_columns_list
],
)
except ValidationError:
@ -390,8 +397,8 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
# (<upstream_table_name>, <json_list_of_upstream_columns>, <json_list_of_downstream_columns>)
SnowflakeUpstreamTable.from_dict(
upstream_table_name,
json.loads(db_row["UPSTREAM_TABLE_COLUMNS"]),
json.loads(db_row["DOWNSTREAM_TABLE_COLUMNS"]),
db_row["UPSTREAM_TABLE_COLUMNS"],
db_row["DOWNSTREAM_TABLE_COLUMNS"],
),
)
num_edges += 1
@ -441,7 +448,7 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
# key is the downstream view name
self._lineage_map[view_name].update_lineage(
# (<upstream_table_name>, <empty_json_list_of_upstream_table_columns>, <empty_json_list_of_downstream_view_columns>)
SnowflakeUpstreamTable.from_dict(view_upstream, [], [])
SnowflakeUpstreamTable.from_dict(view_upstream, None, None)
)
num_edges += 1
logger.debug(
@ -499,8 +506,8 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
# (<upstream_view_name>, <json_list_of_upstream_view_columns>, <json_list_of_downstream_columns>)
SnowflakeUpstreamTable.from_dict(
view_name,
json.loads(db_row["VIEW_COLUMNS"]),
json.loads(db_row["DOWNSTREAM_TABLE_COLUMNS"]),
db_row["VIEW_COLUMNS"],
db_row["DOWNSTREAM_TABLE_COLUMNS"],
)
)
self.report.num_view_to_table_edges_scanned += 1