diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index 5bed849a33..cd036b53cf 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -1,8 +1,10 @@ """Convenience functions for creating MCEs""" import json import logging +import os import re import time +from distutils.util import strtobool from enum import Enum from hashlib import md5 from typing import Any, List, Optional, Type, TypeVar, Union, cast, get_type_hints @@ -33,12 +35,20 @@ from datahub.metadata.schema_classes import ( UpstreamLineageClass, ) +logger = logging.getLogger(__name__) + DEFAULT_ENV = DEFAULT_ENV_CONFIGURATION DEFAULT_FLOW_CLUSTER = "prod" UNKNOWN_USER = "urn:li:corpuser:unknown" +DATASET_URN_TO_LOWER: bool = strtobool( + os.getenv("DATAHUB_DATASET_URN_TO_LOWER", "false") +) -logger = logging.getLogger(__name__) +# TODO: Delete this once lower-casing is the standard. +def set_dataset_urn_to_lower(value: bool) -> None: + global DATASET_URN_TO_LOWER + DATASET_URN_TO_LOWER = value class OwnerType(Enum): @@ -58,6 +68,8 @@ def make_data_platform_urn(platform: str) -> str: def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str: + if DATASET_URN_TO_LOWER: + name = name.lower() return f"urn:li:dataset:({make_data_platform_urn(platform)},{name},{env})" @@ -72,6 +84,8 @@ def make_dataset_urn_with_platform_instance( platform: str, name: str, platform_instance: Optional[str], env: str = DEFAULT_ENV ) -> str: if platform_instance: + if DATASET_URN_TO_LOWER: + name = name.lower() return f"urn:li:dataset:({make_data_platform_urn(platform)},{platform_instance}.{name},{env})" else: return make_dataset_urn(platform=platform, name=name, env=env) diff --git a/metadata-ingestion/src/datahub/ingestion/api/common.py b/metadata-ingestion/src/datahub/ingestion/api/common.py index 7e97dfc3ea..56c21a7f39 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/common.py +++ b/metadata-ingestion/src/datahub/ingestion/api/common.py @@ -2,6 +2,7 @@ from abc import ABCMeta, abstractmethod from dataclasses import dataclass from typing import Dict, Generic, Iterable, Optional, Tuple, TypeVar +from datahub.emitter.mce_builder import set_dataset_urn_to_lower from datahub.ingestion.api.committable import Committable from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph @@ -56,6 +57,14 @@ class PipelineContext: self.preview_mode = preview_mode self.reporters: Dict[str, Committable] = dict() self.checkpointers: Dict[str, Committable] = dict() + self._set_dataset_urn_to_lower_if_needed() + + def _set_dataset_urn_to_lower_if_needed(self) -> None: + # TODO: Get rid of this function once lower-casing is the standard. + if self.graph: + server_config = self.graph.get_config() + if server_config and server_config.get("datasetUrnNameCasing"): + set_dataset_urn_to_lower(True) def register_checkpointer(self, committable: Committable) -> None: if committable.name in self.checkpointers: diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/DatasetUrnNameCasingFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/DatasetUrnNameCasingFactory.java new file mode 100644 index 0000000000..d80d57799e --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/DatasetUrnNameCasingFactory.java @@ -0,0 +1,16 @@ +package com.linkedin.gms.factory.common; + +import javax.annotation.Nonnull; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +public class DatasetUrnNameCasingFactory { + @Nonnull + @Bean(name = "datasetUrnNameCasing") + protected Boolean getInstance() { + String datasetUrnNameCasingEnv = System.getenv("DATAHUB_DATASET_URN_TO_LOWER"); + return Boolean.parseBoolean(datasetUrnNameCasingEnv); + } +} \ No newline at end of file diff --git a/metadata-service/servlet/src/main/java/com/datahub/gms/servlet/Config.java b/metadata-service/servlet/src/main/java/com/datahub/gms/servlet/Config.java index c857b2b051..12346267cd 100644 --- a/metadata-service/servlet/src/main/java/com/datahub/gms/servlet/Config.java +++ b/metadata-service/servlet/src/main/java/com/datahub/gms/servlet/Config.java @@ -3,6 +3,7 @@ package com.datahub.gms.servlet; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.gms.factory.config.ConfigurationProvider; import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.PluginEntityRegistryLoader; @@ -20,7 +21,6 @@ import javax.servlet.http.HttpServletResponse; import org.apache.maven.artifact.versioning.ComparableVersion; import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.support.WebApplicationContextUtils; -import com.linkedin.gms.factory.config.ConfigurationProvider; // Return a 200 for health checks @@ -59,6 +59,10 @@ public class Config extends HttpServlet { return (GitVersion) ctx.getBean("gitVersion"); } + private Boolean getDatasetUrnNameCasing(WebApplicationContext ctx) { + return (Boolean) ctx.getBean("datasetUrnNameCasing"); + } + private boolean checkImpactAnalysisSupport(WebApplicationContext ctx) { return ((GraphService) ctx.getBean("graphService")).supportsMultiHop(); } @@ -93,6 +97,9 @@ public class Config extends HttpServlet { resp.setContentType("application/json"); PrintWriter out = resp.getWriter(); + Boolean datasetUrnNameCasing = getDatasetUrnNameCasing(ctx); + config.put("datasetUrnNameCasing", datasetUrnNameCasing); + try { Map config = new HashMap<>(this.config); Map> pluginTree =