Fix #4235 - Run data profiler workflows from Airflow REST (#4325)

Pere Miquel Brull 2022-04-21 17:53:29 +02:00 committed by GitHub
parent e19bb85202
commit 2444d3de3d
44 changed files with 507 additions and 109 deletions

View File

@ -29,6 +29,11 @@
"description": "Supports Usage Extraction.",
"type": "boolean",
"default": true
},
"supportsProfiler": {
"description": "Supports Profiler",
"type": "boolean",
"default": true
}
}
}
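For illustration, a service connection that exercises the new flag could look like this sketch (a Python dict mirroring the JSON; the Mysql fields are borrowed from the test data at the end of this diff, and supportsProfiler may simply be omitted to pick up its default of true):

connection = {
    "type": "Mysql",
    "username": "openmetadata_user",
    "password": "openmetadata_password",
    "hostPort": "localhost:3306",
    "supportsProfiler": True,  # new capability flag, defaults to true
}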

View File

@ -57,6 +57,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false

View File

@ -60,6 +60,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -87,6 +87,9 @@
},
"supportsUsageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -62,6 +62,9 @@
},
"supportsUsageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -60,6 +60,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -55,6 +55,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -39,6 +39,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false

View File

@ -55,6 +55,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -53,6 +53,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -61,6 +61,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -59,6 +59,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -55,6 +55,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -59,6 +59,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -55,6 +55,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -59,6 +59,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -55,6 +55,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -59,6 +59,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -55,6 +55,12 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsUsageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -59,6 +59,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -55,6 +55,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -70,6 +70,9 @@
},
"supportsUsageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -60,6 +60,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false

View File

@ -67,6 +67,9 @@
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -58,6 +58,9 @@
},
"supportsUsageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction"
},
"supportsProfiler": {
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,

View File

@ -9,7 +9,7 @@
"description": "Type of Pipeline - metadata, usage",
"type": "string",
"javaType": "org.openmetadata.catalog.entity.services.ingestionPipelines.PipelineType",
"enum": ["metadata", "usage"]
"enum": ["metadata", "usage", "profiler"]
},
"pipelineStatus": {
"type": "object",
@ -170,10 +170,6 @@
"description": "Indicates if the workflow has been successfully deployed to Airflow.",
"type": "boolean"
},
"nextExecutionDate": {
"description": "Next execution date from the underlying pipeline platform once the pipeline scheduled.",
"$ref": "../../../type/basic.json#/definitions/date"
},
"href": {
"description": "Link to this ingestion pipeline resource.",
"$ref": "../../../type/basic.json#/definitions/href"

View File

@ -3,7 +3,22 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DashboardServiceMetadataPipeline",
"description": "DashboardService Metadata Pipeline Configuration.",
"definitions": {
"dashboardMetadataConfigType": {
"description": "Dashboard Source Config Metadata Pipeline type",
"type": "string",
"enum": [
"DashboardMetadata"
],
"default": "DashboardMetadata"
}
},
"properties": {
"type": {
"description": "Pipeline type",
"$ref": "#/definitions/dashboardMetadataConfigType",
"default": "DashboardMetadata"
},
"dashboardFilterPattern": {
"description": "Regex to only fetch tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"

View File

@ -4,6 +4,14 @@
"title": "DatabaseServiceMetadataPipeline",
"description": "DatabaseService Metadata Pipeline Configuration.",
"definitions": {
"databaseMetadataConfigType": {
"description": "Database Source Config Metadata Pipeline type",
"type": "string",
"enum": [
"DatabaseMetadata"
],
"default": "DatabaseMetadata"
},
"localHttpDBTConfig": {
"description": "Local and HTTP DBT configs.",
"type": "object",
@ -22,6 +30,11 @@
}
},
"properties": {
"type": {
"description": "Pipeline type",
"$ref": "#/definitions/databaseMetadataConfigType",
"default": "DatabaseMetadata"
},
"markDeletedTables": {
"description": "Optional configuration to soft delete tables in OpenMetadata if the source tables are deleted.",
"type": "boolean",

View File

@ -0,0 +1,26 @@
{
"$id": "https://open-metadata.org/schema/entity/services/ingestionPipelines/databaseServiceProfilerPipeline.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DatabaseServiceProfilerPipeline",
"description": "DatabaseService Profiler Pipeline Configuration.",
"definitions": {
"profilerConfigType": {
"description": "Profiler Source Config Pipeline type",
"type": "string",
"enum": ["Profiler"],
"default": "Profiler"
}
},
"properties": {
"type": {
"description": "Pipeline type",
"$ref": "#/definitions/profilerConfigType",
"default": "Profiler"
},
"fqnFilterPattern": {
"description": "Regex to only fetch tables with FQN matching the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
}
},
"additionalProperties": false
}
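A sourceConfig that validates against this new schema stays small; a sketch built from the defaults above and the filter syntax used in the tests below:

source_config = {
    "type": "Profiler",  # profilerConfigType, also the default
    "fqnFilterPattern": {"includes": ["service.db.*"]},  # optional regex on table FQNs
}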

View File

@ -3,7 +3,22 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DatabaseServiceQueryUsagePipeline",
"description": "DatabaseService Query Usage Pipeline Configuration.",
"definitions": {
"databaseUsageConfigType": {
"description": "Database Source Config Usage Pipeline type",
"type": "string",
"enum": [
"DatabaseUsage"
],
"default": "DatabaseUsage"
}
},
"properties": {
"type": {
"description": "Pipeline type",
"$ref": "#/definitions/databaseUsageConfigType",
"default": "DatabaseUsage"
},
"queryLogDuration": {
"description": "Configuration to tune how far we want to look back in query logs to process usage data.",
"type": "integer",

View File

@ -3,8 +3,22 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MessagingServiceMetadataPipeline",
"description": "MessagingService Metadata Pipeline Configuration.",
"definitions": {},
"definitions": {
"messagingMetadataConfigType": {
"description": "Messaging Source Config Metadata Pipeline type",
"type": "string",
"enum": [
"MessagingMetadata"
],
"default": "MessagingMetadata"
}
},
"properties": {
"type": {
"description": "Pipeline type",
"$ref": "#/definitions/messagingMetadataConfigType",
"default": "MessagingMetadata"
},
"topicFilterPattern": {
"description": "Regex to only fetch topics that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"

View File

@ -23,6 +23,9 @@
},
{
"$ref": "messagingServiceMetadataPipeline.json"
},
{
"$ref": "databaseServiceProfilerPipeline.json"
}
]
}

View File

@ -23,27 +23,26 @@ import click
from pydantic import ValidationError
from metadata.config.common import WorkflowExecutionError
from metadata.config.workflow import get_ingestion_source, get_processor, get_sink
from metadata.config.workflow import get_processor, get_sink
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.ingestion.api.processor import Processor
from metadata.ingestion.api.sink import Sink
from metadata.ingestion.api.source import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.sql_source import SQLSource, SQLSourceStatus
from metadata.ingestion.source.sql_source import SQLSourceStatus
from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
from metadata.orm_profiler.utils import logger
from metadata.utils.engines import create_and_bind_session
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.engines import create_and_bind_session, get_engine, test_connection
from metadata.utils.filters import filter_by_fqn
logger = logger()
@ -54,7 +53,6 @@ class ProfilerWorkflow:
"""
config: OpenMetadataWorkflowConfig
source: Source
processor: Processor
sink: Sink
metadata: OpenMetadata
@ -65,20 +63,13 @@ class ProfilerWorkflow:
self.config.workflowConfig.openMetadataServerConfig
)
# We will use the existing sources to build the Engine
self.source: Source = get_ingestion_source(
source_type=self.config.source.type,
source_config=self.config.source,
metadata_config=self.metadata_config,
)
if not isinstance(self.source, SQLSource):
raise ValueError(
f"Invalid source type for {self.source}. We only support SQLSource in the Profiler"
)
# Prepare the connection to the source service
# We don't need the whole Source class, as the entities come from the OM server
engine = get_engine(self.config.source.serviceConnection.__root__.config)
test_connection(engine)
# Init and type the source config
self.source_config: DatabaseServiceMetadataPipeline = (
self.source_config: DatabaseServiceProfilerPipeline = (
self.config.source.sourceConfig.config
)
self.source_status = SQLSourceStatus()
@ -89,7 +80,7 @@ class ProfilerWorkflow:
metadata_config=self.metadata_config,
_from="orm_profiler",
# Pass the session as kwargs for the profiler
session=create_and_bind_session(self.source.engine),
session=create_and_bind_session(engine),
)
if self.config.sink:
@ -124,23 +115,12 @@ class ProfilerWorkflow:
"""
for table in tables:
# Validate the table FQN
if filter_by_schema(
schema_filter_pattern=self.source_config.schemaFilterPattern,
schema_name=table.databaseSchema.name,
if filter_by_fqn(
fqn_filter_pattern=self.source_config.fqnFilterPattern,
fqn=table.fullyQualifiedName.__root__,
):
self.source_status.filter(
table.databaseSchema.name, "Schema pattern not allowed"
)
continue
# Validate database
if filter_by_table(
table_filter_pattern=self.source_config.tableFilterPattern,
table_name=str(table.name.__root__),
):
self.source_status.filter(
table.name.__root__, "Table name pattern not allowed"
table.fullyQualifiedName.__root__, "Schema pattern not allowed"
)
continue
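In short, the workflow no longer instantiates a full SQLSource; it only needs an engine and a session. A straight-line sketch of the new setup, where workflow_config stands in for a parsed OpenMetadataWorkflowConfig and the helpers are the ones imported above:

from metadata.utils.engines import create_and_bind_session, get_engine, test_connection

engine = get_engine(workflow_config.source.serviceConnection.__root__.config)
test_connection(engine)  # fail fast if the source service is unreachable
session = create_and_bind_session(engine)  # handed to the orm-profiler processor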

View File

@ -10,7 +10,10 @@
# limitations under the License.
"""
Helper that implements table and filter pattern logic
Helper that implements table and filter pattern logic.
Most of these methods apply the same logic,
but assigning specific names makes the code
easier to follow.
"""
import re
from typing import List, Optional
@ -131,3 +134,16 @@ def filter_by_dashboard(
:return: True for filtering, False otherwise
"""
return _filter(dashboard_filter_pattern, dashboard_name)
def filter_by_fqn(fqn_filter_pattern: Optional[FilterPattern], fqn: str) -> bool:
"""
Return True if the table FQN needs to be filtered, False otherwise
Include takes precedence over exclude
:param fqn_filter_pattern: Model defining FQN filtering logic
:param fqn: table FQN name
:return: True for filtering, False otherwise
"""
return _filter(fqn_filter_pattern, fqn)
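Using the new helper mirrors the other filters; a quick sketch (the FilterPattern import path is assumed from the ../type/filterPattern.json schema layout):

from metadata.generated.schema.type.filterPattern import FilterPattern
from metadata.utils.filters import filter_by_fqn

pattern = FilterPattern(includes=["service.db.*"])
assert not filter_by_fqn(pattern, "service.db.schema.table")  # include matched: keep
assert filter_by_fqn(pattern, "other.db.schema.table")  # no include match: filter out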

View File

@ -24,8 +24,8 @@ from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
ColumnValuesToBeBetween,
@ -47,7 +47,7 @@ config = {
"type": "sqlite",
"serviceName": "my_service",
"serviceConnection": {"config": {"type": "SQLite"}},
"sourceConfig": {"config": {}},
"sourceConfig": {"config": {"type": "Profiler"}},
},
"processor": {"type": "orm-profiler", "config": {}},
"sink": {"type": "metadata-rest", "config": {}},
@ -66,7 +66,7 @@ def test_init_workflow():
"""
We can initialise the workflow from a config
"""
assert isinstance(workflow.source_config, DatabaseServiceMetadataPipeline)
assert isinstance(workflow.source_config, DatabaseServiceProfilerPipeline)
assert isinstance(workflow.metadata_config, OpenMetadataConnection)
assert isinstance(workflow.processor, OrmProfilerProcessor)
@ -116,44 +116,29 @@ def test_filter_entities():
assert len(list(workflow.filter_entities(all_tables))) == 3
# We can exclude based on the schema name
exclude_filter_schema_config = deepcopy(config)
exclude_filter_schema_config["source"]["sourceConfig"]["config"][
"schemaFilterPattern"
] = {"excludes": ["one_schema"]}
exclude_config = deepcopy(config)
exclude_config["source"]["sourceConfig"]["config"]["fqnFilterPattern"] = {
"excludes": ["service*"]
}
exclude_filter_schema_workflow = ProfilerWorkflow.create(
exclude_filter_schema_config
)
assert len(list(exclude_filter_schema_workflow.filter_entities(all_tables))) == 1
exclude_workflow = ProfilerWorkflow.create(exclude_config)
assert len(list(exclude_workflow.filter_entities(all_tables))) == 0
# We can include based on the schema name
include_filter_schema_config = deepcopy(config)
include_filter_schema_config["source"]["sourceConfig"]["config"][
"schemaFilterPattern"
] = {"includes": ["another_schema"]}
exclude_config = deepcopy(config)
exclude_config["source"]["sourceConfig"]["config"]["fqnFilterPattern"] = {
"excludes": ["service.db.another*"]
}
include_filter_schema_workflow = ProfilerWorkflow.create(
include_filter_schema_config
)
assert len(list(include_filter_schema_workflow.filter_entities(all_tables))) == 1
exclude_workflow = ProfilerWorkflow.create(exclude_config)
assert len(list(exclude_workflow.filter_entities(all_tables))) == 2
# We can exclude based on the table name
exclude_filter_table_config = deepcopy(config)
exclude_filter_table_config["source"]["sourceConfig"]["config"][
"tableFilterPattern"
] = {"excludes": ["tab*"]}
include_config = deepcopy(config)
include_config["source"]["sourceConfig"]["config"]["fqnFilterPattern"] = {
"includes": ["service*"]
}
exclude_filter_table_workflow = ProfilerWorkflow.create(exclude_filter_table_config)
assert len(list(exclude_filter_table_workflow.filter_entities(all_tables))) == 0
# We can include based on the table name
include_filter_table_config = deepcopy(config)
include_filter_table_config["source"]["sourceConfig"]["config"][
"tableFilterPattern"
] = {"includes": ["table1"]}
include_filter_table_workflow = ProfilerWorkflow.create(include_filter_table_config)
assert len(list(include_filter_table_workflow.filter_entities(all_tables))) == 1
include_workflow = ProfilerWorkflow.create(include_config)
assert len(list(include_workflow.filter_entities(all_tables))) == 3
def test_profile_def():

View File

@ -13,12 +13,13 @@ Metadata DAG common functions
"""
import json
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from typing import Any, Callable, Dict, Optional
from airflow import DAG
from metadata.generated.schema.type import basic
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
try:
from airflow.operators.python import PythonOperator
@ -43,7 +44,7 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
Task that creates and runs the ingestion workflow.
The workflow_config gets cooked from the incoming
airflow_pipeline.
ingestionPipeline.
This is the callable used to create the PythonOperator
"""
@ -56,6 +57,24 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
workflow.stop()
def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
"""
Task that creates and runs the profiler workflow.
The workflow_config gets cooked from the incoming
ingestionPipeline.
This is the callable used to create the PythonOperator
"""
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = ProfilerWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
def date_to_datetime(
date: Optional[basic.Date], date_format: str = "%Y-%m-%d"
) -> Optional[datetime]:
@ -107,10 +126,11 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
}
def build_ingestion_dag(
def build_dag(
task_name: str,
ingestion_pipeline: IngestionPipeline,
workflow_config: OpenMetadataWorkflowConfig,
workflow_fn: Callable,
) -> DAG:
"""
Build a simple workflow DAG
@ -120,7 +140,7 @@ def build_ingestion_dag(
PythonOperator(
task_id=task_name,
python_callable=metadata_ingestion_workflow,
python_callable=workflow_fn,
op_kwargs={"workflow_config": workflow_config},
retries=ingestion_pipeline.airflowConfig.retries,
retry_delay=ingestion_pipeline.airflowConfig.retryDelay,

View File

@ -13,7 +13,10 @@ Metadata DAG function builder
"""
from airflow import DAG
from openmetadata.workflows.ingestion.common import build_ingestion_dag
from openmetadata.workflows.ingestion.common import (
build_dag,
metadata_ingestion_workflow,
)
try:
from airflow.operators.python import PythonOperator
@ -55,10 +58,11 @@ def build_metadata_dag(ingestion_pipeline: IngestionPipeline) -> DAG:
Build a simple metadata workflow DAG
"""
workflow_config = build_metadata_workflow_config(ingestion_pipeline)
dag = build_ingestion_dag(
dag = build_dag(
task_name="ingestion_task",
ingestion_pipeline=ingestion_pipeline,
workflow_config=workflow_config,
workflow_fn=metadata_ingestion_workflow,
)
return dag

View File

@ -0,0 +1,70 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Profiler DAG function builder
"""
from airflow import DAG
from openmetadata.workflows.ingestion.common import build_dag, profiler_workflow
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
Processor,
Sink,
WorkflowConfig,
)
def build_profiler_workflow_config(
ingestion_pipeline: IngestionPipeline,
) -> OpenMetadataWorkflowConfig:
"""
Given an ingestion_pipeline, prepare the workflow config JSON
"""
workflow_config = OpenMetadataWorkflowConfig(
source=ingestion_pipeline.source,
sink=Sink(
type="metadata-rest",
config={},
),
processor=Processor(
type="orm-profiler",
config={},
),
workflowConfig=WorkflowConfig(
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection
),
)
return workflow_config
def build_profiler_dag(ingestion_pipeline: IngestionPipeline) -> DAG:
"""
Build a simple Profiler workflow DAG
"""
workflow_config = build_profiler_workflow_config(ingestion_pipeline)
dag = build_dag(
task_name="profiler_task",
ingestion_pipeline=ingestion_pipeline,
workflow_config=workflow_config,
workflow_fn=profiler_workflow,
)
return dag
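The builder mirrors the metadata and usage factories, and the same config object that parametrizes the DAG can be fed back into ProfilerWorkflow.create, exactly as the integration test at the end of this diff does:

workflow_config = build_profiler_workflow_config(ingestion_pipeline)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
ProfilerWorkflow.create(config)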

View File

@ -14,35 +14,18 @@ DAG builder registry.
Add a function for each type from PipelineType
"""
from collections import namedtuple
from openmetadata.workflows.ingestion.metadata import build_metadata_dag
from openmetadata.workflows.ingestion.profiler import build_profiler_dag
from openmetadata.workflows.ingestion.usage import build_usage_dag
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineType,
)
from metadata.utils.dispatch import enum_register
def register():
"""
Helps us register custom functions for rendering
"""
registry = dict()
def add(name: str = None):
def inner(fn):
_name = fn.__name__ if not name else name
registry[_name] = fn
return fn
return inner
Register = namedtuple("Register", ["add", "registry"])
return Register(add, registry)
build_registry = register()
build_registry = enum_register()
build_registry.add(PipelineType.metadata.value)(build_metadata_dag)
build_registry.add(PipelineType.usage.value)(build_usage_dag)
build_registry.add(PipelineType.profiler.value)(build_profiler_dag)
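The removed inline register() shows the shape that enum_register presumably keeps: a namedtuple exposing an add decorator and a registry dict. Resolving the builder for an incoming pipeline is then a plain dictionary lookup (a sketch; ingestion_pipeline is assumed to arrive via the REST deploy call):

build_fn = build_registry.registry.get(PipelineType.profiler.value)
dag = build_fn(ingestion_pipeline)  # -> build_profiler_dag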

View File

@ -14,7 +14,10 @@ Metadata DAG function builder
from airflow import DAG
from openmetadata.workflows.ingestion.common import build_ingestion_dag
from openmetadata.workflows.ingestion.common import (
build_dag,
metadata_ingestion_workflow,
)
from metadata.generated.schema.metadataIngestion.workflow import (
BulkSink,
@ -65,10 +68,11 @@ def build_usage_dag(airflow_pipeline: IngestionPipeline) -> DAG:
Build a simple usage workflow DAG
"""
workflow_config = build_usage_workflow_config(airflow_pipeline)
dag = build_ingestion_dag(
dag = build_dag(
task_name="usage_task",
ingestion_pipeline=airflow_pipeline,
workflow_config=workflow_config,
workflow_fn=metadata_ingestion_workflow,
)
return dag

View File

@ -0,0 +1,174 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate metadata ingestion workflow generation
"""
import json
import uuid
from unittest import TestCase
from openmetadata.workflows.ingestion.metadata import build_metadata_workflow_config
from openmetadata.workflows.ingestion.profiler import build_profiler_workflow_config
from openmetadata.workflows.ingestion.usage import build_usage_workflow_config
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
AirflowConfig,
IngestionPipeline,
PipelineType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.workflow import Workflow
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
class OMetaServiceTest(TestCase):
"""
Run this integration test with the local API available
Install the ingestion package before running the tests
"""
service_entity_id = None
server_config = OpenMetadataConnection(hostPort="http://localhost:8585/api")
metadata = OpenMetadata(server_config)
assert metadata.health_check()
data = {
"type": "mysql",
"serviceName": "local_mysql",
"serviceConnection": {
"config": {
"type": "Mysql",
"username": "openmetadata_user",
"password": "openmetadata_password",
"hostPort": "localhost:3306",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
}
workflow_source = WorkflowSource(**data)
@classmethod
def setUpClass(cls) -> None:
"""
Prepare ingredients.
Mock a db service to build the IngestionPipeline
"""
service: DatabaseService = cls.metadata.get_service_or_create(
entity=DatabaseService, config=cls.workflow_source
)
cls.service_entity_id = service.id
@classmethod
def tearDownClass(cls) -> None:
"""
Clean up
"""
cls.metadata.delete(
entity=DatabaseService,
entity_id=cls.service_entity_id,
recursive=True,
hard_delete=True,
)
def test_ingestion_workflow(self):
"""
Validate that the ingestionPipeline can be parsed
and properly load a Workflow
"""
ingestion_pipeline = IngestionPipeline(
id=uuid.uuid4(),
name="test_ingestion_workflow",
pipelineType=PipelineType.metadata,
fullyQualifiedName="local_mysql.test_ingestion_workflow",
source=self.workflow_source,
openMetadataServerConnection=self.server_config,
airflowConfig=AirflowConfig(
startDate="2022-04-10",
),
service=EntityReference(
id=self.service_entity_id,
type="databaseService",
),
)
workflow_config = build_metadata_workflow_config(ingestion_pipeline)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
Workflow.create(config)
def test_usage_workflow(self):
"""
Validate that the ingestionPipeline can be parsed
and properly load a usage Workflow
"""
ingestion_pipeline = IngestionPipeline(
id=uuid.uuid4(),
name="test_ingestion_workflow",
pipelineType=PipelineType.usage,
fullyQualifiedName="local_mysql.test_usage_workflow",
source=self.workflow_source,
openMetadataServerConnection=self.server_config,
airflowConfig=AirflowConfig(
startDate="2022-04-10",
),
service=EntityReference(
id=self.service_entity_id,
type="databaseService",
),
)
workflow_config = build_usage_workflow_config(ingestion_pipeline)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
Workflow.create(config)
def test_profiler_workflow(self):
"""
Validate that the ingestionPipeline can be parsed
and properly load a Profiler Workflow
"""
ingestion_pipeline = IngestionPipeline(
id=uuid.uuid4(),
name="test_profiler_workflow",
pipelineType=PipelineType.profiler,
fullyQualifiedName="local_mysql.test_profiler_workflow",
source=self.workflow_source,
openMetadataServerConnection=self.server_config,
airflowConfig=AirflowConfig(
startDate="2022-04-10",
),
service=EntityReference(
id=self.service_entity_id,
type="databaseService",
),
)
workflow_config = build_profiler_workflow_config(ingestion_pipeline)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
ProfilerWorkflow.create(config)