diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py
index 28a853dfe9c..872c422e801 100644
--- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py
@@ -371,41 +371,38 @@ class DatalakeSource(DatabaseServiceSource):
yield table_name, TableType.Regular
if isinstance(self.service_connection.configSource, AzureConfig):
- files_names = self.get_tables(container_name=bucket_name)
- for file in files_names.list_blobs(name_starts_with=prefix):
- file_name = file.name
- if "/" in file.name:
- table_name = self.standardize_table_name(bucket_name, file_name)
- table_fqn = fqn.build(
- self.metadata,
- entity_type=Table,
- service_name=self.context.database_service.name.__root__,
- database_name=self.context.database.name.__root__,
- schema_name=self.context.database_schema.name.__root__,
- table_name=table_name,
- skip_es_search=True,
- )
- if filter_by_table(
- self.config.sourceConfig.config.tableFilterPattern,
- table_fqn
- if self.config.sourceConfig.config.useFqnForFiltering
- else table_name,
- ):
- self.status.filter(
- table_fqn,
- "Object Filtered Out",
- )
- continue
- if not self.check_valid_file_type(file_name):
- logger.debug(
- f"Object filtered due to unsupported file type: {file_name}"
- )
- continue
- yield file_name, TableType.Regular
+ container_client = self.client.get_container_client(bucket_name)
- def get_tables(self, container_name) -> Iterable[any]:
- tables = self.client.get_container_client(container_name)
- return tables
+ for file in container_client.list_blobs(
+ name_starts_with=prefix or None
+ ):
+ table_name = self.standardize_table_name(bucket_name, file.name)
+ table_fqn = fqn.build(
+ self.metadata,
+ entity_type=Table,
+ service_name=self.context.database_service.name.__root__,
+ database_name=self.context.database.name.__root__,
+ schema_name=self.context.database_schema.name.__root__,
+ table_name=table_name,
+ skip_es_search=True,
+ )
+ if filter_by_table(
+ self.config.sourceConfig.config.tableFilterPattern,
+ table_fqn
+ if self.config.sourceConfig.config.useFqnForFiltering
+ else table_name,
+ ):
+ self.status.filter(
+ table_fqn,
+ "Object Filtered Out",
+ )
+ continue
+ if not self.check_valid_file_type(file.name):
+ logger.debug(
+ f"Object filtered due to unsupported file type: {file.name}"
+ )
+ continue
+ yield file.name, TableType.Regular
def yield_table(
self, table_name_and_type: Tuple[str, str]
@@ -439,7 +436,6 @@ class DatalakeSource(DatabaseServiceSource):
"tenant_id": connection_args.tenantId,
"client_id": connection_args.clientId,
"client_secret": connection_args.clientSecret.get_secret_value(),
- "account_name": connection_args.accountName,
}
data_frame = self.get_azure_files(
client=self.client,
@@ -694,7 +690,6 @@ class DatalakeSource(DatabaseServiceSource):
if hasattr(data_frame, "columns"):
df_columns = list(data_frame.columns)
for column in df_columns:
-
if COMPLEX_COLUMN_SEPARATOR in column:
DatalakeSource._parse_complex_column(
data_frame,
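For context on the listing change above: assuming `self.client` is an `azure.storage.blob.BlobServiceClient` (which is what the `get_container_client` call suggests), the pattern reduces to the sketch below. The account URL, container name, prefix, and suffix check are placeholders standing in for the connector's configuration and `check_valid_file_type`.

```python
# Minimal, standalone sketch of the blob-listing pattern (illustrative values only).
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

SUPPORTED_SUFFIXES = (".json", ".csv", ".tsv", ".parquet")  # stand-in for check_valid_file_type

service_client = BlobServiceClient(
    account_url="https://<account-name>.blob.core.windows.net",
    credential=DefaultAzureCredential(),
)
container_client = service_client.get_container_client("<container-name>")

prefix = ""  # empty prefix -> list everything, mirroring the `prefix or None` guard above
for blob in container_client.list_blobs(name_starts_with=prefix or None):
    if blob.name.lower().endswith(SUPPORTED_SUFFIXES):
        print(blob.name)
```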
diff --git a/ingestion/src/metadata/utils/azure_utils.py b/ingestion/src/metadata/utils/azure_utils.py
index ff9874a50e3..73a03395094 100644
--- a/ingestion/src/metadata/utils/azure_utils.py
+++ b/ingestion/src/metadata/utils/azure_utils.py
@@ -59,7 +59,7 @@ def read_csv_from_azure(
return dataframe
except Exception as exc:
logger.debug(traceback.format_exc())
- logger.warning(f"Error reading CSV from s3 - {exc}")
+ logger.warning(f"Error reading CSV from ADLS - {exc}")
return None
@@ -68,7 +68,9 @@ def read_json_from_azure(client: Any, key: str, container_name: str, sample_size
Read the json file from the azure container and return a dataframe
"""
json_text = get_file_text(client=client, key=key, container_name=container_name)
- return read_from_json(key=key, json_text=json_text, sample_size=sample_size)
+ return read_from_json(
+ key=key, json_text=json_text, sample_size=sample_size, decode=True
+ )
def read_parquet_from_azure(
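On the `decode=True` change above: the flag suggests the blob content reaches `read_from_json` as bytes and has to be decoded before parsing. A rough standalone equivalent of that flow is sketched below; it is not the OpenMetadata helper itself, and the JSON Lines fallback plus the sample cut-off are assumptions.

```python
# Illustrative only: download a JSON blob, decode the bytes, build a DataFrame.
import json

import pandas as pd
from azure.storage.blob import BlobServiceClient


def read_json_blob(
    client: BlobServiceClient, container: str, key: str, sample_size: int = 100
) -> pd.DataFrame:
    raw = client.get_container_client(container).download_blob(key).readall()
    text = raw.decode("utf-8")  # the decode step the new flag points at
    try:
        records = json.loads(text)
        if isinstance(records, dict):
            records = [records]
    except json.JSONDecodeError:
        # Fall back to JSON Lines: one document per line
        records = [json.loads(line) for line in text.splitlines() if line.strip()]
    return pd.DataFrame.from_records(records[:sample_size])
```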
diff --git a/ingestion/src/metadata/utils/gcs_utils.py b/ingestion/src/metadata/utils/gcs_utils.py
index d9d636e40ae..efb36241d7d 100644
--- a/ingestion/src/metadata/utils/gcs_utils.py
+++ b/ingestion/src/metadata/utils/gcs_utils.py
@@ -80,7 +80,7 @@ def read_json_from_gcs(client: Any, key: str, bucket_name: str) -> DataFrame:
Read the json file from the gcs bucket and return a dataframe
"""
json_text = get_file_text(client=client, key=key, bucket_name=bucket_name)
- return read_from_json(key=key, json_text=json_text)
+ return read_from_json(key=key, json_text=json_text, decode=True)
def read_parquet_from_gcs(key: str, bucket_name: str) -> DataFrame:
diff --git a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/airflow.md b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/airflow.md
index 379567b7d08..c79921476af 100644
--- a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/airflow.md
+++ b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/airflow.md
@@ -46,7 +46,7 @@ custom Airflow plugins to handle the workflow deployment.
**Note:** Datalake connector supports extracting metadata from file types `JSON`, `CSV`, `TSV` & `Parquet`.
-**S3 Permissions**
+### S3 Permissions
To execute metadata extraction AWS account should have enough access to fetch required data. The Bucket Policy in AWS requires at least these permissions:
@@ -69,6 +69,13 @@ To execute metadata extraction AWS account should have enough access to fetch re
}
```
+### ADLS Permissions
+
+To extract metadata from Azure ADLS (Storage Account - StorageV2), you will need an **App Registration** with the following
+roles assigned on the Storage Account:
+- Storage Blob Data Contributor
+- Storage Queue Data Contributor
+
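As a quick way to confirm that the App Registration and the roles listed above actually grant access, a small check like the one below (placeholder tenant, client, and account values) should be able to list the Storage Account's containers before the ingestion is configured:

```python
# Smoke test for the App Registration credentials (illustrative values only).
from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient

credential = ClientSecretCredential(
    tenant_id="<tenant-id>",
    client_id="<client-id>",
    client_secret="<client-secret>",
)
client = BlobServiceClient(
    account_url="https://<account-name>.blob.core.windows.net",
    credential=credential,
)

for container in client.list_containers():
    print(container.name)
```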
### Python Requirements
If running OpenMetadata version greater than 0.13, you will need to install the Datalake ingestion for GCS or S3:
diff --git a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/cli.md b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/cli.md
index 279d8f66c4f..94a99378457 100644
--- a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/cli.md
+++ b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/cli.md
@@ -47,7 +47,7 @@ custom Airflow plugins to handle the workflow deployment.
**Note:** Datalake connector supports extracting metadata from file types `JSON`, `CSV`, `TSV` & `Parquet`.
-**S3 Permissions**
+### S3 Permissions
To execute metadata extraction AWS account should have enough access to fetch required data. The Bucket Policy in AWS requires at least these permissions:
@@ -70,6 +70,13 @@ To execute metadata extraction AWS account should have enough access to fetch re
}
```
+### ADLS Permissions
+
+To extract metadata from Azure ADLS (Storage Account - StorageV2), you will need an **App Registration** with the following
+roles assigned on the Storage Account:
+- Storage Blob Data Contributor
+- Storage Queue Data Contributor
+
### Python Requirements
If running OpenMetadata version greater than 0.13, you will need to install the Datalake ingestion for GCS or S3:
diff --git a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/index.md b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/index.md
index 2391e5bdc49..ad897720781 100644
--- a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/index.md
+++ b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/index.md
@@ -66,7 +66,7 @@ custom Airflow plugins to handle the workflow deployment.
**Note:** Datalake connector supports extracting metadata from file types `JSON`, `CSV`, `TSV` & `Parquet`.
-**S3 Permissions**
+### S3 Permissions
To execute metadata extraction AWS account should have enough access to fetch required data. The Bucket Policy in AWS requires at least these permissions:
@@ -88,6 +88,14 @@ To execute metadata extraction AWS account should have enough access to fetch re
]
}
```
+
+### ADLS Permissions
+
+To extract metadata from Azure ADLS (Storage Account - StorageV2), you will need an **App Registration** with the following
+roles assigned on the Storage Account:
+- Storage Blob Data Contributor
+- Storage Queue Data Contributor
+
## Metadata Ingestion
{% stepsContainer %}