fix(redash): improve logging for debugging, add validation for dataset urn, some refactoring (#4997)

Aseem Bansal 2022-05-26 16:24:54 +05:30 committed by GitHub
parent 92338c7912
commit 21f8d4be03
6 changed files with 61 additions and 31 deletions

View File

@@ -34,7 +34,6 @@ from datahub.metadata.schema_classes import (
SchemaMetadataClass,
StringTypeClass,
SubTypesClass,
SystemMetadataClass,
TagAssociationClass,
)
@@ -43,6 +42,7 @@ logger = logging.getLogger(__name__)
# TODO: Support generating docs for each event type in entity registry.
def capitalize_first(something: str) -> str:
return something[0:1].upper() + something[1:]
@@ -89,10 +89,12 @@ class AspectDefinition:
schema: Optional[avro.schema.Schema] = None
type: Optional[str] = None
@dataclass
class EventDefinition:
name: str
entity_registry: Dict[str, EntityDefinition] = {}
@@ -386,7 +388,7 @@ def generate_stitched_record(relnships_graph: RelationshipGraph) -> List[Any]:
)
foreign_keys: List[ForeignKeyConstraintClass] = []
source_dataset_urn = make_dataset_urn(
platform=make_data_platform_urn("datahub"),
platform="datahub",
name=f"{entity_display_name}",
)
for f_field in schema_fields:
@@ -444,7 +446,7 @@ def generate_stitched_record(relnships_graph: RelationshipGraph) -> List[Any]:
destination_entity_name = capitalize_first(entity_type)
foreign_dataset_urn = make_dataset_urn(
platform=make_data_platform_urn("datahub"),
platform="datahub",
name=destination_entity_name,
)
fkey = ForeignKeyConstraintClass(
@@ -478,7 +480,7 @@ def generate_stitched_record(relnships_graph: RelationshipGraph) -> List[Any]:
dataset = DatasetSnapshotClass(
urn=make_dataset_urn(
platform=make_data_platform_urn("datahub"),
platform="datahub",
name=f"{entity_display_name}",
),
aspects=[
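
With make_dataset_urn now wrapping the platform id itself (see the builder change in the next file), callers pass the bare "datahub" id instead of a pre-built platform URN. A minimal sketch of the call and the URN shape it is expected to produce, assuming the datahub package is installed, DATASET_URN_TO_LOWER is off, and "Dataset" stands in for an entity display name:

from datahub.emitter.mce_builder import make_dataset_urn

urn = make_dataset_urn(platform="datahub", name="Dataset", env="PROD")
# Expected shape, matching the f-string format the builder previously used:
# urn:li:dataset:(urn:li:dataPlatform:datahub,Dataset,PROD)
print(urn)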

View File

@@ -33,6 +33,7 @@ from datahub.metadata.schema_classes import (
UpstreamClass,
UpstreamLineageClass,
)
from datahub.utilities.urns.dataset_urn import DatasetUrn
logger = logging.getLogger(__name__)
@@ -67,9 +68,9 @@ def make_data_platform_urn(platform: str) -> str:
def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
if DATASET_URN_TO_LOWER:
name = name.lower()
return f"urn:li:dataset:({make_data_platform_urn(platform)},{name},{env})"
return make_dataset_urn_with_platform_instance(
platform=platform, name=name, platform_instance=None, env=env
)
def make_dataplatform_instance_urn(platform: str, instance: str) -> str:
@@ -82,12 +83,16 @@ def make_dataplatform_instance_urn(platform: str, instance: str) -> str:
def make_dataset_urn_with_platform_instance(
platform: str, name: str, platform_instance: Optional[str], env: str = DEFAULT_ENV
) -> str:
if platform_instance:
if DATASET_URN_TO_LOWER:
name = name.lower()
return f"urn:li:dataset:({make_data_platform_urn(platform)},{platform_instance}.{name},{env})"
else:
return make_dataset_urn(platform=platform, name=name, env=env)
if DATASET_URN_TO_LOWER:
name = name.lower()
return str(
DatasetUrn.create_from_ids(
platform_id=platform,
table_name=name,
env=env,
platform_instance=platform_instance,
)
)
def make_schema_field_urn(parent_urn: str, field_path: str) -> str:
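
A short usage sketch of the refactored helper, assuming the datahub package is installed and DATASET_URN_TO_LOWER is off; "hive", "db.table", and "cluster1" are hypothetical example values. The expected URN shapes in the comments mirror the two f-strings the old implementation returned:

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance

without_instance = make_dataset_urn_with_platform_instance(
    platform="hive", name="db.table", platform_instance=None, env="PROD"
)
# urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)

with_instance = make_dataset_urn_with_platform_instance(
    platform="hive", name="db.table", platform_instance="cluster1", env="PROD"
)
# urn:li:dataset:(urn:li:dataPlatform:hive,cluster1.db.table,PROD)
print(without_instance, with_instance)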

View File

@@ -193,7 +193,7 @@ class NifiProcessorProvenanceEventAnalyzer:
s3_url = f"s3://{s3_bucket}/{s3_key}"
s3_url = s3_url[: s3_url.rindex("/")]
dataset_name = s3_url.replace("s3://", "").replace("/", ".")
platform = "urn:li:dataPlatform:s3"
platform = "s3"
dataset_urn = builder.make_dataset_urn(platform, dataset_name, self.env)
return ExternalDataset(
platform,
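
A standalone sketch of the dataset-name derivation shown in this hunk, using a hypothetical S3 provenance path; only the platform string changes here, since make_dataset_urn now adds the dataPlatform prefix itself:

s3_bucket = "analytics-bucket"                      # hypothetical example values
s3_key = "events/2022/05/26/part-0000.parquet"
s3_url = f"s3://{s3_bucket}/{s3_key}"
s3_url = s3_url[: s3_url.rindex("/")]               # drop the object file name
dataset_name = s3_url.replace("s3://", "").replace("/", ".")
# dataset_name == "analytics-bucket.events.2022.05.26"
platform = "s3"                                     # bare id instead of urn:li:dataPlatform:s3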

View File

@@ -332,11 +332,14 @@ class RedashSource(Source):
f"Running Redash ingestion with parse_table_names_from_sql={self.parse_table_names_from_sql}"
)
def error(self, log: logging.Logger, key: str, reason: str) -> None:
self.report.report_failure(key, reason)
log.error(f"{key} => {reason}")
def test_connection(self) -> None:
test_response = self.client._get(f"{self.config.connect_uri}/api")
if test_response.status_code == 200:
logger.info("Redash API connected succesfully")
pass
else:
raise ValueError(f"Failed to connect to {self.config.connect_uri}/api")
@@ -349,9 +352,6 @@ class RedashSource(Source):
def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
assert "." in sql_parser_path, "sql_parser-path must contain a ."
module_name, cls_name = sql_parser_path.rsplit(".", 1)
import sys
logger.debug(sys.path)
parser_cls = getattr(importlib.import_module(module_name), cls_name)
if not issubclass(parser_cls, SQLParser):
raise ValueError(f"must be derived from {SQLParser}; got {parser_cls}")
@@ -421,6 +421,8 @@ class RedashSource(Source):
platform = self._get_platform_based_on_datasource(data_source)
database_name = self._get_database_name_based_on_datasource(data_source)
data_source_syntax = data_source.get("syntax")
data_source_id = data_source.get("id")
query_id = sql_query_data.get("id")
if database_name:
query = sql_query_data.get("query", "")
@@ -439,8 +441,11 @@ class RedashSource(Source):
)
)
except Exception as e:
logger.error(e)
logger.error(query)
self.error(
logger,
f"sql-parsing-query-{query_id}-datasource-{data_source_id}",
f"exception {e} in parsing {query}",
)
# make sure dataset_urns is not an empty list
return dataset_urns if len(dataset_urns) > 0 else None
@@ -540,6 +545,9 @@ class RedashSource(Source):
dashboards_response = self.client.dashboards(1, PAGE_SIZE)
total_dashboards = dashboards_response["count"]
max_page = total_dashboards // PAGE_SIZE
logger.info(
f"/api/dashboards total count {total_dashboards} and max page {max_page}"
)
while (
current_dashboards_page <= max_page
@@ -549,7 +557,9 @@ class RedashSource(Source):
page=current_dashboards_page, page_size=PAGE_SIZE
)
logger.info(f"/api/dashboards on page {current_dashboards_page}")
logger.info(
f"/api/dashboards on page {current_dashboards_page} / {max_page}"
)
current_dashboards_page += 1
@@ -649,7 +659,7 @@ class RedashSource(Source):
if datasource_urns is None:
self.report.report_warning(
key=f"redash-chart-{viz_id}",
key=f"redash-chart-{viz_id}-datasource-{data_source_id}",
reason=f"data_source_type={data_source_type} not yet implemented. Setting inputs to None",
)
@@ -673,6 +683,7 @@ class RedashSource(Source):
_queries_response = self.client.queries(1, PAGE_SIZE)
total_queries = _queries_response["count"]
max_page = total_queries // PAGE_SIZE
logger.info(f"/api/queries total count {total_queries} and max page {max_page}")
while (
current_queries_page <= max_page
@@ -682,7 +693,7 @@ class RedashSource(Source):
page=current_queries_page, page_size=PAGE_SIZE
)
logger.info(f"/api/queries on page {current_queries_page}")
logger.info(f"/api/queries on page {current_queries_page} / {max_page}")
current_queries_page += 1
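
A minimal stand-in sketch of the new error() helper's behaviour, runnable without a Redash instance: it records the failure under a key that now carries the query and datasource ids, and logs the same key/reason pair. The ids, the query text, and the failures dict are hypothetical stand-ins for the ingestion report.

import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("redash-sketch")

failures: dict = {}  # stand-in for self.report.report_failure(key, reason)

def error(log: logging.Logger, key: str, reason: str) -> None:
    failures.setdefault(key, []).append(reason)
    log.error(f"{key} => {reason}")

query_id, data_source_id = 42, 7  # hypothetical Redash ids
query = "SELECT broken FROM"
error(
    logger,
    f"sql-parsing-query-{query_id}-datasource-{data_source_id}",
    f"exception <parse error> in parsing {query}",
)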

View File

@@ -1,4 +1,4 @@
from typing import List, Set
from typing import List, Optional, Set
from datahub.metadata.schema_classes import FabricTypeClass
from datahub.utilities.urns.data_platform_urn import DataPlatformUrn
@@ -55,13 +55,25 @@ class DatasetUrn(Urn):
@classmethod
def create_from_ids(
cls, platform_id: str, table_name: str, env: str
cls,
platform_id: str,
table_name: str,
env: str,
platform_instance: Optional[str] = None,
) -> "DatasetUrn":
entity_id: List[str] = [
str(DataPlatformUrn.create_from_id(platform_id)),
table_name,
env,
]
entity_id: List[str]
if platform_instance:
entity_id = [
str(DataPlatformUrn.create_from_id(platform_id)),
f"{platform_instance}.{table_name}",
env,
]
else:
entity_id = [
str(DataPlatformUrn.create_from_id(platform_id)),
table_name,
env,
]
return cls(DatasetUrn.ENTITY_TYPE, entity_id)
@staticmethod
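
A usage sketch of the extended create_from_ids signature, assuming the datahub package is installed; when platform_instance is given it is prefixed onto the table name exactly as the branch above builds the entity id ("hive", "db.table", and "cluster1" are hypothetical values):

from datahub.utilities.urns.dataset_urn import DatasetUrn

plain = DatasetUrn.create_from_ids(platform_id="hive", table_name="db.table", env="PROD")
# str(plain) is expected to be urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)

scoped = DatasetUrn.create_from_ids(
    platform_id="hive",
    table_name="db.table",
    env="PROD",
    platform_instance="cluster1",
)
# str(scoped) is expected to be urn:li:dataset:(urn:li:dataPlatform:hive,cluster1.db.table,PROD)
print(str(plain), str(scoped))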

View File

@@ -144,7 +144,7 @@
return self._entity_id[0]
result = ""
for part in self._entity_id:
result = result + part + ","
result = result + str(part) + ","
return f"({result[:-1]})"
def __hash__(self) -> int:
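
A standalone sketch of the entity-id joining logic after the str(part) cast, with a hypothetical non-string part to show why the cast helps; PlatformRef and join_entity_id are illustrative stand-ins, not part of the datahub API:

from typing import Any, List

class PlatformRef:
    # Hypothetical stand-in for an entity-id part that is not a plain string
    # but renders correctly via __str__.
    def __str__(self) -> str:
        return "urn:li:dataPlatform:hive"

def join_entity_id(entity_id: List[Any]) -> str:
    # Mirrors the loop above: parts are joined into "(a,b,c)"; the str() cast
    # keeps the concatenation working when a part is not already a str.
    result = ""
    for part in entity_id:
        result = result + str(part) + ","
    return f"({result[:-1]})"

print(join_entity_id([PlatformRef(), "db.table", "PROD"]))
# -> (urn:li:dataPlatform:hive,db.table,PROD)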