mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-16 05:02:59 +00:00
feat(ingest): Add a business glossary source (#3164)
This commit is contained in:
parent
9ba378f3a0
commit
88a3523fbb
@ -1,9 +1,7 @@
|
||||
import { Avatar, Divider, Space, Tooltip, Typography } from 'antd';
|
||||
import { Divider, Space, Typography } from 'antd';
|
||||
import React from 'react';
|
||||
import { Link } from 'react-router-dom';
|
||||
import { EntityType } from '../../../../types.generated';
|
||||
import { useEntityRegistry } from '../../../useEntityRegistry';
|
||||
import defaultAvatar from '../../../../images/default_avatar.png';
|
||||
import { AvatarsGroup } from '../../../shared/avatar';
|
||||
|
||||
type Props = {
|
||||
definition: string;
|
||||
@ -26,26 +24,7 @@ export default function GlossaryTermHeader({ definition, sourceRef, sourceUrl, o
|
||||
</a>
|
||||
)}
|
||||
</Space>
|
||||
{ownership && (
|
||||
<Avatar.Group maxCount={6} size="large">
|
||||
{ownership?.owners?.map((owner) => (
|
||||
<Tooltip title={owner.owner.info?.fullName} key={owner.owner.urn}>
|
||||
<Link to={`/${entityRegistry.getPathName(EntityType.CorpUser)}/${owner.owner.urn}`}>
|
||||
<Avatar
|
||||
style={{
|
||||
color: '#f56a00',
|
||||
backgroundColor: '#fde3cf',
|
||||
}}
|
||||
src={
|
||||
(owner.owner.editableInfo && owner.owner.editableInfo.pictureLink) ||
|
||||
defaultAvatar
|
||||
}
|
||||
/>
|
||||
</Link>
|
||||
</Tooltip>
|
||||
))}
|
||||
</Avatar.Group>
|
||||
)}
|
||||
<AvatarsGroup owners={ownership?.owners} entityRegistry={entityRegistry} size="large" />
|
||||
</Space>
|
||||
</>
|
||||
);
|
||||
|
@ -38,6 +38,7 @@ Sources:
|
||||
| [athena](./source_docs/athena.md) | `pip install 'acryl-datahub[athena]'` | AWS Athena source |
|
||||
| [bigquery](./source_docs/bigquery.md) | `pip install 'acryl-datahub[bigquery]'` | BigQuery source |
|
||||
| [bigquery-usage](./source_docs/bigquery.md) | `pip install 'acryl-datahub[bigquery-usage]'` | BigQuery usage statistics source |
|
||||
| [datahub-business-glossary](./source_docs/business_glossary.md) | _no additional dependencies_ | Business Glossary File source |
|
||||
| [dbt](./source_docs/dbt.md) | _no additional dependencies_ | dbt source |
|
||||
| [druid](./source_docs/druid.md) | `pip install 'acryl-datahub[druid]'` | Druid Source |
|
||||
| [feast](./source_docs/feast.md) | `pip install 'acryl-datahub[feast]'` | Feast source |
|
||||
|
@ -0,0 +1,55 @@
|
||||
version: 1
|
||||
source: DataHub
|
||||
owners:
|
||||
users:
|
||||
- mjames
|
||||
url: "https://github.com/linkedin/datahub/"
|
||||
nodes:
|
||||
- name: Classification
|
||||
description: A set of terms related to Data Classification
|
||||
terms:
|
||||
- name: Sensitive
|
||||
description: Sensitive Data
|
||||
- name: Confidential
|
||||
description: Confidential Data
|
||||
- name: HighlyConfidential
|
||||
description: Highly Confidential Data
|
||||
- name: PersonalInformation
|
||||
description: All terms related to personal information
|
||||
owners:
|
||||
users:
|
||||
- mjames
|
||||
terms:
|
||||
- name: Email
|
||||
description: An individual's email address
|
||||
inherits:
|
||||
- Classification.Confidential
|
||||
owners:
|
||||
groups:
|
||||
- Trust and Safety
|
||||
- name: Address
|
||||
description: A physical address
|
||||
- name: Gender
|
||||
description: The gender identity of the individual
|
||||
inherits:
|
||||
- Classification.Sensitive
|
||||
- name: ClientsAndAccounts
|
||||
description: Provides basic concepts such as account, account holder, account provider, relationship manager that are commonly used by financial services providers to describe customers and to determine counterparty identities
|
||||
owners:
|
||||
groups:
|
||||
- finance
|
||||
terms:
|
||||
- name: Account
|
||||
description: Container for records associated with a business arrangement for regular transactions and services
|
||||
term_source: "EXTERNAL"
|
||||
source_ref: FIBO
|
||||
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Account"
|
||||
inherits:
|
||||
- Classification.HighlyConfidential
|
||||
contains:
|
||||
- ClientsAndAccounts.Balance
|
||||
- name: Balance
|
||||
description: Amount of money available or owed
|
||||
term_source: "EXTERNAL"
|
||||
source_ref: FIBO
|
||||
source_url: "https://spec.edmcouncil.org/fibo/ontology/FBC/ProductsAndServices/ClientsAndAccounts/Balance"
|
@ -0,0 +1,11 @@
|
||||
source:
|
||||
type: datahub-business-glossary
|
||||
config:
|
||||
file: ./examples/bootstrap_data/business_glossary.yml
|
||||
|
||||
|
||||
sink:
|
||||
type: datahub-rest
|
||||
config:
|
||||
server: http://localhost:8080
|
||||
|
@ -79,6 +79,7 @@ plugins: Dict[str, Set[str]] = {
|
||||
"azure": set(),
|
||||
"bigquery": sql_common | {"pybigquery >= 0.6.0"},
|
||||
"bigquery-usage": {"google-cloud-logging", "cachetools"},
|
||||
"datahub-business-glossary": set(),
|
||||
"dbt": set(),
|
||||
"druid": sql_common | {"pydruid>=0.6.2"},
|
||||
"feast": {"docker"},
|
||||
@ -232,6 +233,7 @@ entry_points = {
|
||||
"ldap = datahub.ingestion.source.ldap:LDAPSource",
|
||||
"looker = datahub.ingestion.source.looker:LookerDashboardSource",
|
||||
"lookml = datahub.ingestion.source.lookml:LookMLSource",
|
||||
"datahub-business-glossary = datahub.ingestion.source.metadata.business_glossary:BusinessGlossaryFileSource",
|
||||
"mongodb = datahub.ingestion.source.mongodb:MongoDBSource",
|
||||
"mssql = datahub.ingestion.source.sql.mssql:SQLServerSource",
|
||||
"mysql = datahub.ingestion.source.sql.mysql:MySQLSource",
|
||||
|
49
metadata-ingestion/source_docs/business_glossary.md
Normal file
49
metadata-ingestion/source_docs/business_glossary.md
Normal file
@ -0,0 +1,49 @@
|
||||
# Business Glossary
|
||||
|
||||
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
|
||||
|
||||
## Setup
|
||||
|
||||
Works with `acryl-datahub` out of the box.
|
||||
|
||||
## Capabilities
|
||||
|
||||
This plugin pulls business glossary metadata from a YAML-formatted file. An example of such a file is located in the examples directory [here](../examples/bootstrap_data/business_glossary.yml).
|
||||
|
||||
## Quickstart recipe
|
||||
|
||||
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
|
||||
|
||||
For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes).
|
||||
|
||||
```yml
|
||||
source:
|
||||
type: datahub-business-glossary
|
||||
config:
|
||||
# Coordinates
|
||||
file: /path/to/business_glossary_yaml
|
||||
|
||||
sink:
|
||||
# sink configs
|
||||
```
|
||||
|
||||
## Config details
|
||||
|
||||
Note that a `.` is used to denote nested fields in the YAML recipe.
|
||||
|
||||
| Field | Required | Default | Description |
|
||||
| ---------- | -------- | ------- | ----------------------- |
|
||||
| `file` | ✅ | | Path to business glossary file to ingest. |
|
||||
|
||||
### Business Glossary File Format
|
||||
|
||||
The business glossary file format is best understood from the sample business glossary checked in [here](../examples/bootstrap_data/business_glossary.yml).
|
||||
|
||||
## Compatibility
|
||||
|
||||
Compatible with version 1 of the business glossary format.
|
||||
The source will be evolved as we publish newer versions of this format.
|
||||
|
||||
## Questions
|
||||
|
||||
If you've got any questions on configuring this source, feel free to ping us on [our Slack](https://slack.datahubproject.io/)!
|
@ -47,6 +47,10 @@ def make_user_urn(username: str) -> str:
|
||||
return f"urn:li:corpuser:{username}"
|
||||
|
||||
|
||||
def make_group_urn(groupname: str) -> str:
    """Return the corpGroup URN string for ``groupname``."""
    return "urn:li:corpGroup:{}".format(groupname)
|
||||
|
||||
|
||||
def make_tag_urn(tag: str) -> str:
    """Return the tag URN string for ``tag``."""
    return "urn:li:tag:{}".format(tag)
|
||||
|
||||
|
@ -0,0 +1,261 @@
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Iterable, List, Optional, Union
|
||||
|
||||
from pydantic import validator
|
||||
|
||||
import datahub.metadata.schema_classes as models
|
||||
from datahub.configuration.common import ConfigModel
|
||||
from datahub.configuration.config_loader import load_config_file
|
||||
from datahub.emitter.mce_builder import get_sys_time, make_group_urn, make_user_urn
|
||||
from datahub.ingestion.api.source import Source, SourceReport
|
||||
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Status aspect marking an entity as not removed; attached to every glossary node snapshot.
valid_status: models.StatusClass = models.StatusClass(removed=False)
# Audit stamp whose time is captured once, when this module is imported.
# The actor is built with make_user_urn so it carries the same "urn:li:corpuser:"
# prefix as the owner URNs produced by get_owners.
auditStamp = models.AuditStampClass(
    time=get_sys_time(), actor=make_user_urn("restEmitter")
)
|
||||
|
||||
|
||||
class Owners(ConfigModel):
    """Owner lists for a glossary, node, or term, split into users and groups."""

    # User names; get_owners turns each one into a user URN.
    users: Optional[List[str]] = None
    # Group names; get_owners turns each one into a group URN.
    groups: Optional[List[str]] = None
|
||||
|
||||
|
||||
class GlossaryTermConfig(ConfigModel):
    """One glossary term entry as written under a node's ``terms`` list."""

    # Appended to the parent node's path to form the term URN and browse path.
    name: str
    # Used as the term's definition.
    description: str
    # When omitted, the glossary-level ``source_type`` default is used.
    term_source: Optional[str] = None
    # When missing or empty, the glossary-level ``source`` default is used.
    source_ref: Optional[str] = None
    # When missing or empty, the glossary-level ``url`` default is used.
    source_url: Optional[str] = None
    # When omitted, the term takes the ownership of its parent node.
    owners: Optional[Owners] = None
    # Term ids this term "is a"; each id is used verbatim as a glossary term URN id.
    inherits: Optional[List[str]] = None
    # Term ids this term "has a"; each id is used verbatim as a glossary term URN id.
    contains: Optional[List[str]] = None
|
||||
|
||||
|
||||
class GlossaryNodeConfig(ConfigModel):
    """One glossary node; it may hold terms and, recursively, child nodes."""

    # Appended to the parent path to form this node's URN.
    name: str
    # Used as the node's definition.
    description: str
    # When omitted, the node takes the ownership of its parent (or of the glossary root).
    owners: Optional[Owners] = None
    terms: Optional[List[GlossaryTermConfig]] = None
    nodes: Optional[List["GlossaryNodeConfig"]] = None


# Resolve the string annotation "GlossaryNodeConfig" used by the recursive `nodes` field.
GlossaryNodeConfig.update_forward_refs()
|
||||
|
||||
|
||||
class DefaultConfig(ConfigModel):
    """Glossary-wide fallback values applied to terms that leave a field unset."""

    # Fallback for a term's ``source_ref``.
    source: str
    # Fallback for a term's ``source_url``.
    url: str
    # Ownership given to top-level nodes that declare no owners of their own.
    owners: Owners
    # Fallback for a term's ``term_source``.
    source_type: Optional[str] = "INTERNAL"
|
||||
|
||||
|
||||
class BusinessGlossarySourceConfig(ConfigModel):
    """Recipe-level configuration for the business glossary source."""

    # Path of the glossary file handed to load_config_file.
    file: str
|
||||
|
||||
|
||||
class BusinessGlossaryConfig(DefaultConfig):
    """Top-level model of a business glossary file: the defaults plus the root nodes."""

    # Format version of the glossary file; only "1" is accepted.
    version: str
    # Root-level glossary nodes.
    nodes: List[GlossaryNodeConfig]

    @validator("version")
    def version_must_be_1(cls, v):
        """Reject any version other than "1".

        Raises:
            ValueError: if ``v`` is not the string "1".
        """
        if v != "1":
            raise ValueError("Only version 1 is supported")
        # The validator's return value becomes the field value; returning
        # nothing would silently replace `version` with None.
        return v
|
||||
|
||||
|
||||
def make_glossary_node_urn(path: List[str]) -> str:
    """Return the glossaryNode URN whose id is the dot-joined ``path``."""
    dotted = ".".join(path)
    return f"urn:li:glossaryNode:{dotted}"
|
||||
|
||||
|
||||
def make_glossary_term_urn(path: List[str]) -> str:
    """Return the glossaryTerm URN whose id is the dot-joined ``path``."""
    dotted = ".".join(path)
    return f"urn:li:glossaryTerm:{dotted}"
|
||||
|
||||
|
||||
def get_owners(owners: Owners) -> models.OwnershipClass:
    """Convert an ``Owners`` config into an Ownership aspect.

    Users come first, then groups, each in their configured order. Every
    owner is given the DEVELOPER ownership type.
    """
    developer = models.OwnershipTypeClass.DEVELOPER
    entries: List[models.OwnerClass] = []
    if owners.users is not None:
        entries.extend(
            models.OwnerClass(owner=make_user_urn(user), type=developer)
            for user in owners.users
        )
    if owners.groups is not None:
        entries.extend(
            models.OwnerClass(owner=make_group_urn(group), type=developer)
            for group in owners.groups
        )
    return models.OwnershipClass(owners=entries)
|
||||
|
||||
|
||||
def get_mces(
    glossary: BusinessGlossaryConfig,
) -> List[models.MetadataChangeEventClass]:
    """Produce the MCEs for every node and term in ``glossary``.

    Each root node starts a path containing only its own name, has no parent
    node, and receives the glossary-level owners as its inherited ownership.
    """
    top_level_owners = get_owners(glossary.owners)
    collected: List[models.MetadataChangeEventClass] = []
    for root in glossary.nodes:
        collected.extend(
            get_mces_from_node(
                root,
                [root.name],
                parentNode=None,
                parentOwners=top_level_owners,
                defaults=glossary,
            )
        )
    return collected
|
||||
|
||||
|
||||
def get_mce_from_snapshot(snapshot: Any) -> models.MetadataChangeEventClass:
    """Wrap ``snapshot`` in a MetadataChangeEvent.

    The event's system metadata always carries the fixed run id "test-glossary".
    """
    system_metadata = models.SystemMetadataClass(runId="test-glossary")
    return models.MetadataChangeEventClass(
        proposedSnapshot=snapshot,
        systemMetadata=system_metadata,
    )
|
||||
|
||||
|
||||
def get_mces_from_node(
    glossaryNode: GlossaryNodeConfig,
    path: List[str],
    parentNode: Optional[str],
    parentOwners: models.OwnershipClass,
    defaults: DefaultConfig,
) -> List[models.MetadataChangeEventClass]:
    """Produce the MCEs for ``glossaryNode`` and everything beneath it.

    The result starts with the node's own MCE, followed by the MCEs of its
    child nodes (recursively) and then those of its terms.

    Args:
        glossaryNode: The node to convert.
        path: Names from the root down to and including this node.
        parentNode: URN of the enclosing node, or None for a root node.
        parentOwners: Ownership used when the node declares no owners.
        defaults: Glossary-wide defaults passed through to the terms.
    """
    node_urn = make_glossary_node_urn(path)

    # A node without its own owners reuses the ownership handed down by its parent.
    if glossaryNode.owners is None:
        node_owners = parentOwners
    else:
        node_owners = get_owners(glossaryNode.owners)

    snapshot = models.GlossaryNodeSnapshotClass(
        urn=node_urn,
        aspects=[
            models.GlossaryNodeInfoClass(
                definition=glossaryNode.description,
                parentNode=parentNode,
            ),
            node_owners,
            valid_status,
        ],
    )
    mces = [get_mce_from_snapshot(snapshot)]

    for child in glossaryNode.nodes or []:
        mces.extend(
            get_mces_from_node(
                child,
                path + [child.name],
                parentNode=node_urn,
                parentOwners=node_owners,
                defaults=defaults,
            )
        )

    for term in glossaryNode.terms or []:
        mces.extend(
            get_mces_from_term(
                term,
                path + [term.name],
                parentNode=node_urn,
                parentOwnership=node_owners,
                defaults=defaults,
            )
        )
    return mces
|
||||
|
||||
|
||||
def get_mces_from_term(
    glossaryTerm: GlossaryTermConfig,
    path: List[str],
    parentNode: str,
    parentOwnership: models.OwnershipClass,
    defaults: DefaultConfig,
) -> List[models.MetadataChangeEventClass]:
    """Produce the single MCE describing ``glossaryTerm``.

    Aspects are added in this order: term info, related terms (only when the
    term declares ``inherits`` or ``contains``), ownership, browse paths.

    Args:
        glossaryTerm: The term to convert.
        path: Names from the root node down to and including this term.
        parentNode: URN of the node that holds the term.
        parentOwnership: Ownership used when the term declares no owners.
        defaults: Fallbacks for term source, source ref and source URL.
    """
    term_urn = make_glossary_term_urn(path)

    # Only an explicit None falls back for term_source; source_ref and
    # source_url also fall back when they are empty strings.
    if glossaryTerm.term_source is not None:
        term_source = glossaryTerm.term_source
    else:
        term_source = defaults.source_type

    aspects: List[
        Union[
            models.GlossaryTermInfoClass,
            models.GlossaryRelatedTermsClass,
            models.OwnershipClass,
            models.StatusClass,
            models.GlossaryTermKeyClass,
            models.BrowsePathsClass,
        ]
    ] = [
        models.GlossaryTermInfoClass(
            definition=glossaryTerm.description,
            termSource=term_source,  # type: ignore
            sourceRef=glossaryTerm.source_ref or defaults.source,
            sourceUrl=glossaryTerm.source_url or defaults.url,
            parentNode=parentNode,
        )
    ]

    inherited = glossaryTerm.inherits
    contained = glossaryTerm.contains
    if inherited is not None or contained is not None:
        # Each referenced id is passed as a one-element path, so it becomes the URN id verbatim.
        aspects.append(
            models.GlossaryRelatedTermsClass(
                isRelatedTerms=None
                if inherited is None
                else [make_glossary_term_urn([ref]) for ref in inherited],
                hasRelatedTerms=None
                if contained is None
                else [make_glossary_term_urn([ref]) for ref in contained],
            )
        )

    # A term without its own owners reuses the ownership of its parent node.
    if glossaryTerm.owners is None:
        aspects.append(parentOwnership)
    else:
        aspects.append(get_owners(glossaryTerm.owners))

    aspects.append(models.BrowsePathsClass(paths=["/" + "/".join(path)]))

    snapshot: models.GlossaryTermSnapshotClass = models.GlossaryTermSnapshotClass(
        urn=term_urn,
        aspects=aspects,
    )
    return [get_mce_from_snapshot(snapshot)]
|
||||
|
||||
|
||||
@dataclass
class BusinessGlossaryFileSource(Source):
    """Ingestion source that reads a business glossary file and yields one work unit per MCE."""

    config: BusinessGlossarySourceConfig
    report: SourceReport = field(default_factory=SourceReport)

    @classmethod
    def create(cls, config_dict, ctx):
        """Parse ``config_dict`` into the source config and build the source with ``ctx``."""
        return cls(ctx, BusinessGlossarySourceConfig.parse_obj(config_dict))

    def load_glossary_config(self, file_name: str) -> BusinessGlossaryConfig:
        """Load ``file_name`` with load_config_file and parse it into a BusinessGlossaryConfig."""
        raw = load_config_file(file_name)
        return BusinessGlossaryConfig.parse_obj(raw)

    def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]:
        """Yield a work unit for each MCE of the configured glossary file.

        Each work unit's id is the URN of the MCE's proposed snapshot, and it
        is recorded on the report before being yielded.
        """
        glossary = self.load_glossary_config(self.config.file)
        for mce in get_mces(glossary):
            work_unit = MetadataWorkUnit(f"{mce.proposedSnapshot.urn}", mce=mce)
            self.report.report_workunit(work_unit)
            yield work_unit

    def get_report(self):
        """Return the report that get_workunits records work units on."""
        return self.report

    def close(self):
        """Nothing to release; this method does no work."""
|
@ -1075,7 +1075,7 @@
|
||||
},
|
||||
"type": "string",
|
||||
"name": "pictureLink",
|
||||
"default": "https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web/packages/data-portal/public/assets/images/default_avatar.png",
|
||||
"default": "https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web-react/src/images/default_avatar.png",
|
||||
"doc": "A URL which points to a picture which user wants to set as a profile photo"
|
||||
}
|
||||
],
|
||||
|
@ -3718,7 +3718,7 @@ class CorpUserEditableInfoClass(DictWrapper):
|
||||
else:
|
||||
self.skills = skills
|
||||
if pictureLink is None:
|
||||
# default: 'https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web/packages/data-portal/public/assets/images/default_avatar.png'
|
||||
# default: 'https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web-react/src/images/default_avatar.png'
|
||||
self.pictureLink = self.RECORD_SCHEMA.field_map["pictureLink"].default
|
||||
else:
|
||||
self.pictureLink = pictureLink
|
||||
|
@ -1056,7 +1056,7 @@
|
||||
"name": "pictureLink",
|
||||
"type": "string",
|
||||
"doc": "A URL which points to a picture which user wants to set as a profile photo",
|
||||
"default": "https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web/packages/data-portal/public/assets/images/default_avatar.png",
|
||||
"default": "https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web-react/src/images/default_avatar.png",
|
||||
"java": {
|
||||
"class": "com.linkedin.pegasus2avro.common.url.Url",
|
||||
"coercerClass": "com.linkedin.pegasus2avro.common.url.UrlCoercer"
|
||||
|
Loading…
x
Reference in New Issue
Block a user