## A Complete Guide to Building a Python Mem0 Framework for Spring AI
Below is a step-by-step plan for building a Python Mem0 service designed to be called by Spring AI, implementing tiered memory management.
### Core Architecture
```mermaid
graph TD
    A[Spring AI application] --> B(HTTP API)
    B --> C[Python Mem0 service]
    C --> D{Tiered memory manager}
    D --> E[Short-term memory - Redis]
    D --> F[Long-term memory - PostgreSQL]
    D --> G[Vector memory - Pinecone]
```
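The dispatch rule the diagram encodes is a simple type-to-backend table. A minimal sketch (the backend labels are illustrative, not fixed names from Mem0):

```python
# Map each memory tier to the backend responsible for it
BACKENDS = {
    "SHORT_TERM": "redis",
    "LONG_TERM": "postgresql",
    "VECTOR": "pinecone",
}

def route(memory_type: str) -> str:
    """Return the backend that owns records of the given type."""
    if memory_type not in BACKENDS:
        raise ValueError(f"unknown memory type: {memory_type}")
    return BACKENDS[memory_type]
```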
### Step 1: Define the Mem0 core interface (Spring AI-compatible)
```python
# memory_interface.py
from typing import Dict, List, Optional
from pydantic import BaseModel
from enum import Enum

class MemoryType(str, Enum):
    SHORT_TERM = "SHORT_TERM"
    LONG_TERM = "LONG_TERM"
    VECTOR = "VECTOR"

class MemoryRecord(BaseModel):
    key: str
    content: str
    memory_type: MemoryType
    metadata: Dict = {}
    embedding: Optional[List[float]] = None  # required only for VECTOR records

class MemoryQuery(BaseModel):
    key: Optional[str] = None
    query_text: Optional[str] = None
    top_k: int = 5
    similarity_threshold: float = 0.7
```
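Both sides of the integration must agree on the JSON wire shape these models imply. As a stdlib-only sketch, independent of pydantic (`parse_record_payload` is an illustrative helper, not part of Mem0), validating an incoming record body might look like:

```python
import json

REQUIRED_FIELDS = {"key", "content", "memory_type"}
VALID_TYPES = {"SHORT_TERM", "LONG_TERM", "VECTOR"}

def parse_record_payload(raw: str) -> dict:
    """Parse and validate a JSON body against the MemoryRecord contract."""
    record = json.loads(raw)
    missing = REQUIRED_FIELDS - record.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    if record["memory_type"] not in VALID_TYPES:
        raise ValueError(f"invalid memory_type: {record['memory_type']}")
    record.setdefault("metadata", {})  # metadata is optional on the wire
    return record

payload = '{"key": "user123", "content": "likes coffee", "memory_type": "SHORT_TERM"}'
record = parse_record_payload(payload)
```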
### Step 2: Implement the tiered storage engine
```python
# storage_engine.py
import json
import redis
import asyncpg
import pinecone
from typing import List
from memory_interface import MemoryRecord, MemoryQuery, MemoryType

class MemoryStorage:
    def __init__(self):
        # Initialize each storage tier
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        self.pg_pool = None  # created in connect(): asyncpg.create_pool is a coroutine
        pinecone.init(api_key="YOUR_KEY", environment="YOUR_ENV")
        self.vector_index = pinecone.Index("mem0-vectors")

    async def connect(self):
        """Create the PostgreSQL connection pool (call once at startup)."""
        self.pg_pool = await asyncpg.create_pool(dsn="postgresql://user:pass@localhost/mem0")

    async def store(self, record: MemoryRecord):
        """Store a record in the tier matching its type."""
        if record.memory_type == MemoryType.SHORT_TERM:
            self._store_short_term(record)
        elif record.memory_type == MemoryType.LONG_TERM:
            await self._store_long_term(record)
        elif record.memory_type == MemoryType.VECTOR:
            self._store_vector(record)

    def _store_short_term(self, record: MemoryRecord):
        """Short-term memory in Redis, with a 24-hour TTL."""
        self.redis.hset(f"mem:{record.key}", mapping={
            "content": record.content,
            "metadata": json.dumps(record.metadata)  # JSON, never eval()-able reprs
        })
        self.redis.expire(f"mem:{record.key}", 86400)

    async def _store_long_term(self, record: MemoryRecord):
        """Long-term memory in PostgreSQL (upsert on key)."""
        async with self.pg_pool.acquire() as conn:
            await conn.execute('''
                INSERT INTO memories (key, content, metadata)
                VALUES ($1, $2, $3)
                ON CONFLICT (key) DO UPDATE
                SET content = EXCLUDED.content,
                    metadata = EXCLUDED.metadata
            ''', record.key, record.content, json.dumps(record.metadata))

    def _store_vector(self, record: MemoryRecord):
        """Vector memory in Pinecone."""
        if not record.embedding:
            raise ValueError("Vector memory requires embedding")
        self.vector_index.upsert(vectors=[{
            "id": record.key,
            "values": record.embedding,
            "metadata": {**record.metadata, "content": record.content}
        }])

    async def retrieve(self, query: MemoryQuery) -> List[MemoryRecord]:
        """Tiered retrieval: short-term first, then long-term, then vector search."""
        # 1. Check short-term memory first
        if query.key and (result := self.redis.hgetall(f"mem:{query.key}")):
            return [MemoryRecord(key=query.key,
                                 content=result[b'content'].decode(),
                                 memory_type=MemoryType.SHORT_TERM,
                                 metadata=json.loads(result[b'metadata']))]
        # 2. Fall back to long-term memory
        if query.key:
            async with self.pg_pool.acquire() as conn:
                row = await conn.fetchrow(
                    "SELECT content, metadata FROM memories WHERE key = $1",
                    query.key
                )
                if row:
                    return [MemoryRecord(key=query.key,
                                         content=row['content'],
                                         memory_type=MemoryType.LONG_TERM,
                                         metadata=json.loads(row['metadata']))]
        # 3. Vector similarity search
        if query.query_text:
            # _generate_embedding must be supplied separately
            # (e.g. a call to an embedding model)
            embedding = self._generate_embedding(query.query_text)
            results = self.vector_index.query(
                vector=embedding,
                top_k=query.top_k,
                include_metadata=True
            )
            return [
                MemoryRecord(
                    key=match['id'],
                    content=match['metadata']['content'],
                    metadata={k: v for k, v in match['metadata'].items() if k != 'content'},
                    memory_type=MemoryType.VECTOR
                )
                for match in results['matches']
                if match['score'] > query.similarity_threshold
            ]
        return []
```
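The engine above needs live Redis, PostgreSQL, and Pinecone instances. For unit-testing the tiering logic, a dict-backed stand-in with a simplified `store`/`retrieve` surface is handy; this is a sketch, and `InMemoryStorage`/`Record` are names introduced here, not part of Mem0:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class Record:
    """Minimal stand-in for MemoryRecord."""
    key: str
    content: str
    memory_type: str = "SHORT_TERM"
    metadata: dict = field(default_factory=dict)

class InMemoryStorage:
    """Dict-backed replacement for MemoryStorage in tests.

    retrieve() takes a bare key here, a simplification of the
    MemoryQuery-based interface above."""
    def __init__(self):
        self._data = {}

    async def store(self, record):
        self._data[record.key] = record

    async def retrieve(self, key):
        rec = self._data.get(key)
        return [rec] if rec else []

async def demo():
    storage = InMemoryStorage()
    await storage.store(Record("u1", "likes coffee"))
    return await storage.retrieve("u1")

results = asyncio.run(demo())
```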
### Step 3: Implement the tier-migration logic
```python
# memory_manager.py
from typing import List
from storage_engine import MemoryStorage
from memory_interface import MemoryRecord, MemoryQuery, MemoryType

class MemoryManager:
    def __init__(self):
        self.storage = MemoryStorage()
        # Tier-migration policy
        self.migration_policy = {
            "access_threshold": 3,   # promote after this many accesses
            "time_threshold": 86400  # counter window, in seconds
        }

    async def add_memory(self, record: MemoryRecord):
        """Add a memory and apply the automatic tiering policy."""
        await self.storage.store(record)
        if record.memory_type == MemoryType.SHORT_TERM:
            await self._check_migration(record.key)

    async def get_memory(self, query: MemoryQuery) -> List[MemoryRecord]:
        """Retrieve memories and update the access counter."""
        results = await self.storage.retrieve(query)
        if query.key:
            self._increment_access_count(query.key)
        return results

    async def migrate_memory(self, key: str, target_type: MemoryType):
        """Move a memory to a different tier."""
        # 1. Fetch the existing record
        existing = await self.storage.retrieve(MemoryQuery(key=key))
        if not existing:
            return
        record = existing[0]
        # 2. Re-create it with the target type
        new_record = MemoryRecord(
            key=key,
            content=record.content,
            memory_type=target_type,
            metadata=record.metadata
        )
        # 3. Store it in the new tier
        await self.storage.store(new_record)
        # 4. Clean up the old tier
        if record.memory_type == MemoryType.SHORT_TERM:
            self.storage.redis.delete(f"mem:{key}")
        elif record.memory_type == MemoryType.LONG_TERM:
            async with self.storage.pg_pool.acquire() as conn:
                await conn.execute("DELETE FROM memories WHERE key = $1", key)

    def _increment_access_count(self, key: str):
        """Increment the per-key access counter (expires with the policy window)."""
        count_key = f"access:{key}"
        current = self.storage.redis.incr(count_key)
        if current == 1:
            self.storage.redis.expire(count_key, self.migration_policy["time_threshold"])

    async def _check_migration(self, key: str):
        """Promote a short-term memory once its access count crosses the threshold."""
        count_key = f"access:{key}"
        access_count = int(self.storage.redis.get(count_key) or 0)
        if access_count >= self.migration_policy["access_threshold"]:
            await self.migrate_memory(key, MemoryType.LONG_TERM)
```
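The promotion rule in `_check_migration` (promote once the windowed access count reaches `access_threshold`) can be exercised without Redis. A minimal stand-in with a plain dict as the counter; `AccessTracker` is an illustrative name, not part of Mem0:

```python
class AccessTracker:
    """Dict-backed simulation of the Redis access counter and policy."""
    def __init__(self, access_threshold: int = 3):
        self.counts = {}
        self.access_threshold = access_threshold

    def record_access(self, key: str):
        self.counts[key] = self.counts.get(key, 0) + 1

    def should_promote(self, key: str) -> bool:
        """True once the key's access count reaches the threshold."""
        return self.counts.get(key, 0) >= self.access_threshold

tracker = AccessTracker()
for _ in range(3):
    tracker.record_access("user123")
```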
### Step 4: Create the FastAPI service endpoints
```python
# main.py
from fastapi import FastAPI, Depends
from memory_interface import MemoryRecord, MemoryQuery, MemoryType
from memory_manager import MemoryManager

app = FastAPI(title="Python Mem0 Service")
manager = MemoryManager()

@app.on_event("startup")
async def startup():
    # asyncpg pools must be created inside the running event loop
    await manager.storage.connect()

@app.post("/memories")
async def add_memory(record: MemoryRecord):
    await manager.add_memory(record)
    return {"status": "success", "key": record.key}

@app.get("/memories")
async def get_memory(query: MemoryQuery = Depends()):
    # Depends() maps MemoryQuery fields to query parameters (?key=...&top_k=...)
    results = await manager.get_memory(query)
    return {"results": [r.dict() for r in results]}

@app.post("/memories/migrate")
async def migrate_memory(key: str, target_type: MemoryType):
    await manager.migrate_memory(key, target_type)
    return {"status": "migrated", "key": key}
```
### Step 5: Database initialization script
```sql
-- PostgreSQL table for long-term memory
CREATE TABLE memories (
    id SERIAL PRIMARY KEY,
    key VARCHAR(255) NOT NULL UNIQUE,  -- UNIQUE already creates a key index
    content TEXT NOT NULL,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- GIN index for metadata queries
CREATE INDEX idx_memories_metadata ON memories USING GIN(metadata);
```
### Step 6: Spring AI integration
Configure the service in the Spring Boot application's `application.properties`:
```properties
# Mem0 service configuration
spring.ai.mem0.url=http://localhost:8000
spring.ai.mem0.short-term-threshold=3
spring.ai.mem0.short-term-ttl=86400
# Memory tiering strategy
spring.ai.mem0.migration-strategy=auto
```
Then use it from Java:
```java
// Store a memory
MemoryRecord record = new MemoryRecord("user123", "user likes coffee", MemoryType.SHORT_TERM);
mem0Client.storeMemory(record);
// Retrieve memories
List<MemoryRecord> results = mem0Client.retrieveMemory(
    MemoryQuery.builder()
        .key("user123")
        .topK(5)
        .build()
);
```
### Step 7: Deployment and operation
1. Start the backing services:
```bash
docker run -d -p 6379:6379 redis
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=pass postgres
```
2. Start the Mem0 service:
```bash
uvicorn main:app --reload --port 8000
```
3. Verify the service:
```bash
# Store a memory
curl -X POST http://localhost:8000/memories \
  -H "Content-Type: application/json" \
  -d '{"key": "test1", "content": "sample memory", "memory_type": "SHORT_TERM"}'
# Retrieve memories
curl "http://localhost:8000/memories?key=test1"
```
### Performance Optimization
1. **Tiered caching**:
```python
# Short-term access counter
access_count = self.redis.get(f"access:{key}")
if access_count and int(access_count) > 5:
    # Also cache hot records locally
    self.local_cache[key] = record
```
2. **Batch migration**:
```python
import asyncio

async def batch_migrate(self, keys: list[str], target_type: MemoryType):
    """Batch-migrate memories (a method to add to MemoryManager)."""
    # 1. Fetch all records concurrently
    records = await asyncio.gather(
        *[self.storage.retrieve(MemoryQuery(key=k)) for k in keys]
    )
    # 2. Store each found record in the target tier concurrently
    new_records = [r[0] for r in records if r]
    await asyncio.gather(
        *[self.storage.store(
            MemoryRecord(
                key=r.key,
                content=r.content,
                memory_type=target_type,
                metadata=r.metadata
            )
        ) for r in new_records]
    )
```
3. **Tiering metric**:
$$ \text{migration threshold} = \mu + 2\sigma $$
where $\mu$ is the mean access frequency and $\sigma$ its standard deviation.
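Assuming access counts are collected per key, the adaptive threshold above can be computed with the stdlib (`migration_candidates` is an illustrative helper, not part of Mem0):

```python
import statistics

def migration_threshold(counts):
    """mu + 2*sigma over the observed access counts."""
    mu = statistics.mean(counts)
    sigma = statistics.pstdev(counts)  # population std deviation
    return mu + 2 * sigma

def migration_candidates(counts_by_key):
    """Keys whose access count exceeds the adaptive threshold."""
    threshold = migration_threshold(list(counts_by_key.values()))
    return [k for k, c in counts_by_key.items() if c > threshold]

# Nine cold keys and one hot key: only the hot one crosses mu + 2*sigma
counts = {f"k{i}": 1 for i in range(9)}
counts["hot"] = 50
```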
### Monitoring and Metrics
```python
from prometheus_client import Counter, Histogram

# Metric definitions
MEMORY_STORAGE_COUNTER = Counter(
    'mem0_storage_total',
    'Total memory storage operations',
    ['memory_type']
)
MEMORY_RETRIEVAL_TIME = Histogram(
    'mem0_retrieval_seconds',
    'Memory retrieval latency',
    ['memory_type']
)

# In the store method:
async def store(self, record: MemoryRecord):
    MEMORY_STORAGE_COUNTER.labels(record.memory_type.value).inc()
    # ...

# In the retrieve method:
@MEMORY_RETRIEVAL_TIME.labels('all').time()
async def retrieve(self, query: MemoryQuery):
    # ...
```
This Python Mem0 framework exposes an HTTP interface that Spring AI's memory layer can call, and implements automatic memory tiering: records migrate between short-term (Redis), long-term (PostgreSQL), and vector (Pinecone) storage based on access frequency and age[^2][^3].
---
### FAQ
**Q: How do I add a custom migration strategy?**
A: Subclass `MemoryManager` and override `_check_migration`:
```python
class CustomMemoryManager(MemoryManager):
    async def _check_migration(self, key: str):
        # Custom policy based on content length (could equally be semantic)
        content = self.storage.redis.hget(f"mem:{key}", "content")
        if content and len(content) > 1000:
            await self.migrate_memory(key, MemoryType.LONG_TERM)
```
**Q: How do I keep the data model compatible between Spring AI and the Python Mem0 service?**
A: Share a Protocol Buffers definition:
```proto
syntax = "proto3";
message MemoryRecord {
string key = 1;
string content = 2;
MemoryType memory_type = 3;
map<string, string> metadata = 4;
enum MemoryType {
SHORT_TERM = 0;
LONG_TERM = 1;
VECTOR = 2;
}
}
```
---