Prepare Storage Connector for ADLS & Docs (#13376)

* Prepare Storage Connector for ADLS & Docs

* Format

* Fix test
Pere Miquel Brull 2023-10-02 12:15:09 +02:00 committed by GitHub
parent 6ca71ae323
commit d915254fac
16 changed files with 198 additions and 65 deletions

View File

@ -255,7 +255,7 @@ plugins: Dict[str, Set[str]] = {
dev = {
"black==22.3.0",
"datamodel-code-generator==0.15.0",
"datamodel-code-generator==0.22.0",
"docker",
"isort",
"pre-commit",

View File

@ -27,4 +27,6 @@ sink:
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@ -37,9 +37,6 @@ from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig
MetadataEntry,
StorageContainerConfig,
)
from metadata.generated.schema.metadataIngestion.storage.manifestMetadataConfig import (
ManifestMetadataConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
@ -94,7 +91,6 @@ class S3Source(StorageServiceSource):
return cls(config, metadata_config)
def get_containers(self) -> Iterable[S3ContainerDetails]:
global_manifest: Optional[ManifestMetadataConfig] = self.get_manifest_file()
bucket_results = self.fetch_buckets()
for bucket_response in bucket_results:
@ -108,10 +104,10 @@ class S3Source(StorageServiceSource):
parent_entity: EntityReference = EntityReference(
id=self._bucket_cache[bucket_name].id.__root__, type="container"
)
if global_manifest:
if self.global_manifest:
manifest_entries_for_current_bucket = (
self._manifest_entries_to_metadata_entries_by_bucket(
bucket=bucket_name, manifest=global_manifest
self._manifest_entries_to_metadata_entries_by_container(
container_name=bucket_name, manifest=self.global_manifest
)
)
# Check if we have entries in the manifest file belonging to this bucket
@ -119,8 +115,9 @@ class S3Source(StorageServiceSource):
# ingest all the relevant valid paths from it
yield from self._generate_structured_containers(
bucket_response=bucket_response,
entries=self._manifest_entries_to_metadata_entries_by_bucket(
bucket=bucket_name, manifest=global_manifest
entries=self._manifest_entries_to_metadata_entries_by_container(
container_name=bucket_name,
manifest=self.global_manifest,
),
parent=parent_entity,
)
@ -334,25 +331,6 @@ class S3Source(StorageServiceSource):
sourceUrl=self._get_bucket_source_url(bucket_name=bucket_response.name),
)
@staticmethod
def _manifest_entries_to_metadata_entries_by_bucket(
bucket: str, manifest: ManifestMetadataConfig
) -> List[MetadataEntry]:
"""
Convert manifest entries(which have an extra bucket property) to bucket-level metadata entries, filtered by
a given bucket
"""
return [
MetadataEntry(
dataPath=entry.dataPath,
structureFormat=entry.structureFormat,
isPartitioned=entry.isPartitioned,
partitionColumns=entry.partitionColumns,
)
for entry in manifest.entries
if entry.bucketName == bucket
]
def _get_sample_file_path(
self, bucket_name: str, metadata_entry: MetadataEntry
) -> Optional[str]:
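The net effect of this hunk is that `get_containers` now reuses the `self.global_manifest` loaded once at init instead of fetching a manifest per run. A standalone sketch of the per-bucket decision, under the assumption (spelled out in the docs further down) that global-manifest entries take precedence over bucket-local configuration:

```python
from typing import List, Optional

from metadata.generated.schema.metadataIngestion.storage.manifestMetadataConfig import (
    ManifestMetadataConfig,
)


def entries_for_bucket(
    bucket_name: str,
    global_manifest: Optional[ManifestMetadataConfig],
    bucket_local_entries: List,
) -> List:
    """Hypothetical helper, not part of the PR: prefer global-manifest entries
    scoped to this bucket via containerName; otherwise keep the bucket's own entries."""
    if global_manifest:
        scoped = [
            entry
            for entry in global_manifest.entries
            if entry.containerName == bucket_name
        ]
        if scoped:
            return scoped
    return bucket_local_entries
```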

View File

@ -55,7 +55,10 @@ from metadata.readers.dataframe.reader_factory import SupportedTypes
from metadata.readers.models import ConfigSource
from metadata.utils.datalake.datalake_utils import fetch_dataframe, get_columns
from metadata.utils.logger import ingestion_logger
from metadata.utils.storage_metadata_config import get_manifest
from metadata.utils.storage_metadata_config import (
StorageMetadataConfigException,
get_manifest,
)
logger = ingestion_logger()
@ -108,6 +111,8 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC):
topology = StorageServiceTopology()
context = create_source_context(topology)
global_manifest: Optional[ManifestMetadataConfig]
def __init__(
self,
config: WorkflowSource,
@ -127,12 +132,20 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC):
self.connection_obj = self.connection
self.test_connection()
# Try to get the global manifest
self.global_manifest: Optional[
ManifestMetadataConfig
] = self.get_manifest_file()
def get_manifest_file(self) -> Optional[ManifestMetadataConfig]:
if self.source_config.storageMetadataConfigSource and not isinstance(
self.source_config.storageMetadataConfigSource,
NoMetadataConfigurationSource,
):
return get_manifest(self.source_config.storageMetadataConfigSource)
try:
return get_manifest(self.source_config.storageMetadataConfigSource)
except StorageMetadataConfigException as exc:
logger.warning(f"Could no get global manifest due to [{exc}]")
return None
@abstractmethod
@ -167,6 +180,25 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC):
)
)
@staticmethod
def _manifest_entries_to_metadata_entries_by_container(
container_name: str, manifest: ManifestMetadataConfig
) -> List[MetadataEntry]:
"""
Convert manifest entries (which carry an extra container property) to container-level metadata entries, filtered by
a given container
"""
return [
MetadataEntry(
dataPath=entry.dataPath,
structureFormat=entry.structureFormat,
isPartitioned=entry.isPartitioned,
partitionColumns=entry.partitionColumns,
)
for entry in manifest.entries
if entry.containerName == container_name
]
@staticmethod
def _get_sample_file_prefix(metadata_entry: MetadataEntry) -> Optional[str]:
"""

View File

@ -59,7 +59,7 @@ class ADLSReader(Reader):
except Exception as err:
if verbose:
logger.debug(traceback.format_exc())
raise ReadException(f"Error fetching file [{path}] from repo: {err}")
raise ReadException(f"Error fetching file [{path}] from ADLS: {err}")
def _get_tree(self) -> List[str]:
"""

View File

@ -39,7 +39,7 @@ class GCSReader(Reader):
except Exception as err:
if verbose:
logger.debug(traceback.format_exc())
raise ReadException(f"Error fetching file [{path}] from repo: {err}")
raise ReadException(f"Error fetching file [{path}] from GCS: {err}")
def _get_tree(self) -> List[str]:
"""

View File

@ -37,7 +37,7 @@ class S3Reader(Reader):
except Exception as err:
if verbose:
logger.debug(traceback.format_exc())
raise ReadException(f"Error fetching file [{path}] from repo: {err}")
raise ReadException(f"Error fetching file [{path}] from S3: {err}")
def _get_tree(self) -> List[str]:
"""

View File

@ -17,9 +17,18 @@ from functools import singledispatch
import requests
from metadata.generated.schema.entity.services.connections.database.datalake.azureConfig import (
AzureConfig,
)
from metadata.generated.schema.entity.services.connections.database.datalake.s3Config import (
S3Config,
)
from metadata.generated.schema.metadataIngestion.storage.manifestMetadataConfig import (
ManifestMetadataConfig,
)
from metadata.generated.schema.metadataIngestion.storage.storageMetadataADLSConfig import (
StorageMetadataAdlsConfig,
)
from metadata.generated.schema.metadataIngestion.storage.storageMetadataHttpConfig import (
StorageMetadataHttpConfig,
)
@ -29,11 +38,12 @@ from metadata.generated.schema.metadataIngestion.storage.storageMetadataLocalCon
from metadata.generated.schema.metadataIngestion.storage.storageMetadataS3Config import (
StorageMetadataS3Config,
)
from metadata.readers.file.config_source_factory import get_reader
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
STORAGE_METADATA_MANIFEST_FILE_NAME = "storage_metadata_manifest.json"
STORAGE_METADATA_MANIFEST_FILE_NAME = "openmetadata_storage_manifest.json"
class StorageMetadataConfigException(Exception):
@ -81,8 +91,6 @@ def _(config: StorageMetadataHttpConfig) -> ManifestMetadataConfig:
"Manifest file not found in file server"
)
return ManifestMetadataConfig.parse_obj(http_manifest.json())
except StorageMetadataConfigException as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
raise StorageMetadataConfigException(
@ -92,29 +100,75 @@ def _(config: StorageMetadataHttpConfig) -> ManifestMetadataConfig:
@get_manifest.register
def _(config: StorageMetadataS3Config) -> ManifestMetadataConfig:
manifest = None
try:
bucket_name, prefix = (
config.prefixConfig.bucketName,
config.prefixConfig.containerName,
config.prefixConfig.objectPrefix,
)
path = (
f"{prefix}/{STORAGE_METADATA_MANIFEST_FILE_NAME}"
if prefix
else STORAGE_METADATA_MANIFEST_FILE_NAME
)
from metadata.clients.aws_client import ( # pylint: disable=import-outside-toplevel
AWSClient,
)
aws_client = AWSClient(config.securityConfig).get_resource("s3")
bucket = aws_client.Bucket(bucket_name)
obj_list = bucket.objects.filter(Prefix=prefix)
for bucket_object in obj_list:
if STORAGE_METADATA_MANIFEST_FILE_NAME in bucket_object.key:
logger.debug(f"{STORAGE_METADATA_MANIFEST_FILE_NAME} found")
manifest = bucket_object.get()["Body"].read().decode()
break
if not manifest:
raise StorageMetadataConfigException("Manifest file not found in s3")
aws_client = AWSClient(config.securityConfig).get_client(service_name="s3")
reader = get_reader(
config_source=S3Config(securityConfig=config.securityConfig),
client=aws_client,
)
manifest = reader.read(path=path, bucket_name=bucket_name)
return ManifestMetadataConfig.parse_obj(json.loads(manifest))
except Exception as exc:
logger.debug(traceback.format_exc())
raise StorageMetadataConfigException(
f"Error fetching manifest file from s3: {exc}"
)
@get_manifest.register
def _(config: StorageMetadataAdlsConfig) -> ManifestMetadataConfig:
"""Read the manifest from ADLS"""
try:
bucket_name, prefix = (
config.prefixConfig.containerName,
config.prefixConfig.objectPrefix,
)
path = (
f"{prefix}/{STORAGE_METADATA_MANIFEST_FILE_NAME}"
if prefix
else STORAGE_METADATA_MANIFEST_FILE_NAME
)
from azure.identity import ( # pylint: disable=import-outside-toplevel
ClientSecretCredential,
)
from azure.storage.blob import ( # pylint: disable=import-outside-toplevel
BlobServiceClient,
)
blob_client = BlobServiceClient(
account_url=f"https://{config.securityConfig.accountName}.blob.core.windows.net/",
credential=ClientSecretCredential(
config.securityConfig.tenantId,
config.securityConfig.clientId,
config.securityConfig.clientSecret.get_secret_value(),
),
)
reader = get_reader(
config_source=AzureConfig(securityConfig=config.securityConfig),
client=blob_client,
)
manifest = reader.read(path=path, bucket_name=bucket_name)
return ManifestMetadataConfig.parse_obj(json.loads(manifest))
except StorageMetadataConfigException as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
raise StorageMetadataConfigException(
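For the new ADLS branch, a minimal usage sketch; the securityConfig keys are assumptions based on the azureCredentials fields referenced above (accountName, tenantId, clientId, clientSecret), and all values are placeholders:

```python
from metadata.generated.schema.metadataIngestion.storage.storageMetadataADLSConfig import (
    StorageMetadataAdlsConfig,
)
from metadata.utils.storage_metadata_config import get_manifest

# Placeholder credentials and container; the reader looks for
# <objectPrefix>/openmetadata_storage_manifest.json inside the container.
adls_source = StorageMetadataAdlsConfig.parse_obj(
    {
        "securityConfig": {
            "clientId": "...",
            "clientSecret": "...",
            "tenantId": "...",
            "accountName": "mystorageaccount",
        },
        "prefixConfig": {
            "containerName": "my-container",
            "objectPrefix": "metadata",
        },
    }
)

manifest = get_manifest(adls_source)  # ManifestMetadataConfig on success
```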

View File

@ -65,7 +65,7 @@ MOCK_OBJECT_STORE_CONFIG = {
"storageMetadataConfigSource": {
"securityConfig": {"awsRegion": "us-east-1"},
"prefixConfig": {
"bucketName": "test_bucket",
"containerName": "test_bucket",
"objectPrefix": "manifest",
},
},
@ -176,7 +176,7 @@ class StorageUnitTest(TestCase):
"storageMetadataConfigSource": {
"securityConfig": {"awsRegion": "us-east-1"},
"prefixConfig": {
"bucketName": "test_bucket",
"containerName": "test_bucket",
"objectPrefix": "manifest",
},
},

View File

@ -27,5 +27,6 @@ caption="Configure Metadata Ingestion Page" /%}
- **Include**: Explicitly include containers by adding a list of comma-separated regular expressions to the Include field. OpenMetadata will include all containers with names matching one or more of the supplied regular expressions. All other containers will be excluded.
- **Exclude**: Explicitly exclude containers by adding a list of comma-separated regular expressions to the Exclude field. OpenMetadata will exclude all containers with names matching one or more of the supplied regular expressions. All other containers will be included.
- **Enable Debug Log (toggle)**: Set the Enable Debug Log toggle to set the default log level to debug.
- **Storage Metadata Config Source**: Here you can specify the location of your global manifest `openmetadata_storage_manifest.json` file. It can be located in S3, at a local path, or behind an HTTP endpoint.
{% /extraContent %}

View File

@ -110,3 +110,26 @@ Again, this information will be added on top of the inferred schema from the dat
]
}
```
### Global Manifest
You can also manage a **single** manifest file to centralize the ingestion process across containers. In that case,
you will need to add a `containerName` field to each entry of the structure above. For example:
```yaml
{
"entries": [
{
"dataPath": "transactions",
"structureFormat": "csv",
"isPartitioned": false,
"containerName": "collate-demo-storage"
}
]
}
```
You can also keep local manifests in each container, but whenever a global manifest is available, it will take
precedence during the ingestion.
We will look for a file named `openmetadata_storage_manifest.json`.
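For illustration (container names made up), this is how the ingestion side consumes that file: it is parsed into `ManifestMetadataConfig` and its entries are filtered by `containerName` for the container being processed:

```python
import json

from metadata.generated.schema.metadataIngestion.storage.manifestMetadataConfig import (
    ManifestMetadataConfig,
)

raw_manifest = """
{
  "entries": [
    {"dataPath": "transactions", "structureFormat": "csv",
     "isPartitioned": false, "containerName": "collate-demo-storage"},
    {"dataPath": "orders", "structureFormat": "parquet",
     "isPartitioned": true, "containerName": "another-container"}
  ]
}
"""

manifest = ManifestMetadataConfig.parse_obj(json.loads(raw_manifest))

# Only the entries scoped to the container currently being ingested are used
entries = [e for e in manifest.entries if e.containerName == "collate-demo-storage"]
print([e.dataPath for e in entries])  # ["transactions"]
```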

View File

@ -224,6 +224,12 @@ The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetada
{% /codeInfo %}
{% codeInfo srNumber=16 %}
**storageMetadataConfigSource**: Path to the `openmetadata_storage_manifest.json` global manifest file. It can be located in S3, at a local path, or behind a URL pointing to the file.
{% /codeInfo %}
#### Sink Configuration
{% codeInfo srNumber=14 %}
@ -307,6 +313,21 @@ source:
# - container3
# - container4
```
```yaml {% srNumber=16 %}
# storageMetadataConfigSource:
## For S3
# securityConfig:
# awsAccessKeyId: ...
# awsSecretAccessKey: ...
# awsRegion: ...
# prefixConfig:
# containerName: om-glue-test
# objectPrefix: <optional prefix>
## For HTTP
# manifestHttpPath: http://...
## For Local
# manifestFilePath: /path/to/openmetadata_storage_manifest.json
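## For ADLS (a sketch: credential keys assumed from the azureCredentials schema)
# securityConfig:
# clientId: ...
# clientSecret: ...
# tenantId: ...
# accountName: ...
# prefixConfig:
# containerName: my-container
# objectPrefix: <optional prefix>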
```
```yaml {% srNumber=14 %}
sink:

View File

@ -10,9 +10,9 @@
"javaType": "org.openmetadata.schema.metadataIngestion.storage.ManifestMetadataEntry",
"type": "object",
"properties": {
"bucketName": {
"title": "Bucket Name",
"description": "The bucket name containing the data path to be ingested",
"containerName": {
"title": "Container Name",
"description": "The top-level container name containing the data path to be ingested",
"type": "string"
},
"dataPath": {
@ -43,7 +43,7 @@
}
},
"required": [
"bucketName" ,"dataPath"
"containerName" ,"dataPath"
]
}
},

View File

@ -6,17 +6,17 @@
"javaType": "org.openmetadata.schema.metadataIngestion.storage.StorageMetadataBucketDetails",
"type": "object",
"properties": {
"bucketName": {
"title": "Storage Metadata Bucket Name",
"description": "Name of the bucket where the storage metadata file is stored",
"containerName": {
"title": "Storage Metadata Container Name",
"description": "Name of the top level container where the storage metadata file is stored",
"type": "string"
},
"objectPrefix": {
"title": "Storage Metadata Object Prefix",
"description": "Path of the folder where the storage metadata file is stored, '/' for root",
"description": "Path of the folder where the storage metadata file is stored. If the file is at the root, you can keep it empty.",
"type": "string"
}
},
"additionalProperties": false,
"required": ["bucketName", "objectPrefix"]
"required": ["containerName"]
}

View File

@ -0,0 +1,20 @@
{
"$id": "https://open-metadata.org/schema/metadataIngestion/storage/storageMetadataS3Config.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Storage Metadata ADLS Config",
"description": "Storage Metadata Manifest file ADLS path config.",
"javaType": "org.openmetadata.schema.metadataIngestion.storage.StorageMetadataADLSConfig",
"type": "object",
"properties": {
"securityConfig": {
"title": "S3 Security Config",
"$ref": "../../security/credentials/azureCredentials.json"
},
"prefixConfig": {
"title": "Storage Metadata Prefix Config",
"$ref": "./storageBucketDetails.json"
}
},
"additionalProperties": false,
"required": ["prefixConfig"]
}

View File

@ -43,10 +43,12 @@
},
{
"$ref": "./storage/storageMetadataS3Config.json"
},
{
"$ref": "./storage/storageMetadataADLSConfig.json"
}
]
}
},
"additionalProperties": false,
"required": ["storageMetadataConfigSource"]
"additionalProperties": false
}