mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-29 10:57:52 +00:00
fix(ingest/lookml): missing lineage for looker template -- if prod (#11426)
This commit is contained in:
parent
a0787684de
commit
67d7116055
@ -4,11 +4,14 @@ import pathlib
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Optional, Set
|
||||
|
||||
from datahub.ingestion.source.looker.lkml_patched import load_lkml
|
||||
from datahub.ingestion.source.looker.looker_connection import LookerConnectionDefinition
|
||||
from datahub.ingestion.source.looker.looker_template_language import (
|
||||
load_and_preprocess_file,
|
||||
)
|
||||
from datahub.ingestion.source.looker.lookml_config import (
|
||||
_BASE_PROJECT_NAME,
|
||||
_EXPLORE_FILE_EXTENSION,
|
||||
LookMLSourceConfig,
|
||||
LookMLSourceReport,
|
||||
)
|
||||
|
||||
@ -43,6 +46,7 @@ class LookerModel:
|
||||
root_project_name: Optional[str],
|
||||
base_projects_folders: Dict[str, pathlib.Path],
|
||||
path: str,
|
||||
source_config: LookMLSourceConfig,
|
||||
reporter: LookMLSourceReport,
|
||||
) -> "LookerModel":
|
||||
logger.debug(f"Loading model from {path}")
|
||||
@ -54,6 +58,7 @@ class LookerModel:
|
||||
root_project_name,
|
||||
base_projects_folders,
|
||||
path,
|
||||
source_config,
|
||||
reporter,
|
||||
seen_so_far=set(),
|
||||
traversal_path=pathlib.Path(path).stem,
|
||||
@ -68,7 +73,10 @@ class LookerModel:
|
||||
]
|
||||
for included_file in explore_files:
|
||||
try:
|
||||
parsed = load_lkml(included_file)
|
||||
parsed = load_and_preprocess_file(
|
||||
path=included_file,
|
||||
source_config=source_config,
|
||||
)
|
||||
included_explores = parsed.get("explores", [])
|
||||
explores.extend(included_explores)
|
||||
except Exception as e:
|
||||
@ -94,6 +102,7 @@ class LookerModel:
|
||||
root_project_name: Optional[str],
|
||||
base_projects_folder: Dict[str, pathlib.Path],
|
||||
path: str,
|
||||
source_config: LookMLSourceConfig,
|
||||
reporter: LookMLSourceReport,
|
||||
seen_so_far: Set[str],
|
||||
traversal_path: str = "", # a cosmetic parameter to aid debugging
|
||||
@ -206,7 +215,10 @@ class LookerModel:
|
||||
f"Will be loading {included_file}, traversed here via {traversal_path}"
|
||||
)
|
||||
try:
|
||||
parsed = load_lkml(included_file)
|
||||
parsed = load_and_preprocess_file(
|
||||
path=included_file,
|
||||
source_config=source_config,
|
||||
)
|
||||
seen_so_far.add(included_file)
|
||||
if "includes" in parsed: # we have more includes to resolve!
|
||||
resolved.extend(
|
||||
@ -216,6 +228,7 @@ class LookerModel:
|
||||
root_project_name,
|
||||
base_projects_folder,
|
||||
included_file,
|
||||
source_config,
|
||||
reporter,
|
||||
seen_so_far,
|
||||
traversal_path=traversal_path
|
||||
@ -259,6 +272,7 @@ class LookerViewFile:
|
||||
root_project_name: Optional[str],
|
||||
base_projects_folder: Dict[str, pathlib.Path],
|
||||
raw_file_content: str,
|
||||
source_config: LookMLSourceConfig,
|
||||
reporter: LookMLSourceReport,
|
||||
) -> "LookerViewFile":
|
||||
logger.debug(f"Loading view file at {absolute_file_path}")
|
||||
@ -272,6 +286,7 @@ class LookerViewFile:
|
||||
root_project_name,
|
||||
base_projects_folder,
|
||||
absolute_file_path,
|
||||
source_config,
|
||||
reporter,
|
||||
seen_so_far=seen_so_far,
|
||||
)
|
||||
|
||||
@ -3,11 +3,10 @@ import pathlib
|
||||
from dataclasses import replace
|
||||
from typing import Dict, Optional
|
||||
|
||||
from datahub.ingestion.source.looker.lkml_patched import load_lkml
|
||||
from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition
|
||||
from datahub.ingestion.source.looker.looker_dataclasses import LookerViewFile
|
||||
from datahub.ingestion.source.looker.looker_template_language import (
|
||||
process_lookml_template_language,
|
||||
load_and_preprocess_file,
|
||||
)
|
||||
from datahub.ingestion.source.looker.lookml_config import (
|
||||
_EXPLORE_FILE_EXTENSION,
|
||||
@ -72,10 +71,8 @@ class LookerViewFileLoader:
|
||||
try:
|
||||
logger.debug(f"Loading viewfile {path}")
|
||||
|
||||
parsed = load_lkml(path)
|
||||
|
||||
process_lookml_template_language(
|
||||
view_lkml_file_dict=parsed,
|
||||
parsed = load_and_preprocess_file(
|
||||
path=path,
|
||||
source_config=self.source_config,
|
||||
)
|
||||
|
||||
@ -86,6 +83,7 @@ class LookerViewFileLoader:
|
||||
root_project_name=self._root_project_name,
|
||||
base_projects_folder=self._base_projects_folder,
|
||||
raw_file_content=raw_file_content,
|
||||
source_config=self.source_config,
|
||||
reporter=reporter,
|
||||
)
|
||||
logger.debug(f"adding viewfile for path {path} to the cache")
|
||||
|
||||
@ -1,12 +1,14 @@
|
||||
import logging
|
||||
import pathlib
|
||||
import re
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, ClassVar, Dict, List, Optional, Set
|
||||
from typing import Any, ClassVar, Dict, List, Optional, Set, Union
|
||||
|
||||
from deepmerge import always_merger
|
||||
from liquid import Undefined
|
||||
from liquid.exceptions import LiquidSyntaxError
|
||||
|
||||
from datahub.ingestion.source.looker.lkml_patched import load_lkml
|
||||
from datahub.ingestion.source.looker.looker_constant import (
|
||||
DATAHUB_TRANSFORMED_SQL,
|
||||
DATAHUB_TRANSFORMED_SQL_TABLE_NAME,
|
||||
@ -390,6 +392,7 @@ def process_lookml_template_language(
|
||||
source_config: LookMLSourceConfig,
|
||||
view_lkml_file_dict: dict,
|
||||
) -> None:
|
||||
|
||||
if "views" not in view_lkml_file_dict:
|
||||
return
|
||||
|
||||
@ -416,3 +419,18 @@ def process_lookml_template_language(
|
||||
)
|
||||
|
||||
view_lkml_file_dict["views"] = transformed_views
|
||||
|
||||
|
||||
def load_and_preprocess_file(
    path: Union[str, pathlib.Path],
    source_config: LookMLSourceConfig,
) -> dict:
    """Parse a LookML file and run template-language preprocessing on it.

    The file at ``path`` is parsed with ``load_lkml`` and the resulting
    dictionary is handed to ``process_lookml_template_language`` together
    with ``source_config``. That function is annotated as returning ``None``,
    so the dictionary returned here is the same object that was passed to it.

    Args:
        path: Location of the LookML file to load.
        source_config: LookML source configuration forwarded to the
            template-language processing step.

    Returns:
        The parsed LookML dictionary after preprocessing.
    """
    lkml_dict = load_lkml(path)

    # The processing step returns nothing; use the dictionary it was given.
    process_lookml_template_language(
        source_config=source_config,
        view_lkml_file_dict=lkml_dict,
    )

    return lkml_dict
|
||||
|
||||
@ -365,8 +365,9 @@ class LookerViewContext:
|
||||
return sql_table_name.lower()
|
||||
|
||||
def datahub_transformed_sql_table_name(self) -> str:
|
||||
table_name: Optional[str] = self.raw_view.get(
|
||||
"datahub_transformed_sql_table_name"
|
||||
# This field might be present in parent view of current view
|
||||
table_name: Optional[str] = self.get_including_extends(
|
||||
field="datahub_transformed_sql_table_name"
|
||||
)
|
||||
|
||||
if not table_name:
|
||||
|
||||
@ -29,7 +29,6 @@ from datahub.ingestion.source.common.subtypes import (
|
||||
DatasetSubTypes,
|
||||
)
|
||||
from datahub.ingestion.source.git.git_import import GitClone
|
||||
from datahub.ingestion.source.looker.lkml_patched import load_lkml
|
||||
from datahub.ingestion.source.looker.looker_common import (
|
||||
CORPUSER_DATAHUB,
|
||||
LookerExplore,
|
||||
@ -45,6 +44,9 @@ from datahub.ingestion.source.looker.looker_connection import (
|
||||
get_connection_def_based_on_connection_string,
|
||||
)
|
||||
from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI
|
||||
from datahub.ingestion.source.looker.looker_template_language import (
|
||||
load_and_preprocess_file,
|
||||
)
|
||||
from datahub.ingestion.source.looker.looker_view_id_cache import (
|
||||
LookerModel,
|
||||
LookerViewFileLoader,
|
||||
@ -311,13 +313,19 @@ class LookMLSource(StatefulIngestionSourceBase):
|
||||
|
||||
def _load_model(self, path: str) -> LookerModel:
|
||||
logger.debug(f"Loading model from file {path}")
|
||||
parsed = load_lkml(path)
|
||||
|
||||
parsed = load_and_preprocess_file(
|
||||
path=path,
|
||||
source_config=self.source_config,
|
||||
)
|
||||
|
||||
looker_model = LookerModel.from_looker_dict(
|
||||
parsed,
|
||||
_BASE_PROJECT_NAME,
|
||||
self.source_config.project_name,
|
||||
self.base_projects_folder,
|
||||
path,
|
||||
self.source_config,
|
||||
self.reporter,
|
||||
)
|
||||
return looker_model
|
||||
@ -495,7 +503,10 @@ class LookMLSource(StatefulIngestionSourceBase):
|
||||
def get_manifest_if_present(self, folder: pathlib.Path) -> Optional[LookerManifest]:
|
||||
manifest_file = folder / "manifest.lkml"
|
||||
if manifest_file.exists():
|
||||
manifest_dict = load_lkml(manifest_file)
|
||||
|
||||
manifest_dict = load_and_preprocess_file(
|
||||
path=manifest_file, source_config=self.source_config
|
||||
)
|
||||
|
||||
manifest = LookerManifest(
|
||||
project_name=manifest_dict.get("project_name"),
|
||||
|
||||
@ -154,6 +154,7 @@ def _generate_fully_qualified_name(
|
||||
sql_table_name: str,
|
||||
connection_def: LookerConnectionDefinition,
|
||||
reporter: LookMLSourceReport,
|
||||
view_name: str,
|
||||
) -> str:
|
||||
"""Returns a fully qualified dataset name, resolved through a connection definition.
|
||||
Input sql_table_name can be in three forms: table, db.table, db.schema.table"""
|
||||
@ -192,7 +193,7 @@ def _generate_fully_qualified_name(
|
||||
reporter.report_warning(
|
||||
title="Malformed Table Name",
|
||||
message="Table name has more than 3 parts.",
|
||||
context=f"Table Name: {sql_table_name}",
|
||||
context=f"view-name: {view_name}, table-name: {sql_table_name}",
|
||||
)
|
||||
return sql_table_name.lower()
|
||||
|
||||
@ -280,10 +281,13 @@ class SqlBasedDerivedViewUpstream(AbstractViewUpstream, ABC):
|
||||
return []
|
||||
|
||||
if sql_parsing_result.debug_info.table_error is not None:
|
||||
logger.debug(
|
||||
f"view-name={self.view_context.name()}, sql_query={self.get_sql_query()}"
|
||||
)
|
||||
self.reporter.report_warning(
|
||||
title="Table Level Lineage Missing",
|
||||
message="Error in parsing derived sql",
|
||||
context=f"View-name: {self.view_context.name()}",
|
||||
context=f"view-name: {self.view_context.name()}, platform: {self.view_context.view_connection.platform}",
|
||||
exc=sql_parsing_result.debug_info.table_error,
|
||||
)
|
||||
return []
|
||||
@ -530,6 +534,7 @@ class RegularViewUpstream(AbstractViewUpstream):
|
||||
sql_table_name=self.view_context.datahub_transformed_sql_table_name(),
|
||||
connection_def=self.view_context.view_connection,
|
||||
reporter=self.view_context.reporter,
|
||||
view_name=self.view_context.name(),
|
||||
)
|
||||
|
||||
self.upstream_dataset_urn = make_dataset_urn_with_platform_instance(
|
||||
@ -586,6 +591,7 @@ class DotSqlTableNameViewUpstream(AbstractViewUpstream):
|
||||
self.view_context.datahub_transformed_sql_table_name(),
|
||||
self.view_context.view_connection,
|
||||
self.view_context.reporter,
|
||||
self.view_context.name(),
|
||||
),
|
||||
base_folder_path=self.view_context.base_folder_path,
|
||||
looker_view_id_cache=self.looker_view_id_cache,
|
||||
|
||||
@ -2,6 +2,7 @@ import logging
|
||||
import pathlib
|
||||
from typing import Any, List
|
||||
from unittest import mock
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pydantic
|
||||
import pytest
|
||||
@ -14,13 +15,13 @@ from datahub.ingestion.run.pipeline import Pipeline
|
||||
from datahub.ingestion.source.file import read_metadata_file
|
||||
from datahub.ingestion.source.looker.looker_template_language import (
|
||||
SpecialVariable,
|
||||
load_and_preprocess_file,
|
||||
resolve_liquid_variable,
|
||||
)
|
||||
from datahub.ingestion.source.looker.lookml_source import (
|
||||
LookerModel,
|
||||
LookerRefinementResolver,
|
||||
LookMLSourceConfig,
|
||||
load_lkml,
|
||||
)
|
||||
from datahub.metadata.schema_classes import (
|
||||
DatasetSnapshotClass,
|
||||
@ -870,7 +871,11 @@ def test_manifest_parser(pytestconfig: pytest.Config) -> None:
|
||||
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
|
||||
manifest_file = test_resources_dir / "lkml_manifest_samples/complex-manifest.lkml"
|
||||
|
||||
manifest = load_lkml(manifest_file)
|
||||
manifest = load_and_preprocess_file(
|
||||
path=manifest_file,
|
||||
source_config=MagicMock(),
|
||||
)
|
||||
|
||||
assert manifest
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user