Fix #3823 & Fix generate default Enum field (#3909)

* Fix sample data DAG

* Fix callback imports

* Use --set-default-enum-member in generate

* Format

* Add hard_delete lost in a faulty merge

* Fix airflow lineage, improve naming and fix lineage tests

* Add mysql url test

* Update CI name

* Fix test ometa endpoint

* Format

* Fix metadata config
Pere Miquel Brull 2022-04-07 14:52:50 +02:00 committed by GitHub
parent d429f0b868
commit 93525aea0d
12 changed files with 202 additions and 67 deletions

View File

@@ -56,7 +56,7 @@ py_format_check: ## Check if Python sources are correctly formatted
 generate: ## Generate the pydantic models from the JSON Schemas to the ingestion module
 	@echo "Running Datamodel Code Generator"
 	@echo "Make sure to first run the install_dev recipe"
-	datamodel-codegen --input catalog-rest-service/src/main/resources/json --input-file-type jsonschema --output ingestion/src/metadata/generated
+	datamodel-codegen --input catalog-rest-service/src/main/resources/json --input-file-type jsonschema --output ingestion/src/metadata/generated --set-default-enum-member
 	$(MAKE) install

 ## Ingestion tests & QA
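For context on the flag: datamodel-code-generator normally emits enum defaults as raw literals, which do not reference the generated Enum class; `--set-default-enum-member` makes it emit the enum member itself. A minimal sketch of the difference, with hypothetical model names:

```python
from enum import Enum

from pydantic import BaseModel


class MySQLScheme(Enum):
    mysql_pymysql = "mysql+pymysql"


class MysqlConnectionSketch(BaseModel):
    # Without the flag the generator writes the raw default, e.g.
    #     scheme: MySQLScheme = "mysql+pymysql"
    # With --set-default-enum-member it references the member instead:
    scheme: MySQLScheme = MySQLScheme.mysql_pymysql


assert MysqlConnectionSketch().scheme is MySQLScheme.mysql_pymysql
```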

View File

@@ -33,7 +33,7 @@ curl --location --request PATCH 'localhost:8080/api/v1/dags/sample_data' \
 --data-raw '{
     "is_paused": false
 }'
-until curl -s -f -o /dev/null "http://localhost:8585/api/v1/tables/name/bigquery_gcp:shopify:fact_sale"; do
+until curl -s -f -o /dev/null "http://localhost:8585/api/v1/tables/name/bigquery_gcp.ecommerce_db.shopify.fact_sale"; do
     printf '.'
     sleep 2
 done
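The change above reflects the new fully qualified table names: dot-separated and four levels deep (service.database.schema.table) instead of the old colon-separated, three-level form. A hypothetical helper to illustrate the shape:

```python
def table_fqn(service: str, database: str, schema: str, table: str) -> str:
    # Hypothetical helper: 0.10-style FQNs are dot-separated and include
    # the database schema level; the old form was "service:db:table".
    return ".".join((service, database, schema, table))


assert (
    table_fqn("bigquery_gcp", "ecommerce_db", "shopify", "fact_sale")
    == "bigquery_gcp.ecommerce_db.shopify.fact_sale"
)
```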

View File

@@ -37,7 +37,11 @@ config = """
 "source": {
     "type": "sample-data",
     "serviceName": "bigquery_gcp",
-    "serviceConnection": "",
+    "serviceConnection": {
+        "config": {
+            "type": "BigQuery"
+        }
+    },
     "sourceConfig": {}
 },
 "sink": {
@@ -46,14 +50,13 @@ config = """
 },
 "workflowConfig": {
     "openMetadataServerConfig": {
-        "api_endpoint": "http://localhost:8585/api",
-        "auth_provider_type": "no-auth"
-    },
-    "config": {
+        "hostPort": "http://localhost:8585/api",
+        "authProvider": "no-auth"
     }
 }
}
"""

View File

@@ -15,10 +15,7 @@ OpenMetadata Airflow Lineage Backend
 import logging
 from typing import TYPE_CHECKING, Dict

-from airflow_provider_openmetadata.lineage.config import (
-    get_lineage_config,
-    get_metadata_config,
-)
+from airflow_provider_openmetadata.lineage.config.loader import get_lineage_config
 from airflow_provider_openmetadata.lineage.utils import (
     add_status,
     get_xlets,
@@ -42,8 +39,7 @@ def failure_callback(context: Dict[str, str]) -> None:
     """
     try:
         config = get_lineage_config()
-        metadata_config = get_metadata_config(config)
-        client = OpenMetadata(metadata_config)
+        metadata = OpenMetadata(config.metadata_config)

         operator: "BaseOperator" = context["task"]
@@ -54,13 +50,13 @@ def failure_callback(context: Dict[str, str]) -> None:
         # Get the pipeline created or updated during the lineage
         pipeline = parse_lineage(
-            config, context, operator, op_inlets, op_outlets, client
+            config, context, operator, op_inlets, op_outlets, metadata
         )

         add_status(
             operator=operator,
             pipeline=pipeline,
-            client=client,
+            metadata=metadata,
             context=context,
         )
@@ -79,18 +75,17 @@ def success_callback(context: Dict[str, str]) -> None:
     try:
         config = get_lineage_config()
-        metadata_config = get_metadata_config(config)
-        client = OpenMetadata(metadata_config)
+        metadata = OpenMetadata(config.metadata_config)

         operator: "BaseOperator" = context["task"]
         dag: "DAG" = context["dag"]

         operator.log.info("Updating pipeline status on success...")
-        airflow_service_entity = client.get_by_name(
+        airflow_service_entity = metadata.get_by_name(
             entity=PipelineService, fqdn=config.airflow_service_name
         )
-        pipeline: Pipeline = client.get_by_name(
+        pipeline: Pipeline = metadata.get_by_name(
             entity=Pipeline,
             fqdn=f"{airflow_service_entity.name}.{dag.dag_id}",
         )
@@ -98,7 +93,7 @@ def success_callback(context: Dict[str, str]) -> None:
         add_status(
             operator=operator,
             pipeline=pipeline,
-            client=client,
+            metadata=metadata,
             context=context,
         )
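As a usage note: these callbacks are meant to be wired into a DAG's `default_args`, where Airflow hands each one the task context dict. A minimal sketch, assuming the callbacks live in the `lineage.callback` module shown in this diff:

```python
from airflow_provider_openmetadata.lineage.callback import (
    failure_callback,
    success_callback,
)

# Sketch: attach the lineage callbacks to every task in a DAG
# via default_args.
default_args = {
    "on_failure_callback": failure_callback,
    "on_success_callback": success_callback,
}
```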

View File

@@ -79,12 +79,12 @@ class OpenMetadataLineageBackend(LineageBackend):
         try:
             config: AirflowLineageConfig = get_lineage_config()
-            client = OpenMetadata(config.metadata_config)
+            metadata = OpenMetadata(config.metadata_config)

             op_inlets = get_xlets(operator, "_inlets")
             op_outlets = get_xlets(operator, "_outlets")

-            parse_lineage(config, context, operator, op_inlets, op_outlets, client)
+            parse_lineage(config, context, operator, op_inlets, op_outlets, metadata)
         except Exception as exc:  # pylint: disable=broad-except
             operator.log.error(traceback.format_exc())
             operator.log.error(exc)
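For anyone wiring this up: the backend class is enabled through Airflow's `[lineage]` config section, which can equivalently be set through Airflow's standard `AIRFLOW__<SECTION>__<KEY>` environment-variable mapping. A sketch:

```python
import os

# Sketch: equivalent to setting "backend" under [lineage] in airflow.cfg;
# the class path matches the OpenMetadataLineageBackend in this diff.
os.environ["AIRFLOW__LINEAGE__BACKEND"] = (
    "airflow_provider_openmetadata.lineage.openmetadata."
    "OpenMetadataLineageBackend"
)
```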

View File

@@ -130,7 +130,7 @@ def create_or_update_pipeline(  # pylint: disable=too-many-locals
     operator: "BaseOperator",
     dag: "DAG",
     airflow_service_entity: PipelineService,
-    client: OpenMetadata,
+    metadata: OpenMetadata,
 ) -> Pipeline:
     """
     Prepare the upsert of pipeline entity with the given task
@@ -145,7 +145,7 @@ def create_or_update_pipeline(  # pylint: disable=too-many-locals
     :param operator: task being examined by lineage
     :param dag: airflow dag
     :param airflow_service_entity: PipelineService
-    :param client: OpenMetadata API client
+    :param metadata: OpenMetadata API client
     :return: PipelineEntity
     """
     pipeline_service_url = conf.get("webserver", "base_url")
@@ -182,9 +182,9 @@ def create_or_update_pipeline(  # pylint: disable=too-many-locals
     operator.log.info(
         f"Checking if the pipeline {airflow_service_entity.name}.{dag.dag_id} exists. If not, we will create it."
     )
-    current_pipeline: Pipeline = client.get_by_name(
+    current_pipeline: Pipeline = metadata.get_by_name(
         entity=Pipeline,
-        fqdn=f"{airflow_service_entity.name}.{dag.dag_id}",
+        fqdn=f"{airflow_service_entity.name.__root__}.{dag.dag_id}",
         fields=["tasks"],
     )
@@ -206,16 +206,16 @@ def create_or_update_pipeline(  # pylint: disable=too-many-locals
         owner=current_pipeline.owner if current_pipeline else None,
         tags=current_pipeline.tags if current_pipeline else None,
     )
-    pipeline = client.create_or_update(pipeline_request)
+    pipeline: Pipeline = metadata.create_or_update(pipeline_request)

     # Add the task we are processing in the lineage backend
     operator.log.info("Adding tasks to pipeline...")
-    updated_pipeline = client.add_task_to_pipeline(pipeline, task)
+    updated_pipeline = metadata.add_task_to_pipeline(pipeline, task)

     # Clean pipeline
     try:
         operator.log.info("Cleaning pipeline tasks...")
-        updated_pipeline = client.clean_pipeline_tasks(updated_pipeline, dag.task_ids)
+        updated_pipeline = metadata.clean_pipeline_tasks(updated_pipeline, dag.task_ids)
     except Exception as exc:  # pylint: disable=broad-except
         operator.log.warning(f"Error cleaning pipeline tasks {exc}")
@@ -252,7 +252,7 @@ def get_dag_status(all_tasks: List[str], task_status: List[TaskStatus]):
 def add_status(
     operator: "BaseOperator",
     pipeline: Pipeline,
-    client: OpenMetadata,
+    metadata: OpenMetadata,
     context: Dict,
 ) -> None:
     """
@@ -269,7 +269,7 @@ def add_status(
     # Check if we already have a pipelineStatus for
     # our execution_date that we should update
-    pipeline_status: List[PipelineStatus] = client.get_by_id(
+    pipeline_status: List[PipelineStatus] = metadata.get_by_id(
         entity=Pipeline, entity_id=pipeline.id, fields=["pipelineStatus"]
     ).pipelineStatus
@@ -303,7 +303,7 @@ def add_status(
     )
     operator.log.info(f"Added status to DAG {updated_status}")
-    client.add_pipeline_status(pipeline=pipeline, status=updated_status)
+    metadata.add_pipeline_status(pipeline=pipeline, status=updated_status)


 # pylint: disable=too-many-arguments,too-many-locals
@@ -313,7 +313,7 @@ def parse_lineage(
     operator: "BaseOperator",
     inlets: List,
     outlets: List,
-    client: OpenMetadata,
+    metadata: OpenMetadata,
 ) -> Optional[Pipeline]:
     """
     Main logic to extract properties from DAG and the
@@ -325,7 +325,7 @@ def parse_lineage(
     :param operator: task being executed
     :param inlets: list of upstream tables
     :param outlets: list of downstream tables
-    :param client: OpenMetadata client
+    :param metadata: OpenMetadata client
     """

     operator.log.info("Parsing Lineage for OpenMetadata")
@@ -335,19 +335,19 @@ def parse_lineage(
     try:
         airflow_service_entity = get_or_create_pipeline_service(
-            operator, client, config
+            operator, metadata, config
         )
         pipeline = create_or_update_pipeline(
             task_instance=task_instance,
             operator=operator,
             dag=dag,
             airflow_service_entity=airflow_service_entity,
-            client=client,
+            metadata=metadata,
         )

         operator.log.info("Parsing Lineage")
         for table in inlets if inlets else []:
-            table_entity = client.get_by_name(entity=Table, fqdn=table)
+            table_entity = metadata.get_by_name(entity=Table, fqdn=table)
             operator.log.debug(f"from entity {table_entity}")
             lineage = AddLineageRequest(
                 edge=EntitiesEdge(
@@ -356,10 +356,10 @@ def parse_lineage(
                 )
             )
             operator.log.debug(f"From lineage {lineage}")
-            client.add_lineage(lineage)
+            metadata.add_lineage(lineage)

         for table in outlets if outlets else []:
-            table_entity = client.get_by_name(entity=Table, fqdn=table)
+            table_entity = metadata.get_by_name(entity=Table, fqdn=table)
             operator.log.debug(f"To entity {table_entity}")
             lineage = AddLineageRequest(
                 edge=EntitiesEdge(
@@ -368,7 +368,7 @@ def parse_lineage(
                 )
             )
             operator.log.debug(f"To lineage {lineage}")
-            client.add_lineage(lineage)
+            metadata.add_lineage(lineage)

         return pipeline
@@ -382,19 +382,19 @@ def parse_lineage(
 def get_or_create_pipeline_service(
-    operator: "BaseOperator", client: OpenMetadata, config: AirflowLineageConfig
+    operator: "BaseOperator", metadata: OpenMetadata, config: AirflowLineageConfig
 ) -> PipelineService:
     """
     Check if we already have the airflow instance as a PipelineService,
     otherwise create it.

     :param operator: task from which we extract the lineage
-    :param client: OpenMetadata API wrapper
+    :param metadata: OpenMetadata API wrapper
     :param config: lineage config
     :return: PipelineService
     """
     operator.log.info("Get Airflow Service ID")
-    airflow_service_entity = client.get_by_name(
+    airflow_service_entity = metadata.get_by_name(
         entity=PipelineService, fqdn=config.airflow_service_name
     )
@@ -404,7 +404,7 @@ def get_or_create_pipeline_service(
             serviceType=PipelineServiceType.Airflow,
             pipelineUrl=conf.get("webserver", "base_url"),
         )
-        airflow_service_entity = client.create_or_update(pipeline_service)
+        airflow_service_entity = metadata.create_or_update(pipeline_service)
         operator.log.info("Created airflow service entity {}", airflow_service_entity)

     return airflow_service_entity
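One subtlety worth flagging above: the fqdn fix switches to `airflow_service_entity.name.__root__` because the generated entity names are pydantic (v1) custom-root models, so interpolating the model itself renders its repr rather than the raw string. A minimal sketch:

```python
from pydantic import BaseModel


class EntityName(BaseModel):
    # Sketch of a generated custom-root type for entity names
    __root__: str


name = EntityName(__root__="local_airflow_3")
# f"{name}.lineage" would render the model repr, not the string;
# the raw value lives under __root__:
assert f"{name.__root__}.lineage" == "local_airflow_3.lineage"
```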

View File

@@ -559,6 +559,7 @@ class OpenMetadata(
         entity: Type[T],
         entity_id: Union[str, basic.Uuid],
         recursive: bool = False,
+        hard_delete: bool = False,
     ) -> None:
         """
         API call to delete an entity from entity ID
@@ -570,7 +571,8 @@ class OpenMetadata(
             None
         """
         url = f"{self.get_suffix(entity)}/{model_str(entity_id)}"
-        url += f"?recursive=true" if recursive else ""
+        url += f"?recursive={str(recursive).lower()}"
+        url += f"&hardDelete={str(hard_delete).lower()}"
         self.client.delete(url)

     def compute_percentile(self, entity: Union[Type[T], str], date: str) -> None:
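With this change every delete call carries explicit query parameters. A sketch of the resulting calls, assuming a `metadata` client instance as in the tests below:

```python
from metadata.generated.schema.entity.services.databaseService import DatabaseService

# Soft delete stays the default:
#   DELETE /databaseServices/<id>?recursive=false&hardDelete=false
metadata.delete(entity=DatabaseService, entity_id=service_id)

# Recursive hard delete, as used in the lineage test teardown:
#   DELETE /databaseServices/<id>?recursive=true&hardDelete=true
metadata.delete(
    entity=DatabaseService,
    entity_id=service_id,
    recursive=True,
    hard_delete=True,
)
```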

View File

@@ -28,16 +28,26 @@ from airflow_provider_openmetadata.lineage.openmetadata import (
 )
 from airflow_provider_openmetadata.lineage.utils import get_xlets
 from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
+from metadata.generated.schema.api.data.createDatabaseSchema import (
+    CreateDatabaseSchemaRequest,
+)
 from metadata.generated.schema.api.data.createTable import CreateTableRequest
 from metadata.generated.schema.api.services.createDatabaseService import (
     CreateDatabaseServiceRequest,
 )
 from metadata.generated.schema.entity.data.pipeline import Pipeline
 from metadata.generated.schema.entity.data.table import Column, DataType
+from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
+    MysqlConnection,
+)
 from metadata.generated.schema.entity.services.databaseService import (
     DatabaseConnection,
+    DatabaseService,
     DatabaseServiceType,
 )
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataServerConfig,
+)
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -47,7 +57,7 @@ class AirflowLineageTest(TestCase):
     Run this test installing the necessary airflow version
     """

-    server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api")
+    server_config = OpenMetadataServerConfig(hostPort="http://localhost:8585/api")
     metadata = OpenMetadata(server_config)

     assert metadata.health_check()
@@ -55,7 +65,13 @@ class AirflowLineageTest(TestCase):
     service = CreateDatabaseServiceRequest(
         name="test-service-table-lineage",
         serviceType=DatabaseServiceType.MySQL,
-        databaseConnection=DatabaseConnection(hostPort="localhost"),
+        connection=DatabaseConnection(
+            config=MysqlConnection(
+                username="username",
+                password="password",
+                hostPort="http://localhost:1234",
+            )
+        ),
     )

     service_type = "databaseService"
@@ -67,26 +83,36 @@ class AirflowLineageTest(TestCase):
         Prepare ingredients: Table Entity + DAG
         """

-        cls.service_entity = cls.metadata.create_or_update(data=cls.service)
+        service_entity = cls.metadata.create_or_update(data=cls.service)

-        cls.create_db = CreateDatabaseRequest(
+        create_db = CreateDatabaseRequest(
             name="test-db",
-            service=EntityReference(id=cls.service_entity.id, type="databaseService"),
+            service=EntityReference(id=service_entity.id, type="databaseService"),
         )

-        cls.create_db_entity = cls.metadata.create_or_update(data=cls.create_db)
+        create_db_entity = cls.metadata.create_or_update(data=create_db)

-        cls.db_reference = EntityReference(
-            id=cls.create_db_entity.id, name="test-db", type="database"
+        db_reference = EntityReference(
+            id=create_db_entity.id, name="test-db", type="database"
         )

-        cls.create = CreateTableRequest(
+        create_schema = CreateDatabaseSchemaRequest(
+            name="test-schema", database=db_reference
+        )
+        create_schema_entity = cls.metadata.create_or_update(data=create_schema)
+        schema_reference = EntityReference(
+            id=create_schema_entity.id, name="test-schema", type="databaseSchema"
+        )
+
+        create = CreateTableRequest(
             name="lineage-test",
-            database=cls.db_reference,
+            databaseSchema=schema_reference,
             columns=[Column(name="id", dataType=DataType.BIGINT)],
         )

-        cls.table = cls.metadata.create_or_update(data=cls.create)
+        cls.table = cls.metadata.create_or_update(data=create)

         with DAG(
             "lineage",
@@ -98,13 +124,21 @@ class AirflowLineageTest(TestCase):
             t1 = BashOperator(  # Using BashOperator as a random example
                 task_id="task1",
                 bash_command="date",
-                outlets={"tables": ["test-service-table-lineage:test-db:lineage-test"]},
+                outlets={
+                    "tables": [
+                        "test-service-table-lineage.test-db.test-schema.lineage-test"
+                    ]
+                },
             )

             t2 = BashOperator(  # Using BashOperator as a random example
                 task_id="task2",
                 bash_command="sleep 5",
-                inlets={"tables": ["test-service-table-lineage:test-db:lineage-test"]},
+                inlets={
+                    "tables": [
+                        "test-service-table-lineage.test-db.test-schema.lineage-test"
+                    ]
+                },
             )

             t3 = BashOperator(
@@ -116,6 +150,25 @@ class AirflowLineageTest(TestCase):
         cls.dag = dag

+    @classmethod
+    def tearDownClass(cls) -> None:
+        """
+        Clean up
+        """
+        service_id = str(
+            cls.metadata.get_by_name(
+                entity=DatabaseService, fqdn="test-service-table-lineage"
+            ).id.__root__
+        )
+
+        cls.metadata.delete(
+            entity=DatabaseService,
+            entity_id=service_id,
+            recursive=True,
+            hard_delete=True,
+        )
+
     def test_xlets(self):
         """
         Verify that we can extract inlets and outlets
@@ -123,12 +176,12 @@ class AirflowLineageTest(TestCase):
         self.assertIsNone(get_xlets(self.dag.get_task("task1"), "_inlets"))
         self.assertEqual(
-            ["test-service-table-lineage:test-db:lineage-test"],
+            ["test-service-table-lineage.test-db.test-schema.lineage-test"],
             get_xlets(self.dag.get_task("task1"), "_outlets"),
         )

         self.assertEqual(
-            ["test-service-table-lineage:test-db:lineage-test"],
+            ["test-service-table-lineage.test-db.test-schema.lineage-test"],
             get_xlets(self.dag.get_task("task2"), "_inlets"),
         )
         self.assertIsNone(get_xlets(self.dag.get_task("task2"), "_outlets"))
@@ -158,19 +211,24 @@ class AirflowLineageTest(TestCase):
         )

         self.assertIsNotNone(
-            self.metadata.get_by_name(entity=Pipeline, fqdn="local_airflow_3:lineage")
+            self.metadata.get_by_name(entity=Pipeline, fqdn="local_airflow_3.lineage")
         )

         lineage = self.metadata.get_lineage_by_name(
-            entity=Pipeline, fqdn="local_airflow_3:lineage"
+            entity=Pipeline, fqdn="local_airflow_3.lineage"
         )
+        print(lineage)

         nodes = {node["id"] for node in lineage["nodes"]}
         self.assertIn(str(self.table.id.__root__), nodes)

     def test_lineage_task_group(self):
         """
-        Test end to end for task groups
+        Test end to end for task groups.
+
+        Run the lineage execution mimicking
+        the execution of three tasks
         """
         with DAG(
@@ -210,8 +268,42 @@ class AirflowLineageTest(TestCase):
             },
         )

+        self.backend.send_lineage(
+            operator=dag.get_task("group1.task2"),
+            context={
+                "dag": dag,
+                "task": dag.get_task("group1.task2"),
+                "task_instance": TaskInstance(
+                    task=dag.get_task("group1.task2"),
+                    execution_date=datetime.strptime(
+                        "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
+                    ),
+                    run_id="scheduled__2022-03-15T08:13:45.967068+00:00",
+                    state="running",
+                ),
+            },
+        )
+
+        self.backend.send_lineage(
+            operator=dag.get_task("end"),
+            context={
+                "dag": dag,
+                "task": dag.get_task("end"),
+                "task_instance": TaskInstance(
+                    task=dag.get_task("end"),
+                    execution_date=datetime.strptime(
+                        "2022-03-15T08:13:45", "%Y-%m-%dT%H:%M:%S"
+                    ),
+                    run_id="scheduled__2022-03-15T08:13:45.967068+00:00",
+                    state="running",
+                ),
+            },
+        )
+
         pipeline = self.metadata.get_by_name(
-            entity=Pipeline, fqdn="local_airflow_3:task_group_lineage", fields=["tasks"]
+            entity=Pipeline, fqdn="local_airflow_3.task_group_lineage", fields=["tasks"]
         )
         self.assertIsNotNone(pipeline)
+        self.assertIn("group1.task1", {task.name for task in pipeline.tasks})
+        self.assertIn("group1.task2", {task.name for task in pipeline.tasks})
+        self.assertIn("end", {task.name for task in pipeline.tasks})

View File

@@ -25,6 +25,9 @@ from sqlalchemy.orm import declarative_base
 from metadata.config.common import WorkflowExecutionError
 from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataServerConfig,
+)
 from metadata.ingestion.api.workflow import Workflow
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.orm_profiler.api.workflow import ProfilerWorkflow
@@ -73,7 +76,7 @@ class ProfilerWorkflowTest(TestCase):
     )
     session = create_and_bind_session(engine)

-    server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api")
+    server_config = OpenMetadataServerConfig(hostPort="http://localhost:8585/api")
     metadata = OpenMetadata(server_config)

     @classmethod

View File

@@ -22,6 +22,7 @@ from metadata.generated.schema.api.teams.createUser import CreateUserRequest
 from metadata.generated.schema.entity.data.chart import Chart
 from metadata.generated.schema.entity.data.dashboard import Dashboard
 from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
 from metadata.generated.schema.entity.data.metrics import Metrics
 from metadata.generated.schema.entity.data.mlmodel import MlModel
 from metadata.generated.schema.entity.data.pipeline import Pipeline
@@ -33,6 +34,9 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.entity.services.messagingService import MessagingService
 from metadata.generated.schema.entity.services.pipelineService import PipelineService
 from metadata.generated.schema.entity.teams.user import User
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataServerConfig,
+)
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -42,7 +46,7 @@ class OMetaEndpointTest(TestCase):
     from the generated entity classes
     """

-    server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api")
+    server_config = OpenMetadataServerConfig(hostPort="http://localhost:8585/api")
     metadata = OpenMetadata(server_config)

     def test_entities_suffix(self):
@@ -54,6 +58,7 @@ class OMetaEndpointTest(TestCase):
         # Db
         self.assertEqual(self.metadata.get_suffix(Database), "/databases")
+        self.assertEqual(self.metadata.get_suffix(DatabaseSchema), "/databaseSchemas")
         self.assertEqual(self.metadata.get_suffix(Table), "/tables")

         # Dashboards

View File

@@ -21,6 +21,9 @@ from sklearn.tree import DecisionTreeClassifier
 from metadata.generated.schema.api.data.createMlModel import CreateMlModelRequest
 from metadata.generated.schema.entity.data.mlmodel import MlModel
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataServerConfig,
+)
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -29,7 +32,7 @@ class OMetaModelMixinTest(TestCase):
     Test the MlModel integrations from MlModel Mixin
     """

-    server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api")
+    server_config = OpenMetadataServerConfig(hostPort="http://localhost:8585/api")
     metadata = OpenMetadata(server_config)

     iris = datasets.load_iris()

View File

@@ -0,0 +1,32 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+OpenMetadata source URL building tests
+"""
+from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
+    MysqlConnection,
+)
+from metadata.utils.source_connections import get_connection_url
+
+
+def test_mysql_url():
+    """
+    Validate MySQL URL building
+    """
+    connection = MysqlConnection(
+        username="username",
+        password="password",
+        hostPort="localhost:1234",
+    )
+    url = get_connection_url(connection)
+    assert url == "mysql+pymysql://username:password@localhost:1234"
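To close the loop with the Makefile change: this test only passes because the generated `MysqlConnection.scheme` enum field now defaults to a real enum member. A rough sketch of the URL assembly being exercised, assuming those generated field names (the actual helper in `metadata.utils.source_connections` handles more connection types and options):

```python
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
    MysqlConnection,
)


def build_url_sketch(connection: MysqlConnection) -> str:
    # scheme is an Enum field; its default (mysql+pymysql) only exists in
    # the generated model because codegen runs with --set-default-enum-member
    url = f"{connection.scheme.value}://"
    if connection.username:
        url += connection.username
        if connection.password:
            url += f":{connection.password}"
        url += "@"
    return url + connection.hostPort


assert (
    build_url_sketch(
        MysqlConnection(
            username="username", password="password", hostPort="localhost:1234"
        )
    )
    == "mysql+pymysql://username:password@localhost:1234"
)
```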