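"""Postgres lineage integration tests (native and query-log lineage)."""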

import sys
import time
from os import path

import pytest

from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import (
    DatabaseLineageConfigType,
)
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow

if not sys.version_info >= (3, 9):
    pytest.skip("requires python 3.9+", allow_module_level=True)


@pytest.fixture()
def native_lineage_config(db_service, workflow_config, sink_config):
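    """Workflow config for the native postgres-lineage source."""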
    return {
        "source": {
            "type": "postgres-lineage",
            "serviceName": db_service.fullyQualifiedName.root,
"sourceConfig": {
"config": {"type": DatabaseLineageConfigType.DatabaseLineage.value}
},
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
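# expected_nodes is the number of lineage nodes expected on film_actor
# after ingestion runs with the given includeDDL setting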
@pytest.mark.parametrize(
    "source_config,expected_nodes",
    [
        ({"includeDDL": False}, 0),
        ({"includeDDL": True}, 3),
    ],
    ids=lambda config: (
        "".join([f"{k}={str(v)}" for k, v in config.items()])
        if isinstance(config, dict)
        else ""
    ),
)
def test_native_lineage(
    patch_passwords_for_db_services,
    source_config,
    expected_nodes,
    run_workflow,
    ingestion_config,
    native_lineage_config,
    metadata,
    db_service,
):
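    """Run metadata + native lineage ingestion and check film_actor lineage nodes."""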
    ingestion_config["source"]["sourceConfig"]["config"].update(source_config)
    run_workflow(MetadataWorkflow, ingestion_config)
    run_workflow(MetadataWorkflow, native_lineage_config)
    film_actor_edges = metadata.get_lineage_by_name(
        Table, f"{db_service.fullyQualifiedName.root}.dvdrental.public.film_actor"
    )
    assert len(film_actor_edges["nodes"]) == expected_nodes
    run_workflow(MetadataWorkflow, native_lineage_config)


@pytest.fixture()
def log_lineage_config(db_service, metadata, workflow_config, sink_config):
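    """Workflow config for query-log lineage using bad_query_log.csv."""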
    return {
        "source": {
            "type": "query-log-lineage",
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {
                "config": {
                    "type": "DatabaseLineage",
                    "queryLogFilePath": path.dirname(__file__) + "/bad_query_log.csv",
                }
            },
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }


def test_log_lineage(
    patch_passwords_for_db_services,
    run_workflow,
    ingestion_config,
    log_lineage_config,
    metadata,
    db_service,
):
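    """Run query-log lineage ingestion and verify the expected edges are created."""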
    # since the query cache is stored in ES, we need to reindex to avoid having a stale cache
    # TODO: fix the server so that we don't need to run this
    reindex_search(metadata)
    search_cache.clear()
    run_workflow(MetadataWorkflow, ingestion_config)
    workflow = run_workflow(
        MetadataWorkflow, log_lineage_config, raise_from_status=False
    )
    assert len(workflow.source.status.failures) == 0
    customer_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
        nullable=False,
    )
    actor_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.actor",
        nullable=False,
    )
    staff_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.staff",
        nullable=False,
    )
    edge = metadata.get_lineage_edge(
        str(customer_table.id.root), str(actor_table.id.root)
    )
    assert edge is not None
    edge = metadata.get_lineage_edge(
        str(customer_table.id.root), str(staff_table.id.root)
    )
    assert edge is not None


def reindex_search(metadata: OpenMetadata, timeout=60):
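    """Trigger SearchIndexingApplication and wait for it to complete or time out."""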
    start = time.time()
    # wait for previous reindexing to finish (if any)
    while True:
        response = metadata.client.get(
            "/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
        )
        if len(response["data"]) == 0:
            break
        if response["data"][0]["status"] != "running":
            break
        if time.time() - start > timeout:
            raise TimeoutError("Timed out waiting for previous reindexing to finish")
        time.sleep(1)
    time.sleep(
        0.5
    )  # app interactivity is not immediate (probably because of async operations), so we wait a bit
    metadata.client.post("/apps/trigger/SearchIndexingApplication")
    time.sleep(0.5)  # here too
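    # poll until the reindexing run we just triggered reports success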
    status = None
    while status != "success":
        response = metadata.client.get(
            "/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
        )
        status = response["data"][0]["status"]
        if time.time() - start > timeout:
            raise TimeoutError("Timed out waiting for reindexing to complete")
        time.sleep(1)


@pytest.fixture()
def long_cell_query_log(tmp_path_factory):
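    """Write a query log CSV whose query_text cell is ~100k characters long."""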
    log_file = tmp_path_factory.mktemp("data") / "large_query_log.csv"
    with open(log_file, "w") as f:
        f.write("query_text,database_name,schema_name\n")
        f.write(
            "insert into dvdrental.public.rental select {} from dvdrental.public.payment\n".format(
                "first_name || '" + "a" * 100_000 + "'"
            )
        )
    return log_file


@pytest.fixture()
def long_cell_query_file(
    db_service, metadata, workflow_config, sink_config, long_cell_query_log
):
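    """Workflow config for query-log lineage over the long-cell query log."""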
    return {
        "source": {
            "type": "query-log-lineage",
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {
                "config": {
                    "type": "DatabaseLineage",
                    "queryLogFilePath": str(long_cell_query_log),
                }
            },
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }


def test_log_file_with_long_cell(
    patch_passwords_for_db_services,
    run_workflow,
    ingestion_config,
    long_cell_query_file,
    metadata,
    db_service,
):
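    """Query log rows with very long cells should still produce lineage."""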
    # since the query cache is stored in ES, we need to reindex to avoid having a stale cache
    # TODO: fix the server so that we don't need to run this
    reindex_search(metadata)
    search_cache.clear()
    run_workflow(MetadataWorkflow, ingestion_config)
    run_workflow(MetadataWorkflow, long_cell_query_file)
    rental_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.rental",
        nullable=False,
    )
    payment_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.payment",
        nullable=False,
    )
    edge = metadata.get_lineage_edge(
        str(payment_table.id.root), str(rental_table.id.root)
    )
    assert edge is not None