mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-13 12:10:23 +00:00
409 lines
19 KiB
Python
409 lines
19 KiB
Python
import unittest
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Optional, Tuple
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from datahub.ingestion.source.hex.constants import HEX_PLATFORM_URN
|
|
from datahub.ingestion.source.hex.query_fetcher import (
|
|
HexQueryFetcher,
|
|
HexQueryFetcherReport,
|
|
QueryResponse,
|
|
)
|
|
from datahub.metadata.schema_classes import (
|
|
AuditStampClass,
|
|
QueryPropertiesClass,
|
|
QueryStatementClass,
|
|
QuerySubjectClass,
|
|
QuerySubjectsClass,
|
|
)
|
|
from datahub.metadata.urns import DatasetUrn, QueryUrn
|
|
|
|
|
|
class TestHexQueryFetcherExtractHexMetadata(unittest.TestCase):
|
|
"""Test cases for HexQueryFetcher._extract_hex_metadata method"""
|
|
|
|
def setUp(self):
|
|
self.mock_client = MagicMock()
|
|
self.workspace_name = "some-hex-workspace"
|
|
self.start_datetime = datetime(2023, 1, 1)
|
|
self.report = HexQueryFetcherReport()
|
|
self.fetcher = HexQueryFetcher(
|
|
datahub_client=self.mock_client,
|
|
workspace_name=self.workspace_name,
|
|
start_datetime=self.start_datetime,
|
|
end_datetime=self.start_datetime - timedelta(days=1),
|
|
report=self.report,
|
|
)
|
|
|
|
def test_extract_hex_metadata_with_matching_workspace(self):
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "project_url": "https://app.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic?selectedCellId=67c38da0-e631-4005-9750-5bdae2a2ef3f", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is not None
|
|
project_id, workspace_name = result
|
|
assert project_id == "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"
|
|
assert workspace_name == "some-hex-workspace"
|
|
|
|
def test_extract_hex_metadata_with_non_matching_workspace(self):
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "project_url": "https://app.hex.tech/different-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic?selectedCellId=67c38da0-e631-4005-9750-5bdae2a2ef3f", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is not None
|
|
project_id, workspace_name = result
|
|
assert project_id == "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"
|
|
assert workspace_name == "different-workspace"
|
|
|
|
def test_extract_hex_metadata_without_url_returns_none(self):
|
|
# missing project_url
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is None
|
|
|
|
def test_extract_hex_metadata_with_no_metadata(self):
|
|
# no Hex metadata
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- This is a regular comment
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is None
|
|
|
|
def test_extract_hex_metadata_with_non_scheduled_run(self):
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "LOGICAL_VIEW", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "project_url": "https://app.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic?selectedCellId=67c38da0-e631-4005-9750-5bdae2a2ef3f", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is None
|
|
|
|
def test_extract_hex_metadata_with_missing_context(self):
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "project_url": "https://app.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic?selectedCellId=67c38da0-e631-4005-9750-5bdae2a2ef3f", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is None
|
|
|
|
def test_extract_hex_metadata_with_invalid_json(self):
|
|
# invalid JSON in Hex metadata
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- Hex query metadata: {"categories": ["Scratchpad"], "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", INVALID_JSON}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is None
|
|
|
|
def test_extract_hex_metadata_with_missing_project_id(self):
|
|
# missing project_id
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "project_url": "https://app.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic"}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is None
|
|
|
|
def test_extract_hex_metadata_with_invalid_url_format_returns_none(self):
|
|
# invalid URL format in project_url
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- Hex query metadata: {"categories": ["Scratchpad"], "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_url": "https://invalid-url-format/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is None
|
|
|
|
def test_extract_hex_metadata_with_custom_domain(self):
|
|
# custom domain in project_url (single-tenant deployment)
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "project_url": "https://my-hex-instance.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is not None
|
|
project_id, workspace_name = result
|
|
assert project_id == "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"
|
|
assert workspace_name == "some-hex-workspace"
|
|
|
|
def test_extract_hex_metadata_with_http_protocol(self):
|
|
# HTTP protocol (not HTTPS)
|
|
sql = """
|
|
select *
|
|
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
|
|
limit 100
|
|
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "project_url": "http://app.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is not None
|
|
project_id, workspace_name = result
|
|
assert project_id == "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"
|
|
assert workspace_name == "some-hex-workspace"
|
|
|
|
def test_extract_hex_metadata_with_complex_urls(self):
|
|
# complex workspace names and paths
|
|
urls_to_test = [
|
|
# URL with hyphens in workspace name
|
|
"""{"context": "SCHEDULED_RUN", "project_id": "123", "project_url": "https://app.hex.tech/my-complex-workspace-name/hex/project-id"}""",
|
|
# URL with underscores
|
|
"""{"context": "SCHEDULED_RUN", "project_id": "123", "project_url": "https://app.hex.tech/workspace_with_underscores/hex/project-id"}""",
|
|
# URL with special chars in domain
|
|
"""{"context": "SCHEDULED_RUN", "project_id": "123", "project_url": "https://my-custom-subdomain.hex.tech/some-hex-workspace/hex/project-id"}""",
|
|
# URL with long path after /hex/
|
|
"""{"context": "SCHEDULED_RUN", "project_id": "123", "project_url": "https://app.hex.tech/some-hex-workspace/hex/project-id/draft/logic?selectedCellId=67c38da0-e631"}""",
|
|
]
|
|
|
|
expected_workspaces = [
|
|
"my-complex-workspace-name",
|
|
"workspace_with_underscores",
|
|
"some-hex-workspace",
|
|
"some-hex-workspace",
|
|
]
|
|
|
|
for i, url_json in enumerate(urls_to_test):
|
|
sql = f"""
|
|
select * from table
|
|
-- Hex query metadata: {url_json}
|
|
"""
|
|
|
|
result = self.fetcher._extract_hex_metadata(sql)
|
|
assert result is not None, (
|
|
f"Failed to extract metadata from URL: {url_json}"
|
|
)
|
|
project_id, workspace_name = result
|
|
assert project_id == "123"
|
|
assert workspace_name == expected_workspaces[i], (
|
|
f"Expected workspace {expected_workspaces[i]} but got {workspace_name}"
|
|
)
|
|
|
|
|
|
class TestHexQueryFetcherFetch(unittest.TestCase):
|
|
"""Test cases for the HexQueryFetcher.fetch method"""
|
|
|
|
def setUp(self):
|
|
self.mock_client = MagicMock()
|
|
self.workspace_name = "workspace1"
|
|
self.start_datetime = datetime(2023, 1, 1)
|
|
self.report = HexQueryFetcherReport()
|
|
|
|
self.fetcher = HexQueryFetcher(
|
|
datahub_client=self.mock_client,
|
|
workspace_name=self.workspace_name,
|
|
start_datetime=self.start_datetime,
|
|
end_datetime=self.start_datetime - timedelta(days=1),
|
|
report=self.report,
|
|
)
|
|
|
|
# valid test data
|
|
self.query_urn_1 = QueryUrn.from_string("urn:li:query:query1")
|
|
self.query_urn_2 = QueryUrn.from_string("urn:li:query:query2")
|
|
self.dataset_urn_1 = DatasetUrn.from_string(
|
|
"urn:li:dataset:(urn:li:dataPlatform:snowflake,table1,PROD)"
|
|
)
|
|
self.dataset_urn_2 = DatasetUrn.from_string(
|
|
"urn:li:dataset:(urn:li:dataPlatform:snowflake,table1,PROD)"
|
|
)
|
|
# self.entities_data matches the return type of HexQueryFetcher._fetch_query_entities
|
|
self.entities_data: Dict[
|
|
QueryUrn,
|
|
Tuple[Optional[QueryPropertiesClass], Optional[QuerySubjectsClass]],
|
|
] = {
|
|
self.query_urn_1: (
|
|
QueryPropertiesClass(
|
|
created=AuditStampClass._construct_with_defaults(),
|
|
lastModified=AuditStampClass._construct_with_defaults(),
|
|
statement=QueryStatementClass(
|
|
value="""SELECT * FROM table -- Hex query metadata: {"context": "SCHEDULED_RUN", "project_id": "project1", "project_url": "https://app.hex.tech/workspace1/hex/project1"}"""
|
|
),
|
|
source=HEX_PLATFORM_URN.urn(),
|
|
),
|
|
QuerySubjectsClass(
|
|
subjects=[
|
|
QuerySubjectClass(entity=self.dataset_urn_1.urn()),
|
|
QuerySubjectClass(entity=self.dataset_urn_2.urn()),
|
|
]
|
|
),
|
|
),
|
|
self.query_urn_2: (
|
|
QueryPropertiesClass(
|
|
created=AuditStampClass._construct_with_defaults(),
|
|
lastModified=AuditStampClass._construct_with_defaults(),
|
|
statement=QueryStatementClass(
|
|
value="""SELECT * FROM table -- Hex query metadata: {"context": "SCHEDULED_RUN", "project_id": "project2", "project_url": "https://app.hex.tech/workspace1/hex/project2"}"""
|
|
),
|
|
source=HEX_PLATFORM_URN.urn(),
|
|
),
|
|
QuerySubjectsClass(
|
|
subjects=[QuerySubjectClass(entity=self.dataset_urn_1.urn())]
|
|
),
|
|
),
|
|
}
|
|
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
|
|
)
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_entities"
|
|
)
|
|
def test_fetch_with_valid_data(
|
|
self, mock_fetch_query_entities, mock_fetch_query_urns
|
|
):
|
|
mock_fetch_query_urns.return_value = [self.query_urn_1]
|
|
mock_fetch_query_entities.return_value = self.entities_data
|
|
|
|
results = list(self.fetcher.fetch())
|
|
|
|
assert len(results) == 2
|
|
assert all(isinstance(qr, QueryResponse) for qr in results)
|
|
assert results[0].urn == self.query_urn_1
|
|
assert results[0].hex_project_id == "project1"
|
|
assert results[0].dataset_subjects == [self.dataset_urn_1, self.dataset_urn_2]
|
|
assert results[1].urn == self.query_urn_2
|
|
assert results[1].hex_project_id == "project2"
|
|
assert results[1].dataset_subjects == [self.dataset_urn_1]
|
|
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
|
|
)
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_entities"
|
|
)
|
|
def test_fetch_with_missing_hex_query_metadata(
|
|
self, mock_fetch_query_entities, mock_fetch_query_urns
|
|
):
|
|
# force fail in query_urn_2
|
|
self.entities_data[self.query_urn_2][0].statement.value = ( # type: ignore
|
|
"SELECT * FROM table -- IT'S MISSING HERE"
|
|
)
|
|
mock_fetch_query_urns.return_value = [self.query_urn_1]
|
|
mock_fetch_query_entities.return_value = self.entities_data
|
|
|
|
results = list(self.fetcher.fetch())
|
|
|
|
assert len(results) == 1
|
|
assert all(isinstance(qr, QueryResponse) for qr in results)
|
|
assert results[0].urn == self.query_urn_1
|
|
assert results[0].hex_project_id == "project1"
|
|
assert results[0].dataset_subjects == [self.dataset_urn_1, self.dataset_urn_2]
|
|
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
|
|
)
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_entities"
|
|
)
|
|
def test_fetch_with_missing_not_matching_workspace(
|
|
self, mock_fetch_query_entities, mock_fetch_query_urns
|
|
):
|
|
# force not match in query_urn_2
|
|
self.entities_data[self.query_urn_2][0].statement.value = ( # type: ignore
|
|
"""SELECT * FROM table -- Hex query metadata: {"context": "SCHEDULED_RUN", "project_id": "project1", "project_url": "https://app.hex.tech/YET_ANOTHER_WORKSPACE/hex/project1"}"""
|
|
)
|
|
mock_fetch_query_urns.return_value = [self.query_urn_1]
|
|
mock_fetch_query_entities.return_value = self.entities_data
|
|
|
|
results = list(self.fetcher.fetch())
|
|
|
|
assert len(results) == 1
|
|
assert all(isinstance(qr, QueryResponse) for qr in results)
|
|
assert results[0].urn == self.query_urn_1
|
|
assert results[0].hex_project_id == "project1"
|
|
assert results[0].dataset_subjects == [self.dataset_urn_1, self.dataset_urn_2]
|
|
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
|
|
)
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_entities"
|
|
)
|
|
def test_fetch_with_no_subjects(
|
|
self, mock_fetch_query_entities, mock_fetch_query_urns
|
|
):
|
|
# force no subjects query_urn_2
|
|
self.entities_data[self.query_urn_2][1].subjects = [] # type: ignore
|
|
mock_fetch_query_urns.return_value = [self.query_urn_1]
|
|
mock_fetch_query_entities.return_value = self.entities_data
|
|
|
|
results = list(self.fetcher.fetch())
|
|
|
|
assert len(results) == 1
|
|
assert all(isinstance(qr, QueryResponse) for qr in results)
|
|
assert results[0].urn == self.query_urn_1
|
|
assert results[0].hex_project_id == "project1"
|
|
assert results[0].dataset_subjects == [self.dataset_urn_1, self.dataset_urn_2]
|
|
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
|
|
)
|
|
def test_fetch_with_no_query_urns_found(self, mock_fetch_query_urns):
|
|
mock_fetch_query_urns.return_value = []
|
|
|
|
results = list(self.fetcher.fetch())
|
|
|
|
assert len(results) == 0
|
|
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
|
|
)
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_entities"
|
|
)
|
|
def test_fetch_query_entities_fail(
|
|
self, mock_fetch_query_entities, mock_fetch_query_urns
|
|
):
|
|
mock_fetch_query_urns.return_value = [self.query_urn_1]
|
|
mock_fetch_query_entities.side_effect = Exception(
|
|
"Failed to fetch query entities"
|
|
)
|
|
|
|
results = list(self.fetcher.fetch())
|
|
|
|
assert len(results) == 0
|
|
assert self.report.errors == 1
|
|
|
|
@patch(
|
|
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
|
|
)
|
|
def test_fetch_query_urns_fail(self, mock_fetch_query_urns):
|
|
mock_fetch_query_urns.side_effect = Exception("Failed to fetch query urns")
|
|
|
|
results = list(self.fetcher.fetch())
|
|
|
|
assert len(results) == 0
|
|
assert self.report.errors == 1
|