mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-25 08:58:26 +00:00
feat(dbt-ingestion): add documentation link from dbt source to institutionalMemory (#8686)
Co-authored-by: Ethan Cartwright <ethan.cartwright@acryl.io> Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
parent
3a9452c207
commit
e2afd44bfe
@ -6,14 +6,14 @@ source:
|
||||
# In the URL https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094,
|
||||
# 107298 is the account_id, 175705 is the project_id, and 148094 is the job_id
|
||||
|
||||
account_id: # set to your dbt cloud account id
|
||||
project_id: # set to your dbt cloud project id
|
||||
job_id: # set to your dbt cloud job id
|
||||
account_id: "${DBT_ACCOUNT_ID}" # set to your dbt cloud account id
|
||||
project_id: "${DBT_PROJECT_ID}" # set to your dbt cloud project id
|
||||
job_id: "${DBT_JOB_ID}" # set to your dbt cloud job id
|
||||
run_id: # set to your dbt cloud run id. This is optional, and defaults to the latest run
|
||||
|
||||
target_platform: postgres
|
||||
|
||||
# Options
|
||||
target_platform: "my_target_platform_id" # e.g. bigquery/postgres/etc.
|
||||
target_platform: "${TARGET_PLATFORM_ID}" # e.g. bigquery/postgres/etc.
|
||||
|
||||
# sink configs
|
||||
|
||||
@ -38,6 +38,12 @@ meta_mapping:
|
||||
operation: "add_terms"
|
||||
config:
|
||||
separator: ","
|
||||
documentation_link:
|
||||
match: '(?:https?)?\:\/\/\w*[^#]*'
|
||||
operation: "add_doc_link"
|
||||
config:
|
||||
link: "{{ $match }}"
|
||||
description: "Documentation Link"
|
||||
column_meta_mapping:
|
||||
terms_list:
|
||||
match: ".*"
|
||||
@ -57,6 +63,7 @@ We support the following operations:
|
||||
2. add_term - Requires `term` property in config.
|
||||
3. add_terms - Accepts an optional `separator` property in config.
|
||||
4. add_owner - Requires `owner_type` property in config which can be either user or group. Optionally accepts the `owner_category` config property which you can set to one of `['TECHNICAL_OWNER', 'BUSINESS_OWNER', 'DATA_STEWARD', 'DATAOWNER']` (defaults to `DATAOWNER`).
|
||||
5. add_doc_link - Requires `link` and `description` properties in config. Upon ingestion run, this will overwrite current links in the institutional knowledge section with this new link. The anchor text is defined here in the meta_mappings as `description`.
|
||||
|
||||
Note:
|
||||
|
||||
|
||||
@ -1188,9 +1188,15 @@ class DBTSourceBase(StatefulIngestionSourceBase):
|
||||
):
|
||||
aspects.append(meta_aspects.get(Constants.ADD_TERM_OPERATION))
|
||||
|
||||
# add meta links aspect
|
||||
meta_links_aspect = meta_aspects.get(Constants.ADD_DOC_LINK_OPERATION)
|
||||
if meta_links_aspect and self.config.enable_meta_mapping:
|
||||
aspects.append(meta_links_aspect)
|
||||
|
||||
# add schema metadata aspect
|
||||
schema_metadata = self.get_schema_metadata(self.report, node, mce_platform)
|
||||
aspects.append(schema_metadata)
|
||||
|
||||
return aspects
|
||||
|
||||
def get_schema_metadata(
|
||||
|
||||
@ -2,12 +2,16 @@ import contextlib
|
||||
import logging
|
||||
import operator
|
||||
import re
|
||||
import time
|
||||
from functools import reduce
|
||||
from typing import Any, Dict, List, Match, Optional, Union
|
||||
from typing import Any, Dict, List, Match, Optional, Union, cast
|
||||
|
||||
from datahub.emitter import mce_builder
|
||||
from datahub.emitter.mce_builder import OwnerType
|
||||
from datahub.metadata.schema_classes import (
|
||||
AuditStampClass,
|
||||
InstitutionalMemoryClass,
|
||||
InstitutionalMemoryMetadataClass,
|
||||
OwnerClass,
|
||||
OwnershipClass,
|
||||
OwnershipSourceClass,
|
||||
@ -39,6 +43,7 @@ def _insert_match_value(original_value: str, match_value: str) -> str:
|
||||
|
||||
|
||||
class Constants:
|
||||
ADD_DOC_LINK_OPERATION = "add_doc_link"
|
||||
ADD_TAG_OPERATION = "add_tag"
|
||||
ADD_TERM_OPERATION = "add_term"
|
||||
ADD_TERMS_OPERATION = "add_terms"
|
||||
@ -47,6 +52,8 @@ class Constants:
|
||||
OPERATION_CONFIG = "config"
|
||||
TAG = "tag"
|
||||
TERM = "term"
|
||||
DOC_LINK = "link"
|
||||
DOC_DESCRIPTION = "description"
|
||||
OWNER_TYPE = "owner_type"
|
||||
OWNER_CATEGORY = "owner_category"
|
||||
MATCH = "match"
|
||||
@ -163,7 +170,6 @@ class OperationProcessor:
|
||||
)
|
||||
operations_value_list.append(operation) # type: ignore
|
||||
operations_map[operation_type] = operations_value_list
|
||||
|
||||
aspect_map = self.convert_to_aspects(operations_map)
|
||||
except Exception as e:
|
||||
logger.error(f"Error while processing operation defs over raw_props: {e}")
|
||||
@ -173,6 +179,7 @@ class OperationProcessor:
|
||||
self, operation_map: Dict[str, Union[set, list]]
|
||||
) -> Dict[str, Any]:
|
||||
aspect_map: Dict[str, Any] = {}
|
||||
|
||||
if Constants.ADD_TAG_OPERATION in operation_map:
|
||||
tag_aspect = mce_builder.make_global_tag_aspect_with_tag_list(
|
||||
sorted(operation_map[Constants.ADD_TAG_OPERATION])
|
||||
@ -195,11 +202,57 @@ class OperationProcessor:
|
||||
]
|
||||
)
|
||||
aspect_map[Constants.ADD_OWNER_OPERATION] = owner_aspect
|
||||
|
||||
if Constants.ADD_TERM_OPERATION in operation_map:
|
||||
term_aspect = mce_builder.make_glossary_terms_aspect_from_urn_list(
|
||||
sorted(operation_map[Constants.ADD_TERM_OPERATION])
|
||||
)
|
||||
aspect_map[Constants.ADD_TERM_OPERATION] = term_aspect
|
||||
|
||||
if Constants.ADD_DOC_LINK_OPERATION in operation_map:
|
||||
try:
|
||||
if len(
|
||||
operation_map[Constants.ADD_DOC_LINK_OPERATION]
|
||||
) == 1 and isinstance(
|
||||
operation_map[Constants.ADD_DOC_LINK_OPERATION], list
|
||||
):
|
||||
docs_dict = cast(
|
||||
List[Dict], operation_map[Constants.ADD_DOC_LINK_OPERATION]
|
||||
)[0]
|
||||
if "description" not in docs_dict or "link" not in docs_dict:
|
||||
raise Exception(
|
||||
"Documentation_link meta_mapping config needs a description key and a link key"
|
||||
)
|
||||
|
||||
now = int(time.time() * 1000) # milliseconds since epoch
|
||||
institutional_memory_element = InstitutionalMemoryMetadataClass(
|
||||
url=docs_dict["link"],
|
||||
description=docs_dict["description"],
|
||||
createStamp=AuditStampClass(
|
||||
time=now, actor="urn:li:corpuser:ingestion"
|
||||
),
|
||||
)
|
||||
|
||||
# create a new institutional memory aspect
|
||||
institutional_memory_aspect = InstitutionalMemoryClass(
|
||||
elements=[institutional_memory_element]
|
||||
)
|
||||
|
||||
aspect_map[
|
||||
Constants.ADD_DOC_LINK_OPERATION
|
||||
] = institutional_memory_aspect
|
||||
else:
|
||||
raise Exception(
|
||||
f"Expected 1 item of type list for the documentation_link meta_mapping config,"
|
||||
f" received type of {type(operation_map[Constants.ADD_DOC_LINK_OPERATION])}"
|
||||
f", and size of {len(operation_map[Constants.ADD_DOC_LINK_OPERATION])}."
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error while constructing aspect for documentation link and description : {e}"
|
||||
)
|
||||
|
||||
return aspect_map
|
||||
|
||||
def get_operation_value(
|
||||
@ -248,6 +301,16 @@ class OperationProcessor:
|
||||
term = operation_config[Constants.TERM]
|
||||
term = _insert_match_value(term, _get_best_match(match, "term"))
|
||||
return mce_builder.make_term_urn(term)
|
||||
elif (
|
||||
operation_type == Constants.ADD_DOC_LINK_OPERATION
|
||||
and operation_config[Constants.DOC_LINK]
|
||||
and operation_config[Constants.DOC_DESCRIPTION]
|
||||
):
|
||||
link = operation_config[Constants.DOC_LINK]
|
||||
link = _insert_match_value(link, _get_best_match(match, "link"))
|
||||
description = operation_config[Constants.DOC_DESCRIPTION]
|
||||
return {"link": link, "description": description}
|
||||
|
||||
elif operation_type == Constants.ADD_TERMS_OPERATION:
|
||||
separator = operation_config.get(Constants.SEPARATOR, ",")
|
||||
captured_terms = match.group(0)
|
||||
|
||||
@ -4,6 +4,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.common import GlobalTags
|
||||
from datahub.metadata.schema_classes import (
|
||||
GlobalTagsClass,
|
||||
GlossaryTermsClass,
|
||||
InstitutionalMemoryClass,
|
||||
OwnerClass,
|
||||
OwnershipClass,
|
||||
OwnershipSourceTypeClass,
|
||||
@ -233,6 +234,46 @@ def test_operation_processor_advanced_matching_tags():
|
||||
assert tag_aspect.tags[0].tag == "urn:li:tag:case_4567"
|
||||
|
||||
|
||||
def test_operation_processor_institutional_memory():
    """Check that an ``add_doc_link`` rule yields an institutional-memory aspect.

    The raw property carries a URL with a ``#fragment``; the assertions expect
    the stored URL to stop before the ``#`` and the description to be the one
    given in the rule's config.
    """
    mapping_rules = {
        "documentation_link": {
            "match": r"(?:https?)?\:\/\/\w*[^#]*",
            "operation": "add_doc_link",
            "config": {"link": "{{ $match }}", "description": "test"},
        },
    }
    properties = {
        "documentation_link": "https://test.com/documentation#ignore-this",
    }

    result = OperationProcessor(operation_defs=mapping_rules).process(properties)
    assert "add_doc_link" in result

    memory: InstitutionalMemoryClass = result["add_doc_link"]
    first_entry = memory.elements[0]

    # The match pattern excludes everything from '#' onwards.
    assert first_entry.url == "https://test.com/documentation"
    assert first_entry.description == "test"
|
||||
|
||||
|
||||
def test_operation_processor_institutional_memory_no_description():
    """Check that an ``add_doc_link`` rule without a description yields nothing.

    The rule's config only supplies ``link``; the expected outcome is an empty
    aspect map.
    """
    # NOTE(review): this raw value contains no "://", which the match pattern
    # requires, so the rule presumably never matches at all. The empty result
    # may therefore not exercise the missing-description path - confirm.
    properties = {
        "documentation_link": "test.com/documentation#ignore-this",
    }
    mapping_rules = {
        "documentation_link": {
            "match": r"(?:https?)?\:\/\/\w*[^#]*",
            "operation": "add_doc_link",
            "config": {"link": "{{ $match }}"},
        },
    }

    # we require a description, so this should stay empty
    result = OperationProcessor(operation_defs=mapping_rules).process(properties)
    assert result == {}
|
||||
|
||||
|
||||
def test_operation_processor_matching_nested_props():
|
||||
raw_props = {
|
||||
"gdpr": {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user