feat(ingest/lineage): generate static json lineage file (#13906)

This commit is contained in:
Aseem Bansal 2025-07-01 20:51:18 +05:30 committed by GitHub
parent 6b1817902d
commit 92784ec3a4
12 changed files with 1447 additions and 43 deletions

View File

@@ -13,3 +13,9 @@ repos:
language: system
files: ^metadata-ingestion/src/datahub/ingestion/source/.*\.py$
pass_filenames: false
- id: update-lineage-file
name: update-lineage-file
entry: ./gradlew :metadata-ingestion:lineageGen
language: system
files: ^(metadata-ingestion-modules/.*|metadata-models/.*)$
pass_filenames: false

View File

@@ -69,21 +69,25 @@ jobs:
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Install package
run: ./gradlew :metadata-ingestion:installPackageOnly
- name: Check lint and capability_summary.json being up-to-date
- name: Check lint passes and autogenerated JSON files are up-to-date
if: ${{ matrix.command == 'testQuick' }}
run: |
./gradlew :metadata-ingestion:lint
echo "Lint passed. Checking if capability_summary.json is up-to-date."
./gradlew :metadata-ingestion:capabilitySummary
# Check if capability summary file has changed
if git diff --quiet metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json; then
echo "✅ Capability summary file is unchanged"
else
echo "❌ Capability summary file has changed. Please commit the updated file."
echo "Changed lines:"
git diff metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json
exit 1
fi
./gradlew :metadata-ingestion:lint
- name: Check autogenerated JSON files are up-to-date
if: ${{ matrix.command == 'testQuick' }}
run: |
./gradlew :metadata-ingestion:capabilitySummary :metadata-ingestion:lineageGen
for json_file in metadata-ingestion/src/datahub/ingestion/autogenerated/*.json; do
filename=$(basename "$json_file")
if git diff --quiet "$json_file"; then
echo "✅ $filename is unchanged"
else
echo "❌ $filename has changed. Please commit the updated file."
echo "Changed lines:"
git diff "$json_file"
exit 1
fi
done
- name: Run metadata-ingestion tests
run: ./gradlew :metadata-ingestion:${{ matrix.command }}
- name: Debug info

View File

@@ -1,4 +1,4 @@
# Auto-generated by .github/scripts/generate_pre_commit.py at 2025-06-27 12:14:33 UTC
# Auto-generated by .github/scripts/generate_pre_commit.py at 2025-07-01 10:36:31 UTC
# Do not edit this file directly. Run the script to regenerate.
# Add additional hooks in .github/scripts/pre-commit-override.yaml
repos:
@@ -500,3 +500,10 @@ repos:
language: system
files: ^metadata-ingestion/src/datahub/ingestion/source/.*\.py$
pass_filenames: false
- id: update-lineage-file
name: update-lineage-file
entry: ./gradlew :metadata-ingestion:lineageGen
language: system
files: ^(metadata-ingestion-modules/.*|metadata-models/.*)$
pass_filenames: false

View File

@@ -111,6 +111,21 @@ task modelDocGen(type: Exec, dependsOn: [codegen]) {
commandLine 'bash', '-c', "${venv_activate_command} python scripts/modeldocgen.py ${schemasRoot} --registry ${entityRegistry} --generated-docs-dir ${docsOutdir} --file ${outdir}/metadata_model_mces.json --extra-docs ${metadataModelDocsRoot}"
}
task lineageGen(type: Exec, dependsOn: [codegen]) {
def datahubRoot = '..'
def schemasRoot = "${datahubRoot}/metadata-events/mxe-schemas/src/mainGeneratedAvroSchema/avro/"
def entityRegistry = "${datahubRoot}/metadata-models/src/main/resources/entity-registry.yml"
def lineageOutput = "src/datahub/ingestion/autogenerated/lineage.json"
inputs.files(
file('scripts/modeldocgen.py'),
project.fileTree(dir: "../metadata-events/mxe-schemas/src/", include: "**/*.avsc")
)
outputs.file(lineageOutput)
commandLine 'bash', '-c', "${venv_activate_command} python scripts/modeldocgen.py ${schemasRoot} --registry ${entityRegistry} --generated-docs-dir /tmp --lineage-output ${lineageOutput}"
}
task testScripts(type: Exec, dependsOn: [installDev]) {
inputs.files(project.fileTree(dir: "scripts/tests/", include: "**/*.py"))
outputs.dir("${venv_name}")

View File

@@ -7,6 +7,7 @@ from typing import Dict, Optional
import click
from docgen_types import Plugin
from utils import should_write_json_file
from datahub.ingestion.api.decorators import SupportStatus
from datahub.ingestion.source.source_registry import source_registry
@@ -160,25 +161,10 @@ def save_capability_report(summary: CapabilitySummary, output_dir: str) -> None:
summary_json = json.dumps(summary_dict, indent=2, sort_keys=True)
summary_file = output_path / "capability_summary.json"
write_file = True
if summary_file.exists():
try:
with open(summary_file, "r") as f:
existing_data = json.load(f)
write_file = should_write_json_file(
summary_file, summary_dict, "capability summary file"
)
# Create copies without generated_at for comparison
existing_for_comparison = existing_data.copy()
new_for_comparison = summary_dict.copy()
existing_for_comparison.pop("generated_at", None)
new_for_comparison.pop("generated_at", None)
if json.dumps(
existing_for_comparison, indent=2, sort_keys=True
) == json.dumps(new_for_comparison, indent=2, sort_keys=True):
logger.info(f"No changes detected in {summary_file}, skipping write.")
write_file = False
except Exception as e:
logger.warning(f"Could not read existing summary file: {e}")
if write_file:
with open(summary_file, "w") as f:
f.write(summary_json)

View File

@@ -6,12 +6,14 @@ import re
import shutil
import unittest.mock
from dataclasses import Field, dataclass, field
from datetime import datetime, timezone
from enum import auto
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
import avro.schema
import click
from utils import should_write_json_file
from datahub.configuration.common import ConfigEnum, PermissiveConfigModel
from datahub.emitter.mce_builder import (
@@ -30,7 +32,6 @@ from datahub.metadata.schema_classes import (
BrowsePathsClass,
BrowsePathsV2Class,
DatasetPropertiesClass,
DatasetSnapshotClass,
ForeignKeyConstraintClass,
GlobalTagsClass,
OtherSchemaClass,
@@ -94,6 +95,38 @@ class EventDefinition:
name: str
# New dataclasses for lineage representation
@dataclass
class LineageRelationship:
name: str
entityTypes: List[str]
isLineage: bool = True
@dataclass
class LineageField:
name: str
path: str
isLineage: bool = True
relationship: Optional[LineageRelationship] = None
@dataclass
class LineageAspect:
aspect: str
fields: List[LineageField]
@dataclass
class LineageEntity:
aspects: Dict[str, LineageAspect]
@dataclass
class LineageData:
entities: Dict[str, LineageEntity]
entity_registry: Dict[str, EntityDefinition] = {}
@@ -129,6 +162,7 @@ def add_name(self, name_attr, space_attr, new_schema):
def load_schema_file(schema_file: str) -> None:
logger.debug(f"Loading schema file: {schema_file}")
with open(schema_file) as f:
raw_schema_text = f.read()
@@ -146,6 +180,7 @@ def load_schema_file(schema_file: str) -> None:
aspect_definition.schema = record_schema
aspect_registry[aspect_definition.name] = aspect_definition
logger.debug(f"Loaded aspect schema: {aspect_definition.name}")
elif avro_schema.name == "MetadataChangeEvent":
# probably an MCE schema
field: Field = avro_schema.fields[1]
@@ -158,7 +193,7 @@ def load_schema_file(schema_file: str) -> None:
entity_name, EntityDefinition(**entity_def)
)
entity_definition.aspect_map = get_aspects_from_snapshot(member_schema)
all_aspects = [a for a in entity_definition.aspect_map.keys()]
all_aspects = [a for a in entity_definition.aspect_map]
# in terms of order, we prefer the aspects from snapshot over the aspects from the config registry
# so we flip the aspect list here
for aspect_name in entity_definition.aspects:
@@ -166,8 +201,222 @@ def load_schema_file(schema_file: str) -> None:
all_aspects.append(aspect_name)
entity_definition.aspects = all_aspects
entity_registry[entity_name] = entity_definition
logger.debug(f"Loaded entity schema: {entity_name} with aspects: {all_aspects}")
else:
print(f"Ignoring schema {schema_file}")
logger.debug(f"Ignoring schema {schema_file}")
def extract_lineage_fields_from_schema(
schema: avro.schema.Schema,
current_path: str = ""
) -> List[LineageField]:
"""
Recursively extract lineage fields from an Avro schema.
Args:
schema: The Avro schema to analyze
current_path: The current field path (for nested fields)
Returns:
List of LineageField objects found in the schema
"""
lineage_fields = []
if isinstance(schema, avro.schema.RecordSchema):
logger.debug(f"Analyzing record schema at path: {current_path}")
for field in schema.fields:
field_path = f"{current_path}.{field.name}" if current_path else field.name
logger.debug(f"Analyzing field: {field.name} at path: {field_path}")
# Check if this field has lineage properties
is_lineage = False
relationship_info = None
# Check for isLineage property
if hasattr(field, 'other_props') and field.other_props:
is_lineage = field.other_props.get('isLineage', False)
if is_lineage:
logger.debug(f"Found isLineage=true for field: {field_path}")
# Check for Relationship property
if 'Relationship' in field.other_props:
relationship_data = field.other_props['Relationship']
logger.debug(f"Found Relationship property for field: {field_path}: {relationship_data}")
# Handle both direct and path-based relationship annotations
if 'entityTypes' in relationship_data:
# Direct relationship
relationship_is_lineage = relationship_data.get('isLineage', False)
relationship_info = LineageRelationship(
name=relationship_data.get('name', ''),
entityTypes=relationship_data.get('entityTypes', []),
isLineage=relationship_is_lineage
)
is_lineage = is_lineage or relationship_is_lineage
else:
# Path-based relationship - find the actual relationship data
for _, value in relationship_data.items():
if isinstance(value, dict) and 'entityTypes' in value:
relationship_is_lineage = value.get('isLineage', False)
relationship_info = LineageRelationship(
name=value.get('name', ''),
entityTypes=value.get('entityTypes', []),
isLineage=relationship_is_lineage
)
is_lineage = is_lineage or relationship_is_lineage
break
# If this field is lineage, add it to the results
if is_lineage:
lineage_field = LineageField(
name=field.name,
path=field_path,
isLineage=True,
relationship=relationship_info
)
lineage_fields.append(lineage_field)
logger.debug(f"Added lineage field: {field_path}")
# Recursively check nested fields
nested_fields = extract_lineage_fields_from_schema(field.type, field_path)
lineage_fields.extend(nested_fields)
elif isinstance(schema, avro.schema.ArraySchema):
logger.debug(f"Analyzing array schema at path: {current_path}")
# For arrays, check the items schema
nested_fields = extract_lineage_fields_from_schema(schema.items, current_path)
lineage_fields.extend(nested_fields)
elif isinstance(schema, avro.schema.UnionSchema):
logger.debug(f"Analyzing union schema at path: {current_path}")
# For unions, check all possible schemas
for union_schema in schema.schemas:
nested_fields = extract_lineage_fields_from_schema(union_schema, current_path)
lineage_fields.extend(nested_fields)
return lineage_fields
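For illustration, a minimal sketch of how this traversal behaves when the lineage flag sits on a union-typed field (assuming the same avro and json imports this script already uses; the ExampleAspect schema is hypothetical):

# The isLineage flag lives on the field itself, so the field-level check
# catches it before the recursion descends into the ["null", "string"] union.
union_schema = avro.schema.parse(json.dumps({
    "type": "record",
    "name": "ExampleAspect",
    "fields": [
        {"name": "upstream", "type": ["null", "string"], "isLineage": True},
    ],
}))
fields = extract_lineage_fields_from_schema(union_schema)
assert len(fields) == 1
assert fields[0].path == "upstream" and fields[0].relationship is None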
def extract_lineage_fields() -> LineageData:
"""
Extract lineage fields from all aspects in the aspect registry.
Returns:
LineageData containing all lineage information organized by entity and aspect
"""
logger.info("Starting lineage field extraction")
lineage_data = LineageData(entities={})
# Group aspects by entity
entity_aspects: Dict[str, List[str]] = {}
for entity_name, entity_def in entity_registry.items():
entity_aspects[entity_name] = entity_def.aspects
logger.debug(f"Entity {entity_name} has aspects: {entity_def.aspects}")
# Process each aspect
for aspect_name, aspect_def in aspect_registry.items():
logger.debug(f"Processing aspect: {aspect_name}")
if not aspect_def.schema:
logger.warning(f"Aspect {aspect_name} has no schema, skipping")
continue
# Extract lineage fields from this aspect
lineage_fields = extract_lineage_fields_from_schema(aspect_def.schema)
if lineage_fields:
logger.info(f"Found {len(lineage_fields)} lineage fields in aspect {aspect_name}")
# Find which entities use this aspect
for entity_name, entity_aspect_list in entity_aspects.items():
if aspect_name in entity_aspect_list:
logger.debug(f"Aspect {aspect_name} is used by entity {entity_name}")
# Initialize entity if not exists
if entity_name not in lineage_data.entities:
lineage_data.entities[entity_name] = LineageEntity(aspects={})
# Add aspect with lineage fields
lineage_aspect = LineageAspect(
aspect=aspect_name,
fields=lineage_fields
)
lineage_data.entities[entity_name].aspects[aspect_name] = lineage_aspect
else:
logger.debug(f"No lineage fields found in aspect {aspect_name}")
# Log summary
total_entities_with_lineage = len(lineage_data.entities)
total_aspects_with_lineage = sum(len(entity.aspects) for entity in lineage_data.entities.values())
total_lineage_fields = sum(
len(aspect.fields)
for entity in lineage_data.entities.values()
for aspect in entity.aspects.values()
)
logger.info("Lineage extraction complete:")
logger.info(f" - Entities with lineage: {total_entities_with_lineage}")
logger.info(f" - Aspects with lineage: {total_aspects_with_lineage}")
logger.info(f" - Total lineage fields: {total_lineage_fields}")
return lineage_data
def generate_lineage_json(lineage_data: LineageData) -> str:
"""
Generate JSON representation of lineage data.
Args:
lineage_data: The lineage data to convert to JSON
Returns:
JSON string representation
"""
logger.info("Generating lineage JSON")
# Convert dataclasses to dictionaries
def lineage_field_to_dict(field: LineageField) -> Dict[str, Any]:
result = {
"name": field.name,
"path": field.path,
"isLineage": field.isLineage
}
if field.relationship:
result["relationship"] = {
"name": field.relationship.name,
"entityTypes": field.relationship.entityTypes,
"isLineage": field.relationship.isLineage
}
return result
def lineage_aspect_to_dict(aspect: LineageAspect) -> Dict[str, Any]:
return {
"aspect": aspect.aspect,
"fields": [lineage_field_to_dict(field) for field in aspect.fields]
}
def lineage_entity_to_dict(entity: LineageEntity) -> Dict[str, Any]:
return {
aspect_name: lineage_aspect_to_dict(aspect)
for aspect_name, aspect in entity.aspects.items()
}
# Build the final JSON structure
json_data = {
"entities": {
entity_name: lineage_entity_to_dict(entity)
for entity_name, entity in lineage_data.entities.items()
}
}
json_data["generated_by"] = "metadata-ingestion/scripts/modeldocgen.py"
json_data["generated_at"] = datetime.now(timezone.utc).isoformat()
json_string = json.dumps(json_data, indent=2)
logger.info(f"Generated lineage JSON with {len(json_string)} characters")
return json_string
@dataclass
@@ -243,7 +492,7 @@ def make_relnship_docs(relationships: List[Relationship], direction: str) -> str
def make_entity_docs(entity_display_name: str, graph: RelationshipGraph) -> str:
entity_name = entity_display_name[0:1].lower() + entity_display_name[1:]
entity_def: Optional[EntityDefinition] = entity_registry.get(entity_name, None)
entity_def: Optional[EntityDefinition] = entity_registry.get(entity_name)
if entity_def:
doc = entity_def.doc_file_contents or (
f"# {entity_def.display_name}\n{entity_def.doc}"
@@ -330,8 +579,6 @@ def generate_stitched_record(
final_path = re.sub(r"^\[version=2.0\]\.", "", final_path)
return final_path
datasets: List[DatasetSnapshotClass] = []
for entity_name, entity_def in entity_registry.items():
entity_display_name = entity_def.display_name
entity_fields = []
@@ -402,7 +649,7 @@ def generate_stitched_record(
if entity_def.keyAspect == aspect_info.get("name"):
f_field.isPartOfKey = True
if "timeseries" == aspect_info.get("type", ""):
if aspect_info.get("type", "") == "timeseries":
# f_field.globalTags = f_field.globalTags or GlobalTagsClass(
# tags=[]
# )
@@ -425,7 +672,7 @@ def generate_stitched_record(
len(relationship_info.keys()) == 1
), "We should never have more than one path spec assigned to a relationship annotation"
final_info = None
for k, v in relationship_info.items():
for _, v in relationship_info.items():
final_info = v
relationship_info = final_info
@@ -620,6 +867,7 @@ def get_sorted_entity_names(
)
@click.option("--png", type=str, required=False)
@click.option("--extra-docs", type=str, required=False)
@click.option("--lineage-output", type=str, required=False, help="generate lineage JSON file")
def generate(
schemas_root: str,
registry: str,
@@ -629,11 +877,13 @@ def generate(
dot: Optional[str],
png: Optional[str],
extra_docs: Optional[str],
lineage_output: Optional[str],
) -> None:
logger.info(f"server = {server}")
logger.info(f"file = {file}")
logger.info(f"dot = {dot}")
logger.info(f"png = {png}")
logger.info(f"lineage_output = {lineage_output}")
entity_extra_docs = {}
if extra_docs:
@@ -663,6 +913,28 @@ def generate(
entity_name
]
if lineage_output:
logger.info(f"Generating lineage JSON to {lineage_output}")
try:
lineage_data = extract_lineage_fields()
lineage_json = generate_lineage_json(lineage_data)
output_path = Path(lineage_output)
output_path.parent.mkdir(parents=True, exist_ok=True)
new_json_data = json.loads(lineage_json)
write_file = should_write_json_file(output_path, new_json_data, "lineage file")
if write_file:
with open(output_path, 'w') as f:
f.write(lineage_json)
logger.info(f"Successfully wrote lineage JSON to {lineage_output}")
except Exception as e:
logger.error(f"Failed to generate lineage JSON: {e}")
raise
relationship_graph = RelationshipGraph()
mcps = list(generate_stitched_record(relationship_graph))
@@ -672,11 +944,9 @@ def generate(
sorted_entity_names = get_sorted_entity_names(entity_names)
index = 0
for category, sorted_entities in sorted_entity_names:
for _, sorted_entities in sorted_entity_names:
for entity_name in sorted_entities:
entity_def = entity_registry[entity_name]
entity_category = entity_def.category
entity_dir = f"{generated_docs_dir}/entities/"
os.makedirs(entity_dir, exist_ok=True)

View File

@@ -12,12 +12,16 @@ from modeldocgen import (
AspectDefinition,
EntityCategory,
EntityDefinition,
LineageData,
Relationship,
RelationshipAdjacency,
RelationshipGraph,
aspect_registry,
capitalize_first,
entity_registry,
extract_lineage_fields,
extract_lineage_fields_from_schema,
generate_lineage_json,
generate_stitched_record,
get_sorted_entity_names,
load_schema_file,
@@ -929,3 +933,173 @@ class TestLoadSchemaFile:
assert entity_def.name == "dataset"
assert entity_def.keyAspect == "datasetKey"
assert "datasetProperties" in entity_def.aspects
class TestLineageGeneration:
"""Test lineage generation functionality."""
def test_extract_lineage_fields_from_schema_simple(self):
"""Test extracting lineage fields from a simple schema with isLineage property."""
clear_registries()
# Create a simple schema with a lineage field
schema_dict = {
"type": "record",
"name": "TestAspect",
"fields": [
{"name": "upstreams", "type": "string", "isLineage": True},
{"name": "description", "type": "string"},
],
}
schema = avro.schema.parse(json.dumps(schema_dict))
# Extract lineage fields
lineage_fields = extract_lineage_fields_from_schema(schema)
# Should find one lineage field
assert len(lineage_fields) == 1
field = lineage_fields[0]
assert field.name == "upstreams"
assert field.path == "upstreams"
assert field.isLineage is True
assert field.relationship is None
def test_extract_lineage_fields_from_schema_with_relationship(self):
"""Test extracting lineage fields from schema with Relationship property."""
clear_registries()
# Create a schema with a relationship field
schema_dict = {
"type": "record",
"name": "TestAspect",
"fields": [
{
"name": "relatedDataset",
"type": "string",
"Relationship": {
"entityTypes": ["dataset"],
"name": "DownstreamOf",
"isLineage": True,
},
}
],
}
schema = avro.schema.parse(json.dumps(schema_dict))
# Extract lineage fields
lineage_fields = extract_lineage_fields_from_schema(schema)
# Should find one lineage field with relationship info
assert len(lineage_fields) == 1
field = lineage_fields[0]
assert field.name == "relatedDataset"
assert field.path == "relatedDataset"
assert field.isLineage is True
assert field.relationship is not None
assert field.relationship.name == "DownstreamOf"
assert field.relationship.entityTypes == ["dataset"]
assert field.relationship.isLineage is True
def test_generate_lineage_json_empty_data(self):
"""Test generating JSON from empty lineage data."""
clear_registries()
# Create empty lineage data
lineage_data = LineageData(entities={})
# Generate JSON
json_string = generate_lineage_json(lineage_data)
# Parse and verify structure
json_data = json.loads(json_string)
assert "entities" in json_data
assert json_data["entities"] == {}
# Verify metadata fields are present
assert "generated_by" in json_data
assert json_data["generated_by"] == "metadata-ingestion/scripts/modeldocgen.py"
assert "generated_at" in json_data
assert isinstance(json_data["generated_at"], str)
# Verify it's valid JSON
assert isinstance(json_string, str)
assert json_string.strip().startswith("{")
assert json_string.strip().endswith("}")
def test_extract_lineage_fields_from_schema_nested(self):
"""Test extracting lineage fields from nested schema structures."""
clear_registries()
# Create a schema with nested lineage fields
schema_dict = {
"type": "record",
"name": "TestAspect",
"fields": [
{
"name": "upstreams",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "UpstreamInfo",
"fields": [
{
"name": "dataset",
"type": "string",
"isLineage": True,
},
{"name": "metadata", "type": "string"},
],
},
},
}
],
}
schema = avro.schema.parse(json.dumps(schema_dict))
# Extract lineage fields
lineage_fields = extract_lineage_fields_from_schema(schema)
# Should find one lineage field in the nested structure
assert len(lineage_fields) == 1
field = lineage_fields[0]
assert field.name == "dataset"
assert field.path == "upstreams.dataset"
assert field.isLineage is True
def test_extract_lineage_fields_with_registry_data(self):
"""Test the main extract_lineage_fields function with registry data."""
clear_registries()
# Setup entity and aspect in registries
entity = EntityDefinition(
name="dataset", keyAspect="datasetKey", aspects=["upstreamLineage"]
)
entity_registry["dataset"] = entity
# Create aspect schema with lineage field
aspect_schema_dict = {
"type": "record",
"name": "UpstreamLineage",
"fields": [{"name": "upstreams", "type": "string", "isLineage": True}],
}
aspect_schema = avro.schema.parse(json.dumps(aspect_schema_dict))
aspect_registry["upstreamLineage"] = AspectDefinition(
name="upstreamLineage", schema=aspect_schema
)
# Extract lineage fields
lineage_data = extract_lineage_fields()
# Verify structure
assert "dataset" in lineage_data.entities
assert "upstreamLineage" in lineage_data.entities["dataset"].aspects
aspect_data = lineage_data.entities["dataset"].aspects["upstreamLineage"]
assert aspect_data.aspect == "upstreamLineage"
assert len(aspect_data.fields) == 1
field = aspect_data.fields[0]
assert field.name == "upstreams"
assert field.path == "upstreams"
assert field.isLineage is True

View File

@@ -0,0 +1,53 @@
import json
import tempfile
from pathlib import Path
from utils import should_write_json_file
class TestShouldWriteJsonFile:
"""Test the should_write_json_file utility function."""
def test_should_write_when_file_does_not_exist(self):
"""Test that function returns True when output file doesn't exist."""
with tempfile.TemporaryDirectory() as temp_dir:
output_path = Path(temp_dir) / "nonexistent.json"
new_data = {"key": "value", "generated_at": "2023-01-01T00:00:00Z"}
result = should_write_json_file(output_path, new_data)
assert result is True
def test_should_write_when_content_changed(self):
"""Test that function returns True when content has actually changed."""
with tempfile.TemporaryDirectory() as temp_dir:
output_path = Path(temp_dir) / "test.json"
# Create existing file with different content
existing_data = {"key": "old_value", "generated_at": "2023-01-01T00:00:00Z"}
with open(output_path, "w") as f:
json.dump(existing_data, f)
# New data with different content
new_data = {"key": "new_value", "generated_at": "2023-01-02T00:00:00Z"}
result = should_write_json_file(output_path, new_data)
assert result is True
def test_should_not_write_when_only_timestamp_changed(self):
"""Test that function returns False when only generated_at timestamp changed."""
with tempfile.TemporaryDirectory() as temp_dir:
output_path = Path(temp_dir) / "test.json"
# Create existing file
existing_data = {"key": "value", "generated_at": "2023-01-01T00:00:00Z"}
with open(output_path, "w") as f:
json.dump(existing_data, f)
# New data with same content but different timestamp
new_data = {"key": "value", "generated_at": "2023-01-02T00:00:00Z"}
result = should_write_json_file(output_path, new_data)
assert result is False

View File

@@ -0,0 +1,49 @@
import json
import logging
from pathlib import Path
from typing import Any, Dict
logger = logging.getLogger(__name__)
def should_write_json_file(
output_path: Path,
new_json_data: Dict[str, Any],
file_description: str = "JSON file"
) -> bool:
"""
Check if a JSON file should be written by comparing content with existing file.
This function compares the new JSON data with existing file content, excluding
the 'generated_at' field from comparison since it changes on every generation.
Args:
output_path: Path to the output file
new_json_data: The new JSON data to potentially write
file_description: Description of the file for logging purposes
Returns:
True if the file should be written, False if content is unchanged
"""
write_file = True
if output_path.exists():
try:
with open(output_path, "r") as f:
existing_data = json.load(f)
# Create copies without generated_at for comparison
existing_for_comparison = existing_data.copy()
new_for_comparison = new_json_data.copy()
existing_for_comparison.pop("generated_at", None)
new_for_comparison.pop("generated_at", None)
if json.dumps(
existing_for_comparison, indent=2, sort_keys=True
) == json.dumps(new_for_comparison, indent=2, sort_keys=True):
logger.info(f"No changes detected in {output_path}, skipping write.")
write_file = False
except Exception as e:
logger.warning(f"Could not read existing {file_description}: {e}")
return write_file
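As a usage sketch, mirroring how modeldocgen.py calls this helper above (the output path and payload here are illustrative):

import json
from pathlib import Path

output_path = Path("src/datahub/ingestion/autogenerated/lineage.json")
new_data = {"entities": {}, "generated_at": "2025-07-01T00:00:00+00:00"}
# Rewrite only when something other than the generated_at timestamp changed,
# which keeps the pre-commit hook and CI up-to-date checks quiet on no-op runs.
if should_write_json_file(output_path, new_data, "lineage file"):
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(new_data, indent=2, sort_keys=True))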

View File

@@ -0,0 +1,401 @@
{
"entities": {
"dataJob": {
"dataJobInputOutput": {
"aspect": "dataJobInputOutput",
"fields": [
{
"name": "inputDatasets",
"path": "inputDatasets",
"isLineage": true,
"relationship": {
"name": "Consumes",
"entityTypes": [
"dataset"
],
"isLineage": true
}
},
{
"name": "inputDatasetEdges",
"path": "inputDatasetEdges",
"isLineage": true,
"relationship": {
"name": "Consumes",
"entityTypes": [
"dataset"
],
"isLineage": true
}
},
{
"name": "outputDatasets",
"path": "outputDatasets",
"isLineage": true,
"relationship": {
"name": "Produces",
"entityTypes": [
"dataset"
],
"isLineage": true
}
},
{
"name": "outputDatasetEdges",
"path": "outputDatasetEdges",
"isLineage": true,
"relationship": {
"name": "Produces",
"entityTypes": [
"dataset"
],
"isLineage": true
}
},
{
"name": "inputDatajobs",
"path": "inputDatajobs",
"isLineage": true,
"relationship": {
"name": "DownstreamOf",
"entityTypes": [
"dataJob"
],
"isLineage": true
}
},
{
"name": "inputDatajobEdges",
"path": "inputDatajobEdges",
"isLineage": true,
"relationship": {
"name": "DownstreamOf",
"entityTypes": [
"dataJob"
],
"isLineage": true
}
}
]
}
},
"dataProcessInstance": {
"dataProcessInstanceOutput": {
"aspect": "dataProcessInstanceOutput",
"fields": [
{
"name": "outputEdges",
"path": "outputEdges",
"isLineage": true,
"relationship": {
"name": "DataProcessInstanceProduces",
"entityTypes": [
"dataset",
"mlModel",
"dataProcessInstance"
],
"isLineage": true
}
}
]
},
"dataProcessInstanceInput": {
"aspect": "dataProcessInstanceInput",
"fields": [
{
"name": "inputEdges",
"path": "inputEdges",
"isLineage": true,
"relationship": {
"name": "DataProcessInstanceConsumes",
"entityTypes": [
"dataset",
"mlModel",
"dataProcessInstance"
],
"isLineage": true
}
}
]
}
},
"dataProcess": {
"dataProcessInfo": {
"aspect": "dataProcessInfo",
"fields": [
{
"name": "inputs",
"path": "inputs",
"isLineage": true,
"relationship": {
"name": "Consumes",
"entityTypes": [
"dataset"
],
"isLineage": true
}
},
{
"name": "outputs",
"path": "outputs",
"isLineage": true,
"relationship": {
"name": "Consumes",
"entityTypes": [
"dataset"
],
"isLineage": true
}
}
]
}
},
"dataset": {
"upstreamLineage": {
"aspect": "upstreamLineage",
"fields": [
{
"name": "dataset",
"path": "upstreams.dataset",
"isLineage": true,
"relationship": {
"name": "DownstreamOf",
"entityTypes": [
"dataset"
],
"isLineage": true
}
}
]
}
},
"chart": {
"chartInfo": {
"aspect": "chartInfo",
"fields": [
{
"name": "inputs",
"path": "inputs",
"isLineage": true,
"relationship": {
"name": "Consumes",
"entityTypes": [
"dataset"
],
"isLineage": true
}
},
{
"name": "inputEdges",
"path": "inputEdges",
"isLineage": true,
"relationship": {
"name": "Consumes",
"entityTypes": [
"dataset"
],
"isLineage": true
}
}
]
}
},
"dashboard": {
"dashboardInfo": {
"aspect": "dashboardInfo",
"fields": [
{
"name": "charts",
"path": "charts",
"isLineage": true,
"relationship": {
"name": "Contains",
"entityTypes": [
"chart"
],
"isLineage": true
}
},
{
"name": "chartEdges",
"path": "chartEdges",
"isLineage": true,
"relationship": {
"name": "Contains",
"entityTypes": [
"chart"
],
"isLineage": true
}
},
{
"name": "datasets",
"path": "datasets",
"isLineage": true,
"relationship": {
"name": "Consumes",
"entityTypes": [
"dataset"
],
"isLineage": true
}
},
{
"name": "datasetEdges",
"path": "datasetEdges",
"isLineage": true,
"relationship": {
"name": "Consumes",
"entityTypes": [
"dataset"
],
"isLineage": true
}
},
{
"name": "dashboards",
"path": "dashboards",
"isLineage": true,
"relationship": {
"name": "DashboardContainsDashboard",
"entityTypes": [
"dashboard"
],
"isLineage": true
}
}
]
}
},
"mlModelGroup": {
"mlModelGroupProperties": {
"aspect": "mlModelGroupProperties",
"fields": [
{
"name": "trainingJobs",
"path": "trainingJobs",
"isLineage": true,
"relationship": {
"name": "TrainedBy",
"entityTypes": [
"dataJob",
"dataProcessInstance"
],
"isLineage": true
}
},
{
"name": "downstreamJobs",
"path": "downstreamJobs",
"isLineage": true,
"relationship": {
"name": "UsedBy",
"entityTypes": [
"dataJob",
"dataProcessInstance"
],
"isLineage": true
}
}
]
}
},
"mlFeature": {
"mlFeatureProperties": {
"aspect": "mlFeatureProperties",
"fields": [
{
"name": "sources",
"path": "sources",
"isLineage": true,
"relationship": {
"name": "DerivedFrom",
"entityTypes": [
"dataset"
],
"isLineage": true
}
}
]
}
},
"mlPrimaryKey": {
"mlPrimaryKeyProperties": {
"aspect": "mlPrimaryKeyProperties",
"fields": [
{
"name": "sources",
"path": "sources",
"isLineage": true,
"relationship": {
"name": "DerivedFrom",
"entityTypes": [
"dataset"
],
"isLineage": true
}
}
]
}
},
"mlModel": {
"mlModelProperties": {
"aspect": "mlModelProperties",
"fields": [
{
"name": "trainingJobs",
"path": "trainingJobs",
"isLineage": true,
"relationship": {
"name": "TrainedBy",
"entityTypes": [
"dataJob",
"dataProcessInstance"
],
"isLineage": true
}
},
{
"name": "downstreamJobs",
"path": "downstreamJobs",
"isLineage": true,
"relationship": {
"name": "UsedBy",
"entityTypes": [
"dataJob",
"dataProcessInstance"
],
"isLineage": true
}
},
{
"name": "mlFeatures",
"path": "mlFeatures",
"isLineage": true,
"relationship": {
"name": "Consumes",
"entityTypes": [
"mlFeature"
],
"isLineage": true
}
},
{
"name": "groups",
"path": "groups",
"isLineage": true,
"relationship": {
"name": "MemberOf",
"entityTypes": [
"mlModelGroup"
],
"isLineage": true
}
}
]
}
}
},
"generated_by": "metadata-ingestion/scripts/modeldocgen.py",
"generated_at": "2025-07-01T10:49:03.713749+00:00"
}

View File

@@ -0,0 +1,193 @@
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from datahub.utilities.urns.urn import guess_entity_type
logger = logging.getLogger(__name__)
# Global cache for lineage data to avoid repeated file reads
_lineage_data: Optional[Dict] = None
def _load_lineage_data() -> Dict:
"""
This is an experimental internal API, subject to breaking changes without prior notice.
Load lineage data from the autogenerated lineage.json file.
Returns:
Dict containing the lineage information
Raises:
FileNotFoundError: If lineage.json doesn't exist
json.JSONDecodeError: If lineage.json is malformed
"""
global _lineage_data
if _lineage_data is not None:
return _lineage_data
# Get the path to lineage.json relative to this file
current_file = Path(__file__)
lineage_file = current_file.parent / "lineage.json"
if not lineage_file.exists():
raise FileNotFoundError(f"Lineage file not found: {lineage_file}")
try:
with open(lineage_file, "r") as f:
_lineage_data = json.load(f)
return _lineage_data
except json.JSONDecodeError as e:
raise json.JSONDecodeError(
f"Failed to parse lineage.json: {e}", e.doc, e.pos
) from e
def get_lineage_fields(entity_type: str, aspect_name: str) -> List[Dict]:
"""
This is an experimental internal API, subject to breaking changes without prior notice.
Get lineage fields for a specific entity type and aspect.
Args:
entity_type: The entity type (e.g., 'dataset', 'dataJob')
aspect_name: The aspect name (e.g., 'upstreamLineage', 'dataJobInputOutput')
Returns:
List of lineage field dictionaries, each containing:
- name: field name
- path: dot-notation path to the field
- isLineage: boolean indicating if it's lineage
- relationship: relationship details (name, entityTypes, isLineage), when present
Raises:
FileNotFoundError: If lineage.json doesn't exist
json.JSONDecodeError: If lineage.json is malformed
"""
lineage_data = _load_lineage_data()
entity_data = lineage_data.get("entities", {}).get(entity_type, {})
aspect_data = entity_data.get(aspect_name, {})
return aspect_data.get("fields", [])
def is_lineage_field(urn: str, aspect_name: str, field_path: str) -> bool:
"""
This is an experimental internal API, subject to breaking changes without prior notice.
Check if a specific field path is lineage-related.
Args:
urn: The entity URN (e.g., 'urn:li:dataset:(urn:li:dataPlatform:mysql,test_db.test_table,PROD)')
aspect_name: The aspect name (e.g., 'upstreamLineage', 'dataJobInputOutput')
field_path: The dot-notation path to the field (e.g., 'upstreams.dataset')
Returns:
True if the field is lineage-related, False otherwise
Raises:
FileNotFoundError: If lineage.json doesn't exist
json.JSONDecodeError: If lineage.json is malformed
AssertionError: If URN doesn't start with 'urn:li:'
"""
entity_type = guess_entity_type(urn)
lineage_fields = get_lineage_fields(entity_type, aspect_name)
for field in lineage_fields:
if field.get("path") == field_path:
return field.get("isLineage", False)
return False
def has_lineage(urn: str, aspect: Any) -> bool:
"""
This is an experimental internal API, subject to breaking changes without prior notice.
Check if an aspect has any lineage fields.
Args:
urn: The entity URN (e.g., 'urn:li:dataset:(urn:li:dataPlatform:mysql,test_db.test_table,PROD)')
aspect: The aspect object
Returns:
True if the aspect has lineage fields, False otherwise
Raises:
FileNotFoundError: If lineage.json doesn't exist
json.JSONDecodeError: If lineage.json is malformed
AssertionError: If URN doesn't start with 'urn:li:'
"""
entity_type = guess_entity_type(urn)
aspect_class = getattr(aspect, "__class__", None)
aspect_name = (
aspect_class.__name__ if aspect_class is not None else str(type(aspect))
)
lineage_fields = get_lineage_fields(entity_type, aspect_name)
return len(lineage_fields) > 0
def has_lineage_aspect(entity_type: str, aspect_name: str) -> bool:
"""
This is an experimental internal API, subject to breaking changes without prior notice.
Check if an aspect has any lineage fields.
Args:
entity_type: The entity type (e.g., 'dataset', 'dataJob')
aspect_name: The aspect name (e.g., 'upstreamLineage', 'dataJobInputOutput')
Returns:
True if the aspect has lineage fields, False otherwise
Raises:
FileNotFoundError: If lineage.json doesn't exist
json.JSONDecodeError: If lineage.json is malformed
"""
lineage_fields = get_lineage_fields(entity_type, aspect_name)
return len(lineage_fields) > 0
def get_all_lineage_aspects(entity_type: str) -> Set[str]:
"""
This is an experimental internal API, subject to breaking changes without prior notice.
Get all aspects that have lineage fields for a given entity type.
Args:
entity_type: The entity type (e.g., 'dataset', 'dataJob')
Returns:
Set of aspect names that have lineage fields
Raises:
FileNotFoundError: If lineage.json doesn't exist
json.JSONDecodeError: If lineage.json is malformed
"""
lineage_data = _load_lineage_data()
entity_data = lineage_data.get("entities", {}).get(entity_type, {})
lineage_aspects = set()
for aspect_name, aspect_data in entity_data.items():
if aspect_data.get("fields"):
lineage_aspects.add(aspect_name)
return lineage_aspects
def clear_cache() -> None:
"""
This is an experimental internal API, subject to breaking changes without prior notice.
Clear the internal cache of lineage data.
This is useful for testing or when the lineage.json file has been updated.
"""
global _lineage_data
_lineage_data = None
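A short consumption sketch against the generated lineage.json shown above, assuming the file is packaged next to this module as the loader expects (the URN is illustrative):

from datahub.ingestion.autogenerated import lineage_helper

urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,test_db.test_table,PROD)"
# Per the generated file, `dataset` carries lineage only on upstreamLineage.
assert lineage_helper.get_all_lineage_aspects("dataset") == {"upstreamLineage"}
# Field-level check using the dot-notation path recorded in lineage.json.
assert lineage_helper.is_lineage_field(urn, "upstreamLineage", "upstreams.dataset")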

View File

@@ -0,0 +1,246 @@
import json
from typing import Any, Dict
import pytest
from datahub.ingestion.autogenerated.lineage_helper import (
_load_lineage_data,
clear_cache,
get_all_lineage_aspects,
get_lineage_fields,
has_lineage,
has_lineage_aspect,
is_lineage_field,
)
class MockAspect:
"""Mock aspect class for testing."""
def __init__(self, class_name="UpstreamLineage"):
self.__class__ = type(class_name, (), {})
class TestLineageHelper:
"""Test cases for lineage helper functions."""
@pytest.fixture
def mock_lineage_fields(self):
"""Fixture for common lineage fields mock data."""
return [{"name": "dataset", "path": "upstreams.dataset", "isLineage": True}]
@pytest.fixture
def mock_lineage_data(self):
"""Fixture for common lineage data mock structure."""
return {
"entities": {
"dataset": {
"upstreamLineage": {
"aspect": "upstreamLineage",
"fields": [
{
"name": "dataset",
"path": "upstreams.dataset",
"isLineage": True,
"relationship": {
"name": "DownstreamOf",
"entityTypes": ["dataset"],
"isLineage": True,
},
}
],
}
}
}
}
@pytest.fixture
def mock_file_data(self, mock_lineage_data):
"""Fixture for mock file data."""
return json.dumps(mock_lineage_data)
def setup_method(self):
"""Clear cache before each test."""
clear_cache()
def teardown_method(self):
"""Clear cache after each test."""
clear_cache()
def setup_mock_get_fields(self, monkeypatch, fields):
"""Helper to setup mock for get_lineage_fields function."""
def mock_get_fields(*args, **kwargs):
return fields
monkeypatch.setattr(
"datahub.ingestion.autogenerated.lineage_helper.get_lineage_fields",
mock_get_fields,
)
def setup_mock_load_data(self, monkeypatch, data):
"""Helper to setup mock for _load_lineage_data function."""
def mock_load_data():
return data
monkeypatch.setattr(
"datahub.ingestion.autogenerated.lineage_helper._load_lineage_data",
mock_load_data,
)
def setup_mock_file_operations(self, monkeypatch, file_data, exists=True):
"""Helper to setup mock for file operations."""
def mock_open_file(*args, **kwargs):
class MockFile:
def __enter__(self):
return self
def __exit__(self, *args):
pass
def read(self):
return file_data
return MockFile()
def mock_path_exists(*args, **kwargs):
return exists
monkeypatch.setattr("builtins.open", mock_open_file)
monkeypatch.setattr("pathlib.Path.exists", mock_path_exists)
def test_load_lineage_data_success(
self, monkeypatch, mock_file_data, mock_lineage_data
):
"""Test successful loading of lineage data."""
self.setup_mock_file_operations(monkeypatch, mock_file_data, exists=True)
result = _load_lineage_data()
assert result == mock_lineage_data
assert (
result["entities"]["dataset"]["upstreamLineage"]["fields"][0]["isLineage"]
is True
)
def test_load_lineage_data_file_not_found(self, monkeypatch):
"""Test handling of missing lineage.json file."""
self.setup_mock_file_operations(monkeypatch, "", exists=False)
with pytest.raises(FileNotFoundError):
_load_lineage_data()
def test_load_lineage_data_invalid_json(self, monkeypatch):
"""Test handling of malformed JSON."""
self.setup_mock_file_operations(monkeypatch, "invalid json", exists=True)
with pytest.raises(json.JSONDecodeError):
_load_lineage_data()
def test_get_lineage_fields(self, monkeypatch, mock_lineage_data):
"""Test getting lineage fields for an entity and aspect."""
self.setup_mock_load_data(monkeypatch, mock_lineage_data)
result = get_lineage_fields("dataset", "upstreamLineage")
assert len(result) == 1
assert result[0]["name"] == "dataset"
assert result[0]["path"] == "upstreams.dataset"
assert result[0]["isLineage"] is True
def test_get_lineage_fields_no_fields(self, monkeypatch):
"""Test getting lineage fields when none exist."""
mock_data: Dict[str, Any] = {"entities": {"dataset": {"upstreamLineage": {}}}}
self.setup_mock_load_data(monkeypatch, mock_data)
result = get_lineage_fields("dataset", "upstreamLineage")
assert result == []
@pytest.mark.parametrize(
"field_path,expected",
[
("upstreams.dataset", True),
("nonexistent.path", False),
],
)
def test_is_lineage_field(
self, monkeypatch, mock_lineage_fields, field_path, expected
):
"""Test checking if a field is lineage-related."""
self.setup_mock_get_fields(monkeypatch, mock_lineage_fields)
result = is_lineage_field("urn:li:dataset:test", "upstreamLineage", field_path)
assert result is expected
def test_is_lineage_field_invalid_urn(self):
"""Test checking if a field is lineage-related with invalid URN."""
with pytest.raises(AssertionError):
is_lineage_field("invalid:urn", "upstreamLineage", "upstreams.dataset")
@pytest.mark.parametrize(
"has_fields,expected",
[
(True, True),
(False, False),
],
)
def test_has_lineage(self, monkeypatch, mock_lineage_fields, has_fields, expected):
"""Test checking if an aspect has lineage fields."""
fields = mock_lineage_fields if has_fields else []
self.setup_mock_get_fields(monkeypatch, fields)
result = has_lineage("urn:li:dataset:test", MockAspect())
assert result is expected
def test_has_lineage_invalid_urn(self):
"""Test checking if an aspect has lineage fields with invalid URN."""
with pytest.raises(AssertionError):
has_lineage("invalid:urn", MockAspect())
@pytest.mark.parametrize(
"has_fields,expected",
[
(True, True),
(False, False),
],
)
def test_has_lineage_aspect(
self, monkeypatch, mock_lineage_fields, has_fields, expected
):
"""Test checking if an aspect has lineage fields."""
fields = mock_lineage_fields if has_fields else []
self.setup_mock_get_fields(monkeypatch, fields)
result = has_lineage_aspect("dataset", "upstreamLineage")
assert result is expected
def test_get_all_lineage_aspects(self, monkeypatch):
"""Test getting all lineage aspects for an entity type."""
mock_data = {
"entities": {
"dataset": {
"upstreamLineage": {"fields": [{"name": "field1"}]},
"ownership": {"fields": []}, # No lineage fields
"schemaMetadata": {"fields": [{"name": "field2"}]},
}
}
}
self.setup_mock_load_data(monkeypatch, mock_data)
result = get_all_lineage_aspects("dataset")
assert result == {"upstreamLineage", "schemaMetadata"}
assert "ownership" not in result
def test_clear_cache(self):
"""Test clearing the internal cache."""
# This test ensures the clear_cache function doesn't raise any exceptions
clear_cache()
# If we get here without exceptions, the test passes
assert True