From 676da681d08e2fc97b1e6ff66ef98f57c6f0260f Mon Sep 17 00:00:00 2001
From: Domenico
Date: Thu, 9 Nov 2023 12:20:41 +0100
Subject: [PATCH] feat: MetaField Ranker (#6189)

* proposal: meta field ranker

* Apply suggestions from code review

Co-authored-by: ZanSara

* update proposal filename

* feat: add metafield ranker

* fix docstrings

* remove proposal file from pr

* add release notes

* update code according to new Document class

* separate loops for each ranking mode in __merge_scores

* change error type in init and new tests for linear score warning

* docstring upd

---------

Co-authored-by: ZanSara
Co-authored-by: Daria Fokina
---
 .../preview/components/rankers/meta_field.py  | 181 ++++++++++++++++++
 ...etadata-field-ranker-a8afd5bf15f29a0a.yaml |   4 +
 .../components/rankers/test_metafield.py      | 123 ++++++++++++
 3 files changed, 308 insertions(+)
 create mode 100644 haystack/preview/components/rankers/meta_field.py
 create mode 100644 releasenotes/notes/add-metadata-field-ranker-a8afd5bf15f29a0a.yaml
 create mode 100644 test/preview/components/rankers/test_metafield.py

diff --git a/haystack/preview/components/rankers/meta_field.py b/haystack/preview/components/rankers/meta_field.py
new file mode 100644
index 000000000..f205d136f
--- /dev/null
+++ b/haystack/preview/components/rankers/meta_field.py
@@ -0,0 +1,181 @@
+import logging
+import warnings
+from collections import defaultdict
+from typing import List, Dict, Any, Optional, Literal
+
+from haystack.preview import ComponentError, Document, component, default_to_dict
+
+logger = logging.getLogger(__name__)
+
+
+@component
+class MetaFieldRanker:
+    """
+    Ranks documents based on the value of a metadata field.
+
+    Usage example:
+    ```
+    from haystack.preview import Document
+    from haystack.preview.components.rankers import MetaFieldRanker
+
+    ranker = MetaFieldRanker(metadata_field="rating")
+    docs = [
+        Document(content="Paris", meta={"rating": 1.3}),
+        Document(content="Berlin", meta={"rating": 0.7}),
+        Document(content="Barcelona", meta={"rating": 2.1}),
+    ]
+
+    output = ranker.run(query="", documents=docs)
+    docs = output["documents"]
+    assert docs[0].content == "Barcelona"
+    ```
+    """
+
+    def __init__(
+        self,
+        metadata_field: str,
+        weight: float = 1.0,
+        top_k: Optional[int] = None,
+        ranking_mode: Literal["reciprocal_rank_fusion", "linear_score"] = "reciprocal_rank_fusion",
+    ):
+        """
+        Creates an instance of MetaFieldRanker.
+
+        :param metadata_field: The name of the metadata field to rank by.
+        :param weight: In range [0,1].
+            0 disables ranking by the metadata field.
+            0.5 gives the previous relevance scores and the metadata field the same impact.
+            1 ranks by the metadata field only; the highest value comes first.
+        :param top_k: The maximum number of documents to return per query.
+        :param ranking_mode: The mode used to combine the previous relevance scores with the metadata field values.
+            Possible values are 'reciprocal_rank_fusion' (default) and 'linear_score'.
+            Use 'linear_score' only with Retrievers or Rankers that return a score in the [0,1] range.
+        """
+
+        self.metadata_field = metadata_field
+        self.weight = weight
+        self.top_k = top_k
+        self.ranking_mode = ranking_mode
+
+        if self.weight < 0 or self.weight > 1:
+            raise ValueError(
+                """
+                The 'weight' parameter needs to be in range [0,1] but was set to '{}'.\n
+                '0' disables sorting by the metadata field, '0.5' gives equal weight to the previous relevance scores and the metadata field, and '1' ranks by the metadata field only.\n
+                Change the 'weight' parameter when initializing the MetaFieldRanker.
+                """.format(
+                    self.weight
+                )
+            )
+
+        if self.ranking_mode not in ["reciprocal_rank_fusion", "linear_score"]:
+            raise ValueError(
+                """
+                The 'ranking_mode' parameter needs to be 'reciprocal_rank_fusion' or 'linear_score' but was set to '{}'.\n
+                Change the 'ranking_mode' parameter when initializing the MetaFieldRanker.
+                """.format(
+                    self.ranking_mode
+                )
+            )
+
+    def to_dict(self) -> Dict[str, Any]:
+        """
+        Serialize this component to a dictionary.
+        """
+        return default_to_dict(
+            self,
+            metadata_field=self.metadata_field,
+            weight=self.weight,
+            top_k=self.top_k,
+            ranking_mode=self.ranking_mode,
+        )
+
+    @component.output_types(documents=List[Document])
+    def run(self, query: str, documents: List[Document], top_k: Optional[int] = None):
+        """
+        Ranks a list of documents based on the selected metadata field by:
+        1. Sorting the documents by the metadata field in descending order.
+        2. Merging the scores from the metadata field with the scores from the previous component according to the strategy and weight provided.
+        3. Returning the top-k documents.
+
+        :param query: Not used by this ranker; it can be left blank, as no sorting based on semantic closeness to the query is performed.
+        :param documents: Documents to be ranked.
+        :param top_k: (optional) How many documents to return. If not provided, all documents are returned.
+        """
+        if not documents:
+            return {"documents": []}
+
+        if top_k is None:
+            top_k = self.top_k
+        elif top_k <= 0:
+            raise ValueError(f"top_k must be > 0, but got {top_k}")
+
+        try:
+            sorted_by_metadata = sorted(documents, key=lambda doc: doc.meta[self.metadata_field], reverse=True)
+        except KeyError:
+            raise ComponentError(
+                """
+                The 'metadata_field' parameter was set to '{}' but the document(s) with id(s) {} do not contain this metadata key.\n
+                Double-check the names of the metadata fields in your documents\n
+                and set 'metadata_field' to the name of the field you want to rank by.
+                """.format(
+                    self.metadata_field, ",".join([doc.id for doc in documents if self.metadata_field not in doc.meta])
+                )
+            )
+
+        if self.weight > 0:
+            sorted_documents = self._merge_scores(documents, sorted_by_metadata)
+            return {"documents": sorted_documents[:top_k]}
+        else:
+            return {"documents": sorted_by_metadata[:top_k]}
+
+    def _merge_scores(self, documents: List[Document], sorted_documents: List[Document]) -> List[Document]:
+        """
+        Merge scores for documents sorted both by content and by the metadata field.
+        """
+        scores_map: Dict = defaultdict(int)
+
+        if self.ranking_mode == "reciprocal_rank_fusion":
+            for i, (doc, sorted_doc) in enumerate(zip(documents, sorted_documents)):
+                scores_map[doc.id] += self._calculate_rrf(rank=i) * (1 - self.weight)
+                scores_map[sorted_doc.id] += self._calculate_rrf(rank=i) * self.weight
+        elif self.ranking_mode == "linear_score":
+            for i, (doc, sorted_doc) in enumerate(zip(documents, sorted_documents)):
+                score = float(0)
+                if doc.score is None:
+                    warnings.warn("The score was not provided; defaulting to 0")
+                elif doc.score < 0 or doc.score > 1:
+                    warnings.warn(
+                        "The score {} for document {} is outside the [0,1] range; defaulting to 0".format(
+                            doc.score, doc.id
+                        )
+                    )
+                else:
+                    score = doc.score
+
+                scores_map[doc.id] += score * (1 - self.weight)
+                scores_map[sorted_doc.id] += self._calc_linear_score(rank=i, amount=len(sorted_documents)) * self.weight
+
+        for doc in documents:
+            doc.score = scores_map[doc.id]
+
+        new_sorted_documents = sorted(documents, key=lambda doc: doc.score if doc.score else -1, reverse=True)
+        return new_sorted_documents
+
+    @staticmethod
+    def _calculate_rrf(rank: int, k: int = 61) -> float:
+        """
+        Calculates the reciprocal rank fusion. The constant K is set to 61 (60 was suggested by the original paper,
+        plus 1 as python lists are 0-based and the paper [https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf] used 1-based ranking).
+        """
+        return 1 / (k + rank)
+
+    @staticmethod
+    def _calc_linear_score(rank: int, amount: int) -> float:
+        """
+        Calculate the metadata field score as a linear score between the greatest and the lowest score in the list.
+        This linear scaling is useful to
+        a) reduce the effect of outliers and
+        b) create scores that are meaningfully distributed in [0,1],
+        similar to scores coming from a retriever/ranker.
+        """
+        return (amount - rank) / amount
diff --git a/releasenotes/notes/add-metadata-field-ranker-a8afd5bf15f29a0a.yaml b/releasenotes/notes/add-metadata-field-ranker-a8afd5bf15f29a0a.yaml
new file mode 100644
index 000000000..f8f3110f4
--- /dev/null
+++ b/releasenotes/notes/add-metadata-field-ranker-a8afd5bf15f29a0a.yaml
@@ -0,0 +1,4 @@
+---
+preview:
+  - |
+    Adds MetaFieldRanker, a component that ranks a list of Documents based on the value of a metadata field of choice.
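A minimal, standalone sketch of the score merging performed by `_merge_scores` above, assuming `weight=0.5` and the three documents used in the tests below (previous scores [0.3, 0.4, 0.6], "rating" values [1.3, 0.7, 2.1]); the variable names and tuple layout are only illustrative, while the formulas mirror `_calculate_rrf` (1 / (61 + rank)) and `_calc_linear_score` ((amount - rank) / amount):

```
weight = 0.5
docs = [("d1", 1.3, 0.3), ("d2", 0.7, 0.4), ("d3", 2.1, 0.6)]  # (id, rating, previous score)
by_meta = sorted(docs, key=lambda d: d[1], reverse=True)  # ranked by the metadata field

# reciprocal_rank_fusion: each document collects 1 / (61 + rank) from both orderings.
rrf = {doc_id: 0.0 for doc_id, _, _ in docs}
for i, (doc, meta_doc) in enumerate(zip(docs, by_meta)):
    rrf[doc[0]] += 1 / (61 + i) * (1 - weight)
    rrf[meta_doc[0]] += 1 / (61 + i) * weight
print(max(rrf.values()))  # ~0.01626, the value asserted in test_reciprocal_rank_fusion

# linear_score: the previous score is blended with (amount - rank) / amount.
linear = {doc_id: 0.0 for doc_id, _, _ in docs}
for i, (doc, meta_doc) in enumerate(zip(docs, by_meta)):
    linear[doc[0]] += doc[2] * (1 - weight)
    linear[meta_doc[0]] += (len(docs) - i) / len(docs) * weight
print(max(linear.values()))  # 0.8, the value asserted in test_linear_score
```

With only three documents the rank-based RRF contributions are tiny (on the order of 1/61 each), which is why the top score asserted in test_reciprocal_rank_fusion is around 0.016 rather than a value close to 1, whereas linear_score stays in the familiar [0,1] range.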
diff --git a/test/preview/components/rankers/test_metafield.py b/test/preview/components/rankers/test_metafield.py
new file mode 100644
index 000000000..0fd7521bc
--- /dev/null
+++ b/test/preview/components/rankers/test_metafield.py
@@ -0,0 +1,123 @@
+import pytest
+
+from haystack.preview import Document, ComponentError
+from haystack.preview.components.rankers.meta_field import MetaFieldRanker
+
+
+class TestMetaFieldRanker:
+    @pytest.mark.unit
+    def test_to_dict(self):
+        component = MetaFieldRanker(metadata_field="rating")
+        data = component.to_dict()
+        assert data == {
+            "type": "MetaFieldRanker",
+            "init_parameters": {
+                "metadata_field": "rating",
+                "weight": 1.0,
+                "top_k": None,
+                "ranking_mode": "reciprocal_rank_fusion",
+            },
+        }
+
+    @pytest.mark.unit
+    def test_to_dict_with_custom_init_parameters(self):
+        component = MetaFieldRanker(metadata_field="rating", weight=0.5, top_k=5, ranking_mode="linear_score")
+        data = component.to_dict()
+        assert data == {
+            "type": "MetaFieldRanker",
+            "init_parameters": {"metadata_field": "rating", "weight": 0.5, "top_k": 5, "ranking_mode": "linear_score"},
+        }
+
+    @pytest.mark.integration
+    @pytest.mark.parametrize(
+        "query, metafield_values, expected_first_value", [("", [1.3, 0.7, 2.1], 2.1), ("", [1, 5, 8], 8)]
+    )
+    def test_run(self, query, metafield_values, expected_first_value):
+        """
+        Test if the component ranks documents correctly.
+        """
+        ranker = MetaFieldRanker(metadata_field="rating")
+        docs_before = [Document(content="abc", meta={"rating": value}) for value in metafield_values]
+
+        output = ranker.run(query=query, documents=docs_before)
+        docs_after = output["documents"]
+
+        assert len(docs_after) == 3
+        assert docs_after[0].meta["rating"] == expected_first_value
+
+        sorted_scores = sorted([doc.meta["rating"] for doc in docs_after], reverse=True)
+        assert [doc.meta["rating"] for doc in docs_after] == sorted_scores
+
+    @pytest.mark.integration
+    def test_returns_empty_list_if_no_documents_are_provided(self):
+        ranker = MetaFieldRanker(metadata_field="rating")
+        output = ranker.run(query="", documents=[])
+        docs_after = output["documents"]
+        assert docs_after == []
+
+    @pytest.mark.integration
+    def test_raises_component_error_if_metadata_not_found(self):
+        ranker = MetaFieldRanker(metadata_field="rating")
+        docs_before = [Document(content="abc", meta={"wrong_field": 1.3})]
+        with pytest.raises(ComponentError):
+            ranker.run(query="", documents=docs_before)
+
+    @pytest.mark.integration
+    def test_raises_value_error_if_wrong_ranking_mode(self):
+        with pytest.raises(ValueError):
+            MetaFieldRanker(metadata_field="rating", ranking_mode="wrong_mode")
+
+    @pytest.mark.integration
+    @pytest.mark.parametrize("score", [-1, 2, 1.3, 2.1])
+    def test_raises_value_error_if_wrong_weight(self, score):
+        with pytest.raises(ValueError):
+            MetaFieldRanker(metadata_field="rating", weight=score)
+
+    @pytest.mark.integration
+    def test_linear_score(self):
+        ranker = MetaFieldRanker(metadata_field="rating", ranking_mode="linear_score", weight=0.5)
+        docs_before = [
+            Document(content="abc", meta={"rating": 1.3}, score=0.3),
+            Document(content="abc", meta={"rating": 0.7}, score=0.4),
+            Document(content="abc", meta={"rating": 2.1}, score=0.6),
+        ]
+        output = ranker.run(query="", documents=docs_before)
+        docs_after = output["documents"]
+        assert docs_after[0].score == 0.8
+
+    @pytest.mark.integration
+    def test_reciprocal_rank_fusion(self):
+        ranker = MetaFieldRanker(metadata_field="rating", ranking_mode="reciprocal_rank_fusion", weight=0.5)
+        docs_before = [
+            Document(content="abc", meta={"rating": 1.3}, score=0.3),
+            Document(content="abc", meta={"rating": 0.7}, score=0.4),
+            Document(content="abc", meta={"rating": 2.1}, score=0.6),
+        ]
+        output = ranker.run(query="", documents=docs_before)
+        docs_after = output["documents"]
+        assert docs_after[0].score == 0.01626123744050767
+
+    @pytest.mark.integration
+    @pytest.mark.parametrize("score", [-1, 2, 1.3, 2.1])
+    def test_linear_score_raises_warning_if_doc_wrong_score(self, score):
+        ranker = MetaFieldRanker(metadata_field="rating", ranking_mode="linear_score", weight=0.5)
+        docs_before = [
+            Document(id=1, content="abc", meta={"rating": 1.3}, score=score),
+            Document(id=2, content="abc", meta={"rating": 0.7}, score=0.4),
+            Document(id=3, content="abc", meta={"rating": 2.1}, score=0.6),
+        ]
+        with pytest.warns(
+            UserWarning, match=rf"The score {score} for document 1 is outside the \[0,1\] range; defaulting to 0"
+        ):
+            ranker.run(query="", documents=docs_before)
+
+    @pytest.mark.integration
+    def test_linear_score_raises_warning_if_doc_without_score(self):
+        ranker = MetaFieldRanker(metadata_field="rating", ranking_mode="linear_score", weight=0.5)
+        docs_before = [
+            Document(content="abc", meta={"rating": 1.3}),
+            Document(content="abc", meta={"rating": 0.7}),
+            Document(content="abc", meta={"rating": 2.1}),
+        ]
+        with pytest.warns(UserWarning, match="The score was not provided; defaulting to 0"):
+            ranker.run(query="", documents=docs_before)
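Beyond the unit tests, here is a small usage sketch of the new component combining the 'linear_score' mode with 'weight' and 'top_k'. It assumes only the module path and the Document(content=..., meta=..., score=...) constructor used in the tests above; the "rating" field and the scores are made up for illustration.

```
from haystack.preview import Document
from haystack.preview.components.rankers.meta_field import MetaFieldRanker

# Blend previous relevance scores (in [0,1]) with a "rating" metadata field,
# giving both equal influence, and keep only the two best documents.
ranker = MetaFieldRanker(metadata_field="rating", ranking_mode="linear_score", weight=0.5, top_k=2)

docs = [
    Document(content="Paris", meta={"rating": 1.3}, score=0.3),
    Document(content="Berlin", meta={"rating": 0.7}, score=0.4),
    Document(content="Barcelona", meta={"rating": 2.1}, score=0.6),
]

result = ranker.run(query="", documents=docs)  # the query is not used by this ranker
for doc in result["documents"]:
    print(doc.content, doc.meta["rating"], doc.score)
# Barcelona comes first (highest rating and highest previous score), followed by
# Paris; Berlin is dropped because top_k=2.
```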