mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-17 03:38:18 +00:00
ISSUE-19454: Fixes broken looker lineage (#19456)
* ISSUE-19454: Fixes the broken lineage in looker when backticks enclosed table refs * refactor * use isort * Update ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com> --------- Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
This commit is contained in:
parent
250b406a11
commit
fd2575d244
@ -24,7 +24,18 @@ import os
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Optional, Sequence, Set, Type, Union, cast
|
||||
from typing import (
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
Type,
|
||||
Union,
|
||||
cast,
|
||||
get_args,
|
||||
)
|
||||
|
||||
import giturlparse
|
||||
import lkml
|
||||
@ -91,7 +102,7 @@ from metadata.generated.schema.type.entityReferenceList import EntityReferenceLi
|
||||
from metadata.generated.schema.type.usageRequest import UsageRequest
|
||||
from metadata.ingestion.api.models import Either
|
||||
from metadata.ingestion.api.steps import InvalidSourceException
|
||||
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
|
||||
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
|
||||
from metadata.ingestion.lineage.parser import LineageParser
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.dashboard.dashboard_service import (
|
||||
@ -108,7 +119,6 @@ from metadata.ingestion.source.dashboard.looker.models import (
|
||||
LookMlView,
|
||||
ViewName,
|
||||
)
|
||||
from metadata.ingestion.source.dashboard.looker.parser import LkmlParser
|
||||
from metadata.ingestion.source.dashboard.looker.utils import _clone_repo
|
||||
from metadata.readers.file.api_reader import ReadersCredentials
|
||||
from metadata.readers.file.base import Reader
|
||||
@ -121,7 +131,6 @@ from metadata.utils.logger import ingestion_logger
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
|
||||
LIST_DASHBOARD_FIELDS = ["id", "title"]
|
||||
IMPORTED_PROJECTS_DIR = "imported_projects"
|
||||
|
||||
@ -178,7 +187,7 @@ class LookerSource(DashboardServiceSource):
|
||||
self._explores_cache = {}
|
||||
self._repo_credentials: Optional[ReadersCredentials] = None
|
||||
self._reader_class: Optional[Type[Reader]] = None
|
||||
self._project_parsers: Optional[Dict[str, LkmlParser]] = None
|
||||
self._project_parsers: Optional[Dict[str, BulkLkmlParser]] = None
|
||||
self._main_lookml_repo: Optional[LookMLRepo] = None
|
||||
self._main__lookml_manifest: Optional[LookMLManifest] = None
|
||||
self._view_data_model: Optional[DashboardDataModel] = None
|
||||
@ -260,7 +269,7 @@ class LookerSource(DashboardServiceSource):
|
||||
self._main__lookml_manifest = self.__read_manifest(credentials)
|
||||
|
||||
@property
|
||||
def parser(self) -> Optional[Dict[str, LkmlParser]]:
|
||||
def parser(self) -> Optional[Dict[str, BulkLkmlParser]]:
|
||||
if self.repository_credentials:
|
||||
return self._project_parsers
|
||||
return None
|
||||
@ -282,7 +291,7 @@ class LookerSource(DashboardServiceSource):
|
||||
"""
|
||||
if self.repository_credentials:
|
||||
all_projects: Set[str] = {model.project_name for model in all_lookml_models}
|
||||
self._project_parsers: Dict[str, LkmlParser] = {
|
||||
self._project_parsers: Dict[str, BulkLkmlParser] = {
|
||||
project_name: BulkLkmlParser(
|
||||
reader=self.reader(Path(self._main_lookml_repo.path))
|
||||
)
|
||||
@ -325,7 +334,7 @@ class LookerSource(DashboardServiceSource):
|
||||
"""
|
||||
if not self._repo_credentials:
|
||||
if self.service_connection.gitCredentials and isinstance(
|
||||
self.service_connection.gitCredentials, ReadersCredentials
|
||||
self.service_connection.gitCredentials, get_args(ReadersCredentials)
|
||||
):
|
||||
self._repo_credentials = self.service_connection.gitCredentials
|
||||
|
||||
@ -572,10 +581,12 @@ class LookerSource(DashboardServiceSource):
|
||||
|
||||
if view.sql_table_name:
|
||||
sql_table_name = self._render_table_name(view.sql_table_name)
|
||||
source_table_name = self._clean_table_name(sql_table_name)
|
||||
|
||||
# View to the source is only there if we are informing the dbServiceNames
|
||||
for db_service_name in db_service_names or []:
|
||||
dialect = self._get_db_dialect(db_service_name)
|
||||
source_table_name = self._clean_table_name(sql_table_name, dialect)
|
||||
|
||||
# View to the source is only there if we are informing the dbServiceNames
|
||||
yield self.build_lineage_request(
|
||||
source=source_table_name,
|
||||
db_service_name=db_service_name,
|
||||
@ -587,15 +598,9 @@ class LookerSource(DashboardServiceSource):
|
||||
if not sql_query:
|
||||
return
|
||||
for db_service_name in db_service_names or []:
|
||||
db_service = self.metadata.get_by_name(
|
||||
DatabaseService, db_service_name
|
||||
)
|
||||
|
||||
lineage_parser = LineageParser(
|
||||
sql_query,
|
||||
ConnectionTypeDialectMapper.dialect_of(
|
||||
db_service.connection.config.type.value
|
||||
),
|
||||
self._get_db_dialect(db_service_name),
|
||||
timeout_seconds=30,
|
||||
)
|
||||
if lineage_parser.source_tables:
|
||||
@ -615,6 +620,12 @@ class LookerSource(DashboardServiceSource):
|
||||
)
|
||||
)
|
||||
|
||||
def _get_db_dialect(self, db_service_name) -> Dialect:
|
||||
db_service = self.metadata.get_by_name(DatabaseService, db_service_name)
|
||||
return ConnectionTypeDialectMapper.dialect_of(
|
||||
db_service.connection.config.type.value
|
||||
)
|
||||
|
||||
def get_dashboards_list(self) -> List[DashboardBase]:
|
||||
"""
|
||||
Get List of all dashboards
|
||||
@ -718,7 +729,7 @@ class LookerSource(DashboardServiceSource):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _clean_table_name(table_name: str) -> str:
|
||||
def _clean_table_name(table_name: str, dialect: Dialect = Dialect.ANSI) -> str:
|
||||
"""
|
||||
sql_table_names might be renamed when defining
|
||||
an explore. E.g., customers as cust
|
||||
@ -726,7 +737,10 @@ class LookerSource(DashboardServiceSource):
|
||||
:return: clean table name
|
||||
"""
|
||||
|
||||
return table_name.lower().split(" as ")[0].strip()
|
||||
clean_table_name = table_name.lower().split(" as ")[0].strip()
|
||||
if dialect == Dialect.BIGQUERY:
|
||||
clean_table_name = clean_table_name.strip("`")
|
||||
return clean_table_name
|
||||
|
||||
@staticmethod
|
||||
def _render_table_name(table_name: str) -> str:
|
||||
|
@ -47,6 +47,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.generated.schema.type.usageDetails import UsageDetails, UsageStats
|
||||
from metadata.generated.schema.type.usageRequest import UsageRequest
|
||||
from metadata.ingestion.api.steps import InvalidSourceException
|
||||
from metadata.ingestion.lineage.models import Dialect
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage
|
||||
from metadata.ingestion.source.dashboard.looker.metadata import LookerSource
|
||||
@ -292,13 +293,33 @@ class LookerUnitTest(TestCase):
|
||||
"""
|
||||
Check table cleaning
|
||||
"""
|
||||
self.assertEqual(self.looker._clean_table_name("MY_TABLE"), "my_table")
|
||||
self.assertEqual(
|
||||
self.looker._clean_table_name("MY_TABLE", Dialect.MYSQL), "my_table"
|
||||
)
|
||||
|
||||
self.assertEqual(self.looker._clean_table_name(" MY_TABLE "), "my_table")
|
||||
self.assertEqual(
|
||||
self.looker._clean_table_name(" MY_TABLE ", Dialect.REDSHIFT), "my_table"
|
||||
)
|
||||
|
||||
self.assertEqual(self.looker._clean_table_name(" my_table"), "my_table")
|
||||
self.assertEqual(
|
||||
self.looker._clean_table_name(" my_table", Dialect.SNOWFLAKE), "my_table"
|
||||
)
|
||||
|
||||
self.assertEqual(self.looker._clean_table_name("TABLE AS ALIAS"), "table")
|
||||
self.assertEqual(
|
||||
self.looker._clean_table_name("TABLE AS ALIAS", Dialect.BIGQUERY), "table"
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
self.looker._clean_table_name(
|
||||
"`project_id.dataset_id.table_id` AS ALIAS", Dialect.BIGQUERY
|
||||
),
|
||||
"project_id.dataset_id.table_id",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
self.looker._clean_table_name("`db.schema.table`", Dialect.POSTGRES),
|
||||
"`db.schema.table`",
|
||||
)
|
||||
|
||||
def test_render_table_name(self):
|
||||
"""
|
||||
|
Loading…
x
Reference in New Issue
Block a user