Retrieval Augmented Thinking¶
A middleware solution that improves routing accuracy for manager agents containing "difficult-to-describe" collaborator agents, such as those implementing RAG. Retrieval-Augmented Thinking is made up of both a build-time component (extracting topics of expertise) and a run-time component (retrieval-augmented thinking at query time) to improve collaborator selection.
Overview¶
Retrieval-Augmented Thinking is a technique that injects dynamic instructions at runtime to give the agent hints about which collaborator(s) might be best to consult to answer a user query. This approach has proven highly effective at improving collaborator invocation for agents with broad areas of expertise, such as RAG-based agents.
How it Works¶
Retrieval-Augmented Thinking operates in two steps:
- Build time: During agent build, content (documents, etc.) for each route must be made available to a build-time topic extraction processor, which extracts topics of expertise and stores them in a vector store for later retrieval. Given that documents are often stored in vector stores for RAG implementations, we provide a sample implementation that processes content directly from a ChromaDB vector store.
- Runtime: Simply select a RAT-enabled WxO Style for your agent to leverage Retrieval-Augmented Thinking in production. When a query comes in, the vector store that has been populated with "topics of expertise" is searched, and the most relevant collaborator expertise is summarized and dynamically injected into the thinking prompt. This gives the AI agent more information to make good routing choices and produce the best possible plan.
[Architecture diagram]
Sample Results¶
We evaluated Retrieval-Augmented Thinking using an AskHR dataset, where a ReAct-based manager agent routes to 7 collaborator agents, each of which is backed by RAG. We evaluate two key metrics: (1) the percentage of time the correct agent is called first, and (2) the percentage of time the correct collaborator agent is ever called. We compare results with and without Retrieval-Augmented Thinking.
llama-3-3-70b¶
| Style | Correct Agent Called First | Correct Agent Called Ever |
|---|---|---|
| React | 56% | 67% |
| React with RAT | 84% | 86% |
granite-3-2-8b¶
| Style | Correct Agent Called First | Correct Agent Called Ever |
|---|---|---|
| React | 49% | 54% |
| React with RAT | 79% | 84% |
Getting Started¶
Go to our GitHub repo and get the code running by following the instructions in the README.
Topic Extraction¶
The TopicExtractionMiddleware is a build-time component that extracts a list of topics from documents using a TopicExtractor and stores them in a TopicSink. A topic is represented by the TopicInfo object, which has the following properties (a minimal construction sketch follows the list):
- topic. A string with the topic extracted from the document.
- expertise. An optional field with the level of expertise the document has on the topic. The levels of expertise are, from high to low: expert, knowledge, and mentions.
- subject. A string representing the "entity" owning the documents and for which we want to know which topics it has knowledge on. It's typically an agent name or an agent tool name.
- metadata. A dictionary with key-value pairs that can be used to store metadata associated with the topic.
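For example, a minimal construction sketch (the field values are illustrative, and passing expertise as a lowercase string is an assumption):
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import TopicInfo

# A topic owned by a hypothetical "payroll_agent" collaborator.
topic = TopicInfo(
    topic="Direct deposit setup",
    expertise="expert",  # assumption: one of "expert", "knowledge", "mentions"
    subject="payroll_agent",
    metadata={"source": "payroll_faq"},
)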
Topics can be extracted with the following topic extractors:
- LLM. Uses an LLM to extract the topics from the documents and optionally set the level of expertise that the document has on each topic.
- TFIDF words. Uses Term Frequency (TF) and Inverse Document Frequency (IDF) to identify the most relevant words in a document to extract the topics.
- BERTopic. Uses the BERTopic library.
The extracted topics can be stored in Milvus and Chroma DB using the following topic sinks:
- altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider.ChromaDBProvider. Stores the topics in a ChromaDB instance.
- altk.pre_llm.routing.retrieval_augmented_thinking.milvus.topic_sink.MilvusProvider. Stores the topics in a Milvus vector store.
LLM Topic Extractor¶
The LLM Topic Extractor gets the documents from which to extract the topics from a ContentProvider. Currently there's only one ContentProvider implementation that gets the documents from a ChromaDB instance.
The LLM Topic Extractor uses the ALTK's LLM Library to call the LLM.
In the following example, the LLM Topic Extractor gets documents from a ChromaDB instance, as specified in the Content Provider configuration under the content_provider field, and stores the extracted topics in another ChromaDB instance specified in the Topic Sink configuration under the topics_sink field.
import os
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider import ChromaDBProvider
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import TopicExtractionBuildOutput
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_extractor.llm import LLMTopicExtractor, LLMTopicExtractorOutput
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_extractor.topic_extractor import TopicExtractionMiddleware
from altk.core.llm.base import get_llm
from altk.core.toolkit import AgentPhase
WatsonXAIClient = get_llm("watsonx")
llm_client = WatsonXAIClient(
model_id="ibm/granite-3-3-8b-instruct",
api_key=os.getenv("WX_API_KEY"),
project_id=os.getenv("WX_PROJECT_ID"),
url=os.getenv("WX_URL"),
)
topic_extractor = LLMTopicExtractor(
content_provider=ChromaDBProvider(
collection="sound",
db_path="/path/to/chroma/db/with/source/documents",
n_docs=5,
),
llm_client=llm_client,
)
topic_extractor_extraction = TopicExtractionMiddleware(
subject="sound",
topic_extractor=topic_extractor,
topics_sink=ChromaDBProvider(
collection="sound.topics",
db_path="/path/to/chroma/db/with/extracted/topics",
),
)
topic_extraction_output: TopicExtractionBuildOutput = topic_extractor_extraction.process(data=None, phase=AgentPhase.BUILDTIME)
print(f"{topic_extraction_output.error=}")
print(f"{topic_extraction_output.topics}")
BERTopic Topic Extractor¶
As this type of topic extractor does not use an externally hosted model, there's no need to create an LLM client as we did for the LLM Topic Extractor.
In the following example, the extracted topics are stored in the collection sound.topics in a local ChromaDB instance whose configuration is specified in the Topic Sink configuration under the topics_sink key. There's no need to specify a Content Provider configuration since the BERTopic extractor receives the documents directly as an input parameter.
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider import (
ChromaDBProvider,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import (
TopicExtractionInput,
TopicExtractionBuildOutput,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_extractor.bertopic import (
BERTopicTopicExtractor,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_extractor.topic_extractor import (
TopicExtractionMiddleware,
)
from altk.core.toolkit import AgentPhase
topic_extractor = TopicExtractionMiddleware(
subject="sound",
topic_extractor=BERTopicTopicExtractor.from_settings(
{
"nr_topics": 1000,
"count_vectorizer_settings": {
"ngram_range": (3, 5),
"stop_words": "english",
},
}
),
topics_sink=ChromaDBProvider(
collection="sound.topics",
db_path="/path/to/chroma/db/with/extracted/topics",
),
)
topic_extraction_output: TopicExtractionBuildOutput = topic_extractor.process(
data=TopicExtractionInput(documents=["doc_1", "doc_2", "doc_3", "..."]),
phase=AgentPhase.BUILDTIME,
)
print(f"{topic_extraction_output.error=}")
print(f"{topic_extraction_output.topics}")
TFIDF words Topic Extractor¶
In this example, a TFIDFWordsTopicExtractor is used, which is instantiated in a similar way to the BERTopicTopicExtractor.
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider import (
ChromaDBProvider,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import (
TopicExtractionInput,
TopicExtractionBuildOutput,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_extractor.tfidf_words import (
TFIDFWordsTopicExtractor,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_extractor.topic_extractor import (
TopicExtractionMiddleware,
)
from altk.core.toolkit import AgentPhase
topic_extractor = TopicExtractionMiddleware(
subject="sound",
topic_extractor=TFIDFWordsTopicExtractor.from_settings(
{
"top_words": 50,
"count_vectorizer_settings": {
"ngram_range": (3, 5),
"stop_words": "english",
},
"tfidf_vectorizer_settings": {
"max_df": 0.85,
"min_df": 2,
},
}
),
topics_sink=ChromaDBProvider(
collection="sound.topics",
db_path="/path/to/chroma/db/with/extracted/topics",
),
)
topic_extraction_output: TopicExtractionBuildOutput = topic_extractor.process(
data=TopicExtractionInput(documents=["doc_1", "doc_2", "doc_3", "..."]),
phase=AgentPhase.BUILDTIME,
)
print(f"{topic_extraction_output.error=}")
print(f"{topic_extraction_output.topics}")
Runnable Topic Extraction examples¶
You need to run the following scripts from the root of the ALTK repository.
# LLM topic extraction example that gets documents and stores extracted topics in a Chroma DB
export WX_API_KEY=...
export WX_PROJECT_ID=...
export WX_URL=https://us-south.ml.cloud.ibm.com
python altk/pre_llm/routing/retrieval_augmented_thinking/examples/run_llm_topic_extraction.py
# LLM topic extraction example that gets documents from a Chroma DB and stores extracted topics in a Milvus vector store
python altk/pre_llm/routing/retrieval_augmented_thinking/examples/run_llm_topic_extraction_milvus_sink.py
# BERTopic topic extraction example
python altk/pre_llm/routing/retrieval_augmented_thinking/examples/run_berttopic_topic_extraction.py
# TFIDF words topic extraction example
python altk/pre_llm/routing/retrieval_augmented_thinking/examples/run_tfidf_words_topic_extraction.py
Topic Extraction CLI¶
The component provides the CLI altk.pre_llm.routing.retrieval_augmented_thinking.topic_extractor.cli, which allows you to:
- extract topics using an LLM topic extractor from a set of collections stored in a local Chroma DB
- store the extracted topics in a local Chroma DB
A JSON file with the topic extraction settings must be provided, as in the following example:
python -m altk.pre_llm.routing.retrieval_augmented_thinking.topic_extractor.cli --topic_extractor llm --topic_extraction_config topic_extraction.json
topic_extraction.json file:
{
"model_id": "ibm/granite-3-3-8b-instruct",
"provider": "watsonx",
"levels_of_expertise": true,
"content_provider": {
"name": "altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider.ChromaDBProvider",
"config": {
"collections": [
"tool_1",
"tool_2",
"tool_3"
],
"instance": {
"db_path": "/path/to/chroma"
}
}
},
"topics_sink": {
"name": "altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider.ChromaDBProvider",
"config": {
"collection": "topics",
"instance": {
"db_path": "/path/to/chroma"
}
}
}
}
The topic extractor component will extract topics from collections that contain documents about the expertise of tool_1, tool_2, and tool_3, which are stored in the local Chroma DB in /path/to/chroma. The topics will be extracted using the LLM ibm/granite-3-3-8b-instruct, and the level of expertise will also be set for each topic. The extracted topics are stored in the topics collection of the local Chroma DB in /path/to/chroma.
The ALTK LLM Client Library reads the watsonx API key, project ID, and URL from the environment variables WX_API_KEY, WX_PROJECT_ID, and WX_URL, respectively.
Built-in Content Providers¶
ChromaDBProvider¶
A ContentProvider that gets the content chunks from a ChromaDB vector store. It requires the following parameters:
- collection. The name of the ChromaDB collection containing the documents returned in the iterator.
- dest_collection_name. You can use the same ChromaDBProvider instance as a ContentProvider and TopicsSink, in which case this parameter indicates the collection where the extracted topics will be saved.
- db_path. For a local ChromaDB instance, this is the directory containing the ChromaDB files.
- host, port. The host and port of a remote ChromaDB instance.
- client. An already instantiated ChromaDB client that is an instance of ClientAPI can be provided instead. This is mostly used for testing.
- n_docs. Number of documents that are processed. If not specified, all the documents are processed.
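For example, a hedged sketch of a provider reading from a remote ChromaDB server (the host, port, and n_docs values are illustrative):
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider import ChromaDBProvider

# Read up to 100 documents from the "sound" collection of a remote ChromaDB server.
content_provider = ChromaDBProvider(
    collection="sound",
    host="localhost",
    port=8000,
    n_docs=100,
)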
Custom Content Provider implementation¶
To create a custom content provider, follow these steps (a sketch follows the list):
1. Define a Pydantic model that serves as a config object for the custom content provider implementation, e.g. MyContentProviderSettings. You can refer to the ChromaDBProviderSettings object as an example.
2. Create the class that implements the ContentProvider protocol. The class must have a method def get_content(self) -> Iterator[str] that provides a stream of content chunks.
3. The class must also have a class method named create_content_provider that receives the config object and creates an instance of the content provider:
@classmethod
def create_content_provider(cls, settings: MyContentProviderSettings | Dict) -> "MyContentProvider":
4. Register the implementation by decorating the class with the @TopicExtractionMiddleware.register_content_provider() decorator.
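A minimal sketch under these assumptions: the file-reading provider and its settings fields are hypothetical, and the no-argument form of the registration decorator follows the fragment above (the real decorator may take additional arguments):
from typing import Dict, Iterator, List
from pydantic import BaseModel
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_extractor.topic_extractor import (
    TopicExtractionMiddleware,
)

class MyContentProviderSettings(BaseModel):
    # Hypothetical config: a list of plain-text files to stream as content chunks.
    file_paths: List[str]

@TopicExtractionMiddleware.register_content_provider()
class MyContentProvider:
    def __init__(self, settings: MyContentProviderSettings):
        self._settings = settings

    def get_content(self) -> Iterator[str]:
        # Yield one content chunk per file.
        for path in self._settings.file_paths:
            with open(path, encoding="utf-8") as f:
                yield f.read()

    @classmethod
    def create_content_provider(cls, settings: MyContentProviderSettings | Dict) -> "MyContentProvider":
        # Accept either the Pydantic model or a plain dict, as the protocol signature suggests.
        if isinstance(settings, dict):
            settings = MyContentProviderSettings(**settings)
        return cls(settings)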
Built-in Topics Sinks¶
ChromaDBProvider¶
A ChromaDBProvider object can also be used as a TopicsSink. If the same ChromaDB instance is used as both the ContentProvider and the TopicsSink, make sure that the ChromaDB collection used for the TopicsSink is different from the one used for the ContentProvider. If you're using the same ChromaDBProvider object, the collection used for the TopicsSink must be specified in the dest_collection_name parameter, as sketched below.
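A hedged sketch of that dual-use setup (the collection names are illustrative):
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider import ChromaDBProvider

# One provider reads source documents from "sound" and, when used as a TopicsSink,
# writes the extracted topics to the separate "sound.topics" collection.
provider = ChromaDBProvider(
    collection="sound",
    dest_collection_name="sound.topics",
    db_path="/path/to/chroma",
)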
MilvusProvider¶
TopicsSink implementation that stores the topics in a Milvus vector store. It supports the following settings (a configuration sketch follows the list):
- milvus_uri: Milvus instance URI.
- milvus_token: Milvus token.
- milvus_db: Milvus database.
- collection_name: collection name where the topics will be stored.
- full_text_search_config: if specified, the topics will be stored so they can be queried using Milvus full text search. The following settings can be specified:
  - topic_field_max_length: length of the field that stores the "topic" text.
  - subject_field_max_length: length of the field that stores the "subject" text.
  - sparse_index_params: dictionary with the sparse index parameters. You can look at the default config object to see what settings can be set.
- ann_search_config: if specified, the topics will be stored so they can be queried using the usual semantic search. The following settings can be specified:
  - topic_field_max_length: length of the field that stores the "topic" text.
  - subject_field_max_length: length of the field that stores the "subject" text.
  - embedding_function_provider: the embedding function implementation. It can be "default", which uses the default embedding function provided by Milvus (dense.onnx.OnnxEmbeddingFunction), or "sentence_transformer", which uses a Sentence Transformer model.
  - embedding_function_config: a dictionary with the embedding function parameters.
  - embedding_function: instead of passing the embedding function implementation and config object, you can pass an already instantiated embedding function in this parameter, like model.dense.SentenceTransformerEmbeddingFunction().
  - vector_dimensions: number of dimensions stored in the vector field. By default it uses the number of dimensions returned by the embedding function.
  - metric_type: metric used during search. By default it uses COSINE.
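For example, a hedged sketch of a sink configured for semantic search; it assumes ann_search_config accepts the same AnnSearchConfig object used by the Milvus retriever examples below:
from pymilvus import model
from altk.pre_llm.routing.retrieval_augmented_thinking.milvus.common import AnnSearchConfig
from altk.pre_llm.routing.retrieval_augmented_thinking.milvus.topic_sink import MilvusProvider

# Store topics in a local Milvus Lite file, indexed for semantic (ANN) search.
topics_sink = MilvusProvider(
    collection_name="topics",
    milvus_uri="/path/to/local/milvus/files",
    ann_search_config=AnnSearchConfig(
        embedding_function=model.dense.SentenceTransformerEmbeddingFunction(
            model_name="all-MiniLM-L6-v2", device="cpu"
        ),
    ),
)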
Custom Topics Sink implementation¶
To create a custom topic sink, follow these steps (a sketch follows the list):
1. Define a Pydantic model that serves as a config object for the custom Topic Sink implementation. You can refer to the ChromaDBProviderSettings object as an example.
2. Create the class that implements the TopicSink protocol. The class must implement the method def add_topics(self, topics: List[TopicInfo]) that stores the TopicInfo objects in whatever storage mechanism your custom implementation has.
3. The class must have a class method named create_topics_sink that receives the config object and creates an instance of the topic sink.
4. Register the implementation by decorating the class with the @TopicExtractionMiddleware.register_topics_sink() decorator.
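A minimal sketch under the same caveats (the in-memory sink and its settings are hypothetical, and the decorator's no-argument form is an assumption):
from typing import Dict, List
from pydantic import BaseModel
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import TopicInfo
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_extractor.topic_extractor import (
    TopicExtractionMiddleware,
)

class InMemorySinkSettings(BaseModel):
    # Hypothetical config: cap on how many topics the sink keeps.
    max_topics: int = 10000

@TopicExtractionMiddleware.register_topics_sink()
class InMemoryTopicsSink:
    def __init__(self, settings: InMemorySinkSettings):
        self._settings = settings
        self._topics: List[TopicInfo] = []

    def add_topics(self, topics: List[TopicInfo]):
        # A real sink would persist these; here we just keep them in memory.
        room = self._settings.max_topics - len(self._topics)
        self._topics.extend(topics[:room])

    @classmethod
    def create_topics_sink(cls, settings: InMemorySinkSettings | Dict) -> "InMemoryTopicsSink":
        if isinstance(settings, dict):
            settings = InMemorySinkSettings(**settings)
        return cls(settings)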
Creating a Topic Extraction component from settings¶
settings.json:
{
"subject": "sound",
"topic_extractor": {
"name": "llm",
"config": {
"model_id": "ibm/granite-3-3-8b-instruct",
"provider": "watsonx",
"content_provider": {
"name": "altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider.ChromaDBProvider",
"config": {
"collection": "sound",
"dest_collection": "sound.topics",
"instance": {
"db_path": "/path/to/chroma/with/source/documents"
}
}
}
}
},
"topics_sink": {
"name": "chromadb",
"config": {
"collection": "sound_topics",
"instance": {
"db_path": "/path/to/chroma/with/topics"
},
"embedding_function_provider": "sentence_transformer",
"embedding_function_config": {
"model_name": "all-MiniLM-L6-v2"
}
}
}
}
import json
from pathlib import Path

# TopicExtractionMiddleware, TopicExtractionBuildOutput and AgentPhase are imported as in the examples above.
topic_extractor = TopicExtractionMiddleware.from_settings(json.loads(Path("settings.json").read_text()))
topic_extraction_output: TopicExtractionBuildOutput = topic_extractor.process(data=None, phase=AgentPhase.BUILDTIME)
Topic Loading¶
If you have already generated topics and don't need to extract them using a Topic Extractor, they can be loaded directly into the vector store using the TopicLoadingMiddleware component.
We'll explain the topic loading using the following example:
from sentence_transformers import SentenceTransformer
from altk.core.toolkit import AgentPhase
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import EmbeddedTopic, TopicInfo, TopicLoadingInput
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_loading.topic_loading import TopicLoadingMiddleware
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider import ChromaDBProvider
embedding_model_name = "ibm-granite/granite-embedding-107m-multilingual"
embedding_model = SentenceTransformer(embedding_model_name)
topics = [
EmbeddedTopic(
topic=TopicInfo(
topic="Job tasks",
subject="job_offering",
metadata={"env": "dark_lunch"},
),
embeddings=embedding_model.encode("Job tasks").tolist(),
),
EmbeddedTopic(
topic=TopicInfo(
topic="Job skills",
subject="job_offering",
metadata={"env": "testing"},
),
embeddings=embedding_model.encode("Job skills").tolist(),
),
]
topic_loading = TopicLoadingMiddleware(
topics_sink=ChromaDBProvider(collection="topics", db_path="/path/to/local/chromadb")
)
topic_loading.process(
data=TopicLoadingInput(topics=topics),
phase=AgentPhase.BUILDTIME
)
- The pre-embedded topics are provided using the EmbeddedTopic object, which contains a topic (represented by the TopicInfo object) along with its embeddings.
- The pre-embedded topics are loaded into a vector store using a persistent ChromaDB TopicsSink whose files reside in the folder /path/to/local/chromadb.
- Topics can have an optional dictionary containing metadata fields that can be used to filter the topics during topic retrieval.
- During topic loading, you can optionally configure an embedding_function for adding pre-embedded topics. Embedding functions can be imported from ChromaDB, and the sink defaults to all-MiniLM-L6-v2 if one is not provided.
Alternatively, instead of calculating the embeddings beforehand, the embedding function can be specified in the TopicsSink:
from altk.core.toolkit import AgentPhase
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import EmbeddedTopic, TopicInfo, TopicLoadingInput
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_loading.topic_loading import TopicLoadingMiddleware
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider import ChromaDBProvider
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
topics = [
TopicInfo(
topic="Job tasks",
subject="job_offering",
metadata={"env": "dark_lunch"},
),
TopicInfo(
topic="Job skills",
subject="job_offering",
metadata={"env": "testing"},
),
]
topic_loading = TopicLoadingMiddleware(
    topics_sink=ChromaDBProvider(
        collection="topics",
        db_path="/path/to/local/chromadb",
        embedding_function=SentenceTransformerEmbeddingFunction(
            "ibm-granite/granite-embedding-107m-multilingual"
        ),
    )
)
topic_loading.process(
data=TopicLoadingInput(topics=topics),
phase=AgentPhase.BUILDTIME
)
Creating a Topic Loading component from settings¶
A Topic Loading component is essentially a wrapper around the Topic Sink in which the loaded topics are stored.
settings.json:
{
"topics_sink": {
"name": "chromadb",
"config": {
"collection": "sound_topics",
"instance": {
"db_path": "/path/to/chroma/with/topics"
},
"embedding_function_provider": "sentence_transformer",
"embedding_function_config": {
"model_name": "all-MiniLM-L6-v2"
}
}
}
}
from pathlib import Path

# TopicLoadingMiddleware, TopicLoadingInput and AgentPhase are imported as in the examples above.
topic_loading = TopicLoadingMiddleware.from_settings(Path("settings.json").read_text())
topics = [...]
topic_loading.process(data=TopicLoadingInput(topics=topics), phase=AgentPhase.BUILDTIME)
Topic Retrieval¶
The topic retriever is a run-time component that retrieves the topics from different vector stores using a TopicRetriever object. The retrieved topics can be used to create hints on which agent collaborator or tool to call for the given user query. The hint is usually a text fragment that is injected into the agent's prompt.
In the following example, a ChromaDBTopicRetriever does a semantic search on the topics stored in the topics collection for the query "What are my job responsibilities?".
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_retriever import (
ChromaDBTopicRetriever,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import (
TopicRetrievalRunInput,
TopicRetrievalRunOutput,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_retriever.topic_retriever import (
TopicRetrievalMiddleware,
)
from altk.core.toolkit import AgentPhase
topic_retriever = TopicRetrievalMiddleware(
topic_retriever=ChromaDBTopicRetriever(
collection="topics", db_path="/path/to/local/chromadb"
)
)
topic_retriever_output: TopicRetrievalRunOutput = topic_retriever.process(
    data=TopicRetrievalRunInput(
        messages=[{"role": "user", "content": "What are my job responsibilities?"}]
    ),
    phase=AgentPhase.RUNTIME,
)
print(f"{topic_retriever_output.topics}")
In the next example, a MilvusTopicRetriever is used to retrieve topics from a Milvus vector store.
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import (
TopicRetrievalRunInput,
TopicRetrievalRunOutput,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.milvus.topic_retriever import (
MilvusTopicRetriever,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_retriever.topic_retriever import (
TopicRetrievalMiddleware,
)
from altk.core.toolkit import AgentPhase
topic_retriever = TopicRetrievalMiddleware(
topic_retriever=MilvusTopicRetriever(
collection_name="topics", milvus_uri="/path/to/local/milvus/files"
)
)
topic_retriever_output: TopicRetrievalRunOutput = topic_retriever.process(
    data=TopicRetrievalRunInput(
        messages=[{"role": "user", "content": "What are my job responsibilities?"}]
    ),
    phase=AgentPhase.RUNTIME,
)
print(f"{topic_retriever_output.topics}")
The embedding function used for the semantic search can be specified in the ann_search_config parameter:
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import (
TopicRetrievalRunInput,
TopicRetrievalRunOutput,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.milvus.common import (
AnnSearchConfig,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.milvus.topic_retriever import (
MilvusTopicRetriever,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_retriever.topic_retriever import (
TopicRetrievalMiddleware,
)
from altk.core.toolkit import AgentPhase
from pymilvus import model
topic_retriever = TopicRetrievalMiddleware(
topic_retriever=MilvusTopicRetriever(
collection_name="topics",
milvus_uri="/path/to/local/milvus/files",
ann_search_config=AnnSearchConfig(
embedding_function=model.dense.SentenceTransformerEmbeddingFunction(
model_name="all-MiniLM-L6-v2", device="cpu"
),
),
)
)
topic_retriever_output: TopicRetrievalRunOutput = topic_retriever.process(
    data=TopicRetrievalRunInput(
        messages=[{"role": "user", "content": "What are my job responsibilities?"}]
    ),
    phase=AgentPhase.RUNTIME,
)
print(f"{topic_retriever_output.topics}")
The following builds a prompt fragment with tool hints from the retrieved topics, which can be injected into the agent's prompt:
# The set comprehension de-duplicates repeated subject/topic pairs.
hints = "\n".join(
    {
        retrieved_topic.topic.subject + ": " + retrieved_topic.topic.topic
        for retrieved_topic in topic_retriever_output.topics
    }
)
hints_prompt_fragment = f"Here are some hints to use when thinking about which tool might be best to help:\n{hints}"
Topic Filtering¶
When using the Topic Retriever, topics can be filtered by their subject and expertise fields and also by their metadata fields. The filter expression is a dictionary with keyword arguments that are passed directly to the underlying topic retriever implementation. For a ChromaDB Topic Retriever, those kwargs are passed directly to the ChromaDB collection.query method. For a Milvus Topic Retriever, the kwargs are passed to the client.search method.
In the following example, ChromaDB is used first to load the topics and then to retrieve them. The topics have metadata fields representing a hypothetical software development environment where they can be used. During topic retrieval, a ChromaDB metadata filtering expression is used to get the topics that can be used in any of the environments ["live", "dark_lunch"].
from sentence_transformers import SentenceTransformer
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_retriever import ChromaDBTopicRetriever
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_sink_content_provider import ChromaDBProvider
from altk.core.toolkit import AgentPhase
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import (
EmbeddedTopic,
TopicInfo,
TopicLoadingInput,
TopicRetrievalRunInput,
TopicRetrievalRunOutput,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_loading.topic_loading import TopicLoadingMiddleware
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_retriever.topic_retriever import TopicRetrievalMiddleware
from chromadb import PersistentClient
embedding_model_name = "ibm-granite/granite-embedding-107m-multilingual"
embedding_model = SentenceTransformer(embedding_model_name)
topics = [
EmbeddedTopic(
topic=TopicInfo(
topic="Job tasks",
subject="job_offering",
metadata={"env_1": "live", "env_2": "dark_lunch"},
),
embeddings=embedding_model.encode("Job tasks").tolist(),
),
EmbeddedTopic(
topic=TopicInfo(
topic="Job responsibilities",
subject="job_offering",
metadata={"env_1": "staging", "env_2": "testing"},
),
embeddings=embedding_model.encode("Job responsibilities").tolist(),
),
]
topic_loading = TopicLoadingMiddleware(
topics_sink=ChromaDBProvider(collection="topics", db_path="/path/to/local/chromadb")
)
topic_loading.process(
data=TopicLoadingInput(topics=topics), phase=AgentPhase.BUILDTIME
)
topic_retriever = TopicRetrievalMiddleware(
topic_retriever=ChromaDBTopicRetriever(
collection="topics", chroma_client=PersistentClient(path="/path/to/local/chromadb")
)
)
# Get topics having metadata fields `env_1` or `env_2` containing any of the values ["live", "dark_lunch"]
topic_retriever_output: TopicRetrievalRunOutput = topic_retriever.process(
data=TopicRetrievalRunInput(
messages=[{"role": "user", "content": "What are my job responsibilities?"}],
query_kwargs={
"where": {
"$or": [
{"env_1": {"$in": ["live", "dark_lunch"]}},
{"env_2": {"$in": ["live", "dark_lunch"]}},
]
}
},
),
phase=AgentPhase.RUNTIME,
)
print(topic_retriever_output.topics)
In the following example, the topics are loaded into and retrieved from a Milvus vector store. During retrieval, Milvus first applies the filter condition and then does the full text search on the topics that match the condition:
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import (
TopicInfo,
TopicLoadingInput,
TopicRetrievalRunInput,
TopicRetrievalRunOutput,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.milvus.topic_retriever import (
MilvusTopicRetriever,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.milvus.topic_sink import (
MilvusProvider,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_loading.topic_loading import (
TopicLoadingMiddleware,
)
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_retriever.topic_retriever import (
TopicRetrievalMiddleware,
)
from altk.core.toolkit import AgentPhase
topics = [
TopicInfo(
topic="Job 1",
subject="HR Job Area",
metadata={"env": "testing", "prop_1": 1},
),
TopicInfo(
topic="Job 2",
subject="HR Job Area",
metadata={"env": "prod", "prop_1": 2},
),
]
topic_loading = TopicLoadingMiddleware(
topics_sink=MilvusProvider(
collection_name="topics", milvus_uri="/path/to/local/milvus/files"
)
)
topic_loading.process(data=TopicLoadingInput(topics=topics), phase=AgentPhase.BUILDTIME)
topic_retriever = TopicRetrievalMiddleware(
topic_retriever=MilvusTopicRetriever(
milvus_uri="/path/to/local/milvus/files", metadata_fields=["env", "prop_1"]
)
)
topic_retriever_output: TopicRetrievalRunOutput = topic_retriever.process(
data=TopicRetrievalRunInput(
messages=[{"role": "user", "content": "Get job positions"}],
query_kwargs={"filter": 'env == "prod"'},
),
phase=AgentPhase.RUNTIME,
)
Built-in Topics Retrievers¶
ChromaDBTopicRetriever¶
Retrieves topics from a ChromaDB instance. To use a persistent ChromaDB that runs in the same process where your code runs:
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_retriever import (
ChromaDBTopicRetriever,
)
topic_retriever = ChromaDBTopicRetriever(
collection="topics", db_path="/path/to/local/chromadb"
)
To connect to a remote ChromaDB instance:
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_retriever import (
ChromaDBTopicRetriever,
)
topic_retriever = ChromaDBTopicRetriever(
collection="topics", host="localhost", port=8000
)
To use an already instantiated ChromaDB client:
from altk.pre_llm.routing.retrieval_augmented_thinking.chroma.topic_retriever import (
ChromaDBTopicRetriever,
)
from chromadb import EphemeralClient
client = EphemeralClient()
topic_retriever = ChromaDBTopicRetriever(collection="topics", client=client)
MilvusTopicRetriever¶
Retrieves topic objects from a Milvus vector store.
To use a Milvus Lite instance:
from altk.pre_llm.routing.retrieval_augmented_thinking.milvus.topic_retriever import (
MilvusTopicRetriever,
)
topic_retriever = MilvusTopicRetriever(
"/path/to/milvus/files", collection_name="topics"
)
To connect to a remote Milvus instance:
from altk.pre_llm.routing.retrieval_augmented_thinking.milvus.topic_retriever import (
MilvusTopicRetriever,
)
topic_retriever = MilvusTopicRetriever(
"http://localhost:19530", collection_name="topics"
)
Custom Topic Retriever implementation¶
To create a custom Topic Retriever, follow these steps (a sketch follows the list):
- Define a Pydantic model that serves as a config object; you can refer to the MilvusTopicRetrieverConfig object as an example.
- The class must have a class method that receives the config object and creates an instance of the topic retriever.
- The class must implement the TopicRetriever protocol, which requires a method def get_topics(self, query: str, n_results: int = 10) -> List[TopicInfo] that returns the topics for a user utterance.
- Register the implementation with a name by decorating the class, as in the following example, where the implementation is registered under the name my_topic_retriever.
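A minimal sketch under these assumptions: the keyword-matching retriever and its config are hypothetical, the registration decorator's name and signature are inferred by analogy with the extraction middleware's register_content_provider/register_topics_sink decorators, and the factory method name is not specified by the docs above:
from typing import Dict, List
from pydantic import BaseModel
from altk.pre_llm.routing.retrieval_augmented_thinking.core.toolkit import TopicInfo
from altk.pre_llm.routing.retrieval_augmented_thinking.topic_retriever.topic_retriever import (
    TopicRetrievalMiddleware,
)

class MyTopicRetrieverConfig(BaseModel):
    # Hypothetical config: a fixed list of topics and the subject that owns them.
    topics: List[str]
    subject: str

# Assumption: registration by name, analogous to the extraction middleware decorators.
@TopicRetrievalMiddleware.register_topic_retriever("my_topic_retriever")
class MyTopicRetriever:
    def __init__(self, config: MyTopicRetrieverConfig):
        self._config = config

    def get_topics(self, query: str, n_results: int = 10) -> List[TopicInfo]:
        # Naive keyword match; a real implementation would query a vector store.
        words = query.lower().split()
        matches = [t for t in self._config.topics if any(w in t.lower() for w in words)]
        return [TopicInfo(topic=t, subject=self._config.subject) for t in matches[:n_results]]

    @classmethod
    def create_topic_retriever(cls, config: MyTopicRetrieverConfig | Dict) -> "MyTopicRetriever":
        # Assumption: factory method name; the protocol only requires that such a method exists.
        if isinstance(config, dict):
            config = MyTopicRetrieverConfig(**config)
        return cls(config)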
Creating a Topic Retrieval component from settings¶
settings.json:
{
"name": "chromadb",
"config": {
"collection": "sound_topics",
"instance": {
"db_path": "/path/to/chroma/with/topics"
}
}
}
from pathlib import Path

# TopicRetrievalMiddleware, TopicRetrievalRunInput, TopicRetrievalRunOutput and AgentPhase
# are imported as in the examples above.
topic_retriever = TopicRetrievalMiddleware.from_settings(Path("settings.json").read_text())
topic_retriever_output: TopicRetrievalRunOutput = topic_retriever.process(
    data=TopicRetrievalRunInput(
        messages=[{"role": "user", "content": "Get job positions"}],
        query_kwargs={"where": {"env": "prod"}},
    ),
    phase=AgentPhase.RUNTIME,
)