Refactoring Connectors - Metabase, MariaDB, Looker (#3977)

* Metabase Fix

* Looker, Metabase and Mariadb

* modified metabase
This commit is contained in:
Ayush Shah 2022-04-09 04:26:12 -07:00 committed by GitHub
parent d5758b2fb2
commit 3840eb77c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 134 additions and 109 deletions

View File

@ -28,20 +28,11 @@
"type": "string", "type": "string",
"format": "password" "format": "password"
}, },
"url": { "hostPort": {
"description": "URL to Looker instance.", "description": "URL to Looker instance.",
"type": "string", "type": "string",
"format": "uri" "format": "uri"
}, },
"platformName": {
"description": "Looker Platform Name",
"type": "string",
"default": "looker"
},
"actor": {
"description": "Looker actor.",
"type": "string"
},
"env": { "env": {
"description": "Looker Environment", "description": "Looker Environment",
"type": "string" "type": "string"

View File

@ -32,6 +32,10 @@
"description": "Host and Port of Metabase instance.", "description": "Host and Port of Metabase instance.",
"type": "string" "type": "string"
}, },
"dbServiceName": {
"description": "Database Service Name for creation of lineage",
"type": "string"
},
"supportedPipelineTypes": { "supportedPipelineTypes": {
"description": "Supported Metadata Extraction Pipelines.", "description": "Supported Metadata Extraction Pipelines.",
"type": "string", "type": "string",

View File

@ -1,23 +1,31 @@
{ {
"source": { "source": {
"type": "looker", "type": "looker",
"config": { "serviceName": "local_looker",
"username": "username", "serviceConnection": {
"password": "password", "config": {
"url": "http://localhost", "type": "Looker",
"service_name": "looker", "username": "username",
"service_type": "Looker", "password": "password",
"hostPort": "hostPort",
"env": "env"
}
},
"sourceConfig": {
"config": {
"dashboardFilterPattern": {},
"chartFilterPattern": {}
}
} }
}, },
"sink": { "sink": {
"type": "metadata-rest", "type": "metadata-rest",
"config": {} "config": {}
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@ -1,25 +1,25 @@
{ {
"source": { "source": {
"type": "mariadb", "type": "mariadb",
"config": { "serviceName": "local_mariadb",
"username": "openmetadata_user", "serviceConnection": {
"password": "openmetadata_password", "config": {
"database": "openmetadata_db", "type": "MariaDB",
"service_name": "local_mariadb", "username": "openmetadata_user",
"schema_filter_pattern": { "password": "openmetadata_password",
"excludes": ["information_schema.*", "performance_schema.*", "sys.*"] "hostPort": "localhost:3306"
} }
} },
"sourceConfig": {"config": {"enableDataProfiler": false}}
}, },
"sink": { "sink": {
"type": "metadata-rest", "type": "metadata-rest",
"config": {} "config": {}
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@ -1,23 +1,31 @@
{ {
"source": { "source": {
"type": "metabase", "type": "metabase",
"config": { "serviceName": "test",
"username": "username", "serviceConnection": {
"password": "password", "config": {
"host_port": "host:port", "type": "Metabase",
"service_name": "local_metabase", "username": "username",
"db_service_name": "Optional - Create Lineage by adding relevant Database Service Name" "password": "password",
"hostPort": "hostPort",
"dbServiceName": "Database Service Name to create Lineage"
}
},
"sourceConfig": {
"config": {
"dashboardFilterPattern": {}
, "chartFilterPattern": {}
}
} }
}, },
"sink": { "sink": {
"type": "metadata-rest", "type": "metadata-rest",
"config": {} "config": {}
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@ -22,6 +22,9 @@ from metadata.generated.schema.entity.services.connections.dashboard import (
from metadata.generated.schema.entity.services.dashboardService import ( from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType, DashboardServiceType,
) )
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
DashboardServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
@ -75,7 +78,7 @@ class LookerSource(Source[Entity]):
"LOOKERSDK_CLIENT_SECRET" "LOOKERSDK_CLIENT_SECRET"
] = self.service_connection.password.get_secret_value() ] = self.service_connection.password.get_secret_value()
if not self.check_env("LOOKERSDK_BASE_URL"): if not self.check_env("LOOKERSDK_BASE_URL"):
os.environ["LOOKERSDK_BASE_URL"] = self.service_connection.url os.environ["LOOKERSDK_BASE_URL"] = self.service_connection.hostPort
client = looker_sdk.init31() client = looker_sdk.init31()
client.me() client.me()
return client return client
@ -110,7 +113,7 @@ class LookerSource(Source[Entity]):
displayName=dashboard_elements.title or "", displayName=dashboard_elements.title or "",
description="", description="",
chart_type=dashboard_elements.type, chart_type=dashboard_elements.type,
url=f"{self.service_connection.url}/dashboard_elements/{dashboard_elements.id}", url=f"{self.service_connection.hostPort}/dashboard_elements/{dashboard_elements.id}",
service=EntityReference(id=self.service.id, type="dashboardService"), service=EntityReference(id=self.service.id, type="dashboardService"),
) )
if not dashboard_elements.id: if not dashboard_elements.id:
@ -153,7 +156,7 @@ class LookerSource(Source[Entity]):
displayName=dashboard.title, displayName=dashboard.title,
description=dashboard.description or "", description=dashboard.description or "",
charts=self.chart_names, charts=self.chart_names,
url=f"{self.service_connection.url}/dashboards/{dashboard.id}", url=f"{self.service_connection.hostPort}/dashboards/{dashboard.id}",
service=EntityReference( service=EntityReference(
id=self.service.id, type="dashboardService" id=self.service.id, type="dashboardService"
), ),

View File

@ -14,20 +14,25 @@ from metadata.generated.schema.entity.services.connections.database.mariaDBConne
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.sql_source import SQLSource from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class MariadbConfig(MariaDBConnection, SQLConnectionConfig):
def get_connection_url(self):
return super().get_connection_url()
class MariadbSource(SQLSource): class MariadbSource(SQLSource):
def __init__(self, config, metadata_config): def __init__(self, config, metadata_config):
super().__init__(config, metadata_config) super().__init__(config, metadata_config)
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
config = MariadbConfig.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: MariaDBConnection = config.serviceConnection.__root__.config
if not isinstance(connection, MariaDBConnection):
raise InvalidSourceException(
f"Expected MariaDBConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)

View File

@ -14,29 +14,37 @@ import json
import logging import logging
import traceback import traceback
import uuid import uuid
from typing import Iterable, Optional from typing import Iterable
from urllib.parse import quote from urllib.parse import quote
import requests import requests
from pydantic import SecretStr
from sqllineage.runner import LineageRunner
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.dashboard import Dashboard as Model_Dashboard from metadata.generated.schema.entity.data.dashboard import Dashboard as Model_Dashboard
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import (
MetabaseConnection,
)
from metadata.generated.schema.entity.services.dashboardService import ( from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType, DashboardServiceType,
) )
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
DashboardServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import ConfigModel, Entity, IncludeFilterPattern from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.sql_source import SQLSourceStatus from metadata.ingestion.source.sql_source import SQLSourceStatus
from metadata.utils.filters import filter_by_chart, filter_by_dashboard
from metadata.utils.helpers import get_dashboard_service_or_create, ingest_lineage from metadata.utils.helpers import get_dashboard_service_or_create, ingest_lineage
HEADERS = {"Content-Type": "application/json", "Accept": "*/*"} HEADERS = {"Content-Type": "application/json", "Accept": "*/*"}
@ -44,22 +52,6 @@ HEADERS = {"Content-Type": "application/json", "Accept": "*/*"}
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
class MetabaseSourceConfig(ConfigModel):
"""Metabase pydantic config model"""
username: str
password: SecretStr
host_port: str
dashboard_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
chart_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
service_name: str
service_type: str = DashboardServiceType.Metabase.value
db_service_name: Optional[str] = None
def get_connection_url(self):
"""get connection url (not implemented)"""
class MetabaseSource(Source[Entity]): class MetabaseSource(Source[Entity]):
"""Metabase entity class """Metabase entity class
@ -76,25 +68,27 @@ class MetabaseSource(Source[Entity]):
metric_charts: metric_charts:
""" """
config: MetabaseSourceConfig config: WorkflowSource
metadata_config: OpenMetadataServerConfig metadata_config: OpenMetadataServerConfig
status: SQLSourceStatus status: SQLSourceStatus
def __init__( def __init__(
self, self,
config: MetabaseSourceConfig, config: WorkflowSource,
metadata_config: OpenMetadataServerConfig, metadata_config: OpenMetadataServerConfig,
): ):
super().__init__() super().__init__()
self.config = config self.config = config
self.service_connection = self.config.serviceConnection.__root__.config
self.source_config = self.config.sourceConfig.config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.status = SQLSourceStatus() self.status = SQLSourceStatus()
params = {} params = dict()
params["username"] = self.config.username params["username"] = self.service_connection.username
params["password"] = self.config.password.get_secret_value() params["password"] = self.service_connection.password.get_secret_value()
try: try:
resp = requests.post( resp = requests.post(
self.config.host_port + "/api/session/", self.service_connection.hostPort + "/api/session/",
data=json.dumps(params), data=json.dumps(params),
headers=HEADERS, headers=HEADERS,
) )
@ -103,11 +97,9 @@ class MetabaseSource(Source[Entity]):
session_id = resp.json()["id"] session_id = resp.json()["id"]
self.metabase_session = {"X-Metabase-Session": session_id} self.metabase_session = {"X-Metabase-Session": session_id}
self.dashboard_service = get_dashboard_service_or_create( self.dashboard_service = get_dashboard_service_or_create(
config.service_name, config.serviceName,
DashboardServiceType.Metabase.name, DashboardServiceType.Metabase.name,
config.username, self.service_connection.dict(),
config.password.get_secret_value(),
config.host_port,
metadata_config, metadata_config,
) )
self.charts = [] self.charts = []
@ -123,7 +115,12 @@ class MetabaseSource(Source[Entity]):
Returns: Returns:
MetabaseSource MetabaseSource
""" """
config = MetabaseSourceConfig.parse_obj(config_dict) config = WorkflowSource.parse_obj(config_dict)
connection: MetabaseConnection = config.serviceConnection.__root__.config
if not isinstance(connection, MetabaseConnection):
raise InvalidSourceException(
f"Expected MetabaseConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def next_record(self) -> Iterable[Entity]: def next_record(self) -> Iterable[Entity]:
@ -141,10 +138,14 @@ class MetabaseSource(Source[Entity]):
for chart in charts: for chart in charts:
try: try:
chart_details = chart["card"] chart_details = chart["card"]
if not "name" in chart_details: if not ("name" in chart_details):
continue continue
if not self.config.chart_pattern.included(chart_details["name"]): if filter_by_chart(
self.status.filter(chart_details["name"], None) self.source_config.chartFilterPattern, chart_details["name"]
):
self.status.filter(
chart_details["name"], "Chart Pattern not allowed"
)
continue continue
yield Chart( yield Chart(
id=uuid.uuid4(), id=uuid.uuid4(),
@ -154,7 +155,7 @@ class MetabaseSource(Source[Entity]):
if chart_details["description"] is not None if chart_details["description"] is not None
else "", else "",
chart_type=str(chart_details["display"]), chart_type=str(chart_details["display"]),
url=self.config.host_port, url=self.service_connection.hostPort,
service=EntityReference( service=EntityReference(
id=self.dashboard_service.id, type="dashboardService" id=self.dashboard_service.id, type="dashboardService"
), ),
@ -174,16 +175,18 @@ class MetabaseSource(Source[Entity]):
resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}") resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}")
dashboard_details = resp_dashboard.json() dashboard_details = resp_dashboard.json()
self.charts = [] self.charts = []
if not self.config.dashboard_pattern.included( if filter_by_dashboard(
dashboard_details["name"] self.source_config.dashboardFilterPattern, dashboard_details["name"]
): ):
self.status.filter(dashboard_details["name"], None) self.status.filter(
dashboard_details["name"], "Dashboard Pattern not Allowed"
)
continue continue
yield from self.get_charts(dashboard_details["ordered_cards"]) yield from self.get_charts(dashboard_details["ordered_cards"])
yield Dashboard( yield Dashboard(
id=uuid.uuid4(), id=uuid.uuid4(),
name=dashboard_details["name"], name=dashboard_details["name"],
url=self.config.host_port, url=self.service_connection.hostPort,
displayName=dashboard_details["name"], displayName=dashboard_details["name"],
description=dashboard_details["description"] description=dashboard_details["description"]
if dashboard_details["description"] is not None if dashboard_details["description"] is not None
@ -193,7 +196,7 @@ class MetabaseSource(Source[Entity]):
id=self.dashboard_service.id, type="dashboardService" id=self.dashboard_service.id, type="dashboardService"
), ),
) )
if self.config.db_service_name: if self.service_connection.dbServiceName:
yield from self.get_lineage( yield from self.get_lineage(
dashboard_details["ordered_cards"], dashboard_details["name"] dashboard_details["ordered_cards"], dashboard_details["name"]
) )
@ -214,7 +217,7 @@ class MetabaseSource(Source[Entity]):
resp_tables = self.req_get(f"/api/table/{chart_details['table_id']}") resp_tables = self.req_get(f"/api/table/{chart_details['table_id']}")
if resp_tables.status_code == 200: if resp_tables.status_code == 200:
table = resp_tables.json() table = resp_tables.json()
table_fqdn = f"{self.config.db_service_name}.\ table_fqdn = f"{self.service_connection.dbServiceName}.\
{table['schema']}.{table['name']}" {table['schema']}.{table['name']}"
dashboard_fqdn = ( dashboard_fqdn = (
f"{self.dashboard_service.name}.{quote(dashboard_name)}" f"{self.dashboard_service.name}.{quote(dashboard_name)}"
@ -245,7 +248,9 @@ class MetabaseSource(Source[Entity]):
Args: Args:
path: path:
""" """
return requests.get(self.config.host_port + path, headers=self.metabase_session) return requests.get(
self.service_connection.hostPort + path, headers=self.metabase_session
)
def get_status(self) -> SourceStatus: def get_status(self) -> SourceStatus:
return self.status return self.status
@ -264,20 +269,18 @@ class MetabaseSource(Source[Entity]):
continue continue
card_detail_resp = self.req_get(f"/api/card/{card_details['id']}") card_detail_resp = self.req_get(f"/api/card/{card_details['id']}")
if card_detail_resp.status_code == 200: if card_detail_resp.status_code == 200:
card = card_detail_resp.json()
raw_query = ( raw_query = (
card_details.get("dataset_query", {}) card_details.get("dataset_query", {})
.get("native", {}) .get("native", {})
.get("query", "") .get("query", "")
) )
query_info = {
query_info = { "sql": raw_query,
"sql": raw_query, "from_type": "table",
"from_type": "table", "to_type": "table",
"to_type": "table", "service_name": self.config.serviceName,
"service_name": self.config.service_name, }
} ingest_lineage(query_info, self.metadata_config)
ingest_lineage(query_info, self.metadata_config)
except Exception as e: except Exception as e:
logger.error(repr(e)) logger.error(repr(e))

View File

@ -158,9 +158,12 @@ def get_dashboard_service_or_create(
if service is not None: if service is not None:
return service return service
else: else:
dashboard_config = {"config": config}
created_service = metadata.create_or_update( created_service = metadata.create_or_update(
CreateDashboardServiceRequest( CreateDashboardServiceRequest(
name=service_name, serviceType=dashboard_service_type, connection=config name=service_name,
serviceType=dashboard_service_type,
connection=dashboard_config,
) )
) )
return created_service return created_service