Index

class utils.index.BasePostVerifier

Base class for post verifiers.

__init__() None
class utils.index.FaissIndex(d: int, index_type: str, metric: Literal['ip', 'cos', 'l2'], start_text_idx: int, device: Union[int, Literal['cpu']], save_dir: str, **kwargs)

Faiss Index.

index

the faiss index object

Type

faiss.Index

onGPU

whether the index has been moved to GPU in utils.index.FaissIndex.fit()

Type

bool

__init__(d: int, index_type: str, metric: Literal['ip', 'cos', 'l2'], start_text_idx: int, device: Union[int, Literal['cpu']], save_dir: str, **kwargs)
Parameters
  • d – embedding dimension

  • index_type – command for index_factory

  • metric – the similarity metric, one of 'ip', 'cos' or 'l2'

  • start_text_idx – when config.parallel=='text', each shard starts from a different offset

  • device – if a number, the index will be transferred to this device

  • save_dir – the directory to save the index

load(load_path: Optional[str] = None)

Load index from load_path.

Parameters

load_path – if None, use self.save_path

save(save_path: Optional[str] = None)

Save index to a given path.

Parameters

save_path – if None, use self.save_path

fit(text_embeddings: numpy.ndarray)
  1. Move the index to gpu if needed;

  2. Train the index by text_embeddings;

  3. Add text_embeddings to the index.

search(query_embeddings: numpy.ndarray, hits: int, batch_size: int = 500, query_start_idx: int = 0, eval_posting_length: bool = False, verifier: Optional[utils.index.BasePostVerifier] = None, **kwargs) tuple[Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]], Optional[numpy.ndarray]]

KNN search the query_embeddings in self.index.

Parameters
  • query_embeddings – array of [N, D]

  • hits – top K

  • batch_size – the batch size for searching

  • query_start_idx – when config.parallel=='query', offset the query across different processes

  • eval_posting_length – when using IVFxx indexes, count the average number of documents hit

  • verifier – the verifier used to post-rank the retrieved results

Returns

  • the retrieval result with scores

  • the posting length array or None
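The shape of the returned retrieval result can be illustrated with a minimal brute-force sketch in NumPy. This is not the Faiss-backed implementation: the function name knn_search_sketch, the inner-product scoring and the stable tie-breaking are assumptions made for illustration; only the parameter names mirror the signature above.

```python
import numpy as np


def knn_search_sketch(query_embeddings, text_embeddings, hits,
                      batch_size=500, query_start_idx=0):
    """Brute-force inner-product KNN (hypothetical helper, not the library API)."""
    retrieval_result = {}
    for start in range(0, len(query_embeddings), batch_size):
        batch = query_embeddings[start:start + batch_size]      # [b, D]
        scores = batch @ text_embeddings.T                      # [b, N]
        # Stable sort on negated scores: highest score first, ties by lower index.
        top = np.argsort(-scores, axis=1, kind="stable")[:, :hits]
        for i, text_indices in enumerate(top):
            # Offset the query index, as query_start_idx does across processes.
            query_idx = query_start_idx + start + i
            retrieval_result[query_idx] = [
                (int(t), float(scores[i, t])) for t in text_indices
            ]
    return retrieval_result


texts = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
queries = np.array([[1.0, 0.0], [0.0, 2.0]])
result = knn_search_sketch(queries, texts, hits=2, batch_size=1, query_start_idx=10)
```

Each key is a global query index and each value is a list of (text index, score) pairs, matching the dict[int, list[tuple[int, float]]] form in the signature.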

static get_xb(index: faiss.Index) numpy.ndarray

Get the database of the index.

static get_pq_codebook(pq: faiss.ProductQuantizer) numpy.ndarray

Get the codebook of the pq.

Parameters

pq – faiss ProductQuantizer instance

static pq_quantize(codes: numpy.ndarray, centroids: numpy.ndarray) numpy.ndarray

Reconstruct the embedding from PQ.

Parameters
  • codes – array of [B, M]

  • centroids – array of [M, ksub, dsub]

Returns

Reconstructed embedding of [B, M*dsub]
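The shapes above suggest a gather followed by a reshape. The following is a minimal NumPy sketch under that assumption; the name pq_reconstruct_sketch is hypothetical and the library's actual implementation may differ.

```python
import numpy as np


def pq_reconstruct_sketch(codes, centroids):
    """Rebuild embeddings from PQ codes (illustrative only).

    codes:     [B, M] integer codes, one per sub-quantizer
    centroids: [M, ksub, dsub] codebook
    returns:   [B, M * dsub]
    """
    B, M = codes.shape
    # For every (b, m), pick centroids[m, codes[b, m]]; result is [B, M, dsub].
    gathered = centroids[np.arange(M), codes]
    return gathered.reshape(B, -1)


centroids = np.arange(12).reshape(2, 3, 2)   # M=2, ksub=3, dsub=2
codes = np.array([[0, 2], [1, 0]])           # B=2
reconstructed = pq_reconstruct_sketch(codes, centroids)
```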

class utils.index.BaseInvertedIndex(text_num: int, token_num: int, device: Union[int, Literal['cpu']], rank: int, save_dir: str, special_token_ids: set = {})

Base class of the inverted indexes.

__init__(text_num: int, token_num: int, device: Union[int, Literal['cpu']], rank: int, save_dir: str, special_token_ids: set = {})
Parameters
  • text_num – the number of documents

  • token_num – the number of tokens

  • device

  • rank – current global process rank

  • save_dir – the directory for inverted lists

  • special_token_ids – the special token ids in the PLM tokenizer; usually obtained from utils.manager.Manager.config.special_token_ids

fit(text_token_ids: numpy.ndarray, text_embeddings: numpy.ndarray, load_index: bool = True, save_index: bool = True, threads: int = 16, shards: int = 32, posting_prune: float = 0.0, start_text_idx: int = 0)
  1. Populate the inverted lists;

  2. Save the inverted lists if necessary;

  3. Move the posting lists to gpu if necessary;

  4. Sort each posting list by the descending token weights if necessary.

Parameters
  • text_token_ids – the token ids of each document, array of [N, L]

  • text_embeddings – the token weights/vectors, array of [N, L, D]

  • load_index – if True, load the posting lists from save_dir

  • save_index – if True, save the posting lists to save_dir

  • threads – the number of processes in the pool used to build the inverted index in parallel

  • shards – the number of pieces text_token_ids is split into

  • posting_prune – what percentile of the inverted lists are kept

  • start_text_idx – the offset of the text_idx on current process
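Step 1 (populating the inverted lists) can be sketched in plain Python as follows. The helper name populate_inverted_lists_sketch and the decision to record each document once per token are assumptions; sharding, multiprocessing, pruning and GPU placement are left out.

```python
from collections import defaultdict


def populate_inverted_lists_sketch(text_token_ids, special_token_ids=frozenset(),
                                   start_text_idx=0):
    """Map each token id to the indices of the documents containing it."""
    inverted_lists = defaultdict(list)
    for offset, token_ids in enumerate(text_token_ids):
        # Global document index on this process.
        text_idx = start_text_idx + offset
        # dict.fromkeys removes duplicate tokens while keeping their order.
        for token_id in dict.fromkeys(token_ids):
            if token_id in special_token_ids:
                continue  # special tokens get no posting list
            inverted_lists[token_id].append(text_idx)
    return dict(inverted_lists)


docs = [[101, 5, 7, 102], [101, 7, 7, 9, 102]]
lists = populate_inverted_lists_sketch(docs, special_token_ids={101, 102},
                                       start_text_idx=100)
```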

_prune_posting_list(posting_list)

Shortcut for posting pruning.

class utils.index.InvertedHitIndex(*args, **kwargs)
__init__(*args, **kwargs)
Parameters
  • text_num – the number of documents

  • token_num – the number of tokens

  • device

  • rank – current global process rank

  • save_dir – the directory for inverted lists

  • special_token_ids – the special token ids in the PLM tokenizer; usually obtained from utils.manager.Manager.config.special_token_ids

search(query_token_ids: numpy.ndarray, eval_posting_length: bool = False, query_start_idx: int = 0, verifier: Optional[utils.index.BasePostVerifier] = None, **kwargs) tuple[Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]], Optional[numpy.ndarray]]

Search the inverted index. Recall all documents hit by query_token_ids in the inverted index.

Parameters
  • query_token_ids – array of [M, LQ]

  • eval_posting_length – if True, count the average number of documents hit

  • query_start_idx – when config.parallel=='query', offset the query across different processes

  • verifier – the verifier used to post-rank the retrieved results
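Recalling every document hit by at least one query token amounts to a union over posting lists. A minimal sketch, assuming the inverted lists are a plain dict from token id to document indices (the helper name hit_search_sketch is hypothetical):

```python
def hit_search_sketch(query_token_ids, inverted_lists, query_start_idx=0):
    """Return, per query, the sorted indices of all documents sharing a token."""
    retrieval_result = {}
    for offset, token_ids in enumerate(query_token_ids):
        hit_docs = set()
        for token_id in token_ids:
            # Tokens without a posting list contribute nothing.
            hit_docs.update(inverted_lists.get(token_id, ()))
        retrieval_result[query_start_idx + offset] = sorted(hit_docs)
    return retrieval_result


inverted_lists = {5: [0], 7: [0, 1], 9: [1, 2]}
hits_per_query = hit_search_sketch([[5, 9], [8]], inverted_lists, query_start_idx=3)
```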

class utils.index.InvertedVectorIndex(*args, **kwargs)

Inverted Vector Index as described in COIL.

__init__(*args, **kwargs)
Parameters
  • text_num – the number of documents

  • token_num – the number of tokens

  • device

  • rank – current global process rank

  • save_dir – the directory for inverted lists

  • special_token_ids – the special token ids in the PLM tokenizer; usually obtained from utils.manager.Manager.config.special_token_ids

search(query_token_ids: numpy.ndarray, query_embeddings: Optional[numpy.ndarray], hits, eval_posting_length: bool = False, query_start_idx: int = 0, verifier: Optional[utils.index.BasePostVerifier] = None, **kwargs) tuple[Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]], Optional[numpy.ndarray]]

Search the inverted index. The max matching score of the same token is preserved.

Parameters
  • query_token_ids – array of [M, LQ]

  • query_embeddings – array of [M, LQ, D] or None (treat all tokens in the query as equally important)

  • hits – top K

  • eval_posting_length – if True, count the average number of documents hit

  • query_start_idx – when config.parallel=='query', offset the query across different processes

  • verifier – the verifier used to post-rank the retrieved results
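"The max matching score of the same token is preserved" can be read as follows: for each query token, only the best-matching occurrence of that token in a document counts, and these per-token maxima are summed. The sketch below follows that reading in plain Python; the posting layout (token id to a list of (text index, vector) pairs) and the helper names are assumptions.

```python
from collections import defaultdict


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def vector_search_sketch(query_token_ids, query_vectors, postings, hits):
    """Score documents for ONE query by summing per-token max dot products."""
    doc_scores = defaultdict(float)
    for token_id, query_vec in zip(query_token_ids, query_vectors):
        best = {}  # text_idx -> best score of this query token in that document
        for text_idx, doc_vec in postings.get(token_id, ()):
            score = dot(query_vec, doc_vec)
            if text_idx not in best or score > best[text_idx]:
                best[text_idx] = score
        for text_idx, score in best.items():
            doc_scores[text_idx] += score
    # Highest score first; ties broken by the smaller document index.
    ranked = sorted(doc_scores.items(), key=lambda kv: (-kv[1], kv[0]))
    return ranked[:hits]


postings = {
    7: [(0, [1.0, 0.0]), (0, [0.0, 2.0]), (1, [1.0, 1.0])],
    9: [(1, [3.0, 0.0])],
}
ranked = vector_search_sketch([7, 9], [[1.0, 1.0], [1.0, 0.0]], postings, hits=2)
```

Document 0 contains token 7 twice (scores 1.0 and 2.0), and only the larger one is kept.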

class utils.index.BaseAnseriniIndex(collection_dir: str, index_dir: str)

Wrapper of Anserini Indexes.

__init__(collection_dir: str, index_dir: str) None
convert_retrieval_result(tmp_retrieval_result_path: str, verifier: Optional[utils.index.BasePostVerifier] = None) Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]]

Anserini uses the docids from the collection by default. Convert these native docids to our document indices.

Parameters
  • tmp_retrieval_result_path – the path to the temporary retrieval result file created by anserini Searcher

  • verifier – post verification
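A rough sketch of such a conversion, assuming the temporary result is in the six-column TREC run format (qid, Q0, docid, rank, score, tag). The file format actually used here is not stated, and convert_trec_lines_sketch, qid2index and tid2index are hypothetical names for illustration.

```python
def convert_trec_lines_sketch(lines, qid2index, tid2index):
    """Turn TREC-style run lines into {query_idx: [(text_idx, score), ...]}."""
    retrieval_result = {}
    for line in lines:
        fields = line.split()
        if len(fields) < 6:
            continue  # skip blank or malformed lines
        qid, _q0, docid, _rank, score, _tag = fields[:6]
        query_idx = qid2index[qid]
        text_idx = tid2index[docid]
        retrieval_result.setdefault(query_idx, []).append((text_idx, float(score)))
    return retrieval_result


run_lines = [
    "q1 Q0 docA 1 12.5 Anserini",
    "q1 Q0 docB 2 9.0 Anserini",
    "",
    "q2 Q0 docA 1 3.25 Anserini",
]
converted = convert_trec_lines_sketch(
    run_lines, qid2index={"q1": 0, "q2": 1}, tid2index={"docA": 10, "docB": 11}
)
```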

prepare_query(query_path, tmp_query_dir, query_token_ids=None, query_token_weights=None)

Generate temporary query files for Anserini.

  1. handle pretokenization

  2. handle subword_to_word transformation and token weight quantization

  3. split the queries into multiple files if there are too many

class utils.index.AnseriniImpactIndex(collection_dir: str, index_dir: str)

Anserini impact index.

__init__(collection_dir: str, index_dir: str) None
fit(text_token_ids: numpy.ndarray, text_token_weights: numpy.ndarray, quantize_bit: int, tokenizer: Any, granularity: str, subword_to_word: Callable, stop_words: set, reduce: str, thread_num: int = 32, enable_build_collection: bool = True, enable_build_index: bool = True, language: str = 'eng', **kwargs)
  1. Collect tokens into words and create json collection.

  2. Construct the anserini index.

Parameters
  • text_token_ids – array of [N, L]

  • text_token_weights – array of [N, L, 1]

  • quantize_bit

  • tokenizer

  • granularity – {token, word}

  • subword_to_word

  • stop_words

  • reduce

  • enable_build_collection – if True, rebuild the json collection

  • enable_build_index – if True, rebuild the anserini index

  • language – {eng, zh}

search(query_token_ids, retrieval_result_path: str, hits: int, query_token_weights: Optional[numpy.ndarray] = None, query_path: Optional[str] = None, tmp_query_dir: Optional[str] = None, language: str = 'eng', verifier: Optional[utils.index.BasePostVerifier] = None, **kwargs) Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]]

Search by Anserini.

Parameters
  • query_token_ids – the pretokenized query token ids

  • query_token_weights – the weights of each token

  • query_path – the raw query file path

  • retrieval_result_path

  • hits – Top K

  • qid2index – mapping from query id to query idx; generated by scripts.preprocess

  • tid2index – mapping from document id to document idx

  • tmp_query_dir – the temporary directory in which the pretokenized queries are saved

  • language – {eng, zh}

  • verifier – the verifier used to post-rank the retrieved results

class utils.index.AnseriniBM25Index(collection_dir, index_dir)

Anserini BM25 index.

__init__(collection_dir, index_dir) None
fit(text_path: Optional[str] = None, text_cols: Optional[list] = None, text_token_ids: Optional[numpy.ndarray] = None, tokenizer: Optional[Any] = None, thread_num: int = 32, enable_build_collection: bool = True, enable_build_index: bool = True, language: str = 'eng', stop_words: Optional[set] = None, granularity: Optional[str] = None, **kwargs)
  1. Convert the TSV collection into json collection by scripts.collection.

  2. Construct the anserini index.

Parameters
  • enable_build_collection – if True, rebuild the json collection

  • enable_build_index – if True, rebuild the anserini index

  • language – {eng, zh}

search(query_path: str, retrieval_result_path: str, hits: int, query_token_ids: Optional[numpy.ndarray] = None, tmp_query_dir: Optional[str] = None, language: str = 'eng', k1: float = 0.82, b: float = 0.68, verifier: Optional[utils.index.BasePostVerifier] = None, **kwargs) Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]]

Search by Anserini.

Parameters
  • query_path – the raw query file path

  • retrieval_result_path

  • hits – Top K

  • qid2index – mapping from query id to query idx; generated by scripts.preprocess

  • tid2index – mapping from document id to document idx

  • language – {eng, zh}

  • query_token_ids – the pretokenized query token ids

  • tmp_query_dir – the temporary directory in which the pretokenized queries are saved

  • k1 – BM25 parameter

  • b – BM25 parameter

  • verifier – the verifier used to post-rank the retrieved results
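For orientation, the role of k1 and b can be seen in one common formulation of the BM25 per-term score: k1 controls term-frequency saturation and b controls document-length normalization. The sketch below is illustrative only; the exact variant computed by Anserini may differ in details, and bm25_term_score_sketch is a hypothetical name. The defaults mirror the signature above.

```python
import math


def bm25_term_score_sketch(tf, doc_len, avg_doc_len, num_docs, doc_freq,
                           k1=0.82, b=0.68):
    """Score of one query term in one document under a common BM25 form."""
    # Rarer terms (small doc_freq) receive a larger idf.
    idf = math.log(1.0 + (num_docs - doc_freq + 0.5) / (doc_freq + 0.5))
    # b = 0 disables length normalization; b = 1 applies it fully.
    length_norm = 1.0 - b + b * doc_len / avg_doc_len
    # k1 bounds how much repeated occurrences of the term can add.
    return idf * tf * (k1 + 1.0) / (tf + k1 * length_norm)


short_doc = bm25_term_score_sketch(tf=1, doc_len=10, avg_doc_len=10,
                                   num_docs=100, doc_freq=5)
long_doc = bm25_term_score_sketch(tf=1, doc_len=20, avg_doc_len=10,
                                  num_docs=100, doc_freq=5)
```

With the same term frequency, the longer document scores lower because b > 0.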

class utils.index.TrieIndex(rank: int = 0, save_dir: str = '.', save_name: Optional[str] = None, **kwargs)

Trie index.

__init__(rank: int = 0, save_dir: str = '.', save_name: Optional[str] = None, **kwargs)
Parameters
  • save_dir – the directory to save the trie index

  • pad_token_id – will replace the -1 in the codes with pad_token_id

_find(key)

Find the node corresponding to the key.

Returns

a valid node if the key is stored, otherwise None

Return type

Optional[Node]

keys(prefix=None)

Return a list of this trie’s keys.

Parameters

prefix – if not None, return only the keys prefixed by prefix

values(prefix=None)

Return a list of this trie’s values.

Parameters

prefix – if not None, return only the values associated with keys prefixed by prefix

items(prefix=None)

Return a list of this trie’s items ((key, value) tuples).

Parameters

prefix – if not None, return only the items associated with keys prefixed by prefix

iterkeys(prefix=None)

Return an iterator over this trie’s keys.

Parameters

prefix – if not None, yield only the keys prefixed by prefix

itervalues(prefix=None)

Return an iterator over this trie’s values.

Parameters

prefix – if not None, yield only the values associated with keys prefixed by prefix

iteritems(prefix=None)

Return an iterator over this trie’s items ((key, value) tuples).

Parameters

prefix – if not None, yield only the items associated with keys prefixed by prefix

get_valid_tokens(prefix: Union[list, numpy.ndarray]) list

Return the valid keys at the next position given the prefix.

add(sequences: Union[list[list], numpy.ndarray], ids: Optional[Union[list, numpy.ndarray]] = None, verbose: bool = True)

Add a bulk of sequences into the trie.

Parameters
  • sequences – usually the document codes generated by models.BaseModel.BaseModel.generate_code()

  • ids – auxiliary ids for each document

  • verbose – print information
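How add() and get_valid_tokens() fit together can be shown with a small dictionary-based trie. This is a simplified stand-in, not the TrieIndex implementation: the class name SimpleTrieSketch is hypothetical, and document ids, padding and persistence are omitted.

```python
class SimpleTrieSketch:
    """Nested-dict trie over integer token sequences (illustrative only)."""

    def __init__(self):
        self.root = {}

    def add(self, sequences):
        # Walk or create one child per token of every sequence.
        for sequence in sequences:
            node = self.root
            for token in sequence:
                node = node.setdefault(int(token), {})

    def get_valid_tokens(self, prefix):
        # Follow the prefix; an unknown prefix has no valid continuation.
        node = self.root
        for token in prefix:
            node = node.get(int(token))
            if node is None:
                return []
        return sorted(node)


trie = SimpleTrieSketch()
trie.add([[0, 5, 7, 1], [0, 5, 8, 1], [0, 6, 2, 1]])
next_tokens = trie.get_valid_tokens([0, 5])
```

During constrained decoding, the tokens returned for the current prefix are the only ones the model is allowed to emit at the next step.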

save(save_path: Optional[str] = None)

Save the trie at save_path.

Parameters

save_path – if None, self.save_path is used

load(save_path: Optional[str] = None)

Load the trie from save_path.

Parameters

save_path – if None, self.save_path is used

fit(text_codes: numpy.ndarray, load_index: bool = False, save_index: bool = False, verbose: bool = True, **kwargs)
  1. Build TrieIndex from codes;

  2. Save TrieIndex if necessary.

Parameters
  • text_codes – the codes of texts

  • load_index – if True, load the existing trie

  • save_index – if True, force saving the newly constructed trie

  • rebuild_index – if False, default to load_index=True and save_index=False; if True, default to load_index=False and save_index=False

inspect_structure(verbose=True) numpy.ndarray

Check the number of children in each layer of the trie.

class utils.index.BeamDecoder

Mixin for beam search with threshold and eos_hidden_states cache. Based on code from transformers.

_add_beam(global_batch_idx: int, hypothesis: Union[list, numpy.ndarray], score: float, eos_hidden_state: Optional[torch.Tensor] = None, text_idx: Optional[Union[list, numpy.ndarray]] = None, word_set: Optional[Union[list, numpy.ndarray]] = None)

Update self.beams.

_finalize()

Split self.beams into beams, scores, eos_hidden_states and text_indices.

update_beams(beam_tokens, beam_scores, beam_indices, input_ids, model, model_kwargs, past_key_values, max_new_tokens, constrain_index: Union[utils.index.TrieIndex, utils.index.WordSetIndex])
  1. update beams if eos is decoded for trie index

  2. update beams if the decoded words correspond to a unique doc for wordset index

  3. update input_ids by concatenation

  4. update model_kwargs by next_beam_indices

Parameters
  • next_beam_tokens – (batch_size, num_beams * 2)

  • next_beam_scores – (batch_size, num_beams * 2)

  • next_beam_indices – (batch_size, num_beams * 2)

search(model: transformers.T5ForConditionalGeneration, query: Mapping, nbeam=10, max_new_tokens=33, constrain_index: Optional[Union[utils.index.TrieIndex, utils.index.WordSetIndex]] = None, threshold=0, trsd_start_len=0, rank_type='prob', tokenizer=None, text_indices=None, do_sample=False, do_greedy=False, do_early_stop=False, top_k=None, top_p=None, typical_p=None, temperature=None, renormalize_logits=None, early_stop_start_len=0, **kwargs)

Perform beam search with constraints from trie_index or wordset_index.
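The idea of constrained beam search can be sketched without a model. In this simplified version a callback returns the allowed next tokens for a prefix (the role a trie plays) and another returns log-probabilities (the role of the model). All names are hypothetical; thresholds, sampling, hidden-state caching and the wordset constraint are left out.

```python
import math


def constrained_beam_search_sketch(step_log_probs, valid_tokens, nbeam,
                                   max_new_tokens, eos_token_id):
    """Keep the nbeam best prefixes per step, expanding only allowed tokens."""
    beams = [(0.0, ())]   # (cumulative log-prob, token tuple)
    finished = []
    for _ in range(max_new_tokens):
        candidates = []
        for score, seq in beams:
            log_probs = step_log_probs(seq)
            for token in valid_tokens(seq):   # the constraint
                candidates.append((score + log_probs[token], seq + (token,)))
        if not candidates:
            break
        candidates.sort(key=lambda c: -c[0])
        beams = []
        for score, seq in candidates[:nbeam]:
            # A hypothesis ending in EOS is complete and leaves the beam.
            (finished if seq[-1] == eos_token_id else beams).append((score, seq))
        if not beams:
            break
    return sorted(finished, key=lambda c: -c[0])


# Toy setup: three allowed codes, EOS id 1, prefix-independent token scores.
allowed = [(5, 7, 1), (5, 8, 1), (6, 2, 1)]
token_log_probs = {5: math.log(0.6), 6: math.log(0.4), 7: math.log(0.3),
                   8: math.log(0.7), 2: 0.0, 1: 0.0}


def valid_tokens(seq):
    n = len(seq)
    return sorted({code[n] for code in allowed if code[:n] == seq and len(code) > n})


results = constrained_beam_search_sketch(
    lambda seq: token_log_probs, valid_tokens,
    nbeam=2, max_new_tokens=3, eos_token_id=1)
decoded = [seq for _, seq in results]
```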

class utils.index.FlatVerifier(query_embeddings: numpy.ndarray, text_embeddings: numpy.ndarray, hits: int = 1000, device: Union[int, Literal['cpu']] = 'cpu', **kwargs)

Post verify retrieval_result by the brute-force ranking from config.verifier_src.

__init__(query_embeddings: numpy.ndarray, text_embeddings: numpy.ndarray, hits: int = 1000, device: Union[int, Literal['cpu']] = 'cpu', **kwargs) None
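Brute-force post verification can be pictured as re-scoring each query's candidates with full embeddings and keeping the best hits. The NumPy sketch below assumes inner-product scoring and that the keys of the retrieval result are row indices into query_embeddings; flat_verify_sketch is a hypothetical helper, not the class's interface.

```python
import numpy as np


def flat_verify_sketch(retrieval_result, query_embeddings, text_embeddings, hits=1000):
    """Re-rank candidate documents of each query by exact inner product."""
    verified = {}
    for query_idx, candidates in retrieval_result.items():
        if len(candidates) == 0:
            verified[query_idx] = []
            continue
        cand = np.asarray(candidates, dtype=np.int64)
        # Exact scores of the candidates only: [num_candidates]
        scores = text_embeddings[cand] @ query_embeddings[query_idx]
        order = np.argsort(-scores, kind="stable")[:hits]
        verified[query_idx] = [(int(cand[i]), float(scores[i])) for i in order]
    return verified


text_emb = np.array([[1.0, 0.0], [0.0, 1.0], [2.0, 2.0]])
query_emb = np.array([[1.0, 0.0], [0.0, 1.0]])
verified = flat_verify_sketch({0: [1, 0, 2], 1: [0, 1]}, query_emb, text_emb, hits=2)
```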
class utils.index.PQVerifier(query_embeddings: numpy.ndarray, pq_index: faiss.Index, start_text_idx: int, end_text_idx: int, hits: int = 1000, **kwargs)

Post verify retrieval_result by the PQ ranking from config.verifier_src.

__init__(query_embeddings: numpy.ndarray, pq_index: faiss.Index, start_text_idx: int, end_text_idx: int, hits: int = 1000, **kwargs) None
Parameters
  • start_text_idx – the offset of this shard

  • end_text_idx – the ending offset

utils.index.merge_retrieval_result(start_query_idx: int, end_query_idx: int, text_num: int, retrieval_results: list[Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]]]) tuple[Union[dict[int, list[int]], dict[int, list[tuple[int, float]]]], numpy.ndarray, numpy.ndarray]

Merge a list of retrieval results into one.

Parameters
  • start_query_idx – the starting offset in retrieval_results[i]

  • end_query_idx – the ending offset in retrieval_results[i]

  • retrieval_results – a list of retrieval results

Returns

  • the merged retrieval result

  • the ids of all hit texts, array of [n]

  • the inverse index of all hit texts, array of [N]
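One plausible reading of the three return values is sketched below in plain Python. It assumes that results are concatenated per query with duplicates dropped, that end_query_idx is exclusive, and that the inverse index maps a text index to its position among the hit ids (or -1 if the text was never hit). None of this is confirmed by the description above, and merge_results_sketch is a hypothetical name.

```python
def merge_results_sketch(start_query_idx, end_query_idx, text_num, retrieval_results):
    """Merge per-shard results; also return hit ids and their inverse index."""
    merged, all_hits = {}, set()
    for query_idx in range(start_query_idx, end_query_idx):
        seen, items = set(), []
        for result in retrieval_results:
            for item in result.get(query_idx, []):
                # Items are either text indices or (text index, score) pairs.
                text_idx = item[0] if isinstance(item, tuple) else item
                if text_idx not in seen:
                    seen.add(text_idx)
                    items.append(item)
        merged[query_idx] = items
        all_hits |= seen
    hit_ids = sorted(all_hits)              # [n]
    inverse = [-1] * text_num               # [N]
    for position, text_idx in enumerate(hit_ids):
        inverse[text_idx] = position
    return merged, hit_ids, inverse


merged, hit_ids, inverse = merge_results_sketch(
    0, 2, text_num=6, retrieval_results=[{0: [3, 1], 1: [2]}, {0: [1, 4]}])
```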

utils.index.build_inverted_lists(args)

Build inverted lists for utils.index.BaseInvertedIndex.

utils.index.subword_to_word_bert(x)
Returns
  • is_subword – True for a subword and False for a valid word

  • word – the filtered subword or the word itself

utils.index.convert_tokens_to_words(tokens, subword_to_word, scores=None, reduce='max')

Transform the tokens output by the tokenizer into words (connecting subwords).

Returns

words – list of words
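A minimal sketch of connecting subwords, assuming BERT WordPiece tokens where continuation pieces start with "##". The helper names, the handling of scores and the supported reduce values ("max" and "sum") are assumptions for illustration, not the functions documented here.

```python
def bert_subword_sketch(token):
    """Return (is_subword, word) for a WordPiece token."""
    if token.startswith("##"):
        return True, token[2:]
    return False, token


def tokens_to_words_sketch(tokens, subword_to_word, scores=None, reduce="max"):
    """Glue subwords onto the preceding word; optionally reduce their scores."""
    words, word_scores = [], []
    for i, token in enumerate(tokens):
        is_subword, piece = subword_to_word(token)
        if is_subword and words:
            words[-1] += piece
            if scores is not None:
                if reduce == "max":
                    word_scores[-1] = max(word_scores[-1], scores[i])
                elif reduce == "sum":
                    word_scores[-1] += scores[i]
                else:
                    raise ValueError(f"unsupported reduce: {reduce}")
        else:
            words.append(piece)
            if scores is not None:
                word_scores.append(scores[i])
    return (words, word_scores) if scores is not None else words


tokens = ["play", "##ing", "foot", "##ball", "##er"]
words, weights = tokens_to_words_sketch(
    tokens, bert_subword_sketch, scores=[0.2, 0.5, 0.9, 0.1, 0.3], reduce="max")
```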

utils.index.build_impact_collection(token_ids, token_weights, text_start_idx, output_json_path, tokenizer, subword_to_word, stop_words, reduce)

Generate JSONL files in multiple threads.

utils.index.build_pretokenized_collection(token_ids, text_start_idx, output_json_path, tokenizer, stop_words, granularity)

Generate JSONL files in multiple threads.

Parameters

granularity – {token, word}

utils.index.count_stepwise_code_collision(codes, verbose=True) tuple[numpy.ndarray, numpy.ndarray]

Count code collision at each step, both as sequences and as sets.
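One way to make "collision at each step" concrete is to count, for every prefix length, how many codes share their prefix with an earlier code, once treating the prefix as an ordered sequence and once as an unordered set. This definition is an assumption, and count_collisions_sketch is a hypothetical helper rather than the function documented here.

```python
def count_collisions_sketch(codes):
    """Per step: number of codes minus number of distinct prefixes."""
    num_steps = len(codes[0]) if len(codes) else 0
    seq_collisions, set_collisions = [], []
    for step in range(1, num_steps + 1):
        # Prefixes compared in order ...
        seq_prefixes = {tuple(code[:step]) for code in codes}
        # ... and ignoring order.
        set_prefixes = {frozenset(code[:step]) for code in codes}
        seq_collisions.append(len(codes) - len(seq_prefixes))
        set_collisions.append(len(codes) - len(set_prefixes))
    return seq_collisions, set_collisions


as_sequences, as_sets = count_collisions_sketch([[1, 2, 3], [1, 3, 2], [2, 1, 3]])
```

The three full codes are distinct as sequences but identical as sets, so the set-based count grows while the sequence-based count drops to zero.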

utils.index.permute_code(codes: numpy.ndarray, rule='rotate', level: int = 5, k: int = 5) list[numpy.ndarray]

Reorder the first level codes.

Parameters
  • codes – text codes generated by models.BaseModel.BaseModel.generate_code()

  • rule – how to permute codes

  • level – how many steps to permute

  • k – how many replicas to return

Returns

a series of permuted codes

Return type

list[np.ndarray]