fix(ingest/delta-lake): skip file count if require_files is false (#11611)

This commit is contained in:
Mayuri Nehate 2024-10-14 18:21:05 +05:30 committed by GitHub
parent 3387110b41
commit b74ba11d93
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 4 additions and 7 deletions

View File

@ -223,15 +223,14 @@ class DeltaLakeSource(Source):
) )
customProperties = { customProperties = {
"number_of_files": str(get_file_count(delta_table)),
"partition_columns": str(delta_table.metadata().partition_columns), "partition_columns": str(delta_table.metadata().partition_columns),
"table_creation_time": str(delta_table.metadata().created_time), "table_creation_time": str(delta_table.metadata().created_time),
"id": str(delta_table.metadata().id), "id": str(delta_table.metadata().id),
"version": str(delta_table.version()), "version": str(delta_table.version()),
"location": self.source_config.complete_path, "location": self.source_config.complete_path,
} }
if not self.source_config.require_files: if self.source_config.require_files:
del customProperties["number_of_files"] # always 0 customProperties["number_of_files"] = str(get_file_count(delta_table))
dataset_properties = DatasetPropertiesClass( dataset_properties = DatasetPropertiesClass(
description=delta_table.metadata().description, description=delta_table.metadata().description,

View File

@ -1,6 +1,6 @@
import datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, TypeVar, Union from typing import Any, Union
import pytest import pytest
from mlflow import MlflowClient from mlflow import MlflowClient
@ -11,8 +11,6 @@ from mlflow.store.entities import PagedList
from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.mlflow import MLflowConfig, MLflowSource from datahub.ingestion.source.mlflow import MLflowConfig, MLflowSource
T = TypeVar("T")
@pytest.fixture @pytest.fixture
def tracking_uri(tmp_path: Path) -> str: def tracking_uri(tmp_path: Path) -> str:
@ -46,7 +44,7 @@ def model_version(
) )
def dummy_search_func(page_token: Union[None, str], **kwargs: Any) -> PagedList[T]: def dummy_search_func(page_token: Union[None, str], **kwargs: Any) -> PagedList[str]:
dummy_pages = dict( dummy_pages = dict(
page_1=PagedList(items=["a", "b"], token="page_2"), page_1=PagedList(items=["a", "b"], token="page_2"),
page_2=PagedList(items=["c", "d"], token="page_3"), page_2=PagedList(items=["c", "d"], token="page_3"),