From 5145dced25f338af2eeb8d0683a903b8dad1b388 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Mon, 23 Jan 2023 17:44:34 +0530 Subject: [PATCH] Datalake parquet files fix (#9860) --- .../ingestion/source/database/common_db_source.py | 1 + .../ingestion/source/database/database_service.py | 2 ++ .../ingestion/source/database/datalake/metadata.py | 5 ++++- .../ingestion/source/database/sample_data.py | 1 + ingestion/src/metadata/utils/s3_utils.py | 12 +++++++----- 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 7d8b9d300bc..c1a99385bc5 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -236,6 +236,7 @@ class CommonDbSourceService( database_name=self.context.database.name.__root__, schema_name=self.context.database_schema.name.__root__, table_name=table_name, + skip_es_search=True, ) if filter_by_table( self.source_config.tableFilterPattern, diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 0ee18c98d1a..9b1a036b537 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -381,6 +381,7 @@ class DatabaseServiceSource( database_name=self.context.database.name.__root__, schema_name=self.context.database_schema.name.__root__, table_name=table_name, + skip_es_search=True, ) return self.get_tag_by_fqn(entity_fqn=table_fqn) @@ -413,6 +414,7 @@ class DatabaseServiceSource( database_name=self.context.database.name.__root__, schema_name=self.context.database_schema.name.__root__, table_name=table_request.name.__root__, + skip_es_search=True, ) 
self.database_source_state.add(table_fqn) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index 6f938f6af86..291b1ed03a6 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -287,6 +287,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- database_name=self.context.database.name.__root__, schema_name=self.context.database_schema.name.__root__, table_name=table_name, + skip_es_search=True, ) if filter_by_table( @@ -315,6 +316,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- database_name=self.context.database.name.__root__, schema_name=self.context.database_schema.name.__root__, table_name=table_name, + skip_es_search=True, ) if filter_by_table( self.config.sourceConfig.config.tableFilterPattern, @@ -348,6 +350,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- database_name=self.context.database.name.__root__, schema_name=self.context.database_schema.name.__root__, table_name=table_name, + skip_es_search=True, ) if filter_by_table( self.config.sourceConfig.config.tableFilterPattern, @@ -531,7 +534,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public- except Exception as exc: logger.debug(traceback.format_exc()) logger.error( - f"Unexpected exception to get S3 files from [{bucket_name}]: {exc}" + f"Unexpected exception to get S3 file [{key}] from bucket [{bucket_name}]: {exc}" ) return None diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py index 300f45f6193..1d8616e6737 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py @@ -595,6 +595,7 
@@ class SampleDataSource( database_name=db.name.__root__, schema_name=schema.name.__root__, table_name=table_request.name.__root__, + skip_es_search=True, ) location_fqn = fqn.build( diff --git a/ingestion/src/metadata/utils/s3_utils.py b/ingestion/src/metadata/utils/s3_utils.py index e8ec91c2607..9d961f23589 100644 --- a/ingestion/src/metadata/utils/s3_utils.py +++ b/ingestion/src/metadata/utils/s3_utils.py @@ -90,11 +90,13 @@ def read_parquet_from_s3(client: Any, key: str, bucket_name: str): """ Read the parquet file from the s3 bucket and return a dataframe """ - s3_fs = s3fs.S3FileSystem( - key=client.awsAccessKeyId, - secret=client.awsSecretAccessKey.get_secret_value(), - token=client.awsSessionToken, - ) + s3_fs = s3fs.S3FileSystem() + if client.awsAccessKeyId and client.awsSecretAccessKey: + s3_fs = s3fs.S3FileSystem( + key=client.awsAccessKeyId, + secret=client.awsSecretAccessKey.get_secret_value(), + token=client.awsSessionToken, + ) bucket_uri = f"s3://{bucket_name}/{key}" dataset = pq.ParquetDataset(bucket_uri, filesystem=s3_fs) return [dataset.read_pandas().to_pandas()]