Chat Memory with Callbacks
==========================

This example demonstrates how to create an agent with memory by using callbacks
to track conversation history and retrieve past messages.

Building a Memory System
------------------------

First, let's create a memory system to store and retrieve conversation history:

.. code-block:: python

    from brain.agents.tool import tool, Output
    from brain.agents.callback import callback
    from pydantic import BaseModel
    from typing import List, Optional
    import json
    from datetime import datetime
    import os

    # Define models for our memory system
    class MemoryEntry(BaseModel):
        role: str
        content: str
        timestamp: str

    class MemorySearchResult(Output):
        entries: List[MemoryEntry]
        total_found: int

    # Create a simple file-based memory store
    class ConversationMemory:
        def __init__(self, memory_file="conversation_memory.json"):
            self.memory_file = memory_file
            self.entries: List[MemoryEntry] = []
            self._load_memory()

        def _load_memory(self):
            """Load memory from file if it exists"""
            if os.path.exists(self.memory_file):
                try:
                    with open(self.memory_file, "r") as f:
                        data = json.load(f)
                        self.entries = [MemoryEntry(**entry) for entry in data]
                except Exception as e:
                    print(f"Error loading memory: {e}")

        def _save_memory(self):
            """Save memory to file"""
            with open(self.memory_file, "w") as f:
                json.dump([entry.model_dump() for entry in self.entries], f, indent=2)

        def add_entry(self, role: str, content: str):
            """Add a new memory entry"""
            entry = MemoryEntry(
                role=role,
                content=content,
                timestamp=datetime.now().isoformat()
            )
            self.entries.append(entry)
            self._save_memory()

        def search(self, query: str, limit: int = 5) -> List[MemoryEntry]:
            """Search memory for entries containing the query"""
            results = []
            # Very simple search implementation
            for entry in reversed(self.entries):
                if query.lower() in entry.content.lower():
                    results.append(entry)
                    if len(results) >= limit:
                        break
            return results

        def get_recent(self, count: int = 5) -> List[MemoryEntry]:
            """Get the most recent entries"""
            return list(reversed(self.entries[-count:]))

    # Create a global instance of our memory system
    memory = ConversationMemory()

Creating Memory-Related Tools
-----------------------------

Now, let's define tools that the agent can use to interact with memory:

.. code-block:: python

    class MemoryQuery(BaseModel):
        query: str
        limit: int = 5

    @tool()
    def search_memory(input: MemoryQuery) -> MemorySearchResult:
        """
        Search previous conversation history for relevant information

        Args:
            input: A MemoryQuery object with the search query and optional limit

        Returns:
            MemorySearchResult with matching entries
        """
        results = memory.search(input.query, input.limit)
        return MemorySearchResult(
            entries=results,
            total_found=len(results)
        )

    @tool()
    def get_recent_messages(limit: int = 5) -> MemorySearchResult:
        """
        Retrieve the most recent messages from the conversation

        Args:
            limit: Maximum number of messages to retrieve

        Returns:
            MemorySearchResult with the most recent messages
        """
        results = memory.get_recent(limit)
        return MemorySearchResult(
            entries=results,
            total_found=len(results)
        )

Setting Up Memory Callbacks
---------------------------

Next, let's create callbacks to automatically track the conversation:

.. code-block:: python

    # Create callbacks to track all messages
    @callback("message.*")
    async def track_conversation_history(agent, event, data):
        """Add all messages to conversation history"""
        # Skip storing tool response messages
        if hasattr(data, "tool_call_id"):
            return

        # Add the message to memory
        memory.add_entry(data.role, data.content)

Creating the Agent with Memory
------------------------------

Now, let's create an agent that uses our memory system:

.. code-block:: python

    import asyncio
    import os

    from brain.agents.agent import Agent
    from brain.agents.llm.openai import OpenAIBaseLLM
    from brain.agents.callback import callback

    @callback("message_stream.assistant")
    async def stream_to_console(agent, event, stream):
        print("\nAssistant: ", end="", flush=True)
        async for chunk in stream:
            if hasattr(chunk, "chunk"):
                print(chunk.chunk, end="", flush=True)

    async def main():
        # Initialize the LLM
        llm = OpenAIBaseLLM(
            api_key=os.environ.get("OPENAI_API_KEY"),
            default_model="gpt-4o"
        )

        # Create the agent with memory tools and the memory callback
        agent = Agent(
            llm=llm,
            tools=[search_memory, get_recent_messages],
            instructions="""
            You are a helpful assistant with memory. You can remember previous parts
            of the conversation and reference them when appropriate.

            You have access to the following memory tools:
            - search_memory: Search for specific information in past conversations
            - get_recent_messages: Get the most recent messages from the conversation

            Use these tools when the user refers to something mentioned earlier or
            when you need to check past information to provide a consistent response.
            """,
            callbacks=[
                track_conversation_history,
                stream_to_console
            ]
        )

        # Run a conversation loop
        print("Memory Assistant (type 'exit' to quit)")
        while True:
            # Get user input
            user_input = input("\nYou: ")
            if user_input.lower() == "exit":
                break

            # Process with the agent
            await agent.run(user_input)
            print()  # Add a newline after the response

    if __name__ == "__main__":
        asyncio.run(main())

Complete Example
----------------

Here's the complete example with all components integrated:

.. code-block:: python

    import asyncio
    import json
    import os
    from datetime import datetime
    from typing import List, Optional

    from brain.agents.agent import Agent
    from brain.agents.callback import callback
    from brain.agents.llm.openai import OpenAIBaseLLM
    from brain.agents.tool import tool, Output
    from pydantic import BaseModel

    # Define models for our memory system
    class MemoryEntry(BaseModel):
        role: str
        content: str
        timestamp: str

    class MemorySearchResult(Output):
        entries: List[MemoryEntry]
        total_found: int

    # Create a simple file-based memory store
    class ConversationMemory:
        def __init__(self, memory_file="conversation_memory.json"):
            self.memory_file = memory_file
            self.entries: List[MemoryEntry] = []
            self._load_memory()

        def _load_memory(self):
            """Load memory from file if it exists"""
            if os.path.exists(self.memory_file):
                try:
                    with open(self.memory_file, "r") as f:
                        data = json.load(f)
                        self.entries = [MemoryEntry(**entry) for entry in data]
                except Exception as e:
                    print(f"Error loading memory: {e}")

        def _save_memory(self):
            """Save memory to file"""
            with open(self.memory_file, "w") as f:
                json.dump([entry.model_dump() for entry in self.entries], f, indent=2)

        def add_entry(self, role: str, content: str):
            """Add a new memory entry"""
            entry = MemoryEntry(
                role=role,
                content=content,
                timestamp=datetime.now().isoformat()
            )
            self.entries.append(entry)
            self._save_memory()

        def search(self, query: str, limit: int = 5) -> List[MemoryEntry]:
            """Search memory for entries containing the query"""
            results = []
            # Very simple search implementation
            for entry in reversed(self.entries):
                if query.lower() in entry.content.lower():
                    results.append(entry)
                    if len(results) >= limit:
                        break
            return results

        def get_recent(self, count: int = 5) -> List[MemoryEntry]:
            """Get the most recent entries"""
            return list(reversed(self.entries[-count:])) if self.entries else []

    # Create a global instance of our memory system
    memory = ConversationMemory()

    # Define the memory query model
    class MemoryQuery(BaseModel):
        query: str
        limit: int = 5

    # Create memory-related tools
    @tool()
    def search_memory(input: MemoryQuery) -> MemorySearchResult:
        """
        Search previous conversation history for relevant information

        Args:
            input: A MemoryQuery object with the search query and optional limit

        Returns:
            MemorySearchResult with matching entries
        """
        results = memory.search(input.query, input.limit)
        return MemorySearchResult(
            entries=results,
            total_found=len(results)
        )

    @tool()
    def get_recent_messages(limit: int = 5) -> MemorySearchResult:
        """
        Retrieve the most recent messages from the conversation

        Args:
            limit: Maximum number of messages to retrieve

        Returns:
            MemorySearchResult with the most recent messages
        """
        results = memory.get_recent(limit)
        return MemorySearchResult(
            entries=results,
            total_found=len(results)
        )

    # Create callbacks for tracking conversation and streaming
    @callback("message.*")
    async def track_conversation_history(agent, event, data):
        """Add all messages to conversation history"""
        # Skip storing tool response messages
        if hasattr(data, "tool_call_id"):
            return

        # Add the message to memory
        memory.add_entry(data.role, data.content)

    @callback("message_stream.assistant")
    async def stream_to_console(agent, event, stream):
        print("\nAssistant: ", end="", flush=True)
        async for chunk in stream:
            if hasattr(chunk, "chunk"):
                print(chunk.chunk, end="", flush=True)

    async def main():
        # Initialize the LLM
        llm = OpenAIBaseLLM(
            api_key=os.environ.get("OPENAI_API_KEY"),
            default_model="gpt-4o"
        )

        # Create the agent with memory tools and the memory callback
        agent = Agent(
            llm=llm,
            tools=[search_memory, get_recent_messages],
            instructions="""
            You are a helpful assistant with memory. You can remember previous parts
            of the conversation and reference them when appropriate.

            You have access to the following memory tools:
            - search_memory: Search for specific information in past conversations
            - get_recent_messages: Get the most recent messages from the conversation

            Use these tools when the user refers to something mentioned earlier or
            when you need to check past information to provide a consistent response.
            """,
            callbacks=[
                track_conversation_history,
                stream_to_console
            ]
        )

        # Run a conversation loop
        print("Memory Assistant (type 'exit' to quit)")
        print("This assistant remembers your conversation history and can refer back to it.")
        print("Try asking about something mentioned earlier in the conversation!")
        while True:
            # Get user input
            user_input = input("\nYou: ")
            if user_input.lower() == "exit":
                break

            # Process with the agent
            await agent.run(user_input)
            print()  # Add a newline after the response

    if __name__ == "__main__":
        asyncio.run(main())

Alternative Approach: Stateful Tool
-----------------------------------

Instead of using callbacks, you can also implement memory using a stateful tool:

.. code-block:: python

    from brain.agents.tool import tool, StateContext

    # Define a state schema
    class ChatState(BaseModel):
        history: List[MemoryEntry] = []

    @tool(stateful=True, state_schema=ChatState)
    def remember_and_search(query: str, context: StateContext[ChatState]) -> str:
        """
        Search the stored conversation history for the query

        Args:
            query: The search query
            context: The state context containing conversation history

        Returns:
            The search results as a formatted string
        """
        # Access the current state
        state = context.state

        # Search history
        results = []
        for entry in reversed(state.history):
            if query.lower() in entry.content.lower():
                results.append(entry)
                if len(results) >= 3:
                    break

        # Format results
        if results:
            result_text = "I found these relevant messages:\n\n"
            for i, entry in enumerate(results):
                result_text += f"{i+1}. {entry.role}: {entry.content}\n"
            return result_text
        else:
            return "I couldn't find any relevant information in our conversation history."
This approach has the advantage of using the built-in state management feature of the Tool system, but it requires more manual memory management compared to the callback approach.