Introduction
This section provides detailed solutions for the exercises in Chapter 7. The implementations below cover a PostgreSQL-backed prediction cache, a multi-level caching system, an asynchronous streaming RAG controller, and a Kubernetes deployment, illustrating how to build robust, scalable DSPy applications for production.
Solution 1: Custom PostgreSQL Adapter
This adapter uses a psycopg2 connection pool to persist predictions as JSONB rows, giving DSPy a durable, database-backed cache.
Python
import dspy
import psycopg2
from psycopg2 import pool
import json


class PostgresAdapter(dspy.Adapter):
    def __init__(self, connection_string):
        # Keep between 1 and 10 pooled connections to the database.
        self.pool = psycopg2.pool.SimpleConnectionPool(1, 10, connection_string)
        self._init_db()

    def _init_db(self):
        # Create the predictions table on first use.
        conn = self.pool.getconn()
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS predictions (
                        key TEXT PRIMARY KEY,
                        value JSONB
                    );
                """)
            conn.commit()
        finally:
            self.pool.putconn(conn)

    def get(self, key):
        # Return the cached value for key, or None on a miss.
        conn = self.pool.getconn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT value FROM predictions WHERE key = %s", (key,))
                result = cur.fetchone()
                return result[0] if result else None
        finally:
            self.pool.putconn(conn)

    def set(self, key, value):
        # Upsert the value, overwriting any previous entry for the key.
        conn = self.pool.getconn()
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO predictions (key, value) VALUES (%s, %s)
                    ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value
                """, (key, json.dumps(value)))
            conn.commit()
        finally:
            self.pool.putconn(conn)
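A minimal usage sketch follows; the connection string and cache keys are illustrative placeholders, and a running PostgreSQL instance is assumed.
Python
# Sketch only: the DSN and keys below are placeholders, not part of the solution.
adapter = PostgresAdapter("dbname=dspy user=dspy password=secret host=localhost")

# Store a prediction payload and read it back; misses return None.
adapter.set("qa:capital-of-france", {"answer": "Paris", "confidence": 0.98})
print(adapter.get("qa:capital-of-france"))  # {'answer': 'Paris', 'confidence': 0.98}
print(adapter.get("unknown-key"))           # None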
Solution 2: Multi-Level Caching System
A three-level, write-through caching strategy spanning memory (L1), Redis (L2), and disk (L3), with values promoted to faster levels on read.
Python
import redis
import pickle
import os


class MultiLevelCache:
    def __init__(self, redis_url="redis://localhost:6379", disk_path="./cache"):
        self.l1 = {}
        self.l2 = redis.from_url(redis_url)
        self.l3_path = disk_path
        os.makedirs(disk_path, exist_ok=True)

    def get(self, key):
        # Check L1 (Memory)
        if key in self.l1:
            return self.l1[key]

        # Check L2 (Redis)
        l2_val = self.l2.get(key)
        if l2_val:
            val = pickle.loads(l2_val)
            self.l1[key] = val  # Promote to L1
            return val

        # Check L3 (Disk)
        file_path = os.path.join(self.l3_path, key)
        if os.path.exists(file_path):
            with open(file_path, 'rb') as f:
                val = pickle.load(f)
            self.l2.set(key, pickle.dumps(val))  # Promote to L2
            self.l1[key] = val                   # Promote to L1
            return val

        return None

    def set(self, key, value):
        # Write through to all layers
        self.l1[key] = value
        self.l2.set(key, pickle.dumps(value))
        with open(os.path.join(self.l3_path, key), 'wb') as f:
            pickle.dump(value, f)
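A short usage sketch; it assumes a Redis server reachable at the default localhost URL, and the key and value are placeholders.
Python
cache = MultiLevelCache()

# Write-through: the value lands in memory, Redis, and on disk.
cache.set("summary:doc-42", {"summary": "A cached summary."})
print(cache.get("summary:doc-42"))  # Served from L1 (memory)

# Drop L1 to show promotion: the next read comes from Redis
# and is promoted back into memory.
cache.l1.clear()
print(cache.get("summary:doc-42"))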
Solution 3: Async Streaming RAG
Concurrent query handling and document ingestion with asyncio, using a semaphore to bound the number of in-flight RAG calls.
Python
import asyncio
import dspy


class AsyncRAGController:
    def __init__(self, rag_module, concurrency=5):
        self.module = rag_module
        self.semaphore = asyncio.Semaphore(concurrency)

    async def process_query(self, query):
        async with self.semaphore:
            # Run the synchronous DSPy call in a worker thread so it
            # does not block the event loop.
            result = await asyncio.to_thread(self.module, question=query)
            return result

    async def ingest_stream(self, document_stream):
        async for doc in document_stream:
            # Placeholder ingestion step; replace with real indexing logic.
            await asyncio.sleep(0.01)
            print(f"Ingested: {doc[:20]}...")

    async def run_pipeline(self, queries, doc_stream):
        # Ingest documents concurrently while answering queries.
        ingest_task = asyncio.create_task(self.ingest_stream(doc_stream))
        results = await asyncio.gather(*(self.process_query(q) for q in queries))
        await ingest_task
        return results
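A driver sketch for the controller. It assumes an LM has already been configured via dspy.configure, uses dspy.Predict("question -> answer") as a stand-in for the RAG module from the exercise, and feeds a trivial async generator as the document stream.
Python
async def doc_stream():
    # Placeholder async document source for this sketch.
    for doc in ["First document body ...", "Second document body ..."]:
        yield doc


async def main():
    rag = dspy.Predict("question -> answer")  # Stand-in for the real RAG module
    controller = AsyncRAGController(rag, concurrency=5)
    results = await controller.run_pipeline(
        queries=["What is DSPy?", "Why cache predictions?"],
        doc_stream=doc_stream(),
    )
    for prediction in results:
        print(prediction.answer)


asyncio.run(main())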
Solution 5: Kubernetes Manifest
A complete production configuration: a Deployment, a Service, and a HorizontalPodAutoscaler for the DSPy application.
YAML
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dspy-app
  namespace: adaptive-ai
spec:
  replicas: 3
  selector:
    matchLabels:
      app: dspy-app
  template:
    metadata:
      labels:
        app: dspy-app
    spec:
      containers:
        - name: dspy-server
          image: dspy-app:v1.0
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: dspy-secrets
                  key: api-key
---
apiVersion: v1
kind: Service
metadata:
  name: dspy-service
  namespace: adaptive-ai
spec:
  selector:
    app: dspy-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: dspy-hpa
  namespace: adaptive-ai
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: dspy-app
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 75
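The three manifests can be kept in a single file and applied together with kubectl apply -f. The dspy-secrets Secret must exist in the adaptive-ai namespace before the pods can start, since the container reads OPENAI_API_KEY from it, and the HPA keeps between 3 and 10 replicas, scaling out when average CPU utilization across the pods exceeds the 75% target.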