chore(cli): drop support for python 3.7 (#9731)

This commit is contained in:
Harshal Sheth 2024-01-29 10:50:47 -08:00 committed by GitHub
parent f3cc4e068a
commit 1498c36875
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 799 additions and 857 deletions

View File

@ -31,7 +31,7 @@ jobs:
# DATAHUB_LOOKML_GIT_TEST_SSH_KEY: ${{ secrets.DATAHUB_LOOKML_GIT_TEST_SSH_KEY }}
strategy:
matrix:
python-version: ["3.7", "3.10"]
python-version: ["3.8", "3.10"]
command:
[
"testQuick",
@ -40,7 +40,7 @@ jobs:
"testIntegrationBatch2",
]
include:
- python-version: "3.7"
- python-version: "3.8"
- python-version: "3.10"
fail-fast: false
steps:

View File

@ -24,7 +24,7 @@ source venv/bin/activate # activate the environment
Once inside the virtual environment, install `datahub` using the following commands
```shell
# Requires Python 3.7+
# Requires Python 3.8+
python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip install --upgrade acryl-datahub
# validate that the install was successful

View File

@ -10,8 +10,10 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- Neo4j 5.x, may require migration from 4.x
- Build requires JDK17 (Runtime Java 11)
- Build requires Docker Compose > 2.20
- #9731 - The `acryl-datahub` CLI now requires Python 3.8+
- #9601 - The Unity Catalog(UC) ingestion source config `include_metastore` is now disabled by default. This change will affect the urns of all entities in the workspace.<br/>
Entity Hierarchy with `include_metastore: true` (Old)
Entity Hierarchy with `include_metastore: true` (Old)
```
- UC Metastore
- Catalog
@ -19,16 +21,19 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- Table
```
Entity Hierarchy with `include_metastore: false` (New)
Entity Hierarchy with `include_metastore: false` (New)
```
- Catalog
- Schema
- Table
```
We recommend using `platform_instance` for differentiating across metastores.
If stateful ingestion is enabled, running ingestion with latest cli version will perform all required cleanup. Otherwise, we recommend soft deleting all databricks data via the DataHub CLI:
`datahub delete --platform databricks --soft` and then reingesting with latest cli version.
`datahub delete --platform databricks --soft` and then reingesting with latest cli version.
- #9601 - The Unity Catalog(UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting legacy hive metastore catalog in Databricks.
### Potential Downtime

View File

@ -22,7 +22,7 @@ If you're interested in a managed version, [Acryl Data](https://www.acryldata.io
| Linux | [Docker for Linux](https://docs.docker.com/desktop/install/linux-install/) and [Docker Compose](https://docs.docker.com/compose/install/linux/) |
- **Launch the Docker engine** from command line or the desktop app.
- Ensure you have **Python 3.7+** installed & configured. (Check using `python3 --version`).
- Ensure you have **Python 3.8+** installed & configured. (Check using `python3 --version`).
:::note Docker Resource Allocation

View File

@ -18,16 +18,10 @@ _version: str = package_metadata["__version__"]
_self_pin = f"=={_version}" if not _version.endswith("dev0") else ""
rest_common = {"requests", "requests_file"}
base_requirements = {
# Compatibility.
"dataclasses>=0.6; python_version < '3.7'",
"mypy_extensions>=0.4.3",
f"acryl-datahub[datahub-rest]{_self_pin}",
# Actual dependencies.
"pydantic>=1.5.1",
"apache-airflow >= 2.0.2",
*rest_common,
}
plugins: Dict[str, Set[str]] = {
@ -42,9 +36,8 @@ plugins: Dict[str, Set[str]] = {
},
"plugin-v1": set(),
"plugin-v2": {
# The v2 plugin requires Python 3.8+.
f"acryl-datahub[sql-parser]{_self_pin}",
"openlineage-airflow==1.2.0; python_version >= '3.8'",
"openlineage-airflow==1.2.0",
},
}
@ -144,7 +137,6 @@ setuptools.setup(
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
@ -161,7 +153,7 @@ setuptools.setup(
],
# Package info.
zip_safe=False,
python_requires=">=3.7",
python_requires=">=3.8",
package_data={
"datahub_airflow_plugin": ["py.typed"],
},

View File

@ -1,7 +1,6 @@
import datetime
import json
import os
import sys
from contextlib import contextmanager
from typing import Iterator
from unittest import mock
@ -318,137 +317,134 @@ def test_lineage_backend(mock_emit, inlets, outlets, capture_executions):
# Check that the right things were emitted.
assert mock_emitter.emit.call_count == 17 if capture_executions else 9
# Running further checks based on python version because args only exists in python 3.8+
if sys.version_info > (3, 8):
assert mock_emitter.method_calls[0].args[0].aspectName == "dataFlowInfo"
assert (
mock_emitter.method_calls[0].args[0].entityUrn
== "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
)
# TODO: Replace this with a golden file-based comparison.
assert mock_emitter.method_calls[0].args[0].aspectName == "dataFlowInfo"
assert (
mock_emitter.method_calls[0].args[0].entityUrn
== "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
)
assert mock_emitter.method_calls[1].args[0].aspectName == "ownership"
assert (
mock_emitter.method_calls[1].args[0].entityUrn
== "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
)
assert mock_emitter.method_calls[1].args[0].aspectName == "ownership"
assert (
mock_emitter.method_calls[1].args[0].entityUrn
== "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
)
assert mock_emitter.method_calls[2].args[0].aspectName == "globalTags"
assert (
mock_emitter.method_calls[2].args[0].entityUrn
== "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
)
assert mock_emitter.method_calls[2].args[0].aspectName == "globalTags"
assert (
mock_emitter.method_calls[2].args[0].entityUrn
== "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)"
)
assert mock_emitter.method_calls[3].args[0].aspectName == "dataJobInfo"
assert mock_emitter.method_calls[3].args[0].aspectName == "dataJobInfo"
assert (
mock_emitter.method_calls[3].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
)
assert mock_emitter.method_calls[4].args[0].aspectName == "dataJobInputOutput"
assert (
mock_emitter.method_calls[4].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[0]
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task1_upstream)"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[1]
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatasets[0]
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.outputDatasets[0]
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
)
assert mock_emitter.method_calls[5].args[0].aspectName == "status"
assert (
mock_emitter.method_calls[5].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
)
assert mock_emitter.method_calls[6].args[0].aspectName == "status"
assert (
mock_emitter.method_calls[6].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
)
assert mock_emitter.method_calls[7].args[0].aspectName == "ownership"
assert (
mock_emitter.method_calls[7].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
)
assert mock_emitter.method_calls[8].args[0].aspectName == "globalTags"
assert (
mock_emitter.method_calls[8].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
)
if capture_executions:
assert (
mock_emitter.method_calls[3].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
mock_emitter.method_calls[9].args[0].aspectName
== "dataProcessInstanceProperties"
)
assert (
mock_emitter.method_calls[9].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert (
mock_emitter.method_calls[4].args[0].aspectName == "dataJobInputOutput"
mock_emitter.method_calls[10].args[0].aspectName
== "dataProcessInstanceRelationships"
)
assert (
mock_emitter.method_calls[4].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
mock_emitter.method_calls[10].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[0]
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task1_upstream)"
mock_emitter.method_calls[11].args[0].aspectName
== "dataProcessInstanceInput"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[1]
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"
mock_emitter.method_calls[11].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatasets[0]
mock_emitter.method_calls[12].args[0].aspectName
== "dataProcessInstanceOutput"
)
assert (
mock_emitter.method_calls[12].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert mock_emitter.method_calls[13].args[0].aspectName == "status"
assert (
mock_emitter.method_calls[13].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
)
assert mock_emitter.method_calls[14].args[0].aspectName == "status"
assert (
mock_emitter.method_calls[4].args[0].aspect.outputDatasets[0]
mock_emitter.method_calls[14].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
)
assert mock_emitter.method_calls[5].args[0].aspectName == "status"
assert (
mock_emitter.method_calls[5].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
mock_emitter.method_calls[15].args[0].aspectName
== "dataProcessInstanceRunEvent"
)
assert mock_emitter.method_calls[6].args[0].aspectName == "status"
assert (
mock_emitter.method_calls[6].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
mock_emitter.method_calls[15].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert mock_emitter.method_calls[7].args[0].aspectName == "ownership"
assert (
mock_emitter.method_calls[7].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
mock_emitter.method_calls[16].args[0].aspectName
== "dataProcessInstanceRunEvent"
)
assert mock_emitter.method_calls[8].args[0].aspectName == "globalTags"
assert (
mock_emitter.method_calls[8].args[0].entityUrn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)"
mock_emitter.method_calls[16].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
if capture_executions:
assert (
mock_emitter.method_calls[9].args[0].aspectName
== "dataProcessInstanceProperties"
)
assert (
mock_emitter.method_calls[9].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert (
mock_emitter.method_calls[10].args[0].aspectName
== "dataProcessInstanceRelationships"
)
assert (
mock_emitter.method_calls[10].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert (
mock_emitter.method_calls[11].args[0].aspectName
== "dataProcessInstanceInput"
)
assert (
mock_emitter.method_calls[11].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert (
mock_emitter.method_calls[12].args[0].aspectName
== "dataProcessInstanceOutput"
)
assert (
mock_emitter.method_calls[12].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert mock_emitter.method_calls[13].args[0].aspectName == "status"
assert (
mock_emitter.method_calls[13].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
)
assert mock_emitter.method_calls[14].args[0].aspectName == "status"
assert (
mock_emitter.method_calls[14].args[0].entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)"
)
assert (
mock_emitter.method_calls[15].args[0].aspectName
== "dataProcessInstanceRunEvent"
)
assert (
mock_emitter.method_calls[15].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)
assert (
mock_emitter.method_calls[16].args[0].aspectName
== "dataProcessInstanceRunEvent"
)
assert (
mock_emitter.method_calls[16].args[0].entityUrn
== "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29"
)

View File

@ -17,7 +17,7 @@ def get_coverage_arg(test_name) {
task checkPythonVersion(type: Exec) {
commandLine python_executable, '-c',
'import sys; assert (3, 11) > sys.version_info >= (3, 7), f"Python version {sys.version_info[:2]} not allowed"'
'import sys; assert (3, 11) > sys.version_info >= (3, 8), f"Python version {sys.version_info[:2]} not allowed"'
}
task environmentSetup(type: Exec, dependsOn: checkPythonVersion) {

View File

@ -2,26 +2,31 @@
## Installing the CLI
Make sure you have installed DataHub CLI before following this guide.
Make sure you have installed DataHub CLI before following this guide.
```shell
# Requires Python 3.7+
# Requires Python 3.8+
python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip install --upgrade acryl-datahub
# validate that the install was successful
datahub version
# If you see "command not found", try running this instead: python3 -m datahub version
```
Check out the [CLI Installation Guide](../docs/cli.md#installation) for more installation options and troubleshooting tips.
Check out the [CLI Installation Guide](../docs/cli.md#installation) for more installation options and troubleshooting tips.
After that, install the required plugin for the ingestion.
```shell
pip install 'acryl-datahub[datahub-rest]' # install the required plugin
```
Check out the [alternative installation options](../docs/cli.md#alternate-installation-options) for more reference.
Check out the [alternative installation options](../docs/cli.md#alternate-installation-options) for more reference.
## Configuring a Recipe
Create a recipe.yml file that defines the source and sink for metadata, as shown below.
```yaml
# my_recipe.yml
source:
@ -29,7 +34,7 @@ source:
config:
option_1: <value>
...
sink:
type: <sink_type_name>
config:
@ -39,7 +44,8 @@ sink:
For more information and examples on configuring recipes, please refer to [Recipes](recipe_overview.md).
## Ingesting Metadata
You can run ingestion using `datahub ingest` like below.
You can run ingestion using `datahub ingest` like below.
```shell
datahub ingest -c <path_to_recipe_file.yml>
@ -48,6 +54,7 @@ datahub ingest -c <path_to_recipe_file.yml>
## Reference
Please refer to the following pages for advanced guides on CLI ingestion.
- [Reference for `datahub ingest` command](../docs/cli.md#ingest)
- [UI Ingestion Guide](../docs/ui-ingestion.md)
@ -56,4 +63,4 @@ DataHub server uses a 3 digit versioning scheme, while the CLI uses a 4 digit sc
We do this because we do CLI releases at a much higher frequency than server releases, usually every few days vs twice a month.
For ingestion sources, any breaking changes will be highlighted in the [release notes](../docs/how/updating-datahub.md). When fields are deprecated or otherwise changed, we will try to maintain backwards compatibility for two server releases, which is about 4-6 weeks. The CLI will also print warnings whenever deprecated options are used.
:::
:::

View File

@ -9,10 +9,10 @@ Also take a look at the guide to [adding a source](./adding-source.md).
### Requirements
1. Python 3.7+ must be installed in your host environment.
1. Python 3.8+ must be installed in your host environment.
2. Java 17 (gradle won't work with newer or older versions)
4. On Debian/Ubuntu: `sudo apt install python3-dev python3-venv`
5. On Fedora (if using LDAP source integration): `sudo yum install openldap-devel`
3. On Debian/Ubuntu: `sudo apt install python3-dev python3-venv`
4. On Fedora (if using LDAP source integration): `sudo yum install openldap-devel`
### Set up your Python environment

View File

@ -1,4 +1,3 @@
import sys
from typing import Dict, Set
import setuptools
@ -11,7 +10,6 @@ with open("./src/datahub/__init__.py") as fp:
base_requirements = {
# Typing extension should be >=3.10.0.2 ideally but we can't restrict due to a Airflow 2.1 dependency conflict.
"typing_extensions>=3.7.4.3",
"mypy_extensions>=0.4.3",
# Actual dependencies.
"typing-inspect",
# pydantic 1.8.2 is incompatible with mypy 0.910.
@ -48,9 +46,7 @@ framework_common = {
"click-spinner",
"requests_file",
"jsonref",
# jsonschema drops python 3.7 support in v4.18.0
"jsonschema<=4.17.3; python_version < '3.8'",
"jsonschema; python_version >= '3.8'",
"jsonschema",
"ruamel.yaml",
}
@ -463,7 +459,7 @@ base_dev_requirements = {
"black==22.12.0",
"coverage>=5.1",
"faker>=18.4.0",
"flake8>=3.8.3", # DEPRECATION: Once we drop Python 3.7, we can pin to 6.x.
"flake8>=6.0.0",
"flake8-tidy-imports>=4.3.0",
"flake8-bugbear==23.3.12",
"isort>=5.7.0",
@ -489,9 +485,9 @@ base_dev_requirements = {
"delta-lake",
"druid",
"elasticsearch",
"feast" if sys.version_info >= (3, 8) else None,
"iceberg" if sys.version_info >= (3, 8) else None,
"mlflow" if sys.version_info >= (3, 8) else None,
"feast",
"iceberg",
"mlflow",
"json-schema",
"ldap",
"looker",
@ -544,14 +540,14 @@ full_test_dev_requirements = {
"clickhouse",
"delta-lake",
"druid",
"feast" if sys.version_info >= (3, 8) else None,
"feast",
"hana",
"hive",
"iceberg" if sys.version_info >= (3, 8) else None,
"iceberg",
"kafka-connect",
"ldap",
"mongodb",
"mssql" if sys.version_info >= (3, 8) else None,
"mssql",
"mysql",
"mariadb",
"redash",
@ -699,7 +695,6 @@ See the [DataHub docs](https://datahubproject.io/docs/metadata-ingestion).
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
@ -716,7 +711,7 @@ See the [DataHub docs](https://datahubproject.io/docs/metadata-ingestion).
],
# Package info.
zip_safe=False,
python_requires=">=3.7",
python_requires=">=3.8",
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="./src"),
package_data={

View File

@ -16,16 +16,9 @@ def nice_version_name() -> str:
return __version__
if sys.version_info < (3, 7):
if sys.version_info < (3, 8):
warnings.warn(
"DataHub requires Python 3.7 or newer. "
"Please upgrade your Python version to continue using DataHub.",
FutureWarning,
stacklevel=2,
)
elif sys.version_info < (3, 8):
warnings.warn(
"DataHub will require Python 3.8 or newer soon. "
"DataHub requires Python 3.8 or newer. "
"Please upgrade your Python version to continue using DataHub.",
FutureWarning,
stacklevel=2,

View File

@ -2,11 +2,10 @@ import dataclasses
import json
import logging
import pprint
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, Optional
from typing import Any, Optional
import humanfriendly
import pydantic
@ -19,12 +18,6 @@ from datahub.utilities.lossy_collections import LossyList
logger = logging.getLogger(__name__)
LogLevel = Literal["ERROR", "WARNING", "INFO", "DEBUG"]
# The sort_dicts option was added in Python 3.8.
if sys.version_info >= (3, 8):
PPRINT_OPTIONS = {"sort_dicts": False}
else:
PPRINT_OPTIONS: Dict = {}
@runtime_checkable
class SupportsAsObj(Protocol):
@ -32,14 +25,6 @@ class SupportsAsObj(Protocol):
...
def _stacklevel_if_supported(level: int) -> dict:
# The logging module added support for stacklevel in Python 3.8.
if sys.version_info >= (3, 8):
return {"stacklevel": level}
else:
return {}
@dataclass
class Report(SupportsAsObj):
@staticmethod
@ -95,7 +80,7 @@ class Report(SupportsAsObj):
}
def as_string(self) -> str:
return pprint.pformat(self.as_obj(), width=150, **PPRINT_OPTIONS)
return pprint.pformat(self.as_obj(), width=150, sort_dicts=False)
def as_json(self) -> str:
return json.dumps(self.as_obj())
@ -118,7 +103,7 @@ class ReportAttribute(BaseModel):
return log_levels[self.severity]
def log(self, msg: str) -> None:
logger.log(level=self.logger_sev, msg=msg, **_stacklevel_if_supported(3))
logger.log(level=self.logger_sev, msg=msg, stacklevel=3)
class EntityFilterReport(ReportAttribute):

View File

@ -1,8 +1,3 @@
import sys
if sys.version_info < (3, 8):
raise ImportError("Feast is only supported on Python 3.8+")
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple, Union

View File

@ -1,8 +1,3 @@
import sys
if sys.version_info < (3, 8):
raise ImportError("Iceberg is only supported on Python 3.8+")
import json
import logging
import uuid

View File

@ -1,9 +1,3 @@
import sys
if sys.version_info < (3, 8):
raise ImportError("MLflow is only supported on Python 3.8+")
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Optional, TypeVar, Union

View File

@ -1,7 +1,7 @@
from collections import Counter
from typing import Any, Counter as CounterType, Dict, Sequence, Tuple, Union
from mypy_extensions import TypedDict
from typing_extensions import TypedDict
class BasicSchemaDescription(TypedDict):

View File

@ -1,6 +1,3 @@
import sys
import pytest
from freezegun import freeze_time
from datahub.ingestion.run.pipeline import Pipeline
@ -8,10 +5,6 @@ from tests.test_helpers import mce_helpers
FROZEN_TIME = "2020-04-14 07:00:00"
pytestmark = pytest.mark.skipif(
sys.version_info < (3, 8), reason="requires python 3.8 or higher"
)
@freeze_time(FROZEN_TIME)
def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time):

View File

@ -1,5 +1,4 @@
import subprocess
import sys
from typing import Any, Dict, List
from unittest.mock import patch
@ -15,13 +14,7 @@ from tests.test_helpers.state_helpers import (
validate_all_providers_have_committed_successfully,
)
pytestmark = [
pytest.mark.integration_batch_1,
# Skip tests if not on Python 3.8 or higher.
pytest.mark.skipif(
sys.version_info < (3, 8), reason="Requires python 3.8 or higher"
),
]
pytestmark = pytest.mark.integration_batch_1
FROZEN_TIME = "2020-04-14 07:00:00"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"

View File

@ -1,104 +1,106 @@
import sys
from pathlib import Path
from typing import Any, Dict, TypeVar
if sys.version_info >= (3, 8):
from pathlib import Path
from typing import Any, Dict, TypeVar
import pytest
from mlflow import MlflowClient
import pytest
from mlflow import MlflowClient
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
T = TypeVar("T")
T = TypeVar("T")
@pytest.fixture
def tracking_uri(tmp_path: Path) -> str:
return str(tmp_path / "mlruns")
@pytest.fixture
def tracking_uri(tmp_path: Path) -> str:
return str(tmp_path / "mlruns")
@pytest.fixture
def sink_file_path(tmp_path: Path) -> str:
return str(tmp_path / "mlflow_source_mcps.json")
@pytest.fixture
def pipeline_config(tracking_uri: str, sink_file_path: str) -> Dict[str, Any]:
source_type = "mlflow"
return {
"run_id": "mlflow-source-test",
"source": {
"type": source_type,
"config": {
"tracking_uri": tracking_uri,
},
@pytest.fixture
def sink_file_path(tmp_path: Path) -> str:
return str(tmp_path / "mlflow_source_mcps.json")
@pytest.fixture
def pipeline_config(tracking_uri: str, sink_file_path: str) -> Dict[str, Any]:
source_type = "mlflow"
return {
"run_id": "mlflow-source-test",
"source": {
"type": source_type,
"config": {
"tracking_uri": tracking_uri,
},
"sink": {
"type": "file",
"config": {
"filename": sink_file_path,
},
},
"sink": {
"type": "file",
"config": {
"filename": sink_file_path,
},
}
},
}
@pytest.fixture
def generate_mlflow_data(tracking_uri: str) -> None:
client = MlflowClient(tracking_uri=tracking_uri)
experiment_name = "test-experiment"
run_name = "test-run"
model_name = "test-model"
test_experiment_id = client.create_experiment(experiment_name)
test_run = client.create_run(
experiment_id=test_experiment_id,
run_name=run_name,
)
client.log_param(
run_id=test_run.info.run_id,
key="p",
value=1,
)
client.log_metric(
run_id=test_run.info.run_id,
key="m",
value=0.85,
)
client.create_registered_model(
name=model_name,
tags=dict(
model_id=1,
model_env="test",
),
description="This a test registered model",
)
client.create_model_version(
name=model_name,
source="dummy_dir/dummy_file",
run_id=test_run.info.run_id,
tags=dict(model_version_id=1),
)
client.transition_model_version_stage(
name=model_name,
version="1",
stage="Archived",
)
def test_ingestion(
pytestconfig,
mock_time,
sink_file_path,
pipeline_config,
generate_mlflow_data,
):
print(f"MCPs file path: {sink_file_path}")
golden_file_path = (
pytestconfig.rootpath / "tests/integration/mlflow/mlflow_mcps_golden.json"
)
@pytest.fixture
def generate_mlflow_data(tracking_uri: str) -> None:
client = MlflowClient(tracking_uri=tracking_uri)
experiment_name = "test-experiment"
run_name = "test-run"
model_name = "test-model"
test_experiment_id = client.create_experiment(experiment_name)
test_run = client.create_run(
experiment_id=test_experiment_id,
run_name=run_name,
)
client.log_param(
run_id=test_run.info.run_id,
key="p",
value=1,
)
client.log_metric(
run_id=test_run.info.run_id,
key="m",
value=0.85,
)
client.create_registered_model(
name=model_name,
tags=dict(
model_id=1,
model_env="test",
),
description="This a test registered model",
)
client.create_model_version(
name=model_name,
source="dummy_dir/dummy_file",
run_id=test_run.info.run_id,
tags=dict(model_version_id=1),
)
client.transition_model_version_stage(
name=model_name,
version="1",
stage="Archived",
)
pipeline = Pipeline.create(pipeline_config)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig=pytestconfig,
output_path=sink_file_path,
golden_path=golden_file_path,
)
def test_ingestion(
pytestconfig,
mock_time,
sink_file_path,
pipeline_config,
generate_mlflow_data,
):
print(f"MCPs file path: {sink_file_path}")
golden_file_path = (
pytestconfig.rootpath / "tests/integration/mlflow/mlflow_mcps_golden.json"
)
pipeline = Pipeline.create(pipeline_config)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig=pytestconfig,
output_path=sink_file_path,
golden_path=golden_file_path,
)

View File

@ -1,6 +1,5 @@
import os
import subprocess
import sys
import time
import pytest
@ -9,10 +8,6 @@ from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import cleanup_image, wait_for_port
pytestmark = pytest.mark.skipif(
sys.version_info < (3, 8), reason="requires python 3.8 or higher"
)
@pytest.fixture(scope="module")
def mssql_runner(docker_compose_runner, pytestconfig):

View File

@ -1,482 +1,477 @@
import sys
import uuid
from decimal import Decimal
from typing import Any, Optional
import pytest
from pydantic import ValidationError
from pyiceberg.schema import Schema
from pyiceberg.types import (
BinaryType,
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IcebergType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
PrimitiveType,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
)
if sys.version_info >= (3, 8):
from pyiceberg.schema import Schema
from pyiceberg.types import (
BinaryType,
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IcebergType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
PrimitiveType,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.iceberg.iceberg import (
IcebergProfiler,
IcebergSource,
IcebergSourceConfig,
)
from datahub.ingestion.source.iceberg.iceberg_common import IcebergCatalogConfig
from datahub.metadata.com.linkedin.pegasus2avro.schema import ArrayType, SchemaField
from datahub.metadata.schema_classes import (
ArrayTypeClass,
BooleanTypeClass,
BytesTypeClass,
DateTypeClass,
FixedTypeClass,
NumberTypeClass,
RecordTypeClass,
StringTypeClass,
TimeTypeClass,
)
def with_iceberg_source() -> IcebergSource:
catalog: IcebergCatalogConfig = IcebergCatalogConfig(
name="test", type="rest", config={}
)
return IcebergSource(
ctx=PipelineContext(run_id="iceberg-source-test"),
config=IcebergSourceConfig(catalog=catalog),
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.iceberg.iceberg import (
IcebergProfiler,
IcebergSource,
IcebergSourceConfig,
)
from datahub.ingestion.source.iceberg.iceberg_common import IcebergCatalogConfig
from datahub.metadata.com.linkedin.pegasus2avro.schema import ArrayType, SchemaField
from datahub.metadata.schema_classes import (
ArrayTypeClass,
BooleanTypeClass,
BytesTypeClass,
DateTypeClass,
FixedTypeClass,
NumberTypeClass,
RecordTypeClass,
StringTypeClass,
TimeTypeClass,
def with_iceberg_profiler() -> IcebergProfiler:
iceberg_source_instance = with_iceberg_source()
return IcebergProfiler(
iceberg_source_instance.report, iceberg_source_instance.config.profiling
)
pytestmark = pytest.mark.skipif(
sys.version_info < (3, 8), reason="requires python 3.8 or higher"
)
def with_iceberg_source() -> IcebergSource:
catalog: IcebergCatalogConfig = IcebergCatalogConfig(
name="test", type="rest", config={}
)
return IcebergSource(
ctx=PipelineContext(run_id="iceberg-source-test"),
config=IcebergSourceConfig(catalog=catalog),
)
def assert_field(
schema_field: SchemaField,
expected_description: Optional[str],
expected_nullable: bool,
expected_type: Any,
) -> None:
assert (
schema_field.description == expected_description
), f"Field description '{schema_field.description}' is different from expected description '{expected_description}'"
assert (
schema_field.nullable == expected_nullable
), f"Field nullable '{schema_field.nullable}' is different from expected nullable '{expected_nullable}'"
assert isinstance(
schema_field.type.type, expected_type
), f"Field type {schema_field.type.type} is different from expected type {expected_type}"
def with_iceberg_profiler() -> IcebergProfiler:
iceberg_source_instance = with_iceberg_source()
return IcebergProfiler(
iceberg_source_instance.report, iceberg_source_instance.config.profiling
)
def assert_field(
schema_field: SchemaField,
expected_description: Optional[str],
expected_nullable: bool,
expected_type: Any,
) -> None:
assert (
schema_field.description == expected_description
), f"Field description '{schema_field.description}' is different from expected description '{expected_description}'"
assert (
schema_field.nullable == expected_nullable
), f"Field nullable '{schema_field.nullable}' is different from expected nullable '{expected_nullable}'"
assert isinstance(
schema_field.type.type, expected_type
), f"Field type {schema_field.type.type} is different from expected type {expected_type}"
def test_config_no_catalog():
"""
Test when no Iceberg catalog is provided.
"""
with pytest.raises(ValidationError, match="catalog"):
IcebergSourceConfig() # type: ignore
def test_config_no_catalog():
    """
    Test when no Iceberg catalog is provided.

    NOTE(review): duplicate of the test_config_no_catalog defined earlier in this
    module (likely merge residue); pytest collects only this later definition.
    """
    with pytest.raises(ValidationError, match="catalog"):
        IcebergSourceConfig()  # type: ignore
def test_config_catalog_not_configured():
    """A catalog entry with no configuration at all must fail validation."""
    with pytest.raises(ValidationError):
        IcebergCatalogConfig()  # type: ignore
def test_config_catalog_not_configured():
    """
    Test when an Iceberg catalog is provided, but not properly configured.

    Each misconfiguration is exercised exactly once (the original body had every
    case duplicated — merge residue).
    """
    # No fields at all.
    with pytest.raises(ValidationError):
        IcebergCatalogConfig()  # type: ignore

    # A type but no conf.
    with pytest.raises(ValidationError, match="conf"):
        IcebergCatalogConfig(type="a type")  # type: ignore

    # A conf but no type.
    with pytest.raises(ValidationError, match="type"):
        IcebergCatalogConfig(conf={})  # type: ignore
def test_config_for_tests():
    """Smoke-test that the shared unit-test source configuration is itself valid."""
    with_iceberg_source()
@pytest.mark.parametrize(
    "iceberg_type, expected_schema_field_type",
    [
        (BinaryType(), BytesTypeClass),
        (BooleanType(), BooleanTypeClass),
        (DateType(), DateTypeClass),
        (DecimalType(3, 2), NumberTypeClass),
        (DoubleType(), NumberTypeClass),
        (FixedType(4), FixedTypeClass),
        (FloatType(), NumberTypeClass),
        (IntegerType(), NumberTypeClass),
        (LongType(), NumberTypeClass),
        (StringType(), StringTypeClass),
        (TimestampType(), TimeTypeClass),
        (TimestamptzType(), TimeTypeClass),
        (TimeType(), TimeTypeClass),
        (UUIDType(), StringTypeClass),
    ],
)
def test_iceberg_primitive_type_to_schema_field(
    iceberg_type: PrimitiveType, expected_schema_field_type: Any
) -> None:
    """Each primitive Iceberg type maps to exactly one field of the expected type class."""
    iceberg_source_instance = with_iceberg_source()
    # Exercise both a required and an optional column of the same type.
    columns = [
        NestedField(1, "required_field", iceberg_type, True, "required field documentation"),
        NestedField(1, "optional_field", iceberg_type, False, "optional field documentation"),
    ]
    for column in columns:
        schema_fields = iceberg_source_instance._get_schema_fields_for_schema(
            Schema(column)
        )
        assert len(schema_fields) == 1, f"Expected 1 field, but got {len(schema_fields)}"
        assert_field(
            schema_fields[0], column.doc, column.optional, expected_schema_field_type
        )
def test_config_for_tests():
    """
    Test valid iceberg source that will be used in unit tests.

    NOTE(review): duplicate of the test_config_for_tests defined earlier in this
    module (likely merge residue); pytest collects only this later definition.
    """
    with_iceberg_source()
@pytest.mark.parametrize(
    "iceberg_type, expected_array_nested_type",
    [
        (BinaryType(), "bytes"),
        (BooleanType(), "boolean"),
        (DateType(), "date"),
        (DecimalType(3, 2), "decimal"),
        (DoubleType(), "double"),
        (FixedType(4), "fixed"),
        (FloatType(), "float"),
        (IntegerType(), "int"),
        (LongType(), "long"),
        (StringType(), "string"),
        (TimestampType(), "timestamp-micros"),
        (TimestamptzType(), "timestamp-micros"),
        (TimeType(), "time-micros"),
        (UUIDType(), "uuid"),
    ],
)
def test_iceberg_list_to_schema_field(
    iceberg_type: PrimitiveType, expected_array_nested_type: Any
) -> None:
    """
    Test converting a list typed Iceberg field to an ArrayType SchemaField, including the list nested type.
    """
    # Cover all four combinations of (element required?, field required?).
    for element_required, field_required, doc in [
        (True, True, "required field, required element documentation"),
        (False, True, "required field, optional element documentation"),
        (True, False, "optional field, required element documentation"),
        (False, False, "optional field, optional element documentation"),
    ]:
        list_column = NestedField(
            1,
            "listField",
            ListType(2, iceberg_type, element_required),
            field_required,
            doc,
        )
        iceberg_source_instance = with_iceberg_source()
        schema_fields = iceberg_source_instance._get_schema_fields_for_schema(
            Schema(list_column)
        )
        assert len(schema_fields) == 1, f"Expected 1 field, but got {len(schema_fields)}"
        assert_field(
            schema_fields[0], list_column.doc, list_column.optional, ArrayTypeClass
        )
        assert isinstance(
            schema_fields[0].type.type, ArrayType
        ), f"Field type {schema_fields[0].type.type} was expected to be {ArrayType}"
        arrayType: ArrayType = schema_fields[0].type.type
        assert arrayType.nestedType == [
            expected_array_nested_type
        ], f"List Field nested type {arrayType.nestedType} was expected to be {expected_array_nested_type}"
@pytest.mark.parametrize(
    "iceberg_type, expected_map_type",
    [
        (BinaryType(), BytesTypeClass),
        (BooleanType(), BooleanTypeClass),
        (DateType(), DateTypeClass),
        (DecimalType(3, 2), NumberTypeClass),
        (DoubleType(), NumberTypeClass),
        (FixedType(4), FixedTypeClass),
        (FloatType(), NumberTypeClass),
        (IntegerType(), NumberTypeClass),
        (LongType(), NumberTypeClass),
        (StringType(), StringTypeClass),
        (TimestampType(), TimeTypeClass),
        (TimestamptzType(), TimeTypeClass),
        (TimeType(), TimeTypeClass),
        (UUIDType(), StringTypeClass),
    ],
)
def test_iceberg_map_to_schema_field(
    iceberg_type: PrimitiveType, expected_map_type: Any
) -> None:
    """
    Test converting a map typed Iceberg field to a MapType SchemaField, where the key is the same type as the value.
    """
    # Cover all four combinations of (value required?, field required?).
    for value_required, field_required, doc in [
        (True, True, "required field, required value documentation"),
        (False, True, "required field, optional value documentation"),
        (True, False, "optional field, required value documentation"),
        (False, False, "optional field, optional value documentation"),
    ]:
        map_column = NestedField(
            1,
            "mapField",
            MapType(11, iceberg_type, 12, iceberg_type, value_required),
            field_required,
            doc,
        )
        iceberg_source_instance = with_iceberg_source()
        schema_fields = iceberg_source_instance._get_schema_fields_for_schema(
            Schema(map_column)
        )
        # An Iceberg map is modeled as an array of struct(key, value) records,
        # so three fields come back: the array, the key, and the value.
        assert len(schema_fields) == 3, f"Expected 3 fields, but got {len(schema_fields)}"
        assert_field(
            schema_fields[0], map_column.doc, map_column.optional, ArrayTypeClass
        )
        # The key is always required.
        assert_field(schema_fields[1], None, False, expected_map_type)
        # The value is nullable iff the map's values are optional.
        assert_field(
            schema_fields[2],
            None,
            not map_column.field_type.value_required,
            expected_map_type,
        )
@pytest.mark.parametrize(
    "iceberg_type, expected_schema_field_type",
    [
        (BinaryType(), BytesTypeClass),
        (BooleanType(), BooleanTypeClass),
        (DateType(), DateTypeClass),
        (DecimalType(3, 2), NumberTypeClass),
        (DoubleType(), NumberTypeClass),
        (FixedType(4), FixedTypeClass),
        (FloatType(), NumberTypeClass),
        (IntegerType(), NumberTypeClass),
        (LongType(), NumberTypeClass),
        (StringType(), StringTypeClass),
        (TimestampType(), TimeTypeClass),
        (TimestamptzType(), TimeTypeClass),
        (TimeType(), TimeTypeClass),
        (UUIDType(), StringTypeClass),
    ],
)
def test_iceberg_struct_to_schema_field(
    iceberg_type: PrimitiveType, expected_schema_field_type: Any
) -> None:
    """
    Test converting a struct typed Iceberg field to a RecordType SchemaField.

    Restores the conversion call and assertions that were dropped from this
    function body by a bad merge (it previously built the schema and stopped).
    """
    field1 = NestedField(11, "field1", iceberg_type, True, "field documentation")
    struct_column = NestedField(
        1, "structField", StructType(field1), True, "struct documentation"
    )
    iceberg_source_instance = with_iceberg_source()
    schema = Schema(struct_column)
    schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
    # A struct produces the record field itself plus its single inner field.
    assert len(schema_fields) == 2, f"Expected 2 fields, but got {len(schema_fields)}"
    assert_field(
        schema_fields[0], struct_column.doc, struct_column.optional, RecordTypeClass
    )
    assert_field(
        schema_fields[1], field1.doc, field1.optional, expected_schema_field_type
    )
@pytest.mark.parametrize(
    "iceberg_type, expected_schema_field_type",
    [
        (BinaryType(), BytesTypeClass),
        (BooleanType(), BooleanTypeClass),
        (DateType(), DateTypeClass),
        (DecimalType(3, 2), NumberTypeClass),
        (DoubleType(), NumberTypeClass),
        (FixedType(4), FixedTypeClass),
        (FloatType(), NumberTypeClass),
        (IntegerType(), NumberTypeClass),
        (LongType(), NumberTypeClass),
        (StringType(), StringTypeClass),
        (TimestampType(), TimeTypeClass),
        (TimestamptzType(), TimeTypeClass),
        (TimeType(), TimeTypeClass),
        (UUIDType(), StringTypeClass),
    ],
)
def test_iceberg_primitive_type_to_schema_field(
    iceberg_type: PrimitiveType, expected_schema_field_type: Any
) -> None:
    """
    Test converting a primitive typed Iceberg field to a SchemaField

    This body was corrupted by a bad merge (stray assert fragments produced a
    syntax error and a wrong expected field count of 2); restored so a primitive
    column maps to exactly one field, matching the earlier definition.
    """
    iceberg_source_instance = with_iceberg_source()
    for column in [
        NestedField(
            1, "required_field", iceberg_type, True, "required field documentation"
        ),
        NestedField(
            1, "optional_field", iceberg_type, False, "optional field documentation"
        ),
    ]:
        schema = Schema(column)
        schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
        assert (
            len(schema_fields) == 1
        ), f"Expected 1 field, but got {len(schema_fields)}"
        assert_field(
            schema_fields[0],
            column.doc,
            column.optional,
            expected_schema_field_type,
        )
@pytest.mark.parametrize(
    "value_type, value, expected_value",
    [
        (BinaryType(), bytes([1, 2, 3, 4, 5]), "b'\\x01\\x02\\x03\\x04\\x05'"),
        (BooleanType(), True, "True"),
        (DateType(), 19543, "2023-07-05"),
        (DecimalType(3, 2), Decimal((0, (3, 1, 4), -2)), "3.14"),
        (DoubleType(), 3.4, "3.4"),
        (FixedType(4), bytes([1, 2, 3, 4]), "b'\\x01\\x02\\x03\\x04'"),
        (FloatType(), 3.4, "3.4"),
        (IntegerType(), 3, "3"),
        (LongType(), 4294967295000, "4294967295000"),
        (StringType(), "a string", "a string"),
        (TimestampType(), 1688559488157000, "2023-07-05T12:18:08.157000"),
        (TimestamptzType(), 1688559488157000, "2023-07-05T12:18:08.157000+00:00"),
        (TimeType(), 40400000000, "11:13:20"),
        (
            UUIDType(),
            uuid.UUID("00010203-0405-0607-0809-0a0b0c0d0e0f"),
            "00010203-0405-0607-0809-0a0b0c0d0e0f",
        ),
    ],
)
def test_iceberg_profiler_value_render(
    value_type: IcebergType, value: Any, expected_value: Optional[str]
) -> None:
    """
    Each Iceberg-typed sample value renders to its expected string form.

    Restores the assertion that was dropped from this body by a bad merge
    (it previously built the profiler and checked nothing).
    """
    iceberg_profiler_instance = with_iceberg_profiler()
    assert (
        iceberg_profiler_instance._render_value("a.dataset", value_type, value)
        == expected_value
    )
@pytest.mark.parametrize(
    "iceberg_type, expected_array_nested_type",
    [
        (BinaryType(), "bytes"),
        (BooleanType(), "boolean"),
        (DateType(), "date"),
        (DecimalType(3, 2), "decimal"),
        (DoubleType(), "double"),
        (FixedType(4), "fixed"),
        (FloatType(), "float"),
        (IntegerType(), "int"),
        (LongType(), "long"),
        (StringType(), "string"),
        (TimestampType(), "timestamp-micros"),
        (TimestamptzType(), "timestamp-micros"),
        (TimeType(), "time-micros"),
        (UUIDType(), "uuid"),
    ],
)
def test_iceberg_list_to_schema_field(
    iceberg_type: PrimitiveType, expected_array_nested_type: Any
) -> None:
    """
    Test converting a list typed Iceberg field to an ArrayType SchemaField, including the list nested type.

    This body was corrupted by a bad merge (an unrelated profiler assertion was
    spliced into the first assert, producing a syntax error); restored to match
    the earlier definition of this test.
    """
    for list_column in [
        NestedField(
            1,
            "listField",
            ListType(2, iceberg_type, True),
            True,
            "required field, required element documentation",
        ),
        NestedField(
            1,
            "listField",
            ListType(2, iceberg_type, False),
            True,
            "required field, optional element documentation",
        ),
        NestedField(
            1,
            "listField",
            ListType(2, iceberg_type, True),
            False,
            "optional field, required element documentation",
        ),
        NestedField(
            1,
            "listField",
            ListType(2, iceberg_type, False),
            False,
            "optional field, optional element documentation",
        ),
    ]:
        iceberg_source_instance = with_iceberg_source()
        schema = Schema(list_column)
        schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
        assert (
            len(schema_fields) == 1
        ), f"Expected 1 field, but got {len(schema_fields)}"
        assert_field(
            schema_fields[0], list_column.doc, list_column.optional, ArrayTypeClass
        )
        assert isinstance(
            schema_fields[0].type.type, ArrayType
        ), f"Field type {schema_fields[0].type.type} was expected to be {ArrayType}"
        arrayType: ArrayType = schema_fields[0].type.type
        assert arrayType.nestedType == [
            expected_array_nested_type
        ], f"List Field nested type {arrayType.nestedType} was expected to be {expected_array_nested_type}"
@pytest.mark.parametrize(
    "iceberg_type, expected_map_type",
    [
        (BinaryType(), BytesTypeClass),
        (BooleanType(), BooleanTypeClass),
        (DateType(), DateTypeClass),
        (DecimalType(3, 2), NumberTypeClass),
        (DoubleType(), NumberTypeClass),
        (FixedType(4), FixedTypeClass),
        (FloatType(), NumberTypeClass),
        (IntegerType(), NumberTypeClass),
        (LongType(), NumberTypeClass),
        (StringType(), StringTypeClass),
        (TimestampType(), TimeTypeClass),
        (TimestamptzType(), TimeTypeClass),
        (TimeType(), TimeTypeClass),
        (UUIDType(), StringTypeClass),
    ],
)
def test_iceberg_map_to_schema_field(
    iceberg_type: PrimitiveType, expected_map_type: Any
) -> None:
    """
    Test converting a map typed Iceberg field to a MapType SchemaField, where the key is the same type as the value.

    Restores the key- and value-field assertions that a bad merge pushed out of
    this body into the following function; matches the earlier definition.
    """
    for map_column in [
        NestedField(
            1,
            "mapField",
            MapType(11, iceberg_type, 12, iceberg_type, True),
            True,
            "required field, required value documentation",
        ),
        NestedField(
            1,
            "mapField",
            MapType(11, iceberg_type, 12, iceberg_type, False),
            True,
            "required field, optional value documentation",
        ),
        NestedField(
            1,
            "mapField",
            MapType(11, iceberg_type, 12, iceberg_type, True),
            False,
            "optional field, required value documentation",
        ),
        NestedField(
            1,
            "mapField",
            MapType(11, iceberg_type, 12, iceberg_type, False),
            False,
            "optional field, optional value documentation",
        ),
    ]:
        iceberg_source_instance = with_iceberg_source()
        schema = Schema(map_column)
        schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema)
        # Converting an Iceberg Map type will be done by creating an array of struct(key, value) records.
        # The first field will be the array.
        assert (
            len(schema_fields) == 3
        ), f"Expected 3 fields, but got {len(schema_fields)}"
        assert_field(
            schema_fields[0], map_column.doc, map_column.optional, ArrayTypeClass
        )
        # The second field will be the key type (always required).
        assert_field(schema_fields[1], None, False, expected_map_type)
        # The third field will be the value type (nullable iff values are optional).
        assert_field(
            schema_fields[2],
            None,
            not map_column.field_type.value_required,
            expected_map_type,
        )
def test_avro_decimal_bytes_nullable() -> None:
    """
    The following test exposes a problem with decimal (bytes) not preserving extra attributes like _nullable. Decimal (fixed) and Boolean for example do.
    NOTE: This bug was by-passed by mapping the Decimal type to fixed instead of bytes.

    This body was corrupted by a bad merge (unrelated map-test assertions were
    spliced into an open print() call, producing a syntax error); restored to
    match the canonical version of this test.
    """
    import avro.schema

    decimal_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "bytes", "precision": 3, "scale": 2, "logicalType": "decimal", "native_data_type": "decimal(3, 2)", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}"""
    decimal_avro_schema = avro.schema.parse(decimal_avro_schema_string)
    print("\nDecimal (bytes)")
    print(f"Original avro schema string: {decimal_avro_schema_string}")
    print(f"After avro parsing, _nullable attribute is missing: {decimal_avro_schema}")

    decimal_fixed_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "fixed", "logicalType": "decimal", "precision": 3, "scale": 2, "native_data_type": "decimal(3, 2)", "_nullable": false, "name": "bogusName", "size": 16}, "name": "required_field", "doc": "required field documentation"}]}"""
    decimal_fixed_avro_schema = avro.schema.parse(decimal_fixed_avro_schema_string)
    print("\nDecimal (fixed)")
    print(f"Original avro schema string: {decimal_fixed_avro_schema_string}")
    print(
        f"After avro parsing, _nullable attribute is preserved: {decimal_fixed_avro_schema}"
    )

    boolean_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "boolean", "native_data_type": "boolean", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}"""
    boolean_avro_schema = avro.schema.parse(boolean_avro_schema_string)
    print("\nBoolean")
    print(f"Original avro schema string: {boolean_avro_schema_string}")
    print(
        f"After avro parsing, _nullable attribute is preserved: {boolean_avro_schema}"
    )
@pytest.mark.parametrize(
    "iceberg_type, expected_schema_field_type",
    [
        (BinaryType(), BytesTypeClass),
        (BooleanType(), BooleanTypeClass),
        (DateType(), DateTypeClass),
        (DecimalType(3, 2), NumberTypeClass),
        (DoubleType(), NumberTypeClass),
        (FixedType(4), FixedTypeClass),
        (FloatType(), NumberTypeClass),
        (IntegerType(), NumberTypeClass),
        (LongType(), NumberTypeClass),
        (StringType(), StringTypeClass),
        (TimestampType(), TimeTypeClass),
        (TimestamptzType(), TimeTypeClass),
        (TimeType(), TimeTypeClass),
        (UUIDType(), StringTypeClass),
    ],
)
def test_iceberg_struct_to_schema_field(
    iceberg_type: PrimitiveType, expected_schema_field_type: Any
) -> None:
    """A struct column maps to a RecordType field followed by its inner field."""
    inner_field = NestedField(11, "field1", iceberg_type, True, "field documentation")
    struct_column = NestedField(
        1, "structField", StructType(inner_field), True, "struct documentation"
    )
    source = with_iceberg_source()
    schema_fields = source._get_schema_fields_for_schema(Schema(struct_column))
    assert len(schema_fields) == 2, f"Expected 2 fields, but got {len(schema_fields)}"
    assert_field(
        schema_fields[0], struct_column.doc, struct_column.optional, RecordTypeClass
    )
    assert_field(
        schema_fields[1], inner_field.doc, inner_field.optional, expected_schema_field_type
    )
@pytest.mark.parametrize(
    "value_type, value, expected_value",
    [
        (BinaryType(), bytes([1, 2, 3, 4, 5]), "b'\\x01\\x02\\x03\\x04\\x05'"),
        (BooleanType(), True, "True"),
        (DateType(), 19543, "2023-07-05"),
        (DecimalType(3, 2), Decimal((0, (3, 1, 4), -2)), "3.14"),
        (DoubleType(), 3.4, "3.4"),
        (FixedType(4), bytes([1, 2, 3, 4]), "b'\\x01\\x02\\x03\\x04'"),
        (FloatType(), 3.4, "3.4"),
        (IntegerType(), 3, "3"),
        (LongType(), 4294967295000, "4294967295000"),
        (StringType(), "a string", "a string"),
        (TimestampType(), 1688559488157000, "2023-07-05T12:18:08.157000"),
        (TimestamptzType(), 1688559488157000, "2023-07-05T12:18:08.157000+00:00"),
        (TimeType(), 40400000000, "11:13:20"),
        (
            UUIDType(),
            uuid.UUID("00010203-0405-0607-0809-0a0b0c0d0e0f"),
            "00010203-0405-0607-0809-0a0b0c0d0e0f",
        ),
    ],
)
def test_iceberg_profiler_value_render(
    value_type: IcebergType, value: Any, expected_value: Optional[str]
) -> None:
    """Each Iceberg-typed sample value renders to its expected string form."""
    profiler = with_iceberg_profiler()
    rendered = profiler._render_value("a.dataset", value_type, value)
    assert rendered == expected_value
def test_avro_decimal_bytes_nullable() -> None:
    """
    The following test exposes a problem with decimal (bytes) not preserving extra attributes like _nullable. Decimal (fixed) and Boolean for example do.
    NOTE: This bug was by-passed by mapping the Decimal type to fixed instead of bytes.
    """
    import avro.schema

    # decimal encoded as bytes: parsing drops the custom _nullable attribute.
    decimal_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "bytes", "precision": 3, "scale": 2, "logicalType": "decimal", "native_data_type": "decimal(3, 2)", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}"""
    decimal_avro_schema = avro.schema.parse(decimal_avro_schema_string)
    print("\nDecimal (bytes)")
    print(f"Original avro schema string: {decimal_avro_schema_string}")
    print(f"After avro parsing, _nullable attribute is missing: {decimal_avro_schema}")

    # decimal encoded as fixed: parsing keeps _nullable.
    decimal_fixed_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "fixed", "logicalType": "decimal", "precision": 3, "scale": 2, "native_data_type": "decimal(3, 2)", "_nullable": false, "name": "bogusName", "size": 16}, "name": "required_field", "doc": "required field documentation"}]}"""
    decimal_fixed_avro_schema = avro.schema.parse(decimal_fixed_avro_schema_string)
    print("\nDecimal (fixed)")
    print(f"Original avro schema string: {decimal_fixed_avro_schema_string}")
    print(f"After avro parsing, _nullable attribute is preserved: {decimal_fixed_avro_schema}")

    # boolean: parsing keeps _nullable too.
    boolean_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "boolean", "native_data_type": "boolean", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}"""
    boolean_avro_schema = avro.schema.parse(boolean_avro_schema_string)
    print("\nBoolean")
    print(f"Original avro schema string: {boolean_avro_schema_string}")
    print(f"After avro parsing, _nullable attribute is preserved: {boolean_avro_schema}")

View File

@ -1,133 +1,140 @@
# Imports, deduplicated and regrouped (stdlib / third-party / local) after a bad
# merge duplicated every line and left an empty `if sys.version_info >= (3, 8):`
# guard (a syntax error). The guard is dropped: minimum supported Python is 3.8.
import sys  # noqa: F401  (kept: was imported before the merge residue was removed)
import datetime
from pathlib import Path
from typing import Any, TypeVar, Union

import pytest
from mlflow import MlflowClient
from mlflow.entities.model_registry import RegisteredModel
from mlflow.entities.model_registry.model_version import ModelVersion
from mlflow.store.entities import PagedList

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.mlflow import MLflowConfig, MLflowSource

T = TypeVar("T")
@pytest.fixture
def tracking_uri(tmp_path: Path) -> str:
    """A local file-based MLflow tracking uri under pytest's tmp_path.

    (The fixture was defined twice back-to-back — merge residue — keep one.)
    """
    return str(tmp_path / "mlruns")
@pytest.fixture
def source(tracking_uri: str) -> MLflowSource:
    """An MLflowSource pointed at the local test tracking uri.

    Restored from a bad merge that spliced a second copy of this fixture into an
    unclosed MLflowSource(...) call, producing a syntax error.
    """
    return MLflowSource(
        ctx=PipelineContext(run_id="mlflow-source-test"),
        config=MLflowConfig(tracking_uri=tracking_uri),
    )
@pytest.fixture
def registered_model(source: MLflowSource) -> RegisteredModel:
    """A registered model named "abc"."""
    return RegisteredModel(name="abc")
@pytest.fixture
def model_version(
    source: MLflowSource,
    registered_model: RegisteredModel,
) -> ModelVersion:
    """Version "1" of the registered model, stamped with the current time."""
    return ModelVersion(
        name=registered_model.name,
        version="1",
        creation_timestamp=datetime.datetime.now(),
    )
def dummy_search_func(page_token: Union[None, str], **kwargs: Any) -> PagedList[T]:
    """Fake paged MLflow search endpoint returning three linked pages (a..e).

    Passing case="upper" uppercases the items, to verify kwargs forwarding.
    """
    pages = {
        "page_1": PagedList(items=["a", "b"], token="page_2"),
        "page_2": PagedList(items=["c", "d"], token="page_3"),
        "page_3": PagedList(items=["e"], token=None),
    }
    page = pages["page_1"] if page_token is None else pages[page_token]
    if kwargs.get("case", "") == "upper":
        page = PagedList(
            items=[item.upper() for item in page.to_list()],
            token=page.token,
        )
    return page
@pytest.fixture
def registered_model(source: MLflowSource) -> RegisteredModel:
    # NOTE(review): duplicate of the registered_model fixture defined earlier
    # (merge residue); this later definition is the one pytest uses.
    model_name = "abc"
    return RegisteredModel(name=model_name)
@pytest.fixture
def model_version(
    source: MLflowSource,
    registered_model: RegisteredModel,
) -> ModelVersion:
    # NOTE(review): duplicate of the model_version fixture defined earlier
    # (merge residue); this later definition is the one pytest uses.
    version = "1"
    return ModelVersion(
        name=registered_model.name,
        version=version,
        creation_timestamp=datetime.datetime.now(),
    )
def test_stages(source):
    """Tag workunits cover every MLflow registry stage, lowercased with an mlflow_ prefix.

    Restores the assertions that were dropped from this body by a bad merge.
    """
    mlflow_registered_model_stages = {
        "Production",
        "Staging",
        "Archived",
        None,
    }
    workunits = source._get_tags_workunits()
    names = [wu.get_metadata()["metadata"].aspect.name for wu in workunits]

    assert len(names) == len(mlflow_registered_model_stages)
    assert set(names) == {
        "mlflow_" + str(stage).lower() for stage in mlflow_registered_model_stages
    }
def dummy_search_func(page_token: Union[None, str], **kwargs: Any) -> PagedList[T]:
    """Fake paged MLflow search endpoint returning three linked pages (a..e).

    NOTE(review): duplicate of the dummy_search_func defined earlier (merge
    residue). The orphaned module-level asserts that trailed this function
    (referencing an undefined `names`) were removed — they would have raised
    NameError at import time.
    """
    dummy_pages = dict(
        page_1=PagedList(items=["a", "b"], token="page_2"),
        page_2=PagedList(items=["c", "d"], token="page_3"),
        page_3=PagedList(items=["e"], token=None),
    )
    if page_token is None:
        page_to_return = dummy_pages["page_1"]
    else:
        page_to_return = dummy_pages[page_token]
    if kwargs.get("case", "") == "upper":
        page_to_return = PagedList(
            items=[e.upper() for e in page_to_return.to_list()],
            token=page_to_return.token,
        )
    return page_to_return
def test_stages(source):
    """Tag workunits cover every MLflow registry stage, lowercased with an mlflow_ prefix."""
    expected_stages = {"Production", "Staging", "Archived", None}
    tag_names = [
        wu.get_metadata()["metadata"].aspect.name
        for wu in source._get_tags_workunits()
    ]
    assert len(tag_names) == len(expected_stages)
    assert set(tag_names) == {
        "mlflow_" + str(stage).lower() for stage in expected_stages
    }
def test_config_model_name_separator(source, model_version):
    """model_name_separator joins model name and version inside the mlModel urn.

    Restores the urn computation and assertion that were dropped from this body
    by a bad merge.
    """
    name_version_sep = "+"
    source.config.model_name_separator = name_version_sep
    expected_model_name = (
        f"{model_version.name}{name_version_sep}{model_version.version}"
    )
    expected_urn = f"urn:li:mlModel:(urn:li:dataPlatform:mlflow,{expected_model_name},{source.config.env})"

    urn = source._make_ml_model_urn(model_version)
    assert urn == expected_urn
def test_config_model_name_separator(source, model_version):
    """model_name_separator joins model name and version inside the mlModel urn.

    Deduplicated: the urn computation and assertion were each present twice
    (merge residue).
    """
    name_version_sep = "+"
    source.config.model_name_separator = name_version_sep
    expected_model_name = (
        f"{model_version.name}{name_version_sep}{model_version.version}"
    )
    expected_urn = f"urn:li:mlModel:(urn:li:dataPlatform:mlflow,{expected_model_name},{source.config.env})"

    urn = source._make_ml_model_urn(model_version)
    assert urn == expected_urn
def test_model_without_run(source, registered_model, model_version):
    """A model version with no backing run yields no hyperparams or metrics.

    Restores the assertions that were dropped from this body by a bad merge.
    """
    run = source._get_mlflow_run(model_version)
    wu = source._get_ml_model_properties_workunit(
        registered_model=registered_model,
        model_version=model_version,
        run=run,
    )
    aspect = wu.get_metadata()["metadata"].aspect

    assert aspect.hyperParams is None
    assert aspect.trainingMetrics is None
def test_model_without_run(source, registered_model, model_version):
    """A model version with no backing run yields no hyperparams or metrics.

    Deduplicated: both assertions were present twice (merge residue).
    """
    run = source._get_mlflow_run(model_version)
    wu = source._get_ml_model_properties_workunit(
        registered_model=registered_model,
        model_version=model_version,
        run=run,
    )
    aspect = wu.get_metadata()["metadata"].aspect

    assert aspect.hyperParams is None
    assert aspect.trainingMetrics is None
def test_traverse_mlflow_search_func(source):
    """Traversal walks all pages of the dummy search function in order.

    Restores the traversal call and assertion that were dropped from this body
    by a bad merge.
    """
    expected_items = ["a", "b", "c", "d", "e"]

    items = list(source._traverse_mlflow_search_func(dummy_search_func))

    assert items == expected_items
def test_traverse_mlflow_search_func(source):
    """Traversal walks all pages of the dummy search function in order.

    Fixed: a bad merge placed the assertion before `items` was assigned, which
    would raise NameError at runtime.
    """
    expected_items = ["a", "b", "c", "d", "e"]

    items = list(source._traverse_mlflow_search_func(dummy_search_func))

    assert items == expected_items
def test_traverse_mlflow_search_func_with_kwargs(source):
    """Traversal forwards kwargs to the search function (case="upper" uppercases items).

    Fixed: a bad merge placed the assertion before `items` was assigned, which
    would raise NameError at runtime.
    """
    expected_items = ["A", "B", "C", "D", "E"]

    items = list(
        source._traverse_mlflow_search_func(dummy_search_func, case="upper")
    )

    assert items == expected_items
def test_traverse_mlflow_search_func_with_kwargs(source):
    """Traversal forwards kwargs to the search function (case="upper" uppercases items).

    Restores the traversal call and assertion that were dropped from this body
    by a bad merge.
    """
    expected_items = ["A", "B", "C", "D", "E"]

    items = list(source._traverse_mlflow_search_func(dummy_search_func, case="upper"))

    assert items == expected_items
def test_make_external_link_local(source, model_version):
    """A local (file-based) tracking store yields no external url.

    Fixed: a bad merge interleaved lines from an unrelated traversal test
    (referencing an undefined `expected_items`), which would raise NameError.
    """
    expected_url = None

    url = source._make_external_url(model_version)

    assert url == expected_url
def test_make_external_link_remote(source, model_version):
    """A remote tracking server yields the model-version page url.

    Restores the url computation and assertion that were dropped from this body
    by a bad merge.
    """
    tracking_uri_remote = "https://dummy-mlflow-tracking-server.org"
    source.client = MlflowClient(tracking_uri=tracking_uri_remote)
    expected_url = f"{tracking_uri_remote}/#/models/{model_version.name}/versions/{model_version.version}"

    url = source._make_external_url(model_version)

    assert url == expected_url
def test_make_external_link_local(source, model_version):
    """A local (file-based) tracking store yields no external url.

    Deduplicated: the url computation and assertion were each present twice
    (merge residue).
    """
    expected_url = None

    url = source._make_external_url(model_version)

    assert url == expected_url
def test_make_external_link_remote(source, model_version):
    """A remote tracking server yields the model-version page url."""
    tracking_uri_remote = "https://dummy-mlflow-tracking-server.org"
    source.client = MlflowClient(tracking_uri=tracking_uri_remote)

    url = source._make_external_url(model_version)

    assert url == (
        f"{tracking_uri_remote}/#/models/{model_version.name}/versions/{model_version.version}"
    )