From a183fc67e290ac24b916af4247aec318f12cc92f Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Tue, 15 Aug 2023 04:57:06 +0200 Subject: [PATCH] Fix ADLS parquet reads (#12840) * Fix ADLS parquet reads * Generalize service methods * Fix tests --- .../source/database/column_helpers.py | 2 +- .../ingestion/source/storage/s3/metadata.py | 69 ++++------------- .../source/storage/storage_service.py | 74 ++++++++++++++++++- .../src/metadata/readers/dataframe/parquet.py | 2 +- .../unit/topology/storage/test_storage.py | 39 ++++++++-- 5 files changed, 120 insertions(+), 66 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/column_helpers.py b/ingestion/src/metadata/ingestion/source/database/column_helpers.py index 8dc260555fe..91eabe93400 100644 --- a/ingestion/src/metadata/ingestion/source/database/column_helpers.py +++ b/ingestion/src/metadata/ingestion/source/database/column_helpers.py @@ -30,4 +30,4 @@ def truncate_column_name(col_name: str): To allow ingestion of tables we set name to truncate to 128 characters if its longer and use displayName to have the raw column name """ - return col_name[:128] + return col_name[:256] diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index bcf4770fe3f..90ec0b06f3b 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -16,7 +16,6 @@ from datetime import datetime, timedelta from enum import Enum from typing import Dict, Iterable, List, Optional -from pandas import DataFrame from pydantic import ValidationError from metadata.generated.schema.api.data.createContainer import CreateContainerRequest @@ -25,7 +24,6 @@ from metadata.generated.schema.entity.data.container import ( Container, ContainerDataModel, ) -from metadata.generated.schema.entity.data.table import Column from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import ( S3Config, ) @@ -44,22 +42,21 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.source.database.datalake.metadata import DatalakeSource from metadata.ingestion.source.storage.s3.models import ( S3BucketResponse, S3ContainerDetails, ) -from metadata.ingestion.source.storage.storage_service import StorageServiceSource -from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper -from metadata.utils.datalake.datalake_utils import fetch_dataframe +from metadata.ingestion.source.storage.storage_service import ( + KEY_SEPARATOR, + OPENMETADATA_TEMPLATE_FILE_NAME, + StorageServiceSource, +) from metadata.utils.filters import filter_by_container from metadata.utils.logger import ingestion_logger logger = ingestion_logger() S3_CLIENT_ROOT_RESPONSE = "Contents" -OPENMETADATA_TEMPLATE_FILE_NAME = "openmetadata.json" -S3_KEY_SEPARATOR = "/" class S3Metric(Enum): @@ -107,7 +104,7 @@ class S3Source(StorageServiceSource): if metadata_config: for metadata_entry in metadata_config.entries: logger.info( - f"Extracting metadata from path {metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)} " + f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} " f"and generating structured container" ) structured_container: Optional[ @@ -163,14 +160,18 @@ class S3Source(StorageServiceSource): if 
sample_key: columns = self._get_columns( - bucket_name=bucket_name, + container_name=bucket_name, sample_key=sample_key, metadata_entry=metadata_entry, + config_source=S3Config( + securityConfig=self.service_connection.awsConfig + ), + client=self.s3_client, ) if columns: return S3ContainerDetails( - name=metadata_entry.dataPath.strip(S3_KEY_SEPARATOR), - prefix=f"{S3_KEY_SEPARATOR}{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}", + name=metadata_entry.dataPath.strip(KEY_SEPARATOR), + prefix=f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}", creation_date=bucket_response.creation_date.isoformat(), number_of_objects=self._fetch_metric( bucket_name=bucket_name, metric=S3Metric.NUMBER_OF_OBJECTS @@ -186,35 +187,6 @@ class S3Source(StorageServiceSource): ) return None - def _get_columns( - self, bucket_name: str, sample_key: str, metadata_entry: MetadataEntry - ) -> Optional[List[Column]]: - """ - Get the columns from the file and partition information - """ - extracted_cols = self.extract_column_definitions(bucket_name, sample_key) - return (metadata_entry.partitionColumns or []) + (extracted_cols or []) - - def extract_column_definitions( - self, bucket_name: str, sample_key: str - ) -> List[Column]: - """ - Extract Column related metadata from s3 - """ - data_structure_details = fetch_dataframe( - config_source=S3Config(securityConfig=self.service_connection.awsConfig), - client=self.s3_client, - file_fqn=DatalakeTableSchemaWrapper( - key=sample_key, bucket_name=bucket_name - ), - ) - columns = [] - if isinstance(data_structure_details, DataFrame): - columns = DatalakeSource.get_columns(data_structure_details) - if isinstance(data_structure_details, list) and data_structure_details: - columns = DatalakeSource.get_columns(data_structure_details[0]) - return columns - def fetch_buckets(self) -> List[S3BucketResponse]: results: List[S3BucketResponse] = [] try: @@ -310,7 +282,7 @@ class S3Source(StorageServiceSource): ) -> S3ContainerDetails: return S3ContainerDetails( name=bucket_response.name, - prefix=S3_KEY_SEPARATOR, + prefix=KEY_SEPARATOR, creation_date=bucket_response.creation_date.isoformat(), number_of_objects=self._fetch_metric( bucket_name=bucket_response.name, metric=S3Metric.NUMBER_OF_OBJECTS @@ -318,21 +290,10 @@ class S3Source(StorageServiceSource): size=self._fetch_metric( bucket_name=bucket_response.name, metric=S3Metric.BUCKET_SIZE_BYTES ), - file_formats=[], # TODO should we fetch some random files by extension here? Would it be valuable info? 
+ file_formats=[], data_model=None, ) - @staticmethod - def _get_sample_file_prefix(metadata_entry: MetadataEntry) -> Optional[str]: - """ - Return a prefix if we have structure data to read - """ - result = f"{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}" - if not metadata_entry.structureFormat: - logger.warning(f"Ignoring un-structured metadata entry {result}") - return None - return result - def _get_sample_file_path( self, bucket_name: str, metadata_entry: MetadataEntry ) -> Optional[str]: diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 1464502089d..d4f6de69cca 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -12,7 +12,9 @@ Base class for ingesting Object Storage services """ from abc import ABC, abstractmethod -from typing import Any, Iterable +from typing import Any, Iterable, List, Optional + +from pandas import DataFrame from metadata.generated.schema.api.data.createContainer import CreateContainerRequest from metadata.generated.schema.entity.data.container import Container @@ -23,6 +25,9 @@ from metadata.generated.schema.entity.services.storageService import ( StorageConnection, StorageService, ) +from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig import ( + MetadataEntry, +) from metadata.generated.schema.metadataIngestion.storageServiceMetadataPipeline import ( StorageServiceMetadataPipeline, ) @@ -39,10 +44,18 @@ from metadata.ingestion.models.topology import ( ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection, get_test_connection_fn +from metadata.ingestion.source.database.datalake.metadata import DatalakeSource +from metadata.ingestion.source.database.glue.models import Column +from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper +from metadata.readers.models import ConfigSource +from metadata.utils.datalake.datalake_utils import fetch_dataframe from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +OPENMETADATA_TEMPLATE_FILE_NAME = "openmetadata.json" +KEY_SEPARATOR = "/" + class StorageServiceTopology(ServiceTopology): @@ -124,13 +137,17 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC): return self.status def close(self): - pass + """ + By default, nothing needs to be closed + """ def get_services(self) -> Iterable[WorkflowSource]: yield self.config def prepare(self): - pass + """ + By default, nothing needs to be taken care of when loading the source + """ def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) @@ -140,3 +157,54 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC): yield self.metadata.get_create_service_from_source( entity=StorageService, config=config ) + + @staticmethod + def _get_sample_file_prefix(metadata_entry: MetadataEntry) -> Optional[str]: + """ + Return a prefix if we have structure data to read + """ + result = f"{metadata_entry.dataPath.strip(KEY_SEPARATOR)}" + if not metadata_entry.structureFormat: + logger.warning(f"Ignoring un-structured metadata entry {result}") + return None + return result + + @staticmethod + def extract_column_definitions( + bucket_name: str, + sample_key: str, + config_source: ConfigSource, + client: Any, + ) -> List[Column]: + """ + Extract Column related metadata from s3 + """ + 
data_structure_details = fetch_dataframe( + config_source=config_source, + client=client, + file_fqn=DatalakeTableSchemaWrapper( + key=sample_key, bucket_name=bucket_name + ), + ) + columns = [] + if isinstance(data_structure_details, DataFrame): + columns = DatalakeSource.get_columns(data_structure_details) + if isinstance(data_structure_details, list) and data_structure_details: + columns = DatalakeSource.get_columns(data_structure_details[0]) + return columns + + def _get_columns( + self, + container_name: str, + sample_key: str, + metadata_entry: MetadataEntry, + config_source: ConfigSource, + client: Any, + ) -> Optional[List[Column]]: + """ + Get the columns from the file and partition information + """ + extracted_cols = self.extract_column_definitions( + container_name, sample_key, config_source, client + ) + return (metadata_entry.partitionColumns or []) + (extracted_cols or []) diff --git a/ingestion/src/metadata/readers/dataframe/parquet.py b/ingestion/src/metadata/readers/dataframe/parquet.py index 3ac6368025a..c3f5eb4dd7f 100644 --- a/ingestion/src/metadata/readers/dataframe/parquet.py +++ b/ingestion/src/metadata/readers/dataframe/parquet.py @@ -101,7 +101,7 @@ class ParquetDataFrameReader(DataFrameReader): storage_options = return_azure_storage_options(self.config_source) account_url = AZURE_PATH.format( bucket_name=bucket_name, - account_name=storage_options.get("account_name"), + account_name=self.config_source.securityConfig.accountName, key=key, ) dataframe = pd.read_parquet(account_url, storage_options=storage_options) diff --git a/ingestion/tests/unit/topology/storage/test_storage.py b/ingestion/tests/unit/topology/storage/test_storage.py index 4b26ed0e7de..e8e1b2bfdeb 100644 --- a/ingestion/tests/unit/topology/storage/test_storage.py +++ b/ingestion/tests/unit/topology/storage/test_storage.py @@ -96,6 +96,26 @@ MOCK_S3_OBJECT_FILE_PATHS = { } +def _get_str_value(data): + if data: + if isinstance(data, str): + return data + return data.value + + return None + + +def custom_column_compare(self, other): + return ( + self.name == other.name + and self.displayName == other.displayName + and self.description == other.description + and self.dataTypeDisplay == other.dataTypeDisplay + and self.children == other.children + and _get_str_value(self.arrayDataType) == _get_str_value(other.arrayDataType) + ) + + class StorageUnitTest(TestCase): """ Validate how we work with object store metadata @@ -207,17 +227,17 @@ class StorageUnitTest(TestCase): name=ColumnName(__root__="transaction_id"), dataType=DataType.INT, dataTypeDisplay="INT", - dataLength=1, + displayName="transaction_id", ), Column( name=ColumnName(__root__="transaction_value"), dataType=DataType.INT, dataTypeDisplay="INT", - dataLength=1, + displayName="transaction_value", ), ] self.object_store_source.extract_column_definitions = ( - lambda bucket_name, sample_key: columns + lambda bucket_name, sample_key, config_source, client: columns ) entity_ref = EntityReference(id=uuid.uuid4(), type="container") @@ -249,7 +269,7 @@ class StorageUnitTest(TestCase): # Most of the parsing support are covered in test_datalake unit tests related to the Data lake implementation def test_extract_column_definitions(self): with patch( - "metadata.ingestion.source.storage.s3.metadata.fetch_dataframe", + "metadata.ingestion.source.storage.storage_service.fetch_dataframe", return_value=[ pd.DataFrame.from_dict( [ @@ -260,23 +280,28 @@ class StorageUnitTest(TestCase): ) ], ): + + Column.__eq__ = custom_column_compare self.assertListEqual( [ 
Column( name=ColumnName(__root__="transaction_id"), dataType=DataType.INT, dataTypeDisplay="INT", - dataLength=1, + displayName="transaction_id", ), Column( name=ColumnName(__root__="transaction_value"), dataType=DataType.INT, dataTypeDisplay="INT", - dataLength=1, + displayName="transaction_value", ), ], self.object_store_source.extract_column_definitions( - bucket_name="test_bucket", sample_key="test.json" + bucket_name="test_bucket", + sample_key="test.json", + config_source=None, + client=None, ), )
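
Note (illustrative, not part of the patch): the essence of the parquet.py change above is that the ADLS account name is now taken from the connection's securityConfig (accountName) instead of being looked up in the fsspec storage_options, which in this reader only carry credentials. Below is a minimal sketch of the resulting read pattern, with hypothetical names (ADLS_URL, read_adls_parquet) standing in for the reader's own AZURE_PATH template and wrapper; it assumes pandas plus adlfs are installed and that valid Azure credentials are passed via storage_options.

    import pandas as pd

    # Standard abfs URL shape understood by pandas/adlfs; the reader builds an
    # equivalent URL from its own AZURE_PATH template (names here are illustrative).
    ADLS_URL = "abfs://{container}@{account_name}.dfs.core.windows.net/{key}"

    def read_adls_parquet(
        container: str, key: str, account_name: str, storage_options: dict
    ) -> pd.DataFrame:
        # The account name comes from the service's security config, not from
        # storage_options, which may hold only tenant/client credentials or an
        # account key and therefore no "account_name" entry.
        url = ADLS_URL.format(container=container, account_name=account_name, key=key)
        return pd.read_parquet(url, storage_options=storage_options)

    # Example call shape (placeholder values):
    # df = read_adls_parquet(
    #     container="raw-data",
    #     key="sales/2023/part-0.parquet",
    #     account_name="mystorageaccount",
    #     storage_options={"account_key": "..."},
    # )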