238 lines
7.2 KiB
Python

"""
After running the recipe in this directory, we get a large JSON file called all_covid19_datasets.json.
This script reads that JSON file, adds to it using the directives pull from a Google sheet, and
produces a new JSON file called demo_data.json.
"""
import csv
import dataclasses
import pathlib
import time
from typing import Dict, List, cast
from datahub.ingestion.sink.file import write_metadata_file as write_mces
from datahub.ingestion.source.file import read_metadata_file
from datahub.metadata.schema_classes import (
AuditStampClass,
CorpUserInfoClass,
CorpUserSnapshotClass,
DatasetLineageTypeClass,
DatasetSnapshotClass,
EditableSchemaMetadataClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
UpstreamClass,
UpstreamLineageClass,
)
DEMO_DATA_DIR = pathlib.Path("./examples/demo_data")
INPUT_ALL_DATASETS = DEMO_DATA_DIR / "all_covid19_datasets.json"
OUTPUT_ENRICHED = DEMO_DATA_DIR / "demo_data.json"
DIRECTIVES_CSV = DEMO_DATA_DIR / "directives.csv"
@dataclasses.dataclass
class Directive:
table: str
drop: bool
owners: List[str]
depends_on: List[str]
def read_mces(path: pathlib.Path) -> List[MetadataChangeEventClass]:
objs = read_metadata_file(path)
assert all(isinstance(obj, MetadataChangeEventClass) for obj in objs)
return cast(List[MetadataChangeEventClass], objs)
def parse_directive(row: Dict) -> Directive:
return Directive(
table=row["table"],
drop=bool(row["drop"]),
owners=[x.strip() for x in row["owners"].split(",") if x],
depends_on=[x.strip() for x in row["depends_on"].split(",") if x],
)
def fetch_directives() -> List[Directive]:
with open(DIRECTIVES_CSV, "r") as f:
reader = csv.DictReader(f)
rows = [parse_directive(row) for row in reader]
return rows
def dataset_name_to_urn(name: str) -> str:
return f"urn:li:dataset:(urn:li:dataPlatform:bigquery,{name},PROD)"
def clean_owner_name(name: str) -> str:
clean = "".join(c for c in name if c.isalpha())
return clean
def owner_name_to_urn(name: str) -> str:
return f"urn:li:corpuser:{name}"
def create_owner_entity_mce(owner: str) -> MetadataChangeEventClass:
clean_name = clean_owner_name(owner)
return MetadataChangeEventClass(
proposedSnapshot=CorpUserSnapshotClass(
urn=owner_name_to_urn(clean_name),
aspects=[
CorpUserInfoClass(
active=True,
displayName=owner,
fullName=owner,
email=f"{clean_name}-demo@example.com",
)
],
)
)
def create_ownership_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
return MetadataChangeEventClass(
proposedSnapshot=DatasetSnapshotClass(
urn=dataset_name_to_urn(directive.table),
aspects=[
OwnershipClass(
owners=[
OwnerClass(
owner=owner_name_to_urn(clean_owner_name(owner)),
type=OwnershipTypeClass.DATAOWNER,
)
for owner in directive.owners
],
lastModified=AuditStampClass(
time=int(time.time() * 1000),
actor="urn:li:corpuser:datahub",
),
)
],
)
)
def create_lineage_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
return MetadataChangeEventClass(
proposedSnapshot=DatasetSnapshotClass(
urn=dataset_name_to_urn(directive.table),
aspects=[
UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=dataset_name_to_urn(upstream),
type=DatasetLineageTypeClass.TRANSFORMED,
auditStamp=AuditStampClass(
time=int(time.time() * 1000),
actor="urn:li:corpuser:datahub",
),
)
for upstream in directive.depends_on
]
)
],
)
)
def create_global_tags_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
return MetadataChangeEventClass(
proposedSnapshot=DatasetSnapshotClass(
urn=dataset_name_to_urn(directive.table),
aspects=[GlobalTagsClass(tags=[])],
)
)
def create_editable_schema_info_aspect_mce(
directive: Directive,
) -> MetadataChangeEventClass:
return MetadataChangeEventClass(
proposedSnapshot=DatasetSnapshotClass(
urn=dataset_name_to_urn(directive.table),
aspects=[
EditableSchemaMetadataClass(
created=AuditStampClass(
time=int(time.time() * 1000),
actor="urn:li:corpuser:datahub",
),
lastModified=AuditStampClass(
time=int(time.time() * 1000),
actor="urn:li:corpuser:datahub",
),
editableSchemaFieldInfo=[],
)
],
)
)
if __name__ == "__main__":
datasets = read_mces(INPUT_ALL_DATASETS)
all_directives = fetch_directives()
directives = [directive for directive in all_directives if not directive.drop]
all_dataset_urns = {
dataset_name_to_urn(directive.table) for directive in all_directives
}
allowed_urns = {
dataset_name_to_urn(directive.table)
for directive in all_directives
if not directive.drop
}
missing_dataset_directives = [
dataset.proposedSnapshot.urn
for dataset in datasets
if dataset.proposedSnapshot.urn not in all_dataset_urns
]
assert not missing_dataset_directives
filtered_dataset_mces = [
dataset for dataset in datasets if dataset.proposedSnapshot.urn in allowed_urns
]
owner_names = {owner for directive in directives for owner in directive.owners}
owner_entity_mces = [
create_owner_entity_mce(owner) for owner in sorted(owner_names)
]
ownership_aspect_mces = [
create_ownership_aspect_mce(directive)
for directive in directives
if directive.owners
]
lineage_aspect_mces = [
create_lineage_aspect_mce(directive)
for directive in directives
if directive.depends_on
]
global_tags_aspect_mces = [
create_global_tags_aspect_mce(directive)
for directive in directives
if not directive.drop
]
editable_schema_info_aspect_mces = [
create_editable_schema_info_aspect_mce(directive)
for directive in directives
if not directive.drop
]
enriched_mces = (
filtered_dataset_mces
+ owner_entity_mces
+ ownership_aspect_mces
+ lineage_aspect_mces
+ global_tags_aspect_mces
+ editable_schema_info_aspect_mces
)
write_mces(OUTPUT_ENRICHED, enriched_mces)