Enable lower-casing of the name part of dataset urn via an environment variable. (#4649)

This commit is contained in:
Ravindra Lanka 2022-04-12 12:54:22 -07:00 committed by GitHub
parent 5b22d96e04
commit 9226e3e27f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 2 deletions

View File

@ -1,8 +1,10 @@
"""Convenience functions for creating MCEs"""
import json
import logging
import os
import re
import time
from distutils.util import strtobool
from enum import Enum
from hashlib import md5
from typing import Any, List, Optional, Type, TypeVar, Union, cast, get_type_hints
@ -33,12 +35,20 @@ from datahub.metadata.schema_classes import (
UpstreamLineageClass,
)
logger = logging.getLogger(__name__)
DEFAULT_ENV = DEFAULT_ENV_CONFIGURATION
DEFAULT_FLOW_CLUSTER = "prod"
UNKNOWN_USER = "urn:li:corpuser:unknown"
def _env_flag_to_bool(raw: str) -> bool:
    """Convert a textual truth value into a real ``bool``.

    Accepts the same spellings as ``distutils.util.strtobool`` (case
    insensitive): y/yes/t/true/on/1 are true, n/no/f/false/off/0 are false.
    Unlike ``strtobool`` the result is a ``bool`` rather than the int 1/0,
    and no call into the deprecated ``distutils`` package is needed.

    Raises:
        ValueError: if ``raw`` is not one of the recognised spellings.
    """
    normalized = raw.lower()
    if normalized in ("y", "yes", "t", "true", "on", "1"):
        return True
    if normalized in ("n", "no", "f", "false", "off", "0"):
        return False
    raise ValueError(f"invalid truth value {normalized!r}")


# Whether the name part of dataset urns is lower-cased. Read once at import
# time from the DATAHUB_DATASET_URN_TO_LOWER environment variable; defaults to
# False when the variable is unset.
DATASET_URN_TO_LOWER: bool = _env_flag_to_bool(
    os.getenv("DATAHUB_DATASET_URN_TO_LOWER", "false")
)
logger = logging.getLogger(__name__)
# TODO: Delete this once lower-casing is the standard.
def set_dataset_urn_to_lower(value: bool) -> None:
    """Override the module-level ``DATASET_URN_TO_LOWER`` flag.

    Args:
        value: The new value stored in ``DATASET_URN_TO_LOWER``.
    """
    # Rebind the module global rather than creating a function-local name.
    global DATASET_URN_TO_LOWER
    DATASET_URN_TO_LOWER = value
class OwnerType(Enum):
@ -58,6 +68,8 @@ def make_data_platform_urn(platform: str) -> str:
def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
    """Build a dataset urn string from a platform, dataset name and environment.

    The name is lower-cased first when the module-level
    ``DATASET_URN_TO_LOWER`` flag is truthy; otherwise it is used verbatim.

    Args:
        platform: Passed to ``make_data_platform_urn`` to obtain the platform urn.
        name: Dataset name placed in the middle of the urn.
        env: Environment component of the urn; defaults to ``DEFAULT_ENV``.

    Returns:
        A string of the form ``urn:li:dataset:(<platform urn>,<name>,<env>)``.
    """
    dataset_name = name.lower() if DATASET_URN_TO_LOWER else name
    platform_urn = make_data_platform_urn(platform)
    return f"urn:li:dataset:({platform_urn},{dataset_name},{env})"
@ -72,6 +84,8 @@ def make_dataset_urn_with_platform_instance(
platform: str, name: str, platform_instance: Optional[str], env: str = DEFAULT_ENV
) -> str:
if platform_instance:
if DATASET_URN_TO_LOWER:
name = name.lower()
return f"urn:li:dataset:({make_data_platform_urn(platform)},{platform_instance}.{name},{env})"
else:
return make_dataset_urn(platform=platform, name=name, env=env)

View File

@ -2,6 +2,7 @@ from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Dict, Generic, Iterable, Optional, Tuple, TypeVar
from datahub.emitter.mce_builder import set_dataset_urn_to_lower
from datahub.ingestion.api.committable import Committable
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
@ -56,6 +57,14 @@ class PipelineContext:
self.preview_mode = preview_mode
self.reporters: Dict[str, Committable] = dict()
self.checkpointers: Dict[str, Committable] = dict()
self._set_dataset_urn_to_lower_if_needed()
def _set_dataset_urn_to_lower_if_needed(self) -> None:
    """Turn on dataset urn lower-casing when the server config asks for it.

    Fetches the config from ``self.graph`` and, if it contains a truthy
    ``datasetUrnNameCasing`` entry, calls ``set_dataset_urn_to_lower(True)``.
    Does nothing when ``self.graph`` is falsy or the config is empty.
    """
    # TODO: Get rid of this function once lower-casing is the standard.
    if not self.graph:
        return
    server_config = self.graph.get_config()
    if server_config and server_config.get("datasetUrnNameCasing"):
        set_dataset_urn_to_lower(True)
def register_checkpointer(self, committable: Committable) -> None:
if committable.name in self.checkpointers:

View File

@ -0,0 +1,16 @@
package com.linkedin.gms.factory.common;
import javax.annotation.Nonnull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that publishes the {@code datasetUrnNameCasing} bean,
 * whose value is derived from the {@code DATAHUB_DATASET_URN_TO_LOWER}
 * environment variable.
 */
@Configuration
public class DatasetUrnNameCasingFactory {

  /**
   * Reads {@code DATAHUB_DATASET_URN_TO_LOWER} from the process environment and
   * parses it with {@link Boolean#parseBoolean(String)}.
   *
   * @return the parsed flag; {@code false} when the variable is unset, because
   *     {@code parseBoolean} maps {@code null} to {@code false}
   */
  @Nonnull
  @Bean(name = "datasetUrnNameCasing")
  protected Boolean getInstance() {
    return Boolean.parseBoolean(System.getenv("DATAHUB_DATASET_URN_TO_LOWER"));
  }
}

View File

@ -3,6 +3,7 @@ package com.datahub.gms.servlet;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.PluginEntityRegistryLoader;
@ -20,7 +21,6 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.maven.artifact.versioning.ComparableVersion;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
import com.linkedin.gms.factory.config.ConfigurationProvider;
// Return a 200 for health checks
@ -59,6 +59,10 @@ public class Config extends HttpServlet {
return (GitVersion) ctx.getBean("gitVersion");
}
/**
 * Looks up the bean named {@code datasetUrnNameCasing} in the given context.
 *
 * @param ctx application context to query
 * @return the bean cast to {@link Boolean}
 */
private Boolean getDatasetUrnNameCasing(WebApplicationContext ctx) {
  final Object casingBean = ctx.getBean("datasetUrnNameCasing");
  return (Boolean) casingBean;
}
/**
 * Reports whether the {@code graphService} bean supports multi-hop queries.
 *
 * @param ctx application context holding the {@code graphService} bean
 * @return the result of {@code supportsMultiHop()} on that bean
 */
private boolean checkImpactAnalysisSupport(WebApplicationContext ctx) {
  final GraphService graphService = (GraphService) ctx.getBean("graphService");
  return graphService.supportsMultiHop();
}
@ -93,6 +97,9 @@ public class Config extends HttpServlet {
resp.setContentType("application/json");
PrintWriter out = resp.getWriter();
Boolean datasetUrnNameCasing = getDatasetUrnNameCasing(ctx);
config.put("datasetUrnNameCasing", datasetUrnNameCasing);
try {
Map<String, Object> config = new HashMap<>(this.config);
Map<String, Map<ComparableVersion, EntityRegistryLoadResult>> pluginTree =