feat(mssql): platform instance aspect for dataflow and datajob entities (#12180)

This commit is contained in:
Sergio Gómez Villamor 2024-12-19 20:20:42 +01:00 committed by GitHub
parent 9762c46702
commit 45ace13fe2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 576 additions and 230 deletions

View File

@ -1,11 +1,17 @@
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union
from datahub.emitter.mce_builder import make_data_flow_urn, make_data_job_urn
from datahub.emitter.mce_builder import (
make_data_flow_urn,
make_data_job_urn,
make_data_platform_urn,
make_dataplatform_instance_urn,
)
from datahub.metadata.schema_classes import (
DataFlowInfoClass,
DataJobInfoClass,
DataJobInputOutputClass,
DataPlatformInstanceClass,
)
@ -204,6 +210,18 @@ class MSSQLDataJob:
status=self.status,
)
@property
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
if self.entity.flow.platform_instance:
return DataPlatformInstanceClass(
platform=make_data_platform_urn(self.entity.flow.orchestrator),
instance=make_dataplatform_instance_urn(
platform=self.entity.flow.orchestrator,
instance=self.entity.flow.platform_instance,
),
)
return None
@dataclass
class MSSQLDataFlow:
@ -238,3 +256,14 @@ class MSSQLDataFlow:
customProperties=self.flow_properties,
externalUrl=self.external_url,
)
@property
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
if self.entity.platform_instance:
return DataPlatformInstanceClass(
platform=make_data_platform_urn(self.entity.orchestrator),
instance=make_dataplatform_instance_urn(
self.entity.orchestrator, self.entity.platform_instance
),
)
return None

View File

@ -639,6 +639,13 @@ class SQLServerSource(SQLAlchemySource):
aspect=data_job.as_datajob_info_aspect,
).as_workunit()
data_platform_instance_aspect = data_job.as_maybe_platform_instance_aspect
if data_platform_instance_aspect:
yield MetadataChangeProposalWrapper(
entityUrn=data_job.urn,
aspect=data_platform_instance_aspect,
).as_workunit()
if include_lineage:
yield MetadataChangeProposalWrapper(
entityUrn=data_job.urn,
@ -654,6 +661,13 @@ class SQLServerSource(SQLAlchemySource):
entityUrn=data_flow.urn,
aspect=data_flow.as_dataflow_info_aspect,
).as_workunit()
data_platform_instance_aspect = data_flow.as_maybe_platform_instance_aspect
if data_platform_instance_aspect:
yield MetadataChangeProposalWrapper(
entityUrn=data_flow.urn,
aspect=data_platform_instance_aspect,
).as_workunit()
# TODO: Add SubType when it appear
def get_inspectors(self) -> Iterable[Inspector]:

View File

@ -7,6 +7,7 @@ source:
password: test!Password
database: DemoData
host_port: localhost:21433
platform_instance: my-instance
# use_odbc: True
# uri_args:
# driver: "ODBC Driver 17 for SQL Server"