feat(tableau): use pagination for all connection queries (#5204)
parent baf3f3f33c
commit 022ef2f17c
@@ -44,7 +44,7 @@ Workbooks from Tableau are ingested as Container in datahub. <br/>
 - GraphQL query <br/>
 ```graphql
 {
-  workbooksConnection(first: 15, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
+  workbooksConnection(first: 10, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
     nodes {
       id
       name
@@ -73,7 +73,7 @@ Dashboards from Tableau are ingested as Dashboard in datahub. <br/>
 - GraphQL query <br/>
 ```graphql
 {
-  workbooksConnection(first: 15, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
+  workbooksConnection(first: 10, offset: 0, filter: {projectNameWithin: ["default", "Project 2"]}) {
     nodes {
       .....
       dashboards {
@@ -185,7 +185,7 @@ Embedded Data source from Tableau is ingested as a Dataset in datahub.
 - GraphQL query <br/>
 ```graphql
 {
-  workbooksConnection(first: 15, offset: 0, filter: {projectNameWithin: ["default"]}) {
+  workbooksConnection(first: 10, offset: 0, filter: {projectNameWithin: ["default"]}) {
     nodes {
       ....
       embeddedDatasources {
@@ -265,7 +265,7 @@ Published Data source from Tableau is ingested as a Dataset in datahub.
 - GraphQL query <br/>
 ```graphql
 {
-  publishedDatasourcesConnection(filter: {idWithin: ["00cce29f-b561-bb41-3557-8e19660bb5dd", "618c87db-5959-338b-bcc7-6f5f4cc0b6c6"]}) {
+  publishedDatasourcesConnection(first: 10, offset: 0, filter: {idWithin: ["00cce29f-b561-bb41-3557-8e19660bb5dd", "618c87db-5959-338b-bcc7-6f5f4cc0b6c6"]}) {
     nodes {
       __typename
       id
@@ -343,7 +343,7 @@ For custom sql data sources, the query is viewable in UI under View Definition tab
 - GraphQL query <br/>
 ```graphql
 {
-  customSQLTablesConnection(filter: {idWithin: ["22b0b4c3-6b85-713d-a161-5a87fdd78f40"]}) {
+  customSQLTablesConnection(first: 10, offset: 0, filter: {idWithin: ["22b0b4c3-6b85-713d-a161-5a87fdd78f40"]}) {
     nodes {
       id
       name
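All of the documented queries above now carry explicit `first` (page size) and `offset` arguments. As a rough sketch of how such a paginated connection query can be assembled (illustrative only; the helper name and the trimmed field selection are not from this commit):

```python
import json
from typing import List


def workbooks_connection_query(first: int, offset: int, projects: List[str]) -> str:
    """Build a paginated workbooksConnection query (field selection trimmed)."""
    project_filter = json.dumps(projects)
    return f"""{{
  workbooksConnection(first: {first}, offset: {offset}, filter: {{projectNameWithin: {project_filter}}}) {{
    nodes {{
      id
      name
    }}
    pageInfo {{
      hasNextPage
    }}
    totalCount
  }}
}}"""


# workbooks_connection_query(10, 0, ["default", "Project 2"]) reproduces the
# first page of the documented query.
```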
@@ -408,8 +408,8 @@ Lineage is emitted as received from Tableau's metadata API for
 
 ## Troubleshooting
 
-### Why are only some workbooks ingested from the specified project?
+### Why are only some workbooks/custom SQLs/published datasources ingested from the specified project?
 
 This may happen when the Tableau API returns a NODE_LIMIT_EXCEEDED error for a metadata query and sends back partial results with the message "Showing partial results. The request exceeded the ‘n’ node limit. Use pagination, additional filtering, or both in the query to adjust results." To resolve this, consider
-- reducing the page size using the `workbooks_page_size` config param in datahub recipe (Defaults to 10).
+- reducing the page size using the `page_size` config param in datahub recipe (Defaults to 10).
 - increasing tableau configuration [metadata query node limit](https://help.tableau.com/current/server/en-us/cli_configuration-set_tsm.htm#metadata_nodelimit) to a higher value.
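On the recipe side, the fix amounts to lowering `page_size`. A minimal sketch of the relevant `source.config` fragment, written as a Python dict; `page_size` is the real knob here, and the other entries are placeholder values:

```python
# Hedged sketch of a recipe's source.config section as a dict;
# only page_size is the point of this example.
tableau_source_config = {
    "connect_uri": "https://tableau.example.com",  # placeholder server URL
    "page_size": 5,  # below the default of 10, to stay under the node limit
}
```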
@@ -5,7 +5,7 @@ from functools import lru_cache
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 
 import dateutil.parser as dp
-from pydantic import validator
+from pydantic import root_validator, validator
 from pydantic.fields import Field
 from tableauserverclient import (
     PersonalAccessTokenAuth,
@@ -132,10 +132,16 @@ class TableauConfig(ConfigModel):
         description="Ingest details for tables external to (not embedded in) tableau as entities.",
     )
 
-    workbooks_page_size: int = Field(
-        default=10,
-        description="Number of workbooks to query at a time using Tableau api.",
+    workbooks_page_size: Optional[int] = Field(
+        default=None,
+        description="@deprecated(use page_size instead) Number of workbooks to query at a time using Tableau api.",
+    )
+
+    page_size: int = Field(
+        default=10,
+        description="Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using Tableau api.",
     )
 
     env: str = Field(
         default=builder.DEFAULT_ENV,
         description="Environment to use in namespace when constructing URNs.",
@@ -145,6 +151,17 @@
     def remove_trailing_slash(cls, v):
         return config_clean.remove_trailing_slashes(v)
 
+    @root_validator()
+    def show_warning_for_deprecated_config_field(
+        cls, values: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        if values.get("workbooks_page_size") is not None:
+            logger.warn(
+                "Config workbooks_page_size is deprecated. Please use config page_size instead."
+            )
+
+        return values
+
 
 class WorkbookKey(PlatformKey):
     workbook_id: str
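The new validator relies on pydantic v1's `root_validator`. A standalone sketch of the same deprecation-warning pattern, assuming pydantic v1; the field names and defaults mirror the diff, but the class name and logger setup are illustrative:

```python
import logging
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field, root_validator

logger = logging.getLogger(__name__)


class PagingConfig(BaseModel):
    # Deprecated field keeps its old name but becomes optional;
    # the new field carries the default page size.
    workbooks_page_size: Optional[int] = Field(default=None)
    page_size: int = Field(default=10)

    @root_validator()
    def warn_on_deprecated_field(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        # Warn only when the deprecated field was explicitly set.
        if values.get("workbooks_page_size") is not None:
            logger.warning(
                "workbooks_page_size is deprecated; use page_size instead."
            )
        return values


# PagingConfig(workbooks_page_size=5) logs the warning; PagingConfig() does not.
```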
@@ -247,6 +264,9 @@
         count: int = 0,
         current_count: int = 0,
     ) -> Tuple[dict, int, int]:
+        logger.debug(
+            f"Query {connection_type} to get {count} objects with offset {current_count}"
+        )
         query_data = query_metadata(
             self.server, query, connection_type, count, current_count, query_filter
         )
@@ -267,7 +287,12 @@
         has_next_page = connection_object.get("pageInfo", {}).get("hasNextPage", False)
         return connection_object, total_count, has_next_page
 
-    def emit_workbooks(self, workbooks_page_size: int) -> Iterable[MetadataWorkUnit]:
+    def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
+        count_on_query = (
+            self.config.page_size
+            if self.config.workbooks_page_size is None
+            else self.config.workbooks_page_size
+        )
 
         projects = (
             f"projectNameWithin: {json.dumps(self.config.projects)}"
@@ -282,8 +307,8 @@
         current_count = 0
         while has_next_page:
             count = (
-                workbooks_page_size
-                if current_count + workbooks_page_size < total_count
+                count_on_query
+                if current_count + count_on_query < total_count
                 else total_count - current_count
             )
             (
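The loop above requests one page per call and trims the final request to whatever remains of the reported `totalCount`. A self-contained sketch of that offset-based paging pattern; `paginate_connection` and `fetch_page` are stand-in names, not this source's API:

```python
from typing import Any, Callable, Dict, Iterable


def paginate_connection(
    fetch_page: Callable[[int, int], Dict[str, Any]], page_size: int
) -> Iterable[Dict[str, Any]]:
    """Yield nodes page by page until the connection reports no next page."""
    total_count = page_size  # refined after the first response
    current_count = 0
    has_next_page = True
    while has_next_page:
        # Ask for a full page, or only what remains of the reported total.
        count = (
            page_size
            if current_count + page_size < total_count
            else total_count - current_count
        )
        connection = fetch_page(count, current_count)
        total_count = connection.get("totalCount", 0)
        has_next_page = connection.get("pageInfo", {}).get("hasNextPage", False)
        current_count += count
        yield from connection.get("nodes", [])
```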
@@ -410,7 +435,7 @@
         return upstream_tables
 
     def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
-        count_on_query = len(self.custom_sql_ids_being_used)
+        count_on_query = self.config.page_size
         custom_sql_filter = "idWithin: {}".format(
             json.dumps(self.custom_sql_ids_being_used)
         )
@@ -779,7 +804,7 @@
         )
 
     def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
-        count_on_query = len(self.datasource_ids_being_used)
+        count_on_query = self.config.page_size
         datasource_filter = "idWithin: {}".format(
             json.dumps(self.datasource_ids_being_used)
         )
@@ -1148,7 +1173,7 @@
         if self.server is None or not self.server.is_signed_in():
             return
         try:
-            yield from self.emit_workbooks(self.config.workbooks_page_size)
+            yield from self.emit_workbooks()
             if self.datasource_ids_being_used:
                 yield from self.emit_published_datasources()
             if self.custom_sql_ids_being_used: