feat(build): run linting on scripts folder (#14361)

Aseem Bansal 2025-08-07 15:29:01 +05:30 committed by GitHub
parent 5ed87a8df6
commit 502572daaf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 150 additions and 125 deletions


@@ -140,16 +140,16 @@ task modelDocUpload(type: Exec, dependsOn: [modelDocGen]) {
task lint(type: Exec, dependsOn: installDev) {
commandLine 'bash', '-c',
venv_activate_command +
"ruff check scripts/capability_summary.py scripts/tests/ src/ tests/ examples/ && " +
"ruff format --check scripts/capability_summary.py scripts/tests/ src/ tests/ examples/ && " +
"ruff check scripts/ scripts/tests/ src/ tests/ examples/ && " +
"ruff format --check scripts/ scripts/tests/ src/ tests/ examples/ && " +
"mypy --show-traceback --show-error-codes src/ tests/ examples/"
}
task lintFix(type: Exec, dependsOn: installDev) {
commandLine 'bash', '-c',
venv_activate_command +
"ruff check --fix scripts/capability_summary.py scripts/tests/ src/ tests/ examples/ && " +
"ruff format scripts/capability_summary.py scripts/tests/ src/ tests/ examples/ "
"ruff check --fix scripts/ scripts/tests/ src/ tests/ examples/ && " +
"ruff format scripts/ scripts/tests/ src/ tests/ examples/ "
}
def pytest_default_env = "" // set to "PYTHONDEVMODE=1" to warn on unclosed sockets


@@ -4,7 +4,7 @@ import json
import re
import textwrap
from pathlib import Path
-from typing import Dict, Iterable, List, Optional, Tuple, Union, Set
+from typing import Dict, Iterable, List, Optional, Tuple, Union
import avro.schema
import click


@@ -31,6 +31,7 @@ DENY_LIST = {
"datahub-mock-data",
}
def get_snippet(long_string: str, max_length: int = 100) -> str:
snippet = ""
if len(long_string) > max_length:
@@ -89,7 +90,7 @@ def map_capability_name_to_enum(capability_name: str) -> SourceCapability:
try:
return SourceCapability(capability_name)
except ValueError:
raise ValueError(f"Unknown capability name: {capability_name}")
raise ValueError(f"Unknown capability name: {capability_name}") from None
def does_extra_exist(extra_name: str) -> bool:
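The "from None" added above suppresses implicit exception chaining, which is what rules like ruff's B904 push for when re-raising inside an except block. A minimal sketch of the pattern (the lookup table here is illustrative, not DataHub code):

def to_enum(capability_name: str) -> str:
    mapping = {"lineage": "LINEAGE"}
    try:
        return mapping[capability_name]
    except KeyError:
        # Without "from None", Python would also print the original KeyError
        # under "During handling of the above exception, another exception
        # occurred", which is noise when the new error already says everything.
        raise ValueError(f"Unknown capability name: {capability_name}") from None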
@@ -285,7 +286,7 @@ class PlatformMetrics:
)
@click.option("--extra-docs", type=str, required=False)
@click.option("--source", type=str, required=False)
-def generate(
+def generate( # noqa: C901
out_dir: str,
capability_summary: str,
extra_docs: Optional[str] = None,

View File

@@ -229,15 +229,15 @@ class FieldHeader(FieldRow):
def get_prefixed_name(field_prefix: Optional[str], field_name: Optional[str]) -> str:
-assert (
-field_prefix or field_name
-), "One of field_prefix or field_name should be present"
+assert field_prefix or field_name, (
+"One of field_prefix or field_name should be present"
+)
return (
f"{field_prefix}.{field_name}" # type: ignore
if field_prefix and field_name
-else field_name
-if not field_prefix
else field_prefix
+if field_prefix
+else field_name
)
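Both assert spellings in this hunk are equivalent at runtime; newer ruff format releases prefer parenthesizing the message rather than the condition when the line overflows. A sketch of the general pattern (the values are illustrative):

# Equivalent at runtime; the formatter wraps the message, not the condition.
field_prefix, field_name = "upstream", None
assert field_prefix or field_name, (
    "One of field_prefix or field_name should be present"
)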
@@ -343,21 +343,23 @@ def priority_value(path: str) -> str:
return "A"
-def should_hide_field(schema_field, current_source: str, schema_dict: Dict[str, Any]) -> bool:
+def should_hide_field(
+schema_field, current_source: str, schema_dict: Dict[str, Any]
+) -> bool:
"""Check if field should be hidden for the current source"""
# Extract field name from the path
-field_name = schema_field.fieldPath.split('.')[-1]
+field_name = schema_field.fieldPath.split(".")[-1]
for ends_with in [
"pattern.[type=array].allow",
"pattern.[type=array].allow.[type=string].string",
"pattern.[type=array].deny",
"pattern.[type=array].deny.[type=string].string",
"pattern.[type=boolean].ignoreCase"
"pattern.[type=boolean].ignoreCase",
]:
# We don't want repeated allow/deny/ignoreCase for Allow/Deny patterns in docs
if schema_field.fieldPath.endswith(ends_with):
return True
# Look in definitions for the field schema
definitions = schema_dict.get("definitions", {})
for _, def_schema in definitions.items():
@@ -366,13 +368,18 @@ def should_hide_field(schema_field, current_source: str, schema_dict: Dict[str,
field_schema = properties[field_name]
schema_extra = field_schema.get("schema_extra", {})
supported_sources = schema_extra.get("supported_sources")
if supported_sources and current_source:
-return current_source.lower() not in [s.lower() for s in supported_sources]
+return current_source.lower() not in [
+s.lower() for s in supported_sources
+]
return False
-def gen_md_table_from_json_schema(schema_dict: Dict[str, Any], current_source: Optional[str] = None) -> str:
+def gen_md_table_from_json_schema(
+schema_dict: Dict[str, Any], current_source: Optional[str] = None
+) -> str:
# we don't want default field values to be injected into the description of the field
JsonSchemaTranslator._INJECT_DEFAULTS_INTO_DESCRIPTION = False
schema_fields = list(JsonSchemaTranslator.get_fields_from_schema(schema_dict))
@@ -396,13 +403,18 @@ def gen_md_table_from_json_schema(schema_dict: Dict[str, Any], current_source: O
return "".join(result)
-def gen_md_table_from_pydantic(model: Type[BaseModel], current_source: Optional[str] = None) -> str:
+def gen_md_table_from_pydantic(
+model: Type[BaseModel], current_source: Optional[str] = None
+) -> str:
return gen_md_table_from_json_schema(model.schema(), current_source)
if __name__ == "__main__":
# Simple test code.
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
print("".join(gen_md_table_from_pydantic(SnowflakeV2Config, current_source="snowflake")))
print(
"".join(
gen_md_table_from_pydantic(SnowflakeV2Config, current_source="snowflake")
)
)
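For context on should_hide_field above: it reads a "schema_extra.supported_sources" entry from each field's generated JSON schema. A hedged sketch of a pydantic (v1-style) field declaration that would produce that shape; the config class, field, and source names are hypothetical:

from typing import Optional
from pydantic import BaseModel, Field

class ExampleConnectorConfig(BaseModel):
    # Hypothetical field: the extra "schema_extra" kwarg is carried into the
    # generated JSON schema for this property, which is where
    # should_hide_field() looks it up to hide the row for connectors
    # not listed in supported_sources.
    include_view_lineage: Optional[bool] = Field(
        default=None,
        description="Extract lineage for views.",
        schema_extra={"supported_sources": ["snowflake", "bigquery"]},
    )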


@@ -201,207 +201,217 @@ def load_schema_file(schema_file: str) -> None:
all_aspects.append(aspect_name)
entity_definition.aspects = all_aspects
entity_registry[entity_name] = entity_definition
logger.debug(f"Loaded entity schema: {entity_name} with aspects: {all_aspects}")
logger.debug(
f"Loaded entity schema: {entity_name} with aspects: {all_aspects}"
)
else:
logger.debug(f"Ignoring schema {schema_file}")
def extract_lineage_fields_from_schema(
-schema: avro.schema.Schema,
-current_path: str = ""
+schema: avro.schema.Schema, current_path: str = ""
) -> List[LineageField]:
"""
Recursively extract lineage fields from an Avro schema.
Args:
schema: The Avro schema to analyze
current_path: The current field path (for nested fields)
Returns:
List of LineageField objects found in the schema
"""
lineage_fields = []
if isinstance(schema, avro.schema.RecordSchema):
logger.debug(f"Analyzing record schema at path: {current_path}")
for field in schema.fields:
field_path = f"{current_path}.{field.name}" if current_path else field.name
logger.debug(f"Analyzing field: {field.name} at path: {field_path}")
# Check if this field has lineage properties
is_lineage = False
relationship_info = None
# Check for isLineage property
-if hasattr(field, 'other_props') and field.other_props:
-is_lineage = field.other_props.get('isLineage', False)
+if hasattr(field, "other_props") and field.other_props:
+is_lineage = field.other_props.get("isLineage", False)
if is_lineage:
logger.debug(f"Found isLineage=true for field: {field_path}")
# Check for Relationship property
-if 'Relationship' in field.other_props:
-relationship_data = field.other_props['Relationship']
-logger.debug(f"Found Relationship property for field: {field_path}: {relationship_data}")
+if "Relationship" in field.other_props:
+relationship_data = field.other_props["Relationship"]
+logger.debug(
+f"Found Relationship property for field: {field_path}: {relationship_data}"
+)
# Handle both direct relationship and path-based relationship
-if 'entityTypes' in relationship_data:
+if "entityTypes" in relationship_data:
# Direct relationship
-relationship_is_lineage = relationship_data.get('isLineage', False)
+relationship_is_lineage = relationship_data.get(
+"isLineage", False
+)
relationship_info = LineageRelationship(
-name=relationship_data.get('name', ''),
-entityTypes=relationship_data.get('entityTypes', []),
-isLineage=relationship_is_lineage
+name=relationship_data.get("name", ""),
+entityTypes=relationship_data.get("entityTypes", []),
+isLineage=relationship_is_lineage,
)
is_lineage = is_lineage or relationship_is_lineage
else:
# Path-based relationship - find the actual relationship data
for _, value in relationship_data.items():
-if isinstance(value, dict) and 'entityTypes' in value:
-relationship_is_lineage = value.get('isLineage', False)
+if isinstance(value, dict) and "entityTypes" in value:
+relationship_is_lineage = value.get("isLineage", False)
relationship_info = LineageRelationship(
-name=value.get('name', ''),
-entityTypes=value.get('entityTypes', []),
-isLineage=relationship_is_lineage
+name=value.get("name", ""),
+entityTypes=value.get("entityTypes", []),
+isLineage=relationship_is_lineage,
)
is_lineage = is_lineage or relationship_is_lineage
break
# If this field is lineage, add it to the results
if is_lineage:
lineage_field = LineageField(
name=field.name,
path=field_path,
isLineage=True,
-relationship=relationship_info
+relationship=relationship_info,
)
lineage_fields.append(lineage_field)
logger.debug(f"Added lineage field: {field_path}")
# Recursively check nested fields
nested_fields = extract_lineage_fields_from_schema(field.type, field_path)
lineage_fields.extend(nested_fields)
elif isinstance(schema, avro.schema.ArraySchema):
logger.debug(f"Analyzing array schema at path: {current_path}")
# For arrays, check the items schema
nested_fields = extract_lineage_fields_from_schema(schema.items, current_path)
lineage_fields.extend(nested_fields)
elif isinstance(schema, avro.schema.UnionSchema):
logger.debug(f"Analyzing union schema at path: {current_path}")
# For unions, check all possible schemas
for union_schema in schema.schemas:
-nested_fields = extract_lineage_fields_from_schema(union_schema, current_path)
+nested_fields = extract_lineage_fields_from_schema(
+union_schema, current_path
+)
lineage_fields.extend(nested_fields)
return lineage_fields
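The walker above relies on avro's other_props, which collects any non-reserved attributes declared on a field. A hedged sketch of a schema carrying the two annotations it looks for; the record and field names are illustrative, not a real DataHub aspect:

import avro.schema

# "isLineage" and "Relationship" are custom field attributes that the avro
# parser preserves and exposes via field.other_props.
example = avro.schema.parse("""
{
  "type": "record",
  "name": "UpstreamLineage",
  "fields": [
    {
      "name": "dataset",
      "type": "string",
      "isLineage": true,
      "Relationship": {
        "name": "DownstreamOf",
        "entityTypes": ["dataset"],
        "isLineage": true
      }
    }
  ]
}
""")
for field in example.fields:
    print(field.name, field.other_props.get("isLineage"))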
def extract_lineage_fields() -> LineageData:
"""
Extract lineage fields from all aspects in the aspect registry.
Returns:
LineageData containing all lineage information organized by entity and aspect
"""
logger.info("Starting lineage field extraction")
lineage_data = LineageData(entities={})
# Group aspects by entity
entity_aspects: Dict[str, List[str]] = {}
for entity_name, entity_def in entity_registry.items():
entity_aspects[entity_name] = entity_def.aspects
logger.debug(f"Entity {entity_name} has aspects: {entity_def.aspects}")
# Process each aspect
for aspect_name, aspect_def in aspect_registry.items():
logger.debug(f"Processing aspect: {aspect_name}")
if not aspect_def.schema:
logger.warning(f"Aspect {aspect_name} has no schema, skipping")
continue
# Extract lineage fields from this aspect
lineage_fields = extract_lineage_fields_from_schema(aspect_def.schema)
if lineage_fields:
logger.info(f"Found {len(lineage_fields)} lineage fields in aspect {aspect_name}")
logger.info(
f"Found {len(lineage_fields)} lineage fields in aspect {aspect_name}"
)
# Find which entities use this aspect
for entity_name, entity_aspect_list in entity_aspects.items():
if aspect_name in entity_aspect_list:
logger.debug(f"Aspect {aspect_name} is used by entity {entity_name}")
logger.debug(
f"Aspect {aspect_name} is used by entity {entity_name}"
)
# Initialize entity if not exists
if entity_name not in lineage_data.entities:
lineage_data.entities[entity_name] = LineageEntity(aspects={})
# Add aspect with lineage fields
lineage_aspect = LineageAspect(
-aspect=aspect_name,
-fields=lineage_fields
+aspect=aspect_name, fields=lineage_fields
)
+lineage_data.entities[entity_name].aspects[aspect_name] = (
+lineage_aspect
+)
-lineage_data.entities[entity_name].aspects[aspect_name] = lineage_aspect
else:
logger.debug(f"No lineage fields found in aspect {aspect_name}")
# Log summary
total_entities_with_lineage = len(lineage_data.entities)
-total_aspects_with_lineage = sum(len(entity.aspects) for entity in lineage_data.entities.values())
+total_aspects_with_lineage = sum(
+len(entity.aspects) for entity in lineage_data.entities.values()
+)
total_lineage_fields = sum(
-len(aspect.fields)
-for entity in lineage_data.entities.values()
+len(aspect.fields)
+for entity in lineage_data.entities.values()
for aspect in entity.aspects.values()
)
logger.info("Lineage extraction complete:")
logger.info(f" - Entities with lineage: {total_entities_with_lineage}")
logger.info(f" - Aspects with lineage: {total_aspects_with_lineage}")
logger.info(f" - Total lineage fields: {total_lineage_fields}")
return lineage_data
def generate_lineage_json(lineage_data: LineageData) -> str:
"""
Generate JSON representation of lineage data.
Args:
lineage_data: The lineage data to convert to JSON
Returns:
JSON string representation
"""
logger.info("Generating lineage JSON")
# Convert dataclasses to dictionaries
def lineage_field_to_dict(field: LineageField) -> Dict[str, Any]:
-result = {
-"name": field.name,
-"path": field.path,
-"isLineage": field.isLineage
-}
+result = {"name": field.name, "path": field.path, "isLineage": field.isLineage}
if field.relationship:
result["relationship"] = {
"name": field.relationship.name,
"entityTypes": field.relationship.entityTypes,
"isLineage": field.relationship.isLineage
"isLineage": field.relationship.isLineage,
}
return result
def lineage_aspect_to_dict(aspect: LineageAspect) -> Dict[str, Any]:
return {
"aspect": aspect.aspect,
"fields": [lineage_field_to_dict(field) for field in aspect.fields]
"fields": [lineage_field_to_dict(field) for field in aspect.fields],
}
def lineage_entity_to_dict(entity: LineageEntity) -> Dict[str, Any]:
return {
aspect_name: lineage_aspect_to_dict(aspect)
for aspect_name, aspect in entity.aspects.items()
}
# Build the final JSON structure
json_data = {
"entities": {
@@ -409,13 +419,13 @@ def generate_lineage_json(lineage_data: LineageData) -> str:
for entity_name, entity in lineage_data.entities.items()
}
}
json_data["generated_by"] = "metadata-ingestion/scripts/modeldocgen.py"
json_data["generated_at"] = datetime.now(timezone.utc).isoformat()
json_string = json.dumps(json_data, indent=2)
logger.info(f"Generated lineage JSON with {len(json_string)} characters")
return json_string
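Putting the three converters together, the emitted document has roughly this shape; the entity, aspect, and field names plus the timestamp are illustrative:

# Hedged sketch of the output structure, assembled from the helpers above.
example_output = {
    "entities": {
        "dataset": {
            "upstreamLineage": {
                "aspect": "upstreamLineage",
                "fields": [
                    {
                        "name": "dataset",
                        "path": "upstreams.dataset",
                        "isLineage": True,
                        "relationship": {
                            "name": "DownstreamOf",
                            "entityTypes": ["dataset"],
                            "isLineage": True,
                        },
                    }
                ],
            }
        }
    },
    "generated_by": "metadata-ingestion/scripts/modeldocgen.py",
    "generated_at": "2025-08-07T09:59:01+00:00",
}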
@@ -668,9 +678,9 @@ def generate_stitched_record(
# detect if we have relationship specified at leaf level or thru path specs
if "entityTypes" not in relationship_info:
# path spec
-assert (
-len(relationship_info.keys()) == 1
-), "We should never have more than one path spec assigned to a relationship annotation"
+assert len(relationship_info.keys()) == 1, (
+"We should never have more than one path spec assigned to a relationship annotation"
+)
final_info = None
for _, v in relationship_info.items():
final_info = v
@@ -696,7 +706,9 @@ def generate_stitched_record(
make_schema_field_urn(foreign_dataset_urn, "urn")
],
sourceFields=[
-make_schema_field_urn(source_dataset_urn, f_field.fieldPath)
+make_schema_field_urn(
+source_dataset_urn, f_field.fieldPath
+)
],
)
foreign_keys.append(fkey)
@@ -777,39 +789,37 @@ def load_registry_file(registry_file: str) -> Dict[str, EntityDefinition]:
with open(registry_file, "r") as f:
registry = EntityRegistry.parse_obj(yaml.safe_load(f))
-index: int = 0
-for entity_def in registry.entities:
-index += 1
+for index, entity_def in enumerate(registry.entities):
entity_def.priority = index
entity_registry[entity_def.name] = entity_def
return entity_registry
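One behavioral nuance in the enumerate rewrite above: the old loop incremented before assigning, so priorities started at 1, while plain enumerate starts at 0. The docstring below treats zero as a valid priority, so this is likely benign, but if the 1-based numbering ever mattered, enumerate's start parameter would preserve it:

# Hedged sketch: reproduces the old 1-based priority numbering,
# in case that numbering was load-bearing for downstream consumers.
for index, entity_def in enumerate(registry.entities, start=1):
    entity_def.priority = index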
def get_sorted_entity_names(
-entity_names: List[Tuple[str, EntityDefinition]]
+entity_names: List[Tuple[str, EntityDefinition]],
) -> List[Tuple[str, List[str]]]:
"""
Sort entity names by category and priority for documentation generation.
This function organizes entities into a structured order for generating
documentation. Entities are grouped by category (CORE vs INTERNAL) and
within each category, sorted by priority and then alphabetically.
Business Logic:
- CORE entities are displayed first, followed by INTERNAL entities
- Within each category, entities with priority values are sorted first
- Priority entities are sorted by their priority value (lower numbers = higher priority)
- Non-priority entities are sorted alphabetically after priority entities
- Zero and negative priority values are treated as valid priorities
Args:
entity_names: List of tuples containing (entity_name, EntityDefinition)
Returns:
List of tuples containing (EntityCategory, List[str]) where:
- First tuple: (EntityCategory.CORE, sorted_core_entity_names)
- Second tuple: (EntityCategory.INTERNAL, sorted_internal_entity_names)
Example:
Input: [
("dataset", EntityDefinition(priority=2, category=CORE)),
@@ -867,8 +877,10 @@ def get_sorted_entity_names(
)
@click.option("--png", type=str, required=False)
@click.option("--extra-docs", type=str, required=False)
@click.option("--lineage-output", type=str, required=False, help="generate lineage JSON file")
def generate(
@click.option(
"--lineage-output", type=str, required=False, help="generate lineage JSON file"
)
def generate( # noqa: C901
schemas_root: str,
registry: str,
generated_docs_dir: str,
@@ -908,7 +920,6 @@ def generate(
if entity_extra_docs:
for entity_name in entity_extra_docs:
entity_registry[entity_name].doc_file_contents = entity_extra_docs[
entity_name
]
@@ -918,19 +929,21 @@ def generate(
try:
lineage_data = extract_lineage_fields()
lineage_json = generate_lineage_json(lineage_data)
output_path = Path(lineage_output)
output_path.parent.mkdir(parents=True, exist_ok=True)
new_json_data = json.loads(lineage_json)
-write_file = should_write_json_file(output_path, new_json_data, "lineage file")
+write_file = should_write_json_file(
+output_path, new_json_data, "lineage file"
+)
if write_file:
-with open(output_path, 'w') as f:
+with open(output_path, "w") as f:
f.write(lineage_json)
logger.info(f"Successfully wrote lineage JSON to {lineage_output}")
except Exception as e:
logger.error(f"Failed to generate lineage JSON: {e}")
raise
@@ -946,7 +959,6 @@ def generate(
index = 0
for _, sorted_entities in sorted_entity_names:
for entity_name in sorted_entities:
entity_dir = f"{generated_docs_dir}/entities/"
os.makedirs(entity_dir, exist_ok=True)


@@ -1,32 +1,32 @@
import json
import logging
from pathlib import Path
-from typing import Dict, Any
+from typing import Any, Dict
logger = logging.getLogger(__name__)
def should_write_json_file(
-output_path: Path,
-new_json_data: Dict[str, Any],
-file_description: str = "JSON file"
+output_path: Path,
+new_json_data: Dict[str, Any],
+file_description: str = "JSON file",
) -> bool:
"""
Check if a JSON file should be written by comparing content with existing file.
This function compares the new JSON data with existing file content, excluding
the 'generated_at' field from comparison since it changes on every generation.
Args:
output_path: Path to the output file
new_json_data: The new JSON data to potentially write
file_description: Description of the file for logging purposes
Returns:
True if the file should be written, False if content is unchanged
"""
write_file = True
if output_path.exists():
try:
with open(output_path, "r") as f:
@@ -45,5 +45,5 @@ def should_write_json_file(
write_file = False
except Exception as e:
logger.warning(f"Could not read existing {file_description}: {e}")
-return write_file
+return write_file
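A hedged usage sketch of this helper, mirroring its call site in modeldocgen.py above; the path and payload are illustrative:

import json
from pathlib import Path

output_path = Path("generated/lineage.json")
new_data = {"entities": {}, "generated_at": "2025-08-07T00:00:00+00:00"}

output_path.parent.mkdir(parents=True, exist_ok=True)
if should_write_json_file(output_path, new_data, "lineage file"):
    # "generated_at" is excluded from the comparison, so an otherwise
    # unchanged file is left alone and keeps its original timestamp.
    output_path.write_text(json.dumps(new_data, indent=2))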