feat(ingest): adding superset ingestion source (#2425)

Gabe Lyons 2021-04-22 00:11:54 -07:00, committed by GitHub
parent 3d6489fe97
commit c7b49de67b
7 changed files with 302 additions and 2 deletions


@@ -1,4 +1,5 @@
import lookerLogo from '../../../images/lookerlogo.png';
import supersetLogo from '../../../images/supersetlogo.png';
/**
* TODO: This is a temporary solution, until the backend can push logos for all data platform types.
@@ -7,5 +8,8 @@ export function getLogoFromPlatform(platform: string) {
if (platform.toLowerCase() === 'looker') {
return lookerLogo;
}
if (platform.toLowerCase() === 'superset') {
return supersetLogo;
}
return undefined;
}

Binary file not shown (supersetlogo.png, 4.1 KiB, added).


@@ -135,7 +135,7 @@
"type" : "enum",
"name" : "ChartType",
"doc" : "The various types of charts",
"symbols" : [ "BAR", "PIE", "SCATTER", "TABLE", "TEXT" ],
"symbols" : [ "BAR", "PIE", "SCATTER", "TABLE", "TEXT", "LINE", "AREA", "HISTOGRAM", "BOX_PLOT" ],
"symbolDocs" : {
"BAR" : "Chart showing a Bar chart",
"PIE" : "Chart showing a Pie chart",


@@ -0,0 +1,12 @@
source:
type: "superset"
config:
username: admin
password: admin
provider: db
connect_uri: http://localhost:8088/
sink:
type: "datahub-rest"
config:
server: 'http://localhost:8080'
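
The recipe can equivalently be run from Python. A minimal sketch, assuming the Pipeline helper in datahub.ingestion.run.pipeline; the credentials and endpoints simply mirror the YAML above and are illustrative:

from datahub.ingestion.run.pipeline import Pipeline

# same source/sink settings as the recipe above
pipeline = Pipeline.create(
    {
        "source": {
            "type": "superset",
            "config": {
                "username": "admin",
                "password": "admin",
                "provider": "db",
                "connect_uri": "http://localhost:8088/",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()  # pulls dashboards/charts from Superset and pushes MCEs to DataHub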


@@ -73,6 +73,7 @@ plugins: Dict[str, Set[str]] = {
"ldap": {"python-ldap>=2.4"},
"druid": sql_common | {"pydruid>=0.6.2"},
"mongodb": {"pymongo>=3.11"},
"superset": {"requests"},
"glue": {"boto3"},
}
@@ -180,6 +181,7 @@ setuptools.setup(
"oracle = datahub.ingestion.source.oracle:OracleSource",
"postgres = datahub.ingestion.source.postgres:PostgresSource",
"snowflake = datahub.ingestion.source.snowflake:SnowflakeSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",


@@ -0,0 +1,274 @@
import json
from functools import lru_cache
from typing import Iterable, Optional
import dateutil.parser as dp
import requests
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
ChartSnapshot,
DashboardSnapshot,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import ChartInfoClass, DashboardInfoClass

# number of results to request per Superset API call
PAGE_SIZE = 25


class SupersetConfig(ConfigModel):
# See the Superset /security/login endpoint for details
# https://superset.apache.org/docs/rest-api
connect_uri: str = "localhost:8088"
username: Optional[str] = None
password: Optional[str] = None
provider: str = "db"
options: dict = {}


def get_platform_from_sqlalchemy_uri(sqlalchemy_uri: str) -> str:
    # map a SQLAlchemy/JDBC URI prefix onto a DataHub data platform name
if sqlalchemy_uri.startswith("bigquery"):
return "bigquery"
if sqlalchemy_uri.startswith("druid"):
return "druid"
if sqlalchemy_uri.startswith("mssql"):
return "mssql"
    if sqlalchemy_uri.startswith("jdbc:postgres:") and "redshift.amazonaws" in sqlalchemy_uri:
        # use `in` rather than str.index, which raises ValueError when the substring is absent
        return "redshift"
if sqlalchemy_uri.startswith("snowflake"):
return "snowflake"
if sqlalchemy_uri.startswith("presto"):
return "presto"
if sqlalchemy_uri.startswith("postgresql"):
return "postgres"
if sqlalchemy_uri.startswith("pinot"):
return "pinot"
if sqlalchemy_uri.startswith("oracle"):
return "oracle"
if sqlalchemy_uri.startswith("mysql"):
return "mysql"
if sqlalchemy_uri.startswith("mongodb"):
return "mongo"
if sqlalchemy_uri.startswith("hive"):
return "hive"
return "external"


class SupersetSource(Source):
config: SupersetConfig
report: SourceReport
env = "PROD"
platform = "superset"

    def __hash__(self):
        # several methods below use @lru_cache, which requires a hashable self;
        # identity-based hashing is sufficient for a long-lived source instance
        return id(self)

    def __init__(self, ctx: PipelineContext, config: SupersetConfig):
super().__init__(ctx)
self.config = config
self.report = SourceReport()

        # log in to Superset and obtain a short-lived access token
        login_response = requests.post(
            f"{self.config.connect_uri}/api/v1/security/login",
            json={
                "username": self.config.username,
                "password": self.config.password,
                "refresh": True,
                "provider": self.config.provider,
            },
        )
self.access_token = login_response.json()["access_token"]
self.session = requests.Session()
self.session.headers.update(
{
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"Accept": "*/*",
}
)
# Test the connection
test_response = self.session.get(f"{self.config.connect_uri}/api/v1/database")
if test_response.status_code == 200:
pass
# TODO(Gabe): how should we message about this error?

    @classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = SupersetConfig.parse_obj(config_dict)
return cls(ctx, config)

    @lru_cache(maxsize=None)
    def get_platform_from_database_id(self, database_id):
        # resolve a Superset database id to a platform name via its SQLAlchemy URI;
        # cached so repeated charts against the same database hit the API only once
database_response = self.session.get(
f"{self.config.connect_uri}/api/v1/database/{database_id}"
).json()
sqlalchemy_uri = database_response.get("result", {}).get("sqlalchemy_uri")
return get_platform_from_sqlalchemy_uri(sqlalchemy_uri)

    @lru_cache(maxsize=None)
def get_datasource_urn_from_id(self, datasource_id):
dataset_response = self.session.get(
f"{self.config.connect_uri}/api/v1/dataset/{datasource_id}"
).json()
schema_name = dataset_response.get("result", {}).get("schema")
table_name = dataset_response.get("result", {}).get("table_name")
database_id = dataset_response.get("result", {}).get("database", {}).get("id")
database_name = (
dataset_response.get("result", {}).get("database", {}).get("database_name")
)
if database_id and table_name:
platform = self.get_platform_from_database_id(database_id)
platform_urn = f"urn:li:dataPlatform:{platform}"
dataset_urn = (
f"urn:li:dataset:("
f"{platform_urn},{database_name + '.' if database_name else ''}"
f"{schema_name + '.' if schema_name else ''}"
f"{table_name},{self.env})"
)
return dataset_urn
return None
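
    # Illustrative (editorial, not part of the committed file): for a hypothetical
    # Postgres dataset with database "examples", schema "public", and table "users",
    # the URN built above is:
    #   urn:li:dataset:(urn:li:dataPlatform:postgres,examples.public.users,PROD)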

    def construct_dashboard_from_api_data(self, dashboard_data):
dashboard_urn = f"urn:li:dashboard:({self.platform},{dashboard_data['id']})"
dashboard_snapshot = DashboardSnapshot(
urn=dashboard_urn,
aspects=[],
)
modified_actor = f"urn:li:corpuser:{(dashboard_data.get('changed_by') or {}).get('username', 'unknown')}"
modified_ts = int(
dp.parse(dashboard_data.get("changed_on_utc", "now")).timestamp()
)
title = dashboard_data.get("dashboard_title", "")
# note: the API does not currently supply created_by usernames due to a bug, but we are required to
# provide a created AuditStamp to comply with ChangeAuditStamp model. For now, I sub in the last
# modified actor urn
last_modified = ChangeAuditStamps(
created=AuditStamp(time=modified_ts, actor=modified_actor),
lastModified=AuditStamp(time=modified_ts, actor=modified_actor),
)
        # dashboard URLs from the API are server-relative, so strip any trailing slash
        dashboard_url = f"{self.config.connect_uri.rstrip('/')}{dashboard_data.get('url', '')}"
chart_urns = []
raw_position_data = dashboard_data.get("position_json", "{}")
position_data = json.loads(raw_position_data)
for key, value in position_data.items():
if not key.startswith("CHART-"):
continue
chart_urns.append(
f"urn:li:chart:({self.platform},{value.get('meta', {}).get('chartId', 'unknown')})"
)
dashboard_info = DashboardInfoClass(
description="",
title=title,
charts=chart_urns,
lastModified=last_modified,
dashboardUrl=dashboard_url,
)
dashboard_snapshot.aspects.append(dashboard_info)
return dashboard_snapshot
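
    # Illustrative (editorial, not part of the committed file): given position_json of
    #   {"CHART-abc123": {"meta": {"chartId": 101}}, "HEADER-1": {}}
    # the loop above collects chart_urns == ["urn:li:chart:(superset,101)"].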

    def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
current_dashboard_page = 0
# we will set total dashboards to the actual number after we get the response
total_dashboards = PAGE_SIZE
while current_dashboard_page * PAGE_SIZE <= total_dashboards:
dashboard_response = self.session.get(
f"{self.config.connect_uri}/api/v1/dashboard",
params=f"q=(page:{current_dashboard_page},page_size:{PAGE_SIZE})",
)
            payload = dashboard_response.json()
            total_dashboards = payload.get("count") or 0
            current_dashboard_page += 1
for dashboard_data in payload["result"]:
dashboard_snapshot = self.construct_dashboard_from_api_data(
dashboard_data
)
mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot)
wu = MetadataWorkUnit(id=dashboard_snapshot.urn, mce=mce)
self.report.report_workunit(wu)
yield wu
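
    # Editorial note: Superset's list endpoints take Rison-encoded query params, so the
    # request above is effectively GET /api/v1/dashboard?q=(page:0,page_size:25); the
    # loop ends once page * PAGE_SIZE passes the server-reported count.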

    def construct_chart_from_chart_data(self, chart_data):
chart_urn = f"urn:li:chart:({self.platform},{chart_data['id']})"
chart_snapshot = ChartSnapshot(
urn=chart_urn,
aspects=[],
)
modified_actor = f"urn:li:corpuser:{(chart_data.get('changed_by') or {}).get('username', 'unknown')}"
modified_ts = int(dp.parse(chart_data.get("changed_on_utc", "now")).timestamp())
title = chart_data.get("slice_name", "")
# note: the API does not currently supply created_by usernames due to a bug, but we are required to
# provide a created AuditStamp to comply with ChangeAuditStamp model. For now, I sub in the last
# modified actor urn
last_modified = ChangeAuditStamps(
created=AuditStamp(time=modified_ts, actor=modified_actor),
lastModified=AuditStamp(time=modified_ts, actor=modified_actor),
)
        chart_url = f"{self.config.connect_uri.rstrip('/')}{chart_data.get('url', '')}"
datasource_id = chart_data.get("datasource_id")
datasource_urn = self.get_datasource_urn_from_id(datasource_id)
chart_info = ChartInfoClass(
description="",
title=title,
lastModified=last_modified,
chartUrl=chart_url,
inputs=[datasource_urn] if datasource_urn else None,
)
chart_snapshot.aspects.append(chart_info)
return chart_snapshot

    def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
current_chart_page = 0
# we will set total charts to the actual number after we get the response
total_charts = PAGE_SIZE
while current_chart_page * PAGE_SIZE <= total_charts:
chart_response = self.session.get(
f"{self.config.connect_uri}/api/v1/chart",
params=f"q=(page:{current_chart_page},page_size:{PAGE_SIZE})",
)
current_chart_page += 1
            payload = chart_response.json()
            total_charts = payload.get("count") or 0
for chart_data in payload["result"]:
chart_snapshot = self.construct_chart_from_chart_data(chart_data)
mce = MetadataChangeEvent(proposedSnapshot=chart_snapshot)
wu = MetadataWorkUnit(id=chart_snapshot.urn, mce=mce)
self.report.report_workunit(wu)
yield wu

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
yield from self.emit_dashboard_mces()
yield from self.emit_chart_mces()

    def get_report(self) -> SourceReport:
return self.report
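
A minimal end-to-end sketch of driving the source directly (editorial; assumes PipelineContext takes a run_id and that the Superset instance from the recipe above is reachable):

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.superset import SupersetSource

source = SupersetSource.create(
    {
        "username": "admin",
        "password": "admin",
        "provider": "db",
        "connect_uri": "http://localhost:8088/",
    },
    PipelineContext(run_id="superset-demo"),
)
for wu in source.get_workunits():
    print(wu.id)  # one work unit per dashboard/chart snapshot
print(source.get_report())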


@@ -29,4 +29,12 @@ enum ChartType {
* Chart showing Markdown formatted text
*/
TEXT
-}
+  LINE
+  AREA
+  HISTOGRAM
+  BOX_PLOT
+}