fix(ingest): clarify s3/s3a requirements and platform defaults (#4263)

Kevin Hu authored 2022-03-02 11:58:27 -05:00, committed by GitHub
parent 2a5cf3dd07
commit b2b8826118
3 changed files with 33 additions and 21 deletions

View File

@@ -10,7 +10,7 @@ This source is in **Beta** and under active development. Not yet considered read
## Setup
To install this plugin, run `pip install 'acryl-datahub[data-lake]'`. Note that because the profiling is run with PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed (see [compatibility](#compatibility) for more details).
To install this plugin, run `pip install 'acryl-datahub[data-lake]'`. Note that because the profiling is run with PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed (see [compatibility](#compatibility) for more details). If profiling is enabled, make sure that permissions for **s3a://** access are set, because Spark and Hadoop use the s3a:// protocol to interface with AWS (schema inference outside of profiling only requires s3:// access).
The data lake connector extracts schemas and profiles from a variety of file formats (see below for an exhaustive list).
Individual files are ingested as tables, and profiles are computed similar to the [SQL profiler](./sql_profiles.md).
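
As a rough illustration of the s3a/s3 split described above (a minimal sketch with a hypothetical bucket and key, not the connector's actual code): Spark-based profiling addresses the object through the Hadoop `s3a://` filesystem, while schema inference streams it with `smart_open` over plain `s3://` using a boto3 client.

```python
import boto3
from smart_open import open as smart_open

# hypothetical object path, kept scheme-less as bucket/key
base_obj_path = "my-bucket/raw/events.parquet"

# profiling: Spark + Hadoop go through the s3a:// filesystem,
# so s3a permissions must be in place
spark_path = f"s3a://{base_obj_path}"
# df = spark.read.parquet(spark_path)  # needs a SparkSession configured for s3a

# schema inference: smart_open streams the object over plain s3://
s3_client = boto3.client("s3")
with smart_open(f"s3://{base_obj_path}", "rb",
                transport_params={"client": s3_client}) as f:
    header = f.read(1024)  # e.g. sniff the file header
```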
@@ -94,7 +94,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| ---------------------------------------------------- | ------------------------ | ------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `env` | | `PROD` | Environment to use in namespace when constructing URNs. |
| `platform` | ✅ | | Platform to use in namespace when constructing URNs. |
| `platform` | | Autodetected | Platform to use in namespace when constructing URNs. If left blank, local paths will correspond to `file` and S3 paths will correspond to `s3`. |
| `base_path` | ✅ | | Path of the base folder to crawl. Unless `schema_patterns` and `profile_patterns` are set, the connector will ingest all files in this folder. |
| `path_spec` | | | Format string for constructing table identifiers from the relative path. See the above [setup section](#setup) for details. |
| `use_relative_path` | | `False` | Whether to use the relative path when constructing URNs. Has no effect when a `path_spec` is provided. |
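
The new `platform` default in the table above is filled in by a pydantic validator (see the config diff further down). A minimal sketch of the expected behaviour, assuming the config class lives at `datahub.ingestion.source.data_lake.config` and that the remaining fields fall back to their defaults:

```python
from datahub.ingestion.source.data_lake.config import DataLakeSourceConfig

# S3 base path: platform is autodetected as "s3"
s3_config = DataLakeSourceConfig.parse_obj({"base_path": "s3://my-bucket/data/"})
assert s3_config.platform == "s3"

# local base path: platform is autodetected as "file"
local_config = DataLakeSourceConfig.parse_obj({"base_path": "/tmp/data/"})
assert local_config.platform == "file"
```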

View File

@@ -1,6 +1,7 @@
import logging
import os
from datetime import datetime
from enum import Enum
from math import log10
from typing import Any, Dict, Iterable, List, Optional
@@ -246,7 +247,10 @@ class DataLakeSource(Source):
return cls(config, ctx)
def read_file_spark(self, file: str) -> Optional[DataFrame]:
def read_file_spark(self, file: str, is_aws: bool) -> Optional[DataFrame]:
if is_aws:
file = f"s3a://{file}"
extension = os.path.splitext(file)[1]
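
For the profiling path, `read_file_spark` now hands Spark an `s3a://` URI and dispatches on the file extension. A rough sketch of what the underlying read might look like, assuming a SparkSession configured with the `hadoop-aws` package and s3a credentials (none of this session setup appears in the diff, and the paths are hypothetical):

```python
import os
from pyspark.sql import SparkSession

# assumed session setup; the connector builds its own SparkSession elsewhere
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0")
    .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
    .getOrCreate()
)

file = "s3a://my-bucket/raw/events.parquet"  # hypothetical s3a path
extension = os.path.splitext(file)[1]

# dispatch on extension, as the connector does
if extension == ".parquet":
    df = spark.read.parquet(file)
elif extension == ".csv":
    df = spark.read.csv(file, header=True, inferSchema=True)
else:
    df = None  # unsupported extension in this sketch
```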
@@ -297,7 +301,7 @@ class DataLakeSource(Source):
return df.toDF(*(c.replace(".", "_") for c in df.columns))
def get_table_schema(
self, file_path: str, table_name: str
self, file_path: str, table_name: str, is_aws: bool
) -> Iterable[MetadataWorkUnit]:
data_platform_urn = make_data_platform_urn(self.source_config.platform)
@@ -307,10 +311,6 @@ class DataLakeSource(Source):
dataset_name = os.path.basename(file_path)
# if no path spec is provided and the file is in S3, then use the S3 path to construct an URN
if is_s3_uri(file_path) and self.source_config.path_spec is None:
dataset_urn = make_s3_urn(file_path, self.source_config.env)
dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[],
@@ -322,13 +322,15 @@
)
dataset_snapshot.aspects.append(dataset_properties)
if file_path.startswith("s3a://"):
if is_aws:
if self.source_config.aws_config is None:
raise ValueError("AWS config is required for S3 file sources")
s3_client = self.source_config.aws_config.get_s3_client()
file = smart_open(file_path, "rb", transport_params={"client": s3_client})
file = smart_open(
f"s3://{file_path}", "rb", transport_params={"client": s3_client}
)
else:
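
`aws_config.get_s3_client()` above supplies the boto3 S3 client that `smart_open` needs. Roughly, and with a hypothetical profile and region, the equivalent plain-boto3 wiring is:

```python
import boto3

# roughly what aws_config.get_s3_client() yields: a boto3 S3 client
session = boto3.Session(profile_name="default", region_name="us-east-1")
s3_client = session.client("s3")
```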
@@ -416,7 +418,7 @@ class DataLakeSource(Source):
return ".".join(name_components)
def ingest_table(
self, full_path: str, relative_path: str
self, full_path: str, relative_path: str, is_aws: bool
) -> Iterable[MetadataWorkUnit]:
table_name = self.get_table_name(relative_path, full_path)
@@ -425,14 +427,14 @@
logger.debug(
f"Ingesting {full_path}: making table schemas {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}"
)
yield from self.get_table_schema(full_path, table_name)
yield from self.get_table_schema(full_path, table_name, is_aws)
# If profiling is not enabled, skip the rest
if not self.source_config.profiling.enabled:
return
# read in the whole table with Spark for profiling
table = self.read_file_spark(full_path)
table = self.read_file_spark(full_path, is_aws)
# if table is not readable, skip
if table is None:
@@ -513,7 +515,7 @@ class DataLakeSource(Source):
s3 = self.source_config.aws_config.get_s3_resource()
bucket = s3.Bucket(plain_base_path.split("/")[0])
unordered_files = []
base_obj_paths = []
for obj in bucket.objects.filter(
Prefix=plain_base_path.split("/", maxsplit=1)[1]
@@ -534,16 +536,16 @@
if self.source_config.ignore_dotfiles and file.startswith("."):
continue
obj_path = f"s3a://{obj.bucket_name}/{obj.key}"
base_obj_path = f"{obj.bucket_name}/{obj.key}"
unordered_files.append(obj_path)
base_obj_paths.append(base_obj_path)
for aws_file in sorted(unordered_files):
for aws_file in sorted(base_obj_paths):
relative_path = "./" + aws_file[len(f"s3a://{plain_base_path}") :]
relative_path = "./" + aws_file[len(plain_base_path) :]
# pass in the same relative_path as the full_path for S3 files
yield from self.ingest_table(aws_file, relative_path)
yield from self.ingest_table(aws_file, relative_path, is_aws=True)
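
The S3 branch now tracks scheme-less `bucket/key` paths and derives each relative path by stripping the plain base path. A standalone sketch of the same listing logic with boto3, using hypothetical bucket and prefix names:

```python
import os

import boto3

plain_base_path = "my-bucket/raw/events/"  # hypothetical base path, scheme stripped
bucket_name = plain_base_path.split("/")[0]
prefix = plain_base_path.split("/", maxsplit=1)[1]

s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)

base_obj_paths = []
for obj in bucket.objects.filter(Prefix=prefix):
    file = os.path.basename(obj.key)
    if file.startswith("."):  # mirrors ignore_dotfiles
        continue
    base_obj_paths.append(f"{obj.bucket_name}/{obj.key}")

for aws_file in sorted(base_obj_paths):
    # relative to the base path, mirroring the connector's "./" convention
    relative_path = "./" + aws_file[len(plain_base_path):]
    print(aws_file, "->", relative_path)
```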
def get_workunits_local(self) -> Iterable[MetadataWorkUnit]:
for root, dirs, files in os.walk(self.source_config.base_path):
@@ -562,7 +564,7 @@
if not self.source_config.schema_patterns.allowed(full_path):
continue
yield from self.ingest_table(full_path, relative_path)
yield from self.ingest_table(full_path, relative_path, is_aws=False)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
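
For completeness, the local branch walks `base_path` with `os.walk` and dispatches with `is_aws=False`. A minimal sketch of the same traversal, with a hypothetical base path and without the connector's pattern filtering:

```python
import os

base_path = "/data/lake"  # hypothetical local base path

for root, dirs, files in os.walk(base_path):
    for file in files:
        if file.startswith("."):  # mirrors ignore_dotfiles
            continue
        full_path = os.path.join(root, file)
        relative_path = "./" + os.path.relpath(full_path, base_path)
        # the connector would call ingest_table(full_path, relative_path, is_aws=False)
        print(full_path, "->", relative_path)
```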

View File

@@ -6,14 +6,15 @@ import pydantic
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.ingestion.source.aws.s3_util import is_s3_uri
from datahub.ingestion.source.data_lake.profiling import DataLakeProfilerConfig
class DataLakeSourceConfig(ConfigModel):
env: str = DEFAULT_ENV
platform: str
base_path: str
platform: str = "" # overwritten by validator below
use_relative_path: bool = False
ignore_dotfiles: bool = True
@@ -40,6 +41,15 @@ class DataLakeSourceConfig(ConfigModel):
profiling.allow_deny_patterns = values["profile_patterns"]
return values
@pydantic.validator("platform", always=True)
def validate_platform(cls, value: str, values: Dict[str, Any]) -> Optional[str]:
if value != "":
return value
if is_s3_uri(values["base_path"]):
return "s3"
return "file"
@pydantic.validator("path_spec", always=True)
def validate_path_spec(
cls, value: Optional[str], values: Dict[str, Any]