diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 1c81b8f9be6..941a5f0a8fe 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -36,6 +36,7 @@ from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn from metadata.utils.logger import ingestion_logger +from metadata.utils.lru_cache import LRUCache from metadata.utils.sql_lineage import ( get_column_fqn, get_lineage_by_query, @@ -44,6 +45,8 @@ from metadata.utils.sql_lineage import ( logger = ingestion_logger() +LRU_CACHE_SIZE = 4096 + class MetadataUsageSinkConfig(ConfigModel): filename: str @@ -85,8 +88,11 @@ class MetadataUsageBulkSink(BulkSink): for query in queries if "create" in query.query.lower() or "insert" in query.query.lower() ] + seen_queries = LRUCache(LRU_CACHE_SIZE) for query in create_or_insert_queries: + if query in seen_queries: + continue lineages = get_lineage_by_query( self.metadata, query=query, @@ -97,6 +103,7 @@ class MetadataUsageBulkSink(BulkSink): for lineage in lineages or []: created_lineage = self.metadata.add_lineage(lineage) logger.info(f"Successfully added Lineage {created_lineage}") + seen_queries.put(query, None) # None because it really doesn't matter. 
def __populate_table_usage_map( self, table_entity: Table, table_usage: TableUsageCount @@ -259,12 +266,12 @@ class MetadataUsageBulkSink(BulkSink): def __get_table_joins( self, table_entity: Table, table_usage: TableUsageCount ) -> TableJoins: - table_joins: TableJoins = TableJoins( - columnJoins=[], directTableJoins=[], startDate=table_usage.date - ) """ Method to get Table Joins """ + table_joins: TableJoins = TableJoins( + columnJoins=[], directTableJoins=[], startDate=table_usage.date + ) column_joins_dict = {} for column_join in table_usage.joins: joined_with = {} diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py index 4b025605685..a905f6e4763 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py @@ -31,9 +31,12 @@ from metadata.generated.schema.entity.data.table import ( from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.ometa.client import REST from metadata.ingestion.ometa.utils import ometa_logger +from metadata.utils.lru_cache import LRUCache logger = ometa_logger() +LRU_CACHE_SIZE = 4096 + class OMetaTableMixin: """ @@ -128,11 +131,14 @@ class OMetaTableMixin: :param table: Table Entity to update :param table_queries: SqlQuery to add """ + seen_queries = LRUCache(LRU_CACHE_SIZE) for query in table_queries: - self.client.put( - f"{self.get_suffix(Table)}/{table.id.__root__}/tableQuery", - data=query.json(), - ) + if query.query not in seen_queries: + self.client.put( + f"{self.get_suffix(Table)}/{table.id.__root__}/tableQuery", + data=query.json(), + ) + seen_queries.put(query.query, None) def publish_table_usage( self, table: Table, table_usage_request: UsageRequest diff --git a/ingestion/src/metadata/ingestion/source/database/usage_source.py b/ingestion/src/metadata/ingestion/source/database/usage_source.py index 602552f1fa6..f03ddc54173 
100644 --- a/ingestion/src/metadata/ingestion/source/database/usage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/usage_source.py @@ -92,7 +92,7 @@ class UsageSource(Source[TableQuery], ABC): datetime.utcnow() if not query_dict.get("start_time") else datetime.strptime( - query_dict.get("start_time"), "%Y-%m-%d %H:%M:%S" + query_dict.get("start_time"), "%Y-%m-%d %H:%M:%S.%f" ) ) query_list.append( @@ -101,7 +101,7 @@ class UsageSource(Source[TableQuery], ABC): userName=query_dict.get("user_name", ""), startTime=query_dict.get("start_time", ""), endTime=query_dict.get("end_time", ""), - analysisDate=analysis_date.date(), + analysisDate=analysis_date, aborted=self.get_aborted_status(query_dict), databaseName=self.get_database_name(query_dict), serviceName=self.config.serviceName, diff --git a/ingestion/src/metadata/utils/lru_cache.py b/ingestion/src/metadata/utils/lru_cache.py new file mode 100644 index 00000000000..0964e6d4ca3 --- /dev/null +++ b/ingestion/src/metadata/utils/lru_cache.py @@ -0,0 +1,43 @@ +""" +LRU cache +""" + +from collections import OrderedDict + + +class LRUCache: + """Least Recently Used cache""" + + def __init__(self, capacity: int) -> None: + self._cache = OrderedDict() + self.capacity = capacity + + def get(self, key): + """ + Returns the value associated with `key` if it exists, + updating the cache usage. + Raises `KeyError` if `key` doesn't exist in the cache. + """ + self._cache.move_to_end(key) + return self._cache[key] + + def put(self, key, value) -> None: + """ + Assigns `value` to `key`, overwriting `key` if it already exists + in the cache and updating the cache usage. + If the size of the cache grows above capacity, pops the least used + element. 
+ """ + self._cache[key] = value + self._cache.move_to_end(key) + if len(self._cache) > self.capacity: + self._cache.popitem(last=False) + + def __contains__(self, key) -> bool: + if key not in self._cache: + return False + self._cache.move_to_end(key) + return True + + def __len__(self) -> int: + return len(self._cache) diff --git a/ingestion/src/metadata/utils/sql_lineage.py b/ingestion/src/metadata/utils/sql_lineage.py index b89489b0780..9dd53d239c6 100644 --- a/ingestion/src/metadata/utils/sql_lineage.py +++ b/ingestion/src/metadata/utils/sql_lineage.py @@ -28,10 +28,13 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn from metadata.utils.helpers import get_formatted_entity_name from metadata.utils.logger import utils_logger +from metadata.utils.lru_cache import LRUCache logger = utils_logger() column_lineage_map = {} +LRU_CACHE_SIZE = 4096 + def split_raw_table_name(database: str, raw_name: str) -> dict: database_schema = None @@ -53,6 +56,9 @@ def get_column_fqn(table_entity: Table, column: str) -> Optional[str]: return tbl_column.fullyQualifiedName.__root__ +search_cache = LRUCache(LRU_CACHE_SIZE) + + def search_table_entities( metadata: OpenMetadata, service_name: str, @@ -65,25 +71,30 @@ def search_table_entities( It uses ES to build the FQN if we miss some info and will run a request against the API to find the Entity. 
""" - try: - table_fqns = fqn.build( - metadata, - entity_type=Table, - service_name=service_name, - database_name=database, - schema_name=database_schema, - table_name=table, - fetch_multiple_entities=True, - ) - table_entities: Optional[List[Table]] = [] - for table_fqn in table_fqns or []: - table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn) - if table_entity: - table_entities.append(table_entity) - return table_entities - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(err) + search_tuple = (service_name, database, database_schema, table) + if search_tuple in search_cache: + return search_cache.get(search_tuple) + else: + try: + table_fqns = fqn.build( + metadata, + entity_type=Table, + service_name=service_name, + database_name=database, + schema_name=database_schema, + table_name=table, + fetch_multiple_entities=True, + ) + table_entities: Optional[List[Table]] = [] + for table_fqn in table_fqns or []: + table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn) + if table_entity: + table_entities.append(table_entity) + search_cache.put(search_tuple, table_entities) + return table_entities + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(err) def get_table_entities_from_query( @@ -176,7 +187,6 @@ def _create_lineage_by_table_name( """ try: - from_table_entities = get_table_entities_from_query( metadata=metadata, service_name=service_name, diff --git a/ingestion/tests/unit/metadata/utils/test_lru_cache.py b/ingestion/tests/unit/metadata/utils/test_lru_cache.py new file mode 100644 index 00000000000..acd6e4a580f --- /dev/null +++ b/ingestion/tests/unit/metadata/utils/test_lru_cache.py @@ -0,0 +1,50 @@ +"""Tests for the LRU cache class""" + +import pytest + +from metadata.utils.lru_cache import LRUCache + + +class TestLRUCache: + def test_create_cache(self) -> None: + cache = LRUCache(2) + cache.put(1, 1) + + def test_get_fails_if_key_doesnt_exist(self) -> None: + cache = 
LRUCache(2) + with pytest.raises(KeyError): + cache.get(1) + + def test_putting_an_element_increases_cache_size(self) -> None: + cache = LRUCache(2) + assert len(cache) == 0 + cache.put(1, None) + cache.put(2, None) + assert len(cache) == 2 + + def test_contains_determines_if_an_element_exists(self) -> None: + cache = LRUCache(2) + cache.put(1, 1) + assert 1 in cache + assert 2 not in cache + + def test_putting_over_capacity_rotates_cache(self) -> None: + cache = LRUCache(2) + cache.put(1, None) + cache.put(2, None) + cache.put(3, None) + assert 1 not in cache + + def test_interacting_with_a_key_makes_it_used(self) -> None: + cache = LRUCache(2) + cache.put(1, None) + cache.put(2, None) + 1 in cache + cache.put(3, None) + assert 1 in cache + assert 2 not in cache + + def test_getting_an_existing_key_returns_the_associated_element(self) -> None: + cache = LRUCache(2) + cache.put(1, 2) + assert cache.get(1) == 2