Mirror of https://github.com/open-metadata/OpenMetadata.git (synced 2025-08-27 18:36:08 +00:00)
Azure datalake metadata ingestion fixes (#11125)
* Add ADLS permissions
* Fix Azure DL ingestion
* Format
* enable decode for json
* fix gcs decode error

Co-authored-by: ulixius9 <mayursingal9@gmail.com>
Parent: df7e77aba9 · Commit: a78a3b4734
@@ -371,41 +371,38 @@ class DatalakeSource(DatabaseServiceSource):
                     yield table_name, TableType.Regular

         if isinstance(self.service_connection.configSource, AzureConfig):
-            files_names = self.get_tables(container_name=bucket_name)
-            for file in files_names.list_blobs(name_starts_with=prefix):
-                file_name = file.name
-                if "/" in file.name:
-                    table_name = self.standardize_table_name(bucket_name, file_name)
-                    table_fqn = fqn.build(
-                        self.metadata,
-                        entity_type=Table,
-                        service_name=self.context.database_service.name.__root__,
-                        database_name=self.context.database.name.__root__,
-                        schema_name=self.context.database_schema.name.__root__,
-                        table_name=table_name,
-                        skip_es_search=True,
-                    )
-                    if filter_by_table(
-                        self.config.sourceConfig.config.tableFilterPattern,
-                        table_fqn
-                        if self.config.sourceConfig.config.useFqnForFiltering
-                        else table_name,
-                    ):
-                        self.status.filter(
-                            table_fqn,
-                            "Object Filtered Out",
-                        )
-                        continue
-                    if not self.check_valid_file_type(file_name):
-                        logger.debug(
-                            f"Object filtered due to unsupported file type: {file_name}"
-                        )
-                        continue
-                    yield file_name, TableType.Regular
-
-    def get_tables(self, container_name) -> Iterable[any]:
-        tables = self.client.get_container_client(container_name)
-        return tables
+            container_client = self.client.get_container_client(bucket_name)
+            for file in container_client.list_blobs(
+                name_starts_with=prefix or None
+            ):
+                table_name = self.standardize_table_name(bucket_name, file.name)
+                table_fqn = fqn.build(
+                    self.metadata,
+                    entity_type=Table,
+                    service_name=self.context.database_service.name.__root__,
+                    database_name=self.context.database.name.__root__,
+                    schema_name=self.context.database_schema.name.__root__,
+                    table_name=table_name,
+                    skip_es_search=True,
+                )
+                if filter_by_table(
+                    self.config.sourceConfig.config.tableFilterPattern,
+                    table_fqn
+                    if self.config.sourceConfig.config.useFqnForFiltering
+                    else table_name,
+                ):
+                    self.status.filter(
+                        table_fqn,
+                        "Object Filtered Out",
+                    )
+                    continue
+                if not self.check_valid_file_type(file.name):
+                    logger.debug(
+                        f"Object filtered due to unsupported file type: {file.name}"
+                    )
+                    continue
+                yield file.name, TableType.Regular

     def yield_table(
         self, table_name_and_type: Tuple[str, str]
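For context on the new listing path: it follows the stock `azure-storage-blob` pattern of taking a container client from the service client and enumerating blobs with an optional name prefix, and `prefix or None` keeps an empty prefix from being passed literally. A minimal standalone sketch, with placeholder connection, container, and prefix values (none of these come from the patch):

```python
from azure.storage.blob import BlobServiceClient

# Placeholder connection details -- purely illustrative.
service_client = BlobServiceClient.from_connection_string("<connection-string>")
container_client = service_client.get_container_client("my-container")

prefix = ""  # e.g. an empty prefix coming from the source config

# list_blobs() accepts an optional name prefix; passing None lists every blob,
# which is why the patched code uses `prefix or None`.
for blob in container_client.list_blobs(name_starts_with=prefix or None):
    print(blob.name)
```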
@@ -439,7 +436,6 @@ class DatalakeSource(DatabaseServiceSource):
                 "tenant_id": connection_args.tenantId,
                 "client_id": connection_args.clientId,
                 "client_secret": connection_args.clientSecret.get_secret_value(),
-                "account_name": connection_args.accountName,
             }
             data_frame = self.get_azure_files(
                 client=self.client,
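The dictionary above resembles fsspec/adlfs-style `storage_options`; if that is how `get_azure_files` consumes it, removing `account_name` here implies the account is supplied elsewhere (for example through the account URL). A hedged sketch of that general pattern, assuming pandas with the `adlfs` filesystem installed and placeholder credentials:

```python
import pandas as pd

# Placeholder credentials -- adlfs accepts these keys as storage_options.
storage_options = {
    "tenant_id": "<tenant-id>",
    "client_id": "<client-id>",
    "client_secret": "<client-secret>",
    "account_name": "<storage-account>",
}

# Read a parquet object directly from ADLS via the abfs:// protocol.
df = pd.read_parquet(
    "abfs://my-container/path/to/table.parquet",
    storage_options=storage_options,
)
print(df.head())
```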
@@ -694,7 +690,6 @@ class DatalakeSource(DatabaseServiceSource):
         if hasattr(data_frame, "columns"):
             df_columns = list(data_frame.columns)
             for column in df_columns:
-
                 if COMPLEX_COLUMN_SEPARATOR in column:
                     DatalakeSource._parse_complex_column(
                         data_frame,
@@ -59,7 +59,7 @@ def read_csv_from_azure(
         return dataframe
     except Exception as exc:
         logger.debug(traceback.format_exc())
-        logger.warning(f"Error reading CSV from s3 - {exc}")
+        logger.warning(f"Error reading CSV from ADLS - {exc}")
         return None

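For reference, reading a CSV blob from ADLS into a DataFrame typically boils down to downloading the bytes and handing them to pandas. A minimal sketch of that pattern with `azure-storage-blob`; it is an illustration only, not the connector's actual `read_csv_from_azure`/`get_file_text` implementation, and the container and key names are placeholders:

```python
import io

import pandas as pd
from azure.storage.blob import BlobServiceClient

service_client = BlobServiceClient.from_connection_string("<connection-string>")
container_client = service_client.get_container_client("my-container")

# Download the blob contents as bytes and let pandas parse them.
blob_bytes = container_client.download_blob("data/users.csv").readall()
dataframe = pd.read_csv(io.BytesIO(blob_bytes))
print(dataframe.head())
```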
@@ -68,7 +68,9 @@ def read_json_from_azure(client: Any, key: str, container_name: str, sample_size
     Read the json file from the azure container and return a dataframe
     """
     json_text = get_file_text(client=client, key=key, container_name=container_name)
-    return read_from_json(key=key, json_text=json_text, sample_size=sample_size)
+    return read_from_json(
+        key=key, json_text=json_text, sample_size=sample_size, decode=True
+    )


 def read_parquet_from_azure(
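One plausible reason for the new `decode=True` flag is that the blob helpers can return raw bytes, while line-by-line JSON parsing wants text. A hypothetical helper in the spirit of `read_from_json` (the real signature lives in the datalake utils and may differ; this sketch only illustrates the decode step):

```python
import json
from typing import Optional, Union

import pandas as pd


def read_json_text(
    json_text: Union[bytes, str], decode: bool = False, sample_size: int = 100
) -> Optional[pd.DataFrame]:
    """Hypothetical sketch: decode bytes when asked, then normalize JSON lines."""
    if decode and isinstance(json_text, bytes):
        json_text = json_text.decode("utf-8")
    # Assumes newline-delimited JSON records.
    records = [json.loads(line) for line in json_text.strip().splitlines() if line]
    return pd.json_normalize(records[:sample_size])
```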
@@ -80,7 +80,7 @@ def read_json_from_gcs(client: Any, key: str, bucket_name: str) -> DataFrame:
     Read the json file from the gcs bucket and return a dataframe
     """
     json_text = get_file_text(client=client, key=key, bucket_name=bucket_name)
-    return read_from_json(key=key, json_text=json_text)
+    return read_from_json(key=key, json_text=json_text, decode=True)


 def read_parquet_from_gcs(key: str, bucket_name: str) -> DataFrame:
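The GCS path gets the same treatment because the storage client hands blob contents back as bytes. A short download-side sketch, assuming `google-cloud-storage` with placeholder bucket and key names:

```python
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-bucket")

# download_as_bytes() returns raw bytes, hence decode=True on the JSON path.
payload: bytes = bucket.blob("data/events.json").download_as_bytes()
text = payload.decode("utf-8")
print(text[:200])
```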
@@ -46,7 +46,7 @@ custom Airflow plugins to handle the workflow deployment.

 **Note:** Datalake connector supports extracting metadata from file types `JSON`, `CSV`, `TSV` & `Parquet`.


-**S3 Permissions**
+### S3 Permissions

 To execute metadata extraction AWS account should have enough access to fetch required data. The <strong>Bucket Policy</strong> in AWS requires at least these permissions:

@@ -69,6 +69,13 @@ To execute metadata extraction AWS account should have enough access to fetch re
 }
 ```

+### ADLS Permissions
+
+To extract metadata from Azure ADLS (Storage Account - StorageV2), you will need an **App Registration** with the following
+permissions on the Storage Account:
+- Storage Blob Data Contributor
+- Storage Queue Data Contributor
+
 ### Python Requirements

 If running OpenMetadata version greater than 0.13, you will need to install the Datalake ingestion for GCS or S3:
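To sanity-check that the App Registration and role assignments above work before running ingestion, one option is a short probe with `azure-identity` and `azure-storage-blob`; the tenant, client, secret, and account values are placeholders:

```python
from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient

credential = ClientSecretCredential(
    tenant_id="<tenant-id>",
    client_id="<app-registration-client-id>",
    client_secret="<client-secret>",
)

service_client = BlobServiceClient(
    account_url="https://<storage-account>.blob.core.windows.net",
    credential=credential,
)

# With the role assignments in place, listing containers should succeed.
for container in service_client.list_containers():
    print(container.name)
```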
@@ -47,7 +47,7 @@ custom Airflow plugins to handle the workflow deployment.

 **Note:** Datalake connector supports extracting metadata from file types `JSON`, `CSV`, `TSV` & `Parquet`.


-**S3 Permissions**
+### S3 Permissions

 To execute metadata extraction AWS account should have enough access to fetch required data. The <strong>Bucket Policy</strong> in AWS requires at least these permissions:

@@ -70,6 +70,13 @@ To execute metadata extraction AWS account should have enough access to fetch re
 }
 ```

+### ADLS Permissions
+
+To extract metadata from Azure ADLS (Storage Account - StorageV2), you will need an **App Registration** with the following
+permissions on the Storage Account:
+- Storage Blob Data Contributor
+- Storage Queue Data Contributor
+
 ### Python Requirements

 If running OpenMetadata version greater than 0.13, you will need to install the Datalake ingestion for GCS or S3:
@@ -66,7 +66,7 @@ custom Airflow plugins to handle the workflow deployment.

 **Note:** Datalake connector supports extracting metadata from file types `JSON`, `CSV`, `TSV` & `Parquet`.


-**S3 Permissions**
+### S3 Permissions

 To execute metadata extraction AWS account should have enough access to fetch required data. The <strong>Bucket Policy</strong> in AWS requires at least these permissions:

@@ -88,6 +88,14 @@ To execute metadata extraction AWS account should have enough access to fetch re
     ]
 }
 ```
+
+### ADLS Permissions
+
+To extract metadata from Azure ADLS (Storage Account - StorageV2), you will need an **App Registration** with the following
+permissions on the Storage Account:
+- Storage Blob Data Contributor
+- Storage Queue Data Contributor
+
 ## Metadata Ingestion

 {% stepsContainer %}