diff --git a/haystack/components/joiners/document_joiner.py b/haystack/components/joiners/document_joiner.py
index 6c02c1ffa..6e1e5a803 100644
--- a/haystack/components/joiners/document_joiner.py
+++ b/haystack/components/joiners/document_joiner.py
@@ -22,6 +22,7 @@ class DocumentJoiner:
     - concatenate: Keeps the highest scored Document in case of duplicates.
     - merge: Merge and calculate a weighted sum of the scores of duplicate Documents.
     - reciprocal_rank_fusion: Merge and assign scores based on reciprocal rank fusion.
+    - distribution_based_rank_fusion: Merge and assign scores based on the score distribution of each retriever.

     Usage example:
     ```python
@@ -57,16 +58,17 @@ class DocumentJoiner:
             - `concatenate`
             - `merge`
             - `reciprocal_rank_fusion`
+            - `distribution_based_rank_fusion`
         :param weights:
             Weight for each list of Documents received, must have the same length as the number of inputs.
-            If `join_mode` is `concatenate` this parameter is ignored.
+            If `join_mode` is `concatenate` or `distribution_based_rank_fusion`, this parameter is ignored.
         :param top_k:
             The maximum number of Documents to return.
         :param sort_by_score:
             If True sorts the Documents by score in descending order.
             If a Document has no score, it is handled as if its score is -infinity.
         """
-        if join_mode not in ["concatenate", "merge", "reciprocal_rank_fusion"]:
+        if join_mode not in ["concatenate", "merge", "reciprocal_rank_fusion", "distribution_based_rank_fusion"]:
             raise ValueError(f"DocumentJoiner component does not support '{join_mode}' join_mode.")
         self.join_mode = join_mode
         self.weights = [float(i) / sum(weights) for i in weights] if weights else None
@@ -88,12 +90,16 @@ class DocumentJoiner:
             - `documents`: Merged list of Documents
         """
         output_documents = []
+
+        documents = list(documents)
         if self.join_mode == "concatenate":
             output_documents = self._concatenate(documents)
         elif self.join_mode == "merge":
             output_documents = self._merge(documents)
         elif self.join_mode == "reciprocal_rank_fusion":
             output_documents = self._reciprocal_rank_fusion(documents)
+        elif self.join_mode == "distribution_based_rank_fusion":
+            output_documents = self._distribution_based_rank_fusion(documents)

         if self.sort_by_score:
             output_documents = sorted(
@@ -112,7 +118,7 @@ class DocumentJoiner:

         return {"documents": output_documents}

-    def _concatenate(self, document_lists):
+    def _concatenate(self, document_lists: List[List[Document]]) -> List[Document]:
         """
         Concatenate multiple lists of Documents and return only the Document with the highest score for duplicates.
         """
@@ -125,11 +131,11 @@ class DocumentJoiner:
             output.append(doc_with_best_score)
         return output

-    def _merge(self, document_lists):
+    def _merge(self, document_lists: List[List[Document]]) -> List[Document]:
         """
         Merge multiple lists of Documents and calculate a weighted sum of the scores of duplicate Documents.
         """
-        scores_map = defaultdict(int)
+        scores_map: dict = defaultdict(int)
         documents_map = {}
         weights = self.weights if self.weights else [1 / len(document_lists)] * len(document_lists)
@@ -141,9 +147,9 @@ class DocumentJoiner:
         for doc in documents_map.values():
             doc.score = scores_map[doc.id]

-        return documents_map.values()
+        return list(documents_map.values())

-    def _reciprocal_rank_fusion(self, document_lists):
+    def _reciprocal_rank_fusion(self, document_lists: List[List[Document]]) -> List[Document]:
         """
         Merge multiple lists of Documents and assign scores based on reciprocal rank fusion.

@@ -152,7 +158,7 @@ class DocumentJoiner:
         """
         k = 61

-        scores_map = defaultdict(int)
+        scores_map: dict = defaultdict(int)
         documents_map = {}
         weights = self.weights if self.weights else [1 / len(document_lists)] * len(document_lists)

@@ -170,4 +176,30 @@ class DocumentJoiner:
         for doc in documents_map.values():
             doc.score = scores_map[doc.id]

-        return documents_map.values()
+        return list(documents_map.values())
+
+    def _distribution_based_rank_fusion(self, document_lists: List[List[Document]]) -> List[Document]:
+        """
+        Merge multiple lists of Documents and assign scores based on Distribution-Based Score Fusion.
+
+        (https://medium.com/plain-simple-software/distribution-based-score-fusion-dbsf-a-new-approach-to-vector-search-ranking-f87c37488b18)
+        If a Document appears in the results of more than one retriever, the instance with the highest score is kept.
+        """
+        for documents in document_lists:
+            if not documents:  # nothing to normalize in an empty result list
+                continue
+            scores_list = []
+
+            for doc in documents:
+                scores_list.append(doc.score if doc.score is not None else 0)
+
+            mean_score = sum(scores_list) / len(scores_list)
+            std_dev = (sum((x - mean_score) ** 2 for x in scores_list) / len(scores_list)) ** 0.5
+            min_score = mean_score - 3 * std_dev
+            max_score = mean_score + 3 * std_dev
+            delta = (max_score - min_score) or 1.0  # avoid division by zero when all scores are equal
+            for doc in documents:
+                doc.score = ((doc.score if doc.score is not None else 0) - min_score) / delta
+
+        output = self._concatenate(document_lists=document_lists)
+
+        return output
diff --git a/releasenotes/notes/add-distribution-based-rank-fusion-mode-JoinDocuments-6fca30b82fd535ce.yaml b/releasenotes/notes/add-distribution-based-rank-fusion-mode-JoinDocuments-6fca30b82fd535ce.yaml
new file mode 100644
index 000000000..e7bb46a8a
--- /dev/null
+++ b/releasenotes/notes/add-distribution-based-rank-fusion-mode-JoinDocuments-6fca30b82fd535ce.yaml
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    Added a new join mode to JoinDocuments: distribution-based rank fusion,
+    as described in
+    [this article](https://medium.com/plain-simple-software/distribution-based-score-fusion-dbsf-a-new-approach-to-vector-search-ranking-f87c37488b18).
diff --git a/test/components/joiners/test_document_joiner.py b/test/components/joiners/test_document_joiner.py
index 48fa5b230..41fe27554 100644
--- a/test/components/joiners/test_document_joiner.py
+++ b/test/components/joiners/test_document_joiner.py
@@ -115,6 +115,36 @@ class TestDocumentJoiner:
         ]
         assert all(doc.id in expected_document_ids for doc in output["documents"])

+    def test_run_with_distribution_based_rank_fusion_join_mode(self):
+        joiner = DocumentJoiner(join_mode="distribution_based_rank_fusion")
+        documents_1 = [
+            Document(content="a", score=0.6),
+            Document(content="b", score=0.2),
+            Document(content="c", score=0.5),
+        ]
+        documents_2 = [
+            Document(content="d", score=0.5),
+            Document(content="e", score=0.8),
+            Document(content="f", score=1.1, meta={"key": "value"}),
+            Document(content="g", score=0.3),
+            Document(content="a", score=0.3),
+        ]
+        output = joiner.run([documents_1, documents_2])
+        assert len(output["documents"]) == 7
+        expected_document_ids = [
+            doc.id
+            for doc in [
+                Document(content="a", score=0.66),
+                Document(content="b", score=0.27),
+                Document(content="c", score=0.56),
+                Document(content="d", score=0.44),
+                Document(content="e", score=0.60),
+                Document(content="f", score=0.76, meta={"key": "value"}),
+                Document(content="g", score=0.33),
+            ]
+        ]
+        assert all(doc.id in expected_document_ids for doc in output["documents"])
+
     def test_run_with_top_k_in_run_method(self):
         joiner = DocumentJoiner()
         documents_1 = [Document(content="a"), Document(content="b"), Document(content="c")]
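Reviewer note: to sanity-check the scores used in the new test, here is a minimal standalone sketch of the 3-sigma normalization that `_distribution_based_rank_fusion` applies to each retriever's list before deduplicating. The `dbsf_normalize` helper is illustrative only (it is not part of this PR) and leans on the stdlib `statistics` module instead of the hand-rolled mean/std-dev above.

```python
from statistics import mean, pstdev
from typing import List


def dbsf_normalize(scores: List[float]) -> List[float]:
    """Rescale one retriever's scores using a 3-sigma window around the mean,
    mirroring the arithmetic in _distribution_based_rank_fusion."""
    mu = mean(scores)
    sigma = pstdev(scores)  # population std dev (divides by n), like the PR's sum(...) / len(...)
    lo, hi = mu - 3 * sigma, mu + 3 * sigma
    if hi == lo:  # all scores identical -> avoid division by zero
        return [0.0] * len(scores)
    return [(s - lo) / (hi - lo) for s in scores]


# Score lists taken from the new test case:
print([round(s, 2) for s in dbsf_normalize([0.6, 0.2, 0.5])])
# [0.66, 0.27, 0.57]
print([round(s, 2) for s in dbsf_normalize([0.5, 0.8, 1.1, 0.3, 0.3])])
# [0.45, 0.61, 0.77, 0.34, 0.34]
```

After normalization the mode delegates to `_concatenate`, so a Document returned by several retrievers keeps its highest normalized score: the duplicate "a" keeps 0.66 from the first list rather than 0.34 from the second, which is why the test expects 7 documents. The score literals in `expected_document_ids` look truncated rather than rounded (0.56 vs. 0.57, 0.44 vs. 0.45), but this is harmless because the assertion compares Document ids, which are derived from content and meta rather than from score.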