<think>我们之前尝试将代码分模块组织,但遇到了导入问题。现在,为了简化并避免模块导入问题,我们将所有类放在一个文件中。这样,我们就不需要处理包和模块导入的复杂性,特别适合小型项目和竞赛使用。
目标:创建一个交通流模拟程序,用于生成模拟数据,参加统计建模大赛。
我们将在一个文件中定义以下类:
1. Intersection(交叉口)
2. Road(道路)
3. Vehicle(车辆)
4. TrafficLight(交通灯)
5. RoadNetwork(路网)
此外,我们还需要一个主循环来运行模拟,并记录数据。
注意:我们将使用纯模拟数据,因此需要设计合理的车辆生成、移动和交通灯控制逻辑。
步骤:
1. 定义各个类,注意它们之间的引用关系。
2. 设计模拟循环,以固定时间步长更新整个系统。
3. 记录每个时间步的关键数据,例如:
- 每个交叉口的队列长度
- 每条道路的车辆数(利用率)
- 每个交通灯的状态
- 车辆的位置和速度等
为了简化,我们假设:
- 道路是单向的
- 车辆按照预定路线行驶(简单路线,例如从一条路到下一条路)
- 交通灯控制一个交叉口的某些方向
我们将在一个Python文件中实现所有内容,并命名为`traffic_simulation.py`(与下面代码开头的文件名注释保持一致)。
下面是完整的代码:
</think>
# 单文件交通流模拟系统
下面是一个完整的单文件交通流模拟系统,专为统计建模大赛设计。这个实现将所有类整合在一个文件中,避免了模块导入问题,同时保留了所有核心功能。
```python
# traffic_simulation.py
import logging
import time
import random
import csv
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
# Logging configuration
def setup_logger(name: str, log_file: str, level=logging.INFO) -> logging.Logger:
    """Return the logger called ``name``, set up to write to ``log_file``.

    The parent directory of ``log_file`` is created when the path has one.
    A file handler for ``log_file`` is attached only if the logger does not
    already have one for that same file, so calling this function repeatedly
    with the same arguments neither duplicates log lines nor opens the file
    again.

    Args:
        name: Name passed to ``logging.getLogger``.
        log_file: Path of the file the logger writes to.
        level: Logging level applied to the logger.

    Returns:
        The configured ``logging.Logger``.
    """
    # os.path.dirname() is "" for a bare filename, and os.makedirs("") raises.
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    logger = logging.getLogger(name)
    logger.setLevel(level)

    # FileHandler stores the absolute path of its file in ``baseFilename``.
    target = os.path.abspath(log_file)
    already_attached = any(
        isinstance(existing, logging.FileHandler)
        and existing.baseFilename == target
        for existing in logger.handlers
    )
    if not already_attached:
        handler = logging.FileHandler(log_file)
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(handler)
    return logger
# Vehicle
class Vehicle:
    """A single vehicle travelling through the road network.

    A vehicle is either on a road (``current_road`` is set and
    ``position_on_road`` is its distance from the road start) or waiting in
    an intersection queue (``current_road`` is ``None``).
    """

    def __init__(self, id: str, length: float = 4.5, width: float = 1.8):
        """Create a vehicle that is not yet placed on any road.

        Args:
            id: Identifier used in log messages, the log file name and
                ``get_state``.
            length: Vehicle length; subtracted from the gap by
                ``Road.get_front_vehicle``.
            width: Vehicle width; stored only.
        """
        self.id = id
        self.length = length
        self.width = width
        self.current_road: Optional['Road'] = None
        self.position_on_road: float = 0.0  # distance from the road start
        self.speed: float = 0.0  # current speed (m/s)
        self.acceleration: float = 0.0  # current acceleration (m/s^2)
        self.max_speed: float = 50.0 / 3.6  # top speed (50 km/h in m/s)
        self.max_acceleration: float = 2.0  # acceleration cap (m/s^2)
        self.max_deceleration: float = 5.0  # braking cap (m/s^2)
        self.destination: Optional['Intersection'] = None  # target intersection
        self.route: List['Road'] = []  # planned sequence of roads
        self.total_distance: float = 0.0  # distance driven so far
        self.total_time: float = 0.0  # time accumulated by update()
        self.waiting_time: float = 0.0  # time spent in intersection queues
        self.logger = setup_logger(f"Vehicle_{id}", f"logs/vehicle_{id}.log", level=logging.DEBUG)

    def update(self, time_step: float):
        """Advance the vehicle by ``time_step``.

        ``total_time`` always grows. While the vehicle is not on a road
        nothing else happens. Otherwise the acceleration is recomputed from
        the vehicle ahead, the speed is clamped to ``[0, min(max_speed,
        road speed limit)]`` and the vehicle either moves along the road or,
        when the step would take it past the road end, is handed to the
        queue of the road's end intersection.
        """
        self.total_time += time_step

        # Nothing to simulate while the vehicle is off-road (e.g. queued).
        if not self.current_road:
            return
        road = self.current_road

        front_vehicle, distance = road.get_front_vehicle(self)
        self.calculate_acceleration(front_vehicle, distance)

        self.speed += self.acceleration * time_step
        self.speed = max(0, min(self.speed, self.max_speed, road.speed_limit))

        new_position = self.position_on_road + self.speed * time_step
        if new_position > road.length:
            # The stretch up to the road end is really driven, so it has to
            # be counted before the vehicle leaves the road; otherwise the
            # odometer undercounts on every road.
            self.total_distance += max(0.0, road.length - self.position_on_road)
            self.position_on_road = road.length
            self.enter_intersection_queue()
        else:
            self.position_on_road = new_position
            self.total_distance += self.speed * time_step

    def calculate_acceleration(self, front_vehicle: Optional['Vehicle'], distance: float):
        """Set ``self.acceleration`` from the gap to the vehicle ahead.

        Args:
            front_vehicle: Vehicle directly ahead, or ``None`` if there is none.
            distance: Gap to ``front_vehicle``; only read when one exists.
        """
        if front_vehicle is None:
            # Free road: ramp the acceleration up to its cap.
            self.acceleration = min(self.acceleration + 0.5, self.max_acceleration)
            return

        safe_distance = self.speed * 2.0  # two-second headway
        if distance < safe_distance:
            # Too close: brake, never harder than max_deceleration.
            self.acceleration = max(-self.max_deceleration, -5.0)
        elif distance > safe_distance * 1.5:
            # Plenty of room: ramp the acceleration up again.
            self.acceleration = min(self.acceleration + 0.5, self.max_acceleration)
        else:
            # In between: hold the current speed.
            self.acceleration = 0.0

    def enter_intersection_queue(self):
        """Leave the current road and join the queue of its end intersection.

        Does nothing when the vehicle has no road or the road has no end
        intersection.
        """
        if self.current_road and self.current_road.to_intersection:
            self.logger.debug(f"Vehicle {self.id} entering queue at intersection {self.current_road.to_intersection.id}")
            self.current_road.to_intersection.add_to_queue(self)
            self.current_road.vehicles.remove(self)
            self.current_road = None
            self.position_on_road = 0.0

    def move_to_next_road(self, road: 'Road'):
        """Place the vehicle at the start of ``road``."""
        self.logger.debug(f"Vehicle {self.id} moving to road {road.id}")
        self.current_road = road
        self.position_on_road = 0.0
        road.vehicles.append(self)

    def get_state(self) -> dict:
        """Return a snapshot of the vehicle as a plain dict.

        ``speed`` is reported in km/h (internal m/s multiplied by 3.6).
        """
        return {
            "id": self.id,
            "position": self.position_on_road,
            "speed": self.speed * 3.6,  # m/s -> km/h
            "acceleration": self.acceleration,
            "current_road": self.current_road.id if self.current_road else None,
            "total_distance": self.total_distance,
            "total_time": self.total_time,
            "waiting_time": self.waiting_time
        }
# Intersection
class Intersection:
    """A network node where roads meet and vehicles wait in one FIFO queue."""

    def __init__(self, id: str, x: float, y: float):
        """Create an intersection with no roads attached and an empty queue.

        Args:
            id: Identifier used in log messages, the log file name and
                ``get_state``.
            x: X coordinate; stored only.
            y: Y coordinate; stored only.
        """
        self.id = id
        self.x = x
        self.y = y
        self.incoming_roads: List['Road'] = []
        self.outgoing_roads: List['Road'] = []
        self.queue: List['Vehicle'] = []
        self.queue_length = 0
        self.max_queue_length = 0
        self.total_vehicles_processed = 0
        self.average_wait_time = 0.0
        # Vehicle-seconds spent in this queue so far; basis of average_wait_time.
        self._total_queue_wait = 0.0
        self.logger = setup_logger(f"Intersection_{id}", f"logs/intersection_{id}.log")

    def add_incoming_road(self, road: 'Road'):
        """Register ``road`` as ending here; duplicates are ignored."""
        if road not in self.incoming_roads:
            self.incoming_roads.append(road)
            self.logger.info(f"Added incoming road {road.id}")

    def add_outgoing_road(self, road: 'Road'):
        """Register ``road`` as starting here; duplicates are ignored."""
        if road not in self.outgoing_roads:
            self.outgoing_roads.append(road)
            self.logger.info(f"Added outgoing road {road.id}")

    def add_to_queue(self, vehicle: 'Vehicle'):
        """Append ``vehicle`` to the queue and update the length counters."""
        self.queue.append(vehicle)
        # Derive the counter from the list so the two cannot drift apart.
        self.queue_length = len(self.queue)
        if self.queue_length > self.max_queue_length:
            self.max_queue_length = self.queue_length
        self.logger.debug(f"Vehicle {vehicle.id} added to queue. Queue length: {self.queue_length}")

    def remove_from_queue(self) -> Optional['Vehicle']:
        """Pop and return the vehicle at the head of the queue.

        Returns:
            The removed vehicle, or ``None`` when the queue is empty.
        """
        if not self.queue:
            return None
        vehicle = self.queue.pop(0)
        self.queue_length = len(self.queue)
        self.total_vehicles_processed += 1
        self.logger.debug(f"Vehicle {vehicle.id} removed from queue. Queue length: {self.queue_length}")
        return vehicle

    def update_wait_time_statistics(self, time_step: float):
        """Charge ``time_step`` of waiting to every queued vehicle.

        ``average_wait_time`` is the time spent in this queue so far divided
        by the number of vehicles that have been in it (already processed
        plus currently waiting). It is left untouched until the first
        vehicle has queued here.
        """
        for vehicle in self.queue:
            vehicle.waiting_time += time_step

        # Accumulate instead of re-adding the current waiting sum each step:
        # that earlier approach counted the same seconds repeatedly and could
        # go negative when the queue was longer than the processed count.
        self._total_queue_wait += time_step * len(self.queue)
        vehicles_seen = self.total_vehicles_processed + len(self.queue)
        if vehicles_seen > 0:
            self.average_wait_time = self._total_queue_wait / vehicles_seen

    def get_state(self) -> dict:
        """Return a snapshot of the intersection as a plain dict."""
        return {
            "id": self.id,
            "queue_length": self.queue_length,
            "max_queue_length": self.max_queue_length,
            "total_vehicles_processed": self.total_vehicles_processed,
            "average_wait_time": self.average_wait_time,
            "incoming_roads": [r.id for r in self.incoming_roads],
            "outgoing_roads": [r.id for r in self.outgoing_roads]
        }
# Road
class Road:
    """A one-way road from one intersection to another."""

    def __init__(self, id: str, from_intersection: 'Intersection', to_intersection: 'Intersection',
                 length: float, lanes: int = 1, speed_limit: float = 50.0):
        """Create the road and register it with both of its intersections.

        Args:
            id: Identifier used in log messages, the log file name and
                ``get_state``.
            from_intersection: Intersection the road starts at.
            to_intersection: Intersection the road ends at.
            length: Road length; the capacity estimate in ``update`` assumes
                7 units of length per vehicle.
            lanes: Number of lanes; stored only.
            speed_limit: Speed limit in km/h; stored internally in m/s.
        """
        self.id = id
        self.from_intersection = from_intersection
        self.to_intersection = to_intersection
        self.length = length
        self.lanes = lanes
        self.speed_limit = speed_limit / 3.6  # km/h -> m/s
        self.vehicles: List['Vehicle'] = []
        self.utilization = 0.0  # share of the estimated capacity in use (0-1)
        self.total_vehicles = 0
        self.average_speed = 0.0
        self.logger = setup_logger(f"Road_{id}", f"logs/road_{id}.log")

        # Wire the road into the graph.
        from_intersection.add_outgoing_road(self)
        to_intersection.add_incoming_road(self)

    def get_front_vehicle(self, vehicle: 'Vehicle') -> Tuple[Optional['Vehicle'], float]:
        """Find the vehicle directly ahead of ``vehicle`` on this road.

        Returns:
            ``(front_vehicle, gap)`` where ``gap`` is the distance between
            the two positions minus the front vehicle's length, floored at 0.
            ``(None, inf)`` when ``vehicle`` is not on this road or nothing
            is ahead of it.
        """
        if vehicle not in self.vehicles:
            return None, float('inf')

        # Order by position; the next entry after ``vehicle`` is the leader.
        by_position = sorted(self.vehicles, key=lambda v: v.position_on_road)
        leader_index = by_position.index(vehicle) + 1
        if leader_index >= len(by_position):
            return None, float('inf')

        front_vehicle = by_position[leader_index]
        distance = front_vehicle.position_on_road - vehicle.position_on_road - front_vehicle.length
        return front_vehicle, max(0, distance)

    def update(self, time_step: float):
        """Recompute ``utilization`` and ``average_speed`` from current vehicles.

        ``time_step`` is accepted for a uniform update interface and is not
        used.
        """
        # Capacity estimate: one vehicle per 7 units of road length.
        # NOTE(review): ``lanes`` is not part of this estimate - confirm
        # whether multi-lane roads should have a larger capacity.
        max_vehicles = self.length / 7
        self.utilization = min(1.0, len(self.vehicles) / max_vehicles) if max_vehicles > 0 else 0.0

        if self.vehicles:
            self.average_speed = sum(v.speed for v in self.vehicles) / len(self.vehicles)
        else:
            # An empty road must not keep reporting the last measured speed.
            self.average_speed = 0.0

    def add_vehicle(self, vehicle: 'Vehicle'):
        """Put ``vehicle`` at the start of this road.

        Does nothing when the vehicle is already on the road.
        """
        if vehicle not in self.vehicles:
            self.vehicles.append(vehicle)
            self.total_vehicles += 1
            vehicle.current_road = self
            vehicle.position_on_road = 0.0
            self.logger.debug(f"Vehicle {vehicle.id} added to road")

    def get_state(self) -> dict:
        """Return a snapshot of the road as a plain dict.

        ``average_speed`` is reported in km/h (internal m/s multiplied by 3.6).
        """
        return {
            "id": self.id,
            "vehicle_count": len(self.vehicles),
            "utilization": self.utilization,
            "average_speed": self.average_speed * 3.6,  # m/s -> km/h
            "total_vehicles": self.total_vehicles,
            "from_intersection": self.from_intersection.id,
            "to_intersection": self.to_intersection.id
        }
# Traffic light
@dataclass
class TrafficLight:
    """A fixed-cycle signal that cycles red -> green -> yellow -> red.

    The light starts in the red state. ``controlled_directions`` holds the
    ``(incoming_road, outgoing_road)`` pairs this light controls.
    """

    intersection: 'Intersection'
    red_time: int
    green_time: int = 30
    yellow_time: int = 5
    controlled_directions: List[Tuple['Road', 'Road']] = field(default_factory=list)

    def __post_init__(self):
        """Derive the cycle length, start in red and open the log file."""
        self.cycle_time = self.green_time + self.yellow_time + self.red_time
        self.state = "red"
        self.time_in_state = 0.0
        self.logger = setup_logger(f"TrafficLight_{self.intersection.id}", f"logs/trafficlight_{self.intersection.id}.log")
        self.logger.info(f"Traffic light initialized at intersection {self.intersection.id}")

    def add_direction(self, incoming_road: 'Road', outgoing_road: 'Road'):
        """Register an ``incoming_road -> outgoing_road`` movement once."""
        if (incoming_road, outgoing_road) not in self.controlled_directions:
            self.controlled_directions.append((incoming_road, outgoing_road))
            self.logger.info(f"Added controlled direction: {incoming_road.id} -> {outgoing_road.id}")

    def update(self, time_step: float):
        """Advance the signal clock and switch phase when the phase is over.

        At most one phase change happens per call. Time spent beyond the end
        of the finished phase is carried into the next one, so the cycle
        length does not drift when ``time_step`` does not divide the phase
        durations evenly.
        """
        self.time_in_state += time_step

        # state -> (phase duration, following state, log message)
        transitions = {
            "green": (self.green_time, "yellow", "Changed to yellow light"),
            "yellow": (self.yellow_time, "red", "Changed to red light"),
            "red": (self.red_time, "green", "Changed to green light"),
        }
        transition = transitions.get(self.state)
        if transition is None:
            # Unknown state: leave it alone rather than guess a successor.
            return

        duration, next_state, message = transition
        if self.time_in_state >= duration:
            self.state = next_state
            # Keep the overshoot instead of resetting the clock to zero.
            self.time_in_state -= max(duration, 0)
            self.logger.info(message)

    def is_green_for_direction(self, incoming_road: 'Road', outgoing_road: 'Road') -> bool:
        """Return True if the light is green and controls this movement."""
        return self.state == "green" and (incoming_road, outgoing_road) in self.controlled_directions

    def get_current_state(self) -> str:
        """Return the current state name: "red", "green" or "yellow"."""
        return self.state

    def get_state(self) -> dict:
        """Return a snapshot of the light as a plain dict."""
        return {
            "intersection": self.intersection.id,
            "state": self.state,
            "time_in_state": self.time_in_state,
            "controlled_directions": [(incoming.id, outgoing.id) for incoming, outgoing in self.controlled_directions]
        }
# 路网类
class RoadNetwork:
"""道路网络类"""
def __init__(self):
self.intersections: Dict[str, Intersection] = {}
self.roads: Dict[str, Road] = {}
self.traffic_lights: Dict[str, TrafficLight] = {}
self.vehicles: Dict[str, Vehicle] = {}
self.time: float = 0.0
self.simulation_speed: float = 1.0 # 模拟速度倍率
self.data_logger = setup_logger("DataLogger", "logs/simulation_data.csv")
self.logger = setup_logger("RoadNetwork", "logs/road_network.log")
self.logger.info("RoadNetwork initialized")
# 创建CSV文件并写入标题
with open("logs/simulation_data.csv", "w", newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow([
"timestamp", "simulation_time",
"vehicle_count", "total_distance", "total_waiting_time",
"avg_speed", "avg_utilization", "avg_queue_length"
])
def add_intersection(self, id: str, x: float, y: float) -> Intersection:
"""添加交叉口"""
if id not in self.intersections:
intersection = Intersection(id, x, y)
self.intersections[id] = intersection
self.logger.info(f"Added intersection {id}")
return intersection
return self.intersections[id]
def add_road(self, id: str, from_id: str, to_id: str, length: float,
lanes: int = 1, speed_limit: float = 50.0) -> Road:
"""添加道路"""
if id not in self.roads:
from_intersection = self.intersections.get(from_id)
to_intersection = self.intersections.get(to_id)
if not from_intersection or not to_intersection:
self.logger.error(f"Invalid intersection IDs: from={from_id}, to={to_id}")
return None
road = Road(id, from_intersection, to_intersection, length, lanes, speed_limit)
self.roads[id] = road
self.logger.info(f"Added road {id} from {from_id} to {to_id}")
return road
return self.roads[id]
def add_traffic_light(self, intersection_id: str, red_time: int,
green_time: int = 30, yellow_time: int = 5) -> TrafficLight:
"""添加交通灯"""
if intersection_id not in self.traffic_lights:
intersection = self.intersections.get(intersection_id)
if not intersection:
self.logger.error(f"Intersection {intersection_id} not found")
return None
traffic_light = TrafficLight(intersection, red_time, green_time, yellow_time)
self.traffic_lights[intersection_id] = traffic_light
self.logger.info(f"Added traffic light at intersection {intersection_id}")
return traffic_light
return self.traffic_lights[intersection_id]
def add_controlled_direction(self, intersection_id: str, incoming_road_id: str, outgoing_road_id: str):
"""为交通灯添加控制方向"""
traffic_light = self.traffic_lights.get(intersection_id)
incoming_road = self.roads.get(incoming_road_id)
outgoing_road = self.roads.get(outgoing_road_id)
if traffic_light and incoming_