From 0b2d7aecfd1e02ca9354879fb067e39016c802df Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Tue, 24 Oct 2023 11:38:58 +0200 Subject: [PATCH] Fix table_usage pydantic validation & unbound var (#13691) --- .../src/metadata/ingestion/stage/table_usage.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py index 29e6d4ae75d..7b1da9ee542 100644 --- a/ingestion/src/metadata/ingestion/stage/table_usage.py +++ b/ingestion/src/metadata/ingestion/stage/table_usage.py @@ -18,7 +18,7 @@ import os import shutil import traceback from pathlib import Path -from typing import Iterable, List, Tuple +from typing import Iterable, List, Optional, Tuple from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createQuery import CreateQueryRequest @@ -78,12 +78,19 @@ class TableUsageStage(Stage): logger.info(f"Creating the directory to store staging data in {location}") location.mkdir(parents=True, exist_ok=True) - def _get_user_entity(self, username: str) -> Tuple[List[str], List[str]]: + def _get_user_entity( + self, username: str + ) -> Tuple[Optional[List[str]], Optional[List[str]]]: + """ + From the user received in the query history call - who executed the query in the db - + return if we find any users in OM that match, plus the user that we found in the db record. + """ if username: user = self.metadata.get_by_name(entity=User, fqn=username) if user: - return [user.fullyQualifiedName.__root__], [] - return [], [username] + return [user.fullyQualifiedName.__root__], [username] + return None, [username] + return None, None def _add_sql_query(self, record, table): users, used_by = self._get_user_entity(record.userName) @@ -139,6 +146,7 @@ class TableUsageStage(Stage): sqlQueries=[], databaseSchema=parsed_data.databaseSchema, ) + self.table_usage[(table, parsed_data.date)] = table_usage_count except Exception as exc: yield Either( @@ -148,7 +156,6 @@ class TableUsageStage(Stage): stack_trace=traceback.format_exc(), ) ) - self.table_usage[(table, parsed_data.date)] = table_usage_count yield Either(right=table) def _run(self, record: QueryParserData) -> Iterable[Either[str]]: