fix(ingest/mode): Improve query lineage (#10284)
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
parent 2dbb968e07
commit 897e648eae
@@ -374,7 +374,9 @@ plugins: Dict[str, Set[str]] = {
         # It's technically wrong for packages to depend on setuptools. However, it seems mlflow does it anyways.
         "setuptools",
     },
-    "mode": {"requests", "tenacity>=8.0.1"} | sqllineage_lib | sqlglot_lib,
+    "mode": {"requests", "python-liquid", "tenacity>=8.0.1"}
+    | sqllineage_lib
+    | sqlglot_lib,
     "mongodb": {"pymongo[srv]>=3.11", "packaging"},
    "mssql": sql_common | mssql_common,
    "mssql-odbc": sql_common | mssql_common | {"pyodbc"},
@@ -48,6 +48,7 @@ class BIContainerSubTypes(str, Enum):
     QLIK_APP = "Qlik App"
     SIGMA_WORKSPACE = "Sigma Workspace"
     SIGMA_WORKBOOK = "Sigma Workbook"
+    MODE_COLLECTION = "Collection"


 class JobContainerSubTypes(str, Enum):
@@ -64,3 +65,8 @@ class BIAssetSubTypes(str, Enum):
     # PowerBI
     POWERBI_TILE = "PowerBI Tile"
     POWERBI_PAGE = "PowerBI Page"
+
+    # Mode
+    MODE_REPORT = "Report"
+    MODE_QUERY = "Query"
+    MODE_CHART = "Chart"
@@ -1,3 +1,4 @@
+import dataclasses
 import logging
 import re
 import time
@@ -9,16 +10,20 @@ from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
 import dateutil.parser as dp
 import pydantic
 import requests
+import sqlglot
 import tenacity
+import yaml
+from liquid import Template, Undefined
 from pydantic import Field, validator
 from requests.models import HTTPBasicAuth, HTTPError
 from sqllineage.runner import LineageRunner
 from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential

 import datahub.emitter.mce_builder as builder
-from datahub.configuration.common import ConfigModel
+from datahub.configuration.common import AllowDenyPattern, ConfigModel
 from datahub.configuration.source_common import DatasetLineageProviderConfigBase
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.emitter.mcp_builder import ContainerKey, gen_containers
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SourceCapability,
@@ -30,6 +35,10 @@ from datahub.ingestion.api.decorators import (
 )
 from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.source.common.subtypes import (
+    BIAssetSubTypes,
+    BIContainerSubTypes,
+)
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
     StaleEntityRemovalSourceReport,
@@ -49,8 +58,9 @@ from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
 )
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 from datahub.metadata.schema_classes import (
+    BrowsePathEntryClass,
     BrowsePathsClass,
+    BrowsePathsV2Class,
     ChangeTypeClass,
     ChartInfoClass,
     ChartQueryClass,
     ChartQueryTypeClass,
@@ -77,9 +87,7 @@ from datahub.metadata.schema_classes import (
     QuerySourceClass,
     QueryStatementClass,
     SchemaFieldClass,
-    SchemaFieldDataTypeClass,
     SchemaMetadataClass,
-    StringTypeClass,
     SubTypesClass,
     TagAssociationClass,
     TagPropertiesClass,
@@ -94,10 +102,16 @@ from datahub.sql_parsing.sqlglot_lineage import (
     infer_output_schema,
 )
 from datahub.utilities import config_clean
+from datahub.utilities.lossy_collections import LossyDict, LossyList

 logger: logging.Logger = logging.getLogger(__name__)


+class SpaceKey(ContainerKey):
+    # Note that Mode has renamed Spaces to Collections.
+    space_token: str
+
+
 class ModeAPIConfig(ConfigModel):
     retry_backoff_multiplier: Union[int, float] = Field(
         default=2,
@@ -121,11 +135,22 @@ class ModeConfig(StatefulIngestionConfigBase, DatasetLineageProviderConfigBase):
     password: pydantic.SecretStr = Field(
         description="Mode password for authentication."
     )
-    workspace: Optional[str] = Field(default=None, description="")
+
+    workspace: str = Field(
+        description="The Mode workspace name. Find it in Settings > Workspace > Details."
+    )
     default_schema: str = Field(
         default="public",
         description="Default schema to use when schema is not provided in an SQL query",
     )
+
+    space_pattern: AllowDenyPattern = Field(
+        default=AllowDenyPattern(
+            deny=["^Personal$"],
+        ),
+        description="Regex patterns for Mode spaces to filter in ingestion (spaces named 'Personal' are filtered by default). The regex must match the space name, e.g. to ingest only a space named analytics, use the regex 'analytics'.",
+    )
+
     owner_username_instead_of_email: Optional[bool] = Field(
         default=True, description="Use username for owner URN instead of Email"
     )
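The new space_pattern option is a standard AllowDenyPattern. A minimal sketch of how the default behaves (the space names here are hypothetical):

from datahub.configuration.common import AllowDenyPattern

# Deny-list the built-in "Personal" space; everything else is allowed.
pattern = AllowDenyPattern(deny=["^Personal$"])
assert pattern.allowed("analytics")  # ingested
assert not pattern.allowed("Personal")  # filtered out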
@@ -155,7 +180,22 @@ class HTTPError429(HTTPError):

 @dataclass
 class ModeSourceReport(StaleEntityRemovalSourceReport):
-    pass
+    filtered_spaces: LossyList[str] = dataclasses.field(default_factory=LossyList)
+    num_sql_parsed: int = 0
+    num_sql_parser_failures: int = 0
+    num_sql_parser_success: int = 0
+    num_sql_parser_table_error: int = 0
+    num_sql_parser_column_error: int = 0
+    num_query_template_render: int = 0
+    num_query_template_render_failures: int = 0
+    num_query_template_render_success: int = 0
+
+    dropped_imported_datasets: LossyDict[str, LossyList[str]] = dataclasses.field(
+        default_factory=LossyDict
+    )
+
+    def report_dropped_space(self, ent_name: str) -> None:
+        self.filtered_spaces.append(ent_name)


 @platform_name("Mode")
@@ -268,21 +308,78 @@ class ModeSource(StatefulIngestionSourceBase):
         except HTTPError as http_error:
             self.report.report_failure(
                 key="mode-session",
-                reason=f"Unable to retrieve user "
-                f"{self.config.token} information, "
-                f"{str(http_error)}",
+                reason=f"Unable to verify connection. Error was: {str(http_error)}",
             )

         self.workspace_uri = f"{self.config.connect_uri}/api/{self.config.workspace}"
+        self.space_tokens = self._get_space_name_and_tokens()
+
+    def _browse_path_space(self) -> List[BrowsePathEntryClass]:
+        # TODO: Use containers for the workspace?
+        return [
+            BrowsePathEntryClass(id=self.config.workspace),
+        ]
+
+    def _browse_path_dashboard(self, space_token: str) -> List[BrowsePathEntryClass]:
+        space_container_urn = self.gen_space_key(space_token).as_urn()
+        return [
+            *self._browse_path_space(),
+            BrowsePathEntryClass(id=space_container_urn, urn=space_container_urn),
+        ]
+
+    def _browse_path_query(
+        self, space_token: str, report_info: dict
+    ) -> List[BrowsePathEntryClass]:
+        dashboard_urn = self._dashboard_urn(report_info)
+        return [
+            *self._browse_path_dashboard(space_token),
+            BrowsePathEntryClass(id=dashboard_urn, urn=dashboard_urn),
+        ]
+
+    def _browse_path_chart(
+        self, space_token: str, report_info: dict, query_info: dict
+    ) -> List[BrowsePathEntryClass]:
+        query_urn = self.get_dataset_urn_from_query(query_info)
+        return [
+            *self._browse_path_query(space_token, report_info),
+            BrowsePathEntryClass(id=query_urn, urn=query_urn),
+        ]
+
+    def _dashboard_urn(self, report_info: dict) -> str:
+        return builder.make_dashboard_urn(self.platform, report_info.get("id", ""))
+
+    def _parse_last_run_at(self, report_info: dict) -> Optional[int]:
+        # Mode queries are refreshed, and that timestamp is reflected correctly here.
+        # Datasets, by contrast, are synced, and that's captured by the sync timestamps.
+        # Still, this is probably accurate enough for now.
+        last_refreshed_ts = None
+        last_refreshed_ts_str = report_info.get("last_run_at")
+        if last_refreshed_ts_str:
+            last_refreshed_ts = int(dp.parse(last_refreshed_ts_str).timestamp() * 1000)
+
+        return last_refreshed_ts
+
     def construct_dashboard(
-        self, space_name: str, report_info: dict
-    ) -> DashboardSnapshot:
+        self, space_token: str, report_info: dict
+    ) -> Optional[Tuple[DashboardSnapshot, MetadataChangeProposalWrapper]]:
         report_token = report_info.get("token", "")
-        dashboard_urn = builder.make_dashboard_urn(
-            self.platform, report_info.get("id", "")
-        )
+        # logger.debug(f"Processing report {report_info.get('name', '')}: {report_info}")
+
+        if not report_token:
+            self.report.report_warning(
+                key="mode-report",
+                reason=f"Report token is missing for {report_info.get('id', '')}",
+            )
+            return None
+
+        if not report_info.get("id"):
+            self.report.report_warning(
+                key="mode-report",
+                reason=f"Report id is missing for {report_info.get('token', '')}",
+            )
+            return None
+
+        dashboard_urn = self._dashboard_urn(report_info)
         dashboard_snapshot = DashboardSnapshot(
             urn=dashboard_urn,
             aspects=[],
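The _browse_path_* helpers above compose nested BrowsePathsV2 entries, each level extending the previous one. A rough sketch of the path produced for a chart, using URNs that appear in the test fixtures below:

from datahub.metadata.schema_classes import BrowsePathEntryClass

space_urn = "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a"
dashboard_urn = "urn:li:dashboard:(mode,2934237)"
query_urn = "urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD)"

# workspace -> space container -> report dashboard -> query dataset
chart_path = [
    BrowsePathEntryClass(id="acryl"),
    BrowsePathEntryClass(id=space_urn, urn=space_urn),
    BrowsePathEntryClass(id=dashboard_urn, urn=dashboard_urn),
    BrowsePathEntryClass(id=query_urn, urn=query_urn),
]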
@@ -290,50 +387,64 @@ class ModeSource(StatefulIngestionSourceBase):

         title = report_info.get("name", "")
         description = report_info.get("description", "")

+        last_modified = ChangeAuditStamps()
+
+        # Creator + created ts.
         creator = self._get_creator(
             report_info.get("_links", {}).get("creator", {}).get("href", "")
         )
         if creator:
-            modified_actor = builder.make_user_urn(creator)
-            if not report_info.get("last_saved_at"):
-                # Sometimes mode returns null for last_saved_at.
-                # In that case, we use the created_at timestamp instead.
-                report_info["last_saved_at"] = report_info.get("created_at")
-
-            modified_ts = int(
-                dp.parse(f"{report_info.get('last_saved_at', 'now')}").timestamp()
-                * 1000
-            )
+            creator_actor = builder.make_user_urn(creator)
             created_ts = int(
                 dp.parse(f"{report_info.get('created_at', 'now')}").timestamp() * 1000
             )
-            last_modified = ChangeAuditStamps(
-                created=AuditStamp(time=created_ts, actor=modified_actor),
-                lastModified=AuditStamp(time=modified_ts, actor=modified_actor),
-            )
+            last_modified.created = AuditStamp(time=created_ts, actor=creator_actor)
+
+        # Last modified ts.
+        last_modified_ts_str = report_info.get("last_saved_at")
+        if not last_modified_ts_str:
+            # Sometimes mode returns null for last_saved_at.
+            # In that case, we use the edited_at timestamp instead.
+            last_modified_ts_str = report_info.get("edited_at")
+        if last_modified_ts_str:
+            modified_ts = int(dp.parse(last_modified_ts_str).timestamp() * 1000)
+            last_modified.lastModified = AuditStamp(
+                time=modified_ts, actor="urn:li:corpuser:unknown"
+            )
+
+        # Last refreshed ts.
+        last_refreshed_ts = self._parse_last_run_at(report_info)

         dashboard_info_class = DashboardInfoClass(
-            description=description,
-            title=title,
+            description=description if description else "",
+            title=title if title else "",
             charts=self._get_chart_urns(report_token),
             lastModified=last_modified,
+            lastRefreshed=last_refreshed_ts,
             dashboardUrl=f"{self.config.connect_uri}/{self.config.workspace}/reports/{report_token}",
             customProperties={},
         )
         dashboard_snapshot.aspects.append(dashboard_info_class)

         # browse path
+        space_name = self.space_tokens[space_token]
         browse_path = BrowsePathsClass(
             paths=[
                 f"/mode/{self.config.workspace}/"
                 f"{space_name}/"
-                f"{report_info.get('name')}"
+                f"{title if title else report_info.get('id', '')}"
             ]
         )
         dashboard_snapshot.aspects.append(browse_path)

+        browse_path_v2 = BrowsePathsV2Class(
+            path=self._browse_path_dashboard(space_token)
+        )
+        browse_mcp = MetadataChangeProposalWrapper(
+            entityUrn=dashboard_urn,
+            aspect=browse_path_v2,
+        )
+
         # Ownership
         ownership = self._get_ownership(
             self._get_creator(
@@ -343,7 +454,7 @@ class ModeSource(StatefulIngestionSourceBase):
         if ownership is not None:
             dashboard_snapshot.aspects.append(ownership)

-        return dashboard_snapshot
+        return dashboard_snapshot, browse_mcp

     @lru_cache(maxsize=None)
     def _get_ownership(self, user: str) -> Optional[OwnershipClass]:
@@ -372,7 +483,7 @@ class ModeSource(StatefulIngestionSourceBase):
                 else user_json.get("email")
             )
         except HTTPError as http_error:
-            self.report.report_failure(
+            self.report.report_warning(
                 key="mode-user",
                 reason=f"Unable to retrieve user for {href}, "
                 f"Reason: {str(http_error)}",
@@ -386,6 +497,7 @@ class ModeSource(StatefulIngestionSourceBase):
         charts = self._get_charts(report_token, query.get("token", ""))
         # build chart urns
         for chart in charts:
+            logger.debug(f"Chart: {chart.get('token')}")
             chart_urn = builder.make_chart_urn(
                 self.platform, chart.get("token", "")
             )
@@ -396,10 +508,19 @@ class ModeSource(StatefulIngestionSourceBase):
     def _get_space_name_and_tokens(self) -> dict:
         space_info = {}
         try:
-            payload = self._get_request_json(f"{self.workspace_uri}/spaces")
+            logger.debug(f"Retrieving spaces for {self.workspace_uri}")
+            payload = self._get_request_json(f"{self.workspace_uri}/spaces?filter=all")
             spaces = payload.get("_embedded", {}).get("spaces", {})
+
+            logger.debug(
+                f"Got {len(spaces)} spaces from workspace {self.workspace_uri}"
+            )
             for s in spaces:
+                logger.debug(f"Space: {s.get('name')}")
+                space_name = s.get("name", "")
+                if not self.config.space_pattern.allowed(space_name):
+                    self.report.report_dropped_space(space_name)
+                    logging.debug(f"Skipping space {space_name} due to space pattern")
+                    continue
                 space_info[s.get("token", "")] = s.get("name", "")
         except HTTPError as http_error:
             self.report.report_failure(
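Roughly what the updated spaces call amounts to; the credentials and workspace below are placeholders, and ?filter=all asks Mode to return all spaces rather than just the default listing:

import requests
from requests.models import HTTPBasicAuth

workspace_uri = "https://app.mode.com/api/acryl"  # connect_uri + "/api/" + workspace
resp = requests.get(
    f"{workspace_uri}/spaces?filter=all",
    auth=HTTPBasicAuth("mode-token", "mode-secret"),
)
spaces = resp.json().get("_embedded", {}).get("spaces", [])
space_tokens = {s.get("token", ""): s.get("name", "") for s in spaces}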
@@ -414,6 +535,7 @@ class ModeSource(StatefulIngestionSourceBase):
         type_mapping = {
             "table": ChartTypeClass.TABLE,
             "bar": ChartTypeClass.BAR,
+            "bigNumber": ChartTypeClass.TEXT,
             "line": ChartTypeClass.LINE,
             "stackedBar100": ChartTypeClass.BAR,
             "stackedBar": ChartTypeClass.BAR,
@@ -428,19 +550,22 @@ class ModeSource(StatefulIngestionSourceBase):
             "bigValue": ChartTypeClass.TEXT,
             "pivotTable": ChartTypeClass.TABLE,
             "linePlusBar": None,
+            "vegas": None,
+            "vegasPivotTable": ChartTypeClass.TABLE,
+            "histogram": ChartTypeClass.HISTOGRAM,
         }
         if not display_type:
             self.report.report_warning(
-                key=f"mode-chart-{token}",
-                reason=f"Chart type {display_type} is missing. " f"Setting to None",
+                key="mode-chart-type-mapper",
+                reason=f"{token}: Chart type is missing. Setting to None",
             )
             return None
         try:
             chart_type = type_mapping[display_type]
         except KeyError:
             self.report.report_warning(
-                key=f"mode-chart-{token}",
-                reason=f"Chart type {display_type} not supported. " f"Setting to None",
+                key="mode-chart-type-mapper",
+                reason=f"{token}: Chart type {display_type} not supported. Setting to None",
             )
             chart_type = None

@@ -449,7 +574,9 @@ class ModeSource(StatefulIngestionSourceBase):
     def construct_chart_custom_properties(
         self, chart_detail: dict, chart_type: str
     ) -> Dict:
-        custom_properties = {}
+        custom_properties = {
+            "ChartType": chart_type,
+        }
         metadata = chart_detail.get("encoding", {})
         if chart_type == "table":
             columns = list(chart_detail.get("fieldFormats", {}).keys())
@@ -457,10 +584,12 @@ class ModeSource(StatefulIngestionSourceBase):
             filters = metadata.get("filter", [])
             filters = filters[0].get("formula", "") if len(filters) else ""

-            custom_properties = {
-                "Columns": str_columns,
-                "Filters": filters[1:-1] if len(filters) else "",
-            }
+            custom_properties.update(
+                {
+                    "Columns": str_columns,
+                    "Filters": filters[1:-1] if len(filters) else "",
+                }
+            )

         elif chart_type == "pivotTable":
             pivot_table = chart_detail.get("pivotTable", {})
@@ -469,12 +598,14 @@ class ModeSource(StatefulIngestionSourceBase):
             values = pivot_table.get("values", [])
             filters = pivot_table.get("filters", [])

-            custom_properties = {
-                "Columns": ", ".join(columns) if len(columns) else "",
-                "Rows": ", ".join(rows) if len(rows) else "",
-                "Metrics": ", ".join(values) if len(values) else "",
-                "Filters": ", ".join(filters) if len(filters) else "",
-            }
+            custom_properties.update(
+                {
+                    "Columns": ", ".join(columns) if len(columns) else "",
+                    "Rows": ", ".join(rows) if len(rows) else "",
+                    "Metrics": ", ".join(values) if len(values) else "",
+                    "Filters": ", ".join(filters) if len(filters) else "",
+                }
+            )
             # list filters in their own row
             for filter in filters:
                 custom_properties[f"Filter: {filter}"] = ", ".join(
@@ -489,14 +620,16 @@ class ModeSource(StatefulIngestionSourceBase):
             value = metadata.get("value", [])
             filters = metadata.get("filter", [])

-            custom_properties = {
-                "X": x[0].get("formula", "") if len(x) else "",
-                "Y": y[0].get("formula", "") if len(y) else "",
-                "X2": x2[0].get("formula", "") if len(x2) else "",
-                "Y2": y2[0].get("formula", "") if len(y2) else "",
-                "Metrics": value[0].get("formula", "") if len(value) else "",
-                "Filters": filters[0].get("formula", "") if len(filters) else "",
-            }
+            custom_properties.update(
+                {
+                    "X": x[0].get("formula", "") if len(x) else "",
+                    "Y": y[0].get("formula", "") if len(y) else "",
+                    "X2": x2[0].get("formula", "") if len(x2) else "",
+                    "Y2": y2[0].get("formula", "") if len(y2) else "",
+                    "Metrics": value[0].get("formula", "") if len(value) else "",
+                    "Filters": filters[0].get("formula", "") if len(filters) else "",
+                }
+            )

         return custom_properties

@@ -532,21 +665,25 @@ class ModeSource(StatefulIngestionSourceBase):
         return platform

-    @lru_cache(maxsize=None)
-    def _get_platform_and_dbname(
-        self, data_source_id: int
-    ) -> Union[Tuple[str, str], Tuple[None, None]]:
+    def _get_data_sources(self) -> List[dict]:
         data_sources = []
         try:
             ds_json = self._get_request_json(f"{self.workspace_uri}/data_sources")
             data_sources = ds_json.get("_embedded", {}).get("data_sources", [])
         except HTTPError as http_error:
             self.report.report_failure(
-                key=f"mode-datasource-{data_source_id}",
-                reason=f"No data sources found for datasource id: "
-                f"{data_source_id}, "
-                f"Reason: {str(http_error)}",
+                key="mode-data-sources",
+                reason=f"Unable to retrieve data sources. Reason: {str(http_error)}",
             )

+        return data_sources
+
+    @lru_cache(maxsize=None)
+    def _get_platform_and_dbname(
+        self, data_source_id: int
+    ) -> Union[Tuple[str, str], Tuple[None, None]]:
+        data_sources = self._get_data_sources()
+
         if not data_sources:
             self.report.report_failure(
                 key=f"mode-datasource-{data_source_id}",
@@ -571,13 +708,13 @@ class ModeSource(StatefulIngestionSourceBase):

     def _replace_definitions(self, raw_query: str) -> str:
         query = raw_query
-        definitions = re.findall("({{[^}{]+}})", raw_query)
+        definitions = re.findall(r"({{(?:\s+)?@[^}{]+}})", raw_query)
         for definition_variable in definitions:
             definition_name, definition_alias = self._parse_definition_name(
                 definition_variable
             )
             definition_query = self._get_definition(definition_name)
-            # if unable to retrieve definition, then replace the {{}} so that it doesn't get picked up again in recurive call
+            # if unable to retrieve definition, then replace the {{}} so that it doesn't get picked up again in recursive call
             if definition_query is not None:
                 query = query.replace(
                     definition_variable, f"({definition_query}) as {definition_alias}"
@@ -587,6 +724,8 @@ class ModeSource(StatefulIngestionSourceBase):
                     definition_variable, f"{definition_name} as {definition_alias}"
                 )
             query = self._replace_definitions(query)
+        query = query.replace("\\n", "\n")
+        query = query.replace("\\t", "\t")

         return query

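The tightened regex now only picks up Mode dataset definitions ({{ @name }}) and no longer swallows Liquid parameters like {{ selected_id }}, which are handled by the template renderer instead. A standalone check of the two patterns:

import re

query = "SELECT * FROM {{ @join_on_definition as rental }} WHERE id = {{ selected_id }}"
old_matches = re.findall("({{[^}{]+}})", query)
new_matches = re.findall(r"({{(?:\s+)?@[^}{]+}})", query)
print(old_matches)  # ['{{ @join_on_definition as rental }}', '{{ selected_id }}']
print(new_matches)  # ['{{ @join_on_definition as rental }}']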
@@ -597,7 +736,7 @@ class ModeSource(StatefulIngestionSourceBase):
         if len(name_match):
             name = name_match[0][1:]
         alias_match = re.findall(
-            r"as\s+\S+", definition_variable
+            r"as\s+\S+\w+", definition_variable
         )  # i.e ['as alias_name']
         if len(alias_match):
             alias_match = alias_match[0].split(" ")
@@ -685,24 +824,6 @@ class ModeSource(StatefulIngestionSourceBase):
         data_source_id = query_data.get("data_source_id")
         return QueryUrn(f"{id}.{data_source_id}.{last_run_id}").urn()

-    def _get_upstream_warehouse_urn_for_query(self, query: dict) -> List[str]:
-        # create datasource urn
-        platform, db_name = self._get_platform_and_dbname(query.get("data_source_id"))
-        source_tables = self._get_source_from_query(query.get("raw_query"))
-        if not platform or not db_name or not source_tables:
-            return []
-        datasource_urn = self._get_datasource_urn(
-            platform=platform,
-            platform_instance=(
-                self.config.platform_instance_map.get(platform)
-                if platform and self.config.platform_instance_map
-                else None
-            ),
-            database=db_name,
-            source_tables=list(source_tables),
-        )
-        return datasource_urn
-
     def set_field_tags(self, fields: List[SchemaFieldClass]) -> None:
         for field in fields:
             # It is not clear how to distinguish between measures and dimensions in Mode.
@@ -717,17 +838,59 @@ class ModeSource(StatefulIngestionSourceBase):
                 tag = TagAssociationClass(tag=self.DIMENSION_TAG_URN)
                 field.globalTags = GlobalTagsClass(tags=[tag])

+    def normalize_mode_query(self, query: str) -> str:
+        regex = r"{% form %}(.*?){% endform %}"
+        rendered_query: str = query
+        normalized_query: str = query
+
+        self.report.num_query_template_render += 1
+        matches = re.findall(regex, query, re.MULTILINE | re.DOTALL | re.IGNORECASE)
+        try:
+            jinja_params: Dict = {}
+            if matches:
+                for match in matches:
+                    definition = Template(source=match).render()
+                    parameters = yaml.safe_load(definition)
+                    for key in parameters.keys():
+                        jinja_params[key] = parameters[key].get("default", "")
+
+                normalized_query = re.sub(
+                    r"{% form %}(.*){% endform %}",
+                    "",
+                    query,
+                    0,
+                    re.MULTILINE | re.DOTALL,
+                )
+
+            # Wherever we don't resolve the jinja params, we replace it with NULL
+            Undefined.__str__ = lambda self: "NULL"  # type: ignore
+            rendered_query = Template(normalized_query).render(jinja_params)
+            self.report.num_query_template_render_success += 1
+        except Exception as e:
+            logger.debug(f"Rendering query {query} failed with {e}")
+            self.report.num_query_template_render_failures += 1
+            return rendered_query
+
+        return rendered_query
+
     def construct_query_from_api_data(
-        self, report_token: str, query_data: dict
+        self,
+        report_token: str,
+        query_data: dict,
+        space_token: str,
+        report_info: dict,
     ) -> Iterable[MetadataWorkUnit]:
         query_urn = self.get_dataset_urn_from_query(query_data)
+        query_token = query_data.get("token")

         dataset_props = DatasetPropertiesClass(
             name=query_data.get("name"),
-            description="",
-            externalUrl=f"{self.config.connect_uri}/{self.config.workspace}/reports/{report_token}/details/queries/{query_data.get('token')}",
+            description=f"""### Source Code
+``` sql
+{query_data.get("raw_query")}
+```
+            """,
+            externalUrl=f"{self.config.connect_uri}/{self.config.workspace}/reports/{report_token}/details/queries/{query_token}",
             customProperties=self.get_custom_props_from_dict(
                 query_data,
                 [
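A minimal standalone sketch of what normalize_mode_query does with a Mode {% form %} block: parameter defaults are read from the YAML body, the form block is stripped, and the remainder is rendered with python-liquid (the query string here is made up; unresolved parameters render as NULL):

import re
import yaml
from liquid import Template, Undefined

query = (
    "SELECT * FROM payments WHERE id = {{ selected_id }}\n"
    "{% form %}\nselected_id:\n  type: text\n  default: my_id\n{% endform %}"
)
form = re.search(r"{% form %}(.*?){% endform %}", query, re.DOTALL)
params = {k: v.get("default", "") for k, v in yaml.safe_load(form.group(1)).items()}
stripped = re.sub(r"{% form %}(.*){% endform %}", "", query, 0, re.DOTALL)
Undefined.__str__ = lambda self: "NULL"  # unresolved params become NULL
print(Template(stripped).render(**params))  # SELECT * FROM payments WHERE id = my_id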
@@ -744,25 +907,26 @@ class ModeSource(StatefulIngestionSourceBase):

         yield (
             MetadataChangeProposalWrapper(
-                entityType="dataset",
-                changeType=ChangeTypeClass.UPSERT,
                 entityUrn=query_urn,
-                aspectName="datasetProperties",
                 aspect=dataset_props,
             ).as_workunit()
         )

-        subtypes = SubTypesClass(typeNames=(["Query"]))
+        subtypes = SubTypesClass(typeNames=([BIAssetSubTypes.MODE_QUERY]))
         yield (
             MetadataChangeProposalWrapper(
-                entityType="dataset",
-                changeType=ChangeTypeClass.UPSERT,
                 entityUrn=query_urn,
-                aspectName="subTypes",
                 aspect=subtypes,
             ).as_workunit()
         )

+        yield MetadataChangeProposalWrapper(
+            entityUrn=query_urn,
+            aspect=BrowsePathsV2Class(
+                path=self._browse_path_query(space_token, report_info)
+            ),
+        ).as_workunit()
+
         (
             upstream_warehouse_platform,
             upstream_warehouse_db_name,
@@ -772,8 +936,35 @@ class ModeSource(StatefulIngestionSourceBase):
             # this means we can't infer the platform
             return

+        query = query_data["raw_query"]
+        query = self._replace_definitions(query)
+        normalized_query = self.normalize_mode_query(query)
+        query_to_parse = normalized_query
+        # If multiple statements are present in the query, we take the last one.
+        # This won't work for complex cases where a temp table is created and used in the same query.
+        # But it should be good enough for simple use-cases.
+        try:
+            for partial_query in sqlglot.parse(normalized_query):
+                if not partial_query:
+                    continue
+                # This is hacky, but on Snowflake we want to change the default warehouse if a "use warehouse" statement is present.
+                if upstream_warehouse_platform == "snowflake":
+                    regexp = r"use\s+warehouse\s+(.*)(\s+)?;"
+                    matches = re.search(
+                        regexp,
+                        partial_query.sql(dialect=upstream_warehouse_platform),
+                        re.MULTILINE | re.DOTALL | re.IGNORECASE,
+                    )
+                    if matches and matches.group(1):
+                        upstream_warehouse_db_name = matches.group(1)
+
+                query_to_parse = partial_query.sql(dialect=upstream_warehouse_platform)
+        except Exception as e:
+            logger.debug(f"sqlglot.parse failed on: {normalized_query}, error: {e}")
+            query_to_parse = normalized_query
+
         parsed_query_object = create_lineage_sql_parsed_result(
-            query=query_data["raw_query"],
+            query=query_to_parse,
             default_db=upstream_warehouse_db_name,
             platform=upstream_warehouse_platform,
             platform_instance=(
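A standalone illustration of the statement-splitting step: sqlglot.parse breaks a multi-statement script apart, and the connector keeps the SQL of the last statement for lineage parsing (the example SQL is hypothetical):

import sqlglot

sql = "USE WAREHOUSE compute_wh; SELECT id FROM orders"
statements = sqlglot.parse(sql, read="snowflake")
query_to_parse = statements[-1].sql(dialect="snowflake")
print(query_to_parse)  # SELECT id FROM orders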
@@ -785,9 +976,24 @@ class ModeSource(StatefulIngestionSourceBase):
             graph=self.ctx.graph,
         )

+        self.report.num_sql_parsed += 1
+        if parsed_query_object.debug_info.table_error:
+            self.report.num_sql_parser_table_error += 1
+            self.report.num_sql_parser_failures += 1
+            logger.info(
+                f"Failed to parse compiled code for report: {report_token} query: {query_token} {parsed_query_object.debug_info.error} the query was [{query_to_parse}]"
+            )
+        elif parsed_query_object.debug_info.column_error:
+            self.report.num_sql_parser_column_error += 1
+            self.report.num_sql_parser_failures += 1
+            logger.info(
+                f"Failed to generate CLL for report: {report_token} query: {query_token}: {parsed_query_object.debug_info.column_error} the query was [{query_to_parse}]"
+            )
+        else:
+            self.report.num_sql_parser_success += 1
+
         schema_fields = infer_output_schema(parsed_query_object)
         if schema_fields:
-
             schema_metadata = SchemaMetadataClass(
                 schemaName="mode_query",
                 platform=f"urn:li:dataPlatform:{self.platform}",
@@ -801,10 +1007,7 @@ class ModeSource(StatefulIngestionSourceBase):

             yield (
                 MetadataChangeProposalWrapper(
-                    entityType="dataset",
-                    changeType=ChangeTypeClass.UPSERT,
                     entityUrn=query_urn,
-                    aspectName="schemaMetadata",
                     aspect=schema_metadata,
                 ).as_workunit()
             )
@@ -854,10 +1057,7 @@ class ModeSource(StatefulIngestionSourceBase):
         )

         yield MetadataChangeProposalWrapper(
-            entityType="query",
-            changeType=ChangeTypeClass.UPSERT,
             entityUrn=query_instance_urn,
-            aspectName="queryProperties",
             aspect=query_properties,
         ).as_workunit()

@@ -927,49 +1127,49 @@ class ModeSource(StatefulIngestionSourceBase):

         wu.append(
             MetadataChangeProposalWrapper(
-                entityType="dataset",
-                changeType=ChangeTypeClass.UPSERT,
                 entityUrn=query_urn,
-                aspectName="upstreamLineage",
                 aspect=upstream_lineage,
             ).as_workunit()
         )

         return wu

-    def get_formula_columns(self, node: Dict, columns: Set[str] = set()) -> Set[str]:
+    def get_formula_columns(
+        self, node: Dict, columns: Optional[Set[str]] = None
+    ) -> Set[str]:
+        columns = columns if columns is not None else set()
         if isinstance(node, dict):
             for key, item in node.items():
-                node = item
-                if isinstance(node, dict):
-                    self.get_formula_columns(node, columns)
-                elif isinstance(node, list):
-                    for i in node:
+                if isinstance(item, dict):
+                    self.get_formula_columns(item, columns)
+                elif isinstance(item, list):
+                    for i in item:
                         if isinstance(i, dict):
                             self.get_formula_columns(i, columns)
-                elif isinstance(node, str):
+                elif isinstance(item, str):
                     if key == "formula":
-                        column_names = re.findall(r"\[(.+?)\]", node)
+                        column_names = re.findall(r"\[(.+?)\]", item)
                         columns.update(column_names)
         return columns

     def get_input_fields(
-        self, chart_urn: str, chart_data: Dict, chart_fields: List[str], query_urn: str
+        self,
+        chart_urn: str,
+        chart_data: Dict,
+        chart_fields: Dict[str, SchemaFieldClass],
+        query_urn: str,
     ) -> Iterable[MetadataWorkUnit]:
         # TODO: Identify which fields are used as X, Y, filters, etc and tag them accordingly.
         fields = self.get_formula_columns(chart_data)

         input_fields = []

-        for field in sorted(fields):
+        for field in fields:
+            if field.lower() not in chart_fields:
+                continue
             input_field = InputFieldClass(
                 schemaFieldUrn=builder.make_schema_field_urn(query_urn, field.lower()),
-                schemaField=SchemaFieldClass(
-                    fieldPath=field.lower(),
-                    type=SchemaFieldDataTypeClass(type=StringTypeClass()),
-                    nativeDataType="string",
-                ),
+                schemaField=chart_fields[field.lower()],
             )
             input_fields.append(input_field)

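The get_formula_columns signature change fixes the classic mutable-default pitfall, where the same default object is created once and shared across calls:

def bad(item, acc: list = []):  # the default list is created once and shared
    acc.append(item)
    return acc

print(bad(1))  # [1]
print(bad(2))  # [1, 2] -- state leaked from the previous call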
@@ -984,8 +1184,16 @@ class ModeSource(StatefulIngestionSourceBase):
         ).as_workunit()

     def construct_chart_from_api_data(
-        self, chart_data: dict, chart_fields: List[str], query: dict, path: str
+        self,
+        index: int,
+        chart_data: dict,
+        chart_fields: Dict[str, SchemaFieldClass],
+        query: dict,
+        space_token: str,
+        report_info: dict,
+        query_name: str,
     ) -> Iterable[MetadataWorkUnit]:
+        # logger.debug(f"Processing chart {chart_data.get('token', '')}: {chart_data}")
         chart_urn = builder.make_chart_urn(self.platform, chart_data.get("token", ""))
         chart_snapshot = ChartSnapshot(
             urn=chart_urn,
@@ -1009,6 +1217,9 @@ class ModeSource(StatefulIngestionSourceBase):
             lastModified=AuditStamp(time=modified_ts, actor=modified_actor),
         )

+        # Last refreshed ts.
+        last_refreshed_ts = self._parse_last_run_at(report_info)
+
         chart_detail = (
             chart_data.get("view", {})
             if len(chart_data.get("view", {})) != 0
@@ -1024,7 +1235,12 @@ class ModeSource(StatefulIngestionSourceBase):
             or chart_detail.get("chartDescription")
             or ""
         )
-        title = chart_detail.get("title") or chart_detail.get("chartTitle") or ""
+
+        title = (
+            chart_detail.get("title")
+            or chart_detail.get("chartTitle")
+            or f"Chart {index}"
+        )

         # create datasource urn
         custom_properties = self.construct_chart_custom_properties(
@@ -1032,15 +1248,16 @@ class ModeSource(StatefulIngestionSourceBase):
         )

         query_urn = self.get_dataset_urn_from_query(query)
-        custom_properties["upstream_fields"] = "profile_id"

         # Chart Info
         chart_info = ChartInfoClass(
             type=chart_type,
             description=description,
             title=title,
             lastModified=last_modified,
-            chartUrl=f"{self.config.connect_uri}"
-            f"{chart_data.get('_links', {}).get('report_viz_web', {}).get('href', '')}",
+            lastRefreshed=last_refreshed_ts,
+            # The links href starts with a slash already.
+            chartUrl=f"{self.config.connect_uri}{chart_data.get('_links', {}).get('report_viz_web', {}).get('href', '')}",
             inputs=[query_urn],
             customProperties=custom_properties,
             inputEdges=[],
@@ -1049,10 +1266,28 @@ class ModeSource(StatefulIngestionSourceBase):

         query_urn = self.get_dataset_urn_from_query(query)
         yield from self.get_input_fields(chart_urn, chart_data, chart_fields, query_urn)

+        yield MetadataChangeProposalWrapper(
+            entityUrn=chart_urn,
+            aspect=SubTypesClass(typeNames=[BIAssetSubTypes.MODE_CHART]),
+        ).as_workunit()
+
         # Browse Path
+        space_name = self.space_tokens[space_token]
+        report_name = report_info["name"]
+        path = f"/mode/{self.config.workspace}/{space_name}/{report_name}/{query_name}/{title}"
         browse_path = BrowsePathsClass(paths=[path])
         chart_snapshot.aspects.append(browse_path)

+        # Browse path v2
+        browse_path_v2 = BrowsePathsV2Class(
+            path=self._browse_path_chart(space_token, report_info, query),
+        )
+        yield MetadataChangeProposalWrapper(
+            entityUrn=chart_urn,
+            aspect=browse_path_v2,
+        ).as_workunit()
+
         # Query
         chart_query = ChartQueryClass(
             rawQuery=query.get("raw_query", ""),
@@ -1073,7 +1308,7 @@ class ModeSource(StatefulIngestionSourceBase):
         yield MetadataWorkUnit(id=chart_snapshot.urn, mce=mce)

     @lru_cache(maxsize=None)
-    def _get_reports(self, space_token: str) -> list:
+    def _get_reports(self, space_token: str) -> List[dict]:
         reports = []
         try:
             reports_json = self._get_request_json(
@@ -1179,26 +1414,57 @@ class ModeSource(StatefulIngestionSourceBase):
             aspect=EmbedClass(renderUrl=embed_url),
         )

+    def gen_space_key(self, space_token: str) -> SpaceKey:
+        return SpaceKey(platform=self.platform, space_token=space_token)
+
+    def construct_space_container(
+        self, space_token: str, space_name: str
+    ) -> Iterable[MetadataWorkUnit]:
+        key = self.gen_space_key(space_token)
+        yield from gen_containers(
+            container_key=key,
+            name=space_name,
+            sub_types=[BIContainerSubTypes.MODE_COLLECTION],
+            # TODO: Support extracting the documentation for a space.
+        )
+
+        # We have a somewhat atypical browse path here, since we include the workspace name
+        # as what's effectively but not officially a platform instance.
+        yield MetadataChangeProposalWrapper(
+            entityUrn=key.as_urn(),
+            aspect=BrowsePathsV2Class(path=self._browse_path_space()),
+        ).as_workunit()
+
     def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
         for space_token, space_name in self.space_tokens.items():
+            yield from self.construct_space_container(space_token, space_name)
+
             reports = self._get_reports(space_token)
             for report in reports:
-                dashboard_snapshot_from_report = self.construct_dashboard(
-                    space_name, report
+                logger.debug(
+                    f"Report: name: {report.get('name')} token: {report.get('token')}"
                 )
+                dashboard_tuple_from_report = self.construct_dashboard(
+                    space_token=space_token, report_info=report
+                )
+
+                if dashboard_tuple_from_report is None:
+                    continue
+                (
+                    dashboard_snapshot_from_report,
+                    browse_mcpw,
+                ) = dashboard_tuple_from_report

                 mce = MetadataChangeEvent(
                     proposedSnapshot=dashboard_snapshot_from_report
                 )

                 mcpw = MetadataChangeProposalWrapper(
                     entityType="dashboard",
                     changeType=ChangeTypeClass.UPSERT,
                     entityUrn=dashboard_snapshot_from_report.urn,
                     aspectName="subTypes",
-                    aspect=SubTypesClass(typeNames=["Report"]),
+                    aspect=SubTypesClass(typeNames=[BIAssetSubTypes.MODE_REPORT]),
                 )
                 yield mcpw.as_workunit()
+                yield browse_mcpw.as_workunit()

                 usage_statistics = DashboardUsageStatisticsClass(
                     timestampMillis=round(datetime.now().timestamp() * 1000),
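A sketch of how the space container is keyed, assuming a recent acryl-datahub; the token value comes from the golden file below, and the container URN is a guid derived from the key fields:

from datahub.emitter.mcp_builder import ContainerKey, gen_containers

class SpaceKey(ContainerKey):
    space_token: str

key = SpaceKey(platform="mode", space_token="75737b70402e")
print(key.as_urn())  # urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a
workunits = list(
    gen_containers(container_key=key, name="AcrylTest", sub_types=["Collection"])
)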
@@ -1206,10 +1472,7 @@ class ModeSource(StatefulIngestionSourceBase):
             )

             yield MetadataChangeProposalWrapper(
-                entityType="dashboard",
-                changeType=ChangeTypeClass.UPSERT,
                 entityUrn=dashboard_snapshot_from_report.urn,
-                aspectName="dashboardUsageStatistics",
                 aspect=usage_statistics,
             ).as_workunit()

@@ -1223,34 +1486,52 @@ class ModeSource(StatefulIngestionSourceBase):

     def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
+        # Space/collection -> report -> query -> Chart
-        for space_token, space_name in self.space_tokens.items():
+        for space_token in self.space_tokens.keys():
             reports = self._get_reports(space_token)
             for report in reports:
                 report_token = report.get("token", "")
+
+                if report.get("imported_datasets"):
+                    # The connector doesn't support imported datasets yet.
+                    # For now, we just keep this in the report to track what we're missing.
+                    imported_datasets = [
+                        imported_dataset.get("name") or str(imported_dataset)
+                        for imported_dataset in report["imported_datasets"]
+                    ]
+                    self.report.dropped_imported_datasets.setdefault(
+                        report_token, LossyList()
+                    ).extend(imported_datasets)
+
                 queries = self._get_queries(report_token)
                 for query in queries:
-                    query_mcps = self.construct_query_from_api_data(report_token, query)
-                    chart_fields: List[str] = []
+                    query_mcps = self.construct_query_from_api_data(
+                        report_token,
+                        query,
+                        space_token=space_token,
+                        report_info=report,
+                    )
+                    chart_fields: Dict[str, SchemaFieldClass] = {}
                     for wu in query_mcps:
-                        if (
-                            isinstance(wu.metadata, MetadataChangeProposalWrapper)
-                            and wu.metadata.aspectName == "schemaMetadata"
-                        ):
-                            if isinstance(wu.metadata.aspect, SchemaMetadataClass):
-                                schema_metadata = wu.metadata.aspect
-                                for field in schema_metadata.fields:
-                                    chart_fields.append(field.fieldPath)
+                        if isinstance(
+                            wu.metadata, MetadataChangeProposalWrapper
+                        ) and isinstance(wu.metadata.aspect, SchemaMetadataClass):
+                            schema_metadata = wu.metadata.aspect
+                            for field in schema_metadata.fields:
+                                chart_fields.setdefault(field.fieldPath, field)

                         yield wu

                     charts = self._get_charts(report_token, query.get("token", ""))
                     # build charts
-                    for chart in charts:
-                        view = chart.get("view") or chart.get("view_vegas")
-                        chart_name = view.get("title") or view.get("chartTitle") or ""
-                        path = f"/mode/{self.config.workspace}/{space_name}/{report.get('name')}/{query.get('name')}/{chart_name}"
+                    for i, chart in enumerate(charts):
                         yield from self.construct_chart_from_api_data(
-                            chart, chart_fields, query, path
+                            i,
+                            chart,
+                            chart_fields,
+                            query,
+                            space_token=space_token,
+                            report_info=report,
+                            query_name=query["name"],
                         )

     @classmethod

@@ -495,6 +495,9 @@ def _column_level_lineage(  # noqa: C901
                 # Otherwise, we can't process it.
                 continue

+            if output_col == "":
+                continue
+
             if is_dialect_instance(dialect, "bigquery") and output_col.lower() in {
                 "_partitiontime",
                 "_partitiondate",
@@ -888,9 +891,9 @@ def _sqlglot_lineage_inner(
     try:
         if select_statement is not None:
             with cooperative_timeout(
-                timeout=SQL_LINEAGE_TIMEOUT_SECONDS
-                if SQL_LINEAGE_TIMEOUT_ENABLED
-                else None
+                timeout=(
+                    SQL_LINEAGE_TIMEOUT_SECONDS if SQL_LINEAGE_TIMEOUT_ENABLED else None
+                )
             ):
                 column_lineage = _column_level_lineage(
                     select_statement,

@@ -1,5 +1,5 @@
 import random
-from typing import Dict, Generic, Iterator, List, Set, TypeVar, Union
+from typing import Dict, Generic, Iterable, Iterator, List, Set, TypeVar, Union

 from datahub.configuration.pydantic_migration_helpers import PYDANTIC_VERSION_2

@@ -31,6 +31,10 @@ class LossyList(List[T], Generic[T]):
         finally:
             self.total_elements += 1

+    def extend(self, __iterable: Iterable[T]) -> None:
+        for item in __iterable:
+            self.append(item)
+
     def __len__(self) -> int:
         return self.total_elements

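With the new extend override, LossyList keeps its element counter accurate even though only a sample of items is retained:

from datahub.utilities.lossy_collections import LossyList

items: LossyList[int] = LossyList()
items.extend(range(100))  # routes through append(), so total_elements stays correct
print(len(items))  # 100, even though only a sample is stored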
@@ -1,4 +1,94 @@
 [
+    {
+        "entityType": "container",
+        "entityUrn": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a",
+        "changeType": "UPSERT",
+        "aspectName": "containerProperties",
+        "aspect": {
+            "json": {
+                "customProperties": {
+                    "platform": "mode",
+                    "space_token": "75737b70402e"
+                },
+                "name": "AcrylTest"
+            }
+        },
+        "systemMetadata": {
+            "lastObserved": 1638860400000,
+            "runId": "mode-test",
+            "lastRunId": "no-run-id-provided"
+        }
+    },
+    {
+        "entityType": "container",
+        "entityUrn": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a",
+        "changeType": "UPSERT",
+        "aspectName": "status",
+        "aspect": {
+            "json": {
+                "removed": false
+            }
+        },
+        "systemMetadata": {
+            "lastObserved": 1638860400000,
+            "runId": "mode-test",
+            "lastRunId": "no-run-id-provided"
+        }
+    },
+    {
+        "entityType": "container",
+        "entityUrn": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a",
+        "changeType": "UPSERT",
+        "aspectName": "dataPlatformInstance",
+        "aspect": {
+            "json": {
+                "platform": "urn:li:dataPlatform:mode"
+            }
+        },
+        "systemMetadata": {
+            "lastObserved": 1638860400000,
+            "runId": "mode-test",
+            "lastRunId": "no-run-id-provided"
+        }
+    },
+    {
+        "entityType": "container",
+        "entityUrn": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a",
+        "changeType": "UPSERT",
+        "aspectName": "subTypes",
+        "aspect": {
+            "json": {
+                "typeNames": [
+                    "Collection"
+                ]
+            }
+        },
+        "systemMetadata": {
+            "lastObserved": 1638860400000,
+            "runId": "mode-test",
+            "lastRunId": "no-run-id-provided"
+        }
+    },
+    {
+        "entityType": "container",
+        "entityUrn": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a",
+        "changeType": "UPSERT",
+        "aspectName": "browsePathsV2",
+        "aspect": {
+            "json": {
+                "path": [
+                    {
+                        "id": "acryl"
+                    }
+                ]
+            }
+        },
+        "systemMetadata": {
+            "lastObserved": 1638860400000,
+            "runId": "mode-test",
+            "lastRunId": "no-run-id-provided"
+        }
+    },
     {
         "entityType": "dashboard",
         "entityUrn": "urn:li:dashboard:(mode,2934237)",
@@ -75,10 +165,11 @@
                     },
                     "lastModified": {
                         "time": 1639182684451,
-                        "actor": "urn:li:corpuser:modeuser"
+                        "actor": "urn:li:corpuser:unknown"
                     }
                 },
-                "dashboardUrl": "https://app.mode.com/acryl/reports/9d2da37fa91e"
+                "dashboardUrl": "https://app.mode.com/acryl/reports/9d2da37fa91e",
+                "lastRefreshed": 1639177971490
             }
         },
         {
@@ -124,10 +215,8 @@
                         {
                             "id": "acryl"
                         },
                         {
-                            "id": "AcrylTest"
-                        },
-                        {
-                            "id": "Report 1"
+                            "id": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a",
+                            "urn": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a"
                         }
                     ]
                 }
@@ -152,7 +241,7 @@
             },
             "externalUrl": "https://app.mode.com/acryl/reports/9d2da37fa91e/details/queries/6e26a9f3d4e2",
             "name": "Customer and staff",
-            "description": "",
+            "description": "### Source Code\n``` sql\nSELECT rental.*, staff.first_name \"Staff First Name\", staff.last_name \"Staff Last Name\" FROM {{ @join_on_definition as rental }} join staff on staff.staff_id = rental.staff_id where selected_id = {{ selected_id }} \n{% form %}\nselected_id:\n type: text\n default: my_id\n{% endform %}\n```\n            ",
             "tags": []
         }
     },
@@ -180,6 +269,355 @@
             "lastRunId": "no-run-id-provided"
         }
     },
+    {
+        "entityType": "dataset",
+        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD)",
+        "changeType": "UPSERT",
+        "aspectName": "schemaMetadata",
+        "aspect": {
+            "json": {
+                "schemaName": "mode_query",
+                "platform": "urn:li:dataPlatform:mode",
+                "version": 0,
+                "created": {
+                    "time": 0,
+                    "actor": "urn:li:corpuser:unknown"
+                },
+                "lastModified": {
+                    "time": 0,
+                    "actor": "urn:li:corpuser:unknown"
+                },
+                "hash": "",
+                "platformSchema": {
+                    "com.linkedin.schema.OtherSchema": {
+                        "rawSchema": ""
+                    }
+                },
+                "fields": [
+                    {
+                        "fieldPath": "customer_id",
+                        "nullable": false,
+                        "type": {
+                            "type": {
+                                "com.linkedin.schema.NullType": {}
+                            }
+                        },
+                        "nativeDataType": "",
+                        "recursive": false,
+                        "globalTags": {
+                            "tags": [
+                                {
+                                    "tag": "urn:li:tag:Dimension"
+                                }
+                            ]
+                        },
+                        "isPartOfKey": false
+                    },
+                    {
+                        "fieldPath": "first_name",
+                        "nullable": false,
+                        "type": {
+                            "type": {
+                                "com.linkedin.schema.NullType": {}
+                            }
+                        },
+                        "nativeDataType": "",
+                        "recursive": false,
+                        "globalTags": {
+                            "tags": [
+                                {
+                                    "tag": "urn:li:tag:Dimension"
+                                }
+                            ]
+                        },
+                        "isPartOfKey": false
+                    },
+                    {
+                        "fieldPath": "last_name",
+                        "nullable": false,
+                        "type": {
+                            "type": {
+                                "com.linkedin.schema.NullType": {}
+                            }
+                        },
+                        "nativeDataType": "",
+                        "recursive": false,
+                        "globalTags": {
+                            "tags": [
+                                {
+                                    "tag": "urn:li:tag:Dimension"
+                                }
+                            ]
+                        },
+                        "isPartOfKey": false
+                    },
+                    {
+                        "fieldPath": "amount",
+                        "nullable": false,
+                        "type": {
+                            "type": {
+                                "com.linkedin.schema.NullType": {}
+                            }
+                        },
+                        "nativeDataType": "",
+                        "recursive": false,
+                        "globalTags": {
+                            "tags": [
+                                {
+                                    "tag": "urn:li:tag:Dimension"
+                                }
+                            ]
+                        },
+                        "isPartOfKey": false
+                    },
+                    {
+                        "fieldPath": "payment_date",
+                        "nullable": false,
+                        "type": {
+                            "type": {
+                                "com.linkedin.schema.NullType": {}
+                            }
+                        },
+                        "nativeDataType": "",
+                        "recursive": false,
+                        "globalTags": {
+                            "tags": [
+                                {
+                                    "tag": "urn:li:tag:Dimension"
+                                }
+                            ]
+                        },
+                        "isPartOfKey": false
+                    },
+                    {
+                        "fieldPath": "rental_id",
+                        "nullable": false,
+                        "type": {
+                            "type": {
+                                "com.linkedin.schema.NullType": {}
+                            }
+                        },
+                        "nativeDataType": "",
+                        "recursive": false,
+                        "globalTags": {
+                            "tags": [
+                                {
+                                    "tag": "urn:li:tag:Dimension"
+                                }
+                            ]
+                        },
+                        "isPartOfKey": false
+                    },
+                    {
+                        "fieldPath": "staff_id",
+                        "nullable": false,
+                        "type": {
+                            "type": {
+                                "com.linkedin.schema.NullType": {}
+                            }
+                        },
+                        "nativeDataType": "",
+                        "recursive": false,
+                        "globalTags": {
+                            "tags": [
+                                {
+                                    "tag": "urn:li:tag:Dimension"
+                                }
+                            ]
+                        },
+                        "isPartOfKey": false
+                    },
+                    {
+                        "fieldPath": "Staff First Name",
+                        "nullable": false,
+                        "type": {
+                            "type": {
+                                "com.linkedin.schema.NullType": {}
+                            }
+                        },
+                        "nativeDataType": "",
+                        "recursive": false,
+                        "globalTags": {
+                            "tags": [
+                                {
+                                    "tag": "urn:li:tag:Dimension"
+                                }
+                            ]
+                        },
+                        "isPartOfKey": false
+                    },
+                    {
+                        "fieldPath": "Staff Last Name",
+                        "nullable": false,
+                        "type": {
+                            "type": {
+                                "com.linkedin.schema.NullType": {}
+                            }
+                        },
+                        "nativeDataType": "",
+                        "recursive": false,
+                        "globalTags": {
+                            "tags": [
+                                {
+                                    "tag": "urn:li:tag:Dimension"
+                                }
+                            ]
+                        },
+                        "isPartOfKey": false
+                    }
+                ]
+            }
+        },
+        "systemMetadata": {
+            "lastObserved": 1638860400000,
+            "runId": "mode-test",
+            "lastRunId": "no-run-id-provided"
+        }
+    },
+    {
+        "entityType": "dataset",
+        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD)",
+        "changeType": "UPSERT",
+        "aspectName": "upstreamLineage",
+        "aspect": {
+            "json": {
+                "upstreams": [
+                    {
+                        "auditStamp": {
+                            "time": 0,
+                            "actor": "urn:li:corpuser:unknown"
+                        },
+                        "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.customer,PROD)",
+                        "type": "TRANSFORMED",
+                        "query": "urn:li:query:10149707.34499.1897576958"
+                    },
+                    {
+                        "auditStamp": {
+                            "time": 0,
+                            "actor": "urn:li:corpuser:unknown"
+                        },
+                        "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.payment,PROD)",
+                        "type": "TRANSFORMED",
+                        "query": "urn:li:query:10149707.34499.1897576958"
+                    },
+                    {
+                        "auditStamp": {
+                            "time": 0,
+                            "actor": "urn:li:corpuser:unknown"
+                        },
+                        "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.rental,PROD)",
+                        "type": "TRANSFORMED",
+                        "query": "urn:li:query:10149707.34499.1897576958"
+                    },
+                    {
+                        "auditStamp": {
+                            "time": 0,
+                            "actor": "urn:li:corpuser:unknown"
+                        },
+                        "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.staff,PROD)",
+                        "type": "TRANSFORMED",
+                        "query": "urn:li:query:10149707.34499.1897576958"
+                    }
+                ],
+                "fineGrainedLineages": [
+                    {
+                        "upstreamType": "FIELD_SET",
+                        "upstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.customer,PROD),customer_id)"
+                        ],
+                        "downstreamType": "FIELD",
+                        "downstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),customer_id)"
+                        ],
+                        "confidenceScore": 1.0
+                    },
+                    {
+                        "upstreamType": "FIELD_SET",
+                        "upstreams": [],
+                        "downstreamType": "FIELD",
+                        "downstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),first_name)"
+                        ],
+                        "confidenceScore": 1.0
+                    },
+                    {
+                        "upstreamType": "FIELD_SET",
+                        "upstreams": [],
+                        "downstreamType": "FIELD",
+                        "downstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),last_name)"
+                        ],
+                        "confidenceScore": 1.0
+                    },
+                    {
+                        "upstreamType": "FIELD_SET",
+                        "upstreams": [],
+                        "downstreamType": "FIELD",
+                        "downstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),amount)"
+                        ],
+                        "confidenceScore": 1.0
+                    },
+                    {
+                        "upstreamType": "FIELD_SET",
+                        "upstreams": [],
+                        "downstreamType": "FIELD",
+                        "downstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),payment_date)"
+                        ],
+                        "confidenceScore": 1.0
+                    },
+                    {
+                        "upstreamType": "FIELD_SET",
+                        "upstreams": [],
+                        "downstreamType": "FIELD",
+                        "downstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),rental_id)"
+                        ],
+                        "confidenceScore": 1.0
+                    },
+                    {
+                        "upstreamType": "FIELD_SET",
+                        "upstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.rental,PROD),staff_id)"
+                        ],
+                        "downstreamType": "FIELD",
+                        "downstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),staff_id)"
+                        ],
+                        "confidenceScore": 1.0
+                    },
+                    {
+                        "upstreamType": "FIELD_SET",
+                        "upstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.staff,PROD),first_name)"
+                        ],
+                        "downstreamType": "FIELD",
+                        "downstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),Staff First Name)"
+                        ],
+                        "confidenceScore": 1.0
+                    },
+                    {
+                        "upstreamType": "FIELD_SET",
+                        "upstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,dvdrental.staff,PROD),last_name)"
+                        ],
+                        "downstreamType": "FIELD",
+                        "downstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),Staff Last Name)"
+                        ],
+                        "confidenceScore": 1.0
+                    }
+                ]
+            }
+        },
+        "systemMetadata": {
+            "lastObserved": 1638860400000,
+            "runId": "mode-test",
+            "lastRunId": "no-run-id-provided"
+        }
+    },
     {
         "entityType": "dataset",
         "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD)",
@@ -202,6 +640,34 @@
             "lastRunId": "no-run-id-provided"
         }
     },
+    {
+        "entityType": "dataset",
+        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD)",
+        "changeType": "UPSERT",
+        "aspectName": "browsePathsV2",
+        "aspect": {
+            "json": {
+                "path": [
+                    {
+                        "id": "acryl"
+                    },
+                    {
+                        "id": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a",
+                        "urn": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a"
+                    },
+                    {
+                        "id": "urn:li:dashboard:(mode,2934237)",
+                        "urn": "urn:li:dashboard:(mode,2934237)"
+                    }
+                ]
+            }
+        },
+        "systemMetadata": {
+            "lastObserved": 1638860400000,
+            "runId": "mode-test",
+            "lastRunId": "no-run-id-provided"
+        }
+    },
     {
         "entityType": "query",
         "entityUrn": "urn:li:query:10149707.34499.1897576958",
@@ -210,7 +676,7 @@
         "aspect": {
             "json": {
                 "statement": {
-                    "value": "SELECT rental.*, staff.first_name \"Staff First Name\", staff.last_name \"Staff Last Name\" FROM {{ @join_on_definition as rental }} join staff on staff.staff_id = rental.staff_id",
+                    "value": "SELECT rental.*, staff.first_name \"Staff First Name\", staff.last_name \"Staff Last Name\" FROM {{ @join_on_definition as rental }} join staff on staff.staff_id = rental.staff_id where selected_id = {{ selected_id }} \n{% form %}\nselected_id:\n type: text\n default: my_id\n{% endform %}",
                     "language": "SQL"
                 },
                 "source": "SYSTEM",
@ -230,6 +696,85 @@
|
||||
"lastRunId": "no-run-id-provided"
|
||||
}
|
||||
},
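The updated statement embeds Mode's Liquid templating: a {{ selected_id }} parameter plus a {% form %} block that declares it, alongside the existing {{ @join_on_definition as rental }} definition reference. Before lineage extraction, such a query has to be rendered down to plain SQL. A minimal sketch using python-liquid follows; render_mode_query, its regexes, and its arguments are hypothetical illustrations, not the source's actual implementation.

import re

from liquid import Environment, Undefined

def render_mode_query(raw_query: str, definitions: dict, params: dict) -> str:
    # Mode definition references ({{ @name as alias }}) are not valid Liquid;
    # inline them first so the Liquid parser never sees them.
    def inline(match: re.Match) -> str:
        name, alias = match.group(1), match.group(2)
        return f"({definitions[name]}) {alias}"

    sql = re.sub(r"{{\s*@(\w+)\s+as\s+(\w+)\s*}}", inline, raw_query)

    # Drop Mode's non-standard {% form %} ... {% endform %} parameter blocks.
    # A fuller implementation would read the declared defaults out of the
    # block instead of requiring callers to pass values in params.
    sql = re.sub(r"{%\s*form\s*%}.*?{%\s*endform\s*%}", "", sql, flags=re.DOTALL)

    # Render the remaining Liquid; with Undefined, unbound parameters render
    # as the empty string instead of raising.
    return Environment(undefined=Undefined).from_string(sql).render(**params)

Applied to the statement above with, say, definitions={"join_on_definition": "select * from rental"} and params={"selected_id": "'my_id'"}, this yields plain SQL of the kind the column lineage earlier in this file was parsed from.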
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(mode,f622b9ee725b)",
"changeType": "UPSERT",
"aspectName": "inputFields",
"aspect": {
"json": {
"fields": [
{
"schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),payment_date)",
"schemaField": {
"fieldPath": "payment_date",
"nullable": false,
"type": {
"type": {
"com.linkedin.schema.NullType": {}
}
},
"nativeDataType": "",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
}
]
},
"isPartOfKey": false
}
},
{
"schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),amount)",
"schemaField": {
"fieldPath": "amount",
"nullable": false,
"type": {
"type": {
"com.linkedin.schema.NullType": {}
}
},
"nativeDataType": "",
"recursive": false,
"globalTags": {
"tags": [
{
"tag": "urn:li:tag:Dimension"
}
]
},
"isPartOfKey": false
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1638860400000,
"runId": "mode-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(mode,f622b9ee725b)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Chart"
]
}
},
"systemMetadata": {
"lastObserved": 1638860400000,
"runId": "mode-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": {
@ -238,9 +783,9 @@
{
"com.linkedin.pegasus2avro.chart.ChartInfo": {
"customProperties": {
"ChartType": "table",
"Columns": "payment_date,Staff First Name,amount,Staff Last Name",
"Filters": "amount",
"upstream_fields": "profile_id"
"Filters": "amount"
},
"title": "Customer Staff Table",
"description": "",
@ -261,7 +806,8 @@
}
],
"inputEdges": [],
"type": "TABLE"
"type": "TABLE",
"lastRefreshed": 1639177971490
}
},
{
@ -273,7 +819,7 @@
},
{
"com.linkedin.pegasus2avro.chart.ChartQuery": {
"rawQuery": "SELECT rental.*, staff.first_name \"Staff First Name\", staff.last_name \"Staff Last Name\" FROM {{ @join_on_definition as rental }} join staff on staff.staff_id = rental.staff_id",
"rawQuery": "SELECT rental.*, staff.first_name \"Staff First Name\", staff.last_name \"Staff Last Name\" FROM {{ @join_on_definition as rental }} join staff on staff.staff_id = rental.staff_id where selected_id = {{ selected_id }} \n{% form %}\nselected_id:\n type: text\n default: my_id\n{% endform %}",
"type": "SQL"
}
},
@ -329,16 +875,16 @@
"id": "acryl"
},
{
"id": "AcrylTest"
"id": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a",
"urn": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a"
},
{
"id": "Report 1"
"id": "urn:li:dashboard:(mode,2934237)",
"urn": "urn:li:dashboard:(mode,2934237)"
},
{
"id": "Customer and staff"
},
{
"id": "Customer Staff Table"
"id": "urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD)"
}
]
}
@ -398,55 +944,13 @@
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(mode,2934237)",
"entityType": "tag",
"entityUrn": "urn:li:tag:Dimension",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspectName": "tagKey",
"aspect": {
"json": {
"path": [
{
"id": "acryl"
},
{
"id": "AcrylTest"
},
{
"id": "Report 1"
}
]
}
},
"systemMetadata": {
"lastObserved": 1638860400000,
"runId": "mode-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(mode,f622b9ee725b)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "acryl"
},
{
"id": "AcrylTest"
},
{
"id": "Report 1"
},
{
"id": "Customer and staff"
},
{
"id": "Customer Staff Table"
}
]
"name": "Dimension"
}
},
"systemMetadata": {

@ -8,7 +8,7 @@
"queries": [{
"id": 10149707,
"token": "6e26a9f3d4e2",
"raw_query": "SELECT rental.*, staff.first_name \"Staff First Name\", staff.last_name \"Staff Last Name\" FROM {{ @join_on_definition as rental }} join staff on staff.staff_id = rental.staff_id",
"raw_query": "SELECT rental.*, staff.first_name \"Staff First Name\", staff.last_name \"Staff Last Name\" FROM {{ @join_on_definition as rental }} join staff on staff.staff_id = rental.staff_id where selected_id = {{ selected_id }} \n{% form %}\nselected_id:\n type: text\n default: my_id\n{% endform %}",
"created_at": "2021-12-10T20:55:24.361Z",
"updated_at": "2021-12-10T23:12:53.273Z",
"name": "Customer and staff",
@ -53,7 +53,7 @@
"query": {
"raw_query": {
"type": "text",
"value": "SELECT rental.*, staff.first_name \"Staff First Name\", staff.last_name \"Staff Last Name\" FROM {{ @join_on_definition as rental }} join staff on staff.staff_id = rental.staff_id"
"value": "SELECT rental.*, staff.first_name \"Staff First Name\", staff.last_name \"Staff Last Name\" FROM {{ @join_on_definition as rental }} join staff on staff.staff_id = rental.staff_id where selected_id = {{ selected_id }} \n{% form %}\nselected_id:\n type: text\n default: my_id\n{% endform %}"
},
"name": {
"type": "text",

@ -14,7 +14,7 @@ FROZEN_TIME = "2021-12-07 07:00:00"
JSON_RESPONSE_MAP = {
    "https://app.mode.com/api/verify": "verify.json",
    "https://app.mode.com/api/account": "user.json",
    "https://app.mode.com/api/acryl/spaces": "spaces.json",
    "https://app.mode.com/api/acryl/spaces?filter=all": "spaces.json",
    "https://app.mode.com/api/acryl/spaces/157933cc1168/reports": "reports_157933cc1168.json",
    "https://app.mode.com/api/acryl/spaces/75737b70402e/reports": "reports_75737b70402e.json",
    "https://app.mode.com/api/modeuser": "user.json",

@ -0,0 +1,58 @@
{
"query_type": "SELECT",
"query_type_props": {},
"query_fingerprint": "f2ac7c7d2236fce51b0d7c2b3f0e1a3c26cf6f26566b77f1d9084b7ab9c1d021",
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.my_table,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.my_table_b,PROD)"
],
"out_tables": [],
"column_lineage": [
{
"downstream": {
"table": null,
"column": "id",
"column_type": null,
"native_column_type": null
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.my_table,PROD)",
"column": "id"
}
]
},
{
"downstream": {
"table": null,
"column": "name",
"column_type": null,
"native_column_type": null
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.my_table_b,PROD)",
"column": "name"
}
]
},
{
"downstream": {
"table": null,
"column": "address",
"column_type": null,
"native_column_type": null
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:snowflake,my_db.my_schema.my_table_b,PROD)",
"column": "address"
}
]
}
],
"debug_info": {
"confidence": 0.2,
"generalized_statement": "SELECT A.ID, B.NAME, PARSE_JSON(B.MY_JSON) AS :userInfo, B.ADDRESS FROM my_db.my_schema.my_table AS A LEFT JOIN my_db.my_schema.my_table_B AS B ON A.ID = B.ID"
}
}
@ -1108,3 +1108,20 @@ AS (
        default_db="my_db",
        expected_file=RESOURCE_DIR / "test_redshift_system_automove.json",
    )


def test_snowflake_with_unnamed_column_from_udf_call() -> None:
    assert_sql_result(
        """SELECT
    A.ID,
    B.NAME,
    PARSE_JSON(B.MY_JSON) AS :userInfo,
    B.ADDRESS
FROM my_db.my_schema.my_table AS A
LEFT JOIN my_db.my_schema.my_table_B AS B
    ON A.ID = B.ID
""",
        dialect="snowflake",
        default_db="my_db",
        expected_file=RESOURCE_DIR / "test_snowflake_unnamed_column_udf.json",
    )