Mirror of https://github.com/datahub-project/datahub.git (synced 2025-11-18 12:23:34 +00:00)
fix(redash): fix bug with names, add option for page size, debugging info (#5045)
commit 08a5fcfd01
parent 1a31f7888a
@@ -42,8 +42,6 @@ from datahub.utilities.sql_parser import SQLParser
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
 
-PAGE_SIZE = 25
-
 DEFAULT_DATA_SOURCE_PLATFORM = "external"
 DEFAULT_DATA_BASE_NAME = "default"
 
@@ -195,8 +193,9 @@ class BigqueryQualifiedNameParser(QualifiedNameParser):
     def get_full_qualified_name(self, database_name: str, table_name: str) -> str:
         self.get_segments(table_name)
         _database_name = self.segments_dict.get("database_name") or database_name
-        _schema_name = self.segments_dict.get("schema_name")
-        _table_name = self.segments_dict.get("table_name")
+        # We want to throw error if either of these is None
+        _schema_name = self.segments_dict["schema_name"]
+        _table_name = self.segments_dict["table_name"]
 
         return f"{_database_name}{self.split_char}{_schema_name}{self.split_char}{_table_name}"
 
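Note on the change above: with `.get()`, a BigQuery table reference missing its schema or table segment silently produced a qualified name containing the string "None"; direct indexing makes the parser fail fast with a KeyError instead. A minimal sketch of the difference, with made-up segment values:

    # Illustration only: why indexing is preferred over .get() for required segments.
    segments_dict = {"database_name": "my-project"}  # schema/table segments missing

    # Old behaviour: .get() returns None and the bad name is only noticed downstream.
    bad_name = f'{segments_dict.get("database_name")}.{segments_dict.get("schema_name")}.{segments_dict.get("table_name")}'
    print(bad_name)  # my-project.None.None

    # New behaviour: the missing segment raises immediately at parse time.
    try:
        f'{segments_dict["database_name"]}.{segments_dict["schema_name"]}.{segments_dict["table_name"]}'
    except KeyError as err:
        print(f"missing name segment: {err}")  # missing name segment: 'schema_name'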
@@ -247,9 +246,12 @@ class RedashConfig(ConfigModel):
     skip_draft: bool = Field(
         default=True, description="Only ingest published dashboards and charts."
     )
+    page_size: int = Field(
+        default=25, description="Limit on number of items to be queried at once."
+    )
     api_page_limit: int = Field(
         default=sys.maxsize,
-        description="Limit on ingested dashboards and charts API pagination.",
+        description="Limit on number of pages queried for ingesting dashboards and charts API during pagination.",
     )
     parse_table_names_from_sql: bool = Field(
         default=False, description="See note below."
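The new `page_size` field replaces the former module-level `PAGE_SIZE = 25` constant, so the batch size sent to the Redash API can now be tuned per recipe. A rough usage sketch, assuming the module lives at `datahub.ingestion.source.redash` and that `connect_uri`/`api_key` are the usual connection fields:

    from datahub.ingestion.source.redash import RedashConfig  # import path assumed

    config = RedashConfig.parse_obj(
        {
            "connect_uri": "http://localhost:5000",  # placeholder Redash URL
            "api_key": "REDASH_API_KEY",             # placeholder credential
            "page_size": 50,                         # override the default of 25
            "api_page_limit": 10,                    # stop after 10 pages per endpoint
        }
    )
    print(config.page_size, config.api_page_limit)  # 50 10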
@@ -270,6 +272,17 @@ class RedashSourceReport(SourceReport):
     items_scanned: int = 0
     filtered: List[str] = field(default_factory=list)
     queries_no_dataset: Set[str] = field(default_factory=set)
+    total_queries: Optional[int] = field(
+        default=None,
+    )
+    page_reached_queries: Optional[int] = field(default=None)
+    max_page_queries: Optional[int] = field(default=None)
+    total_dashboards: Optional[int] = field(
+        default=None,
+    )
+    page_reached_dashboards: Optional[int] = field(default=None)
+    max_page_dashboards: Optional[int] = field(default=None)
+    api_page_limit: Optional[float] = field(default=None)
 
     def report_item_scanned(self) -> None:
         self.items_scanned += 1
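These Optional fields are the "debugging info" part of the commit: the report now records the API's total counts, the computed maximum page, the last page actually reached, and the configured page limit, so a truncated run is visible in the ingestion summary. The toy dataclass below mimics how such fields surface (it is not the real SourceReport):

    from dataclasses import dataclass, field, asdict
    from typing import Optional

    @dataclass
    class ToyReport:
        total_dashboards: Optional[int] = field(default=None)
        max_page_dashboards: Optional[int] = field(default=None)
        page_reached_dashboards: Optional[int] = field(default=None)

    report = ToyReport()
    report.total_dashboards = 120
    report.max_page_dashboards = 5
    report.page_reached_dashboards = 3  # loop stopped early, e.g. the page limit was hit
    print(asdict(report))
    # {'total_dashboards': 120, 'max_page_dashboards': 5, 'page_reached_dashboards': 3}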
@@ -289,14 +302,12 @@ class RedashSource(Source):
     - Redash chart table lineages (disabled by default)
     """
 
-    config: RedashConfig
-    report: RedashSourceReport
     platform = "redash"
 
     def __init__(self, ctx: PipelineContext, config: RedashConfig):
         super().__init__(ctx)
-        self.config = config
-        self.report = RedashSourceReport()
+        self.config: RedashConfig = config
+        self.report: RedashSourceReport = RedashSourceReport()
 
         # Handle trailing slash removal
         self.config.connect_uri = self.config.connect_uri.strip("/")
@@ -543,19 +554,22 @@ class RedashSource(Source):
         skip_draft = self.config.skip_draft
 
         # Get total number of dashboards to calculate maximum page number
-        dashboards_response = self.client.dashboards(1, PAGE_SIZE)
+        dashboards_response = self.client.dashboards(1, self.config.page_size)
         total_dashboards = dashboards_response["count"]
-        max_page = total_dashboards // PAGE_SIZE
+        max_page = math.ceil(total_dashboards / self.config.page_size)
         logger.info(
             f"/api/dashboards total count {total_dashboards} and max page {max_page}"
         )
+        self.report.total_dashboards = total_dashboards
+        self.report.max_page_dashboards = max_page
 
         while (
             current_dashboards_page <= max_page
             and current_dashboards_page <= self.api_page_limit
         ):
+            self.report.page_reached_dashboards = current_dashboards_page
             dashboards_response = self.client.dashboards(
-                page=current_dashboards_page, page_size=PAGE_SIZE
+                page=current_dashboards_page, page_size=self.config.page_size
             )
 
             logger.info(
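The floor-division-to-`math.ceil` change above is the core pagination fix: with `//`, the final partially filled page was skipped whenever the total count was not an exact multiple of the page size (and `max_page` came out as 0 when there were fewer items than one page, so nothing was ingested at all). A quick check of the arithmetic, with made-up counts:

    import math

    page_size = 25
    total_dashboards = 51  # three pages of results: 25 + 25 + 1

    old_max_page = total_dashboards // page_size             # 2 -> page 3 never fetched
    new_max_page = math.ceil(total_dashboards / page_size)   # 3 -> every page covered
    print(old_max_page, new_max_page)  # 2 3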
@@ -679,21 +693,24 @@ class RedashSource(Source):
         return chart_snapshot
 
     def _emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
-        current_queries_page = 1
+        current_queries_page: int = 1
         skip_draft = self.config.skip_draft
 
         # Get total number of queries to calculate maximum page number
-        _queries_response = self.client.queries(1, PAGE_SIZE)
+        _queries_response = self.client.queries(1, self.config.page_size)
         total_queries = _queries_response["count"]
-        max_page = total_queries // PAGE_SIZE
+        max_page = math.ceil(total_queries / self.config.page_size)
         logger.info(f"/api/queries total count {total_queries} and max page {max_page}")
+        self.report.total_queries = total_queries
+        self.report.max_page_queries = max_page
 
         while (
             current_queries_page <= max_page
             and current_queries_page <= self.api_page_limit
         ):
+            self.report.page_reached_queries = current_queries_page
             queries_response = self.client.queries(
-                page=current_queries_page, page_size=PAGE_SIZE
+                page=current_queries_page, page_size=self.config.page_size
             )
 
             logger.info(f"/api/queries on page {current_queries_page} / {max_page}")
@@ -725,8 +742,12 @@ class RedashSource(Source):
 
             yield wu
 
+    def add_config_to_report(self) -> None:
+        self.report.api_page_limit = self.config.api_page_limit
+
     def get_workunits(self) -> Iterable[MetadataWorkUnit]:
         self.test_connection()
+        self.add_config_to_report()
         yield from self._emit_chart_mces()
         yield from self._emit_dashboard_mces()
 
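`add_config_to_report` copies the configured `api_page_limit` into the report before anything is fetched, so the run summary shows the cap next to the pages actually reached. The toy loop below (not the source's code) shows what the paired `max_page_*` / `page_reached_*` fields record when that cap truncates pagination:

    # Toy illustration of how api_page_limit caps the pagination loops above.
    api_page_limit = 3   # from RedashConfig; defaults to sys.maxsize, i.e. effectively no cap
    max_page = 10        # computed from the API's reported total count
    page = 1
    pages_fetched = []
    while page <= max_page and page <= api_page_limit:
        pages_fetched.append(page)  # stands in for client.dashboards(...) / client.queries(...)
        page += 1
    print(pages_fetched)  # [1, 2, 3] -> report: page_reached_* = 3, max_page_* = 10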