refactor(ingest): simplify adding aspects to MCEs in transformers (#9686)

This commit is contained in:
Harshal Sheth 2024-01-23 22:58:46 -08:00 committed by GitHub
parent c4dec931a3
commit 2f36817e95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 33 additions and 48 deletions

View File

@ -147,7 +147,7 @@ class Dataset(BaseModel):
return make_data_platform_urn(self.platform) return make_data_platform_urn(self.platform)
else: else:
assert self.urn is not None # validator should have filled this in assert self.urn is not None # validator should have filled this in
dataset_urn = DatasetUrn.create_from_string(self.urn) dataset_urn = DatasetUrn.from_string(self.urn)
return str(dataset_urn.get_data_platform_urn()) return str(dataset_urn.get_data_platform_urn())
@validator("urn", pre=True, always=True) @validator("urn", pre=True, always=True)

View File

@ -62,13 +62,6 @@ MAX_CONTENT_WIDTH = 120
default=None, default=None,
help="Enable debug logging.", help="Enable debug logging.",
) )
@click.option(
"--debug-vars/--no-debug-vars",
type=bool,
is_flag=True,
default=False,
help="Show variable values in stack traces. Implies --debug. While we try to avoid printing sensitive information like passwords, this may still happen.",
)
@click.version_option( @click.version_option(
version=datahub_package.nice_version_name(), version=datahub_package.nice_version_name(),
prog_name=datahub_package.__package_name__, prog_name=datahub_package.__package_name__,
@ -76,13 +69,7 @@ MAX_CONTENT_WIDTH = 120
def datahub( def datahub(
debug: bool, debug: bool,
log_file: Optional[str], log_file: Optional[str],
debug_vars: bool,
) -> None: ) -> None:
if debug_vars:
# debug_vars implies debug. This option isn't actually used here, but instead
# read directly from the command line arguments in the main entrypoint.
debug = True
debug = debug or get_boolean_env_variable("DATAHUB_DEBUG", False) debug = debug or get_boolean_env_variable("DATAHUB_DEBUG", False)
# Note that we're purposely leaking the context manager here. # Note that we're purposely leaking the context manager here.

View File

@ -77,7 +77,7 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
mixedin = mixedin or isinstance(self, mixin) mixedin = mixedin or isinstance(self, mixin)
if not mixedin: if not mixedin:
assert ( assert (
"Class does not implement one of required traits {self.allowed_mixins}" f"Class does not implement one of required traits {self.allowed_mixins}"
) )
def _should_process( def _should_process(
@ -135,38 +135,37 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
if mce.proposedSnapshot: if mce.proposedSnapshot:
self._record_mce(mce) self._record_mce(mce)
if isinstance(self, SingleAspectTransformer): if isinstance(self, SingleAspectTransformer):
aspect_type = ASPECT_MAP.get(self.aspect_name()) aspect_type = ASPECT_MAP[self.aspect_name()]
if aspect_type:
# if we find a type corresponding to the aspect name we look for it in the mce # If we find a type corresponding to the aspect name we look for it in the mce
old_aspect = ( # It's possible that the aspect is supported by the entity but not in the MCE
builder.get_aspect_if_available( # snapshot union. In those cases, we just want to record the urn as seen.
supports_aspect = builder.can_add_aspect(mce, aspect_type)
if supports_aspect:
old_aspect = builder.get_aspect_if_available(
mce, mce,
aspect_type, aspect_type,
) )
if builder.can_add_aspect(mce, aspect_type) if old_aspect is not None:
else None # TRICKY: If the aspect is not present in the MCE, it might still show up in a
) # subsequent MCP. As such, we _only_ mark the urn as processed if we actually
if old_aspect: # find the aspect already in the MCE.
if isinstance(self, LegacyMCETransformer):
# use the transform_one pathway to transform this MCE
envelope.record = self.transform_one(mce)
else:
transformed_aspect = self.transform_aspect( transformed_aspect = self.transform_aspect(
entity_urn=mce.proposedSnapshot.urn, entity_urn=mce.proposedSnapshot.urn,
aspect_name=self.aspect_name(), aspect_name=self.aspect_name(),
aspect=old_aspect, aspect=old_aspect,
) )
# If transformed_aspect is None, this will remove the aspect.
builder.set_aspect( builder.set_aspect(
mce, mce,
aspect_type=aspect_type, aspect_type=aspect_type,
aspect=transformed_aspect, aspect=transformed_aspect,
) )
envelope.record = mce envelope.record = mce
self._mark_processed(mce.proposedSnapshot.urn) self._mark_processed(mce.proposedSnapshot.urn)
else:
log.warning(
f"Could not locate a snapshot aspect type for aspect {self.aspect_name()}. This can lead to silent drops of messages in transformers."
)
elif isinstance(self, LegacyMCETransformer): elif isinstance(self, LegacyMCETransformer):
# we pass down the full MCE # we pass down the full MCE
envelope.record = self.transform_one(mce) envelope.record = self.transform_one(mce)
@ -202,7 +201,6 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
def _handle_end_of_stream( def _handle_end_of_stream(
self, envelope: RecordEnvelope self, envelope: RecordEnvelope
) -> Iterable[RecordEnvelope]: ) -> Iterable[RecordEnvelope]:
if not isinstance(self, SingleAspectTransformer) and not isinstance( if not isinstance(self, SingleAspectTransformer) and not isinstance(
self, LegacyMCETransformer self, LegacyMCETransformer
): ):
@ -265,7 +263,7 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
else None, else None,
) )
if transformed_aspect: if transformed_aspect:
structured_urn = Urn.create_from_string(urn) structured_urn = Urn.from_string(urn)
mcp: MetadataChangeProposalWrapper = ( mcp: MetadataChangeProposalWrapper = (
MetadataChangeProposalWrapper( MetadataChangeProposalWrapper(

View File

@ -34,7 +34,7 @@ class ExtractDatasetTags(DatasetTagsTransformer):
def _get_tags_to_add(self, entity_urn: str) -> List[TagAssociationClass]: def _get_tags_to_add(self, entity_urn: str) -> List[TagAssociationClass]:
if self.config.extract_tags_from == ExtractTagsOption.URN: if self.config.extract_tags_from == ExtractTagsOption.URN:
urn = DatasetUrn.create_from_string(entity_urn) urn = DatasetUrn.from_string(entity_urn)
match = re.search(self.config.extract_tags_regex, urn.get_dataset_name()) match = re.search(self.config.extract_tags_regex, urn.get_dataset_name())
if match: if match:
captured_group = match.group(1) captured_group = match.group(1)