mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-04 07:34:44 +00:00
238 lines
7.2 KiB
Python
238 lines
7.2 KiB
Python
"""
|
|
After running the recipe in this directory, we get a large JSON file called all_covid19_datasets.json.
|
|
This script reads that JSON file, adds to it using the directives pull from a Google sheet, and
|
|
produces a new JSON file called demo_data.json.
|
|
"""
|
|
|
|
import csv
|
|
import dataclasses
|
|
import pathlib
|
|
import time
|
|
from typing import Dict, List, cast
|
|
|
|
from datahub.ingestion.sink.file import write_metadata_file as write_mces
|
|
from datahub.ingestion.source.file import read_metadata_file
|
|
from datahub.metadata.schema_classes import (
|
|
AuditStampClass,
|
|
CorpUserInfoClass,
|
|
CorpUserSnapshotClass,
|
|
DatasetLineageTypeClass,
|
|
DatasetSnapshotClass,
|
|
EditableSchemaMetadataClass,
|
|
GlobalTagsClass,
|
|
MetadataChangeEventClass,
|
|
OwnerClass,
|
|
OwnershipClass,
|
|
OwnershipTypeClass,
|
|
UpstreamClass,
|
|
UpstreamLineageClass,
|
|
)
|
|
|
|
DEMO_DATA_DIR = pathlib.Path("./examples/demo_data")
|
|
INPUT_ALL_DATASETS = DEMO_DATA_DIR / "all_covid19_datasets.json"
|
|
OUTPUT_ENRICHED = DEMO_DATA_DIR / "demo_data.json"
|
|
DIRECTIVES_CSV = DEMO_DATA_DIR / "directives.csv"
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Directive:
|
|
table: str
|
|
drop: bool
|
|
owners: List[str]
|
|
depends_on: List[str]
|
|
|
|
|
|
def read_mces(path: pathlib.Path) -> List[MetadataChangeEventClass]:
|
|
objs = read_metadata_file(path)
|
|
assert all(isinstance(obj, MetadataChangeEventClass) for obj in objs)
|
|
return cast(List[MetadataChangeEventClass], objs)
|
|
|
|
|
|
def parse_directive(row: Dict) -> Directive:
|
|
return Directive(
|
|
table=row["table"],
|
|
drop=bool(row["drop"]),
|
|
owners=[x.strip() for x in row["owners"].split(",") if x],
|
|
depends_on=[x.strip() for x in row["depends_on"].split(",") if x],
|
|
)
|
|
|
|
|
|
def fetch_directives() -> List[Directive]:
|
|
with open(DIRECTIVES_CSV, "r") as f:
|
|
reader = csv.DictReader(f)
|
|
rows = [parse_directive(row) for row in reader]
|
|
return rows
|
|
|
|
|
|
def dataset_name_to_urn(name: str) -> str:
|
|
return f"urn:li:dataset:(urn:li:dataPlatform:bigquery,{name},PROD)"
|
|
|
|
|
|
def clean_owner_name(name: str) -> str:
|
|
clean = "".join(c for c in name if c.isalpha())
|
|
return clean
|
|
|
|
|
|
def owner_name_to_urn(name: str) -> str:
|
|
return f"urn:li:corpuser:{name}"
|
|
|
|
|
|
def create_owner_entity_mce(owner: str) -> MetadataChangeEventClass:
|
|
clean_name = clean_owner_name(owner)
|
|
return MetadataChangeEventClass(
|
|
proposedSnapshot=CorpUserSnapshotClass(
|
|
urn=owner_name_to_urn(clean_name),
|
|
aspects=[
|
|
CorpUserInfoClass(
|
|
active=True,
|
|
displayName=owner,
|
|
fullName=owner,
|
|
email=f"{clean_name}-demo@example.com",
|
|
)
|
|
],
|
|
)
|
|
)
|
|
|
|
|
|
def create_ownership_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
|
|
return MetadataChangeEventClass(
|
|
proposedSnapshot=DatasetSnapshotClass(
|
|
urn=dataset_name_to_urn(directive.table),
|
|
aspects=[
|
|
OwnershipClass(
|
|
owners=[
|
|
OwnerClass(
|
|
owner=owner_name_to_urn(clean_owner_name(owner)),
|
|
type=OwnershipTypeClass.DATAOWNER,
|
|
)
|
|
for owner in directive.owners
|
|
],
|
|
lastModified=AuditStampClass(
|
|
time=int(time.time() * 1000),
|
|
actor="urn:li:corpuser:datahub",
|
|
),
|
|
)
|
|
],
|
|
)
|
|
)
|
|
|
|
|
|
def create_lineage_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
|
|
return MetadataChangeEventClass(
|
|
proposedSnapshot=DatasetSnapshotClass(
|
|
urn=dataset_name_to_urn(directive.table),
|
|
aspects=[
|
|
UpstreamLineageClass(
|
|
upstreams=[
|
|
UpstreamClass(
|
|
dataset=dataset_name_to_urn(upstream),
|
|
type=DatasetLineageTypeClass.TRANSFORMED,
|
|
auditStamp=AuditStampClass(
|
|
time=int(time.time() * 1000),
|
|
actor="urn:li:corpuser:datahub",
|
|
),
|
|
)
|
|
for upstream in directive.depends_on
|
|
]
|
|
)
|
|
],
|
|
)
|
|
)
|
|
|
|
|
|
def create_global_tags_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
|
|
return MetadataChangeEventClass(
|
|
proposedSnapshot=DatasetSnapshotClass(
|
|
urn=dataset_name_to_urn(directive.table),
|
|
aspects=[GlobalTagsClass(tags=[])],
|
|
)
|
|
)
|
|
|
|
|
|
def create_editable_schema_info_aspect_mce(
|
|
directive: Directive,
|
|
) -> MetadataChangeEventClass:
|
|
return MetadataChangeEventClass(
|
|
proposedSnapshot=DatasetSnapshotClass(
|
|
urn=dataset_name_to_urn(directive.table),
|
|
aspects=[
|
|
EditableSchemaMetadataClass(
|
|
created=AuditStampClass(
|
|
time=int(time.time() * 1000),
|
|
actor="urn:li:corpuser:datahub",
|
|
),
|
|
lastModified=AuditStampClass(
|
|
time=int(time.time() * 1000),
|
|
actor="urn:li:corpuser:datahub",
|
|
),
|
|
editableSchemaFieldInfo=[],
|
|
)
|
|
],
|
|
)
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
datasets = read_mces(INPUT_ALL_DATASETS)
|
|
all_directives = fetch_directives()
|
|
directives = [directive for directive in all_directives if not directive.drop]
|
|
|
|
all_dataset_urns = {
|
|
dataset_name_to_urn(directive.table) for directive in all_directives
|
|
}
|
|
allowed_urns = {
|
|
dataset_name_to_urn(directive.table)
|
|
for directive in all_directives
|
|
if not directive.drop
|
|
}
|
|
|
|
missing_dataset_directives = [
|
|
dataset.proposedSnapshot.urn
|
|
for dataset in datasets
|
|
if dataset.proposedSnapshot.urn not in all_dataset_urns
|
|
]
|
|
assert not missing_dataset_directives
|
|
|
|
filtered_dataset_mces = [
|
|
dataset for dataset in datasets if dataset.proposedSnapshot.urn in allowed_urns
|
|
]
|
|
|
|
owner_names = {owner for directive in directives for owner in directive.owners}
|
|
owner_entity_mces = [
|
|
create_owner_entity_mce(owner) for owner in sorted(owner_names)
|
|
]
|
|
|
|
ownership_aspect_mces = [
|
|
create_ownership_aspect_mce(directive)
|
|
for directive in directives
|
|
if directive.owners
|
|
]
|
|
|
|
lineage_aspect_mces = [
|
|
create_lineage_aspect_mce(directive)
|
|
for directive in directives
|
|
if directive.depends_on
|
|
]
|
|
|
|
global_tags_aspect_mces = [
|
|
create_global_tags_aspect_mce(directive)
|
|
for directive in directives
|
|
if not directive.drop
|
|
]
|
|
|
|
editable_schema_info_aspect_mces = [
|
|
create_editable_schema_info_aspect_mce(directive)
|
|
for directive in directives
|
|
if not directive.drop
|
|
]
|
|
|
|
enriched_mces = (
|
|
filtered_dataset_mces
|
|
+ owner_entity_mces
|
|
+ ownership_aspect_mces
|
|
+ lineage_aspect_mces
|
|
+ global_tags_aspect_mces
|
|
+ editable_schema_info_aspect_mces
|
|
)
|
|
write_mces(OUTPUT_ENRICHED, enriched_mces)
|