diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json index 6f85b2b447a..8bfe90c1a6f 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/lookerConnection.json @@ -28,20 +28,11 @@ "type": "string", "format": "password" }, - "url": { + "hostPort": { "description": "URL to Looker instance.", "type": "string", "format": "uri" }, - "platformName": { - "description": "Looker Platform Name", - "type": "string", - "default": "looker" - }, - "actor": { - "description": "Looker actor.", - "type": "string" - }, "env": { "description": "Looker Environment", "type": "string" diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/metabaseConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/metabaseConnection.json index 03c5c101dc6..c05aa86c2ce 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/metabaseConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/metabaseConnection.json @@ -32,6 +32,10 @@ "description": "Host and Port of Metabase instance.", "type": "string" }, + "dbServiceName": { + "description": "Database Service Name for creation of lineage", + "type": "string" + }, "supportedPipelineTypes": { "description": "Supported Metadata Extraction Pipelines.", "type": "string", diff --git a/ingestion/examples/workflows/looker.json b/ingestion/examples/workflows/looker.json index f49ac4dec00..b84d5a66a06 100644 --- a/ingestion/examples/workflows/looker.json +++ b/ingestion/examples/workflows/looker.json @@ -1,23 +1,31 @@ { "source": { "type": "looker", - "config": { - "username": "username", - "password": "password", - "url": "http://localhost", - "service_name": "looker", - "service_type": "Looker", + "serviceName": "local_looker", + "serviceConnection": { + "config": { + "type": "Looker", + "username": "username", + "password": "password", + "hostPort": "hostPort", + "env": "env" + } + }, + "sourceConfig": { + "config": { + "dashboardFilterPattern": {}, + "chartFilterPattern": {} + } } }, "sink": { "type": "metadata-rest", "config": {} }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } -} +} \ No newline at end of file diff --git a/ingestion/examples/workflows/mariadb.json b/ingestion/examples/workflows/mariadb.json index eed941f5760..ce5b77aee48 100644 --- a/ingestion/examples/workflows/mariadb.json +++ b/ingestion/examples/workflows/mariadb.json @@ -1,25 +1,25 @@ { "source": { "type": "mariadb", - "config": { - "username": "openmetadata_user", - "password": "openmetadata_password", - "database": "openmetadata_db", - "service_name": "local_mariadb", - "schema_filter_pattern": { - "excludes": ["information_schema.*", "performance_schema.*", "sys.*"] + "serviceName": "local_mariadb", + "serviceConnection": { + "config": { + "type": "MariaDB", + "username": "openmetadata_user", + "password": "openmetadata_password", + "hostPort": "localhost:3306" } - } + }, + "sourceConfig": {"config": {"enableDataProfiler": false}} }, "sink": { "type": "metadata-rest", "config": {} }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } diff --git a/ingestion/examples/workflows/metabase.json b/ingestion/examples/workflows/metabase.json index a4138ad015a..06e3932c43f 100644 --- a/ingestion/examples/workflows/metabase.json +++ b/ingestion/examples/workflows/metabase.json @@ -1,23 +1,31 @@ { "source": { "type": "metabase", - "config": { - "username": "username", - "password": "password", - "host_port": "host:port", - "service_name": "local_metabase", - "db_service_name": "Optional - Create Lineage by adding relevant Database Service Name" + "serviceName": "test", + "serviceConnection": { + "config": { + "type": "Metabase", + "username": "username", + "password": "password", + "hostPort": "hostPort", + "dbServiceName": "Database Service Name to create Lineage" + } + }, + "sourceConfig": { + "config": { + "dashboardFilterPattern": {} +, "chartFilterPattern": {} + } } }, "sink": { "type": "metadata-rest", "config": {} }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } -} +} \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/looker.py b/ingestion/src/metadata/ingestion/source/looker.py index facb7c317bf..0eb3c981786 100644 --- a/ingestion/src/metadata/ingestion/source/looker.py +++ b/ingestion/src/metadata/ingestion/source/looker.py @@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.dashboard import ( from metadata.generated.schema.entity.services.dashboardService import ( DashboardServiceType, ) +from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import ( + DashboardServiceMetadataPipeline, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) @@ -75,7 +78,7 @@ class LookerSource(Source[Entity]): "LOOKERSDK_CLIENT_SECRET" ] = self.service_connection.password.get_secret_value() if not self.check_env("LOOKERSDK_BASE_URL"): - os.environ["LOOKERSDK_BASE_URL"] = self.service_connection.url + os.environ["LOOKERSDK_BASE_URL"] = self.service_connection.hostPort client = looker_sdk.init31() client.me() return client @@ -110,7 +113,7 @@ class LookerSource(Source[Entity]): displayName=dashboard_elements.title or "", description="", chart_type=dashboard_elements.type, - url=f"{self.service_connection.url}/dashboard_elements/{dashboard_elements.id}", + url=f"{self.service_connection.hostPort}/dashboard_elements/{dashboard_elements.id}", service=EntityReference(id=self.service.id, type="dashboardService"), ) if not dashboard_elements.id: @@ -153,7 +156,7 @@ class LookerSource(Source[Entity]): displayName=dashboard.title, description=dashboard.description or "", charts=self.chart_names, - url=f"{self.service_connection.url}/dashboards/{dashboard.id}", + url=f"{self.service_connection.hostPort}/dashboards/{dashboard.id}", service=EntityReference( id=self.service.id, type="dashboardService" ), diff --git a/ingestion/src/metadata/ingestion/source/mariadb.py b/ingestion/src/metadata/ingestion/source/mariadb.py index bf6f3506ac5..a2c8e64f233 100644 --- a/ingestion/src/metadata/ingestion/source/mariadb.py +++ b/ingestion/src/metadata/ingestion/source/mariadb.py @@ -14,20 +14,25 @@ from metadata.generated.schema.entity.services.connections.database.mariaDBConne from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.sql_source import SQLSource from metadata.ingestion.source.sql_source_common import SQLConnectionConfig -class MariadbConfig(MariaDBConnection, SQLConnectionConfig): - def get_connection_url(self): - return super().get_connection_url() - - class MariadbSource(SQLSource): def __init__(self, config, metadata_config): super().__init__(config, metadata_config) @classmethod def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): - config = MariadbConfig.parse_obj(config_dict) + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: MariaDBConnection = config.serviceConnection.__root__.config + if not isinstance(connection, MariaDBConnection): + raise InvalidSourceException( + f"Expected MariaDBConnection, but got {connection}" + ) + return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/metabase.py b/ingestion/src/metadata/ingestion/source/metabase.py index 070ff62607d..68a98e57c85 100644 --- a/ingestion/src/metadata/ingestion/source/metabase.py +++ b/ingestion/src/metadata/ingestion/source/metabase.py @@ -14,29 +14,37 @@ import json import logging import traceback import uuid -from typing import Iterable, Optional +from typing import Iterable from urllib.parse import quote import requests -from pydantic import SecretStr -from sqllineage.runner import LineageRunner from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.dashboard import Dashboard as Model_Dashboard from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import ( + MetabaseConnection, +) from metadata.generated.schema.entity.services.dashboardService import ( DashboardServiceType, ) +from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import ( + DashboardServiceMetadataPipeline, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import ConfigModel, Entity, IncludeFilterPattern -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.common import Entity +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.sql_source import SQLSourceStatus +from metadata.utils.filters import filter_by_chart, filter_by_dashboard from metadata.utils.helpers import get_dashboard_service_or_create, ingest_lineage HEADERS = {"Content-Type": "application/json", "Accept": "*/*"} @@ -44,22 +52,6 @@ HEADERS = {"Content-Type": "application/json", "Accept": "*/*"} logger: logging.Logger = logging.getLogger(__name__) -class MetabaseSourceConfig(ConfigModel): - """Metabase pydantic config model""" - - username: str - password: SecretStr - host_port: str - dashboard_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() - chart_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() - service_name: str - service_type: str = DashboardServiceType.Metabase.value - db_service_name: Optional[str] = None - - def get_connection_url(self): - """get connection url (not implemented)""" - - class MetabaseSource(Source[Entity]): """Metabase entity class @@ -76,25 +68,27 @@ class MetabaseSource(Source[Entity]): metric_charts: """ - config: MetabaseSourceConfig + config: WorkflowSource metadata_config: OpenMetadataServerConfig status: SQLSourceStatus def __init__( self, - config: MetabaseSourceConfig, + config: WorkflowSource, metadata_config: OpenMetadataServerConfig, ): super().__init__() self.config = config + self.service_connection = self.config.serviceConnection.__root__.config + self.source_config = self.config.sourceConfig.config self.metadata_config = metadata_config self.status = SQLSourceStatus() - params = {} - params["username"] = self.config.username - params["password"] = self.config.password.get_secret_value() + params = dict() + params["username"] = self.service_connection.username + params["password"] = self.service_connection.password.get_secret_value() try: resp = requests.post( - self.config.host_port + "/api/session/", + self.service_connection.hostPort + "/api/session/", data=json.dumps(params), headers=HEADERS, ) @@ -103,11 +97,9 @@ class MetabaseSource(Source[Entity]): session_id = resp.json()["id"] self.metabase_session = {"X-Metabase-Session": session_id} self.dashboard_service = get_dashboard_service_or_create( - config.service_name, + config.serviceName, DashboardServiceType.Metabase.name, - config.username, - config.password.get_secret_value(), - config.host_port, + self.service_connection.dict(), metadata_config, ) self.charts = [] @@ -123,7 +115,12 @@ class MetabaseSource(Source[Entity]): Returns: MetabaseSource """ - config = MetabaseSourceConfig.parse_obj(config_dict) + config = WorkflowSource.parse_obj(config_dict) + connection: MetabaseConnection = config.serviceConnection.__root__.config + if not isinstance(connection, MetabaseConnection): + raise InvalidSourceException( + f"Expected MetabaseConnection, but got {connection}" + ) return cls(config, metadata_config) def next_record(self) -> Iterable[Entity]: @@ -141,10 +138,14 @@ class MetabaseSource(Source[Entity]): for chart in charts: try: chart_details = chart["card"] - if not "name" in chart_details: + if not ("name" in chart_details): continue - if not self.config.chart_pattern.included(chart_details["name"]): - self.status.filter(chart_details["name"], None) + if filter_by_chart( + self.source_config.chartFilterPattern, chart_details["name"] + ): + self.status.filter( + chart_details["name"], "Chart Pattern not allowed" + ) continue yield Chart( id=uuid.uuid4(), @@ -154,7 +155,7 @@ class MetabaseSource(Source[Entity]): if chart_details["description"] is not None else "", chart_type=str(chart_details["display"]), - url=self.config.host_port, + url=self.service_connection.hostPort, service=EntityReference( id=self.dashboard_service.id, type="dashboardService" ), @@ -174,16 +175,18 @@ class MetabaseSource(Source[Entity]): resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}") dashboard_details = resp_dashboard.json() self.charts = [] - if not self.config.dashboard_pattern.included( - dashboard_details["name"] + if filter_by_dashboard( + self.source_config.dashboardFilterPattern, dashboard_details["name"] ): - self.status.filter(dashboard_details["name"], None) + self.status.filter( + dashboard_details["name"], "Dashboard Pattern not Allowed" + ) continue yield from self.get_charts(dashboard_details["ordered_cards"]) yield Dashboard( id=uuid.uuid4(), name=dashboard_details["name"], - url=self.config.host_port, + url=self.service_connection.hostPort, displayName=dashboard_details["name"], description=dashboard_details["description"] if dashboard_details["description"] is not None @@ -193,7 +196,7 @@ class MetabaseSource(Source[Entity]): id=self.dashboard_service.id, type="dashboardService" ), ) - if self.config.db_service_name: + if self.service_connection.dbServiceName: yield from self.get_lineage( dashboard_details["ordered_cards"], dashboard_details["name"] ) @@ -214,7 +217,7 @@ class MetabaseSource(Source[Entity]): resp_tables = self.req_get(f"/api/table/{chart_details['table_id']}") if resp_tables.status_code == 200: table = resp_tables.json() - table_fqdn = f"{self.config.db_service_name}.\ + table_fqdn = f"{self.service_connection.dbServiceName}.\ {table['schema']}.{table['name']}" dashboard_fqdn = ( f"{self.dashboard_service.name}.{quote(dashboard_name)}" @@ -245,7 +248,9 @@ class MetabaseSource(Source[Entity]): Args: path: """ - return requests.get(self.config.host_port + path, headers=self.metabase_session) + return requests.get( + self.service_connection.hostPort + path, headers=self.metabase_session + ) def get_status(self) -> SourceStatus: return self.status @@ -264,20 +269,18 @@ class MetabaseSource(Source[Entity]): continue card_detail_resp = self.req_get(f"/api/card/{card_details['id']}") if card_detail_resp.status_code == 200: - card = card_detail_resp.json() raw_query = ( card_details.get("dataset_query", {}) .get("native", {}) .get("query", "") ) - - query_info = { - "sql": raw_query, - "from_type": "table", - "to_type": "table", - "service_name": self.config.service_name, - } - ingest_lineage(query_info, self.metadata_config) + query_info = { + "sql": raw_query, + "from_type": "table", + "to_type": "table", + "service_name": self.config.serviceName, + } + ingest_lineage(query_info, self.metadata_config) except Exception as e: logger.error(repr(e)) diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index 56652523854..620bbc3004d 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -158,9 +158,12 @@ def get_dashboard_service_or_create( if service is not None: return service else: + dashboard_config = {"config": config} created_service = metadata.create_or_update( CreateDashboardServiceRequest( - name=service_name, serviceType=dashboard_service_type, connection=config + name=service_name, + serviceType=dashboard_service_type, + connection=dashboard_config, ) ) return created_service