## WebSocket 接口自动化测试的封装思想与Pytest实现详解
在构建WebSocket接口自动化测试框架时,合理的封装是提高代码可维护性、复用性和可读性的关键。以下将基于**分层设计、数据驱动、配置管理**三大核心封装思想,详细阐述如何使用Pytest实现一个完整的WebSocket接口自动化测试框架 [ref_3][ref_5]。
### 一、 核心封装思想
| 封装层次 | 核心职责 | 关键组件 | 目的 |
| :--- | :--- | :--- | :--- |
| **工具层** | 提供基础操作能力 | `WebSocketClient`类,日志、配置读取工具 | 封装底层WebSocket通信细节,提供稳定、易用的API [ref_6]。 |
| **业务层** | 封装被测接口业务逻辑 | 特定业务场景的`Service`类或函数 | 将接口调用与断言逻辑结合,形成可复用的业务操作单元。 |
| **数据层** | 管理测试数据与期望结果 | `YAML`/`JSON`/`Excel`文件,数据读取类 | 实现测试数据与脚本分离,支持数据驱动测试 [ref_4][ref_5]。 |
| **用例层** | 组织测试步骤与断言 | Pytest测试函数/类,使用`@pytest.mark.parametrize` | 专注于测试逻辑的编排,保持用例简洁。 |
| **夹具层** | 管理测试资源与生命周期 | `conftest.py`中的`@pytest.fixture` | 统一管理WebSocket连接、数据准备、清理工作,提升用例执行效率 [ref_2]。 |
### 二、 项目结构设计
基于上述思想,一个典型的项目目录结构如下:
```python
websocket_auto_test/
├── configs/ # 配置层
│ ├── __init__.py
│ ├── config.yaml # 全局配置(环境、URL、超时时间等)
│ └── test_data.yaml # 测试数据配置
├── core/ # 核心工具层
│ ├── __init__.py
│ ├── ws_client.py # WebSocket客户端核心封装
│ ├── logger.py # 日志封装
│ └── exceptions.py # 自定义异常类
├── utils/ # 通用工具层
│ ├── __init__.py
│ ├── data_loader.py # 数据加载器(支持YAML/JSON)
│ └── assert_utils.py # 断言工具扩展
├── services/ # 业务服务层(可选)
│ ├── __init__.py
│ └── echo_service.py # 示例:封装回声服务的业务操作
├── tests/ # 用例层
│ ├── __init__.py
│ ├── conftest.py # Pytest夹具集中管理
│ ├── test_echo.py # 具体测试模块
│ └── test_chat.py
├── reports/ # 测试报告输出目录
├── pytest.ini # Pytest配置文件
└── requirements.txt # 项目依赖
```
### 三、 分层封装实现详解
#### 1. 工具层封装:稳定的WebSocket客户端
这是最基础的封装,目标是隐藏`websocket-client`库的复杂性,提供健壮、易用的连接和消息收发方法。
```python
# core/ws_client.py
import websocket
import threading
import json
import time
import logging
from typing import Optional, Any, Union, Callable, List
from queue import Queue
from core.exceptions import WebSocketConnectionError, WebSocketTimeoutError
class WebSocketClient:
"""
WebSocket客户端高级封装。
支持自动重连、心跳检测、同步/异步消息接收。
"""
def __init__(self, url: str, logger: Optional[logging.Logger] = None):
self.url = url
self.ws_app: Optional[websocket.WebSocketApp] = None
self.thread: Optional[threading.Thread] = None
self._connected = threading.Event() # 连接状态事件
self._message_queue = Queue() # 消息队列,用于同步获取消息
self.logger = logger or logging.getLogger(__name__)
self._stop_event = threading.Event()
def _on_message(self, ws, message):
"""消息到达回调,将消息放入队列并记录日志。"""
self.logger.debug(f"Received raw message: {message}")
try:
# 尝试解析JSON格式消息
parsed_msg = json.loads(message)
self._message_queue.put(parsed_msg)
except json.JSONDecodeError:
# 非JSON消息,直接存储文本
self._message_queue.put(message)
def _on_error(self, ws, error):
"""错误处理回调。"""
self.logger.error(f"WebSocket error occurred: {error}")
def _on_close(self, ws, close_status_code, close_msg):
"""连接关闭回调。"""
self.logger.info(f"Connection closed. Code: {close_status_code}, Msg: {close_msg}")
self._connected.clear()
def _on_open(self, ws):
"""连接成功建立回调。"""
self.logger.info(f"WebSocket connection established to {self.url}")
self._connected.set()
def connect(self, timeout: int = 10):
"""
建立WebSocket连接。
:param timeout: 连接超时时间(秒)
:raises WebSocketConnectionError: 连接失败时抛出
"""
if self._connected.is_set():
self.logger.warning("Already connected.")
return
self.ws_app = websocket.WebSocketApp(
self.url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close
)
# 在守护线程中运行WebSocket客户端
self.thread = threading.Thread(target=self.ws_app.run_forever)
self.thread.daemon = True
self.thread.start()
# 等待连接建立
if not self._connected.wait(timeout=timeout):
self.close()
raise WebSocketConnectionError(f"Failed to connect to {self.url} within {timeout}s")
def send(self, data: Union[dict, str, bytes]):
"""
发送消息。
:param data: 要发送的数据,字典会被序列化为JSON字符串。
"""
if not self._connected.is_set():
raise WebSocketConnectionError("Cannot send message, connection is not active.")
if isinstance(data, dict):
payload = json.dumps(data, ensure_ascii=False)
else:
payload = data
self.logger.debug(f"Sending message: {payload}")
self.ws_app.send(payload)
def recv(self, timeout: Optional[int] = 5) -> Any:
"""
同步接收一条消息。
:param timeout: 等待消息的超时时间(秒),None为无限等待。
:return: 接收到的消息内容(解析后的JSON对象或原始字符串)。
:raises WebSocketTimeoutError: 超时未收到消息时抛出。
"""
try:
message = self._message_queue.get(timeout=timeout)
return message
except:
raise WebSocketTimeoutError(f"No message received within {timeout}s")
def recv_all(self, timeout: Optional[int] = None) -> List[Any]:
"""
在指定时间内接收所有到达的消息。
:param timeout: 收集消息的时长(秒)。
:return: 消息列表。
"""
messages = []
end_time = time.time() + timeout if timeout else None
while True:
try:
wait_time = None
if end_time:
wait_time = max(0, end_time - time.time())
if wait_time <= 0:
break
message = self._message_queue.get(timeout=wait_time)
messages.append(message)
except:
break # 超时或队列为空
return messages
def close(self):
"""关闭WebSocket连接。"""
if self.ws_app:
self.logger.info("Closing WebSocket connection.")
self.ws_app.close()
self._connected.clear()
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2)
```
#### 2. 数据层封装:实现数据与脚本分离
使用YAML文件管理测试数据,使测试用例可配置化 [ref_4]。
```yaml
# configs/test_data.yaml
test_cases:
echo_service:
- case_id: TC_WS_ECHO_01
description: "测试基础文本回声"
send_data: "Hello, WebSocket!"
expected_response: "Hello, WebSocket!"
assert_type: "equal"
- case_id: TC_WS_ECHO_02
description: "测试JSON对象回声"
send_data:
action: "echo"
payload: "test payload"
timestamp: 1630000000
expected_response:
action: "echo"
payload: "test payload"
timestamp: 1630000000
assert_type: "json_equal"
auth_service:
- case_id: TC_WS_AUTH_01
description: "测试有效Token认证"
send_data:
type: "auth"
token: "valid_token_123"
expected_response:
status: "success"
message: "Authenticated"
assert_type: "json_contains"
```
```python
# utils/data_loader.py
import yaml
import json
import os
from typing import Dict, Any, List
class DataLoader:
"""数据加载器,支持YAML和JSON格式。"""
@staticmethod
def load_yaml(file_path: str) -> Dict[str, Any]:
"""加载YAML文件。"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"YAML file not found: {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
@staticmethod
def load_test_cases(data_file: str, service_name: str) -> List[Dict[str, Any]]:
"""
从数据文件中加载指定服务的测试用例。
:param data_file: 数据文件路径
:param service_name: 服务名称,对应YAML中的顶级key
:return: 测试用例列表
"""
data = DataLoader.load_yaml(data_file)
return data.get('test_cases', {}).get(service_name, [])
```
#### 3. 夹具层封装:统一管理测试资源
在`conftest.py`中定义Pytest夹具,管理WebSocket客户端的生命周期,实现不同作用域的连接复用 [ref_2]。
```python
# tests/conftest.py
import pytest
import logging
from core.ws_client import WebSocketClient
from utils.data_loader import DataLoader
import os
# 配置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
@pytest.fixture(scope="session")
def ws_server_url():
"""会话级夹具:获取WebSocket服务器URL,可从环境变量或配置文件读取。"""
# 示例:从环境变量读取,默认使用公共测试服务器
url = os.getenv("WS_TEST_URL", "wss://echo.websocket.org")
return url
@pytest.fixture(scope="function") # 每个测试函数一个独立连接
def ws_client(ws_server_url):
"""
函数级夹具:为每个测试用例提供一个新的WebSocket客户端。
测试结束后自动关闭连接。
"""
client = WebSocketClient(ws_server_url, logging.getLogger("ws_test"))
client.connect(timeout=5)
yield client # 测试函数在此处执行
client.close() # 测试结束后执行清理
@pytest.fixture(scope="class") # 每个测试类共享一个连接
def ws_client_class_scope(ws_server_url):
"""类级夹具:同一个测试类中的所有方法共享一个WebSocket连接。"""
client = WebSocketClient(ws_server_url)
client.connect(timeout=5)
yield client
client.close()
@pytest.fixture
def echo_test_data():
"""提供回声服务的测试数据。"""
current_dir = os.path.dirname(__file__)
data_file = os.path.join(current_dir, '..', 'configs', 'test_data.yaml')
return DataLoader.load_test_cases(data_file, 'echo_service')
```
#### 4. 用例层实现:编写清晰的数据驱动测试
利用夹具和参数化,编写高度可读、可维护的测试用例。
```python
# tests/test_echo.py
import pytest
import time
class TestEchoServiceBasic:
"""回声服务基础功能测试类。"""
def test_connection(self, ws_client):
"""测试连接是否成功建立。"""
# ws_client夹具已确保连接建立,此处直接断言其内部状态
# 可以通过发送一条消息并接收来验证
test_msg = {"test": "connection"}
ws_client.send(test_msg)
response = ws_client.recv(timeout=3)
assert response is not None
# 根据回声服务器的特性,响应可能与发送的消息相同或包含之
assert response.get("test") == "connection" or response == test_msg
@pytest.mark.parametrize("message", [
"Simple Text",
'{"json": "object"}',
'["list", "of", "items"]',
123, # 数字会被自动转为字符串发送
])
def test_echo_various_formats(self, ws_client, message):
"""参数化测试:验证不同格式消息的回声。"""
ws_client.send(message)
response = ws_client.recv(timeout=2)
# 公共回声服务器通常原样返回字符串
assert str(response) == str(message)
class TestEchoServiceDataDriven:
"""使用外部数据文件驱动的回声服务测试。"""
@pytest.mark.parametrize("case", echo_test_data, ids=lambda c: c['case_id'])
def test_echo_with_data_file(self, ws_client, case):
"""
数据驱动测试。
每个在YAML中定义的用例都会生成一个独立的测试。
ids参数使用case_id作为测试名称,提高报告可读性。
"""
send_data = case['send_data']
expected = case['expected_response']
ws_client.send(send_data)
# 根据断言类型执行不同的验证逻辑
if case['assert_type'] == 'equal':
response = ws_client.recv(timeout=2)
assert response == expected
elif case['assert_type'] == 'json_equal':
response = ws_client.recv(timeout=2)
# 对于JSON,可以比较关键字段或整个对象
import json
if isinstance(response, str):
response = json.loads(response)
assert response == expected
elif case['assert_type'] == 'json_contains':
response = ws_client.recv(timeout=2)
if isinstance(response, str):
import json
response = json.loads(response)
# 验证响应中包含期望的键值对
for key, value in expected.items():
assert key in response
assert response[key] == value
```
#### 5. 业务层封装示例(可选)
对于复杂业务逻辑,可以进一步封装服务类。
```python
# services/echo_service.py
from core.ws_client import WebSocketClient
from typing import Any
class EchoService:
"""封装回声服务的业务操作。"""
def __init__(self, client: WebSocketClient):
self.client = client
def send_and_wait_for_echo(self, data: Any, timeout: int = 3) -> Any:
"""发送消息并等待回声,返回接收到的消息。"""
self.client.send(data)
return self.client.recv(timeout=timeout)
def verify_echo(self, sent_data: Any, received_data: Any) -> bool:
"""验证接收到的消息是否为发送消息的回声。"""
return sent_data == received_data
# 在测试中使用
def test_with_service_layer(ws_client):
service = EchoService(ws_client)
test_data = {"id": 1, "command": "ping"}
response = service.send_and_wait_for_echo(test_data)
assert service.verify_echo(test_data, response)
```
### 四、 运行配置与报告生成
配置`pytest.ini`文件,控制测试行为并生成丰富的报告 [ref_1]。
```ini
# pytest.ini
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v # 详细输出
--tb=short # 简短的错误回溯
--strict-markers # 严格检查marker
-m "not slow" # 默认不运行标记为slow的测试
--html=reports/report.html
--self-contained-html
--junitxml=reports/junit.xml
--alluredir=reports/allure-results # 为Allure报告生成数据
markers =
smoke: 冒烟测试
regression: 回归测试
slow: 运行缓慢的测试
websocket: WebSocket相关测试
```
执行测试并生成报告:
```bash
# 运行所有测试
pytest
# 运行标记为smoke的测试
pytest -m smoke
# 运行特定测试文件
pytest tests/test_echo.py -v
# 使用Allure生成精美报告(需先安装allure-pytest)
pytest --alluredir=reports/allure-results
allure serve reports/allure-results
```
### 五、 总结:封装思想的优势
通过上述分层封装,实现了以下目标:
1. **高可维护性**:各层职责单一,修改底层WebSocket库或数据格式只需改动对应模块。
2. **高复用性**:`WebSocketClient`和各类夹具可在所有测试中复用;数据文件可被多个测试模块共享。
3. **强可读性**:测试用例函数专注于“发送什么、期望什么”,业务逻辑清晰。
4. **易扩展性**:新增测试类型