2021-03-05 16:39:34 -08:00
|
|
|
"""
|
|
|
|
After running the recipe in this directory, we get a large JSON file called all_covid19_datasets.json.
|
|
|
|
This script reads that JSON file, enriches it using the directives pulled from a Google sheet (read locally from directives.csv), and
|
|
|
|
produces a new JSON file called demo_data.json.
|
|
|
|
"""
|
|
|
|
|
|
|
|
from typing import List
|
|
|
|
|
|
|
|
import pathlib
|
|
|
|
import json
|
|
|
|
import csv
|
|
|
|
import dataclasses
|
|
|
|
import time
|
|
|
|
from datahub.metadata.schema_classes import (
|
|
|
|
MetadataChangeEventClass,
|
|
|
|
DatasetSnapshotClass,
|
|
|
|
OwnershipClass,
|
|
|
|
OwnerClass,
|
|
|
|
OwnershipTypeClass,
|
|
|
|
CorpUserSnapshotClass,
|
|
|
|
CorpUserInfoClass,
|
|
|
|
AuditStampClass,
|
|
|
|
UpstreamLineageClass,
|
|
|
|
UpstreamClass,
|
|
|
|
DatasetLineageTypeClass,
|
|
|
|
)
|
|
|
|
|
|
|
|
# Directory holding every input and output of this script. The path is relative,
# so it resolves against the current working directory — presumably the
# metadata-ingestion project root (confirm before running from elsewhere).
DEMO_DATA_DIR = pathlib.Path("./examples/demo_data")

# Raw dataset MCEs produced by the recipe in this directory (read).
INPUT_ALL_DATASETS = DEMO_DATA_DIR.joinpath("all_covid19_datasets.json")
# Enriched MCEs (kept datasets + owners + ownership + lineage) (written).
OUTPUT_ENRICHED = DEMO_DATA_DIR.joinpath("demo_data.json")
# One row per table: drop flag, owners, and upstream dependencies (read).
DIRECTIVES_CSV = DEMO_DATA_DIR.joinpath("directives.csv")
|
|
|
|
|
|
|
|
|
|
|
|
@dataclasses.dataclass
class Directive:
    """How to treat a single table, as described by one row of the directives CSV.

    Instances are built by ``parse_directive``.
    """

    # Fully qualified table name; turned into a dataset URN by dataset_name_to_urn.
    table: str
    # When true, the table's dataset MCE is left out of the enriched output.
    drop: bool
    # Owner display names attached to the table.
    owners: List[str]
    # Names of the tables this table is derived from (its lineage upstreams).
    depends_on: List[str]
|
|
|
|
|
|
|
|
|
|
|
|
def read_mces(path) -> List[MetadataChangeEventClass]:
    """Load a JSON array of serialized MCEs from ``path``.

    Each array element is deserialized with ``MetadataChangeEventClass.from_obj``.

    Returns:
        The MCEs in file order.
    """
    # JSON text is UTF-8 (RFC 8259). Be explicit so dataset descriptions with
    # non-ASCII characters don't break on platforms whose locale default differs.
    with open(path, encoding="utf-8") as f:
        objs = json.load(f)
    return [MetadataChangeEventClass.from_obj(obj) for obj in objs]
|
|
|
|
|
|
|
|
|
|
|
|
def write_mces(path, mces: List[MetadataChangeEventClass]) -> None:
    """Write ``mces`` to ``path`` as a pretty-printed (4-space) JSON array.

    Every MCE is converted with ``to_obj`` before the file is opened, and any
    existing file at ``path`` is overwritten.
    """
    payload = [event.to_obj() for event in mces]
    with open(path, "w") as out_file:
        json.dump(payload, out_file, indent=4)
|
|
|
|
|
|
|
|
|
|
|
|
def parse_directive(row) -> Directive:
    """Convert one ``csv.DictReader`` row into a ``Directive``.

    ``row`` must provide the ``table``, ``drop``, ``owners`` and ``depends_on``
    columns.

    - ``owners`` and ``depends_on`` are comma-separated lists. Entries are
      whitespace-stripped and blank entries are skipped.
    - ``drop`` is true for any non-blank cell, except common false spellings
      (``FALSE``, ``f``, ``no``, ``n``, ``0``, ``off``; case-insensitive).
    """
    false_spellings = {"false", "f", "no", "n", "0", "off"}

    def split_list(cell) -> List[str]:
        # csv.DictReader fills cells missing from a short row with None.
        parts = (part.strip() for part in (cell or "").split(","))
        # Filter *after* stripping so "A, ,B" or a trailing ", " adds no blank name
        # (which would otherwise become an empty corp-user / dataset URN).
        return [part for part in parts if part]

    drop_cell = (row["drop"] or "").strip()
    return Directive(
        table=row["table"],
        # A bare bool() cast is true for ANY non-empty string, so a cell reading
        # "FALSE" (e.g. an exported spreadsheet checkbox) would drop the table.
        drop=bool(drop_cell) and drop_cell.lower() not in false_spellings,
        owners=split_list(row["owners"]),
        depends_on=split_list(row["depends_on"]),
    )
|
|
|
|
|
|
|
|
|
|
|
|
def fetch_directives() -> List[Directive]:
    """Read every row of ``DIRECTIVES_CSV`` and parse it with ``parse_directive``.

    Returns:
        One ``Directive`` per data row, in file order. The header row supplies
        the column names.
    """
    # The csv module requires newline="" so quoted cells containing line breaks
    # are parsed correctly. "utf-8-sig" reads plain UTF-8 and also strips a
    # leading BOM, which would otherwise corrupt the first header ("table").
    with open(DIRECTIVES_CSV, newline="", encoding="utf-8-sig") as f:
        return [parse_directive(row) for row in csv.DictReader(f)]
|
|
|
|
|
|
|
|
|
|
|
|
def dataset_name_to_urn(name: str) -> str:
    """Return the DataHub URN of the PROD BigQuery dataset named ``name``.

    ``name`` is embedded verbatim, so it should already be the fully qualified
    table name used in the directives file.
    """
    template = "urn:li:dataset:(urn:li:dataPlatform:bigquery,{},PROD)"
    return template.format(name)
|
|
|
|
|
|
|
|
|
|
|
|
def clean_owner_name(name: str) -> str:
    """Keep only the alphabetic characters of an owner's display name.

    Spaces, punctuation and digits are removed, e.g. ``"Jane Doe-2"`` becomes
    ``"JaneDoe"``. The result serves as the corp-user id in URNs and in the
    generated demo email addresses.
    """
    return "".join(filter(str.isalpha, name))
|
|
|
|
|
|
|
|
|
|
|
|
def owner_name_to_urn(name: str) -> str:
    """Return the DataHub corp-user URN for ``name``, embedded verbatim.

    Callers pass the already-cleaned name (see ``clean_owner_name``).
    """
    return "urn:li:corpuser:{}".format(name)
|
|
|
|
|
|
|
|
|
|
|
|
def create_owner_entity_mce(owner: str) -> MetadataChangeEventClass:
    """Build a CorpUser snapshot MCE describing one demo owner.

    - The user id is the letters-only form of ``owner`` (via ``clean_owner_name``).
    - The raw ``owner`` string is used as both display name and full name.
    - A placeholder ``<id>-demo@example.com`` email address is generated.
    - The user is marked active.
    """
    user_id = clean_owner_name(owner)
    user_info = CorpUserInfoClass(
        active=True,
        displayName=owner,
        fullName=owner,
        email=f"{user_id}-demo@example.com",
    )
    snapshot = CorpUserSnapshotClass(
        urn=owner_name_to_urn(user_id),
        aspects=[user_info],
    )
    return MetadataChangeEventClass(proposedSnapshot=snapshot)
|
|
|
|
|
|
|
|
|
|
|
|
def create_ownership_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
    """Build a dataset MCE whose only aspect is Ownership, for ``directive.table``.

    - Each name in ``directive.owners`` becomes a DATAOWNER.
    - Each owner points at the corp-user URN derived from the name's
      letters-only form.
    - The aspect is stamped with the current time in epoch milliseconds and
      the ``urn:li:corpuser:datahub`` actor.
    """
    owner_entries = [
        OwnerClass(
            owner=owner_name_to_urn(clean_owner_name(owner_name)),
            type=OwnershipTypeClass.DATAOWNER,  # type: ignore
        )
        for owner_name in directive.owners
    ]
    modified_stamp = AuditStampClass(
        time=int(time.time() * 1000),
        actor="urn:li:corpuser:datahub",
    )
    ownership = OwnershipClass(owners=owner_entries, lastModified=modified_stamp)
    snapshot = DatasetSnapshotClass(
        urn=dataset_name_to_urn(directive.table),
        aspects=[ownership],
    )
    return MetadataChangeEventClass(proposedSnapshot=snapshot)
|
|
|
|
|
|
|
|
|
|
|
|
def create_lineage_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
    """Build a dataset MCE whose only aspect is UpstreamLineage, for ``directive.table``.

    - Each table named in ``directive.depends_on`` becomes a TRANSFORMED upstream.
    - Each upstream gets its own audit stamp: the current time in epoch
      milliseconds and the ``urn:li:corpuser:datahub`` actor.
    """

    def to_upstream(upstream_table: str) -> UpstreamClass:
        # time.time() is sampled separately for every upstream entry.
        return UpstreamClass(
            dataset=dataset_name_to_urn(upstream_table),
            type=DatasetLineageTypeClass.TRANSFORMED,  # type: ignore
            auditStamp=AuditStampClass(
                time=int(time.time() * 1000),
                actor="urn:li:corpuser:datahub",
            ),
        )

    target_urn = dataset_name_to_urn(directive.table)
    lineage = UpstreamLineageClass(
        upstreams=[to_upstream(table) for table in directive.depends_on]
    )
    return MetadataChangeEventClass(
        proposedSnapshot=DatasetSnapshotClass(urn=target_urn, aspects=[lineage])
    )
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    datasets = read_mces(INPUT_ALL_DATASETS)
    all_directives = fetch_directives()
    directives = [directive for directive in all_directives if not directive.drop]

    # URNs of every table mentioned in the directives file, and of the kept ones.
    all_dataset_urns = {
        dataset_name_to_urn(directive.table) for directive in all_directives
    }
    allowed_urns = {dataset_name_to_urn(directive.table) for directive in directives}

    # Every dataset in the recipe output must have a row in the directives file.
    # Checked explicitly rather than with `assert` (which `python -O` strips),
    # and the error names the offending URNs.
    unknown_urns = sorted(
        {dataset.proposedSnapshot.urn for dataset in datasets} - all_dataset_urns
    )
    if unknown_urns:
        raise ValueError(
            f"{len(unknown_urns)} dataset(s) in {INPUT_ALL_DATASETS} have no row "
            f"in {DIRECTIVES_CSV}: {unknown_urns}"
        )

    filtered_dataset_mces = [
        dataset for dataset in datasets if dataset.proposedSnapshot.urn in allowed_urns
    ]

    # One CorpUser entity per distinct owner of a kept table; sorted so the
    # output file is deterministic across runs.
    owner_names = {owner for directive in directives for owner in directive.owners}
    owner_entity_mces = [
        create_owner_entity_mce(owner) for owner in sorted(owner_names)
    ]

    # Aspect-only MCEs, emitted only for kept tables that have something to add.
    ownership_aspect_mces = [
        create_ownership_aspect_mce(directive)
        for directive in directives
        if directive.owners
    ]
    lineage_aspect_mces = [
        create_lineage_aspect_mce(directive)
        for directive in directives
        if directive.depends_on
    ]

    enriched_mces = (
        filtered_dataset_mces
        + owner_entity_mces
        + ownership_aspect_mces
        + lineage_aspect_mces
    )
    write_mces(OUTPUT_ENRICHED, enriched_mces)
|