# 基于CS架构的Python简单聊天程序实现
下面我将详细介绍一个基于CS(客户端-服务器)架构的简单聊天程序的完整实现,包含服务端和客户端代码。
## 程序架构设计
| 组件 | 功能描述 | 技术实现 |
|------|----------|----------|
| 服务端 | 监听客户端连接、消息转发、用户管理 | Socket + 多线程 |
| 客户端 | 用户界面、消息发送接收、连接管理 | Socket + Tkinter GUI |
## 服务端实现代码
```python
import socket
import threading
import json
from datetime import datetime
class ChatServer:
def __init__(self, host='localhost', port=8888):
self.host = host
self.port = port
self.clients = {} # 存储客户端连接信息
self.server_socket = None
self.running = False
def start_server(self):
"""启动聊天服务器"""
try:
# 创建TCP socket
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
print(f"聊天服务器已启动,监听地址: {self.host}:{self.port}")
print("等待客户端连接...")
# 接受客户端连接的线程
while self.running:
try:
client_socket, client_address = self.server_socket.accept()
print(f"新客户端连接: {client_address}")
# 为每个客户端创建独立线程
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, client_address)
)
client_thread.daemon = True
client_thread.start()
except Exception as e:
if self.running:
print(f"接受客户端连接时出错: {e}")
except Exception as e:
print(f"服务器启动失败: {e}")
def handle_client(self, client_socket, client_address):
"""处理单个客户端连接"""
client_name = f"用户{len(self.clients)+1}"
try:
# 发送欢迎消息并要求输入用户名
welcome_msg = {
"type": "system",
"content": "欢迎加入聊天室!请输入你的用户名:",
"timestamp": datetime.now().strftime("%H:%M:%S")
}
client_socket.send(json.dumps(welcome_msg).encode('utf-8'))
# 接收用户名
username_data = client_socket.recv(1024).decode('utf-8')
username_info = json.loads(username_data)
client_name = username_info.get('content', client_name)
# 注册客户端
self.clients[client_socket] = {
"name": client_name,
"address": client_address
}
# 广播用户加入消息
join_msg = {
"type": "system",
"content": f"{client_name} 加入了聊天室",
"timestamp": datetime.now().strftime("%H:%M:%S")
}
self.broadcast_message(join_msg, exclude_client=client_socket)
print(f"用户 {client_name} 加入聊天室,当前在线用户: {len(self.clients)}")
# 持续接收和处理消息
while self.running:
try:
data = client_socket.recv(1024)
if not data:
break
message_data = json.loads(data.decode('utf-8'))
self.process_message(client_socket, message_data)
except json.JSONDecodeError:
print(f"消息格式错误 from {client_name}")
except Exception as e:
print(f"处理消息时出错: {e}")
break
except Exception as e:
print(f"处理客户端 {client_address} 时出错: {e}")
finally:
# 客户端断开连接
self.remove_client(client_socket, client_name)
def process_message(self, client_socket, message_data):
"""处理客户端发送的消息"""
message_type = message_data.get("type", "chat")
content = message_data.get("content", "")
client_name = self.clients[client_socket]["name"]
if message_type == "chat":
# 广播聊天消息
chat_msg = {
"type": "chat",
"sender": client_name,
"content": content,
"timestamp": datetime.now().strftime("%H:%M:%S")
}
self.broadcast_message(chat_msg)
print(f"消息广播: {client_name}: {content}")
elif message_type == "private":
# 私聊消息(简化实现)
target_user = message_data.get("target", "")
private_msg = {
"type": "private",
"sender": client_name,
"content": content,
"timestamp": datetime.now().strftime("%H:%M:%S")
}
# 这里可以扩展实现具体用户的私聊
def broadcast_message(self, message, exclude_client=None):
"""向所有客户端广播消息"""
message_json = json.dumps(message)
disconnected_clients = []
for client_socket in self.clients.keys():
if client_socket != exclude_client:
try:
client_socket.send(message_json.encode('utf-8'))
except:
disconnected_clients.append(client_socket)
# 清理断开连接的客户端
for client in disconnected_clients:
self.remove_client(client, self.clients[client]["name"])
def remove_client(self, client_socket, client_name):
"""移除断开连接的客户端"""
if client_socket in self.clients:
del self.clients[client_socket]
# 广播用户离开消息
leave_msg = {
"type": "system",
"content": f"{client_name} 离开了聊天室",
"timestamp": datetime.now().strftime("%H:%M:%S")
}
self.broadcast_message(leave_msg)
print(f"用户 {client_name} 断开连接,当前在线用户: {len(self.clients)}")
try:
client_socket.close()
except:
pass
def stop_server(self):
"""停止服务器"""
self.running = False
if self.server_socket:
self.server_socket.close()
print("服务器已停止")
# 启动服务器
if __name__ == "__main__":
server = ChatServer()
try:
server.start_server()
except KeyboardInterrupt:
server.stop_server()
```
## 客户端实现代码
```python
import socket
import threading
import json
import tkinter as tk
from tkinter import scrolledtext, messagebox
from datetime import datetime
class ChatClient:
def __init__(self):
self.server_host = 'localhost'
self.server_port = 8888
self.client_socket = None
self.username = ""
self.connected = False
# 创建GUI界面
self.create_gui()
def create_gui(self):
"""创建客户端GUI界面"""
self.root = tk.Tk()
self.root.title("简单聊天程序")
self.root.geometry("600x500")
self.root.resizable(True, True)
# 连接设置框架
connection_frame = tk.Frame(self.root)
connection_frame.pack(pady=10, padx=10, fill=tk.X)
tk.Label(connection_frame, text="服务器地址:").grid(row=0, column=0, padx=5)
self.host_entry = tk.Entry(connection_frame, width=15)
self.host_entry.insert(0, self.server_host)
self.host_entry.grid(row=0, column=1, padx=5)
tk.Label(connection_frame, text="端口:").grid(row=0, column=2, padx=5)
self.port_entry = tk.Entry(connection_frame, width=8)
self.port_entry.insert(0, str(self.server_port))
self.port_entry.grid(row=0, column=3, padx=5)
tk.Label(connection_frame, text="用户名:").grid(row=0, column=4, padx=5)
self.username_entry = tk.Entry(connection_frame, width=15)
self.username_entry.grid(row=0, column=5, padx=5)
self.connect_button = tk.Button(
connection_frame,
text="连接服务器",
command=self.toggle_connection
)
self.connect_button.grid(row=0, column=6, padx=10)
# 聊天显示区域
self.chat_display = scrolledtext.ScrolledText(
self.root,
wrap=tk.WORD,
width=70,
height=20,
state=tk.DISABLED
)
self.chat_display.pack(pady=10, padx=10, fill=tk.BOTH, expand=True)
# 消息发送区域
message_frame = tk.Frame(self.root)
message_frame.pack(pady=10, padx=10, fill=tk.X)
self.message_entry = tk.Entry(message_frame, width=50)
self.message_entry.pack(side=tk.LEFT, padx=5, fill=tk.X, expand=True)
self.message_entry.bind('<Return>', self.send_message)
self.send_button = tk.Button(
message_frame,
text="发送",
command=self.send_message,
state=tk.DISABLED
)
self.send_button.pack(side=tk.RIGHT, padx=5)
# 状态栏
self.status_var = tk.StringVar()
self.status_var.set("未连接")
status_label = tk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
status_label.pack(fill=tk.X, padx=10, pady=5)
def toggle_connection(self):
"""连接或断开服务器"""
if not self.connected:
self.connect_to_server()
else:
self.disconnect_from_server()
def connect_to_server(self):
"""连接到聊天服务器"""
try:
self.server_host = self.host_entry.get()
self.server_port = int(self.port_entry.get())
self.username = self.username_entry.get().strip()
if not self.username:
messagebox.showerror("错误", "请输入用户名")
return
# 创建socket连接
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect((self.server_host, self.server_port))
self.connected = True
# 更新UI状态
self.connect_button.config(text="断开连接")
self.send_button.config(state=tk.NORMAL)
self.status_var.set(f"已连接到 {self.server_host}:{self.server_port}")
# 启动消息接收线程
receive_thread = threading.Thread(target=self.receive_messages)
receive_thread.daemon = True
receive_thread.start()
# 发送用户名到服务器
username_msg = {
"type": "username",
"content": self.username
}
self.client_socket.send(json.dumps(username_msg).encode('utf-8'))
except Exception as e:
messagebox.showerror("连接错误", f"无法连接到服务器: {e}")
self.connected = False
def disconnect_from_server(self):
"""断开服务器连接"""
if self.client_socket:
try:
self.client_socket.close()
except:
pass
self.connected = False
self.connect_button.config(text="连接服务器")
self.send_button.config(state=tk.DISABLED)
self.status_var.set("未连接")
self.display_message("系统", "已断开与服务器的连接", "system")
def receive_messages(self):
"""接收服务器消息的线程函数"""
while self.connected:
try:
data = self.client_socket.recv(1024)
if not data:
break
message_data = json.loads(data.decode('utf-8'))
self.handle_received_message(message_data)
except json.JSONDecodeError:
print("接收到的消息格式错误")
except Exception as e:
if self.connected:
print(f"接收消息时出错: {e}")
break
# 连接断开
if self.connected:
self.root.after(0, self.disconnect_from_server)
def handle_received_message(self, message_data):
"""处理接收到的消息"""
msg_type = message_data.get("type", "chat")
content = message_data.get("content", "")
timestamp = message_data.get("timestamp", "")
sender = message_data.get("sender", "系统")
if msg_type == "system":
self.display_message("系统", content, "system")
elif msg_type == "chat":
self.display_message(sender, content, "chat", timestamp)
def display_message(self, sender, content, msg_type, timestamp=""):
"""在聊天显示区域显示消息"""
self.chat_display.config(state=tk.NORMAL)
# 根据消息类型设置不同的显示格式
if msg_type == "system":
self.chat_display.insert(tk.END, f"[系统] {content}\n", "system")
else:
time_display = timestamp if timestamp else datetime.now().strftime("%H:%M:%S")
self.chat_display.insert(tk.END, f"[{time_display}] {sender}: {content}\n", "chat")
self.chat_display.config(state=tk.DISABLED)
self.chat_display.see(tk.END) # 自动滚动到底部
def send_message(self, event=None):
"""发送消息到服务器"""
if not self.connected:
messagebox.showerror("错误", "未连接到服务器")
return
message = self.message_entry.get().strip()
if not message:
return