fix(ingestion): use default generate_browse_path_v2 even if no pipeline_config (#13117)

This commit is contained in:
Sergio Gómez Villamor 2025-04-23 13:25:58 +02:00 committed by GitHub
parent 64829a3279
commit 1563b0e9fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 924 additions and 59 deletions

View File

@ -420,12 +420,9 @@ class Source(Closeable, metaclass=ABCMeta):
Run in order, first in list is applied first. Be careful with order when overriding.
"""
browse_path_processor: Optional[MetadataWorkUnitProcessor] = None
if (
self.ctx.pipeline_config
and self.ctx.pipeline_config.flags.generate_browse_path_v2
):
if self.ctx.flags.generate_browse_path_v2:
browse_path_processor = self._get_browse_path_processor(
self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run
self.ctx.flags.generate_browse_path_v2_dry_run
)
auto_lowercase_dataset_urns: Optional[MetadataWorkUnitProcessor] = None

View File

@ -19,7 +19,7 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402207,
"lastObserved": 1744091634014,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -35,7 +35,7 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402207,
"lastObserved": 1744091634014,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -52,7 +52,7 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402207,
"lastObserved": 1744091634014,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -70,7 +70,28 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402207,
"lastObserved": 1744091634014,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:275c7ea5ecf956fd8d45e14228757a8a",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,delta_platform_instance)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,delta_platform_instance)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1744091634014,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -1543,7 +1564,7 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402228,
"lastObserved": 1744091634026,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -1561,7 +1582,7 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402228,
"lastObserved": 1744091634026,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -1577,7 +1598,32 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402228,
"lastObserved": 1744091634026,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.delta-database.delta_table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,delta_platform_instance)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,delta_platform_instance)"
},
{
"id": "urn:li:container:275c7ea5ecf956fd8d45e14228757a8a",
"urn": "urn:li:container:275c7ea5ecf956fd8d45e14228757a8a"
}
]
}
},
"systemMetadata": {
"lastObserved": 1744091634027,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}

View File

@ -19,7 +19,7 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402490,
"lastObserved": 1744091635205,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -35,7 +35,7 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402490,
"lastObserved": 1744091635205,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -52,7 +52,7 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402491,
"lastObserved": 1744091635205,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -70,7 +70,28 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402491,
"lastObserved": 1744091635205,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:275c7ea5ecf956fd8d45e14228757a8a",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,delta_platform_instance)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,delta_platform_instance)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1744091635205,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -123,7 +144,7 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402491,
"lastObserved": 1744091635206,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -141,7 +162,7 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402491,
"lastObserved": 1744091635206,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
@ -157,7 +178,32 @@
}
},
"systemMetadata": {
"lastObserved": 1743454402491,
"lastObserved": 1744091635206,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.delta-database.delta_table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,delta_platform_instance)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,delta_platform_instance)"
},
{
"id": "urn:li:container:275c7ea5ecf956fd8d45e14228757a8a",
"urn": "urn:li:container:275c7ea5ecf956fd8d45e14228757a8a"
}
]
}
},
"systemMetadata": {
"lastObserved": 1744091635206,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}

View File

@ -76,6 +76,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
@ -150,6 +166,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:110bc08849d1c1bde5fc345dab5c3ae7",
@ -224,6 +256,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:110bc08849d1c1bde5fc345dab5c3ae7",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -467,6 +515,27 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba",
"urn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -667,6 +736,27 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"urn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -868,6 +958,27 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"urn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {

View File

@ -76,6 +76,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:77f8f4c39b47069d3a71191de1333b0e",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -326,5 +342,26 @@
"runId": "glue-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-profiling.avro-profiling,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:77f8f4c39b47069d3a71191de1333b0e",
"urn": "urn:li:container:77f8f4c39b47069d3a71191de1333b0e"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-test",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -76,6 +76,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -410,5 +426,26 @@
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c",
"urn": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -76,6 +76,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
@ -150,6 +166,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:110bc08849d1c1bde5fc345dab5c3ae7",
@ -224,6 +256,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:110bc08849d1c1bde5fc345dab5c3ae7",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -492,6 +540,27 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba",
"urn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -717,6 +786,27 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"urn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -943,6 +1033,27 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"urn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {

View File

@ -78,6 +78,27 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:7d53111f2c71396ea6f6d26c84770665",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:9fb26491b2c92dde9e80791dbecca9ca",
@ -154,6 +175,27 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:9fb26491b2c92dde9e80791dbecca9ca",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ac4381240e82d55400c22e4392e744a4",
@ -230,6 +272,27 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:ac4381240e82d55400c22e4392e744a4",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -474,6 +537,31 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,some_instance_name.flights-database.avro,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)"
},
{
"id": "urn:li:container:7d53111f2c71396ea6f6d26c84770665",
"urn": "urn:li:container:7d53111f2c71396ea6f6d26c84770665"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -675,6 +763,31 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,some_instance_name.test-database.test_jsons_markers,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)"
},
{
"id": "urn:li:container:9fb26491b2c92dde9e80791dbecca9ca",
"urn": "urn:li:container:9fb26491b2c92dde9e80791dbecca9ca"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -877,6 +990,31 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,some_instance_name.test-database.test_parquet,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)"
},
{
"id": "urn:li:container:9fb26491b2c92dde9e80791dbecca9ca",
"urn": "urn:li:container:9fb26491b2c92dde9e80791dbecca9ca"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-source-tes",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {

View File

@ -6,6 +6,7 @@
"aspects": [
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"dataType": "TEXT",
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:s3,datahub-sagemaker-outputs,PROD)",
@ -29,6 +30,7 @@
"aspects": [
{
"com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": {
"customProperties": {},
"dataType": "ORDINAL",
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:s3,datahub-sagemaker-outputs,PROD)",
@ -52,6 +54,7 @@
"aspects": [
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"dataType": "CONTINUOUS",
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:s3,datahub-sagemaker-outputs,PROD)",
@ -106,6 +109,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "mlFeatureTable",
"entityUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:sagemaker,test-2)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": {
@ -134,6 +153,7 @@
"aspects": [
{
"com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": {
"customProperties": {},
"dataType": "ORDINAL",
"sources": []
}
@ -154,6 +174,7 @@
"aspects": [
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"dataType": "CONTINUOUS",
"sources": []
}
@ -174,6 +195,7 @@
"aspects": [
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"dataType": "TEXT",
"sources": []
}
@ -226,6 +248,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "mlFeatureTable",
"entityUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:sagemaker,test-1)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLPrimaryKeySnapshot": {
@ -233,6 +271,7 @@
"aspects": [
{
"com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": {
"customProperties": {},
"dataType": "TEXT",
"sources": []
}
@ -253,6 +292,7 @@
"aspects": [
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"dataType": "ORDINAL",
"sources": []
}
@ -273,6 +313,7 @@
"aspects": [
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"customProperties": {},
"dataType": "CONTINUOUS",
"sources": []
}
@ -323,6 +364,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "mlFeatureTable",
"entityUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:sagemaker,test)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
@ -860,6 +917,26 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(sagemaker,auto_ml:an-auto-ml-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:auto-ml-job/an-auto-ml-job)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "auto_ml"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {
@ -941,6 +1018,26 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(sagemaker,compilation:a-compilation-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:compilation-job/a-compilation-job)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "compilation"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {
@ -1017,6 +1114,26 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(sagemaker,hyper_parameter_tuning:a-hyper-parameter-tuning-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:hyper-parameter-tuning-job/a-hyper-parameter-tuning-job)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "hyper_parameter_tuning"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {
@ -1102,6 +1219,26 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(sagemaker,labeling:a-labeling-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:labeling-job/a-labeling-job)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "labeling"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {
@ -1189,6 +1326,26 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(sagemaker,processing:a-processing-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:processing-job/a-processing-job)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "processing"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {
@ -1300,6 +1457,26 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(sagemaker,training:a-training-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:training-job/a-training-job)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "training"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {
@ -1388,6 +1565,26 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(sagemaker,transform:a-transform-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:transform-job/a-transform-job)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "transform"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLModelDeploymentSnapshot": {
@ -1498,6 +1695,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "mlModelGroup",
"entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:sagemaker,a-model-package-group,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLModelSnapshot": {
@ -1515,6 +1728,9 @@
"EnableNetworkIsolation": "True"
},
"externalUrl": "https://us-west-2.console.aws.amazon.com/sagemaker/home?region=us-west-2#/models/the-first-model",
"trainingJobs": [
"urn:li:dataJob:(urn:li:dataFlow:(sagemaker,training:a-training-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:training-job/a-training-job)"
],
"downstreamJobs": [],
"date": 1420070400000,
"hyperParams": [
@ -1541,9 +1757,6 @@
"deployments": [
"urn:li:mlModelDeployment:(urn:li:dataPlatform:sagemaker,arn:aws:sagemaker:us-west-2:123412341234:endpoint/the-first-endpoint,PROD)"
],
"trainingJobs": [
"urn:li:dataJob:(urn:li:dataFlow:(sagemaker,training:a-training-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:training-job/a-training-job)"
],
"groups": [
"urn:li:mlModelGroup:(urn:li:dataPlatform:sagemaker,a-model-package-group,PROD)"
]
@ -1565,6 +1778,26 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "mlModel",
"entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:sagemaker,the-first-model,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "a-model-package-group"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLModelSnapshot": {
@ -1582,6 +1815,12 @@
"EnableNetworkIsolation": "False"
},
"externalUrl": "https://us-west-2.console.aws.amazon.com/sagemaker/home?region=us-west-2#/models/the-second-model",
"trainingJobs": [
"urn:li:dataJob:(urn:li:dataFlow:(sagemaker,training:a-training-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:training-job/a-training-job)"
],
"downstreamJobs": [
"urn:li:dataJob:(urn:li:dataFlow:(sagemaker,transform:a-transform-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:transform-job/a-transform-job)"
],
"date": 1420070400000,
"hyperParams": [
{
@ -1607,12 +1846,6 @@
"deployments": [
"urn:li:mlModelDeployment:(urn:li:dataPlatform:sagemaker,arn:aws:sagemaker:us-west-2:123412341234:endpoint/the-second-endpoint,PROD)"
],
"trainingJobs": [
"urn:li:dataJob:(urn:li:dataFlow:(sagemaker,training:a-training-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:training-job/a-training-job)"
],
"downstreamJobs": [
"urn:li:dataJob:(urn:li:dataFlow:(sagemaker,transform:a-transform-job,PROD),arn:aws:sagemaker:us-west-2:123412341234:transform-job/a-transform-job)"
],
"groups": [
"urn:li:mlModelGroup:(urn:li:dataPlatform:sagemaker,a-model-package-group,PROD)"
]
@ -1634,6 +1867,26 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "mlModel",
"entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:sagemaker,the-second-model,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "a-model-package-group"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "sagemaker-source-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(sagemaker,auto_ml:an-auto-ml-job,PROD)",

View File

@ -74,7 +74,7 @@ def test_kafka_source_workunits_wildcard_topic(mock_kafka, mock_admin_client):
assert isinstance(first_mce, MetadataChangeEvent)
mock_kafka.assert_called_once()
mock_kafka_instance.list_topics.assert_called_once()
assert len(workunits) == 4
assert len(workunits) == 6
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
@ -96,7 +96,7 @@ def test_kafka_source_workunits_topic_pattern(mock_kafka, mock_admin_client):
mock_kafka.assert_called_once()
mock_kafka_instance.list_topics.assert_called_once()
assert len(workunits) == 2
assert len(workunits) == 3
mock_cluster_metadata.topics = {"test": None, "test2": None, "bazbaz": None}
ctx = PipelineContext(run_id="test2")
@ -108,7 +108,7 @@ def test_kafka_source_workunits_topic_pattern(mock_kafka, mock_admin_client):
ctx,
)
workunits = [w for w in kafka_source.get_workunits()]
assert len(workunits) == 4
assert len(workunits) == 6
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
@ -132,8 +132,8 @@ def test_kafka_source_workunits_with_platform_instance(mock_kafka, mock_admin_cl
)
workunits = [w for w in kafka_source.get_workunits()]
# We should only have 1 topic + sub-type wu.
assert len(workunits) == 2
# We should only have 1 topic + sub-type wu + browse paths.
assert len(workunits) == 3
assert isinstance(workunits[0], MetadataWorkUnit)
assert isinstance(workunits[0].metadata, MetadataChangeEvent)
proposed_snap = workunits[0].metadata.proposedSnapshot
@ -180,8 +180,8 @@ def test_kafka_source_workunits_no_platform_instance(mock_kafka, mock_admin_clie
)
workunits = [w for w in kafka_source.get_workunits()]
# We should only have 1 topic + sub-type wu.
assert len(workunits) == 2
# We should only have 1 topic + sub-type wu + browse paths.
assert len(workunits) == 3
assert isinstance(workunits[0], MetadataWorkUnit)
assert isinstance(workunits[0].metadata, MetadataChangeEvent)
proposed_snap = workunits[0].metadata.proposedSnapshot
@ -338,9 +338,10 @@ def test_kafka_source_workunits_schema_registry_subject_name_strategies(
mock_kafka_consumer.assert_called_once()
mock_kafka_instance.list_topics.assert_called_once()
# Along with with 4 topics(3 with schema and 1 schemaless) which constitutes to 8 workunits,
# Along with 4 topics (3 with schema and 1 schemaless), which constitute 8 workunits,
# there will be 6 schemas (1 key and 1 value schema for 3 topics), which constitute 12 workunits,
assert len(workunits) == 20
# and there will be 10 browse paths workunits
assert len(workunits) == 30
i: int = -1
for wu in workunits:
assert isinstance(wu, MetadataWorkUnit)
@ -479,7 +480,7 @@ def test_kafka_ignore_warnings_on_schema_type(
kafka_source = KafkaSource.create(source_config, ctx)
workunits = list(kafka_source.get_workunits())
assert len(workunits) == 2
assert len(workunits) == 3
if ignore_warnings_on_schema_type:
assert not kafka_source.report.warnings
else:
@ -513,7 +514,7 @@ def test_kafka_source_succeeds_with_admin_client_init_error(
mock_kafka_admin_client.assert_called_once()
assert len(workunits) == 2
assert len(workunits) == 3
@patch("datahub.ingestion.source.kafka.kafka.AdminClient", autospec=True)
@ -545,7 +546,7 @@ def test_kafka_source_succeeds_with_describe_configs_error(
mock_kafka_admin_client.assert_called_once()
mock_admin_client_instance.describe_configs.assert_called_once()
assert len(workunits) == 2
assert len(workunits) == 3
@freeze_time("2023-09-20 10:00:00")
@ -658,10 +659,13 @@ def test_kafka_source_topic_meta_mappings(
},
ctx,
)
# Along with with 1 topics(and 5 meta mapping) it constitutes to 6 workunits,
# Along with 1 topic (and 5 meta mappings), which constitutes 6 workunits,
# there will be 2 schemas, which constitute 4 workunits (1 mce and 1 mcp each),
# plus 3 browse paths workunits (one browsePathsV2 per dataset), for 13 in total.
workunits = [w for w in kafka_source.get_workunits()]
assert len(workunits) == 10
assert len(workunits) == 13
# workunit[0] - DatasetSnapshot
mce = workunits[0].metadata
assert isinstance(mce, MetadataChangeEvent)
@ -694,12 +698,35 @@ def test_kafka_source_topic_meta_mappings(
"urn:li:glossaryTerm:double_meta_property",
]
)
assert isinstance(workunits[1].metadata, MetadataChangeProposalWrapper)
mce = workunits[2].metadata
assert isinstance(mce, MetadataChangeEvent)
assert isinstance(workunits[3].metadata, MetadataChangeProposalWrapper)
mce = workunits[4].metadata
# workunit[1] - subtypes
assert isinstance(workunits[1].metadata, MetadataChangeProposalWrapper)
assert workunits[1].metadata.aspectName == "subTypes"
# workunit[2] - browse paths
assert isinstance(workunits[2].metadata, MetadataChangeProposalWrapper)
assert workunits[2].metadata.aspectName == "browsePathsV2"
# workunit[3] - DatasetSnapshot
mce = workunits[3].metadata
assert isinstance(mce, MetadataChangeEvent)
# workunit[4] - subtypes
assert isinstance(workunits[4].metadata, MetadataChangeProposalWrapper)
assert workunits[4].metadata.aspectName == "subTypes"
# workunit[5] - browse paths
assert isinstance(workunits[5].metadata, MetadataChangeProposalWrapper)
assert workunits[5].metadata.aspectName == "browsePathsV2"
# workunit[6] - DatasetSnapshot
mce = workunits[6].metadata
assert isinstance(mce, MetadataChangeEvent)
ownership_aspect = [
asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass)
@ -731,15 +758,35 @@ def test_kafka_source_topic_meta_mappings(
]
)
assert isinstance(workunits[5].metadata, MetadataChangeProposalWrapper)
assert isinstance(workunits[6].metadata, MetadataChangeProposalWrapper)
# workunit[7] - subtypes
assert isinstance(workunits[7].metadata, MetadataChangeProposalWrapper)
assert workunits[7].metadata.aspectName == "subTypes"
# workunit[8] - browse paths
assert isinstance(workunits[8].metadata, MetadataChangeProposalWrapper)
assert workunits[8].metadata.aspectName == "browsePathsV2"
# workunit[9] - glossary terms
assert isinstance(workunits[9].metadata, MetadataChangeProposalWrapper)
assert workunits[6].metadata.aspectName == "glossaryTermKey"
assert workunits[7].metadata.aspectName == "glossaryTermKey"
assert workunits[8].metadata.aspectName == "tagKey"
assert workunits[9].metadata.aspectName == "tagKey"
assert workunits[9].metadata.aspectName == "glossaryTermKey"
# workunit[10] - glossary terms
assert isinstance(workunits[10].metadata, MetadataChangeProposalWrapper)
assert workunits[10].metadata.aspectName == "glossaryTermKey"
# workunit[11] - tags
assert isinstance(workunits[11].metadata, MetadataChangeProposalWrapper)
assert workunits[11].metadata.aspectName == "tagKey"
# workunit[12] - tags
assert isinstance(workunits[12].metadata, MetadataChangeProposalWrapper)
assert workunits[12].metadata.aspectName == "tagKey"
def test_kafka_source_oauth_cb_configuration():

View File

@ -149,8 +149,22 @@ class TestPulsarSource(unittest.TestCase):
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
assert mock.call_count == 7
# expecting 5 mcp for one topic with default config
assert len(work_units) == 5
# expecting 6 mcp for one topic with default config
assert len(work_units) == 6
aspect_names = set(
wu.metadata.aspectName
for wu in work_units
if isinstance(wu.metadata, MetadataChangeProposalWrapper)
)
assert len(aspect_names) == 6
assert aspect_names == {
"status",
"schemaMetadata",
"datasetProperties",
"browsePaths",
"subTypes",
"browsePathsV2",
}
@patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
def test_pulsar_source_get_workunits_custom_tenant(self, mock_session):
@ -190,8 +204,22 @@ class TestPulsarSource(unittest.TestCase):
# http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
# http://localhost:8080/admin/v2/namespaces/t_2
assert mock.call_count == 7
# expecting 5 mcp for one topic with default config
assert len(work_units) == 5
# expecting 6 mcp for one topic with default config
assert len(work_units) == 6
aspect_names = set(
wu.metadata.aspectName
for wu in work_units
if isinstance(wu.metadata, MetadataChangeProposalWrapper)
)
assert len(aspect_names) == 6
assert aspect_names == {
"status",
"schemaMetadata",
"datasetProperties",
"browsePaths",
"subTypes",
"browsePathsV2",
}
@patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
def test_pulsar_source_get_workunits_patterns(self, mock_session):
@ -237,5 +265,19 @@ class TestPulsarSource(unittest.TestCase):
# http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
# http://localhost:8080/admin/v2/namespaces/t_2
assert mock.call_count == 7
# expecting 5 mcp for one topic with default config
assert len(work_units) == 5
# expecting 6 mcp for one topic with default config
assert len(work_units) == 6
aspect_names = set(
wu.metadata.aspectName
for wu in work_units
if isinstance(wu.metadata, MetadataChangeProposalWrapper)
)
assert len(aspect_names) == 6
assert aspect_names == {
"status",
"schemaMetadata",
"datasetProperties",
"browsePaths",
"subTypes",
"browsePathsV2",
}