fix(ingest): adding platform instance urn to data platform instance aspects (#4015)

This commit is contained in:
Swaroop Jagadish 2022-01-31 08:49:03 -08:00 committed by GitHub
parent c27f1f9487
commit e29d2cbe7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 32 deletions

View File

@ -13,6 +13,7 @@ from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import DatasetSourceConfigBase
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@ -339,7 +340,9 @@ class ElasticsearchSource(Source):
aspectName="dataPlatformInstance",
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=self.source_config.platform_instance,
instance=make_dataplatform_instance_urn(
self.platform, self.source_config.platform_instance
),
),
changeType=ChangeTypeClass.UPSERT,
)

View File

@ -976,7 +976,12 @@ class KafkaConnectSource(Source):
changeType=models.ChangeTypeClass.UPSERT,
aspectName="dataPlatformInstance",
aspect=models.DataPlatformInstanceClass(
platform=builder.make_data_platform_urn(target_platform)
platform=builder.make_data_platform_urn(target_platform),
instance=builder.make_dataplatform_instance_urn(
target_platform, target_platform_instance
)
if target_platform_instance
else None,
),
)
@ -995,7 +1000,12 @@ class KafkaConnectSource(Source):
changeType=models.ChangeTypeClass.UPSERT,
aspectName="dataPlatformInstance",
aspect=models.DataPlatformInstanceClass(
platform=builder.make_data_platform_urn(source_platform)
platform=builder.make_data_platform_urn(source_platform),
instance=builder.make_dataplatform_instance_urn(
source_platform, source_platform_instance
)
if source_platform_instance
else None,
),
)

View File

@ -25,6 +25,7 @@ from sqlalchemy.sql import sqltypes as types
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@ -722,21 +723,32 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
self.report.report_workunit(wu)
yield wu
# If we are a platform instance based source, emit the instance aspect
if self.config.platform_instance:
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="dataPlatformInstance",
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=self.config.platform_instance,
dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
if dpi_aspect:
yield dpi_aspect
def get_dataplatform_instance_aspect(
self, dataset_urn: str
) -> Optional[SqlWorkUnit]:
# If we are a platform instance based source, emit the instance aspect
if self.config.platform_instance:
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="dataPlatformInstance",
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
),
)
wu = SqlWorkUnit(id=f"{dataset_name}-dataPlatformInstance", mcp=mcp)
self.report.report_workunit(wu)
yield wu
),
)
wu = SqlWorkUnit(id=f"{dataset_urn}-dataPlatformInstance", mcp=mcp)
self.report.report_workunit(wu)
return wu
else:
return None
def get_schema_fields(
self, dataset_name: str, columns: List[dict], pk_constraints: dict = None
@ -868,21 +880,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
self.report.report_workunit(wu)
yield wu
# If we are a platform instance based source, emit the instance aspect
if self.config.platform_instance:
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="dataPlatformInstance",
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=self.config.platform_instance,
),
)
wu = SqlWorkUnit(id=f"{dataset_name}-dataPlatformInstance", mcp=mcp)
self.report.report_workunit(wu)
yield wu
dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
if dpi_aspect:
yield dpi_aspect
def _get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler