mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-02 04:13:17 +00:00
parent
23468fb868
commit
1b3ff505c2
@ -119,7 +119,7 @@ plugins: Dict[str, Set[str]] = {
|
||||
"protobuf",
|
||||
},
|
||||
"ldap-users": {"ldap3==2.9.1"},
|
||||
"looker": {"looker-sdk>=22.4.0"},
|
||||
"looker": {"looker-sdk>=22.20.0"},
|
||||
"mssql": {"sqlalchemy-pytds>=0.3"},
|
||||
"pymssql": {"pymssql==2.2.5"},
|
||||
"mssql-odbc": {"pyodbc"},
|
||||
|
@ -13,6 +13,7 @@ Mixin class containing User specific methods
|
||||
|
||||
To be used by OpenMetadata class
|
||||
"""
|
||||
from typing import Optional
|
||||
|
||||
from metadata.generated.schema.entity.teams.user import User
|
||||
from metadata.ingestion.ometa.client import REST
|
||||
@ -30,7 +31,7 @@ class OMetaUserMixin:
|
||||
|
||||
client: REST
|
||||
|
||||
def get_user_by_email(self, email: str) -> None:
|
||||
def get_user_by_email(self, email: str) -> Optional[User]:
|
||||
"""
|
||||
GET user entity by name
|
||||
|
||||
@ -40,8 +41,7 @@ class OMetaUserMixin:
|
||||
|
||||
name = email.split("@")[0]
|
||||
users = self.es_search_from_fqn(entity_type=User, fqn_search_string=name)
|
||||
if users:
|
||||
for user in users:
|
||||
for user in users or []:
|
||||
if user.email.__root__ == email:
|
||||
return user
|
||||
return None
|
||||
|
@ -183,9 +183,9 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_dashboard_name(self, dashboard_details: Any) -> str:
|
||||
def get_dashboard_name(self, dashboard: Any) -> str:
|
||||
"""
|
||||
Get Dashboard Name
|
||||
Get Dashboard Name from each element coming from `get_dashboards_list`
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
@ -198,7 +198,10 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
self, dashboard_details: Any
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""
|
||||
Yields lineage if config is enabled
|
||||
Yields lineage if config is enabled.
|
||||
|
||||
We will look for the data in all the services
|
||||
we have informed.
|
||||
"""
|
||||
for db_service_name in self.source_config.dbServiceNames or []:
|
||||
yield from self.yield_dashboard_lineage_details(
|
||||
@ -265,7 +268,10 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
entity=DashboardService, config=config
|
||||
)
|
||||
|
||||
def _get_add_lineage_request(self, to_entity: Dashboard, from_entity: Table):
|
||||
@staticmethod
|
||||
def _get_add_lineage_request(
|
||||
to_entity: Dashboard, from_entity: Table
|
||||
) -> Optional[AddLineageRequest]:
|
||||
if from_entity and to_entity:
|
||||
return AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
@ -281,10 +287,21 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
|
||||
def get_dashboard(self) -> Any:
|
||||
"""
|
||||
Method to iterate through dashboard lists filter dashbaords & yield dashboard details
|
||||
Method to iterate through dashboard lists filter dashboards & yield dashboard details
|
||||
"""
|
||||
for dashboard in self.get_dashboards_list():
|
||||
|
||||
dashboard_name = self.get_dashboard_name(dashboard)
|
||||
if filter_by_dashboard(
|
||||
self.source_config.dashboardFilterPattern,
|
||||
dashboard_name,
|
||||
):
|
||||
self.status.filter(
|
||||
dashboard_name,
|
||||
"Dashboard Filtered Out",
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
dashboard_details = self.get_dashboard_details(dashboard)
|
||||
except Exception as exc:
|
||||
@ -293,17 +310,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
f"Cannot extract dashboard details from {dashboard}: {exc}"
|
||||
)
|
||||
continue
|
||||
dashboard_name = self.get_dashboard_name(dashboard_details)
|
||||
|
||||
if filter_by_dashboard(
|
||||
self.source_config.dashboardFilterPattern,
|
||||
dashboard_name,
|
||||
):
|
||||
self.status.filter(
|
||||
dashboard_name,
|
||||
"Dashboard Fltered Out",
|
||||
)
|
||||
continue
|
||||
yield dashboard_details
|
||||
|
||||
def test_connection(self) -> None:
|
||||
|
@ -69,8 +69,8 @@ class DomodashboardSource(DashboardServiceSource):
|
||||
dashboards = self.domo_client.page_list()
|
||||
return dashboards
|
||||
|
||||
def get_dashboard_name(self, dashboard_details: dict) -> str:
|
||||
return dashboard_details["name"]
|
||||
def get_dashboard_name(self, dashboard: dict) -> str:
|
||||
return dashboard["name"]
|
||||
|
||||
def get_dashboard_details(self, dashboard: dict) -> dict:
|
||||
return dashboard
|
||||
|
@ -8,7 +8,16 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Looker source module"""
|
||||
"""
|
||||
Looker source module.
|
||||
Supports:
|
||||
- owner
|
||||
- lineage
|
||||
- usage
|
||||
|
||||
Notes:
|
||||
- Filtering is applied on the Dashboard title or ID, if the title is missing
|
||||
"""
|
||||
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
@ -16,6 +25,7 @@ from typing import Iterable, List, Optional, Set, cast
|
||||
|
||||
from looker_sdk.error import SDKError
|
||||
from looker_sdk.sdk.api31.models import Query
|
||||
from looker_sdk.sdk.api40.methods import Looker40SDK
|
||||
from looker_sdk.sdk.api40.models import Dashboard as LookerDashboard
|
||||
from looker_sdk.sdk.api40.models import (
|
||||
DashboardBase,
|
||||
@ -54,13 +64,32 @@ from metadata.utils.logger import ingestion_logger
|
||||
logger = ingestion_logger()
|
||||
|
||||
|
||||
LIST_DASHBOARD_FIELDS = ["id", "title"]
|
||||
|
||||
# Here we can update the fields to get further information, such as:
|
||||
# created_at, updated_at, last_updater_id, deleted_at, deleter_id, favorite_count, last_viewed_at
|
||||
GET_DASHBOARD_FIELDS = [
|
||||
"id",
|
||||
"title",
|
||||
"dashboard_elements",
|
||||
"dashboard_filters",
|
||||
"view_count",
|
||||
"description",
|
||||
"folder",
|
||||
"user_id", # Use as owner
|
||||
]
|
||||
|
||||
|
||||
class LookerSource(DashboardServiceSource):
|
||||
"""
|
||||
Looker Source Class
|
||||
Looker Source Class.
|
||||
|
||||
Its client uses Looker 40 from the SDK: client = looker_sdk.init40()
|
||||
"""
|
||||
|
||||
config: WorkflowSource
|
||||
metadata_config: OpenMetadataConnection
|
||||
client: Looker40SDK
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -70,6 +99,9 @@ class LookerSource(DashboardServiceSource):
|
||||
super().__init__(config, metadata_config)
|
||||
self.today = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
# Owners cache. The key will be the user_id and the value its OM user EntityRef
|
||||
self._owners_ref = {}
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
|
||||
config = WorkflowSource.parse_obj(config_dict)
|
||||
@ -80,32 +112,73 @@ class LookerSource(DashboardServiceSource):
|
||||
)
|
||||
return cls(config, metadata_config)
|
||||
|
||||
def get_dashboards_list(self) -> Optional[List[DashboardBase]]:
|
||||
def get_dashboards_list(self) -> List[DashboardBase]:
|
||||
"""
|
||||
Get List of all dashboards
|
||||
"""
|
||||
return self.client.all_dashboards(fields="id")
|
||||
try:
|
||||
return list(
|
||||
self.client.all_dashboards(fields=",".join(LIST_DASHBOARD_FIELDS))
|
||||
)
|
||||
except Exception as err:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(f"Wild error trying to obtain dashboard list {err}")
|
||||
# If we cannot list the dashboards, let's blow up
|
||||
raise err
|
||||
|
||||
def get_dashboard_name(self, dashboard_details: DashboardBase) -> str:
|
||||
def get_dashboard_name(self, dashboard: DashboardBase) -> str:
|
||||
"""
|
||||
Get Dashboard Name
|
||||
Get Dashboard Title. This will be used for filtering.
|
||||
If the title is not present, we'll send the ID
|
||||
"""
|
||||
return dashboard_details.id
|
||||
return dashboard.title or dashboard.id
|
||||
|
||||
def get_dashboard_details(self, dashboard: DashboardBase) -> LookerDashboard:
|
||||
"""
|
||||
Get Dashboard Details
|
||||
"""
|
||||
fields = [
|
||||
"id",
|
||||
"title",
|
||||
"dashboard_elements",
|
||||
"dashboard_filters",
|
||||
"view_count",
|
||||
"description",
|
||||
"folder",
|
||||
]
|
||||
return self.client.dashboard(dashboard_id=dashboard.id, fields=",".join(fields))
|
||||
return self.client.dashboard(
|
||||
dashboard_id=dashboard.id, fields=",".join(GET_DASHBOARD_FIELDS)
|
||||
)
|
||||
|
||||
def get_owner_details(
|
||||
self, dashboard_details: LookerDashboard
|
||||
) -> Optional[EntityReference]:
|
||||
"""Get dashboard owner
|
||||
|
||||
Store the visited users in the _owners_ref cache, even if we found them
|
||||
in OM or not.
|
||||
|
||||
If the user has not yet been visited, store it and return from cache.
|
||||
|
||||
Args:
|
||||
dashboard_details: LookerDashboard
|
||||
Returns:
|
||||
Optional[EntityReference]
|
||||
"""
|
||||
|
||||
try:
|
||||
if (
|
||||
dashboard_details.user_id is not None
|
||||
and dashboard_details.user_id not in self._owners_ref
|
||||
):
|
||||
dashboard_owner = self.client.user(dashboard_details.user_id)
|
||||
user = self.metadata.get_user_by_email(dashboard_owner.email)
|
||||
if user: # Save the EntityRef
|
||||
self._owners_ref[dashboard_details.user_id] = EntityReference(
|
||||
id=user.id, type="user"
|
||||
)
|
||||
else: # Otherwise, flag the user as missing in OM
|
||||
self._owners_ref[dashboard_details.user_id] = None
|
||||
logger.debug(
|
||||
f"User {dashboard_owner.email} not found in OpenMetadata."
|
||||
)
|
||||
|
||||
except Exception as err:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Could not fetch owner data due to {err}")
|
||||
|
||||
return self._owners_ref.get(dashboard_details.user_id)
|
||||
|
||||
def yield_dashboard(
|
||||
self, dashboard_details: LookerDashboard
|
||||
@ -117,7 +190,7 @@ class LookerSource(DashboardServiceSource):
|
||||
yield CreateDashboardRequest(
|
||||
name=dashboard_details.id.replace("::", "_"),
|
||||
displayName=dashboard_details.title,
|
||||
description=dashboard_details.description or "",
|
||||
description=dashboard_details.description or None,
|
||||
charts=[
|
||||
EntityReference(id=chart.id.__root__, type="chart")
|
||||
for chart in self.context.charts
|
||||
@ -126,6 +199,7 @@ class LookerSource(DashboardServiceSource):
|
||||
service=EntityReference(
|
||||
id=self.context.dashboard_service.id.__root__, type="dashboardService"
|
||||
),
|
||||
owner=self.get_owner_details(dashboard_details),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@ -186,7 +260,7 @@ class LookerSource(DashboardServiceSource):
|
||||
return dashboard_sources
|
||||
|
||||
def yield_dashboard_lineage_details(
|
||||
self, dashboard_details: LookerDashboard, db_service_name
|
||||
self, dashboard_details: LookerDashboard, db_service_name: str
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
"""
|
||||
Get lineage between charts and data sources.
|
||||
@ -211,22 +285,10 @@ class LookerSource(DashboardServiceSource):
|
||||
|
||||
for source in datasource_list:
|
||||
try:
|
||||
source_elements = fqn.split_table_name(table_name=source)
|
||||
|
||||
from_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=Table,
|
||||
service_name=db_service_name,
|
||||
database_name=source_elements["database"],
|
||||
schema_name=source_elements["database_schema"],
|
||||
table_name=source_elements["table"],
|
||||
)
|
||||
from_entity = self.metadata.get_by_name(
|
||||
entity=Table,
|
||||
fqn=from_fqn,
|
||||
)
|
||||
yield self._get_add_lineage_request(
|
||||
to_entity=to_entity, from_entity=from_entity
|
||||
yield self.build_lineage_request(
|
||||
source=source,
|
||||
db_service_name=db_service_name,
|
||||
to_entity=to_entity,
|
||||
)
|
||||
|
||||
except (Exception, IndexError) as err:
|
||||
@ -235,15 +297,53 @@ class LookerSource(DashboardServiceSource):
|
||||
f"Error building lineage for database service [{db_service_name}]: {err}"
|
||||
)
|
||||
|
||||
def build_lineage_request(
|
||||
self, source: str, db_service_name: str, to_entity: MetadataDashboard
|
||||
) -> Optional[AddLineageRequest]:
|
||||
"""
|
||||
Once we have a list of origin data sources, check their components
|
||||
and build the lineage request.
|
||||
|
||||
We will try searching in ES with and without the `database`
|
||||
|
||||
Args:
|
||||
source: table name from the source list
|
||||
db_service_name: name of the service from the config
|
||||
to_entity: Dashboard Entity being used
|
||||
"""
|
||||
|
||||
source_elements = fqn.split_table_name(table_name=source)
|
||||
|
||||
for database_name in [source_elements["database"], None]:
|
||||
|
||||
from_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=Table,
|
||||
service_name=db_service_name,
|
||||
database_name=database_name,
|
||||
schema_name=source_elements["database_schema"],
|
||||
table_name=source_elements["table"],
|
||||
)
|
||||
|
||||
from_entity: Table = self.metadata.get_by_name(
|
||||
entity=Table,
|
||||
fqn=from_fqn,
|
||||
)
|
||||
|
||||
if from_entity:
|
||||
return self._get_add_lineage_request(
|
||||
to_entity=to_entity, from_entity=from_entity
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def yield_dashboard_chart(
|
||||
self, dashboard_details: LookerDashboard
|
||||
) -> Optional[Iterable[CreateChartRequest]]:
|
||||
"""
|
||||
Method to fetch charts linked to dashboard
|
||||
"""
|
||||
for chart in cast(
|
||||
Iterable[DashboardElement], dashboard_details.dashboard_elements
|
||||
):
|
||||
for chart in dashboard_details.dashboard_elements:
|
||||
try:
|
||||
if filter_by_chart(
|
||||
chart_filter_pattern=self.source_config.chartFilterPattern,
|
||||
@ -259,7 +359,7 @@ class LookerSource(DashboardServiceSource):
|
||||
yield CreateChartRequest(
|
||||
name=chart.id,
|
||||
displayName=chart.title or chart.id,
|
||||
description="",
|
||||
description=self.build_chart_description(chart) or None,
|
||||
chartType=get_standard_chart_type(chart.type).value,
|
||||
chartUrl=f"/dashboard_elements/{chart.id}",
|
||||
service=EntityReference(
|
||||
@ -273,6 +373,28 @@ class LookerSource(DashboardServiceSource):
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Error creating chart [{chart}]: {exc}")
|
||||
|
||||
@staticmethod
|
||||
def build_chart_description(chart: DashboardElement) -> Optional[str]:
|
||||
"""
|
||||
Chart descriptions will be based on the subtitle + note_text, if exists.
|
||||
If the chart is a text tile, we will add the text as the chart description as well.
|
||||
This should keep the dashboard searchable without breaking the original metadata structure.
|
||||
"""
|
||||
|
||||
# If the string is None or empty, filter it out.
|
||||
try:
|
||||
return "; ".join(
|
||||
filter(
|
||||
lambda string: string,
|
||||
[chart.subtitle_text, chart.body_text, chart.note_text],
|
||||
)
|
||||
or []
|
||||
)
|
||||
except Exception as err:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(f"Error getting chart description: {err}")
|
||||
return None
|
||||
|
||||
def yield_dashboard_usage( # pylint: disable=W0221
|
||||
self, dashboard_details: LookerDashboard
|
||||
) -> Optional[DashboardUsage]:
|
||||
|
@ -83,11 +83,11 @@ class MetabaseSource(DashboardServiceSource):
|
||||
return resp_dashboards.json()
|
||||
return []
|
||||
|
||||
def get_dashboard_name(self, dashboard_details: dict) -> str:
|
||||
def get_dashboard_name(self, dashboard: dict) -> str:
|
||||
"""
|
||||
Get Dashboard Name
|
||||
"""
|
||||
return dashboard_details["name"]
|
||||
return dashboard["name"]
|
||||
|
||||
def get_dashboard_details(self, dashboard: dict) -> dict:
|
||||
"""
|
||||
|
@ -72,11 +72,11 @@ class ModeSource(DashboardServiceSource):
|
||||
"""
|
||||
return self.client.fetch_all_reports(self.workspace_name)
|
||||
|
||||
def get_dashboard_name(self, dashboard_details: dict) -> str:
|
||||
def get_dashboard_name(self, dashboard: dict) -> str:
|
||||
"""
|
||||
Get Dashboard Name
|
||||
"""
|
||||
return dashboard_details.get(mode_client.NAME)
|
||||
return dashboard.get(mode_client.NAME)
|
||||
|
||||
def get_dashboard_details(self, dashboard: dict) -> dict:
|
||||
"""
|
||||
|
@ -128,11 +128,11 @@ class PowerbiSource(DashboardServiceSource):
|
||||
"""
|
||||
return self.context.workspace.get("dashboards")
|
||||
|
||||
def get_dashboard_name(self, dashboard_details: dict) -> str:
|
||||
def get_dashboard_name(self, dashboard: dict) -> str:
|
||||
"""
|
||||
Get Dashboard Name
|
||||
"""
|
||||
return dashboard_details["displayName"]
|
||||
return dashboard["displayName"]
|
||||
|
||||
def get_dashboard_details(self, dashboard: dict) -> dict:
|
||||
"""
|
||||
|
@ -80,11 +80,11 @@ class QuickSightSource(DashboardServiceSource):
|
||||
]
|
||||
return dashboards
|
||||
|
||||
def get_dashboard_name(self, dashboard_details: dict) -> str:
|
||||
def get_dashboard_name(self, dashboard: dict) -> str:
|
||||
"""
|
||||
Get Dashboard Name
|
||||
"""
|
||||
return dashboard_details["Name"]
|
||||
return dashboard["Name"]
|
||||
|
||||
def get_dashboard_details(self, dashboard: dict) -> dict:
|
||||
"""
|
||||
|
@ -64,11 +64,11 @@ class RedashSource(DashboardServiceSource):
|
||||
dashboard_info = self.client.dashboards()
|
||||
return dashboard_info["results"]
|
||||
|
||||
def get_dashboard_name(self, dashboard_details: dict) -> str:
|
||||
def get_dashboard_name(self, dashboard: dict) -> str:
|
||||
"""
|
||||
Get Dashboard Name
|
||||
"""
|
||||
return dashboard_details["name"]
|
||||
return dashboard["name"]
|
||||
|
||||
def get_dashboard_details(self, dashboard: dict) -> dict:
|
||||
"""
|
||||
|
@ -137,11 +137,11 @@ class SupersetSource(DashboardServiceSource):
|
||||
for dashboard in dashboards["result"]:
|
||||
yield dashboard
|
||||
|
||||
def get_dashboard_name(self, dashboard_details: dict) -> str:
|
||||
def get_dashboard_name(self, dashboard: dict) -> str:
|
||||
"""
|
||||
Get Dashboard Name
|
||||
"""
|
||||
return dashboard_details["dashboard_title"]
|
||||
return dashboard["dashboard_title"]
|
||||
|
||||
def get_dashboard_details(self, dashboard: dict) -> dict:
|
||||
"""
|
||||
|
@ -146,11 +146,11 @@ class TableauSource(DashboardServiceSource):
|
||||
"""
|
||||
return self.workbooks.values()
|
||||
|
||||
def get_dashboard_name(self, dashboard_details: dict) -> str:
|
||||
def get_dashboard_name(self, dashboard: dict) -> str:
|
||||
"""
|
||||
Get Dashboard Name
|
||||
"""
|
||||
return dashboard_details.get("name")
|
||||
return dashboard.get("name")
|
||||
|
||||
def get_dashboard_details(self, dashboard: dict) -> dict:
|
||||
"""
|
||||
|
@ -182,3 +182,27 @@ class FQNBuildTest(TestCase):
|
||||
str(context.exception),
|
||||
"Service Name and Table Name should be informed, but got service=`None`, table=`None`",
|
||||
)
|
||||
|
||||
def test_split_table_name(self):
|
||||
"""Different tables are properly partitioned"""
|
||||
|
||||
self.assertEqual(
|
||||
{"database": "database", "database_schema": "schema", "table": "table"},
|
||||
fqn.split_table_name(table_name="database.schema.table"),
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
{"database": None, "database_schema": "schema", "table": "table"},
|
||||
fqn.split_table_name(table_name="schema.table"),
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
{"database": None, "database_schema": None, "table": "table"},
|
||||
fqn.split_table_name(table_name="table"),
|
||||
)
|
||||
|
||||
# We also clean quotes
|
||||
self.assertEqual(
|
||||
{"database": "database", "database_schema": "schema", "table": "table"},
|
||||
fqn.split_table_name(table_name='database."schema".table'),
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user