openspg/python/knext/core/schema/schema_ml.py
2023-11-21 15:17:02 +08:00

1106 lines
45 KiB
Python

# -*- coding: utf-8 -*-
#
# Copyright 2023 Ant Group CO., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
import copy
import re
from enum import Enum
from knext.core.schema import Schema
from knext.core.schema.model import (
EntityType,
ConceptType,
EventType,
StandardType,
Property,
Relation,
)
from knext.core.schema.model.base import (
HypernymPredicateEnum,
BasicTypeEnum,
ConstraintTypeEnum,
AlterOperationEnum,
SpgTypeEnum,
PropertyGroupEnum,
)
class IndentLevel(Enum):
    """
    Nesting levels of a schema script line, numbered from the outermost (0) inwards.
    """

    # 0: an entity/concept/event/standard type, or a subtype
    Type = 0

    # 1: the description/properties/relations sections of a type
    TypeMeta = 1

    # 2: the name of a property or relation of a type
    Predicate = 2

    # 3: the description/constraint/rule of a property or relation
    PredicateMeta = 3

    # 4: a property that belongs to a property (sub property)
    SubProperty = 4

    # 5: the constraint of a sub property
    SubPropertyMeta = 5
class RegisterUnit(Enum):
    """
    Slots of the parsing register, one per kind of element that can be "currently open".
    """

    Type = "type"
    Property = "property"
    Relation = "relation"
    SubProperty = "subProperty"
class SPGSchemaMarkLang:
    """
    Parser for the SPG Schema Mark Language.

    Feature 1: parse a schema script and then alter the schema of the project.
    Feature 2: export a schema script from a project.
    """

    # names of built-in types; __init__ adds every basic and standard type
    internal_type = set()

    # default property names per kind of type; __init__ adds the properties of "Thing"
    entity_internal_property = set()
    event_internal_property = {"eventTime"}
    concept_internal_property = {"stdId", "alias"}

    # type classes accepted after the colon of a type definition
    keyword_type = {"EntityType", "ConceptType", "EventType"}

    # semantic relation prefix -> predicate names allowed behind "PREFIX#"
    semantic_rel = {
        "SYNANT": ["synonym", "antonym"],
        "CAU": ["leadTo"],
        "SEQ": ["happenedBefore"],
        "IND": ["belongTo"],
        "INC": ["isPartOf"],
    }

    # the element currently being parsed for every register slot (None = nothing open)
    parsing_register = dict.fromkeys(
        (
            RegisterUnit.Type,
            RegisterUnit.Property,
            RegisterUnit.Relation,
            RegisterUnit.SubProperty,
        )
    )

    # indentation width last seen for each of the six indent levels
    indent_level_pos = [None] * 6

    # bookkeeping for a multi-line "[[ ... ]]" rule
    rule_quote_predicate = None
    rule_quote_open = False

    current_parsing_level = 0
    last_indent_level = 0
    namespace = None

    # parsed types, keyed by their namespaced name
    types = {}
def __init__(self, filename):
    """
    Collect the built-in properties and types from the schema service, then parse the script.

    :param filename: path of the schema script to load
    """
    cls = type(self)
    # The parser state is declared as mutable class attributes and is mutated through
    # ``self``. Rebind private copies here so that a second parser created in the same
    # process does not inherit the types, registers and indentation bookkeeping of an
    # earlier one (which made re-parsing fail with "Type ... is duplicated").
    self.internal_type = set(cls.internal_type)
    self.entity_internal_property = set(cls.entity_internal_property)
    self.event_internal_property = set(cls.event_internal_property)
    self.concept_internal_property = set(cls.concept_internal_property)
    self.parsing_register = dict.fromkeys(cls.parsing_register)
    self.indent_level_pos = [None] * len(cls.indent_level_pos)
    self.types = {}

    self.schema_file = filename
    self.current_line_num = 0

    schema = Schema()
    # every property of "Thing" counts as a default property of all kinds of types
    thing = schema.query_spg_type("Thing")
    for prop in thing.properties:
        self.entity_internal_property.add(prop)
        self.event_internal_property.add(prop)
        self.concept_internal_property.add(prop)

    # basic and standard types are built in and may be referenced without a namespace
    session = schema.create_session()
    for type_name in session._spg_types:
        spg_type = session.get(type_name)
        if spg_type.spg_type_enum in [
            SpgTypeEnum.Basic,
            SpgTypeEnum.Standard,
        ]:
            self.internal_type.add(spg_type.name)

    self.load_script()
def save_register(self, element: RegisterUnit, value):
    """
    Store ``value`` in the register slot ``element`` and clear the slots it invalidates.

    Opening a type clears property, relation and sub property; opening a property
    clears relation and sub property; opening a relation clears property and
    sub property. Storing a sub property clears nothing.
    """
    register = self.parsing_register
    register[element] = value

    invalidated_by = {
        RegisterUnit.Type: (
            RegisterUnit.Property,
            RegisterUnit.Relation,
            RegisterUnit.SubProperty,
        ),
        RegisterUnit.Property: (RegisterUnit.Relation, RegisterUnit.SubProperty),
        RegisterUnit.Relation: (RegisterUnit.Property, RegisterUnit.SubProperty),
    }
    for stale_slot in invalidated_by.get(element, ()):
        register[stale_slot] = None
def adjust_parsing_level(self, step):
    """
    Move the current indent level by ``step`` and reset the register slots this invalidates.

    ``step == 0`` returns to the type level, ``step == 1`` enters the next deeper level
    and a negative ``step`` goes back to an enclosing level. Any other value is ignored.
    """
    if step == 0:
        # a new top-level line: the type that was being parsed is finished
        self.current_parsing_level = IndentLevel.Type.value
        self.save_register(RegisterUnit.Type, None)
        return

    if step == 1:
        assert self.current_parsing_level + 1 < len(IndentLevel), self.error_msg(
            "Invalid indentation (too many levels?)"
        )
        self.current_parsing_level += 1
        return

    if not step < 0:
        return

    level = max(0, self.current_parsing_level + step)
    self.current_parsing_level = level

    if level == IndentLevel.PredicateMeta.value:
        # the sub property block is finished
        self.save_register(RegisterUnit.SubProperty, None)
        return
    if level != IndentLevel.Predicate.value:
        return

    # Back at predicate level: replace the finished property/relation with a
    # placeholder so the register still tells which section is open.
    register = self.parsing_register
    if register[RegisterUnit.Property] is not None:
        placeholder = Property(name="_", object_type_name="Thing")
        self.save_register(RegisterUnit.Property, placeholder)
    elif register[RegisterUnit.Relation] is not None:
        placeholder = Relation(name="_", object_type_name="Thing")
        self.save_register(RegisterUnit.Relation, placeholder)
def error_msg(self, msg):
    """
    Prefix ``msg`` with the number of the script line that is currently being parsed.
    """
    return "Line# {}: {}".format(self.current_line_num, msg)
def parse_type(self, expression):
    """
    Parse a top-level line: a namespace declaration, a type or a subtype definition.

    Accepted forms::

        namespace A
        A(B): C        C is one of the keyword types
        A(B) -> C:     A inherits from the already defined type C

    The parsed type is stored in ``self.types`` and becomes the current type of the
    parsing register.

    :param expression: the stripped script line
    :raises AssertionError: if the line is recognized but not valid in this context
    :raises Exception: if the line matches none of the accepted forms
    """
    namespace_match = re.match(r"^namespace\s+([a-zA-Z0-9]+)$", expression)
    if namespace_match:
        assert self.namespace is None, self.error_msg(
            "Duplicated namespace define, please ensure define it only once"
        )
        self.namespace = namespace_match.group(1)
        return

    type_match = re.match(
        r"^([a-zA-Z0-9\.]+)\((\w+)\):\s*?([a-zA-Z0-9,]+)$", expression
    )
    if type_match:
        assert self.namespace is not None, self.error_msg(
            "Missing namespace, please define namespace at the first"
        )
        type_name = type_match.group(1)
        type_name_zh = type_match.group(2).strip()
        type_class = type_match.group(3).strip()
        assert type_class in self.keyword_type, self.error_msg(
            f"{type_class} is illegal, please define it before current line"
        )

        spg_type = None
        if type_class == "EntityType":
            spg_type = EntityType(
                name=f"{self.namespace}.{type_name}", name_zh=type_name_zh
            )
        elif type_class == "ConceptType":
            spg_type = ConceptType(
                name=f"{self.namespace}.{type_name}",
                name_zh=type_name_zh,
                hypernym_predicate=HypernymPredicateEnum.IsA,
            )
        elif type_class == "EventType":
            spg_type = EventType(
                name=f"{self.namespace}.{type_name}", name_zh=type_name_zh
            )
        elif type_class == "StandardType":
            # only reachable if "StandardType" is added to keyword_type
            spg_type = StandardType(name=f"STD.{type_name}", name_zh=type_name_zh)

        ns_type_name = f"{self.namespace}.{type_name}"
        assert ns_type_name not in self.types, self.error_msg(
            f'Type "{type_name}" is duplicated in the schema'
        )
        self.types[ns_type_name] = spg_type
        self.save_register(RegisterUnit.Type, spg_type)
        return

    sub_type_match = re.match(
        r"^([a-zA-Z0-9]+)\((\w+)\)\s*?->\s*?([a-zA-Z0-9\.]+):$", expression
    )
    if sub_type_match:
        assert self.namespace is not None, self.error_msg(
            "Missing namespace, please define namespace at the first"
        )
        type_name = sub_type_match.group(1)
        type_name_zh = sub_type_match.group(2).strip()
        type_class = sub_type_match.group(3).strip()
        if "." not in type_class:
            ns_type_class = f"{self.namespace}.{type_class}"
        else:
            ns_type_class = type_class

        assert (
            type_class not in self.keyword_type
            and type_class not in self.internal_type
        ), self.error_msg(f"{type_class} is not a valid inheritable type")
        assert ns_type_class in self.types, self.error_msg(
            f"{type_class} not found, please define it first"
        )

        parent_spg_type = self.types[ns_type_class]
        assert parent_spg_type.spg_type_enum in [
            SpgTypeEnum.Entity,
            SpgTypeEnum.Event,
        ], self.error_msg(
            f'"{type_class}" cannot be inherited, only entity/event type can be inherited.'
        )

        # same duplicate protection as for plain type definitions
        ns_type_name = f"{self.namespace}.{type_name}"
        assert ns_type_name not in self.types, self.error_msg(
            f'Type "{type_name}" is duplicated in the schema'
        )

        # A subtype has the kind of its parent. Comparing the parent *name* with
        # "EventType" can never succeed here (keyword types were rejected above),
        # so the kind has to be taken from the parent type itself.
        if parent_spg_type.spg_type_enum == SpgTypeEnum.Event:
            spg_type = EventType(name=ns_type_name, name_zh=type_name_zh)
        else:
            spg_type = EntityType(name=ns_type_name, name_zh=type_name_zh)

        # NOTE(review): this replaces the namespaced name with the bare one while all
        # other types keep "<namespace>.<name>" - kept as is, confirm it is intended.
        spg_type.name = type_name
        spg_type.name_zh = type_name_zh
        spg_type.parent_type_name = ns_type_class

        self.types[ns_type_name] = spg_type
        self.save_register(RegisterUnit.Type, spg_type)
        return

    raise Exception(
        self.error_msg(
            "unrecognized expression, expect namespace A or A(B):C or A(B)->C"
        )
    )
def parse_type_meta(self, expression):
    """
    Parse a line directly below a type: ``desc:``, ``properties:``, ``relations:``
    or ``hypernymPredicate:``.

    ``properties:`` and ``relations:`` open the matching section by storing a
    placeholder predicate in the parsing register.
    """
    matched = re.match(
        r"^(desc|properties|relations|hypernymPredicate):\s*?(.*)$", expression
    )
    assert matched, self.error_msg(
        "Unrecognized expression, expect desc:|properties:|relations:"
    )
    keyword = matched.group(1)
    value = matched.group(2).strip()

    if keyword == "desc":
        # an empty description is ignored
        if len(value) > 0:
            self.parsing_register[RegisterUnit.Type].desc = value
        return

    if keyword == "properties":
        placeholder = Property(name="_", object_type_name="Thing")
        self.save_register(RegisterUnit.Property, placeholder)
        return

    if keyword == "relations":
        placeholder = Relation(name="_", object_type_name="Thing")
        self.save_register(RegisterUnit.Relation, placeholder)
        return

    # the pattern leaves only "hypernymPredicate" at this point
    assert value in ["isA", "locateAt"], self.error_msg(
        "Invalid hypernym predicate, expect isA or locateAt"
    )
    current_type = self.parsing_register[RegisterUnit.Type]
    assert current_type.spg_type_enum == SpgTypeEnum.Concept, self.error_msg(
        "Hypernym predicate is available for concept type only"
    )
    if value == "isA":
        current_type.hypernym_predicate = HypernymPredicateEnum.IsA
    else:
        current_type.hypernym_predicate = HypernymPredicateEnum.LocateAt
def check_semantic_relation(self, predicate_name, predicate_class):
    """
    Validate a semantic relation written as ``PREFIX#predicate``.

    Checks that the prefix and the predicate name are known, that the object type
    ``predicate_class`` has been defined, and that the kinds of the current (subject)
    type and of the object type fit the prefix.

    :raises AssertionError: if one of the checks fails
    """
    parts = predicate_name.split("#")
    short_name, pred_name = parts[0], parts[1]
    assert short_name in self.semantic_rel, self.error_msg(
        f"{short_name} is incorrect, expect SYNANT/CAU/SEQ/IND/INC"
    )
    assert pred_name in self.semantic_rel[short_name], self.error_msg(
        f'{pred_name} is incorrect, expect {" / ".join(self.semantic_rel[short_name])}'
    )

    subject_type = self.parsing_register[RegisterUnit.Type]
    if "." in predicate_class:
        predicate_class_ns = predicate_class
    else:
        predicate_class_ns = f"{self.namespace}.{predicate_class}"
    assert predicate_class_ns in self.types, self.error_msg(
        f"{predicate_class} is illegal"
    )
    object_type = self.types[predicate_class_ns]

    # prefixes that are only allowed from a concept type to itself:
    # prefix -> (message for a wrong subject kind, message for a wrong object)
    self_referential = {
        "SYNANT": (
            "Only concept types could define synonym/antonym relation",
            "Synonymy/antonym relation should be self-referential",
        ),
        "INC": (
            "Only concept types could define inclusive relation",
            "Inclusive relation should be self-referential",
        ),
    }
    if short_name in self_referential:
        kind_msg, target_msg = self_referential[short_name]
        assert subject_type.spg_type_enum == SpgTypeEnum.Concept, self.error_msg(
            kind_msg
        )
        assert subject_type.name == predicate_class_ns, self.error_msg(target_msg)
        return

    concept_or_event = [SpgTypeEnum.Concept, SpgTypeEnum.Event]

    if short_name == "CAU":
        # NOTE(review): both messages mention concept types only although event
        # types pass these checks as well.
        assert subject_type.spg_type_enum in concept_or_event, self.error_msg(
            "Only concept types could define causal relation"
        )
        assert object_type.spg_type_enum in concept_or_event, self.error_msg(
            f'"{predicate_class}" must be a concept type to conform to the definition of causal relation'
        )
        return

    if short_name == "SEQ":
        assert subject_type.spg_type_enum in concept_or_event, self.error_msg(
            "Only concept/event types could define sequential relation"
        )
        assert (
            subject_type.spg_type_enum == object_type.spg_type_enum
        ), self.error_msg(
            f'"{predicate_class}" should keep the same type with "{subject_type.name.split(".")[1]}"'
        )
        return

    if short_name == "IND":
        assert subject_type.spg_type_enum in [
            SpgTypeEnum.Entity,
            SpgTypeEnum.Event,
        ], self.error_msg("Only entity/event types could define inductive relation")
        assert object_type.spg_type_enum == SpgTypeEnum.Concept, self.error_msg(
            f'"{predicate_class}" must be a concept type to conform to the definition of inductive relation'
        )
def parse_predicate(self, expression):
    """
    Parse a property, relation or sub property definition: ``name(nameZh): Type``.

    Which of the three is created depends on the parsing register: an open sub
    property section wins, then an open property section, otherwise the line is a
    relation. The new predicate is attached to its owner and stored in the register.

    :param expression: the stripped script line
    :raises AssertionError: if the line is malformed, the object type is unknown, or
        the name is reserved or duplicated
    """
    match = re.match(
        r"^([a-zA-Z0-9#]+)\(([\w\.]+)\):\s*?([a-zA-Z0-9,\.]+)$", expression
    )
    assert match, self.error_msg(
        "Unrecognized expression, expect pattern like english(Chinese):Type"
    )
    predicate_name = match.group(1)
    predicate_name_zh = match.group(2).strip()
    predicate_class = match.group(3).strip()
    type_name = self.parsing_register[RegisterUnit.Type].name

    if "#" in predicate_name:
        # semantic relation written as PREFIX#name; only the name is kept
        self.check_semantic_relation(predicate_name, predicate_class)
        predicate_name = predicate_name.split("#")[1]
    else:
        for semantic_short in self.semantic_rel.values():
            assert predicate_name not in semantic_short, self.error_msg(
                f"{predicate_name} is a semantic predicate, please add the semantic prefix"
            )

    assert (
        f"{self.namespace}.{predicate_class}" in self.types
        or predicate_class in self.internal_type
    ), self.error_msg(f"{predicate_class} is illegal")

    assert predicate_name not in self.entity_internal_property, self.error_msg(
        f"property {predicate_name} is the default property of type"
    )

    if self.parsing_register[RegisterUnit.Relation] is not None:
        assert (
            predicate_name
            not in self.parsing_register[RegisterUnit.Relation].sub_properties
        ), self.error_msg(
            f'Property "{predicate_name}" is duplicated under the relation '
            f"{self.parsing_register[RegisterUnit.Relation].name}"
        )
    else:
        assert (
            predicate_name
            not in self.parsing_register[RegisterUnit.Type].properties
        ), self.error_msg(
            f'Property "{predicate_name}" is duplicated under the type {type_name[type_name.index(".") + 1:]}'
        )

    # NOTE(review): these two checks compare the *object* type name with a keyword,
    # which the "is illegal" assertion above normally rules out - confirm whether the
    # kind of the current type was meant instead.
    if predicate_class == "ConceptType":
        assert not self.is_internal_property(
            predicate_name, SpgTypeEnum.Concept
        ), self.error_msg(
            f"property {predicate_name} is the default property of ConceptType"
        )
    if predicate_class == "EventType":
        assert not self.is_internal_property(
            predicate_name, SpgTypeEnum.Event
        ), self.error_msg(
            f"property {predicate_name} is the default property of EventType"
        )

    # project types are referenced by their namespaced name from here on
    if (
        "." not in predicate_class
        and predicate_class not in BasicTypeEnum.__members__
    ):
        predicate_class = f"{self.namespace}.{predicate_class}"

    if self.parsing_register[RegisterUnit.SubProperty]:
        # predicate is sub property
        predicate = Property(name=predicate_name, object_type_name=predicate_class)
        if self.parsing_register[RegisterUnit.Property] is not None:
            self.parsing_register[RegisterUnit.Property].add_sub_property(predicate)
        elif self.parsing_register[RegisterUnit.Relation] is not None:
            self.parsing_register[RegisterUnit.Relation].add_sub_property(predicate)
        self.save_register(RegisterUnit.SubProperty, predicate)

    elif self.parsing_register[RegisterUnit.Property]:
        # predicate is property
        predicate = Property(name=predicate_name, object_type_name=predicate_class)
        if (
            self.parsing_register[RegisterUnit.Type].spg_type_enum
            == SpgTypeEnum.Event
            and predicate_name == "subject"
        ):
            assert predicate_class not in self.internal_type, self.error_msg(
                f"The subject of event type only allows entity/concept type"
            )
            predicate.property_group = PropertyGroupEnum.Subject
            if "," in predicate_class:
                # multi-types for subject
                predicate.object_type_name = "Text"
                subject_types = predicate_class.split(",")
                for subject_type in subject_types:
                    subject_type = subject_type.strip()
                    assert (
                        subject_type not in BasicTypeEnum.__members__
                    ), self.error_msg(
                        f"{predicate_class} is illegal for subject in event type"
                    )
                    if "." not in subject_type:
                        # namespace the single type, not the whole comma separated list
                        subject_type = f"{self.namespace}.{subject_type}"
                    assert subject_type in self.types, self.error_msg(
                        f"{predicate_class} is illegal"
                    )
                    subject_predicate = Property(
                        name=f"subject{subject_type}", object_type_name=subject_type
                    )
                    subject_predicate.property_group = PropertyGroupEnum.Subject
                    self.parsing_register[RegisterUnit.Type].add_property(
                        subject_predicate
                    )
        self.parsing_register[RegisterUnit.Type].add_property(predicate)
        self.save_register(RegisterUnit.Property, predicate)

    else:
        # predicate is relation
        assert predicate_class in self.types, self.error_msg(
            f"{predicate_class} is illegal"
        )
        assert (
            f"{predicate_name}_{predicate_class}"
            not in self.parsing_register[RegisterUnit.Type].relations
        ), self.error_msg(
            f'Relation "{match.group()}" is duplicated under the type {type_name[type_name.index(".") + 1:]}'
        )
        predicate = Relation(name=predicate_name, object_type_name=predicate_class)
        self.parsing_register[RegisterUnit.Type].add_relation(predicate)
        self.save_register(RegisterUnit.Relation, predicate)

    predicate.name_zh = predicate_name_zh
def parse_property_meta(self, expression):
    """
    Parse a line below a property or sub property: ``desc:``, ``properties:``,
    ``constraint:`` or ``rule:``.

    ``desc`` and ``constraint`` apply to the open sub property if there is one,
    otherwise to the open property. ``properties:`` opens a sub property section.

    :param expression: the stripped script line
    :raises AssertionError: if the line does not start with one of the keywords
    """
    match = re.match(r"^(desc|properties|constraint|rule):\s*?(.*)$", expression)
    assert match, self.error_msg(
        "Unrecognized expression, expect desc:|properties:|constraint:|rule:"
    )
    property_meta = match.group(1)
    meta_value = match.group(2)

    register = self.parsing_register
    if property_meta == "desc":
        # The captured value still carries the blanks that follow the colon; strip
        # them (as parse_type_meta does) so "desc: text" is stored as "text" and a
        # blank description is ignored.
        desc = meta_value.strip()
        if desc:
            if register[RegisterUnit.SubProperty] is not None:
                register[RegisterUnit.SubProperty].desc = desc
            elif register[RegisterUnit.Property] is not None:
                register[RegisterUnit.Property].desc = desc

    elif property_meta == "constraint":
        if register[RegisterUnit.SubProperty] is not None:
            self.parse_constraint_for_property(
                meta_value, register[RegisterUnit.SubProperty]
            )
        elif register[RegisterUnit.Property] is not None:
            self.parse_constraint_for_property(
                meta_value, register[RegisterUnit.Property]
            )

    elif property_meta == "properties":
        self.save_register(
            RegisterUnit.SubProperty, Property(name="_", object_type_name="Thing")
        )

    elif property_meta == "rule":
        self.parse_predicate_rule(meta_value.lstrip(), RegisterUnit.Property)
def parse_relation_meta(self, expression):
    """
    Parse a line below a relation: ``desc:``, ``properties:`` or ``rule:``.

    ``properties:`` opens the section for the properties of the relation.

    :param expression: the stripped script line
    :raises AssertionError: if the line does not start with one of the keywords
    """
    match = re.match(r"^(desc|properties|rule):\s*?(.*)$", expression)
    assert match, self.error_msg(
        "Unrecognized expression, expect desc:|properties:|rule:"
    )
    property_meta = match.group(1)
    meta_value = match.group(2)

    if property_meta == "desc":
        # Strip the blanks that follow the colon (as parse_type_meta does) so that
        # "desc: text" is stored as "text" and a blank description is ignored.
        desc = meta_value.strip()
        if desc:
            self.parsing_register[RegisterUnit.Relation].desc = desc
    elif property_meta == "properties":
        self.save_register(
            RegisterUnit.SubProperty, Property(name="_", object_type_name="Thing")
        )
    elif property_meta == "rule":
        self.parse_predicate_rule(meta_value.lstrip(), RegisterUnit.Relation)
def parsing_dispatch(self, expression, parsing_level):
    """
    Hand ``expression`` to the parser that is responsible for ``parsing_level``.

    At predicate-meta level the parser depends on whether a property or a relation
    is currently open. An unknown level is ignored.
    """
    handler = None
    if parsing_level == IndentLevel.Type.value:
        handler = self.parse_type
    elif parsing_level == IndentLevel.TypeMeta.value:
        handler = self.parse_type_meta
    elif parsing_level == IndentLevel.Predicate.value:
        handler = self.parse_predicate
    elif parsing_level == IndentLevel.PredicateMeta.value:
        if self.parsing_register[RegisterUnit.Property] is not None:
            handler = self.parse_property_meta
        else:
            handler = self.parse_relation_meta
    elif parsing_level == IndentLevel.SubProperty.value:
        handler = self.parse_predicate
    elif parsing_level == IndentLevel.SubPropertyMeta.value:
        handler = self.parse_property_meta

    if handler is not None:
        handler(expression)
def parse_predicate_rule(self, rule, key):
    """
    Store the logic rule of the property/relation held in register slot ``key``.

    A rule starting with ``[[`` opens a multi-line rule: the predicate is remembered
    in ``rule_quote_predicate``, ``rule_quote_open`` is set and the text after ``[[``
    becomes the beginning of the rule. Any other rule is stored as given.

    :param rule: the rule text with leading blanks removed
    :param key: the register slot of the predicate that owns the rule
    """
    if rule.startswith("[["):
        predicate = self.parsing_register[key]
        self.rule_quote_predicate = predicate
        self.rule_quote_open = True
        # keep everything behind the opening "[[" - a slice, not the single
        # character at index 2; this is "" when nothing follows the marker
        predicate.logical_rule = rule[2:].lstrip()
    else:
        self.parsing_register[key].logical_rule = rule
def parse_constraint_for_property(self, expression, prop):
    """
    Parse the value of a ``constraint:`` line and add the constraints to ``prop``.

    Supported are ``Enum="a,b,c"``, ``Regular="pattern"`` and the flags ``MultiValue``
    and ``NotNull``, all matched case-insensitively and separated by commas. Unknown
    items are ignored.

    :param expression: the text following ``constraint:``
    :param prop: the property that receives the constraints through ``add_constraint``
    """
    if len(expression) == 0:
        return

    pattern = re.compile(r"(Enum|Regular)\s*?=\s*?\"([^\"]+)\"", re.IGNORECASE)
    for keyword, value in pattern.findall(expression):
        if keyword.lower() == "enum":
            enum_values = [ev.strip() for ev in value.split(",")]
            prop.add_constraint(ConstraintTypeEnum.Enum, enum_values)
        elif keyword.lower() == "regular":
            prop.add_constraint(ConstraintTypeEnum.Regular, value)

    # Remove the quoted assignments with the *same* case-insensitive pattern before
    # splitting on commas; otherwise a lower-case enum="notnull,..." would survive
    # and its values would be mistaken for flags.
    expression = pattern.sub("", expression)
    for cons in expression.split(","):
        cons = cons.strip().lower()
        if cons == "multivalue":
            prop.add_constraint(ConstraintTypeEnum.MultiValue)
        elif cons == "notnull":
            prop.add_constraint(ConstraintTypeEnum.NotNull)
def complete_rule(self, rule):
    """
    Complete a logic rule: generate the ``Define`` statement if it is missing and
    prepend the namespace to type names that have none.

    :param rule: the rule text collected from the script
    :return: the completed rule without surrounding whitespace
    """
    if not re.compile(r"Define\s*\(", re.IGNORECASE).match(rule.strip()):
        # wrap the bare rule body into a Define statement for the open predicate
        register = self.parsing_register
        subject_name = register[RegisterUnit.Type].name
        predicate = None
        if register[RegisterUnit.Property] is not None:
            predicate = register[RegisterUnit.Property]
        elif register[RegisterUnit.Relation] is not None:
            predicate = register[RegisterUnit.Relation]
        rule = (
            f"Define (s:{subject_name})-[p:{predicate.name}]->(o:{predicate.object_type_name})"
            + " {\n"
            + rule
            + "\n}"
        )

    # every "(alias:Type)" whose type is neither namespaced nor one of the listed
    # basic types gets the namespace of the script
    typed_node = re.compile(r"\(([\w\s]*?:)([\w\s\.]+)\)", re.IGNORECASE)
    for alias, type_name in typed_node.findall(rule):
        if "." in type_name or type_name.lower() in ["integer", "text", "float"]:
            continue
        rule = rule.replace(
            f"({alias}{type_name})",
            f"({alias}{self.namespace}.{type_name.strip()})",
        )
    return rule.strip()
def load_script(self):
    """
    Read the schema script and parse it line by line.

    Empty lines and lines starting with ``#`` are skipped. While a multi-line rule
    (``[[ ... ]]``) is open, lines are appended to that rule. For every other line the
    indent level is derived from its indentation and the line is dispatched to the
    parser of that level.

    :raises AssertionError: if a line is indented less than its predecessor without
        aligning with an enclosing definition
    """
    # read everything first so the file is closed before parsing starts
    with open(self.schema_file, "r", encoding="utf-8") as file:
        lines = file.read().splitlines()

    for line in lines:
        self.current_line_num += 1
        strip_line = line.strip()
        # replace tabs with two spaces, as the indentation is measured in characters
        line = line.replace("\t", "  ")
        if strip_line == "" or strip_line.startswith("#"):
            # skip empty or comments line
            continue

        if self.rule_quote_open:
            # process the multi-line assignment [[ .... ]]
            right_strip_line = line.rstrip()
            if strip_line.endswith("]]"):
                self.rule_quote_open = False
                if len(right_strip_line) > 2:
                    # keep the text in front of the closing "]]"
                    self.rule_quote_predicate.logical_rule += right_strip_line[:-2]
                self.rule_quote_predicate.logical_rule = self.complete_rule(
                    self.rule_quote_predicate.logical_rule
                )
            else:
                self.rule_quote_predicate.logical_rule += line + "\n"
            continue

        indent_count = len(line) - len(line.lstrip())
        if indent_count == 0:
            # the line without indent is namespace definition or a type definition
            self.adjust_parsing_level(0)
        elif indent_count > self.last_indent_level:
            # the line is the sub definition of the previous line
            self.adjust_parsing_level(1)
        elif indent_count < self.last_indent_level:
            # Finish the current block: the line has to align with one of the
            # *enclosing* levels. Deeper slots may still hold positions of an earlier
            # definition and must not be matched, so only shallower levels are searched.
            target_level = None
            for level in range(self.current_parsing_level):
                if self.indent_level_pos[level] == indent_count:
                    target_level = level
                    break
            assert target_level is not None, self.error_msg(
                f"Invalid indentation, please align with the previous definition"
            )
            self.adjust_parsing_level(target_level - self.current_parsing_level)

        self.parsing_dispatch(strip_line, self.current_parsing_level)
        self.last_indent_level = indent_count
        self.indent_level_pos[self.current_parsing_level] = indent_count
def is_internal_property(self, prop: Property, spg_type: SpgTypeEnum):
    """
    Tell whether ``prop`` is one of the default properties of the given kind of type.

    :return: True/False for entity, standard, concept and event types, None otherwise
    """
    if spg_type in (SpgTypeEnum.Entity, SpgTypeEnum.Standard):
        internal = self.entity_internal_property
    elif spg_type == SpgTypeEnum.Concept:
        internal = self.concept_internal_property
    elif spg_type == SpgTypeEnum.Event:
        internal = self.event_internal_property
    else:
        # no default properties are registered for any other kind
        return None
    return prop in internal
def sync_schema(self):
    """
    Apply the difference between the script and the project schema.

    :return: the result of ``diff_and_sync`` called with ``print_only`` set to False
    """
    changed = self.diff_and_sync(False)
    return changed
def print_diff(self):
    """
    Report the difference between the script and the project schema.

    Calls ``diff_and_sync`` with ``print_only`` set to True and discards its result.
    """
    self.diff_and_sync(True)
    return None
def diff_sub_property(self, new, old, old_type_name, old_property, new_property):
    """
    Compare the sub properties of one property/relation and stage the differences.

    Sub properties missing from ``new`` are marked for deletion, unknown ones are
    added to ``old_property``, ones with a different object type are recreated and
    ones that differ otherwise are updated. Every change is printed.

    :param new: sub properties parsed from the script, keyed by name
    :param old: sub properties stored in the project, keyed by name
    :param old_type_name: name of the type that owns the property/relation
    :param old_property: the stored property/relation
    :param new_property: the property/relation parsed from the script
    :return: True if at least one change was staged
    :raises AssertionError: if a change is required but the type is inherited by another
        type, or the property/relation itself is inherited
    """
    inherited_type = self.get_inherited_type(old_type_name)

    def frozen_msg():
        # built on demand, exactly when one of the assertions below fails
        return self.error_msg(
            f'"{old_type_name} was inherited by other type, such as "{inherited_type}". Prohibit property alteration!'
        )

    changed = False

    for name in old:
        if old_property.inherited or name in new:
            continue
        assert inherited_type is None, frozen_msg()
        old[name].alter_operation = AlterOperationEnum.Delete
        changed = True
        print(f"Delete sub property: [{old_type_name}] {old_property.name}.{name}")

    for name, candidate in new.items():
        if name not in old and not new_property.inherited:
            assert inherited_type is None, frozen_msg()
            old_property.add_sub_property(candidate)
            changed = True
            print(
                f"Create sub property: [{old_type_name}] {old_property.name}.{name}"
            )
        elif old[name].object_type_name != candidate.object_type_name:
            assert inherited_type is None, frozen_msg()
            assert not old_property.inherited, self.error_msg(
                f"{old_type_name}] {old_property.name}.{name} is inherited sub property, deny modify"
            )
            # a changed object type means: drop the stored one and add the new one
            old[name].alter_operation = AlterOperationEnum.Delete
            old_property.add_sub_property(candidate)
            changed = True
            print(
                f"Recreate sub property: [{old_type_name}] {old_property.name}.{name}"
            )
        elif old[name] != candidate:
            assert inherited_type is None, frozen_msg()
            assert not old_property.inherited, self.error_msg(
                f"{old_type_name}] {old_property.name}.{name} is inherited property, deny modify"
            )
            old[name].overwritten_by(candidate)
            old[name].alter_operation = AlterOperationEnum.Update
            changed = True
            print(f"Update property: [{old_type_name}] {old_property.name}.{name}")

    return changed
def get_inherited_type(self, type_name):
    """
    Find a parsed type that inherits from ``type_name``.

    :return: the key in ``self.types`` of the first type whose ``parent_type_name``
        equals ``type_name``, or None if there is none
    """
    children = (
        name
        for name, spg_type in self.types.items()
        if spg_type.parent_type_name == type_name
    )
    return next(children, None)
def diff_and_sync(self, print_only):
    """
    Get the schema diff and then sync to graph storage.

    The parsed types are compared with the types of a fresh schema session. Types,
    properties, relations and sub properties that were added, changed or removed are
    staged in the session and reported with ``print``.

    :param print_only: if true the staged changes are reported but not committed
    :return: True if the session holds altered types, otherwise False
    :raises AssertionError: if a change is not allowed (inherited types/properties,
        the subject property of an event type)
    """
    schema = Schema()
    session = schema.create_session()

    # generate the delete list of spg type
    for spg_type in session._spg_types:
        if spg_type in self.internal_type:
            continue
        if spg_type not in self.types:
            session.delete_type(session.get(spg_type))
            print(f"Delete type: {spg_type}")

    for spg_type in self.types:
        new_type = self.types[spg_type]

        # generate the creation list of spg type
        if spg_type not in session._spg_types:
            session.create_type(new_type)
            print(f"Create type: {spg_type}")
            for rel in new_type.relations:
                print(f'Create relation: [{spg_type}] {rel.split("_")[0]}')
            continue

        # generate the update list
        old_type = session.get(spg_type)

        # if class of type changed then recreate the type
        if (
            new_type.spg_type_enum != old_type.spg_type_enum
            or new_type.parent_type_name != old_type.parent_type_name
        ):
            inherited_type = self.get_inherited_type(new_type.name)
            assert not inherited_type, self.error_msg(
                f'"{inherited_type}" inherited {new_type.name}, prohibit type alteration. '
                f'If you still want to make change, please delete "{inherited_type}" first.'
            )
            session.delete_type(old_type)
            session.create_type(new_type)
            print(f"Recreate type: {spg_type}")
            continue

        need_update = False
        if new_type.desc != old_type.desc:
            old_type.desc = new_type.desc
            need_update = True
        if new_type.name_zh != old_type.name_zh:
            old_type.name_zh = new_type.name_zh
            need_update = True

        if (
            new_type.spg_type_enum == SpgTypeEnum.Concept
            and new_type.hypernym_predicate != old_type.hypernym_predicate
        ):
            # Remember the stored predicate *before* overwriting it; it is needed to
            # copy the old relation and to mark that relation for deletion.
            # NOTE(review): relations are indexed by the predicate value here while
            # other code uses "<name>_<objectType>" keys - confirm the key format.
            old_predicate = old_type.hypernym_predicate
            new_predicate = new_type.hypernym_predicate
            old_type.hypernym_predicate = new_predicate
            old_type.relations[new_predicate] = copy.deepcopy(
                old_type.relations[old_predicate]
            )
            old_type.relations[
                new_predicate
            ].alter_operation = AlterOperationEnum.Create
            old_type.relations[
                old_predicate
            ].alter_operation = AlterOperationEnum.Delete
            need_update = True
            print(
                f"Recreate hypernym predicate: [{new_type.name}] {new_type.hypernym_predicate}"
            )

        # properties that disappeared from the script
        for prop in old_type.properties:
            if (
                not old_type.properties[prop].inherited
                and prop not in new_type.properties
                and not self.is_internal_property(prop, new_type.spg_type_enum)
            ):
                assert (
                    prop != "subject"
                    and old_type.properties[prop].property_group
                    != PropertyGroupEnum.Subject
                ), self.error_msg(
                    "The subject property of event type cannot be deleted"
                )
                old_type.properties[
                    prop
                ].alter_operation = AlterOperationEnum.Delete
                need_update = True
                print(f"Delete property: [{new_type.name}] {prop}")

        # properties that are new or changed; the lookup does not depend on the
        # property, so it is done once per type
        inherited_type = self.get_inherited_type(new_type.name)
        for prop, o in new_type.properties.items():
            if (
                prop not in old_type.properties
                and not self.is_internal_property(prop, new_type.spg_type_enum)
                and not o.inherited
            ):
                assert inherited_type is None, self.error_msg(
                    f'"{new_type.name} was inherited by other type, such as "{inherited_type}". Prohibit property alteration!'
                )
                old_type.add_property(new_type.properties[prop])
                need_update = True
                print(f"Create property: [{new_type.name}] {prop}")
            elif prop not in old_type.properties:
                # internal or inherited property without a stored counterpart:
                # nothing to compare against
                continue
            elif (
                old_type.properties[prop].object_type_name
                != new_type.properties[prop].object_type_name
            ):
                assert inherited_type is None, self.error_msg(
                    f'"{new_type.name} was inherited by other type, such as "{inherited_type}". Prohibit property alteration!'
                )
                assert not old_type.properties[prop].inherited, self.error_msg(
                    f"{new_type.name}] {prop} is inherited property, deny modify"
                )
                old_type.properties[
                    prop
                ].alter_operation = AlterOperationEnum.Delete
                old_type.add_property(new_type.properties[prop])
                need_update = True
                print(f"Recreate property: [{new_type.name}] {prop}")
            elif (
                old_type.properties[prop].sub_properties
                != new_type.properties[prop].sub_properties
            ):
                # keep the result separate: assigning it to need_update would
                # discard changes that were already staged for this type
                sub_changed = self.diff_sub_property(
                    new_type.properties[prop].sub_properties,
                    old_type.properties[prop].sub_properties,
                    old_type.name,
                    old_type.properties[prop],
                    new_type.properties[prop],
                )
                if sub_changed:
                    old_type.properties[
                        prop
                    ].alter_operation = AlterOperationEnum.Update
                    need_update = True
            elif old_type.properties[prop] != new_type.properties[prop]:
                assert inherited_type is None, self.error_msg(
                    f'"{new_type.name} was inherited by other type, such as "{inherited_type}". Prohibit property alteration!'
                )
                assert not old_type.properties[prop].inherited, self.error_msg(
                    f"{new_type.name}] {prop} is inherited property, deny modify"
                )
                old_type.properties[prop].overwritten_by(o)
                old_type.properties[
                    prop
                ].alter_operation = AlterOperationEnum.Update
                need_update = True
                print(f"Update property: [{new_type.name}] {prop}")

        # relations that are new or changed
        for relation in new_type.relations:
            p_name = relation.split("_")[0]
            if (
                relation not in old_type.relations
                or old_type.relations[relation].object_type_name
                != new_type.relations[relation].object_type_name
            ):
                old_type.add_relation(new_type.relations[relation])
                need_update = True
                print(f"Create relation: [{new_type.name}] {p_name}")
            elif (
                old_type.relations[relation].sub_properties
                != new_type.relations[relation].sub_properties
            ):
                # see above: do not overwrite need_update with the sub result
                sub_changed = self.diff_sub_property(
                    new_type.relations[relation].sub_properties,
                    old_type.relations[relation].sub_properties,
                    old_type.name,
                    old_type.relations[relation],
                    new_type.relations[relation],
                )
                if sub_changed:
                    old_type.relations[
                        relation
                    ].alter_operation = AlterOperationEnum.Update
                    need_update = True
            elif old_type.relations[relation] != new_type.relations[relation]:
                assert not old_type.relations[
                    relation
                ].inherited, self.error_msg(
                    f"{new_type.name}] {p_name} is inherited relation, deny modify"
                )
                old_type.relations[relation].overwritten_by(
                    new_type.relations[relation]
                )
                old_type.relations[
                    relation
                ].alter_operation = AlterOperationEnum.Update
                need_update = True
                print(f"Update relation: [{new_type.name}] {relation}")

        # relations that disappeared from the script
        for relation, o in old_type.relations.items():
            p_name = relation.split("_")[0]
            if o.inherited or p_name in new_type.properties or o.is_dynamic:
                # skip the inherited and semantic relation
                continue
            if (
                relation not in new_type.relations
                and not o.inherited
                and not o.is_dynamic
                and not (
                    new_type.spg_type_enum == SpgTypeEnum.Concept
                    and p_name in ["isA", "locateAt"]
                )
            ):
                old_type.relations[
                    relation
                ].alter_operation = AlterOperationEnum.Delete
                need_update = True
                print(f"Delete relation: [{new_type.name}] {p_name}")

        if need_update:
            session.update_type(old_type)

    if not print_only:
        session.commit()
    return bool(session._alter_spg_types)
def export_schema_python(self, filename):
    """
    Export the schema helper class in python.

    The generated module contains one class named after the namespace with a nested
    class per project type; each nested class lists the property names of the type as
    string attributes. You can import the exported class in your code to obtain the
    code prompt in IDE.

    :param filename: path of the python file to write
    :raises AssertionError: if no namespace has been parsed
    """
    schema = Schema()
    session = schema.create_session()
    # also covers a namespace that was never set (None)
    assert self.namespace, "Schema is invalid"

    header = [
        "# ATTENTION!\n",
        "# This file is generated by Schema automatically, ",
        "it will be refreshed after schema has been committed\n",
        "# PLEASE DO NOT MODIFY THIS FILE!!!\n#\n\n",
        f"class {self.namespace}:\n",
        "\tdef __init__(self):\n",
    ]
    init_lines = []
    inner = []
    for spg_type in sorted(session._spg_types):
        if spg_type.startswith("STD.") or spg_type in self.internal_type:
            continue
        if "." not in spg_type:
            # a type without namespace has no "<namespace>.<name>" to split
            continue
        type_name = spg_type.split(".")[1]
        init_lines.append(f"\t\tself.{type_name} = self.{type_name}()\n")
        inner.append(f"\n\tclass {type_name}:\n")
        inner.append(f'\t\t__typename__ = "{spg_type}"\n')
        for prop in session.get(spg_type).properties:
            inner.append(f'\t\t{prop} = "{prop}"\n')
        inner.append("\n\t\tdef __init__(self):\n\t\t\tpass\n")

    if not init_lines:
        # keep the generated constructor valid when there is nothing to export
        init_lines.append("\t\tpass\n")

    content = "".join(header + init_lines + inner) + "\n"
    with open(filename, "w", encoding="utf-8") as f:
        f.write(content)