fix(ingestion/prefect-plugin): Prefect plugin (#10643)

Co-authored-by: shubhamjagtap639 <shubham.jagtap@gslab.com>
Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
This commit is contained in:
dushayntAW 2024-08-29 15:40:10 +02:00 committed by GitHub
parent 1a051b1eef
commit 6204cba2bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 2567 additions and 4 deletions

View File

@ -91,6 +91,8 @@ jobs:
-x :metadata-ingestion-modules:airflow-plugin:check \
-x :metadata-ingestion-modules:dagster-plugin:build \
-x :metadata-ingestion-modules:dagster-plugin:check \
-x :metadata-ingestion-modules:prefect-plugin:build \
-x :metadata-ingestion-modules:prefect-plugin:check \
-x :metadata-ingestion-modules:gx-plugin:build \
-x :metadata-ingestion-modules:gx-plugin:check \
-x :datahub-frontend:build \
@ -138,4 +140,4 @@ jobs:
uses: actions/upload-artifact@v3
with:
name: Event File
path: ${{ github.event_path }}
path: ${{ github.event_path }}

86
.github/workflows/prefect-plugin.yml vendored Normal file
View File

@ -0,0 +1,86 @@
name: Prefect Plugin
on:
  push:
    branches:
      - master
    paths:
      - ".github/workflows/prefect-plugin.yml"
      - "metadata-ingestion-modules/prefect-plugin/**"
      - "metadata-ingestion/**"
      - "metadata-models/**"
  pull_request:
    branches:
      - "**"
    paths:
      - ".github/workflows/prefect-plugin.yml"
      - "metadata-ingestion-modules/prefect-plugin/**"
      - "metadata-ingestion/**"
      - "metadata-models/**"
  release:
    types: [published]
concurrency:
  # Cancel superseded runs for the same PR/branch.
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true
jobs:
  prefect-plugin:
    runs-on: ubuntu-latest
    env:
      SPARK_VERSION: 3.0.3
      DATAHUB_TELEMETRY_ENABLED: false
    strategy:
      matrix:
        # The previous `include:` section only repeated these values and was a no-op.
        python-version: ["3.8", "3.9", "3.10"]
      fail-fast: false
    steps:
      - name: Set up JDK 17
        uses: actions/setup-java@v3
        with:
          distribution: "zulu"
          java-version: 17
      - uses: gradle/gradle-build-action@v2
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
          cache: "pip"
      - name: Install dependencies
        run: ./metadata-ingestion/scripts/install_deps.sh
      # The gradle targets install the plugin into its venv before running
      # lint/testQuick, so this step both installs and verifies the package.
      - name: Lint and test prefect plugin
        run: ./gradlew :metadata-ingestion-modules:prefect-plugin:lint :metadata-ingestion-modules:prefect-plugin:testQuick
      - name: pip freeze show list installed
        if: always()
        run: source metadata-ingestion-modules/prefect-plugin/venv/bin/activate && pip freeze
      - uses: actions/upload-artifact@v3
        if: ${{ always() && matrix.python-version == '3.10'}}
        with:
          name: Test Results (Prefect Plugin ${{ matrix.python-version}})
          path: |
            **/build/reports/tests/test/**
            **/build/test-results/test/**
            **/junit.*.xml
            !**/binary/**
      - name: Upload coverage to Codecov
        if: always()
        uses: codecov/codecov-action@v3
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          directory: .
          fail_ci_if_error: false
          # The matrix has no `extra_pip_extras` axis, so the old
          # `prefect-${{ matrix.extra_pip_extras }}` flag always expanded to
          # a dangling "prefect-" flag; a single flag is correct here.
          flags: prefect
          name: pytest-prefect-${{ matrix.python-version }}
          verbose: true
  event-file:
    runs-on: ubuntu-latest
    steps:
      - name: Upload
        uses: actions/upload-artifact@v3
        with:
          name: Event File
          path: ${{ github.event_path }}

View File

@ -2,7 +2,7 @@ name: Test Results
on:
workflow_run:
workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin", "GX Plugin"]
workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin", "Prefect Plugin", "GX Plugin"]
types:
- completed

View File

@ -86,6 +86,7 @@ task yarnGenerate(type: YarnTask, dependsOn: [yarnInstall,
':metadata-ingestion:buildWheel',
':metadata-ingestion-modules:airflow-plugin:buildWheel',
':metadata-ingestion-modules:dagster-plugin:buildWheel',
':metadata-ingestion-modules:prefect-plugin:buildWheel',
':metadata-ingestion-modules:gx-plugin:buildWheel',
]) {
inputs.files(projectMdFiles)

View File

@ -85,7 +85,7 @@
"tags": {
"Platform Type": "Orchestrator",
"Connection Type": "Pull",
"Features": "Stateful Ingestion, UI Ingestion, Status Aspect"
"Features": "Status Aspect"
}
},
{
@ -429,6 +429,17 @@
"Features": "Stateful Ingestion, Lower Casing, Status Aspect"
}
},
{
"Path": "docs/lineage/prefect",
"imgPath": "img/logos/platforms/prefect.svg",
"Title": "Prefect",
"Description": "Prefect is a modern workflow orchestration tool for data and ML engineers.",
"tags": {
"Platform Type": "Orchestrator",
"Connection Type": "Pull",
"Features": "Status Aspect"
}
},
{
"Path": "docs/generated/ingestion/sources/presto",
"imgPath": "img/logos/platforms/presto.svg",

View File

@ -573,6 +573,7 @@ function copy_python_wheels(): void {
"../metadata-ingestion/dist",
"../metadata-ingestion-modules/airflow-plugin/dist",
"../metadata-ingestion-modules/dagster-plugin/dist",
"../metadata-ingestion-modules/prefect-plugin/dist",
"../metadata-ingestion-modules/gx-plugin/dist",
];

View File

@ -444,6 +444,11 @@ module.exports = {
id: "docs/lineage/openlineage",
label: "OpenLineage",
},
{
type: "doc",
id: "docs/lineage/prefect",
label: "Prefect",
},
{
type: "doc",
id: "metadata-integration/java/acryl-spark-lineage/README",
@ -917,6 +922,7 @@ module.exports = {
// "metadata-integration/java/openlineage-converter/README"
//"metadata-ingestion-modules/airflow-plugin/README"
//"metadata-ingestion-modules/dagster-plugin/README"
//"metadata-ingestion-modules/prefect-plugin/README"
//"metadata-ingestion-modules/gx-plugin/README"
// "metadata-ingestion/schedule_docs/datahub", // we can delete this
// TODO: change the titles of these, removing the "What is..." portion from the sidebar"

View File

@ -40,6 +40,7 @@ const platformLogos = [
name: "CouchBase",
imageUrl: "/img/logos/platforms/couchbase.svg",
},
{ name: "Dagster", imageUrl: "/img/logos/platforms/dagster.png" },
{ name: "Databricks", imageUrl: "/img/logos/platforms/databricks.png" },
{ name: "DBT", imageUrl: "/img/logos/platforms/dbt.svg" },
{ name: "Deltalake", imageUrl: "/img/logos/platforms/deltalake.svg" },
@ -87,6 +88,7 @@ const platformLogos = [
{ name: "Pinot", imageUrl: "/img/logos/platforms/pinot.svg" },
{ name: "PostgreSQL", imageUrl: "/img/logos/platforms/postgres.svg" },
{ name: "PowerBI", imageUrl: "/img/logos/platforms/powerbi.png" },
{ name: "Prefect", imageUrl: "/img/logos/platforms/prefect.svg" },
{ name: "Presto", imageUrl: "/img/logos/platforms/presto.svg" },
{ name: "Protobuf", imageUrl: "/img/logos/platforms/protobuf.png" },
{ name: "Pulsar", imageUrl: "/img/logos/platforms/pulsar.png" },

View File

@ -0,0 +1 @@
<svg viewBox="-0.06000000000002065 -0.26 372.94 603.6800000000001" xmlns="http://www.w3.org/2000/svg" width="1544" height="2500"><linearGradient id="a" gradientUnits="userSpaceOnUse" x1="321.038" x2="190.456" y1="372.543" y2="236.209"><stop offset="0" stop-color="#b4c0c9"/><stop offset="1" stop-color="#94a5b2"/></linearGradient><linearGradient id="b" gradientUnits="userSpaceOnUse" x1="18.929" x2="216.256" y1="266.734" y2="622.904"><stop offset="0" stop-color="#647989"/><stop offset="1" stop-color="#2f383e"/></linearGradient><linearGradient id="c" gradientUnits="userSpaceOnUse" x1="125.8" x2="336.043" y1="36.466" y2="236.983"><stop offset="0" stop-color="#2edaff"/><stop offset="1" stop-color="#004bff"/></linearGradient><path d="M372.88 279.56l-186.55-93.5v190.06z" fill="url(#a)"/><path d="M210.09 363.81l-23.76-141.29v153.6zM348.97 267.48l-162.64-81.42v20.73z" fill="#0d3958" opacity=".2"/><path d="M11.23 597.97c.39-.23 171.36-83.14 171.51-83.23 2.34-1.44 3.66-1.79 3.74-5.91l-.08-322.77c-72.43 36.23-174.39 87.12-183.19 91.8-2.81 1.56-3.2 2.89-3.2 4.91v306.87c-.07 6.93.39 13.78 11.22 8.33z" fill="url(#b)"/><path d="M372.88 96.68v182.81S24.85 105.32 12.81 99.09c-8.03-4.2-8.03-8.19 0-12.69C20.2 82.26 161.42 11.82 183.07.98c2.8-1.24 3.67-1.24 6.15-.23l180.17 90.31c2.57 1.26 3.35 2.5 3.49 5.62z" fill="url(#c)"/></svg>

After

Width:  |  Height:  |  Size: 1.3 KiB

137
docs/lineage/prefect.md Normal file
View File

@ -0,0 +1,137 @@
# Prefect Integration with DataHub
## Overview
DataHub supports integration with Prefect, allowing you to ingest:
- Prefect flow and task metadata
- Flow run and Task run information
- Lineage information (when available)
This integration enables you to track and monitor your Prefect workflows within DataHub, providing a comprehensive view of your data pipeline activities.
## Prefect DataHub Block
### What is a Prefect DataHub Block?
Blocks in Prefect are primitives that enable the storage of configuration and provide an interface for interacting with external systems. The `prefect-datahub` block uses the [DataHub REST](../../metadata-ingestion/sink_docs/datahub.md#datahub-rest) emitter to send metadata events while running Prefect flows.
### Prerequisites
1. Use either Prefect Cloud (recommended) or a self-hosted Prefect server.
2. For Prefect Cloud setup, refer to the [Cloud Quickstart](https://docs.prefect.io/latest/getting-started/quickstart/) guide.
3. For self-hosted Prefect server setup, refer to the [Host Prefect Server](https://docs.prefect.io/latest/guides/host/) guide.
4. Ensure the Prefect API URL is set correctly. Verify using:
```shell
prefect profile inspect
```
5. API URL format:
- Prefect Cloud: `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`
- Self-hosted: `http://<host>:<port>/api`
## Setup Instructions
### 1. Installation
Install `prefect-datahub` using pip:
```shell
pip install 'prefect-datahub'
```
Note: Requires Python 3.7+
### 2. Saving Configurations to a Block
Save your configuration to the [Prefect block document store](https://docs.prefect.io/latest/concepts/blocks/#saving-blocks):
```python
from prefect_datahub.datahub_emitter import DatahubEmitter
DatahubEmitter(
datahub_rest_url="http://localhost:8080",
env="PROD",
platform_instance="local_prefect"
).save("MY-DATAHUB-BLOCK")
```
Configuration options:
| Config | Type | Default | Description |
|--------|------|---------|-------------|
| datahub_rest_url | `str` | `http://localhost:8080` | DataHub GMS REST URL |
| env | `str` | `PROD` | Environment for assets (see [FabricType](https://datahubproject.io/docs/graphql/enums/#fabrictype)) |
| platform_instance | `str` | `None` | Platform instance for assets (see [Platform Instances](https://datahubproject.io/docs/platform-instances/)) |
### 3. Using the Block in Prefect Workflows
Load and use the saved block in your Prefect workflows:
```python
from prefect import flow, task
from prefect_datahub.entities import Dataset
from prefect_datahub.datahub_emitter import DatahubEmitter
datahub_emitter = DatahubEmitter.load("MY-DATAHUB-BLOCK")
@task(name="Transform", description="Transform the data")
def transform(data):
data = data.split(" ")
datahub_emitter.add_task(
inputs=[Dataset("snowflake", "mydb.schema.tableA")],
outputs=[Dataset("snowflake", "mydb.schema.tableC")],
)
return data
@flow(name="ETL flow", description="Extract transform load flow")
def etl():
data = transform("This is data")
datahub_emitter.emit_flow()
```
**Note**: To emit tasks, you must call `emit_flow()`. Otherwise, no metadata will be emitted.
## Concept Mapping
| Prefect Concept | DataHub Concept |
|-----------------|-----------------|
| [Flow](https://docs.prefect.io/latest/concepts/flows/) | [DataFlow](https://datahubproject.io/docs/generated/metamodel/entities/dataflow/) |
| [Flow Run](https://docs.prefect.io/latest/concepts/flows/#flow-runs) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) |
| [Task](https://docs.prefect.io/latest/concepts/tasks/) | [DataJob](https://datahubproject.io/docs/generated/metamodel/entities/datajob/) |
| [Task Run](https://docs.prefect.io/latest/concepts/tasks/#tasks) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) |
| [Task Tag](https://docs.prefect.io/latest/concepts/tasks/#tags) | [Tag](https://datahubproject.io/docs/generated/metamodel/entities/tag/) |
## Validation and Troubleshooting
### Validating the Setup
1. Check the Prefect UI's Blocks menu for the DataHub emitter.
2. Run a Prefect workflow and look for DataHub-related log messages:
```text
Emitting flow to datahub...
Emitting tasks to datahub...
```
### Debugging Common Issues
#### Incorrect Prefect API URL
If the Prefect API URL is incorrect, set it manually:
```shell
prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api'
```
#### DataHub Connection Error
If you encounter a `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, ensure that your DataHub GMS service is running.
## Additional Resources
- [Prefect Documentation](https://docs.prefect.io/)
- [DataHub Documentation](https://datahubproject.io/docs/)
For more information or support, please refer to the official Prefect and DataHub documentation or reach out to their respective communities.

View File

@ -0,0 +1,143 @@
.envrc
src/prefect_datahub/__init__.py.bak
.vscode/
output
pvenv36/
bq_credentials.json
/tmp
*.bak
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# Generated classes
src/datahub/metadata/
wheels/
junit.quick.xml

View File

@ -0,0 +1,132 @@
# prefect-datahub
Emit flows & tasks metadata to DataHub REST API with `prefect-datahub`
<p align="center">
<a href="https://pypi.python.org/pypi/prefect-datahub/" alt="PyPI version">
<img alt="PyPI" src="https://img.shields.io/pypi/v/prefect-datahub?color=0052FF&labelColor=090422" /></a>
<a href="https://github.com/datahub-project/datahub/" alt="Stars">
<img src="https://img.shields.io/github/stars/datahub-project/datahub?color=0052FF&labelColor=090422" /></a>
<a href="https://pypistats.org/packages/prefect-datahub/" alt="Downloads">
<img src="https://img.shields.io/pypi/dm/prefect-datahub?color=0052FF&labelColor=090422" /></a>
<a href="https://github.com/datahub-project/datahub/pulse" alt="Activity">
<img src="https://img.shields.io/github/commit-activity/m/datahub-project/datahub?color=0052FF&labelColor=090422" /></a>
<br/>
<a href="https://datahubspace.slack.com" alt="Slack">
<img src="https://img.shields.io/badge/slack-join_community-red.svg?color=0052FF&labelColor=090422&logo=slack" /></a>
</p>
## Introduction
The `prefect-datahub` collection allows you to easily integrate DataHub's metadata ingestion capabilities into your Prefect workflows. With this collection, you can emit metadata about your flows, tasks, and workspace to DataHub's metadata service.
## Features
- Seamless integration with Prefect workflows
- Support for ingesting metadata of flows, tasks, and workspaces to DataHub GMS REST API
- Easy configuration using Prefect blocks
## Prerequisites
- Python 3.7+
- Prefect 2.0.0+
- A running instance of DataHub
## Installation
Install `prefect-datahub` using pip:
```bash
pip install prefect-datahub
```
We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.
## Getting Started
### 1. Set up DataHub
Before using `prefect-datahub`, you need to deploy an instance of DataHub. Follow the instructions on the [DataHub Quickstart page](https://datahubproject.io/docs/quickstart) to set up DataHub.
After successful deployment, the DataHub GMS service should be running on `http://localhost:8080` if deployed locally.
### 2. Configure DataHub Emitter
Save your DataHub configuration as a Prefect block:
```python
from prefect_datahub.datahub_emitter import DatahubEmitter
datahub_emitter = DatahubEmitter(
datahub_rest_url="http://localhost:8080",
env="DEV",
platform_instance="local_prefect",
token=None, # generate auth token in the datahub and provide here if gms endpoint is secure
)
datahub_emitter.save("datahub-emitter-test")
```
Configuration options:
| Config | Type | Default | Description |
|--------|------|---------|-------------|
| datahub_rest_url | `str` | `http://localhost:8080` | DataHub GMS REST URL |
| env | `str` | `PROD` | Environment for assets (see [FabricType](https://datahubproject.io/docs/graphql/enums/#fabrictype)) |
| platform_instance | `str` | `None` | Platform instance for assets (see [Platform Instances](https://datahubproject.io/docs/platform-instances/)) |
### 3. Use DataHub Emitter in Your Workflows
Here's an example of how to use the DataHub Emitter in a Prefect workflow:
```python
from prefect import flow, task
from prefect_datahub.datahub_emitter import DatahubEmitter
from prefect_datahub.entities import Dataset
datahub_emitter_block = DatahubEmitter.load("datahub-emitter-test")
@task(name="Extract", description="Extract the data")
def extract():
return "This is data"
@task(name="Transform", description="Transform the data")
def transform(data, datahub_emitter):
transformed_data = data.split(" ")
datahub_emitter.add_task(
inputs=[Dataset("snowflake", "mydb.schema.tableX")],
outputs=[Dataset("snowflake", "mydb.schema.tableY")],
)
return transformed_data
@flow(name="ETL", description="Extract transform load flow")
def etl():
datahub_emitter = datahub_emitter_block
data = extract()
transformed_data = transform(data, datahub_emitter)
datahub_emitter.emit_flow()
if __name__ == "__main__":
etl()
```
**Note**: To emit task metadata, you must call `emit_flow()` at the end of your flow. Otherwise, no metadata will be emitted.
## Advanced Usage
For more advanced usage and configuration options, please refer to the [prefect-datahub documentation](https://datahubproject.io/docs/lineage/prefect/).
## Contributing
We welcome contributions to `prefect-datahub`! Please refer to our [Contributing Guidelines](https://datahubproject.io/docs/contributing) for more information on how to get started.
## Support
If you encounter any issues or have questions, you can:
- Open an issue in the [DataHub GitHub repository](https://github.com/datahub-project/datahub/issues)
- Join the [DataHub Slack community](https://datahubspace.slack.com)
- Seek help in the [Prefect Slack community](https://prefect.io/slack)
## License
`prefect-datahub` is released under the Apache 2.0 license. See the [LICENSE](https://github.com/datahub-project/datahub/blob/master/LICENSE) file for details.

View File

@ -0,0 +1,127 @@
// Gradle orchestration for the prefect-datahub plugin: creates a local Python
// virtualenv, installs the package and its extras, and exposes lint/test/build
// tasks that shell out into that venv.
plugins {
id 'base'
}
ext {
python_executable = 'python3'
venv_name = 'venv'
}
// Extra pip requirements may be injected with -Pextra_pip_requirements=...
if (!project.hasProperty("extra_pip_requirements")) {
ext.extra_pip_requirements = ""
}
// Base install command. VIRTUAL_ENV points uv at the venv; note it always
// (re)installs the sibling metadata-ingestion checkout in editable mode first.
def pip_install_command = "VIRTUAL_ENV=${venv_name} ${venv_name}/bin/uv pip install -e ../../metadata-ingestion"
// Fail fast if the system python is older than 3.7 (setup.py's floor).
task checkPythonVersion(type: Exec) {
commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 7)'
}
// Create the venv and bootstrap pip/uv/wheel; the sentinel file keeps the
// task incremental across builds.
task environmentSetup(type: Exec, dependsOn: checkPythonVersion) {
def sentinel_file = "${venv_name}/.venv_environment_sentinel"
inputs.file file('setup.py')
outputs.file(sentinel_file)
commandLine 'bash', '-c',
"${python_executable} -m venv ${venv_name} && " +
"${venv_name}/bin/python -m pip install --upgrade pip uv wheel 'setuptools>=63.0.0' && " +
"touch ${sentinel_file}"
}
// Editable-install this plugin; depends on codegen so the generated DataHub
// metadata classes exist before import.
task installPackage(type: Exec, dependsOn: [environmentSetup, ':metadata-ingestion:codegen']) {
def sentinel_file = "${venv_name}/.build_install_package_sentinel"
inputs.file file('setup.py')
outputs.file(sentinel_file)
commandLine 'bash', '-x', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"${pip_install_command} -e . ${extra_pip_requirements} &&" +
"touch ${sentinel_file}"
}
task install(dependsOn: [installPackage])
// Install with the [dev] extra (formatters, linters, type stubs, pytest).
task installDev(type: Exec, dependsOn: [install]) {
def sentinel_file = "${venv_name}/.build_install_dev_sentinel"
inputs.file file('setup.py')
outputs.file("${sentinel_file}")
commandLine 'bash', '-x', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"${pip_install_command} -e .[dev] ${extra_pip_requirements} && " +
"touch ${sentinel_file}"
}
// Check-only lint pass (black/isort/flake8/mypy); wired into `check` below.
task lint(type: Exec, dependsOn: installDev) {
commandLine 'bash', '-c',
"source ${venv_name}/bin/activate && set -x && " +
"black --check --diff src/ tests/ && " +
"isort --check --diff src/ tests/ && " +
"flake8 --count --statistics src/ tests/ && " +
"mypy --show-traceback --show-error-codes src/ tests/"
}
// Auto-format with black/isort, then run flake8/mypy.
// NOTE(review): flake8/mypy can still fail this task after formatting —
// presumably intended so lintFix surfaces remaining issues; confirm.
task lintFix(type: Exec, dependsOn: installDev) {
commandLine 'bash', '-x', '-c',
"source ${venv_name}/bin/activate && " +
"black src/ tests/ && " +
"isort src/ tests/ && " +
"flake8 src/ tests/ && " +
"mypy src/ tests/ "
}
// Install the [dev,integration-tests] extras required by the test tasks.
task installDevTest(type: Exec, dependsOn: [installDev]) {
def sentinel_file = "${venv_name}/.build_install_dev_test_sentinel"
inputs.file file('setup.py')
outputs.dir("${venv_name}")
outputs.file("${sentinel_file}")
commandLine 'bash', '-x', '-c',
"${pip_install_command} -e .[dev,integration-tests] && touch ${sentinel_file}"
}
// Run one test file: ./gradlew testSingle -PtestFile=tests/unit/test_foo.py
def testFile = hasProperty('testFile') ? testFile : 'unknown'
task testSingle(dependsOn: [installDevTest]) {
doLast {
if (testFile != 'unknown') {
exec {
commandLine 'bash', '-x', '-c',
"source ${venv_name}/bin/activate && pytest ${testFile}"
}
} else {
throw new GradleException("No file provided. Use -PtestFile=<test_file>")
}
}
}
// Fast test pass used by CI; emits coverage + junit XML for upload.
task testQuick(type: Exec, dependsOn: installDevTest) {
// We can't enforce the coverage requirements if we run a subset of the tests.
inputs.files(project.fileTree(dir: "src/", include: "**/*.py"))
inputs.files(project.fileTree(dir: "tests/"))
outputs.dir("${venv_name}")
commandLine 'bash', '-x', '-c',
"source ${venv_name}/bin/activate && pytest --cov-config=setup.cfg --cov-report xml:coverage_quick.xml -vv --continue-on-collection-errors --junit-xml=junit.quick.xml -s"
}
// Full suite minus tests marked slow_integration.
task testFull(type: Exec, dependsOn: [testQuick, installDevTest]) {
commandLine 'bash', '-x', '-c',
"source ${venv_name}/bin/activate && pytest -m 'not slow_integration' -vv --continue-on-collection-errors --junit-xml=junit.full.xml"
}
// Build a wheel through scripts/release.sh without installing or uploading.
task buildWheel(type: Exec, dependsOn: [environmentSetup]) {
commandLine 'bash', '-c', "source ${venv_name}/bin/activate && " +
'uv pip install build && RELEASE_VERSION="\${RELEASE_VERSION:-0.0.0.dev1}" RELEASE_SKIP_INSTALL=1 RELEASE_SKIP_UPLOAD=1 ./scripts/release.sh'
}
// Drop compiled bytecode, __pycache__ dirs, and resulting empty dirs.
task cleanPythonCache(type: Exec) {
commandLine 'bash', '-c',
"find src -type f -name '*.py[co]' -delete -o -type d -name __pycache__ -delete -o -type d -empty -delete"
}
build.dependsOn install
check.dependsOn lint
check.dependsOn testQuick
clean {
delete venv_name
delete 'build'
delete 'dist'
}
clean.dependsOn cleanPythonCache

View File

@ -0,0 +1,19 @@
[build-system]
build-backend = "setuptools.build_meta"
requires = ["setuptools>=54.0.0", "wheel", "pip>=21.0.0"]
[tool.black]
extend-exclude = '''
# A regex preceded with ^/ will apply only to files and directories
# in the root of the project.
^/tmp
'''
include = '\.pyi?$'
[tool.isort]
indent = ' '
profile = 'black'
sections = 'FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER'
[tool.pyright]
extraPaths = ['tests']

View File

@ -0,0 +1,26 @@
#!/bin/bash
# Build (and optionally upload) the prefect-datahub distribution.
# Behavior is controlled via env vars: RELEASE_SKIP_TEST, RELEASE_SKIP_INSTALL,
# RELEASE_VERSION, RELEASE_SKIP_UPLOAD.
set -euxo pipefail
# Build via gradle (which also runs the tests) unless explicitly skipped.
if [[ ! ${RELEASE_SKIP_TEST:-} ]] && [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../../gradlew build # also runs tests
elif [[ ! ${RELEASE_SKIP_INSTALL:-} ]]; then
../../gradlew install
fi
MODULE=prefect_datahub
# Check packaging constraint.
python -c 'import setuptools; where="./src"; assert setuptools.find_packages(where) == setuptools.find_namespace_packages(where), "you seem to be missing or have extra __init__.py files"'
if [[ ${RELEASE_VERSION:-} ]]; then
# Replace version with RELEASE_VERSION env variable
# (dashes become '+' to form a PEP 440 local version; sed keeps a .bak copy).
sed -i.bak "s/__version__ = \"1\!0.0.0.dev0\"/__version__ = \"$(echo $RELEASE_VERSION|sed s/-/+/)\"/" src/${MODULE}/__init__.py
else
# No version supplied: open the file for manual editing.
vim src/${MODULE}/__init__.py
fi
rm -rf build dist || true
python -m build
if [[ ! ${RELEASE_SKIP_UPLOAD:-} ]]; then
python -m twine upload 'dist/*'
fi
# Restore the original __init__.py from the sed backup.
# NOTE(review): the .bak file only exists when the sed branch above ran; with
# RELEASE_VERSION unset this mv fails under `set -e` — confirm intended.
mv src/${MODULE}/__init__.py.bak src/${MODULE}/__init__.py

View File

@ -0,0 +1,74 @@
[flake8]
max-complexity = 15
ignore =
# Ignore: line length issues, since black's formatter will take care of them.
E501,
# Ignore: 1 blank line required before class docstring.
D203,
# See https://stackoverflow.com/a/57074416.
W503,
# See https://github.com/psf/black/issues/315.
E203
exclude =
.git,
venv,
.tox,
__pycache__
per-file-ignores =
# imported but unused
__init__.py: F401
ban-relative-imports = true
[mypy]
plugins =
sqlmypy,
pydantic.mypy
exclude = ^(venv|build|dist)/
ignore_missing_imports = yes
strict_optional = yes
check_untyped_defs = yes
disallow_incomplete_defs = yes
disallow_untyped_decorators = yes
warn_unused_configs = yes
# eventually we'd like to enable these
disallow_untyped_defs = no
# try to be a bit more strict in certain areas of the codebase
[mypy-datahub.*]
ignore_missing_imports = no
[mypy-tests.*]
ignore_missing_imports = no
[tool:pytest]
asyncio_mode = auto
addopts = --cov=src --cov-report term-missing --cov-config setup.cfg --strict-markers
testpaths =
tests/unit
tests/integration
[coverage:run]
# Because of some quirks in the way setup.cfg, coverage.py, pytest-cov,
# and tox interact, we should not uncomment the following line.
# See https://pytest-cov.readthedocs.io/en/latest/config.html and
# https://coverage.readthedocs.io/en/coverage-5.0/config.html.
# We also have some additional pytest/cov config options in tox.ini.
# source = src
[coverage:paths]
# This is necessary for tox-based coverage to be counted properly.
source =
src
*/site-packages
[coverage:report]
# The fail_under value ensures that at least some coverage data is collected.
# We override its value in the tox config.
show_missing = true
exclude_lines =
pragma: no cover
@abstract
if TYPE_CHECKING:
omit =
# omit example jobs
src/prefect_datahub/example/*

View File

@ -0,0 +1,130 @@
import os
import pathlib

import setuptools

# Load __package_name__ / __version__ by exec'ing the package __init__ so we
# don't import the package (and its runtime dependencies) at build time.
package_metadata: dict = {}
with open("./src/prefect_datahub/__init__.py") as fp:
    exec(fp.read(), package_metadata)


def get_long_description():
    """Return the README contents, used as the PyPI long description."""
    root = os.path.dirname(__file__)
    return pathlib.Path(os.path.join(root, "README.md")).read_text()


_version: str = package_metadata["__version__"]
# Exact self-pin for real releases; left unpinned for dev/docker builds.
_self_pin = (
    f"=={_version}"
    if not (_version.endswith(("dev0", "dev1")) or "docker" in _version)
    else ""
)

rest_common = {"requests", "requests_file"}

base_requirements = {
    # For python 3.7 and importlib-metadata>=5.0.0, build failed with attribute error
    "importlib-metadata>=4.4.0,<5.0.0; python_version < '3.8'",
    # Actual dependencies.
    "prefect >= 2.0.0",
    *rest_common,
    # Ignoring the dependency below because it causes issues with the vercel built wheel install
    # f"acryl-datahub[datahub-rest]{_self_pin}",
    "acryl-datahub[datahub-rest]",
}

# Type-stub packages needed so mypy can check third-party usage.
mypy_stubs = {
    "types-dataclasses",
    "sqlalchemy-stubs",
    "types-setuptools",
    "types-six",
    "types-python-dateutil",
    "types-requests",
    "types-toml",
    "types-PyYAML",
    "types-freezegun",
    "types-cachetools",
    # versions 0.1.13 and 0.1.14 seem to have issues
    "types-click==0.1.12",
    "types-tabulate",
    # avrogen package requires this
    "types-pytz",
}

dev_requirements = {
    *base_requirements,
    *mypy_stubs,
    "black==22.12.0",
    "coverage>=5.1",
    "flake8>=3.8.3",
    "flake8-tidy-imports>=4.3.0",
    "isort>=5.7.0",
    "mypy>=1.4.0",
    # pydantic 1.8.2 is incompatible with mypy 0.910.
    # See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910.
    "pydantic>=1.10",
    "pytest>=6.2.2",
    "pytest-asyncio>=0.16.0",
    "pytest-cov>=2.8.1",
    "tox",
    "deepdiff",
    "requests-mock",
    "freezegun",
    "jsonpickle",
    "build",
    "twine",
    "packaging",
}

entry_points = {
    # Fix: the DatahubEmitter block class is defined in
    # prefect_datahub/datahub_emitter.py (the docs and README both use
    # `from prefect_datahub.datahub_emitter import DatahubEmitter`); the
    # previous path `prefect_datahub.prefect_datahub` pointed at a module
    # that does not exist, so Prefect block registration would fail.
    "prefect.block": "prefect-datahub = prefect_datahub.datahub_emitter:DatahubEmitter"
}

setuptools.setup(
    # Package metadata.
    name=package_metadata["__package_name__"],
    version=package_metadata["__version__"],
    url="https://datahubproject.io/",
    project_urls={
        "Documentation": "https://datahubproject.io/docs/",
        "Source": "https://github.com/datahub-project/datahub",
        "Changelog": "https://github.com/datahub-project/datahub/releases",
    },
    license="Apache License 2.0",
    description="Datahub prefect block to capture executions and send to Datahub",
    long_description=get_long_description(),
    long_description_content_type="text/markdown",
    classifiers=[
        "Development Status :: 5 - Production/Stable",
        "Programming Language :: Python",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3 :: Only",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Intended Audience :: Developers",
        "Intended Audience :: Information Technology",
        "Intended Audience :: System Administrators",
        "License :: OSI Approved",
        "License :: OSI Approved :: Apache Software License",
        "Operating System :: Unix",
        "Operating System :: POSIX :: Linux",
        "Environment :: Console",
        "Environment :: MacOS X",
        "Topic :: Software Development",
    ],
    # Package info.
    zip_safe=False,
    python_requires=">=3.7",
    package_dir={"": "src"},
    packages=setuptools.find_namespace_packages(where="./src"),
    entry_points=entry_points,
    # Dependencies.
    install_requires=list(base_requirements),
    extras_require={
        "dev": list(dev_requirements),
    },
)

View File

@ -0,0 +1,21 @@
# Published at https://pypi.org/project/acryl-datahub/.
__package_name__ = "prefect-datahub"
__version__ = "1!0.0.0.dev0"


def is_dev_mode() -> bool:
    """Return True when this is a development build (version ends in ``dev0``)."""
    return __version__.endswith("dev0")


def nice_version_name() -> str:
    """Return a human-readable version string for display purposes."""
    if not is_dev_mode():
        return __version__
    return "unavailable (installed in develop mode)"


def get_provider_info():
    """Return package metadata consumed by the provider/plugin registry."""
    return {
        "package-name": f"{__package_name__}",
        "name": f"{__package_name__}",
        "description": "Datahub prefect block to capture executions and send to Datahub",
    }

View File

@ -0,0 +1,659 @@
"""Datahub Emitter classes used to emit prefect metadata to Datahub REST."""
import asyncio
import traceback
from typing import Any, Dict, List, Optional, cast
from uuid import UUID
import datahub.emitter.mce_builder as builder
from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
DataProcessInstance,
InstanceRunResult,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import BrowsePathsClass
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
from prefect import get_run_logger
from prefect.blocks.core import Block
from prefect.client import cloud, orchestration
from prefect.client.schemas import FlowRun, TaskRun, Workspace
from prefect.client.schemas.objects import Flow
from prefect.context import FlowRunContext, TaskRunContext
from prefect.settings import PREFECT_API_URL
from pydantic.v1 import SecretStr
from prefect_datahub.entities import _Entity
# Platform/orchestrator name used in every URN this plugin emits.
ORCHESTRATOR = "prefect"
# Flow and task common constants (attribute names copied into property bags)
VERSION = "version"
RETRIES = "retries"
TIMEOUT_SECONDS = "timeout_seconds"
LOG_PRINTS = "log_prints"
ON_COMPLETION = "on_completion"
ON_FAILURE = "on_failure"
# Flow constants
FLOW_RUN_NAME = "flow_run_name"
TASK_RUNNER = "task_runner"
PERSIST_RESULT = "persist_result"
ON_CANCELLATION = "on_cancellation"
ON_CRASHED = "on_crashed"
# Task constants
CACHE_EXPIRATION = "cache_expiration"
TASK_RUN_NAME = "task_run_name"
REFRESH_CACHE = "refresh_cache"
TASK_KEY = "task_key"
# Flow run and task run common constants
ID = "id"
CREATED = "created"
UPDATED = "updated"
TAGS = "tags"
ESTIMATED_RUN_TIME = "estimated_run_time"
START_TIME = "start_time"
END_TIME = "end_time"
TOTAL_RUN_TIME = "total_run_time"
NEXT_SCHEDULED_START_TIME = "next_scheduled_start_time"
# Flow run constants
CREATED_BY = "created_by"
AUTO_SCHEDULED = "auto_scheduled"
# Task run constants
FLOW_RUN_ID = "flow_run_id"
RUN_COUNT = "run_count"
UPSTREAM_DEPENDENCIES = "upstream_dependencies"
# Prefect terminal state names, mapped onto DataHub InstanceRunResult below.
COMPLETE = "Completed"
FAILED = "Failed"
CANCELLED = "Cancelled"
class DatahubEmitter(Block):
    """
    Block used to emit prefect task and flow related metadata to Datahub REST
    """
    # Display name registered in Prefect's block catalog.
    _block_type_name: Optional[str] = "datahub emitter"
    # DataHub GMS REST endpoint to emit metadata to.
    datahub_rest_url: str = "http://localhost:8080"
    # DataHub environment (fabric) baked into emitted URNs.
    env: str = builder.DEFAULT_ENV
    platform_instance: Optional[str] = None
    # Optional bearer token for a secured GMS endpoint.
    token: Optional[SecretStr] = None
    # Datajobs recorded via add_task(), keyed by DataJob URN, flushed by
    # emit_flow().
    # NOTE(review): declared as a class-level mutable dict, so it looks shared
    # across instances unless Block/pydantic copies private attrs per
    # instance -- confirm before relying on per-instance isolation.
    _datajobs_to_emit: Dict[str, Any] = {}
    def __init__(self, *args: Any, **kwargs: Any):
        """
        Initialize datahub rest emitter
        """
        super().__init__(*args, **kwargs)
        # self._datajobs_to_emit: Dict[str, _Entity] = {}
        token = self.token.get_secret_value() if self.token is not None else None
        self.emitter = DatahubRestEmitter(gms_server=self.datahub_rest_url, token=token)
        # Fail fast at construction time if GMS is unreachable.
        self.emitter.test_connection()
    def _entities_to_urn_list(self, iolets: List[_Entity]) -> List[DatasetUrn]:
        """
        Convert list of _entity to list of dataset urn
        Args:
            iolets (list[_Entity]): The list of entities.
        Returns:
            The list of Dataset URN.
        """
        return [DatasetUrn.create_from_string(let.urn) for let in iolets]
    def _get_workspace(self) -> Optional[str]:
        """
        Fetch workspace name if present in configured prefect api url.
        Returns:
            The workspace name.
        """
        try:
            # Only Prefect Cloud has workspaces; a failing healthcheck means a
            # self-hosted server, so no workspace name is available.
            asyncio.run(cloud.get_cloud_client().api_healthcheck())
        except Exception:
            get_run_logger().debug(traceback.format_exc())
            return None
        if "workspaces" not in PREFECT_API_URL.value():
            get_run_logger().debug(
                "Cannot fetch workspace name. Please login to prefect cloud using "
                "command 'prefect cloud login'."
            )
            return None
        # The workspace id is the trailing path segment of the cloud API URL.
        current_workspace_id = PREFECT_API_URL.value().split("/")[-1]
        workspaces: List[Workspace] = asyncio.run(
            cloud.get_cloud_client().read_workspaces()
        )
        for workspace in workspaces:
            if str(workspace.workspace_id) == current_workspace_id:
                return workspace.workspace_name
        return None
    async def _get_flow_run_graph(self, flow_run_id: str) -> Optional[List[Dict]]:
        """
        Fetch the flow run graph for provided flow run id
        Args:
            flow_run_id (str): The flow run id.
        Returns:
            The flow run graph in json format, or None on any failure.
        """
        try:
            # The graph endpoint has no public client method, so go through
            # the private HTTP client.
            response = orchestration.get_client()._client.get(
                f"/flow_runs/{flow_run_id}/graph"
            )
            # The underlying client may be sync or async; await only if needed.
            if asyncio.iscoroutine(response):
                response = await response
            if hasattr(response, "json"):
                response_json = response.json()
            else:
                raise ValueError("Response object does not have a 'json' method")
        except Exception:
            get_run_logger().debug(traceback.format_exc())
            return None
        return response_json
    def _emit_browsepath(self, urn: str, workspace_name: str) -> None:
        """
        Emit browsepath for provided urn. Set path as orchestrator/env/workspace_name.
        Args:
            urn (str): The entity URN
            workspace_name (str): The prefect cloud workspace name
        """
        mcp = MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=BrowsePathsClass(
                paths=[f"/{ORCHESTRATOR}/{self.env}/{workspace_name}"]
            ),
        )
        self.emitter.emit(mcp)
    def _generate_datajob(
        self,
        flow_run_ctx: FlowRunContext,
        task_run_ctx: Optional[TaskRunContext] = None,
        task_key: Optional[str] = None,
    ) -> Optional[DataJob]:
        """
        Create datajob entity using task run ctx and flow run ctx.
        Assign description, tags, and properties to created datajob.
        Exactly one of task_run_ctx / task_key should be provided; with
        task_run_ctx the job carries full metadata, with only task_key a bare
        placeholder job is created. Returns None if neither is given.
        Args:
            flow_run_ctx (FlowRunContext): The prefect current running flow run context.
            task_run_ctx (Optional[TaskRunContext]): The prefect current running task \
                run context.
            task_key (Optional[str]): The task key.
        Returns:
            The datajob entity.
        """
        assert flow_run_ctx.flow
        dataflow_urn = DataFlowUrn.create_from_ids(
            orchestrator=ORCHESTRATOR,
            flow_id=flow_run_ctx.flow.name,
            env=self.env,
            platform_instance=self.platform_instance,
        )
        if task_run_ctx is not None:
            datajob = DataJob(
                id=task_run_ctx.task.task_key,
                flow_urn=dataflow_urn,
                name=task_run_ctx.task.name,
            )
            datajob.description = task_run_ctx.task.description
            datajob.tags = task_run_ctx.task.tags
            job_property_bag: Dict[str, str] = {}
            allowed_task_keys = [
                VERSION,
                CACHE_EXPIRATION,
                TASK_RUN_NAME,
                RETRIES,
                TIMEOUT_SECONDS,
                LOG_PRINTS,
                REFRESH_CACHE,
                TASK_KEY,
                ON_COMPLETION,
                ON_FAILURE,
            ]
            for key in allowed_task_keys:
                if (
                    hasattr(task_run_ctx.task, key)
                    and getattr(task_run_ctx.task, key) is not None
                ):
                    # repr() so non-string values (callables, timedeltas, ...)
                    # serialize into the string-valued property bag.
                    job_property_bag[key] = repr(getattr(task_run_ctx.task, key))
            datajob.properties = job_property_bag
            return datajob
        elif task_key is not None:
            # Fallback: name the job after the last dotted segment of the key.
            datajob = DataJob(
                id=task_key, flow_urn=dataflow_urn, name=task_key.split(".")[-1]
            )
            return datajob
        return None
    def _generate_dataflow(self, flow_run_ctx: FlowRunContext) -> Optional[DataFlow]:
        """
        Create dataflow entity using flow run ctx.
        Assign description, tags, and properties to created dataflow.
        Args:
            flow_run_ctx (FlowRunContext): The prefect current running flow run context.
        Returns:
            The dataflow entity, or None if the flow cannot be read from the API.
        """
        async def get_flow(flow_id: UUID) -> Flow:
            client = orchestration.get_client()
            if not hasattr(client, "read_flow"):
                raise ValueError("Client does not support async read_flow method")
            return await client.read_flow(flow_id=flow_id)
        assert flow_run_ctx.flow
        assert flow_run_ctx.flow_run
        try:
            flow: Flow = asyncio.run(get_flow(flow_run_ctx.flow_run.flow_id))
        except Exception:
            get_run_logger().debug(traceback.format_exc())
            return None
        assert flow
        dataflow = DataFlow(
            orchestrator=ORCHESTRATOR,
            id=flow_run_ctx.flow.name,
            env=self.env,
            name=flow_run_ctx.flow.name,
            platform_instance=self.platform_instance,
        )
        dataflow.description = flow_run_ctx.flow.description
        dataflow.tags = set(flow.tags)
        flow_property_bag: Dict[str, str] = {}
        flow_property_bag[ID] = str(flow.id)
        flow_property_bag[CREATED] = str(flow.created)
        flow_property_bag[UPDATED] = str(flow.updated)
        allowed_flow_keys = [
            VERSION,
            FLOW_RUN_NAME,
            RETRIES,
            TASK_RUNNER,
            TIMEOUT_SECONDS,
            PERSIST_RESULT,
            LOG_PRINTS,
            ON_COMPLETION,
            ON_FAILURE,
            ON_CANCELLATION,
            ON_CRASHED,
        ]
        for key in allowed_flow_keys:
            if (
                hasattr(flow_run_ctx.flow, key)
                and getattr(flow_run_ctx.flow, key) is not None
            ):
                flow_property_bag[key] = repr(getattr(flow_run_ctx.flow, key))
        dataflow.properties = flow_property_bag
        return dataflow
    def _emit_tasks(
        self,
        flow_run_ctx: FlowRunContext,
        dataflow: DataFlow,
        workspace_name: Optional[str] = None,
    ) -> None:
        """
        Emit prefect tasks metadata to datahub rest. Add upstream dependencies if
        present for each task.
        Args:
            flow_run_ctx (FlowRunContext): The prefect current running flow run context
            dataflow (DataFlow): The datahub dataflow entity.
            workspace_name Optional(str): The prefect cloud workspace name.
        """
        try:
            assert flow_run_ctx.flow_run
            graph_json = asyncio.run(
                self._get_flow_run_graph(str(flow_run_ctx.flow_run.id))
            )
            if graph_json is None:
                return
            # Map task-run id -> task key so graph nodes (keyed by run id)
            # can be resolved to stable job ids.
            task_run_key_map: Dict[str, str] = {}
            for prefect_future in flow_run_ctx.task_run_futures:
                if prefect_future.task_run is not None:
                    task_run_key_map[
                        str(prefect_future.task_run.id)
                    ] = prefect_future.task_run.task_key
            for node in graph_json:
                datajob_urn = DataJobUrn.create_from_ids(
                    data_flow_urn=str(dataflow.urn),
                    job_id=task_run_key_map[node[ID]],
                )
                datajob: Optional[DataJob] = None
                # Prefer the fully-populated job cached by add_task(); fall
                # back to a bare job built from the task key alone.
                if str(datajob_urn) in self._datajobs_to_emit:
                    datajob = cast(DataJob, self._datajobs_to_emit[str(datajob_urn)])
                else:
                    datajob = self._generate_datajob(
                        flow_run_ctx=flow_run_ctx, task_key=task_run_key_map[node[ID]]
                    )
                if datajob is not None:
                    for each in node[UPSTREAM_DEPENDENCIES]:
                        upstream_task_urn = DataJobUrn.create_from_ids(
                            data_flow_urn=str(dataflow.urn),
                            job_id=task_run_key_map[each[ID]],
                        )
                        datajob.upstream_urns.extend([upstream_task_urn])
                    datajob.emit(self.emitter)
                    if workspace_name is not None:
                        self._emit_browsepath(str(datajob.urn), workspace_name)
                    self._emit_task_run(
                        datajob=datajob,
                        flow_run_name=flow_run_ctx.flow_run.name,
                        task_run_id=UUID(node[ID]),
                    )
        except Exception:
            # Best-effort emission: never fail the user's flow over metadata.
            get_run_logger().debug(traceback.format_exc())
    def _emit_flow_run(self, dataflow: DataFlow, flow_run_id: UUID) -> None:
        """
        Emit prefect flow run to datahub rest. Prefect flow run get mapped with datahub
        data process instance entity which get's generate from provided dataflow entity.
        Assign flow run properties to data process instance properties.
        Args:
            dataflow (DataFlow): The datahub dataflow entity used to create \
                data process instance.
            flow_run_id (UUID): The prefect current running flow run id.
        """
        async def get_flow_run(flow_run_id: UUID) -> FlowRun:
            client = orchestration.get_client()
            if not hasattr(client, "read_flow_run"):
                raise ValueError("Client does not support async read_flow_run method")
            response = client.read_flow_run(flow_run_id=flow_run_id)
            if asyncio.iscoroutine(response):
                response = await response
            return FlowRun.parse_obj(response)
        flow_run: FlowRun = asyncio.run(get_flow_run(flow_run_id))
        assert flow_run
        if self.platform_instance is not None:
            dpi_id = f"{self.platform_instance}.{flow_run.name}"
        else:
            dpi_id = flow_run.name
        dpi = DataProcessInstance.from_dataflow(dataflow=dataflow, id=dpi_id)
        dpi_property_bag: Dict[str, str] = {}
        allowed_flow_run_keys = [
            ID,
            CREATED,
            UPDATED,
            CREATED_BY,
            AUTO_SCHEDULED,
            ESTIMATED_RUN_TIME,
            START_TIME,
            TOTAL_RUN_TIME,
            NEXT_SCHEDULED_START_TIME,
            TAGS,
            RUN_COUNT,
        ]
        for key in allowed_flow_run_keys:
            if hasattr(flow_run, key) and getattr(flow_run, key) is not None:
                dpi_property_bag[key] = str(getattr(flow_run, key))
        dpi.properties.update(dpi_property_bag)
        # NOTE(review): only the start event is emitted for the flow run --
        # there is no emit_process_end counterpart here (unlike task runs), so
        # flow-run instances may appear perpetually running in DataHub.
        if flow_run.start_time is not None:
            dpi.emit_process_start(
                emitter=self.emitter,
                start_timestamp_millis=int(flow_run.start_time.timestamp() * 1000),
            )
    def _emit_task_run(
        self, datajob: DataJob, flow_run_name: str, task_run_id: UUID
    ) -> None:
        """
        Emit prefect task run to datahub rest. Prefect task run get mapped with datahub
        data process instance entity which get's generate from provided datajob entity.
        Assign task run properties to data process instance properties.
        Args:
            datajob (DataJob): The datahub datajob entity used to create \
                data process instance.
            flow_run_name (str): The prefect current running flow run name.
            task_run_id (str): The prefect task run id.
        Raises:
            Exception: if the task run is not in a Completed/Failed/Cancelled state.
        """
        async def get_task_run(task_run_id: UUID) -> TaskRun:
            client = orchestration.get_client()
            if not hasattr(client, "read_task_run"):
                raise ValueError("Client does not support async read_task_run method")
            response = client.read_task_run(task_run_id=task_run_id)
            if asyncio.iscoroutine(response):
                response = await response
            return TaskRun.parse_obj(response)
        task_run: TaskRun = asyncio.run(get_task_run(task_run_id))
        assert task_run
        if self.platform_instance is not None:
            dpi_id = f"{self.platform_instance}.{flow_run_name}.{task_run.name}"
        else:
            dpi_id = f"{flow_run_name}.{task_run.name}"
        dpi = DataProcessInstance.from_datajob(
            datajob=datajob,
            id=dpi_id,
            clone_inlets=True,
            clone_outlets=True,
        )
        dpi_property_bag: Dict[str, str] = {}
        allowed_task_run_keys = [
            ID,
            FLOW_RUN_ID,
            CREATED,
            UPDATED,
            ESTIMATED_RUN_TIME,
            START_TIME,
            END_TIME,
            TOTAL_RUN_TIME,
            NEXT_SCHEDULED_START_TIME,
            TAGS,
            RUN_COUNT,
        ]
        for key in allowed_task_run_keys:
            if hasattr(task_run, key) and getattr(task_run, key) is not None:
                dpi_property_bag[key] = str(getattr(task_run, key))
        dpi.properties.update(dpi_property_bag)
        # DataHub has no CANCELLED result, so Cancelled maps to SKIPPED.
        state_result_map: Dict[str, InstanceRunResult] = {
            COMPLETE: InstanceRunResult.SUCCESS,
            FAILED: InstanceRunResult.FAILURE,
            CANCELLED: InstanceRunResult.SKIPPED,
        }
        if task_run.state_name not in state_result_map:
            raise Exception(
                f"State should be either complete, failed or cancelled and it was "
                f"{task_run.state_name}"
            )
        result = state_result_map[task_run.state_name]
        if task_run.start_time is not None:
            dpi.emit_process_start(
                emitter=self.emitter,
                start_timestamp_millis=int(task_run.start_time.timestamp() * 1000),
                emit_template=False,
            )
        if task_run.end_time is not None:
            dpi.emit_process_end(
                emitter=self.emitter,
                end_timestamp_millis=int(task_run.end_time.timestamp() * 1000),
                result=result,
                result_type=ORCHESTRATOR,
            )
    def add_task(
        self,
        inputs: Optional[List[_Entity]] = None,
        outputs: Optional[List[_Entity]] = None,
    ) -> None:
        """
        Store prefect current running task metadata temporarily which later get emit
        to datahub rest only if user calls emit_flow. Prefect task gets mapped with
        datahub datajob entity. Assign provided inputs and outputs as datajob inlets
        and outlets respectively.
        Args:
            inputs (Optional[list]): The list of task inputs.
            outputs (Optional[list]): The list of task outputs.
        Example:
            Emit the task metadata as show below:
            ```python
            from prefect import flow, task
            from prefect_datahub.dataset import Dataset
            from prefect_datahub.datahub_emitter import DatahubEmitter
            datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME")
            @task(name="Transform", description="Transform the data")
            def transform(data):
                data = data.split(" ")
                datahub_emitter.add_task(
                    inputs=[Dataset("snowflake", "mydb.schema.tableA")],
                    outputs=[Dataset("snowflake", "mydb.schema.tableC")],
                )
                return data
            @flow(name="ETL flow", description="Extract transform load flow")
            def etl():
                data = transform("This is data")
                datahub_emitter.emit_flow()
            ```
        """
        try:
            # Must be called from inside a running task: both contexts are
            # required to build the datajob.
            flow_run_ctx = FlowRunContext.get()
            task_run_ctx = TaskRunContext.get()
            assert flow_run_ctx
            assert task_run_ctx
            datajob = self._generate_datajob(
                flow_run_ctx=flow_run_ctx, task_run_ctx=task_run_ctx
            )
            if datajob is not None:
                if inputs is not None:
                    datajob.inlets.extend(self._entities_to_urn_list(inputs))
                if outputs is not None:
                    datajob.outlets.extend(self._entities_to_urn_list(outputs))
                # Cached by URN; emitted later from _emit_tasks().
                self._datajobs_to_emit[str(datajob.urn)] = cast(_Entity, datajob)
        except Exception:
            get_run_logger().debug(traceback.format_exc())
    def emit_flow(self) -> None:
        """
        Emit prefect current running flow metadata to datahub rest. Prefect flow gets
        mapped with datahub dataflow entity. If the user hasn't called add_task in
        the task function still emit_flow will emit a task but without task name,
        description,tags and properties.
        Example:
            Emit the flow metadata as show below:
            ```python
            from prefect import flow, task
            from prefect_datahub.datahub_emitter import DatahubEmitter
            datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME")
            @flow(name="ETL flow", description="Extract transform load flow")
            def etl():
                data = extract()
                data = transform(data)
                load(data)
                datahub_emitter.emit_flow()
            ```
        """
        try:
            flow_run_ctx = FlowRunContext.get()
            assert flow_run_ctx
            assert flow_run_ctx.flow_run
            # Workspace is only resolvable on Prefect Cloud; None elsewhere.
            workspace_name = self._get_workspace()
            # Emit flow and flow run
            get_run_logger().info("Emitting flow to datahub...")
            dataflow = self._generate_dataflow(flow_run_ctx=flow_run_ctx)
            if dataflow is not None:
                dataflow.emit(self.emitter)
                if workspace_name is not None:
                    self._emit_browsepath(str(dataflow.urn), workspace_name)
                self._emit_flow_run(dataflow, flow_run_ctx.flow_run.id)
                self._emit_tasks(flow_run_ctx, dataflow, workspace_name)
        except Exception:
            get_run_logger().debug(traceback.format_exc())

View File

@ -0,0 +1,46 @@
from abc import abstractmethod
from typing import Optional
import attr
import datahub.emitter.mce_builder as builder
from datahub.utilities.urns.urn import guess_entity_type
class _Entity:
@property
@abstractmethod
def urn(self) -> str:
pass
@attr.s(auto_attribs=True, str=True)
class Dataset(_Entity):
    """A DataHub dataset reference usable as a task input or output."""

    platform: str
    name: str
    env: str = builder.DEFAULT_ENV
    platform_instance: Optional[str] = None

    @property
    def urn(self):
        # Delegate URN construction to the shared datahub builder helper.
        dataset_urn = builder.make_dataset_urn_with_platform_instance(
            platform=self.platform,
            name=self.name,
            platform_instance=self.platform_instance,
            env=self.env,
        )
        return dataset_urn
@attr.s(str=True)
class Urn(_Entity):
    # A pre-built DataHub URN string, validated on construction.
    _urn: str = attr.ib()
    @_urn.validator
    def _validate_urn(self, attribute, value):
        # Only dataset URNs are accepted as datajob inputs/outputs.
        if not value.startswith("urn:"):
            raise ValueError("invalid urn provided: urns must start with 'urn:'")
        if guess_entity_type(value) != "dataset":
            raise ValueError("Datajob input/output currently only supports datasets")
    @property
    def urn(self):
        return self._urn

View File

@ -0,0 +1,52 @@
from typing import List, Tuple
from prefect import flow, task
from prefect_datahub.datahub_emitter import DatahubEmitter
from prefect_datahub.entities import Dataset
# Load the pre-registered emitter block; requires the "datahub-emitter-test"
# block to exist (see the save_datahub_emitter script).
datahub_emitter_block = DatahubEmitter.load("datahub-emitter-test")
@task(name="Extract", description="Extract the data")
def extract() -> str:
    """Produce the raw input string for the example ETL pipeline."""
    return "This is data"
@task(name="Transform", description="Transform the data")
def transform(
    data: str, datahub_emitter: DatahubEmitter
) -> Tuple[List[str], DatahubEmitter]:
    """Split the input into tokens and record dataset lineage on the emitter."""
    tokens = data.split(" ")
    input_datasets = [
        Dataset(
            platform="snowflake",
            name="mydb.schema.tableA",
            env=datahub_emitter.env,
            platform_instance=datahub_emitter.platform_instance,
        )
    ]
    output_datasets = [
        Dataset(
            platform="snowflake",
            name="mydb.schema.tableB",
            env=datahub_emitter.env,
            platform_instance=datahub_emitter.platform_instance,
        )
    ]
    # Buffer the lineage; it is flushed when emit_flow() runs in the flow.
    datahub_emitter.add_task(inputs=input_datasets, outputs=output_datasets)
    return tokens, datahub_emitter
@flow(name="ETL", description="Extract transform load flow")
def etl() -> None:
    """Run extract then transform, then emit collected lineage to DataHub."""
    data = extract()
    # transform() hands the emitter back so emit_flow() is called on the same
    # instance that buffered the task lineage.
    return_value = transform(data, datahub_emitter_block)  # type: ignore
    return_value[1].emit_flow()


etl()

View File

@ -0,0 +1,9 @@
from prefect_datahub.datahub_emitter import DatahubEmitter
# Create and register the emitter block under the name the example flow loads.
emitter_block = DatahubEmitter(
    datahub_rest_url="http://localhost:8080",
    env="DEV",
    platform_instance="local_prefect",
    # Generate an auth token in DataHub and supply it here when the GMS
    # endpoint is secured; None works for an unauthenticated local instance.
    token=None,
)
emitter_block.save("datahub-emitter-test")  # type: ignore

View File

@ -0,0 +1,2 @@
def test_dummy():
    """Placeholder so pytest always collects at least one passing test."""
    return None

View File

@ -0,0 +1,31 @@
import re
from typing import Type
import pytest
from prefect.blocks.core import Block
from prefect_datahub.datahub_emitter import DatahubEmitter
@pytest.mark.parametrize("block", [DatahubEmitter])
class TestAllBlocksAdhereToStandards:
    # Re-expose the parametrized "block" argument as a fixture so every test
    # method receives the Block subclass under test by the same name.
    @pytest.fixture
    def block(self, block):
        return block
    def test_has_a_description(self, block: Type[Block]) -> None:
        # Every published block must ship a non-empty description.
        assert block.get_description()
    def test_has_a_valid_code_example(self, block: Type[Block]) -> None:
        # The code example must show both how to import and how to load the
        # block, since Prefect renders it verbatim in the block catalog UI.
        code_example = block.get_code_example()
        assert code_example is not None, f"{block.__name__} is missing a code example"
        import_pattern = rf"from .* import {block.__name__}"
        assert re.search(import_pattern, code_example) is not None, (
            f"The code example for {block.__name__} is missing an import statement"
            f" matching the pattern {import_pattern}"
        )
        block_load_pattern = rf'.* = {block.__name__}\.load\("BLOCK_NAME"\)'
        assert re.search(block_load_pattern, code_example), (
            f"The code example for {block.__name__} is missing a .load statement"
            f" matching the pattern {block_load_pattern}"
        )

View File

@ -0,0 +1,824 @@
import asyncio
import json
import logging
from typing import Dict, List, Optional, cast
from unittest.mock import MagicMock, Mock, patch
from uuid import UUID
import pytest
from datahub.api.entities.datajob import DataJob
from datahub.utilities.urns.dataset_urn import DatasetUrn
from prefect.client.schemas import FlowRun, TaskRun, Workspace
from prefect.futures import PrefectFuture
from prefect.server.schemas.core import Flow
from prefect.task_runners import SequentialTaskRunner
from requests.models import Response
from prefect_datahub.datahub_emitter import DatahubEmitter
from prefect_datahub.entities import Dataset, _Entity
# Canned API payloads mirroring a real Prefect run of the example ETL flow
# (extract -> transform -> load). IDs cross-reference each other, so tests can
# resolve the task graph exactly as the emitter does against a live server.
# Task definition metadata for the "transform" task.
mock_transform_task_json: Dict = {
    "name": "transform",
    "description": "Transform the actual data",
    "task_key": "__main__.transform",
    "tags": ["etl flow task"],
}
# Completed task run for "extract" (no upstream dependencies).
mock_extract_task_run_json: Dict = {
    "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
    "created": "2023-06-06T05:51:54.822707+00:00",
    "updated": "2023-06-06T05:51:55.126000+00:00",
    "name": "Extract-0",
    "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "task_key": "__main__.extract",
    "dynamic_key": "0",
    "cache_key": None,
    "cache_expiration": None,
    "task_version": None,
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "retry_jitter_factor": None,
    },
    "tags": [],
    "state_id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
    "task_inputs": {},
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "flow_run_run_count": 1,
    "expected_start_time": "2023-06-06T05:51:54.822183+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:55.016264+00:00",
    "end_time": "2023-06-06T05:51:55.096534+00:00",
    "total_run_time": 0.08027,
    "estimated_run_time": 0.08027,
    "estimated_start_time_delta": 0.194081,
    "state": {
        "id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.096534+00:00",
        "message": None,
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": False,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}
# Completed task run for "transform" (depends on the extract run above).
mock_transform_task_run_json: Dict = {
    "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
    "created": "2023-06-06T05:51:55.160372+00:00",
    "updated": "2023-06-06T05:51:55.358000+00:00",
    "name": "transform-0",
    "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "task_key": "__main__.transform",
    "dynamic_key": "0",
    "cache_key": None,
    "cache_expiration": None,
    "task_version": None,
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "retry_jitter_factor": None,
    },
    "tags": [],
    "state_id": "971ad82e-6e5f-4691-abab-c900358e96c2",
    "task_inputs": {
        "actual_data": [
            {"input_type": "task_run", "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b"}
        ]
    },
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "flow_run_run_count": 1,
    "expected_start_time": "2023-06-06T05:51:55.159416+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:55.243159+00:00",
    "end_time": "2023-06-06T05:51:55.332950+00:00",
    "total_run_time": 0.089791,
    "estimated_run_time": 0.089791,
    "estimated_start_time_delta": 0.083743,
    "state": {
        "id": "971ad82e-6e5f-4691-abab-c900358e96c2",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.332950+00:00",
        "message": None,
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": False,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}
# Completed task run for "load" (depends on the transform run above).
mock_load_task_run_json: Dict = {
    "id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
    "created": "2023-06-06T05:51:55.389823+00:00",
    "updated": "2023-06-06T05:51:55.566000+00:00",
    "name": "Load_task-0",
    "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "task_key": "__main__.load",
    "dynamic_key": "0",
    "cache_key": None,
    "cache_expiration": None,
    "task_version": None,
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "retry_jitter_factor": None,
    },
    "tags": [],
    "state_id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
    "task_inputs": {
        "data": [
            {"input_type": "task_run", "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7"}
        ]
    },
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "flow_run_run_count": 1,
    "expected_start_time": "2023-06-06T05:51:55.389075+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:55.461812+00:00",
    "end_time": "2023-06-06T05:51:55.535954+00:00",
    "total_run_time": 0.074142,
    "estimated_run_time": 0.074142,
    "estimated_start_time_delta": 0.072737,
    "state": {
        "id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.535954+00:00",
        "message": None,
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": True,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}
# Flow definition metadata for the example "etl" flow.
mock_flow_json: Dict = {
    "id": "cc65498f-d950-4114-8cc1-7af9e8fdf91b",
    "created": "2023-06-02T12:31:10.988697+00:00",
    "updated": "2023-06-02T12:31:10.988710+00:00",
    "name": "etl",
    "description": "Extract transform load flow",
    "tags": [],
}
# Completed flow run containing the three task runs above.
mock_flow_run_json: Dict = {
    "id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
    "created": "2023-06-06T05:51:54.544266+00:00",
    "updated": "2023-06-06T05:51:55.622000+00:00",
    "name": "olivine-beagle",
    "flow_id": "cc65498f-d950-4114-8cc1-7af9e8fdf91b",
    "state_id": "ca2db325-d98f-40e7-862e-449cd0cc9a6e",
    "deployment_id": None,
    "work_queue_name": None,
    "flow_version": "3ba54dfa31a7c9af4161aa4cd020a527",
    "parameters": {},
    "idempotency_key": None,
    "context": {},
    "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0.0,
        "retries": 0,
        "retry_delay": 0,
        "pause_keys": [],
        "resuming": False,
    },
    "tags": [],
    "parent_task_run_id": None,
    "state_type": "COMPLETED",
    "state_name": "Completed",
    "run_count": 1,
    "expected_start_time": "2023-06-06T05:51:54.543357+00:00",
    "next_scheduled_start_time": None,
    "start_time": "2023-06-06T05:51:54.750523+00:00",
    "end_time": "2023-06-06T05:51:55.596446+00:00",
    "total_run_time": 0.845923,
    "estimated_run_time": 0.845923,
    "estimated_start_time_delta": 0.207166,
    "auto_scheduled": False,
    "infrastructure_document_id": None,
    "infrastructure_pid": None,
    "created_by": None,
    "work_pool_name": None,
    "state": {
        "id": "ca2db325-d98f-40e7-862e-449cd0cc9a6e",
        "type": "COMPLETED",
        "name": "Completed",
        "timestamp": "2023-06-06T05:51:55.596446+00:00",
        "message": "All states completed.",
        "data": {"type": "unpersisted"},
        "state_details": {
            "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
            "task_run_id": None,
            "child_flow_run_id": None,
            "scheduled_time": None,
            "cache_key": None,
            "cache_expiration": None,
            "untrackable_result": False,
            "pause_timeout": None,
            "pause_reschedule": False,
            "pause_key": None,
            "refresh_cache": None,
        },
    },
}
# Response of the /flow_runs/{id}/graph endpoint: one node per task run with
# its upstream_dependencies, as consumed by DatahubEmitter._emit_tasks.
mock_graph_json: List[Dict] = [
    {
        "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
        "name": "Extract-0",
        "upstream_dependencies": [],
        "state": {
            "id": "e280decd-2cc8-4428-a70f-149bcaf95b3c",
            "type": "COMPLETED",
            "name": "Completed",
            "timestamp": "2023-06-06T05:51:55.096534+00:00",
            "message": None,
            "data": {"type": "unpersisted"},
            "state_details": {
                "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
                "task_run_id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b",
                "child_flow_run_id": None,
                "scheduled_time": None,
                "cache_key": None,
                "cache_expiration": None,
                "untrackable_result": False,
                "pause_timeout": None,
                "pause_reschedule": False,
                "pause_key": None,
                "refresh_cache": None,
            },
        },
        "expected_start_time": "2023-06-06T05:51:54.822183+00:00",
        "start_time": "2023-06-06T05:51:55.016264+00:00",
        "end_time": "2023-06-06T05:51:55.096534+00:00",
        "total_run_time": 0.08027,
        "estimated_run_time": 0.08027,
        "untrackable_result": False,
    },
    {
        "id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
        "name": "Load_task-0",
        "upstream_dependencies": [
            {"input_type": "task_run", "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7"}
        ],
        "state": {
            "id": "0cad13c8-84e4-4bcf-8616-c5904e10dcb4",
            "type": "COMPLETED",
            "name": "Completed",
            "timestamp": "2023-06-06T05:51:55.535954+00:00",
            "message": None,
            "data": {"type": "unpersisted"},
            "state_details": {
                "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
                "task_run_id": "f19f83ea-316f-4781-8cbe-1d5d8719afc3",
                "child_flow_run_id": None,
                "scheduled_time": None,
                "cache_key": None,
                "cache_expiration": None,
                "untrackable_result": True,
                "pause_timeout": None,
                "pause_reschedule": False,
                "pause_key": None,
                "refresh_cache": None,
            },
        },
        "expected_start_time": "2023-06-06T05:51:55.389075+00:00",
        "start_time": "2023-06-06T05:51:55.461812+00:00",
        "end_time": "2023-06-06T05:51:55.535954+00:00",
        "total_run_time": 0.074142,
        "estimated_run_time": 0.074142,
        "untrackable_result": True,
    },
    {
        "id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
        "name": "transform-0",
        "upstream_dependencies": [
            {"input_type": "task_run", "id": "fa14a52b-d271-4c41-99cb-6b42ca7c070b"}
        ],
        "state": {
            "id": "971ad82e-6e5f-4691-abab-c900358e96c2",
            "type": "COMPLETED",
            "name": "Completed",
            "timestamp": "2023-06-06T05:51:55.332950+00:00",
            "message": None,
            "data": {"type": "unpersisted"},
            "state_details": {
                "flow_run_id": "c3b947e5-3fa1-4b46-a2e2-58d50c938f2e",
                "task_run_id": "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7",
                "child_flow_run_id": None,
                "scheduled_time": None,
                "cache_key": None,
                "cache_expiration": None,
                "untrackable_result": False,
                "pause_timeout": None,
                "pause_reschedule": False,
                "pause_key": None,
                "refresh_cache": None,
            },
        },
        "expected_start_time": "2023-06-06T05:51:55.159416+00:00",
        "start_time": "2023-06-06T05:51:55.243159+00:00",
        "end_time": "2023-06-06T05:51:55.332950+00:00",
        "total_run_time": 0.089791,
        "estimated_run_time": 0.089791,
        "untrackable_result": False,
    },
]
# Prefect Cloud workspace payload matched by DatahubEmitter._get_workspace.
mock_workspace_json: Dict = {
    "account_id": "33e98cfe-ad06-4ceb-a500-c11148499f75",
    "account_name": "shubhamjagtapgslabcom",
    "account_handle": "shubhamjagtapgslabcom",
    "workspace_id": "157eb822-1b3b-4338-ae80-98edd5d00cb9",
    "workspace_name": "datahub",
    "workspace_description": "",
    "workspace_handle": "datahub",
}
async def mock_task_run_future():
    """Build the three PrefectFutures (extract, transform, load) exposed by the
    fake flow-run context, each carrying its canned TaskRun payload."""
    future_specs = [
        (mock_extract_task_run_json, "4552629a-ac04-4590-b286-27642292739f"),
        (mock_transform_task_run_json, "40fff3e5-5ef4-4b8b-9cc8-786f91bcc656"),
        (mock_load_task_run_json, "7565f596-9eb0-4330-ba34-963e7839883e"),
    ]
    futures = []
    for task_run_json, future_key in future_specs:
        prefect_future: PrefectFuture = PrefectFuture(
            name=task_run_json["name"],
            key=UUID(future_key),
            task_runner=SequentialTaskRunner(),
        )
        # cast(None, ...) mirrors the original hack to attach a parsed TaskRun
        # despite the attribute's declared type.
        prefect_future.task_run = cast(
            None, TaskRun.parse_obj(task_run_json)
        )
        futures.append(prefect_future)
    return futures
@pytest.fixture(scope="module")
def mock_run_logger():
    # Route get_run_logger() to the root logger so emitter code can log
    # without a real Prefect run context.
    with patch(
        "prefect_datahub.datahub_emitter.get_run_logger",
        return_value=logging.getLogger(),
    ) as mock_logger:
        yield mock_logger
@pytest.fixture(scope="module")
def mock_run_context(mock_run_logger):
    # Fake TaskRunContext: only the attributes _generate_datajob reads.
    task_run_ctx = MagicMock()
    task_run_ctx.task.task_key = mock_transform_task_json["task_key"]
    task_run_ctx.task.name = mock_transform_task_json["name"]
    task_run_ctx.task.description = mock_transform_task_json["description"]
    task_run_ctx.task.tags = mock_transform_task_json["tags"]
    # Fake FlowRunContext mirroring the canned flow / flow-run payloads.
    flow_run_ctx = MagicMock()
    flow_run_ctx.flow.name = mock_flow_json["name"]
    flow_run_ctx.flow.description = mock_flow_json["description"]
    flow_run_obj = FlowRun.parse_obj(mock_flow_run_json)
    flow_run_ctx.flow_run.id = flow_run_obj.id
    flow_run_ctx.flow_run.name = flow_run_obj.name
    flow_run_ctx.flow_run.flow_id = flow_run_obj.flow_id
    flow_run_ctx.flow_run.start_time = flow_run_obj.start_time
    flow_run_ctx.task_run_futures = asyncio.run(mock_task_run_future())
    # Patch the context classes so DatahubEmitter sees the fakes via .get().
    with patch(
        "prefect_datahub.datahub_emitter.TaskRunContext"
    ) as mock_task_run_ctx, patch(
        "prefect_datahub.datahub_emitter.FlowRunContext"
    ) as mock_flow_run_ctx:
        mock_task_run_ctx.get.return_value = task_run_ctx
        mock_flow_run_ctx.get.return_value = flow_run_ctx
        yield (task_run_ctx, flow_run_ctx)
async def mock_task_run(*args, **kwargs):
    """Fake PrefectClient.read_task_run: map known task-run ids to payloads."""
    payloads = {
        "fa14a52b-d271-4c41-99cb-6b42ca7c070b": mock_extract_task_run_json,
        "dd15ee83-5d28-4bf1-804f-f84eab9f9fb7": mock_transform_task_run_json,
        "f19f83ea-316f-4781-8cbe-1d5d8719afc3": mock_load_task_run_json,
    }
    payload = payloads.get(str(kwargs["task_run_id"]))
    # Unknown ids yield None, matching the original if/elif fallthrough.
    return TaskRun.parse_obj(payload) if payload is not None else None
async def mock_flow(*args, **kwargs):
    """Fake PrefectClient.read_flow: always return the canned flow object."""
    parsed_flow = Flow.parse_obj(mock_flow_json)
    return parsed_flow
async def mock_flow_run(*args, **kwargs):
    """Fake PrefectClient.read_flow_run: always return the canned flow run."""
    parsed_flow_run = FlowRun.parse_obj(mock_flow_run_json)
    return parsed_flow_run
async def mock_flow_run_graph(*args, **kwargs):
    """Fake the flow-run /graph API call: a 200 Response wrapping the canned graph."""
    body = json.dumps(mock_graph_json, separators=(",", ":")).encode("utf-8")
    response = Response()
    response.status_code = 200
    response._content = body
    return response
async def mock_api_healthcheck(*args, **kwargs):
    # A healthy cloud API healthcheck resolves to None; mimic success.
    return None
async def mock_read_workspaces(*args, **kwargs):
    # Return the single canned workspace, as the cloud client would.
    return [Workspace.parse_obj(mock_workspace_json)]
@pytest.fixture(scope="module")
def mock_prefect_client():
    """Patch the orchestration module so get_client() returns a canned client."""
    fake_client = MagicMock()
    fake_client.read_flow.side_effect = mock_flow
    fake_client.read_flow_run.side_effect = mock_flow_run
    fake_client.read_task_run.side_effect = mock_task_run
    # The emitter fetches the run graph through the raw HTTP client.
    fake_client._client.get.side_effect = mock_flow_run_graph
    with patch("prefect_datahub.datahub_emitter.orchestration") as orchestration_mock:
        orchestration_mock.get_client.return_value = fake_client
        yield fake_client
@pytest.fixture(scope="module")
def mock_prefect_cloud_client():
    """Patch the cloud module (and PREFECT_API_URL) to return a canned client."""
    fake_cloud_client = MagicMock()
    fake_cloud_client.api_healthcheck.side_effect = mock_api_healthcheck
    fake_cloud_client.read_workspaces.side_effect = mock_read_workspaces
    workspace_api_url = (
        "https://api.prefect.cloud/api/accounts/33e98cfe-ad06-4ceb-"
        "a500-c11148499f75/workspaces/157eb822-1b3b-4338-ae80-98edd5d00cb9"
    )
    with patch("prefect_datahub.datahub_emitter.cloud") as cloud_mock, patch(
        "prefect_datahub.datahub_emitter.PREFECT_API_URL.value",
        return_value=workspace_api_url,
    ):
        cloud_mock.get_cloud_client.return_value = fake_cloud_client
        yield fake_cloud_client
@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_entities_to_urn_list(mock_emit):
    """_entities_to_urn_list should turn Dataset entities into DatasetUrn objects."""
    urns = DatahubEmitter()._entities_to_urn_list(
        [Dataset("snowflake", "mydb.schema.tableA")]
    )
    assert all(isinstance(urn, DatasetUrn) for urn in urns)
@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_get_flow_run_graph(mock_emit, mock_prefect_client):
    """_get_flow_run_graph should return the parsed run graph as a list."""
    emitter = DatahubEmitter()
    graph_json = asyncio.run(
        emitter._get_flow_run_graph("c3b947e5-3fa1-4b46-a2e2-58d50c938f2e")
    )
    assert isinstance(graph_json, list)
@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test__get_workspace(mock_emit, mock_prefect_cloud_client):
    """_get_workspace should resolve the workspace name via the cloud client."""
    emitter = DatahubEmitter()
    assert emitter._get_workspace() == "datahub"
@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_add_task(mock_emit, mock_run_context):
    """add_task should register a DataJob locally without emitting anything.

    Verifies the datajob URN, its flow URN, metadata copied from the task
    context, and the inlet/outlet dataset URNs.
    """
    mock_emitter = Mock()
    mock_emit.return_value = mock_emitter
    datahub_emitter = DatahubEmitter()
    inputs: Optional[List[_Entity]] = [Dataset("snowflake", "mydb.schema.tableA")]
    outputs: Optional[List[_Entity]] = [Dataset("snowflake", "mydb.schema.tableC")]
    datahub_emitter.add_task(
        inputs=inputs,
        outputs=outputs,
    )
    task_run_ctx = mock_run_context[0]
    flow_run_ctx = mock_run_context[1]
    expected_datajob_urn = (
        f"urn:li:dataJob:(urn:li:dataFlow:"
        f"(prefect,{flow_run_ctx.flow.name},PROD),{task_run_ctx.task.task_key})"
    )
    assert expected_datajob_urn in datahub_emitter._datajobs_to_emit.keys()
    actual_datajob = datahub_emitter._datajobs_to_emit[expected_datajob_urn]
    assert isinstance(actual_datajob, DataJob)
    assert str(actual_datajob.flow_urn) == "urn:li:dataFlow:(prefect,etl,PROD)"
    assert actual_datajob.name == task_run_ctx.task.name
    assert actual_datajob.description == task_run_ctx.task.description
    assert actual_datajob.tags == task_run_ctx.task.tags
    assert (
        str(actual_datajob.inlets[0])
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)"
    )
    assert (
        str(actual_datajob.outlets[0])
        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
    )
    # Bug fix: the original checked mock_emit.emit (the patched *class* mock),
    # which is trivially never called. The emitter instance is mock_emitter;
    # assert on it so "add_task must not emit" is actually verified.
    assert mock_emitter.emit.call_count == 0
@patch("prefect_datahub.datahub_emitter.DatahubRestEmitter", autospec=True)
def test_emit_flow(
    mock_emit, mock_run_context, mock_prefect_client, mock_prefect_cloud_client
):
    """End-to-end check of emit_flow().

    Verifies the exact sequence of metadata proposals emitted for the DataFlow,
    each DataJob (extract, load, transform) and their DataProcessInstance run
    events, by inspecting mock_emitter.method_calls in order.

    Fixes a defect in the original: expected_dataflow_urn was assigned twice
    with the identical expression.
    """
    mock_emitter = Mock()
    mock_emit.return_value = mock_emitter

    platform_instance = "datahub_workspace"
    datahub_emitter = DatahubEmitter(platform_instance=platform_instance)
    datahub_emitter.add_task()
    datahub_emitter.emit_flow()

    task_run_ctx = mock_run_context[0]
    flow_run_ctx = mock_run_context[1]

    expected_dataflow_urn = (
        f"urn:li:dataFlow:(prefect,{platform_instance}.{flow_run_ctx.flow.name},PROD)"
    )

    def assert_emitted(start: int, entity_urn: str, aspect_names: List[str]) -> None:
        # Each method_calls entry is (name, args, kwargs); args[0] is the MCP.
        for offset, aspect_name in enumerate(aspect_names):
            mcp = mock_emitter.method_calls[start + offset][1][0]
            assert mcp.aspectName == aspect_name
            assert mcp.entityUrn == entity_urn

    # Aspect sequences emitted per entity kind.
    job_aspects = [
        "dataJobInfo",
        "status",
        "dataJobInputOutput",
        "ownership",
        "globalTags",
        "browsePaths",
    ]
    dpi_aspects = [
        "dataProcessInstanceProperties",
        "dataProcessInstanceRelationships",
        "dataProcessInstanceRunEvent",
        "dataProcessInstanceRunEvent",
    ]

    # Ignore the first call (index 0) which is a connection call.
    # DataFlow aspects (indices 1-5).
    assert_emitted(
        1,
        expected_dataflow_urn,
        ["dataFlowInfo", "status", "ownership", "globalTags", "browsePaths"],
    )

    # DataProcessInstance for the flow run (indices 10-12: single run event).
    assert_emitted(
        10,
        "urn:li:dataProcessInstance:56231547bcc2781e0c14182ceab6c9ac",
        dpi_aspects[:3],
    )

    # extract task: DataJob (13-18) + DataProcessInstance (19-22).
    assert_emitted(
        13, f"urn:li:dataJob:({expected_dataflow_urn},__main__.extract)", job_aspects
    )
    assert_emitted(
        19, "urn:li:dataProcessInstance:b048ba729c1403f229a0760f8765d691", dpi_aspects
    )

    # load task: DataJob (23-28) + DataProcessInstance (29-32).
    assert_emitted(
        23, f"urn:li:dataJob:({expected_dataflow_urn},__main__.load)", job_aspects
    )
    assert_emitted(
        29, "urn:li:dataProcessInstance:e7df9fe09bb4da19687b8199e5ee5038", dpi_aspects
    )

    # transform task: DataJob (33-38) + DataProcessInstance (39-42).
    assert_emitted(
        33, f"urn:li:dataJob:({expected_dataflow_urn},__main__.transform)", job_aspects
    )
    # Only the transform task carries tags; check the emitted globalTags value
    # (index 37 == 33 + offset of "globalTags" in job_aspects).
    assert (
        mock_emitter.method_calls[37][1][0].aspect.tags[0].tag
        == f"urn:li:tag:{task_run_ctx.task.tags[0]}"
    )
    assert_emitted(
        39, "urn:li:dataProcessInstance:bfa255d4d1fba52d23a52c9de4f6d0a6", dpi_aspects
    )

View File

@ -69,6 +69,16 @@ source venv/bin/activate
datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
```
### (Optional) Set up your Python environment for developing on Prefect Plugin
From the repository root:
```shell
cd metadata-ingestion-modules/prefect-plugin
../../gradlew :metadata-ingestion-modules:prefect-plugin:installDev
source venv/bin/activate
datahub version # should print "DataHub CLI version: unavailable (installed in develop mode)"
```
### (Optional) Set up your Python environment for developing on GX Plugin
From the repository root:
@ -276,4 +286,4 @@ tox -- --update-golden-files
# Update golden files for a specific environment.
tox -e py310-airflow26 -- --update-golden-files
```
```

View File

@ -267,6 +267,16 @@
"logoUrl": "/assets/platforms/prefectlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:prefect",
"aspect": {
"datasetNameDelimiter": ".",
"name": "prefect",
"displayName": "Prefect",
"type": "OTHERS",
"logoUrl": "/assets/platforms/prefectlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:presto",
"aspect": {

View File

@ -63,6 +63,7 @@ include 'ingestion-scheduler'
include 'metadata-ingestion-modules:airflow-plugin'
include 'metadata-ingestion-modules:gx-plugin'
include 'metadata-ingestion-modules:dagster-plugin'
include 'metadata-ingestion-modules:prefect-plugin'
include 'smoke-test'
include 'metadata-auth:auth-api'
include 'metadata-service:schema-registry-api'