From 2f36817e95f9853e0a0302888136a150fe8a8889 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 23 Jan 2024 22:58:46 -0800 Subject: [PATCH] refactor(ingest): simplify adding aspects to MCEs in transformers (#9686) --- .../datahub/api/entities/dataset/dataset.py | 2 +- metadata-ingestion/src/datahub/entrypoints.py | 13 ---- .../ingestion/transformer/base_transformer.py | 64 +++++++++---------- .../transformer/extract_dataset_tags.py | 2 +- 4 files changed, 33 insertions(+), 48 deletions(-) diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index 3b4a5fbfbb..a1498a6ca9 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -147,7 +147,7 @@ class Dataset(BaseModel): return make_data_platform_urn(self.platform) else: assert self.urn is not None # validator should have filled this in - dataset_urn = DatasetUrn.create_from_string(self.urn) + dataset_urn = DatasetUrn.from_string(self.urn) return str(dataset_urn.get_data_platform_urn()) @validator("urn", pre=True, always=True) diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index 4989f984ba..1bf090a2e5 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -62,13 +62,6 @@ MAX_CONTENT_WIDTH = 120 default=None, help="Enable debug logging.", ) -@click.option( - "--debug-vars/--no-debug-vars", - type=bool, - is_flag=True, - default=False, - help="Show variable values in stack traces. Implies --debug. While we try to avoid printing sensitive information like passwords, this may still happen.", -) @click.version_option( version=datahub_package.nice_version_name(), prog_name=datahub_package.__package_name__, @@ -76,13 +69,7 @@ MAX_CONTENT_WIDTH = 120 def datahub( debug: bool, log_file: Optional[str], - debug_vars: bool, ) -> None: - if debug_vars: - # debug_vars implies debug. This option isn't actually used here, but instead - # read directly from the command line arguments in the main entrypoint. - debug = True - debug = debug or get_boolean_env_variable("DATAHUB_DEBUG", False) # Note that we're purposely leaking the context manager here. diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index 254b3d084f..e8e25a061a 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -77,7 +77,7 @@ class BaseTransformer(Transformer, metaclass=ABCMeta): mixedin = mixedin or isinstance(self, mixin) if not mixedin: assert ( - "Class does not implement one of required traits {self.allowed_mixins}" + f"Class does not implement one of required traits {self.allowed_mixins}" ) def _should_process( @@ -135,38 +135,37 @@ class BaseTransformer(Transformer, metaclass=ABCMeta): if mce.proposedSnapshot: self._record_mce(mce) if isinstance(self, SingleAspectTransformer): - aspect_type = ASPECT_MAP.get(self.aspect_name()) - if aspect_type: - # if we find a type corresponding to the aspect name we look for it in the mce - old_aspect = ( - builder.get_aspect_if_available( - mce, - aspect_type, + aspect_type = ASPECT_MAP[self.aspect_name()] + + # If we find a type corresponding to the aspect name we look for it in the mce + # It's possible that the aspect is supported by the entity but not in the MCE + # snapshot union. In those cases, we just want to record the urn as seen. + supports_aspect = builder.can_add_aspect(mce, aspect_type) + if supports_aspect: + old_aspect = builder.get_aspect_if_available( + mce, + aspect_type, + ) + if old_aspect is not None: + # TRICKY: If the aspect is not present in the MCE, it might still show up in a + # subsequent MCP. As such, we _only_ mark the urn as processed if we actually + # find the aspect already in the MCE. + + transformed_aspect = self.transform_aspect( + entity_urn=mce.proposedSnapshot.urn, + aspect_name=self.aspect_name(), + aspect=old_aspect, ) - if builder.can_add_aspect(mce, aspect_type) - else None - ) - if old_aspect: - if isinstance(self, LegacyMCETransformer): - # use the transform_one pathway to transform this MCE - envelope.record = self.transform_one(mce) - else: - transformed_aspect = self.transform_aspect( - entity_urn=mce.proposedSnapshot.urn, - aspect_name=self.aspect_name(), - aspect=old_aspect, - ) - builder.set_aspect( - mce, - aspect_type=aspect_type, - aspect=transformed_aspect, - ) - envelope.record = mce + + # If transformed_aspect is None, this will remove the aspect. + builder.set_aspect( + mce, + aspect_type=aspect_type, + aspect=transformed_aspect, + ) + + envelope.record = mce self._mark_processed(mce.proposedSnapshot.urn) - else: - log.warning( - f"Could not locate a snapshot aspect type for aspect {self.aspect_name()}. This can lead to silent drops of messages in transformers." - ) elif isinstance(self, LegacyMCETransformer): # we pass down the full MCE envelope.record = self.transform_one(mce) @@ -202,7 +201,6 @@ class BaseTransformer(Transformer, metaclass=ABCMeta): def _handle_end_of_stream( self, envelope: RecordEnvelope ) -> Iterable[RecordEnvelope]: - if not isinstance(self, SingleAspectTransformer) and not isinstance( self, LegacyMCETransformer ): @@ -265,7 +263,7 @@ class BaseTransformer(Transformer, metaclass=ABCMeta): else None, ) if transformed_aspect: - structured_urn = Urn.create_from_string(urn) + structured_urn = Urn.from_string(urn) mcp: MetadataChangeProposalWrapper = ( MetadataChangeProposalWrapper( diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/extract_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/extract_dataset_tags.py index 25b18f0806..4b64d38a9b 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/extract_dataset_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/extract_dataset_tags.py @@ -34,7 +34,7 @@ class ExtractDatasetTags(DatasetTagsTransformer): def _get_tags_to_add(self, entity_urn: str) -> List[TagAssociationClass]: if self.config.extract_tags_from == ExtractTagsOption.URN: - urn = DatasetUrn.create_from_string(entity_urn) + urn = DatasetUrn.from_string(entity_urn) match = re.search(self.config.extract_tags_regex, urn.get_dataset_name()) if match: captured_group = match.group(1)