From a78a3b47340c7fe3faab0cf3b5cc26a9057c5db6 Mon Sep 17 00:00:00 2001
From: Pere Miquel Brull
Date: Wed, 19 Apr 2023 07:28:41 +0200
Subject: [PATCH] Azure datalake metadata ingestion fixes (#11125)

* Add ADLS permissions

* Fix Azure DL ingestion

* Format

* enable decode for json

* fix gcs decode error

---------

Co-authored-by: ulixius9
---
 .../source/database/datalake/metadata.py      | 67 +++++++++----------
 ingestion/src/metadata/utils/azure_utils.py   |  6 +-
 ingestion/src/metadata/utils/gcs_utils.py     |  2 +-
 .../connectors/database/datalake/airflow.md   |  9 ++-
 .../connectors/database/datalake/cli.md       |  9 ++-
 .../connectors/database/datalake/index.md     | 10 ++-
 6 files changed, 61 insertions(+), 42 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py
index 28a853dfe9c..872c422e801 100644
--- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py
@@ -371,41 +371,38 @@ class DatalakeSource(DatabaseServiceSource):
                     yield table_name, TableType.Regular
 
         if isinstance(self.service_connection.configSource, AzureConfig):
-            files_names = self.get_tables(container_name=bucket_name)
-            for file in files_names.list_blobs(name_starts_with=prefix):
-                file_name = file.name
-                if "/" in file.name:
-                    table_name = self.standardize_table_name(bucket_name, file_name)
-                    table_fqn = fqn.build(
-                        self.metadata,
-                        entity_type=Table,
-                        service_name=self.context.database_service.name.__root__,
-                        database_name=self.context.database.name.__root__,
-                        schema_name=self.context.database_schema.name.__root__,
-                        table_name=table_name,
-                        skip_es_search=True,
-                    )
-                    if filter_by_table(
-                        self.config.sourceConfig.config.tableFilterPattern,
-                        table_fqn
-                        if self.config.sourceConfig.config.useFqnForFiltering
-                        else table_name,
-                    ):
-                        self.status.filter(
-                            table_fqn,
-                            "Object Filtered Out",
-                        )
-                        continue
-                    if not self.check_valid_file_type(file_name):
-                        logger.debug(
-                            f"Object filtered due to unsupported file type: {file_name}"
-                        )
-                        continue
-                    yield file_name, TableType.Regular
+            container_client = self.client.get_container_client(bucket_name)
 
-    def get_tables(self, container_name) -> Iterable[any]:
-        tables = self.client.get_container_client(container_name)
-        return tables
+            for file in container_client.list_blobs(
+                name_starts_with=prefix or None
+            ):
+                table_name = self.standardize_table_name(bucket_name, file.name)
+                table_fqn = fqn.build(
+                    self.metadata,
+                    entity_type=Table,
+                    service_name=self.context.database_service.name.__root__,
+                    database_name=self.context.database.name.__root__,
+                    schema_name=self.context.database_schema.name.__root__,
+                    table_name=table_name,
+                    skip_es_search=True,
+                )
+                if filter_by_table(
+                    self.config.sourceConfig.config.tableFilterPattern,
+                    table_fqn
+                    if self.config.sourceConfig.config.useFqnForFiltering
+                    else table_name,
+                ):
+                    self.status.filter(
+                        table_fqn,
+                        "Object Filtered Out",
+                    )
+                    continue
+                if not self.check_valid_file_type(file.name):
+                    logger.debug(
+                        f"Object filtered due to unsupported file type: {file.name}"
+                    )
+                    continue
+                yield file.name, TableType.Regular
 
     def yield_table(
         self, table_name_and_type: Tuple[str, str]
@@ -439,7 +436,6 @@ class DatalakeSource(DatabaseServiceSource):
                     "tenant_id": connection_args.tenantId,
                     "client_id": connection_args.clientId,
                     "client_secret": connection_args.clientSecret.get_secret_value(),
-                    "account_name": connection_args.accountName,
                 }
                 data_frame = self.get_azure_files(
                     client=self.client,
@@ -694,7 +690,6 @@ class DatalakeSource(DatabaseServiceSource):
         if hasattr(data_frame, "columns"):
             df_columns = list(data_frame.columns)
             for column in df_columns:
-
                 if COMPLEX_COLUMN_SEPARATOR in column:
                     DatalakeSource._parse_complex_column(
                         data_frame,
diff --git a/ingestion/src/metadata/utils/azure_utils.py b/ingestion/src/metadata/utils/azure_utils.py
index ff9874a50e3..73a03395094 100644
--- a/ingestion/src/metadata/utils/azure_utils.py
+++ b/ingestion/src/metadata/utils/azure_utils.py
@@ -59,7 +59,7 @@ def read_csv_from_azure(
         return dataframe
     except Exception as exc:
         logger.debug(traceback.format_exc())
-        logger.warning(f"Error reading CSV from s3 - {exc}")
+        logger.warning(f"Error reading CSV from ADLS - {exc}")
         return None
 
 
@@ -68,7 +68,9 @@ def read_json_from_azure(client: Any, key: str, container_name: str, sample_size
     Read the json file from the azure container and return a dataframe
     """
     json_text = get_file_text(client=client, key=key, container_name=container_name)
-    return read_from_json(key=key, json_text=json_text, sample_size=sample_size)
+    return read_from_json(
+        key=key, json_text=json_text, sample_size=sample_size, decode=True
+    )
 
 
 def read_parquet_from_azure(
diff --git a/ingestion/src/metadata/utils/gcs_utils.py b/ingestion/src/metadata/utils/gcs_utils.py
index d9d636e40ae..efb36241d7d 100644
--- a/ingestion/src/metadata/utils/gcs_utils.py
+++ b/ingestion/src/metadata/utils/gcs_utils.py
@@ -80,7 +80,7 @@ def read_json_from_gcs(client: Any, key: str, bucket_name: str) -> DataFrame:
     Read the json file from the gcs bucket and return a dataframe
     """
     json_text = get_file_text(client=client, key=key, bucket_name=bucket_name)
-    return read_from_json(key=key, json_text=json_text)
+    return read_from_json(key=key, json_text=json_text, decode=True)
 
 
 def read_parquet_from_gcs(key: str, bucket_name: str) -> DataFrame:
diff --git a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/airflow.md b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/airflow.md
index 379567b7d08..c79921476af 100644
--- a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/airflow.md
+++ b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/airflow.md
@@ -46,7 +46,7 @@ custom Airflow plugins to handle the workflow deployment.
 
 **Note:** Datalake connector supports extracting metadata from file types `JSON`, `CSV`, `TSV` & `Parquet`.
 
-**S3 Permissions**
+### S3 Permissions
 
 To execute metadata extraction AWS account should have enough access to fetch required data.
 The Bucket Policy in AWS requires at least these permissions:
@@ -69,6 +69,13 @@ To execute metadata extraction AWS account should have enough access to fetch re
 }
 ```
 
+### ADLS Permissions
+
+To extract metadata from Azure ADLS (Storage Account - StorageV2), you will need an **App Registration** with the following
+permissions on the Storage Account:
+- Storage Blob Data Contributor
+- Storage Queue Data Contributor
+
 ### Python Requirements
 
 If running OpenMetadata version greater than 0.13, you will need to install the Datalake ingestion for GCS or S3:
diff --git a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/cli.md b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/cli.md
index 279d8f66c4f..94a99378457 100644
--- a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/cli.md
+++ b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/cli.md
@@ -47,7 +47,7 @@ custom Airflow plugins to handle the workflow deployment.
 
 **Note:** Datalake connector supports extracting metadata from file types `JSON`, `CSV`, `TSV` & `Parquet`.
 
-**S3 Permissions**
+### S3 Permissions
 
 To execute metadata extraction AWS account should have enough access to fetch required data.
 The Bucket Policy in AWS requires at least these permissions:
@@ -70,6 +70,13 @@ To execute metadata extraction AWS account should have enough access to fetch re
 }
 ```
 
+### ADLS Permissions
+
+To extract metadata from Azure ADLS (Storage Account - StorageV2), you will need an **App Registration** with the following
+permissions on the Storage Account:
+- Storage Blob Data Contributor
+- Storage Queue Data Contributor
+
 ### Python Requirements
 
 If running OpenMetadata version greater than 0.13, you will need to install the Datalake ingestion for GCS or S3:
diff --git a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/index.md b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/index.md
index 2391e5bdc49..ad897720781 100644
--- a/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/index.md
+++ b/openmetadata-docs-v1/content/v1.0.0/connectors/database/datalake/index.md
@@ -66,7 +66,7 @@ custom Airflow plugins to handle the workflow deployment.
 
 **Note:** Datalake connector supports extracting metadata from file types `JSON`, `CSV`, `TSV` & `Parquet`.
 
-**S3 Permissions**
+### S3 Permissions
 
 To execute metadata extraction AWS account should have enough access to fetch required data.
 The Bucket Policy in AWS requires at least these permissions:
@@ -88,6 +88,14 @@ To execute metadata extraction AWS account should have enough access to fetch re
     ]
 }
 ```
+
+### ADLS Permissions
+
+To extract metadata from Azure ADLS (Storage Account - StorageV2), you will need an **App Registration** with the following
+permissions on the Storage Account:
+- Storage Blob Data Contributor
+- Storage Queue Data Contributor
+
 ## Metadata Ingestion
 
 {% stepsContainer %}
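
Note on the ADLS listing change in `metadata.py`: the patch drops the `get_tables` helper and calls `get_container_client(...).list_blobs(name_starts_with=...)` directly. The snippet below is a minimal, standalone sketch of that listing pattern using the `azure-identity` and `azure-storage-blob` packages; the account URL, container name, and prefix are placeholder values, and in the connector the credentials come from the service connection's security config rather than hard-coded strings.

```python
from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient

# Placeholder values: in the connector these come from the connection configuration
ACCOUNT_URL = "https://<account-name>.blob.core.windows.net"
CONTAINER = "my-container"
PREFIX = "bronze/"

# App Registration credentials (tenant / client / secret), which is why the docs in this
# patch require the Storage Blob Data Contributor role on the Storage Account
credential = ClientSecretCredential(
    tenant_id="<tenant-id>",
    client_id="<client-id>",
    client_secret="<client-secret>",
)

blob_service = BlobServiceClient(account_url=ACCOUNT_URL, credential=credential)
container_client = blob_service.get_container_client(CONTAINER)

# list_blobs yields BlobProperties objects; passing None lists the whole container,
# mirroring the `prefix or None` guard in the patched code
for blob in container_client.list_blobs(name_starts_with=PREFIX or None):
    print(blob.name)
```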
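
Note on the `decode=True` changes in `azure_utils.py` and `gcs_utils.py`: the Azure and GCS clients return the object payload as raw bytes rather than `str`, so the JSON reader needs to decode before parsing. The internals of `read_from_json` are not part of this patch; the sketch below only illustrates the general idea, under the assumption that the flag triggers a UTF-8 decode and that JSON Lines payloads fall back to line-by-line parsing.

```python
import json
from typing import Any, List, Union


def parse_json_payload(json_text: Union[bytes, str], decode: bool = False) -> List[Any]:
    """Illustrative helper: optionally decode a raw blob payload, then parse JSON or JSON Lines."""
    if decode and isinstance(json_text, bytes):
        json_text = json_text.decode("utf-8")
    try:
        data = json.loads(json_text)
        return data if isinstance(data, list) else [data]
    except json.JSONDecodeError:
        # Fall back to JSON Lines: one JSON document per line
        return [json.loads(line) for line in json_text.splitlines() if line.strip()]
```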