mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-01 05:03:10 +00:00
MINOR: Apply multi threading in View Lineage Processing (#17922)
This commit is contained in:
parent
b8488a487f
commit
eda9ecc83f
@ -11,8 +11,11 @@
|
|||||||
"""
|
"""
|
||||||
Generic source to build SQL connectors.
|
Generic source to build SQL connectors.
|
||||||
"""
|
"""
|
||||||
|
import math
|
||||||
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
from abc import ABC
|
from abc import ABC
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from typing import Any, Iterable, List, Optional, Tuple, Union, cast
|
from typing import Any, Iterable, List, Optional, Tuple, Union, cast
|
||||||
|
|
||||||
@ -60,6 +63,7 @@ from metadata.ingestion.api.models import Either
|
|||||||
from metadata.ingestion.connections.session import create_and_bind_thread_safe_session
|
from metadata.ingestion.connections.session import create_and_bind_thread_safe_session
|
||||||
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
|
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
|
||||||
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
|
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
|
||||||
|
from metadata.ingestion.models.topology import Queue
|
||||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||||
from metadata.ingestion.source.connections import (
|
from metadata.ingestion.source.connections import (
|
||||||
get_connection,
|
get_connection,
|
||||||
@ -73,6 +77,7 @@ from metadata.ingestion.source.models import TableView
|
|||||||
from metadata.utils import fqn
|
from metadata.utils import fqn
|
||||||
from metadata.utils.db_utils import get_view_lineage
|
from metadata.utils.db_utils import get_view_lineage
|
||||||
from metadata.utils.execution_time_tracker import (
|
from metadata.utils.execution_time_tracker import (
|
||||||
|
ExecutionTimeTrackerContextMap,
|
||||||
calculate_execution_time,
|
calculate_execution_time,
|
||||||
calculate_execution_time_generator,
|
calculate_execution_time_generator,
|
||||||
)
|
)
|
||||||
@ -583,9 +588,78 @@ class CommonDbSourceService(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@calculate_execution_time_generator()
|
def multithread_process_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
|
||||||
def yield_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
|
"""Multithread Processing of a Node"""
|
||||||
logger.info("Processing Lineage for Views")
|
|
||||||
|
views_list = list(self.context.get().table_views or [])
|
||||||
|
views_length = len(views_list)
|
||||||
|
|
||||||
|
if views_length != 0:
|
||||||
|
chunksize = int(math.ceil(views_length / self.source_config.threads))
|
||||||
|
chunks = [
|
||||||
|
views_list[i : i + chunksize] for i in range(0, views_length, chunksize)
|
||||||
|
]
|
||||||
|
|
||||||
|
thread_pool = ThreadPoolExecutor(max_workers=self.source_config.threads)
|
||||||
|
queue = Queue()
|
||||||
|
|
||||||
|
futures = [
|
||||||
|
thread_pool.submit(
|
||||||
|
self._process_view_def_chunk,
|
||||||
|
chunk,
|
||||||
|
queue,
|
||||||
|
self.context.get_current_thread_id(),
|
||||||
|
)
|
||||||
|
for chunk in chunks
|
||||||
|
]
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if queue.has_tasks():
|
||||||
|
yield from queue.process()
|
||||||
|
|
||||||
|
else:
|
||||||
|
if not futures:
|
||||||
|
break
|
||||||
|
|
||||||
|
for i, future in enumerate(futures):
|
||||||
|
if future.done():
|
||||||
|
future.result()
|
||||||
|
futures.pop(i)
|
||||||
|
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
def _process_view_def_chunk(
|
||||||
|
self, chunk: List[TableView], queue: Queue, thread_id: int
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Process a chunk of view definitions
|
||||||
|
"""
|
||||||
|
self.context.copy_from(thread_id)
|
||||||
|
ExecutionTimeTrackerContextMap().copy_from_parent(thread_id)
|
||||||
|
for view in [v for v in chunk if v.view_definition is not None]:
|
||||||
|
for lineage in get_view_lineage(
|
||||||
|
view=view,
|
||||||
|
metadata=self.metadata,
|
||||||
|
service_name=self.context.get().database_service,
|
||||||
|
connection_type=self.service_connection.type.value,
|
||||||
|
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
|
||||||
|
):
|
||||||
|
if lineage.right is not None:
|
||||||
|
queue.put(
|
||||||
|
Either(
|
||||||
|
right=OMetaLineageRequest(
|
||||||
|
lineage_request=lineage.right,
|
||||||
|
override_lineage=self.source_config.overrideViewLineage,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
queue.put(lineage)
|
||||||
|
|
||||||
|
def _process_view_def_serial(self) -> Iterable[Either[OMetaLineageRequest]]:
|
||||||
|
"""
|
||||||
|
Process view definitions serially
|
||||||
|
"""
|
||||||
for view in [
|
for view in [
|
||||||
v for v in self.context.get().table_views if v.view_definition is not None
|
v for v in self.context.get().table_views if v.view_definition is not None
|
||||||
]:
|
]:
|
||||||
@ -606,6 +680,14 @@ class CommonDbSourceService(
|
|||||||
else:
|
else:
|
||||||
yield lineage
|
yield lineage
|
||||||
|
|
||||||
|
@calculate_execution_time_generator()
|
||||||
|
def yield_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
|
||||||
|
logger.info("Processing Lineage for Views")
|
||||||
|
if self.source_config.threads > 1:
|
||||||
|
yield from self.multithread_process_view_lineage()
|
||||||
|
else:
|
||||||
|
yield from self._process_view_def_serial()
|
||||||
|
|
||||||
def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]:
|
def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]:
|
||||||
"""
|
"""
|
||||||
Search the referred table for foreign constraints
|
Search the referred table for foreign constraints
|
||||||
|
@ -65,9 +65,6 @@ from metadata.ingestion.source.database.external_table_lineage_mixin import (
|
|||||||
from metadata.ingestion.source.database.incremental_metadata_extraction import (
|
from metadata.ingestion.source.database.incremental_metadata_extraction import (
|
||||||
IncrementalConfig,
|
IncrementalConfig,
|
||||||
)
|
)
|
||||||
from metadata.ingestion.source.database.life_cycle_query_mixin import (
|
|
||||||
LifeCycleQueryMixin,
|
|
||||||
)
|
|
||||||
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
|
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
|
||||||
from metadata.ingestion.source.database.snowflake.models import (
|
from metadata.ingestion.source.database.snowflake.models import (
|
||||||
STORED_PROC_LANGUAGE_MAP,
|
STORED_PROC_LANGUAGE_MAP,
|
||||||
@ -148,7 +145,6 @@ SnowflakeDialect._get_schema_foreign_keys = get_schema_foreign_keys
|
|||||||
|
|
||||||
|
|
||||||
class SnowflakeSource(
|
class SnowflakeSource(
|
||||||
LifeCycleQueryMixin,
|
|
||||||
StoredProcedureMixin,
|
StoredProcedureMixin,
|
||||||
ExternalTableLineageMixin,
|
ExternalTableLineageMixin,
|
||||||
CommonDbSourceService,
|
CommonDbSourceService,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user