backend.api.llm_pipeline
LLM Pipeline Orchestration
This module defines:
- Utilities for loading vector indexes and reranker models
- A TypedDict AgentState for LangGraph workflows
- LLM_Pipeline: a LangGraph-based workflow for legal question answering
Key Components
load_vector_index(top_k, persist_dir, embedding) -> Loads a LlamaIndex vector index with a given embedding model and returns a retriever.
load_reranker_model() -> Loads a Cohere client + finetuned reranker model.
initialize_indexes(top_k) -> Preloads all vector indexes (Phishing, Law Cases Recall/Precision, Greek Penal Code Recall/Precision, GDPR Recall/Precision).
AgentState -> TypedDict describing the state used across LangGraph workflow.
LLM_Pipeline -> Orchestrates the RAG pipeline:
starting_prompt: safety & relevance check
query_translation: ensure English queries for retrieval
query_rewriting: generate paraphrases for broader coverage
run_classifications_parallel: classify into multiple categories
run_retrievals_parallel: retrieve from multiple indexes
get_context: summarize retrieved docs
web_search: complementary retrieval from TavilySearch
run_full_pipeline: executes the full flow end-to-end
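The state keys named in the method docstrings of this module suggest what AgentState might hold. The following is a minimal sketch, assuming one field per documented key; the field types and the `total=False` choice are assumptions, not the module's actual definition.

```python
from typing import Optional, TypedDict


class AgentState(TypedDict, total=False):
    """Hypothetical reconstruction; the real field set may differ."""

    user_query: str                            # read by query_rewriting
    questions: dict[int, str]                  # {0: original, 1: rewrite1, 2: rewrite2}
    query_classification: dict[int, list]      # {level: [query_text, index_keys]}
    retrieved_docs: dict[int, Optional[list]]  # {level: [[doc_text, metadata, score], ...] or None}
    summarized_context: str                    # written by get_context


# Each documented node returns a dict with a single key,
# so a state can start small and grow as nodes run.
state: AgentState = {"user_query": "What are the penalties for a GDPR breach?"}
state.update({"questions": {0: state["user_query"]}})
```

Marking every key as optional (`total=False`) keeps the sketch valid at any point in the workflow, since early nodes see only part of the state.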
Classes
AgentState | Shared state for the pipeline's LangGraph workflow.
LLM_Pipeline | LangGraph-based pipeline for multilingual, legal RAG.
Functions
load_vector_index | Load a LlamaIndex vector index with a specified embedding model.
load_reranker_model | Load a Cohere client and finetuned reranker model.
initialize_indexes | Initialize and return all domain-specific retrievers.
Module Contents
- load_vector_index(top_k: int, persist_dir: str, embedding)
Load a LlamaIndex vector index with a specified embedding model.
- Parameters:
top_k (int) -- Max number of docs to retrieve.
persist_dir (str) -- Directory containing the persisted index.
embedding (HuggingFaceEmbeddings) -- Embedding model to use for retrieval.
- Returns:
A retriever configured for hybrid search with similarity top_k.
- Return type:
VectorIndexRetriever
- load_reranker_model()
Load a Cohere client and finetuned reranker model.
- Returns:
(Cohere client, finetuned model reference)
- Return type:
tuple
- initialize_indexes(top_k: int)
Initialize and return all domain-specific retrievers.
Domains:
Phishing
Law Cases (Recall & Precision)
Greek Penal Code (Recall & Precision)
GDPR (Recall & Precision)
- Return type:
dict[str, VectorIndexRetriever]
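The returned mapping can be pictured as one retriever per domain variant, seven in total. The sketch below assumes hypothetical index keys and directories (this page does not show the real ones) and takes the loader as a parameter so that it runs without any index on disk; the embedding argument of load_vector_index is assumed to be bound inside that loader.

```python
from typing import Any, Callable

# Hypothetical index keys and directories; this page does not show the real ones.
INDEX_DIRS: dict[str, str] = {
    "phishing": "indexes/phishing",
    "law_cases_recall": "indexes/law_cases_recall",
    "law_cases_precision": "indexes/law_cases_precision",
    "gpc_recall": "indexes/gpc_recall",
    "gpc_precision": "indexes/gpc_precision",
    "gdpr_recall": "indexes/gdpr_recall",
    "gdpr_precision": "indexes/gdpr_precision",
}


def initialize_indexes_sketch(
    top_k: int, loader: Callable[[int, str], Any]
) -> dict[str, Any]:
    """Build one retriever per domain by calling `loader(top_k, persist_dir)`."""
    return {key: loader(top_k, path) for key, path in INDEX_DIRS.items()}


# Stand-in loader that only records its arguments instead of reading from disk.
retrievers = initialize_indexes_sketch(
    5, lambda top_k, path: {"top_k": top_k, "dir": path}
)
```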
- class LLM_Pipeline(index_mapping, reranker_model, cohere_client=None)
LangGraph-based pipeline for multilingual, legal RAG.
Steps
starting_prompt: check if query is legal, safe, in-domain.
query_translation: translate non-English queries to English.
query_rewriting: produce paraphrases to improve recall.
query_classification: assign categories (GDPR, GPC, Phishing, Cases).
retrieving_docs: fetch from indexes, rerank with CrossEncoder or Cohere.
get_context: summarize retrieved docs into coherent context.
web_search: augment with TavilySearch.
run_full_pipeline: execute all steps and return final structured response.
- reranker_model
Model used to rerank retrieved documents.
- Type:
CrossEncoder | GetFinetunedModelResponse
- retrieving_docs(query: str, index_mapping: dict[str, llama_index.core.retrievers.VectorIndexRetriever], indexes: List[llama_index.core.retrievers.VectorIndexRetriever], reranker_model: sentence_transformers.CrossEncoder | cohere.finetuning.finetuning.types.get_finetuned_model_response.GetFinetunedModelResponse, cohere_client: cohere.client_v2.ClientV2 | None)
Retrieve and rerank documents for a given query.
- Parameters:
query (str) -- The user query (in English, after translation).
index_mapping (dict[str, VectorIndexRetriever]) -- Mapping of index keys to retrievers.
indexes (list[str]) -- Keys from index_mapping specifying which retrievers to query.
reranker_model (CrossEncoder | GetFinetunedModelResponse) -- Model used to rerank results. Supports SentenceTransformer CrossEncoder or a Cohere finetuned model reference.
cohere_client (cohere.ClientV2 | None) -- Cohere client, required if reranker_model is Cohere.
- Returns:
List of [doc_text, metadata, score] for top reranked documents.
- Return type:
list[list]
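The retrieve-then-rerank flow described above can be sketched without any external service. This example is an illustration only: retrievers are modeled as plain callables, and a toy token-overlap score stands in for the CrossEncoder or Cohere reranker. The names `overlap_score`, `retrieving_docs_sketch`, and `top_n`, as well as the de-duplication step, are assumptions.

```python
from typing import Callable

Doc = tuple[str, dict]  # (doc_text, metadata)


def overlap_score(query: str, text: str) -> float:
    """Toy relevance score: fraction of query tokens that appear in the text."""
    q = set(query.lower().split())
    t = set(text.lower().split())
    return len(q & t) / len(q) if q else 0.0


def retrieving_docs_sketch(
    query: str,
    index_mapping: dict[str, Callable[[str], list[Doc]]],
    indexes: list[str],
    score: Callable[[str, str], float] = overlap_score,
    top_n: int = 3,
) -> list[list]:
    # Gather candidates from the selected retrievers, skipping duplicate texts.
    seen: set[str] = set()
    candidates: list[Doc] = []
    for key in indexes:
        for text, metadata in index_mapping[key](query):
            if text not in seen:
                seen.add(text)
                candidates.append((text, metadata))
    # Rerank: score every candidate against the query, best first.
    ranked = sorted(
        ([text, metadata, score(query, text)] for text, metadata in candidates),
        key=lambda row: row[2],
        reverse=True,
    )
    return ranked[:top_n]


index_mapping = {
    "gdpr": lambda q: [("GDPR fines can reach 20 million euros", {"source": "gdpr"})],
    "phishing": lambda q: [("Phishing emails imitate trusted senders", {"source": "phishing"})],
}
results = retrieving_docs_sketch("GDPR fines", index_mapping, ["gdpr", "phishing"])
```

Each row of `results` follows the documented [doc_text, metadata, score] shape, ordered from most to least relevant.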
- starting_prompt(query: str)
Classify a query as legal, non-legal, or unsafe.
- Parameters:
query (str) -- The raw user query.
- Returns:
[is_legal, message]: a bool indicating whether the query is legal, and an optional message (str | None).
- Return type:
list
Notes
If legal: returns [True, None]
If non-legal: returns [False, "<short helpful answer + reminder>"]
If unsafe (medical, financial, illegal, etc.): returns [False, "I'm a legal assistant. I cannot answer unsafe..."]
- query_translation(query: str)
Translate query into English if necessary.
- Parameters:
query (str) -- User query in any supported language.
- Returns:
(language_name, translated_query)
- Return type:
tuple
- web_search(query: str)
Perform a web search via TavilySearch and summarize results.
- Parameters:
query (str) -- User query (in English).
- Returns:
{'search_results': str} summarizing retrieved context.
- Return type:
dict
- rag_pipeline(query: str)
Run the LangGraph workflow (query rewriting -> classification -> retrieval -> context).
- Parameters:
query (str) -- User query in English.
- Returns:
{'query': query,'summarized_context': str}
- Return type:
dict
- query_rewriting(state)
Generate two paraphrased variations of the query.
- Parameters:
state (dict) -- Workflow state containing 'user_query'.
- Returns:
{'questions': {0: original, 1: rewrite1, 2: rewrite2}}
- Return type:
dict
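To make the returned shape concrete, here is a minimal sketch in which a hypothetical `paraphrase` callable stands in for the LLM call. Only the output layout, with the original at key 0 and the rewrites at keys 1 and 2, comes from the docstring above.

```python
from typing import Callable


def query_rewriting_sketch(
    state: dict, paraphrase: Callable[[str], list[str]]
) -> dict:
    original = state["user_query"]
    # Keep at most two rewrites so the keys stay within 0, 1, 2.
    rewrites = paraphrase(original)[:2]
    questions = {0: original}
    for level, text in enumerate(rewrites, start=1):
        questions[level] = text
    return {"questions": questions}


def fake_paraphrase(query: str) -> list[str]:
    """Stand-in for the LLM: returns two fixed-pattern rewrites."""
    return [f"In other words: {query}", f"Put differently: {query}"]


update = query_rewriting_sketch({"user_query": "Is phishing a crime?"}, fake_paraphrase)
```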
- run_classifications_parallel(state)
Classify original query + rewrites in parallel.
- Parameters:
state (dict) -- Workflow state containing 'questions'.
- Returns:
{'query_classification': {0: [...], 1: [...], 2: [...]}}
- Return type:
dict
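One plausible way to fan out the per-variant classification is a thread pool whose partial results are merged by level. The sketch below assumes a keyword-based `classify_one` as a stand-in for the LLM classifier; the keyword table and the use of `ThreadPoolExecutor` are illustrative choices, not confirmed details of the module.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical keyword rules standing in for the LLM classifier.
KEYWORDS = {
    "gdpr": ["gdpr", "personal data"],
    "phishing": ["phishing", "scam"],
}


def classify_one(state: dict, level: int) -> dict:
    """Classify a single query variant into zero or more index keys."""
    text = state["questions"][level]
    lowered = text.lower()
    keys = [k for k, words in KEYWORDS.items() if any(w in lowered for w in words)]
    return {"query_classification": {level: [text, keys]}}


def run_classifications_parallel_sketch(state: dict) -> dict:
    levels = sorted(state["questions"])
    merged: dict[int, list] = {}
    with ThreadPoolExecutor(max_workers=max(1, len(levels))) as pool:
        # pool.map preserves input order, so results line up with levels.
        for partial in pool.map(lambda lvl: classify_one(state, lvl), levels):
            merged.update(partial["query_classification"])
    return {"query_classification": merged}


state = {
    "questions": {
        0: "Is this phishing email a GDPR violation?",
        1: "Does a scam message break data protection law?",
        2: "What is the penalty for theft?",
    }
}
classified = run_classifications_parallel_sketch(state)
```

Threads suit this step if each classification is an I/O-bound LLM call; a variant with no matching category simply gets an empty key list.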
- query_classification(state, level: int)
Classify a query into one or more legal categories.
- Parameters:
state (dict) -- Workflow state containing 'questions'.
level (int) -- Which query variant to classify.
- Returns:
{'query_classification': {level: [query_text, index_keys]}}
- Return type:
dict
- run_retrievals_parallel(state)
Retrieve documents for each query variant in parallel.
- Parameters:
state (dict) -- Workflow state with classifications.
- Returns:
{'retrieved_docs': {0: [...], 1: [...], 2: [...]}}
- Return type:
dict
- retrieve_docs(state, level: int)
Retrieve docs for a single query variant.
- Parameters:
state (dict) -- Workflow state with classifications.
level (int) -- Which query variant to process.
- Returns:
List of [doc_text, metadata, score] or None if no indexes found.
- Return type:
list | None
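The None case is easiest to see in a small sketch. Here retrievers are modeled as callables that already return scored rows, and the `index_mapping` argument is passed explicitly. Both are simplifications for illustration; the real method presumably reads the mapping from the pipeline instance.

```python
from typing import Callable, Optional


def retrieve_docs_sketch(
    state: dict,
    level: int,
    index_mapping: dict[str, Callable[[str], list[list]]],
) -> Optional[list]:
    query_text, index_keys = state["query_classification"][level]
    # Ignore keys with no matching retriever; return None if none remain.
    known = [key for key in index_keys if key in index_mapping]
    if not known:
        return None
    docs: list[list] = []
    for key in known:
        docs.extend(index_mapping[key](query_text))
    # Highest score first, mirroring the reranked order.
    docs.sort(key=lambda row: row[2], reverse=True)
    return docs


index_mapping = {
    "gdpr": lambda q: [
        ["Article 5 lists principles", {"source": "gdpr"}, 0.4],
        ["Article 83 sets fines", {"source": "gdpr"}, 0.9],
    ]
}
state = {
    "query_classification": {
        0: ["GDPR fines", ["gdpr"]],
        1: ["Unrelated question", []],
    }
}
found = retrieve_docs_sketch(state, 0, index_mapping)
missing = retrieve_docs_sketch(state, 1, index_mapping)
```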
- get_context(state)
Summarize retrieved documents into a coherent context.
- Parameters:
state (dict) -- Workflow state containing 'retrieved_docs' and 'questions'.
- Returns:
{'summarized_context': str} – the aggregated context string.
- Return type:
dict
Notes
Iterates through retrieval results for each query variant (0,1,2).
Summarizes each batch of retrieved documents via the LLM.
Concatenates summaries into a single combined context.
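The three notes above can be sketched as a short loop. The `summarize` callable is a hypothetical stand-in for the LLM call, and skipping variants with no retrieved documents and joining summaries with blank lines are both assumptions.

```python
from typing import Callable


def get_context_sketch(
    state: dict, summarize: Callable[[str, list[str]], str]
) -> dict:
    parts: list[str] = []
    for level in (0, 1, 2):
        docs = state["retrieved_docs"].get(level)
        if not docs:
            # Nothing was retrieved for this variant (None or empty list).
            continue
        question = state["questions"][level]
        texts = [row[0] for row in docs]  # row is [doc_text, metadata, score]
        parts.append(summarize(question, texts))
    return {"summarized_context": "\n\n".join(parts)}


state = {
    "questions": {0: "q0", 1: "q1", 2: "q2"},
    "retrieved_docs": {
        0: [["a", {}, 0.9], ["b", {}, 0.5]],
        1: None,
        2: [["c", {}, 0.7]],
    },
}
context = get_context_sketch(
    state, lambda question, texts: f"{question}: {' | '.join(texts)}"
)
```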
- get_search_results(query: str)
Perform Tavily web search and summarize results.
- Parameters:
query (str) -- User query in English.
- Returns:
{'search_results': str} – summarized context from Tavily results.
- Return type:
dict
- initialize_workflow()
Build the LangGraph workflow that wires together pipeline nodes.
Nodes
query_rewriting → generate paraphrases
parallel_classification → classify all query variations
parallel_retrieval → retrieve docs for classified categories
get_context → summarize retrieved docs
Edges
query_rewriting → parallel_classification
parallel_classification → parallel_retrieval
parallel_retrieval → get_context
- Returns:
Compiled LangGraph app with MemorySaver checkpointing.
- Return type:
StateGraph
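The wiring above is a straight chain in which each node reads the shared state and returns a partial update. The sketch below is a dependency-free stand-in for that behavior in plain Python. It is not LangGraph and has no checkpointing; `run_linear_workflow` and the stub nodes are hypothetical.

```python
from typing import Callable, Optional

Node = Callable[[dict], dict]


def run_linear_workflow(
    nodes: dict[str, Node],
    edges: list[tuple[str, str]],
    start: str,
    state: dict,
) -> dict:
    """Follow the edges from `start`, merging each node's partial update."""
    successor = dict(edges)  # each node has at most one outgoing edge here
    state = dict(state)      # avoid mutating the caller's dict
    current: Optional[str] = start
    while current is not None:
        state.update(nodes[current](state))
        current = successor.get(current)
    return state


# Stub nodes that return the same keys as the documented methods.
nodes = {
    "query_rewriting": lambda s: {"questions": {0: s["user_query"]}},
    "parallel_classification": lambda s: {
        "query_classification": {0: [s["questions"][0], ["gdpr"]]}
    },
    "parallel_retrieval": lambda s: {"retrieved_docs": {0: [["doc text", {}, 1.0]]}},
    "get_context": lambda s: {"summarized_context": s["retrieved_docs"][0][0][0]},
}
edges = [
    ("query_rewriting", "parallel_classification"),
    ("parallel_classification", "parallel_retrieval"),
    ("parallel_retrieval", "get_context"),
]
final = run_linear_workflow(nodes, edges, "query_rewriting", {"user_query": "What is GDPR?"})
```

In the real workflow LangGraph handles the merging and adds checkpointing; the stand-in only shows the order in which nodes run and how the state accumulates.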
- run_full_pipeline(query: str)
Execute the full RAG workflow for a user query.
Steps
starting_prompt → checks safety & domain.
query_translation → ensures English query.
Run web_search + rag_pipeline in parallel.
Aggregate results: legal context + web results.
- Parameters:
query (str) -- Raw user query (any language).
- Returns:
- If query is legal:
{ "query": translated_query, "summarized_context": str, "search_results": str, "language": str }
- If query is non-legal or unsafe:
str – helpful or refusal message.
- Return type:
dict | str
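The control flow of the four steps, including the two possible return types, can be sketched with injected stand-ins for each stage. The four callables and the use of a thread pool for the parallel step are assumptions made so the example runs on its own.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Union


def run_full_pipeline_sketch(
    query: str,
    gate: Callable[[str], list],
    translate: Callable[[str], tuple],
    web_search: Callable[[str], dict],
    rag_pipeline: Callable[[str], dict],
) -> Union[dict, str]:
    # Step 1: safety and domain check; stop early with the message if rejected.
    is_legal, message = gate(query)
    if not is_legal:
        return message
    # Step 2: make sure retrieval sees an English query.
    language, english_query = translate(query)
    # Step 3: run web search and the RAG workflow concurrently.
    with ThreadPoolExecutor(max_workers=2) as pool:
        web_future = pool.submit(web_search, english_query)
        rag_future = pool.submit(rag_pipeline, english_query)
        web, rag = web_future.result(), rag_future.result()
    # Step 4: aggregate legal context and web results.
    return {
        "query": english_query,
        "summarized_context": rag["summarized_context"],
        "search_results": web["search_results"],
        "language": language,
    }


def fake_gate(query: str) -> list:
    """Stand-in gate: accepts only queries that mention 'law'."""
    if "law" in query.lower():
        return [True, None]
    return [False, "I'm a legal assistant."]


stubs = dict(
    gate=fake_gate,
    translate=lambda q: ("English", q),
    web_search=lambda q: {"search_results": f"web:{q}"},
    rag_pipeline=lambda q: {"query": q, "summarized_context": f"ctx:{q}"},
)
accepted = run_full_pipeline_sketch("What does the law say about phishing?", **stubs)
rejected = run_full_pipeline_sketch("Best pizza in Athens?", **stubs)
```

Callers need to check the return type before indexing into the result, since a rejected query yields a plain string rather than a dict.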