Chat app
Simple chat app example built with FastAPI.
Demonstrates:
- reusing chat history
- serializing messages
This demonstrates storing chat history between requests and using it to give the model context for new responses.
Most of the complex logic here is in chat_app.html
which includes the page layout and JavaScript to handle the chat.
Running the Example
With dependencies installed and environment variables set, run:
python -m pydantic_ai_examples.chat_app
uv run -m pydantic_ai_examples.chat_app
Then open the app at localhost:8000.
TODO screenshot.
Example Code
chat_app.py
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated
import fastapi
import logfire
from fastapi.responses import HTMLResponse, Response, StreamingResponse
from pydantic import Field, TypeAdapter
from pydantic_ai import Agent
from pydantic_ai.messages import Message, MessagesTypeAdapter, UserPrompt
# 'if-token-present' means nothing will be sent (and the example will work) if you don't have logfire configured
logfire.configure(send_to_logfire='if-token-present')
# Agent that the POST /chat/ handler runs to produce a reply; 'openai:gpt-4o' is the model identifier passed to it.
agent = Agent('openai:gpt-4o')
# FastAPI application on which the route handlers in this module are registered.
app = fastapi.FastAPI()
# Instrument the application with logfire; this must come after `app` has been created.
logfire.instrument_fastapi(app)
@app.get('/')
async def index() -> HTMLResponse:
    # Serve the chat page that sits next to this module, read fresh from disk on every request.
    page_path = THIS_DIR / 'chat_app.html'
    return HTMLResponse(page_path.read_bytes())
@app.get('/chat/')
async def get_chat() -> Response:
    # Return every stored message as newline-delimited JSON, one serialized message per line.
    encoded = (
        MessageTypeAdapter.dump_json(message) for message in database.get_messages()
    )
    body = b'\n'.join(encoded)
    return Response(body, media_type='text/plain')
@app.post('/chat/')
async def post_chat(prompt: Annotated[str, fastapi.Form()]) -> StreamingResponse:
    # Handle a submitted prompt: echo it, run the agent with the stored history as context,
    # persist the new messages and stream the reply back as newline-delimited JSON.

    async def stream_messages():
        """Yield newline-terminated JSON `Message`s: first the user prompt, then the agent reply."""
        # Send the prompt back immediately so the page can show it before the agent has answered.
        user_message = UserPrompt(content=prompt)
        yield MessageTypeAdapter.dump_json(user_message) + b'\n'

        # Everything stored so far is passed to the agent as context.
        history = list(database.get_messages())
        result = await agent.run(prompt, message_history=history)

        # Persist the messages created by this run (the user prompt and the agent response in this case).
        database.add_messages(result.new_messages_json())

        # Only the final message (the agent response) is streamed here: the user prompt was
        # already sent above, so yielding `new_messages_json()` would duplicate it.
        agent_reply = result.all_messages()[-1]
        yield MessageTypeAdapter.dump_json(agent_reply) + b'\n'

    return StreamingResponse(stream_messages(), media_type='text/plain')
# Directory containing this module; used to locate chat_app.html and the message store file.
THIS_DIR = Path(__file__).parent
# Adapter used to serialize individual `Message` objects to JSON; the `role` field is the discriminator.
MessageTypeAdapter: TypeAdapter[Message] = TypeAdapter(
Annotated[Message, Field(discriminator='role')]
)
@dataclass
class Database:
    """Very rudimentary database to store chat messages in a JSON lines file.

    Each `add_messages` call appends one line to the file; `get_messages`
    validates every non-blank line with `MessagesTypeAdapter` and yields the
    messages it contains, in file order.
    """

    # Location of the JSON lines file, next to this module by default.
    file: Path = THIS_DIR / '.chat_app_messages.jsonl'

    def add_messages(self, messages: bytes) -> None:
        """Append `messages` to the file as a single newline-terminated line.

        `messages` is written verbatim; since `get_messages` validates each line
        separately, it is presumably a serialized JSON list of messages with no
        embedded newlines (verify against the caller).
        """
        with self.file.open('ab') as f:
            f.write(messages + b'\n')

    def get_messages(self) -> Iterator[Message]:
        """Yield every stored message, or nothing if the file does not exist yet."""
        if not self.file.exists():
            return
        with self.file.open('rb') as f:
            for line in f:
                # Lines read from a file keep their trailing newline, so a blank
                # line is b'\n' (truthy). Strip before testing so blank lines are
                # skipped instead of being passed to the JSON validator.
                if line.strip():
                    yield from MessagesTypeAdapter.validate_json(line)
# Shared Database instance used by the /chat/ route handlers.
database = Database()
if __name__ == '__main__':
    import uvicorn

    # Run the app by import string with auto-reload, watching this module's directory.
    watched_dirs = [str(THIS_DIR)]
    uvicorn.run(
        'pydantic_ai_examples.chat_app:app',
        reload=True,
        reload_dirs=watched_dirs,
    )
chat_app.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Chat App</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
/* Cap the width of the main container. */
main {
max-width: 700px;
}
/* The page script adds each message's `role` as a class on its div; the two rules below
   put a bold label on its own line before messages with the `user` and `llm-response` classes. */
#conversation .user::before {
content: 'You asked: ';
font-weight: bold;
display: block;
}
#conversation .llm-response::before {
content: 'AI Response: ';
font-weight: bold;
display: block;
}
/* Spinner: a 30px circle whose bottom border is transparent, rotating continuously.
   It is fully transparent unless the `active` class is present (see #spinner.active). */
#spinner {
opacity: 0;
transition: opacity 500ms ease-in;
width: 30px;
height: 30px;
border: 3px solid #222;
border-bottom-color: transparent;
border-radius: 50%;
animation: rotation 1s linear infinite;
}
/* One full turn per animation cycle. */
@keyframes rotation {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
/* The page script toggles `active` on the spinner; the opacity transition above fades it in. */
#spinner.active {
opacity: 1;
}
</style>
</head>
<body>
<main class="border rounded mx-auto my-5 p-4">
<h1>Chat App</h1>
<p>Ask me anything...</p>
<div id="conversation" class="px-2"></div>
<div class="d-flex justify-content-center mb-3">
<div id="spinner"></div>
</div>
<form method="post">
<input id="prompt-input" name="prompt" class="form-control"/>
<div class="d-flex justify-content-end">
<button class="btn btn-primary mt-2">Send</button>
</div>
</form>
<div id="error" class="d-none text-danger">
Error occurred, check the console for more information.
</div>
</main>
</body>
</html>
<script type="module">
import { marked } from 'https://cdn.jsdelivr.net/npm/marked/lib/marked.esm.js';
/**
 * Parse newline-delimited JSON lines and append one rendered message per line
 * to the #conversation element.
 *
 * Lines of length 0 or 1 are ignored. Every remaining line is parsed before
 * anything is appended, so a malformed line throws without a partial render.
 *
 * NOTE(review): the markdown output is assigned to innerHTML as-is, and the
 * content comes from the user prompt and the model. Confirm whether it should
 * be sanitized before insertion.
 */
function addMessages(lines) {
  const parsed = lines
    .filter((line) => line.length > 1)
    .map((line) => JSON.parse(line));
  const conversation = document.getElementById('conversation');
  parsed.forEach((message) => {
    const entry = document.createElement('div');
    // The role doubles as a CSS class, which the stylesheet uses to label the message.
    entry.classList.add('border-top', 'pt-2', message.role);
    entry.innerHTML = marked.parse(message.content);
    conversation.appendChild(entry);
  });
}
/**
 * Report a failed request: log the error, reveal the #error notice, stop the
 * spinner and make the prompt input usable again.
 */
function onError(error) {
  console.error(error);
  document.getElementById('error').classList.remove('d-none');
  document.getElementById('spinner').classList.remove('active');
  // onSubmit disables the prompt input before sending and only the success path
  // re-enables it, so without this the user could not retry after a failure.
  document.getElementById('prompt-input').disabled = false;
}
/**
 * Consume a newline-delimited JSON response, rendering each complete line as
 * soon as it arrives, then re-enable and focus the prompt input.
 *
 * @param {Response} response - the fetch response to read.
 * @throws {Error} when the response status is not OK.
 */
async function fetchResponse(response) {
  if (!response.ok) {
    const text = await response.text();
    console.error(`Unexpected response: ${response.status}`, {response, text});
    throw new Error(`Unexpected response: ${response.status}`);
  }

  // One decoder in streaming mode for the whole body: a multi-byte UTF-8
  // character split across two chunks is buffered instead of being decoded
  // as replacement characters.
  const decoder = new TextDecoder();
  const reader = response.body.getReader();
  let text = '';
  while (true) {
    const {done, value} = await reader.read();
    if (done) {
      break;
    }
    text += decoder.decode(value, {stream: true});
    const lines = text.split('\n');
    if (lines.length > 1) {
      // Everything before the last newline is complete; keep the remainder
      // as the start of the next line.
      addMessages(lines.slice(0, -1));
      text = lines[lines.length - 1];
    }
  }
  // Flush any bytes the decoder is still holding, then render what is left.
  text += decoder.decode();
  addMessages(text.split('\n'));

  const input = document.getElementById('prompt-input');
  input.disabled = false;
  input.focus();
}
/**
 * Submit handler for the prompt form: posts the form data to /chat/ and
 * renders the streamed reply, showing the spinner while the request runs.
 */
async function onSubmit(e) {
  e.preventDefault();

  const spinnerEl = document.getElementById('spinner');
  spinnerEl.classList.add('active');

  // Capture the form data before the input is cleared and disabled below.
  const formData = new FormData(e.target);

  const promptInput = document.getElementById('prompt-input');
  promptInput.value = '';
  promptInput.disabled = true;

  const reply = await fetch('/chat/', {method: 'POST', body: formData});
  await fetchResponse(reply);

  spinnerEl.classList.remove('active');
}
// Route form submissions (send button or Enter) through onSubmit; any failure goes to onError.
document.querySelector('form').addEventListener('submit', (event) => {
  onSubmit(event).catch(onError);
});

// Fetch and render the stored conversation as soon as the page loads.
fetch('/chat/')
  .then((response) => fetchResponse(response))
  .catch(onError);
</script>