unstructured/test_unstructured_ingest/python/test-ingest-delta-table-output.py
Roman Isecke ba4477ac20
feat: support table conversion for tabular destination connectors (#1917)
### Description
* A full schema was introduced that maps the type of every field in the
JSON partition output to a flattened table structure, so that
table-based destination connectors can use it. The delta table
destination connector was updated to take advantage of this.
* The existing method that converts elements to a dataframe was updated
to fix a bug: object content in the metadata had its key names changed
when flattened, and the resulting columns were then omitted because
they didn't exist in the `_get_metadata_table_fieldnames` response.
* A unit test was added to make sure all possible values in an Element
are handled when converting to a table.
* The delta table ingest test was split into a source test and a
destination test (looking ahead to splitting these up in CI).
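
The dataframe bug described above can be illustrated with a minimal sketch. This is not the project's implementation: `flatten_dict`, the `_` separator, the sample element, and the `nested_fieldnames` allow-list are all hypothetical names chosen for illustration. It only shows how flattening renames nested keys, so an allow-list built from the unflattened names silently drops them.

```python
from typing import Any


def flatten_dict(data: dict[str, Any], parent_key: str = "", sep: str = "_") -> dict[str, Any]:
    """Flatten nested dicts, joining nested key names with `sep` (hypothetical helper)."""
    flat: dict[str, Any] = {}
    for key, value in data.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Nested object: recurse, so its keys get the parent's name as a prefix.
            flat.update(flatten_dict(value, new_key, sep))
        else:
            flat[new_key] = value
    return flat


# Hypothetical element, loosely shaped like JSON partition output.
element = {
    "type": "Title",
    "text": "Hello",
    "metadata": {
        "filename": "doc.pdf",
        "data_source": {"url": "s3://bucket/doc.pdf", "version": "1"},
    },
}

# Assumed allow-list that still uses the name of the nested object itself.
nested_fieldnames = {"type", "text", "metadata_filename", "metadata_data_source"}

row = flatten_dict(element)
kept = {k: v for k, v in row.items() if k in nested_fieldnames}
dropped = sorted(set(row) - set(kept))

print("kept:", sorted(kept))
print("dropped:", dropped)
```

In this sketch the two `data_source` columns are dropped because the allow-list contains `metadata_data_source` rather than the flattened names. Deriving the allow-list from the same flattened schema avoids that mismatch.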

---------

Co-authored-by: ryannikolaidis <1208590+ryannikolaidis@users.noreply.github.com>
Co-authored-by: rbiseck3 <rbiseck3@users.noreply.github.com>
2023-11-03 16:47:21 +00:00


import click
from deltalake import DeltaTable


@click.command()
@click.option("--table-uri", type=str)
def run_check(table_uri):
    print(f"Checking contents of table at {table_uri}")
    delta_table = DeltaTable(
        table_uri=table_uri,
    )

    # Load the whole table into pandas so rows and columns can be counted.
    df = delta_table.to_pandas()
    expected_rows = 5
    expected_columns = 18
    print(f"Number of rows in table vs expected: {len(df)}/{expected_rows}")
    print(f"Number of columns in table vs expected: {len(df.columns)}/{expected_columns}")

    number_of_rows = len(df)
    assert number_of_rows == expected_rows, (
        f"number of rows in generated table ({number_of_rows}) "
        f"doesn't match expected value: {expected_rows}"
    )

    # The number of columns is associated with the flattened JSON structure of the
    # partition output. If this changes, it's most likely due to the metadata
    # changing in the output.
    number_of_columns = len(df.columns)
    assert number_of_columns == expected_columns, (
        f"number of columns in generated table ({number_of_columns}) doesn't "
        f"match expected value: {expected_columns}"
    )
    print("table check complete")


if __name__ == "__main__":
    run_check()