Merge branch 'master' into fivetran-std-edition-support

This commit is contained in:
Jonny Dixon 2025-03-25 22:06:05 +00:00 committed by GitHub
commit d3cc11f447
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 5343 additions and 4 deletions

View File

@ -8,7 +8,7 @@ module.exports = {
'plugin:vitest/recommended',
'prettier',
],
plugins: ['@typescript-eslint', 'react-refresh'],
plugins: ['@typescript-eslint', '@stylistic/js', 'react-refresh'],
parserOptions: {
ecmaVersion: 2020, // Allows for the parsing of modern ECMAScript features
sourceType: 'module', // Allows for the use of imports
@ -19,6 +19,7 @@ module.exports = {
},
rules: {
'@typescript-eslint/no-explicit-any': 'off',
'@stylistic/js/comma-dangle': ['error', 'always-multiline'],
'arrow-body-style': 'off',
'class-methods-use-this': 'off',
'import/no-extraneous-dependencies': 'off',

View File

@ -51,6 +51,7 @@
"antd": "4.24.7",
"color-hash": "^2.0.1",
"colorthief": "^2.4.0",
"country-data-list": "^1.3.4",
"cron-parser": "^4.8.1",
"cronstrue": "^1.122.0",
"d3-scale": "^4.0.2",
@ -97,8 +98,7 @@
"uuid": "^8.3.2",
"virtualizedtableforantd4": "^1.2.1",
"web-vitals": "^0.2.4",
"yamljs": "^0.3.0",
"country-data-list": "^1.3.4"
"yamljs": "^0.3.0"
},
"scripts": {
"analyze": "source-map-explorer 'dist/assets/*.js'",
@ -145,6 +145,7 @@
"@storybook/react-vite": "^8.1.11",
"@storybook/test": "^8.1.11",
"@storybook/theming": "^8.1.11",
"@stylistic/eslint-plugin-js": "^4.2.0",
"@types/graphql": "^14.5.0",
"@types/query-string": "^6.3.0",
"@types/styled-components": "^5.1.7",

View File

@ -341,5 +341,12 @@
"description": "Import ML Models and lineage from Google Vertex AI.",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/vertexai/",
"recipe": "source:\n type: vertexai\n config:\n project_id: # you GCP project ID \n region: # region where your GCP project resides \n # Credentials\n # Add GCP credentials"
},
{
"urn": "urn:li:dataPlatform:hex",
"name": "hex",
"displayName": "Hex",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/hex/",
"recipe": "source:\n type: hex\n config:\n workspace_name: # Your Hex Workspace name\n token: # Your PAT or Workspace token"
}
]

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 KiB

View File

@ -3792,6 +3792,14 @@
resolved "https://registry.yarnpkg.com/@storybook/theming/-/theming-8.4.7.tgz#c308f6a883999bd35e87826738ab8a76515932b5"
integrity sha512-99rgLEjf7iwfSEmdqlHkSG3AyLcK0sfExcr0jnc6rLiAkBhzuIsvcHjjUwkR210SOCgXqBPW0ZA6uhnuyppHLw==
"@stylistic/eslint-plugin-js@^4.2.0":
version "4.2.0"
resolved "https://registry.yarnpkg.com/@stylistic/eslint-plugin-js/-/eslint-plugin-js-4.2.0.tgz#30536fd35dd6aba08c1e234fe37bf66831c6e989"
integrity sha512-MiJr6wvyzMYl/wElmj8Jns8zH7Q1w8XoVtm+WM6yDaTrfxryMyb8n0CMxt82fo42RoLIfxAEtM6tmQVxqhk0/A==
dependencies:
eslint-visitor-keys "^4.2.0"
espree "^10.3.0"
"@svgmoji/blob@^3.2.0":
version "3.2.0"
resolved "https://registry.yarnpkg.com/@svgmoji/blob/-/blob-3.2.0.tgz#62a0ab1ba22a0d27f23cb38aacf6d4fb13123dfb"
@ -7530,6 +7538,11 @@ eslint-visitor-keys@^3.3.0, eslint-visitor-keys@^3.4.1:
resolved "https://registry.yarnpkg.com/eslint-visitor-keys/-/eslint-visitor-keys-3.4.1.tgz#c22c48f48942d08ca824cc526211ae400478a994"
integrity sha512-pZnmmLwYzf+kWaM/Qgrvpen51upAktaaiI01nsJD/Yr3lMOdNtq0cxkrrg16w64VtisN6okbs7Q8AfGqj4c9fA==
eslint-visitor-keys@^4.2.0:
version "4.2.0"
resolved "https://registry.yarnpkg.com/eslint-visitor-keys/-/eslint-visitor-keys-4.2.0.tgz#687bacb2af884fcdda8a6e7d65c606f46a14cd45"
integrity sha512-UyLnSehNt62FFhSwjZlHmeokpRK59rcz29j+F1/aDgbkbRTk7wIc9XzdoasMUbRNKDM0qQt/+BJ4BrpFeABemw==
eslint@^8.2.0:
version "8.43.0"
resolved "https://registry.yarnpkg.com/eslint/-/eslint-8.43.0.tgz#3e8c6066a57097adfd9d390b8fc93075f257a094"
@ -7575,6 +7588,15 @@ eslint@^8.2.0:
strip-json-comments "^3.1.0"
text-table "^0.2.0"
espree@^10.3.0:
version "10.3.0"
resolved "https://registry.yarnpkg.com/espree/-/espree-10.3.0.tgz#29267cf5b0cb98735b65e64ba07e0ed49d1eed8a"
integrity sha512-0QYC8b24HWY8zjRnDTL6RiHfDbAWn63qb4LMj1Z4b076A4une81+z03Kg7l7mn/48PUTqoLptSXez8oknU8Clg==
dependencies:
acorn "^8.14.0"
acorn-jsx "^5.3.2"
eslint-visitor-keys "^4.2.0"
espree@^9.5.2:
version "9.5.2"
resolved "https://registry.yarnpkg.com/espree/-/espree-9.5.2.tgz#e994e7dc33a082a7a82dceaf12883a829353215b"

View File

@ -0,0 +1,23 @@
This connector ingests [Hex](https://hex.tech/) assets into DataHub.
### Concept Mapping
| Hex Concept | DataHub Concept | Notes |
|-------------|----------------------------------------------------------------------------------------------------|---------------------|
| `"hex"` | [Data Platform](https://datahubproject.io/docs/generated/metamodel/entities/dataplatform/) | |
| Workspace | [Container](https://datahubproject.io/docs/generated/metamodel/entities/container/) | |
| Project | [Dashboard](https://datahubproject.io/docs/generated/metamodel/entities/dashboard/) | Subtype `Project` |
| Component | [Dashboard](https://datahubproject.io/docs/generated/metamodel/entities/dashboard/) | Subtype `Component` |
| Collection | [Tag](https://datahubproject.io/docs/generated/metamodel/entities/tag/) | |
Other Hex concepts are not mapped to DataHub entities yet.
### Limitations
Currently, the [Hex API](https://learn.hex.tech/docs/api/api-reference) has some limitations that affect the completeness of the extracted metadata:
1. **Projects and Components Relationship**: The API does not support fetching the many-to-many relationship between Projects and their Components.
2. **Metadata Access**: There is no direct method to retrieve metadata for Collections, Status, or Categories. This information is only available indirectly through references within Projects and Components.
Please keep these limitations in mind when working with the Hex connector.

View File

@ -0,0 +1,21 @@
### Prerequisites
#### Workspace name
Workspace name is required to fetch the data from Hex. You can find the workspace name in the URL of your Hex home page.
```
https://app.hex.tech/<workspace_name>
```
_E.g._: In `https://app.hex.tech/acryl-partnership`, `acryl-partnership` is the workspace name.
#### Authentication
To authenticate with Hex, you will need to provide your Hex API Bearer token.
You can obtain your API token by following the instructions in the [Hex documentation](https://learn.hex.tech/docs/api/api-overview).
Either a PAT (Personal Access Token) or a Workspace Token can be used as the API Bearer token:
- (Recommended) With a Workspace Token, a read-only token is enough for ingestion.
- With a PAT, ingestion is done with that user's permissions.

View File

@ -0,0 +1,8 @@
source:
type: hex
config:
workspace_name: # Hex workspace name. You can find this name in your Hex home page URL: https://app.hex.tech/<workspace_name>
token: # Your PAT or Workspace token
sink:
# sink configs

View File

@ -805,6 +805,7 @@ entry_points = {
"cassandra = datahub.ingestion.source.cassandra.cassandra:CassandraSource",
"neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource",
"vertexai = datahub.ingestion.source.vertexai.vertexai:VertexAISource",
"hex = datahub.ingestion.source.hex.hex:HexSource",
],
"datahub.ingestion.transformer.plugins": [
"pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership",

View File

@ -94,6 +94,10 @@ class BIAssetSubTypes(StrEnum):
SAC_STORY = "Story"
SAC_APPLICATION = "Application"
# Hex
HEX_PROJECT = "Project"
HEX_COMPONENT = "Component"
class MLAssetSubTypes(StrEnum):
MLFLOW_TRAINING_RUN = "ML Training Run"

View File

@ -0,0 +1,394 @@
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Generator, List, Optional, Union
import requests
from pydantic import BaseModel, Field, ValidationError, validator
from typing_extensions import assert_never
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.hex.constants import (
HEX_API_BASE_URL_DEFAULT,
HEX_API_PAGE_SIZE_DEFAULT,
)
from datahub.ingestion.source.hex.model import (
Analytics,
Category,
Collection,
Component,
Owner,
Project,
Status,
)
from datahub.utilities.str_enum import StrEnum
logger = logging.getLogger(__name__)
# The following models were Claude-generated from Hex API OpenAPI definition https://static.hex.site/openapi.json
# To be exclusively used internally for the deserialization of the API response
class HexApiAppViewStats(BaseModel):
    """App view counters as returned by the Hex API, one per time window.

    Every counter is optional and defaults to ``None``; values are read from
    the camelCase aliases declared on each field.
    """

    all_time: Optional[int] = Field(default=None, alias="allTime")
    last_seven_days: Optional[int] = Field(default=None, alias="lastSevenDays")
    last_fourteen_days: Optional[int] = Field(default=None, alias="lastFourteenDays")
    last_thirty_days: Optional[int] = Field(default=None, alias="lastThirtyDays")
class HexApiProjectAnalytics(BaseModel):
    """Analytics section of a Hex project/component API resource.

    Holds the optional app view counters plus two optional timestamps, all
    read from their camelCase aliases.
    """

    app_views: Optional[HexApiAppViewStats] = Field(default=None, alias="appViews")
    last_viewed_at: Optional[datetime] = Field(default=None, alias="lastViewedAt")
    published_results_updated_at: Optional[datetime] = Field(
        default=None, alias="publishedResultsUpdatedAt"
    )

    @validator("last_viewed_at", "published_results_updated_at", pre=True)
    def parse_datetime(cls, value):
        """Convert a ``...Z`` timestamp string into a UTC-aware ``datetime``.

        ``None`` and non-string values are passed through untouched. Strings
        are accepted with or without fractional seconds; anything else raises
        ``ValueError``, which pydantic surfaces as a validation error.
        """
        if value is None:
            return None
        if isinstance(value, str):
            # The fractional-seconds form is tried first because it is the one
            # the original parser expected; the second form keeps timestamps
            # such as "2025-03-25T22:06:05Z" from failing the whole response.
            for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
                try:
                    return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
                except ValueError:
                    continue
            raise ValueError(f"Unsupported datetime format: {value!r}")
        return value
class HexApiProjectStatus(BaseModel):
    """Status attached to a project/component; only its name is modelled."""

    name: str
class HexApiCategory(BaseModel):
    """Category attached to a project/component: a name and an optional description."""

    name: str
    description: Optional[str] = None
class HexApiReviews(BaseModel):
    """Reviews configuration; carries a single mandatory ``required`` flag."""

    required: bool
class HexApiUser(BaseModel):
    """User reference in the API payload; only the email is modelled."""

    email: str
class HexApiAccessType(StrEnum):
    """Access levels accepted for the ``access`` fields of the sharing models."""

    NONE = "NONE"
    VIEW = "VIEW"
    EDIT = "EDIT"
    FULL_ACCESS = "FULL_ACCESS"
class HexApiUserAccess(BaseModel):
    """Pairs a user with an optional access level."""

    user: HexApiUser
    access: Optional[HexApiAccessType] = None
class HexApiCollectionData(BaseModel):
    """Collection reference in the API payload; only the name is modelled."""

    name: str
class HexApiCollectionAccess(BaseModel):
    """Pairs a collection with an optional access level."""

    collection: HexApiCollectionData
    access: Optional[HexApiAccessType] = None
class HexApiAccessSettings(BaseModel):
    """Wrapper holding a single optional access level."""

    access: Optional[HexApiAccessType] = None
class HexApiWeeklySchedule(BaseModel):
    """Weekly schedule details: day of week (alias ``dayOfWeek``), hour, minute and timezone."""

    day_of_week: str = Field(alias="dayOfWeek")
    hour: int
    minute: int
    timezone: str
class HexApiSchedule(BaseModel):
    """Schedule entry of a project.

    Only the weekly variant has a dedicated model; the other cadence payloads
    are kept as untyped values.
    """

    cadence: str
    enabled: bool
    hourly: Optional[Any] = None
    daily: Optional[Any] = None
    weekly: Optional[HexApiWeeklySchedule] = None
    monthly: Optional[Any] = None
    custom: Optional[Any] = None
class HexApiSharing(BaseModel):
    """Sharing section of a project/component.

    Lists per-user, per-collection and per-group access entries, plus optional
    access settings for the workspace, public web and support scopes.
    """

    users: Optional[List[HexApiUserAccess]] = []
    collections: Optional[List[HexApiCollectionAccess]] = []
    groups: Optional[List[Any]] = []
    workspace: Optional[HexApiAccessSettings] = None
    public_web: Optional[HexApiAccessSettings] = Field(default=None, alias="publicWeb")
    support: Optional[HexApiAccessSettings] = None

    class Config:
        extra = "ignore"  # Allow extra fields in the JSON
class HexApiItemType(StrEnum):
    """Discriminator for the ``type`` field of an API resource: project or component."""

    PROJECT = "PROJECT"
    COMPONENT = "COMPONENT"
class HexApiProjectApiResource(BaseModel):
    """A single Hex item (project or component) as returned by the list API.

    ``id``, ``title`` and ``type`` are mandatory; every other field is optional.
    Unknown fields in the payload are ignored.
    """

    id: str
    title: str
    description: Optional[str] = None
    type: HexApiItemType
    creator: Optional[HexApiUser] = None
    owner: Optional[HexApiUser] = None
    status: Optional[HexApiProjectStatus] = None
    categories: Optional[List[HexApiCategory]] = []
    reviews: Optional[HexApiReviews] = None
    analytics: Optional[HexApiProjectAnalytics] = None
    last_edited_at: Optional[datetime] = Field(default=None, alias="lastEditedAt")
    last_published_at: Optional[datetime] = Field(default=None, alias="lastPublishedAt")
    created_at: Optional[datetime] = Field(default=None, alias="createdAt")
    archived_at: Optional[datetime] = Field(default=None, alias="archivedAt")
    trashed_at: Optional[datetime] = Field(default=None, alias="trashedAt")
    schedules: Optional[List[HexApiSchedule]] = []
    sharing: Optional[HexApiSharing] = None

    class Config:
        extra = "ignore"  # Allow extra fields in the JSON

    @validator(
        "created_at",
        "last_edited_at",
        "last_published_at",
        "archived_at",
        "trashed_at",
        pre=True,
    )
    def parse_datetime(cls, value):
        """Convert a ``...Z`` timestamp string into a UTC-aware ``datetime``.

        ``None`` and non-string values are passed through untouched. Strings
        are accepted with or without fractional seconds; anything else raises
        ``ValueError``, which pydantic surfaces as a validation error.
        """
        if value is None:
            return None
        if isinstance(value, str):
            # The fractional-seconds form is tried first because it is the one
            # the original parser expected; the second form keeps timestamps
            # without a fractional part from failing the whole page.
            for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
                try:
                    return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
                except ValueError:
                    continue
            raise ValueError(f"Unsupported datetime format: {value!r}")
        return value
class HexApiPageCursors(BaseModel):
    """Pagination cursors of a list response; both are optional."""

    after: Optional[str] = None
    before: Optional[str] = None
class HexApiProjectsListResponse(BaseModel):
    """Envelope of the list-projects API response: the items plus optional cursors."""

    values: List[HexApiProjectApiResource]
    pagination: Optional[HexApiPageCursors] = None

    class Config:
        extra = "ignore"  # Allow extra fields in the JSON
@dataclass
class HexApiReport(SourceReport):
    """Source report extended with counters for the list-projects API."""

    # Number of page requests issued (incremented once per page fetch).
    fetch_projects_page_calls: int = 0
    # Total number of items received across all successfully parsed pages.
    fetch_projects_page_items: int = 0
class HexApi:
    """Minimal client for the Hex API that lists projects and components.

    https://learn.hex.tech/docs/api/api-reference
    """

    def __init__(
        self,
        token: str,
        report: HexApiReport,
        base_url: str = HEX_API_BASE_URL_DEFAULT,
        page_size: int = HEX_API_PAGE_SIZE_DEFAULT,
    ):
        """Store the bearer token, the report to update, the API base URL and page size."""
        self.token = token
        self.base_url = base_url
        self.report = report
        self.page_size = page_size

    def _list_projects_url(self):
        """Return the URL of the list-projects endpoint."""
        return f"{self.base_url}/projects"

    def _auth_header(self):
        """Return the HTTP Authorization header carrying the bearer token."""
        return {"Authorization": f"Bearer {self.token}"}

    def fetch_projects(
        self,
        include_components: bool = True,
        include_archived: bool = False,
        include_trashed: bool = False,
    ) -> Generator[Union[Project, Component], None, None]:
        """Fetch all projects and components, following pagination cursors.

        https://learn.hex.tech/docs/api/api-reference#operation/ListProjects

        Pages are requested until the API stops returning an ``after`` cursor
        or a page fails; failures are recorded on the report, not raised.
        """
        params = {
            "includeComponents": include_components,
            "includeArchived": include_archived,
            "includeTrashed": include_trashed,
            "includeSharing": True,
            "limit": self.page_size,
            "after": None,
            "before": None,
            "sortBy": "CREATED_AT",
            "sortDirection": "ASC",
        }
        # _fetch_projects_page updates params["after"] in place: it holds the
        # next cursor after a successful page and None when paging must stop.
        yield from self._fetch_projects_page(params)
        while params["after"]:
            yield from self._fetch_projects_page(params)

    def _fetch_projects_page(
        self, params: Dict[str, Any]
    ) -> Generator[Union[Project, Component], None, None]:
        """Fetch one page, yield its mapped items and advance the cursor.

        On success ``params["after"]`` is set to the next cursor (or ``None``).
        On a request or parsing failure the failure is reported and the cursor
        is cleared so that the caller stops paging.
        """
        logger.debug(f"Fetching projects page with params: {params}")
        self.report.fetch_projects_page_calls += 1
        try:
            response = requests.get(
                url=self._list_projects_url(),
                headers=self._auth_header(),
                params=params,
                timeout=30,
            )
            response.raise_for_status()

            api_response = HexApiProjectsListResponse.parse_obj(response.json())
            logger.info(f"Fetched {len(api_response.values)} items")
            params["after"] = (
                api_response.pagination.after if api_response.pagination else None
            )

            self.report.fetch_projects_page_items += len(api_response.values)

            for item in api_response.values:
                try:
                    ret = self._map_data_from_model(item)
                    yield ret
                except Exception as e:
                    # A single bad item must not abort the whole page.
                    self.report.warning(
                        title="Incomplete metadata",
                        message="Incomplete metadata because of error mapping item",
                        context=str(item),
                        exc=e,
                    )
        except ValidationError as e:
            # Clear the cursor: otherwise the caller would retry the same
            # failing page forever when the failure happens after page one.
            params["after"] = None
            self.report.failure(
                title="Listing Projects and Components API response parsing error",
                message="Error parsing API response and halting metadata ingestion",
                context=str(response.json()),
                exc=e,
            )
        except Exception as e:
            # Covers requests.RequestException as well (it is an Exception).
            params["after"] = None
            self.report.failure(
                title="Listing Projects and Components API request error",
                message="Error fetching Projects and Components and halting metadata ingestion",
                context=str(params),
                exc=e,
            )

    def _map_data_from_model(
        self, hex_item: HexApiProjectApiResource
    ) -> Union[Project, Component]:
        """
        Maps a HexApi pydantic model parsed from the API to our domain model
        """

        # Map status
        status = Status(name=hex_item.status.name) if hex_item.status else None

        # Map categories
        categories = []
        if hex_item.categories:
            categories = [
                Category(name=cat.name, description=cat.description)
                for cat in hex_item.categories
            ]

        # Map collections
        collections = []
        if hex_item.sharing and hex_item.sharing.collections:
            collections = [
                Collection(name=col.collection.name)
                for col in hex_item.sharing.collections
            ]

        # Map creator and owner
        creator = Owner(email=hex_item.creator.email) if hex_item.creator else None
        owner = Owner(email=hex_item.owner.email) if hex_item.owner else None

        # Map analytics (only when app view counters are present)
        analytics = None
        if hex_item.analytics and hex_item.analytics.app_views:
            analytics = Analytics(
                appviews_all_time=hex_item.analytics.app_views.all_time,
                appviews_last_7_days=hex_item.analytics.app_views.last_seven_days,
                appviews_last_14_days=hex_item.analytics.app_views.last_fourteen_days,
                appviews_last_30_days=hex_item.analytics.app_views.last_thirty_days,
                last_viewed_at=hex_item.analytics.last_viewed_at,
            )

        # Project and Component take exactly the same fields; build them once.
        common: Dict[str, Any] = dict(
            id=hex_item.id,
            title=hex_item.title,
            description=hex_item.description,
            created_at=hex_item.created_at,
            last_edited_at=hex_item.last_edited_at,
            status=status,
            categories=categories,
            collections=collections,
            creator=creator,
            owner=owner,
            analytics=analytics,
        )

        # Create the appropriate domain model based on type
        if hex_item.type == HexApiItemType.PROJECT:
            return Project(**common)
        elif hex_item.type == HexApiItemType.COMPONENT:
            return Component(**common)
        else:
            assert_never(hex_item.type)

View File

@ -0,0 +1,3 @@
# Platform identifier used when building DataHub URNs for Hex entities.
HEX_PLATFORM_NAME = "hex"
# Default API endpoint; overridable through the source config `base_url`.
HEX_API_BASE_URL_DEFAULT = "https://app.hex.tech/api/v1"
# Default number of items requested per list API call.
HEX_API_PAGE_SIZE_DEFAULT = 100

View File

@ -0,0 +1,167 @@
from typing import Any, Dict, Iterable, List, Optional
from pydantic import Field, SecretStr
from typing_extensions import assert_never
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.hex.api import HexApi, HexApiReport
from datahub.ingestion.source.hex.constants import (
HEX_API_BASE_URL_DEFAULT,
HEX_API_PAGE_SIZE_DEFAULT,
HEX_PLATFORM_NAME,
)
from datahub.ingestion.source.hex.mapper import Mapper
from datahub.ingestion.source.hex.model import Component, Project
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionReport,
StatefulIngestionSourceBase,
)
class HexSourceConfig(
    StatefulIngestionConfigBase, PlatformInstanceConfigMixin, EnvConfigMixin
):
    """Configuration of the Hex ingestion source.

    ``workspace_name`` and ``token`` are required; every other option has a
    default.
    """

    workspace_name: str = Field(
        description="Hex workspace name. You can find this name in your Hex home page URL: https://app.hex.tech/<workspace_name>",
    )
    token: SecretStr = Field(
        description="Hex API token; either PAT or Workspace token - https://learn.hex.tech/docs/api/api-overview#authentication",
    )
    base_url: str = Field(
        default=HEX_API_BASE_URL_DEFAULT,
        description="Hex API base URL. For most Hex users, this will be https://app.hex.tech/api/v1. "
        "Single-tenant app users should replace this with the URL they use to access Hex.",
    )
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
        default=None,
        description="Configuration for stateful ingestion and stale metadata removal.",
    )
    include_components: bool = Field(
        default=True,
        # Was misspelled "desciption", which dropped the text from the
        # generated field description.
        description="Include Hex Components in the ingestion",
    )
    page_size: int = Field(
        default=HEX_API_PAGE_SIZE_DEFAULT,
        description="Number of items to fetch per Hex API call.",
    )
    patch_metadata: bool = Field(
        default=False,
        description="Emit metadata as patch events",
    )
    collections_as_tags: bool = Field(
        default=True,
        description="Emit Hex Collections as tags",
    )
    status_as_tag: bool = Field(
        default=True,
        description="Emit Hex Status as tags",
    )
    categories_as_tags: bool = Field(
        default=True,
        description="Emit Hex Category as tags",
    )
    project_title_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex pattern for project titles to filter in ingestion.",
    )
    component_title_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex pattern for component titles to filter in ingestion.",
    )
    set_ownership_from_email: bool = Field(
        default=True,
        description="Set ownership identity from owner/creator email",
    )
class HexReport(StaleEntityRemovalSourceReport, HexApiReport):
    """Report of the Hex source: combines stale-entity-removal and Hex API reporting."""

    pass
@platform_name("Hex")
@config_class(HexSourceConfig)
@support_status(SupportStatus.TESTING)
@capability(SourceCapability.DESCRIPTIONS, "Supported by default")
@capability(SourceCapability.OWNERSHIP, "Supported by default")
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.CONTAINERS, "Enabled by default")
class HexSource(StatefulIngestionSourceBase):
    """Ingestion source that emits a Hex workspace with its projects and components."""

    def __init__(self, config: HexSourceConfig, ctx: PipelineContext):
        """Build the API client and the mapper from the source configuration."""
        super().__init__(config, ctx)
        self.source_config = config
        self.report = HexReport()
        self.platform = HEX_PLATFORM_NAME
        self.hex_api = HexApi(
            report=self.report,
            token=self.source_config.token.get_secret_value(),
            base_url=self.source_config.base_url,
            page_size=self.source_config.page_size,
        )
        self.mapper = Mapper(
            workspace_name=self.source_config.workspace_name,
            platform_instance=self.source_config.platform_instance,
            env=self.source_config.env,
            base_url=self.source_config.base_url,
            patch_metadata=self.source_config.patch_metadata,
            collections_as_tags=self.source_config.collections_as_tags,
            status_as_tag=self.source_config.status_as_tag,
            categories_as_tags=self.source_config.categories_as_tags,
            set_ownership_from_email=self.source_config.set_ownership_from_email,
        )

    @classmethod
    def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "HexSource":
        """Parse ``config_dict`` into a ``HexSourceConfig`` and build the source."""
        config = HexSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        """Return the base processors plus the stale entity removal processor."""
        return [
            *super().get_workunit_processors(),
            StaleEntityRemovalHandler.create(
                self, self.source_config, self.ctx
            ).workunit_processor,
        ]

    def get_report(self) -> StatefulIngestionReport:
        """Return the report shared with the API client."""
        return self.report

    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        """Emit the workspace, then every allowed project and component."""
        yield from self.mapper.map_workspace()

        # Forward include_components so that components are not requested from
        # the API at all when they are disabled; the check below is kept as a
        # safeguard in case the API returns them anyway.
        for project_or_component in self.hex_api.fetch_projects(
            include_components=self.source_config.include_components
        ):
            if isinstance(project_or_component, Project):
                if self.source_config.project_title_pattern.allowed(
                    project_or_component.title
                ):
                    yield from self.mapper.map_project(project=project_or_component)
            elif isinstance(project_or_component, Component):
                if (
                    self.source_config.include_components
                    and self.source_config.component_title_pattern.allowed(
                        project_or_component.title
                    )
                ):
                    yield from self.mapper.map_component(component=project_or_component)
            else:
                assert_never(project_or_component)

View File

@ -0,0 +1,372 @@
import logging
from datetime import datetime
from typing import Iterable, List, Optional, Tuple
from datahub._codegen.aspect import (
_Aspect, # TODO: is there a better import than this one?
)
from datahub.emitter.mce_builder import (
make_container_urn,
make_dashboard_urn,
make_data_platform_urn,
make_dataplatform_instance_urn,
make_tag_urn,
make_ts_millis,
make_user_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import ContainerKey
from datahub.ingestion.api.incremental_lineage_helper import (
convert_dashboard_info_to_patch,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import BIAssetSubTypes
from datahub.ingestion.source.hex.constants import (
HEX_API_BASE_URL_DEFAULT,
HEX_PLATFORM_NAME,
)
from datahub.ingestion.source.hex.model import (
Analytics,
Category,
Collection,
Component,
Owner,
Project,
Status,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStampClass,
ChangeAuditStampsClass,
OwnershipType,
)
from datahub.metadata.schema_classes import (
CalendarIntervalClass,
ContainerClass,
ContainerPropertiesClass,
DashboardInfoClass,
DashboardUsageStatisticsClass,
DataPlatformInstanceClass,
GlobalTagsClass,
OwnerClass,
OwnershipClass,
SubTypesClass,
TagAssociationClass,
TimeWindowSizeClass,
)
from datahub.metadata.urns import ContainerUrn, CorpUserUrn, DashboardUrn, Urn
logger = logging.getLogger(__name__)
class WorkspaceKey(ContainerKey):
    """Container key for a Hex workspace; adds the workspace name to the base key."""

    workspace_name: str
# Actor recorded on the created/lastModified audit stamps emitted by the mapper.
DEFAULT_INGESTION_USER_URN = CorpUserUrn("_ingestion")
# Ownership type assigned to every owner emitted by the mapper.
DEFAULT_OWNERSHIP_TYPE = OwnershipType.TECHNICAL_OWNER
class Mapper:
def __init__(
self,
workspace_name: str,
platform_instance: Optional[str] = None,
env: Optional[str] = None,
base_url: str = HEX_API_BASE_URL_DEFAULT,
patch_metadata: bool = True,
collections_as_tags: bool = True,
status_as_tag: bool = True,
categories_as_tags: bool = True,
set_ownership_from_email: bool = True,
):
self._workspace_name = workspace_name
self._env = env
self._platform_instance = platform_instance
self._workspace_urn = Mapper._get_workspace_urn(
workspace_name=workspace_name,
platform=HEX_PLATFORM_NAME,
env=env,
platform_instance=platform_instance,
)
self._base_url = base_url.strip("/").replace("/api/v1", "")
self._patch_metadata = patch_metadata
self._collections_as_tags = collections_as_tags
self._status_as_tag = status_as_tag
self._categories_as_tags = categories_as_tags
self._set_ownership_from_email = set_ownership_from_email
def map_workspace(self) -> Iterable[MetadataWorkUnit]:
container_properties = ContainerPropertiesClass(
name=self._workspace_name,
env=self._env,
)
yield from self._yield_mcps(
entity_urn=self._workspace_urn,
aspects=[container_properties],
)
def map_project(self, project: Project) -> Iterable[MetadataWorkUnit]:
dashboard_urn = self._get_dashboard_urn(name=project.id)
dashboard_info = DashboardInfoClass(
title=project.title,
description=project.description or "",
lastModified=self._change_audit_stamps(
created_at=project.created_at, last_edited_at=project.last_edited_at
),
externalUrl=f"{self._base_url}/{self._workspace_name}/hex/{project.id}",
customProperties=dict(id=project.id),
)
subtypes = SubTypesClass(
typeNames=[BIAssetSubTypes.HEX_PROJECT],
)
platform_instance = self._platform_instance_aspect()
container = ContainerClass(
container=self._workspace_urn.urn(),
)
tags = self._global_tags(
status=project.status,
categories=project.categories,
collections=project.collections,
)
ownership = self._ownership(creator=project.creator, owner=project.owner)
usage_stats_all_time, usage_stats_last_7_days = (
self._dashboard_usage_statistics(analytics=project.analytics)
)
yield from self._yield_mcps(
entity_urn=dashboard_urn,
aspects=[
dashboard_info,
subtypes,
platform_instance,
container,
tags,
ownership,
usage_stats_all_time,
usage_stats_last_7_days,
],
)
def map_component(self, component: Component) -> Iterable[MetadataWorkUnit]:
dashboard_urn = self._get_dashboard_urn(name=component.id)
dashboard_info = DashboardInfoClass(
title=component.title,
description=component.description or "",
lastModified=self._change_audit_stamps(
created_at=component.created_at, last_edited_at=component.last_edited_at
),
externalUrl=f"{self._base_url}/{self._workspace_name}/hex/{component.id}",
customProperties=dict(id=component.id),
)
subtypes = SubTypesClass(
typeNames=[BIAssetSubTypes.HEX_COMPONENT],
)
platform_instance = self._platform_instance_aspect()
container = ContainerClass(
container=self._workspace_urn.urn(),
)
tags = self._global_tags(
status=component.status,
categories=component.categories,
collections=component.collections,
)
ownership = self._ownership(creator=component.creator, owner=component.owner)
usage_stats_all_time, usage_stats_last_7_days = (
self._dashboard_usage_statistics(analytics=component.analytics)
)
yield from self._yield_mcps(
entity_urn=dashboard_urn,
aspects=[
dashboard_info,
subtypes,
platform_instance,
container,
tags,
ownership,
usage_stats_all_time,
usage_stats_last_7_days,
],
)
@classmethod
def _get_workspace_urn(
cls,
workspace_name: str,
platform: str = HEX_PLATFORM_NAME,
env: Optional[str] = None,
platform_instance: Optional[str] = None,
) -> ContainerUrn:
workspace_key = WorkspaceKey(
platform=platform,
env=env,
platform_instance=platform_instance,
workspace_name=workspace_name,
)
container_urn_str = make_container_urn(guid=workspace_key)
container_urn = Urn.from_string(container_urn_str)
assert isinstance(container_urn, ContainerUrn)
return container_urn
def _get_dashboard_urn(self, name: str) -> DashboardUrn:
dashboard_urn_str = make_dashboard_urn(
platform=HEX_PLATFORM_NAME,
name=name,
platform_instance=self._platform_instance,
)
dashboard_urn = Urn.from_string(dashboard_urn_str)
assert isinstance(dashboard_urn, DashboardUrn)
return dashboard_urn
def _change_audit_stamps(
self, created_at: Optional[datetime], last_edited_at: Optional[datetime]
) -> ChangeAuditStampsClass:
return ChangeAuditStampsClass(
created=AuditStampClass(
time=make_ts_millis(created_at),
actor=DEFAULT_INGESTION_USER_URN.urn(),
)
if created_at
else None,
lastModified=AuditStampClass(
time=make_ts_millis(last_edited_at),
actor=DEFAULT_INGESTION_USER_URN.urn(),
)
if last_edited_at
else None,
)
def _global_tags(
self,
status: Optional[Status],
categories: Optional[List[Category]],
collections: Optional[List[Collection]],
) -> Optional[GlobalTagsClass]:
tag_associations: List[TagAssociationClass] = []
if status and self._status_as_tag:
tag_associations.append(
TagAssociationClass(tag=make_tag_urn(tag=f"hex:status:{status.name}"))
)
if categories and self._categories_as_tags:
tag_associations.extend(
[
TagAssociationClass(
tag=make_tag_urn(tag=f"hex:category:{cat.name}")
)
for cat in categories
]
)
if collections and self._collections_as_tags:
tag_associations.extend(
[
TagAssociationClass(
tag=make_tag_urn(tag=f"hex:collection:{col.name}")
)
for col in collections
]
)
return GlobalTagsClass(tags=tag_associations) if tag_associations else None
def _ownership(
self, creator: Optional[Owner], owner: Optional[Owner]
) -> Optional[OwnershipClass]:
if self._set_ownership_from_email:
# since we are not making any diff of creator/owner, we usually have duplicates
# TODO: set or ownership types to properly differentiate them, maybe by config?
unique_owners = set(o for o in [creator, owner] if o)
owners: List[OwnerClass] = [
OwnerClass(owner=make_user_urn(o.email), type=DEFAULT_OWNERSHIP_TYPE)
for o in unique_owners
]
return OwnershipClass(owners=owners) if owners else None
return None
def _dashboard_usage_statistics(
self, analytics: Optional[Analytics]
) -> Tuple[
Optional[DashboardUsageStatisticsClass], Optional[DashboardUsageStatisticsClass]
]:
tm_millis = make_ts_millis(datetime.now())
last_viewed_at = (
make_ts_millis(analytics.last_viewed_at)
if analytics and analytics.last_viewed_at
else None
)
usage_all_time: Optional[DashboardUsageStatisticsClass] = (
DashboardUsageStatisticsClass(
timestampMillis=tm_millis,
viewsCount=analytics.appviews_all_time,
lastViewedAt=last_viewed_at,
)
if analytics and analytics.appviews_all_time
else None
)
usage_last_7_days: Optional[DashboardUsageStatisticsClass] = (
DashboardUsageStatisticsClass(
timestampMillis=tm_millis,
viewsCount=analytics.appviews_last_7_days,
eventGranularity=TimeWindowSizeClass(
unit=CalendarIntervalClass.WEEK, multiple=1
),
lastViewedAt=last_viewed_at,
)
if analytics and analytics.appviews_last_7_days
else None
)
return (usage_all_time, usage_last_7_days)
def _platform_instance_aspect(self) -> DataPlatformInstanceClass:
return DataPlatformInstanceClass(
platform=make_data_platform_urn(HEX_PLATFORM_NAME),
instance=make_dataplatform_instance_urn(
platform=HEX_PLATFORM_NAME, instance=self._platform_instance
)
if self._platform_instance
else None,
)
def _yield_mcps(
    self, entity_urn: Urn, aspects: List[Optional[_Aspect]]
) -> Iterable[MetadataWorkUnit]:
    """Turn the aspects of one entity into work units.

    Args:
        entity_urn: URN of the entity the aspects belong to.
        aspects: Aspects to emit; entries may be ``None``
            (presumably skipped by ``construct_many`` -- confirm).

    Yields:
        One work unit per change proposal, after it has been passed through
        ``_maybe_patch_wu``. Falsy results of that call are dropped.
    """
    proposals = MetadataChangeProposalWrapper.construct_many(
        entityUrn=entity_urn.urn(),
        aspects=aspects,
    )
    for proposal in proposals:
        work_unit = MetadataWorkUnit.from_metadata(metadata=proposal)
        patched = self._maybe_patch_wu(work_unit)
        if not patched:
            continue
        yield patched
def _maybe_patch_wu(self, wu: MetadataWorkUnit) -> Optional[MetadataWorkUnit]:
    """Convert a work unit into its patch form when patching applies.

    Args:
        wu: The work unit to inspect.

    Returns:
        The result of ``convert_dashboard_info_to_patch`` when the work unit
        carries a ``DashboardInfoClass`` aspect and ``self._patch_metadata`` is
        enabled; otherwise the work unit itself, unchanged.
    """
    # So far we only have support for DashboardInfo aspect
    info: Optional[DashboardInfoClass] = wu.get_aspect_of_type(DashboardInfoClass)

    if not (info and self._patch_metadata):
        return wu

    return convert_dashboard_info_to_patch(
        wu.get_urn(),
        info,
        wu.metadata.systemMetadata,
    )

View File

@ -0,0 +1,68 @@
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
@dataclass
class Workspace:
    """A Hex workspace, described only by its name."""

    # Name of the workspace.
    name: str
@dataclass
class Status:
    """A status label that can be attached to a project or component."""

    # Name of the status.
    name: str
@dataclass
class Category:
    """A category that can be attached to a project or component."""

    # Name of the category.
    name: str
    # Optional free-text description of the category.
    description: Optional[str] = None
@dataclass
class Collection:
    """A collection that a project or component can belong to."""

    # Name of the collection.
    name: str
@dataclass(frozen=True)
class Owner:
    """A person associated with a project or component, identified by email.

    The dataclass is frozen, which makes instances immutable and hashable so
    they can be de-duplicated in sets or used as dictionary keys.
    """

    # Email address of the person.
    email: str
@dataclass
class Analytics:
    """View counters and last-view time of a project or component.

    Every field is required at construction time but may be ``None``.
    """

    # Number of app views over the item's whole lifetime.
    appviews_all_time: Optional[int]
    # Number of app views in the last 7 days.
    appviews_last_7_days: Optional[int]
    # Number of app views in the last 14 days.
    appviews_last_14_days: Optional[int]
    # Number of app views in the last 30 days.
    appviews_last_30_days: Optional[int]
    # When the item was last viewed, if known.
    last_viewed_at: Optional[datetime]
@dataclass
class Project:
    """A Hex project together with its optional metadata.

    ``id``, ``title`` and ``description`` must be supplied (``description`` may
    be ``None``); every other field defaults to ``None``.
    """

    # Identifier of the project.
    id: str
    # Title of the project.
    title: str
    # Free-text description; required argument but may be None.
    description: Optional[str]
    # Last edit and creation times, when known.
    last_edited_at: Optional[datetime] = None
    created_at: Optional[datetime] = None
    # Status label, categories and collections attached to the project.
    status: Optional[Status] = None
    categories: Optional[List[Category]] = None  # TODO: emit category description!
    collections: Optional[List[Collection]] = None
    # People associated with the project.
    creator: Optional[Owner] = None
    owner: Optional[Owner] = None
    # View counters, when available.
    analytics: Optional[Analytics] = None
@dataclass
class Component:
    """A Hex component together with its optional metadata.

    ``id``, ``title`` and ``description`` must be supplied (``description`` may
    be ``None``); every other field defaults to ``None``.
    """

    # Identifier of the component.
    id: str
    # Title of the component.
    title: str
    # Free-text description; required argument but may be None.
    description: Optional[str]
    # Last edit and creation times, when known.
    last_edited_at: Optional[datetime] = None
    created_at: Optional[datetime] = None
    # Status label, categories and collections attached to the component.
    status: Optional[Status] = None
    categories: Optional[List[Category]] = None
    collections: Optional[List[Collection]] = None
    # People associated with the component.
    creator: Optional[Owner] = None
    owner: Optional[Owner] = None
    # View counters, when available.
    analytics: Optional[Analytics] = None

View File

@ -0,0 +1,16 @@
# Compose stack for the Hex integration test: a single container that runs the
# mock API server and serves a canned projects response.
version: '3'
services:
  hex-mock-api:
    image: python:3.9-alpine
    container_name: hex-mock-api
    ports:
      - "8000:8000"
    volumes:
      # Canned response and the server script are mounted where the script expects them.
      - ./hex_projects_response.json:/app/hex_projects_response.json
      - ./mock_server.py:/app/mock_server.py
    command:
      - "python"
      - "/app/mock_server.py"
    healthcheck:
      # Probes the server's /health endpoint from inside the container.
      test:
        - "CMD"
        - "wget"
        - "--no-verbose"
        - "--tries=1"
        - "--spider"
        - "http://localhost:8000/health"
      interval: 5s
      timeout: 5s
      retries: 3

View File

@ -0,0 +1,591 @@
{
"values": [
{
"id": "c8f815c8-88c2-4dea-981f-69f544d6165d",
"title": "Welcome to Hex!-0",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user1@example.com"
},
"owner": {
"email": "user1@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-08-21T17:16:09.567Z",
"lastPublishedAt": null,
"createdAt": "2024-08-21T17:07:19.275Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user1@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "dd0f1e20-7586-4b8e-89ae-bfe3c924625b",
"title": "Welcome to Hex!-2",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user2@example.com"
},
"owner": {
"email": "user2@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-08-21T17:44:46.875Z",
"lastPublishedAt": null,
"createdAt": "2024-08-21T17:35:56.247Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user2@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "827ea1f2-ed9a-425f-8d48-0ecc491c7c7c",
"title": "Welcome to Hex!-3",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user3@example.com"
},
"owner": {
"email": "user3@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-08-21T20:04:05.879Z",
"lastPublishedAt": null,
"createdAt": "2024-08-21T20:04:03.148Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user3@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "e9d940fe-34ad-415b-ad12-cb4c201650dc",
"title": "Welcome to Hex!-4",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user4@example.com"
},
"owner": {
"email": "user4a@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-08-22T17:59:39.864Z",
"lastPublishedAt": null,
"createdAt": "2024-08-22T17:58:40.772Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user4a@example.com"
},
"access": "FULL_ACCESS"
},
{
"user": {
"email": "user4@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "89e64571-42d9-44ac-bf47-320a7440eb57",
"title": "Welcome to Hex!-5",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user4a@example.com"
},
"owner": {
"email": "user4a@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-10-23T21:26:04.682Z",
"lastPublishedAt": null,
"createdAt": "2024-10-23T21:26:01.878Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user4a@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "d05b0d81-6d00-4798-8967-6587b6731c0a",
"title": "Welcome to Hex!-6",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user5@example.com"
},
"owner": {
"email": "user5@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-12-03T13:31:17.016Z",
"lastPublishedAt": null,
"createdAt": "2024-12-03T06:53:25.879Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user5@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf",
"title": "PlayNotebook",
"description": "This is a play project\n\n",
"type": "PROJECT",
"creator": {
"email": "user5@example.com"
},
"owner": {
"email": "user5@example.com"
},
"status": {
"name": "In development"
},
"categories": [
{
"name": "Scratchpad",
"description": "Personal scratchpad for a team member. Not intended for broad consumption"
}
],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 16,
"lastSevenDays": 1,
"lastFourteenDays": 1,
"lastThirtyDays": 1
},
"lastViewedAt": "2025-03-17T14:55:34.717Z",
"publishedResultsUpdatedAt": "2025-03-18T12:11:34.907Z"
},
"lastEditedAt": "2024-12-18T08:38:03.873Z",
"lastPublishedAt": "2024-12-13T23:26:27.466Z",
"createdAt": "2024-12-03T09:54:58.471Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [
{
"cadence": "WEEKLY",
"enabled": true,
"hourly": null,
"daily": null,
"weekly": {
"dayOfWeek": "TUESDAY",
"hour": 17,
"minute": 40,
"timezone": "+05:30"
},
"monthly": null,
"custom": null
}
],
"sharing": {
"users": [
{
"user": {
"email": "user4a@example.com"
},
"access": "FULL_ACCESS"
},
{
"user": {
"email": "user5@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "4759f33c-1ab9-403d-92e8-9bef48de00c4",
"title": "Cancelled Orders",
"description": null,
"type": "COMPONENT",
"creator": {
"email": "user4a@example.com"
},
"owner": {
"email": "user4a@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": "2024-12-13T23:22:48.995Z"
},
"lastEditedAt": "2024-12-13T23:22:58.183Z",
"lastPublishedAt": "2024-12-13T23:22:58.189Z",
"createdAt": "2024-12-09T15:39:26.093Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user4a@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "0496a2c2-8656-475d-9946-6402320779e2",
"title": "Pet Profiles",
"description": "this is a component to get pet profiles at glance\n\n",
"type": "COMPONENT",
"creator": {
"email": "user5@example.com"
},
"owner": {
"email": "user5@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": "2024-12-16T10:43:52.474Z"
},
"lastEditedAt": "2025-01-23T17:00:07.922Z",
"lastPublishedAt": "2024-12-16T10:44:09.990Z",
"createdAt": "2024-12-16T10:41:55.531Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user5@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [
{
"collection": {
"name": "First Collection"
},
"access": "NONE"
}
],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "2ef730de-25ec-4131-94af-3517e743a738",
"title": "Welcome to Hex!",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user6@example.com"
},
"owner": {
"email": "user6@example.com"
},
"status": {
"name": "In development"
},
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2025-03-17T14:44:00.614Z",
"lastPublishedAt": null,
"createdAt": "2025-03-17T09:27:30.585Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user6@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
}
],
"pagination": {
"after": null,
"before": null
}
}

View File

@ -0,0 +1,54 @@
#!/usr/bin/env python3
"""
Simple HTTP server that returns the same JSON response for any request to /api/v1/projects
"""
import http.server
import json
import socketserver
from http import HTTPStatus
from urllib.parse import urlparse
# TCP port the mock server listens on.
PORT = 8000
# Load the mock response data once at start-up.
# The text is re-encoded with ``str.encode()`` (UTF-8) when it is served, so it
# is decoded explicitly as UTF-8 here rather than with the locale's default
# encoding; otherwise non-ASCII characters could be corrupted on the round trip.
with open("/app/hex_projects_response.json", "r", encoding="utf-8") as f:
    HEX_PROJECTS_RESPONSE = f.read()
class MockHexAPIHandler(http.server.SimpleHTTPRequestHandler):
    """Request handler for the mock Hex API.

    GET routes, matched on the URL path with the query string ignored:

    * ``/health`` -> ``200`` with the plain-text body ``OK``
    * any path starting with ``/api/v1/projects`` -> ``200`` with the canned
      JSON held in ``HEX_PROJECTS_RESPONSE``
    * anything else -> ``404`` with a small JSON error document
    """

    def _reply(self, status, content_type, payload, extra_headers=()):
        """Send a complete response: status line, headers, then ``payload`` bytes."""
        self.send_response(status)
        self.send_header("Content-type", content_type)
        for header_name, header_value in extra_headers:
            self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        """Handle GET requests"""
        route = urlparse(self.path).path

        if route == "/health":
            # Health check endpoint
            self._reply(HTTPStatus.OK, "text/plain", b"OK")
        elif route.startswith("/api/v1/projects"):
            # Mock Hex API endpoints
            self._reply(
                HTTPStatus.OK,
                "application/json",
                HEX_PROJECTS_RESPONSE.encode(),
                extra_headers=[("Access-Control-Allow-Origin", "*")],
            )
        else:
            # Default 404 response
            not_found = {"error": "Not found", "path": self.path}
            self._reply(
                HTTPStatus.NOT_FOUND,
                "application/json",
                json.dumps(not_found).encode(),
            )
# Set up the server
handler = MockHexAPIHandler
# TCPServer is a context manager: leaving the block (for example on
# KeyboardInterrupt) closes the listening socket instead of leaking it.
with socketserver.TCPServer(("", PORT), handler) as httpd:
    print(f"Serving mock Hex API at port {PORT}")
    httpd.serve_forever()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,88 @@
import pytest
import requests
from freezegun import freeze_time
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port
# Test resources and constants
# Timestamp handed to ``freeze_time`` on the ingestion test in this module.
FROZEN_TIME = "2025-03-25 12:00:00"
# Module-level pytest marker applied to every test in this file.
pytestmark = pytest.mark.integration_batch_2
@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    """Return the directory holding the Hex integration-test resources."""
    integration_root = pytestconfig.rootpath / "tests" / "integration"
    return integration_root / "hex"
def is_hex_mock_api_up(container_name: str) -> bool:
    """Check if the mock API server is up and running

    Args:
        container_name: Name of the container being probed. Currently unused:
            the probe always targets ``http://localhost:8000/health``.

    Returns:
        ``True`` when the health endpoint answers with a successful status,
        ``False`` on any request or connection error (including a timeout).
    """
    try:
        # Without a timeout a half-started server could block this probe
        # forever, defeating the caller's own wait timeout.
        response = requests.get("http://localhost:8000/health", timeout=5)
        response.raise_for_status()
        return True
    except (requests.RequestException, ConnectionError):
        return False
@pytest.fixture(scope="module")
def hex_mock_api_runner(docker_compose_runner, test_resources_dir):
    """Start the mock Hex API with Docker Compose and wait until it is healthy.

    Yields the docker services handle once port 8000 of the ``hex-mock-api``
    container answers the health probe.
    """
    compose_file = test_resources_dir / "docker" / "docker-compose.yml"

    def mock_api_ready():
        return is_hex_mock_api_up("hex-mock-api")

    # Start Docker Compose
    with docker_compose_runner(compose_file, "hex-mock") as docker_services:
        wait_for_port(
            docker_services,
            "hex-mock-api",
            8000,
            timeout=30,
            checker=mock_api_ready,
        )
        yield docker_services
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_hex_ingestion(pytestconfig, hex_mock_api_runner, test_resources_dir, tmp_path):
    """Test Hex metadata ingestion using a mock API server."""
    # Where the pipeline writes its output, and the golden file to compare with
    output_path = f"{tmp_path}/hex_mces.json"
    golden_path = test_resources_dir / "golden" / "hex_mce_golden.json"

    source_config = {
        "workspace_name": "test-workspace",
        "token": "test-token",
        "base_url": "http://localhost:8000/api/v1",  # Mock API URL
        "platform_instance": "hex_test",
    }
    recipe = {
        "run_id": "hex-test",
        "source": {"type": "hex", "config": source_config},
        "sink": {"type": "file", "config": {"filename": output_path}},
    }

    # Create and run the pipeline
    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()

    # Check against golden file
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=output_path,
        golden_path=golden_path,
        ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
    )

View File

@ -0,0 +1,356 @@
import json
import os
import unittest
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import MagicMock, patch
import requests
from pydantic import ValidationError
from datahub.ingestion.source.hex.api import (
HexApi,
HexApiProjectApiResource,
HexApiProjectsListResponse,
HexApiReport,
)
from datahub.ingestion.source.hex.model import (
Component,
Project,
)
# Helper to load test data from JSON files
def load_json_data(filename):
    """Load and parse a JSON fixture from the ``test_data`` directory.

    Args:
        filename: Name of the file inside the ``test_data`` directory that sits
            next to this module.

    Returns:
        The parsed JSON document.
    """
    test_dir = Path(os.path.dirname(os.path.abspath(__file__)))
    file_path = test_dir / "test_data" / filename
    # Decode explicitly as UTF-8 so the result does not depend on the locale's
    # default encoding.
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)
class TestHexAPI(unittest.TestCase):
    """Unit tests for ``HexApi``: pagination, model mapping and error reporting.

    HTTP access is replaced by patching ``requests.get`` inside the API module,
    so no network traffic takes place.
    """

    def setUp(self):
        self.token = "test-token"
        self.report = HexApiReport()
        self.base_url = "https://test.hex.tech/api/v1"
        self.page_size = 8  # Small page size to test pagination

    def _make_api(self, **extra_kwargs):
        """Build a ``HexApi`` wired to this test's token, report and base URL."""
        return HexApi(
            token=self.token,
            report=self.report,
            base_url=self.base_url,
            **extra_kwargs,
        )

    @patch("datahub.ingestion.source.hex.api.requests.get")
    def test_fetch_projects_pagination(self, mock_get):
        """Two canned pages are fetched and all of their items are returned."""
        first_page_data = load_json_data("hex_projects_page1.json")
        second_page_data = load_json_data("hex_projects_page2.json")

        first_response = MagicMock()
        first_response.json.return_value = first_page_data
        second_response = MagicMock()
        second_response.json.return_value = second_page_data
        mock_get.side_effect = [first_response, second_response]

        hex_api = self._make_api(page_size=self.page_size)
        results = list(hex_api.fetch_projects())

        expected_item_count = len(first_page_data["values"]) + len(
            second_page_data["values"]
        )

        # check pagination
        assert mock_get.call_count == 2
        assert self.report.fetch_projects_page_calls == 2
        assert self.report.fetch_projects_page_items == expected_item_count

        # some random validations on the results
        assert len(results) == expected_item_count
        assert all(isinstance(item, (Project, Component)) for item in results)

        found_projects = {
            (item.id, item.title) for item in results if isinstance(item, Project)
        }
        assert found_projects == {
            ("827ea1f2-ed9a-425f-8d48-0ecc491c7c7c", "Welcome to Hex!-3"),
            ("e9d940fe-34ad-415b-ad12-cb4c201650dc", "Welcome to Hex!-4"),
            ("d73da67d-c87b-4dd8-9e7f-b79cb7f822cf", "PlayNotebook"),
            ("d05b0d81-6d00-4798-8967-6587b6731c0a", "Welcome to Hex!-6"),
            ("2ef730de-25ec-4131-94af-3517e743a738", "Welcome to Hex!"),
            ("c8f815c8-88c2-4dea-981f-69f544d6165d", "Welcome to Hex!-0"),
            ("89e64571-42d9-44ac-bf47-320a7440eb57", "Welcome to Hex!-5"),
            ("dd0f1e20-7586-4b8e-89ae-bfe3c924625b", "Welcome to Hex!-2"),
        }

        found_components = {
            (item.id, item.title) for item in results if isinstance(item, Component)
        }
        assert found_components == {
            ("0496a2c2-8656-475d-9946-6402320779e2", "Pet Profiles"),
            ("4759f33c-1ab9-403d-92e8-9bef48de00c4", "Cancelled Orders"),
        }

    @patch("datahub.ingestion.source.hex.api.requests.get")
    def test_map_data_project(self, mock_get):
        """An API resource of type PROJECT maps to a fully populated ``Project``."""
        # Test mapping of a project
        project_data = {
            "id": "project1",
            "title": "Test Project",
            "description": "A test project",
            "type": "PROJECT",
            "createdAt": "2022-01-01T12:00:00.000Z",
            "lastEditedAt": "2022-01-02T12:00:00.000Z",
            "status": {"name": "Published"},
            "categories": [{"name": "Category1", "description": "A category"}],
            "sharing": {"collections": [{"collection": {"name": "Collection1"}}]},
            "creator": {"email": "creator@example.com"},
            "owner": {"email": "owner@example.com"},
            "analytics": {
                "appViews": {
                    "allTime": 100,
                    "lastSevenDays": 10,
                    "lastFourteenDays": 20,
                    "lastThirtyDays": 30,
                },
                "lastViewedAt": "2022-01-03T12:00:00.000Z",
            },
        }

        hex_api = self._make_api()
        resource = HexApiProjectApiResource.parse_obj(project_data)
        result = hex_api._map_data_from_model(resource)

        # Verify the result
        assert isinstance(result, Project)
        assert result.id == "project1"
        assert result.title == "Test Project"
        assert result.description == "A test project"
        assert result.created_at == datetime(2022, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
        assert result.last_edited_at == datetime(
            2022, 1, 2, 12, 0, 0, tzinfo=timezone.utc
        )

        assert result.status
        assert result.status.name == "Published"

        assert result.categories
        assert len(result.categories) == 1
        assert result.categories[0].name == "Category1"

        assert result.collections
        assert len(result.collections) == 1
        assert result.collections[0].name == "Collection1"

        assert result.creator
        assert result.creator.email == "creator@example.com"
        assert result.owner
        assert result.owner.email == "owner@example.com"

        assert result.analytics
        assert result.analytics.appviews_all_time == 100
        assert result.analytics.last_viewed_at == datetime(
            2022, 1, 3, 12, 0, 0, tzinfo=timezone.utc
        )

    @patch("datahub.ingestion.source.hex.api.requests.get")
    def test_map_data_component(self, mock_get):
        """An API resource of type COMPONENT maps to a populated ``Component``."""
        # Test mapping of a component
        component_data = {
            "id": "component1",
            "title": "Test Component",
            "description": "A test component",
            "type": "COMPONENT",
            "createdAt": "2022-02-01T12:00:00.000Z",
            "lastEditedAt": "2022-02-02T12:00:00.000Z",
            "status": {"name": "Draft"},
            "categories": [{"name": "Category2"}],
            "sharing": {"collections": [{"collection": {"name": "Collection2"}}]},
            "creator": {"email": "creator@example.com"},
            "owner": {"email": "owner@example.com"},
            "analytics": {
                "appViews": {
                    "allTime": 50,
                    "lastSevenDays": 5,
                    "lastFourteenDays": 10,
                    "lastThirtyDays": 15,
                },
                "lastViewedAt": "2022-02-03T12:00:00.000Z",
            },
        }

        hex_api = self._make_api()
        resource = HexApiProjectApiResource.parse_obj(component_data)
        result = hex_api._map_data_from_model(resource)

        # Verify the result
        assert isinstance(result, Component)
        assert result.id == "component1"
        assert result.title == "Test Component"
        assert result.description == "A test component"
        assert result.created_at == datetime(2022, 2, 1, 12, 0, 0, tzinfo=timezone.utc)
        assert result.last_edited_at == datetime(
            2022, 2, 2, 12, 0, 0, tzinfo=timezone.utc
        )

        assert result.status
        assert result.status.name == "Draft"

        assert result.categories
        assert len(result.categories) == 1
        assert result.categories[0].name == "Category2"

        assert result.collections
        assert len(result.collections) == 1
        assert result.collections[0].name == "Collection2"

        assert result.creator
        assert result.creator.email == "creator@example.com"
        assert result.owner
        assert result.owner.email == "owner@example.com"

        assert result.analytics
        assert result.analytics.appviews_all_time == 50
        assert result.analytics.last_viewed_at == datetime(
            2022, 2, 3, 12, 0, 0, tzinfo=timezone.utc
        )

    @patch("datahub.ingestion.source.hex.api.requests.get")
    def test_fetch_projects_failure_http_error(self, mock_get):
        """An HTTP error yields no results and is reported as one failure."""
        failing_response = MagicMock()
        failing_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
            "500 Server Error: Internal Server Error"
        )
        mock_get.return_value = failing_response

        hex_api = self._make_api()

        # No exception should be raised; gracefully finish with no results and proper error reporting
        results = list(hex_api.fetch_projects())

        # Verify results are empty and error was reported
        assert len(results) == 0
        assert self.report.fetch_projects_page_calls == 1

        failures = list(self.report.failures)
        assert len(failures) == 1
        failure = failures[0]
        assert failure.title == "Listing Projects and Components API request error"
        assert (
            failure.message
            == "Error fetching Projects and Components and halting metadata ingestion"
        )
        assert failure.context

    @patch("datahub.ingestion.source.hex.api.requests.get")
    @patch("datahub.ingestion.source.hex.api.HexApiProjectsListResponse.parse_obj")
    def test_fetch_projects_failure_response_validation(self, mock_parse_obj, mock_get):
        """A response that fails validation is reported as one failure."""
        # Create a dummy http response
        dummy_response = MagicMock()
        dummy_response.json.return_value = {"whatever": "json"}
        mock_get.return_value = dummy_response

        # and simulate ValidationError when parsing the response
        mock_parse_obj.side_effect = ValidationError([], model=HexApiProjectApiResource)

        hex_api = self._make_api()

        # No exception should be raised; gracefully finish with no results and proper error reporting
        results = list(hex_api.fetch_projects())

        # Verify results are empty and error was reported
        assert len(results) == 0
        assert self.report.fetch_projects_page_calls == 1

        failures = list(self.report.failures)
        assert len(failures) == 1
        failure = failures[0]
        assert (
            failure.title
            == "Listing Projects and Components API response parsing error"
        )
        assert (
            failure.message
            == "Error parsing API response and halting metadata ingestion"
        )
        assert failure.context

    @patch("datahub.ingestion.source.hex.api.requests.get")
    @patch("datahub.ingestion.source.hex.api.HexApiProjectsListResponse.parse_obj")
    @patch("datahub.ingestion.source.hex.api.HexApi._map_data_from_model")
    def test_fetch_projects_warning_model_mapping(
        self, mock_map_data_from_model, mock_parse_obj, mock_get
    ):
        """An item that fails mapping is skipped with a warning; others survive."""
        # Create a dummy http response
        dummy_response = MagicMock()
        dummy_response.json.return_value = {"values": [{"whatever": "json"}]}
        mock_get.return_value = dummy_response

        # create a couple of dummy project items
        problem_resource = HexApiProjectApiResource(
            id="problem_item", title="Problem Item", type="PROJECT"
        )
        valid_resource = HexApiProjectApiResource(
            id="valid_item", title="Valid Item", type="PROJECT"
        )
        mock_parse_obj.return_value = HexApiProjectsListResponse(
            values=[problem_resource, valid_resource]
        )

        # and simulate an Error when mapping the response to a model
        def mapping_side_effect(item_data):
            assert isinstance(item_data, HexApiProjectApiResource)
            if item_data.id == "problem_item":
                raise ValueError("Invalid data structure for problem_item")
            return MagicMock(
                id="valid_item",
                title="Valid Item",
                type="PROJECT",
                description="A valid project",
                created_at=None,
                last_edited_at=None,
                status=None,
                categories=[],
                sharing=MagicMock(collections=[]),
                creator=None,
                owner=None,
                analytics=None,
            )

        mock_map_data_from_model.side_effect = mapping_side_effect

        hex_api = self._make_api()

        # Should not raise exception, but log warning
        results = list(hex_api.fetch_projects())

        # We should still get the valid item but skip the problematic one
        assert len(results) == 1
        assert results[0].id == "valid_item"
        assert self.report.fetch_projects_page_calls == 1

        warnings = list(self.report.warnings)
        assert len(warnings) == 1
        warning = warnings[0]
        assert warning.title == "Incomplete metadata"
        assert warning.message == "Incomplete metadata because of error mapping item"
        assert warning.context

View File

@ -0,0 +1,474 @@
{
"values": [
{
"id": "c8f815c8-88c2-4dea-981f-69f544d6165d",
"title": "Welcome to Hex!-0",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user1@example.com"
},
"owner": {
"email": "user1@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-08-21T17:16:09.567Z",
"lastPublishedAt": null,
"createdAt": "2024-08-21T17:07:19.275Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user1@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "dd0f1e20-7586-4b8e-89ae-bfe3c924625b",
"title": "Welcome to Hex!-2",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user2@example.com"
},
"owner": {
"email": "user2@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-08-21T17:44:46.875Z",
"lastPublishedAt": null,
"createdAt": "2024-08-21T17:35:56.247Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user2@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "827ea1f2-ed9a-425f-8d48-0ecc491c7c7c",
"title": "Welcome to Hex!-3",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user3@example.com"
},
"owner": {
"email": "user3@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-08-21T20:04:05.879Z",
"lastPublishedAt": null,
"createdAt": "2024-08-21T20:04:03.148Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user3@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "e9d940fe-34ad-415b-ad12-cb4c201650dc",
"title": "Welcome to Hex!-4",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user4@example.com"
},
"owner": {
"email": "user4a@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-08-22T17:59:39.864Z",
"lastPublishedAt": null,
"createdAt": "2024-08-22T17:58:40.772Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user4a@example.com"
},
"access": "FULL_ACCESS"
},
{
"user": {
"email": "user4@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "89e64571-42d9-44ac-bf47-320a7440eb57",
"title": "Welcome to Hex!-5",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user4a@example.com"
},
"owner": {
"email": "user4a@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-10-23T21:26:04.682Z",
"lastPublishedAt": null,
"createdAt": "2024-10-23T21:26:01.878Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user4a@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "d05b0d81-6d00-4798-8967-6587b6731c0a",
"title": "Welcome to Hex!-6",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user5@example.com"
},
"owner": {
"email": "user5@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2024-12-03T13:31:17.016Z",
"lastPublishedAt": null,
"createdAt": "2024-12-03T06:53:25.879Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user5@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "d73da67d-c87b-4dd8-9e7f-b79cb7f822cf",
"title": "PlayNotebook",
"description": "This is a play project\n\n",
"type": "PROJECT",
"creator": {
"email": "user5@example.com"
},
"owner": {
"email": "user5@example.com"
},
"status": {
"name": "In development"
},
"categories": [
{
"name": "Scratchpad",
"description": "Personal scratchpad for a team member. Not intended for broad consumption"
}
],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 16,
"lastSevenDays": 1,
"lastFourteenDays": 1,
"lastThirtyDays": 1
},
"lastViewedAt": "2025-03-17T14:55:34.717Z",
"publishedResultsUpdatedAt": "2025-03-18T12:11:34.907Z"
},
"lastEditedAt": "2024-12-18T08:38:03.873Z",
"lastPublishedAt": "2024-12-13T23:26:27.466Z",
"createdAt": "2024-12-03T09:54:58.471Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [
{
"cadence": "WEEKLY",
"enabled": true,
"hourly": null,
"daily": null,
"weekly": {
"dayOfWeek": "TUESDAY",
"hour": 17,
"minute": 40,
"timezone": "+05:30"
},
"monthly": null,
"custom": null
}
],
"sharing": {
"users": [
{
"user": {
"email": "user4a@example.com"
},
"access": "FULL_ACCESS"
},
{
"user": {
"email": "user5@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "4759f33c-1ab9-403d-92e8-9bef48de00c4",
"title": "Cancelled Orders",
"description": null,
"type": "COMPONENT",
"creator": {
"email": "user4a@example.com"
},
"owner": {
"email": "user4a@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": "2024-12-13T23:22:48.995Z"
},
"lastEditedAt": "2024-12-13T23:22:58.183Z",
"lastPublishedAt": "2024-12-13T23:22:58.189Z",
"createdAt": "2024-12-09T15:39:26.093Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user4a@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
}
],
"pagination": {
"after": "djI6WyIyMDI0LTEyLTA5VDE1OjM5OjI2LjA5M1oiLCI0NzU5ZjMzYy0xYWI5LTQwM2QtOTJlOC05YmVmNDhkZTAwYzQiXQ",
"before": "djI6WyIyMDI0LTA4LTIxVDE3OjA3OjE5LjI3NVoiLCJjOGY4MTVjOC04OGMyLTRkZWEtOTgxZi02OWY1NDRkNjE2NWQiXQ"
}
}

View File

@ -0,0 +1,125 @@
{
"values": [
{
"id": "0496a2c2-8656-475d-9946-6402320779e2",
"title": "Pet Profiles",
"description": "this is a component to get pet profiles at glance\n\n",
"type": "COMPONENT",
"creator": {
"email": "user5@example.com"
},
"owner": {
"email": "user5@example.com"
},
"status": null,
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": "2024-12-16T10:43:52.474Z"
},
"lastEditedAt": "2025-01-23T17:00:07.922Z",
"lastPublishedAt": "2024-12-16T10:44:09.990Z",
"createdAt": "2024-12-16T10:41:55.531Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user5@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [
{
"collection": {
"name": "First Collection"
},
"access": "NONE"
}
],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
},
{
"id": "2ef730de-25ec-4131-94af-3517e743a738",
"title": "Welcome to Hex!",
"description": "Get started with this example project that uses SQL and Python to find the most popular dessert order for a fictional dumpling restaurant.\n\n",
"type": "PROJECT",
"creator": {
"email": "user6@example.com"
},
"owner": {
"email": "user6@example.com"
},
"status": {
"name": "In development"
},
"categories": [],
"reviews": {
"required": false
},
"analytics": {
"appViews": {
"allTime": 0,
"lastSevenDays": 0,
"lastFourteenDays": 0,
"lastThirtyDays": 0
},
"lastViewedAt": null,
"publishedResultsUpdatedAt": null
},
"lastEditedAt": "2025-03-17T14:44:00.614Z",
"lastPublishedAt": null,
"createdAt": "2025-03-17T09:27:30.585Z",
"archivedAt": null,
"trashedAt": null,
"schedules": [],
"sharing": {
"users": [
{
"user": {
"email": "user6@example.com"
},
"access": "FULL_ACCESS"
}
],
"collections": [],
"groups": [],
"workspace": {
"access": "NONE"
},
"publicWeb": {
"access": "NONE"
},
"support": {
"access": "NONE"
}
}
}
],
"pagination": {
"after": null,
"before": "djI6WyIyMDI0LTEyLTE2VDEwOjQxOjU1LjUzMVoiLCIwNDk2YTJjMi04NjU2LTQ3NWQtOTk0Ni02NDAyMzIwNzc5ZTIiXQ"
}
}

View File

@ -0,0 +1,794 @@
import unittest
from datetime import datetime
from datahub.emitter.mce_builder import make_tag_urn, make_ts_millis, make_user_urn
from datahub.emitter.mcp import (
MetadataChangeProposalWrapper,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import BIAssetSubTypes
from datahub.ingestion.source.hex.constants import HEX_PLATFORM_NAME
from datahub.ingestion.source.hex.mapper import Mapper
from datahub.ingestion.source.hex.model import (
Analytics,
Category,
Collection,
Component,
Owner,
Project,
Status,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStampClass,
ChangeAuditStampsClass,
OwnershipType,
)
from datahub.metadata.com.linkedin.pegasus2avro.dashboard import (
DashboardUsageStatistics,
)
from datahub.metadata.schema_classes import (
CalendarIntervalClass,
ContainerClass,
ContainerPropertiesClass,
DashboardInfoClass,
DataPlatformInstanceClass,
GlobalTagsClass,
MetadataChangeProposalClass,
OwnershipClass,
SubTypesClass,
TimeWindowSizeClass,
)
from datahub.metadata.urns import DashboardUrn
class TestMapper(unittest.TestCase):
    """Unit tests for ``Mapper`` from ``datahub.ingestion.source.hex.mapper``.

    The tests cover the public mapping entry points (``map_workspace``,
    ``map_project``, ``map_component``) and the private helpers they rely on
    (``_global_tags``, ``_ownership``, ``_dashboard_usage_statistics``,
    ``_platform_instance_aspect``, ``_get_dashboard_urn``).

    ``map_project`` and ``map_component`` are checked through one shared set of
    assertions (see ``_assert_dashboard_mapping``) so both tests stay in sync.
    """

    workspace_name = "test-workspace"
    created_at = datetime(2022, 1, 1, 0, 0, 0)
    last_edited_at = datetime(2022, 1, 2, 0, 0, 0)
    last_modified = ChangeAuditStampsClass(
        created=AuditStampClass(
            time=make_ts_millis(datetime(2022, 1, 1)),
            actor="urn:li:corpuser:_ingestion",
        ),
        lastModified=AuditStampClass(
            time=make_ts_millis(datetime(2022, 1, 2)),
            actor="urn:li:corpuser:_ingestion",
        ),
    )

    # Expected values asserted by more than one test.
    _workspace_container_urn = "urn:li:container:635fbdd141a7b358624369c6060847c3"
    _dashboard_urn = "urn:li:dashboard:(hex,uuid1)"
    _platform_urn = "urn:li:dataPlatform:hex"
    _platform_instance_urn = (
        "urn:li:dataPlatformInstance:(urn:li:dataPlatform:hex,test-platform)"
    )

    # ------------------------------------------------------------------
    # Shared fixtures and assertion helpers (not collected as tests)
    # ------------------------------------------------------------------

    @staticmethod
    def _analytics():
        """Build the fully populated ``Analytics`` fixture used by the tests."""
        return Analytics(
            appviews_all_time=100,
            appviews_last_7_days=10,
            appviews_last_14_days=20,
            appviews_last_30_days=30,
            last_viewed_at=datetime(2022, 1, 1, 0, 0, 0),
        )

    @staticmethod
    def _single_aspect(work_units, aspect_type):
        """Return the only aspect of ``aspect_type`` found in ``work_units``.

        Asserts that exactly one work unit carries the aspect and that the
        work unit's metadata is a ``MetadataChangeProposalWrapper`` whose
        ``aspect`` is an instance of ``aspect_type``.
        """
        matching = [wu for wu in work_units if wu.get_aspect_of_type(aspect_type)]
        assert len(matching) == 1
        metadata = matching[0].metadata
        assert isinstance(metadata, MetadataChangeProposalWrapper)
        assert isinstance(metadata.aspect, aspect_type)
        return metadata.aspect

    @staticmethod
    def _assert_full_usage_statistics(all_time, last_7_days):
        """Check the two usage aspects produced from the ``_analytics`` fixture.

        ``all_time`` must have no event granularity; ``last_7_days`` must use a
        one-week window. Both must report the fixture's last-viewed timestamp.
        """
        assert all_time.viewsCount == 100
        assert last_7_days.viewsCount == 10
        assert not all_time.eventGranularity
        assert last_7_days.eventGranularity == TimeWindowSizeClass(
            unit=CalendarIntervalClass.WEEK, multiple=1
        )
        assert (
            all_time.lastViewedAt
            == last_7_days.lastViewedAt
            == make_ts_millis(datetime(2022, 1, 1))
        )

    def _assert_usage_statistics_work_units(self, work_units):
        """Check the ``DashboardUsageStatistics`` work units in ``work_units``.

        Exactly two are expected; the first is treated as the all-time
        statistics and the second as the last-7-days statistics.
        """
        usage_wus = [
            wu for wu in work_units if wu.get_aspect_of_type(DashboardUsageStatistics)
        ]
        assert len(usage_wus) == 2
        aspects = []
        for wu in usage_wus:
            assert isinstance(wu.metadata, MetadataChangeProposalWrapper)
            assert isinstance(wu.metadata.aspect, DashboardUsageStatistics)
            aspects.append(wu.metadata.aspect)
        all_time, last_7_days = aspects
        self._assert_full_usage_statistics(all_time, last_7_days)

    def _assert_workspace_container(self, mapper, expected_env):
        """Check the single container work unit emitted by ``map_workspace``.

        ``expected_env`` is the value expected on the container properties'
        ``env`` field, or ``None`` when the field is expected to be unset.
        """
        work_units = list(mapper.map_workspace())
        assert len(work_units) == 1
        assert isinstance(work_units[0], MetadataWorkUnit) and isinstance(
            work_units[0].metadata, MetadataChangeProposalWrapper
        )
        assert work_units[0].metadata.entityUrn == self._workspace_container_urn
        aspect = work_units[0].get_aspect_of_type(ContainerPropertiesClass)
        assert aspect and aspect.name == self.workspace_name
        if expected_env is None:
            assert not aspect.env
        else:
            assert aspect.env == expected_env

    def _assert_dashboard_mapping(
        self, emit, title, description, subtype, expected_tag_urns
    ):
        """Run the assertions shared by the project and component mapping tests.

        ``emit`` receives a freshly configured ``Mapper`` and returns the work
        units for the entity under test. It is invoked three times: without
        patching, with ``patch_metadata=True`` and with a platform instance.
        ``expected_tag_urns`` is the set of tag URNs expected on the entity.
        """
        # --- patch_metadata=False: every aspect is a plain wrapper ---------
        mapper = Mapper(
            workspace_name=self.workspace_name,
            patch_metadata=False,
        )
        work_units = list(emit(mapper))
        assert len(work_units) == 8
        assert all(
            isinstance(wu.metadata, MetadataChangeProposalWrapper)
            and wu.metadata.entityUrn == self._dashboard_urn
            for wu in work_units
        )

        dashboard_info = self._single_aspect(work_units, DashboardInfoClass)
        assert dashboard_info.title == title
        assert dashboard_info.description == description
        assert (
            dashboard_info.externalUrl
            == "https://app.hex.tech/test-workspace/hex/uuid1"
        )
        assert dashboard_info.customProperties == {"id": "uuid1"}

        subtypes = self._single_aspect(work_units, SubTypesClass)
        assert subtypes.typeNames == [subtype]

        platform_instance = self._single_aspect(work_units, DataPlatformInstanceClass)
        assert platform_instance.platform == self._platform_urn
        assert platform_instance.instance is None

        container = self._single_aspect(work_units, ContainerClass)
        assert container.container == self._workspace_container_urn

        global_tags = self._single_aspect(work_units, GlobalTagsClass)
        assert len(global_tags.tags) == len(expected_tag_urns)
        assert {tag.tag for tag in global_tags.tags} == expected_tag_urns

        ownership = self._single_aspect(work_units, OwnershipClass)
        assert len(ownership.owners) == 2
        assert {owner.owner for owner in ownership.owners} == {
            "urn:li:corpuser:creator@example.com",
            "urn:li:corpuser:owner@example.com",
        }
        assert all(
            owner.type == OwnershipType.TECHNICAL_OWNER for owner in ownership.owners
        )

        self._assert_usage_statistics_work_units(work_units)

        # --- patch_metadata=True: mostly the same... -----------------------
        mapper = Mapper(
            workspace_name=self.workspace_name,
            patch_metadata=True,
        )
        work_units = list(emit(mapper))
        assert len(work_units) == 8
        assert all(
            isinstance(
                wu.metadata,
                (MetadataChangeProposalWrapper, MetadataChangeProposalClass),
            )
            and wu.metadata.entityUrn == self._dashboard_urn
            for wu in work_units
        )
        # ...but DashboardInfo is emitted as a PATCH proposal.
        patch_wus = [
            wu
            for wu in work_units
            if isinstance(wu.metadata, MetadataChangeProposalClass)
            and wu.metadata.changeType == "PATCH"
        ]
        assert len(patch_wus) == 1
        assert isinstance(patch_wus[0].metadata, MetadataChangeProposalClass)
        assert patch_wus[0].metadata.aspectName == "dashboardInfo"

        # --- platform_instance set: the instance URN is filled in ----------
        mapper = Mapper(
            workspace_name=self.workspace_name,
            platform_instance="test-platform",
        )
        work_units = list(emit(mapper))
        assert len(work_units) == 8
        platform_instance = self._single_aspect(work_units, DataPlatformInstanceClass)
        assert platform_instance.platform == self._platform_urn
        assert platform_instance.instance == self._platform_instance_urn

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_map_workspace(self):
        """``map_workspace`` emits one container work unit for the workspace."""
        self._assert_workspace_container(
            Mapper(
                workspace_name=self.workspace_name,
            ),
            expected_env=None,
        )

        self._assert_workspace_container(
            Mapper(
                workspace_name=self.workspace_name,
                platform_instance="test-platform",
            ),
            expected_env=None,
        )

        # guid here is the same as before because by default env is ignored in the key
        self._assert_workspace_container(
            Mapper(
                workspace_name=self.workspace_name,
                env="test-env",
                platform_instance="test-platform",
            ),
            expected_env="test-env",
        )

    def test_map_project(self):
        """``map_project`` emits the expected dashboard aspects for a project."""
        project = Project(
            id="uuid1",
            title="Test Project",
            description="A test project",
            created_at=self.created_at,
            last_edited_at=self.last_edited_at,
            status=Status(name="Published"),
            categories=[Category(name="Category1"), Category(name="Category2")],
            collections=[Collection(name="Collection1")],
            creator=Owner(email="creator@example.com"),
            owner=Owner(email="owner@example.com"),
            analytics=self._analytics(),
        )

        self._assert_dashboard_mapping(
            lambda mapper: mapper.map_project(project),
            title="Test Project",
            description="A test project",
            subtype=BIAssetSubTypes.HEX_PROJECT,
            expected_tag_urns={
                "urn:li:tag:hex:status:Published",
                "urn:li:tag:hex:category:Category1",
                "urn:li:tag:hex:category:Category2",
                "urn:li:tag:hex:collection:Collection1",
            },
        )

    def test_map_component(self):
        """``map_component`` emits the expected dashboard aspects for a component."""
        component = Component(
            id="uuid1",
            title="Test Component",
            description="A test component",
            created_at=self.created_at,
            last_edited_at=self.last_edited_at,
            status=Status(name="Draft"),
            categories=[Category(name="Category3")],
            collections=[Collection(name="Collection2")],
            creator=Owner(email="creator@example.com"),
            owner=Owner(email="owner@example.com"),
            analytics=self._analytics(),
        )

        self._assert_dashboard_mapping(
            lambda mapper: mapper.map_component(component),
            title="Test Component",
            description="A test component",
            subtype=BIAssetSubTypes.HEX_COMPONENT,
            expected_tag_urns={
                "urn:li:tag:hex:status:Draft",
                "urn:li:tag:hex:category:Category3",
                "urn:li:tag:hex:collection:Collection2",
            },
        )

    def test_global_tags_status(self):
        """The status becomes a tag unless ``status_as_tag`` is disabled."""
        status = Status(name="Published")

        mapper = Mapper(
            workspace_name=self.workspace_name,
        )
        tags = mapper._global_tags(status, None, None)
        assert tags is not None
        assert len(tags.tags) == 1
        assert tags.tags[0].tag == make_tag_urn("hex:status:Published")

        mapper = Mapper(
            workspace_name=self.workspace_name,
            status_as_tag=False,
        )
        tags = mapper._global_tags(status, None, None)
        assert tags is None

    def test_global_tags_categories(self):
        """Categories become tags unless ``categories_as_tags`` is disabled."""
        categories = [Category(name="Category1"), Category(name="Category2")]

        mapper = Mapper(
            workspace_name=self.workspace_name,
        )
        tags = mapper._global_tags(None, categories, None)
        assert tags is not None
        assert len(tags.tags) == 2
        assert tags.tags[0].tag == make_tag_urn("hex:category:Category1")
        assert tags.tags[1].tag == make_tag_urn("hex:category:Category2")

        mapper = Mapper(
            workspace_name=self.workspace_name,
            categories_as_tags=False,
        )
        tags = mapper._global_tags(None, categories, None)
        assert tags is None

    def test_global_tags_collections(self):
        """Collections become tags unless ``collections_as_tags`` is disabled."""
        collections = [Collection(name="Collection1")]

        mapper = Mapper(
            workspace_name=self.workspace_name,
        )
        tags = mapper._global_tags(None, None, collections)
        assert tags is not None
        assert len(tags.tags) == 1
        assert tags.tags[0].tag == make_tag_urn("hex:collection:Collection1")

        mapper = Mapper(
            workspace_name=self.workspace_name,
            collections_as_tags=False,
        )
        tags = mapper._global_tags(None, None, collections)
        assert tags is None

    def test_global_tags_all(self):
        """Status, categories and collections all contribute tags together."""
        status = Status(name="Published")
        categories = [Category(name="Category1"), Category(name="Category2")]
        collections = [Collection(name="Collection1")]

        mapper = Mapper(
            workspace_name=self.workspace_name,
        )
        tags = mapper._global_tags(status, categories, collections)
        assert tags is not None
        assert len(tags.tags) == 4

    def test_ownership(self):
        """``_ownership`` maps creator/owner emails to technical owners."""
        mapper = Mapper(
            workspace_name=self.workspace_name,
        )
        creator = Owner(email="creator@example.com")
        owner = Owner(email="owner@example.com")

        # Distinct creator and owner -> two technical owners.
        ownership = mapper._ownership(creator, owner)
        assert ownership is not None
        assert len(ownership.owners) == 2

        creator_owner = next(
            (
                o
                for o in ownership.owners
                if o.owner == make_user_urn("creator@example.com")
            ),
            None,
        )
        assert creator_owner is not None
        assert creator_owner.type == OwnershipType.TECHNICAL_OWNER

        primary_owner = next(
            (
                o
                for o in ownership.owners
                if o.owner == make_user_urn("owner@example.com")
            ),
            None,
        )
        assert primary_owner is not None
        assert primary_owner.type == OwnershipType.TECHNICAL_OWNER

        # Same person as creator and owner -> a single owner entry.
        ownership = mapper._ownership(creator, creator)
        assert ownership is not None
        assert len(ownership.owners) == 1

        # Nobody known -> no ownership aspect.
        ownership = mapper._ownership(None, None)
        assert ownership is None

        # Ownership from email disabled -> no ownership aspect.
        mapper = Mapper(
            workspace_name=self.workspace_name,
            set_ownership_from_email=False,
        )
        ownership = mapper._ownership(creator, owner)
        assert ownership is None

    def test_dashboard_usage_statistics(self):
        """``_dashboard_usage_statistics`` returns (all-time, last-7-days) stats."""
        mapper = Mapper(
            workspace_name=self.workspace_name,
        )

        # Fully populated analytics -> both aspects are produced.
        usage_stats_all_time, usage_stats_last_7_days = (
            mapper._dashboard_usage_statistics(self._analytics())
        )
        assert usage_stats_all_time and usage_stats_last_7_days
        self._assert_full_usage_statistics(
            usage_stats_all_time, usage_stats_last_7_days
        )

        # No analytics values at all -> neither aspect is produced.
        analytics = Analytics(
            appviews_all_time=None,
            appviews_last_7_days=None,
            appviews_last_14_days=None,
            appviews_last_30_days=None,
            last_viewed_at=None,
        )
        usage_stats_all_time, usage_stats_last_7_days = (
            mapper._dashboard_usage_statistics(analytics)
        )
        assert not usage_stats_all_time and not usage_stats_last_7_days

        # Only the 7-day count is known -> only the weekly aspect is produced.
        analytics = Analytics(
            appviews_all_time=None,
            appviews_last_7_days=10,
            appviews_last_14_days=None,
            appviews_last_30_days=None,
            last_viewed_at=None,
        )
        usage_stats_all_time, usage_stats_last_7_days = (
            mapper._dashboard_usage_statistics(analytics)
        )
        assert not usage_stats_all_time and usage_stats_last_7_days
        assert usage_stats_last_7_days.viewsCount == 10
        assert usage_stats_last_7_days.eventGranularity == TimeWindowSizeClass(
            unit=CalendarIntervalClass.WEEK, multiple=1
        )
        assert usage_stats_last_7_days.lastViewedAt is None

    def test_platform_instance_aspect(self):
        """``_platform_instance_aspect`` reflects the configured instance."""
        mapper = Mapper(
            workspace_name=self.workspace_name,
        )
        platform_instance = mapper._platform_instance_aspect()
        assert platform_instance
        assert platform_instance.platform == self._platform_urn
        assert platform_instance.instance is None

        mapper = Mapper(
            workspace_name=self.workspace_name,
            platform_instance="test-platform",
        )
        platform_instance = mapper._platform_instance_aspect()
        assert platform_instance
        assert platform_instance.platform == self._platform_urn
        assert platform_instance.instance == self._platform_instance_urn

    def test_get_dashboard_urn(self):
        """``_get_dashboard_urn`` prefixes the id with the platform instance."""
        mapper = Mapper(
            workspace_name=self.workspace_name,
        )
        dashboard_urn = mapper._get_dashboard_urn("dashboard_name")
        assert dashboard_urn == DashboardUrn(
            dashboard_id="dashboard_name", dashboard_tool=HEX_PLATFORM_NAME
        )
        assert dashboard_urn.urn() == "urn:li:dashboard:(hex,dashboard_name)"

        mapper = Mapper(
            workspace_name=self.workspace_name,
            platform_instance="test-platform",
        )
        dashboard_urn = mapper._get_dashboard_urn("dashboard_name")
        assert dashboard_urn == DashboardUrn(
            dashboard_id="test-platform.dashboard_name",
            dashboard_tool=HEX_PLATFORM_NAME,
        )
        assert (
            dashboard_urn.urn() == "urn:li:dashboard:(hex,test-platform.dashboard_name)"
        )

View File

@ -757,4 +757,13 @@
displayName: vertexai
type: OTHERS
logoUrl: "/assets/platforms/vertexai.png"
- entityUrn: urn:li:dataPlatform:hex
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: hex
displayName: Hex
type: OTHERS
logoUrl: "/assets/platforms/hex.png"