mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-27 10:04:52 +00:00
fix(ingest): tableau - fix tableau db error, add more logs (#5423)
This commit is contained in:
parent
14d764a26f
commit
fa46167dfe
@ -380,18 +380,27 @@ class TableauSource(Source):
|
|||||||
return upstream_tables
|
return upstream_tables
|
||||||
|
|
||||||
for table in datasource.get("upstreamTables", []):
|
for table in datasource.get("upstreamTables", []):
|
||||||
# skip upstream tables when there is no column info when retrieving embedded datasource
|
# skip upstream tables when there is no column info when retrieving datasource
|
||||||
# and when table name is None
|
# Lineage and Schema details for these will be taken care in self.emit_custom_sql_datasources()
|
||||||
# Schema details for these will be taken care in self.emit_custom_sql_ds()
|
|
||||||
if not is_custom_sql and not table.get("columns"):
|
if not is_custom_sql and not table.get("columns"):
|
||||||
|
logger.debug(
|
||||||
|
f"Skipping upstream table with id {table['id']}, no columns"
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
elif table["name"] is None:
|
elif table["name"] is None:
|
||||||
|
logger.warning(
|
||||||
|
f"Skipping upstream table {table['id']} from lineage since its name is none"
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
schema = table.get("schema", "")
|
schema = table.get("schema", "")
|
||||||
table_name = table.get("name", "")
|
table_name = table.get("name", "")
|
||||||
full_name = table.get("fullName", "")
|
full_name = table.get("fullName", "")
|
||||||
upstream_db = table.get("database", {}).get("name", "")
|
upstream_db = (
|
||||||
|
table.get("database", {}).get("name", "")
|
||||||
|
if table.get("database") is not None
|
||||||
|
else ""
|
||||||
|
)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Processing Table with Connection Type: {0} and id {1}".format(
|
"Processing Table with Connection Type: {0} and id {1}".format(
|
||||||
table.get("connectionType", ""), table.get("id", "")
|
table.get("connectionType", ""), table.get("id", "")
|
||||||
@ -406,6 +415,9 @@ class TableauSource(Source):
|
|||||||
and table_name == full_name
|
and table_name == full_name
|
||||||
and schema in table_name
|
and schema in table_name
|
||||||
):
|
):
|
||||||
|
logger.debug(
|
||||||
|
f"Omitting schema for upstream table {table['id']}, schema included in table name"
|
||||||
|
)
|
||||||
schema = ""
|
schema = ""
|
||||||
table_urn = make_table_urn(
|
table_urn = make_table_urn(
|
||||||
self.config.env,
|
self.config.env,
|
||||||
@ -555,6 +567,9 @@ class TableauSource(Source):
|
|||||||
# Datasource fields
|
# Datasource fields
|
||||||
|
|
||||||
if field.get("name") is None:
|
if field.get("name") is None:
|
||||||
|
logger.warning(
|
||||||
|
f"Skipping field {field['id']} from schema since its name is none"
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
nativeDataType = field.get("remoteType", "UNKNOWN")
|
nativeDataType = field.get("remoteType", "UNKNOWN")
|
||||||
TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass)
|
TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass)
|
||||||
@ -635,6 +650,9 @@ class TableauSource(Source):
|
|||||||
# check datasource - custom sql relations from a field being referenced
|
# check datasource - custom sql relations from a field being referenced
|
||||||
self._track_custom_sql_ids(field)
|
self._track_custom_sql_ids(field)
|
||||||
if field.get("name") is None:
|
if field.get("name") is None:
|
||||||
|
logger.warning(
|
||||||
|
f"Skipping field {field['id']} from schema since its name is none"
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
nativeDataType = field.get("dataType", "UNKNOWN")
|
nativeDataType = field.get("dataType", "UNKNOWN")
|
||||||
@ -843,7 +861,7 @@ class TableauSource(Source):
|
|||||||
def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:
|
def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:
|
||||||
for (table_urn, (columns, path, is_embedded)) in self.upstream_tables.items():
|
for (table_urn, (columns, path, is_embedded)) in self.upstream_tables.items():
|
||||||
if not is_embedded and not self.config.ingest_tables_external:
|
if not is_embedded and not self.config.ingest_tables_external:
|
||||||
logger.error(
|
logger.debug(
|
||||||
f"Skipping external table {table_urn} as ingest_tables_external is set to False"
|
f"Skipping external table {table_urn} as ingest_tables_external is set to False"
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
@ -865,6 +883,9 @@ class TableauSource(Source):
|
|||||||
fields = []
|
fields = []
|
||||||
for field in columns:
|
for field in columns:
|
||||||
if field.get("name") is None:
|
if field.get("name") is None:
|
||||||
|
logger.warning(
|
||||||
|
f"Skipping field {field['id']} from schema since its name is none"
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
nativeDataType = field.get("remoteType", "UNKNOWN")
|
nativeDataType = field.get("remoteType", "UNKNOWN")
|
||||||
TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass)
|
TypeClass = FIELD_TYPE_MAPPING.get(nativeDataType, NullTypeClass)
|
||||||
@ -907,7 +928,7 @@ class TableauSource(Source):
|
|||||||
aspects=[],
|
aspects=[],
|
||||||
)
|
)
|
||||||
|
|
||||||
creator = workbook.get("owner", {}).get("username", "")
|
creator: Optional[str] = workbook["owner"].get("username")
|
||||||
created_at = sheet.get("createdAt", datetime.now())
|
created_at = sheet.get("createdAt", datetime.now())
|
||||||
updated_at = sheet.get("updatedAt", datetime.now())
|
updated_at = sheet.get("updatedAt", datetime.now())
|
||||||
last_modified = self.get_last_modified(creator, created_at, updated_at)
|
last_modified = self.get_last_modified(creator, created_at, updated_at)
|
||||||
@ -940,8 +961,6 @@ class TableauSource(Source):
|
|||||||
data_sources = self.get_sheetwise_upstream_datasources(sheet)
|
data_sources = self.get_sheetwise_upstream_datasources(sheet)
|
||||||
|
|
||||||
for ds_id in data_sources:
|
for ds_id in data_sources:
|
||||||
if ds_id is None or not ds_id:
|
|
||||||
continue
|
|
||||||
ds_urn = builder.make_dataset_urn(self.platform, ds_id, self.config.env)
|
ds_urn = builder.make_dataset_urn(self.platform, ds_id, self.config.env)
|
||||||
datasource_urn.append(ds_urn)
|
datasource_urn.append(ds_urn)
|
||||||
if ds_id not in self.datasource_ids_being_used:
|
if ds_id not in self.datasource_ids_being_used:
|
||||||
@ -1136,7 +1155,7 @@ class TableauSource(Source):
|
|||||||
|
|
||||||
@lru_cache(maxsize=None)
|
@lru_cache(maxsize=None)
|
||||||
def get_last_modified(
|
def get_last_modified(
|
||||||
self, creator: str, created_at: bytes, updated_at: bytes
|
self, creator: Optional[str], created_at: bytes, updated_at: bytes
|
||||||
) -> ChangeAuditStamps:
|
) -> ChangeAuditStamps:
|
||||||
last_modified = ChangeAuditStamps()
|
last_modified = ChangeAuditStamps()
|
||||||
if creator:
|
if creator:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user