Mirror of https://github.com/open-metadata/OpenMetadata.git (synced 2025-11-04 04:29:13 +00:00)

[TASK-6241] Cache ES ingestion when ingesting usage (#6276)

* Cache ingestion
* Uncomment secrets manager
* Fix experiment stuff
* Fix style
* Add LRU cache
* Add tests
* Fix code smell

This commit is contained in:
parent 4d4a2fc2cf
commit 756dae5605
@@ -36,6 +36,7 @@ from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.lru_cache import LRUCache
from metadata.utils.sql_lineage import (
    get_column_fqn,
    get_lineage_by_query,
@@ -44,6 +45,8 @@ from metadata.utils.sql_lineage import (

logger = ingestion_logger()

LRU_CACHE_SIZE = 4096


class MetadataUsageSinkConfig(ConfigModel):
    filename: str
@@ -85,8 +88,11 @@ class MetadataUsageBulkSink(BulkSink):
            for query in queries
            if "create" in query.query.lower() or "insert" in query.query.lower()
        ]
        seen_queries = LRUCache(LRU_CACHE_SIZE)

        for query in create_or_insert_queries:
            if query in seen_queries:
                continue
            lineages = get_lineage_by_query(
                self.metadata,
                query=query,
@@ -97,6 +103,7 @@ class MetadataUsageBulkSink(BulkSink):
            for lineage in lineages or []:
                created_lineage = self.metadata.add_lineage(lineage)
                logger.info(f"Successfully added Lineage {created_lineage}")
            seen_queries.put(query, None)  # None because it really doesn't matter.

    def __populate_table_usage_map(
        self, table_entity: Table, table_usage: TableUsageCount
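The bulk sink now filters usage queries down to CREATE/INSERT statements and deduplicates them before computing lineage. A minimal, self-contained sketch of that dedup pattern, assuming a hypothetical process_query callable standing in for the get_lineage_by_query / add_lineage calls above:

from metadata.utils.lru_cache import LRUCache

LRU_CACHE_SIZE = 4096


def process_unique_queries(queries, process_query):
    """Run process_query once per distinct query, skipping repeats."""
    seen_queries = LRUCache(LRU_CACHE_SIZE)
    for query in queries:
        if query in seen_queries:
            continue  # already handled during this ingestion run
        process_query(query)
        seen_queries.put(query, None)  # value is irrelevant; only membership matters

Because the cache is bounded at LRU_CACHE_SIZE entries, memory stays flat even when the usage workflow replays very long query lists.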
@@ -259,12 +266,12 @@ class MetadataUsageBulkSink(BulkSink):
    def __get_table_joins(
        self, table_entity: Table, table_usage: TableUsageCount
    ) -> TableJoins:
        table_joins: TableJoins = TableJoins(
            columnJoins=[], directTableJoins=[], startDate=table_usage.date
        )
        """
        Method to get Table Joins
        """
        table_joins: TableJoins = TableJoins(
            columnJoins=[], directTableJoins=[], startDate=table_usage.date
        )
        column_joins_dict = {}
        for column_join in table_usage.joins:
            joined_with = {}
@@ -31,9 +31,12 @@ from metadata.generated.schema.entity.data.table import (
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.lru_cache import LRUCache

logger = ometa_logger()

LRU_CACHE_SIZE = 4096


class OMetaTableMixin:
    """
@@ -128,11 +131,14 @@ class OMetaTableMixin:
        :param table: Table Entity to update
        :param table_queries: SqlQuery to add
        """
        seen_queries = LRUCache(LRU_CACHE_SIZE)
        for query in table_queries:
            self.client.put(
                f"{self.get_suffix(Table)}/{table.id.__root__}/tableQuery",
                data=query.json(),
            )
            if query.query not in seen_queries:
                self.client.put(
                    f"{self.get_suffix(Table)}/{table.id.__root__}/tableQuery",
                    data=query.json(),
                )
                seen_queries.put(query.query, None)

    def publish_table_usage(
        self, table: Table, table_usage_request: UsageRequest
@@ -92,7 +92,7 @@ class UsageSource(Source[TableQuery], ABC):
                        datetime.utcnow()
                        if not query_dict.get("start_time")
                        else datetime.strptime(
                            query_dict.get("start_time"), "%Y-%m-%d %H:%M:%S"
                            query_dict.get("start_time"), "%Y-%m-%d %H:%M:%S.%f"
                        )
                    )
                    query_list.append(
@@ -101,7 +101,7 @@ class UsageSource(Source[TableQuery], ABC):
                            userName=query_dict.get("user_name", ""),
                            startTime=query_dict.get("start_time", ""),
                            endTime=query_dict.get("end_time", ""),
                            analysisDate=analysis_date.date(),
                            analysisDate=analysis_date,
                            aborted=self.get_aborted_status(query_dict),
                            databaseName=self.get_database_name(query_dict),
                            serviceName=self.config.serviceName,
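The two hunks above adjust how start_time is parsed and how analysisDate is passed along. A small illustration of how the two strptime formats shown differ on a timestamp that carries fractional seconds; the sample value is made up:

from datetime import datetime

raw = "2022-07-20 12:34:56.789000"  # example start_time with fractional seconds

# "%Y-%m-%d %H:%M:%S" would raise ValueError here ("unconverted data remains: .789000");
# "%Y-%m-%d %H:%M:%S.%f" accepts the trailing microseconds.
analysis_date = (
    datetime.strptime(raw, "%Y-%m-%d %H:%M:%S.%f") if raw else datetime.utcnow()
)
print(analysis_date)  # 2022-07-20 12:34:56.789000

strptime requires the pattern to match the string exactly, so the format has to track the timestamp precision emitted by the source system.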
ingestion/src/metadata/utils/lru_cache.py (new file, 43 lines)
@@ -0,0 +1,43 @@
"""
LRU cache
"""

from collections import OrderedDict


class LRUCache:
    """Least Recently Used cache"""

    def __init__(self, capacity: int) -> None:
        self._cache = OrderedDict()
        self.capacity = capacity

    def get(self, key):
        """
        Returns the value associated to `key` if it exists,
        updating the cache usage.
        Raises `KeyError` if `key` doesn't exist in the cache.
        """
        self._cache.move_to_end(key)
        return self._cache[key]

    def put(self, key, value) -> None:
        """
        Assigns `value` to `key`, overwriting `key` if it already exists
        in the cache and updating the cache usage.
        If the size of the cache grows above capacity, pops the least used
        element.
        """
        self._cache[key] = value
        self._cache.move_to_end(key)
        if len(self._cache) > self.capacity:
            self._cache.popitem(last=False)

    def __contains__(self, key) -> bool:
        if key not in self._cache:
            return False
        self._cache.move_to_end(key)
        return True

    def __len__(self) -> int:
        return len(self._cache)
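A quick usage sketch for the LRUCache added above; the keys and values are arbitrary and only illustrate insertion, membership checks, lookup, and eviction of the least recently used entry once capacity is exceeded:

from metadata.utils.lru_cache import LRUCache

cache = LRUCache(capacity=2)
cache.put("query-1", None)
cache.put("query-2", None)

print("query-1" in cache)   # True, and the check also marks query-1 as recently used
cache.put("query-3", None)  # capacity exceeded: query-2 (least recently used) is evicted

print("query-2" in cache)   # False
print(cache.get("query-1"))  # None, still cached
print(len(cache))            # 2

Note that both get() and the `in` operator refresh recency, so any interaction with a key protects it from eviction.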
@@ -28,10 +28,13 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.helpers import get_formatted_entity_name
from metadata.utils.logger import utils_logger
from metadata.utils.lru_cache import LRUCache

logger = utils_logger()
column_lineage_map = {}

LRU_CACHE_SIZE = 4096


def split_raw_table_name(database: str, raw_name: str) -> dict:
    database_schema = None
@@ -53,6 +56,9 @@ def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
            return tbl_column.fullyQualifiedName.__root__


search_cache = LRUCache(LRU_CACHE_SIZE)


def search_table_entities(
    metadata: OpenMetadata,
    service_name: str,
@@ -65,25 +71,30 @@ def search_table_entities(
    It uses ES to build the FQN if we miss some info and will run
    a request against the API to find the Entity.
    """
    try:
        table_fqns = fqn.build(
            metadata,
            entity_type=Table,
            service_name=service_name,
            database_name=database,
            schema_name=database_schema,
            table_name=table,
            fetch_multiple_entities=True,
        )
        table_entities: Optional[List[Table]] = []
        for table_fqn in table_fqns or []:
            table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn)
            if table_entity:
                table_entities.append(table_entity)
        return table_entities
    except Exception as err:
        logger.debug(traceback.format_exc())
        logger.error(err)
    search_tuple = (service_name, database, database_schema, table)
    if search_tuple in search_cache:
        return search_cache.get(search_tuple)
    else:
        try:
            table_fqns = fqn.build(
                metadata,
                entity_type=Table,
                service_name=service_name,
                database_name=database,
                schema_name=database_schema,
                table_name=table,
                fetch_multiple_entities=True,
            )
            table_entities: Optional[List[Table]] = []
            for table_fqn in table_fqns or []:
                table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn)
                if table_entity:
                    table_entities.append(table_entity)
            search_cache.put(search_tuple, table_entities)
            return table_entities
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.error(err)


def get_table_entities_from_query(
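search_table_entities now memoizes Elasticsearch/API lookups under a composite tuple key, and only caches successful results (the except branch never calls put). A stripped-down sketch of the same idea, where lookup is a hypothetical stand-in for the fqn.build plus metadata.get_by_name sequence:

from metadata.utils.lru_cache import LRUCache

LRU_CACHE_SIZE = 4096
search_cache = LRUCache(LRU_CACHE_SIZE)


def cached_search(service_name, database, database_schema, table, lookup):
    """Memoize lookup results under a (service, database, schema, table) key."""
    search_tuple = (service_name, database, database_schema, table)
    if search_tuple in search_cache:
        return search_cache.get(search_tuple)
    results = lookup(service_name, database, database_schema, table)
    search_cache.put(search_tuple, results)  # cache only after the lookup succeeded
    return results

Tuples of strings are hashable, which is what lets the composite key go straight into the OrderedDict backing the cache.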
@@ -176,7 +187,6 @@ def _create_lineage_by_table_name(
    """

    try:

        from_table_entities = get_table_entities_from_query(
            metadata=metadata,
            service_name=service_name,
ingestion/tests/unit/metadata/utils/test_lru_cache.py (new file, 50 lines)
@@ -0,0 +1,50 @@
"""Tests for the LRU cache class"""

import pytest

from metadata.utils.lru_cache import LRUCache


class TestLRUCache:
    def test_create_cache(self) -> None:
        cache = LRUCache(2)
        cache.put(1, 1)

    def test_get_fails_if_key_doesnt_exist(self) -> None:
        cache = LRUCache(2)
        with pytest.raises(KeyError):
            cache.get(1)

    def test_putting_an_element_increases_cache_size(self) -> None:
        cache = LRUCache(2)
        assert len(cache) == 0
        cache.put(1, None)
        cache.put(2, None)
        assert len(cache) == 2

    def test_contains_determines_if_an_element_exists(self) -> None:
        cache = LRUCache(2)
        cache.put(1, 1)
        assert 1 in cache
        assert 2 not in cache

    def test_putting_over_capacity_rotates_cache(self) -> None:
        cache = LRUCache(2)
        cache.put(1, None)
        cache.put(2, None)
        cache.put(3, None)
        assert 1 not in cache

    def test_interacting_with_a_key_makes_it_used(self) -> None:
        cache = LRUCache(2)
        cache.put(1, None)
        cache.put(2, None)
        1 in cache
        cache.put(3, None)
        assert 1 in cache
        assert 2 not in cache

    def test_getting_an_existing_key_returns_the_associated_element(self) -> None:
        cache = LRUCache(2)
        cache.put(1, 2)
        assert cache.get(1) == 2
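One behavior the suite does not pin down explicitly is that get() also refreshes recency (it calls move_to_end before returning). A possible extra test in the same style for the TestLRUCache class, not part of this commit:

    def test_getting_a_key_refreshes_its_recency(self) -> None:
        cache = LRUCache(2)
        cache.put(1, "a")
        cache.put(2, "b")
        cache.get(1)        # touch key 1 so key 2 becomes the least recently used
        cache.put(3, "c")   # evicts key 2, not key 1
        assert 1 in cache
        assert 2 not in cache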