chore(ingest): remove inferred args to MCPW, part 2 (#6905)

This commit is contained in:
Harshal Sheth 2023-01-04 23:29:56 -05:00 committed by GitHub
parent 8b1dc4bbdf
commit f651646d3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 30 additions and 137 deletions

View File

@ -2,6 +2,7 @@ import hashlib
import json
from typing import Any, Dict, Iterable, List, Optional, TypeVar
from deprecated import deprecated
from pydantic.fields import Field
from pydantic.main import BaseModel
@ -18,7 +19,6 @@ from datahub.metadata.com.linkedin.pegasus2avro.common import (
)
from datahub.metadata.com.linkedin.pegasus2avro.container import ContainerProperties
from datahub.metadata.schema_classes import (
ChangeTypeClass,
ContainerClass,
DomainsClass,
GlobalTagsClass,
@ -31,7 +31,6 @@ from datahub.metadata.schema_classes import (
TagAssociationClass,
_Aspect,
)
from datahub.utilities.urns.urn import guess_entity_type
def _stable_guid_from_dict(d: dict) -> str:
@ -129,24 +128,18 @@ KeyType = TypeVar("KeyType", bound=PlatformKey)
def add_domain_to_entity_wu(
entity_type: str, entity_urn: str, domain_urn: str
entity_urn: str, domain_urn: str
) -> Iterable[MetadataWorkUnit]:
mcp = MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{entity_urn}",
aspect=DomainsClass(domains=[domain_urn]),
)
wu = MetadataWorkUnit(id=f"{domain_urn}-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()
def add_owner_to_entity_wu(
entity_type: str, entity_urn: str, owner_urn: str
) -> Iterable[MetadataWorkUnit]:
mcp = MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{entity_urn}",
aspect=OwnershipClass(
owners=[
@ -156,26 +149,22 @@ def add_owner_to_entity_wu(
)
]
),
)
wu = MetadataWorkUnit(id=f"{owner_urn}-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()
def add_tags_to_entity_wu(
entity_type: str, entity_urn: str, tags: List[str]
) -> Iterable[MetadataWorkUnit]:
mcp = MetadataChangeProposalWrapper(
yield MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{entity_urn}",
aspect=GlobalTagsClass(
tags=[TagAssociationClass(f"urn:li:tag:{tag}") for tag in tags]
),
)
wu = MetadataWorkUnit(id=f"tags-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()
@deprecated("use MetadataChangeProposalWrapper(...).as_workunit() instead")
def wrap_aspect_as_workunit(
entityName: str,
entityUrn: str,
@ -210,9 +199,7 @@ def gen_containers(
container_urn = make_container_urn(
guid=container_key.guid(),
)
mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=ContainerProperties(
@ -229,51 +216,32 @@ def gen_containers(
if last_modified is not None
else None,
),
)
wu = MetadataWorkUnit(id=f"container-info-{name}-{container_urn}", mcp=mcp)
yield wu
).as_workunit()
# add status
yield wrap_aspect_as_workunit(
entityName="container",
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
aspect=StatusClass(removed=False),
aspectName=StatusClass.get_aspect_name(),
)
).as_workunit()
mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=DataPlatformInstance(
platform=f"{make_data_platform_urn(container_key.platform)}",
instance=f"{make_dataplatform_instance_urn(container_key.platform, container_key.instance)}"
if container_key.instance
else None,
),
)
wu = MetadataWorkUnit(
id=f"container-platforminstance-{name}-{container_urn}", mcp=mcp
)
yield wu
).as_workunit()
# Set subtype
subtype_mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=SubTypesClass(typeNames=sub_types),
)
wu = MetadataWorkUnit(
id=f"container-subtypes-{name}-{container_urn}", mcp=subtype_mcp
)
yield wu
).as_workunit()
if domain_urn:
yield from add_domain_to_entity_wu(
entity_type="container",
entity_urn=container_urn,
domain_urn=domain_urn,
)
@ -299,39 +267,23 @@ def gen_containers(
# Set database container
parent_container_mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspect=ContainerClass(container=parent_container_urn),
# aspect=ContainerKeyClass(guid=database_container_key.guid())
)
wu = MetadataWorkUnit(
id=f"container-parent-container-{name}-{container_urn}-{parent_container_urn}",
mcp=parent_container_mcp,
)
yield wu
yield parent_container_mcp.as_workunit()
def add_dataset_to_container(
# FIXME: Union requires two or more type arguments
container_key: KeyType,
dataset_urn: str,
container_key: KeyType, dataset_urn: str
) -> Iterable[MetadataWorkUnit]:
container_urn = make_container_urn(
guid=container_key.guid(),
)
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
yield MetadataChangeProposalWrapper(
entityUrn=f"{dataset_urn}",
aspect=ContainerClass(container=f"{container_urn}"),
# aspect=ContainerKeyClass(guid=schema_container_key.guid())
)
wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{dataset_urn}", mcp=mcp)
yield wu
).as_workunit()
def add_entity_to_container(
@ -340,14 +292,11 @@ def add_entity_to_container(
container_urn = make_container_urn(
guid=container_key.guid(),
)
mcp = MetadataChangeProposalWrapper(
yield MetadataChangeProposalWrapper(
entityType=entity_type,
changeType=ChangeTypeClass.UPSERT,
entityUrn=entity_urn,
aspect=ContainerClass(container=f"{container_urn}"),
)
wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{entity_urn}", mcp=mcp)
yield wu
).as_workunit()
def mcps_from_mce(
@ -355,8 +304,6 @@ def mcps_from_mce(
) -> Iterable[MetadataChangeProposalWrapper]:
for aspect in mce.proposedSnapshot.aspects:
yield MetadataChangeProposalWrapper(
entityType=guess_entity_type(mce.proposedSnapshot.urn),
changeType=ChangeTypeClass.UPSERT,
entityUrn=mce.proposedSnapshot.urn,
auditHeader=mce.auditHeader,
aspect=aspect,

View File

@ -930,12 +930,11 @@ class GlueSource(StatefulIngestionSourceBase):
return None
def _get_domain_wu(
self, dataset_name: str, entity_urn: str, entity_type: str
self, dataset_name: str, entity_urn: str
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)
@ -985,7 +984,6 @@ class GlueSource(StatefulIngestionSourceBase):
yield from self._get_domain_wu(
dataset_name=full_table_name,
entity_urn=dataset_urn,
entity_type="dataset",
)
yield from self.add_table_to_database_container(
dataset_urn=dataset_urn, db_name=database_name

View File

@ -800,12 +800,10 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
self,
dataset_name: str,
entity_urn: str,
entity_type: str,
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)
@ -963,7 +961,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
yield from self._get_domain_wu(
dataset_name=str(datahub_dataset_name),
entity_urn=dataset_urn,
entity_type="dataset",
)
def gen_lineage(

View File

@ -29,7 +29,6 @@ from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
ChangeTypeClass,
CorpGroupInfoClass,
CorpUserInfoClass,
GroupMembershipClass,
@ -296,10 +295,7 @@ class AzureADSource(Source):
yield wu
group_origin_mcp = MetadataChangeProposalWrapper(
entityType="corpGroup",
entityUrn=datahub_corp_group_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="origin",
aspect=OriginClass(OriginTypeClass.EXTERNAL, "AZURE_AD"),
)
group_origin_wu_id = f"group-origin-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
@ -310,10 +306,7 @@ class AzureADSource(Source):
yield group_origin_wu
group_status_mcp = MetadataChangeProposalWrapper(
entityType="corpGroup",
entityUrn=datahub_corp_group_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="status",
aspect=StatusClass(removed=False),
)
group_status_wu_id = f"group-status-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}"
@ -445,10 +438,7 @@ class AzureADSource(Source):
yield wu
user_origin_mcp = MetadataChangeProposalWrapper(
entityType="corpuser",
entityUrn=datahub_corp_user_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="origin",
aspect=OriginClass(OriginTypeClass.EXTERNAL, "AZURE_AD"),
)
user_origin_wu_id = f"user-origin-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"
@ -457,10 +447,7 @@ class AzureADSource(Source):
yield user_origin_wu
user_status_mcp = MetadataChangeProposalWrapper(
entityType="corpuser",
entityUrn=datahub_corp_user_snapshot.urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="status",
aspect=StatusClass(removed=False),
)
user_status_wu_id = f"user-status-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}"

View File

@ -312,7 +312,6 @@ class KafkaSource(StatefulIngestionSourceBase):
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type="dataset",
entity_urn=dataset_urn,
domain_urn=domain_urn,
)

View File

@ -500,7 +500,6 @@ class PulsarSource(StatefulIngestionSourceBase):
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type="dataset",
entity_urn=dataset_urn,
domain_urn=domain_urn,
)

View File

@ -350,7 +350,7 @@ class SalesforceSource(Source):
if domain_urn:
yield from add_domain_to_entity_wu(
domain_urn=domain_urn, entity_type="dataset", entity_urn=datasetUrn
domain_urn=domain_urn, entity_urn=datasetUrn
)
def get_platform_instance_workunit(self, datasetUrn: str) -> WorkUnit:

View File

@ -212,16 +212,10 @@ class SnowflakeCommonMixin:
aspectName: str,
aspect: _Aspect,
) -> MetadataWorkUnit:
id = f"{aspectName}-for-{entityUrn}"
if "timestampMillis" in aspect._inner_dict:
id = f"{aspectName}-{aspect.timestampMillis}-for-{entityUrn}" # type: ignore
wu = MetadataWorkUnit(
id=id,
mcp=MetadataChangeProposalWrapper(
entityUrn=entityUrn,
aspect=aspect,
),
)
wu = MetadataChangeProposalWrapper(
entityUrn=entityUrn,
aspect=aspect,
).as_workunit()
self.report.report_workunit(wu)
return wu

View File

@ -998,7 +998,6 @@ class SnowflakeV2Source(
yield from self._get_domain_wu(
dataset_name=dataset_name,
entity_urn=dataset_urn,
entity_type="dataset",
)
if table.tags:
@ -1211,12 +1210,10 @@ class SnowflakeV2Source(
self,
dataset_name: str,
entity_urn: str,
entity_type: str,
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)

View File

@ -527,7 +527,6 @@ class PrestoOnHiveSource(SQLAlchemySource):
yield from self._get_domain_wu(
dataset_name=dataset_name,
entity_urn=dataset_urn,
entity_type="dataset",
sql_config=sql_config,
)
@ -744,7 +743,6 @@ class PrestoOnHiveSource(SQLAlchemySource):
yield from self._get_domain_wu(
dataset_name=dataset.dataset_name,
entity_urn=dataset_urn,
entity_type="dataset",
sql_config=sql_config,
)

View File

@ -739,13 +739,11 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
self,
dataset_name: str,
entity_urn: str,
entity_type: str,
sql_config: SQLAlchemyConfig,
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)
@ -906,7 +904,6 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
yield from self._get_domain_wu(
dataset_name=dataset_name,
entity_urn=dataset_urn,
entity_type="dataset",
sql_config=sql_config,
)
@ -1190,7 +1187,6 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
yield from self._get_domain_wu(
dataset_name=dataset_name,
entity_urn=dataset_urn,
entity_type="dataset",
sql_config=sql_config,
)

View File

@ -246,7 +246,6 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
f"{table.schema.catalog.name}.{table.schema.name}.{table.name}"
),
entity_urn=dataset_urn,
entity_type="dataset",
)
if self.config.include_column_lineage:
@ -340,12 +339,10 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self,
dataset_name: str,
entity_urn: str,
entity_type: str,
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)

View File

@ -39,10 +39,7 @@ class KafkaSinkTest(unittest.TestCase):
from datahub.emitter.mcp import MetadataChangeProposalWrapper
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)",
changeType=models.ChangeTypeClass.UPSERT,
aspectName="datasetProfile",
aspect=models.DatasetProfileClass(
rowCount=2000,
columnCount=15,

View File

@ -249,10 +249,7 @@ basicAuditStamp = models.AuditStampClass(
),
(
MetadataChangeProposalWrapper(
entityType="dataset",
entityUrn="urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)",
changeType=models.ChangeTypeClass.UPSERT,
aspectName="ownership",
aspect=models.OwnershipClass(
owners=[
models.OwnerClass(

View File

@ -4,7 +4,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api import workunit
from datahub.ingestion.api.common import PipelineContext, WorkUnit
from datahub.ingestion.api.source import Source, SourceReport
from datahub.metadata.schema_classes import ChangeTypeClass, StatusClass
from datahub.metadata.schema_classes import StatusClass
from datahub.utilities.urns.dataset_urn import DatasetUrn
@ -14,8 +14,6 @@ class FakeSource(Source):
workunit.MetadataWorkUnit(
id="test-workunit",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=str(
DatasetUrn.create_from_ids(
platform_id="elasticsearch",
@ -23,7 +21,6 @@ class FakeSource(Source):
env="PROD",
)
),
aspectName="status",
aspect=StatusClass(removed=False),
),
)

View File

@ -64,7 +64,6 @@ from datahub.ingestion.transformer.remove_dataset_ownership import (
)
from datahub.metadata.schema_classes import (
BrowsePathsClass,
ChangeTypeClass,
DatasetPropertiesClass,
GlobalTagsClass,
MetadataChangeEventClass,
@ -1301,8 +1300,6 @@ def test_mcp_multiple_transformers_replace(mock_time, tmp_path):
Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]
] = [
MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=str(
DatasetUrn.create_from_ids(
platform_id="elasticsearch",
@ -1310,7 +1307,6 @@ def test_mcp_multiple_transformers_replace(mock_time, tmp_path):
env="PROD",
)
),
aspectName="globalTags",
aspect=GlobalTagsClass(tags=[TagAssociationClass(tag="urn:li:tag:Test")]),
)
for i in range(0, 10)
@ -1318,8 +1314,6 @@ def test_mcp_multiple_transformers_replace(mock_time, tmp_path):
mcps.extend(
[
MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=str(
DatasetUrn.create_from_ids(
platform_id="elasticsearch",
@ -1327,7 +1321,6 @@ def test_mcp_multiple_transformers_replace(mock_time, tmp_path):
env="PROD",
)
),
aspectName="datasetProperties",
aspect=DatasetPropertiesClass(description="test dataset"),
)
for i in range(0, 10)