feat(ingest): data-lake - remove spark requirement if not profiling (#4131)

Kevin Hu 2022-02-25 02:26:06 -05:00 committed by GitHub
parent 4b4c7d593b
commit 02fe05eb8f
19 changed files with 1890 additions and 562 deletions

View File

@ -19,6 +19,8 @@ jobs:
metadata-ingestion-general:
runs-on: ubuntu-latest
env:
SPARK_VERSION: 3.0.3
strategy:
matrix:
python-version: ["3.6", "3.9.9"]
@ -46,6 +48,8 @@ jobs:
metadata-ingestion-by-version:
runs-on: ubuntu-latest
env:
SPARK_VERSION: 3.0.3
strategy:
matrix:
python-version: ["3.6", "3.9.9"]

View File

@ -99,6 +99,21 @@ snowflake_common = {
"cryptography",
}
data_lake_base = {
*aws_common,
"parse>=1.19.0",
"pyarrow>=6.0.1",
"tableschema>=1.20.2",
"ujson>=4.3.0",
"types-ujson>=4.2.1",
"smart-open[s3]>=5.2.1",
}
data_lake_profiling = {
"pydeequ==1.0.1",
"pyspark==3.0.3",
}
# Note: for all of these, framework_common will be added.
plugins: Dict[str, Set[str]] = {
# Sink plugins.
@ -118,7 +133,7 @@ plugins: Dict[str, Set[str]] = {
"clickhouse-usage": sql_common | {"clickhouse-sqlalchemy==0.1.8"},
"datahub-lineage-file": set(),
"datahub-business-glossary": set(),
"data-lake": {*aws_common, "pydeequ==1.0.1", "pyspark==3.0.3", "parse==1.19.0"},
"data-lake": {*data_lake_base, *data_lake_profiling},
"dbt": {"requests"},
"druid": sql_common | {"pydruid>=0.6.2"},
# Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If its not it throws
@ -196,6 +211,7 @@ base_dev_requirements = {
*base_requirements,
*framework_common,
*mypy_stubs,
*data_lake_base,
"black>=21.12b0",
"coverage>=5.1",
"flake8>=3.8.3",

View File

@ -10,7 +10,7 @@ This source is in **Beta** and under active development. Not yet considered read
## Setup
To install this plugin, run `pip install 'acryl-datahub[data-lake]'`. Because the files are read using PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed.
To install this plugin, run `pip install 'acryl-datahub[data-lake]'`. Note that because the profiling is run with PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed (see [compatibility](#compatibility) for more details).
The data lake connector extracts schemas and profiles from a variety of file formats (see below for an exhaustive list).
Individual files are ingested as tables, and profiles are computed similarly to the [SQL profiler](./sql_profiles.md).
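For reference, here is a minimal sketch of running this source programmatically with profiling disabled, so Spark is not required; the platform name, base path, and console sink are illustrative assumptions rather than values from this commit.

```python
# a minimal sketch: data-lake ingestion with profiling disabled (no PySpark/PyDeequ needed);
# the platform name, path, and console sink are illustrative assumptions
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "data-lake",
            "config": {
                "platform": "local-data-lake",    # hypothetical platform name
                "base_path": "/path/to/data/",    # hypothetical local folder
                "profiling": {"enabled": False},  # profiling off -> no Spark requirement
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()
```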
@ -37,7 +37,7 @@ If you would like to write a more complicated function for resolving file names,
Extracts:
- Row and column counts for each table
- For each column, if applicable:
- For each column, if profiling is enabled:
- null counts and proportions
- distinct counts and proportions
- minimum, maximum, mean, median, standard deviation, some quantile values
@ -47,20 +47,25 @@ This connector supports both local files as well as those stored on AWS S3 (whic
- CSV
- TSV
- Parquet
- JSON
- Parquet
- Apache Avro
Schemas for Parquet and Avro files are extracted as provided.
Schemas for schemaless formats (CSV, TSV, JSON) are inferred. For CSV and TSV files, we consider the first 100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details)).
JSON file schemas are inferred on the basis of the entire file (given the difficulty in extracting only the first few objects of the file), which may impact performance.
We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object.
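As an illustration of the inference path used for schemaless formats, here is a short sketch using the `schema_inference` module added in this commit; the file name is hypothetical.

```python
# a minimal sketch of CSV schema inference, mirroring the connector's per-extension dispatch;
# "users.csv" is a hypothetical local file
from datahub.ingestion.source.schema_inference import csv_tsv

with open("users.csv", "rb") as f:
    fields = csv_tsv.CsvInferrer(max_rows=100).infer_schema(f)

for field in fields:
    print(field.fieldPath, field.nativeDataType)
```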
:::caution
If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs.
:::
| Capability | Status | Details |
| -----------| ------ | ---- |
| Platform Instance | 🛑 | [link](../../docs/platform-instances.md) |
| Capability | Status | Details |
| ----------------- | ------ | ---------------------------------------- |
| Platform Instance | 🛑 | [link](../../docs/platform-instances.md) |
## Quickstart recipe
@ -99,6 +104,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `aws_config.aws_access_key_id` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_config.aws_secret_access_key` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_config.aws_session_token` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `max_rows` | | `100` | Maximum number of rows to use when inferring schemas for TSV and CSV files. |
| `schema_patterns.allow` | | `*` | List of regex patterns for tables to ingest. Defaults to all. |
| `schema_patterns.deny` | | | List of regex patterns for tables to not ingest. Defaults to none. |
| `schema_patterns.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching of tables to ingest. |
@ -121,9 +127,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
## Compatibility
Files are read using PySpark and profiles are computed with PyDeequ.
We currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the `SPARK_HOME` environment variable to be set for PySpark.
The Spark+Hadoop binary can be downloaded [here](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz).
Profiles are computed with PyDeequ, which relies on PySpark. Therefore, for computing profiles, we currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the `SPARK_HOME` and `SPARK_VERSION` environment variables to be set. The Spark+Hadoop binary can be downloaded [here](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz).
For an example guide on setting up PyDeequ on AWS, see [this guide](https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/).
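As a quick pre-flight check before enabling profiling, something along these lines can verify the environment variables mentioned above; this is a sketch, not part of the connector.

```python
# sketch: verify the Spark environment variables required for profiling are set
import os

for var in ("SPARK_HOME", "SPARK_VERSION"):
    if not os.environ.get(var):
        raise RuntimeError(f"{var} must be set to run data-lake profiling (PySpark/PyDeequ)")
```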

View File

@ -256,9 +256,8 @@ class GlueSource(Source):
# append S3 format if different ones exist
if len(s3_formats[s3_uri]) > 1:
node_urn = make_s3_urn(
s3_uri,
f"{s3_uri}.{node_args.get('format')}",
self.env,
suffix=node_args.get("format"),
)
else:

View File

@ -1,17 +1,35 @@
from typing import Optional
import os
S3_PREFIXES = ["s3://", "s3n://", "s3a://"]
def make_s3_urn(s3_uri: str, env: str, suffix: Optional[str] = None) -> str:
def is_s3_uri(uri: str) -> bool:
return any(uri.startswith(prefix) for prefix in S3_PREFIXES)
if not s3_uri.startswith("s3://"):
raise ValueError("S3 URIs should begin with 's3://'")
def strip_s3_prefix(s3_uri: str) -> str:
# remove S3 prefix (s3://)
s3_name = s3_uri[5:]
for s3_prefix in S3_PREFIXES:
if s3_uri.startswith(s3_prefix):
plain_base_path = s3_uri[len(s3_prefix) :]
return plain_base_path
raise ValueError(
f"Not an S3 URI. Must start with one of the following prefixes: {str(S3_PREFIXES)}"
)
def make_s3_urn(s3_uri: str, env: str) -> str:
s3_name = strip_s3_prefix(s3_uri)
if s3_name.endswith("/"):
s3_name = s3_name[:-1]
if suffix is not None:
return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name}_{suffix},{env})"
name, extension = os.path.splitext(s3_name)
if extension != "":
extension = extension[1:] # remove the dot
return f"urn:li:dataset:(urn:li:dataPlatform:s3,{name}_{extension},{env})"
return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{env})"

View File

@ -30,16 +30,18 @@ from pyspark.sql.types import (
TimestampType,
)
from pyspark.sql.utils import AnalysisException
from smart_open import open as smart_open
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.aws.s3_util import is_s3_uri, make_s3_urn, strip_s3_prefix
from datahub.ingestion.source.data_lake.config import DataLakeSourceConfig
from datahub.ingestion.source.data_lake.profiling import _SingleTableProfiler
from datahub.ingestion.source.data_lake.report import DataLakeSourceReport
from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
@ -49,7 +51,6 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
NullTypeClass,
NumberTypeClass,
RecordTypeClass,
SchemaField,
SchemaFieldDataType,
SchemaMetadata,
StringTypeClass,
@ -171,6 +172,9 @@ class DataLakeSource(Source):
for config_flag in profiling_flags_to_report
},
)
self.init_spark()
def init_spark(self):
conf = SparkConf()
@ -201,7 +205,6 @@ class DataLakeSource(Source):
# see https://hadoop.apache.org/docs/r3.0.3/hadoop-aws/tools/hadoop-aws/index.html#Changing_Authentication_Providers
if all(x is not None for x in aws_provided_credentials):
conf.set(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
@ -233,7 +236,7 @@ class DataLakeSource(Source):
)
conf.set("spark.jars.excludes", pydeequ.f2j_maven_coord)
conf.set("spark.driver.memory", config.spark_driver_memory)
conf.set("spark.driver.memory", self.source_config.spark_driver_memory)
self.spark = SparkSession.builder.config(conf=conf).getOrCreate()
@ -243,7 +246,7 @@ class DataLakeSource(Source):
return cls(config, ctx)
def read_file(self, file: str) -> Optional[DataFrame]:
def read_file_spark(self, file: str) -> Optional[DataFrame]:
extension = os.path.splitext(file)[1]
@ -294,7 +297,7 @@ class DataLakeSource(Source):
return df.toDF(*(c.replace(".", "_") for c in df.columns))
def get_table_schema(
self, dataframe: DataFrame, file_path: str, table_name: str
self, file_path: str, table_name: str
) -> Iterable[MetadataWorkUnit]:
data_platform_urn = make_data_platform_urn(self.source_config.platform)
@ -305,7 +308,7 @@ class DataLakeSource(Source):
dataset_name = os.path.basename(file_path)
# if no path spec is provided and the file is in S3, then use the S3 path to construct an URN
if self.source_config.platform == "s3" and self.source_config.path_spec is None:
if is_s3_uri(file_path) and self.source_config.path_spec is None:
dataset_urn = make_s3_urn(file_path, self.source_config.env)
dataset_snapshot = DatasetSnapshot(
@ -319,25 +322,53 @@ class DataLakeSource(Source):
)
dataset_snapshot.aspects.append(dataset_properties)
column_fields = []
if file_path.startswith("s3a://"):
if self.source_config.aws_config is None:
raise ValueError("AWS config is required for S3 file sources")
for field in dataframe.schema.fields:
s3_client = self.source_config.aws_config.get_s3_client()
field = SchemaField(
fieldPath=field.name,
type=get_column_type(self.report, dataset_name, field.dataType),
nativeDataType=str(field.dataType),
recursive=False,
file = smart_open(file_path, "rb", transport_params={"client": s3_client})
else:
file = open(file_path, "rb")
fields = []
try:
if file_path.endswith(".parquet"):
fields = parquet.ParquetInferrer().infer_schema(file)
elif file_path.endswith(".csv"):
fields = csv_tsv.CsvInferrer(
max_rows=self.source_config.max_rows
).infer_schema(file)
elif file_path.endswith(".tsv"):
fields = csv_tsv.TsvInferrer(
max_rows=self.source_config.max_rows
).infer_schema(file)
elif file_path.endswith(".json"):
fields = json.JsonInferrer().infer_schema(file)
elif file_path.endswith(".avro"):
fields = avro.AvroInferrer().infer_schema(file)
else:
self.report.report_warning(
file_path, f"file {file_path} has unsupported extension"
)
file.close()
except Exception as e:
self.report.report_warning(
file_path, f"could not infer schema for file {file_path}: {e}"
)
file.close()
column_fields.append(field)
fields = sorted(fields, key=lambda f: f.fieldPath)
schema_metadata = SchemaMetadata(
schemaName=dataset_name,
platform=data_platform_urn,
version=0,
hash="",
fields=column_fields,
fields=fields,
platformSchema=OtherSchemaClass(rawSchema=""),
)
@ -348,10 +379,16 @@ class DataLakeSource(Source):
self.report.report_workunit(wu)
yield wu
def get_table_name(self, relative_path: str) -> str:
def get_table_name(self, relative_path: str, full_path: str) -> str:
if self.source_config.path_spec is None:
return relative_path
name, extension = os.path.splitext(full_path)
if extension != "":
extension = extension[1:] # remove the dot
return f"{name}_{extension}"
return name
def warn():
self.report.report_warning(
@ -382,24 +419,28 @@ class DataLakeSource(Source):
self, full_path: str, relative_path: str
) -> Iterable[MetadataWorkUnit]:
table_name = self.get_table_name(relative_path)
table = self.read_file(full_path)
# if table is not readable, skip
if table is None:
return
table_name = self.get_table_name(relative_path, full_path)
# yield the table schema first
logger.debug(
f"Ingesting {full_path}: making table schemas {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}"
)
yield from self.get_table_schema(table, full_path, table_name)
yield from self.get_table_schema(full_path, table_name)
# If profiling is not enabled, skip the rest
if not self.source_config.profiling.enabled:
return
# read in the whole table with Spark for profiling
table = self.read_file_spark(full_path)
# if table is not readable, skip
if table is None:
self.report.report_warning(
table_name, f"unable to read table {table_name} from file {full_path}"
)
return
with PerfTimer() as timer:
# init PySpark analysis object
logger.debug(
@ -460,10 +501,7 @@ class DataLakeSource(Source):
def get_workunits_s3(self) -> Iterable[MetadataWorkUnit]:
for s3_prefix in S3_PREFIXES:
if self.source_config.base_path.startswith(s3_prefix):
plain_base_path = self.source_config.base_path.lstrip(s3_prefix)
break
plain_base_path = strip_s3_prefix(self.source_config.base_path)
# append a trailing slash if it's not there so prefix filtering works
if not plain_base_path.endswith("/"):
@ -531,11 +569,7 @@ class DataLakeSource(Source):
with PerfTimer() as timer:
# check if file is an s3 object
if any(
self.source_config.base_path.startswith(s3_prefix)
for s3_prefix in S3_PREFIXES
):
if is_s3_uri(self.source_config.base_path):
yield from self.get_workunits_s3()
else:
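For completeness, a sketch of how an individual S3 object could be opened and run through the same inference dispatch shown in `get_table_schema`; the bucket, key, and use of default boto3 credentials are assumptions.

```python
# sketch: schema inference over an S3 object via smart_open
# (hypothetical bucket/key; credentials assumed to come from the environment)
import boto3
from smart_open import open as smart_open

from datahub.ingestion.source.schema_inference import json as json_inference

s3_client = boto3.client("s3")
with smart_open(
    "s3://my-bucket/data/events.json", "rb", transport_params={"client": s3_client}
) as f:
    fields = json_inference.JsonInferrer().infer_schema(f)

print([field.fieldPath for field in fields])
```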

View File

@ -29,6 +29,8 @@ class DataLakeSourceConfig(ConfigModel):
spark_driver_memory: str = "4g"
max_rows: int = 100
@pydantic.root_validator()
def ensure_profiling_pattern_is_passed_to_profiling(
cls, values: Dict[str, Any]

View File

@ -1,13 +1,9 @@
import logging
from collections import Counter
from dataclasses import dataclass, field
from typing import Any
from typing import Counter as CounterType
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView
import bson
import pymongo
from mypy_extensions import TypedDict
from packaging import version
from pydantic import PositiveInt, validator
from pymongo.mongo_client import MongoClient
@ -17,6 +13,10 @@ from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.schema_inference.object import (
SchemaDescription,
construct_schema,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
@ -116,181 +116,6 @@ _field_type_mapping: Dict[Union[Type, str], Type] = {
}
def is_nullable_doc(doc: Dict[str, Any], field_path: Tuple) -> bool:
"""
Check if a nested field is nullable in a document from a collection.
Parameters
----------
doc:
document to check nullability for
field_path:
path to nested field to check, ex. ('first_field', 'nested_child', '2nd_nested_child')
"""
field = field_path[0]
# if field is inside
if field in doc:
value = doc[field]
if value is None:
return True
# if no fields left, must be non-nullable
if len(field_path) == 1:
return False
# otherwise, keep checking the nested fields
remaining_fields = field_path[1:]
# if dictionary, check additional level of nesting
if isinstance(value, dict):
return is_nullable_doc(doc[field], remaining_fields)
# if list, check if any member is missing field
if isinstance(value, list):
# count empty lists of nested objects as nullable
if len(value) == 0:
return True
return any(is_nullable_doc(x, remaining_fields) for x in doc[field])
# any other types to check?
# raise ValueError("Nested type not 'list' or 'dict' encountered")
return True
return True
def is_nullable_collection(
collection: Iterable[Dict[str, Any]], field_path: Tuple
) -> bool:
"""
Check if a nested field is nullable in a collection.
Parameters
----------
collection:
collection to check nullability for
field_path:
path to nested field to check, ex. ('first_field', 'nested_child', '2nd_nested_child')
"""
return any(is_nullable_doc(doc, field_path) for doc in collection)
class BasicSchemaDescription(TypedDict):
types: CounterType[type] # field types and times seen
count: int # times the field was seen
class SchemaDescription(BasicSchemaDescription):
delimited_name: str # collapsed field name
# we use 'mixed' to denote mixed types, so we need a str here
type: Union[type, str] # collapsed type
nullable: bool # if field is ever missing
def construct_schema(
collection: Iterable[Dict[str, Any]], delimiter: str
) -> Dict[Tuple[str, ...], SchemaDescription]:
"""
Construct (infer) a schema from a collection of documents.
For each field (represented as a tuple to handle nested items), reports the following:
- `types`: Python types of field values
- `count`: Number of times the field was encountered
- `type`: type of the field if `types` is just a single value, otherwise `mixed`
- `nullable`: if field is ever null/missing
- `delimited_name`: name of the field, joined by a given delimiter
Parameters
----------
collection:
collection to construct schema over.
delimiter:
string to concatenate field names by
"""
schema: Dict[Tuple[str, ...], BasicSchemaDescription] = {}
def append_to_schema(doc: Dict[str, Any], parent_prefix: Tuple[str, ...]) -> None:
"""
Recursively update the schema with a document, which may/may not contain nested fields.
Parameters
----------
doc:
document to scan
parent_prefix:
prefix of fields that the document is under, pass an empty tuple when initializing
"""
for key, value in doc.items():
new_parent_prefix = parent_prefix + (key,)
# if nested value, look at the types within
if isinstance(value, dict):
append_to_schema(value, new_parent_prefix)
# if array of values, check what types are within
if isinstance(value, list):
for item in value:
# if dictionary, add it as a nested object
if isinstance(item, dict):
append_to_schema(item, new_parent_prefix)
# don't record None values (counted towards nullable)
if value is not None:
if new_parent_prefix not in schema:
schema[new_parent_prefix] = {
"types": Counter([type(value)]),
"count": 1,
}
else:
# update the type count
schema[new_parent_prefix]["types"].update({type(value): 1})
schema[new_parent_prefix]["count"] += 1
for document in collection:
append_to_schema(document, ())
extended_schema: Dict[Tuple[str, ...], SchemaDescription] = {}
for field_path in schema.keys():
field_types = schema[field_path]["types"]
field_type: Union[str, type] = "mixed"
# if single type detected, mark that as the type to go with
if len(field_types.keys()) == 1:
field_type = next(iter(field_types))
field_extended: SchemaDescription = {
"types": schema[field_path]["types"],
"count": schema[field_path]["count"],
"nullable": is_nullable_collection(collection, field_path),
"delimited_name": delimiter.join(field_path),
"type": field_type,
}
extended_schema[field_path] = field_extended
return extended_schema
def construct_schema_pymongo(
collection: pymongo.collection.Collection,
delimiter: str,

View File

@ -0,0 +1,17 @@
from typing import IO, List
from avro.datafile import DataFileReader
from avro.io import DatumReader
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
class AvroInferrer(SchemaInferenceBase):
def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
reader = DataFileReader(file, DatumReader())
fields = schema_util.avro_schema_to_mce_fields(reader.schema)
return fields

View File

@ -0,0 +1,15 @@
from typing import IO, List
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
class SchemaInferenceBase:
"""
Base class for file schema inference.
"""
def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
"""
Infer schema from file.
"""
raise NotImplementedError("infer_schema not implemented")

View File

@ -0,0 +1,74 @@
from typing import IO, Dict, List, Type
from tableschema import Table
from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
BooleanTypeClass,
DateTypeClass,
NullTypeClass,
NumberTypeClass,
RecordTypeClass,
SchemaField,
SchemaFieldDataType,
StringTypeClass,
TimeTypeClass,
UnionTypeClass,
)
# see https://github.com/frictionlessdata/tableschema-py/blob/main/tableschema/schema.py#L545
tableschema_type_map: Dict[str, Type] = {
"duration": TimeTypeClass,
"geojson": RecordTypeClass,
"geopoint": RecordTypeClass,
"object": RecordTypeClass,
"array": ArrayTypeClass,
"datetime": TimeTypeClass,
"time": TimeTypeClass,
"date": DateTypeClass,
"integer": NumberTypeClass,
"number": NumberTypeClass,
"boolean": BooleanTypeClass,
"string": StringTypeClass,
"any": UnionTypeClass,
}
def get_table_schema_fields(table: Table, max_rows: int) -> List[SchemaField]:
table.infer(limit=max_rows)
fields: List[SchemaField] = []
for raw_field in table.schema.fields:
mapped_type: Type = tableschema_type_map.get(raw_field.type, NullTypeClass)
field = SchemaField(
fieldPath=raw_field.name,
type=SchemaFieldDataType(mapped_type()),
nativeDataType=str(raw_field.type),
recursive=False,
)
fields.append(field)
return fields
class CsvInferrer(SchemaInferenceBase):
def __init__(self, max_rows: int):
self.max_rows = max_rows
def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
# infer schema of a csv file without reading the whole file
table = Table(file, format="csv")
return get_table_schema_fields(table, max_rows=self.max_rows)
class TsvInferrer(SchemaInferenceBase):
def __init__(self, max_rows: int):
self.max_rows = max_rows
def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
# infer schema of a tsv file without reading the whole file
table = Table(file, format="tsv")
return get_table_schema_fields(table, max_rows=self.max_rows)

View File

@ -0,0 +1,59 @@
from typing import IO, Dict, List, Type, Union
import ujson
from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase
from datahub.ingestion.source.schema_inference.object import construct_schema
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
BooleanTypeClass,
NullTypeClass,
NumberTypeClass,
RecordTypeClass,
SchemaField,
SchemaFieldDataType,
StringTypeClass,
UnionTypeClass,
)
_field_type_mapping: Dict[Union[Type, str], Type] = {
list: ArrayTypeClass,
bool: BooleanTypeClass,
type(None): NullTypeClass,
int: NumberTypeClass,
float: NumberTypeClass,
str: StringTypeClass,
dict: RecordTypeClass,
"mixed": UnionTypeClass,
}
class JsonInferrer(SchemaInferenceBase):
def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
datastore = ujson.load(file)
if not isinstance(datastore, list):
datastore = [datastore]
schema = construct_schema(datastore, delimiter=".")
fields: List[SchemaField] = []
for schema_field in sorted(schema.values(), key=lambda x: x["delimited_name"]):
mapped_type = _field_type_mapping.get(schema_field["type"], NullTypeClass)
native_type = schema_field["type"]
if isinstance(native_type, type):
native_type = native_type.__name__
field = SchemaField(
fieldPath=schema_field["delimited_name"],
nativeDataType=native_type,
type=SchemaFieldDataType(type=mapped_type()),
nullable=schema_field["nullable"],
recursive=False,
)
fields.append(field)
return fields

View File

@ -0,0 +1,168 @@
from collections import Counter
from typing import Any
from typing import Counter as CounterType
from typing import Dict, Sequence, Tuple, Union
from mypy_extensions import TypedDict
class BasicSchemaDescription(TypedDict):
types: CounterType[type] # field types and times seen
count: int # times the field was seen
class SchemaDescription(BasicSchemaDescription):
delimited_name: str # collapsed field name
# we use 'mixed' to denote mixed types, so we need a str here
type: Union[type, str] # collapsed type
nullable: bool # if field is ever missing
def is_field_nullable(doc: Dict[str, Any], field_path: Tuple) -> bool:
"""
Check if a nested field is nullable in a document from a collection.
Parameters
----------
doc:
document to check nullability for
field_path:
path to nested field to check, ex. ('first_field', 'nested_child', '2nd_nested_child')
"""
if not field_path:
return True
field = field_path[0]
# if field is inside
if field in doc:
value = doc[field]
if value is None:
return True
# if no fields left, must be non-nullable
if len(field_path) == 1:
return False
# otherwise, keep checking the nested fields
remaining_fields = field_path[1:]
# if dictionary, check additional level of nesting
if isinstance(value, dict):
return is_field_nullable(doc[field], remaining_fields)
# if list, check if any member is missing field
if isinstance(value, list):
# count empty lists of nested objects as nullable
if len(value) == 0:
return True
return any(is_field_nullable(x, remaining_fields) for x in doc[field])
# any other types to check?
# raise ValueError("Nested type not 'list' or 'dict' encountered")
return True
return True
def is_nullable_collection(
collection: Sequence[Dict[str, Any]], field_path: Tuple
) -> bool:
"""
Check if a nested field is nullable in a collection.
Parameters
----------
collection:
collection to check nullability for
field_path:
path to nested field to check, ex. ('first_field', 'nested_child', '2nd_nested_child')
"""
return any(is_field_nullable(doc, field_path) for doc in collection)
def construct_schema(
collection: Sequence[Dict[str, Any]], delimiter: str
) -> Dict[Tuple[str, ...], SchemaDescription]:
"""
Construct (infer) a schema from a collection of documents.
For each field (represented as a tuple to handle nested items), reports the following:
- `types`: Python types of field values
- `count`: Number of times the field was encountered
- `type`: type of the field if `types` is just a single value, otherwise `mixed`
- `nullable`: if field is ever null/missing
- `delimited_name`: name of the field, joined by a given delimiter
Parameters
----------
collection:
collection to construct schema over.
delimiter:
string to concatenate field names by
"""
schema: Dict[Tuple[str, ...], BasicSchemaDescription] = {}
def append_to_schema(doc: Dict[str, Any], parent_prefix: Tuple[str, ...]) -> None:
"""
Recursively update the schema with a document, which may/may not contain nested fields.
Parameters
----------
doc:
document to scan
parent_prefix:
prefix of fields that the document is under, pass an empty tuple when initializing
"""
for key, value in doc.items():
new_parent_prefix = parent_prefix + (key,)
# if nested value, look at the types within
if isinstance(value, dict):
append_to_schema(value, new_parent_prefix)
# if array of values, check what types are within
if isinstance(value, list):
for item in value:
# if dictionary, add it as a nested object
if isinstance(item, dict):
append_to_schema(item, new_parent_prefix)
# don't record None values (counted towards nullable)
if value is not None:
if new_parent_prefix not in schema:
schema[new_parent_prefix] = {
"types": Counter([type(value)]),
"count": 1,
}
else:
# update the type count
schema[new_parent_prefix]["types"].update({type(value): 1})
schema[new_parent_prefix]["count"] += 1
for document in collection:
append_to_schema(document, ())
extended_schema: Dict[Tuple[str, ...], SchemaDescription] = {}
for field_path in schema.keys():
field_types = schema[field_path]["types"]
field_type: Union[str, type] = "mixed"
# if single type detected, mark that as the type to go with
if len(field_types.keys()) == 1:
field_type = next(iter(field_types))
field_extended: SchemaDescription = {
"types": schema[field_path]["types"],
"count": schema[field_path]["count"],
"nullable": is_nullable_collection(collection, field_path),
"delimited_name": delimiter.join(field_path),
"type": field_type,
}
extended_schema[field_path] = field_extended
return extended_schema
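A tiny illustration of `construct_schema` on an in-memory collection (made-up documents), showing the tuple keys and nullability tracking described in the docstring:

```python
# sketch: construct_schema over two hypothetical documents
from datahub.ingestion.source.schema_inference.object import construct_schema

docs = [
    {"name": "alice", "address": {"zip": "02139"}},
    {"name": "bob", "age": 42},
]
schema = construct_schema(docs, delimiter=".")

print(schema[("address", "zip")]["delimited_name"])  # address.zip
print(schema[("age",)]["nullable"])                  # True: "age" is missing from the first doc
print(schema[("name",)]["type"])                     # <class 'str'> (single type observed)
```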

View File

@ -0,0 +1,98 @@
from typing import IO, Any, Callable, Dict, List, Type
import pyarrow
import pyarrow.parquet
from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
BooleanTypeClass,
BytesTypeClass,
DateTypeClass,
NullTypeClass,
NumberTypeClass,
RecordTypeClass,
SchemaField,
SchemaFieldDataType,
StringTypeClass,
TimeTypeClass,
UnionTypeClass,
)
# see https://arrow.apache.org/docs/python/api/datatypes.html#type-checking
pyarrow_type_map: Dict[Callable[[Any], bool], Type] = {
pyarrow.types.is_boolean: BooleanTypeClass,
pyarrow.types.is_integer: NumberTypeClass,
pyarrow.types.is_signed_integer: NumberTypeClass,
pyarrow.types.is_unsigned_integer: NumberTypeClass,
pyarrow.types.is_int8: NumberTypeClass,
pyarrow.types.is_int16: NumberTypeClass,
pyarrow.types.is_int32: NumberTypeClass,
pyarrow.types.is_int64: NumberTypeClass,
pyarrow.types.is_uint8: NumberTypeClass,
pyarrow.types.is_uint16: NumberTypeClass,
pyarrow.types.is_uint32: NumberTypeClass,
pyarrow.types.is_uint64: NumberTypeClass,
pyarrow.types.is_floating: NumberTypeClass,
pyarrow.types.is_float16: NumberTypeClass,
pyarrow.types.is_float32: NumberTypeClass,
pyarrow.types.is_float64: NumberTypeClass,
pyarrow.types.is_decimal: NumberTypeClass,
pyarrow.types.is_list: ArrayTypeClass,
pyarrow.types.is_large_list: ArrayTypeClass,
pyarrow.types.is_struct: RecordTypeClass,
pyarrow.types.is_union: UnionTypeClass,
pyarrow.types.is_nested: RecordTypeClass,
pyarrow.types.is_temporal: TimeTypeClass,
pyarrow.types.is_timestamp: TimeTypeClass,
pyarrow.types.is_date: DateTypeClass,
pyarrow.types.is_date32: DateTypeClass,
pyarrow.types.is_date64: DateTypeClass,
pyarrow.types.is_time: TimeTypeClass,
pyarrow.types.is_time32: TimeTypeClass,
pyarrow.types.is_time64: TimeTypeClass,
pyarrow.types.is_null: NullTypeClass,
pyarrow.types.is_binary: BytesTypeClass,
pyarrow.types.is_unicode: StringTypeClass,
pyarrow.types.is_string: StringTypeClass,
pyarrow.types.is_large_binary: BytesTypeClass,
pyarrow.types.is_large_unicode: StringTypeClass,
pyarrow.types.is_large_string: StringTypeClass,
pyarrow.types.is_fixed_size_binary: BytesTypeClass,
pyarrow.types.is_map: RecordTypeClass,
pyarrow.types.is_dictionary: RecordTypeClass,
}
def map_pyarrow_type(pyarrow_type: Type) -> Type:
for checker, mapped_type in pyarrow_type_map.items():
if checker(pyarrow_type):
return mapped_type
return NullTypeClass
class ParquetInferrer(SchemaInferenceBase):
def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
# infer schema of a parquet file without reading the whole file
# pyarrow reads the schema from the file's footer metadata
schema = pyarrow.parquet.read_schema(file, memory_map=True)
fields: List[SchemaField] = []
for name, pyarrow_type in zip(schema.names, schema.types):
mapped_type = map_pyarrow_type(pyarrow_type)
field = SchemaField(
fieldPath=name,
type=SchemaFieldDataType(mapped_type()),
nativeDataType=str(pyarrow_type),
recursive=False,
)
fields.append(field)
return fields
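A quick hedged check of the mapping above; the calls below use real pyarrow type factories, and the expected classes follow from the first matching checker in `pyarrow_type_map`.

```python
# sketch: mapping a couple of pyarrow types to DataHub type classes
import pyarrow

from datahub.ingestion.source.schema_inference.parquet import map_pyarrow_type

print(map_pyarrow_type(pyarrow.int64()))   # NumberTypeClass (matches pyarrow.types.is_integer)
print(map_pyarrow_type(pyarrow.string()))  # StringTypeClass (matches pyarrow.types.is_unicode)
```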

View File

@ -0,0 +1,130 @@
import tempfile
from typing import List, Type
import avro.schema
import pandas as pd
import ujson
from avro import schema as avro_schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BooleanTypeClass,
NumberTypeClass,
SchemaField,
StringTypeClass,
)
from tests.unit.test_schema_util import assert_field_paths_match
expected_field_paths = [
"boolean_field",
"integer_field",
"string_field",
]
expected_field_paths_avro = [
"[version=2.0].[type=test].[type=boolean].boolean_field",
"[version=2.0].[type=test].[type=int].integer_field",
"[version=2.0].[type=test].[type=string].string_field",
]
expected_field_types = [BooleanTypeClass, NumberTypeClass, StringTypeClass]
test_table = pd.DataFrame(
{
"boolean_field": [True, False, True],
"integer_field": [1, 2, 3],
"string_field": ["a", "b", "c"],
}
)
def assert_field_types_match(
fields: List[SchemaField], expected_field_types: List[Type]
) -> None:
assert len(fields) == len(expected_field_types)
for field, expected_type in zip(fields, expected_field_types):
assert isinstance(field.type.type, expected_type)
def test_infer_schema_csv():
with tempfile.TemporaryFile(mode="w+b") as file:
file.write(bytes(test_table.to_csv(index=False, header=True), encoding="utf-8"))
file.seek(0)
fields = csv_tsv.CsvInferrer(max_rows=100).infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths)
assert_field_types_match(fields, expected_field_types)
def test_infer_schema_tsv():
with tempfile.TemporaryFile(mode="w+b") as file:
file.write(
bytes(
test_table.to_csv(index=False, header=True, sep="\t"), encoding="utf-8"
)
)
file.seek(0)
fields = csv_tsv.TsvInferrer(max_rows=100).infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths)
assert_field_types_match(fields, expected_field_types)
def test_infer_schema_json():
with tempfile.TemporaryFile(mode="w+b") as file:
file.write(bytes(test_table.to_json(orient="records"), encoding="utf-8"))
file.seek(0)
fields = json.JsonInferrer().infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths)
assert_field_types_match(fields, expected_field_types)
def test_infer_schema_parquet():
with tempfile.TemporaryFile(mode="w+b") as file:
test_table.to_parquet(file)
file.seek(0)
fields = parquet.ParquetInferrer().infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths)
assert_field_types_match(fields, expected_field_types)
def test_infer_schema_avro():
with tempfile.TemporaryFile(mode="w+b") as file:
schema = avro_schema.parse(
ujson.dumps(
{
"type": "record",
"name": "test",
"fields": [
{"name": "boolean_field", "type": "boolean"},
{"name": "integer_field", "type": "int"},
{"name": "string_field", "type": "string"},
],
}
)
)
writer = DataFileWriter(file, DatumWriter(), schema)
records = test_table.to_dict(orient="records")
for record in records:
writer.append(record)
writer.sync()
file.seek(0)
fields = avro.AvroInferrer().infer_schema(file)
fields.sort(key=lambda x: x.fieldPath)
assert_field_paths_match(fields, expected_field_paths_avro)
assert_field_types_match(fields, expected_field_types)

View File

@ -321,7 +321,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,auto-ml-job-input-bucket/file.txt,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,auto-ml-job-input-bucket/file_txt,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -346,7 +346,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,auto-ml-job-output-bucket/file.txt,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,auto-ml-job-output-bucket/file_txt,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -370,7 +370,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,compilation-job-bucket/input-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,compilation-job-bucket/input-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -396,7 +396,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,compilation-job-bucket/output-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,compilation-job-bucket/output-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -422,7 +422,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,edge-packaging-bucket/model-artifact.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,edge-packaging-bucket/model-artifact.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -446,7 +446,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,edge-packaging-bucket/output-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,edge-packaging-bucket/output-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -470,7 +470,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/data-source.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/data-source.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -494,7 +494,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/category-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/category-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -518,7 +518,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/output-dataset.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/output-dataset.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -542,7 +542,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/output-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/output-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -566,7 +566,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,processing-job/input-data.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,processing-job/input-data.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -595,7 +595,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/input-dataset.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/input-dataset.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -623,7 +623,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/output-data.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/output-data.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -647,7 +647,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/checkpoint-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/checkpoint-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -671,7 +671,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/debug-hook-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/debug-hook-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -695,7 +695,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/tensorboard-output-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/tensorboard-output-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -719,7 +719,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/profiler-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/profiler-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -743,7 +743,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/debug-rule-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/debug-rule-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -767,7 +767,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/profiler-rule-config.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,training-job/profiler-rule-config.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -791,7 +791,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,transform-job/input-data-source.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,transform-job/input-data-source.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -818,7 +818,7 @@
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,transform-job/output.tar.gz,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:s3,transform-job/output.tar_gz,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
@ -911,10 +911,10 @@
{
"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,auto-ml-job-input-bucket/file.txt,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,auto-ml-job-input-bucket/file_txt,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,auto-ml-job-output-bucket/file.txt,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,auto-ml-job-output-bucket/file_txt,PROD)"
],
"inputDatajobs": []
}
@ -993,10 +993,10 @@
{
"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,compilation-job-bucket/input-config.tar.gz,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,compilation-job-bucket/input-config.tar_gz,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,compilation-job-bucket/output-config.tar.gz,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,compilation-job-bucket/output-config.tar_gz,PROD)"
],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(sagemaker,compilation:a-compilation-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:compilation-job/a-compilation-job)"
@ -1077,8 +1077,8 @@
"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {
"inputDatasets": [],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,edge-packaging-bucket/model-artifact.tar.gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,edge-packaging-bucket/output-config.tar.gz,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,edge-packaging-bucket/model-artifact.tar_gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,edge-packaging-bucket/output-config.tar_gz,PROD)"
],
"inputDatajobs": []
}
@ -1236,12 +1236,12 @@
{
"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/category-config.tar.gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/data-source.tar.gz,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/category-config.tar_gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/data-source.tar_gz,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/output-config.tar.gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/output-dataset.tar.gz,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/output-config.tar_gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,labeling-job/output-dataset.tar_gz,PROD)"
],
"inputDatajobs": []
}
@ -1325,7 +1325,7 @@
{
"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,processing-job/input-data.tar.gz,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,processing-job/input-data.tar_gz,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [
@ -1432,16 +1432,16 @@
{
"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/input-dataset.tar.gz,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/input-dataset.tar_gz,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/checkpoint-config.tar.gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/debug-hook-config.tar.gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/debug-rule-config.tar.gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/output-data.tar.gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/profiler-config.tar.gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/profiler-rule-config.tar.gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/tensorboard-output-config.tar.gz,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/checkpoint-config.tar_gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/debug-hook-config.tar_gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/debug-rule-config.tar_gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/output-data.tar_gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/profiler-config.tar_gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/profiler-rule-config.tar_gz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:s3,training-job/tensorboard-output-config.tar_gz,PROD)"
],
"inputDatajobs": []
}
@ -1524,10 +1524,10 @@
{
"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,transform-job/input-data-source.tar.gz,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,transform-job/input-data-source.tar_gz,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:s3,transform-job/output.tar.gz,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:s3,transform-job/output.tar_gz,PROD)"
],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(sagemaker,auto_ml:an-auto-ml-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:auto-ml-job/an-auto-ml-job)",
@ -1812,4 +1812,4 @@
"proposedDelta": null,
"systemMetadata": null
}
]
]

View File

@ -18,6 +18,7 @@ python =
# see more here -> https://github.com/tox-dev/tox/issues/1105#issuecomment-448596282
[testenv]
passenv = SPARK_VERSION
deps =
.[dev]
commands =