Merge remote-tracking branch 'upstream/branch0101' into branch0101

This commit is contained in:
matthewhyx 2023-12-28 00:18:32 +08:00
commit 851a575514
9 changed files with 59 additions and 99 deletions

View File

@@ -14,24 +14,18 @@ class IndicatorFuse(FuseOp):
self.search_client = SearchClient("Financial.Indicator")
def invoke(self, subject_records: List[SPGRecord]) -> List[SPGRecord]:
print("##########IndicatorFuse###########")
print("####################IndicatorFuse#####################")
print("IndicatorFuse(Input): ")
print(subject_records)
print("----------------------")
[print(r) for r in subject_records]
fused_records = []
for record in subject_records:
query = {"match": {"name": record.get_property("name", "")}}
recall_records = self.search_client.search(query, start=0, size=10)
if recall_records is not None and len(recall_records) > 0:
rerank_record = SPGRecord(
"Financial.Indicator",
{
"id": recall_records[0].doc_id,
"name": recall_records[0].properties.get("name", ""),
},
)
rerank_record.update_property("name", record.get_property("name"))
fused_records.append(rerank_record)
continue
fused_records.append(record)
print("IndicatorFuse(Output): ")
print(fused_records)
print("##########IndicatorFuse###########")
print("----------------------")
[print(r) for r in fused_records]
return fused_records

View File

@@ -20,35 +20,18 @@ ${rel}
"""
def build_prompt(self, variables: Dict[str, str]):
"""
record: {
"input": "济南市财政收入质量及自给能力均较好,但土地出让收入大幅下降致综合财力明显下滑。济南市财政收入质量及自给能力均较好,但土地出让收入大幅下降致综合
财力明显下滑2022年济南市一般公共预算收入1,000.21亿元扣除留 抵退税因素后同比增长8%规模在山东省下辖地市中排名第2位其中税收收入690.31亿元税收占比69.02%一般公共 预算支出1,260.23亿元财政自给率79.37%
府性基金收入547.29亿元同比大幅下降48.38%主要系土地出让收入 同比由966.74亿元降至453.74亿元转移性收入285.78亿元上年同期为233.11亿元综合财力约1,833.28亿元上年 同期为2,301.02亿元"
"ner": "[{'财政': ['财政收入质量', '财政自给能力', '土地出让收入', '一般公共预算收入', '留抵退税', '税收收入', '税收收入/一般公共预算收入', '一般公共预算支出', '财政自给率', '政府性基金收入', '转移性收入', '综合财力']}]",
"rel": "[{'subject': '一般公共预算收入', 'predicate': '包含', 'object': ['税收收入']}, {'subject': '税收收入', 'predicate': '包含', 'object': ['留抵退税']}, {'subject': '政府性基金收入', 'predicate': '包含', 'object': ['土地出让收入', '转移性收入']}, {'subject': '综合财力', 'predicate': '包含', 'object': ['一般公共预算收入', '政府性基金收入']}]",
"id": "财政",
"name": "财政",
"hasA": "财政收入质量,财政自给能力,土地出让收入....."
}
"""
return (
template = (
self.template.replace("${input}", variables.get("input"))
.replace("${ner}", variables.get("ner"))
.replace("${rel}", variables.get("rel"))
.replace("${ner}", variables.get("IndicatorNER"))
.replace("${rel}", variables.get("IndicatorREL"))
)
print("####################IndicatorLOGIC(状态逻辑抽取)#####################")
print("LLM(Input): ")
print("----------------------")
print(template)
return template
def parse_response(self, response: str) -> List[SPGRecord]:
"""
response: "[{\"subject\": \"土地出让收入大幅下降\", \"predicate\": \"顺承\", \"object\": [\"综合财力明显下滑\"]}]"
"""
response = (
'[{"subject": "土地出让收入大幅下降", "predicate": "顺承", "object": ["综合财力明显下滑"]}]'
)
print("##########IndicatorLOGIC###########")
print("IndicatorLOGIC(Input): ")
print(response)
output_list = json.loads(response)
logic_result = []
@@ -61,7 +44,4 @@ ${rel}
elif k == "object":
properties["causeOf"] = ",".join(v)
logic_result.append(SPGRecord("Financial.State", properties=properties))
print("IndicatorLOGIC(Output): ")
print(logic_result)
print("##########IndicatorLOGIC###########")
return logic_result

View File

@@ -17,22 +17,18 @@ ${input}
"""
def build_prompt(self, variables: Dict[str, str]):
return self.template.replace("${input}", variables.get("input"))
template = self.template.replace("${input}", variables.get("input"))
print("####################IndicatorNER(指标抽取)#####################")
print("LLM(Input): ")
print("----------------------")
print(template)
return template
def parse_response(self, response: str) -> List[SPGRecord]:
response = "[{'财政': ['财政收入质量', '财政自给能力', '土地出让收入', '一般公共预算收入', '留抵退税', '税收收入', '税收收入/一般公共预算收入', '一般公共预算支出', '财政自给率', '政府性基金收入', '转移性收入', '综合财力']}]"
print("##########IndicatorNER###########")
print("IndicatorNER(Input): ")
print(response)
output_list = json.loads(response.replace("'", '"'))
ner_result = []
# IF hasA
for output in output_list:
# {'财政': ['财政收入....}
for category, indicator_list in output.items():
# '财政', ['财政收入....]
for indicator in indicator_list:
ner_result.append(
SPGRecord(
@@ -40,17 +36,4 @@ ${input}
properties={"id": indicator, "name": indicator},
)
)
print("IndicatorNER(Output): ")
print(ner_result)
print("##########IndicatorNER###########")
return ner_result
def build_next_variables(
self, variables: Dict[str, str], response: str
) -> List[Dict[str, str]]:
"""
response: "[{'subject': '一般公共预算收入', 'predicate': '包含', 'object': ['税收收入']}, {'subject': '税收收入', 'predicate': '包含', 'object': ['留抵退税']}, {'subject': '政府性基金收入', 'predicate': '包含', 'object': ['土地出让收入', '转移性收入']}, {'subject': '综合财力', 'predicate': '包含', 'object': ['一般公共预算收入', '政府性基金收入']}]"
"""
response = "[{'财政': ['财政收入质量', '财政自给能力', '土地出让收入', '一般公共预算收入', '留抵退税', '税收收入', '税收收入/一般公共预算收入', '一般公共预算支出', '财政自给率', '政府性基金收入', '转移性收入', '综合财力']}]"
return [{"input": variables["input"], "ner": response}]

View File

@@ -14,8 +14,9 @@ class IndicatorPredict(PredictOp):
self.search_client = SearchClient("Financial.Indicator")
def invoke(self, subject_record: SPGRecord) -> List[SPGRecord]:
print("##########IndicatorPredict###########")
print("####################IndicatorPredict(状态关联指标预测)#####################")
print("IndicatorPredict(Input): ")
print("----------------------")
print(subject_record)
predicted_records = []
query = {"match": {"name": subject_record.get_property("name", "")}}
@@ -30,6 +31,6 @@ class IndicatorPredict(PredictOp):
)
predicted_records.append(rerank_record)
print("IndicatorPredict(Output): ")
print(predicted_records)
print("##########IndicatorPredict###########")
print("----------------------")
[print(r) for r in predicted_records]
return predicted_records

View File

@@ -16,26 +16,11 @@ ${ner}
"""
def build_prompt(self, variables: Dict[str, str]) -> str:
"""
record: {
"input": "济南市财政收入质量及自给能力均较好,但土地出让收入大幅下降致综合财力明显下滑。济南市财政收入质量及自给能力均较好,但土地出让收入大幅下降致综合
财力明显下滑2022年济南市一般公共预算收入1,000.21亿元扣除留 抵退税因素后同比增长8%规模在山东省下辖地市中排名第2位其中税收收入690.31亿元税收占比69.02%一般公共 预算支出1,260.23亿元财政自给率79.37%
府性基金收入547.29亿元同比大幅下降48.38%主要系土地出让收入 同比由966.74亿元降至453.74亿元转移性收入285.78亿元上年同期为233.11亿元综合财力约1,833.28亿元上年 同期为2,301.02亿元"
"ner": "[{'财政': ['财政收入质量', '财政自给能力', '土地出让收入', '一般公共预算收入', '留抵退税', '税收收入', '税收收入/一般公共预算收入', '一般公共预算支出', '财政自给率', '政府性基金收入', '转移性收入', '综合财力']}]",
"id": "财政",
"name": "财政",
"hasA": "财政收入质量,财政自给能力,土地出让收入....."
}
"""
return self.template.replace("${input}", variables.get("input")).replace(
"${ner}", variables.get("ner")
template = self.template.replace("${input}", variables.get("input")).replace(
"${ner}", variables.get("IndicatorNER")
)
def build_next_variables(
self, variables: Dict[str, str], response: str
) -> List[Dict[str, str]]:
"""
response: "[{'subject': '一般公共预算收入', 'predicate': '包含', 'object': ['税收收入']}, {'subject': '税收收入', 'predicate': '包含', 'object': ['留抵退税']}, {'subject': '政府性基金收入', 'predicate': '包含', 'object': ['土地出让收入', '转移性收入']}, {'subject': '综合财力', 'predicate': '包含', 'object': ['一般公共预算收入', '政府性基金收入']}]"
"""
response = "[{'subject': '一般公共预算收入', 'predicate': '包含', 'object': ['税收收入']}, {'subject': '税收收入', 'predicate': '包含', 'object': ['留抵退税']}, {'subject': '政府性基金收入', 'predicate': '包含', 'object': ['土地出让收入', '转移性收入']}, {'subject': '综合财力', 'predicate': '包含', 'object': ['一般公共预算收入', '政府性基金收入']}]"
return [{"input": variables["input"], "ner": variables["ner"], "rel": response}]
print("####################IndicatorREL(指标关系抽取)#####################")
print("LLM(Input): ")
print("----------------------")
print(template)
return template

View File

@@ -14,9 +14,10 @@ class StateFuse(FuseOp):
self.search_client = SearchClient("Financial.State")
def invoke(self, subject_records: List[SPGRecord]) -> List[SPGRecord]:
print("##########StateFuse###########")
print("####################StateFuse(状态融合)#####################")
print("StateFuse(Input): ")
print(subject_records)
print("----------------------")
[print(r) for r in subject_records]
fused_records = []
for record in subject_records:
query = {"match": {"name": record.get_property("name", "")}}
@@ -32,6 +33,6 @@ class StateFuse(FuseOp):
rerank_record.update_property("name", record.get_property("name"))
fused_records.append(rerank_record)
print("StateFuse(Output): ")
print(fused_records)
print("##########StateFuse###########")
print("----------------------")
[print(r) for r in fused_records]
return fused_records

View File

@@ -99,6 +99,7 @@ input:${input}
def _render(self, spg_type: BaseSpgType, property_names: List[str]):
spos = []
repeat_desc = []
for property_name in property_names:
if property_name in ["id", "name", "description"]:
continue
@@ -108,11 +109,11 @@ input:${input}
if object_type:
object_desc = object_type.desc
spos.append(
f"{spg_type.name_zh}({spg_type.desc or spg_type.name_zh})"
f"-{prop.name_zh}({prop.desc or prop.name_zh})"
f"{spg_type.name_zh}" + (f"({spg_type.desc or spg_type.name_zh})" if spg_type.name_zh not in repeat_desc else "") +
f"-{prop.name_zh}({prop.desc or prop.name_zh})"
f"-{prop.object_type_name_zh}({object_desc or prop.object_type_name_zh})"
)
schema_text = "[" + ",".join(spos) + "]"
schema_text = "\n[" + ",\n".join(spos) + "]\n"
self.template = self.template.replace("${schema}", schema_text)

View File

@@ -52,8 +52,16 @@ class _BuiltInOnlineExtractor(ExtractOp):
while retry_times < self.max_retry_times:
try:
query = op.build_prompt(input_param)
# response = self.model.remote_inference(query)
response = "test"
op_name = op.__class__.__name__
if op_name == "IndicatorNER":
response = "[{'财政': ['财政收入质量', '财政自给能力', '土地出让收入', '一般公共预算收入', '留抵退税', '税收收入', '税收收入/一般公共预算收入', '一般公共预算支出', '财政自给率', '政府性基金收入', '转移性收入', '综合财力']}]"
elif op_name == "IndicatorREL":
response = "[{'subject': '一般公共预算收入', 'predicate': '包含', 'object': ['税收收入']}, {'subject': '税收收入', 'predicate': '包含', 'object': ['留抵退税']}, {'subject': '政府性基金收入', 'predicate': '包含', 'object': ['土地出让收入', '转移性收入']}, {'subject': '综合财力', 'predicate': '包含', 'object': ['一般公共预算收入', '政府性基金收入']}]"
elif op_name == "IndicatorLOGIC":
response = '[{"subject": "土地出让收入大幅下降", "predicate": "顺承", "object": ["综合财力明显下滑"]}]'
else:
print(query)
response = self.model.remote_inference(query)
collector.extend(op.parse_response(response))
next_params.extend(
op.build_next_variables(input_param, response)
@@ -61,6 +69,9 @@ class _BuiltInOnlineExtractor(ExtractOp):
break
except Exception as e:
retry_times += 1
print(e)
raise e
input_params = next_params
print("####################抽取结果#####################")
[print(c) for c in collector]
return collector

View File

@@ -127,7 +127,11 @@ class PromptOp(BaseOp, ABC):
def build_next_variables(
self, variables: Dict[str, str], response: str
) -> List[Dict[str, str]]:
return []
variables.update({f"{self.__class__.__name__}": response})
print("LLM(Output): ")
print("----------------------")
print([variables])
return [variables]
def invoke(self, *args):
pass