Mirror of https://github.com/open-metadata/OpenMetadata.git (synced 2025-12-12 15:57:44 +00:00)

commit fe5618c8f1
parent 777c6c3081
@@ -49,9 +49,7 @@ def openmetadata_airflow_lineage_example():
                 "bigquery_gcp.shopify.raw_customer",
             ],
         },
-        outlets={
-            "tables": ["bigquery_gcp.shopify.fact_order"]
-        },
+        outlets={"tables": ["bigquery_gcp.shopify.fact_order"]},
     )
     def generate_data():
         """write your query to generate ETL"""
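The inlets/outlets dictionaries reformatted above are what the OpenMetadata Airflow lineage backend reads to stitch table-level lineage between tasks. A minimal sketch of a task wired this way, assuming Airflow's TaskFlow API (the DAG settings are illustrative, the table FQNs come from the hunk):

    from datetime import datetime

    from airflow.decorators import dag, task

    @dag(start_date=datetime(2021, 1, 1), schedule_interval=None, catchup=False)
    def openmetadata_airflow_lineage_example():
        @task(
            # Fully qualified table names, as the lineage backend expects them
            inlets={"tables": ["bigquery_gcp.shopify.raw_customer"]},
            outlets={"tables": ["bigquery_gcp.shopify.fact_order"]},
        )
        def generate_data():
            """write your query to generate ETL"""

        generate_data()

    openmetadata_airflow_lineage_example()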
@@ -77,7 +77,6 @@ plugins: Dict[str, Set[str]] = {
         "pyarrow~=6.0.1",
         "google-cloud-datacatalog==3.6.2",
     },
-
     "bigquery-usage": {"google-cloud-logging", "cachetools"},
     "docker": {"python_on_whales==0.34.0"},
     "backup": {"boto3~=1.19.12"},
@@ -14,7 +14,6 @@ Histogram Metric definition
 """
 from typing import Optional

-import numpy as np
 from sqlalchemy import and_, func
 from sqlalchemy.orm import Session

@@ -23,9 +23,7 @@ from airflow.operators.bash import BashOperator
 from airflow_provider_openmetadata.lineage.openmetadata import (
     OpenMetadataLineageBackend,
 )
-from airflow_provider_openmetadata.lineage.utils import (
-    get_xlets,
-)
+from airflow_provider_openmetadata.lineage.utils import get_xlets
 from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
 from metadata.generated.schema.api.data.createTable import CreateTableRequest
 from metadata.generated.schema.api.services.createDatabaseService import (
@@ -87,7 +87,9 @@ class OMetaLineageTest(TestCase):

         cls.create_db_entity = cls.metadata.create_or_update(data=cls.create_db)

-        cls.db_reference = EntityReference(id=cls.create_db_entity.id, name="test-db", type="database")
+        cls.db_reference = EntityReference(
+            id=cls.create_db_entity.id, name="test-db", type="database"
+        )

         cls.table = CreateTableRequest(
             name="test",
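Several of the hunks below wrap the same pattern: build an EntityReference pointing at a parent entity by id, name, and type, then hang child entities off it. A minimal sketch of the flow, assuming a configured `metadata` client, a `create_db` request, and a `columns` list as in these tests:

    # Create (or fetch) the parent database, then reference it by id/name/type.
    db_entity = metadata.create_or_update(data=create_db)
    db_reference = EntityReference(id=db_entity.id, name="test-db", type="database")

    # Child entities point back at the parent through the reference.
    table = metadata.create_or_update(
        data=CreateTableRequest(name="test", columns=columns, database=db_reference)
    )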
@@ -203,7 +203,9 @@ class OMetaModelTest(TestCase):
         )
         create_db_entity = self.metadata.create_or_update(data=create_db)

-        db_reference = EntityReference(id=create_db_entity.id, name="test-db-ml", type="database")
+        db_reference = EntityReference(
+            id=create_db_entity.id, name="test-db-ml", type="database"
+        )

         create_table1 = CreateTableRequest(
             name="test-ml",
@@ -13,15 +13,21 @@
 OpenMetadata high-level API Pipeline test
 """
 import uuid
-from unittest import TestCase
 from datetime import datetime
+from unittest import TestCase

 from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
 from metadata.generated.schema.api.services.createPipelineService import (
     CreatePipelineServiceRequest,
 )
 from metadata.generated.schema.api.teams.createUser import CreateUserRequest
-from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus, StatusType, Task, TaskStatus
+from metadata.generated.schema.entity.data.pipeline import (
+    Pipeline,
+    PipelineStatus,
+    StatusType,
+    Task,
+    TaskStatus,
+)
 from metadata.generated.schema.entity.services.pipelineService import (
     PipelineService,
     PipelineServiceType,
@@ -209,7 +215,7 @@ class OMetaPipelineTest(TestCase):
             tasks=[
                 Task(name="task1"),
                 Task(name="task2"),
-            ]
+            ],
         )

         pipeline = self.metadata.create_or_update(data=create_pipeline)
@@ -222,8 +228,8 @@ class OMetaPipelineTest(TestCase):
                 executionStatus=StatusType.Successful,
                 taskStatus=[
                     TaskStatus(name="task1", executionStatus=StatusType.Successful),
-                ]
-            )
+                ],
+            ),
         )

         # We get a list of status
@@ -239,8 +245,8 @@ class OMetaPipelineTest(TestCase):
                 taskStatus=[
                     TaskStatus(name="task1", executionStatus=StatusType.Successful),
                     TaskStatus(name="task2", executionStatus=StatusType.Successful),
-                ]
-            )
+                ],
+            ),
         )

         assert updated.pipelineStatus[0].executionDate.__root__ == execution_ts
@@ -260,27 +266,30 @@ class OMetaPipelineTest(TestCase):
             tasks=[
                 Task(name="task1"),
                 Task(name="task2"),
-            ]
+            ],
         )

         pipeline = self.metadata.create_or_update(data=create_pipeline)

         # Add new tasks
         updated_pipeline = self.metadata.add_task_to_pipeline(
-            pipeline, Task(name="task3"),
+            pipeline,
+            Task(name="task3"),
         )

         assert len(updated_pipeline.tasks) == 3

         # Update a task already added
         updated_pipeline = self.metadata.add_task_to_pipeline(
-            pipeline, Task(name="task3", displayName="TaskDisplay"),
+            pipeline,
+            Task(name="task3", displayName="TaskDisplay"),
         )

         assert len(updated_pipeline.tasks) == 3
         assert next(
             iter(
-                task for task in updated_pipeline.tasks
+                task
+                for task in updated_pipeline.tasks
                 if task.displayName == "TaskDisplay"
             )
         )
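The assertions in this hunk pin down the upsert semantics of add_task_to_pipeline: adding a task name the pipeline does not have grows the task list, while re-adding an existing name updates that task in place. Condensed, with the client and pipeline setup elided:

    # Appending a new task: 2 tasks become 3.
    pipeline = metadata.add_task_to_pipeline(pipeline, Task(name="task3"))
    assert len(pipeline.tasks) == 3

    # Re-adding "task3" with a displayName updates it instead of duplicating it.
    pipeline = metadata.add_task_to_pipeline(
        pipeline, Task(name="task3", displayName="TaskDisplay")
    )
    assert len(pipeline.tasks) == 3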
@@ -290,9 +299,7 @@ class OMetaPipelineTest(TestCase):
             Task(name="task3"),
             Task(name="task4"),
         ]
-        updated_pipeline = self.metadata.add_task_to_pipeline(
-            pipeline, *new_tasks
-        )
+        updated_pipeline = self.metadata.add_task_to_pipeline(pipeline, *new_tasks)

         assert len(updated_pipeline.tasks) == 4
@@ -307,7 +314,8 @@ class OMetaPipelineTest(TestCase):
         pipeline = self.metadata.create_or_update(data=self.create)

         updated_pipeline = self.metadata.add_task_to_pipeline(
-            pipeline, Task(name="task", displayName="TaskDisplay"),
+            pipeline,
+            Task(name="task", displayName="TaskDisplay"),
         )

         assert len(updated_pipeline.tasks) == 1
@@ -326,17 +334,13 @@ class OMetaPipelineTest(TestCase):
                 Task(name="task2"),
                 Task(name="task3"),
                 Task(name="task4"),
-            ]
+            ],
         )

         pipeline = self.metadata.create_or_update(data=create_pipeline)

         updated_pipeline = self.metadata.clean_pipeline_tasks(
-            pipeline=pipeline,
-            tasks=[
-                Task(name="task3"),
-                Task(name="task4")
-            ]
+            pipeline=pipeline, tasks=[Task(name="task3"), Task(name="task4")]
         )

         assert len(updated_pipeline.tasks) == 2
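clean_pipeline_tasks is the pruning counterpart: given the full list of tasks that should remain, everything else is dropped. Sketched under the same assumptions as the previous snippet, with the pipeline starting at task1 through task4:

    updated = metadata.clean_pipeline_tasks(
        pipeline=pipeline, tasks=[Task(name="task3"), Task(name="task4")]
    )
    assert len(updated.tasks) == 2  # only the listed tasks survive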
@@ -85,7 +85,9 @@ class OMetaTableTest(TestCase):

         cls.create_db_entity = cls.metadata.create_or_update(data=cls.create_db)

-        cls.db_reference = EntityReference(id=cls.create_db_entity.id, name="test-db", type="database")
+        cls.db_reference = EntityReference(
+            id=cls.create_db_entity.id, name="test-db", type="database"
+        )

         cls.entity = Table(
             id=uuid.uuid4(),
@@ -7,7 +7,9 @@ import unittest
 from unittest import TestCase

 from metadata.generated.schema.api.tags.createTag import CreateTagRequest
-from metadata.generated.schema.api.tags.createTagCategory import CreateTagCategoryRequest
+from metadata.generated.schema.api.tags.createTagCategory import (
+    CreateTagCategoryRequest,
+)
 from metadata.generated.schema.entity.tags.tagCategory import Tag, TagCategory
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
@@ -130,7 +132,9 @@ class OMetaTagMixinPut(TestCase):
         """Test put tag category"""

         rand_name = random.getrandbits(64)
-        updated_primary_tag = CreateTagRequest(description="test tag", name=f"{rand_name}")
+        updated_primary_tag = CreateTagRequest(
+            description="test tag", name=f"{rand_name}"
+        )

         self.metadata.update_primary_tag(
             CATEGORY_NAME, PRIMARY_TAG_NAME, updated_primary_tag
@@ -142,7 +146,9 @@ class OMetaTagMixinPut(TestCase):
         """Test put tag category"""

         rand_name = random.getrandbits(64)
-        updated_secondary_tag = CreateTagRequest(description="test tag", name=f"{rand_name}")
+        updated_secondary_tag = CreateTagRequest(
+            description="test tag", name=f"{rand_name}"
+        )

         self.metadata.update_secondary_tag(
             CATEGORY_NAME, PRIMARY_TAG_NAME, SECONDARY_TAG_NAME, updated_secondary_tag
@@ -20,23 +20,17 @@ from copy import deepcopy
 from unittest import TestCase

 import pytest
-from metadata.config.common import WorkflowExecutionError
-
-from metadata.orm_profiler.api.workflow import ProfilerWorkflow
-
-from metadata.generated.schema.entity.data.table import Table
-
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-
-from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
-
-from metadata.ingestion.api.workflow import Workflow
 from sqlalchemy import Column, Integer, String, create_engine
 from sqlalchemy.orm import declarative_base

+from metadata.config.common import WorkflowExecutionError
+from metadata.generated.schema.entity.data.table import Table
+from metadata.ingestion.api.workflow import Workflow
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
+from metadata.orm_profiler.api.workflow import ProfilerWorkflow
+from metadata.orm_profiler.engines import create_and_bind_session


 sqlite_shared = "file:cachedb?mode=memory&cache=shared"

 ingestion_config = {
@@ -44,8 +38,8 @@ ingestion_config = {
         "type": "sqlite",
         "config": {
             "service_name": "test_sqlite",
-            "database": sqlite_shared # We need this to share the session
-        }
+            "database": sqlite_shared,  # We need this to share the session
+        },
     },
     "sink": {"type": "metadata-rest", "config": {}},
     "metadata_server": {
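As the inline comment says, the `file:cachedb?mode=memory&cache=shared` URI names an in-memory SQLite database with a shared cache, so the ingestion workflow and the profiler session see the same tables instead of each getting a private `:memory:` database. A standalone sketch of the effect; note that SQLAlchemy's pysqlite dialect also needs `uri=true` appended for it to honor such a URI:

    from sqlalchemy import create_engine, text

    sqlite_shared = "file:cachedb?mode=memory&cache=shared&uri=true"

    # Two engines, one shared in-memory database.
    writer = create_engine(f"sqlite+pysqlite:///{sqlite_shared}", future=True)
    reader = create_engine(f"sqlite+pysqlite:///{sqlite_shared}", future=True)

    with writer.begin() as conn:
        conn.execute(text("CREATE TABLE users (id INTEGER, name TEXT)"))
        conn.execute(text("INSERT INTO users VALUES (1, 'john')"))

    with reader.connect() as conn:
        # The row written through `writer` is visible here.
        assert conn.execute(text("SELECT count(*) FROM users")).scalar() == 1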
@@ -74,7 +68,9 @@ class ProfilerWorkflowTest(TestCase):
         Run the end to end workflow and validate
         """

-        engine = create_engine(f"sqlite+pysqlite:///{sqlite_shared}", echo=True, future=True)
+        engine = create_engine(
+            f"sqlite+pysqlite:///{sqlite_shared}", echo=True, future=True
+        )
         session = create_and_bind_session(engine)

         server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api")
@@ -106,7 +102,9 @@ class ProfilerWorkflowTest(TestCase):
         Validate that the ingestion ran correctly
         """

-        table_entity: Table = self.metadata.get_by_name(entity=Table, fqdn="test_sqlite.main.users")
+        table_entity: Table = self.metadata.get_by_name(
+            entity=Table, fqdn="test_sqlite.main.users"
+        )
         assert table_entity.fullyQualifiedName == "test_sqlite.main.users"

     def test_profiler_workflow(self):
@@ -139,18 +137,18 @@ class ProfilerWorkflowTest(TestCase):
                         {
                             "name": "check name count",
                             "column": "name",
-                            "expression": "count < 10"
+                            "expression": "count < 10",
                         },
                         {
                             "name": "check null count",
                             "column": "nickname",
-                            "expression": "null_count == 0"
-                        }
-                    ]
-                }
-            ]
-        }
-    }
+                            "expression": "null_count == 0",
+                        },
+                    ],
+                ],
+            },
+        },
 }

 profiler_workflow = ProfilerWorkflow.create(workflow_config)
@@ -158,7 +156,9 @@ class ProfilerWorkflowTest(TestCase):
         status = profiler_workflow.print_status()
         profiler_workflow.stop()

-        assert status == 1  # We have a test error, so we get a failure with exit status 1
+        assert (
+            status == 1
+        )  # We have a test error, so we get a failure with exit status 1

         with pytest.raises(WorkflowExecutionError):
             profiler_workflow.raise_from_status()
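The contract this reflowed assertion locks in: print_status() returns the workflow's exit status (1 here, because the suite contains a deliberately failing test), and raise_from_status() surfaces the same failure as a WorkflowExecutionError. Condensed, assuming an already-built profiler_workflow:

    import pytest

    from metadata.config.common import WorkflowExecutionError

    profiler_workflow.execute()
    status = profiler_workflow.print_status()
    profiler_workflow.stop()

    assert status == 1  # a failing test yields a non-zero exit status
    with pytest.raises(WorkflowExecutionError):
        profiler_workflow.raise_from_status()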
@@ -76,7 +76,9 @@ def create_delete_table(client: OpenMetadata, databases: List[Database]):
         Column(name="id", dataType="INT", dataLength=1),
         Column(name="name", dataType="VARCHAR", dataLength=1),
     ]
-    db_ref = EntityReference(id=databases[0].id, name=databases[0].name.__root__, type="database")
+    db_ref = EntityReference(
+        id=databases[0].id, name=databases[0].name.__root__, type="database"
+    )
     table = CreateTableRequest(name="test1", columns=columns, database=db_ref)
     created_table = client.create_or_update(table)
     if table.name.__root__ == created_table.name.__root__:
@@ -64,7 +64,9 @@ def create_delete_table(client):
         Column(name="id", columnDataType="INT"),
         Column(name="name", columnDataType="VARCHAR"),
     ]
-    db_ref = EntityReference(id=databases[0].id, name=databases[0].name.__root__, type="database")
+    db_ref = EntityReference(
+        id=databases[0].id, name=databases[0].name.__root__, type="database"
+    )
     table = CreateTableRequest(name="test1", columns=columns, database=db_ref)
     created_table = client.create_or_update_table(table)
     if table.name.__root__ == created_table.name.__root__:
@@ -45,7 +45,9 @@ def create_delete_table(client: OpenMetadata):
         Column(name="id", dataType="INT", dataLength=1),
         Column(name="name", dataType="VARCHAR", dataLength=1),
     ]
-    db_ref = EntityReference(id=databases[0].id, name=databases[0].name.__root__, type="database")
+    db_ref = EntityReference(
+        id=databases[0].id, name=databases[0].name.__root__, type="database"
+    )
     table = CreateTableRequest(name="test1", columns=columns, database=db_ref)
     created_table = client.create_or_update(table)
     if table.name.__root__ == created_table.name.__root__:
@@ -87,10 +87,12 @@ def test_create_table_service(catalog_service):
         service=EntityReference(id=postgres_dbservice.id, type="databaseService"),
     )
     created_database = client.create_database(create_database_request)
-    db_ref = EntityReference(id=created_database.id.__root__, name=created_database.name.__root__, type="database")
-    table = CreateTableRequest(
-        name=table_name, columns=columns, database=db_ref
+    db_ref = EntityReference(
+        id=created_database.id.__root__,
+        name=created_database.name.__root__,
+        type="database",
     )
+    table = CreateTableRequest(name=table_name, columns=columns, database=db_ref)
     created_table = client.create_or_update_table(table)
     if created_database and created_table:
         assert 1
@@ -81,10 +81,10 @@ def create_delete_table(client: OpenMetadata, databases: List[Database]):
         Column(name="name", dataType="VARCHAR", dataLength=1),
     ]
     print(databases[0])
-    db_ref = EntityReference(id=databases[0].id.__root__, name=databases[0].name.__root__, type="database")
-    table = CreateTableRequest(
-        name="test1", columns=columns, database=db_ref
+    db_ref = EntityReference(
+        id=databases[0].id.__root__, name=databases[0].name.__root__, type="database"
     )
+    table = CreateTableRequest(name="test1", columns=columns, database=db_ref)
     created_table = client.create_or_update(table)
     if table.name.__root__ == created_table.name.__root__:
         client.delete(entity=Table, entity_id=str(created_table.id.__root__))
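One detail worth noting across these helpers: name and id fields are pydantic custom-root models, which is why comparisons and the delete call unwrap them via `.__root__`. The round-trip these integration helpers exercise, assuming a configured client, a `columns` list, and a `db_ref` as above:

    # Create, verify by unwrapped name, then delete by unwrapped id.
    table = CreateTableRequest(name="test1", columns=columns, database=db_ref)
    created_table = client.create_or_update(table)
    if table.name.__root__ == created_table.name.__root__:
        client.delete(entity=Table, entity_id=str(created_table.id.__root__))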
@@ -221,7 +221,8 @@ class MetricsTest(TestCase):

         assert res.get(User.age.name)[Metrics.HISTOGRAM.name]
         assert (
-            len(res.get(User.age.name)[Metrics.HISTOGRAM.name]["frequencies"]) == 2  # Too little values
+            len(res.get(User.age.name)[Metrics.HISTOGRAM.name]["frequencies"])
+            == 2  # Too little values
         )

     def test_like_count(self):
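The "Too little values" comment refers to histogram binning: with only a handful of distinct values, the metric produces correspondingly few buckets, which is what the `== 2` assertion pins down. For intuition only, a numpy sketch; the bin rule and sample values are illustrative, not the metric's exact implementation:

    import numpy as np

    ages = [30, 31, 32]  # tiny sample
    frequencies, bin_edges = np.histogram(ages, bins="sturges")
    print(len(frequencies))  # few values -> few bins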