From ebfb872e16ea6288557c50424b306640dacd305f Mon Sep 17 00:00:00 2001 From: Milan Bariya <52292922+MilanBariya@users.noreply.github.com> Date: Sun, 13 Nov 2022 22:09:29 +0530 Subject: [PATCH] Fix: Filter Datalake empty files (#8677) --- .../ingestion/source/database/datalake.py | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/datalake.py b/ingestion/src/metadata/ingestion/source/database/datalake.py index 43a2415a98f..e48f073367c 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake.py @@ -301,20 +301,21 @@ class DatalakeSource(DatabaseServiceSource): data_frame = self.get_gcs_files(key=table_name, bucket_name=schema_name) if isinstance(self.service_connection.configSource, S3Config): data_frame = self.get_s3_files(key=table_name, bucket_name=schema_name) - columns = self.get_columns(data_frame) - table_request = CreateTableRequest( - name=table_name, - tableType=table_type, - description="", - columns=columns, - tableConstraints=table_constraints if table_constraints else None, - databaseSchema=EntityReference( - id=self.context.database_schema.id, - type="databaseSchema", - ), - ) - yield table_request - self.register_record(table_request=table_request) + if not data_frame.empty: + columns = self.get_columns(data_frame) + table_request = CreateTableRequest( + name=table_name, + tableType=table_type, + description="", + columns=columns, + tableConstraints=table_constraints if table_constraints else None, + databaseSchema=EntityReference( + id=self.context.database_schema.id, + type="databaseSchema", + ), + ) + yield table_request + self.register_record(table_request=table_request) except Exception as exc: logger.debug(traceback.format_exc())