# Python开发者必看:如何用Locust轻松实现百万级并发测试(附实战代码)
如果你是一名Python开发者,正在为高并发性能测试发愁,或者厌倦了JMeter这类工具的笨重和复杂,那么Locust很可能就是你一直在寻找的答案。我最初接触性能测试时,也经历过在JMeter里配置线程组、监听器的繁琐过程,直到发现了Locust,才真正体会到用代码驱动压测的优雅和灵活。
Locust的魅力在于,它把性能测试脚本的编写完全交给了Python。这意味着你可以用你最熟悉的语言,像写业务逻辑一样去描述用户行为。更重要的是,它基于gevent协程的架构,让单机模拟成千上万的并发用户成为可能,彻底打破了传统工具在并发数上的瓶颈。我曾在一次电商大促前的压测中,用一台普通的8核开发机,轻松模拟了超过5万的并发用户,而内存占用还不到2GB。这种效率,是传统基于线程/进程的工具难以企及的。
这篇文章不是一篇简单的入门教程。我会带你深入Locust的协程机制,拆解它如何用极少的资源实现高并发,分享我在实际项目中优化Locust脚本、构建分布式压测集群的经验,并提供可以直接复用的实战代码。无论你是要测试一个简单的API,还是一个包含登录、浏览、下单的复杂电商流程,Locust都能让你用Pythonic的方式搞定。
## 1. 为什么是Locust?重新理解性能测试工具的选择
在讨论具体技术之前,我们得先搞清楚一个问题:当我们需要做性能测试时,到底在测试什么?很多人会脱口而出:“测QPS(每秒查询率)”、“测响应时间”。这没错,但不够本质。性能测试的核心是**模拟真实用户行为,观察系统在压力下的表现**。这里的关键词是“模拟”和“真实”。
传统的性能测试工具如JMeter,采用线程模型来模拟用户。每个虚拟用户对应一个操作系统线程。当并发数上升到几千时,线程切换的开销会变得非常显著,消耗大量内存和CPU资源。这就是为什么你经常看到JMeter在单机上跑几千并发就力不从心,需要部署多台压力机。
Locust走了另一条路:**基于事件循环的协程模型**。它使用gevent库,在单个操作系统线程内通过协程切换来模拟成千上万的并发用户。这种切换发生在用户空间,开销极小。简单来说,Locust让成千上万的“虚拟用户”在同一个线程里“排队”执行,当一个用户等待网络响应时,立即切换到下一个用户,从而实现高效的并发。
| 特性 | JMeter | Locust |
|------|--------|--------|
| 并发模型 | 线程/进程 | 协程(gevent) |
| 脚本语言 | Java/Groovy(GUI配置) | Python(纯代码) |
| 单机并发能力 | 通常几百到几千 | 轻松上万,可达数十万 |
| 学习曲线 | 中等(需学习GUI操作) | 低(对Python开发者) |
| 扩展性 | 通过插件 | 直接修改Python代码 |
| 分布式部署 | 需要额外配置 | 原生支持,配置简单 |
> **注意**:虽然Locust单机并发能力很强,但这并不意味着你可以无限制地增加并发数。网络带宽、目标服务器的处理能力、以及Locust运行机本身的资源(特别是CPU和文件描述符限制)都会成为瓶颈。
我第一次用Locust替换JMeter时,最直观的感受是“自由”。不再需要和复杂的GUI界面打交道,所有测试逻辑都用Python代码表达。比如,要实现一个用户先登录、然后随机浏览商品、最后有10%的概率下单的复杂场景,用Locust写出来就像下面这样自然:
```python
from locust import HttpUser, task, between
import random
class EcommerceUser(HttpUser):
wait_time = between(1, 3) # 每次任务后等待1-3秒
def on_start(self):
"""用户会话开始时的初始化,比如登录"""
self.client.post("/login", json={
"username": f"user_{random.randint(1, 10000)}",
"password": "test_password"
})
@task(3) # 权重为3,更频繁执行
def browse_products(self):
"""浏览商品列表"""
page = random.randint(1, 10)
self.client.get(f"/api/products?page={page}")
@task(1)
def view_product_detail(self):
"""查看商品详情"""
product_id = random.randint(1, 1000)
self.client.get(f"/api/products/{product_id}")
@task(1)
def add_to_cart(self):
"""加购商品"""
product_id = random.randint(1, 1000)
self.client.post("/api/cart/items", json={
"product_id": product_id,
"quantity": random.randint(1, 3)
})
@task(1)
def place_order(self):
"""下单(只有10%的浏览会触发)"""
if random.random() < 0.1: # 10%概率
self.client.post("/api/orders", json={
"items": [{"product_id": random.randint(1, 1000), "quantity": 1}]
})
```
这种代码化的测试场景描述,不仅易于版本控制,还能轻松集成到CI/CD流程中。当业务逻辑变更时,你只需要修改对应的Python函数,而不是在GUI里重新配置一堆元件。
## 2. 深入Locust核心:协程机制与百万并发的秘密
要真正用好Locust,理解它的协程机制是关键。很多人知道Locust用gevent,但可能不清楚这到底意味着什么。让我用一个简单的比喻来解释:想象一个餐厅里只有一个服务员(一个操作系统线程),但他服务着100桌客人(100个协程)。传统线程模型是给每桌客人都配一个服务员(100个线程),成本极高。而协程模型是,服务员快速地在各桌之间切换,当一桌客人在看菜单时,服务员就去另一桌点菜。
**gevent的核心是“猴子补丁”(monkey patching)**。它通过替换Python标准库中的阻塞式I/O操作(如socket、select等),使其变为非阻塞。当Locust发起一个HTTP请求时,实际发生的过程是这样的:
1. 协程A调用`self.client.get("/api/products")`
2. gevent发现这是一个网络I/O操作,立即挂起协程A
3. 将控制权交给事件循环,事件循环选择另一个就绪的协程B执行
4. 当协程A的HTTP响应返回时,事件循环重新激活协程A
这个切换过程发生在用户态,不涉及操作系统内核的线程调度,所以开销极小。下面是一个简化的示例,展示gevent如何工作:
```python
import gevent
from gevent import monkey
monkey.patch_all() # 关键:打补丁,让标准库变成非阻塞
import requests
import time
def fetch_url(url, user_id):
"""模拟一个虚拟用户获取URL"""
start = time.time()
response = requests.get(url)
elapsed = time.time() - start
print(f"User {user_id}: {url} -> {response.status_code} in {elapsed:.2f}s")
return response
# 创建1000个协程(模拟1000个并发用户)
url = "http://httpbin.org/delay/1" # 这个端点会延迟1秒响应
jobs = [gevent.spawn(fetch_url, url, i) for i in range(1000)]
# 等待所有协程完成
gevent.joinall(jobs)
print("所有请求完成!")
```
运行这段代码,你会发现1000个请求几乎同时开始,大约1秒后全部完成。如果没有gevent,用1000个线程来做同样的事,你的机器可能早就卡死了。
**Locust的架构设计**正是基于这个原理。每个虚拟用户(Locust实例)都是一个协程,它们共享一个事件循环。当你在Locust中设置`wait_time = between(1, 5)`时,并不是真的让线程睡眠,而是让协程在等待时间到达前主动让出控制权。
这种设计带来了几个重要优势:
- **资源效率**:1万个协程的内存占用可能只有几十MB,而1万个线程可能需要GB级别的内存
- **高并发**:单机轻松支持数万并发用户
- **避免C10K问题**:传统的一个连接一个线程模型在万级连接时就会遇到瓶颈,而事件驱动模型可以轻松应对
但协程模型也有需要注意的地方。由于所有协程在同一个线程中运行,**如果你的测试脚本中有CPU密集型操作,会阻塞整个事件循环**。比如下面这个错误示例:
```python
@task
def bad_task(self):
# 这个计算会阻塞所有其他协程!
result = sum(i*i for i in range(10000000))
self.client.get("/api/data")
```
正确的做法是将CPU密集型操作移到协程外部,或者使用gevent的线程池:
```python
from gevent.threadpool import ThreadPool
import time
pool = ThreadPool(10) # 创建10个线程的池子
class SmartUser(HttpUser):
@task
def good_task(self):
# 将CPU密集型操作放到线程池执行
heavy_result = pool.apply(sum, (range(10000000),))
self.client.get("/api/data")
```
理解了这些底层原理,你就能更好地设计Locust测试脚本,避免常见的性能陷阱。
## 3. 从零构建生产级Locust测试脚本
现在让我们动手写一个完整的、可用于生产环境的Locust测试脚本。我会带你一步步构建一个电商系统的性能测试,涵盖用户登录、浏览商品、加购、下单等完整流程。
### 3.1 基础架构设计
一个好的Locust脚本应该具备以下特点:
- **模块化**:不同关注点分离(如用户行为、测试数据、配置)
- **可配置**:通过环境变量或配置文件控制测试参数
- **可维护**:清晰的代码结构和注释
- **健壮性**:完善的错误处理和日志记录
我们先创建一个项目结构:
```
ecommerce_load_test/
├── locustfile.py # 主测试脚本
├── config.py # 配置文件
├── test_data/ # 测试数据
│ ├── products.csv
│ └── users.json
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── data_loader.py
│ └── custom_checks.py
└── requirements.txt # 依赖包
```
### 3.2 配置管理
在`config.py`中定义配置:
```python
import os
from typing import Dict, Any
class Config:
"""测试配置类"""
# 目标系统URL
TARGET_HOST = os.getenv("TARGET_HOST", "https://api.example.com")
# 测试用户配置
USER_COUNT = int(os.getenv("USER_COUNT", "100"))
SPAWN_RATE = int(os.getenv("SPAWN_RATE", "10")) # 每秒启动用户数
# 测试时长(秒),0表示不限制
TEST_DURATION = int(os.getenv("TEST_DURATION", "300"))
# 思考时间配置(模拟用户操作间隔)
MIN_WAIT = int(os.getenv("MIN_WAIT", "1000")) # 毫秒
MAX_WAIT = int(os.getenv("MAX_WAIT", "5000"))
# 认证配置
AUTH_ENABLED = os.getenv("AUTH_ENABLED", "true").lower() == "true"
AUTH_TOKEN = os.getenv("AUTH_TOKEN", "")
# 测试数据路径
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), "test_data")
# 请求超时配置(秒)
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30"))
# 是否验证SSL证书
VERIFY_SSL = os.getenv("VERIFY_SSL", "false").lower() == "true"
@classmethod
def get_headers(cls) -> Dict[str, str]:
"""获取请求头"""
headers = {
"User-Agent": "LocustPerformanceTest/1.0",
"Content-Type": "application/json"
}
if cls.AUTH_ENABLED and cls.AUTH_TOKEN:
headers["Authorization"] = f"Bearer {cls.AUTH_TOKEN}"
return headers
config = Config()
```
### 3.3 测试数据管理
在`utils/data_loader.py`中实现数据加载:
```python
import csv
import json
import random
from typing import List, Dict, Any
from pathlib import Path
class TestDataManager:
"""测试数据管理器"""
def __init__(self, data_dir: str):
self.data_dir = Path(data_dir)
self._products = None
self._users = None
@property
def products(self) -> List[Dict[str, Any]]:
"""商品数据(懒加载)"""
if self._products is None:
self._products = self._load_products()
return self._products
@property
def users(self) -> List[Dict[str, Any]]:
"""用户数据(懒加载)"""
if self._users is None:
self._users = self._load_users()
return self._users
def _load_products(self) -> List[Dict[str, Any]]:
"""从CSV加载商品数据"""
products_file = self.data_dir / "products.csv"
products = []
with open(products_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
# 转换数据类型
row['id'] = int(row['id'])
row['price'] = float(row['price'])
row['stock'] = int(row['stock'])
products.append(row)
return products
def _load_users(self) -> List[Dict[str, Any]]:
"""从JSON加载用户数据"""
users_file = self.data_dir / "users.json"
with open(users_file, 'r', encoding='utf-8') as f:
return json.load(f)
def get_random_product(self) -> Dict[str, Any]:
"""随机获取一个商品"""
return random.choice(self.products)
def get_random_user(self) -> Dict[str, Any]:
"""随机获取一个用户"""
return random.choice(self.users)
def get_products_by_category(self, category: str, limit: int = 10) -> List[Dict[str, Any]]:
"""按分类获取商品"""
filtered = [p for p in self.products if p['category'] == category]
return filtered[:limit] if limit else filtered
# 全局数据管理器实例
data_manager = TestDataManager(Config.TEST_DATA_DIR)
```
### 3.4 自定义检查点与断言
在`utils/custom_checks.py`中定义响应验证逻辑:
```python
from typing import Dict, Any, Optional
from locust.clients import ResponseContextManager
class ResponseValidator:
"""响应验证器"""
@staticmethod
def validate_json_response(
response: ResponseContextManager,
expected_status: int = 200,
required_fields: Optional[list] = None
) -> bool:
"""
验证JSON响应
Args:
response: Locust响应对象
expected_status: 期望的HTTP状态码
required_fields: 响应中必须包含的字段
Returns:
bool: 验证是否通过
"""
# 检查状态码
if response.status_code != expected_status:
response.failure(f"Expected status {expected_status}, got {response.status_code}")
return False
# 尝试解析JSON
try:
data = response.json()
except ValueError:
response.failure("Response is not valid JSON")
return False
# 检查必需字段
if required_fields:
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
response.failure(f"Missing required fields: {missing_fields}")
return False
# 检查业务逻辑错误码(如果存在)
if isinstance(data, dict) and data.get("code") not in (0, 200, "success"):
response.failure(f"Business error: {data.get('message', 'Unknown error')}")
return False
return True
@staticmethod
def validate_response_time(
response: ResponseContextManager,
max_time_ms: int = 1000
) -> bool:
"""
验证响应时间
Args:
response: Locust响应对象
max_time_ms: 最大允许响应时间(毫秒)
Returns:
bool: 是否超时
"""
if response.elapsed.total_seconds() * 1000 > max_time_ms:
# 注意:这里不标记为失败,只是记录警告
# 在实际测试中,你可能想记录这个信息到自定义的统计中
return False
return True
```
### 3.5 完整的Locust测试脚本
现在,让我们把这些组件组合起来,创建完整的`locustfile.py`:
```python
"""
电商系统性能测试脚本
支持完整的用户旅程:登录 -> 浏览 -> 加购 -> 下单
"""
import random
import time
from typing import Dict, Any
from locust import HttpUser, task, between, events
from locust.clients import ResponseContextManager
from config import config
from utils.data_loader import data_manager
from utils.custom_checks import ResponseValidator
# 自定义事件监听器,用于收集额外指标
@events.request.add_listener
def on_request(request_type, name, response_time, response_length, exception, **kwargs):
"""请求完成时的回调"""
if exception:
# 这里可以记录到外部监控系统
pass
elif response_time > 5000: # 超过5秒的请求
# 记录慢请求
pass
class EcommerceUser(HttpUser):
"""
电商用户行为模拟
模拟真实用户的完整购物流程
"""
# 基础配置
host = config.TARGET_HOST
wait_time = between(config.MIN_WAIT / 1000, config.MAX_WAIT / 1000) # 转换为秒
# 用户会话状态
current_user: Dict[str, Any] = None
cart_items: list = []
def on_start(self):
"""用户会话开始时的初始化"""
# 随机选择一个测试用户
self.current_user = data_manager.get_random_user()
# 登录(如果启用认证)
if config.AUTH_ENABLED:
self.login()
# 初始化购物车
self.cart_items = []
# 记录用户启动
print(f"用户 {self.current_user['username']} 开始测试")
def login(self):
"""用户登录"""
login_data = {
"username": self.current_user["username"],
"password": self.current_user["password"]
}
with self.client.post(
"/api/auth/login",
json=login_data,
headers=config.get_headers(),
timeout=config.REQUEST_TIMEOUT,
catch_response=True,
name="用户登录"
) as response:
if ResponseValidator.validate_json_response(response, 200, ["token"]):
# 保存token用于后续请求
token = response.json()["token"]
self.client.headers.update({"Authorization": f"Bearer {token}"})
response.success()
else:
# 登录失败,标记为失败用户
self.stop(True) # 停止这个用户
@task(5) # 权重最高,用户大部分时间在浏览
def browse_homepage(self):
"""浏览首页"""
with self.client.get(
"/api/home",
headers=config.get_headers(),
timeout=config.REQUEST_TIMEOUT,
catch_response=True,
name="浏览首页"
) as response:
if ResponseValidator.validate_json_response(response):
# 检查响应时间
ResponseValidator.validate_response_time(response, 2000)
response.success()
@task(3)
def browse_products(self):
"""浏览商品列表"""
# 随机选择分类
categories = ["electronics", "clothing", "books", "home"]
category = random.choice(categories)
page = random.randint(1, 5)
with self.client.get(
f"/api/products?category={category}&page={page}&size=20",
headers=config.get_headers(),
timeout=config.REQUEST_TIMEOUT,
catch_response=True,
name="浏览商品列表"
) as response:
if ResponseValidator.validate_json_response(response, required_fields=["products"]):
# 随机选择一个商品查看详情
products = response.json()["products"]
if products:
product_id = random.choice(products)["id"]
# 立即查看商品详情(模拟用户点击)
self.view_product_detail(product_id)
response.success()
def view_product_detail(self, product_id: int = None):
"""查看商品详情"""
if product_id is None:
product = data_manager.get_random_product()
product_id = product["id"]
with self.client.get(
f"/api/products/{product_id}",
headers=config.get_headers(),
timeout=config.REQUEST_TIMEOUT,
catch_response=True,
name="查看商品详情"
) as response:
if ResponseValidator.validate_json_response(response):
# 30%的概率加购
if random.random() < 0.3:
self.add_to_cart(product_id)
response.success()
@task(2)
def add_to_cart(self, product_id: int = None):
"""添加商品到购物车"""
if product_id is None:
product = data_manager.get_random_product()
product_id = product["id"]
quantity = random.randint(1, 3)
cart_data = {
"product_id": product_id,
"quantity": quantity
}
with self.client.post(
"/api/cart/items",
json=cart_data,
headers=config.get_headers(),
timeout=config.REQUEST_TIMEOUT,
catch_response=True,
name="添加购物车"
) as response:
if ResponseValidator.validate_json_response(response, 201):
# 记录到购物车
self.cart_items.append({
"product_id": product_id,
"quantity": quantity
})
response.success()
@task(1)
def view_cart(self):
"""查看购物车"""
with self.client.get(
"/api/cart",
headers=config.get_headers(),
timeout=config.REQUEST_TIMEOUT,
catch_response=True,
name="查看购物车"
) as response:
if ResponseValidator.validate_json_response(response):
# 如果购物车有商品,10%的概率下单
if self.cart_items and random.random() < 0.1:
self.place_order()
response.success()
def place_order(self):
"""下单"""
if not self.cart_items:
return
order_data = {
"items": self.cart_items,
"shipping_address": self.current_user.get("address", {}),
"payment_method": random.choice(["credit_card", "paypal", "alipay"])
}
with self.client.post(
"/api/orders",
json=order_data,
headers=config.get_headers(),
timeout=config.REQUEST_TIMEOUT,
catch_response=True,
name="创建订单"
) as response:
if ResponseValidator.validate_json_response(response, 201, ["order_id"]):
# 下单成功,清空购物车
self.cart_items = []
# 记录订单ID(在实际项目中可能用于后续验证)
order_id = response.json()["order_id"]
print(f"用户 {self.current_user['username']} 下单成功,订单号: {order_id}")
response.success()
@task(1)
def search_products(self):
"""搜索商品"""
keywords = ["手机", "电脑", "衣服", "书", "家具"]
keyword = random.choice(keywords)
with self.client.get(
f"/api/search?q={keyword}&page=1",
headers=config.get_headers(),
timeout=config.REQUEST_TIMEOUT,
catch_response=True,
name="搜索商品"
) as response:
if ResponseValidator.validate_json_response(response):
response.success()
def on_stop(self):
"""用户会话结束时的清理"""
print(f"用户 {self.current_user['username']} 测试结束")
# 这里可以添加清理逻辑,比如登出
if config.AUTH_ENABLED:
try:
self.client.post("/api/auth/logout", headers=config.get_headers())
except:
pass # 登出失败不影响测试结果
# 自定义用户类,模拟不同用户行为
class HeavyBuyer(EcommerceUser):
"""重度购买用户,更频繁地下单"""
weight = 2 # 出现频率是普通用户的2倍
def on_start(self):
super().on_start()
# 重度用户初始就有商品在购物车
for _ in range(random.randint(1, 3)):
product = data_manager.get_random_product()
self.cart_items.append({
"product_id": product["id"],
"quantity": random.randint(1, 2)
})
@task(3) # 更频繁查看购物车
def view_cart(self):
super().view_cart()
class CasualBrowser(EcommerceUser):
"""随意浏览用户,只浏览不下单"""
weight = 1
def place_order(self):
"""覆盖父类方法,不下单"""
pass # 随意浏览用户不下单
```
这个脚本实现了一个完整的电商用户行为模拟,具有以下特点:
1. **真实的用户行为模式**:用户不是机械地重复请求,而是有逻辑地浏览、加购、下单
2. **状态保持**:用户登录态、购物车状态在整个会话中保持
3. **错误处理**:完善的响应验证和异常处理
4. **可扩展性**:通过继承创建不同类型的用户(重度购买者、随意浏览者)
5. **可配置性**:所有参数通过配置文件控制
## 4. 分布式部署与大规模压测实战
当需要模拟数万甚至百万级并发时,单机Locust可能无法满足需求。这时就需要使用Locust的分布式模式。Locust的分布式架构采用主从模式(Master-Worker),一个Master节点负责协调和收集数据,多个Worker节点负责生成负载。
### 4.1 分布式架构设计
典型的Locust分布式部署架构如下:
```
+----------------+
| Master节点 |
| (Web UI) |
+-------+--------+
|
| 协调指令/收集数据
|
+-------------------+-------------------+
| | |
+-------+-------+ +-------+-------+ +-------+-------+
| Worker节点1 | | Worker节点2 | | Worker节点N |
| (生成负载) | | (生成负载) | | (生成负载) |
+---------------+ +---------------+ +---------------+
```
**Master节点**负责:
- 提供Web控制界面
- 协调所有Worker节点
- 收集和聚合测试结果
- 控制测试的启动和停止
**Worker节点**负责:
- 实际执行测试脚本
- 生成并发请求
- 将统计数据发送给Master
### 4.2 部署配置示例
假设我们有3台服务器用于压测,IP分别为192.168.1.10、192.168.1.11、192.168.1.12。我们将192.168.1.10作为Master,其他作为Worker。
**在Master节点(192.168.1.10)上启动:**
```bash
# 启动Master,指定Web界面端口为8089
locust -f locustfile.py --master --web-port=8089 --expect-workers=2
```
`--expect-workers=2`告诉Master期望有2个Worker连接,当所有Worker都连接后,测试才能开始。
**在Worker节点(192.168.1.11)上启动:**
```bash
# 启动Worker,连接到Master
locust -f locustfile.py --worker --master-host=192.168.1.10
```
**在Worker节点(192.168.1.12)上启动:**
```bash
locust -f locustfile.py --worker --master-host=192.168.1.10
```
### 4.3 使用Docker容器化部署
对于大规模压测,使用Docker部署更加方便。首先创建Dockerfile:
```dockerfile
FROM python:3.9-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露Locust端口
EXPOSE 8089 5557 5558
# 启动命令(会被docker-compose覆盖)
CMD ["locust", "-f", "locustfile.py"]
```
然后创建`docker-compose.yml`:
```yaml
version: '3.8'
services:
master:
build: .
ports:
- "8089:8089" # Web界面
- "5557:5557" # Worker通信端口
- "5558:5558" # Worker通信备用端口
command: locust -f locustfile.py --master --web-port=8089 --expect-workers=3
networks:
- locust-network
worker1:
build: .
command: locust -f locustfile.py --worker --master-host=master
depends_on:
- master
networks:
- locust-network
deploy:
replicas: 1
worker2:
build: .
command: locust -f locustfile.py --worker --master-host=master
depends_on:
- master
networks:
- locust-network
deploy:
replicas: 1
worker3:
build: .
command: locust -f locustfile.py --worker --master-host=master
depends_on:
- master
networks:
- locust-network
deploy:
replicas: 1
networks:
locust-network:
driver: bridge
```
启动集群:
```bash
# 启动所有服务
docker-compose up -d --scale worker=5 # 启动5个worker实例
# 查看状态
docker-compose ps
# 查看日志
docker-compose logs -f master
```
### 4.4 性能优化与调优
在大规模压测中,有几个关键点需要注意:
**1. 调整系统限制**
Linux系统默认的文件描述符限制可能不够。调整方法:
```bash
# 查看当前限制
ulimit -n
# 临时调整(对当前会话有效)
ulimit -n 65535
# 永久调整,编辑 /etc/security/limits.conf
# 添加:
# * soft nofile 65535
# * hard nofile 65535
```
**2. 优化Locust配置**
创建`locust.conf`配置文件:
```ini
# locust.conf
[master]
host = 0.0.0.0
port = 5557
web-port = 8089
expect-workers = 5
[worker]
master-host = 192.168.1.10
master-port = 5557
[run]
users = 10000
spawn-rate = 100
run-time = 10m
headless = false
web-ui = true
[logging]
level = INFO
file = /var/log/locust.log
```
使用配置文件启动:
```bash
locust -f locustfile.py --config=locust.conf
```
**3. 监控Worker节点状态**
可以编写一个简单的监控脚本:
```python
# monitor_workers.py
import requests
import time
from datetime import datetime
def monitor_master(master_url="http://localhost:8089"):
"""监控Master节点状态"""
try:
# 获取统计数据
stats_url = f"{master_url}/stats/requests"
response = requests.get(stats_url, timeout=5)
data = response.json()
print(f"\n[{datetime.now()}] Master状态:")
print(f"总RPS: {data.get('total_rps', 0):.2f}")
print(f"总用户数: {data.get('user_count', 0)}")
print(f"失败率: {data.get('fail_ratio', 0)*100:.2f}%")
# 获取Worker状态
workers_url = f"{master_url}/stats/distributed"
workers_response = requests.get(workers_url, timeout=5)
workers_data = workers_response.json()
print(f"Worker数量: {len(workers_data.get('workers', []))}")
for worker in workers_data.get('workers', []):
print(f" Worker {worker['id']}: {worker['state']}, "
f"用户数: {worker['user_count']}, "
f"RPS: {worker['current_rps']:.2f}")
except Exception as e:
print(f"监控失败: {e}")
if __name__ == "__main__":
while True:
monitor_master()
time.sleep(5) # 每5秒监控一次
```
**4. 处理常见问题**
**问题1:Worker连接失败**
```
Error: Failed to connect to master
```
**解决方案**:检查防火墙设置,确保5557和5558端口开放。
**问题2:Master显示Worker数量不正确**
```
Expected 3 workers, but only 2 connected
```
**解决方案**:检查Worker日志,确保所有Worker都成功连接到Master。
**问题3:测试结果不一致**
不同Worker上的统计数据有差异。
**解决方案**:确保所有Worker节点的时间同步,使用NTP服务:
```bash
# 安装并配置NTP
sudo apt-get install ntp
sudo systemctl start ntp
sudo systemctl enable ntp
```
### 4.5 实战:百万并发压测演练
假设我们要对一个电商系统进行百万并发压测,架构如下:
- 目标系统:电商API集群,预计能承受百万QPS
- 压测集群:10台Worker服务器,每台模拟10万用户
- Master节点:单独一台服务器
**步骤1:准备测试环境**
```bash
# 在每台Worker上
git clone https://github.com/your-org/loadtest.git
cd loadtest
pip install -r requirements.txt
# 准备测试数据
python prepare_test_data.py --users 100000 --products 50000
```
**步骤2:启动Master**
```bash
# 在Master服务器上
locust -f locustfile.py \
--master \
--web-port=8089 \
--expect-workers=10 \
--csv=full_test \
--html=report.html \
--logfile=master.log \
--loglevel=INFO
```
**步骤3:启动Worker**
```bash
# 在每个Worker服务器上(使用screen保持会话)
screen -S locust-worker
locust -f locustfile.py \
--worker \
--master-host=192.168.1.100 \
--logfile=worker.log \
--loglevel=INFO
# 按Ctrl+A, 然后按D detach会话
```
**步骤4:开始测试**
访问Master的Web界面(http://192.168.1.100:8089),配置:
- Number of users: 1000000
- Spawn rate: 10000
- Host: https://api.ecommerce.com
点击"Start swarming"开始测试。
**步骤5:监控与调整**
在测试过程中,实时监控:
- 系统资源使用率(CPU、内存、网络)
- 目标系统的响应时间
- 错误率
- Worker节点的负载
如果发现某些Worker负载过高,可以动态调整:
```bash
# 在负载过高的Worker上减少用户数
locust -f locustfile.py --worker --master-host=192.168.1.100 --users 80000
# 在负载较低的Worker上增加用户数
locust -f locustfile.py --worker --master-host=192.168.1.100 --users 120000
```
**步骤6:结果分析**
测试完成后,从Master节点获取结果:
```bash
# CSV格式的详细结果
ls full_test_*.csv
# HTML报告
open report.html
```
使用Python分析结果:
```python
import pandas as pd
import matplotlib.pyplot as plt
# 加载数据
stats = pd.read_csv('full_test_stats.csv')
failures = pd.read_csv('full_test_failures.csv')
# 分析关键指标
print("测试摘要:")
print(f"总请求数: {stats['Request Count'].sum()}")
print(f"总失败数: {failures['Occurrences'].sum()}")
print(f"平均响应时间: {stats['Average Response Time'].mean():.2f}ms")
print(f"95%响应时间: {stats['95%'].mean():.2f}ms")
# 绘制RPS曲线
rps_data = pd.read_csv('full_test_rps.csv')
plt.figure(figsize=(12, 6))
plt.plot(rps_data['Timestamp'], rps_data['Current RPS'], label='RPS')
plt.xlabel('时间')
plt.ylabel('请求数/秒')
plt.title('RPS随时间变化')
plt.legend()
plt.grid(True)
plt.savefig('rps_trend.png')
```
通过这样的分布式部署,我们能够轻松模拟百万级并发用户,同时保持对测试过程的完全控制。Locust的分布式架构设计得非常简洁高效,Master和Worker之间通过ZeroMQ通信,开销极小,使得横向扩展变得非常容易。
在实际项目中,我经常使用这种架构进行大规模压测。有一次我们需要模拟双十一级别的流量,就是用了20台Worker服务器,每台模拟5万用户,总共100万并发。整个压测持续了2小时,Locust集群稳定运行,收集到了完整的性能数据,帮助团队发现了系统的多个瓶颈点。