diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py index 29e6d4ae75d..7b1da9ee542 100644 --- a/ingestion/src/metadata/ingestion/stage/table_usage.py +++ b/ingestion/src/metadata/ingestion/stage/table_usage.py @@ -18,7 +18,7 @@ import os import shutil import traceback from pathlib import Path -from typing import Iterable, List, Tuple +from typing import Iterable, List, Optional, Tuple from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createQuery import CreateQueryRequest @@ -78,12 +78,19 @@ class TableUsageStage(Stage): logger.info(f"Creating the directory to store staging data in {location}") location.mkdir(parents=True, exist_ok=True) - def _get_user_entity(self, username: str) -> Tuple[List[str], List[str]]: + def _get_user_entity( + self, username: str + ) -> Tuple[Optional[List[str]], Optional[List[str]]]: + """ + From the user received in the query history call - who executed the query in the db - + return if we find any users in OM that match, plus the user that we found in the db record. + """ if username: user = self.metadata.get_by_name(entity=User, fqn=username) if user: - return [user.fullyQualifiedName.__root__], [] - return [], [username] + return [user.fullyQualifiedName.__root__], [username] + return None, [username] + return None, None def _add_sql_query(self, record, table): users, used_by = self._get_user_entity(record.userName) @@ -139,6 +146,7 @@ class TableUsageStage(Stage): sqlQueries=[], databaseSchema=parsed_data.databaseSchema, ) + self.table_usage[(table, parsed_data.date)] = table_usage_count except Exception as exc: yield Either( @@ -148,7 +156,6 @@ class TableUsageStage(Stage): stack_trace=traceback.format_exc(), ) ) - self.table_usage[(table, parsed_data.date)] = table_usage_count yield Either(right=table) def _run(self, record: QueryParserData) -> Iterable[Either[str]]: