feat(ingest/abs): Adding azure blob storage ingestion source (#10813)

Joel Pinto Mata (KPN-DSH-DEX team) 2024-07-17 11:06:05 +02:00 committed by GitHub
parent 298c299cf1
commit 13b6febce9
17 changed files with 2138 additions and 11 deletions

View File

@ -0,0 +1,40 @@
This connector ingests Azure Blob Storage (abbreviated to abs) datasets into DataHub. It allows mapping an individual
file or a folder of files to a dataset in DataHub.
To specify the group of files that form a dataset, use the `path_specs` configuration in the ingestion recipe. Refer to the
[Path Specs](https://datahubproject.io/docs/generated/ingestion/sources/s3/#path-specs) section for more details.
### Concept Mapping
This ingestion source maps the following Source System Concepts to DataHub Concepts:
| Source Concept | DataHub Concept | Notes |
|----------------------------------------|--------------------------------------------------------------------------------------------|------------------|
| `"abs"` | [Data Platform](https://datahubproject.io/docs/generated/metamodel/entities/dataplatform/) | |
| abs blob / Folder containing abs blobs | [Dataset](https://datahubproject.io/docs/generated/metamodel/entities/dataset/) | |
| abs container | [Container](https://datahubproject.io/docs/generated/metamodel/entities/container/) | Subtype `Folder` |
This connector supports both local files and those stored on Azure Blob Storage (which must be identified using the
prefix `http(s)://<account>.blob.core.windows.net/` or `azure://`).
### Supported file types
Supported file types are as follows:
- CSV (*.csv)
- TSV (*.tsv)
- JSONL (*.jsonl)
- JSON (*.json)
- Parquet (*.parquet)
- Apache Avro (*.avro)
Schemas for Parquet and Avro files are extracted as provided.
Schemas for schemaless formats (CSV, TSV, JSONL, JSON) are inferred. For CSV, TSV, and JSONL files, we consider the first
100 rows by default; this can be controlled via the `max_rows` recipe parameter (see [below](#config-details)).
JSON file schemas are inferred on the basis of the entire file (given the difficulty in extracting only the first few
objects of the file), which may impact performance.
We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object.
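As an illustration, the schema inferrers that back this connector can also be exercised standalone. The sketch below assumes the `acryl-datahub[abs]` plugin is installed; the local file name is hypothetical.
```python
# Minimal sketch: schema inference on a local CSV, using the same inferrer the
# connector calls internally (CsvInferrer with a configurable max_rows).
from datahub.ingestion.source.schema_inference import csv_tsv

with open("employees.csv", "rb") as f:  # hypothetical local file
    fields = csv_tsv.CsvInferrer(max_rows=100).infer_schema(f)

for field in fields:
    print(field.fieldPath, field.nativeDataType)
```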
### Profiling
Profiling is not available in the current release.

View File

@ -0,0 +1,204 @@
### Path Specs
Path Specs (`path_specs`) is a list of Path Spec (`path_spec`) objects, where each individual `path_spec` represents one or more datasets. The include path (`path_spec.include`) represents a formatted path to the dataset. This path must end with `*.*` or `*.[ext]` to represent the leaf level. If `*.[ext]` is provided, then only files with the specified extension type will be scanned. "`.[ext]`" can be any of the [supported file types](#supported-file-types). Refer to [example 1](#example-1---individual-file-as-dataset) below for more details.
All folder levels need to be specified in the include path. You can use `/*/` to represent a folder level and avoid specifying the exact folder name. To map a folder as a dataset, use the `{table}` placeholder to represent the folder level for which the dataset is to be created. For a partitioned dataset, you can use the placeholder `{partition_key[i]}` to represent the name of the `i`th partition and `{partition[i]}` to represent the value of the `i`th partition. During ingestion, `i` will be used to match the partition_key to the partition. Refer to [examples 2 and 3](#example-2---folder-of-files-as-dataset-without-partitions) below for more details.
Exclude paths (`path_spec.exclude`) can be used to ignore paths that are not relevant to the current `path_spec`. This path cannot have named variables (`{}`). The exclude path can have `**` to represent multiple folder levels. Refer to [example 4](#example-4---folder-of-files-as-dataset-with-partitions-and-exclude-filter) below for more details.
Refer to [example 5](#example-5---advanced---either-individual-file-or-folder-of-files-as-dataset) if your container has a more complex dataset representation.
**Additional points to note**
- Folder names should not contain `{`, `}`, `*`, or `/` characters.
- The named variable `{folder}` is reserved for internal use; please do not use it as a named variable.
### Path Specs - Examples
#### Example 1 - Individual file as Dataset
Container structure:
```
test-container
├── employees.csv
├── departments.json
└── food_items.csv
```
Path specs config to ingest `employees.csv` and `food_items.csv` as datasets:
```
path_specs:
- include: https://storageaccountname.blob.core.windows.net/test-container/*.csv
```
This will automatically ignore the `departments.json` file. To include it, use `*.*` instead of `*.csv`.
#### Example 2 - Folder of files as Dataset (without Partitions)
Container structure:
```
test-container
└── offers
├── 1.avro
└── 2.avro
```
Path specs config to ingest folder `offers` as dataset:
```
path_specs:
- include: https://storageaccountname.blob.core.windows.net/test-container/{table}/*.avro
```
`{table}` represents the folder for which the dataset will be created.
#### Example 3 - Folder of files as Dataset (with Partitions)
Container structure:
```
test-container
├── orders
│ └── year=2022
│ └── month=2
│ ├── 1.parquet
│ └── 2.parquet
└── returns
└── year=2021
└── month=2
└── 1.parquet
```
Path specs config to ingest folders `orders` and `returns` as datasets:
```
path_specs:
- include: https://storageaccountname.blob.core.windows.net/test-container/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
```
One can also use `include: https://storageaccountname.blob.core.windows.net/test-container/{table}/*/*/*.parquet` here; however, the above format is preferred as it allows partitions to be declared explicitly.
#### Example 4 - Folder of files as Dataset (with Partitions), and Exclude Filter
Container structure:
```
test-container
├── orders
│ └── year=2022
│ └── month=2
│ ├── 1.parquet
│ └── 2.parquet
└── tmp_orders
└── year=2021
└── month=2
└── 1.parquet
```
Path specs config to ingest folder `orders` as dataset but not folder `tmp_orders`:
```
path_specs:
- include: https://storageaccountname.blob.core.windows.net/test-container/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
exclude:
- **/tmp_orders/**
```
#### Example 5 - Advanced - Either Individual file OR Folder of files as Dataset
Container structure:
```
test-container
├── customers
│ ├── part1.json
│ ├── part2.json
│ ├── part3.json
│ └── part4.json
├── employees.csv
├── food_items.csv
├── tmp_10101000.csv
└── orders
└── year=2022
└── month=2
├── 1.parquet
├── 2.parquet
└── 3.parquet
```
Path specs config:
```
path_specs:
- include: https://storageaccountname.blob.core.windows.net/test-container/*.csv
exclude:
- **/tmp_10101000.csv
- include: https://storageaccountname.blob.core.windows.net/test-container/{table}/*.json
- include: https://storageaccountname.blob.core.windows.net/test-container/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
```
The above config has 3 path_specs and will ingest the following datasets:
- `employees.csv` - Single File as Dataset
- `food_items.csv` - Single File as Dataset
- `customers` - Folder as Dataset
- `orders` - Folder as Dataset
and will ignore the file `tmp_10101000.csv`.
**Valid path_specs.include**
```python
https://storageaccountname.blob.core.windows.net/my-container/foo/tests/bar.avro # single file table
https://storageaccountname.blob.core.windows.net/my-container/foo/tests/*.* # multiple file level tables
https://storageaccountname.blob.core.windows.net/my-container/foo/tests/{table}/*.avro # table without partition
https://storageaccountname.blob.core.windows.net/my-container/foo/tests/{table}/*/*.avro # table where partitions are not specified
https://storageaccountname.blob.core.windows.net/my-container/foo/tests/{table}/*.* # table where neither partitions nor file type are specified
https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/*.avro # specifying keywords to be used in display name
https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro # specify partition key and value format
https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro # specify partition value only format
https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # for all extensions
https://storageaccountname.blob.core.windows.net/my-container/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 2 levels down in container
https://storageaccountname.blob.core.windows.net/my-container/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 3 levels down in container
```
**Valid path_specs.exclude**
- \**/tests/**
- https://storageaccountname.blob.core.windows.net/my-container/hr/**
- **/tests/*.csv
- https://storageaccountname.blob.core.windows.net/my-container/foo/*/my_table/**
If you would like to write a more complicated function for resolving file names, then a {transformer} would be a good fit.
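To sanity-check include/exclude patterns before running a full ingestion, the `PathSpec` helper used by this connector can be exercised directly. The sketch below uses hypothetical paths and assumes the `acryl-datahub[abs]` plugin is installed; exact matching behaviour may vary by release.
```python
# Minimal sketch: test whether concrete blob URIs match a path spec.
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec

spec = PathSpec(
    include="https://storageaccountname.blob.core.windows.net/my-container/{table}/{partition_key[0]}={partition[0]}/*.parquet",
    exclude=["**/tmp_orders/**"],
)

for path in [
    "https://storageaccountname.blob.core.windows.net/my-container/orders/year=2022/1.parquet",
    "https://storageaccountname.blob.core.windows.net/my-container/tmp_orders/year=2021/1.parquet",
]:
    print(path, "->", spec.allowed(path))
```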
:::caution
Specify as long a fixed prefix (without `/*/`) as possible in `path_specs.include`. This will reduce the scanning time and cost, specifically on Azure Blob Storage.
:::
:::caution
Running profiling against many tables or over many rows can run up significant costs.
While we've done our best to limit the expensiveness of the queries the profiler runs, you
should be prudent about the set of tables profiling is enabled on or the frequency
of the profiling runs.
:::
:::caution
If you are ingesting datasets from Azure Blob Storage, we recommend running the ingestion on a server in the same region to avoid high egress costs.
:::
### Compatibility
Profiles are computed with PyDeequ, which relies on PySpark. Therefore, for computing profiles, we currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the `SPARK_HOME` and `SPARK_VERSION` environment variables to be set. The Spark+Hadoop binary can be downloaded [here](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz).
For an example guide on setting up PyDeequ on AWS, see [this guide](https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/).
:::caution
From Spark 3.2.0 onwards, the Avro reader fails on column names that don't start with a letter or that contain characters other than letters, numbers, and underscores (see [AvroFileFormat.scala#L158](https://github.com/apache/spark/blob/72c62b6596d21e975c5597f8fff84b1a9d070a02/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala#L158)).
Avro files that contain such columns won't be profiled.
:::

View File

@ -0,0 +1,13 @@
source:
  type: abs
  config:
    path_specs:
      - include: "https://storageaccountname.blob.core.windows.net/covid19-lake/covid_knowledge_graph/csv/nodes/*.*"
    azure_config:
      account_name: "*****"
      sas_token: "*****"
      container_name: "covid_knowledge_graph"
    env: "PROD"

# sink configs
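The recipe above can also be run programmatically through DataHub's Python API. The sketch below uses placeholder credentials and a `console` sink standing in for a real DataHub sink.
```python
# Minimal sketch: run the ABS ingestion recipe above via the DataHub Python API.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "abs",
            "config": {
                "path_specs": [
                    {
                        "include": "https://storageaccountname.blob.core.windows.net/covid19-lake/covid_knowledge_graph/csv/nodes/*.*"
                    }
                ],
                "azure_config": {
                    "account_name": "*****",
                    "sas_token": "*****",
                    "container_name": "covid_knowledge_graph",
                },
                "env": "PROD",
            },
        },
        "sink": {"type": "console"},  # replace with your DataHub REST/Kafka sink
    }
)
pipeline.run()
pipeline.raise_from_status()
```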

View File

@ -1,5 +1,5 @@
This connector ingests AWS S3 datasets into DataHub. It allows mapping an individual file or a folder of files to a dataset in DataHub.
To specify the group of files that form a dataset, use `path_specs` configuration in ingestion recipe. Refer section [Path Specs](https://datahubproject.io/docs/generated/ingestion/sources/s3/#path-specs) for more details.
Refer to the section [Path Specs](https://datahubproject.io/docs/generated/ingestion/sources/s3/#path-specs) for more details.
:::tip
This connector can also be used to ingest local files.

View File

@ -258,6 +258,13 @@ s3_base = {
*path_spec_common,
}
abs_base = {
"azure-core==1.29.4",
"azure-identity>=1.14.0",
"azure-storage-blob>=12.19.0",
"azure-storage-file-datalake>=12.14.0",
}
data_lake_profiling = {
"pydeequ~=1.1.0",
"pyspark~=3.3.0",
@ -265,6 +272,7 @@ data_lake_profiling = {
delta_lake = {
*s3_base,
*abs_base,
# Version 0.18.0 broken on ARM Macs: https://github.com/delta-io/delta-rs/issues/2577
"deltalake>=0.6.3, != 0.6.4, < 0.18.0; platform_system == 'Darwin' and platform_machine == 'arm64'",
"deltalake>=0.6.3, != 0.6.4; platform_system != 'Darwin' or platform_machine != 'arm64'",
@ -407,6 +415,7 @@ plugins: Dict[str, Set[str]] = {
| {"cachetools"},
"s3": {*s3_base, *data_lake_profiling},
"gcs": {*s3_base, *data_lake_profiling},
"abs": {*abs_base},
"sagemaker": aws_common,
"salesforce": {"simple-salesforce"},
"snowflake": snowflake_common | usage_common | sqlglot_lib,
@ -686,6 +695,7 @@ entry_points = {
"demo-data = datahub.ingestion.source.demo_data.DemoDataSource",
"unity-catalog = datahub.ingestion.source.unity.source:UnityCatalogSource",
"gcs = datahub.ingestion.source.gcs.gcs_source:GCSSource",
"abs = datahub.ingestion.source.abs.source:ABSSource",
"sql-queries = datahub.ingestion.source.sql_queries:SqlQueriesSource",
"fivetran = datahub.ingestion.source.fivetran.fivetran:FivetranSource",
"qlik-sense = datahub.ingestion.source.qlik_sense.qlik_sense:QlikSenseSource",

View File

@ -0,0 +1,163 @@
import logging
from typing import Any, Dict, List, Optional, Union
import pydantic
from pydantic.fields import Field
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.abs.datalake_profiler_config import DataLakeProfilerConfig
from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig
from datahub.ingestion.source.data_lake_common.config import PathSpecsConfigMixin
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.ingestion.source_config.operation_config import is_profiling_enabled
# hide annoying debug errors from py4j
logging.getLogger("py4j").setLevel(logging.ERROR)
logger: logging.Logger = logging.getLogger(__name__)
class DataLakeSourceConfig(
StatefulIngestionConfigBase, DatasetSourceConfigMixin, PathSpecsConfigMixin
):
platform: str = Field(
default="",
description="The platform that this source connects to (either 'abs' or 'file'). "
"If not specified, the platform will be inferred from the path_specs.",
)
azure_config: Optional[AzureConnectionConfig] = Field(
default=None, description="Azure configuration"
)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
# Whether to create Datahub Azure Container properties
use_abs_container_properties: Optional[bool] = Field(
None,
description="Whether to create tags in datahub from the abs container properties",
)
# Whether to create Datahub Azure blob tags
use_abs_blob_tags: Optional[bool] = Field(
None,
description="Whether to create tags in datahub from the abs blob tags",
)
# Whether to create Datahub Azure blob properties
use_abs_blob_properties: Optional[bool] = Field(
None,
description="Whether to create tags in datahub from the abs blob properties",
)
# Whether to update the table schema when schema in files within the partitions are updated
_update_schema_on_partition_file_updates_deprecation = pydantic_field_deprecated(
"update_schema_on_partition_file_updates",
message="update_schema_on_partition_file_updates is deprecated. This behaviour is the default now.",
)
profile_patterns: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="regex patterns for tables to profile ",
)
profiling: DataLakeProfilerConfig = Field(
default=DataLakeProfilerConfig(), description="Data profiling configuration"
)
spark_driver_memory: str = Field(
default="4g", description="Max amount of memory to grant Spark."
)
spark_config: Dict[str, Any] = Field(
description='Spark configuration properties to set on the SparkSession. Put config property names into quotes. For example: \'"spark.executor.memory": "2g"\'',
default={},
)
max_rows: int = Field(
default=100,
description="Maximum number of rows to use when inferring schemas for TSV and CSV files.",
)
add_partition_columns_to_schema: bool = Field(
default=False,
description="Whether to add partition fields to the schema.",
)
verify_ssl: Union[bool, str] = Field(
default=True,
description="Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.",
)
number_of_files_to_sample: int = Field(
default=100,
description="Number of files to list to sample for schema inference. This will be ignored if sample_files is set to False in the pathspec.",
)
_rename_path_spec_to_plural = pydantic_renamed_field(
"path_spec", "path_specs", lambda path_spec: [path_spec]
)
def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)
@pydantic.validator("path_specs", always=True)
def check_path_specs_and_infer_platform(
cls, path_specs: List[PathSpec], values: Dict
) -> List[PathSpec]:
if len(path_specs) == 0:
raise ValueError("path_specs must not be empty")
# Check that all path specs have the same platform.
guessed_platforms = set(
"abs" if path_spec.is_abs else "file" for path_spec in path_specs
)
if len(guessed_platforms) > 1:
raise ValueError(
f"Cannot have multiple platforms in path_specs: {guessed_platforms}"
)
guessed_platform = guessed_platforms.pop()
# Ensure abs configs aren't used for file sources.
if guessed_platform != "abs" and (
values.get("use_abs_container_properties")
or values.get("use_abs_blob_tags")
or values.get("use_abs_blob_properties")
):
raise ValueError(
"Cannot grab abs blob/container tags when platform is not abs. Remove the flag or use abs."
)
# Infer platform if not specified.
if values.get("platform") and values["platform"] != guessed_platform:
raise ValueError(
f"All path_specs belong to {guessed_platform} platform, but platform is set to {values['platform']}"
)
else:
logger.debug(f'Setting config "platform": {guessed_platform}')
values["platform"] = guessed_platform
return path_specs
@pydantic.validator("platform", always=True)
def platform_not_empty(cls, platform: str, values: dict) -> str:
inferred_platform = values.get(
"platform", None
) # we may have inferred it above
platform = platform or inferred_platform
if not platform:
raise ValueError("platform must not be empty")
return platform
@pydantic.root_validator()
def ensure_profiling_pattern_is_passed_to_profiling(
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
profiling: Optional[DataLakeProfilerConfig] = values.get("profiling")
if profiling is not None and profiling.enabled:
profiling._allow_deny_patterns = values["profile_patterns"]
return values
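# Illustrative usage (hypothetical values): when `platform` is left unset, it is inferred
# from the path_specs, e.g.
#
#   config = DataLakeSourceConfig.parse_obj(
#       {"path_specs": [{"include": "https://account.blob.core.windows.net/container/{table}/*.csv"}]}
#   )
#   # config.platform is inferred as "abs"; a local include such as "/data/{table}/*.csv" would infer "file"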

View File

@ -0,0 +1,92 @@
from typing import Any, Dict, Optional
import pydantic
from pydantic.fields import Field
from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source_config.operation_config import OperationConfig
class DataLakeProfilerConfig(ConfigModel):
enabled: bool = Field(
default=False, description="Whether profiling should be done."
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)
# These settings will override the ones below.
profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only or include column-level profiling as well.",
)
_allow_deny_patterns: AllowDenyPattern = pydantic.PrivateAttr(
default=AllowDenyPattern.allow_all(),
)
max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = Field(
default=None,
description="A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
)
include_field_null_count: bool = Field(
default=True,
description="Whether to profile for the number of nulls for each column.",
)
include_field_min_value: bool = Field(
default=True,
description="Whether to profile for the min value of numeric columns.",
)
include_field_max_value: bool = Field(
default=True,
description="Whether to profile for the max value of numeric columns.",
)
include_field_mean_value: bool = Field(
default=True,
description="Whether to profile for the mean value of numeric columns.",
)
include_field_median_value: bool = Field(
default=True,
description="Whether to profile for the median value of numeric columns.",
)
include_field_stddev_value: bool = Field(
default=True,
description="Whether to profile for the standard deviation of numeric columns.",
)
include_field_quantiles: bool = Field(
default=True,
description="Whether to profile for the quantiles of numeric columns.",
)
include_field_distinct_value_frequencies: bool = Field(
default=True, description="Whether to profile for distinct value frequencies."
)
include_field_histogram: bool = Field(
default=True,
description="Whether to profile for the histogram for numeric fields.",
)
include_field_sample_values: bool = Field(
default=True,
description="Whether to profile for the sample values for all columns.",
)
@pydantic.root_validator()
def ensure_field_level_settings_are_normalized(
cls: "DataLakeProfilerConfig", values: Dict[str, Any]
) -> Dict[str, Any]:
max_num_fields_to_profile_key = "max_number_of_fields_to_profile"
max_num_fields_to_profile = values.get(max_num_fields_to_profile_key)
# Disable all field-level metrics.
if values.get("profile_table_level_only"):
for field_level_metric in cls.__fields__:
if field_level_metric.startswith("include_field_"):
values.setdefault(field_level_metric, False)
assert (
max_num_fields_to_profile is None
), f"{max_num_fields_to_profile_key} should be set to None"
return values

View File

@ -0,0 +1,472 @@
import dataclasses
from typing import Any, List, Optional
from pandas import DataFrame
from pydeequ.analyzers import (
AnalysisRunBuilder,
AnalysisRunner,
AnalyzerContext,
ApproxCountDistinct,
ApproxQuantile,
ApproxQuantiles,
Histogram,
Maximum,
Mean,
Minimum,
StandardDeviation,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, when
from pyspark.sql.types import (
DataType as SparkDataType,
DateType,
DecimalType,
DoubleType,
FloatType,
IntegerType,
LongType,
NullType,
ShortType,
StringType,
TimestampType,
)
from datahub.emitter.mce_builder import get_sys_time
from datahub.ingestion.source.profiling.common import (
Cardinality,
convert_to_cardinality,
)
from datahub.ingestion.source.s3.datalake_profiler_config import DataLakeProfilerConfig
from datahub.ingestion.source.s3.report import DataLakeSourceReport
from datahub.metadata.schema_classes import (
DatasetFieldProfileClass,
DatasetProfileClass,
HistogramClass,
QuantileClass,
ValueFrequencyClass,
)
from datahub.telemetry import stats, telemetry
NUM_SAMPLE_ROWS = 20
QUANTILES = [0.05, 0.25, 0.5, 0.75, 0.95]
MAX_HIST_BINS = 25
def null_str(value: Any) -> Optional[str]:
# str() with a passthrough for None.
return str(value) if value is not None else None
@dataclasses.dataclass
class _SingleColumnSpec:
column: str
column_profile: DatasetFieldProfileClass
# if the histogram is a list of value frequencies (discrete data) or bins (continuous data)
histogram_distinct: Optional[bool] = None
type_: SparkDataType = NullType # type:ignore
unique_count: Optional[int] = None
non_null_count: Optional[int] = None
cardinality: Optional[Cardinality] = None
class _SingleTableProfiler:
spark: SparkSession
dataframe: DataFrame
analyzer: AnalysisRunBuilder
column_specs: List[_SingleColumnSpec]
row_count: int
profiling_config: DataLakeProfilerConfig
file_path: str
columns_to_profile: List[str]
ignored_columns: List[str]
profile: DatasetProfileClass
report: DataLakeSourceReport
def __init__(
self,
dataframe: DataFrame,
spark: SparkSession,
profiling_config: DataLakeProfilerConfig,
report: DataLakeSourceReport,
file_path: str,
):
self.spark = spark
self.dataframe = dataframe
self.analyzer = AnalysisRunner(spark).onData(dataframe)
self.column_specs = []
self.row_count = dataframe.count()
self.profiling_config = profiling_config
self.file_path = file_path
self.columns_to_profile = []
self.ignored_columns = []
self.profile = DatasetProfileClass(timestampMillis=get_sys_time())
self.report = report
self.profile.rowCount = self.row_count
self.profile.columnCount = len(dataframe.columns)
column_types = {x.name: x.dataType for x in dataframe.schema.fields}
if self.profiling_config.profile_table_level_only:
return
# get column distinct counts
for column in dataframe.columns:
if not self.profiling_config._allow_deny_patterns.allowed(column):
self.ignored_columns.append(column)
continue
self.columns_to_profile.append(column)
# Normal CountDistinct is ridiculously slow
self.analyzer.addAnalyzer(ApproxCountDistinct(column))
if self.profiling_config.max_number_of_fields_to_profile is not None:
if (
len(self.columns_to_profile)
> self.profiling_config.max_number_of_fields_to_profile
):
columns_being_dropped = self.columns_to_profile[
self.profiling_config.max_number_of_fields_to_profile :
]
self.columns_to_profile = self.columns_to_profile[
: self.profiling_config.max_number_of_fields_to_profile
]
self.report.report_file_dropped(
f"The max_number_of_fields_to_profile={self.profiling_config.max_number_of_fields_to_profile} reached. Profile of columns {self.file_path}({', '.join(sorted(columns_being_dropped))})"
)
analysis_result = self.analyzer.run()
analysis_metrics = AnalyzerContext.successMetricsAsJson(
self.spark, analysis_result
)
# reshape distinct counts into dictionary
column_distinct_counts = {
x["instance"]: int(x["value"])
for x in analysis_metrics
if x["name"] == "ApproxCountDistinct"
}
select_numeric_null_counts = [
count(
when(
isnan(c) | col(c).isNull(),
c,
)
).alias(c)
for c in self.columns_to_profile
if isinstance(column_types[c], (DoubleType, FloatType))
]
# PySpark doesn't support isnan() on non-float/double columns
select_nonnumeric_null_counts = [
count(
when(
col(c).isNull(),
c,
)
).alias(c)
for c in self.columns_to_profile
if not isinstance(column_types[c], (DoubleType, FloatType))
]
null_counts = dataframe.select(
select_numeric_null_counts + select_nonnumeric_null_counts
)
column_null_counts = null_counts.toPandas().T[0].to_dict()
column_null_fractions = {
c: column_null_counts[c] / self.row_count if self.row_count != 0 else 0
for c in self.columns_to_profile
}
column_nonnull_counts = {
c: self.row_count - column_null_counts[c] for c in self.columns_to_profile
}
column_unique_proportions = {
c: (
column_distinct_counts[c] / column_nonnull_counts[c]
if column_nonnull_counts[c] > 0
else 0
)
for c in self.columns_to_profile
}
if self.profiling_config.include_field_sample_values:
# take sample and convert to Pandas DataFrame
if self.row_count < NUM_SAMPLE_ROWS:
# if row count is less than number to sample, just take all rows
rdd_sample = dataframe.rdd.take(self.row_count)
else:
rdd_sample = dataframe.rdd.takeSample(False, NUM_SAMPLE_ROWS, seed=0)
# init column specs with profiles
for column in self.columns_to_profile:
column_profile = DatasetFieldProfileClass(fieldPath=column)
column_spec = _SingleColumnSpec(column, column_profile)
column_profile.uniqueCount = column_distinct_counts.get(column)
column_profile.uniqueProportion = column_unique_proportions.get(column)
column_profile.nullCount = column_null_counts.get(column)
column_profile.nullProportion = column_null_fractions.get(column)
if self.profiling_config.include_field_sample_values:
column_profile.sampleValues = sorted(
[str(x[column]) for x in rdd_sample]
)
column_spec.type_ = column_types[column]
column_spec.cardinality = convert_to_cardinality(
column_distinct_counts[column],
column_null_fractions[column],
)
self.column_specs.append(column_spec)
def prep_min_value(self, column: str) -> None:
if self.profiling_config.include_field_min_value:
self.analyzer.addAnalyzer(Minimum(column))
def prep_max_value(self, column: str) -> None:
if self.profiling_config.include_field_max_value:
self.analyzer.addAnalyzer(Maximum(column))
def prep_mean_value(self, column: str) -> None:
if self.profiling_config.include_field_mean_value:
self.analyzer.addAnalyzer(Mean(column))
def prep_median_value(self, column: str) -> None:
if self.profiling_config.include_field_median_value:
self.analyzer.addAnalyzer(ApproxQuantile(column, 0.5))
def prep_stdev_value(self, column: str) -> None:
if self.profiling_config.include_field_stddev_value:
self.analyzer.addAnalyzer(StandardDeviation(column))
def prep_quantiles(self, column: str) -> None:
if self.profiling_config.include_field_quantiles:
self.analyzer.addAnalyzer(ApproxQuantiles(column, QUANTILES))
def prep_distinct_value_frequencies(self, column: str) -> None:
if self.profiling_config.include_field_distinct_value_frequencies:
self.analyzer.addAnalyzer(Histogram(column))
def prep_field_histogram(self, column: str) -> None:
if self.profiling_config.include_field_histogram:
self.analyzer.addAnalyzer(Histogram(column, maxDetailBins=MAX_HIST_BINS))
def prepare_table_profiles(self) -> None:
row_count = self.row_count
telemetry.telemetry_instance.ping(
"profile_data_lake_table",
{"rows_profiled": stats.discretize(row_count)},
)
# loop through the columns and add the analyzers
for column_spec in self.column_specs:
column = column_spec.column
column_profile = column_spec.column_profile
type_ = column_spec.type_
cardinality = column_spec.cardinality
non_null_count = column_spec.non_null_count
unique_count = column_spec.unique_count
if (
self.profiling_config.include_field_null_count
and non_null_count is not None
):
null_count = row_count - non_null_count
assert null_count >= 0
column_profile.nullCount = null_count
if row_count > 0:
column_profile.nullProportion = null_count / row_count
if unique_count is not None:
column_profile.uniqueCount = unique_count
if non_null_count is not None and non_null_count > 0:
column_profile.uniqueProportion = unique_count / non_null_count
if isinstance(
type_,
(
DecimalType,
DoubleType,
FloatType,
IntegerType,
LongType,
ShortType,
),
):
if cardinality == Cardinality.UNIQUE:
pass
elif cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
column_spec.histogram_distinct = True
self.prep_distinct_value_frequencies(column)
elif cardinality in [
Cardinality.MANY,
Cardinality.VERY_MANY,
Cardinality.UNIQUE,
]:
column_spec.histogram_distinct = False
self.prep_min_value(column)
self.prep_max_value(column)
self.prep_mean_value(column)
self.prep_median_value(column)
self.prep_stdev_value(column)
self.prep_quantiles(column)
self.prep_field_histogram(column)
else: # unknown cardinality - skip
pass
elif isinstance(type_, StringType):
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
column_spec.histogram_distinct = True
self.prep_distinct_value_frequencies(
column,
)
elif isinstance(type_, (DateType, TimestampType)):
self.prep_min_value(column)
self.prep_max_value(column)
# FIXME: Re-add histogram once kl_divergence has been modified to support datetimes
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self.prep_distinct_value_frequencies(
column,
)
def extract_table_profiles(
self,
analysis_metrics: DataFrame,
) -> None:
self.profile.fieldProfiles = []
analysis_metrics = analysis_metrics.toPandas()
# DataFrame with following columns:
# entity: "Column" for column profile, "Table" for table profile
# instance: name of column being profiled. "*" for table profiles
# name: name of metric. Histogram metrics are formatted as "Histogram.<metric>.<value>"
# value: value of metric
column_metrics = analysis_metrics[analysis_metrics["entity"] == "Column"]
# resolve histogram types for grouping
column_metrics["kind"] = column_metrics["name"].apply(
lambda x: "Histogram" if x.startswith("Histogram.") else x
)
column_histogram_metrics = column_metrics[column_metrics["kind"] == "Histogram"]
column_nonhistogram_metrics = column_metrics[
column_metrics["kind"] != "Histogram"
]
histogram_columns = set()
if len(column_histogram_metrics) > 0:
# we only want the absolute counts for each histogram for now
column_histogram_metrics = column_histogram_metrics[
column_histogram_metrics["name"].apply(
lambda x: x.startswith("Histogram.abs.")
)
]
# get the histogram bins by chopping off the "Histogram.abs." prefix
column_histogram_metrics["bin"] = column_histogram_metrics["name"].apply(
lambda x: x[14:]
)
# reshape histogram counts for easier access
histogram_counts = column_histogram_metrics.set_index(["instance", "bin"])[
"value"
]
histogram_columns = set(histogram_counts.index.get_level_values(0))
profiled_columns = set()
if len(column_nonhistogram_metrics) > 0:
# reshape other metrics for easier access
nonhistogram_metrics = column_nonhistogram_metrics.set_index(
["instance", "name"]
)["value"]
profiled_columns = set(nonhistogram_metrics.index.get_level_values(0))
# histogram_columns = set(histogram_counts.index.get_level_values(0))
for column_spec in self.column_specs:
column = column_spec.column
column_profile = column_spec.column_profile
if column not in profiled_columns:
continue
# convert to Dict so we can use .get
deequ_column_profile = nonhistogram_metrics.loc[column].to_dict()
# uniqueCount, uniqueProportion, nullCount, nullProportion, sampleValues already set in TableWrapper
column_profile.min = null_str(deequ_column_profile.get("Minimum"))
column_profile.max = null_str(deequ_column_profile.get("Maximum"))
column_profile.mean = null_str(deequ_column_profile.get("Mean"))
column_profile.median = null_str(
deequ_column_profile.get("ApproxQuantiles-0.5")
)
column_profile.stdev = null_str(
deequ_column_profile.get("StandardDeviation")
)
if all(
deequ_column_profile.get(f"ApproxQuantiles-{quantile}") is not None
for quantile in QUANTILES
):
column_profile.quantiles = [
QuantileClass(
quantile=str(quantile),
value=str(deequ_column_profile[f"ApproxQuantiles-{quantile}"]),
)
for quantile in QUANTILES
]
if column in histogram_columns:
column_histogram = histogram_counts.loc[column]
# sort so output is deterministic
column_histogram = column_histogram.sort_index()
if column_spec.histogram_distinct:
column_profile.distinctValueFrequencies = [
ValueFrequencyClass(
value=value, frequency=int(column_histogram.loc[value])
)
for value in column_histogram.index
]
# sort so output is deterministic
column_profile.distinctValueFrequencies = sorted(
column_profile.distinctValueFrequencies, key=lambda x: x.value
)
else:
column_profile.histogram = HistogramClass(
[str(x) for x in column_histogram.index],
[float(x) for x in column_histogram],
)
# append the column profile to the dataset profile
self.profile.fieldProfiles.append(column_profile)

View File

@ -0,0 +1,19 @@
import dataclasses
from dataclasses import field as dataclass_field
from typing import List
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
)
@dataclasses.dataclass
class DataLakeSourceReport(StaleEntityRemovalSourceReport):
files_scanned: int = 0
filtered: List[str] = dataclass_field(default_factory=list)
def report_file_scanned(self) -> None:
self.files_scanned += 1
def report_file_dropped(self, file: str) -> None:
self.filtered.append(file)

View File

@ -0,0 +1,700 @@
import dataclasses
import functools
import logging
import os
import pathlib
import re
import time
from collections import OrderedDict
from datetime import datetime
from pathlib import PurePath
from typing import Any, Dict, Iterable, List, Optional, Tuple
import smart_open.compression as so_compression
from more_itertools import peekable
from pyspark.sql.types import (
ArrayType,
BinaryType,
BooleanType,
ByteType,
DateType,
DecimalType,
DoubleType,
FloatType,
IntegerType,
LongType,
MapType,
NullType,
ShortType,
StringType,
StructField,
StructType,
TimestampType,
)
from smart_open import open as smart_open
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec
from datahub.ingestion.source.abs.report import DataLakeSourceReport
from datahub.ingestion.source.azure.abs_util import (
get_abs_properties,
get_abs_tags,
get_container_name,
get_container_relative_path,
get_key_prefix,
list_folders,
strip_abs_prefix,
)
from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator
from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BooleanTypeClass,
BytesTypeClass,
DateTypeClass,
NullTypeClass,
NumberTypeClass,
RecordTypeClass,
SchemaField,
SchemaFieldDataType,
SchemaMetadata,
StringTypeClass,
TimeTypeClass,
)
from datahub.metadata.schema_classes import (
DataPlatformInstanceClass,
DatasetPropertiesClass,
MapTypeClass,
OperationClass,
OperationTypeClass,
OtherSchemaClass,
_Aspect,
)
from datahub.telemetry import telemetry
from datahub.utilities.perf_timer import PerfTimer
# hide annoying debug errors from py4j
logging.getLogger("py4j").setLevel(logging.ERROR)
logger: logging.Logger = logging.getLogger(__name__)
# for a list of all types, see https://spark.apache.org/docs/3.0.3/api/python/_modules/pyspark/sql/types.html
_field_type_mapping = {
NullType: NullTypeClass,
StringType: StringTypeClass,
BinaryType: BytesTypeClass,
BooleanType: BooleanTypeClass,
DateType: DateTypeClass,
TimestampType: TimeTypeClass,
DecimalType: NumberTypeClass,
DoubleType: NumberTypeClass,
FloatType: NumberTypeClass,
ByteType: BytesTypeClass,
IntegerType: NumberTypeClass,
LongType: NumberTypeClass,
ShortType: NumberTypeClass,
ArrayType: NullTypeClass,
MapType: MapTypeClass,
StructField: RecordTypeClass,
StructType: RecordTypeClass,
}
PAGE_SIZE = 1000
# Hack to support the .gzip extension with smart_open.
so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])
def get_column_type(
report: SourceReport, dataset_name: str, column_type: str
) -> SchemaFieldDataType:
"""
Maps known Spark types to datahub types
"""
TypeClass: Any = None
for field_type, type_class in _field_type_mapping.items():
if isinstance(column_type, field_type):
TypeClass = type_class
break
# if still not found, report the warning
if TypeClass is None:
report.report_warning(
dataset_name, f"unable to map type {column_type} to metadata schema"
)
TypeClass = NullTypeClass
return SchemaFieldDataType(type=TypeClass())
# config flags to emit telemetry for
config_options_to_report = [
"platform",
"use_relative_path",
"ignore_dotfiles",
]
def partitioned_folder_comparator(folder1: str, folder2: str) -> int:
# Try to convert to number and compare if the folder name is a number
try:
# Stripping = from the folder names as it most probably partition name part like year=2021
if "=" in folder1 and "=" in folder2:
if folder1.rsplit("=", 1)[0] == folder2.rsplit("=", 1)[0]:
folder1 = folder1.rsplit("=", 1)[-1]
folder2 = folder2.rsplit("=", 1)[-1]
num_folder1 = int(folder1)
num_folder2 = int(folder2)
if num_folder1 == num_folder2:
return 0
else:
return 1 if num_folder1 > num_folder2 else -1
except Exception:
# If folder name is not a number then do string comparison
if folder1 == folder2:
return 0
else:
return 1 if folder1 > folder2 else -1
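# Illustrative behaviour: partition folders are compared numerically when possible, so
#   sorted(["month=10", "month=2"], key=functools.cmp_to_key(partitioned_folder_comparator))
# yields ["month=2", "month=10"] rather than the lexicographic ["month=10", "month=2"].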
@dataclasses.dataclass
class TableData:
display_name: str
is_abs: bool
full_path: str
rel_path: str
partitions: Optional[OrderedDict]
timestamp: datetime
table_path: str
size_in_bytes: int
number_of_files: int
@platform_name("ABS Data Lake", id="abs")
@config_class(DataLakeSourceConfig)
@support_status(SupportStatus.INCUBATING)
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.TAGS, "Can extract ABS object/container tags if enabled")
@capability(
SourceCapability.DELETION_DETECTION,
"Optionally enabled via `stateful_ingestion.remove_stale_metadata`",
supported=True,
)
class ABSSource(StatefulIngestionSourceBase):
source_config: DataLakeSourceConfig
report: DataLakeSourceReport
profiling_times_taken: List[float]
container_WU_creator: ContainerWUCreator
def __init__(self, config: DataLakeSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
self.source_config = config
self.report = DataLakeSourceReport()
self.profiling_times_taken = []
config_report = {
config_option: config.dict().get(config_option)
for config_option in config_options_to_report
}
config_report = {
**config_report,
"profiling_enabled": config.is_profiling_enabled(),
}
telemetry.telemetry_instance.ping(
"data_lake_config",
config_report,
)
@classmethod
def create(cls, config_dict, ctx):
config = DataLakeSourceConfig.parse_obj(config_dict)
return cls(config, ctx)
def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:
if self.is_abs_platform():
if self.source_config.azure_config is None:
raise ValueError("Azure config is required for ABS file sources")
abs_client = self.source_config.azure_config.get_blob_service_client()
file = smart_open(
f"azure://{self.source_config.azure_config.container_name}/{table_data.rel_path}",
"rb",
transport_params={"client": abs_client},
)
else:
# We still use smart_open here to take advantage of the compression
# capabilities of smart_open.
file = smart_open(table_data.full_path, "rb")
fields = []
extension = pathlib.Path(table_data.full_path).suffix
from datahub.ingestion.source.data_lake_common.path_spec import (
SUPPORTED_COMPRESSIONS,
)
if path_spec.enable_compression and (extension[1:] in SUPPORTED_COMPRESSIONS):
# Removing the compression extension and using the one before that like .json.gz -> .json
extension = pathlib.Path(table_data.full_path).with_suffix("").suffix
if extension == "" and path_spec.default_extension:
extension = f".{path_spec.default_extension}"
try:
if extension == ".parquet":
fields = parquet.ParquetInferrer().infer_schema(file)
elif extension == ".csv":
fields = csv_tsv.CsvInferrer(
max_rows=self.source_config.max_rows
).infer_schema(file)
elif extension == ".tsv":
fields = csv_tsv.TsvInferrer(
max_rows=self.source_config.max_rows
).infer_schema(file)
elif extension == ".json":
fields = json.JsonInferrer().infer_schema(file)
elif extension == ".avro":
fields = avro.AvroInferrer().infer_schema(file)
else:
self.report.report_warning(
table_data.full_path,
f"file {table_data.full_path} has unsupported extension",
)
file.close()
except Exception as e:
self.report.report_warning(
table_data.full_path,
f"could not infer schema for file {table_data.full_path}: {e}",
)
file.close()
logger.debug(f"Extracted fields in schema: {fields}")
fields = sorted(fields, key=lambda f: f.fieldPath)
if self.source_config.add_partition_columns_to_schema:
self.add_partition_columns_to_schema(
fields=fields, path_spec=path_spec, full_path=table_data.full_path
)
return fields
def add_partition_columns_to_schema(
self, path_spec: PathSpec, full_path: str, fields: List[SchemaField]
) -> None:
vars = path_spec.get_named_vars(full_path)
if vars is not None and "partition" in vars:
for partition in vars["partition"].values():
partition_arr = partition.split("=")
if len(partition_arr) != 2:
logger.debug(
f"Could not derive partition key from partition field {partition}"
)
continue
partition_key = partition_arr[0]
fields.append(
SchemaField(
fieldPath=f"{partition_key}",
nativeDataType="string",
type=SchemaFieldDataType(StringTypeClass()),
isPartitioningKey=True,
nullable=True,
recursive=False,
)
)
def _create_table_operation_aspect(self, table_data: TableData) -> OperationClass:
reported_time = int(time.time() * 1000)
operation = OperationClass(
timestampMillis=reported_time,
lastUpdatedTimestamp=int(table_data.timestamp.timestamp() * 1000),
operationType=OperationTypeClass.UPDATE,
)
return operation
def ingest_table(
self, table_data: TableData, path_spec: PathSpec
) -> Iterable[MetadataWorkUnit]:
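# Build the aspects for this table (platform instance, dataset properties, schema metadata,
# optional container/blob tags, and a last-modified operation), emit them as
# MetadataChangeProposal work units, and then emit the container hierarchy work units.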
aspects: List[Optional[_Aspect]] = []
logger.info(f"Extracting table schema from file: {table_data.full_path}")
browse_path: str = (
strip_abs_prefix(table_data.table_path)
if self.is_abs_platform()
else table_data.table_path.strip("/")
)
data_platform_urn = make_data_platform_urn(self.source_config.platform)
logger.info(f"Creating dataset urn with name: {browse_path}")
dataset_urn = make_dataset_urn_with_platform_instance(
self.source_config.platform,
browse_path,
self.source_config.platform_instance,
self.source_config.env,
)
if self.source_config.platform_instance:
data_platform_instance = DataPlatformInstanceClass(
platform=data_platform_urn,
instance=make_dataplatform_instance_urn(
self.source_config.platform, self.source_config.platform_instance
),
)
aspects.append(data_platform_instance)
container = get_container_name(table_data.table_path)
key_prefix = (
get_key_prefix(table_data.table_path)
if table_data.full_path == table_data.table_path
else None
)
custom_properties = get_abs_properties(
container,
key_prefix,
full_path=str(table_data.full_path),
number_of_files=table_data.number_of_files,
size_in_bytes=table_data.size_in_bytes,
sample_files=path_spec.sample_files,
azure_config=self.source_config.azure_config,
use_abs_container_properties=self.source_config.use_abs_container_properties,
use_abs_blob_properties=self.source_config.use_abs_blob_properties,
)
dataset_properties = DatasetPropertiesClass(
description="",
name=table_data.display_name,
customProperties=custom_properties,
)
aspects.append(dataset_properties)
if table_data.size_in_bytes > 0:
try:
fields = self.get_fields(table_data, path_spec)
schema_metadata = SchemaMetadata(
schemaName=table_data.display_name,
platform=data_platform_urn,
version=0,
hash="",
fields=fields,
platformSchema=OtherSchemaClass(rawSchema=""),
)
aspects.append(schema_metadata)
except Exception as e:
logger.error(
f"Failed to extract schema from file {table_data.full_path}. The error was:{e}"
)
else:
logger.info(
f"Skipping schema extraction for empty file {table_data.full_path}"
)
if (
self.source_config.use_abs_container_properties
or self.source_config.use_abs_blob_tags
):
abs_tags = get_abs_tags(
container,
key_prefix,
dataset_urn,
self.source_config.azure_config,
self.ctx,
self.source_config.use_abs_blob_tags,
)
if abs_tags:
aspects.append(abs_tags)
operation = self._create_table_operation_aspect(table_data)
aspects.append(operation)
for mcp in MetadataChangeProposalWrapper.construct_many(
entityUrn=dataset_urn,
aspects=aspects,
):
yield mcp.as_workunit()
yield from self.container_WU_creator.create_container_hierarchy(
table_data.table_path, dataset_urn
)
def get_prefix(self, relative_path: str) -> str:
index = re.search(r"[\*|\{]", relative_path)
if index:
return relative_path[: index.start()]
else:
return relative_path
def extract_table_name(self, path_spec: PathSpec, named_vars: dict) -> str:
if path_spec.table_name is None:
raise ValueError("path_spec.table_name is not set")
return path_spec.table_name.format_map(named_vars)
def extract_table_data(
self,
path_spec: PathSpec,
path: str,
rel_path: str,
timestamp: datetime,
size: int,
) -> TableData:
logger.debug(f"Getting table data for path: {path}")
table_name, table_path = path_spec.extract_table_name_and_path(path)
table_data = TableData(
display_name=table_name,
is_abs=self.is_abs_platform(),
full_path=path,
rel_path=rel_path,
partitions=None,
timestamp=timestamp,
table_path=table_path,
number_of_files=1,
size_in_bytes=size,
)
return table_data
def resolve_templated_folders(
self, container_name: str, prefix: str
) -> Iterable[str]:
folder_split: List[str] = prefix.split("*", 1)
# If the len of split is 1 it means we don't have * in the prefix
if len(folder_split) == 1:
yield prefix
return
folders: Iterable[str] = list_folders(
container_name, folder_split[0], self.source_config.azure_config
)
for folder in folders:
yield from self.resolve_templated_folders(
container_name, f"{folder}{folder_split[1]}"
)
def get_dir_to_process(
self,
container_name: str,
folder: str,
path_spec: PathSpec,
protocol: str,
) -> str:
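# Descend into the latest partition directory: sub-folders are sorted in descending order
# with partitioned_folder_comparator, and the first folder allowed by the path_spec is
# recursed into; if there are no sub-folders, the current folder is returned.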
iterator = list_folders(
container_name=container_name,
prefix=folder,
azure_config=self.source_config.azure_config,
)
iterator = peekable(iterator)
if iterator:
sorted_dirs = sorted(
iterator,
key=functools.cmp_to_key(partitioned_folder_comparator),
reverse=True,
)
for dir in sorted_dirs:
if path_spec.dir_allowed(f"{protocol}{container_name}/{dir}/"):
return self.get_dir_to_process(
container_name=container_name,
folder=dir + "/",
path_spec=path_spec,
protocol=protocol,
)
return folder
else:
return folder
def abs_browser(
self, path_spec: PathSpec, sample_size: int
) -> Iterable[Tuple[str, str, datetime, int]]:
if self.source_config.azure_config is None:
raise ValueError("azure_config not set. Cannot browse Azure Blob Storage")
abs_blob_service_client = (
self.source_config.azure_config.get_blob_service_client()
)
container_client = abs_blob_service_client.get_container_client(
self.source_config.azure_config.container_name
)
container_name = self.source_config.azure_config.container_name
logger.debug(f"Scanning container: {container_name}")
prefix = self.get_prefix(get_container_relative_path(path_spec.include))
logger.debug(f"Scanning objects with prefix:{prefix}")
matches = re.finditer(r"{\s*\w+\s*}", path_spec.include, re.MULTILINE)
matches_list = list(matches)
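# If the include path contains templated parts (e.g. {table}) and sampling is enabled,
# find the right-most template, replace any earlier templates with "*", and enumerate the
# folders up to that template via resolve_templated_folders(); only the most recent
# partition directory of each folder is then listed, instead of scanning every blob.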
if matches_list and path_spec.sample_files:
max_start: int = -1
include: str = path_spec.include
max_match: str = ""
for match in matches_list:
pos = include.find(match.group())
if pos > max_start:
if max_match:
include = include.replace(max_match, "*")
max_start = match.start()
max_match = match.group()
table_index = include.find(max_match)
for folder in self.resolve_templated_folders(
container_name,
get_container_relative_path(include[:table_index]),
):
try:
for f in list_folders(
container_name, f"{folder}", self.source_config.azure_config
):
logger.info(f"Processing folder: {f}")
protocol = ContainerWUCreator.get_protocol(path_spec.include)
dir_to_process = self.get_dir_to_process(
container_name=container_name,
folder=f + "/",
path_spec=path_spec,
protocol=protocol,
)
logger.info(f"Getting files from folder: {dir_to_process}")
dir_to_process = dir_to_process.rstrip("\\")
for obj in container_client.list_blobs(
name_starts_with=f"{dir_to_process}",
results_per_page=PAGE_SIZE,
):
abs_path = self.create_abs_path(obj.name)
logger.debug(f"Sampling file: {abs_path}")
yield abs_path, obj.name, obj.last_modified, obj.size,
except Exception as e:
# This odd check if being done because boto does not have a proper exception to catch
# The exception that appears in stacktrace cannot actually be caught without a lot more work
# https://github.com/boto/boto3/issues/1195
if "NoSuchBucket" in repr(e):
logger.debug(
f"Got NoSuchBucket exception for {container_name}", e
)
self.get_report().report_warning(
"Missing bucket", f"No bucket found {container_name}"
)
else:
raise e
else:
logger.debug(
"No template in the pathspec can't do sampling, fallbacking to do full scan"
)
path_spec.sample_files = False
for obj in container_client.list_blobs(
prefix=f"{prefix}", results_per_page=PAGE_SIZE
):
abs_path = self.create_abs_path(obj.name)
logger.debug(f"Path: {abs_path}")
# the following line if using the file_system_client
# yield abs_path, obj.last_modified, obj.content_length,
yield abs_path, obj.name, obj.last_modified, obj.size
def create_abs_path(self, key: str) -> str:
if self.source_config.azure_config:
account_name = self.source_config.azure_config.account_name
container_name = self.source_config.azure_config.container_name
return (
f"https://{account_name}.blob.core.windows.net/{container_name}/{key}"
)
return ""
def local_browser(
self, path_spec: PathSpec
) -> Iterable[Tuple[str, str, datetime, int]]:
prefix = self.get_prefix(path_spec.include)
if os.path.isfile(prefix):
logger.debug(f"Scanning single local file: {prefix}")
file_name = prefix
yield prefix, file_name, datetime.utcfromtimestamp(
os.path.getmtime(prefix)
), os.path.getsize(prefix)
else:
logger.debug(f"Scanning files under local folder: {prefix}")
for root, dirs, files in os.walk(prefix):
dirs.sort(key=functools.cmp_to_key(partitioned_folder_comparator))
for file in sorted(files):
# We need to make sure the path is in posix style which is not true on windows
full_path = PurePath(
os.path.normpath(os.path.join(root, file))
).as_posix()
yield full_path, file, datetime.utcfromtimestamp(
os.path.getmtime(full_path)
), os.path.getsize(full_path)
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.container_WU_creator = ContainerWUCreator(
self.source_config.platform,
self.source_config.platform_instance,
self.source_config.env,
)
with PerfTimer():
assert self.source_config.path_specs
for path_spec in self.source_config.path_specs:
file_browser = (
self.abs_browser(
path_spec, self.source_config.number_of_files_to_sample
)
if self.is_abs_platform()
else self.local_browser(path_spec)
)
table_dict: Dict[str, TableData] = {}
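# Group files by their resolved table path: aggregate file counts and sizes per table, and
# keep the most recent non-empty file as the file the schema will be inferred from.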
for file, name, timestamp, size in file_browser:
if not path_spec.allowed(file):
continue
table_data = self.extract_table_data(
path_spec, file, name, timestamp, size
)
if table_data.table_path not in table_dict:
table_dict[table_data.table_path] = table_data
else:
table_dict[table_data.table_path].number_of_files = (
table_dict[table_data.table_path].number_of_files + 1
)
table_dict[table_data.table_path].size_in_bytes = (
table_dict[table_data.table_path].size_in_bytes
+ table_data.size_in_bytes
)
if (
table_dict[table_data.table_path].timestamp
< table_data.timestamp
) and (table_data.size_in_bytes > 0):
table_dict[
table_data.table_path
].full_path = table_data.full_path
table_dict[
table_data.table_path
].timestamp = table_data.timestamp
for guid, table_data in table_dict.items():
yield from self.ingest_table(table_data, path_spec)
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
StaleEntityRemovalHandler.create(
self, self.source_config, self.ctx
).workunit_processor,
]
def is_abs_platform(self):
return self.source_config.platform == "abs"
def get_report(self):
return self.report

View File

@ -0,0 +1,286 @@
import logging
import os
import re
from typing import Dict, Iterable, List, Optional
from azure.storage.blob import BlobProperties
from datahub.emitter.mce_builder import make_tag_urn
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass
ABS_PREFIXES_REGEX = re.compile(
r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)"
)
logging.getLogger("py4j").setLevel(logging.ERROR)
logger: logging.Logger = logging.getLogger(__name__)
def is_abs_uri(uri: str) -> bool:
return bool(ABS_PREFIXES_REGEX.match(uri))
def get_abs_prefix(abs_uri: str) -> Optional[str]:
result = re.search(ABS_PREFIXES_REGEX, abs_uri)
if result and result.groups():
return result.group(1)
return None
def strip_abs_prefix(abs_uri: str) -> str:
# remove abs prefix https://<storage-account>.blob.core.windows.net
abs_prefix = get_abs_prefix(abs_uri)
if not abs_prefix:
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
length_abs_prefix = len(abs_prefix)
return abs_uri[length_abs_prefix:]
def make_abs_urn(abs_uri: str, env: str) -> str:
abs_name = strip_abs_prefix(abs_uri)
if abs_name.endswith("/"):
abs_name = abs_name[:-1]
name, extension = os.path.splitext(abs_name)
if extension != "":
extension = extension[1:] # remove the dot
return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})"
return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})"
def get_container_name(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/")[0]
def get_key_prefix(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1]
def get_abs_properties(
container_name: str,
blob_name: Optional[str],
full_path: str,
number_of_files: int,
size_in_bytes: int,
sample_files: bool,
azure_config: Optional[AzureConnectionConfig],
use_abs_container_properties: Optional[bool] = False,
use_abs_blob_properties: Optional[bool] = False,
) -> Dict[str, str]:
if azure_config is None:
raise ValueError(
"Azure configuration is not provided. Cannot retrieve container client."
)
blob_service_client = azure_config.get_blob_service_client()
container_client = blob_service_client.get_container_client(
container=container_name
)
custom_properties = {"schema_inferred_from": full_path}
if not sample_files:
custom_properties.update(
{
"number_of_files": str(number_of_files),
"size_in_bytes": str(size_in_bytes),
}
)
if use_abs_blob_properties and blob_name is not None:
blob_client = container_client.get_blob_client(blob=blob_name)
blob_properties = blob_client.get_blob_properties()
if blob_properties:
create_properties(
data=blob_properties,
prefix="blob",
custom_properties=custom_properties,
resource_name=blob_name,
json_properties=[
"metadata",
"content_settings",
"lease",
"copy",
"immutability_policy",
],
)
else:
logger.warning(
f"No blob properties found for container={container_name}, blob={blob_name}."
)
if use_abs_container_properties:
container_properties = container_client.get_container_properties()
if container_properties:
create_properties(
data=container_properties,
prefix="container",
custom_properties=custom_properties,
resource_name=container_name,
json_properties=["metadata"],
)
else:
logger.warning(
f"No container properties found for container={container_name}."
)
return custom_properties
def add_property(
key: str, value: str, custom_properties: Dict[str, str], resource_name: str
) -> Dict[str, str]:
if key in custom_properties:
key = f"{key}_{resource_name}"
if value is not None:
custom_properties[key] = str(value)
return custom_properties
def create_properties(
data: BlobProperties,
prefix: str,
custom_properties: Dict[str, str],
resource_name: str,
json_properties: List[str],
) -> None:
for key, value in data.items():
transformed_key = f"{prefix}_{key}"
if value is None:
continue
try:
# These are known properties with a json value, we process these recursively...
if key in json_properties:
create_properties(
data=value,
prefix=f"{prefix}_{key}",
custom_properties=custom_properties,
resource_name=resource_name,
json_properties=json_properties,
)
else:
custom_properties = add_property(
key=transformed_key,
value=value,
custom_properties=custom_properties,
resource_name=resource_name,
)
except Exception as exception:
logger.debug(
f"Could not create property {key} value {value}, from resource {resource_name}: {exception}."
)
def get_abs_tags(
container_name: str,
key_name: Optional[str],
dataset_urn: str,
azure_config: Optional[AzureConnectionConfig],
ctx: PipelineContext,
use_abs_blob_tags: Optional[bool] = False,
) -> Optional[GlobalTagsClass]:
# TODO: add the service_client when building out get_abs_tags further
if azure_config is None:
raise ValueError(
"Azure configuration is not provided. Cannot retrieve container client."
)
tags_to_add: List[str] = []
blob_service_client = azure_config.get_blob_service_client()
container_client = blob_service_client.get_container_client(container_name)
if use_abs_blob_tags and key_name is not None:
blob_client = container_client.get_blob_client(blob=key_name)
tag_set = blob_client.get_blob_tags()
if tag_set:
tags_to_add.extend(
make_tag_urn(f"""{key}:{value}""") for key, value in tag_set.items()
)
else:
# A blob with no tags returns an empty mapping rather than raising an exception.
logger.info(f"No tags found for container={container_name} key={key_name}")
if len(tags_to_add) == 0:
return None
if ctx.graph is not None:
logger.debug("Connected to DatahubApi, grabbing current tags to maintain.")
current_tags: Optional[GlobalTagsClass] = ctx.graph.get_aspect(
entity_urn=dataset_urn,
aspect_type=GlobalTagsClass,
)
if current_tags:
tags_to_add.extend([current_tag.tag for current_tag in current_tags.tags])
else:
logger.warning("Could not connect to DatahubApi. No current tags to maintain")
# Deduplicate and sort the combined tags before emitting them
tags_to_add = sorted(set(tags_to_add))
new_tags = GlobalTagsClass(
tags=[TagAssociationClass(tag_to_add) for tag_to_add in tags_to_add]
)
return new_tags
def list_folders(
container_name: str, prefix: str, azure_config: Optional[AzureConnectionConfig]
) -> Iterable[str]:
if azure_config is None:
raise ValueError(
"Azure configuration is not provided. Cannot retrieve container client."
)
abs_blob_service_client = azure_config.get_blob_service_client()
container_client = abs_blob_service_client.get_container_client(container_name)
current_level = prefix.count("/")
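# Blob storage has no real directory objects, so derive the next folder level
# below `prefix` from blob names and yield each distinct folder path once.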
blob_list = container_client.list_blobs(name_starts_with=prefix)
seen = set()
for blob in blob_list:
blob_name = blob.name[: blob.name.rfind("/") + 1]
folder_structure_arr = blob_name.split("/")
folder_name = ""
if len(folder_structure_arr) > current_level:
folder_name = f"{folder_name}/{folder_structure_arr[current_level]}"
else:
continue
folder_name = folder_name[1:]
if folder_name.endswith("/"):
folder_name = folder_name[:-1]
if folder_name == "":
continue
folder_name = f"{prefix}{folder_name}"
if folder_name in seen:
continue
seen.add(folder_name)
yield folder_name
def get_container_relative_path(abs_uri: str) -> str:
return "/".join(strip_abs_prefix(abs_uri).split("/")[1:])

View File

@ -0,0 +1,98 @@
from typing import Dict, Optional, Union
from azure.identity import ClientSecretCredential
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient
from pydantic import Field, root_validator
from datahub.configuration import ConfigModel
from datahub.configuration.common import ConfigurationError
class AzureConnectionConfig(ConfigModel):
"""
Common Azure credentials config.
https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-directory-file-acl-python
"""
base_path: str = Field(
default="/",
description="Base folder in hierarchical namespaces to start from.",
)
container_name: str = Field(
description="Azure storage account container name.",
)
account_name: str = Field(
description="Name of the Azure storage account. See [Microsoft official documentation on how to create a storage account.](https://docs.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account)",
)
account_key: Optional[str] = Field(
description="Azure storage account access key that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
default=None,
)
sas_token: Optional[str] = Field(
description="Azure storage account Shared Access Signature (SAS) token that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
default=None,
)
client_secret: Optional[str] = Field(
description="Azure client secret that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
default=None,
)
client_id: Optional[str] = Field(
description="Azure client (Application) ID required when a `client_secret` is used as a credential.",
default=None,
)
tenant_id: Optional[str] = Field(
description="Azure tenant (Directory) ID required when a `client_secret` is used as a credential.",
default=None,
)
def get_abfss_url(self, folder_path: str = "") -> str:
if not folder_path.startswith("/"):
folder_path = f"/{folder_path}"
return f"abfss://{self.container_name}@{self.account_name}.dfs.core.windows.net{folder_path}"
# TODO DEX-1010
def get_filesystem_client(self) -> FileSystemClient:
return self.get_data_lake_service_client().get_file_system_client(
file_system=self.container_name
)
def get_blob_service_client(self) -> BlobServiceClient:
return BlobServiceClient(
account_url=f"https://{self.account_name}.blob.core.windows.net",
# Pass the credential through as-is: stringifying a ClientSecretCredential
# would break AAD authentication.
credential=self.get_credentials(),
)
def get_data_lake_service_client(self) -> DataLakeServiceClient:
return DataLakeServiceClient(
account_url=f"https://{self.account_name}.dfs.core.windows.net",
credential=self.get_credentials(),
)
def get_credentials(
self,
) -> Union[Optional[str], ClientSecretCredential]:
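# Prefer AAD client-secret auth when fully configured; otherwise fall back
# to the SAS token, then the account key.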
if self.client_id and self.client_secret and self.tenant_id:
return ClientSecretCredential(
tenant_id=self.tenant_id,
client_id=self.client_id,
client_secret=self.client_secret,
)
return self.sas_token if self.sas_token is not None else self.account_key
@root_validator()
def _check_credential_values(cls, values: Dict) -> Dict:
if (
values.get("account_key")
or values.get("sas_token")
or (
values.get("client_id")
and values.get("client_secret")
and values.get("tenant_id")
)
):
return values
raise ConfigurationError(
"Missing credentials: provide an account_key, a sas_token, or all of client_id, client_secret, and tenant_id."
)
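
A hedged usage sketch of `AzureConnectionConfig`; every identifier and secret below is a placeholder:

```python
from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig

# Any one of account_key, sas_token, or the client_id/client_secret/tenant_id
# triple satisfies the credential validator.
config = AzureConnectionConfig(
    container_name="mycontainer",
    account_name="myaccount",
    client_id="00000000-0000-0000-0000-000000000000",
    client_secret="<client-secret>",
    tenant_id="11111111-1111-1111-1111-111111111111",
)

print(config.get_abfss_url("raw/events"))
# -> abfss://mycontainer@myaccount.dfs.core.windows.net/raw/events

# Returns a BlobServiceClient bound to https://myaccount.blob.core.windows.net;
# actual calls against it require valid credentials.
blob_service_client = config.get_blob_service_client()
```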

View File

@ -35,6 +35,7 @@ class DatasetContainerSubTypes(str, Enum):
FOLDER = "Folder"
S3_BUCKET = "S3 bucket"
GCS_BUCKET = "GCS bucket"
ABS_CONTAINER = "ABS container"
class BIContainerSubTypes(str, Enum):

View File

@ -16,6 +16,12 @@ from datahub.ingestion.source.aws.s3_util import (
get_s3_prefix,
is_s3_uri,
)
from datahub.ingestion.source.azure.abs_util import (
get_abs_prefix,
get_container_name,
get_container_relative_path,
is_abs_uri,
)
from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes
from datahub.ingestion.source.gcs.gcs_utils import (
get_gcs_bucket_name,
@ -29,6 +35,7 @@ logger: logging.Logger = logging.getLogger(__name__)
PLATFORM_S3 = "s3"
PLATFORM_GCS = "gcs"
PLATFORM_ABS = "abs"
class ContainerWUCreator:
@ -85,6 +92,8 @@ class ContainerWUCreator:
protocol = get_s3_prefix(path)
elif is_gcs_uri(path):
protocol = get_gcs_prefix(path)
elif is_abs_uri(path):
protocol = get_abs_prefix(path)
if protocol:
return protocol
@ -99,7 +108,25 @@ class ContainerWUCreator:
return get_bucket_name(path)
elif is_gcs_uri(path):
return get_gcs_bucket_name(path)
raise ValueError(f"Unable to get get bucket name form path: {path}")
elif is_abs_uri(path):
return get_container_name(path)
raise ValueError(f"Unable to get bucket name from path: {path}")
def get_sub_types(self) -> str:
if self.platform == PLATFORM_S3:
return DatasetContainerSubTypes.S3_BUCKET
elif self.platform == PLATFORM_GCS:
return DatasetContainerSubTypes.GCS_BUCKET
elif self.platform == PLATFORM_ABS:
return DatasetContainerSubTypes.ABS_CONTAINER
raise ValueError(f"Unable to sub type for platform: {self.platform}")
def get_base_full_path(self, path: str) -> str:
if self.platform == "s3" or self.platform == "gcs":
return get_bucket_relative_path(path)
elif self.platform == "abs":
return get_container_relative_path(path)
raise ValueError(f"Unable to get base full path from path: {path}")
def create_container_hierarchy(
self, path: str, dataset_urn: str
@ -107,22 +134,18 @@ class ContainerWUCreator:
logger.debug(f"Creating containers for {dataset_urn}")
base_full_path = path
parent_key = None
if self.platform in (PLATFORM_S3, PLATFORM_GCS):
if self.platform in (PLATFORM_S3, PLATFORM_GCS, PLATFORM_ABS):
bucket_name = self.get_bucket_name(path)
bucket_key = self.gen_bucket_key(bucket_name)
yield from self.create_emit_containers(
container_key=bucket_key,
name=bucket_name,
sub_types=[
DatasetContainerSubTypes.S3_BUCKET
if self.platform == "s3"
else DatasetContainerSubTypes.GCS_BUCKET
],
sub_types=[self.get_sub_types()],
parent_container_key=None,
)
parent_key = bucket_key
base_full_path = get_bucket_relative_path(path)
base_full_path = self.get_base_full_path(path)
parent_folder_path = (
base_full_path[: base_full_path.rfind("/")]

View File

@ -11,6 +11,7 @@ from wcmatch import pathlib
from datahub.configuration.common import ConfigModel
from datahub.ingestion.source.aws.s3_util import is_s3_uri
from datahub.ingestion.source.azure.abs_util import is_abs_uri
from datahub.ingestion.source.gcs.gcs_utils import is_gcs_uri
# hide annoying debug errors from py4j
@ -107,7 +108,7 @@ class PathSpec(ConfigModel):
# glob_include = self.glob_include.rsplit("/", 1)[0]
glob_include = self.glob_include
for i in range(slash_to_remove_from_glob):
for _ in range(slash_to_remove_from_glob):
glob_include = glob_include.rsplit("/", 1)[0]
logger.debug(f"Checking dir to inclusion: {path}")
@ -169,7 +170,8 @@ class PathSpec(ConfigModel):
def turn_off_sampling_for_non_s3(cls, v, values):
is_s3 = is_s3_uri(values.get("include") or "")
is_gcs = is_gcs_uri(values.get("include") or "")
if not is_s3 and not is_gcs:
is_abs = is_abs_uri(values.get("include") or "")
if not is_s3 and not is_gcs and not is_abs:
# Sampling currently only makes sense for s3, gcs and abs
v = False
return v
@ -213,6 +215,10 @@ class PathSpec(ConfigModel):
def is_gcs(self):
return is_gcs_uri(self.include)
@cached_property
def is_abs(self):
return is_abs_uri(self.include)
@cached_property
def compiled_include(self):
parsable_include = PathSpec.get_parsable_include(self.include)