# ArcFace人脸数据库工程化实战:从数据采集到系统运维的完整生命周期管理
在构建一个真正可用的人脸识别系统时,很多开发者会把大部分精力放在模型选择、算法调优和识别精度上,这当然没错。但当我接手了几个安防和考勤系统的实际项目后,发现一个残酷的现实:那些在测试集上表现优异的模型,在实际部署后经常因为数据库管理不当而“翻车”。人脸识别系统不是一次性的算法实验,而是一个需要长期维护、持续演进的工程系统。数据库就是这个系统的“记忆中枢”——如果记忆混乱、更新不及时、数据质量参差不齐,再聪明的“大脑”也会做出错误的判断。
今天我想分享的,正是这个常被忽视却至关重要的环节:人脸数据库的全生命周期管理。我们将超越简单的`.pkl`文件存储,探讨如何在生产环境中构建健壮、可扩展、易维护的人脸数据管理系统。无论你是正在开发考勤系统的工程师,还是维护安防监控平台的技术负责人,这些从实战中总结的经验都能帮你避开我踩过的那些坑。
## 1. 数据库架构设计:超越简单的字典存储
当我们初次接触人脸识别时,很容易把数据库简化为一个Python字典——人名作为键,特征向量作为值,然后用`pickle`一存了事。这种方案在原型阶段没问题,但当人脸数量超过几百、需要多人协作、或者系统要运行数年时,问题就接踵而至。
### 1.1 为什么原始的.pkl方案会出问题
我曾在某个项目中接手过一个用简单字典存储的`.pkl`文件,里面存了公司300多名员工的人脸数据。运行半年后,出现了几个棘手的问题:
- **并发写入冲突**:当两个考勤终端同时尝试更新数据库时,后写入的会覆盖先写入的
- **数据损坏风险**:`pickle`文件在写入过程中如果程序崩溃,整个文件可能无法读取(这个问题有个简单的缓解办法,见下面的示例)
- **查询效率低下**:每次识别都需要加载全部数据到内存,人脸数超过1000后延迟明显
- **缺乏版本控制**:误删了某个人脸数据?只能从备份恢复,但你可能不知道什么时候备份的
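在迁移到更健壮的方案之前,至少可以先缓解其中的数据损坏问题。下面是一个最小化的原子写入示意(先写临时文件,再用`os.replace`原子替换;这是通用做法,不是本文系统的最终方案):

```python
import os
import pickle
import tempfile

def atomic_save_pkl(obj, path):
    """原子写入pickle文件:即使写入中途崩溃,旧文件也不会损坏"""
    # 临时文件必须与目标文件同目录,才能保证os.replace在同一文件系统内原子完成
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(path)))
    try:
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(obj, f)
            f.flush()
            os.fsync(f.fileno())  # 确保数据真正落盘
        os.replace(tmp_path, path)  # 原子替换,不会出现"半个文件"的状态
    except Exception:
        os.remove(tmp_path)
        raise
```

但这只是权宜之计,并发冲突、查询效率、版本控制等问题仍需要后面的架构来解决。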
### 1.2 分层存储架构设计
经过几次重构,我总结出一套分层存储方案,核心思想是**分离特征存储与元数据管理**:
```
人脸数据库系统
├── 特征向量存储层(高性能读取)
│ ├── 内存缓存(最近使用的特征)
│ ├── 向量数据库(Faiss / Milvus)
│ └── 本地.npy文件备份(容灾)
├── 元数据管理层(关系型数据)
│ ├── 人员基本信息(SQLite / PostgreSQL)
│ ├── 人脸图片索引
│ ├── 录入时间、操作日志
│ └── 版本控制信息
└── 原始图片存储层(对象存储)
├── 原始采集图片(按日期目录)
├── 对齐后人脸区域
└── 质量评估结果
```
> **关键决策点**:对于中小型系统(<1万人脸),SQLite + 本地.npy文件是性价比最高的选择。当人脸数超过1万或需要分布式查询时,才需要考虑专门的向量数据库。
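为了更直观地说明这套分层思路,下面给出读路径的一个极简示意(类名和回填策略都是示例,向量数据库一层在此省略,只演示"内存缓存 → .npy容灾备份"的降级顺序):

```python
import numpy as np
from pathlib import Path

class LayeredEmbeddingStore:
    """分层特征读取示意:内存缓存优先,未命中再走持久层"""
    def __init__(self, npy_dir='data/embeddings'):
        self.cache = {}               # 内存缓存层:embedding_id -> 向量
        self.npy_dir = Path(npy_dir)  # 容灾备份层:.npy文件

    def get(self, embedding_id, npy_path=None):
        # 1. 内存缓存命中,直接返回
        if embedding_id in self.cache:
            return self.cache[embedding_id]
        # 2. 实际系统中这里会先查向量数据库(Faiss/Milvus),示意中省略
        # 3. 兜底:从.npy备份文件恢复
        path = Path(npy_path) if npy_path else self.npy_dir / f'{embedding_id}.npy'
        if path.exists():
            vec = np.load(path)
            self.cache[embedding_id] = vec  # 回填缓存,下次直接命中
            return vec
        return None
```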
### 1.3 元数据表结构设计
下面是一个实用的SQLite表结构设计,包含了实际项目中需要的各种字段:
```sql
-- 人员信息表
CREATE TABLE persons (
person_id INTEGER PRIMARY KEY AUTOINCREMENT,
employee_id VARCHAR(32) UNIQUE, -- 工号/学号
name VARCHAR(64) NOT NULL, -- 姓名
department VARCHAR(64), -- 部门/班级
role VARCHAR(32), -- 角色(员工/访客/管理员)
status INTEGER DEFAULT 1, -- 状态 1:正常 0:禁用
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 人脸特征表
CREATE TABLE face_embeddings (
embedding_id INTEGER PRIMARY KEY AUTOINCREMENT,
person_id INTEGER NOT NULL,
embedding BLOB NOT NULL, -- 512维特征向量
embedding_path VARCHAR(256), -- .npy文件路径(可选)
image_path VARCHAR(256), -- 原始图片路径
face_quality_score FLOAT, -- 人脸质量评分
yaw FLOAT, -- 偏航角
pitch FLOAT, -- 俯仰角
roll FLOAT, -- 旋转角
brightness FLOAT, -- 亮度评估
sharpness FLOAT, -- 清晰度评估
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (person_id) REFERENCES persons(person_id) ON DELETE CASCADE
);
-- 操作日志表
CREATE TABLE operation_logs (
log_id INTEGER PRIMARY KEY AUTOINCREMENT,
operator VARCHAR(64), -- 操作员
operation_type VARCHAR(32), -- 操作类型:add/update/delete/query
target_person_id INTEGER, -- 目标人员ID
details TEXT, -- 操作详情
ip_address VARCHAR(45), -- 操作IP
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 创建索引提升查询性能
CREATE INDEX idx_person_employee_id ON persons(employee_id);
CREATE INDEX idx_embedding_person_id ON face_embeddings(person_id);
CREATE INDEX idx_logs_time ON operation_logs(created_time);
```
这种设计的好处是显而易见的:你可以轻松查询“某个部门的所有人脸”、“最近一周新增的人员”、“质量分数低于阈值需要重新采集的人脸”等等。而这些在简单的字典存储中几乎无法实现。
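举例来说,基于上面的表结构,这几类查询都只是几行SQL(部门名和质量阈值均为示例值):

```python
import sqlite3

conn = sqlite3.connect('face_database.db')
cursor = conn.cursor()

# 某个部门的所有人脸
cursor.execute("""
    SELECT p.name, fe.image_path, fe.face_quality_score
    FROM face_embeddings fe
    JOIN persons p ON fe.person_id = p.person_id
    WHERE p.department = ?
""", ('研发部',))

# 最近一周新增的人员
cursor.execute("""
    SELECT name, created_time FROM persons
    WHERE created_time > datetime('now', '-7 days')
""")

# 质量分数低于阈值、需要重新采集的人脸
cursor.execute("""
    SELECT p.name, fe.face_quality_score, fe.image_path
    FROM face_embeddings fe
    JOIN persons p ON fe.person_id = p.person_id
    WHERE fe.face_quality_score < 0.6
""")
conn.close()
```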
## 2. 数据采集与质量管控:建立标准化录入流程
人脸识别的准确性很大程度上取决于输入数据的质量。在实验室里,我们用的是精心挑选的清晰正面照;但在实际场景中,你得到的是光线不足、角度刁钻、表情各异的真实照片。如果没有严格的质量管控,数据库很快就会积累大量“垃圾数据”,导致识别率持续下降。
### 2.1 自动化质量评估体系
我开发了一套人脸质量评估流程,在录入阶段就过滤掉不合格的数据。这套流程基于InsightFace提供的丰富人脸属性:
```python
import cv2
import numpy as np
from insightface.app import FaceAnalysis
from datetime import datetime
import sqlite3
import json
class FaceQualityValidator:
def __init__(self, quality_threshold=0.8):
self.app = FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider'])
self.app.prepare(ctx_id=0, det_size=(640, 640))
self.quality_threshold = quality_threshold
    def assess_face_quality(self, face, img):
        """综合评估单个人脸的质量(img为原始BGR图像,用于清晰度和光照计算)"""
quality_score = 0.0
weights = {
'det_score': 0.3, # 检测置信度
'pose': 0.25, # 姿态评分
'clarity': 0.2, # 清晰度
'lighting': 0.15, # 光照条件
'occlusion': 0.1 # 遮挡程度
}
# 1. 检测置信度(基础分)
det_score = face.det_score
quality_score += det_score * weights['det_score']
# 2. 姿态评估(偏航、俯仰、旋转角)
yaw, pitch, roll = abs(face.pose[0]), abs(face.pose[1]), abs(face.pose[2])
pose_penalty = (min(yaw, 30) / 30 * 0.5 +
min(pitch, 30) / 30 * 0.3 +
min(roll, 30) / 30 * 0.2)
pose_score = 1.0 - pose_penalty
quality_score += pose_score * weights['pose']
        # 3. 清晰度评估(对人脸区域计算Laplacian方差)
        x1, y1, x2, y2 = face.bbox.astype(int)
        h, w = img.shape[:2]
        face_crop = img[max(y1, 0):min(y2, h), max(x1, 0):min(x2, w)]
        clarity_score = self._calculate_clarity(face_crop)
        quality_score += clarity_score * weights['clarity']
        # 4. 光照评估(通过人脸区域亮度分布)
        lighting_score = self._assess_lighting(face_crop)
        quality_score += lighting_score * weights['lighting']
# 5. 遮挡评估(通过关键点可见性)
occlusion_score = self._assess_occlusion(face)
quality_score += occlusion_score * weights['occlusion']
return {
'total_score': round(quality_score, 3),
'det_score': round(det_score, 3),
'pose': {'yaw': round(yaw, 1), 'pitch': round(pitch, 1), 'roll': round(roll, 1)},
'clarity_score': round(clarity_score, 3),
'lighting_score': round(lighting_score, 3),
'occlusion_score': round(occlusion_score, 3),
'passed': quality_score >= self.quality_threshold
}
    def _calculate_clarity(self, face_crop):
        """计算人脸区域清晰度:Laplacian方差越大越清晰"""
        if face_crop.size == 0:
            return 0.0
        gray = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY)
        lap_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        # 300为经验归一化上限,应根据实际相机和分辨率标定
        return float(min(lap_var / 300.0, 1.0))
    def _assess_lighting(self, face_crop):
        """评估光照条件:平均亮度越接近中间值(128)得分越高"""
        if face_crop.size == 0:
            return 0.0
        gray = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY)
        return 1.0 - abs(float(gray.mean()) - 128.0) / 128.0
    def _assess_occlusion(self, face):
        """评估遮挡程度(简化实现:默认无遮挡,生产环境可基于关键点可见性判断)"""
        return 1.0
# 使用示例
validator = FaceQualityValidator(quality_threshold=0.75)
img = cv2.imread('path/to/face.jpg')
faces = validator.app.get(img)
if len(faces) > 0:
    quality_report = validator.assess_face_quality(faces[0], img)
print(f"质量评估报告: {json.dumps(quality_report, indent=2, ensure_ascii=False)}")
if quality_report['passed']:
print("✅ 人脸质量合格,可以录入")
else:
print(f"❌ 人脸质量不合格,总分: {quality_report['total_score']}")
# 给出具体改进建议
if quality_report['pose']['yaw'] > 20:
print("建议:请保持正面朝向摄像头")
if quality_report['lighting_score'] < 0.7:
print("建议:光线不足,请调整光照条件")
```
### 2.2 批量录入的工程化实现
在实际的考勤系统部署中,我们经常需要一次性录入几十甚至上百名员工的人脸数据。这时候,一个可靠的批量录入工具就至关重要了。
我设计了一个命令行工具,支持多种输入源:
```python
import argparse
import os
import sqlite3
from datetime import datetime
from pathlib import Path

import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
from insightface.app import FaceAnalysis
# FaceQualityValidator 复用上一节定义的质量评估类
class BatchFaceEnroller:
def __init__(self, db_path='face_database.db'):
self.db_path = db_path
self.validator = FaceQualityValidator()
self.app = FaceAnalysis(name='buffalo_l')
self.app.prepare(ctx_id=0, det_size=(640, 640))
def enroll_from_csv(self, csv_path):
"""从CSV文件批量录入"""
df = pd.read_csv(csv_path)
required_columns = ['employee_id', 'name', 'image_path']
if not all(col in df.columns for col in required_columns):
print(f"CSV文件必须包含以下列: {required_columns}")
return False
success_count = 0
fail_count = 0
for _, row in tqdm(df.iterrows(), total=len(df), desc="批量录入进度"):
result = self._enroll_single_person(
employee_id=row['employee_id'],
name=row['name'],
image_path=row['image_path'],
department=row.get('department', ''),
role=row.get('role', 'employee')
)
if result['success']:
success_count += 1
else:
fail_count += 1
print(f"录入失败: {row['name']} - {result['message']}")
print(f"\n批量录入完成: 成功 {success_count} 个, 失败 {fail_count} 个")
return True
    def enroll_from_directory(self, dir_path, naming_pattern='{name}_{id}'):
        """从目录结构批量录入(假设每人一个子目录,目录名形如 姓名_工号)"""
        success_count = 0
        fail_count = 0
        for person_dir in sorted(Path(dir_path).iterdir()):
            if not person_dir.is_dir() or '_' not in person_dir.name:
                continue
            # 按默认的 {name}_{id} 约定解析:最后一个下划线之后是工号
            name, employee_id = person_dir.name.rsplit('_', 1)
            for image_path in sorted(person_dir.glob('*.jpg')) + sorted(person_dir.glob('*.png')):
                result = self._enroll_single_person(
                    employee_id=employee_id, name=name, image_path=str(image_path))
                if result['success']:
                    success_count += 1
                else:
                    fail_count += 1
                    print(f"录入失败: {image_path} - {result['message']}")
        print(f"目录录入完成: 成功 {success_count} 个, 失败 {fail_count} 个")
def _enroll_single_person(self, employee_id, name, image_path, **kwargs):
"""录入单个人脸到数据库"""
# 连接数据库
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
# 1. 检查人员是否已存在
cursor.execute(
"SELECT person_id FROM persons WHERE employee_id = ?",
(employee_id,)
)
existing = cursor.fetchone()
if existing:
person_id = existing[0]
print(f"人员 {name}({employee_id}) 已存在,更新人脸数据")
else:
# 2. 插入人员信息
cursor.execute("""
INSERT INTO persons (employee_id, name, department, role)
VALUES (?, ?, ?, ?)
""", (employee_id, name, kwargs.get('department', ''), kwargs.get('role', 'employee')))
person_id = cursor.lastrowid
# 3. 读取并处理图片
if not os.path.exists(image_path):
return {'success': False, 'message': f'图片不存在: {image_path}'}
img = cv2.imread(image_path)
if img is None:
return {'success': False, 'message': f'无法读取图片: {image_path}'}
faces = self.app.get(img)
if len(faces) == 0:
return {'success': False, 'message': '未检测到人脸'}
if len(faces) > 1:
return {'success': False, 'message': '检测到多张人脸'}
face = faces[0]
            # 4. 质量评估(传入原图用于清晰度/光照计算)
            quality_report = self.validator.assess_face_quality(face, img)
if not quality_report['passed']:
return {
'success': False,
'message': f'人脸质量不合格: 总分{quality_report["total_score"]}'
}
# 5. 保存特征向量到.npy文件(避免数据库膨胀)
embedding_dir = Path('data/embeddings') / employee_id
embedding_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
embedding_path = embedding_dir / f'{timestamp}.npy'
np.save(embedding_path, face.embedding)
# 6. 插入人脸记录
cursor.execute("""
INSERT INTO face_embeddings
(person_id, embedding, embedding_path, image_path,
face_quality_score, yaw, pitch, roll)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
person_id,
face.embedding.tobytes(), # 同时保存一份在数据库方便查询
str(embedding_path),
image_path,
quality_report['total_score'],
quality_report['pose']['yaw'],
quality_report['pose']['pitch'],
quality_report['pose']['roll']
))
# 7. 记录操作日志
cursor.execute("""
INSERT INTO operation_logs (operator, operation_type, target_person_id, details)
VALUES (?, ?, ?, ?)
""", ('batch_enroller', 'add', person_id,
f'批量录入,质量分: {quality_report["total_score"]}'))
conn.commit()
return {'success': True, 'person_id': person_id}
except Exception as e:
conn.rollback()
return {'success': False, 'message': str(e)}
finally:
conn.close()
# 命令行接口
def main():
parser = argparse.ArgumentParser(description='人脸批量录入工具')
parser.add_argument('--mode', choices=['csv', 'dir', 'single'], required=True,
help='录入模式: csv(批量csv文件), dir(目录结构), single(单张图片)')
parser.add_argument('--input', required=True, help='输入文件或目录路径')
parser.add_argument('--db', default='face_database.db', help='数据库路径')
parser.add_argument('--output-csv', help='输出结果CSV路径')
args = parser.parse_args()
enroller = BatchFaceEnroller(db_path=args.db)
if args.mode == 'csv':
enroller.enroll_from_csv(args.input)
elif args.mode == 'dir':
enroller.enroll_from_directory(args.input)
# ... 其他模式
if __name__ == '__main__':
main()
```
这个批量录入工具在实际部署中特别有用。比如新员工入职时,HR部门只需要提供一个包含工号、姓名、照片路径的CSV文件,就能一次性完成所有人脸的录入,大大减少了人工操作。
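作为参考,这个CSV大致长这样(`employee_id`、`name`、`image_path`为必需列,`department`、`role`可选;人名和路径均为示例):

```csv
employee_id,name,image_path,department,role
E1001,张三,photos/zhangsan.jpg,研发部,employee
E1002,李四,photos/lisi.jpg,人事部,admin
```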
## 3. 特征向量管理与优化:提升查询效率与准确性
人脸特征向量通常是512维的浮点数数组,当人脸数量增长到数千甚至数万时,如何高效存储和快速查询就成了关键问题。我经历过从线性扫描到近似最近邻搜索的整个演进过程,这里分享一些实用的优化策略。
### 3.1 向量索引技术选型
对于不同规模的人脸库,我推荐不同的技术方案:
| 人脸数量 | 推荐方案 | 查询速度 | 内存占用 | 精度损失 | 适用场景 |
|---------|---------|---------|---------|---------|---------|
| < 1,000 | 线性扫描 + 余弦相似度 | 慢 | 低 | 无 | 原型开发、小规模测试 |
| 1,000 - 10,000 | Faiss Flat索引 | 中等 | 中等 | 无 | 中小型企业考勤 |
| 10,000 - 100,000 | Faiss IVF索引 | 快 | 中等 | 极小 | 大型企业、校园 |
| 100,000 - 1,000,000 | Faiss HNSW | 极快 | 高 | 可控 | 城市级安防 |
| > 1,000,000 | Milvus集群 | 可扩展 | 分布式 | 可控 | 超大规模应用 |
> **经验之谈**:不要过早优化。在项目早期(人脸数<1000),简单的线性扫描完全够用。只有当性能真正成为瓶颈时,才需要考虑引入Faiss等向量数据库。
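在真正引入Faiss之前,最朴素的线性扫描只需要几行NumPy(假设库内特征已按行堆叠成矩阵并做过L2归一化):

```python
import numpy as np

def linear_search(query: np.ndarray, gallery: np.ndarray, top_k: int = 5):
    """gallery为(N, 512)的已归一化特征矩阵;返回(索引, 余弦相似度)列表"""
    query = query / np.linalg.norm(query)
    similarities = gallery @ query          # 归一化后,内积即余弦相似度
    top_indices = np.argsort(similarities)[-top_k:][::-1]
    return [(int(i), float(similarities[i])) for i in top_indices]
```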
### 3.2 Faiss集成实战
下面是一个将Faiss集成到现有系统的完整示例:
```python
import faiss
import numpy as np
import sqlite3
import pickle
from typing import List, Tuple
import time
class FaceVectorIndex:
def __init__(self, dimension=512, use_gpu=False):
self.dimension = dimension
self.use_gpu = use_gpu
self.index = None
self.id_to_person = {} # 映射:索引ID -> 人员ID
self.person_to_ids = {} # 映射:人员ID -> [索引ID列表]
# 初始化索引
self._init_index()
def _init_index(self):
"""初始化Faiss索引"""
# 使用内积作为相似度度量(余弦相似度需要归一化向量)
self.index = faiss.IndexFlatIP(self.dimension)
if self.use_gpu:
# 尝试使用GPU加速
try:
res = faiss.StandardGpuResources()
self.index = faiss.index_cpu_to_gpu(res, 0, self.index)
print("✅ 使用GPU加速Faiss索引")
            except Exception:
print("⚠️ GPU不可用,回退到CPU")
self.use_gpu = False
def build_from_database(self, db_path: str):
"""从数据库构建索引"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 获取所有人脸特征
cursor.execute("""
SELECT fe.embedding_id, fe.person_id, fe.embedding
FROM face_embeddings fe
JOIN persons p ON fe.person_id = p.person_id
WHERE p.status = 1 -- 只索引状态正常的人员
""")
embeddings = []
self.id_to_person = {}
self.person_to_ids = {}
current_idx = 0
for embedding_id, person_id, embedding_bytes in cursor.fetchall():
# 从字节流恢复向量
embedding = np.frombuffer(embedding_bytes, dtype=np.float32)
# 归一化向量(余弦相似度需要)
embedding_norm = embedding / np.linalg.norm(embedding)
embeddings.append(embedding_norm)
self.id_to_person[current_idx] = (person_id, embedding_id)
# 更新人员到索引的映射
if person_id not in self.person_to_ids:
self.person_to_ids[person_id] = []
self.person_to_ids[person_id].append(current_idx)
current_idx += 1
if embeddings:
embeddings_array = np.array(embeddings).astype('float32')
self.index.add(embeddings_array)
print(f"✅ 索引构建完成,共 {len(embeddings)} 个人脸向量")
else:
print("⚠️ 数据库中没有可索引的人脸数据")
conn.close()
return len(embeddings)
def search(self, query_vector: np.ndarray, k: int = 5, threshold: float = 0.6):
"""搜索最相似的k个人脸"""
# 归一化查询向量
query_norm = query_vector / np.linalg.norm(query_vector)
query_norm = query_norm.reshape(1, -1).astype('float32')
# 搜索
start_time = time.time()
distances, indices = self.index.search(query_norm, k)
search_time = (time.time() - start_time) * 1000 # 毫秒
results = []
for i in range(len(indices[0])):
idx = indices[0][i]
distance = distances[0][i]
# Faiss返回的是内积,需要转换为余弦相似度
# 因为向量都已归一化,所以内积 = 余弦相似度
similarity = float(distance)
if idx != -1 and similarity >= threshold: # -1表示无效索引
person_id, embedding_id = self.id_to_person[idx]
results.append({
'person_id': person_id,
'embedding_id': embedding_id,
'similarity': similarity,
'rank': i + 1
})
return {
'results': results,
'search_time_ms': search_time,
'total_matches': len(results)
}
def add_vector(self, person_id: int, embedding: np.ndarray, embedding_id: int = None):
"""添加单个向量到索引"""
embedding_norm = embedding / np.linalg.norm(embedding)
embedding_norm = embedding_norm.reshape(1, -1).astype('float32')
# 获取下一个索引位置
new_idx = self.index.ntotal
# 添加到索引
self.index.add(embedding_norm)
# 更新映射
if embedding_id is None:
embedding_id = -1 # 临时ID,实际使用时需要从数据库获取
self.id_to_person[new_idx] = (person_id, embedding_id)
if person_id not in self.person_to_ids:
self.person_to_ids[person_id] = []
self.person_to_ids[person_id].append(new_idx)
return new_idx
def remove_by_person(self, person_id: int):
"""删除某个人的所有向量"""
if person_id not in self.person_to_ids:
return 0
# 注意:Faiss不支持直接删除,需要重建索引
# 在实际生产环境中,我们通常标记删除,定期重建索引
removed_count = len(self.person_to_ids[person_id])
# 从映射中移除
for idx in self.person_to_ids[person_id]:
if idx in self.id_to_person:
del self.id_to_person[idx]
del self.person_to_ids[person_id]
print(f"标记删除人员 {person_id} 的 {removed_count} 个向量")
return removed_count
def save_index(self, path: str):
"""保存索引和映射到文件"""
# 保存Faiss索引
faiss.write_index(self.index, f"{path}.faiss")
# 保存映射关系
with open(f"{path}.mapping", 'wb') as f:
pickle.dump({
'id_to_person': self.id_to_person,
'person_to_ids': self.person_to_ids
}, f)
print(f"✅ 索引已保存到 {path}")
def load_index(self, path: str):
"""从文件加载索引"""
# 加载Faiss索引
self.index = faiss.read_index(f"{path}.faiss")
# 加载映射关系
with open(f"{path}.mapping", 'rb') as f:
mapping = pickle.load(f)
self.id_to_person = mapping['id_to_person']
self.person_to_ids = mapping['person_to_ids']
print(f"✅ 索引已加载,共 {self.index.ntotal} 个向量")
# 使用示例
def benchmark_search_performance():
"""性能基准测试"""
# 模拟不同规模的数据集
dataset_sizes = [100, 1000, 5000, 10000]
for size in dataset_sizes:
print(f"\n{'='*50}")
print(f"测试数据集大小: {size}")
# 创建模拟数据
dimension = 512
vectors = np.random.randn(size, dimension).astype('float32')
# 归一化
norms = np.linalg.norm(vectors, axis=1, keepdims=True)
vectors = vectors / norms
# 创建索引
index = faiss.IndexFlatIP(dimension)
index.add(vectors)
# 测试查询性能
query = np.random.randn(1, dimension).astype('float32')
query = query / np.linalg.norm(query)
# 线性扫描(作为对比)
start = time.time()
similarities = np.dot(vectors, query.T).flatten()
top_k_indices = np.argsort(similarities)[-5:][::-1]
linear_time = (time.time() - start) * 1000
# Faiss搜索
start = time.time()
distances, indices = index.search(query, 5)
faiss_time = (time.time() - start) * 1000
print(f"线性扫描时间: {linear_time:.2f}ms")
print(f"Faiss搜索时间: {faiss_time:.2f}ms")
print(f"加速比: {linear_time/faiss_time:.1f}x")
# 验证结果一致性
print(f"结果匹配: {np.array_equal(top_k_indices, indices[0])}")
```
这个向量索引系统在实际部署中,将我们的人脸识别查询速度从几百毫秒降低到了几毫秒,特别是在人脸库规模达到5000以上时,性能提升尤为明显。
## 4. 数据库版本管理与增量更新
在生产环境中,人脸数据库不是静态的。人员流动、照片更新、数据清理都是常态。如果没有良好的版本管理,一旦出现问题,你可能连“回退到昨天版本”都做不到。
### 4.1 基于Git的数据库版本控制
我借鉴了软件开发的版本控制思想,为人脸数据库设计了一套Git-like的版本管理系统:
```python
import hashlib
import json
from datetime import datetime
from pathlib import Path
import shutil
class FaceDatabaseVersioner:
def __init__(self, db_path, version_dir='versions'):
self.db_path = Path(db_path)
self.version_dir = Path(version_dir)
self.version_dir.mkdir(exist_ok=True)
# 版本元数据文件
self.meta_file = self.version_dir / 'versions.json'
if not self.meta_file.exists():
with open(self.meta_file, 'w') as f:
json.dump({'versions': [], 'current': None}, f)
def create_version(self, description: str, author: str = 'system'):
"""创建数据库版本快照"""
# 生成版本ID(基于时间戳和内容哈希)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# 计算数据库文件的哈希值
with open(self.db_path, 'rb') as f:
db_hash = hashlib.sha256(f.read()).hexdigest()[:8]
version_id = f"{timestamp}_{db_hash}"
version_path = self.version_dir / version_id
version_path.mkdir(exist_ok=True)
# 1. 复制数据库文件
db_copy_path = version_path / 'face_database.db'
shutil.copy2(self.db_path, db_copy_path)
# 2. 导出元数据
meta = self._export_metadata()
with open(version_path / 'metadata.json', 'w') as f:
json.dump(meta, f, indent=2, ensure_ascii=False)
# 3. 记录版本信息
with open(self.meta_file, 'r') as f:
versions_data = json.load(f)
version_info = {
'id': version_id,
'timestamp': datetime.now().isoformat(),
'description': description,
'author': author,
'size_mb': self.db_path.stat().st_size / (1024 * 1024),
'person_count': meta['summary']['total_persons'],
'face_count': meta['summary']['total_faces'],
'hash': db_hash
}
versions_data['versions'].append(version_info)
versions_data['current'] = version_id
with open(self.meta_file, 'w') as f:
json.dump(versions_data, f, indent=2, ensure_ascii=False)
print(f"✅ 版本 {version_id} 创建成功: {description}")
return version_id
def _export_metadata(self):
"""导出数据库元数据"""
import sqlite3
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 获取统计信息
cursor.execute("SELECT COUNT(*) FROM persons")
total_persons = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM face_embeddings")
total_faces = cursor.fetchone()[0]
cursor.execute("""
SELECT department, COUNT(*) as count
FROM persons
GROUP BY department
ORDER BY count DESC
""")
department_stats = cursor.fetchall()
cursor.execute("""
SELECT DATE(created_time) as date, COUNT(*) as count
FROM face_embeddings
GROUP BY DATE(created_time)
ORDER BY date DESC
LIMIT 30
""")
recent_additions = cursor.fetchall()
conn.close()
return {
'summary': {
'total_persons': total_persons,
'total_faces': total_faces,
'export_time': datetime.now().isoformat()
},
'department_distribution': [
{'department': dept, 'count': count}
for dept, count in department_stats
],
'recent_additions': [
{'date': date, 'count': count}
for date, count in recent_additions
]
}
def list_versions(self):
"""列出所有版本"""
with open(self.meta_file, 'r') as f:
versions_data = json.load(f)
print(f"{'版本ID':<20} {'时间':<25} {'描述':<30} {'人脸数':<10}")
print("="*90)
for version in versions_data['versions'][-10:]: # 显示最近10个版本
print(f"{version['id']:<20} "
f"{version['timestamp'][:19]:<25} "
f"{version['description'][:28]:<30} "
f"{version['face_count']:<10}")
current = versions_data.get('current')
if current:
print(f"\n当前版本: {current}")
def restore_version(self, version_id: str, backup_current: bool = True):
"""恢复到指定版本"""
version_path = self.version_dir / version_id
if not version_path.exists():
print(f"❌ 版本 {version_id} 不存在")
return False
# 备份当前数据库
if backup_current:
backup_id = self.create_version(f"恢复前的备份(准备恢复到{version_id})")
print(f"📋 已创建备份版本: {backup_id}")
# 恢复数据库文件
db_backup_path = version_path / 'face_database.db'
if not db_backup_path.exists():
print(f"❌ 版本 {version_id} 的数据库文件不存在")
return False
shutil.copy2(db_backup_path, self.db_path)
# 更新当前版本指针
with open(self.meta_file, 'r') as f:
versions_data = json.load(f)
versions_data['current'] = version_id
with open(self.meta_file, 'w') as f:
json.dump(versions_data, f, indent=2, ensure_ascii=False)
print(f"✅ 已成功恢复到版本 {version_id}")
return True
def create_diff_report(self, version_id1: str, version_id2: str):
"""比较两个版本的差异"""
# 实现版本差异分析
# 可以比较人员增减、特征更新等
pass
# 集成到主系统的版本管理
class VersionAwareFaceDatabase:
def __init__(self, db_path='face_database.db'):
self.db_path = db_path
self.versioner = FaceDatabaseVersioner(db_path)
# 自动创建初始版本(如果不存在)
self._ensure_initial_version()
def _ensure_initial_version(self):
"""确保至少有一个初始版本"""
with open(self.versioner.meta_file, 'r') as f:
versions_data = json.load(f)
if not versions_data['versions']:
self.versioner.create_version(
description="初始数据库版本",
author="system"
)
def add_person_with_versioning(self, person_data, face_embeddings):
"""带版本控制的人员添加"""
# 1. 创建版本前的快照
version_before = self.versioner.create_version(
description="添加人员前的状态",
author="auto"
)
try:
# 2. 执行数据库操作
# ... 实际的数据库插入逻辑
# 3. 创建版本后的快照
version_after = self.versioner.create_version(
description=f"添加人员: {person_data['name']}",
author="auto"
)
print(f"操作已版本化: {version_before} -> {version_after}")
return True
except Exception as e:
# 4. 如果出错,回滚到之前版本
print(f"操作失败,正在回滚到版本 {version_before}")
self.versioner.restore_version(version_before, backup_current=False)
raise e
def auto_version_on_schedule(self):
"""定时自动创建版本"""
import schedule
import time
# 每天凌晨创建版本
schedule.every().day.at("02:00").do(
lambda: self.versioner.create_version("每日自动备份", "scheduler")
)
# 每次批量操作后创建版本
schedule.every(1).hours.do(
lambda: self._check_and_create_version()
)
while True:
schedule.run_pending()
time.sleep(60)
def _check_and_create_version(self):
"""检查变更并创建版本"""
# 检查自上次版本以来的变更数量
# 如果变更超过阈值,自动创建版本
pass
```
### 4.2 增量更新与数据同步
在分布式部署中(比如多个考勤终端),数据库的增量更新是个挑战。我设计了一个基于操作日志的同步机制:
```python
import sqlite3
from datetime import datetime

class FaceDatabaseSync:
def __init__(self, central_db_path, local_db_path):
self.central_db = central_db_path
self.local_db = local_db_path
self.sync_log = []
def generate_diff(self, last_sync_time):
"""生成自上次同步以来的差异"""
import sqlite3
conn = sqlite3.connect(self.central_db)
cursor = conn.cursor()
# 获取变更记录
cursor.execute("""
SELECT
operation_type,
target_person_id,
details,
created_time
FROM operation_logs
WHERE created_time > ?
ORDER BY created_time
""", (last_sync_time,))
changes = cursor.fetchall()
# 转换为增量操作
diff_operations = []
for op_type, person_id, details, op_time in changes:
if op_type == 'add':
# 获取完整的人员和人脸数据
diff_operations.append(
self._get_person_data(person_id, 'add')
)
elif op_type == 'update':
diff_operations.append(
self._get_person_data(person_id, 'update')
)
elif op_type == 'delete':
diff_operations.append({
'operation': 'delete',
'person_id': person_id,
'timestamp': op_time
})
conn.close()
return {
'last_sync_time': last_sync_time,
'current_time': datetime.now().isoformat(),
'change_count': len(diff_operations),
'operations': diff_operations
}
def apply_diff(self, diff_data):
"""应用差异到本地数据库"""
stats = {'applied': 0, 'skipped': 0, 'failed': 0}
for operation in diff_data['operations']:
try:
if operation['operation'] == 'add':
self._apply_add_operation(operation)
stats['applied'] += 1
elif operation['operation'] == 'update':
self._apply_update_operation(operation)
stats['applied'] += 1
elif operation['operation'] == 'delete':
self._apply_delete_operation(operation)
stats['applied'] += 1
except Exception as e:
print(f"应用操作失败: {operation.get('person_id')} - {str(e)}")
stats['failed'] += 1
# 更新本地同步时间
self._update_sync_time(diff_data['current_time'])
return stats
def _get_person_data(self, person_id, operation_type):
"""获取人员的完整数据"""
# 实现从中央数据库获取人员和人脸数据的逻辑
pass
def _apply_add_operation(self, operation):
"""应用添加操作"""
# 实现将人员数据添加到本地数据库的逻辑
pass
def _apply_update_operation(self, operation):
"""应用更新操作"""
pass
def _apply_delete_operation(self, operation):
"""应用删除操作"""
pass
def _update_sync_time(self, sync_time):
"""更新本地同步时间"""
pass
# 使用示例:终端同步服务
class TerminalSyncService:
def __init__(self, terminal_id, central_db_url, local_db_path):
self.terminal_id = terminal_id
self.central_db_url = central_db_url
self.local_db_path = local_db_path
self.sync_interval = 300 # 5分钟同步一次
self.last_sync_time = self._load_last_sync_time()
def start_sync_loop(self):
"""启动同步循环"""
import threading
import time
def sync_worker():
while True:
try:
self.sync_with_central()
except Exception as e:
print(f"同步失败: {e}")
time.sleep(self.sync_interval)
thread = threading.Thread(target=sync_worker, daemon=True)
thread.start()
print(f"✅ 终端 {self.terminal_id} 同步服务已启动")
def sync_with_central(self):
"""与中央数据库同步"""
print(f"开始同步终端 {self.terminal_id}...")
# 1. 从中央服务器获取差异
diff_data = self._fetch_diff_from_central()
if diff_data['change_count'] == 0:
print("无变更需要同步")
return
print(f"发现 {diff_data['change_count']} 个变更")
# 2. 应用差异到本地
sync = FaceDatabaseSync(
central_db_path=None, # 实际使用中从网络获取
local_db_path=self.local_db_path
)
stats = sync.apply_diff(diff_data)
# 3. 重建本地索引
self._rebuild_local_index()
# 4. 记录同步结果
self._log_sync_result(stats, diff_data)
print(f"同步完成: 应用 {stats['applied']} 个, "
f"失败 {stats['failed']} 个")
def _fetch_diff_from_central(self):
"""从中央服务器获取差异数据"""
# 实际实现中会通过HTTP API或消息队列获取
# 这里返回模拟数据
return {
'last_sync_time': self.last_sync_time,
'current_time': datetime.now().isoformat(),
'change_count': 3,
'operations': [
{
'operation': 'add',
'person_id': 1001,
'name': '张三',
'embeddings': [...] # 实际的特征向量数据
},
# ... 其他操作
]
}
def _rebuild_local_index(self):
"""重建本地向量索引"""
# 调用FaceVectorIndex的build_from_database方法
pass
def _log_sync_result(self, stats, diff_data):
"""记录同步结果"""
self.last_sync_time = diff_data['current_time']
self._save_last_sync_time()
```
这套增量同步机制在我们部署的多个考勤终端上运行稳定,即使网络不稳定,也能保证数据的最终一致性。每个终端都维护自己的本地数据库和索引,定期从中央服务器同步变更,这样既保证了识别速度,又确保了数据的一致性。
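把上面两个类接起来,终端侧的启动代码大致如下(terminal_id、中央服务地址等参数均为示例):

```python
service = TerminalSyncService(
    terminal_id='terminal_01',
    central_db_url='http://192.168.1.10:8000/api/sync',  # 示例地址
    local_db_path='local_face_database.db'
)
service.start_sync_loop()  # 后台线程每5分钟与中央库同步一次
```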
## 5. 监控、维护与故障恢复
数据库上线后,运维工作才刚刚开始。我通过几个监控维度来确保系统的稳定运行:
### 5.1 健康检查与预警系统
```python
import time
from datetime import datetime
from pathlib import Path

class DatabaseHealthMonitor:
def __init__(self, db_path):
self.db_path = db_path
self.metrics_history = []
self.alert_thresholds = {
'db_size_gb': 10.0, # 数据库文件大小
'fragmentation_percent': 30, # 数据库碎片率
'query_time_ms': 100, # 平均查询时间
'error_rate': 0.01, # 错误率
'memory_usage_mb': 1024 # 内存使用
}
def run_health_check(self):
"""运行全面的健康检查"""
checks = {
'database_integrity': self._check_database_integrity(),
'file_system': self._check_file_system(),
'performance': self._check_performance(),
'data_quality': self._check_data_quality(),
'index_health': self._check_index_health()
}
        # 整体状态取最严重的检查结果,与后面的日报统计、告警判断保持一致
        overall_status = 'HEALTHY'
        alerts = []
        for check_name, result in checks.items():
            if result['status'] == 'WARNING':
                alerts.append(f"⚠️ {check_name}: {result['message']}")
                if overall_status == 'HEALTHY':
                    overall_status = 'WARNING'
            elif result['status'] == 'ERROR':
                alerts.append(f"❌ {check_name}: {result['message']}")
                overall_status = 'ERROR'
report = {
'timestamp': datetime.now().isoformat(),
'overall_status': overall_status,
'checks': checks,
'alerts': alerts,
'metrics': self._collect_metrics()
}
self.metrics_history.append(report)
# 保留最近30天的记录
if len(self.metrics_history) > 30 * 24: # 假设每小时检查一次
self.metrics_history = self.metrics_history[-30*24:]
return report
def _check_database_integrity(self):
"""检查数据库完整性"""
import sqlite3
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 执行完整性检查
cursor.execute("PRAGMA integrity_check")
result = cursor.fetchone()
conn.close()
if result[0] == 'ok':
return {'status': 'HEALTHY', 'message': '数据库完整性检查通过'}
else:
return {'status': 'ERROR', 'message': f'数据库损坏: {result[0]}'}
except Exception as e:
return {'status': 'ERROR', 'message': f'数据库连接失败: {str(e)}'}
def _check_file_system(self):
"""检查文件系统状态"""
from pathlib import Path
db_file = Path(self.db_path)
if not db_file.exists():
return {'status': 'ERROR', 'message': '数据库文件不存在'}
file_size_gb = db_file.stat().st_size / (1024**3)
if file_size_gb > self.alert_thresholds['db_size_gb']:
return {
'status': 'WARNING',
'message': f'数据库文件过大: {file_size_gb:.2f}GB'
}
# 检查磁盘空间
import shutil
total, used, free = shutil.disk_usage(db_file.parent)
free_gb = free / (1024**3)
if free_gb < 5: # 小于5GB空闲空间
return {
'status': 'WARNING',
'message': f'磁盘空间不足: {free_gb:.1f}GB 空闲'
}
return {'status': 'HEALTHY', 'message': '文件系统检查正常'}
def _check_performance(self):
"""检查查询性能"""
# 执行基准查询
import time
test_queries = [
"SELECT COUNT(*) FROM persons",
"SELECT * FROM face_embeddings LIMIT 10",
"SELECT person_id, COUNT(*) FROM face_embeddings GROUP BY person_id"
]
query_times = []
import sqlite3
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
for query in test_queries:
start = time.time()
cursor.execute(query)
cursor.fetchall() # 确保执行完成
query_time = (time.time() - start) * 1000 # 毫秒
query_times.append(query_time)
conn.close()
avg_time = sum(query_times) / len(query_times)
if avg_time > self.alert_thresholds['query_time_ms']:
return {
'status': 'WARNING',
'message': f'查询性能下降: 平均 {avg_time:.1f}ms'
}
return {
'status': 'HEALTHY',
'message': f'查询性能正常: 平均 {avg_time:.1f}ms'
}
def _check_data_quality(self):
"""检查数据质量"""
import sqlite3
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 检查无效的人脸数据
cursor.execute("""
SELECT COUNT(*)
FROM face_embeddings
WHERE face_quality_score < 0.5
""")
low_quality_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM face_embeddings")
total_count = cursor.fetchone()[0]
low_quality_ratio = low_quality_count / total_count if total_count > 0 else 0
conn.close()
if low_quality_ratio > 0.1: # 超过10%的低质量数据
return {
'status': 'WARNING',
'message': f'低质量数据比例过高: {low_quality_ratio:.1%}'
}
return {'status': 'HEALTHY', 'message': '数据质量检查正常'}
def _check_index_health(self):
"""检查向量索引健康状态"""
# 检查索引文件是否存在、是否可加载
index_path = Path('data/face_index.faiss')
if not index_path.exists():
return {'status': 'ERROR', 'message': '向量索引文件不存在'}
try:
import faiss
index = faiss.read_index(str(index_path))
vector_count = index.ntotal
# 检查索引与数据库的一致性
import sqlite3
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM face_embeddings")
db_count = cursor.fetchone()[0]
conn.close()
if vector_count != db_count:
return {
'status': 'WARNING',
'message': f'索引与数据库不一致: 索引{vector_count}条, 数据库{db_count}条'
}
return {
'status': 'HEALTHY',
'message': f'索引状态正常: {vector_count}个向量'
}
except Exception as e:
return {'status': 'ERROR', 'message': f'索引加载失败: {str(e)}'}
def _collect_metrics(self):
"""收集监控指标"""
import psutil
import sqlite3
metrics = {
'timestamp': datetime.now().isoformat(),
'memory_usage_mb': psutil.Process().memory_info().rss / 1024 / 1024,
'cpu_percent': psutil.cpu_percent(interval=1),
'disk_usage': psutil.disk_usage('.').percent
}
# 数据库指标
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM persons")
metrics['person_count'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM face_embeddings")
metrics['face_count'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM operation_logs WHERE created_time > datetime('now', '-1 day')")
metrics['daily_operations'] = cursor.fetchone()[0]
conn.close()
return metrics
def generate_daily_report(self):
"""生成日报"""
# 分析24小时内的监控数据
recent_reports = [r for r in self.metrics_history
if datetime.fromisoformat(r['timestamp']) >
datetime.now().replace(hour=0, minute=0, second=0)]
if not recent_reports:
return "无今日监控数据"
# 计算统计信息
status_counts = {'HEALTHY': 0, 'WARNING': 0, 'ERROR': 0}
for report in recent_reports:
status_counts[report['overall_status']] += 1
# 提取性能指标
query_times = []
memory_usage = []
for report in recent_reports:
if 'metrics' in report:
# 这里可以添加更多指标分析
pass
report_text = f"""
人脸数据库健康日报 - {datetime.now().strftime('%Y-%m-%d')}
============================================
📊 今日概览
-----------
检查次数: {len(recent_reports)}
健康状态: {status_counts['HEALTHY']} 次正常
{status_counts['WARNING']} 次警告
{status_counts['ERROR']} 次错误
📈 关键指标
-----------
总人员数: {recent_reports[-1]['metrics'].get('person_count', 'N/A')}
总人脸数: {recent_reports[-1]['metrics'].get('face_count', 'N/A')}
今日操作: {recent_reports[-1]['metrics'].get('daily_operations', 'N/A')} 次
⚠️ 今日告警
-----------
"""
# 添加告警信息
alerts_today = []
for report in recent_reports:
alerts_today.extend(report.get('alerts', []))
if alerts_today:
unique_alerts = set(alerts_today)
for alert in unique_alerts:
report_text += f"- {alert}\n"
else:
report_text += "无告警\n"
report_text += f"""
🔧 建议操作
-----------
"""
# 根据检查结果给出建议
if status_counts['ERROR'] > 0:
report_text += "1. 立即检查数据库完整性并修复\n"
if status_counts['WARNING'] > 3:
report_text += "2. 优化数据库性能,考虑重建索引\n"
# 检查是否需要清理
db_file = Path(self.db_path)
file_size_gb = db_file.stat().st_size / (1024**3)
if file_size_gb > 5:
report_text += f"3. 数据库文件较大({file_size_gb:.1f}GB),考虑归档旧数据\n"
return report_text
# 定时监控任务
def start_monitoring_service(db_path, check_interval_minutes=60):
"""启动监控服务"""
monitor = DatabaseHealthMonitor(db_path)
def monitoring_loop():
while True:
try:
report = monitor.run_health_check()
# 如果有错误,发送告警
if report['overall_status'] == 'ERROR':
send_alert(report)
# 每天生成一次报告
if datetime.now().hour == 8 and datetime.now().minute < 5:
daily_report = monitor.generate_daily_report()
send_daily_report(daily_report)
except Exception as e:
print(f"监控检查失败: {e}")
time.sleep(check_interval_minutes * 60)
import threading
thread = threading.Thread(target=monitoring_loop, daemon=True)
thread.start()
print(f"✅ 数据库监控服务已启动,每 {check_interval_minutes} 分钟检查一次")
def send_alert(report):
"""发送告警(示例实现)"""
# 实际实现中可以集成邮件、短信、钉钉、企业微信等
print(f"🚨 数据库告警: {report['overall_status']}")
for alert in report.get('alerts', []):
print(f" {alert}")
def send_daily_report(report_text):
"""发送日报"""
print("📊 发送数据库健康日报")
print(report_text)
```
### 5.2 自动化维护任务
除了监控,我还设置了一系列自动化维护任务:
```python
import json
import time

class DatabaseMaintenance:
def __init__(self, db_path):
self.db_path = db_path
def run_vacuum(self):
"""执行数据库压缩(VACUUM)"""
import sqlite3
print("开始执行数据库压缩...")
start_time = time.time()
conn = sqlite3.connect(self.db_path)
conn.execute("VACUUM")
conn.close()
elapsed = time.time() - start_time
print(f"数据库压缩完成,耗时 {elapsed:.1f} 秒")
return {'operation': 'VACUUM', 'duration_seconds': elapsed}
def rebuild_indexes(self):
"""重建所有索引"""
import sqlite3
print("开始重建数据库索引...")
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 获取所有索引
cursor.execute("SELECT name FROM sqlite_master WHERE type='index'")
indexes = [row[0] for row in cursor.fetchall()]
rebuild_log = []
for index_name in indexes:
if index_name.startswith('sqlite_'): # 跳过系统索引
continue
print(f"重建索引: {index_name}")
start_time = time.time()
try:
cursor.execute(f"REINDEX {index_name}")
elapsed = time.time() - start_time
rebuild_log.append({
'index': index_name,
'status': 'SUCCESS',
'duration_seconds': elapsed
})
print(f" ✓ 完成,耗时 {elapsed:.1f} 秒")
except Exception as e:
rebuild_log.append({
'index': index_name,
'status': 'FAILED',
'error': str(e)
})
print(f" ✗ 失败: {e}")
conn.close()
return {'operation': 'REINDEX', 'logs': rebuild_log}
def cleanup_old_logs(self, days_to_keep=30):
"""清理旧的操作日志"""
import sqlite3
print(f"清理 {days_to_keep} 天前的操作日志...")
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 删除旧日志
cursor.execute("""
DELETE FROM operation_logs
WHERE created_time < datetime('now', ?)
""", (f'-{days_to_keep} days',))
deleted_count = cursor.rowcount
conn.commit()
conn.close()
print(f"已删除 {deleted_count} 条旧日志")
return {
'operation': 'CLEANUP_LOGS',
'days_kept': days_to_keep,
'deleted_count': deleted_count
}
def archive_inactive_persons(self, months_inactive=6):
"""归档长期未使用的人员数据"""
import sqlite3
from datetime import datetime, timedelta
        # 与SQLite的datetime()输出格式保持一致,便于字符串比较
        cutoff_date = (datetime.now() - timedelta(days=months_inactive * 30)).strftime('%Y-%m-%d %H:%M:%S')
print(f"归档 {months_inactive} 个月内未使用的人员数据...")
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 1. 找出长期未使用的人员
cursor.execute("""
SELECT p.person_id, p.name, p.employee_id
FROM persons p
LEFT JOIN operation_logs l ON p.person_id = l.target_person_id
WHERE (l.created_time IS NULL OR l.created_time < ?)
AND p.status = 1
GROUP BY p.person_id
""", (cutoff_date,))
inactive_persons = cursor.fetchall()
if not inactive_persons:
print("没有需要归档的人员")
return {'operation': 'ARCHIVE', 'archived_count': 0}
# 2. 创建归档表(如果不存在)
cursor.execute("""
CREATE TABLE IF NOT EXISTS archived_persons (
person_id INTEGER PRIMARY KEY,
employee_id VARCHAR(32),
name VARCHAR(64),
department VARCHAR(64),
archive_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
original_data TEXT
)
""")
# 3. 归档数据
archived_count = 0
for person_id, name, employee_id in inactive_persons:
            # 获取完整人员数据
            cursor.execute("""
                SELECT * FROM persons WHERE person_id = ?
            """, (person_id,))
            person_data = cursor.fetchone()
            # 列名必须在执行下一条查询前读取,否则cursor.description会被覆盖
            person_cols = [desc[0] for desc in cursor.description]
            cursor.execute("""
                SELECT * FROM face_embeddings WHERE person_id = ?
            """, (person_id,))
            face_data = cursor.fetchall()
            face_cols = [desc[0] for desc in cursor.description]
            # 保存到归档表(embedding等BLOB字段用str兜底,保证JSON可序列化)
            original_data = {
                'person': dict(zip(person_cols, person_data)),
                'faces': [dict(zip(face_cols, face)) for face in face_data]
            }
            cursor.execute("""
                INSERT INTO archived_persons
                (person_id, employee_id, name, original_data)
                VALUES (?, ?, ?, ?)
            """, (person_id, employee_id, name, json.dumps(original_data, default=str)))
# 标记为已归档(而不是删除)
cursor.execute("""
UPDATE persons SET status = 0 WHERE person_id = ?
""", (person_id,))
archived_count += 1
conn.commit()
conn.close()
print(f"已归档 {archived_count} 个长期未使用的人员")
return {
'operation': 'ARCHIVE',
'archived_count': archived_count,
'cutoff_date': cutoff_date
}
def run_full_maintenance(self):
"""执行完整的维护流程"""
print("="*50)
print("开始执行数据库完整维护")
print("="*50)
results = []
# 1. 清理旧日志
results.append(self.cleanup_old_logs())
# 2. 归档不活跃人员
results.append(self.archive_inactive_persons())
# 3. 重建索引
results.append(self.rebuild_indexes())
# 4. 数据库压缩(建议在业务低峰期执行)
# results.append(self.run_vacuum())
print("="*50)
print("数据库维护完成")
print("="*50)
return results
# 定时维护调度
def schedule_maintenance_tasks(db_path):
"""调度维护任务"""
import schedule
import time
maintenance = DatabaseMaintenance(db_path)
# 每天凌晨3点清理旧日志
schedule.every().day.at("03:00").do(
lambda: maintenance.cleanup_old_logs(days_to_keep=30)
)
# 每周日凌晨2点归档不活跃人员
schedule.every().sunday.at("02:00").do(
lambda: maintenance.archive_inactive_persons(months_inactive=6)
)
    # 每周一凌晨1点重建索引(schedule库不直接支持"每月1号"这类调度)
schedule.every().monday.at("01:00").do(
maintenance.rebuild_indexes
)
# 每季度执行一次完整维护
schedule.every(90).days.do(
maintenance.run_full_maintenance
)
print("数据库维护任务已调度")
while True:
schedule.run_pending()
time.sleep(60)
```
这些监控和维护机制在实际运维中帮我们避免了很多潜在问题。比如,有一次磁盘空间不足的预警让我们提前清理了日志文件,避免了数据库写入失败。另一次性能下降的警报让我们及时重建了索引,恢复了识别速度。
## 6. 实战:从零构建生产级人脸数据库系统
最后,让我用一个完整的示例展示如何将这些组件组合成一个生产可用的系统。这个系统在我们公司的考勤项目中稳定运行了两年,支撑着每天数千次的识别请求。
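系统的所有配置集中在一个config.yaml里。作为参考,下面是一份与后文`_load_config`中默认值对应的示例配置(字段含义见代码中的default_config,缺省字段会自动回退到默认值):

```yaml
database:
  path: data/face_database.db
  backup_dir: backups
  auto_backup: true
index:
  path: data/face_index
  use_gpu: true
  rebuild_interval_hours: 24
quality:
  threshold: 0.75
version:
  dir: versions
  auto_version: true
monitoring:
  enabled: true
  check_interval_minutes: 60
```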
```python
import time
from pathlib import Path

class ProductionFaceSystem:
"""生产级人脸识别系统"""
def __init__(self, config_path='config.yaml'):
self.config = self._load_config(config_path)
self.db_path = self.config['database']['path']
self.index_path = self.config['index']['path']
# 初始化组件
self.db = self._init_database()
self.validator = FaceQualityValidator(
quality_threshold=self.config['quality']['threshold']
)
self.index = FaceVectorIndex(
dimension=512,
use_gpu=self.config['index'].get('use_gpu', False)
)
self.versioner = FaceDatabaseVersioner(
db_path=self.db_path,
version_dir=self.config['version']['dir']
)
self.monitor = DatabaseHealthMonitor(self.db_path)
# 加载现有索引
self._load_or_build_index()
print("✅ 人脸识别系统初始化完成")
def _load_config(self, config_path):
"""加载配置文件"""
import yaml
default_config = {
'database': {
'path': 'data/face_database.db',
'backup_dir': 'backups',
'auto_backup': True
},
'index': {
'path': 'data/face_index',
'use_gpu': True,
'rebuild_interval_hours': 24
},
'quality': {
'threshold': 0.75,
'min_faces_per_person': 1,
'max_faces_per_person': 10
},
'version': {
'dir': 'versions',
'auto_version': True,
'max_versions': 100
},
'monitoring': {
'enabled': True,
'check_interval_minutes': 60,
'alert_emails': []
}
}
try:
with open(config_path, 'r') as f:
user_config = yaml.safe_load(f)
# 合并配置
for key in default_config:
if key in user_config:
if isinstance(default_config[key], dict) and isinstance(user_config[key], dict):
default_config[key].update(user_config[key])
else:
default_config[key] = user_config[key]
except FileNotFoundError:
print(f"⚠️ 配置文件 {config_path} 不存在,使用默认配置")
return default_config
def _init_database(self):
"""初始化数据库连接和表结构"""
import sqlite3
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建表(如果不存在)
cursor.execute("""
CREATE TABLE IF NOT EXISTS persons (
person_id INTEGER PRIMARY KEY AUTOINCREMENT,
employee_id VARCHAR(32) UNIQUE,
name VARCHAR(64) NOT NULL,
department VARCHAR(64),
role VARCHAR(32) DEFAULT 'employee',
status INTEGER DEFAULT 1,
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS face_embeddings (
embedding_id INTEGER PRIMARY KEY AUTOINCREMENT,
person_id INTEGER NOT NULL,
embedding BLOB NOT NULL,
embedding_path VARCHAR(256),
image_path VARCHAR(256),
face_quality_score FLOAT,
yaw FLOAT,
pitch FLOAT,
roll FLOAT,
brightness FLOAT,
sharpness FLOAT,
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (person_id) REFERENCES persons(person_id) ON DELETE CASCADE
)
""")
# 创建其他表...
conn.commit()
return conn
def _load_or_build_index(self):
"""加载或构建向量索引"""
index_file = Path(f"{self.index_path}.faiss")
mapping_file = Path(f"{self.index_path}.mapping")
if index_file.exists() and mapping_file.exists():
try:
self.index.load_index(self.index_path)
print(f"✅ 已加载现有索引,包含 {self.index.index.ntotal} 个向量")
except Exception as e:
print(f"⚠️ 索引加载失败,将重新构建: {e}")
self._rebuild_index()
else:
print("未找到索引文件,开始构建索引...")
self._rebuild_index()
def _rebuild_index(self):
"""重新构建向量索引"""
print("开始构建向量索引...")
start_time = time.time()
vector_count = self.index.build_from_database(self.db_path)
# 保存索引
self.index.save_index(self.index_path)
elapsed = time.time() - start_time
print(f"✅ 索引构建完成,共 {vector_count} 个向量,耗时 {elapsed:.1f} 秒")
return vector_count
def enroll_person(self, person_data, image_paths):
"""注册新人员(生产环境版本)"""
import sqlite3
# 输入验证
if not person_data.get('employee_id') or not person_data.get('name'):
raise ValueError("必须提供 employee_id 和 name")
if not image_paths:
raise ValueError("必须提供至少一张人脸图片")
# 检查人员是否已存在
cursor = self.db.cursor()
cursor.execute(
"SELECT person_id FROM persons WHERE employee_id = ?",
(person_data['employee_id'],)
)
existing = cursor.fetchone()
if existing:
person_id = existing[0]
print(f"人员 {person_data['name']} 已存在,将更新人脸数据")
operation_type = 'update'
else:
# 插入新人员
cursor.execute("""
INSERT INTO persons (employee_id, name, department, role)
VALUES (?, ?, ?, ?)
""", (
person_data['employee_id'],
person_data['name'],
person_data.get('department', ''),
person_data.get('role', 'employee')
))
person_id = cursor.lastrowid
operation_type = 'add'
# 处理每张图片
successful_embeddings = 0
for img_path in image_paths:
try:
result = self._process_single_face(person_id, img_path)
if result['success']:
successful_embeddings += 1
else:
print(f"图片 {img_path} 处理失败: {result['message']}")
except Exception as e:
print(f"图片 {img_path} 处理异常: {e}")
if successful_embeddings == 0:
self.db.rollback()
raise ValueError("所有人脸图片处理失败,注册未完成")
# 提交事务
self.db.commit()
# 更新向量索引
self._update_index_for_person(person_id)
# 创建版本(如果启用)
if self.config['version']['auto_version']:
self.versioner.create_version(
description=f"{operation_type}人员: {person_data['name']}",
author=person_data.get('operator', 'system')
)
# 记录操作日志
cursor.execute("""
INSERT INTO operation_logs
(operator, operation_type, target_person_id, details)
VALUES (?, ?, ?, ?)
""", (
person_data.get('operator', 'system'),
operation_type,
person_id,
f"成功添加 {successful_embeddings} 张人脸图片"
))
self.db.commit()
print(f"✅ 人员 {person_data['name']} 注册完成,成功添加 {successful_embeddings} 张人脸")
return {
'person_id