fix(ingest/dremio): Fix platform_instance URN generation (#15076)

This commit is contained in:
Tamas Nemeth 2025-10-24 14:32:21 +02:00 committed by GitHub
parent 5a1569743a
commit a865b6ba63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 4562 additions and 3322 deletions

View File

@ -338,10 +338,10 @@ class DremioSource(StatefulIngestionSourceBase):
return return
dataset_urn = make_dataset_urn_with_platform_instance( dataset_urn = make_dataset_urn_with_platform_instance(
platform=make_data_platform_urn(self.get_platform()), platform=self.get_platform(),
name=f"dremio.{dataset_name}", name=dataset_name,
env=self.config.env,
platform_instance=self.config.platform_instance, platform_instance=self.config.platform_instance,
env=self.config.env,
) )
for dremio_mcp in self.dremio_aspects.populate_dataset_mcp( for dremio_mcp in self.dremio_aspects.populate_dataset_mcp(
@ -421,10 +421,10 @@ class DremioSource(StatefulIngestionSourceBase):
schema_str = ".".join(dataset_info.path) schema_str = ".".join(dataset_info.path)
dataset_name = f"{schema_str}.{dataset_info.resource_name}".lower() dataset_name = f"{schema_str}.{dataset_info.resource_name}".lower()
dataset_urn = make_dataset_urn_with_platform_instance( dataset_urn = make_dataset_urn_with_platform_instance(
platform=make_data_platform_urn(self.get_platform()), platform=self.get_platform(),
name=f"dremio.{dataset_name}", name=dataset_name,
env=self.config.env,
platform_instance=self.config.platform_instance, platform_instance=self.config.platform_instance,
env=self.config.env,
) )
yield from self.profiler.get_workunits(dataset_info, dataset_urn) yield from self.profiler.get_workunits(dataset_info, dataset_urn)
@ -436,10 +436,10 @@ class DremioSource(StatefulIngestionSourceBase):
""" """
upstream_urns = [ upstream_urns = [
make_dataset_urn_with_platform_instance( make_dataset_urn_with_platform_instance(
platform=make_data_platform_urn(self.get_platform()), platform=self.get_platform(),
name=f"dremio.{upstream_table.lower()}", name=upstream_table.lower(),
env=self.config.env,
platform_instance=self.config.platform_instance, platform_instance=self.config.platform_instance,
env=self.config.env,
) )
for upstream_table in parents for upstream_table in parents
] ]
@ -498,19 +498,19 @@ class DremioSource(StatefulIngestionSourceBase):
if query.query and query.affected_dataset: if query.query and query.affected_dataset:
upstream_urns = [ upstream_urns = [
make_dataset_urn_with_platform_instance( make_dataset_urn_with_platform_instance(
platform=make_data_platform_urn(self.get_platform()), platform=self.get_platform(),
name=f"dremio.{ds.lower()}", name=ds.lower(),
env=self.config.env,
platform_instance=self.config.platform_instance, platform_instance=self.config.platform_instance,
env=self.config.env,
) )
for ds in query.queried_datasets for ds in query.queried_datasets
] ]
downstream_urn = make_dataset_urn_with_platform_instance( downstream_urn = make_dataset_urn_with_platform_instance(
platform=make_data_platform_urn(self.get_platform()), platform=self.get_platform(),
name=f"dremio.{query.affected_dataset.lower()}", name=query.affected_dataset.lower(),
env=self.config.env,
platform_instance=self.config.platform_instance, platform_instance=self.config.platform_instance,
env=self.config.env,
) )
# Add query to SqlParsingAggregator # Add query to SqlParsingAggregator

View File

@ -1,3 +1,4 @@
import datetime
import json import json
import os import os
import subprocess import subprocess
@ -6,7 +7,7 @@ from typing import Dict
import boto3 import boto3
import pytest import pytest
import requests import requests
from freezegun import freeze_time from time_machine import travel
from datahub.testing import mce_helpers from datahub.testing import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd from tests.test_helpers.click_helpers import run_datahub_cmd
@ -14,7 +15,7 @@ from tests.test_helpers.docker_helpers import wait_for_port
pytestmark = pytest.mark.integration_batch_4 pytestmark = pytest.mark.integration_batch_4
FROZEN_TIME = "2023-10-15 07:00:00" FROZEN_TIME = datetime.datetime(2023, 10, 15, 7, 0, tzinfo=datetime.timezone.utc)
MINIO_PORT = 9000 MINIO_PORT = 9000
MYSQL_PORT = 3306 MYSQL_PORT = 3306
@ -469,7 +470,7 @@ def populate_minio(pytestconfig, s3_bkt):
yield yield
@freeze_time(FROZEN_TIME) @travel(FROZEN_TIME, tick=False)
@pytest.mark.integration @pytest.mark.integration
def test_dremio_ingest( def test_dremio_ingest(
test_resources_dir, test_resources_dir,
@ -492,7 +493,7 @@ def test_dremio_ingest(
) )
@freeze_time(FROZEN_TIME) @travel(FROZEN_TIME, tick=False)
@pytest.mark.integration @pytest.mark.integration
def test_dremio_platform_instance_urns( def test_dremio_platform_instance_urns(
test_resources_dir, test_resources_dir,
@ -539,9 +540,12 @@ def test_dremio_platform_instance_urns(
# Check dataset URN structure # Check dataset URN structure
if mce["entityType"] == "dataset" and "entityUrn" in mce: if mce["entityType"] == "dataset" and "entityUrn" in mce:
assert "test-platform.dremio" in mce["entityUrn"], ( assert "test-platform." in mce["entityUrn"], (
f"Platform instance missing in dataset URN: {mce['entityUrn']}" f"Platform instance missing in dataset URN: {mce['entityUrn']}"
) )
assert "test-platform.dremio." not in mce["entityUrn"], (
f"URN has incorrect double dremio prefix: {mce['entityUrn']}"
)
# Check aspects for both datasets and containers # Check aspects for both datasets and containers
if "aspectName" in mce: if "aspectName" in mce:
@ -573,7 +577,7 @@ def test_dremio_platform_instance_urns(
) )
@freeze_time(FROZEN_TIME) @travel(FROZEN_TIME, tick=False)
@pytest.mark.integration @pytest.mark.integration
def test_dremio_schema_filter( def test_dremio_schema_filter(
test_resources_dir, test_resources_dir,