Index
- class utils.index.FaissIndex(d: int, index_type: str, metric: Literal['ip', 'cos', 'l2'], start_text_idx: int, device: Union[int, Literal['cpu']], save_dir: str, **kwargs)
Faiss Index.
- index
the faiss index object
- Type
faiss.Index
- onGPU
whether the index has been moved to GPU in utils.index.FaissIndex.fit()
- Type
bool
- __init__(d: int, index_type: str, metric: Literal['ip', 'cos', 'l2'], start_text_idx: int, device: Union[int, Literal['cpu']], save_dir: str, **kwargs)
- Parameters
d – embedding dimension
index_type – command for index_factory
metric –
start_text_idx – when config.parallel=='text', each shard starts from a different offset
device – if a number, the index will be transferred to this device
save_dir – the directory to save the index
- load(load_path: Optional[str] = None)
Load index from load_path.
- Parameters
load_path – if None, use self.save_path
- save(save_path: Optional[str] = None)
Save index to a given path.
- Parameters
save_path – if None, use self.save_path
- fit(text_embeddings: numpy.ndarray)
Move the index to GPU if needed;
Train the index by text_embeddings;
Add text_embeddings to the index.
- search(query_embeddings: numpy.ndarray, hits: int, batch_size: int = 500, query_start_idx: int = 0, eval_posting_length: bool = False, verifier: Optional[utils.index.BasePostVerifier] = None, **kwargs) tuple[Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]], Optional[numpy.ndarray]]
KNN search the query_embeddings in self.index.
- Parameters
query_embeddings – array of [N, D]
hits – top K
batch_size – the batch size for searching
query_start_idx – when config.parallel=='query', offset the query across different processes
eval_posting_length – when using IVFxx indexes, count the average number of hit documents
verifier – the verifier used to post-rank the hit results
- Returns
the retrieval result with scores
the posting length array or None
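The shape of the retrieval result can be illustrated with a brute-force sketch in NumPy. This is not how FaissIndex works internally (it delegates to faiss); the function name knn_search_sketch and the choice of inner-product scoring are assumptions made for illustration only.

```python
import numpy as np

def knn_search_sketch(query_embeddings, text_embeddings, hits, batch_size=500, query_start_idx=0):
    """Hypothetical brute-force inner-product KNN returning {query_idx: [(text_idx, score), ...]}."""
    retrieval_result = {}
    for start in range(0, len(query_embeddings), batch_size):
        batch = query_embeddings[start:start + batch_size]           # [B, D]
        scores = batch @ text_embeddings.T                            # [B, N]
        k = min(hits, scores.shape[1])
        topk = np.argsort(-scores, axis=1, kind="stable")[:, :k]      # best first
        for i, row in enumerate(topk):
            # query_start_idx shifts local query positions to global query indices
            qidx = query_start_idx + start + i
            retrieval_result[qidx] = [(int(t), float(scores[i, t])) for t in row]
    return retrieval_result

texts = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]], dtype=np.float32)
queries = np.array([[2.0, 1.0], [-1.0, 2.0]], dtype=np.float32)
result = knn_search_sketch(queries, texts, hits=2, query_start_idx=10)
```

Here the keys of result are 10 and 11 rather than 0 and 1, which mirrors how query_start_idx offsets queries when they are split across processes.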
- static get_xb(index: faiss.Index) numpy.ndarray
Get the database of the index.
- static get_pq_codebook(pq: faiss.ProductQuantizer) numpy.ndarray
Get the codebook of the pq.
- Parameters
pq – faiss ProductQuantizer instance
- static pq_quantize(codes: numpy.ndarray, centroids: numpy.ndarray) numpy.ndarray
Reconstruct the embedding from PQ.
- Parameters
codes – array of [B, M]
centroids – array of [M, ksub, dsub]
- Returns
Reconstructed embedding of [B, M*dsub]
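A minimal NumPy sketch of the reconstruction described above, using the documented shapes. The function name pq_reconstruct_sketch is hypothetical, and this may differ from the actual body of pq_quantize.

```python
import numpy as np

def pq_reconstruct_sketch(codes, centroids):
    """codes: int array [B, M]; centroids: [M, ksub, dsub]; returns [B, M*dsub]."""
    M = centroids.shape[0]
    # For sub-quantizer m, look up the centroid selected by codes[:, m].
    parts = centroids[np.arange(M), codes]          # [B, M, dsub]
    # Concatenate the M sub-vectors of each row.
    return parts.reshape(codes.shape[0], -1)        # [B, M*dsub]

centroids = np.arange(8, dtype=np.float32).reshape(2, 2, 2)   # M=2, ksub=2, dsub=2
codes = np.array([[1, 0], [0, 1]])
reconstructed = pq_reconstruct_sketch(codes, centroids)
```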
- class utils.index.BaseInvertedIndex(text_num: int, token_num: int, device: Union[int, Literal['cpu']], rank: int, save_dir: str, special_token_ids: set = {})
Base class of the Inverted Indexes
- __init__(text_num: int, token_num: int, device: Union[int, Literal['cpu']], rank: int, save_dir: str, special_token_ids: set = {})
- Parameters
text_num – the number of documents
token_num – the number of tokens
device –
rank – current global process rank
save_dir – the directory for inverted lists
special_token_ids – the special token ids of the PLM tokenizer; can usually be obtained from utils.manager.Manager.config.special_token_ids
- fit(text_token_ids: numpy.ndarray, text_embeddings: numpy.ndarray, load_index: bool = True, save_index: bool = True, threads: int = 16, shards: int = 32, posting_prune: float = 0.0, start_text_idx: int = 0)
Populate the inverted lists;
Save the inverted lists if necessary;
Move the posting lists to gpu if necessary;
Sort each posting list by the descending token weights if necessary.
- Parameters
text_token_ids – the token ids of each document, array of [N, L]
text_embeddings – the token weights/vectors, array of [N, L, D]
load_index – if True, load the posting lists from save_dir
save_index – if True, save the posting lists to save_dir
threads – how many processes to put in the pool for building the inverted index in parallel
shards – how many pieces to shard the text_token_ids
posting_prune – what percentile of the inverted lists are kept
start_text_idx – the offset of the text_idx on current process
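Populating the inverted lists can be sketched as follows. This is a single-process illustration under assumptions: the function name, the (text_idx, position) posting layout, and the omission of sharding, pruning, and sorting are all choices made here, not details confirmed by the library.

```python
from collections import defaultdict
import numpy as np

def build_inverted_lists_sketch(text_token_ids, special_token_ids=frozenset(), start_text_idx=0):
    """Map each token id to a list of (text_idx, position) postings (hypothetical layout)."""
    inverted_lists = defaultdict(list)
    for local_idx, token_ids in enumerate(text_token_ids):
        # start_text_idx converts the local row number into a global text index
        text_idx = start_text_idx + local_idx
        for pos, token_id in enumerate(token_ids):
            token_id = int(token_id)
            if token_id in special_token_ids:
                continue  # special tokens get no posting list
            inverted_lists[token_id].append((text_idx, pos))
    return dict(inverted_lists)

text_token_ids = np.array([[101, 7, 8, 102], [101, 8, 9, 102]])
lists = build_inverted_lists_sketch(text_token_ids, special_token_ids={101, 102}, start_text_idx=100)
```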
- _prune_posting_list(posting_list)
Shortcut for posting pruning.
- class utils.index.InvertedHitIndex(*args, **kwargs)
- __init__(*args, **kwargs)
- Parameters
text_num – the number of documents
token_num – the number of tokens
device –
rank – current global process rank
save_dir – the directory for inverted lists
special_token_ids – the special token ids of the PLM tokenizer; can usually be obtained from utils.manager.Manager.config.special_token_ids
- search(query_token_ids: numpy.ndarray, eval_posting_length: bool = False, query_start_idx: int = 0, verifier: Optional[utils.index.BasePostVerifier] = None, **kwargs) tuple[typing.Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]], typing.Optional[<built-in function array>]]
Search the inverted index. Recall all documents hit by query_token_ids in the inverted index.
- Parameters
query_token_ids – array of [M, LQ]
eval_posting_length – if True, count the average number of hit documents
query_start_idx – when config.parallel=='query', offset the query across different processes
verifier – the verifier used to post-rank the hit results
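A minimal sketch of hit-based recall: every document that appears in the posting list of any query token is returned. The function name and the token_id -> list of text indices layout are assumptions for illustration.

```python
def hit_search_sketch(query_token_ids, inverted_lists, special_token_ids=frozenset(), query_start_idx=0):
    """Recall all documents hit by any non-special query token (hypothetical helper)."""
    retrieval_result = {}
    for i, token_ids in enumerate(query_token_ids):
        hit = set()
        for token_id in token_ids:
            token_id = int(token_id)
            if token_id in special_token_ids:
                continue
            hit.update(inverted_lists.get(token_id, ()))
        retrieval_result[query_start_idx + i] = sorted(hit)
    return retrieval_result

inverted_lists = {7: [0], 8: [0, 1], 9: [1, 2]}
queries = [[101, 7, 102], [101, 9, 8]]
hits_per_query = hit_search_sketch(queries, inverted_lists, special_token_ids={101, 102})
```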
- class utils.index.InvertedVectorIndex(*args, **kwargs)
Inverted Vector Index as described in COIL.
- __init__(*args, **kwargs)
- Parameters
text_num – the number of documents
token_num – the number of tokens
device –
rank – current global process rank
save_dir – the directory for inverted lists
special_token_ids – the special token ids of the PLM tokenizer; can usually be obtained from utils.manager.Manager.config.special_token_ids
- search(query_token_ids: numpy.ndarray, query_embeddings: Optional[numpy.ndarray], hits, eval_posting_length: bool = False, query_start_idx: int = 0, verifier: Optional[utils.index.BasePostVerifier] = None, **kwargs) tuple[Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]], Optional[numpy.ndarray]]
Search the inverted index. The max matching score of the same token is preserved.
- Parameters
query_token_ids – array of [M, LQ]
query_embeddings – array of [M, LQ, D], or None (treat all tokens in the query as equally important)
hits – top K
eval_posting_length – if True, count the average number of hit documents
query_start_idx – when config.parallel=='query', offset the query across different processes
verifier – the verifier used to post-rank the hit results
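The "max matching score of the same token" rule can be sketched in the spirit of COIL: for each query token, only the best-matching occurrence of that token in a document contributes, and contributions are summed over query tokens. The function name and the posting layout (token_id -> list of (text_idx, vector)) are assumptions, not the library's storage format.

```python
from collections import defaultdict
import numpy as np

def vector_search_sketch(query_token_ids, query_embeddings, inverted_lists, hits):
    """Exact-match token scoring with max pooling per query token (hypothetical helper)."""
    retrieval_result = {}
    for qidx, (token_ids, token_vecs) in enumerate(zip(query_token_ids, query_embeddings)):
        doc_scores = defaultdict(float)
        for token_id, q_vec in zip(token_ids, token_vecs):
            best = {}  # text_idx -> max score of this query token in that document
            for text_idx, d_vec in inverted_lists.get(int(token_id), ()):
                score = float(np.dot(q_vec, d_vec))
                if score > best.get(text_idx, float("-inf")):
                    best[text_idx] = score
            for text_idx, score in best.items():
                doc_scores[text_idx] += score
        ranked = sorted(doc_scores.items(), key=lambda item: -item[1])[:hits]
        retrieval_result[qidx] = ranked
    return retrieval_result

inverted_lists = {
    7: [(0, np.array([1.0, 0.0])), (0, np.array([0.5, 0.0])), (1, np.array([0.25, 0.0]))],
    8: [(1, np.array([0.0, 1.0]))],
}
query_token_ids = [[7, 8]]
query_embeddings = np.array([[[1.0, 0.0], [0.0, 1.0]]])
coil_result = vector_search_sketch(query_token_ids, query_embeddings, inverted_lists, hits=10)
```

Document 0 contains token 7 twice, but only the higher score (1.0) counts; document 1 matches both query tokens and ranks first.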
- class utils.index.BaseAnseriniIndex(collection_dir: str, index_dir: str)
Wrapper of Anserini Indexes.
- __init__(collection_dir: str, index_dir: str) None
- Parameters
collection_dir – the json collections folder generated by utils.index.AnseriniImpactIndex.fit() and utils.index.AnseriniBM25Index.fit()
index_dir – the folder to store the lucene index file; usually models.BaseModel.BaseModel.index_dir
- convert_retrieval_result(tmp_retrieval_result_path: str, verifier: Optional[utils.index.BasePostVerifier] = None) Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]]
Anserini uses the docids stored in the collection by default; convert those docids to our doc idx.
- Parameters
tmp_retrieval_result_path – the path to the temporary retrieval result file created by anserini Searcher
verifier – post verification
- prepare_query(query_path, tmp_query_dir, query_token_ids=None, query_token_weights=None)
Generate temporary query files for Anserini:
handle pretokenization;
handle subword_to_word transformation and token weight quantization;
split the queries into multiple files if there are too many.
- class utils.index.AnseriniImpactIndex(collection_dir: str, index_dir: str)
Anserini impact index.
- __init__(collection_dir: str, index_dir: str) None
- Parameters
collection_dir – the json collections folder generated by utils.index.AnseriniImpactIndex.fit() and utils.index.AnseriniBM25Index.fit()
index_dir – the folder to store the lucene index file; usually models.BaseModel.BaseModel.index_dir
- fit(text_token_ids: numpy.ndarray, text_token_weights: numpy.ndarray, quantize_bit: int, tokenizer: Any, granularity: str, subword_to_word: Callable, stop_words: set, reduce: str, thread_num: int = 32, enable_build_collection: bool = True, enable_build_index: bool = True, language: str = 'eng', **kwargs)
Collect tokens into words and create json collection.
Construct the anserini index.
- Parameters
text_token_ids – array of [N, L]
text_token_weights – array of [N, L, 1]
quantize_bit –
tokenizer –
granularity – {token, word}
subword_to_word –
stop_words –
reduce –
enable_build_collection – if True, rebuild the json collection
enable_build_index – if True, rebuild the anserini index
language – {eng, zh}
- search(query_token_ids, retrieval_result_path: str, hits: int, query_token_weights: Optional[numpy.ndarray] = None, query_path: Optional[str] = None, tmp_query_dir: Optional[str] = None, language: str = 'eng', verifier: Optional[utils.index.BasePostVerifier] = None, **kwargs) Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]]
Search by Anserini.
- Parameters
query_token_ids – the pretokenized query token ids
query_token_weights – the weights of each token
query_path – the raw query file path
retrieval_result_path –
hits – Top K
qid2index – mapping from query id to query idx; generated by scripts.preprocess
tid2index – mapping from document id to document idx
tmp_query_path – the temporary file to save the pretokenized query
language – {eng, zh}
verifier – the verifier used to post-rank the hit results
- class utils.index.AnseriniBM25Index(collection_dir, index_dir)
Anserini BM25 index.
- __init__(collection_dir, index_dir) None
- Parameters
collection_dir – the json collections folder generated by utils.index.AnseriniImpactIndex.fit() and utils.index.AnseriniBM25Index.fit()
index_dir – the folder to store the lucene index file; usually models.BaseModel.BaseModel.index_dir
- fit(text_path: Optional[str] = None, text_cols: Optional[list] = None, text_token_ids: Optional[numpy.ndarray] = None, tokenizer: Optional[Any] = None, thread_num: int = 32, enable_build_collection: bool = True, enable_build_index: bool = True, language: str = 'eng', stop_words: Optional[set] = None, granularity: Optional[str] = None, **kwargs)
Convert the TSV collection into a json collection by scripts.collection.
Construct the anserini index.
- Parameters
enable_build_collection – if True, rebuild the json collection
enable_build_index – if True, rebuild the anserini index
language – {eng, zh}
- search(query_path: str, retrieval_result_path: str, hits: int, query_token_ids: Optional[numpy.ndarray] = None, tmp_query_dir: Optional[str] = None, language: str = 'eng', k1: float = 0.82, b: float = 0.68, verifier: Optional[utils.index.BasePostVerifier] = None, **kwargs) Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]]
Search by Anserini.
- Parameters
query_path – the raw query file path
retrieval_result_path –
hits – Top K
qid2index – mapping from query id to query idx; generated by scripts.preprocess
tid2index – mapping from document id to document idx
language – {eng, zh}
query_token_ids – the pretokenized query token ids
tmp_query_path – the temporary file to save the pretokenized query
k1 – BM25 parameter
b – BM25 parameter
verifier – the verifier used to post-rank the hit results
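The roles of k1 and b can be seen in a small worked version of the BM25 formula. This is a textbook-style sketch, assuming a Lucene-like idf; the function name bm25_score and its arguments are hypothetical, and Anserini's exact scoring may differ in detail.

```python
import math

def bm25_score(query_terms, doc_terms, doc_freq, num_docs, avg_doc_len, k1=0.82, b=0.68):
    """Score one document against a query with BM25 (illustrative, not Anserini's code)."""
    score = 0.0
    doc_len = len(doc_terms)
    for term in query_terms:
        tf = doc_terms.count(term)
        if tf == 0:
            continue
        df = doc_freq.get(term, 0)
        idf = math.log(1.0 + (num_docs - df + 0.5) / (df + 0.5))
        # k1 controls term-frequency saturation; b controls length normalization.
        norm = tf + k1 * (1.0 - b + b * doc_len / avg_doc_len)
        score += idf * tf * (k1 + 1.0) / norm
    return score

doc_a = ["cat", "sat", "mat"]
doc_b = ["cat", "cat", "mat"]
stats = dict(doc_freq={"cat": 2, "mat": 2, "sat": 1}, num_docs=3, avg_doc_len=3.0)
score_a = bm25_score(["cat"], doc_a, **stats)
score_b = bm25_score(["cat"], doc_b, **stats)
score_none = bm25_score(["dog"], doc_a, **stats)
```

With equal document lengths, doc_b scores higher than doc_a because "cat" occurs twice, and a query term absent from the document contributes nothing.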
- class utils.index.TrieIndex(rank: int = 0, save_dir: str = '.', save_name: Optional[str] = None, **kwargs)
Trie Index.
- __init__(rank: int = 0, save_dir: str = '.', save_name: Optional[str] = None, **kwargs)
- Parameters
save_dir – the directory to save the trie index
pad_token_id – will replace the -1 in the codes with pad_token_id
- _find(key)
Find the node corresponding to the key.
- Returns
a valid node if the key is stored, otherwise None
- Return type
Optional[Node]
- keys(prefix=None)
Return a list of this trie’s keys.
- Parameters
prefix – if not None, return only the keys prefixed by prefix
- values(prefix=None)
Return a list of this trie’s values.
- Parameters
prefix – if not None, return only the values associated with keys prefixed by prefix
- items(prefix=None)
Return a list of this trie’s items ((key, value) tuples).
- Parameters
prefix – if not None, return only the items associated with keys prefixed by prefix
- iterkeys(prefix=None)
Return an iterator over this trie’s keys.
- Parameters
prefix – if not None, yield only the keys prefixed by prefix
- itervalues(prefix=None)
Return an iterator over this trie’s values.
- Parameters
prefix – if not None, yield only the values associated with keys prefixed by prefix
- iteritems(prefix=None)
Return an iterator over this trie’s items ((key, value) tuples).
- Parameters
prefix – if not None, yield only the items associated with keys prefixed by prefix
- get_valid_tokens(prefix: Union[list, numpy.ndarray]) list
Return the valid keys at the next position given the prefix.
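The idea behind get_valid_tokens can be sketched with a nested-dict trie: walk down the prefix, then list the children of the node reached. TrieSketch is a hypothetical stand-in written for illustration and does not reflect how TrieIndex stores its nodes.

```python
class TrieSketch:
    """Minimal trie over integer token sequences (hypothetical, for illustration)."""

    def __init__(self):
        self.root = {}

    def add(self, sequences):
        for seq in sequences:
            node = self.root
            for token in seq:
                node = node.setdefault(int(token), {})

    def get_valid_tokens(self, prefix):
        node = self.root
        for token in prefix:
            node = node.get(int(token))
            if node is None:
                return []  # prefix is not stored in the trie
        return sorted(node)

trie = TrieSketch()
trie.add([[0, 5, 1], [0, 6, 1], [0, 5, 2]])
after_0 = trie.get_valid_tokens([0])
after_0_5 = trie.get_valid_tokens([0, 5])
after_unknown = trie.get_valid_tokens([3])
```

During constrained decoding, a list like after_0_5 is what restricts the next generated token to continuations that lead to a stored code.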
- add(sequences: Union[list[list], numpy.ndarray], ids: Optional[Union[list, numpy.ndarray]] = None, verbose: bool = True)
Add a bulk of sequences into the trie.
- Parameters
sequences – usually the document codes generated by models.BaseModel.BaseModel.generate_code()
ids – auxiliary ids for each document
verbose – print information
- save(save_path: Optional[str] = None)
Save the trie at save_path.
- Parameters
save_path – if None, self.save_path will be used
- load(save_path: Optional[str] = None)
Load the trie from save_path.
- Parameters
save_path – if None, self.save_path will be used
- fit(text_codes: numpy.ndarray, load_index: bool = False, save_index: bool = False, verbose: bool = True, **kwargs)
Build TrieIndex from codes;
Save TrieIndex if necessary.
- Parameters
text_codes – the codes of texts
load_index – if True, load the existing trie
save_index – if True, force saving the newly constructed trie
rebuild_index – if False, default to load_index=True and save_index=False; if True, default to load_index=False and save_index=False
- inspect_structure(verbose=True) numpy.ndarray
Check the number of children in each layer of the trie.
- class utils.index.BeamDecoder
Mixin for beam search with threshold and eos_hidden_states cache. Based on code from transformers.
- _add_beam(global_batch_idx: int, hypothesis: Union[list, numpy.ndarray], score: float, eos_hidden_state: Optional[torch.Tensor] = None, text_idx: Optional[Union[list, numpy.ndarray]] = None, word_set: Optional[Union[list, numpy.ndarray]] = None)
Update self.beams
- _finalize()
Break self.beams into beams, scores, eos_hidden_states and text_indices.
- update_beams(beam_tokens, beam_scores, beam_indices, input_ids, model, model_kwargs, past_key_values, max_new_tokens, constrain_index: Union[utils.index.TrieIndex, utils.index.WordSetIndex])
Update beams if eos is decoded (for the trie index);
update beams if the decoded words correspond to a unique doc (for the wordset index);
update input_ids by concatenation;
update model_kwargs by next_beam_indices.
- Parameters
next_beam_tokens – (batch_size, num_beams * 2)
next_beam_scores – (batch_size, num_beams * 2)
next_beam_indices – (batch_size, num_beams * 2)
- search(model: transformers.T5ForConditionalGeneration, query: Mapping, nbeam=10, max_new_tokens=33, constrain_index: Optional[Union[utils.index.TrieIndex, utils.index.WordSetIndex]] = None, threshold=0, trsd_start_len=0, rank_type='prob', tokenizer=None, text_indices=None, do_sample=False, do_greedy=False, do_early_stop=False, top_k=None, top_p=None, typical_p=None, temperature=None, renormalize_logits=None, early_stop_start_len=0, **kwargs)
Perform beam search with constraints from trie_index or wordset_index.
- class utils.index.FlatVerifier(query_embeddings: numpy.ndarray, text_embeddings: numpy.ndarray, hits: int = 1000, device: Union[int, Literal['cpu']] = 'cpu', **kwargs)
Post verify retrieval_result by the brute-force ranking from config.verifier_src.
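Brute-force post verification can be sketched as re-scoring each query's candidates against the full-precision embeddings and keeping the top hits. The function name flat_verify_sketch and the inner-product scoring are assumptions for illustration; the actual FlatVerifier may differ.

```python
import numpy as np

def flat_verify_sketch(retrieval_result, query_embeddings, text_embeddings, hits=1000):
    """Re-rank candidate text indices per query by exact inner product (hypothetical helper)."""
    verified = {}
    for qidx, candidates in retrieval_result.items():
        cand = np.asarray(candidates, dtype=np.int64)
        if cand.size == 0:
            verified[qidx] = []
            continue
        scores = text_embeddings[cand] @ query_embeddings[qidx]     # [num_candidates]
        order = np.argsort(-scores, kind="stable")[:hits]
        verified[qidx] = [(int(cand[j]), float(scores[j])) for j in order]
    return verified

text_embeddings = np.array([[1.0, 0.0], [0.0, 1.0], [2.0, 0.0]])
query_embeddings = np.array([[1.0, 0.0]])
candidates = {0: [0, 1, 2]}
verified = flat_verify_sketch(candidates, query_embeddings, text_embeddings, hits=2)
```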
- class utils.index.PQVerifier(query_embeddings: numpy.ndarray, pq_index: faiss.Index, start_text_idx: int, end_text_idx: int, hits: int = 1000, **kwargs)
Post verify retrieval_result by the PQ ranking from config.verifier_src.
- utils.index.merge_retrieval_result(start_query_idx: int, end_query_idx: int, text_num: int, retrieval_results: list[Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]]]) tuple[Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]], numpy.ndarray, numpy.ndarray]
Merge a list of retrieval results into one.
- Parameters
start_query_idx – the starting offset in retrieval_results[i]
end_query_idx – the ending offset in retrieval_results[i]
retrieval_results – a list of retrieval results
- Returns
the merged retrieval result
the ids of all hit text, array of [n]
the inverse index of all hit text, array of [N]
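One way to read the three return values is sketched below for results without scores. The function name merge_sketch, the order-preserving union, and the use of -1 for texts that were never hit are assumptions made here, not confirmed behavior of merge_retrieval_result.

```python
import numpy as np

def merge_sketch(start_query_idx, end_query_idx, text_num, retrieval_results):
    """Union the candidates of each query across several results (hypothetical helper)."""
    merged = {}
    for qidx in range(start_query_idx, end_query_idx):
        seen = []
        for result in retrieval_results:
            for tidx in result.get(qidx, []):
                if tidx not in seen:
                    seen.append(tidx)
        merged[qidx] = seen
    # ids of all hit texts, array of [n]
    unique_ids = np.array(sorted({t for tids in merged.values() for t in tids}), dtype=np.int64)
    # inverse index, array of [N]: position of each text in unique_ids, -1 if never hit
    inverse = np.full(text_num, -1, dtype=np.int64)
    inverse[unique_ids] = np.arange(len(unique_ids))
    return merged, unique_ids, inverse

shard_results = [{0: [1, 3], 1: [2]}, {0: [3, 5], 1: []}]
merged, unique_ids, inverse = merge_sketch(0, 2, text_num=6, retrieval_results=shard_results)
```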
- utils.index.build_inverted_lists(args)
Build inverted lists for utils.index.BaseInvertedIndex.
- utils.index.subword_to_word_bert(x)
- Returns
is_subword – True for a subword and False for a valid word
word – the filtered subword or the word itself
- utils.index.convert_tokens_to_words(tokens, subword_to_word, scores=None, reduce='max')
Transform the tokens output by the tokenizer into words (connecting subwords).
- Returns
words – list of words
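Connecting subwords can be sketched for BERT-style WordPiece tokens, where a leading "##" marks a continuation. The function name, the hard-coded "##" rule (standing in for the subword_to_word callable), and the "sum" option are assumptions for illustration.

```python
def convert_tokens_to_words_sketch(tokens, scores=None, reduce="max"):
    """Merge '##' continuation tokens into words, reducing their scores (hypothetical helper)."""
    words, word_scores = [], []
    for i, token in enumerate(tokens):
        score = scores[i] if scores is not None else None
        if token.startswith("##") and words:
            words[-1] += token[2:]          # glue the subword onto the previous word
            if score is not None:
                if reduce == "max":
                    word_scores[-1] = max(word_scores[-1], score)
                elif reduce == "sum":
                    word_scores[-1] += score
                else:
                    raise ValueError(f"unsupported reduce: {reduce}")
        else:
            words.append(token)
            if score is not None:
                word_scores.append(score)
    return (words, word_scores) if scores is not None else words

tokens = ["play", "##ing", "foot", "##ball", "now"]
weights = [0.2, 0.5, 0.9, 0.1, 0.3]
words_only = convert_tokens_to_words_sketch(tokens)
words, word_weights = convert_tokens_to_words_sketch(tokens, weights, reduce="max")
```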
- utils.index.build_impact_collection(token_ids, token_weights, text_start_idx, output_json_path, tokenizer, subword_to_word, stop_words, reduce)
Generate jsonl files in multiple threads.
- utils.index.build_pretokenized_collection(token_ids, text_start_idx, output_json_path, tokenizer, stop_words, granularity)
Generate jsonl files in multiple threads.
- Parameters
granularity – {token, word}
- utils.index.count_stepwise_code_collision(codes, verbose=True) tuple[numpy.ndarray, numpy.ndarray]
Count code collision at each step, both as sequences and as sets.
- utils.index.permute_code(codes: numpy.ndarray, rule='rotate', level: int = 5, k: int = 5) list[numpy.ndarray]
Reorder the first level codes.
- Parameters
codes – text codes generated by models.BaseModel.BaseModel.generate_code()
rule – how to permute codes
level – how many steps to permute
k – how many replicas to return
- Returns
a series of permuted codes
- Return type
list[np.ndarray]