Azure datalake minor changes (#9407)

NiharDoshi99 2022-12-21 10:28:41 +05:30 committed by GitHub
parent 4fdf14f2da
commit 9a3d599f30
4 changed files with 28 additions and 15 deletions

View File

@@ -29,7 +29,7 @@ from metadata.generated.schema.entity.data.table import (
     TableType,
 )
 from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
-    AzureDatalakeConfig,
+    AzureConfig,
     DatalakeConnection,
     GCSConfig,
     S3Config,
@@ -193,11 +193,19 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
         else:
             yield from self.fetch_s3_bucket_names()
-        if isinstance(self.service_connection.configSource, AzureDatalakeConfig):
+        if isinstance(self.service_connection.configSource, AzureConfig):
             yield from self.get_container_names()
 
     def get_container_names(self) -> Iterable[str]:
-        schema_names = self.client.list_containers(name_starts_with="")
+        """
+        To get schema names
+        """
+        prefix = (
+            self.service_connection.bucketName
+            if self.service_connection.bucketName
+            else ""
+        )
+        schema_names = self.client.list_containers(name_starts_with=prefix)
         for schema in schema_names:
             schema_fqn = fqn.build(
                 self.metadata,
@@ -319,9 +327,9 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
                         continue
                     yield table_name, TableType.Regular
 
-            if isinstance(self.service_connection.configSource, AzureDatalakeConfig):
+            if isinstance(self.service_connection.configSource, AzureConfig):
                 files_names = self.get_tables(container_name=bucket_name)
-                for file in files_names.list_blobs():
+                for file in files_names.list_blobs(name_starts_with=prefix):
                     file_name = file.name
                     if "/" in file.name:
                         table_name = self.standardize_table_name(bucket_name, file_name)
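
For context, the prefix plumbing introduced above maps onto the azure-storage-blob listing APIs roughly as follows. This is a minimal sketch, not the connector code: it assumes `client` is an already-authenticated BlobServiceClient, and the `prefix` parameter plays the role of the optional bucketName from the service connection.

from azure.storage.blob import BlobServiceClient

def list_azure_objects(client: BlobServiceClient, prefix: str = "") -> None:
    """Sketch of prefix-filtered listing, not the connector's implementation."""
    # name_starts_with filters containers by prefix; an empty string lists
    # every container, matching the connector's old behaviour.
    for container in client.list_containers(name_starts_with=prefix):
        container_client = client.get_container_client(container.name)
        # Blobs inside a container can be filtered the same way.
        for blob in container_client.list_blobs(name_starts_with=prefix):
            print(container.name, blob.name)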
@@ -376,12 +384,14 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
             data_frame = self.get_s3_files(
                 client=self.client, key=table_name, bucket_name=schema_name
             )
-        if isinstance(self.service_connection.configSource, AzureDatalakeConfig):
+        if isinstance(self.service_connection.configSource, AzureConfig):
+            columns = None
             connection_args = self.service_connection.configSource.securityConfig
             storage_options = {
                 "tenant_id": connection_args.tenantId,
                 "client_id": connection_args.clientId,
                 "client_secret": connection_args.clientSecret.get_secret_value(),
+                "account_name": connection_args.accountName,
             }
             data_frame = self.get_azure_files(
                 client=self.client,
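
The storage_options dictionary assembled above uses the keys that fsspec/adlfs understand, which is what allows a DataFrame to be read straight out of Blob Storage. A minimal sketch of that pattern with placeholder credentials and a hypothetical abfs:// path (the connector's own get_azure_files helper may differ in detail):

import pandas as pd

# Placeholder service-principal credentials; in the connector these come from
# configSource.securityConfig (tenantId, clientId, clientSecret, accountName).
storage_options = {
    "tenant_id": "<tenant-id>",
    "client_id": "<client-id>",
    "client_secret": "<client-secret>",
    "account_name": "<storage-account>",
}

# pandas forwards storage_options to fsspec; with the adlfs package installed,
# abfs:// URLs resolve against Azure Blob Storage / ADLS Gen2. The newly added
# account_name key is what tells adlfs which storage account to talk to.
df = pd.read_csv("abfs://my-container/path/to/file.csv", storage_options=storage_options)
print(df.head())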
@@ -565,7 +575,8 @@ class DatalakeSource(DatabaseServiceSource): # pylint: disable=too-many-public-
         return False
 
     def close(self):
-        pass
+        if isinstance(self.service_connection.configSource, AzureConfig):
+            self.client.close()
 
     def get_status(self) -> SourceStatus:
         return self.status

View File

@@ -95,7 +95,7 @@ from metadata.generated.schema.entity.services.connections.database.databricksCo
     DatabricksConnection,
 )
 from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
-    AzureDatalakeConfig,
+    AzureConfig,
     DatalakeConnection,
     GCSConfig,
     S3Config,
@@ -1005,7 +1005,7 @@ def _(connection: DatalakeClient) -> None:
         else:
             connection.client.list_buckets()
 
-        if isinstance(config, AzureDatalakeConfig):
+        if isinstance(config, AzureConfig):
             connection.client.list_containers(name_starts_with="")
 
     except ClientError as err:
@@ -1050,7 +1050,7 @@ def _(config: GCSConfig):
 
 @get_datalake_client.register
-def _(config: AzureDatalakeConfig):
+def _(config: AzureConfig):
     from azure.identity import ClientSecretCredential
     from azure.storage.blob import BlobServiceClient
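
The handler registered above presumably turns the Azure security config into a Blob service client using these two imports. A minimal sketch of that construction under the app-registration (service principal) model, with field names taken from the securityConfig usage earlier; the account URL shown is the standard public-cloud endpoint:

from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient

def build_blob_service_client(
    tenant_id: str, client_id: str, client_secret: str, account_name: str
) -> BlobServiceClient:
    """Sketch only: service-principal auth against Azure Blob Storage."""
    credential = ClientSecretCredential(
        tenant_id=tenant_id,
        client_id=client_id,
        client_secret=client_secret,
    )
    # The account URL is derived from the storage account name.
    return BlobServiceClient(
        account_url=f"https://{account_name}.blob.core.windows.net/",
        credential=credential,
    )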

View File

@@ -275,6 +275,8 @@ We support two ways of authenticating to GCS:
 - `Storage Blob Data Contributor`
 - `Storage Queue Data Contributor`
 
+The current approach for authentication is based on `app registration`; reach out to us on [slack](https://slack.open-metadata.org/) if you find the need for another auth system.
+
 </Collapse>

View File

@@ -16,7 +16,7 @@
     },
     "GCSConfig": {
       "title": "DataLake GCS Config Source",
-      "description": "DataLake Catalog and Manifest files in GCS storage. We will search for catalog.json and manifest.json.",
+      "description": "DataLake GCS storage will ingest metadata of files",
       "properties": {
         "securityConfig": {
           "title": "DataLake GCS Security Config",
@@ -26,7 +26,7 @@
     },
     "S3Config": {
       "title": "DataLake S3 Config Source",
-      "description": "DataLake Catalog and Manifest files in S3 bucket. We will search for catalog.json and manifest.json.",
+      "description": "DataLake S3 bucket will ingest metadata of files in bucket",
       "properties": {
         "securityConfig": {
           "title": "DataLake S3 Security Config",
@@ -34,9 +34,9 @@
       }
     }
   },
-  "AzureDatalakeConfig": {
+  "AzureConfig": {
     "title": "Azure Config Source",
-    "description": "Azure Datalake Storage",
+    "description": "Azure Datalake Storage will ingest files in container",
     "properties": {
       "securityConfig": {
         "title": "Azure Datalake Config Source",
@@ -63,7 +63,7 @@
           "$ref": "#/definitions/GCSConfig"
         },
         {
-          "$ref": "#/definitions/AzureDatalakeConfig"
+          "$ref": "#/definitions/AzureConfig"
         }
       ]
     },
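
To make the renamed definition concrete, a hypothetical configSource payload that selects the AzureConfig variant could look like the dict below. The securityConfig field names are taken from the connector changes above (tenantId, clientId, clientSecret, accountName); every value is a placeholder, and the exact securityConfig schema lives in the referenced credentials definition rather than in this file.

# Hypothetical example of a Datalake connection configSource choosing the
# Azure variant; values are placeholders, field names follow the code above.
azure_config_source = {
    "securityConfig": {
        "clientId": "<app-registration-client-id>",
        "clientSecret": "<app-registration-client-secret>",
        "tenantId": "<azure-tenant-id>",
        "accountName": "<storage-account-name>",
    }
}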