feat(ingest/unity): Use ThreadPoolExecutor for CLL (#8952)

This commit is contained in:
Andrew Sikowitz 2023-10-06 10:06:36 -04:00 committed by GitHub
parent ea87febd2b
commit c80da8f949
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 58 additions and 36 deletions

View File

@ -181,6 +181,17 @@ class UnityCatalogSourceConfig(
description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ", description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ",
) )
column_lineage_column_limit: int = pydantic.Field(
default=300,
description="Limit the number of columns to get column level lineage. ",
)
lineage_max_workers: int = pydantic.Field(
default=5 * (os.cpu_count() or 4),
description="Number of worker threads to use for column lineage thread pool executor. Set to 1 to disable.",
hidden_from_docs=True,
)
include_usage_statistics: bool = Field( include_usage_statistics: bool = Field(
default=True, default=True,
description="Generate usage statistics.", description="Generate usage statistics.",

View File

@ -233,9 +233,7 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
body={"table_name": table_name, "column_name": column_name}, body={"table_name": table_name, "column_name": column_name},
) )
def table_lineage( def table_lineage(self, table: Table, include_entity_lineage: bool) -> None:
self, table: Table, include_entity_lineage: bool
) -> Optional[dict]:
# Lineage endpoint doesn't exists on 2.1 version # Lineage endpoint doesn't exists on 2.1 version
try: try:
response: dict = self.list_lineages_by_table( response: dict = self.list_lineages_by_table(
@ -256,34 +254,30 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
for item in response.get("downstreams") or []: for item in response.get("downstreams") or []:
for notebook in item.get("notebookInfos") or []: for notebook in item.get("notebookInfos") or []:
table.downstream_notebooks.add(notebook["notebook_id"]) table.downstream_notebooks.add(notebook["notebook_id"])
return response
except Exception as e: except Exception as e:
logger.error(f"Error getting lineage: {e}") logger.warning(
return None f"Error getting lineage on table {table.ref}: {e}", exc_info=True
def get_column_lineage(self, table: Table, include_entity_lineage: bool) -> None:
try:
table_lineage = self.table_lineage(
table, include_entity_lineage=include_entity_lineage
) )
if table_lineage:
for column in table.columns:
response: dict = self.list_lineages_by_column(
table_name=table.ref.qualified_table_name,
column_name=column.name,
)
for item in response.get("upstream_cols", []):
table_ref = TableReference.create_from_lineage(
item, table.schema.catalog.metastore
)
if table_ref:
table.upstreams.setdefault(table_ref, {}).setdefault(
column.name, []
).append(item["name"])
def get_column_lineage(self, table: Table, column_name: str) -> None:
try:
response: dict = self.list_lineages_by_column(
table_name=table.ref.qualified_table_name,
column_name=column_name,
)
for item in response.get("upstream_cols") or []:
table_ref = TableReference.create_from_lineage(
item, table.schema.catalog.metastore
)
if table_ref:
table.upstreams.setdefault(table_ref, {}).setdefault(
column_name, []
).append(item["name"])
except Exception as e: except Exception as e:
logger.error(f"Error getting lineage: {e}") logger.warning(
f"Error getting column lineage on table {table.ref}, column {column_name}: {e}",
exc_info=True,
)
@staticmethod @staticmethod
def _escape_sequence(value: str) -> str: def _escape_sequence(value: str) -> str:

View File

@ -18,6 +18,8 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport):
table_profiles: EntityFilterReport = EntityFilterReport.field(type="table profile") table_profiles: EntityFilterReport = EntityFilterReport.field(type="table profile")
notebooks: EntityFilterReport = EntityFilterReport.field(type="notebook") notebooks: EntityFilterReport = EntityFilterReport.field(type="notebook")
num_column_lineage_skipped_column_count: int = 0
num_queries: int = 0 num_queries: int = 0
num_queries_dropped_parse_failure: int = 0 num_queries_dropped_parse_failure: int = 0
num_queries_missing_table: int = 0 # Can be due to pattern filter num_queries_missing_table: int = 0 # Can be due to pattern filter

View File

@ -1,6 +1,7 @@
import logging import logging
import re import re
import time import time
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta from datetime import timedelta
from typing import Dict, Iterable, List, Optional, Set, Union from typing import Dict, Iterable, List, Optional, Set, Union
from urllib.parse import urljoin from urllib.parse import urljoin
@ -367,15 +368,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
ownership = self._create_table_ownership_aspect(table) ownership = self._create_table_ownership_aspect(table)
data_platform_instance = self._create_data_platform_instance_aspect() data_platform_instance = self._create_data_platform_instance_aspect()
if self.config.include_column_lineage: lineage = self.ingest_lineage(table)
self.unity_catalog_api_proxy.get_column_lineage(
table, include_entity_lineage=self.config.include_notebooks
)
elif self.config.include_table_lineage:
self.unity_catalog_api_proxy.table_lineage(
table, include_entity_lineage=self.config.include_notebooks
)
lineage = self._generate_lineage_aspect(dataset_urn, table)
if self.config.include_notebooks: if self.config.include_notebooks:
for notebook_id in table.downstream_notebooks: for notebook_id in table.downstream_notebooks:
@ -401,6 +394,28 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
) )
] ]
def ingest_lineage(self, table: Table) -> Optional[UpstreamLineageClass]:
    """Collect lineage for ``table`` and build its upstream lineage aspect.

    Table-level lineage is requested from the API proxy when
    ``include_table_lineage`` is enabled. Column-level lineage is requested
    only when ``include_column_lineage`` is enabled *and* ``table.upstreams``
    is non-empty; one request per column is fanned out over a thread pool.
    At most ``column_lineage_column_limit`` columns are queried; a table with
    more columns than that bumps ``num_column_lineage_skipped_column_count``
    once and has only its leading columns queried.

    Args:
        table: The table whose lineage is being ingested. The proxy calls
            record their results on this object.

    Returns:
        Whatever ``_generate_lineage_aspect`` produces for the table's
        dataset urn (possibly ``None``).
    """
    if self.config.include_table_lineage:
        self.unity_catalog_api_proxy.table_lineage(
            table, include_entity_lineage=self.config.include_notebooks
        )

    if self.config.include_column_lineage and table.upstreams:
        column_limit = self.config.column_lineage_column_limit
        if len(table.columns) > column_limit:
            self.report.num_column_lineage_skipped_column_count += 1

        # ThreadPoolExecutor raises ValueError for max_workers <= 0, so a
        # misconfigured value falls back to a single worker instead of
        # failing the ingestion of every table.
        max_workers = max(1, self.config.lineage_max_workers)

        # Leaving the ``with`` block waits for all submitted calls to finish,
        # so the aspect below is built only after every column was processed.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for column in table.columns[:column_limit]:
                executor.submit(
                    self.unity_catalog_api_proxy.get_column_lineage,
                    table,
                    column.name,
                )

    return self._generate_lineage_aspect(self.gen_dataset_urn(table.ref), table)
def _generate_lineage_aspect( def _generate_lineage_aspect(
self, dataset_urn: str, table: Table self, dataset_urn: str, table: Table
) -> Optional[UpstreamLineageClass]: ) -> Optional[UpstreamLineageClass]: