### Path Specs
Path Specs (`path_specs`) is a list of Path Spec (`path_spec`) objects, where each individual `path_spec` represents one or more datasets. The include path (`path_spec.include`) is the formatted path to the dataset. This path must end with `*.*` or `*.[ext]` to represent the leaf level. If `*.[ext]` is provided, only files with the specified extension type will be scanned. `.[ext]` can be any of the supported file types. Refer to example 1 below for more details.
All folder levels need to be specified in the include path. You can use `/*/` to represent a folder level and avoid specifying the exact folder name. To map a folder as a dataset, use the `{table}` placeholder to represent the folder level for which the dataset is to be created. For a partitioned dataset, you can use the placeholder `{partition_key[i]}` to represent the name of the `i`th partition and `{partition_value[i]}` to represent the value of the `i`th partition. During ingestion, `i` will be used to match the partition key to its partition value. Refer to examples 2 and 3 below for more details.
Exclude paths (`path_spec.exclude`) can be used to ignore paths that are not relevant to the current `path_spec`. This path cannot contain named variables (`{}`). An exclude path can use `**` to represent multiple folder levels. Refer to example 4 below for more details.
Refer to example 5 if your bucket has a more complex dataset representation.
Additional points to note:
- Folder names should not contain `{`, `}`, `*`, or `/`.
- The named variable `{folder}` is reserved for internal use; please do not use it as a named variable.
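To tie the include and exclude pieces together, here is a minimal sketch of a `path_specs` entry; the bucket name and folder layout are hypothetical, and fully worked examples follow below:

```yaml
path_specs:
  # Each folder directly under "data" becomes a dataset; only Parquet files are scanned.
  - include: s3://my-bucket/data/{table}/*.parquet
    exclude:
      # Ignore scratch folders at any depth (named variables are not allowed in exclude paths).
      - "**/tmp/**"
```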
#### Partitioned Dataset support
If your dataset is partitioned in the `partition_key=partition_value` format, the partition values are auto-detected.
Otherwise, you can specify partitions in the `path_spec` in the following ways:
- Specify the partition key and value in the path, e.g. `{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/{partition_key[2]}={partition_value[2]}`
- Specify the partition keys using named variables in the path, e.g. `year={year}/month={month}/day={day}` (see the sketch after this list)
- If the path is in the form `/value1/value2/value3`, the source infers the partition values from the path and assigns the keys `partition_0`, `partition_1`, `partition_2`, and so on
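As an illustration of the named-variable form from the second bullet, a `path_spec` might look like the following; the bucket and folder names are illustrative only:

```yaml
path_specs:
  # "year", "month" and "day" become the partition keys; the folder values become the partition values.
  - include: s3://my-bucket/events/{table}/year={year}/month={month}/day={day}/*.parquet
```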
The dataset creation time is determined by the creation time of the earliest created file in the lowest partition, while the last updated time is determined by the last updated time of the most recently updated file in the highest partition.
How the source determines the highest/lowest partition depends on the traversal method set in the `path_spec`; a configuration sketch follows the list below.
- If the traversal method is set to `MAX`, the source finds the latest partition by ordering the partitions at each level. This traversal method does not look for the earliest partition or creation time, but it is the fastest.
- If the traversal method is set to `MIN_MAX`, the source finds the latest and earliest partitions by ordering the partitions at each level. This traversal sorts folders purely by name, so it is fast, but it does not guarantee that the latest partition contains the most recently created file.
- If the traversal method is set to `ALL`, the source finds the latest and earliest partitions by listing all the files in all the partitions and using the files' creation/last-modification times. This is the slowest option, but for datasets that are not time-partitioned it is the only way to find the latest/earliest partition.
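As a rough sketch, the traversal method might be set per path spec like this, assuming the option is exposed as a `traversal_method` field (verify the exact field name against your connector version):

```yaml
path_specs:
  - include: s3://my-bucket/data/{table}/year={year}/month={month}/*.parquet
    # Assumed field name; the values MAX, MIN_MAX and ALL behave as described above.
    traversal_method: MIN_MAX
```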
### Path Specs - Examples
#### Example 1 - Individual file as Dataset
Bucket structure:
```
test-bucket
├── employees.csv
├── departments.json
└── food_items.csv
```
Path specs config to ingest `employees.csv` and `food_items.csv` as datasets:

```yaml
path_specs:
  - include: s3://test-bucket/*.csv
```

This will automatically ignore the `departments.json` file. To include it, use `*.*` instead of `*.csv`.
#### Example 2 - Folder of files as Dataset (without Partitions)
Bucket structure:
```
test-bucket
└── offers
    ├── 1.avro
    └── 2.avro
```
Path specs config to ingest the folder `offers` as a dataset:

```yaml
path_specs:
  - include: s3://test-bucket/{table}/*.avro
```

`{table}` represents the folder for which the dataset will be created.
#### Example 3 - Folder of files as Dataset (with Partitions)
Bucket structure:
```
test-bucket
├── orders
│   └── year=2022
│       └── month=2
│           ├── 1.parquet
│           └── 2.parquet
└── returns
    └── year=2021
        └── month=2
            └── 1.parquet
```
Path specs config to ingest the folders `orders` and `returns` as datasets:

```yaml
path_specs:
  - include: s3://test-bucket/{table}/{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/*.parquet
```

or with partition auto-detection:

```yaml
path_specs:
  - include: s3://test-bucket/{table}/
```

One can also use `include: s3://test-bucket/{table}/*/*/*.parquet` here; however, the format above is preferred because it declares the partitions explicitly.
#### Example 4 - Folder of files as Dataset (with Partitions), and Exclude Filter
Bucket structure:
```
test-bucket
├── orders
│   └── year=2022
│       └── month=2
│           ├── 1.parquet
│           └── 2.parquet
└── tmp_orders
    └── year=2021
        └── month=2
            └── 1.parquet
```
Path specs config to ingest the folder `orders` as a dataset but not the folder `tmp_orders`:

```yaml
path_specs:
  - include: s3://test-bucket/{table}/{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/*.parquet
    exclude:
      - "**/tmp_orders/**"
```

or with partition auto-detection:

```yaml
path_specs:
  - include: s3://test-bucket/{table}/
    exclude:
      - "**/tmp_orders/**"
```
#### Example 5 - Advanced - Either Individual file OR Folder of files as Dataset
Bucket structure:
```
test-bucket
├── customers
│   ├── part1.json
│   ├── part2.json
│   ├── part3.json
│   └── part4.json
├── employees.csv
├── food_items.csv
├── tmp_10101000.csv
└── orders
    └── year=2022
        └── month=2
            ├── 1.parquet
            ├── 2.parquet
            └── 3.parquet
```
Path specs config:
```yaml
path_specs:
  - include: s3://test-bucket/*.csv
    exclude:
      - "**/tmp_10101000.csv"
  - include: s3://test-bucket/{table}/*.json
  - include: s3://test-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
```
The above config has 3 path_specs and will ingest the following datasets:
- `employees.csv` - Single File as Dataset
- `food_items.csv` - Single File as Dataset
- `customers` - Folder as Dataset
- `orders` - Folder as Dataset

It will ignore the file `tmp_10101000.csv`.
#### Valid path_specs.include
```
s3://my-bucket/foo/tests/bar.avro # single file table
s3://my-bucket/foo/tests/*.* # multiple file level tables
s3://my-bucket/foo/tests/{table}/*.avro # table without partition
s3://my-bucket/foo/tests/{table}/ # table with partition auto-detection; partitions can only be detected if they are in key=value format
s3://my-bucket/foo/tests/{table}/*/*.avro # table where partitions are not specified
s3://my-bucket/foo/tests/{table}/*.* # table where neither partitions nor file type are specified
s3://my-bucket/{dept}/tests/{table}/*.avro # specifying keywords to be used in display name
s3://my-bucket/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro # specify partition key and value format
s3://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro # specify partition value only format
s3://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # for all extensions
s3://my-bucket/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 2 levels down in bucket
s3://my-bucket/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 3 levels down in bucket
```
#### Valid path_specs.exclude
- `**/tests/**`
- `s3://my-bucket/hr/**`
- `*/tests/*.csv`
- `s3://my-bucket/foo/*/my_table/**`
If you would like to write a more complicated function for resolving file names, then a {transformer} would be a good fit.
:::caution
Specify as long a fixed prefix (without `/*/`) as possible in `path_specs.include`. This reduces scanning time and cost, specifically on AWS S3.
:::
:::caution
Running profiling against many tables or over many rows can run up significant costs. While we've done our best to limit the expensiveness of the queries the profiler runs, you should be prudent about the set of tables profiling is enabled on or the frequency of the profiling runs.
:::
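If profiling is enabled for this source, it can be scoped in the recipe. The sketch below assumes the data lake profiling options `profiling.enabled` and `profile_patterns` are available on your connector version; treat the field names as assumptions and check the source's configuration reference:

```yaml
source:
  type: s3
  config:
    path_specs:
      - include: s3://my-bucket/data/{table}/*.parquet
    # Assumed option names; verify against your connector version.
    profiling:
      enabled: true
    profile_patterns:
      allow:
        - ".*critical_table.*"
```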
:::caution
If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs.
:::
### Compatibility
Profiles are computed with PyDeequ, which relies on PySpark. Therefore, for computing profiles, we currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the `SPARK_HOME` and `SPARK_VERSION` environment variables to be set. The Spark+Hadoop binary can be downloaded here.
For an example guide on setting up PyDeequ on AWS, see this guide.
:::caution
From Spark 3.2.0+, the Avro reader fails on column names that do not start with a letter or that contain characters other than letters, numbers, and underscores. [72c62b6596/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala (L158)]
Avro files that contain such columns won't be profiled.
:::