mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-18 19:18:28 +00:00
* Brief documentation of installation requirements * Minor fix to run tests only defined in OpenMetadata * Add full example to Data Quality as Code * Install `griffe2md` and fix docstrings * Remove local openmetadata reference * Fix writing, grammar and typos * Fix test * Fix formatting
572 lines
47 KiB
Plaintext
{
"cells": [
{
"cell_type": "markdown",
"id": "14093fef-c7ab-4566-88e2-318553ba3aef",
"metadata": {},
"source": "# Running Data Quality tests for a DataFrame\n\nIn this notebook we will work with the `Tutorial Postgres.raw.public.taxi_yellow` table from the previous tutorial, the [test workflow notebook](/lab/tree/notebooks/test_workflow.ipynb), and perform some transformations on it before loading it into our staging database.\n\n## Purpose\nWe want to showcase how you can hook OpenMetadata's data quality mechanisms directly into your ETLs, before your data reaches its destination. To do that, we're building an ETL that transforms the data from the previous tutorial and loads it into a table for which we have set up data quality tests, as described in the [given instructions](/lab/tree/README.md).\n\n## Description of the ETL\nFor context, please refer to [the test workflow notebook](/lab/tree/notebooks/test_workflow.ipynb).\n\nIn this case we will run some simple data cleaning and transformations on the DataFrame. Then we will use the `DataFrameValidator` interface to load the chunks of validated data into the destination, and finally report the results to OpenMetadata.\n\nWe will use the [`openmetadata-ingestion`](https://pypi.org/project/openmetadata-ingestion/) library to run the Data Quality tests we have defined in [OpenMetadata](http://localhost:8585/table/Tutorial%20Postgres.raw.public.taxi_yellow/profiler/data-quality).\n\n## Dependencies\nFor our ETL we will be using SQLAlchemy to load the table, Pandas DataFrames to perform transformations, [`openmetadata-ingestion`](https://pypi.org/project/openmetadata-ingestion/) to run data quality tests, and the OpenMetadata [Postgres Connector](https://docs.open-metadata.org/latest/connectors/database/postgres).\n\nWe can install all these dependencies by specifying the right extras. A full list can be found in the project's [`setup.py`](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/setup.py); check it if your installation differs from the example below.\n\n## Requirements\nIf you haven't already, please follow the [setup](/lab/tree/README.md#setup) steps in the README.\n\nFor this example you will need:\n\n- To have run the [`test_workflow`](/lab/tree/notebooks/test_workflow.ipynb) notebook\n- An OpenMetadata instance running (achieved by following the setup instructions above)\n- A bot JWT token. You can use the [Ingestion Bot's](http://localhost:8585/bots/ingestion-bot) token from your OpenMetadata instance\n- [`openmetadata-ingestion`](https://pypi.org/project/openmetadata-ingestion/) version 1.11.0.0 or above (installed in this Notebook)"
},
{
"cell_type": "code",
"execution_count": 1,
"id": "initial_id",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Obtaining file:///opt/openmetadata/ingestion\n",
" Installing build dependencies ... \u001b[?25ldone\n",
"\u001b[?25h Checking if build backend supports build_editable ... \u001b[?25ldone\n",
"\u001b[?25h Getting requirements to build editable ... \u001b[?25ldone\n",
"\u001b[?25h Preparing editable metadata (pyproject.toml) ... \u001b[?25ldone\n",
"\u001b[?25hRequirement already satisfied: Jinja2>=2.11.3 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (3.1.6)\n",
"Requirement already satisfied: httpx~=0.28.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.28.1)\n",
"Requirement already satisfied: memory-profiler in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.61.0)\n",
"Requirement already satisfied: packaging in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (24.1)\n",
"Requirement already satisfied: PyYAML~=6.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (6.0.2)\n",
"Requirement already satisfied: shapely in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.1.2)\n",
"Requirement already satisfied: importlib-metadata>=4.13.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (8.5.0)\n",
"Requirement already satisfied: pymysql~=1.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.1.2)\n",
"Requirement already satisfied: cached-property==1.5.2 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.5.2)\n",
"Requirement already satisfied: python-dotenv>=0.19.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.2.1)\n",
"Requirement already satisfied: jsonpatch<2.0,>=1.24 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.33)\n",
"Requirement already satisfied: mypy-extensions>=0.4.3 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.1.0)\n",
"Requirement already satisfied: kubernetes>=21.0.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (34.1.0)\n",
"Requirement already satisfied: jaraco.functools<4.2.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (4.1.0)\n",
"Requirement already satisfied: chardet==4.0.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (4.0.0)\n",
"Requirement already satisfied: collate-sqllineage~=1.6.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.6.22)\n",
"Requirement already satisfied: collate-data-diff>=0.11.6 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.11.7)\n",
"Requirement already satisfied: google-cloud-secret-manager==2.24.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.24.0)\n",
"Requirement already satisfied: google-crc32c in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.7.1)\n",
"Requirement already satisfied: requests>=2.23 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.32.3)\n",
"Requirement already satisfied: tabulate==0.9.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.9.0)\n",
"Requirement already satisfied: antlr4-python3-runtime==4.9.2 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (4.9.2)\n",
"Requirement already satisfied: python-dateutil>=2.8.1 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.9.0)\n",
"Requirement already satisfied: pydantic-settings>=2.7.0,~=2.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.11.0)\n",
"Requirement already satisfied: sqlalchemy<2,>=1.4.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.4.54)\n",
"Requirement already satisfied: azure-identity~=1.12 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.25.1)\n",
"Requirement already satisfied: pydantic<2.12,>=2.7.0,~=2.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.9.2)\n",
"Requirement already satisfied: azure-keyvault-secrets in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (4.10.0)\n",
"Requirement already satisfied: boto3<2.0,>=1.20 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.40.68)\n",
"Requirement already satisfied: cryptography>=42.0.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (46.0.0)\n",
"Requirement already satisfied: email-validator>=2.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.3.0)\n",
"Requirement already satisfied: requests-aws4auth~=1.1 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.3.1)\n",
"Requirement already satisfied: typing-inspect in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.9.0)\n",
"Requirement already satisfied: snowflake-connector-python<4.0.0,>=3.13.1 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (3.18.0)\n",
"Requirement already satisfied: setuptools~=70.0 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (70.3.0)\n",
"Requirement already satisfied: mysql-connector-python>=9.1 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (9.5.0)\n",
"Requirement already satisfied: google-api-core!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1 in /opt/conda/lib/python3.11/site-packages (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (2.28.1)\n",
"Requirement already satisfied: google-auth!=2.24.0,!=2.25.0,<3.0.0,>=2.14.1 in /opt/conda/lib/python3.11/site-packages (from google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (2.43.0)\n",
"Requirement already satisfied: proto-plus<2.0.0,>=1.22.3 in /opt/conda/lib/python3.11/site-packages (from google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (1.26.1)\n",
"Requirement already satisfied: protobuf!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<7.0.0,>=3.20.2 in /opt/conda/lib/python3.11/site-packages (from google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (6.33.0)\n",
"Requirement already satisfied: grpc-google-iam-v1<1.0.0,>=0.14.0 in /opt/conda/lib/python3.11/site-packages (from google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (0.14.3)\n",
"Requirement already satisfied: numpy<2 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (1.26.4)\n",
"Requirement already satisfied: pandas~=2.0.3 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.0.3)\n",
"Requirement already satisfied: psycopg2-binary in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (2.9.11)\n",
"Requirement already satisfied: GeoAlchemy2~=0.12 in /opt/conda/lib/python3.11/site-packages (from openmetadata-ingestion==1.10.0.0.dev0) (0.18.0)\n",
"Requirement already satisfied: azure-core>=1.31.0 in /opt/conda/lib/python3.11/site-packages (from azure-identity~=1.12->openmetadata-ingestion==1.10.0.0.dev0) (1.36.0)\n",
"Requirement already satisfied: msal>=1.30.0 in /opt/conda/lib/python3.11/site-packages (from azure-identity~=1.12->openmetadata-ingestion==1.10.0.0.dev0) (1.34.0)\n",
"Requirement already satisfied: msal-extensions>=1.2.0 in /opt/conda/lib/python3.11/site-packages (from azure-identity~=1.12->openmetadata-ingestion==1.10.0.0.dev0) (1.3.1)\n",
"Requirement already satisfied: typing-extensions>=4.0.0 in /opt/conda/lib/python3.11/site-packages (from azure-identity~=1.12->openmetadata-ingestion==1.10.0.0.dev0) (4.12.2)\n",
"Requirement already satisfied: botocore<1.41.0,>=1.40.68 in /opt/conda/lib/python3.11/site-packages (from boto3<2.0,>=1.20->openmetadata-ingestion==1.10.0.0.dev0) (1.40.68)\n",
"Requirement already satisfied: jmespath<2.0.0,>=0.7.1 in /opt/conda/lib/python3.11/site-packages (from boto3<2.0,>=1.20->openmetadata-ingestion==1.10.0.0.dev0) (1.0.1)\n",
"Requirement already satisfied: s3transfer<0.15.0,>=0.14.0 in /opt/conda/lib/python3.11/site-packages (from boto3<2.0,>=1.20->openmetadata-ingestion==1.10.0.0.dev0) (0.14.0)\n",
"Requirement already satisfied: attrs>=23.1.0 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (24.2.0)\n",
"Requirement already satisfied: click>=8.1 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (8.3.0)\n",
"Requirement already satisfied: dbt-core<2.0.0,>=1.0.0 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.10.13)\n",
"Requirement already satisfied: dsnparse<0.2.0 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.1.15)\n",
"Requirement already satisfied: keyring in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (25.6.0)\n",
"Requirement already satisfied: mashumaro<3.11.0,>=2.9 in /opt/conda/lib/python3.11/site-packages (from mashumaro[msgpack]<3.11.0,>=2.9->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (3.10)\n",
"Requirement already satisfied: rich in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (14.2.0)\n",
"Requirement already satisfied: toml>=0.10.2 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.10.2)\n",
"Requirement already satisfied: urllib3<2 in /opt/conda/lib/python3.11/site-packages (from collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.26.20)\n",
"Requirement already satisfied: sqlparse<0.6,>=0.5 in /opt/conda/lib/python3.11/site-packages (from collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (0.5.3)\n",
"Requirement already satisfied: networkx>=2.4 in /opt/conda/lib/python3.11/site-packages (from collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (3.5)\n",
"Requirement already satisfied: collate-sqlfluff~=3.3.0 in /opt/conda/lib/python3.11/site-packages (from collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (3.3.6)\n",
"Requirement already satisfied: cffi>=1.14 in /opt/conda/lib/python3.11/site-packages (from cryptography>=42.0.0->openmetadata-ingestion==1.10.0.0.dev0) (1.17.1)\n",
"Requirement already satisfied: dnspython>=2.0.0 in /opt/conda/lib/python3.11/site-packages (from email-validator>=2.0->openmetadata-ingestion==1.10.0.0.dev0) (2.8.0)\n",
"Requirement already satisfied: idna>=2.0.0 in /opt/conda/lib/python3.11/site-packages (from email-validator>=2.0->openmetadata-ingestion==1.10.0.0.dev0) (3.10)\n",
"Requirement already satisfied: anyio in /opt/conda/lib/python3.11/site-packages (from httpx~=0.28.0->openmetadata-ingestion==1.10.0.0.dev0) (4.6.2.post1)\n",
"Requirement already satisfied: certifi in /opt/conda/lib/python3.11/site-packages (from httpx~=0.28.0->openmetadata-ingestion==1.10.0.0.dev0) (2024.8.30)\n",
"Requirement already satisfied: httpcore==1.* in /opt/conda/lib/python3.11/site-packages (from httpx~=0.28.0->openmetadata-ingestion==1.10.0.0.dev0) (1.0.6)\n",
"Requirement already satisfied: h11<0.15,>=0.13 in /opt/conda/lib/python3.11/site-packages (from httpcore==1.*->httpx~=0.28.0->openmetadata-ingestion==1.10.0.0.dev0) (0.14.0)\n",
"Requirement already satisfied: zipp>=3.20 in /opt/conda/lib/python3.11/site-packages (from importlib-metadata>=4.13.0->openmetadata-ingestion==1.10.0.0.dev0) (3.20.2)\n",
"Requirement already satisfied: more-itertools in /opt/conda/lib/python3.11/site-packages (from jaraco.functools<4.2.0->openmetadata-ingestion==1.10.0.0.dev0) (10.8.0)\n",
"Requirement already satisfied: MarkupSafe>=2.0 in /opt/conda/lib/python3.11/site-packages (from Jinja2>=2.11.3->openmetadata-ingestion==1.10.0.0.dev0) (3.0.2)\n",
"Requirement already satisfied: jsonpointer>=1.9 in /opt/conda/lib/python3.11/site-packages (from jsonpatch<2.0,>=1.24->openmetadata-ingestion==1.10.0.0.dev0) (3.0.0)\n",
"Requirement already satisfied: six>=1.9.0 in /opt/conda/lib/python3.11/site-packages (from kubernetes>=21.0.0->openmetadata-ingestion==1.10.0.0.dev0) (1.16.0)\n",
"Requirement already satisfied: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /opt/conda/lib/python3.11/site-packages (from kubernetes>=21.0.0->openmetadata-ingestion==1.10.0.0.dev0) (1.8.0)\n",
"Requirement already satisfied: requests-oauthlib in /opt/conda/lib/python3.11/site-packages (from kubernetes>=21.0.0->openmetadata-ingestion==1.10.0.0.dev0) (2.0.0)\n",
"Requirement already satisfied: durationpy>=0.7 in /opt/conda/lib/python3.11/site-packages (from kubernetes>=21.0.0->openmetadata-ingestion==1.10.0.0.dev0) (0.10)\n",
"Requirement already satisfied: pytz>=2020.1 in /opt/conda/lib/python3.11/site-packages (from pandas~=2.0.3->openmetadata-ingestion==1.10.0.0.dev0) (2024.2)\n",
"Requirement already satisfied: tzdata>=2022.1 in /opt/conda/lib/python3.11/site-packages (from pandas~=2.0.3->openmetadata-ingestion==1.10.0.0.dev0) (2025.2)\n",
"Requirement already satisfied: annotated-types>=0.6.0 in /opt/conda/lib/python3.11/site-packages (from pydantic<2.12,>=2.7.0,~=2.0->openmetadata-ingestion==1.10.0.0.dev0) (0.7.0)\n",
"Requirement already satisfied: pydantic-core==2.23.4 in /opt/conda/lib/python3.11/site-packages (from pydantic<2.12,>=2.7.0,~=2.0->openmetadata-ingestion==1.10.0.0.dev0) (2.23.4)\n",
"Requirement already satisfied: typing-inspection>=0.4.0 in /opt/conda/lib/python3.11/site-packages (from pydantic-settings>=2.7.0,~=2.0->openmetadata-ingestion==1.10.0.0.dev0) (0.4.2)\n",
"Requirement already satisfied: charset-normalizer<4,>=2 in /opt/conda/lib/python3.11/site-packages (from requests>=2.23->openmetadata-ingestion==1.10.0.0.dev0) (3.4.0)\n",
"Requirement already satisfied: asn1crypto<2.0.0,>0.24.0 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (1.5.1)\n",
"Requirement already satisfied: pyOpenSSL<26.0.0,>=22.0.0 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (25.3.0)\n",
"Requirement already satisfied: pyjwt<3.0.0 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (2.9.0)\n",
"Requirement already satisfied: filelock<4,>=3.5 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (3.20.0)\n",
"Requirement already satisfied: sortedcontainers>=2.4.0 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (2.4.0)\n",
"Requirement already satisfied: platformdirs<5.0.0,>=2.6.0 in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (4.3.6)\n",
"Requirement already satisfied: tomlkit in /opt/conda/lib/python3.11/site-packages (from snowflake-connector-python<4.0.0,>=3.13.1->openmetadata-ingestion==1.10.0.0.dev0) (0.13.3)\n",
"Requirement already satisfied: greenlet!=0.4.17 in /opt/conda/lib/python3.11/site-packages (from sqlalchemy<2,>=1.4.0->openmetadata-ingestion==1.10.0.0.dev0) (3.1.1)\n",
"Requirement already satisfied: isodate>=0.6.1 in /opt/conda/lib/python3.11/site-packages (from azure-keyvault-secrets->openmetadata-ingestion==1.10.0.0.dev0) (0.6.1)\n",
"Requirement already satisfied: psutil in /opt/conda/lib/python3.11/site-packages (from memory-profiler->openmetadata-ingestion==1.10.0.0.dev0) (6.0.0)\n",
"Requirement already satisfied: pycparser in /opt/conda/lib/python3.11/site-packages (from cffi>=1.14->cryptography>=42.0.0->openmetadata-ingestion==1.10.0.0.dev0) (2.22)\n",
"Requirement already satisfied: colorama>=0.3 in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (0.4.6)\n",
"Requirement already satisfied: diff-cover>=2.5.0 in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (9.7.1)\n",
"Requirement already satisfied: pathspec in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (0.12.1)\n",
"Requirement already satisfied: pytest in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (8.4.2)\n",
"Requirement already satisfied: regex in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (2025.11.3)\n",
"Requirement already satisfied: tblib in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (3.2.1)\n",
"Requirement already satisfied: tqdm in /opt/conda/lib/python3.11/site-packages (from collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (4.66.5)\n",
"Requirement already satisfied: agate<1.10,>=1.7.0 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.9.1)\n",
"Requirement already satisfied: jsonschema<5.0,>=4.19.1 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (4.23.0)\n",
"Requirement already satisfied: snowplow-tracker<2.0,>=1.0.2 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.1.0)\n",
"Requirement already satisfied: dbt-extractor<=0.6,>=0.5.0 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.6.0)\n",
"Requirement already satisfied: dbt-semantic-interfaces<0.10,>=0.9.0 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.9.0)\n",
"Requirement already satisfied: dbt-common<2.0,>=1.27.0 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.36.0)\n",
"Requirement already satisfied: dbt-adapters<2.0,>=1.15.5 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.18.0)\n",
"Requirement already satisfied: dbt-protos<2.0,>=1.0.346 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.0.382)\n",
"Requirement already satisfied: daff>=1.3.46 in /opt/conda/lib/python3.11/site-packages (from dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.4.2)\n",
"Requirement already satisfied: googleapis-common-protos<2.0.0,>=1.56.2 in /opt/conda/lib/python3.11/site-packages (from google-api-core!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1->google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (1.72.0)\n",
"Requirement already satisfied: grpcio<2.0.0,>=1.33.2 in /opt/conda/lib/python3.11/site-packages (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (1.76.0)\n",
"Requirement already satisfied: grpcio-status<2.0.0,>=1.33.2 in /opt/conda/lib/python3.11/site-packages (from google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.10.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,!=2.9.*,<3.0.0,>=1.34.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (1.76.0)\n",
"Requirement already satisfied: cachetools<7.0,>=2.0.0 in /opt/conda/lib/python3.11/site-packages (from google-auth!=2.24.0,!=2.25.0,<3.0.0,>=2.14.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (6.2.1)\n",
"Requirement already satisfied: pyasn1-modules>=0.2.1 in /opt/conda/lib/python3.11/site-packages (from google-auth!=2.24.0,!=2.25.0,<3.0.0,>=2.14.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (0.4.2)\n",
"Requirement already satisfied: rsa<5,>=3.1.4 in /opt/conda/lib/python3.11/site-packages (from google-auth!=2.24.0,!=2.25.0,<3.0.0,>=2.14.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (4.9.1)\n",
"Requirement already satisfied: msgpack>=0.5.6 in /opt/conda/lib/python3.11/site-packages (from mashumaro[msgpack]<3.11.0,>=2.9->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.1.2)\n",
"Requirement already satisfied: sniffio>=1.1 in /opt/conda/lib/python3.11/site-packages (from anyio->httpx~=0.28.0->openmetadata-ingestion==1.10.0.0.dev0) (1.3.1)\n",
"Requirement already satisfied: SecretStorage>=3.2 in /opt/conda/lib/python3.11/site-packages (from keyring->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (3.4.0)\n",
"Requirement already satisfied: jeepney>=0.4.2 in /opt/conda/lib/python3.11/site-packages (from keyring->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.9.0)\n",
"Requirement already satisfied: jaraco.classes in /opt/conda/lib/python3.11/site-packages (from keyring->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (3.4.0)\n",
"Requirement already satisfied: jaraco.context in /opt/conda/lib/python3.11/site-packages (from keyring->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (6.0.1)\n",
"Requirement already satisfied: oauthlib>=3.0.0 in /opt/conda/lib/python3.11/site-packages (from requests-oauthlib->kubernetes>=21.0.0->openmetadata-ingestion==1.10.0.0.dev0) (3.2.2)\n",
"Requirement already satisfied: markdown-it-py>=2.2.0 in /opt/conda/lib/python3.11/site-packages (from rich->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (4.0.0)\n",
"Requirement already satisfied: pygments<3.0.0,>=2.13.0 in /opt/conda/lib/python3.11/site-packages (from rich->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (2.19.2)\n",
"Requirement already satisfied: Babel>=2.0 in /opt/conda/lib/python3.11/site-packages (from agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (2.14.0)\n",
"Requirement already satisfied: leather>=0.3.2 in /opt/conda/lib/python3.11/site-packages (from agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.4.0)\n",
"Requirement already satisfied: parsedatetime!=2.5,>=2.1 in /opt/conda/lib/python3.11/site-packages (from agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (2.6)\n",
"Requirement already satisfied: python-slugify>=1.2.1 in /opt/conda/lib/python3.11/site-packages (from agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (8.0.4)\n",
"Requirement already satisfied: pytimeparse>=1.1.5 in /opt/conda/lib/python3.11/site-packages (from agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.1.8)\n",
"Requirement already satisfied: deepdiff<9.0,>=7.0 in /opt/conda/lib/python3.11/site-packages (from dbt-common<2.0,>=1.27.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (8.6.1)\n",
"Requirement already satisfied: pluggy<2,>=0.13.1 in /opt/conda/lib/python3.11/site-packages (from diff-cover>=2.5.0->collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (1.5.0)\n",
"Requirement already satisfied: jsonschema-specifications>=2023.03.6 in /opt/conda/lib/python3.11/site-packages (from jsonschema<5.0,>=4.19.1->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (2024.10.1)\n",
"Requirement already satisfied: referencing>=0.28.4 in /opt/conda/lib/python3.11/site-packages (from jsonschema<5.0,>=4.19.1->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.35.1)\n",
"Requirement already satisfied: rpds-py>=0.7.1 in /opt/conda/lib/python3.11/site-packages (from jsonschema<5.0,>=4.19.1->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.20.0)\n",
"Requirement already satisfied: mdurl~=0.1 in /opt/conda/lib/python3.11/site-packages (from markdown-it-py>=2.2.0->rich->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (0.1.2)\n",
"Requirement already satisfied: pyasn1<0.7.0,>=0.6.1 in /opt/conda/lib/python3.11/site-packages (from pyasn1-modules>=0.2.1->google-auth!=2.24.0,!=2.25.0,<3.0.0,>=2.14.1->google-cloud-secret-manager==2.24.0->openmetadata-ingestion==1.10.0.0.dev0) (0.6.1)\n",
"Requirement already satisfied: backports.tarfile in /opt/conda/lib/python3.11/site-packages (from jaraco.context->keyring->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.2.0)\n",
"Requirement already satisfied: iniconfig>=1 in /opt/conda/lib/python3.11/site-packages (from pytest->collate-sqlfluff~=3.3.0->collate-sqllineage~=1.6.0->openmetadata-ingestion==1.10.0.0.dev0) (2.3.0)\n",
"Requirement already satisfied: orderly-set<6,>=5.4.1 in /opt/conda/lib/python3.11/site-packages (from deepdiff<9.0,>=7.0->dbt-common<2.0,>=1.27.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (5.5.0)\n",
"Requirement already satisfied: text-unidecode>=1.3 in /opt/conda/lib/python3.11/site-packages (from python-slugify>=1.2.1->agate<1.10,>=1.7.0->dbt-core<2.0.0,>=1.0.0->collate-data-diff>=0.11.6->openmetadata-ingestion==1.10.0.0.dev0) (1.3)\n",
"Building wheels for collected packages: openmetadata-ingestion\n",
" Building editable for openmetadata-ingestion (pyproject.toml) ... \u001b[?25ldone\n",
"\u001b[?25h Created wheel for openmetadata-ingestion: filename=openmetadata_ingestion-1.10.0.0.dev0-0.editable-py3-none-any.whl size=14100 sha256=7de4eb10961d6008744422eeb45dbc15b3967308192e57518fa91f670744ae60\n",
" Stored in directory: /tmp/pip-ephem-wheel-cache-65dv56xb/wheels/94/a6/4b/951e6297508c20775c8465f8caed457f0821461c94c158f900\n",
"Successfully built openmetadata-ingestion\n",
"Installing collected packages: openmetadata-ingestion\n",
" Attempting uninstall: openmetadata-ingestion\n",
" Found existing installation: openmetadata-ingestion 1.10.0.0.dev0\n",
" Uninstalling openmetadata-ingestion-1.10.0.0.dev0:\n",
" Successfully uninstalled openmetadata-ingestion-1.10.0.0.dev0\n",
"Successfully installed openmetadata-ingestion-1.10.0.0.dev0\n"
]
}
],
"source": "!pip install \"openmetadata-ingestion[pandas,postgres]>=1.11.0.0\""
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "84b422b5-dae7-4094-ae50-cd38c0754a6b",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Initial SDK setup\n",
|
|
"In this step we make sure our Python code is ready to work against OpenMetadata.\n",
"\n",
"You will be prompted for the JWT token mentioned in the [requirements](#requirements) section."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 2,
|
|
"id": "4c2a0cc0-f335-4b53-943d-357a078d7de2",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdin",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Please introduce a JWT token for authentication with OM ········\n"
|
|
]
|
|
},
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"<metadata.sdk.client.OpenMetadata at 0xffff8c176910>"
|
|
]
|
|
},
|
|
"execution_count": 2,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"from getpass import getpass\n",
|
|
"\n",
|
|
"from metadata.sdk import configure\n",
|
|
"\n",
|
|
"jwt_token = getpass(\"Please introduce a JWT token for authentication with OM\")\n",
|
|
"\n",
|
|
"configure(\n",
|
|
" host=\"http://openmetadata_server:8585/api\",\n",
|
|
" jwt_token=jwt_token,\n",
|
|
")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "aa6bb55b-2dd5-4852-a849-78611bbc7ebe",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Implementation of the ETL"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 3,
|
|
"id": "4ff0449c-0507-46e4-8e80-3d585fd1c0c3",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Define the transformation function to run on dataframes\n",
"def transform(df):\n",
"    # Keep only relevant columns\n",
"    cols_to_keep = [\n",
"        \"vendorid\", \"tpep_pickup_datetime\", \"tpep_dropoff_datetime\",\n",
"        \"passenger_count\", \"trip_distance\",\n",
"        \"pulocationid\", \"dolocationid\",\n",
"        \"payment_type\", \"fare_amount\", \"tip_amount\",\n",
"        \"total_amount\", \"congestion_surcharge\"\n",
"    ]\n",
"    df_stg = df[cols_to_keep]\n",
"\n",
"    # Remove invalid or zero values; copy so the new column below\n",
"    # is added to an independent dataframe\n",
"    df_stg = df_stg[\n",
"        (df_stg[\"fare_amount\"] > 0) &\n",
"        (df_stg[\"total_amount\"] > 0) &\n",
"        (df_stg[\"trip_distance\"] > 0) &\n",
"        (df_stg[\"passenger_count\"] > 0)\n",
"    ].copy()\n",
"\n",
"    # Feature engineering: trip duration in minutes\n",
"    df_stg[\"trip_duration_min\"] = (\n",
"        (df_stg[\"tpep_dropoff_datetime\"] - df_stg[\"tpep_pickup_datetime\"]).dt.total_seconds() / 60\n",
"    )\n",
"\n",
"    # Filter unrealistic durations and distances\n",
"    df_stg = df_stg[\n",
"        (df_stg[\"trip_duration_min\"] >= 1) &\n",
"        (df_stg[\"trip_duration_min\"] <= 180) &\n",
"        (df_stg[\"trip_distance\"] <= 100)\n",
"    ]\n",
"\n",
"    return df_stg"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "df37a76c-5e7a-43e4-90e2-bdda99df4d87",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Run Data Quality tests"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 4,
|
|
"id": "f09777b0-6e29-4b92-bd23-f02936cf8481",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Define the validator we will use\n",
|
|
"from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator\n",
|
|
"\n",
|
|
"validator = DataFrameValidator()\n",
|
|
"\n",
|
|
"# Load the tests defined in OpenMetadata for the table `Tutorial Postgres.stg.public.dw_taxi_trips`\n",
|
|
"validator.add_openmetadata_table_tests(\"Tutorial Postgres.stg.public.dw_taxi_trips\")\n",
|
|
"\n",
|
|
"# Alternatively, one could define the same tests as code with:\n",
|
|
"# from metadata.sdk.data_quality import ColumnValuesToBeBetween\n",
|
|
"# validator.add_tests(\n",
|
|
"# ColumnValuesToBeBetween(\n",
|
|
"# name=\"amount_is_greater_than_0\",\n",
|
|
"# min_value=0,\n",
|
|
"# ),\n",
|
|
"# ColumnValuesToBeBetween(\n",
|
|
"# name=\"trip_duration_to_be_between_1_and_180_minutes\",\n",
|
|
"# min_value=1,\n",
|
|
"# max_value=180,\n",
|
|
"# ),\n",
|
|
"# ColumnValuesToBeBetween(\n",
|
|
"# name=\"trip_distance_to_be_at_most_100\",\n",
|
|
"# max_value=100,\n",
|
|
"# ),\n",
|
|
"# )"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "34518c0c-02ef-40ac-b6da-73400c12af54",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Run mechanisms\n",
|
|
"\n",
|
|
"The `DataFrameValidator` is designed to cover a variety of use cases, including ETLs that run on Pandas where memory is a concern. For those cases we have created a shortcut that makes your code shorter. But let's look at the trivial use case first, where all of your data fits in memory."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "45186085-aa85-41f8-bc24-268610f139d5",
|
|
"metadata": {},
|
|
"source": [
|
|
"#### Strategy: data fits in memory\n",
|
|
"\n",
|
|
"The following ETL reads from the source, applies the transformations, validates the dataframe and loads it into the destination only if validation succeeds."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 6,
|
|
"id": "1f766e18-003d-4560-9ff3-179a8cc18f98",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Read, transform, validate and load\n",
"import pandas as pd\n",
"from sqlalchemy import MetaData, Table, create_engine, delete, insert\n",
"\n",
"SOURCE = \"postgresql://user:pass@dwh:5432/raw\"\n",
"DESTINATION = \"postgresql://user:pass@dwh:5432/stg\"\n",
"\n",
"source = create_engine(SOURCE)\n",
"\n",
"# Read\n",
"with source.connect() as conn:\n",
"    df = pd.read_sql(\"SELECT * FROM taxi_yellow\", conn)\n",
"\n",
"# Transform\n",
"df = transform(df)\n",
"\n",
"# Validate\n",
"results = validator.validate(df)\n",
"\n",
"# Load only if every test passed\n",
"if results.success:\n",
"    destination = create_engine(DESTINATION, future=True)\n",
"\n",
"    # `begin()` commits when the block succeeds and rolls back on error;\n",
"    # a plain `connect()` would discard the changes when it closes\n",
"    with destination.begin() as conn:\n",
"        table = Table(\"dw_taxi_trips\", MetaData(), autoload_with=conn)\n",
"\n",
"        # Delete existing rows, then insert the validated ones\n",
"        conn.execute(delete(table))\n",
"        conn.execute(insert(table), df.to_dict(orient=\"records\"))\n",
"\n",
"# Optional: publish results to OpenMetadata\n",
"results.publish(\"Tutorial Postgres.stg.public.dw_taxi_trips\")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "61c1be0c-8bf0-47eb-a71c-f6c54a9abac2",
|
|
"metadata": {},
|
|
"source": "#### Strategy: loading data in chunks\nThis use case has two variants. The first is similar to the previous one and requires your code to follow the validator's `FailureMode`, which defaults to short-circuiting. The second only requires you to define three methods: one that returns chunks of (possibly transformed) dataframes, another that loads a chunk, and a third that handles failures.\n\nThe validator's default (and, for now, only) failure mode skips the remaining test cases and stops iterating over the chunks of data as soon as a failure is encountered. We want our code to behave the same way, so after a short circuit we will roll back our changes in the destination database."
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "dd84d688-e164-449a-ae28-26ded58d6b3c",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": "# First: define a mechanism to load and transform chunks of data\n# Credentials for the user are set up in `docker-compose.yml`\n\nimport pandas as pd\nfrom sqlalchemy import create_engine\n\ndef load_and_transform():\n    engine = create_engine(SOURCE)\n\n    with engine.connect() as conn:\n        chunks = pd.read_sql(\"SELECT * FROM taxi_yellow\", conn, chunksize=1_000)\n\n        # Iterate while the connection is still open\n        for df in chunks:\n            yield transform(df)"
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "a9b05f59-51a7-453f-a329-6917272cfd1d",
|
|
"metadata": {},
|
|
"source": [
|
|
"We want the success and failure methods to share the same SQL connection so that everything stays in the same transaction. For that, we will create a little helper."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 8,
|
|
"id": "b5678c86-c08e-4d2b-8248-40c4bb761976",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# A little helper to manage the connection\n",
"class SQLAlchemyValidationSession:\n",
"    def __init__(self, connection_string, table_name):\n",
"        self.engine = create_engine(connection_string, future=True)\n",
"        self.table = Table(table_name, MetaData(), autoload_with=self.engine)\n",
"        self._conn = None\n",
"\n",
"    def with_conn(self, connection):\n",
"        self._conn = connection\n",
"        return self\n",
"\n",
"    def load_df_to_destination(self, df, _result):\n",
"        \"\"\"Loads data into destination.\"\"\"\n",
"        self._conn.execute(insert(self.table), df.to_dict(orient=\"records\"))\n",
"\n",
"    def rollback(self, _df, _result):\n",
"        \"\"\"Clears data previously loaded.\"\"\"\n",
"        self._conn.rollback()\n",
"\n",
"    def __enter__(self):\n",
"        conn = self.engine.connect()\n",
"        return self.with_conn(conn)\n",
"\n",
"    def __exit__(self, exc_type, exc_value, traceback):\n",
"        # Commit the loaded chunks unless an exception escaped the block;\n",
"        # after `rollback()` nothing is pending, so the commit is a no-op\n",
"        if exc_type is None:\n",
"            self._conn.commit()\n",
"        self._conn.close()\n",
"        self._conn = None"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "6450d996-9429-456a-9f19-63fb17cbb83b",
|
|
"metadata": {},
|
|
"source": [
|
|
"**Example 1: loading data in chunks with the `DataFrameValidator.validate` method**"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 11,
|
|
"id": "0a381ca8-da2e-4860-ad59-91fc7997a363",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Example 1: loading data in chunks with the `DataFrameValidator.validate` method\n",
"from metadata.sdk.data_quality.dataframes.validation_results import ValidationResult\n",
"\n",
"validation_session = SQLAlchemyValidationSession(\n",
"    connection_string=DESTINATION,\n",
"    table_name=\"dw_taxi_trips\"\n",
")\n",
"\n",
"results = []\n",
"with validation_session as session:\n",
"    for transformed_df in load_and_transform():\n",
"        result = validator.validate(transformed_df)\n",
"\n",
"        results.append(result)\n",
"\n",
"        if result.success:\n",
"            session.load_df_to_destination(transformed_df, result)\n",
"        else:\n",
"            # Short circuit: undo the chunks loaded so far and stop\n",
"            session.rollback(transformed_df, result)\n",
"            break\n",
"\n",
"# Aggregate results for each test case for every chunk\n",
"results = ValidationResult.merge(*results)\n",
"\n",
"# Publish to the server\n",
"results.publish(\"Tutorial Postgres.stg.public.dw_taxi_trips\")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "39ea430b-ac7e-4d8c-a8fc-537fcb4e3d11",
|
|
"metadata": {},
|
|
"source": [
|
|
"**Example 2: loading data in chunks with the `DataFrameValidator.run` method**\n",
|
|
"\n",
|
|
"This method is a shortcut for the loop above and returns the results already merged.\n",
"\n",
"> ⚠ **NOTE:** there is, however, one caveat. Some of our tests need the whole dataframe in memory to work. For example, tests that count the total number of rows would return false results because they'd be running on subsets of the data. Future versions of the SDK will solve this issue. For the time being, if your data does not fit in memory you should resort to the example in [test_workflow.ipynb](/lab/tree/notebooks/test_workflow.ipynb)."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 12,
|
|
"id": "6ad82243-e48e-4e1e-8e47-47d7a563e999",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"with validation_session as session:\n",
|
|
" results = validator.run(\n",
|
|
" load_and_transform(),\n",
|
|
" on_success=session.load_df_to_destination,\n",
|
|
" on_failure=session.rollback,\n",
|
|
" )\n",
|
|
"\n",
|
|
"# Results are already merged and ready to publish in OpenMetadata\n",
|
|
"results.publish(\"Tutorial Postgres.stg.public.dw_taxi_trips\")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 13,
|
|
"id": "42414d5e-fbb8-43da-bd65-310c34c35d81",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\n",
|
|
"Test: amount_is_greater_than_0\n",
|
|
"Status: TestCaseStatus.Success\n",
|
|
"Result: Found min=9.45, max=54.65 vs. the expected min=1.0, max=inf.; Found min=8.3, max=89.7 vs. the expected min=1.0, max=inf.; Found min=9.4, max=81.15 vs. the expected min=1.0, max=inf.; Found min=5.9, max=51.66 vs. the expected min=1.0, max=inf.; Found min=6.9, max=157.71 vs. the expected min=1.0, max=inf.; Found min=5.2, max=69.24 vs. the expected min=1.0, max=inf.; Found min=7.92, max=89.25 vs. the expected min=1.0, max=inf.; Found min=7.7, max=66.5 vs. the expected min=1.0, max=inf.; Found min=5.2, max=62.7 vs. the expected min=1.0, max=inf.; Found min=7.7, max=100.41 vs. the expected min=1.0, max=inf.\n",
|
|
"\n",
|
|
"Test: trip_distance_to_be_at_most_100\n",
|
|
"Status: TestCaseStatus.Success\n",
|
|
"Result: Found min=0.02, max=7.45 vs. the expected min=-inf, max=100.0.; Found min=0.03, max=9.23 vs. the expected min=-inf, max=100.0.; Found min=0.06, max=8.79 vs. the expected min=-inf, max=100.0.; Found min=0.05, max=9.39 vs. the expected min=-inf, max=100.0.; Found min=0.04, max=30.4 vs. the expected min=-inf, max=100.0.; Found min=0.15, max=15.9 vs. the expected min=-inf, max=100.0.; Found min=0.2, max=14.02 vs. the expected min=-inf, max=100.0.; Found min=0.01, max=10.7 vs. the expected min=-inf, max=100.0.; Found min=0.15, max=7.81 vs. the expected min=-inf, max=100.0.; Found min=0.09, max=16.88 vs. the expected min=-inf, max=100.0.\n",
|
|
"\n",
|
|
"Test: trip_duration_to_be_between_1_and_180_minutes\n",
|
|
"Status: TestCaseStatus.Success\n",
|
|
"Result: Found min=1.1, max=30.816666666666666 vs. the expected min=1.0, max=180.0.; Found min=1.0833333333333333, max=32.93333333333333 vs. the expected min=1.0, max=180.0.; Found min=1.1333333333333333, max=30.1 vs. the expected min=1.0, max=180.0.; Found min=1.0666666666666667, max=24.883333333333333 vs. the expected min=1.0, max=180.0.; Found min=1.0, max=58.15 vs. the expected min=1.0, max=180.0.; Found min=1.0833333333333333, max=167.06666666666666 vs. the expected min=1.0, max=180.0.; Found min=1.0333333333333334, max=37.46666666666667 vs. the expected min=1.0, max=180.0.; Found min=1.1333333333333333, max=32.75 vs. the expected min=1.0, max=180.0.; Found min=1.1, max=30.516666666666666 vs. the expected min=1.0, max=180.0.; Found min=1.1666666666666667, max=37.583333333333336 vs. the expected min=1.0, max=180.0.\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"# In both cases, results should be the same\n",
|
|
"for test_case, test_result in results.test_cases_and_results:\n",
|
|
"\n",
|
|
" print(f\"\\nTest: {test_case.name.root}\")\n",
|
|
" print(f\"Status: {test_result.testCaseStatus}\")\n",
|
|
" print(f\"Result: {test_result.result}\")"
|
|
]
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3 (ipykernel)",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.11.10"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 5
|
|
} |