feat(ingest/teradata): Add option to not use file backed dict for view definitions (#9024)

Andrew Sikowitz, 2023-10-16 19:13:23 -04:00, committed via GitHub
parent 9ccd1d4f5d
commit 6366b63e48

@@ -1,7 +1,7 @@
 import logging
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Iterable, Optional, Set, Union
+from typing import Iterable, MutableMapping, Optional, Union

 # This import verifies that the dependencies are available.
 import teradatasqlalchemy  # noqa: F401
@@ -12,7 +12,6 @@ from sqlalchemy.engine import Engine
 from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.time_window_config import BaseTimeWindowConfig
-from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
@@ -34,11 +33,7 @@ from datahub.ingestion.source.sql.two_tier_sql_source import (
 from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
 from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
 from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
-from datahub.metadata._schema_classes import (
-    MetadataChangeEventClass,
-    SchemaMetadataClass,
-    ViewPropertiesClass,
-)
+from datahub.metadata._schema_classes import SchemaMetadataClass, ViewPropertiesClass
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
     BytesTypeClass,
     TimeTypeClass,
@@ -112,6 +107,11 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
         description="Generate usage statistic.",
     )

+    use_file_backed_cache: bool = Field(
+        default=True,
+        description="Whether to use a file backed cache for the view definitions.",
+    )
+

 @platform_name("Teradata")
 @config_class(TeradataConfig)
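
Note: the new flag defaults to True, so existing pipelines keep the file backed behavior unless they opt out. A minimal sketch of disabling it programmatically, assuming the config class lives in the teradata source module alongside the neighboring imports and accepts the usual SQLAlchemy-style connection fields; every value below is illustrative:

    from datahub.ingestion.source.sql.teradata import TeradataConfig

    # Hypothetical config dict; the connection values are placeholders.
    config = TeradataConfig.parse_obj(
        {
            "host_port": "teradata.example.com:1025",
            "username": "ingest_user",
            "password": "ingest_pass",
            # Opt out of FileBackedDict and keep view definitions in memory.
            "use_file_backed_cache": False,
        }
    )
    assert config.use_file_backed_cache is False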
@@ -142,7 +142,8 @@ class TeradataSource(TwoTierSQLAlchemySource):
         and "timestamp" >= TIMESTAMP '{start_time}'
         and "timestamp" < TIMESTAMP '{end_time}'
         """
-    urns: Optional[Set[str]]
+
+    _view_definition_cache: MutableMapping[str, str]

     def __init__(self, config: TeradataConfig, ctx: PipelineContext):
         super().__init__(config, ctx, "teradata")
@@ -166,7 +167,10 @@ class TeradataSource(TwoTierSQLAlchemySource):
             env=self.config.env,
         )

-        self._view_definition_cache: FileBackedDict[str] = FileBackedDict()
+        if self.config.use_file_backed_cache:
+            self._view_definition_cache = FileBackedDict[str]()
+        else:
+            self._view_definition_cache = {}

     @classmethod
     def create(cls, config_dict, ctx):
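
Both branches satisfy the MutableMapping[str, str] annotation declared on the class: FileBackedDict (a dict-like collection in DataHub's utilities that spills entries to a local SQLite file) and a plain in-memory dict expose the same mapping operations the source uses. A minimal sketch of the selection pattern, assuming the import path datahub.utilities.file_backed_collections; the helper function name is invented for illustration:

    from typing import MutableMapping

    from datahub.utilities.file_backed_collections import FileBackedDict

    def make_view_definition_cache(use_file_backed: bool) -> MutableMapping[str, str]:
        # Both return types support the __setitem__/__getitem__ calls used in
        # get_workunits_internal, so callers need not know which one they got.
        if use_file_backed:
            return FileBackedDict[str]()
        return {}

    cache = make_view_definition_cache(use_file_backed=False)
    cache["urn:li:dataset:(urn:li:dataPlatform:teradata,db.my_view,PROD)"] = "SELECT 1"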
@@ -249,24 +253,13 @@
     def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
         # Add all schemas to the schema resolver
         for wu in super().get_workunits_internal():
-            if isinstance(wu.metadata, MetadataChangeEventClass):
-                if wu.metadata.proposedSnapshot:
-                    for aspect in wu.metadata.proposedSnapshot.aspects:
-                        if isinstance(aspect, SchemaMetadataClass):
-                            self.schema_resolver.add_schema_metadata(
-                                wu.metadata.proposedSnapshot.urn,
-                                aspect,
-                            )
-                            break
-            if isinstance(wu.metadata, MetadataChangeProposalWrapper):
-                if (
-                    wu.metadata.entityUrn
-                    and isinstance(wu.metadata.aspect, ViewPropertiesClass)
-                    and wu.metadata.aspect.viewLogic
-                ):
-                    self._view_definition_cache[
-                        wu.metadata.entityUrn
-                    ] = wu.metadata.aspect.viewLogic
+            urn = wu.get_urn()
+            schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
+            if schema_metadata:
+                self.schema_resolver.add_schema_metadata(urn, schema_metadata)
+            view_properties = wu.get_aspect_of_type(ViewPropertiesClass)
+            if view_properties and self.config.include_view_lineage:
+                self._view_definition_cache[urn] = view_properties.viewLogic
             yield wu

         if self.config.include_view_lineage:
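
The rewritten loop replaces the type-specific handling of MetadataChangeEventClass snapshots and MetadataChangeProposalWrapper instances with the MetadataWorkUnit helpers get_urn() and get_aspect_of_type(), which resolve the URN and the aspect regardless of the underlying metadata shape. A minimal sketch of that helper pattern in isolation, using a hypothetical URN and view definition:

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.metadata.schema_classes import ViewPropertiesClass

    # Hypothetical view aspect wrapped into a work unit via as_workunit().
    wu = MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:teradata,db.my_view,PROD)",
        aspect=ViewPropertiesClass(
            materialized=False,
            viewLanguage="SQL",
            viewLogic="SELECT id, name FROM db.base_table",
        ),
    ).as_workunit()

    # get_aspect_of_type returns None when the work unit carries no such aspect,
    # which is why the loop above guards each lookup with an if.
    view_properties = wu.get_aspect_of_type(ViewPropertiesClass)
    if view_properties:
        print(wu.get_urn(), "->", view_properties.viewLogic)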