Fix ADLS parquet reads (#12840)

* Fix ADLS parquet reads

* Generalize service methods

* Fix tests
Pere Miquel Brull 2023-08-15 04:57:06 +02:00 committed by GitHub
parent b23b637dc1
commit a183fc67e2
5 changed files with 120 additions and 66 deletions

View File

@@ -30,4 +30,4 @@ def truncate_column_name(col_name: str):
     To allow ingestion of tables we set name to truncate to 128 characters if its longer
     and use displayName to have the raw column name
     """
-    return col_name[:128]
+    return col_name[:256]
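
For reference, a minimal self-contained sketch of what the widened limit means in practice; the function body mirrors the diff above, the long name is illustrative, and per the docstring the raw value is meant to survive in displayName:

def truncate_column_name(col_name: str):
    # Same slice bound as the change above, inlined so the sketch runs on its own.
    return col_name[:256]

raw_name = "x" * 300  # an artificially long physical column name
truncated = truncate_column_name(raw_name)
assert len(truncated) == 256  # the cap was 128 before this commit
assert truncated == raw_name[:256]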

View File

@@ -16,7 +16,6 @@ from datetime import datetime, timedelta
 from enum import Enum
 from typing import Dict, Iterable, List, Optional

-from pandas import DataFrame
 from pydantic import ValidationError

 from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
@@ -25,7 +24,6 @@ from metadata.generated.schema.entity.data.container import (
     Container,
     ContainerDataModel,
 )
-from metadata.generated.schema.entity.data.table import Column
 from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import (
     S3Config,
 )
@@ -44,22 +42,21 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.source import InvalidSourceException
-from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
 from metadata.ingestion.source.storage.s3.models import (
     S3BucketResponse,
     S3ContainerDetails,
 )
-from metadata.ingestion.source.storage.storage_service import StorageServiceSource
-from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
-from metadata.utils.datalake.datalake_utils import fetch_dataframe
+from metadata.ingestion.source.storage.storage_service import (
+    KEY_SEPARATOR,
+    OPENMETADATA_TEMPLATE_FILE_NAME,
+    StorageServiceSource,
+)
 from metadata.utils.filters import filter_by_container
 from metadata.utils.logger import ingestion_logger

 logger = ingestion_logger()

 S3_CLIENT_ROOT_RESPONSE = "Contents"
-OPENMETADATA_TEMPLATE_FILE_NAME = "openmetadata.json"
-S3_KEY_SEPARATOR = "/"


 class S3Metric(Enum):
@@ -107,7 +104,7 @@ class S3Source(StorageServiceSource):
                 if metadata_config:
                     for metadata_entry in metadata_config.entries:
                         logger.info(
-                            f"Extracting metadata from path {metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)} "
+                            f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
                             f"and generating structured container"
                         )
                         structured_container: Optional[
@@ -163,14 +160,18 @@ class S3Source(StorageServiceSource):
         if sample_key:
             columns = self._get_columns(
-                bucket_name=bucket_name,
+                container_name=bucket_name,
                 sample_key=sample_key,
                 metadata_entry=metadata_entry,
+                config_source=S3Config(
+                    securityConfig=self.service_connection.awsConfig
+                ),
+                client=self.s3_client,
             )

             if columns:
                 return S3ContainerDetails(
-                    name=metadata_entry.dataPath.strip(S3_KEY_SEPARATOR),
-                    prefix=f"{S3_KEY_SEPARATOR}{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}",
+                    name=metadata_entry.dataPath.strip(KEY_SEPARATOR),
+                    prefix=f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}",
                     creation_date=bucket_response.creation_date.isoformat(),
                     number_of_objects=self._fetch_metric(
                         bucket_name=bucket_name, metric=S3Metric.NUMBER_OF_OBJECTS
@@ -186,35 +187,6 @@ class S3Source(StorageServiceSource):
                 )
         return None

-    def _get_columns(
-        self, bucket_name: str, sample_key: str, metadata_entry: MetadataEntry
-    ) -> Optional[List[Column]]:
-        """
-        Get the columns from the file and partition information
-        """
-        extracted_cols = self.extract_column_definitions(bucket_name, sample_key)
-        return (metadata_entry.partitionColumns or []) + (extracted_cols or [])
-
-    def extract_column_definitions(
-        self, bucket_name: str, sample_key: str
-    ) -> List[Column]:
-        """
-        Extract Column related metadata from s3
-        """
-        data_structure_details = fetch_dataframe(
-            config_source=S3Config(securityConfig=self.service_connection.awsConfig),
-            client=self.s3_client,
-            file_fqn=DatalakeTableSchemaWrapper(
-                key=sample_key, bucket_name=bucket_name
-            ),
-        )
-        columns = []
-        if isinstance(data_structure_details, DataFrame):
-            columns = DatalakeSource.get_columns(data_structure_details)
-        if isinstance(data_structure_details, list) and data_structure_details:
-            columns = DatalakeSource.get_columns(data_structure_details[0])
-        return columns
-
     def fetch_buckets(self) -> List[S3BucketResponse]:
         results: List[S3BucketResponse] = []
         try:
@@ -310,7 +282,7 @@ class S3Source(StorageServiceSource):
     ) -> S3ContainerDetails:
         return S3ContainerDetails(
             name=bucket_response.name,
-            prefix=S3_KEY_SEPARATOR,
+            prefix=KEY_SEPARATOR,
             creation_date=bucket_response.creation_date.isoformat(),
             number_of_objects=self._fetch_metric(
                 bucket_name=bucket_response.name, metric=S3Metric.NUMBER_OF_OBJECTS
@@ -318,21 +290,10 @@ class S3Source(StorageServiceSource):
             size=self._fetch_metric(
                 bucket_name=bucket_response.name, metric=S3Metric.BUCKET_SIZE_BYTES
             ),
-            file_formats=[],  # TODO should we fetch some random files by extension here? Would it be valuable info?
+            file_formats=[],
             data_model=None,
         )

-    @staticmethod
-    def _get_sample_file_prefix(metadata_entry: MetadataEntry) -> Optional[str]:
-        """
-        Return a prefix if we have structure data to read
-        """
-        result = f"{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}"
-        if not metadata_entry.structureFormat:
-            logger.warning(f"Ignoring un-structured metadata entry {result}")
-            return None
-        return result
-
     def _get_sample_file_path(
         self, bucket_name: str, metadata_entry: MetadataEntry
     ) -> Optional[str]:

View File

@@ -12,7 +12,9 @@
 Base class for ingesting Object Storage services
 """
 from abc import ABC, abstractmethod
-from typing import Any, Iterable
+from typing import Any, Iterable, List, Optional
+
+from pandas import DataFrame

 from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
 from metadata.generated.schema.entity.data.container import Container
@@ -23,6 +25,9 @@ from metadata.generated.schema.entity.services.storageService import (
     StorageConnection,
     StorageService,
 )
+from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig import (
+    MetadataEntry,
+)
 from metadata.generated.schema.metadataIngestion.storageServiceMetadataPipeline import (
     StorageServiceMetadataPipeline,
 )
@@ -39,10 +44,18 @@ from metadata.ingestion.models.topology import (
 )
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
+from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
+from metadata.ingestion.source.database.glue.models import Column
+from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
+from metadata.readers.models import ConfigSource
+from metadata.utils.datalake.datalake_utils import fetch_dataframe
 from metadata.utils.logger import ingestion_logger

 logger = ingestion_logger()

+OPENMETADATA_TEMPLATE_FILE_NAME = "openmetadata.json"
+KEY_SEPARATOR = "/"
+

 class StorageServiceTopology(ServiceTopology):
@@ -124,13 +137,17 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC):
         return self.status

     def close(self):
-        pass
+        """
+        By default, nothing needs to be closed
+        """

     def get_services(self) -> Iterable[WorkflowSource]:
         yield self.config

     def prepare(self):
-        pass
+        """
+        By default, nothing needs to be taken care of when loading the source
+        """

     def test_connection(self) -> None:
         test_connection_fn = get_test_connection_fn(self.service_connection)
@@ -140,3 +157,54 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC):
         yield self.metadata.get_create_service_from_source(
             entity=StorageService, config=config
         )
+
+    @staticmethod
+    def _get_sample_file_prefix(metadata_entry: MetadataEntry) -> Optional[str]:
+        """
+        Return a prefix if we have structure data to read
+        """
+        result = f"{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
+        if not metadata_entry.structureFormat:
+            logger.warning(f"Ignoring un-structured metadata entry {result}")
+            return None
+        return result
+
+    @staticmethod
+    def extract_column_definitions(
+        bucket_name: str,
+        sample_key: str,
+        config_source: ConfigSource,
+        client: Any,
+    ) -> List[Column]:
+        """
+        Extract Column related metadata from s3
+        """
+        data_structure_details = fetch_dataframe(
+            config_source=config_source,
+            client=client,
+            file_fqn=DatalakeTableSchemaWrapper(
+                key=sample_key, bucket_name=bucket_name
+            ),
+        )
+        columns = []
+        if isinstance(data_structure_details, DataFrame):
+            columns = DatalakeSource.get_columns(data_structure_details)
+        if isinstance(data_structure_details, list) and data_structure_details:
+            columns = DatalakeSource.get_columns(data_structure_details[0])
+        return columns
+
+    def _get_columns(
+        self,
+        container_name: str,
+        sample_key: str,
+        metadata_entry: MetadataEntry,
+        config_source: ConfigSource,
+        client: Any,
+    ) -> Optional[List[Column]]:
+        """
+        Get the columns from the file and partition information
+        """
+        extracted_cols = self.extract_column_definitions(
+            container_name, sample_key, config_source, client
+        )
+        return (metadata_entry.partitionColumns or []) + (extracted_cols or [])
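
The "Generalize service methods" part of this commit boils down to the block above: column extraction now lives on the base StorageServiceSource, and each connector hands in its own config source and client. A hedged sketch of the call shape (the config and client objects below are placeholders, not names from this commit):

# Placeholder wiring: azure_config_source and adls_client stand in for whatever a
# concrete connector (S3, ADLS, ...) builds from its connection settings.
columns = StorageServiceSource.extract_column_definitions(
    bucket_name="my-container",
    sample_key="transactions/part-0000.parquet",
    config_source=azure_config_source,
    client=adls_client,
)
# _get_columns() then prepends any partition columns declared in the metadata entry.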

View File

@@ -101,7 +101,7 @@ class ParquetDataFrameReader(DataFrameReader):
         storage_options = return_azure_storage_options(self.config_source)
         account_url = AZURE_PATH.format(
             bucket_name=bucket_name,
-            account_name=storage_options.get("account_name"),
+            account_name=self.config_source.securityConfig.accountName,
             key=key,
         )
         dataframe = pd.read_parquet(account_url, storage_options=storage_options)
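
This hunk is the ADLS fix proper: the storage account name is now read straight from the connection's securityConfig rather than from the generated storage_options dict, which presumably does not carry an account_name for every auth flavour. A minimal sketch of the resulting read path, assuming the adlfs-style abfs:// URL that AZURE_PATH expands to (account, container and credentials below are placeholders):

import pandas as pd

# Placeholders: a real run takes these values from the Azure securityConfig.
account_name = "mystorageaccount"
container = "my-container"
key = "bronze/transactions/part-0000.parquet"
storage_options = {
    "tenant_id": "<tenant-id>",
    "client_id": "<client-id>",
    "client_secret": "<client-secret>",
}

# adlfs-style URL; the reader builds the same shape via AZURE_PATH.format(...)
account_url = f"abfs://{container}@{account_name}.dfs.core.windows.net/{key}"
dataframe = pd.read_parquet(account_url, storage_options=storage_options)
print(dataframe.head())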

View File

@@ -96,6 +96,26 @@ MOCK_S3_OBJECT_FILE_PATHS = {
 }


+def _get_str_value(data):
+    if data:
+        if isinstance(data, str):
+            return data
+        return data.value
+    return None
+
+
+def custom_column_compare(self, other):
+    return (
+        self.name == other.name
+        and self.displayName == other.displayName
+        and self.description == other.description
+        and self.dataTypeDisplay == other.dataTypeDisplay
+        and self.children == other.children
+        and _get_str_value(self.arrayDataType) == _get_str_value(other.arrayDataType)
+    )
+
+
 class StorageUnitTest(TestCase):
     """
     Validate how we work with object store metadata
@@ -207,17 +227,17 @@ class StorageUnitTest(TestCase):
                 name=ColumnName(__root__="transaction_id"),
                 dataType=DataType.INT,
                 dataTypeDisplay="INT",
-                dataLength=1,
+                displayName="transaction_id",
             ),
             Column(
                 name=ColumnName(__root__="transaction_value"),
                 dataType=DataType.INT,
                 dataTypeDisplay="INT",
-                dataLength=1,
+                displayName="transaction_value",
             ),
         ]
         self.object_store_source.extract_column_definitions = (
-            lambda bucket_name, sample_key: columns
+            lambda bucket_name, sample_key, config_source, client: columns
         )

         entity_ref = EntityReference(id=uuid.uuid4(), type="container")
@@ -249,7 +269,7 @@ class StorageUnitTest(TestCase):
     # Most of the parsing support are covered in test_datalake unit tests related to the Data lake implementation
     def test_extract_column_definitions(self):
         with patch(
-            "metadata.ingestion.source.storage.s3.metadata.fetch_dataframe",
+            "metadata.ingestion.source.storage.storage_service.fetch_dataframe",
             return_value=[
                 pd.DataFrame.from_dict(
                     [
@@ -260,23 +280,28 @@ class StorageUnitTest(TestCase):
                 )
             ],
         ):
+            Column.__eq__ = custom_column_compare
             self.assertListEqual(
                 [
                     Column(
                         name=ColumnName(__root__="transaction_id"),
                         dataType=DataType.INT,
                         dataTypeDisplay="INT",
-                        dataLength=1,
+                        displayName="transaction_id",
                     ),
                     Column(
                         name=ColumnName(__root__="transaction_value"),
                         dataType=DataType.INT,
                         dataTypeDisplay="INT",
-                        dataLength=1,
+                        displayName="transaction_value",
                     ),
                 ],
                 self.object_store_source.extract_column_definitions(
-                    bucket_name="test_bucket", sample_key="test.json"
+                    bucket_name="test_bucket",
+                    sample_key="test.json",
+                    config_source=None,
+                    client=None,
                 ),
             )
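
A note on the test helpers added above: custom_column_compare is patched onto Column.__eq__ so the assertions only compare the fields the test cares about (name, displayName, description, dataTypeDisplay, children, arrayDataType). A small illustration of the relaxed equality, assuming the table Column model these tests construct and that the helper above is in scope:

from metadata.generated.schema.entity.data.table import Column, ColumnName, DataType

Column.__eq__ = custom_column_compare  # helper from the diff above

left = Column(
    name=ColumnName(__root__="transaction_id"),
    dataType=DataType.INT,
    dataTypeDisplay="INT",
    displayName="transaction_id",
)
right = Column(
    name=ColumnName(__root__="transaction_id"),
    dataType=DataType.STRING,  # ignored by the custom comparison
    dataTypeDisplay="INT",
    displayName="transaction_id",
    dataLength=1,  # also ignored
)
assert left == right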