feat(ingest): SageMaker jobs and models (#2830)

Kevin Hu 2021-07-08 16:16:16 -07:00 committed by GitHub
parent a117b63b55
commit a2106ca9e8
33 changed files with 6321 additions and 2165 deletions

Binary file not shown (new image, 20 KiB).

Binary file not shown (new image, 13 KiB).

View File

@ -460,8 +460,8 @@
"PIG" : "Pig type is for running Pig jobs.",
"SQL" : "SQL is for running Presto, mysql queries etc"
}
} ],
"doc" : "Datajob type"
}, "string" ],
"doc" : "Datajob type\n**NOTE**: AzkabanJobType is deprecated. Please use strings instead."
}, {
"name" : "flowUrn",
"type" : "com.linkedin.common.DataFlowUrn",
@ -471,6 +471,25 @@
"entityTypes" : [ "dataFlow" ],
"name" : "IsPartOf"
}
}, {
"name" : "status",
"type" : {
"type" : "enum",
"name" : "JobStatus",
"doc" : "Job statuses",
"symbols" : [ "STARTING", "IN_PROGRESS", "STOPPING", "STOPPED", "COMPLETED", "FAILED", "UNKNOWN" ],
"symbolDocs" : {
"COMPLETED" : "Jobs with successful completion.",
"FAILED" : "Jobs that have failed.",
"IN_PROGRESS" : "Jobs currently running.",
"STARTING" : "Jobs being initialized.",
"STOPPED" : "Jobs that have stopped.",
"STOPPING" : "Jobs being stopped.",
"UNKNOWN" : "Jobs with unknown status (either unmappable or unavailable)"
}
},
"doc" : "Status of the job",
"optional" : true
} ],
"Aspect" : {
"name" : "dataJobInfo"
@ -586,7 +605,7 @@
"doc" : "Editable properties",
"optional" : true
} ]
}, "com.linkedin.datajob.DataJobInfo", "com.linkedin.datajob.DataJobInputOutput", "com.linkedin.datajob.DataJobKey", "com.linkedin.datajob.EditableDataJobProperties", "com.linkedin.datajob.azkaban.AzkabanJobType", {
}, "com.linkedin.datajob.DataJobInfo", "com.linkedin.datajob.DataJobInputOutput", "com.linkedin.datajob.DataJobKey", "com.linkedin.datajob.EditableDataJobProperties", "com.linkedin.datajob.JobStatus", "com.linkedin.datajob.azkaban.AzkabanJobType", {
"type" : "typeref",
"name" : "DataJobAspect",
"namespace" : "com.linkedin.metadata.aspect",

View File

@ -978,8 +978,8 @@
"PIG" : "Pig type is for running Pig jobs.",
"SQL" : "SQL is for running Presto, mysql queries etc"
}
} ],
"doc" : "Datajob type"
}, "string" ],
"doc" : "Datajob type\n**NOTE**: AzkabanJobType is deprecated. Please use strings instead."
}, {
"name" : "flowUrn",
"type" : "com.linkedin.common.DataFlowUrn",
@ -989,6 +989,25 @@
"entityTypes" : [ "dataFlow" ],
"name" : "IsPartOf"
}
}, {
"name" : "status",
"type" : {
"type" : "enum",
"name" : "JobStatus",
"doc" : "Job statuses",
"symbols" : [ "STARTING", "IN_PROGRESS", "STOPPING", "STOPPED", "COMPLETED", "FAILED", "UNKNOWN" ],
"symbolDocs" : {
"COMPLETED" : "Jobs with successful completion.",
"FAILED" : "Jobs that have failed.",
"IN_PROGRESS" : "Jobs currently running.",
"STARTING" : "Jobs being initialized.",
"STOPPED" : "Jobs that have stopped.",
"STOPPING" : "Jobs being stopped.",
"UNKNOWN" : "Jobs with unknown status (either unmappable or unavailable)"
}
},
"doc" : "Status of the job",
"optional" : true
} ],
"Aspect" : {
"name" : "dataJobInfo"
@ -1058,7 +1077,7 @@
"Aspect" : {
"name" : "dataJobInputOutput"
}
}, "com.linkedin.datajob.azkaban.AzkabanJobType", {
}, "com.linkedin.datajob.JobStatus", "com.linkedin.datajob.azkaban.AzkabanJobType", {
"type" : "record",
"name" : "DatasetDeprecation",
"namespace" : "com.linkedin.dataset",
@ -2392,6 +2411,7 @@
"name" : "MLModelProperties",
"namespace" : "com.linkedin.ml.metadata",
"doc" : "Properties associated with a ML Model",
"include" : [ "com.linkedin.common.CustomProperties" ],
"fields" : [ {
"name" : "description",
"type" : "string",

View File

@ -1253,8 +1253,8 @@
"PIG" : "Pig type is for running Pig jobs.",
"SQL" : "SQL is for running Presto, mysql queries etc"
}
} ],
"doc" : "Datajob type"
}, "string" ],
"doc" : "Datajob type\n**NOTE**: AzkabanJobType is deprecated. Please use strings instead."
}, {
"name" : "flowUrn",
"type" : "com.linkedin.common.DataFlowUrn",
@ -1264,6 +1264,25 @@
"entityTypes" : [ "dataFlow" ],
"name" : "IsPartOf"
}
}, {
"name" : "status",
"type" : {
"type" : "enum",
"name" : "JobStatus",
"doc" : "Job statuses",
"symbols" : [ "STARTING", "IN_PROGRESS", "STOPPING", "STOPPED", "COMPLETED", "FAILED", "UNKNOWN" ],
"symbolDocs" : {
"COMPLETED" : "Jobs with successful completion.",
"FAILED" : "Jobs that have failed.",
"IN_PROGRESS" : "Jobs currently running.",
"STARTING" : "Jobs being initialized.",
"STOPPED" : "Jobs that have stopped.",
"STOPPING" : "Jobs being stopped.",
"UNKNOWN" : "Jobs with unknown status (either unmappable or unavailable)"
}
},
"doc" : "Status of the job",
"optional" : true
} ],
"Aspect" : {
"name" : "dataJobInfo"
@ -1371,7 +1390,7 @@
"Aspect" : {
"name" : "editableDataJobProperties"
}
}, "com.linkedin.datajob.azkaban.AzkabanJobType", {
}, "com.linkedin.datajob.JobStatus", "com.linkedin.datajob.azkaban.AzkabanJobType", {
"type" : "record",
"name" : "DataPlatformInfo",
"namespace" : "com.linkedin.dataplatform",
@ -2792,6 +2811,7 @@
"name" : "MLModelProperties",
"namespace" : "com.linkedin.ml.metadata",
"doc" : "Properties associated with a ML Model",
"include" : [ "com.linkedin.common.CustomProperties" ],
"fields" : [ {
"name" : "description",
"type" : "string",
@ -3452,7 +3472,7 @@
"Aspect" : {
"name" : "mlFeatureTableProperties"
}
}, "com.linkedin.common.Ownership", "com.linkedin.common.InstitutionalMemory", "com.linkedin.common.Status", "com.linkedin.common.Deprecation" ]
}, "com.linkedin.common.Ownership", "com.linkedin.common.InstitutionalMemory", "com.linkedin.common.Status", "com.linkedin.common.Deprecation", "com.linkedin.common.BrowsePaths" ]
}
},
"doc" : "The list of metadata aspects associated with the MLFeatureTable. Depending on the use case, this can either be all, or a selection, of supported aspects."

View File

@ -141,6 +141,20 @@
"name" : "cost"
}
}, "com.linkedin.common.CostType", "com.linkedin.common.CostValue", {
"type" : "record",
"name" : "CustomProperties",
"namespace" : "com.linkedin.common",
"doc" : "Misc. properties about an entity.",
"fields" : [ {
"name" : "customProperties",
"type" : {
"type" : "map",
"values" : "string"
},
"doc" : "Custom property bag.",
"default" : { }
} ]
}, {
"type" : "typeref",
"name" : "DataPlatformUrn",
"namespace" : "com.linkedin.common",
@ -522,6 +536,7 @@
"name" : "MLModelProperties",
"namespace" : "com.linkedin.ml.metadata",
"doc" : "Properties associated with a ML Model",
"include" : [ "com.linkedin.common.CustomProperties" ],
"fields" : [ {
"name" : "description",
"type" : "string",

View File

@ -387,6 +387,18 @@ source:
aws_secret_access_key: # Optional.
aws_session_token: # Optional.
aws_role: # Optional (Role chaining supported by using a sorted list).
extract_feature_groups: True # whether to ingest feature groups (default: True)
extract_models: True # whether to ingest models (default: True)
extract_jobs: # whether to ingest jobs (default: True for all job types)
auto_ml: True
compilation: True
edge_packaging: True
hyper_parameter_tuning: True
labeling: True
processing: True
training: True
transform: True
```
### Snowflake `snowflake`

File diff suppressed because it is too large.

View File

@ -475,5 +475,45 @@
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataPlatformSnapshot": {
"urn": "urn:li:dataPlatform:feast",
"aspects": [
{
"com.linkedin.pegasus2avro.dataplatform.DataPlatformInfo": {
"datasetNameDelimiter": ".",
"name": "feast",
"displayName": "Feast",
"type": "OTHERS",
"logoUrl": "https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web-react/src/images/feastlogo.png"
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataPlatformSnapshot": {
"urn": "urn:li:dataPlatform:sagemaker",
"aspects": [
{
"com.linkedin.pegasus2avro.dataplatform.DataPlatformInfo": {
"datasetNameDelimiter": ".",
"name": "sagemaker",
"displayName": "SageMaker",
"type": "OTHERS",
"logoUrl": "https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web-react/src/images/sagemakerlogo.png"
}
}
]
}
},
"proposedDelta": null
}
]

View File

@ -7,4 +7,4 @@ source:
sink:
type: "datahub-rest"
config:
server: 'http://localhost:8080'
server: "http://localhost:8080"

View File

@ -73,6 +73,11 @@ def make_ml_feature_table_urn(platform: str, feature_table_name: str) -> str:
)
def make_ml_model_urn(platform: str, model_name: str, env: str) -> str:
return f"urn:li:mlModel:(urn:li:dataPlatform:{platform},{model_name},{env})"
def make_lineage_mce(
upstream_urns: List[str],
downstream_urn: str,

View File

@ -86,3 +86,19 @@ class AwsSourceConfig(ConfigModel):
)
else:
return boto3.client(service, region_name=self.aws_region)
def make_s3_urn(s3_uri: str, env: str, suffix: Optional[str] = None) -> str:
if not s3_uri.startswith("s3://"):
raise ValueError("S3 URIs should begin with 's3://'")
# remove S3 prefix (s3://)
s3_name = s3_uri[5:]
if s3_name.endswith("/"):
s3_name = s3_name[:-1]
if suffix is not None:
return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name}_{suffix},{env})"
return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{env})"

View File

@ -10,7 +10,7 @@ from datahub.emitter import mce_builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws_common import AwsSourceConfig
from datahub.ingestion.source.aws_common import AwsSourceConfig, make_s3_urn
from datahub.metadata.com.linkedin.pegasus2avro.common import Status
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@ -142,15 +142,11 @@ class GlueSource(Source):
# if data object is S3 bucket
if node_args.get("connection_type") == "s3":
# remove S3 prefix (s3://)
s3_name = node_args["connection_options"]["path"][5:]
if s3_name.endswith("/"):
s3_name = s3_name[:-1]
s3_uri = node_args["connection_options"]["path"]
extension = node_args.get("format")
yield s3_name, extension
yield s3_uri, extension
def process_dataflow_node(
self,
@ -179,20 +175,18 @@ class GlueSource(Source):
# if data object is S3 bucket
elif node_args.get("connection_type") == "s3":
# remove S3 prefix (s3://)
s3_name = node_args["connection_options"]["path"][5:]
if s3_name.endswith("/"):
s3_name = s3_name[:-1]
s3_uri = node_args["connection_options"]["path"]
# append S3 format if different ones exist
if len(s3_formats[s3_name]) > 1:
node_urn = f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name}_{node_args.get('format')},{self.env})"
if len(s3_formats[s3_uri]) > 1:
node_urn = make_s3_urn(
s3_uri,
self.env,
suffix=node_args.get("format"),
)
else:
node_urn = (
f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{self.env})"
)
node_urn = make_s3_urn(s3_uri, self.env)
dataset_snapshot = DatasetSnapshot(
urn=node_urn,
@ -235,7 +229,7 @@ class GlueSource(Source):
self,
dataflow_graph: Dict[str, Any],
flow_urn: str,
s3_names: typing.DefaultDict[str, Set[Union[str, None]]],
s3_formats: typing.DefaultDict[str, Set[Union[str, None]]],
) -> Tuple[Dict[str, Dict[str, Any]], List[str], List[MetadataChangeEvent]]:
"""
Prepare a job's DAG for ingestion.
@ -245,6 +239,8 @@ class GlueSource(Source):
Job DAG returned from get_dataflow_graph()
flow_urn:
URN of the flow (i.e. the AWS Glue job itself).
s3_formats:
Map from s3 URIs to formats used (for deduplication purposes)
"""
new_dataset_ids: List[str] = []
@ -256,7 +252,7 @@ class GlueSource(Source):
for node in dataflow_graph["DagNodes"]:
nodes[node["Id"]] = self.process_dataflow_node(
node, flow_urn, new_dataset_ids, new_dataset_mces, s3_names
node, flow_urn, new_dataset_ids, new_dataset_mces, s3_formats
)
# traverse edges to fill in node properties
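The refactor above threads an `s3_formats` map through `process_dataflow_graph` so that the same S3 path used with several formats yields distinct, suffixed dataset URNs. A hedged sketch of that behaviour with hypothetical paths:

```python
# Hedged sketch (hypothetical bucket/paths): format suffixes are only added
# when one S3 path appears with more than one format.
from collections import defaultdict

from datahub.ingestion.source.aws_common import make_s3_urn

s3_formats = defaultdict(set)
s3_formats["s3://my-bucket/events/"].update({"json", "parquet"})
s3_formats["s3://my-bucket/users/"].add("parquet")

for s3_uri, formats in s3_formats.items():
    for fmt in sorted(f for f in formats if f):
        suffix = fmt if len(formats) > 1 else None
        print(make_s3_urn(s3_uri, "PROD", suffix=suffix))
```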

View File

@ -1,42 +1,17 @@
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from typing import Any, Dict, Iterable, List
from typing import Iterable
import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import Source
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws_common import AwsSourceConfig
from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
MLFeatureSnapshot,
MLFeatureTableSnapshot,
MLPrimaryKeySnapshot,
from datahub.ingestion.source.sagemaker_processors.common import (
SagemakerSourceConfig,
SagemakerSourceReport,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
MLFeaturePropertiesClass,
MLFeatureTablePropertiesClass,
MLPrimaryKeyPropertiesClass,
from datahub.ingestion.source.sagemaker_processors.feature_groups import (
FeatureGroupProcessor,
)
class SagemakerSourceConfig(AwsSourceConfig):
@property
def sagemaker_client(self):
return self.get_client("sagemaker")
@dataclass
class SagemakerSourceReport(SourceReport):
tables_scanned = 0
filtered: List[str] = dataclass_field(default_factory=list)
def report_table_scanned(self) -> None:
self.tables_scanned += 1
def report_table_dropped(self, table: str) -> None:
self.filtered.append(table)
from datahub.ingestion.source.sagemaker_processors.jobs import JobProcessor
from datahub.ingestion.source.sagemaker_processors.models import ModelProcessor
class SagemakerSource(Source):
@ -55,244 +30,34 @@ class SagemakerSource(Source):
config = SagemakerSourceConfig.parse_obj(config_dict)
return cls(config, ctx)
def get_all_feature_groups(self) -> List[Dict[str, Any]]:
"""
List all feature groups in SageMaker.
"""
feature_groups = []
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_feature_groups
paginator = self.sagemaker_client.get_paginator("list_feature_groups")
for page in paginator.paginate():
feature_groups += page["FeatureGroupSummaries"]
return feature_groups
def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]:
"""
Get details of a feature group (including list of component features).
"""
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_feature_group
feature_group = self.sagemaker_client.describe_feature_group(
FeatureGroupName=feature_group_name
)
# use falsy fallback since AWS stubs require this to be a string in tests
next_token = feature_group.get("NextToken", "")
# paginate over feature group features
while next_token:
next_features = self.sagemaker_client.describe_feature_group(
FeatureGroupName=feature_group_name, NextToken=next_token
)
feature_group["FeatureDefinitions"].append(
next_features["FeatureDefinitions"]
)
next_token = feature_group.get("NextToken", "")
return feature_group
def get_feature_group_wu(
self, feature_group_details: Dict[str, Any]
) -> MetadataWorkUnit:
"""
Generate an MLFeatureTable workunit for a SageMaker feature group.
Parameters
----------
feature_group_details:
ingested SageMaker feature group from get_feature_group_details()
"""
feature_group_name = feature_group_details["FeatureGroupName"]
feature_group_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("sagemaker", feature_group_name),
aspects=[],
)
feature_group_snapshot.aspects.append(
MLFeatureTablePropertiesClass(
description=feature_group_details.get("Description"),
# non-primary key features
mlFeatures=[
builder.make_ml_feature_urn(
feature_group_name,
feature["FeatureName"],
)
for feature in feature_group_details["FeatureDefinitions"]
if feature["FeatureName"]
!= feature_group_details["RecordIdentifierFeatureName"]
],
mlPrimaryKeys=[
builder.make_ml_primary_key_urn(
feature_group_name,
feature_group_details["RecordIdentifierFeatureName"],
)
],
# additional metadata
customProperties={
"arn": feature_group_details["FeatureGroupArn"],
"creation_time": str(feature_group_details["CreationTime"]),
"status": feature_group_details["FeatureGroupStatus"],
},
)
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=feature_group_snapshot)
return MetadataWorkUnit(id=feature_group_name, mce=mce)
field_type_mappings = {
"String": MLFeatureDataType.TEXT,
"Integral": MLFeatureDataType.ORDINAL,
"Fractional": MLFeatureDataType.CONTINUOUS,
}
def get_feature_type(self, aws_type: str, feature_name: str) -> str:
mapped_type = self.field_type_mappings.get(aws_type)
if mapped_type is None:
self.report.report_warning(
feature_name, f"unable to map type {aws_type} to metadata schema"
)
mapped_type = MLFeatureDataType.UNKNOWN
return mapped_type
def get_feature_wu(
self, feature_group_details: Dict[str, Any], feature: Dict[str, Any]
) -> MetadataWorkUnit:
"""
Generate an MLFeature workunit for a SageMaker feature.
Parameters
----------
feature_group_details:
ingested SageMaker feature group from get_feature_group_details()
feature:
ingested SageMaker feature
"""
# if the feature acts as the record identifier, then we ingest it as an MLPrimaryKey
# the RecordIdentifierFeatureName is guaranteed to exist as it's required on creation
is_record_identifier = (
feature_group_details["RecordIdentifierFeatureName"]
== feature["FeatureName"]
)
feature_sources = []
if "OfflineStoreConfig" in feature_group_details:
# remove S3 prefix (s3://)
s3_name = feature_group_details["OfflineStoreConfig"]["S3StorageConfig"][
"S3Uri"
][5:]
if s3_name.endswith("/"):
s3_name = s3_name[:-1]
feature_sources.append(
builder.make_dataset_urn(
"s3",
s3_name,
self.source_config.env,
)
)
if "DataCatalogConfig" in feature_group_details["OfflineStoreConfig"]:
# if Glue catalog associated with offline store
glue_database = feature_group_details["OfflineStoreConfig"][
"DataCatalogConfig"
]["Database"]
glue_table = feature_group_details["OfflineStoreConfig"][
"DataCatalogConfig"
]["TableName"]
full_table_name = f"{glue_database}.{glue_table}"
self.report.report_warning(
full_table_name,
f"""Note: table {full_table_name} is an AWS Glue object.
To view full table metadata, run Glue ingestion
(see https://datahubproject.io/docs/metadata-ingestion/#aws-glue-glue)""",
)
feature_sources.append(
f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.source_config.env})"
)
# note that there's also an OnlineStoreConfig field, but this
# lacks enough metadata to create a dataset
# (only specifies the security config and whether it's enabled at all)
# append feature name and type
if is_record_identifier:
primary_key_snapshot: MLPrimaryKeySnapshot = MLPrimaryKeySnapshot(
urn=builder.make_ml_primary_key_urn(
feature_group_details["FeatureGroupName"],
feature["FeatureName"],
),
aspects=[
MLPrimaryKeyPropertiesClass(
dataType=self.get_feature_type(
feature["FeatureType"], feature["FeatureName"]
),
sources=feature_sources,
),
],
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=primary_key_snapshot)
else:
# create snapshot instance for the feature
feature_snapshot: MLFeatureSnapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(
feature_group_details["FeatureGroupName"],
feature["FeatureName"],
),
aspects=[
MLFeaturePropertiesClass(
dataType=self.get_feature_type(
feature["FeatureType"], feature["FeatureName"]
),
sources=feature_sources,
)
],
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot)
return MetadataWorkUnit(
id=f'{feature_group_details["FeatureGroupName"]}-{feature["FeatureName"]}',
mce=mce,
)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
feature_groups = self.get_all_feature_groups()
# extract feature groups if specified
if self.source_config.extract_feature_groups:
for feature_group in feature_groups:
feature_group_details = self.get_feature_group_details(
feature_group["FeatureGroupName"]
feature_group_processor = FeatureGroupProcessor(
sagemaker_client=self.sagemaker_client, env=self.env, report=self.report
)
yield from feature_group_processor.get_workunits()
for feature in feature_group_details["FeatureDefinitions"]:
wu = self.get_feature_wu(feature_group_details, feature)
self.report.report_workunit(wu)
yield wu
# extract models if specified
if self.source_config.extract_models:
wu = self.get_feature_group_wu(feature_group_details)
self.report.report_workunit(wu)
yield wu
model_processor = ModelProcessor(
sagemaker_client=self.sagemaker_client, env=self.env, report=self.report
)
yield from model_processor.get_workunits()
# extract jobs if specified
if self.source_config.extract_jobs is not False:
job_processor = JobProcessor(
sagemaker_client=self.sagemaker_client,
env=self.env,
report=self.report,
job_type_filter=self.source_config.extract_jobs,
)
yield from job_processor.get_workunits()
def get_report(self):
return self.report

View File

@ -0,0 +1,40 @@
from dataclasses import dataclass
from typing import Dict, Optional, Union
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.aws_common import AwsSourceConfig
class SagemakerSourceConfig(AwsSourceConfig):
extract_feature_groups: Optional[bool] = True
extract_models: Optional[bool] = True
extract_jobs: Optional[Union[Dict[str, str], bool]] = True
@property
def sagemaker_client(self):
return self.get_client("sagemaker")
@dataclass
class SagemakerSourceReport(SourceReport):
feature_groups_scanned = 0
features_scanned = 0
models_scanned = 0
jobs_scanned = 0
datasets_scanned = 0
def report_feature_group_scanned(self) -> None:
self.feature_groups_scanned += 1
def report_feature_scanned(self) -> None:
self.features_scanned += 1
def report_model_scanned(self) -> None:
self.models_scanned += 1
def report_job_scanned(self) -> None:
self.jobs_scanned += 1
def report_dataset_scanned(self) -> None:
self.datasets_scanned += 1
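A hedged sketch of how this config and report might be used directly, assuming `aws_region` is the only required AWS field inherited from `AwsSourceConfig`:

```python
# Illustrative sketch: extract_jobs accepts either a bool (all job types) or a
# dict, mirroring the Union type above. aws_region comes from AwsSourceConfig;
# any other required fields are an assumption here.
from datahub.ingestion.source.sagemaker_processors.common import (
    SagemakerSourceConfig,
    SagemakerSourceReport,
)

config = SagemakerSourceConfig.parse_obj(
    {
        "aws_region": "us-west-2",
        "extract_feature_groups": True,
        "extract_models": True,
        "extract_jobs": False,  # or a dict to restrict job types
    }
)

report = SagemakerSourceReport()
report.report_model_scanned()
print(report.models_scanned)  # 1
# config.sagemaker_client lazily builds a boto3 SageMaker client via get_client()
```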

View File

@ -0,0 +1,268 @@
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List
import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sagemaker_processors.common import SagemakerSourceReport
from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
MLFeatureSnapshot,
MLFeatureTableSnapshot,
MLPrimaryKeySnapshot,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathsClass,
MLFeaturePropertiesClass,
MLFeatureTablePropertiesClass,
MLPrimaryKeyPropertiesClass,
)
@dataclass
class FeatureGroupProcessor:
sagemaker_client: Any
env: str
report: SagemakerSourceReport
def get_all_feature_groups(self) -> List[Dict[str, Any]]:
"""
List all feature groups in SageMaker.
"""
feature_groups = []
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_feature_groups
paginator = self.sagemaker_client.get_paginator("list_feature_groups")
for page in paginator.paginate():
feature_groups += page["FeatureGroupSummaries"]
return feature_groups
def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]:
"""
Get details of a feature group (including list of component features).
"""
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_feature_group
feature_group = self.sagemaker_client.describe_feature_group(
FeatureGroupName=feature_group_name
)
# use falsy fallback since AWS stubs require this to be a string in tests
next_token = feature_group.get("NextToken", "")
# paginate over feature group features
while next_token:
next_features = self.sagemaker_client.describe_feature_group(
FeatureGroupName=feature_group_name, NextToken=next_token
)
feature_group["FeatureDefinitions"].append(
next_features["FeatureDefinitions"]
)
next_token = feature_group.get("NextToken", "")
return feature_group
def get_feature_group_wu(
self, feature_group_details: Dict[str, Any]
) -> MetadataWorkUnit:
"""
Generate an MLFeatureTable workunit for a SageMaker feature group.
Parameters
----------
feature_group_details:
ingested SageMaker feature group from get_feature_group_details()
"""
feature_group_name = feature_group_details["FeatureGroupName"]
feature_group_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("sagemaker", feature_group_name),
aspects=[
BrowsePathsClass(paths=[f"sagemaker/{feature_group_name}"]),
],
)
feature_group_snapshot.aspects.append(
MLFeatureTablePropertiesClass(
description=feature_group_details.get("Description"),
# non-primary key features
mlFeatures=[
builder.make_ml_feature_urn(
feature_group_name,
feature["FeatureName"],
)
for feature in feature_group_details["FeatureDefinitions"]
if feature["FeatureName"]
!= feature_group_details["RecordIdentifierFeatureName"]
],
mlPrimaryKeys=[
builder.make_ml_primary_key_urn(
feature_group_name,
feature_group_details["RecordIdentifierFeatureName"],
)
],
# additional metadata
customProperties={
"arn": feature_group_details["FeatureGroupArn"],
"creation_time": str(feature_group_details["CreationTime"]),
"status": feature_group_details["FeatureGroupStatus"],
},
)
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=feature_group_snapshot)
return MetadataWorkUnit(id=feature_group_name, mce=mce)
field_type_mappings = {
"String": MLFeatureDataType.TEXT,
"Integral": MLFeatureDataType.ORDINAL,
"Fractional": MLFeatureDataType.CONTINUOUS,
}
def get_feature_type(self, aws_type: str, feature_name: str) -> str:
mapped_type = self.field_type_mappings.get(aws_type)
if mapped_type is None:
self.report.report_warning(
feature_name, f"unable to map type {aws_type} to metadata schema"
)
mapped_type = MLFeatureDataType.UNKNOWN
return mapped_type
def get_feature_wu(
self, feature_group_details: Dict[str, Any], feature: Dict[str, Any]
) -> MetadataWorkUnit:
"""
Generate an MLFeature workunit for a SageMaker feature.
Parameters
----------
feature_group_details:
ingested SageMaker feature group from get_feature_group_details()
feature:
ingested SageMaker feature
"""
# if the feature acts as the record identifier, then we ingest it as an MLPrimaryKey
# the RecordIdentifierFeatureName is guaranteed to exist as it's required on creation
is_record_identifier = (
feature_group_details["RecordIdentifierFeatureName"]
== feature["FeatureName"]
)
feature_sources = []
if "OfflineStoreConfig" in feature_group_details:
# remove S3 prefix (s3://)
s3_name = feature_group_details["OfflineStoreConfig"]["S3StorageConfig"][
"S3Uri"
][5:]
if s3_name.endswith("/"):
s3_name = s3_name[:-1]
feature_sources.append(
builder.make_dataset_urn(
"s3",
s3_name,
self.env,
)
)
if "DataCatalogConfig" in feature_group_details["OfflineStoreConfig"]:
# if Glue catalog associated with offline store
glue_database = feature_group_details["OfflineStoreConfig"][
"DataCatalogConfig"
]["Database"]
glue_table = feature_group_details["OfflineStoreConfig"][
"DataCatalogConfig"
]["TableName"]
full_table_name = f"{glue_database}.{glue_table}"
self.report.report_warning(
full_table_name,
f"""Note: table {full_table_name} is an AWS Glue object.
To view full table metadata, run Glue ingestion
(see https://datahubproject.io/docs/metadata-ingestion/#aws-glue-glue)""",
)
feature_sources.append(
f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.env})"
)
# note that there's also an OnlineStoreConfig field, but this
# lacks enough metadata to create a dataset
# (only specifies the security config and whether it's enabled at all)
# append feature name and type
if is_record_identifier:
primary_key_snapshot: MLPrimaryKeySnapshot = MLPrimaryKeySnapshot(
urn=builder.make_ml_primary_key_urn(
feature_group_details["FeatureGroupName"],
feature["FeatureName"],
),
aspects=[
MLPrimaryKeyPropertiesClass(
dataType=self.get_feature_type(
feature["FeatureType"], feature["FeatureName"]
),
sources=feature_sources,
),
],
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=primary_key_snapshot)
else:
# create snapshot instance for the feature
feature_snapshot: MLFeatureSnapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(
feature_group_details["FeatureGroupName"],
feature["FeatureName"],
),
aspects=[
MLFeaturePropertiesClass(
dataType=self.get_feature_type(
feature["FeatureType"], feature["FeatureName"]
),
sources=feature_sources,
)
],
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot)
return MetadataWorkUnit(
id=f'{feature_group_details["FeatureGroupName"]}-{feature["FeatureName"]}',
mce=mce,
)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
feature_groups = self.get_all_feature_groups()
for feature_group in feature_groups:
feature_group_details = self.get_feature_group_details(
feature_group["FeatureGroupName"]
)
for feature in feature_group_details["FeatureDefinitions"]:
self.report.report_feature_scanned()
wu = self.get_feature_wu(feature_group_details, feature)
self.report.report_workunit(wu)
yield wu
self.report.report_feature_group_scanned()
wu = self.get_feature_group_wu(feature_group_details)
self.report.report_workunit(wu)
yield wu
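Since `get_feature_type` only touches the report, it can be sketched in isolation; a hedged example of the mapping above (the `None` client is just a stand-in):

```python
# Quick sketch of the SageMaker-to-DataHub feature type mapping. The client is
# unused by get_feature_type, so None is passed purely for illustration.
from datahub.ingestion.source.sagemaker_processors.common import SagemakerSourceReport
from datahub.ingestion.source.sagemaker_processors.feature_groups import (
    FeatureGroupProcessor,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType

processor = FeatureGroupProcessor(
    sagemaker_client=None, env="PROD", report=SagemakerSourceReport()
)

assert processor.get_feature_type("Integral", "user_id") == MLFeatureDataType.ORDINAL
# unmapped SageMaker types fall back to UNKNOWN and emit a report warning
assert processor.get_feature_type("Decimal", "score") == MLFeatureDataType.UNKNOWN
```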

File diff suppressed because it is too large.

View File

@ -0,0 +1,86 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Iterable, List
import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sagemaker_processors.common import SagemakerSourceReport
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import MLModelSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import MLModelPropertiesClass
@dataclass
class ModelProcessor:
sagemaker_client: Any
env: str
report: SagemakerSourceReport
def get_all_models(self) -> List[Dict[str, Any]]:
"""
List all models in SageMaker.
"""
models = []
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_models
paginator = self.sagemaker_client.get_paginator("list_models")
for page in paginator.paginate():
models += page["Models"]
return models
def get_model_details(self, model_name: str) -> Dict[str, Any]:
"""
Get details of a model.
"""
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_model
return self.sagemaker_client.describe_model(ModelName=model_name)
def get_model_wu(self, model_details: Dict[str, Any]) -> MetadataWorkUnit:
# params to remove since we extract them
redundant_fields = {"ModelName", "CreationTime"}
model_snapshot = MLModelSnapshot(
urn=builder.make_ml_model_urn(
"sagemaker", model_details["ModelName"], self.env
),
aspects=[
MLModelPropertiesClass(
date=int(
model_details.get("CreationTime", datetime.now()).timestamp()
* 1000
),
customProperties={
key: str(value)
for key, value in model_details.items()
if key not in redundant_fields
},
)
],
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=model_snapshot)
return MetadataWorkUnit(
id=f'{model_details["ModelName"]}',
mce=mce,
)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
models = self.get_all_models()
# sort models for consistency
models = sorted(models, key=lambda x: x["ModelArn"])
for model in models:
model_details = self.get_model_details(model["ModelName"])
self.report.report_model_scanned()
wu = self.get_model_wu(model_details)
self.report.report_workunit(wu)
yield wu
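For reference, a hedged sketch (placeholder values) of the `MLModelProperties` shape that `get_model_wu` produces, with the leftover `describe_model` fields stringified into the new `customProperties` bag:

```python
# Hedged sketch with placeholder model details: how non-extracted fields end up
# as string-valued custom properties on the aspect.
from datetime import datetime

from datahub.metadata.schema_classes import MLModelPropertiesClass

model_details = {
    "ModelName": "example-model",
    "CreationTime": datetime(2021, 7, 8, 12, 0, 0),
    "ExecutionRoleArn": "arn:aws:iam::123456789012:role/example",  # placeholder
}
redundant_fields = {"ModelName", "CreationTime"}

props = MLModelPropertiesClass(
    date=int(model_details["CreationTime"].timestamp() * 1000),
    customProperties={
        k: str(v) for k, v in model_details.items() if k not in redundant_fields
    },
)
print(props.customProperties)  # {'ExecutionRoleArn': 'arn:aws:iam::...'}
```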

View File

@ -9,6 +9,7 @@ from .....schema_classes import DataJobInfoClass
from .....schema_classes import DataJobInputOutputClass
from .....schema_classes import EditableDataFlowPropertiesClass
from .....schema_classes import EditableDataJobPropertiesClass
from .....schema_classes import JobStatusClass
DataFlowInfo = DataFlowInfoClass
@ -16,4 +17,5 @@ DataJobInfo = DataJobInfoClass
DataJobInputOutput = DataJobInputOutputClass
EditableDataFlowProperties = EditableDataFlowPropertiesClass
EditableDataJobProperties = EditableDataJobPropertiesClass
JobStatus = JobStatusClass
# fmt: on

View File

@ -1607,10 +1607,11 @@
"GLUE"
],
"doc": "The various types of support azkaban jobs"
}
},
"string"
],
"name": "type",
"doc": "Datajob type"
"doc": "Datajob type\n**NOTE**: AzkabanJobType is deprecated. Please use strings instead."
},
{
"Relationship": {
@ -1629,6 +1630,38 @@
"name": "flowUrn",
"default": null,
"doc": "DataFlow urn that this job is part of"
},
{
"type": [
"null",
{
"type": "enum",
"symbolDocs": {
"COMPLETED": "Jobs with successful completion.",
"FAILED": "Jobs that have failed.",
"IN_PROGRESS": "Jobs currently running.",
"STARTING": "Jobs being initialized.",
"STOPPED": "Jobs that have stopped.",
"STOPPING": "Jobs being stopped.",
"UNKNOWN": "Jobs with unknown status (either unmappable or unavailable)"
},
"name": "JobStatus",
"namespace": "com.linkedin.pegasus2avro.datajob",
"symbols": [
"STARTING",
"IN_PROGRESS",
"STOPPING",
"STOPPED",
"COMPLETED",
"FAILED",
"UNKNOWN"
],
"doc": "Job statuses"
}
],
"name": "status",
"default": null,
"doc": "Status of the job"
}
],
"doc": "Information about a Data processing job"
@ -3255,6 +3288,15 @@
"name": "MLModelProperties",
"namespace": "com.linkedin.pegasus2avro.ml.metadata",
"fields": [
{
"type": {
"type": "map",
"values": "string"
},
"name": "customProperties",
"default": {},
"doc": "Custom property bag."
},
{
"Searchable": {
"fieldType": "TEXT",
@ -4355,7 +4397,8 @@
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.InstitutionalMemory",
"com.linkedin.pegasus2avro.common.Status",
"com.linkedin.pegasus2avro.common.Deprecation"
"com.linkedin.pegasus2avro.common.Deprecation",
"com.linkedin.pegasus2avro.common.BrowsePaths"
]
},
"name": "aspects",

View File

@ -1873,11 +1873,12 @@ class DataJobInfoClass(DictWrapper):
RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.datajob.DataJobInfo")
def __init__(self,
name: str,
type: Union[str, "AzkabanJobTypeClass"],
type: Union[Union[str, "AzkabanJobTypeClass"], str],
customProperties: Optional[Dict[str, str]]=None,
externalUrl: Union[None, str]=None,
description: Union[None, str]=None,
flowUrn: Union[None, str]=None,
status: Union[None, Union[str, "JobStatusClass"]]=None,
):
super().__init__()
@ -1891,6 +1892,7 @@ class DataJobInfoClass(DictWrapper):
self.description = description
self.type = type
self.flowUrn = flowUrn
self.status = status
@classmethod
def construct_with_defaults(cls) -> "DataJobInfoClass":
@ -1906,6 +1908,7 @@ class DataJobInfoClass(DictWrapper):
self.description = self.RECORD_SCHEMA.field_map["description"].default
self.type = AzkabanJobTypeClass.COMMAND
self.flowUrn = self.RECORD_SCHEMA.field_map["flowUrn"].default
self.status = self.RECORD_SCHEMA.field_map["status"].default
@property
@ -1953,13 +1956,15 @@ class DataJobInfoClass(DictWrapper):
@property
def type(self) -> Union[str, "AzkabanJobTypeClass"]:
"""Getter: Datajob type"""
def type(self) -> Union[Union[str, "AzkabanJobTypeClass"], str]:
"""Getter: Datajob type
**NOTE**: AzkabanJobType is deprecated. Please use strings instead."""
return self._inner_dict.get('type') # type: ignore
@type.setter
def type(self, value: Union[str, "AzkabanJobTypeClass"]) -> None:
"""Setter: Datajob type"""
def type(self, value: Union[Union[str, "AzkabanJobTypeClass"], str]) -> None:
"""Setter: Datajob type
**NOTE**: AzkabanJobType is deprecated. Please use strings instead."""
self._inner_dict['type'] = value
@ -1974,6 +1979,17 @@ class DataJobInfoClass(DictWrapper):
self._inner_dict['flowUrn'] = value
@property
def status(self) -> Union[None, Union[str, "JobStatusClass"]]:
"""Getter: Status of the job"""
return self._inner_dict.get('status') # type: ignore
@status.setter
def status(self, value: Union[None, Union[str, "JobStatusClass"]]) -> None:
"""Setter: Status of the job"""
self._inner_dict['status'] = value
class DataJobInputOutputClass(DictWrapper):
"""Information about the inputs and outputs of a Data processing job"""
@ -2203,6 +2219,32 @@ class EditableDataJobPropertiesClass(DictWrapper):
self._inner_dict['description'] = value
class JobStatusClass(object):
"""Job statuses"""
"""Jobs being initialized."""
STARTING = "STARTING"
"""Jobs currently running."""
IN_PROGRESS = "IN_PROGRESS"
"""Jobs being stopped."""
STOPPING = "STOPPING"
"""Jobs that have stopped."""
STOPPED = "STOPPED"
"""Jobs with successful completion."""
COMPLETED = "COMPLETED"
"""Jobs that have failed."""
FAILED = "FAILED"
"""Jobs with unknown status (either unmappable or unavailable)"""
UNKNOWN = "UNKNOWN"
class AzkabanJobTypeClass(object):
"""The various types of support azkaban jobs"""
@ -4663,7 +4705,7 @@ class MLFeatureTableSnapshotClass(DictWrapper):
RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot")
def __init__(self,
urn: str,
aspects: List[Union["MLFeatureTableKeyClass", "MLFeatureTablePropertiesClass", "OwnershipClass", "InstitutionalMemoryClass", "StatusClass", "DeprecationClass"]],
aspects: List[Union["MLFeatureTableKeyClass", "MLFeatureTablePropertiesClass", "OwnershipClass", "InstitutionalMemoryClass", "StatusClass", "DeprecationClass", "BrowsePathsClass"]],
):
super().__init__()
@ -4694,12 +4736,12 @@ class MLFeatureTableSnapshotClass(DictWrapper):
@property
def aspects(self) -> List[Union["MLFeatureTableKeyClass", "MLFeatureTablePropertiesClass", "OwnershipClass", "InstitutionalMemoryClass", "StatusClass", "DeprecationClass"]]:
def aspects(self) -> List[Union["MLFeatureTableKeyClass", "MLFeatureTablePropertiesClass", "OwnershipClass", "InstitutionalMemoryClass", "StatusClass", "DeprecationClass", "BrowsePathsClass"]]:
"""Getter: The list of metadata aspects associated with the MLFeatureTable. Depending on the use case, this can either be all, or a selection, of supported aspects."""
return self._inner_dict.get('aspects') # type: ignore
@aspects.setter
def aspects(self, value: List[Union["MLFeatureTableKeyClass", "MLFeatureTablePropertiesClass", "OwnershipClass", "InstitutionalMemoryClass", "StatusClass", "DeprecationClass"]]) -> None:
def aspects(self, value: List[Union["MLFeatureTableKeyClass", "MLFeatureTablePropertiesClass", "OwnershipClass", "InstitutionalMemoryClass", "StatusClass", "DeprecationClass", "BrowsePathsClass"]]) -> None:
"""Setter: The list of metadata aspects associated with the MLFeatureTable. Depending on the use case, this can either be all, or a selection, of supported aspects."""
self._inner_dict['aspects'] = value
@ -5494,6 +5536,7 @@ class MLModelPropertiesClass(DictWrapper):
RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.ml.metadata.MLModelProperties")
def __init__(self,
customProperties: Optional[Dict[str, str]]=None,
description: Union[None, str]=None,
date: Union[None, int]=None,
version: Union[None, "VersionTagClass"]=None,
@ -5504,6 +5547,11 @@ class MLModelPropertiesClass(DictWrapper):
):
super().__init__()
if customProperties is None:
# default: {}
self.customProperties = dict()
else:
self.customProperties = customProperties
self.description = description
self.date = date
self.version = version
@ -5524,6 +5572,7 @@ class MLModelPropertiesClass(DictWrapper):
return self
def _restore_defaults(self) -> None:
self.customProperties = dict()
self.description = self.RECORD_SCHEMA.field_map["description"].default
self.date = self.RECORD_SCHEMA.field_map["date"].default
self.version = self.RECORD_SCHEMA.field_map["version"].default
@ -5533,6 +5582,17 @@ class MLModelPropertiesClass(DictWrapper):
self.tags = list()
@property
def customProperties(self) -> Dict[str, str]:
"""Getter: Custom property bag."""
return self._inner_dict.get('customProperties') # type: ignore
@customProperties.setter
def customProperties(self, value: Dict[str, str]) -> None:
"""Setter: Custom property bag."""
self._inner_dict['customProperties'] = value
@property
def description(self) -> Union[None, str]:
"""Getter: Documentation of the MLModel"""
@ -7708,6 +7768,7 @@ __SCHEMA_TYPES = {
'com.linkedin.pegasus2avro.datajob.DataJobInputOutput': DataJobInputOutputClass,
'com.linkedin.pegasus2avro.datajob.EditableDataFlowProperties': EditableDataFlowPropertiesClass,
'com.linkedin.pegasus2avro.datajob.EditableDataJobProperties': EditableDataJobPropertiesClass,
'com.linkedin.pegasus2avro.datajob.JobStatus': JobStatusClass,
'com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType': AzkabanJobTypeClass,
'com.linkedin.pegasus2avro.dataplatform.DataPlatformInfo': DataPlatformInfoClass,
'com.linkedin.pegasus2avro.dataplatform.PlatformType': PlatformTypeClass,
@ -7854,6 +7915,7 @@ __SCHEMA_TYPES = {
'DataJobInputOutput': DataJobInputOutputClass,
'EditableDataFlowProperties': EditableDataFlowPropertiesClass,
'EditableDataJobProperties': EditableDataJobPropertiesClass,
'JobStatus': JobStatusClass,
'AzkabanJobType': AzkabanJobTypeClass,
'DataPlatformInfo': DataPlatformInfoClass,
'PlatformType': PlatformTypeClass,
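With the generated classes updated above, the new `status` field and the plain-string job type can be set directly; a minimal sketch (the type string is a hypothetical value):

```python
# Minimal sketch using the regenerated classes: string job type plus the new
# optional status field.
from datahub.metadata.schema_classes import DataJobInfoClass, JobStatusClass

job_info = DataJobInfoClass(
    name="daily-training-job",
    type="SAGEMAKER_TRAINING_JOB",  # plain string; AzkabanJobType is deprecated
    status=JobStatusClass.COMPLETED,
)
print(job_info.status)  # COMPLETED
```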

View File

@ -1596,9 +1596,10 @@
"PIG": "Pig type is for running Pig jobs.",
"SQL": "SQL is for running Presto, mysql queries etc"
}
}
},
"string"
],
"doc": "Datajob type"
"doc": "Datajob type\n**NOTE**: AzkabanJobType is deprecated. Please use strings instead."
},
{
"name": "flowUrn",
@ -1617,6 +1618,37 @@
"java": {
"class": "com.linkedin.pegasus2avro.common.urn.DataFlowUrn"
}
},
{
"name": "status",
"type": [
"null",
{
"type": "enum",
"name": "JobStatus",
"doc": "Job statuses",
"symbols": [
"STARTING",
"IN_PROGRESS",
"STOPPING",
"STOPPED",
"COMPLETED",
"FAILED",
"UNKNOWN"
],
"symbolDocs": {
"COMPLETED": "Jobs with successful completion.",
"FAILED": "Jobs that have failed.",
"IN_PROGRESS": "Jobs currently running.",
"STARTING": "Jobs being initialized.",
"STOPPED": "Jobs that have stopped.",
"STOPPING": "Jobs being stopped.",
"UNKNOWN": "Jobs with unknown status (either unmappable or unavailable)"
}
}
],
"doc": "Status of the job",
"default": null
}
],
"Aspect": {
@ -3208,6 +3240,15 @@
"namespace": "com.linkedin.pegasus2avro.ml.metadata",
"doc": "Properties associated with a ML Model",
"fields": [
{
"name": "customProperties",
"type": {
"type": "map",
"values": "string"
},
"doc": "Custom property bag.",
"default": {}
},
{
"name": "description",
"type": [
@ -4300,7 +4341,8 @@
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.InstitutionalMemory",
"com.linkedin.pegasus2avro.common.Status",
"com.linkedin.pegasus2avro.common.Deprecation"
"com.linkedin.pegasus2avro.common.Deprecation",
"com.linkedin.pegasus2avro.common.BrowsePaths"
]
},
"doc": "The list of metadata aspects associated with the MLFeatureTable. Depending on the use case, this can either be all, or a selection, of supported aspects."

View File

@ -1595,9 +1595,10 @@
"PIG": "Pig type is for running Pig jobs.",
"SQL": "SQL is for running Presto, mysql queries etc"
}
}
},
"string"
],
"doc": "Datajob type"
"doc": "Datajob type\n**NOTE**: AzkabanJobType is deprecated. Please use strings instead."
},
{
"name": "flowUrn",
@ -1616,6 +1617,37 @@
"java": {
"class": "com.linkedin.pegasus2avro.common.urn.DataFlowUrn"
}
},
{
"name": "status",
"type": [
"null",
{
"type": "enum",
"name": "JobStatus",
"doc": "Job statuses",
"symbols": [
"STARTING",
"IN_PROGRESS",
"STOPPING",
"STOPPED",
"COMPLETED",
"FAILED",
"UNKNOWN"
],
"symbolDocs": {
"COMPLETED": "Jobs with successful completion.",
"FAILED": "Jobs that have failed.",
"IN_PROGRESS": "Jobs currently running.",
"STARTING": "Jobs being initialized.",
"STOPPED": "Jobs that have stopped.",
"STOPPING": "Jobs being stopped.",
"UNKNOWN": "Jobs with unknown status (either unmappable or unavailable)"
}
}
],
"doc": "Status of the job",
"default": null
}
],
"Aspect": {
@ -3207,6 +3239,15 @@
"namespace": "com.linkedin.pegasus2avro.ml.metadata",
"doc": "Properties associated with a ML Model",
"fields": [
{
"name": "customProperties",
"type": {
"type": "map",
"values": "string"
},
"doc": "Custom property bag.",
"default": {}
},
{
"name": "description",
"type": [
@ -4299,7 +4340,8 @@
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.InstitutionalMemory",
"com.linkedin.pegasus2avro.common.Status",
"com.linkedin.pegasus2avro.common.Deprecation"
"com.linkedin.pegasus2avro.common.Deprecation",
"com.linkedin.pegasus2avro.common.BrowsePaths"
]
},
"doc": "The list of metadata aspects associated with the MLFeatureTable. Depending on the use case, this can either be all, or a selection, of supported aspects."

View File

@ -523,9 +523,10 @@
"name": "test-job-2:Filter-Transform0",
"description": null,
"type": {
"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE"
"string": "GLUE"
},
"flowUrn": null
"flowUrn": null,
"status": null
}
},
{
@ -560,9 +561,10 @@
"name": "test-job-2:ApplyMapping-Transform1",
"description": null,
"type": {
"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE"
"string": "GLUE"
},
"flowUrn": null
"flowUrn": null,
"status": null
}
},
{
@ -597,9 +599,10 @@
"name": "test-job-2:ApplyMapping-Transform2",
"description": null,
"type": {
"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE"
"string": "GLUE"
},
"flowUrn": null
"flowUrn": null,
"status": null
}
},
{
@ -635,9 +638,10 @@
"name": "test-job-2:Join-Transform3",
"description": null,
"type": {
"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE"
"string": "GLUE"
},
"flowUrn": null
"flowUrn": null,
"status": null
}
},
{
@ -672,9 +676,10 @@
"name": "test-job-2:ApplyMapping-Transform4",
"description": null,
"type": {
"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE"
"string": "GLUE"
},
"flowUrn": null
"flowUrn": null,
"status": null
}
},
{
@ -707,9 +712,10 @@
"name": "test-job-2:ApplyMapping-Transform5",
"description": null,
"type": {
"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE"
"string": "GLUE"
},
"flowUrn": null
"flowUrn": null,
"status": null
}
},
{
@ -774,9 +780,10 @@
"name": "test-job-2:SplitFields-Transform0",
"description": null,
"type": {
"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE"
"string": "GLUE"
},
"flowUrn": null
"flowUrn": null,
"status": null
}
},
{
@ -811,9 +818,10 @@
"name": "test-job-2:ApplyMapping-Transform1",
"description": null,
"type": {
"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE"
"string": "GLUE"
},
"flowUrn": null
"flowUrn": null,
"status": null
}
},
{
@ -848,9 +856,10 @@
"name": "test-job-2:FillMissingValues-Transform2",
"description": null,
"type": {
"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE"
"string": "GLUE"
},
"flowUrn": null
"flowUrn": null,
"status": null
}
},
{
@ -885,9 +894,10 @@
"name": "test-job-2:SelectFields-Transform3",
"description": null,
"type": {
"com.linkedin.pegasus2avro.datajob.azkaban.AzkabanJobType": "GLUE"
"string": "GLUE"
},
"flowUrn": null
"flowUrn": null,
"status": null
}
},
{

View File

@ -5,12 +5,17 @@ from freezegun import freeze_time
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.sagemaker import SagemakerSource, SagemakerSourceConfig
from datahub.ingestion.source.sagemaker_processors.jobs import SAGEMAKER_JOB_TYPES
from tests.test_helpers import mce_helpers
from tests.unit.test_sagemaker_source_stubs import (
describe_feature_group_response_1,
describe_feature_group_response_2,
describe_feature_group_response_3,
describe_model_response_1,
describe_model_response_2,
job_stubs,
list_feature_groups_response,
list_models_response,
)
FROZEN_TIME = "2020-04-14 07:00:00"
@ -57,6 +62,44 @@ def test_sagemaker_ingest(tmp_path, pytestconfig):
},
)
sagemaker_stubber.add_response(
"list_models",
list_models_response,
{},
)
sagemaker_stubber.add_response(
"describe_model",
describe_model_response_1,
{"ModelName": "the-first-model"},
)
sagemaker_stubber.add_response(
"describe_model",
describe_model_response_2,
{"ModelName": "the-second-model"},
)
for job_type, job in job_stubs.items():
job_info = SAGEMAKER_JOB_TYPES[job_type]
sagemaker_stubber.add_response(
job_info.list_command,
job["list"],
{},
)
for job_type, job in job_stubs.items():
job_info = SAGEMAKER_JOB_TYPES[job_type]
sagemaker_stubber.add_response(
job_info.describe_command,
job["describe"],
{job_info.describe_name_key: job["describe_name"]},
)
mce_objects = [
wu.mce.to_obj() for wu in sagemaker_source_instance.get_workunits()
]

View File

@ -186,9 +186,7 @@ basicAuditStamp = models.AuditStampClass(
"customProperties": {},
"name": "User Deletions",
"description": "Constructs the fct_users_deleted from logging_events",
"type": {
"com.linkedin.datajob.azkaban.AzkabanJobType": "SQL"
},
"type": {"string": "SQL"},
}
}
],

File diff suppressed because it is too large Load Diff

View File

@ -34,8 +34,9 @@ record DataJobInfo includes CustomProperties, ExternalReference {
/**
* Datajob type
* **NOTE**: AzkabanJobType is deprecated. Please use strings instead.
*/
type: union[AzkabanJobType]
type: union[AzkabanJobType, string]
/**
* DataFlow urn that this job is part of
@ -45,4 +46,9 @@ record DataJobInfo includes CustomProperties, ExternalReference {
"entityTypes": [ "dataFlow" ]
}
flowUrn: optional DataFlowUrn
/**
* Status of the job
*/
status: optional JobStatus
}

View File

@ -0,0 +1,42 @@
namespace com.linkedin.datajob
/**
* Job statuses
*/
enum JobStatus {
/**
* Jobs being initialized.
*/
STARTING
/**
* Jobs currently running.
*/
IN_PROGRESS
/**
* Jobs being stopped.
*/
STOPPING
/**
* Jobs that have stopped.
*/
STOPPED
/**
* Jobs with successful completion.
*/
COMPLETED
/**
* Jobs that have failed.
*/
FAILED
/**
* Jobs with unknown status (either unmappable or unavailable)
*/
UNKNOWN
}

View File

@ -6,6 +6,7 @@ import com.linkedin.common.Ownership
import com.linkedin.common.Status
import com.linkedin.ml.metadata.MLFeatureTableProperties
import com.linkedin.common.Deprecation
import com.linkedin.common.BrowsePaths
/**
* A union of all supported metadata aspects for a MLFeatureTable
@ -16,5 +17,6 @@ typeref MLFeatureTableAspect = union[
Ownership,
InstitutionalMemory,
Status,
Deprecation
Deprecation,
BrowsePaths
]

View File

@ -3,6 +3,7 @@ namespace com.linkedin.ml.metadata
import com.linkedin.common.MLFeatureUrn
import com.linkedin.common.Time
import com.linkedin.common.VersionTag
import com.linkedin.common.CustomProperties
/**
* Properties associated with a ML Model
@ -10,7 +11,7 @@ import com.linkedin.common.VersionTag
@Aspect = {
"name": "mlModelProperties"
}
record MLModelProperties {
record MLModelProperties includes CustomProperties {
/**
* Documentation of the MLModel