Datalake parquet files fix (#9860)

This commit is contained in:
Mayur Singal 2023-01-23 17:44:34 +05:30 committed by GitHub
parent 8f958675f1
commit 5145dced25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 15 additions and 6 deletions

View File

@ -236,6 +236,7 @@ class CommonDbSourceService(
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
table_name=table_name,
skip_es_search=True,
)
if filter_by_table(
self.source_config.tableFilterPattern,

View File

@ -381,6 +381,7 @@ class DatabaseServiceSource(
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
table_name=table_name,
skip_es_search=True,
)
return self.get_tag_by_fqn(entity_fqn=table_fqn)
@ -413,6 +414,7 @@ class DatabaseServiceSource(
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
table_name=table_request.name.__root__,
skip_es_search=True,
)
self.database_source_state.add(table_fqn)

View File

@ -287,6 +287,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
table_name=table_name,
skip_es_search=True,
)
if filter_by_table(
@ -315,6 +316,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
table_name=table_name,
skip_es_search=True,
)
if filter_by_table(
self.config.sourceConfig.config.tableFilterPattern,
@ -348,6 +350,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
table_name=table_name,
skip_es_search=True,
)
if filter_by_table(
self.config.sourceConfig.config.tableFilterPattern,
@ -531,7 +534,7 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Unexpected exception to get S3 files from [{bucket_name}]: {exc}"
f"Unexpected exception to get S3 file [{key}] from bucket [{bucket_name}]: {exc}"
)
return None

View File

@ -595,6 +595,7 @@ class SampleDataSource(
database_name=db.name.__root__,
schema_name=schema.name.__root__,
table_name=table_request.name.__root__,
skip_es_search=True,
)
location_fqn = fqn.build(

View File

@ -90,11 +90,13 @@ def read_parquet_from_s3(client: Any, key: str, bucket_name: str):
"""
Read the parquet file from the s3 bucket and return a dataframe
"""
s3_fs = s3fs.S3FileSystem(
key=client.awsAccessKeyId,
secret=client.awsSecretAccessKey.get_secret_value(),
token=client.awsSessionToken,
)
s3_fs = s3fs.S3FileSystem()
if client.awsAccessKeyId and client.awsSecretAccessKeyx:
s3_fs = s3fs.S3FileSystem(
key=client.awsAccessKeyId,
secret=client.awsSecretAccessKey.get_secret_value(),
token=client.awsSessionToken,
)
bucket_uri = f"s3://{bucket_name}/{key}"
dataset = pq.ParquetDataset(bucket_uri, filesystem=s3_fs)
return [dataset.read_pandas().to_pandas()]