Create documentation resources for Data Quality as Code (closes #23800) (#24169)

* Brief documentation of installation requirements

* Minor fix to run tests only defined in OpenMetadata

* Add full example to Data Quality as Code

* Install `griffe2md` and fix docstrings

* Remove local openmetadata reference

* Fix writing, grammar and typos

* Fix test

* Fix formatting
Eugenio 2025-11-11 11:25:42 +01:00 committed by GitHub
parent 56656d68cd
commit ef8b19142f
27 changed files with 2168 additions and 45 deletions

View File

@ -0,0 +1,3 @@
openmetadata-compose.yml
.ipynb_checkpoints

View File

@ -0,0 +1,128 @@
# Data Quality as Code examples
Hi! Here you will find a couple of Jupyter notebook examples describing how to use OpenMetadata's Python SDK to run the data quality tests you already know from OpenMetadata directly from your ETLs.
We're using Docker to run OpenMetadata's stack locally. The Jupyter notebooks in `notebooks` are mounted into the Jupyter server, so you can run the examples in that self-contained environment.
We're going to use the [NYC Yellow Taxi Ride Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) dataset for these examples. Using Docker, we will work with a Postgres database where we load the initial data and run our ETLs against it.
We will be working with these two files:
- Yellow Taxi Ride for September 2025 ([link to parquet file](https://python-sdk-resources.s3.eu-west-3.amazonaws.com/data-quality/yellow_tripdata_2025-09.parquet))
- Taxi Zones Lookup ([link to csv file](https://python-sdk-resources.s3.eu-west-3.amazonaws.com/data-quality/taxi_zone_lookup.csv))
## Table of Contents
1. [Setup](#setup)
2. [Running Data Quality tests for tables in OpenMetadata](#running-data-quality-tests-for-tables-in-openmetadata)
3. [Running Data Quality tests for pandas DataFrames](#running-data-quality-tests-for-pandas-dataframes)
## Setup
The initial setup requires running `./start [-v <version>]`. It fetches OpenMetadata's Docker Compose file for the `<version>` release and boots the full stack, plus the `jupyter` and `postgres` instances used in this tutorial.
Once the whole system is running, you can start following these instructions:
1. Go to your [OpenMetadata](http://localhost:8585/) instance and login
- Email: admin@open-metadata.org
- Password: admin
2. Create a database service pointing to our Postgres database
1. Navigate to [Databases](http://localhost:8585/settings/services/databases) (Settings -> Services -> Databases)
![Services settings page](public/setup/services-settings-page.png)
2. Add a New Service
3. Select Postgres
4. Give it a name. We're using `Tutorial Postgres` for it.
5. Fill in the form with the following data:
- Username: tutorial_user
- Password: password
- Host And Port: dwh:5432
- Database: raw
![New Postgres Service](public/setup/new-database-page.png)
6. Configure the metadata ingestion to target only the metadata relevant to the tutorial
![Metadata Ingestion](public/setup/metadata-config.png)
7. Verify the ingestion is done and our `taxi_yellow` table appears in our `raw` database.
![Taxi Yellow table](public/setup/taxi-yellow-table.png)
Now go back to the console where you ran `./start` and check the logs. You should find a line saying "You can access `jupyter` instance at http://localhost:8888/?token=9e3bba6aba264fa8d4d476730f5fa1c03292598499d72513". Follow the link, and you'll be ready to move on to the next steps.
## Running Data Quality tests for tables in OpenMetadata
In this example, we're going to run data quality tests against our `taxi_yellow` table from an ETL that simply takes the data from a parquet file in an S3 bucket and loads it into our `raw` database.
For this we will be working on the `notebooks/test_workflow.ipynb` notebook, which uses the `metadata.sdk.data_quality` package to showcase how to leverage OpenMetadata and the Python SDK to trigger test case workflows directly from the ETL (a short SDK setup sketch follows the steps below).
The ultimate goal is to make every stakeholder an owner of data quality: engineers only need to make sure their ETLs work, while data stewards can update Data Quality tests on the fly and have the ETL pick them up on the next run.
But first, let's make sure we have some tests in place. Follow these steps:
1. Go to the table's `Data Observability` tab and move to [`Data Quality`](http://localhost:8585/table/Tutorial%20Postgres.raw.public.taxi_yellow/profiler/data-quality)
![Table's Data Quality tab](public/test_workflow/table-data-quality.png)
2. Click on `Add` and choose the `Test Case` option
3. Set up the test. The data we will be pushing to the table in each ETL run should have 10,000 rows. Let's set that up.
1. Select `Table Row Count To Equal` as Test Type
2. Set `Count` to 10000
3. Name the test `taxi_yellow_table_row_count_is_10000`
4. If prompted, scroll down to the pipeline section and click on the `On Demand` button
5. Create it
4. Let's add a couple more tests. Our ETL will only keep taxi rides that start and end in a Yellow Zone location, so let's add tests verifying that `PUZone` and `DOZone` contain exactly `Yellow Zone` and nothing else:
1. Indicate the test should be performed at `Column Level`
2. Select the column `PUZone`
3. Select the test type `Column Values To Match Regex Pattern`
4. Add the following RegEx Pattern: `^Yellow Zone$`.
5. Create it and repeat for column `DOZone`
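Before opening the notebook, here is the SDK bootstrap it starts with. This is a minimal sketch mirroring the notebook's first cells; the host assumes the `openmetadata_server` hostname from the tutorial's Docker network, and the JWT token is a bot token (for example, the Ingestion Bot's) copied from your OpenMetadata instance:

```python
# Minimal SDK setup, mirroring the first cells of the notebook.
from getpass import getpass

from metadata.sdk import configure

# Prompt for the bot JWT token instead of hardcoding it
jwt_token = getpass("Please introduce a JWT token for authentication with OM")

configure(
    host="http://openmetadata_server:8585/api",  # server hostname inside the Docker network
    jwt_token=jwt_token,
)
```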
Check it out in [Jupyter](http://localhost:8888/lab/tree/notebooks/test_workflow.ipynb)
> **NOTE**: the link above takes you to Jupyter running on localhost. It requires that you have followed the [previous steps](#setup) and accessed Jupyter with the login token beforehand.
By the end of the notebook, you should have the following results:
### Results
#### OpenMetadata page of the `taxi_yellow` data quality
![Taxi Yellow Data Quality pt1](public/test_workflow/taxi-yellow-data-quality.png)
#### Jupyter notebook with success reports for each test
![Jupyter notebook run pt1](public/test_workflow/jupyter-notebook-report.png)
## Running Data Quality tests for pandas DataFrames
In this example we're going to use `pandas` to transform the data from the `taxi_yellow` table we prepared in the previous step.
For this we will be working on the `notebooks/test_dataframe.ipynb` notebook, which uses the `metadata.sdk.data_quality.dataframes` package to showcase how to leverage OpenMetadata and the Python SDK to run validations right between transforming and loading data in our ETLs (a short sketch of the ETL-side code follows the steps below).
But first, let's make sure we have some tests in place. Follow these steps:
1. Go to the table's `Data Observability` tab and move to [`Data Quality`](http://localhost:8585/table/Tutorial%20Postgres.stg.public.dw_taxi_trips/profiler/data-quality)
2. Click on `Add` and choose the `Test Case` option
3. Set up the first test. Let's make sure all rows have a `total_amount` greater than 0.
1. Indicate the test should be performed at `Column Level`
2. Select `Column Values To Be Between` as Test Type
3. Set a sensible minimum of 1 and leave the maximum blank
4. Name the test `amount_is_greater_than_0`
5. If prompted, scroll down to the pipeline section and click on the `On Demand` button
6. Create it
4. Let's add a test that ensures we're removing outliers in duration:
1. Indicate the test should be performed at `Column Level`
2. Select the column `trip_duration_min`
3. Select the test type `Column Values To Be Between`
4. Set 1 and 180 as minimum and maximum values.
5. Name the test `trip_duration_to_be_between_1_and_180_minutes`
5. Now let's do the same for outliers in distance:
1. Indicate the test should be performed at `Column Level`
2. Select the column `trip_distance`
3. Select the test type `Column Values To Be Between`
4. Leave `Min` unfilled and set 100 as `Max`.
5. Name the test `trip_distance_to_be_at_most_100`
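With these tests defined, the notebook hooks them into the ETL through the `DataFrameValidator`. Here is a condensed sketch of the in-memory flow from the notebook; it assumes the SDK has already been configured as in the previous section and that `df` is your transformed pandas DataFrame:

```python
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator

validator = DataFrameValidator()

# Pull the tests defined in OpenMetadata for the destination table
validator.add_openmetadata_table_tests("Tutorial Postgres.stg.public.dw_taxi_trips")

# Validate the transformed DataFrame before loading it
results = validator.validate(df)

if results.success:
    ...  # load df into the destination table

# Report the results back to OpenMetadata
results.publish("Tutorial Postgres.stg.public.dw_taxi_trips")
```

The notebook also shows a chunked variant built on `validator.run(...)` with `on_success`/`on_failure` callbacks for data that does not fit in memory.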
Check it out in [Jupyter](http://localhost:8888/lab/tree/notebooks/test_dataframe.ipynb)
By the end of the notebook, you should have the following results:
### Results
#### OpenMetadata page of the `dw_taxi_trips` data quality
![Taxi Yellow Data Quality pt2](public/test_dataframe/taxi-yellow-data-quality.png)
#### Jupyter notebook with success reports for each test
![Jupyter notebook run pt2](public/test_dataframe/jupyter-notebook-report.png)
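If you prefer keeping the test definitions in code instead of (or in addition to) the UI, the validator also accepts test definitions directly. This sketch mirrors the commented-out alternative in the notebook:

```python
from metadata.sdk.data_quality import ColumnValuesToBeBetween
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator

validator = DataFrameValidator()

# Same three checks as defined above in the UI, declared as code
validator.add_tests(
    ColumnValuesToBeBetween(name="amount_is_greater_than_0", min_value=0),
    ColumnValuesToBeBetween(
        name="trip_duration_to_be_between_1_and_180_minutes",
        min_value=1,
        max_value=180,
    ),
    ColumnValuesToBeBetween(name="trip_distance_to_be_at_most_100", max_value=100),
)
```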

View File

@ -0,0 +1,31 @@
# Depends on OpenMetadata's compose file
# Should be run like `docker compose -f openmetadata-compose.yml -f docker-compose.yml up`
services:
jupyter:
image: quay.io/jupyter/minimal-notebook:python-3.11.10
container_name: jupyter-notebook
volumes:
- "./README.md:/home/jovyan/README.md"
- "./notebooks:/home/jovyan/notebooks"
- "../../../ingestion:/opt/openmetadata/ingestion"
networks:
- app_net
ports:
- "8888:8888"
dwh:
image: postgres:15-alpine
container_name: dwh-postgres
volumes:
- dwh_data:/var/lib/postgresql/data
- "./pg_scripts:/docker-entrypoint-initdb.d"
ports:
- "5432:5432"
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
networks:
- app_net
volumes:
dwh_data: {}

View File

@ -0,0 +1,572 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "14093fef-c7ab-4566-88e2-318553ba3aef",
"metadata": {},
"source": "# Running Data Quality tests for DataFrame\n\nIn the following Notebook we will work with the `Tutorial Postgres.raw.public.taxi_yellow` table from the previous tutorial - [test workflow notebook](/lab/tree/notebooks/test_workflow.ipynb) - and perform some transformations on it before loading it into our staging database.\n\n## Purpose\nWe want to showcase how we can hook OpenMetadata's data quality mechanisms directly in your ETLs before your data reaches its destination. For that, we're building an ETL that transforms the data we previously built and loads it in a table for which we have set up data quality tests in the [given instructions](/lab/tree/README.md).\n\n## Description of the ETL\nFor context, please refer to [the test workflow notebook](/lab/tree/notebooks/test_workflow.ipynb)\n\nIn this case we will run some simple data cleaning and transformations on the dataframe. Then, we will use the `DataFrameValidator` interface to load the chunks of validated data into the destination and then finally report those results to OpenMetadata.\n\nWe will use the [`openmetadata-ingestion`](https://pypi.org/project/openmetadata-ingestion/) library to run the Data Quality tests we have defined in [OpenMetadata](http://localhost:8585/table/Tutorial%20Postgres.raw.public.taxi_yellow/profiler/data-quality).\n\n## Dependencies\nFor our ETL we will be using SQLAlchemy to load the table, Pandas DataFrames to perform transformations, [`openmetadata-ingestion`](https://pypi.org/project/openmetadata-ingestion/) to run data quality tests and the OpenMetadata [Postgres Connector](https://docs.open-metadata.org/latest/connectors/database/postgres).\n\nWe can install all these dependencies specifying the right extras. A full list can be found in the project's [`setup.py`](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/setup.py), check it out if your installation differs from the example below.\n\n## Requirements\nIf you haven't, please follow the [setup](/lab/tree/README.md#setup) steps in the README\n\nFor this example you will need:\n\n- To have run the [`test_workflow`](/lab/tree/notebooks/test_workflow.ipynb) notebook\n- An OpenMetadata instance running (achieved by following the setup instructions above)\n- A bot JWT token. You can do so by using [Ingestion Bot's](http://localhost:8585/bots/ingestion-bot) token from your OpenMetadata instance\n- [`openmetadata-ingestion`](https://pypi.org/project/openmetadata-ingestion/) version 1.11.0.0 or above (installed in this Notebook)"
},
{
"cell_type": "code",
"execution_count": 1,
"id": "initial_id",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Obtaining file:///opt/openmetadata/ingestion\n",
" Installing build dependencies ... \u001b[?25ldone\n",
"\u001b[?25h Checking if build backend supports build_editable ... \u001b[?25ldone\n",
"\u001b[?25h Getting requirements to build editable ... \u001b[?25ldone\n",
"\u001b[?25h Preparing editable metadata (pyproject.toml) ... \u001b[?25ldone\n",
"\u001b[?25hRequirement already satisfied: Jinja2>=2.11.3 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (3.1.6)\n",
"Requirement already satisfied: httpx~=0.28.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.28.1)\n",
"Requirement already satisfied: memory-profiler in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.61.0)\n",
"Requirement already satisfied: packaging in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (24.1)\n",
"Requirement already satisfied: PyYAML~=6.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (6.0.2)\n",
"Requirement already satisfied: shapely in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.1.2)\n",
"Requirement already satisfied: importlib-metadata>=4.13.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (8.5.0)\n",
"Requirement already satisfied: pymysql~=1.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.1.2)\n",
"Requirement already satisfied: cached-property==1.5.2 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.5.2)\n",
"Requirement already satisfied: python-dotenv>=0.19.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.2.1)\n",
"Requirement already satisfied: jsonpatch<2.0,>=1.24 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.33)\n",
"Requirement already satisfied: mypy-extensions>=0.4.3 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.1.0)\n",
"Requirement already satisfied: kubernetes>=21.0.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (34.1.0)\n",
"Requirement already satisfied: jaraco.functools<4.2.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (4.1.0)\n",
"Requirement already satisfied: chardet==4.0.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (4.0.0)\n",
"Requirement already satisfied: collate-sqllineage~=1.6.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.6.22)\n",
"Requirement already satisfied: collate-data-diff>=0.11.6 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.11.7)\n",
"Requirement already satisfied: google-cloud-secret-manager==2.24.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.24.0)\n",
"Requirement already satisfied: google-crc32c in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.7.1)\n",
"Requirement already satisfied: requests>=2.23 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.32.3)\n",
"Requirement already satisfied: tabulate==0.9.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.9.0)\n",
"Requirement already satisfied: antlr4-python3-runtime==4.9.2 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (4.9.2)\n",
"Requirement already satisfied: python-dateutil>=2.8.1 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.9.0)\n",
"Requirement already satisfied: pydantic-settings>=2.7.0,~=2.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.11.0)\n",
"Requirement already satisfied: sqlalchemy<2,>=1.4.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.4.54)\n",
"Requirement already satisfied: azure-identity~=1.12 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.25.1)\n",
"Requirement already satisfied: pydantic<2.12,>=2.7.0,~=2.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.9.2)\n",
"Requirement already satisfied: azure-keyvault-secrets in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (4.10.0)\n",
"Requirement already satisfied: boto3<2.0,>=1.20 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.40.68)\n",
"Requirement already satisfied: cryptography>=42.0.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (46.0.0)\n",
"Requirement already satisfied: email-validator>=2.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.3.0)\n",
"Requirement already satisfied: requests-aws4auth~=1.1 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.3.1)\n",
"Requirement already satisfied: typing-inspect in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.9.0)\n",
"Requirement already satisfied: snowflake-connector-python<4.0.0,>=3.13.1 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (3.18.0)\n",
"Requirement already satisfied: setuptools~=70.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (70.3.0)\n",
"Requirement already satisfied: mysql-connector-python>=9.1 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (9.5.0)\n",
"Requirement already satisfied: google-api-core!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1 in /opt/conda/lib/python3.11/site-packages (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (2.28.1)\n",
"Requirement already satisfied: google-auth!=2.24.0,!=2.25.0,<3.0.0,>=2.14.1 in /opt/conda/lib/python3.11/site-packages (from google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (2.43.0)\n",
"Requirement already satisfied: proto-plus<2.0.0,>=1.22.3 in /opt/conda/lib/python3.11/site-packages (from google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (1.26.1)\n",
"Requirement already satisfied: protobuf!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<7.0.0,>=3.20.2 in /opt/conda/lib/python3.11/site-packages (from google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (6.33.0)\n",
"Requirement already satisfied: grpc-google-iam-v1<1.0.0,>=0.14.0 in /opt/conda/lib/python3.11/site-packages (from google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (0.14.3)\n",
"Requirement already satisfied: numpy<2 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.26.4)\n",
"Requirement already satisfied: pandas~=2.0.3 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.0.3)\n",
"Requirement already satisfied: psycopg2-binary in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.9.11)\n",
"Requirement already satisfied: GeoAlchemy2~=0.12 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.18.0)\n",
"Requirement already satisfied: azure-core>=1.31.0 in /opt/conda/lib/python3.11/site-packages (from azure-identity~=1.12->openmetadata-ingestion==1.10.0.0.dev0) (1.36.0)\n",
"Requirement already satisfied: msal>=1.30.0 in /opt/conda/lib/python3.11/site-packages (from azure-identity~=1.12->openmetadata-ingestion==1.10.0.0.dev0) (1.34.0)\n",
"Requirement already satisfied: msal-extensions>=1.2.0 in /opt/conda/lib/python3.11/site-packages (from azure-identity~=1.12->openmetadata-ingestion==1.10.0.0.dev0) (1.3.1)\n",
"Requirement already satisfied: typing-extensions>=4.0.0 in /opt/conda/lib/python3.11/site-packages (from azure-identity~=1.12->openmetadata-ingestion==1.10.0.0.dev0) (4.12.2)\n",
"Requirement already satisfied: botocore<1.41.0,>=1.40.68 in /opt/conda/lib/python3.11/site-packages (from boto3<2.0,>=1.20->openmetadata-ingestion==1.10.0.0.dev0) (1.40.68)\n",
"Requirement already satisfied: jmespath<2.0.0,>=0.7.1 in /opt/conda/lib/python3.11/site-packages (from boto3<2.0,>=1.20->openmetadata-ingestion==1.10.0.0.dev0) (1.0.1)\n",
"Requirement already satisfied: s3transfer<0.15.0,>=0.14.0 in /opt/conda/lib/python3.11/site-packages (from boto3<2.0,>=1.20->openmetadata-ingestion==1.10.0.0.dev0) (0.14.0)\n",
"Requirement already satisfied: attrs>=23.1.0 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (24.2.0)\n",
"Requirement already satisfied: click>=8.1 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (8.3.0)\n",
"Requirement already satisfied: dbt-core<2.0.0,>=1.0.0 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.10.13)\n",
"Requirement already satisfied: dsnparse<0.2.0 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.1.15)\n",
"Requirement already satisfied: keyring in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (25.6.0)\n",
"Requirement already satisfied: mashumaro<3.11.0,>=2.9 in /opt/conda/lib/python3.11/site-packages (from mashumaro[msgpack]<3.11.0,>=2.9->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (3.10)\n",
"Requirement already satisfied: rich in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (14.2.0)\n",
"Requirement already satisfied: toml>=0.10.2 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.10.2)\n",
"Requirement already satisfied: urllib3<2 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.26.20)\n",
"Requirement already satisfied: sqlparse<0.6,>=0.5 in /opt/conda/lib/python3.11/site-packages (from collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (0.5.3)\n",
"Requirement already satisfied: networkx>=2.4 in /opt/conda/lib/python3.11/site-packages (from collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (3.5)\n",
"Requirement already satisfied: collate-sqlfluff~=3.3.0 in /opt/conda/lib/python3.11/site-packages (from collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (3.3.6)\n",
"Requirement already satisfied: cffi>=1.14 in /opt/conda/lib/python3.11/site-packages (from cryptography>=42.0.0->openmetadata-ingestion==1.10.0.0.dev0) (1.17.1)\n",
"Requirement already satisfied: dnspython>=2.0.0 in /opt/conda/lib/python3.11/site-packages (from email-validator>=2.0->openmetadata-ingestion==1.10.0.0.dev0) (2.8.0)\n",
"Requirement already satisfied: idna>=2.0.0 in /opt/conda/lib/python3.11/site-packages (from email-validator>=2.0->openmetadata-ingestion==1.10.0.0.dev0) (3.10)\n",
"Requirement already satisfied: anyio in /opt/conda/lib/python3.11/site-packages (from httpx~=0.28.0->openmetadata-ingestion==1.10.0.0.dev0) (4.6.2.post1)\n",
"Requirement already satisfied: certifi in /opt/conda/lib/python3.11/site-packages (from httpx~=0.28.0->openmetadata-ingestion==1.10.0.0.dev0) (2024.8.30)\n",
"Requirement already satisfied: httpcore==1.* in /opt/conda/lib/python3.11/site-packages (from httpx~=0.28.0->openmetadata-ingestion==1.10.0.0.dev0) (1.0.6)\n",
"Requirement already satisfied: h11<0.15,>=0.13 in /opt/conda/lib/python3.11/site-packages (from httpcore==1.*->httpx~=0.28.0->openmetadata-ingestion==1.10.0.0.dev0) (0.14.0)\n",
"Requirement already satisfied: zipp>=3.20 in /opt/conda/lib/python3.11/site-packages (from importlib-metadata>=4.13.0->openmetadata-ingestion==1.10.0.0.dev0) (3.20.2)\n",
"Requirement already satisfied: more-itertools in /opt/conda/lib/python3.11/site-packages (from jaraco.functools<4.2.0->openmetadata-ingestion==1.10.0.0.dev0) (10.8.0)\n",
"Requirement already satisfied: MarkupSafe>=2.0 in /opt/conda/lib/python3.11/site-packages (from Jinja2>=2.11.3->openmetadata-ingestion==1.10.0.0.dev0) (3.0.2)\n",
"Requirement already satisfied: jsonpointer>=1.9 in /opt/conda/lib/python3.11/site-packages (from jsonpatch<2.0,>=1.24->openmetadata-ingestion==1.10.0.0.dev0) (3.0.0)\n",
"Requirement already satisfied: six>=1.9.0 in /opt/conda/lib/python3.11/site-packages (from kubernetes>=21.0.0->openmetadata-ingestion==1.10.0.0.dev0) (1.16.0)\n",
"Requirement already satisfied: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /opt/conda/lib/python3.11/site-packages (from kubernetes>=21.0.0->openmetadata-ingestion==1.10.0.0.dev0) (1.8.0)\n",
"Requirement already satisfied: requests-oauthlib in /opt/conda/lib/python3.11/site-packages (from kubernetes>=21.0.0->openmetadata-ingestion==1.10.0.0.dev0) (2.0.0)\n",
"Requirement already satisfied: durationpy>=0.7 in /opt/conda/lib/python3.11/site-packages (from kubernetes>=21.0.0->openmetadata-ingestion==1.10.0.0.dev0) (0.10)\n",
"Requirement already satisfied: pytz>=2020.1 in /opt/conda/lib/python3.11/site-packages (from pandas~=2.0.3->openmetadata-ingestion==1.10.0.0.dev0) (2024.2)\n",
"Requirement already satisfied: tzdata>=2022.1 in /opt/conda/lib/python3.11/site-packages (from pandas~=2.0.3->openmetadata-ingestion==1.10.0.0.dev0) (2025.2)\n",
"Requirement already satisfied: annotated-types>=0.6.0 in /opt/conda/lib/python3.11/site-packages (from pydantic<2.12,>=2.7.0,~=2.0->openmetadata-ingestion==1.10.0.0.dev0) (0.7.0)\n",
"Requirement already satisfied: pydantic-core==2.23.4 in /opt/conda/lib/python3.11/site-packages (from pydantic<2.12,>=2.7.0,~=2.0->openmetadata-ingestion==1.10.0.0.dev0) (2.23.4)\n",
"Requirement already satisfied: typing-inspection>=0.4.0 in /opt/conda/lib/python3.11/site-packages (from pydantic-settings>=2.7.0,~=2.0->openmetadata-ingestion==1.10.0.0.dev0) (0.4.2)\n",
"Requirement already satisfied: charset-normalizer<4,>=2 in /opt/conda/lib/python3.11/site-packages (from requests>=2.23->openmetadata-ingestion==1.10.0.0.dev0) (3.4.0)\n",
"Requirement already satisfied: asn1crypto<2.0.0,>0.24.0 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (1.5.1)\n",
"Requirement already satisfied: pyOpenSSL<26.0.0,>=22.0.0 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (25.3.0)\n",
"Requirement already satisfied: pyjwt<3.0.0 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (2.9.0)\n",
"Requirement already satisfied: filelock<4,>=3.5 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (3.20.0)\n",
"Requirement already satisfied: sortedcontainers>=2.4.0 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (2.4.0)\n",
"Requirement already satisfied: platformdirs<5.0.0,>=2.6.0 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (4.3.6)\n",
"Requirement already satisfied: tomlkit in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (0.13.3)\n",
"Requirement already satisfied: greenlet!=0.4.17 in /opt/conda/lib/python3.11/site-packages (from sqlalchemy<2,>=1.4.0->openmetadata-ingestion==1.10.0.0.dev0) (3.1.1)\n",
"Requirement already satisfied: isodate>=0.6.1 in /opt/conda/lib/python3.11/site-packages (from azure-keyvault-secrets->openmetadata-ingestion==1.10.0.0.dev0) (0.6.1)\n",
"Requirement already satisfied: psutil in /opt/conda/lib/python3.11/site-packages (from memory-profiler->openmetadata-ingestion==1.10.0.0.dev0) (6.0.0)\n",
"Requirement already satisfied: pycparser in /opt/conda/lib/python3.11/site-packages (from cffi>=1.14->cryptography>=42.0.0->openmetadata-ingestion==1.10.0.0.dev0) (2.22)\n",
"Requirement already satisfied: colorama>=0.3 in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (0.4.6)\n",
"Requirement already satisfied: diff-cover>=2.5.0 in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (9.7.1)\n",
"Requirement already satisfied: pathspec in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (0.12.1)\n",
"Requirement already satisfied: pytest in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (8.4.2)\n",
"Requirement already satisfied: regex in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (2025.11.3)\n",
"Requirement already satisfied: tblib in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (3.2.1)\n",
"Requirement already satisfied: tqdm in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (4.66.5)\n",
"Requirement already satisfied: agate<1.10,>=1.7.0 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.9.1)\n",
"Requirement already satisfied: jsonschema<5.0,>=4.19.1 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (4.23.0)\n",
"Requirement already satisfied: snowplow-tracker<2.0,>=1.0.2 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.1.0)\n",
"Requirement already satisfied: dbt-extractor<=0.6,>=0.5.0 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.6.0)\n",
"Requirement already satisfied: dbt-semantic-interfaces<0.10,>=0.9.0 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.9.0)\n",
"Requirement already satisfied: dbt-common<2.0,>=1.27.0 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.36.0)\n",
"Requirement already satisfied: dbt-adapters<2.0,>=1.15.5 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.18.0)\n",
"Requirement already satisfied: dbt-protos<2.0,>=1.0.346 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.0.382)\n",
"Requirement already satisfied: daff>=1.3.46 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.4.2)\n",
"Requirement already satisfied: googleapis-common-protos<2.0.0,>=1.56.2 in /opt/conda/lib/python3.11/site-packages (from google-api-core!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1->google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (1.72.0)\n",
"Requirement already satisfied: grpcio<2.0.0,>=1.33.2 in /opt/conda/lib/python3.11/site-packages (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (1.76.0)\n",
"Requirement already satisfied: grpcio-status<2.0.0,>=1.33.2 in /opt/conda/lib/python3.11/site-packages (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (1.76.0)\n",
"Requirement already satisfied: cachetools<7.0,>=2.0.0 in /opt/conda/lib/python3.11/site-packages (from google-auth!=2.24.0,!=2.25.0,<3.0.0,>=2.14.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (6.2.1)\n",
"Requirement already satisfied: pyasn1-modules>=0.2.1 in /opt/conda/lib/python3.11/site-packages (from google-auth!=2.24.0,!=2.25.0,<3.0.0,>=2.14.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (0.4.2)\n",
"Requirement already satisfied: rsa<5,>=3.1.4 in /opt/conda/lib/python3.11/site-packages (from google-auth!=2.24.0,!=2.25.0,<3.0.0,>=2.14.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (4.9.1)\n",
"Requirement already satisfied: msgpack>=0.5.6 in /opt/conda/lib/python3.11/site-packages (from mashumaro[msgpack]<3.11.0,>=2.9->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.1.2)\n",
"Requirement already satisfied: sniffio>=1.1 in /opt/conda/lib/python3.11/site-packages (from anyio->httpx~=0.28.0->openmetadata-ingestion==1.10.0.0.dev0) (1.3.1)\n",
"Requirement already satisfied: SecretStorage>=3.2 in /opt/conda/lib/python3.11/site-packages (from keyring->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (3.4.0)\n",
"Requirement already satisfied: jeepney>=0.4.2 in /opt/conda/lib/python3.11/site-packages (from keyring->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.9.0)\n",
"Requirement already satisfied: jaraco.classes in /opt/conda/lib/python3.11/site-packages (from keyring->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (3.4.0)\n",
"Requirement already satisfied: jaraco.context in /opt/conda/lib/python3.11/site-packages (from keyring->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (6.0.1)\n",
"Requirement already satisfied: oauthlib>=3.0.0 in /opt/conda/lib/python3.11/site-packages (from requests-oauthlib->kubernetes>=21.0.0->openmetadata-ingestion==1.10.0.0.dev0) (3.2.2)\n",
"Requirement already satisfied: markdown-it-py>=2.2.0 in /opt/conda/lib/python3.11/site-packages (from rich->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (4.0.0)\n",
"Requirement already satisfied: pygments<3.0.0,>=2.13.0 in /opt/conda/lib/python3.11/site-packages (from rich->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (2.19.2)\n",
"Requirement already satisfied: Babel>=2.0 in /opt/conda/lib/python3.11/site-packages (from agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (2.14.0)\n",
"Requirement already satisfied: leather>=0.3.2 in /opt/conda/lib/python3.11/site-packages (from agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.4.0)\n",
"Requirement already satisfied: parsedatetime!=2.5,>=2.1 in /opt/conda/lib/python3.11/site-packages (from agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (2.6)\n",
"Requirement already satisfied: python-slugify>=1.2.1 in /opt/conda/lib/python3.11/site-packages (from agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (8.0.4)\n",
"Requirement already satisfied: pytimeparse>=1.1.5 in /opt/conda/lib/python3.11/site-packages (from agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.1.8)\n",
"Requirement already satisfied: deepdiff<9.0,>=7.0 in /opt/conda/lib/python3.11/site-packages (from dbt-common<2.0,>=1.27.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (8.6.1)\n",
"Requirement already satisfied: pluggy<2,>=0.13.1 in /opt/conda/lib/python3.11/site-packages (from diff-cover>=2.5.0->collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (1.5.0)\n",
"Requirement already satisfied: jsonschema-specifications>=2023.03.6 in /opt/conda/lib/python3.11/site-packages (from jsonschema<5.0,>=4.19.1->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (2024.10.1)\n",
"Requirement already satisfied: referencing>=0.28.4 in /opt/conda/lib/python3.11/site-packages (from jsonschema<5.0,>=4.19.1->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.35.1)\n",
"Requirement already satisfied: rpds-py>=0.7.1 in /opt/conda/lib/python3.11/site-packages (from jsonschema<5.0,>=4.19.1->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.20.0)\n",
"Requirement already satisfied: mdurl~=0.1 in /opt/conda/lib/python3.11/site-packages (from markdown-it-py>=2.2.0->rich->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.1.2)\n",
"Requirement already satisfied: pyasn1<0.7.0,>=0.6.1 in /opt/conda/lib/python3.11/site-packages (from pyasn1-modules>=0.2.1->google-auth!=2.24.0,!=2.25.0,<3.0.0,>=2.14.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (0.6.1)\n",
"Requirement already satisfied: backports.tarfile in /opt/conda/lib/python3.11/site-packages (from jaraco.context->keyring->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.2.0)\n",
"Requirement already satisfied: iniconfig>=1 in /opt/conda/lib/python3.11/site-packages (from pytest->collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (2.3.0)\n",
"Requirement already satisfied: orderly-set<6,>=5.4.1 in /opt/conda/lib/python3.11/site-packages (from deepdiff<9.0,>=7.0->dbt-common<2.0,>=1.27.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (5.5.0)\n",
"Requirement already satisfied: text-unidecode>=1.3 in /opt/conda/lib/python3.11/site-packages (from python-slugify>=1.2.1->agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.3)\n",
"Building wheels for collected packages: openmetadata-ingestion\n",
" Building editable for openmetadata-ingestion (pyproject.toml) ... \u001b[?25ldone\n",
"\u001b[?25h Created wheel for openmetadata-ingestion: filename=openmetadata_ingestion-1.10.0.0.dev0-0.editable-py3-none-any.whl size=14100 sha256=7de4eb10961d6008744422eeb45dbc15b3967308192e57518fa91f670744ae60\n",
" Stored in directory: /tmp/pip-ephem-wheel-cache-65dv56xb/wheels/94/a6/4b/951e6297508c20775c8465f8caed457f0821461c94c158f900\n",
"Successfully built openmetadata-ingestion\n",
"Installing collected packages: openmetadata-ingestion\n",
" Attempting uninstall: openmetadata-ingestion\n",
" Found existing installation: openmetadata-ingestion 1.10.0.0.dev0\n",
" Uninstalling openmetadata-ingestion-1.10.0.0.dev0:\n",
" Successfully uninstalled openmetadata-ingestion-1.10.0.0.dev0\n",
"Successfully installed openmetadata-ingestion-1.10.0.0.dev0\n"
]
}
],
"source": "!pip install \"openmetadata-ingestion[pandas,postgres]>=1.11.0.0\""
},
{
"cell_type": "markdown",
"id": "84b422b5-dae7-4094-ae50-cd38c0754a6b",
"metadata": {},
"source": [
"## Initial SDK setup\n",
"In this step we make sure our Python code is ready to work against OpenMetadata\n",
"\n",
"You will be prompted for the JWT token mentioned in the [requirements](#requirements) section"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "4c2a0cc0-f335-4b53-943d-357a078d7de2",
"metadata": {},
"outputs": [
{
"name": "stdin",
"output_type": "stream",
"text": [
"Please introduce a JWT token for authentication with OM ········\n"
]
},
{
"data": {
"text/plain": [
"<metadata.sdk.client.OpenMetadata at 0xffff8c176910>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from getpass import getpass\n",
"\n",
"from metadata.sdk import configure\n",
"\n",
"jwt_token = getpass(\"Please introduce a JWT token for authentication with OM\")\n",
"\n",
"configure(\n",
" host=\"http://openmetadata_server:8585/api\",\n",
" jwt_token=jwt_token,\n",
")"
]
},
{
"cell_type": "markdown",
"id": "aa6bb55b-2dd5-4852-a849-78611bbc7ebe",
"metadata": {},
"source": [
"## Implementation of the ETL"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "4ff0449c-0507-46e4-8e80-3d585fd1c0c3",
"metadata": {},
"outputs": [],
"source": [
"# Define the transformation function to run on dataframes\n",
"def transform(df):\n",
" # Keep only relevant columns\n",
" cols_to_keep = [\n",
" \"vendorid\", \"tpep_pickup_datetime\", \"tpep_dropoff_datetime\",\n",
" \"passenger_count\", \"trip_distance\",\n",
" \"pulocationid\", \"dolocationid\",\n",
" \"payment_type\", \"fare_amount\", \"tip_amount\",\n",
" \"total_amount\", \"congestion_surcharge\"\n",
" ]\n",
" df_stg = df[cols_to_keep]\n",
"\n",
" # Remove invalid or zero values\n",
" df_stg = df_stg[\n",
" (df_stg[\"fare_amount\"] > 0) &\n",
" (df_stg[\"total_amount\"] > 0) &\n",
" (df_stg[\"trip_distance\"] > 0) &\n",
" (df_stg[\"passenger_count\"] > 0)\n",
" ]\n",
"\n",
" # --- 2. Feature engineering ---\n",
" df_stg[\"trip_duration_min\"] = (\n",
" (df_stg[\"tpep_dropoff_datetime\"] - df_stg[\"tpep_pickup_datetime\"]).dt.total_seconds() / 60\n",
" )\n",
"\n",
" # Filter unrealistic durations and distances\n",
" df_stg = df_stg[\n",
" (df_stg[\"trip_duration_min\"] >= 1) &\n",
" (df_stg[\"trip_duration_min\"] <= 180) &\n",
" (df_stg[\"trip_distance\"] <= 100)\n",
" ]\n",
"\n",
" return df_stg"
]
},
{
"cell_type": "markdown",
"id": "df37a76c-5e7a-43e4-90e2-bdda99df4d87",
"metadata": {},
"source": [
"## Run Data Quality tests"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "f09777b0-6e29-4b92-bd23-f02936cf8481",
"metadata": {},
"outputs": [],
"source": [
"# Define the validator we will use\n",
"from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator\n",
"\n",
"validator = DataFrameValidator()\n",
"\n",
"# Load the tests defined in OpenMetadata for the table `Tutorial Postgres.stg.public.dw_taxi_trips`\n",
"validator.add_openmetadata_table_tests(\"Tutorial Postgres.stg.public.dw_taxi_trips\")\n",
"\n",
"# Alternatively, one could define the same tests as code with:\n",
"# from metadata.sdk.data_quality import ColumnValuesToBeBetween\n",
"# validator.add_tests(\n",
"# ColumnValuesToBeBetween(\n",
"# name=\"amount_is_greater_than_0\",\n",
"# min_value=0,\n",
"# ),\n",
"# ColumnValuesToBeBetween(\n",
"# name=\"trip_duration_to_be_between_1_and_180_minutes\",\n",
"# min_value=1,\n",
"# max_value=180,\n",
"# ),\n",
"# ColumnValuesToBeBetween(\n",
"# name=\"trip_distance_to_be_at_most_100\",\n",
"# max_value=100,\n",
"# ),\n",
"# )"
]
},
{
"cell_type": "markdown",
"id": "34518c0c-02ef-40ac-b6da-73400c12af54",
"metadata": {},
"source": [
"### Run mechanisms\n",
"\n",
"The `DataFrameValidator` is designed so that you can use it in a variety of use cases, including when memory is a concern and you're running your ETLs using Pandas. For such cases we have created a shortcut which will make your code smaller. But let's check the trivial use case first, when your whole data fits in memory."
]
},
{
"cell_type": "markdown",
"id": "45186085-aa85-41f8-bc24-268610f139d5",
"metadata": {},
"source": [
"#### Strategy: data fits in memory\n",
"\n",
"The following ETL reads from the source, applies transformations and validates the dataframe"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "1f766e18-003d-4560-9ff3-179a8cc18f98",
"metadata": {},
"outputs": [],
"source": [
"# Read, transform, validate and load\n",
"import pandas as pd\n",
"from sqlalchemy import MetaData, Table, create_engine, delete, insert\n",
"\n",
"SOURCE = \"postgresql://user:pass@dwh:5432/raw\"\n",
"DESTINATION = \"postgresql://user:pass@dwh:5432/stg\"\n",
"\n",
"source = create_engine(SOURCE)\n",
"\n",
"# Read\n",
"with source.connect() as conn:\n",
" df = pd.read_sql(\"SELECT * FROM taxi_yellow\", conn)\n",
"\n",
"# Transform\n",
"df = transform(df)\n",
"\n",
"# Validate\n",
"results = validator.validate(df)\n",
"\n",
"# Load?\n",
"if results.success:\n",
" destination = create_engine(\"postgresql://user:pass@dwh:5432/stg\", future=True)\n",
"\n",
" with destination.connect() as conn:\n",
" table = Table(\"dw_taxi_trips\", MetaData(), autoload_with=conn)\n",
"\n",
" # Truncate and insert\n",
" conn.execute(delete(table))\n",
" conn.execute(insert(table).values(), df.to_dict(orient=\"records\"))\n",
"\n",
"# Optional: publish results to OpenMetadata\n",
"results.publish(\"Tutorial Postgres.stg.public.dw_taxi_trips\")"
]
},
{
"cell_type": "markdown",
"id": "61c1be0c-8bf0-47eb-a71c-f6c54a9abac2",
"metadata": {},
"source": "#### Strategy: loading data in chunks\nNow, this use case has two variants. The first one is pretty similar to the one before and it requires that your code follows the validator's `FailureMode`, which defaults to a short circuit. The second requires that you only define three methods: one that returns chunks of probably transformed dataframes, another that loads chunks and a third that handles errors.\n\nThe validator's default and only (for now) failure mode short-circuits execution of any other test case and stops iterating on the chunks of data if a failure is encountered. We will want our code to behave as such, so after short circuiting we will rollback our changes in the destination database."
},
{
"cell_type": "code",
"execution_count": null,
"id": "dd84d688-e164-449a-ae28-26ded58d6b3c",
"metadata": {},
"outputs": [],
"source": "# First: define a mechanism to load and transform chunks of data\n## Credentials to the user are set up in `docker-compose.yml`\n\nimport pandas as pd\nfrom sqlalchemy import create_engine\n\ndef load_and_transform():\n engine = create_engine(SOURCE)\n \n with engine.connect() as conn:\n chunks = pd.read_sql(\"SELECT * FROM taxi_yellow\", conn, chunksize=1_000)\n\n for df in chunks:\n yield transform(df)"
},
{
"cell_type": "markdown",
"id": "a9b05f59-51a7-453f-a329-6917272cfd1d",
"metadata": {},
"source": [
"We will want the success and failure methods to have access to the same SQL connection so that everything stays in the same transaction. Thus we will create a little helper"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "b5678c86-c08e-4d2b-8248-40c4bb761976",
"metadata": {},
"outputs": [],
"source": [
"# A little helper to manage the connection\n",
"class SQLAlchemyValidationSession:\n",
" def __init__(self, connection_string, table_name):\n",
" self.engine = create_engine(connection_string, future=True)\n",
" self.table = Table(table_name, MetaData(), autoload_with=self.engine)\n",
" self._conn = None\n",
"\n",
" def with_conn(self, connection):\n",
" self._conn = connection\n",
" return self\n",
"\n",
" def load_df_to_destination(self, df, _result):\n",
" \"\"\"Loads data into destination.\"\"\"\n",
" self._conn.execute(insert(self.table).values(), df.to_dict(orient=\"records\"))\n",
"\n",
" def rollback(self, _df, _result):\n",
" \"\"\"Clears data previously loaded\"\"\"\n",
" self._conn.rollback()\n",
"\n",
" def __enter__(self):\n",
" conn = self.engine.connect()\n",
" return self.with_conn(conn)\n",
"\n",
" def __exit__(self ,type, value, traceback):\n",
" self._conn.close()\n",
" self._conn = None"
]
},
{
"cell_type": "markdown",
"id": "6450d996-9429-456a-9f19-63fb17cbb83b",
"metadata": {},
"source": [
"**Example 1: loading data in chunks with the `DataFrameValidator.validate` method**"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "0a381ca8-da2e-4860-ad59-91fc7997a363",
"metadata": {},
"outputs": [],
"source": [
"# Example 1: loading data in chunks with the `DataFrameValidator.validate` method\n",
"from metadata.sdk.data_quality.dataframes.validation_results import ValidationResult\n",
"\n",
"validation_session = SQLAlchemyValidationSession(\n",
" connection_string=DESTINATION,\n",
" table_name=\"dw_taxi_trips\"\n",
")\n",
"\n",
"results = []\n",
"with validation_session as session:\n",
" for transformed_df in load_and_transform():\n",
" result = validator.validate(transformed_df)\n",
"\n",
" results.append(result)\n",
" \n",
" if result.success:\n",
" session.load_df_to_destination(transformed_df, result)\n",
" else:\n",
" session.rollback(df, result)\n",
" break\n",
"\n",
"# Aggregate results for each test case for every chunk\n",
"results = ValidationResult.merge(*results)\n",
"\n",
"# Publish to the server\n",
"results.publish(\"Tutorial Postgres.stg.public.dw_taxi_trips\")"
]
},
{
"cell_type": "markdown",
"id": "39ea430b-ac7e-4d8c-a8fc-537fcb4e3d11",
"metadata": {},
"source": [
"**Example 2: loading data in chunks with the `DataFrameValidator.run` method**\n",
"\n",
"This method is a shortcut to the loop above that returns results already merged\n",
"\n",
"> ⚠ **NOTE:** there is however one caveat. Some of our tests require the whole dataframe to be in memory for them to work. For example, tests counting the total amount of rows would return false results because they'd be running on subsets of the data. Future versions of the SDK will solve this issue. For the time being, if your data does not fit in memory you should resort to the example in [test_workflow.ipynb](/lab/tree/notebooks/test_workflow.ipynb)."
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "6ad82243-e48e-4e1e-8e47-47d7a563e999",
"metadata": {},
"outputs": [],
"source": [
"with validation_session as session:\n",
" results = validator.run(\n",
" load_and_transform(),\n",
" on_success=session.load_df_to_destination,\n",
" on_failure=session.rollback,\n",
" )\n",
"\n",
"# Results are already merged and ready to publish in OpenMetadata\n",
"results.publish(\"Tutorial Postgres.stg.public.dw_taxi_trips\")"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "42414d5e-fbb8-43da-bd65-310c34c35d81",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Test: amount_is_greater_than_0\n",
"Status: TestCaseStatus.Success\n",
"Result: Found min=9.45, max=54.65 vs. the expected min=1.0, max=inf.; Found min=8.3, max=89.7 vs. the expected min=1.0, max=inf.; Found min=9.4, max=81.15 vs. the expected min=1.0, max=inf.; Found min=5.9, max=51.66 vs. the expected min=1.0, max=inf.; Found min=6.9, max=157.71 vs. the expected min=1.0, max=inf.; Found min=5.2, max=69.24 vs. the expected min=1.0, max=inf.; Found min=7.92, max=89.25 vs. the expected min=1.0, max=inf.; Found min=7.7, max=66.5 vs. the expected min=1.0, max=inf.; Found min=5.2, max=62.7 vs. the expected min=1.0, max=inf.; Found min=7.7, max=100.41 vs. the expected min=1.0, max=inf.\n",
"\n",
"Test: trip_distance_to_be_at_most_100\n",
"Status: TestCaseStatus.Success\n",
"Result: Found min=0.02, max=7.45 vs. the expected min=-inf, max=100.0.; Found min=0.03, max=9.23 vs. the expected min=-inf, max=100.0.; Found min=0.06, max=8.79 vs. the expected min=-inf, max=100.0.; Found min=0.05, max=9.39 vs. the expected min=-inf, max=100.0.; Found min=0.04, max=30.4 vs. the expected min=-inf, max=100.0.; Found min=0.15, max=15.9 vs. the expected min=-inf, max=100.0.; Found min=0.2, max=14.02 vs. the expected min=-inf, max=100.0.; Found min=0.01, max=10.7 vs. the expected min=-inf, max=100.0.; Found min=0.15, max=7.81 vs. the expected min=-inf, max=100.0.; Found min=0.09, max=16.88 vs. the expected min=-inf, max=100.0.\n",
"\n",
"Test: trip_duration_to_be_between_1_and_180_minutes\n",
"Status: TestCaseStatus.Success\n",
"Result: Found min=1.1, max=30.816666666666666 vs. the expected min=1.0, max=180.0.; Found min=1.0833333333333333, max=32.93333333333333 vs. the expected min=1.0, max=180.0.; Found min=1.1333333333333333, max=30.1 vs. the expected min=1.0, max=180.0.; Found min=1.0666666666666667, max=24.883333333333333 vs. the expected min=1.0, max=180.0.; Found min=1.0, max=58.15 vs. the expected min=1.0, max=180.0.; Found min=1.0833333333333333, max=167.06666666666666 vs. the expected min=1.0, max=180.0.; Found min=1.0333333333333334, max=37.46666666666667 vs. the expected min=1.0, max=180.0.; Found min=1.1333333333333333, max=32.75 vs. the expected min=1.0, max=180.0.; Found min=1.1, max=30.516666666666666 vs. the expected min=1.0, max=180.0.; Found min=1.1666666666666667, max=37.583333333333336 vs. the expected min=1.0, max=180.0.\n"
]
}
],
"source": [
"# In both cases, results should be the same\n",
"for test_case, test_result in results.test_cases_and_results:\n",
"\n",
" print(f\"\\nTest: {test_case.name.root}\")\n",
" print(f\"Status: {test_result.testCaseStatus}\")\n",
" print(f\"Result: {test_result.result}\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

File diff suppressed because it is too large

View File

@ -0,0 +1,83 @@
#!/usr/bin/env bash
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
CREATE ROLE tutorial_user LOGIN PASSWORD 'password';
CREATE DATABASE raw;
CREATE DATABASE stg;
EOSQL
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname raw <<-EOSQL
-- Grant CONNECT so the user can access the database
GRANT CONNECT ON DATABASE raw TO tutorial_user;
-- Grant USAGE on the schema (required to access tables)
GRANT USAGE ON SCHEMA public TO tutorial_user;
-- Grant SELECT on all existing tables
GRANT SELECT ON ALL TABLES IN SCHEMA public TO tutorial_user;
-- Ensure future tables are also readable
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO tutorial_user;
CREATE TABLE IF NOT EXISTS taxi_yellow (
VendorID int,
tpep_pickup_datetime timestamp,
tpep_dropoff_datetime timestamp,
passenger_count int,
trip_distance float,
RatecodeID int,
store_and_fwd_flag varchar,
PULocationID int,
PUZone varchar(16),
DOZone varchar(16),
DOLocationID int,
payment_type int,
fare_amount float,
extra float,
mta_tax float,
tip_amount float,
tolls_amount float,
improvement_surcharge float,
total_amount float,
congestion_surcharge float,
Airport_fee float,
cbd_congestion_fee float
);
EOSQL
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname stg <<-EOSQL
-- Grant CONNECT so the user can access the database
GRANT CONNECT ON DATABASE stg TO tutorial_user;
-- Grant USAGE on the schema (required to access tables)
GRANT USAGE ON SCHEMA public TO tutorial_user;
-- Grant SELECT on all existing tables
GRANT SELECT ON ALL TABLES IN SCHEMA public TO tutorial_user;
-- Ensure future tables are also readable
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO tutorial_user;
CREATE TABLE IF NOT EXISTS dw_taxi_trips (
vendor_id INT,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
passenger_count INT,
trip_distance FLOAT,
pu_location_id INT,
do_location_id INT,
payment_type INT,
fare_amount FLOAT,
tip_amount FLOAT,
total_amount FLOAT,
congestion_surcharge FLOAT,
pickup_hour SMALLINT,
pickup_dayofweek SMALLINT,
trip_duration_min FLOAT,
avg_speed_mph FLOAT,
trip_category VARCHAR(16)
);
EOSQL

Binary files not shown: 9 new images (60, 54, 140, 83, 143, 92, 26, 107, and 109 KiB) referenced from the README under `public/`.

View File

@ -0,0 +1,59 @@
#!/usr/bin/env bash
if ! docker info &> /dev/null; then
echo "Docker must be running to start services"
exit 1
fi
help_function()
{
echo "
Usage: $0 [OPTIONS]
This script downloads the docker compose file of a given release and merges it with \`docker-compose.yml\`
Options:
-s services. Space separated list of services to boot from the compose file. Options: [jupyter, dwh, ingestion, openmetadata-server] Default: none.
-v OpenMetadata version: Default [1.10.4]
-d Download: [true, false]. Force re-downloading the OM docker compose even if it has already been downloaded. Default [false]
-h For usage help
"
exit 1 # Exit script after printing help
}
while getopts "v:d:s:h" opt
do
case "$opt" in
s ) services="$OPTARG" ;;
v ) version="$OPTARG" ;;
d ) download="$OPTARG" ;;
h ) help_function ;;
? ) help_function ;;
esac
done
version="${version:=1.10.4}"
download="${download:=false}"
if [[ ! -f openmetadata-compose.yml ]] || [[ "$download" == "true" ]]; then
source="https://github.com/open-metadata/OpenMetadata/releases/download/${version}-release/docker-compose.yml"
echo "Downloading openmetadata-compose.yml from $source"
curl -fsSL $source -o openmetadata-compose.yml
fi
docker compose -f openmetadata-compose.yml -f docker-compose.yml up $services -d
# Skip the jupyter URL lookup only when specific services were requested and jupyter is not among them.
if [[ "$services" != *jupyter* ]] && [[ "$services" != "" ]]; then
exit 0
fi
while ! grep jupyter <(docker ps) &> /dev/null; do
echo "Waiting for jupyter container to be running"
sleep 1
done
url=""
while [[ "$url" == "" ]]; do
url="$(docker compose -f openmetadata-compose.yml -f docker-compose.yml exec jupyter bash -c "jupyter server list | grep /home/jovyan | sed \"s/\$(hostname)/localhost/g\" | awk 'NR==1 { print \$1 }'")"
done
echo "You can access \`jupyter\` instance at $url"

View File

@ -71,6 +71,7 @@ VERSIONS = {
"presidio-analyzer": "presidio-analyzer==2.2.358",
"asammdf": "asammdf~=7.4.5",
"kafka-connect": "kafka-connect-py==0.10.11",
"griffe2md": "griffe2md~=1.2",
}
COMMONS = {
@ -361,6 +362,10 @@ plugins: Dict[str, Set[str]] = {
"teradata": {VERSIONS["teradata"]},
"trino": {VERSIONS["trino"], DATA_DIFF["trino"]},
"vertica": {"sqlalchemy-vertica[vertica-python]>=0.0.5", DATA_DIFF["vertica"]},
# SDK Data Quality: Required for DataFrame validation (DataFrameValidator)
# Install with: pip install 'openmetadata-ingestion[pandas]'
"pandas": {VERSIONS["pandas"], VERSIONS["numpy"]},
"pyarrow": {VERSIONS["pyarrow"]},
"pii-processor": {
VERSIONS["spacy"],
VERSIONS["pandas"],
@ -467,6 +472,10 @@ test = {
VERSIONS["kafka-connect"],
}
docs = {
VERSIONS["griffe2md"],
}
if sys.version_info >= (3, 9):
test.add("locust~=2.32.0")
@ -537,5 +546,6 @@ setup(
"sklearn",
}
),
"docs": docs,
},
)

View File

@ -12,9 +12,10 @@
"""
Generic Delimiter-Separated-Values implementation
"""
from functools import singledispatchmethod
from __future__ import annotations
from pyarrow.parquet import ParquetFile
from functools import singledispatchmethod
from typing import TYPE_CHECKING
from metadata.generated.schema.entity.services.connections.database.datalake.azureConfig import (
AzureConfig,
@ -39,6 +40,9 @@ from metadata.readers.file.adls import AZURE_PATH, return_azure_storage_options
from metadata.readers.models import ConfigSource
from metadata.utils.logger import ingestion_logger
if TYPE_CHECKING:
from pyarrow.parquet import ParquetFile
logger = ingestion_logger()
@ -145,6 +149,7 @@ class ParquetDataFrameReader(DataFrameReader):
"""
# pylint: disable=import-outside-toplevel
from gcsfs import GCSFileSystem
from pyarrow.parquet import ParquetFile
gcs = GCSFileSystem()
file_path = f"gs://{bucket_name}/{key}"
@ -239,6 +244,7 @@ class ParquetDataFrameReader(DataFrameReader):
def _(self, _: AzureConfig, key: str, bucket_name: str) -> DatalakeColumnWrapper:
import pandas as pd # pylint: disable=import-outside-toplevel
import pyarrow.fs as fs
from pyarrow.parquet import ParquetFile
storage_options = return_azure_storage_options(self.config_source)
account_url = AZURE_PATH.format(
@ -284,6 +290,7 @@ class ParquetDataFrameReader(DataFrameReader):
import os
import pandas as pd # pylint: disable=import-outside-toplevel
from pyarrow.parquet import ParquetFile
# Check file size to determine reading strategy
try:

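The hunks above defer the `pyarrow.parquet.ParquetFile` import: it stays visible to type checkers under `TYPE_CHECKING`, while each reader method imports it locally at call time, so importing the module no longer requires pyarrow to be installed. A minimal, self-contained sketch of that pattern (the function name is illustrative, not part of the reader):

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by type checkers; no runtime dependency on pyarrow here.
    from pyarrow.parquet import ParquetFile


def open_parquet(path: str) -> ParquetFile:
    """Open a Parquet file, importing pyarrow only when actually needed."""
    # pylint: disable=import-outside-toplevel
    from pyarrow.parquet import ParquetFile

    return ParquetFile(path)
```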
View File

@ -10,6 +10,24 @@ The SDK is part of the openmetadata-ingestion package:
pip install openmetadata-ingestion
```
### Data Quality SDK Installation
For running data quality tests, additional dependencies may be required:
**DataFrame Validation:**
```bash
pip install 'openmetadata-ingestion[pandas]'
```
**Table-Based Testing:**
```bash
# Install the database extra matching your table's service type
pip install 'openmetadata-ingestion[mysql]' # For MySQL
pip install 'openmetadata-ingestion[postgres]' # For PostgreSQL
pip install 'openmetadata-ingestion[snowflake]' # For Snowflake
pip install 'openmetadata-ingestion[clickhouse]' # For ClickHouse
```
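Since these extras are optional, a missing dependency can otherwise surface as an obscure error deep inside a test run. A small pre-flight check (purely illustrative, not part of the SDK) gives a clearer message:

```python
# Illustrative pre-flight check: fail fast if the optional pandas extra is missing.
try:
    import pandas  # noqa: F401  # provided by: pip install 'openmetadata-ingestion[pandas]'
except ImportError as exc:
    raise SystemExit(
        "DataFrame validation requires pandas. "
        "Install it with: pip install 'openmetadata-ingestion[pandas]'"
    ) from exc
```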
## Quick Start
### Configure the SDK

View File

@ -41,7 +41,7 @@ class DataFrameValidator:
Provides a simple interface to configure and execute data quality tests
on pandas DataFrames using OpenMetadata test definitions.
Example:
Examples:
validator = DataFrameValidator()
validator.add_test(ColumnValuesToBeNotNull(column="email"))
validator.add_test(ColumnValuesToBeUnique(column="customer_id"))
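The docstring above only shows configuration; the sketch below wraps it around a concrete DataFrame. Only calls shown in the docstring are used — the import path and the final execution call are assumptions, so check the class (and the DataFrame examples module) for the exact entry point:

```python
import pandas as pd

# Assumption: the validator and test definitions are importable from the SDK package,
# mirroring how the TestRunner examples import table tests.
from metadata.sdk.data_quality import (
    ColumnValuesToBeNotNull,
    ColumnValuesToBeUnique,
    DataFrameValidator,
)

df = pd.DataFrame(
    {
        "customer_id": [1, 2, 3],
        "email": ["a@example.com", "b@example.com", None],
    }
)

validator = DataFrameValidator()
validator.add_test(ColumnValuesToBeNotNull(column="email"))
validator.add_test(ColumnValuesToBeUnique(column="customer_id"))
# Execution intentionally omitted: call the validator's documented run/validate
# entry point on `df` to obtain the test results.
```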

View File

@ -55,7 +55,7 @@ class TestRunner:
table_fqn: Fully qualified name of the table to test
client: OpenMetadata API client
Example:
Examples:
>>> from metadata.sdk.data_quality import TestRunner, TableRowCountToBeBetween
>>> runner = TestRunner.for_table("MySQL.default.db.table")
>>> runner.add_test(TableRowCountToBeBetween(min_count=100, max_count=1000))
@ -129,7 +129,7 @@ class TestRunner:
Returns:
TestRunner instance
Example:
Examples:
>>> from metadata.sdk.data_quality import TestRunner, TableColumnCountToBeBetween
>>> runner = TestRunner.for_table("MySQL.default.db.table")
>>> runner.add_test(TableColumnCountToBeBetween(min_count=10))
@ -228,7 +228,7 @@ class TestRunner:
Returns:
Self for method chaining
Example:
Examples:
>>> runner.add_tests(
... TableRowCountToBeBetween(min_count=100),
... ColumnValuesToBeNotNull(column="user_id")
@ -242,13 +242,7 @@ class TestRunner:
Returns:
List of test case results
Raises:
ValueError: If no tests have been added
"""
if not self.test_definitions:
raise ValueError("No tests added. Use add_test() to add test definitions.")
config = self.config_builder.build()
workflow = TestSuiteWorkflow.create( # pyright: ignore[reportUnknownMemberType]

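Taken together, the docstrings above describe a small end-to-end flow; a hedged sketch follows. The FQN is a placeholder, the SDK client is assumed to be configured already (see the Quick Start docs), and the result attributes follow what the tutorial notebook prints, so they may differ between versions:

```python
from metadata.sdk.data_quality import (
    ColumnValuesToBeNotNull,
    TableRowCountToBeBetween,
    TestRunner,
)

# Placeholder FQN in service.database.schema.table form.
runner = TestRunner.for_table("MySQL.default.db.table")
runner.add_tests(
    TableRowCountToBeBetween(min_count=100, max_count=1000),
    ColumnValuesToBeNotNull(column="user_id"),
)

results = runner.run()  # list of test case results
for result in results:
    # Assumption: results expose testCaseStatus and result, as printed in the notebook.
    print(result.testCaseStatus, result.result)
```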
View File

@ -1,4 +1,5 @@
"""Convenience classes that represent test definitions"""
from .base_tests import *
from .column_tests import *
from .table_tests import *

View File

@ -29,7 +29,7 @@ class ColumnValuesToBeInSet(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValuesToBeInSet(column="status", allowed_values=["active", "inactive", "pending"])
>>> test = ColumnValuesToBeInSet(column="country_code", allowed_values=["US", "UK", "CA"])
"""
@ -68,7 +68,7 @@ class ColumnValuesToBeNotInSet(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValuesToBeNotInSet(column="email", forbidden_values=["test@test.com", "admin@admin.com"])
>>> test = ColumnValuesToBeNotInSet(column="status", forbidden_values=["deleted", "archived"])
"""
@ -106,7 +106,7 @@ class ColumnValuesToBeNotNull(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValuesToBeNotNull(column="user_id")
>>> test = ColumnValuesToBeNotNull(column="email")
"""
@ -140,7 +140,7 @@ class ColumnValuesToBeUnique(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValuesToBeUnique(column="user_id")
>>> test = ColumnValuesToBeUnique(column="email")
"""
@ -175,7 +175,7 @@ class ColumnValuesToMatchRegex(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValuesToMatchRegex(column="email", regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$")
>>> test = ColumnValuesToMatchRegex(column="phone", regex=r"^\\+?1?\\d{9,15}$")
"""
@ -212,7 +212,7 @@ class ColumnValuesToNotMatchRegex(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValuesToNotMatchRegex(column="email", regex=r".*@test\\.com$")
>>> test = ColumnValuesToNotMatchRegex(column="name", regex=r"^test.*")
"""
@ -252,7 +252,7 @@ class ColumnValuesToBeBetween(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValuesToBeBetween(column="age", min_value=0, max_value=120)
>>> test = ColumnValuesToBeBetween(column="price", min_value=0.01, max_value=9999.99)
"""
@ -298,7 +298,7 @@ class ColumnValueMaxToBeBetween(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValueMaxToBeBetween(column="temperature", min_value=-50, max_value=50)
>>> test = ColumnValueMaxToBeBetween(column="score", min_value=90, max_value=100)
"""
@ -347,7 +347,7 @@ class ColumnValueMinToBeBetween(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValueMinToBeBetween(column="temperature", min_value=-50, max_value=0)
>>> test = ColumnValueMinToBeBetween(column="age", min_value=0, max_value=18)
"""
@ -396,7 +396,7 @@ class ColumnValueMeanToBeBetween(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValueMeanToBeBetween(column="rating", min_value=3.0, max_value=4.5)
>>> test = ColumnValueMeanToBeBetween(column="response_time_ms", min_value=100, max_value=500)
"""
@ -449,7 +449,7 @@ class ColumnValueMedianToBeBetween(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValueMedianToBeBetween(column="salary", min_value=50000, max_value=75000)
>>> test = ColumnValueMedianToBeBetween(column="age", min_value=25, max_value=45)
"""
@ -502,7 +502,7 @@ class ColumnValueStdDevToBeBetween(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValueStdDevToBeBetween(column="response_time", min_value=0, max_value=100)
>>> test = ColumnValueStdDevToBeBetween(column="score", min_value=5, max_value=15)
"""
@ -555,7 +555,7 @@ class ColumnValuesSumToBeBetween(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValuesSumToBeBetween(column="revenue", min_value=1000000, max_value=5000000)
>>> test = ColumnValuesSumToBeBetween(column="quantity", min_value=100, max_value=1000)
"""
@ -604,7 +604,7 @@ class ColumnValuesMissingCount(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValuesMissingCount(column="optional_field", missing_count_value=100)
>>> test = ColumnValuesMissingCount(column="status", missing_value_match=["N/A", "Unknown"])
"""
@ -654,7 +654,7 @@ class ColumnValueLengthsToBeBetween(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValueLengthsToBeBetween(column="username", min_length=3, max_length=20)
>>> test = ColumnValueLengthsToBeBetween(column="description", min_length=10, max_length=500)
"""
@ -703,7 +703,7 @@ class ColumnValuesToBeAtExpectedLocation(ColumnTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = ColumnValuesToBeAtExpectedLocation(column="id", expected_value="1", row_index=0)
>>> test = ColumnValuesToBeAtExpectedLocation(column="rank", expected_value="first", row_index=0)
"""

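These column tests are just keyword-argument wrappers, so several checks against the tutorial's `taxi_yellow` table (created by the init script earlier in this change) can be grouped on one runner. The FQN and thresholds below are illustrative only:

```python
from metadata.sdk.data_quality import (
    ColumnValuesToBeBetween,
    ColumnValuesToBeNotNull,
    TestRunner,
)

# Illustrative FQN: substitute the fully qualified name of your own table in OpenMetadata.
runner = TestRunner.for_table("postgres_service.raw.public.taxi_yellow")
runner.add_tests(
    ColumnValuesToBeNotNull(column="tpep_pickup_datetime"),
    ColumnValuesToBeBetween(column="passenger_count", min_value=1, max_value=8),
    ColumnValuesToBeBetween(column="trip_distance", min_value=0, max_value=200),
)
results = runner.run()
```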
View File

@ -29,7 +29,7 @@ class TableColumnCountToBeBetween(TableTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = TableColumnCountToBeBetween(min_count=5, max_count=10)
>>> test = TableColumnCountToBeBetween(min_count=5) # Only minimum
"""
@ -74,7 +74,7 @@ class TableColumnCountToEqual(TableTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = TableColumnCountToEqual(column_count=10)
"""
@ -110,7 +110,7 @@ class TableRowCountToBeBetween(TableTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = TableRowCountToBeBetween(min_count=1000, max_count=5000)
>>> test = TableRowCountToBeBetween(min_count=100) # Only minimum
"""
@ -155,7 +155,7 @@ class TableRowCountToEqual(TableTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = TableRowCountToEqual(row_count=50)
"""
@ -194,7 +194,7 @@ class TableRowInsertedCountToBeBetween(TableTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = TableRowInsertedCountToBeBetween(min_count=100, max_count=1000, range_type="DAY", range_interval=1)
>>> test = TableRowInsertedCountToBeBetween(min_count=50, range_type="HOUR", range_interval=6)
"""
@ -249,7 +249,7 @@ class TableColumnToMatchSet(TableTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = TableColumnToMatchSet(column_names=["id", "name", "email"])
>>> test = TableColumnToMatchSet(column_names=["col1", "col2"], ordered=True)
"""
@ -289,7 +289,7 @@ class TableColumnNameToExist(TableTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = TableColumnNameToExist(column_name="user_id")
"""
@ -325,7 +325,7 @@ class TableCustomSQLQuery(TableTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = TableCustomSQLQuery(
... sql_expression="SELECT * FROM {table} WHERE price < 0",
... strategy="ROWS"
@ -369,7 +369,7 @@ class TableDiff(TableTest):
display_name: Custom display name for UI
description: Custom test description
Example:
Examples:
>>> test = TableDiff(
... table2="service.database.schema.reference_table",
... key_columns=["id"],

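`TableCustomSQLQuery` above is the escape hatch for checks the built-in definitions do not cover. A rough sketch flagging negative fares on the tutorial table follows — the FQN is illustrative, the `{table}` placeholder is used as in the docstring, and with `strategy="ROWS"` rows returned by the query count as failures:

```python
from metadata.sdk.data_quality import TableCustomSQLQuery, TestRunner

runner = TestRunner.for_table("postgres_service.raw.public.taxi_yellow")  # illustrative FQN
runner.add_test(
    TableCustomSQLQuery(
        # {table} follows the placeholder shown in the docstring example above.
        sql_expression="SELECT * FROM {table} WHERE fare_amount < 0",
        strategy="ROWS",
    )
)
results = runner.run()
```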
View File

@ -9,7 +9,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Examples demonstrating DataFrame validation with OpenMetadata SDK."""
"""Examples demonstrating DataFrame validation with OpenMetadata SDK.
Installation:
For basic DataFrame validation:
pip install 'openmetadata-ingestion[pandas]'
For reading Parquet files:
pip install 'openmetadata-ingestion[pandas,pyarrow]'
For reading from S3 datalakes:
pip install 'openmetadata-ingestion[pandas,datalake-s3]'
"""
# pyright: reportUnknownVariableType=false, reportAttributeAccessIssue=false, reportUnknownMemberType=false
# pyright: reportUnusedCallResult=false
# pylint: disable=W5001
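As a small companion to the install notes above, loading a local Parquet file into pandas before handing the frame to the validator looks like this (the path is a placeholder):

```python
import pandas as pd

# Placeholder path; any local Parquet file works once the pandas/pyarrow extras are installed.
df = pd.read_parquet("trips.parquet")
print(df.shape, list(df.columns)[:5])
```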

View File

@ -3,6 +3,15 @@ Example: Data Quality as Code with OpenMetadata SDK
This example demonstrates how to run data quality tests programmatically
using the simplified DQ as Code API.
Installation:
For MySQL tables:
pip install 'openmetadata-ingestion[mysql]'
For other databases, replace [mysql] with your database type:
pip install 'openmetadata-ingestion[postgres]'
pip install 'openmetadata-ingestion[snowflake]'
pip install 'openmetadata-ingestion[clickhouse]'
"""
# pyright: reportUnusedCallResult=false

View File

@ -1,6 +1,7 @@
"""
Unit tests for DQ as Code TestRunner
"""
from tempfile import NamedTemporaryFile
from typing import Generator
from unittest.mock import MagicMock, Mock, create_autospec, patch
@ -13,10 +14,20 @@ from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.generated.schema.type.basic import (
EntityName,
@ -52,7 +63,7 @@ def mock_table():
dataType=DataType.VARCHAR,
),
],
serviceType="Mysql",
serviceType=DatabaseServiceType.Mysql,
service=EntityReference.model_construct(
id=uuid4(),
name="MySQL",
@ -83,7 +94,7 @@ def mock_service(mock_connection):
id=uuid4(),
name=EntityName("MySQL"),
serviceType="Mysql",
connection=mock_connection,
connection=DatabaseConnection(config=mock_connection),
)
@ -91,7 +102,10 @@ def mock_service(mock_connection):
def mock_client(mock_table, mock_service):
"""Mock OMeta client"""
mock = MagicMock()
mock.config = MagicMock()
mock.config = OpenMetadataConnection(
hostPort="http://localhost:8585/api",
securityConfig=OpenMetadataJWTClientConfig(jwtToken="the-jwt-token"),
)
mock.get_by_name.side_effect = (mock_table,)
mock.get_by_id.side_effect = (mock_service,)
return mock
@ -207,12 +221,13 @@ def test_add_test(mock_get_client):
assert len(runner.test_definitions) == 1
def test_run_without_tests(mock_get_client):
@patch("metadata.sdk.data_quality.runner.TestSuiteWorkflow")
@patch("metadata.sdk.data_quality.runner.WorkflowConfigBuilder")
def test_run_without_tests(mock_builder_class, mock_workflow_class, mock_get_client):
"""Test error when running without tests"""
runner = TestRunner.for_table("MySQL.default.test_db.test_table")
with pytest.raises(ValueError, match="No tests added"):
runner.run()
assert runner.run() is not None
@patch("metadata.sdk.data_quality.runner.TestSuiteWorkflow")