<think>我们使用PyMongo库来操作MongoDB。首先确保已经安装PyMongo:pip install pymongo
在MongoDB中,创建集合是隐式的。当我们第一次向一个不存在的集合插入数据时,MongoDB会自动创建该集合。
但是,我们也可以显式地创建一个空集合,或者创建一个带有验证规则的集合(在MongoDB 3.2+中支持)。
下面将展示几种不同的创建集合的方法:
1. 隐式创建集合(通过插入文档)
2. 显式创建空集合(使用create_collection方法)
3. 创建带有验证规则的集合
同时,我们也会展示如何创建固定集合(capped collection)和时序集合(time series collection,MongoDB 5.0+)。
</think>
# Python 中 MongoDB 集合创建的全面指南
在 Python 中创建 MongoDB 集合是与 NoSQL 数据库交互的基础操作。MongoDB 采用动态模式设计,集合在首次插入文档时自动创建,但我们可以显式创建集合并配置各种选项。
## 1. 基础集合创建
### 1.1 安装 PyMongo
```bash
pip install pymongo
```
### 1.2 基本集合创建代码
```python
from pymongo import MongoClient
from pymongo.errors import OperationFailure
def create_basic_collection():
    """Create the ``users`` collection in ``mydatabase`` if it is missing.

    Connects to the MongoDB server on localhost, looks the collection up in
    the list of existing collection names and creates it explicitly only
    when it is absent. ``OperationFailure`` errors are printed, not raised.
    """
    collection_name = 'users'
    try:
        # The client is used as a context manager so the connection is
        # always closed, and a failure while constructing the client can no
        # longer surface as an unbound ``client`` inside a ``finally``.
        with MongoClient('mongodb://localhost:27017/') as client:
            db = client['mydatabase']
            if collection_name not in db.list_collection_names():
                # Explicit creation of the (still empty) collection.
                db.create_collection(collection_name)
                print(f"集合 '{collection_name}' 创建成功")
            else:
                print(f"集合 '{collection_name}' 已存在")
    except OperationFailure as e:
        print(f"数据库操作错误: {e}")


# Run the example.
create_basic_collection()
```
### 代码解释:
1. **连接 MongoDB**:使用 `MongoClient` 建立连接
2. **选择数据库**:`client['mydatabase']` 获取数据库对象(数据库是惰性创建的,只有在首次写入数据或显式创建集合时才会真正生成)
3. **检查集合存在**:`list_collection_names()` 列出所有集合
4. **创建集合**:`create_collection()` 显式创建集合
5. **关闭连接**:`client.close()` 释放资源
## 2. 高级集合配置
### 2.1 创建带验证规则的集合
```python
def create_validated_collection():
    """Create ``validated_users`` with a ``$jsonSchema`` validator and probe it.

    The collection is created only when it does not exist yet, so the
    example can be run repeatedly. Afterwards a document with an invalid
    e-mail address is inserted to show the validator rejecting it.
    ``OperationFailure`` errors are printed, not raised.
    """
    # name, email and age are required; email must match the pattern and
    # age must be a BSON int between 18 and 120.
    validator = {
        '$jsonSchema': {
            'bsonType': 'object',
            'required': ['name', 'email', 'age'],
            'properties': {
                'name': {
                    'bsonType': 'string',
                    'description': '必须是字符串类型',
                },
                'email': {
                    'bsonType': 'string',
                    'pattern': '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$',
                    'description': '必须是有效的电子邮件地址',
                },
                'age': {
                    'bsonType': 'int',
                    'minimum': 18,
                    'maximum': 120,
                    'description': '年龄必须在18-120之间',
                },
            },
        }
    }
    try:
        # Context manager: the connection is closed on every exit path.
        with MongoClient('mongodb://localhost:27017/') as client:
            db = client['mydatabase']

            # Same existence check the other examples use; creating an
            # already existing collection would otherwise abort the example.
            if 'validated_users' not in db.list_collection_names():
                db.create_collection(
                    'validated_users',
                    validator=validator,
                    validationAction='error',  # or 'warn'
                    validationLevel='strict',  # or 'moderate'
                )
                print("带验证规则的集合 'validated_users' 创建成功")
            else:
                print("集合 'validated_users' 已存在")

            # Probe the validator with a deliberately invalid e-mail.
            try:
                db.validated_users.insert_one({
                    'name': 'John Doe',
                    'email': 'invalid-email',
                    'age': 25,
                })
            except OperationFailure as e:
                # ``details`` may be None; fall back to the exception text.
                details = e.details or {}
                print(f"验证规则生效: {details.get('errmsg', e)}")
    except OperationFailure as e:
        print(f"数据库操作错误: {e}")


create_validated_collection()
```
### 2.2 创建带索引的集合
```python
def create_collection_with_indexes():
    """Ensure the ``products`` collection exists and add example indexes.

    Creates a single-field, a compound, a unique, a text and a TTL index,
    then prints the names of all indexes on the collection.
    ``OperationFailure`` errors are printed, not raised.
    """
    collection_name = 'products'
    try:
        # Context manager: the connection is closed on every exit path and
        # no ``finally`` can touch an unbound ``client``.
        with MongoClient('mongodb://localhost:27017/') as client:
            db = client['mydatabase']
            if collection_name not in db.list_collection_names():
                db.create_collection(collection_name)
            collection = db[collection_name]

            # Single-field index.
            collection.create_index('name')
            # Compound index: category ascending, price descending.
            collection.create_index([('category', 1), ('price', -1)])
            # Unique index.
            collection.create_index('sku', unique=True)
            # Text index.
            collection.create_index([('description', 'text')])
            # TTL index: expireAfterSeconds is set to 30 days.
            collection.create_index('created_at', expireAfterSeconds=3600 * 24 * 30)
            print(f"集合 '{collection_name}' 已创建并添加索引")

            # List the indexes that now exist.
            index_names = [index['name'] for index in collection.list_indexes()]
            print(f"集合索引: {index_names}")
    except OperationFailure as e:
        print(f"数据库操作错误: {e}")


create_collection_with_indexes()
```
## 3. 特殊类型集合
### 3.1 创建固定集合 (Capped Collection)
```python
def create_capped_collection():
    """Create the capped ``system_logs`` collection and fill it past its cap.

    The collection is created with ``size=10000000`` and ``max=1000`` when
    it does not exist yet. 1500 log documents are then inserted and the
    resulting document count is printed. ``OperationFailure`` errors are
    printed, not raised.
    """
    # Imported locally: the snippet uses ``datetime`` but never imported it.
    import datetime

    try:
        with MongoClient('mongodb://localhost:27017/') as client:
            db = client['mydatabase']

            # Guard the creation so the example can be run more than once.
            if 'system_logs' not in db.list_collection_names():
                db.create_collection(
                    'system_logs',
                    capped=True,
                    size=10000000,  # size limit in bytes
                    max=1000,       # at most 1000 documents
                )
                print("固定集合 'system_logs' 创建成功")
            else:
                print("固定集合 'system_logs' 已存在")

            collection = db['system_logs']
            # One bulk insert instead of 1500 separate round trips.
            collection.insert_many([
                {
                    'timestamp': datetime.datetime.now(datetime.timezone.utc),
                    'message': f'Log entry {i}',
                    'level': 'INFO' if i % 2 == 0 else 'ERROR',
                }
                for i in range(1500)
            ])

            # The count is expected not to exceed the ``max`` given above.
            print(f"文档数量: {collection.count_documents({})}")
    except OperationFailure as e:
        print(f"数据库操作错误: {e}")


create_capped_collection()
```
### 3.2 创建时序集合 (Time Series Collection)
```python
def create_time_series_collection():
    """Create the ``sensor_data`` time-series collection and insert samples.

    The collection (in ``iot_database``) is created when it does not exist
    yet, ten readings spaced one minute apart are inserted, and the three
    most recent ones are printed. ``OperationFailure`` errors are printed,
    not raised.
    """
    # Imported locally: the snippet uses both modules but never imported them.
    import datetime
    import random

    try:
        with MongoClient('mongodb://localhost:27017/') as client:
            db = client['iot_database']

            # Guard the creation so the example can be run more than once.
            if 'sensor_data' not in db.list_collection_names():
                # Time-series collection (the original notes MongoDB 5.0+).
                db.create_collection(
                    'sensor_data',
                    timeseries={
                        'timeField': 'timestamp',    # required time field
                        'metaField': 'sensor_meta',  # optional metadata field
                        'granularity': 'minutes',    # seconds, minutes or hours
                    },
                )
                print("时序集合 'sensor_data' 创建成功")
            else:
                print("时序集合 'sensor_data' 已存在")

            collection = db['sensor_data']
            # One reference instant so the samples are exactly a minute apart.
            now = datetime.datetime.now(datetime.timezone.utc)
            collection.insert_many([
                {
                    'timestamp': now - datetime.timedelta(minutes=i),
                    'sensor_meta': {'sensor_id': i % 3, 'location': f'Area {i//3}'},
                    'temperature': 20 + random.random() * 10,
                    'humidity': 40 + random.random() * 20,
                }
                for i in range(10)
            ])

            # Newest three readings first.
            for doc in collection.find().sort('timestamp', -1).limit(3):
                print(f"{doc['timestamp']} - Temp: {doc['temperature']:.1f}°C")
    except OperationFailure as e:
        print(f"数据库操作错误: {e}")


create_time_series_collection()
```
## 4. 集合管理工具
### 4.1 集合操作工具类
```python
class MongoDBCollectionManager:
    """Small helper around one database for creating, dropping and cloning collections.

    Every operation returns a status dictionary instead of raising on
    ``OperationFailure``. The instance can be used as a context manager so
    that the underlying client is closed when the block exits.
    """

    # Fields of an index description that are not passed on to ``create_index``.
    _NON_OPTION_INDEX_FIELDS = ('v', 'key', 'ns')

    def __init__(self, db_name, uri='mongodb://localhost:27017/'):
        """Connect to ``uri`` and select the database ``db_name``."""
        self.client = MongoClient(uri)
        self.db = self.client[db_name]

    def __enter__(self):
        """Return the manager itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client on leaving the ``with`` block; never swallow errors."""
        self.close()
        return False

    def collection_exists(self, collection_name):
        """Return True when ``collection_name`` is among the database's collections."""
        return collection_name in self.db.list_collection_names()

    def create_collection(self, collection_name, **options):
        """Create a collection and report the outcome.

        Args:
            collection_name: Name of the collection to create.
            **options: Forwarded unchanged to ``Database.create_collection``.

        Returns:
            ``{'status': 'exists' | 'created', 'collection': name}`` or
            ``{'status': 'error', 'error': message}`` on ``OperationFailure``.
        """
        try:
            if self.collection_exists(collection_name):
                return {'status': 'exists', 'collection': collection_name}
            self.db.create_collection(collection_name, **options)
            return {'status': 'created', 'collection': collection_name}
        except OperationFailure as e:
            return {'status': 'error', 'error': str(e)}

    def drop_collection(self, collection_name):
        """Drop a collection and report the outcome.

        Returns:
            ``{'status': 'not_found' | 'dropped', 'collection': name}`` or
            ``{'status': 'error', 'error': message}`` on ``OperationFailure``.
        """
        try:
            if not self.collection_exists(collection_name):
                return {'status': 'not_found', 'collection': collection_name}
            self.db[collection_name].drop()
            return {'status': 'dropped', 'collection': collection_name}
        except OperationFailure as e:
            return {'status': 'error', 'error': str(e)}

    @staticmethod
    def _index_spec(index):
        """Split an index description into ``(keys, options)`` for ``create_index``.

        ``keys`` is the ordered list of ``(field, direction)`` pairs taken
        from ``index['key']``; the list form is used instead of handing the
        mapping object itself to ``create_index``. ``options`` holds every
        remaining field except the bookkeeping ones ('v', 'key', 'ns').
        """
        keys = list(index['key'].items())
        options = {
            name: value
            for name, value in index.items()
            if name not in MongoDBCollectionManager._NON_OPTION_INDEX_FIELDS
        }
        return keys, options

    def clone_collection(self, source_name, target_name):
        """Copy the indexes and documents of ``source_name`` into a new ``target_name``.

        Returns:
            ``{'status': 'source_not_found'}`` when the source is missing,
            the result of ``create_collection`` when the target could not be
            newly created (for example because it already exists),
            ``{'status': 'cloned', 'source': ..., 'target': ...}`` on success,
            or ``{'status': 'error', 'error': message}`` on ``OperationFailure``.
        """
        try:
            if not self.collection_exists(source_name):
                return {'status': 'source_not_found'}

            source = self.db[source_name]
            indexes = list(source.list_indexes())

            # Only proceed when the target was freshly created.
            result = self.create_collection(target_name)
            if result['status'] != 'created':
                return result

            # Recreate every index except the default ``_id_`` one.
            target = self.db[target_name]
            for index in indexes:
                if index['name'] == '_id_':
                    continue
                keys, options = self._index_spec(index)
                target.create_index(keys, **options)

            # Copy the documents through an aggregation ``$out`` stage.
            source.aggregate([{'$out': target_name}])
            return {'status': 'cloned', 'source': source_name, 'target': target_name}
        except OperationFailure as e:
            return {'status': 'error', 'error': str(e)}

    def close(self):
        """Close the client connection."""
        self.client.close()
# Usage example: create a validated collection, clone it, then clean up.
if __name__ == "__main__":
    # JSON schema for employee documents: employee_id and name are required.
    employee_schema = {
        'bsonType': 'object',
        'required': ['employee_id', 'name'],
        'properties': {
            'employee_id': {'bsonType': 'int'},
            'name': {'bsonType': 'string'},
            'email': {'bsonType': 'string', 'pattern': '.+@.+\\..+'},
        },
    }

    manager = MongoDBCollectionManager('company_db')

    # Create the collection with the validator attached.
    create_outcome = manager.create_collection(
        'employees',
        validator={'$jsonSchema': employee_schema},
    )
    print(f"创建结果: {create_outcome}")

    # Clone it into a backup collection.
    clone_outcome = manager.clone_collection('employees', 'employees_backup')
    print(f"克隆结果: {clone_outcome}")

    # Remove the backup again and release the connection.
    manager.drop_collection('employees_backup')
    manager.close()
```
## 5. 最佳实践
### 5.1 安全连接与配置
```python
from pymongo import MongoClient, WriteConcern, ReadPreference
from pymongo.errors import ConnectionFailure, OperationFailure
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
def secure_collection_creation():
    """Create and configure the ``audit_log`` collection over a TLS connection.

    The connection URI and database name are read from the ``MONGO_URI`` and
    ``MONGO_DB`` environment variables. After a successful ``ping`` the
    collection is created (when missing) with a majority write concern, two
    indexes are added and one test document is inserted. Any exception is
    printed rather than raised.
    """
    # Imported locally: the snippet uses ``datetime`` but never imported it.
    import datetime

    mongo_uri = os.getenv('MONGO_URI', 'mongodb://localhost:27017/')
    db_name = os.getenv('MONGO_DB', 'secure_db')

    try:
        # The client is built inside ``with`` so that a failure while
        # constructing it is reported by the handler below instead of being
        # replaced by an unbound-``client`` error in a ``finally`` clause.
        with MongoClient(
            mongo_uri,
            tls=True,  # enable TLS encryption
            tlsCAFile='/path/to/ca.pem',
            tlsCertificateKeyFile='/path/to/client.pem',
            authMechanism='SCRAM-SHA-256',
            appname='SecureApp',
            maxPoolSize=10,
            minPoolSize=2,
            serverSelectionTimeoutMS=5000,
        ) as client:
            # Verify that the server is reachable before doing anything else.
            try:
                client.admin.command('ping')
                print("成功连接到 MongoDB")
            except ConnectionFailure:
                print("无法连接到 MongoDB")
                return

            db = client[db_name]
            wc_majority = WriteConcern("majority", wtimeout=1000)

            # Pass the WriteConcern through the ``write_concern`` parameter;
            # as a raw ``writeConcern=`` option the object would be sent as
            # part of the command document.
            if 'audit_log' not in db.list_collection_names():
                db.create_collection('audit_log', write_concern=wc_majority)

            # Handle with the desired read preference and write concern.
            audit_collection = db['audit_log'].with_options(
                read_preference=ReadPreference.SECONDARY_PREFERRED,
                write_concern=wc_majority,
            )

            audit_collection.create_index('timestamp')
            audit_collection.create_index('user_id')
            print("安全集合 'audit_log' 创建并配置完成")

            # Insert one test document.
            try:
                result = audit_collection.insert_one({
                    'timestamp': datetime.datetime.now(datetime.timezone.utc),
                    'user_id': 'system',
                    'action': 'collection_created',
                    'details': {'collection': 'audit_log'},
                })
                print(f"插入文档 ID: {result.inserted_id}")
            except OperationFailure as e:
                print(f"写入失败: {e}")
    except Exception as e:
        # Top-level boundary of the example: report and continue.
        print(f"错误: {e}")


secure_collection_creation()
```
### 5.2 性能优化技巧
```python
def create_optimized_collection():
    """Demonstrate indexing, projection, bulk insert and aggregation.

    Works on ``performance_db.high_perf_data``; the collection is created
    when it does not exist yet. ``OperationFailure`` errors are printed, not
    raised.
    """
    # Imported locally: the snippet uses ``datetime`` but never imported it.
    import datetime

    try:
        with MongoClient('mongodb://localhost:27017/') as client:
            db = client['performance_db']

            # Guard the creation so the example can be run more than once.
            if 'high_perf_data' not in db.list_collection_names():
                db.create_collection('high_perf_data')
            collection = db['high_perf_data']

            # 1. Indexes.
            collection.create_index([('timestamp', 1)])  # ascending
            collection.create_index([('category', 1), ('value', -1)])  # compound

            # 2. Projection to reduce the returned fields.
            # NOTE: on a fresh collection this prints no documents, because
            # the bulk insert only happens in step 3 below.
            print("使用投影查询:")
            projection = {'_id': 0, 'timestamp': 1, 'value': 1}
            for doc in collection.find({'category': 'A'}, projection).limit(3):
                print(doc)

            # 3. Bulk insert.
            print("\n批量插入文档:")
            now = datetime.datetime.now(datetime.timezone.utc)
            documents = [
                {'timestamp': now, 'category': chr(65 + i), 'value': i * 10}
                for i in range(1000)
            ]
            insert_result = collection.insert_many(documents)
            print(f"插入 {len(insert_result.inserted_ids)} 个文档")

            # 4. Aggregation pipeline: average value per category, for
            # values above 500, highest average first.
            print("\n聚合查询结果:")
            pipeline = [
                {'$match': {'value': {'$gt': 500}}},
                {'$group': {'_id': '$category', 'avg_value': {'$avg': '$value'}}},
                {'$sort': {'avg_value': -1}},
            ]
            # Separate loop name so the insert result is not shadowed.
            for row in collection.aggregate(pipeline):
                print(row)

            # 5. Sharding (must be configured in a sharded-cluster environment):
            # client.admin.command('enableSharding', 'performance_db')
            # client.admin.command('shardCollection', 'performance_db.high_perf_data', key={'timestamp': 1})
    except OperationFailure as e:
        print(f"数据库操作错误: {e}")


create_optimized_collection()
```
## 6. 集合监控与维护
### 6.1 集合状态监控
```python
def monitor_collection():
    """Insert sample metrics and print collection, index and query-plan statistics.

    Works on ``monitoring_db.monitored_data``; the collection is created
    when it does not exist yet. ``OperationFailure`` errors are printed, not
    raised.
    """
    # Imported locally: the snippet uses these modules but never imported them.
    import datetime
    import random
    import time

    try:
        with MongoClient('mongodb://localhost:27017/') as client:
            db = client['monitoring_db']
            collection = db['monitored_data']

            # Make sure the collection exists.
            if 'monitored_data' not in db.list_collection_names():
                db.create_collection('monitored_data')

            # Insert 100 samples, pausing 0.1 s between them.
            for _ in range(100):
                collection.insert_one({
                    'timestamp': datetime.datetime.now(datetime.timezone.utc),
                    'metric': 'cpu_usage',
                    'value': random.uniform(0, 100),
                })
                time.sleep(0.1)

            # Collection statistics.
            stats = db.command('collStats', 'monitored_data')
            print("\n集合统计信息:")
            print(f"- 文档数量: {stats['count']}")
            print(f"- 存储大小: {stats['size'] / 1024:.2f} KB")
            print(f"- 索引大小: {stats['totalIndexSize'] / 1024:.2f} KB")
            print(f"- 平均文档大小: {stats['avgObjSize']:.2f} bytes")
            print(f"- 索引使用情况: {stats['indexSizes']}")

            # Index usage statistics.
            # NOTE(review): ``accesses.since`` looks like the start of the
            # statistics window rather than the last access time, so the
            # second label below may be misleading; confirm against the
            # MongoDB ``$indexStats`` documentation.
            print("\n索引使用统计:")
            for idx in collection.aggregate([{'$indexStats': {}}]):
                print(f"索引 '{idx['name']}':")
                print(f"  访问次数: {idx['accesses']['ops']}")
                print(f"  最后访问: {idx['accesses']['since']}")

            # Query plan analysis.
            # NOTE(review): assumes the explain output contains both
            # ``queryPlanner.winningPlan.stage`` and ``executionStats``;
            # confirm for the server version in use.
            explain = collection.find({'value': {'$gt': 80}}).explain()
            print("\n查询计划分析:")
            print(f"查询类型: {explain['queryPlanner']['winningPlan']['stage']}")
            print(f"扫描文档数: {explain['executionStats']['totalDocsExamined']}")
            print(f"返回文档数: {explain['executionStats']['nReturned']}")
    except OperationFailure as e:
        print(f"数据库操作错误: {e}")


monitor_collection()
```