diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py index 0adb9e61d28..171c6c57762 100644 --- a/ingestion/src/metadata/ingestion/stage/table_usage.py +++ b/ingestion/src/metadata/ingestion/stage/table_usage.py @@ -32,12 +32,9 @@ def get_table_column_join(table, table_aliases, joins): joined_with = [] for join in joins: try: - if "." in join: - if join.count(".") < 3: - jtable, column = join.split(".") - else: - jtable, column = join.split(".")[2:] - + if "." not in join: + continue + jtable, column = join.split(".")[-2:] if table == jtable or jtable in table_aliases: table_column = TableColumn( table=table_aliases[jtable] if jtable in table_aliases else jtable, @@ -86,13 +83,14 @@ class TableUsageStage(Stage[QueryParserData]): return cls(ctx, config, metadata_config) def stage_record(self, record: QueryParserData) -> None: - if record is not None: - for table in record.tables: - table_usage_count = None - if table in self.table_usage.keys(): - table_usage_count = self.table_usage.get(table) + if record is None: + return None + for table in record.tables: + try: + table_usage_count = self.table_usage.get(table) + if table_usage_count is not None: table_usage_count.count = table_usage_count.count + 1 - if "join" in record.columns: + if record.columns.get("join") is not None: table_usage_count.joins.append( get_table_column_join( table, record.tables_aliases, record.columns["join"] @@ -100,7 +98,7 @@ class TableUsageStage(Stage[QueryParserData]): ) else: joins = [] - if "join" in record.columns: + if record.columns.get("join") is not None: tbl_column_join = get_table_column_join( table, record.tables_aliases, record.columns["join"] ) @@ -114,7 +112,9 @@ class TableUsageStage(Stage[QueryParserData]): joins=joins, service_name=record.service_name, ) - self.table_usage[table] = table_usage_count + except Exception as exc: + logger.error("Error in staging record {}".format(exc)) + self.table_usage[table] = table_usage_count def get_status(self): return self.status