diff --git a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py index 05dc234f64..eb2479c2ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py +++ b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py @@ -13,6 +13,7 @@ from datahub.configuration.common import AllowDenyPattern from datahub.configuration.source_common import DatasetSourceConfigBase from datahub.emitter.mce_builder import ( make_data_platform_urn, + make_dataplatform_instance_urn, make_dataset_urn_with_platform_instance, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -339,7 +340,9 @@ class ElasticsearchSource(Source): aspectName="dataPlatformInstance", aspect=DataPlatformInstanceClass( platform=make_data_platform_urn(self.platform), - instance=self.source_config.platform_instance, + instance=make_dataplatform_instance_urn( + self.platform, self.source_config.platform_instance + ), ), changeType=ChangeTypeClass.UPSERT, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index a5ee8bbebb..ad42159c8a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -976,7 +976,12 @@ class KafkaConnectSource(Source): changeType=models.ChangeTypeClass.UPSERT, aspectName="dataPlatformInstance", aspect=models.DataPlatformInstanceClass( - platform=builder.make_data_platform_urn(target_platform) + platform=builder.make_data_platform_urn(target_platform), + instance=builder.make_dataplatform_instance_urn( + target_platform, target_platform_instance + ) + if target_platform_instance + else None, ), ) @@ -995,7 +1000,12 @@ class KafkaConnectSource(Source): changeType=models.ChangeTypeClass.UPSERT, aspectName="dataPlatformInstance", aspect=models.DataPlatformInstanceClass( - platform=builder.make_data_platform_urn(source_platform) + platform=builder.make_data_platform_urn(source_platform), + instance=builder.make_dataplatform_instance_urn( + source_platform, source_platform_instance + ) + if source_platform_instance + else None, ), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index de5a094d20..0fbd87f004 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -25,6 +25,7 @@ from sqlalchemy.sql import sqltypes as types from datahub.configuration.common import AllowDenyPattern from datahub.emitter.mce_builder import ( make_data_platform_urn, + make_dataplatform_instance_urn, make_dataset_urn_with_platform_instance, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -722,21 +723,32 @@ class SQLAlchemySource(StatefulIngestionSourceBase): self.report.report_workunit(wu) yield wu - # If we are a platform instance based source, emit the instance aspect - if self.config.platform_instance: - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspectName="dataPlatformInstance", - aspect=DataPlatformInstanceClass( - platform=make_data_platform_urn(self.platform), - instance=self.config.platform_instance, + dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn) + if dpi_aspect: + yield dpi_aspect + + def get_dataplatform_instance_aspect( + self, dataset_urn: str + ) -> Optional[SqlWorkUnit]: + # If we are a platform instance based source, emit the instance aspect + if self.config.platform_instance: + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + changeType=ChangeTypeClass.UPSERT, + entityUrn=dataset_urn, + aspectName="dataPlatformInstance", + aspect=DataPlatformInstanceClass( + platform=make_data_platform_urn(self.platform), + instance=make_dataplatform_instance_urn( + self.platform, self.config.platform_instance ), - ) - wu = SqlWorkUnit(id=f"{dataset_name}-dataPlatformInstance", mcp=mcp) - self.report.report_workunit(wu) - yield wu + ), + ) + wu = SqlWorkUnit(id=f"{dataset_urn}-dataPlatformInstance", mcp=mcp) + self.report.report_workunit(wu) + return wu + else: + return None def get_schema_fields( self, dataset_name: str, columns: List[dict], pk_constraints: dict = None @@ -868,21 +880,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase): self.report.report_workunit(wu) yield wu - # If we are a platform instance based source, emit the instance aspect - if self.config.platform_instance: - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspectName="dataPlatformInstance", - aspect=DataPlatformInstanceClass( - platform=make_data_platform_urn(self.platform), - instance=self.config.platform_instance, - ), - ) - wu = SqlWorkUnit(id=f"{dataset_name}-dataPlatformInstance", mcp=mcp) - self.report.report_workunit(wu) - yield wu + dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn) + if dpi_aspect: + yield dpi_aspect def _get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler": from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler