feat(ingest): new hex connector - part 2 (#12985)

This commit is contained in:
Sergio Gómez Villamor 2025-04-03 12:44:37 +02:00 committed by GitHub
parent 7618af549c
commit d2bb33f7c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 3295 additions and 36 deletions

View File

@ -20,4 +20,12 @@ Currently, the [Hex API](https://learn.hex.tech/docs/api/api-reference) has some
2. **Metadata Access**: There is no direct method to retrieve metadata for Collections, Status, or Categories. This information is only available indirectly through references within Projects and Components.
Please keep these limitations in mind when working with the Hex connector.
For Dataset - Hex Project lineage, the connector relies on the
[_Hex query metadata_](https://learn.hex.tech/docs/explore-data/cells/sql-cells/sql-cells-introduction#query-metadata) feature.
To extract lineage information, the setup must therefore include:
- A separate warehouse ingestion source (_e.g._ BigQuery, Snowflake, Redshift, ...) with `use_queries_v2` enabled so that Queries are fetched.
This ingests the queries into DataHub as `Query` entities; those triggered by Hex include the corresponding _Hex query metadata_.
- A DataHub server version >= SaaS `0.3.10` or > OSS `1.0.0`, so that `Query` entities are properly indexed by source (Hex in this case) and can be fetched and processed by the Hex ingestor to emit Dataset - Project lineage.
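For reference, a minimal recipe sketch with lineage enabled follows. This is a hedged example that mirrors the integration test further down in this change; the server URL, token, and workspace name are placeholders.

```python
from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe: ingest Hex metadata and Dataset - Project lineage.
# Placeholders: DataHub server URL, Hex token, workspace name.
pipeline = Pipeline.create(
    {
        "pipeline_name": "hex-with-lineage",
        "datahub_api": {"server": "http://localhost:8080"},
        "source": {
            "type": "hex",
            "config": {
                "workspace_name": "my-workspace",
                "token": "<hex-api-token>",
                "include_lineage": True,
                # Optional window; defaults are (now - 1 day, now) in UTC.
                "lineage_start_time": "-7 days",
                "lineage_end_time": "now",
            },
        },
        "sink": {"type": "file", "config": {"filename": "/tmp/hex_mces.json"}},
    }
)
pipeline.run()
pipeline.raise_from_status()
```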

View File

@ -1,3 +1,8 @@
from datahub.metadata.urns import DataPlatformUrn
HEX_PLATFORM_NAME = "hex"
HEX_PLATFORM_URN = DataPlatformUrn(platform_name=HEX_PLATFORM_NAME)
HEX_API_BASE_URL_DEFAULT = "https://app.hex.tech/api/v1"
HEX_API_PAGE_SIZE_DEFAULT = 100
DATAHUB_API_PAGE_SIZE_DEFAULT = 100

View File

@ -1,9 +1,12 @@
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Iterable, List, Optional
from pydantic import Field, SecretStr
from pydantic import Field, SecretStr, root_validator
from typing_extensions import assert_never
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.datetimes import parse_user_datetime
from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
@ -21,12 +24,17 @@ from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.hex.api import HexApi, HexApiReport
from datahub.ingestion.source.hex.constants import (
DATAHUB_API_PAGE_SIZE_DEFAULT,
HEX_API_BASE_URL_DEFAULT,
HEX_API_PAGE_SIZE_DEFAULT,
HEX_PLATFORM_NAME,
)
from datahub.ingestion.source.hex.mapper import Mapper
from datahub.ingestion.source.hex.model import Component, Project
from datahub.ingestion.source.hex.query_fetcher import (
HexQueryFetcher,
HexQueryFetcherReport,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
@ -34,9 +42,10 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import (
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionReport,
StatefulIngestionSourceBase,
)
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.sdk.main_client import DataHubClient
class HexSourceConfig(
@ -93,9 +102,73 @@ class HexSourceConfig(
default=True,
description="Set ownership identity from owner/creator email",
)
include_lineage: bool = Field(
default=True,
description='Include Hex lineage, fetched from DataHub. See the "Limitations" section in the docs for more details about this feature.',
)
lineage_start_time: Optional[datetime] = Field(
default=None,
description="Earliest date of lineage to consider. Default: 1 day before lineage end time. You can specify absolute time like '2023-01-01' or relative time like '-7 days' or '-7d'.",
)
lineage_end_time: Optional[datetime] = Field(
default=None,
description="Latest date of lineage to consider. Default: Current time in UTC. You can specify absolute time like '2023-01-01' or relative time like '-1 day' or '-1d'.",
)
datahub_page_size: int = Field(
default=DATAHUB_API_PAGE_SIZE_DEFAULT,
description="Number of items to fetch per DataHub API call.",
)
@root_validator(pre=True)
def validate_lineage_times(cls, data: Dict[str, Any]) -> Dict[str, Any]:
# lineage_end_time default = now
if "lineage_end_time" not in data or data["lineage_end_time"] is None:
data["lineage_end_time"] = datetime.now(tz=timezone.utc)
# if string is given, parse it
if isinstance(data["lineage_end_time"], str):
data["lineage_end_time"] = parse_user_datetime(data["lineage_end_time"])
# if no timezone is given, assume UTC
if data["lineage_end_time"].tzinfo is None:
data["lineage_end_time"] = data["lineage_end_time"].replace(
tzinfo=timezone.utc
)
# at this point, we ensure there is a non null datetime with UTC timezone for lineage_end_time
assert (
data["lineage_end_time"]
and isinstance(data["lineage_end_time"], datetime)
and data["lineage_end_time"].tzinfo is not None
and data["lineage_end_time"].tzinfo == timezone.utc
)
# lineage_start_time default = lineage_end_time - 1 day
if "lineage_start_time" not in data or data["lineage_start_time"] is None:
data["lineage_start_time"] = data["lineage_end_time"] - timedelta(days=1)
# if string is given, parse it
if isinstance(data["lineage_start_time"], str):
data["lineage_start_time"] = parse_user_datetime(data["lineage_start_time"])
# if no timezone is given, assume UTC
if data["lineage_start_time"].tzinfo is None:
data["lineage_start_time"] = data["lineage_start_time"].replace(
tzinfo=timezone.utc
)
# at this point, we ensure there is a non null datetime with UTC timezone for lineage_start_time
assert (
data["lineage_start_time"]
and isinstance(data["lineage_start_time"], datetime)
and data["lineage_start_time"].tzinfo is not None
and data["lineage_start_time"].tzinfo == timezone.utc
)
return data
class HexReport(StaleEntityRemovalSourceReport, HexApiReport):
@dataclass
class HexReport(
StaleEntityRemovalSourceReport,
HexApiReport,
IngestionStageReport,
HexQueryFetcherReport,
):
pass
@ -110,7 +183,7 @@ class HexSource(StatefulIngestionSourceBase):
def __init__(self, config: HexSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
self.source_config = config
self.report = HexReport()
self.report: HexReport = HexReport()
self.platform = HEX_PLATFORM_NAME
self.hex_api = HexApi(
report=self.report,
@ -129,6 +202,28 @@ class HexSource(StatefulIngestionSourceBase):
categories_as_tags=self.source_config.categories_as_tags,
set_ownership_from_email=self.source_config.set_ownership_from_email,
)
self.project_registry: Dict[str, Project] = {}
self.component_registry: Dict[str, Component] = {}
self.datahub_client: Optional[DataHubClient] = None
self.query_fetcher: Optional[HexQueryFetcher] = None
if self.source_config.include_lineage:
graph = ctx.require_graph("Lineage")
assert self.source_config.lineage_start_time and isinstance(
self.source_config.lineage_start_time, datetime
)
assert self.source_config.lineage_end_time and isinstance(
self.source_config.lineage_end_time, datetime
)
self.datahub_client = DataHubClient(graph=graph)
self.query_fetcher = HexQueryFetcher(
datahub_client=self.datahub_client,
workspace_name=self.source_config.workspace_name,
start_datetime=self.source_config.lineage_start_time,
end_datetime=self.source_config.lineage_end_time,
report=self.report,
page_size=self.source_config.datahub_page_size,
)
@classmethod
def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "HexSource":
@ -143,25 +238,58 @@ class HexSource(StatefulIngestionSourceBase):
).workunit_processor,
]
def get_report(self) -> StatefulIngestionReport:
def get_report(self) -> HexReport:
return self.report
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
yield from self.mapper.map_workspace()
for project_or_component in self.hex_api.fetch_projects():
if isinstance(project_or_component, Project):
if self.source_config.project_title_pattern.allowed(
project_or_component.title
):
yield from self.mapper.map_project(project=project_or_component)
elif isinstance(project_or_component, Component):
if (
self.source_config.include_components
and self.source_config.component_title_pattern.allowed(
with self.report.new_stage("Fetch Hex assets from Hex API"):
for project_or_component in self.hex_api.fetch_projects():
if isinstance(project_or_component, Project):
if self.source_config.project_title_pattern.allowed(
project_or_component.title
)
):
yield from self.mapper.map_component(component=project_or_component)
else:
assert_never(project_or_component)
):
self.project_registry[project_or_component.id] = (
project_or_component
)
elif isinstance(project_or_component, Component):
if (
self.source_config.include_components
and self.source_config.component_title_pattern.allowed(
project_or_component.title
)
):
self.component_registry[project_or_component.id] = (
project_or_component
)
else:
assert_never(project_or_component)
if self.source_config.include_lineage:
assert self.datahub_client and self.query_fetcher
with self.report.new_stage(
"Fetch Hex lineage from existing Queries in DataHub"
):
for query_metadata in self.query_fetcher.fetch():
project = self.project_registry.get(query_metadata.hex_project_id)
if project:
project.upstream_datasets.extend(
query_metadata.dataset_subjects
)
project.upstream_schema_fields.extend(
query_metadata.schema_field_subjects
)
else:
self.report.report_warning(
title="Missing project for lineage",
message="Lineage skipped because the referenced project was not found, likely filtered out by title patterns or deleted.",
context=str(query_metadata),
)
with self.report.new_stage("Emit"):
yield from self.mapper.map_workspace()
for project in self.project_registry.values():
yield from self.mapper.map_project(project=project)
for component in self.component_registry.values():
yield from self.mapper.map_component(component=component)
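As a side note on the lineage window handled by the `validate_lineage_times` validator above, a minimal sketch, assuming the same `HexSourceConfig` defaults exercised by the unit tests further below:

```python
from datahub.ingestion.source.hex.hex import HexSourceConfig

# Relative times and defaults are resolved by validate_lineage_times.
config = HexSourceConfig.parse_obj(
    {
        "workspace_name": "test-workspace",
        "token": "test-token",
        "lineage_start_time": "-3day",  # relative to now
        "lineage_end_time": "now",
    }
)
# Both values end up as timezone-aware UTC datetimes. Omitting them yields
# lineage_end_time = now (UTC) and lineage_start_time = lineage_end_time - 1 day.
```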

View File

@ -1,6 +1,6 @@
import logging
from datetime import datetime
from typing import Iterable, List, Optional, Tuple
from typing import Iterable, List, Optional, Tuple, Union
from datahub._codegen.aspect import (
_Aspect, # TODO: is there a better import than this one?
@ -46,6 +46,7 @@ from datahub.metadata.schema_classes import (
DashboardInfoClass,
DashboardUsageStatisticsClass,
DataPlatformInstanceClass,
EdgeClass,
GlobalTagsClass,
OwnerClass,
OwnershipClass,
@ -53,7 +54,14 @@ from datahub.metadata.schema_classes import (
TagAssociationClass,
TimeWindowSizeClass,
)
from datahub.metadata.urns import ContainerUrn, CorpUserUrn, DashboardUrn, Urn
from datahub.metadata.urns import (
ContainerUrn,
CorpUserUrn,
DashboardUrn,
DatasetUrn,
SchemaFieldUrn,
Urn,
)
logger = logging.getLogger(__name__)
@ -116,6 +124,8 @@ class Mapper:
),
externalUrl=f"{self._base_url}/{self._workspace_name}/hex/{project.id}",
customProperties=dict(id=project.id),
datasetEdges=self._dataset_edges(project.upstream_datasets),
# TODO: support schema field upstream, maybe InputFields?
)
subtypes = SubTypesClass(
@ -343,6 +353,22 @@ class Mapper:
else None,
)
def _dataset_edges(
self, upstream: List[Union[DatasetUrn, SchemaFieldUrn]]
) -> Optional[List[EdgeClass]]:
# TBC: is there support for CLL in Dashboards? for the moment, skip SchemaFieldUrns
return (
[
EdgeClass(
destinationUrn=upstream_urn.urn(),
)
for upstream_urn in upstream
if isinstance(upstream_urn, DatasetUrn)
]
if upstream
else None
)
def _yield_mcps(
self, entity_urn: Urn, aspects: List[Optional[_Aspect]]
) -> Iterable[MetadataWorkUnit]:

View File

@ -1,6 +1,8 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from typing import List, Optional, Union
from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn
@dataclass
@ -51,6 +53,12 @@ class Project:
creator: Optional[Owner] = None
owner: Optional[Owner] = None
analytics: Optional[Analytics] = None
upstream_datasets: List[Union[DatasetUrn, SchemaFieldUrn]] = field(
default_factory=list
)
upstream_schema_fields: List[Union[DatasetUrn, SchemaFieldUrn]] = field(
default_factory=list
)
@dataclass

View File

@ -0,0 +1,297 @@
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.hex.constants import (
DATAHUB_API_PAGE_SIZE_DEFAULT,
HEX_PLATFORM_URN,
)
from datahub.metadata.schema_classes import QueryPropertiesClass, QuerySubjectsClass
from datahub.metadata.urns import DatasetUrn, QueryUrn, SchemaFieldUrn
from datahub.sdk.main_client import DataHubClient
from datahub.sdk.search_filters import FilterDsl as F
from datahub.utilities.time import datetime_to_ts_millis
logger = logging.getLogger(__name__)
# Pattern to extract both project_id and workspace_name from Hex metadata in SQL comments
HEX_METADATA_PATTERN = r'-- Hex query metadata: \{.*?"project_id": "([^"]+)".*?"project_url": "https?://[^/]+/([^/]+)/hex/.*?\}'
@dataclass
class QueryResponse:
"""This is the public response model for the HexQueryFetcher."""
urn: QueryUrn
hex_project_id: str
dataset_subjects: List[DatasetUrn] = field(default_factory=list)
schema_field_subjects: List[SchemaFieldUrn] = field(default_factory=list)
@dataclass
class HexQueryFetcherReport(SourceReport):
start_datetime: Optional[datetime] = None
end_datetime: Optional[datetime] = None
fetched_query_urns: int = 0
fetched_query_objects: int = 0
filtered_out_queries_missing_metadata: int = 0
filtered_out_queries_different_workspace: int = 0
filtered_out_queries_no_subjects: int = 0
total_queries: int = 0
total_dataset_subjects: int = 0
total_schema_field_subjects: int = 0
num_calls_fetch_query_entities: int = 0
class HexQueryFetcher:
def __init__(
self,
datahub_client: DataHubClient,
workspace_name: str,
start_datetime: datetime,
end_datetime: datetime,
report: HexQueryFetcherReport,
page_size: int = DATAHUB_API_PAGE_SIZE_DEFAULT,
):
self.datahub_client = datahub_client
self.workspace_name = workspace_name
self.start_datetime = start_datetime
self.end_datetime = end_datetime
self.report = report
self.page_size = page_size
self.report.start_datetime = start_datetime
self.report.end_datetime = end_datetime
def fetch(self) -> Iterable[QueryResponse]:
try:
query_urns = self._fetch_query_urns_filter_hex_and_last_modified()
assert all(isinstance(urn, QueryUrn) for urn in query_urns)
self.report.fetched_query_urns = len(query_urns)
entities_by_urn = self._fetch_query_entities(query_urns)
self.report.fetched_query_objects = len(entities_by_urn)
except Exception as e:
self.report.failure(
title="Error fetching Queries for lineage",
message="Error fetching Queries will result on missing lineage",
context=str(
dict(
workspace_name=self.workspace_name,
start_datetime=self.start_datetime,
end_datetime=self.end_datetime,
)
),
exc=e,
)
else:
if not query_urns or not entities_by_urn:
self.report.warning(
title="No Queries found with Hex as origin",
message="No lineage because of no Queries found with Hex as origin in the given time range; you may consider extending the time range to fetch more queries.",
context=str(
dict(
workspace_name=self.workspace_name,
start_datetime=self.start_datetime,
end_datetime=self.end_datetime,
)
),
)
return
for query_urn, (
query_properties,
query_subjects,
) in entities_by_urn.items():
maybe_query_response = self._build_query_response(
query_urn=query_urn,
query_properties=query_properties,
query_subjects=query_subjects,
)
if maybe_query_response:
yield maybe_query_response
def _fetch_query_entities(
self, query_urns: List[QueryUrn]
) -> Dict[
QueryUrn, Tuple[Optional[QueryPropertiesClass], Optional[QuerySubjectsClass]]
]:
entities_by_urn: Dict[
QueryUrn,
Tuple[Optional[QueryPropertiesClass], Optional[QuerySubjectsClass]],
] = {}
for i in range(0, len(query_urns), self.page_size):
batch = query_urns[i : i + self.page_size]
logger.debug(f"Fetching query entities for {len(batch)} queries: {batch}")
entities = self.datahub_client._graph.get_entities(
entity_name=QueryUrn.ENTITY_TYPE,
urns=[urn.urn() for urn in batch],
aspects=[
QueryPropertiesClass.ASPECT_NAME,
QuerySubjectsClass.ASPECT_NAME,
],
with_system_metadata=False,
)
self.report.num_calls_fetch_query_entities += 1
logger.debug(f"Get entities response: {entities}")
for urn, entity in entities.items():
query_urn = QueryUrn.from_string(urn)
properties_tuple = entity.get(
QueryPropertiesClass.ASPECT_NAME, (None, None)
)
query_properties: Optional[QueryPropertiesClass] = None
if properties_tuple and properties_tuple[0]:
assert isinstance(properties_tuple[0], QueryPropertiesClass)
query_properties = properties_tuple[0]
subjects_tuple = entity.get(
QuerySubjectsClass.ASPECT_NAME, (None, None)
)
query_subjects: Optional[QuerySubjectsClass] = None
if subjects_tuple and subjects_tuple[0]:
assert isinstance(subjects_tuple[0], QuerySubjectsClass)
query_subjects = subjects_tuple[0]
entities_by_urn[query_urn] = (query_properties, query_subjects)
return entities_by_urn
def _fetch_query_urns_filter_hex_and_last_modified(self) -> List[QueryUrn]:
last_modified_start_at_millis = datetime_to_ts_millis(self.start_datetime)
last_modified_end_at_millis = datetime_to_ts_millis(self.end_datetime)
urns = self.datahub_client.search.get_urns(
filter=F.and_(
F.entity_type(QueryUrn.ENTITY_TYPE),
F.custom_filter("origin", "EQUAL", [HEX_PLATFORM_URN.urn()]),
F.custom_filter(
"lastModifiedAt",
"GREATER_THAN_OR_EQUAL_TO",
[str(last_modified_start_at_millis)],
),
F.custom_filter(
"lastModifiedAt",
"LESS_THAN_OR_EQUAL_TO",
[str(last_modified_end_at_millis)],
),
),
)
logger.debug(f"Get URNS by filter: {urns}")
return [QueryUrn.from_string(urn.urn()) for urn in urns]
def _extract_hex_metadata(self, sql_statement: str) -> Optional[Tuple[str, str]]:
"""
Extract project ID and workspace name from SQL statement.
Looks for Hex metadata in SQL comments in the format:
-- Hex query metadata: {"project_id": "...", "project_url": "https://app.hex.tech/{workspace_name}/hex/..."}
Example:
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_url": "https://app.hex.tech/acryl-partnership/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic?selectedCellId=67c38da0-e631-4005-9750-5bdae2a2ef3f"}
# TODO: Consider supporting multiline metadata format in the future:
# -- Hex query metadata: {
# -- "categories": ["Scratchpad"],
# -- "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf",
# -- ...
# -- }
Returns:
A tuple of (project_id, workspace_name) if both are successfully extracted
None if extraction fails for any reason
"""
# Extract both project_id and workspace name in a single regex operation
match = re.search(HEX_METADATA_PATTERN, sql_statement)
if not match:
return None
try:
project_id = match.group(1)
workspace_name = match.group(2)
return project_id, workspace_name
except (IndexError, AttributeError) as e:
self.report.warning(
title="Failed to extract information from Hex query metadata",
message="Failed to extract information from Hex query metadata will result on missing lineage",
context=sql_statement,
exc=e,
)
return None
def _build_query_response(
self,
query_urn: QueryUrn,
query_properties: Optional[QueryPropertiesClass],
query_subjects: Optional[QuerySubjectsClass],
) -> Optional[QueryResponse]:
# Skip if missing required aspects
if (
not query_properties
or not query_properties.statement
or not query_properties.statement.value
or not query_subjects
or query_subjects.subjects is None # empty list is allowed
):
logger.debug(
f"Skipping query {query_urn} - missing required fields: {(query_properties, query_subjects)}"
)
self.report.filtered_out_queries_missing_metadata += 1
return None
# Extract hex metadata (project_id and workspace_name)
metadata_result = self._extract_hex_metadata(query_properties.statement.value)
if not metadata_result:
logger.debug(f"Skipping query {query_urn} - failed to extract Hex metadata")
self.report.filtered_out_queries_missing_metadata += 1
return None
hex_project_id, workspace_from_url = metadata_result
# Validate workspace
if workspace_from_url != self.workspace_name:
logger.debug(
f"Skipping query {query_urn} - workspace '{workspace_from_url}' doesn't match '{self.workspace_name}'"
)
self.report.filtered_out_queries_different_workspace += 1
return None
# Extract subjects
dataset_subjects: List[DatasetUrn] = []
schema_field_subjects: List[SchemaFieldUrn] = []
for subject in query_subjects.subjects:
if subject.entity and subject.entity.startswith("urn:li:dataset:"):
dataset_subjects.append(DatasetUrn.from_string(subject.entity))
elif subject.entity and subject.entity.startswith("urn:li:schemaField:"):
schema_field_subjects.append(SchemaFieldUrn.from_string(subject.entity))
if not dataset_subjects and not schema_field_subjects:
self.report.filtered_out_queries_no_subjects += 1
return None
# Create response
response = QueryResponse(
urn=query_urn,
hex_project_id=hex_project_id,
dataset_subjects=dataset_subjects,
schema_field_subjects=schema_field_subjects,
)
logger.debug(
f"Succesfully extracted {len(dataset_subjects)} dataset subjects and {len(schema_field_subjects)} schema field subjects for query {query_urn}: {dataset_subjects} {schema_field_subjects}"
)
self.report.total_queries += 1
self.report.total_dataset_subjects += len(dataset_subjects)
self.report.total_schema_field_subjects += len(schema_field_subjects)
logger.debug(
f"Processed query {query_urn} with Hex project ID {hex_project_id}"
)
return response
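To make the extraction above concrete, here is a small hedged sketch of `HEX_METADATA_PATTERN` applied to a Hex-generated SQL comment; the statement below is an assumed example in the same shape as the test fixtures in this change.

```python
import re

HEX_METADATA_PATTERN = r'-- Hex query metadata: \{.*?"project_id": "([^"]+)".*?"project_url": "https?://[^/]+/([^/]+)/hex/.*?\}'

sql = (
    'select * from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS" limit 100\n'
    '-- Hex query metadata: {"cell_type": "SQL", '
    '"project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", '
    '"project_url": "https://app.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic"}'
)

match = re.search(HEX_METADATA_PATTERN, sql)
assert match is not None
project_id, workspace_name = match.group(1), match.group(2)
# project_id     -> "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"
# workspace_name -> "some-hex-workspace" (compared against the configured workspace_name)
```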

View File

@ -0,0 +1,47 @@
[
{
"urn": "urn:li:query:307d80903ebbc165944c52ae79efaeb1736f9bb37d9f8fa48fc0af69d725413f",
"queryKey": {
"value": {
"id": "307d80903ebbc165944c52ae79efaeb1736f9bb37d9f8fa48fc0af69d725413f"
}
},
"querySubjects": {
"value": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.pet_details,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.pet_orders,PROD)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.pet_details,PROD),age_m)"
},
{
"entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.pet_details,PROD),age_y)"
}
]
}
},
"queryProperties": {
"value": {
"statement": {
"value": "select * from \"LONG_TAIL_COMPANIONS\".\"ANALYTICS\".\"PET_DETAILS\" left outer join \"LONG_TAIL_COMPANIONS\".\"ANALYTICS\".\"PET_ORDERS\" limit 100\n-- Hex query metadata: {\"categories\": [\"Scratchpad\"], \"cell_type\": \"SQL\", \"connection\": \"Long Tail Companions\", \"context\": \"SCHEDULED_RUN\", \"project_id\": \"d73da67d-c87b-4dd8-9e7f-b79cb7f822cf\", \"project_name\": \"PlayNotebook\", \"project_url\": \"https://app.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic?selectedCellId=2520f866-8bb6-43dc-8d52-8f522a938b9c\", \"status\": \"In development\", \"trace_id\": \"0195cd3713e6700088f1535fa4c874a5\", \"user_email\": \"alice@email.com\"}",
"language": "SQL"
},
"customProperties": {},
"source": "SYSTEM",
"lastModified": {
"actor": "urn:li:corpuser:sf_hex_user",
"time": 1742904697868
},
"created": {
"actor": "urn:li:corpuser:sf_hex_user",
"time": 1742904697868
},
"origin": "urn:li:dataPlatform:hex"
}
}
}
]

View File

@ -0,0 +1,44 @@
[
{
"urn": "urn:li:query:87fe9c2def1a5b7932ec9f12c4b55a56dcae6a29fa1d5b78902e34e73abcd123",
"queryKey": {
"value": {
"id": "87fe9c2def1a5b7932ec9f12c4b55a56dcae6a29fa1d5b78902e34e73abcd123"
}
},
"querySubjects": {
"value": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.visit_core,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.visit_details,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.visit_more_details,PROD)"
}
]
}
},
"queryProperties": {
"value": {
"statement": {
"value": "select * from \"LONG_TAIL_COMPANIONS\".\"ANALYTICS\".\"VISIT_core\" join \"LONG_TAIL_COMPANIONS\".\"ANALYTICS\".\"VISIT_DETAILS\" join \"LONG_TAIL_COMPANIONS\".\"ANALYTICS\".\"VISIT_MORE_DETAILS\" limit 100\n-- Hex query metadata: {\"categories\": [\"Scratchpad\"], \"cell_type\": \"SQL\", \"connection\": \"Long Tail Companions\", \"context\": \"SCHEDULED_RUN\", \"project_id\": \"2ef730de-25ec-4131-94af-3517e743a738\", \"project_name\": \"Welcome to Hex!\", \"project_url\": \"https://app.hex.tech/some-hex-workspace/hex/2ef730de-25ec-4131-94af-3517e743a738/draft/logic?selectedCellId=3630g977-9cc7-54ed-9e63-9g633b049c1e\", \"status\": \"In development\", \"trace_id\": \"0195cd3713e6700088f1535fa4c874a6\", \"user_email\": \"bob@email.com\"}",
"language": "SQL"
},
"customProperties": {},
"source": "SYSTEM",
"lastModified": {
"actor": "urn:li:corpuser:sf_hex_user",
"time": 1742904697969
},
"created": {
"actor": "urn:li:corpuser:sf_hex_user",
"time": 1742904697969
},
"origin": "urn:li:dataPlatform:hex"
}
}
}
]

View File

@ -0,0 +1,14 @@
{
"data": {
"scrollAcrossEntities": {
"nextScrollId": "page_2_scroll_id",
"searchResults": [
{
"entity": {
"urn": "urn:li:query:307d80903ebbc165944c52ae79efaeb1736f9bb37d9f8fa48fc0af69d725413f"
}
}
]
}
}
}

View File

@ -0,0 +1,14 @@
{
"data": {
"scrollAcrossEntities": {
"nextScrollId": null,
"searchResults": [
{
"entity": {
"urn": "urn:li:query:87fe9c2def1a5b7932ec9f12c4b55a56dcae6a29fa1d5b78902e34e73abcd123"
}
}
]
}
}
}

View File

@ -7,10 +7,27 @@ services:
- "8000:8000"
volumes:
- ./hex_projects_response.json:/app/hex_projects_response.json
- ./mock_server.py:/app/mock_server.py
command: ["python", "/app/mock_server.py"]
- ./mock_hex_server.py:/app/mock_hex_server.py
command: ["python", "/app/mock_hex_server.py"]
healthcheck:
test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8000/health"]
interval: 5s
timeout: 5s
retries: 3
datahub-mock-api:
image: python:3.9-alpine
container_name: datahub-mock-api
ports:
- "8010:8010"
volumes:
- ./datahub_entities_v3_page1.json:/app/datahub_entities_v3_page1.json
- ./datahub_entities_v3_page2.json:/app/datahub_entities_v3_page2.json
- ./datahub_get_urns_by_filter_page1.json:/app/datahub_get_urns_by_filter_page1.json
- ./datahub_get_urns_by_filter_page2.json:/app/datahub_get_urns_by_filter_page2.json
- ./mock_datahub_server.py:/app/mock_datahub_server.py
command: ["python", "/app/mock_datahub_server.py"]
healthcheck:
test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8010/health"]
interval: 5s
timeout: 5s
retries: 3

View File

@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""
Mock DataHub server that handles both GET and POST requests and supports pagination
"""
import http.server
import json
import socketserver
from http import HTTPStatus
from urllib.parse import urlparse
PORT = 8010
# Load the mock response data
with open("/app/datahub_entities_v3_page1.json", "r") as f:
ENTITIES_V3_PAGE1_RESPONSE = f.read()
with open("/app/datahub_entities_v3_page2.json", "r") as f:
ENTITIES_V3_PAGE2_RESPONSE = f.read()
with open("/app/datahub_get_urns_by_filter_page1.json", "r") as f:
URNS_BY_FILTER_PAGE1_RESPONSE = f.read()
with open("/app/datahub_get_urns_by_filter_page2.json", "r") as f:
URNS_BY_FILTER_PAGE2_RESPONSE = f.read()
class MockDataHubAPIHandler(http.server.SimpleHTTPRequestHandler):
# Class-level flag tracking whether the first entities page has been requested across all instances; one handler instance is created per request
first_entities_page_requested = False
def do_GET(self):
parsed_url = urlparse(self.path)
path = parsed_url.path
# Health check endpoint
if path == "/health":
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
return
# Mock DataHub API endpoints
if path.startswith("/config"):
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(json.dumps(dict(noCode="true")).encode())
return
# Default 404 response
self.send_response(HTTPStatus.NOT_FOUND)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"error": "Not found", "path": self.path}).encode())
def do_POST(self):
parsed_url = urlparse(self.path)
path = parsed_url.path
# Get request body
content_length = int(self.headers["Content-Length"])
post_data = self.rfile.read(content_length)
request_body = json.loads(post_data)
if path == "/openapi/v3/entity/query/batchGet":
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
# Return the appropriate page of entity data in V3 format
if not MockDataHubAPIHandler.first_entities_page_requested:
self.wfile.write(ENTITIES_V3_PAGE1_RESPONSE.encode())
MockDataHubAPIHandler.first_entities_page_requested = True
else:
self.wfile.write(ENTITIES_V3_PAGE2_RESPONSE.encode())
return
if path == "/api/graphql":
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
# Check if this is a scroll query with nextScrollId
scroll_id = None
if "variables" in request_body:
scroll_id = request_body.get("variables", {}).get("scrollId")
if scroll_id == "page_2_scroll_id":
self.wfile.write(URNS_BY_FILTER_PAGE2_RESPONSE.encode())
else:
self.wfile.write(URNS_BY_FILTER_PAGE1_RESPONSE.encode())
return
# Default 404 response
self.send_response(HTTPStatus.NOT_FOUND)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(
json.dumps(
{"error": "Not found", "path": self.path, "method": "POST"}
).encode()
)
# Set up the server
handler = MockDataHubAPIHandler
httpd = socketserver.TCPServer(("", PORT), handler)
print(f"Serving mock DataHub API at port {PORT}")
httpd.serve_forever()

File diff suppressed because it is too large

View File

@ -6,7 +6,6 @@ from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port
# Test resources and constants
FROZEN_TIME = "2025-03-25 12:00:00"
pytestmark = pytest.mark.integration_batch_2
@ -17,10 +16,10 @@ def test_resources_dir(pytestconfig):
return pytestconfig.rootpath / "tests/integration/hex"
def is_hex_mock_api_up(container_name: str) -> bool:
def is_mock_api_up(port: int) -> bool:
"""Check if the mock API server is up and running"""
try:
response = requests.get("http://localhost:8000/health")
response = requests.get(f"http://localhost:{port}/health")
response.raise_for_status()
return True
except (requests.RequestException, ConnectionError):
@ -40,7 +39,14 @@ def hex_mock_api_runner(docker_compose_runner, test_resources_dir):
"hex-mock-api",
8000,
timeout=30,
checker=lambda: is_hex_mock_api_up("hex-mock-api"),
checker=lambda: is_mock_api_up(8000),
)
wait_for_port(
docker_services,
"datahub-mock-api",
8010,
timeout=30,
checker=lambda: is_mock_api_up(8010),
)
yield docker_services
@ -48,7 +54,6 @@ def hex_mock_api_runner(docker_compose_runner, test_resources_dir):
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_hex_ingestion(pytestconfig, hex_mock_api_runner, test_resources_dir, tmp_path):
"""Test Hex metadata ingestion using a mock API server."""
# Path for the golden file
golden_dir = test_resources_dir / "golden"
golden_path = golden_dir / "hex_mce_golden.json"
@ -56,14 +61,67 @@ def test_hex_ingestion(pytestconfig, hex_mock_api_runner, test_resources_dir, tm
# Create the pipeline
pipeline = Pipeline.create(
{
"run_id": "hex-test",
"pipeline_name": "test-hex",
"source": {
"type": "hex",
"config": {
"workspace_name": "test-workspace",
"token": "test-token",
"base_url": "http://localhost:8000/api/v1", # Mock API URL
"base_url": "http://localhost:8000/api/v1", # Mock Hex API URL
"platform_instance": "hex_test",
"include_lineage": False,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/hex_mces.json",
},
},
}
)
# Run the pipeline
pipeline.run()
pipeline.raise_from_status()
# Check against golden file
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/hex_mces.json",
golden_path=golden_path,
ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_hex_ingestion_with_lineage(
pytestconfig, hex_mock_api_runner, test_resources_dir, tmp_path
):
# Path for the golden file
golden_dir = test_resources_dir / "golden"
golden_path = golden_dir / "hex_mce_golden_with_lineage.json"
# Create the pipeline
pipeline = Pipeline.create(
{
"pipeline_name": "test-hex-with-lineage",
"datahub_api": {
"server": "http://localhost:8010", # Mock DataHub API URL
},
"source": {
"type": "hex",
"config": {
"workspace_name": "some-hex-workspace",
"token": "test-token",
"base_url": "http://localhost:8000/api/v1", # Mock Hex API URL
"platform_instance": "hex_test",
"include_lineage": True,
"datahub_page_size": 1, # Force pagination
"stateful_ingestion": {
"enabled": False,
},
},
},
"sink": {

View File

@ -0,0 +1,157 @@
import unittest
from datetime import datetime, timedelta, timezone
from datahub.ingestion.source.hex.hex import HexSourceConfig
def datetime_approx_equal(
dt1: datetime, dt2: datetime, tolerance_seconds: int = 5
) -> bool:
if dt1.tzinfo is None:
dt1 = dt1.replace(tzinfo=timezone.utc)
if dt2.tzinfo is None:
dt2 = dt2.replace(tzinfo=timezone.utc)
diff = abs((dt1 - dt2).total_seconds())
return diff <= tolerance_seconds
class TestHexSourceConfig(unittest.TestCase):
def setUp(self):
self.minimum_input_config = {
"workspace_name": "test-workspace",
"token": "test-token",
}
def test_required_fields(self):
with self.assertRaises(ValueError):
input_config = {**self.minimum_input_config}
del input_config["workspace_name"]
HexSourceConfig.parse_obj(input_config)
with self.assertRaises(ValueError):
input_config = {**self.minimum_input_config}
del input_config["token"]
HexSourceConfig.parse_obj(input_config)
def test_minimum_config(self):
config = HexSourceConfig.parse_obj(self.minimum_input_config)
assert config
assert config.workspace_name == "test-workspace"
assert config.token.get_secret_value() == "test-token"
def test_lineage_config(self):
config = HexSourceConfig.parse_obj(self.minimum_input_config)
assert config and config.include_lineage
input_config = {**self.minimum_input_config, "include_lineage": False}
config = HexSourceConfig.parse_obj(input_config)
assert config and not config.include_lineage
# default values for lineage_start_time and lineage_end_time
config = HexSourceConfig.parse_obj(self.minimum_input_config)
assert (
config.lineage_start_time
and isinstance(config.lineage_start_time, datetime)
and datetime_approx_equal(
config.lineage_start_time,
datetime.now(tz=timezone.utc) - timedelta(days=1),
)
)
assert (
config.lineage_end_time
and isinstance(config.lineage_end_time, datetime)
and datetime_approx_equal(
config.lineage_end_time, datetime.now(tz=timezone.utc)
)
)
# set values for lineage_start_time and lineage_end_time
input_config = {
**self.minimum_input_config,
"lineage_start_time": "2025-03-24 12:00:00",
"lineage_end_time": "2025-03-25 12:00:00",
}
config = HexSourceConfig.parse_obj(input_config)
assert (
config.lineage_start_time
and isinstance(config.lineage_start_time, datetime)
and datetime_approx_equal(
config.lineage_start_time,
datetime(2025, 3, 24, 12, 0, 0, tzinfo=timezone.utc),
)
)
assert (
config.lineage_end_time
and isinstance(config.lineage_end_time, datetime)
and datetime_approx_equal(
config.lineage_end_time,
datetime(2025, 3, 25, 12, 0, 0, tzinfo=timezone.utc),
)
)
# set lineage_end_time only
input_config = {
**self.minimum_input_config,
"lineage_end_time": "2025-03-25 12:00:00",
}
config = HexSourceConfig.parse_obj(input_config)
assert (
config.lineage_start_time
and isinstance(config.lineage_start_time, datetime)
and datetime_approx_equal(
config.lineage_start_time,
datetime(2025, 3, 25, 12, 0, 0, tzinfo=timezone.utc)
- timedelta(days=1),
)
)
assert (
config.lineage_end_time
and isinstance(config.lineage_end_time, datetime)
and datetime_approx_equal(
config.lineage_end_time,
datetime(2025, 3, 25, 12, 0, 0, tzinfo=timezone.utc),
)
)
# set lineage_start_time only
input_config = {
**self.minimum_input_config,
"lineage_start_time": "2025-03-25 12:00:00",
}
config = HexSourceConfig.parse_obj(input_config)
assert (
config.lineage_start_time
and isinstance(config.lineage_start_time, datetime)
and datetime_approx_equal(
config.lineage_start_time,
datetime(2025, 3, 25, 12, 0, 0, tzinfo=timezone.utc),
)
)
assert (
config.lineage_end_time
and isinstance(config.lineage_end_time, datetime)
and datetime_approx_equal(
config.lineage_end_time, datetime.now(tz=timezone.utc)
)
)
# set relative times for lineage_start_time and lineage_end_time
input_config = {
**self.minimum_input_config,
"lineage_start_time": "-3day",
"lineage_end_time": "now",
}
config = HexSourceConfig.parse_obj(input_config)
assert (
config.lineage_start_time
and isinstance(config.lineage_start_time, datetime)
and datetime_approx_equal(
config.lineage_start_time,
datetime.now(tz=timezone.utc) - timedelta(days=3),
)
)
assert (
config.lineage_end_time
and isinstance(config.lineage_end_time, datetime)
and datetime_approx_equal(
config.lineage_end_time, datetime.now(tz=timezone.utc)
)
)

View File

@ -792,3 +792,95 @@ class TestMapper(unittest.TestCase):
assert (
dashboard_urn.urn() == "urn:li:dashboard:(hex,test-platform.dashboard_name)"
)
def test_dataset_edges(self):
from datahub.metadata.schema_classes import EdgeClass
from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn
mapper = Mapper(
workspace_name=self.workspace_name,
)
# Test with empty list
edges = mapper._dataset_edges([])
assert not edges
# Test with only DatasetUrns
dataset_urn1 = DatasetUrn(
platform="snowflake",
name="test-dataset-1",
)
dataset_urn2 = DatasetUrn(
platform="bigquery",
name="test-dataset-2",
)
edges = mapper._dataset_edges([dataset_urn1, dataset_urn2])
assert edges and len(edges) == 2
assert all(isinstance(edge, EdgeClass) for edge in edges)
assert edges[0].destinationUrn == dataset_urn1.urn()
assert edges[1].destinationUrn == dataset_urn2.urn()
# Test with mixed DatasetUrns and SchemaFieldUrns - should filter out SchemaFieldUrns
schema_field_urn = SchemaFieldUrn(
parent=dataset_urn1,
field_path="test.field.path",
)
edges = mapper._dataset_edges([dataset_urn1, schema_field_urn, dataset_urn2])
assert edges and len(edges) == 2 # SchemaFieldUrn should be filtered out
assert edges[0].destinationUrn == dataset_urn1.urn()
assert edges[1].destinationUrn == dataset_urn2.urn()
def test_map_project_with_upstream_datasets(self):
from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn
# Create a project with upstream datasets
dataset_urn1 = DatasetUrn(
platform="snowflake",
name="test-dataset-1",
)
dataset_urn2 = DatasetUrn(
platform="bigquery",
name="test-dataset-2",
)
schema_field_urn = SchemaFieldUrn(
parent=dataset_urn1,
field_path="test.field.path",
)
project = Project(
id="uuid1",
title="Test Project With Lineage",
description="A test project with upstream datasets",
created_at=self.created_at,
last_edited_at=self.last_edited_at,
status=Status(name="Published"),
creator=Owner(email="creator@example.com"),
owner=Owner(email="owner@example.com"),
upstream_datasets=[dataset_urn1, schema_field_urn, dataset_urn2],
)
mapper = Mapper(
workspace_name=self.workspace_name,
patch_metadata=False,
)
work_units = list(mapper.map_project(project))
dashboard_info_wus = [
wu for wu in work_units if wu.get_aspect_of_type(DashboardInfoClass)
]
assert len(dashboard_info_wus) == 1
dashboard_info = dashboard_info_wus[0].get_aspect_of_type(DashboardInfoClass)
# Verify dataset edges
assert (
dashboard_info
and dashboard_info.datasetEdges
and len(dashboard_info.datasetEdges) == 2
)
edge_urns = [edge.destinationUrn for edge in dashboard_info.datasetEdges]
assert dataset_urn1.urn() in edge_urns
assert dataset_urn2.urn() in edge_urns
assert schema_field_urn.urn() not in edge_urns # Should be filtered out

View File

@ -0,0 +1,386 @@
import unittest
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple
from unittest.mock import MagicMock, patch
from datahub.ingestion.source.hex.constants import HEX_PLATFORM_URN
from datahub.ingestion.source.hex.query_fetcher import (
HexQueryFetcher,
HexQueryFetcherReport,
QueryResponse,
)
from datahub.metadata.schema_classes import (
AuditStampClass,
QueryPropertiesClass,
QueryStatementClass,
QuerySubjectClass,
QuerySubjectsClass,
)
from datahub.metadata.urns import DatasetUrn, QueryUrn
class TestHexQueryFetcherExtractHexMetadata(unittest.TestCase):
"""Test cases for HexQueryFetcher._extract_hex_metadata method"""
def setUp(self):
self.mock_client = MagicMock()
self.workspace_name = "some-hex-workspace"
self.start_datetime = datetime(2023, 1, 1)
self.report = HexQueryFetcherReport()
self.fetcher = HexQueryFetcher(
datahub_client=self.mock_client,
workspace_name=self.workspace_name,
start_datetime=self.start_datetime,
end_datetime=self.start_datetime + timedelta(days=1),
report=self.report,
)
def test_extract_hex_metadata_with_matching_workspace(self):
sql = """
select *
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
limit 100
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "project_url": "https://app.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic?selectedCellId=67c38da0-e631-4005-9750-5bdae2a2ef3f", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
"""
result = self.fetcher._extract_hex_metadata(sql)
assert result is not None
project_id, workspace_name = result
assert project_id == "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"
assert workspace_name == "some-hex-workspace"
def test_extract_hex_metadata_with_non_matching_workspace(self):
sql = """
select *
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
limit 100
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "project_url": "https://app.hex.tech/different-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic?selectedCellId=67c38da0-e631-4005-9750-5bdae2a2ef3f", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
"""
result = self.fetcher._extract_hex_metadata(sql)
assert result is not None
project_id, workspace_name = result
assert project_id == "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"
assert workspace_name == "different-workspace"
def test_extract_hex_metadata_without_url_returns_none(self):
# missing project_url
sql = """
select *
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
limit 100
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
"""
result = self.fetcher._extract_hex_metadata(sql)
assert result is None
def test_extract_hex_metadata_with_no_metadata(self):
# no Hex metadata
sql = """
select *
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
limit 100
-- This is a regular comment
"""
result = self.fetcher._extract_hex_metadata(sql)
assert result is None
def test_extract_hex_metadata_with_invalid_json(self):
# invalid JSON in Hex metadata
sql = """
select *
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
limit 100
-- Hex query metadata: {"categories": ["Scratchpad"], "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", INVALID_JSON}
"""
result = self.fetcher._extract_hex_metadata(sql)
assert result is None
def test_extract_hex_metadata_with_missing_project_id(self):
# missing project_id
sql = """
select *
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
limit 100
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "project_url": "https://app.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic"}
"""
result = self.fetcher._extract_hex_metadata(sql)
assert result is None
def test_extract_hex_metadata_with_invalid_url_format_returns_none(self):
# invalid URL format in project_url
sql = """
select *
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
limit 100
-- Hex query metadata: {"categories": ["Scratchpad"], "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_url": "https://invalid-url-format/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"}
"""
result = self.fetcher._extract_hex_metadata(sql)
assert result is None
def test_extract_hex_metadata_with_custom_domain(self):
# custom domain in project_url (single-tenant deployment)
sql = """
select *
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
limit 100
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "project_url": "https://my-hex-instance.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
"""
result = self.fetcher._extract_hex_metadata(sql)
assert result is not None
project_id, workspace_name = result
assert project_id == "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"
assert workspace_name == "some-hex-workspace"
def test_extract_hex_metadata_with_http_protocol(self):
# HTTP protocol (not HTTPS)
sql = """
select *
from "LONG_TAIL_COMPANIONS"."ANALYTICS"."PET_DETAILS"
limit 100
-- Hex query metadata: {"categories": ["Scratchpad"], "cell_type": "SQL", "connection": "Long Tail Companions", "context": "SCHEDULED_RUN", "project_id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "project_name": "PlayNotebook", "project_url": "http://app.hex.tech/some-hex-workspace/hex/d73da67d-c87b-4dd8-9e7f-b79cb7f822cf/draft/logic", "status": "In development", "trace_id": "f316f99947454a7e8aff2947f848f73d", "user_email": "alice@mail.com"}
"""
result = self.fetcher._extract_hex_metadata(sql)
assert result is not None
project_id, workspace_name = result
assert project_id == "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf"
assert workspace_name == "some-hex-workspace"
def test_extract_hex_metadata_with_complex_urls(self):
# complex workspace names and paths
urls_to_test = [
# URL with hyphens in workspace name
"""{"project_id": "123", "project_url": "https://app.hex.tech/my-complex-workspace-name/hex/project-id"}""",
# URL with underscores
"""{"project_id": "123", "project_url": "https://app.hex.tech/workspace_with_underscores/hex/project-id"}""",
# URL with special chars in domain
"""{"project_id": "123", "project_url": "https://my-custom-subdomain.hex.tech/some-hex-workspace/hex/project-id"}""",
# URL with long path after /hex/
"""{"project_id": "123", "project_url": "https://app.hex.tech/some-hex-workspace/hex/project-id/draft/logic?selectedCellId=67c38da0-e631"}""",
]
expected_workspaces = [
"my-complex-workspace-name",
"workspace_with_underscores",
"some-hex-workspace",
"some-hex-workspace",
]
for i, url_json in enumerate(urls_to_test):
sql = f"""
select * from table
-- Hex query metadata: {url_json}
"""
result = self.fetcher._extract_hex_metadata(sql)
assert result is not None, (
f"Failed to extract metadata from URL: {url_json}"
)
project_id, workspace_name = result
assert project_id == "123"
assert workspace_name == expected_workspaces[i], (
f"Expected workspace {expected_workspaces[i]} but got {workspace_name}"
)
class TestHexQueryFetcherFetch(unittest.TestCase):
"""Test cases for the HexQueryFetcher.fetch method"""
def setUp(self):
self.mock_client = MagicMock()
self.workspace_name = "workspace1"
self.start_datetime = datetime(2023, 1, 1)
self.report = HexQueryFetcherReport()
self.fetcher = HexQueryFetcher(
datahub_client=self.mock_client,
workspace_name=self.workspace_name,
start_datetime=self.start_datetime,
end_datetime=self.start_datetime + timedelta(days=1),
report=self.report,
)
# valid test data
self.query_urn_1 = QueryUrn.from_string("urn:li:query:query1")
self.query_urn_2 = QueryUrn.from_string("urn:li:query:query2")
self.dataset_urn_1 = DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,table1,PROD)"
)
self.dataset_urn_2 = DatasetUrn.from_string(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,table1,PROD)"
)
# self.entities_data matches the return type of HexQueryFetcher._fetch_query_entities
self.entities_data: Dict[
QueryUrn,
Tuple[Optional[QueryPropertiesClass], Optional[QuerySubjectsClass]],
] = {
self.query_urn_1: (
QueryPropertiesClass(
created=AuditStampClass._construct_with_defaults(),
lastModified=AuditStampClass._construct_with_defaults(),
statement=QueryStatementClass(
value="""SELECT * FROM table -- Hex query metadata: {"project_id": "project1", "project_url": "https://app.hex.tech/workspace1/hex/project1"}"""
),
source=HEX_PLATFORM_URN.urn(),
),
QuerySubjectsClass(
subjects=[
QuerySubjectClass(entity=self.dataset_urn_1.urn()),
QuerySubjectClass(entity=self.dataset_urn_2.urn()),
]
),
),
self.query_urn_2: (
QueryPropertiesClass(
created=AuditStampClass._construct_with_defaults(),
lastModified=AuditStampClass._construct_with_defaults(),
statement=QueryStatementClass(
value="""SELECT * FROM table -- Hex query metadata: {"project_id": "project2", "project_url": "https://app.hex.tech/workspace1/hex/project2"}"""
),
source=HEX_PLATFORM_URN.urn(),
),
QuerySubjectsClass(
subjects=[QuerySubjectClass(entity=self.dataset_urn_1.urn())]
),
),
}
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
)
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_entities"
)
def test_fetch_with_valid_data(
self, mock_fetch_query_entities, mock_fetch_query_urns
):
mock_fetch_query_urns.return_value = [self.query_urn_1]
mock_fetch_query_entities.return_value = self.entities_data
results = list(self.fetcher.fetch())
assert len(results) == 2
assert all(isinstance(qr, QueryResponse) for qr in results)
assert results[0].urn == self.query_urn_1
assert results[0].hex_project_id == "project1"
assert results[0].dataset_subjects == [self.dataset_urn_1, self.dataset_urn_2]
assert results[1].urn == self.query_urn_2
assert results[1].hex_project_id == "project2"
assert results[1].dataset_subjects == [self.dataset_urn_1]
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
)
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_entities"
)
def test_fetch_with_missing_hex_query_metadata(
self, mock_fetch_query_entities, mock_fetch_query_urns
):
# force fail in query_urn_2
self.entities_data[self.query_urn_2][0].statement.value = ( # type: ignore
"SELECT * FROM table -- IT'S MISSING HERE"
)
mock_fetch_query_urns.return_value = [self.query_urn_1]
mock_fetch_query_entities.return_value = self.entities_data
results = list(self.fetcher.fetch())
assert len(results) == 1
assert all(isinstance(qr, QueryResponse) for qr in results)
assert results[0].urn == self.query_urn_1
assert results[0].hex_project_id == "project1"
assert results[0].dataset_subjects == [self.dataset_urn_1, self.dataset_urn_2]
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
)
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_entities"
)
def test_fetch_with_missing_not_matching_workspace(
self, mock_fetch_query_entities, mock_fetch_query_urns
):
# force workspace mismatch in query_urn_2
self.entities_data[self.query_urn_2][0].statement.value = ( # type: ignore
"""SELECT * FROM table -- Hex query metadata: {"project_id": "project1", "project_url": "https://app.hex.tech/YET_ANOTHER_WORKSPACE/hex/project1"}"""
)
mock_fetch_query_urns.return_value = [self.query_urn_1]
mock_fetch_query_entities.return_value = self.entities_data
results = list(self.fetcher.fetch())
assert len(results) == 1
assert all(isinstance(qr, QueryResponse) for qr in results)
assert results[0].urn == self.query_urn_1
assert results[0].hex_project_id == "project1"
assert results[0].dataset_subjects == [self.dataset_urn_1, self.dataset_urn_2]
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
)
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_entities"
)
def test_fetch_with_no_subjects(
self, mock_fetch_query_entities, mock_fetch_query_urns
):
# force no subjects in query_urn_2
self.entities_data[self.query_urn_2][1].subjects = [] # type: ignore
mock_fetch_query_urns.return_value = [self.query_urn_1]
mock_fetch_query_entities.return_value = self.entities_data
results = list(self.fetcher.fetch())
assert len(results) == 1
assert all(isinstance(qr, QueryResponse) for qr in results)
assert results[0].urn == self.query_urn_1
assert results[0].hex_project_id == "project1"
assert results[0].dataset_subjects == [self.dataset_urn_1, self.dataset_urn_2]
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
)
def test_fetch_with_no_query_urns_found(self, mock_fetch_query_urns):
mock_fetch_query_urns.return_value = []
results = list(self.fetcher.fetch())
assert len(results) == 0
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
)
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_entities"
)
def test_fetch_query_entities_fail(
self, mock_fetch_query_entities, mock_fetch_query_urns
):
mock_fetch_query_urns.return_value = [self.query_urn_1]
mock_fetch_query_entities.side_effect = Exception(
"Failed to fetch query entities"
)
results = list(self.fetcher.fetch())
assert len(results) == 0
assert self.report.errors == 1
@patch(
"datahub.ingestion.source.hex.query_fetcher.HexQueryFetcher._fetch_query_urns_filter_hex_and_last_modified"
)
def test_fetch_query_urns_fail(self, mock_fetch_query_urns):
mock_fetch_query_urns.side_effect = Exception("Failed to fetch query urns")
results = list(self.fetcher.fetch())
assert len(results) == 0
assert self.report.errors == 1