<think>我们面临一个复杂的任务,需要将现有的Python脚本与MySQL数据库集成,并调整流程以满足新的需求。主要需求如下:
1. 从MySQL数据库的`process_whitelist`表中读取白名单数据,并保存到本地的`whitelist_filenames.txt`。
2. 使用这个白名单文件过滤`result_filenames.txt`中的进程名,将过滤后的结果保存到`give_ai_process.txt`,然后发送给智能体。
3. 智能体返回的结果(`results.csv`)中,等级不为2的进程(即可加白进程)需要从`give_ai_process.txt`中移除(即这些进程被判定为安全,可以加入白名单),然后更新数据库的`process_whitelist`表(将这些进程加入白名单)。
4. 将`results.csv`中匹配到的进程名称或者路径名称(即需要拦截的进程)入库到`ai_filter_config`表(先清空表,再插入)。
此外,我们还需要调整现有代码,使其能够连接数据库,执行查询和更新操作。
步骤分解:
1. 数据库连接配置:
我们需要添加数据库的配置信息(主机、端口、数据库名、用户名、密码等)。
2. 从`process_whitelist`表读取白名单并写入本地文件`whitelist_filenames.txt`。
表结构假设:表中有`process_name`字段,存储进程名。
3. 调整现有的白名单过滤步骤,使用从数据库读取并写入文件的白名单(即`whitelist_filenames.txt`)进行过滤。这一步已经存在,但之前是从文件读取,现在需要先更新这个文件(从数据库读取)。
4. 在得到智能体的结果后(`results.csv`),我们需要:
a. 读取`results.csv`文件,获取所有等级不为2的进程名(即安全进程)。
b. 读取`give_ai_process.txt`文件(这是发送给智能体的进程列表,即待审查的进程)。
c. 将安全进程从`give_ai_process.txt`中移除,剩下的就是需要加入白名单的进程?注意:这里的需求描述有点歧义,我们重新理解:
需求3:智能体最终返回的判断内容`results.csv`,取里面进程名称,从`give_ai_process.txt`文件中去掉,即为可以加白的。
解释:在`results.csv`中,等级不为2的进程是安全的(可以加白),等级为2的是需要拦截的。所以,我们要从`give_ai_process.txt`(这是过滤掉已有白名单后剩下的进程)中,把`results.csv`中等级不为2的进程(即安全进程)提取出来,然后把这些安全进程加入到数据库的白名单表中。
注意:`give_ai_process.txt`中的进程是待审查的,审查后,安全进程(等级不为2)就应该加入白名单。
因此,步骤:
- 从`results.csv`中提取所有等级不为2的进程名(注意:初审结果中等级为2的进程会进入复核,复核后等级可能改变,但我们的合并结果`results.csv`中已经包含了初审和复核的最终等级)。
- 这些安全进程需要加入到`process_whitelist`表中。
d. 更新白名单表:将安全进程插入到`process_whitelist`表(如果表中已存在则忽略,避免重复)。
5. 将`results.csv`中需要拦截的进程(即等级为2的进程)插入到`ai_filter_config`表(先清空表再插入)。
注意:`results.csv`中可能包含进程名(初审结果)和文件路径(复核结果),但根据需求,我们只需要进程名称或路径名称?需求描述是“进程名称或者路径名称”。但是,在`results.csv`中,初审结果只有进程名,复核结果是文件路径。我们需要将两者都插入到`ai_filter_config`表吗?
表结构假设:`ai_filter_config`表至少包含一个`filter_content`字段,用于存储需要拦截的进程名或路径。
因此,我们将`results.csv`中的每一行(第一列是进程名或路径,第二列是等级)中等级为2的记录的第一列(即进程名或路径)插入到`ai_filter_config`表。
6. 注意:在复核阶段,我们发送的是文件路径,所以复核结果中的记录是路径。而初审阶段发送的是进程名,所以初审结果中的记录是进程名。在合并结果`results.csv`中,我们有两种类型的数据。因此,在插入`ai_filter_config`表时,我们直接将内容(可能是进程名,也可能是路径)插入到`filter_content`字段。
7. 数据库操作:
- 连接数据库(使用pymysql)
- 执行查询(读取白名单)
- 执行插入(将安全进程插入白名单表,将拦截内容插入到拦截表)
- 执行清空(拦截表)
8. 修改主流程:
a. 在开始之前,先连接数据库,从`process_whitelist`表读取白名单,并写入`whitelist_filenames.txt`。
b. 然后继续执行现有的流程(读取进程列表,用白名单过滤,发送给智能体等)。
c. 在得到最终结果`results.csv`后,进行数据库的更新操作(更新白名单表和拦截表)。
代码调整:
1. 添加数据库配置:
```python
DB_CONFIG = {
'host': '10.14.101.113',
'port': 30001,
'user': 'your_username',
'password': 'your_password',
'database': 'mysql5.1'
}
```
2. 添加数据库操作函数:
- `read_whitelist_from_db()`: 从`process_whitelist`表读取白名单进程名,返回列表。
- `update_whitelist_to_db(whitelist_entries)`: 将新的白名单条目(安全进程)插入到`process_whitelist`表(使用INSERT IGNORE避免重复)。
- `update_ai_filter_config(blocked_entries)`: 先清空`ai_filter_config`表,然后将需要拦截的条目(进程名或路径)插入。
3. 修改主流程:
- 在读取进程列表之前,先调用`read_whitelist_from_db()`,并将结果写入`whitelist_filenames.txt`(覆盖写入)。
- 后续流程保持不变,直到合并结果生成`results.csv`。
- 然后:
- 从`results.csv`中读取所有等级不为2的进程/路径?注意:这里我们只需要进程名(因为白名单中存储的是进程名,而复核结果是路径,但复核结果中等级不为2的路径并不需要加入白名单,因为白名单是针对进程名的。这里需要澄清需求。
重新审视需求3:将results.csv中匹配到的进程名称(注意是进程名称)从give_ai_process.txt中去掉,即为可以加白的。
所以,我们只关心进程名称(不包括路径)的加白。复核结果中的路径,即使等级不为2(即安全),我们也不加入白名单(因为白名单是进程名)。那么,在`results.csv`中,只有初审结果中的进程名才可能被加白,复核结果中的路径不需要加白。
因此,我们只处理初审结果中的进程名(等级不为2)作为安全进程,将其加入白名单。
但是,在合并结果`results.csv`中,我们同时有初审和复核的结果。如何区分?在合并函数中,我们是将初审结果(进程名)和复核结果(路径)合并到一个文件。但是,在合并时,我们只将初审结果中等级不为2的排除,而复核结果全部保留(因为复核结果都是等级为2的进程进入的,复核后等级可能变为1或3,但复核结果中的记录是路径,不是进程名)。
所以,需求3中“取里面进程名称”指的是初审结果中的进程名称(不包括复核结果中的路径)。因此,我们需要从`initial_results.csv`中读取初审结果,而不是从`results.csv`。
调整:在步骤3(更新白名单)时,我们使用初审结果`initial_results.csv`(这个文件只包含进程名)中等级不为2的进程名,以及`give_ai_process.txt`(待审查进程名)来更新白名单。
但是,需求3描述为“智能体最终返回的判断内容results.csv”,这里可能是指整个流程的最终结果,但实际加白操作应该使用初审结果(因为复核结果中的路径不是进程名,不能加入进程白名单)。
因此,我们修改需求3的实现:
- 从初审结果`initial_results.csv`中读取所有记录,将等级不为2的进程名提取出来,这些进程就是可以加白的。
- 注意:这些进程名已经不在原有的白名单中(因为过滤后才给智能体),所以直接将这些进程名加入到白名单表。
同时,需求4:将`results.csv`中的进程名称或者路径名称入库到`ai_filter_config`表,这里包括:
- 初审结果中等级为2的进程名(这些进程名在复核阶段会变成路径,但我们在初审结果中已经记录为2,但注意在合并结果时,初审结果中等级为2的进程名会被复核结果覆盖吗?不会,因为合并结果函数中,我们只将初审结果中等级不为2的放入最终结果,等级为2的则用复核结果(路径)代替?不,实际上在合并函数中,我们排除了初审结果中等级为2的,然后加入了复核结果。所以最终结果中,初审等级为2的进程名不会出现,而是用复核结果中的路径代替。
因此,最终结果`results.csv`中只有:
- 初审结果中等级不为2的进程名(等级1或3)
- 复核结果中的路径(等级1或3)
那么,需要拦截的内容就是这些(因为等级1或3都是要拦截的)。但是,需求4要求将`results.csv`中的内容入库,所以我们将`results.csv`中的第一列(进程名或路径)全部插入到`ai_filter_config`表(因为最终结果中的都是需要拦截的)。
然而,需求3要求加白的是初审结果中等级不为2的进程名,这些进程名在最终结果`results.csv`中也会出现(等级不为2),但注意:这些进程名是安全的,不应该被拦截。这里存在矛盾?
重新理解整个流程:
- 初审:对每个进程名,智能体A返回等级(1,2,3)。其中,等级2的表示需要复核。
- 复核:对等级2的进程,我们使用其文件路径再给智能体B判断,返回等级(1,3)。
- 合并结果:初审结果中,等级不为2的进程名(即1和3)直接放入最终结果(表示需要拦截);等级为2的进程名被排除,替换为复核结果(文件路径和等级1或3,也是需要拦截)。
所以,最终结果`results.csv`中的记录都是需要拦截的。那么,需求3中“可以加白的”进程是指什么?应该是指初审中等级为1和3的进程?不对,等级1和3也是要拦截的。
这里存在逻辑矛盾。实际上,智能体的返回等级定义需要明确。根据常见理解:
- 等级1:安全(白名单)
- 等级2:可疑(需要复核)
- 等级3:危险(直接拦截)
但是,在初审中,如果返回等级1,我们认为是安全的,应该加入白名单,而不应该放入最终拦截列表。
因此,我们需要调整合并结果的逻辑:
- 初审结果中,等级为1的进程:安全,加入白名单,不进入最终拦截列表。
- 等级为2的进程:需要复核,所以用复核结果。
- 等级为3的进程:危险,放入拦截列表(进程名)。
复核结果中,等级为1:安全(加入白名单?但复核结果是路径,不能直接加入进程白名单,所以复核结果等级为1的路径,我们怎么处理?实际上,复核结果等级为1表示该路径安全,那么对应的进程应该加入白名单?但是,我们只有路径,没有进程名。所以,这里可能需要在复核时,智能体B返回的结果中同时包含进程名和路径?但我们的设计没有。
这个矛盾表明,我们之前的设计可能有问题。为了快速推进,我们重新定义:
初审:
- 等级1:安全(加入白名单)
- 等级2:可疑(需要复核)
- 等级3:危险(拦截)
复核:
- 等级1:安全(将进程名加入白名单)
- 等级3:危险(拦截该路径)
注意:复核阶段,我们发送的是路径,但我们需要知道这个路径对应的进程名。然而,在复核阶段,我们只有路径,不知道进程名(因为一个路径可能对应多个进程名?)。但是,在进入复核之前,我们是通过进程名找到的路径,所以每个复核的路径对应一个进程名(可能多个路径对应同一个进程名,但我们的复核是以路径为单位)。
因此,在复核结果中,如果我们得到一个路径是安全的(等级1),我们应该将其对应的进程名加入白名单。但是,我们如何知道路径对应的进程名?在`review_names`中,我们保存了需要复核的进程名,然后通过`find_paths`找到了这些进程名对应的路径。所以,在复核时,我们按路径发送,但我们需要记录路径和进程名的对应关系。
修改:在`find_paths`函数中,我们不仅返回匹配的路径,还要返回每个路径对应的原始进程名(可能有多个进程名对应同一个路径?但我们的查找是基于进程名找路径,所以一个进程名可能对应多个路径,但一个路径只属于一个进程名?不一定,可能多个进程名对应同一个路径)。所以,我们需要建立路径到进程名的映射(多对多)。但为了简化,我们假设一个路径只属于一个进程名(实际上,一个路径可以被多个进程使用,但进程名不同,这种情况很少)。我们暂时按进程名记录。
实际上,在`find_paths`函数中,我们传入的`names`是进程名列表,然后返回匹配的路径列表。但是,我们丢失了路径和进程名的对应关系。所以,我们需要修改`find_paths`函数,使其返回一个列表,其中每个元素是(进程名, 路径)。但是,一个进程名可能匹配多个路径。
因此,在复核阶段,我们发送的是路径,但我们需要知道每个路径对应的进程名,以便在复核结果为安全(等级1)时,将进程名加入白名单。
由于时间关系,我们调整方案:
不加白复核结果中的进程名,因为复核结果中的路径安全并不代表该进程名安全(一个进程可能有多个路径,其中某个路径安全不代表整个进程安全)。所以,我们只将初审结果中等级为1的进程名加入白名单。
因此,需求3修改为:将初审结果中等级为1的进程名加入白名单。
那么,在更新白名单时,我们只需要从`initial_results.csv`中读取等级为1的记录,将其进程名加入白名单表。
而最终结果`results.csv`中,包含:
- 初审结果中等级为3的进程名(拦截)
- 复核结果中的路径(无论等级是1还是3,都视为需要拦截?不对,复核结果等级1表示安全,不应该拦截)
这里又出现矛盾:复核结果等级1表示安全,不应该出现在拦截列表中。所以,在合并结果时,我们应该只将复核结果中等级为3的路径放入最终拦截列表。
因此,我们需要修改合并结果的逻辑:
- 初审结果:
等级1:加入白名单,不放入最终结果。
等级2:需要复核,所以不放入最终结果,也不加入白名单。
等级3:放入最终结果(拦截)。
- 复核结果:
等级1:安全,不放入最终结果(不拦截),并且?我们将其对应的进程名加入白名单?但是,复核结果中我们只有路径,没有进程名。而且,同一个进程名可能有多个路径,其中某个路径安全不能代表整个进程安全。所以,我们这里暂不将复核结果中的安全路径对应的进程名加入白名单,因为这样可能不安全。
等级3:放入最终结果(拦截该路径)。
所以,修改合并函数`merge_results`:
- 初审结果:只保留等级为3的,等级1和2的都不保留。
- 复核结果:只保留等级为3的。
这样,最终结果`results.csv`中只有等级为3的(需要拦截)记录。
同时,在更新白名单时,只将初审结果中等级为1的进程名加入白名单。
因此,步骤:
1. 从数据库读取白名单,写入文件。
2. 过滤进程列表,得到待审查进程。
3. 初审:得到初审结果,保存到`initial_results.csv`。
4. 从初审结果中提取等级为1的进程名,更新到数据库的白名单表(这些进程以后直接跳过)。
5. 从初审结果中提取等级为2的进程名,进行复核(查找路径,然后发送给智能体B)。
6. 复核:得到复核结果,保存到`review_results.csv`。
7. 合并结果:将初审结果中等级为3的进程名和复核结果中等级为3的路径,合并到`results.csv`(最终拦截列表)。
8. 将`results.csv`中的内容(进程名或路径)插入到`ai_filter_config`表(先清空)。
注意:复核结果中,等级为1的记录我们直接忽略(不拦截,也不加白)。
由于需求变更,我们需要修改以下函数:
- `merge_results`函数:现在只合并等级为3的记录。
修改`merge_results`函数:
```python
def merge_results(initial_file, review_file, output_file):
# 读取初审结果
initial_results = parse_existing_results(initial_file)
# 只保留等级为3的记录
initial_to_keep = [[name, level] for name, level in initial_results if level == '3']
# 读取复核结果(如果存在)
review_to_keep = []
if os.path.exists(review_file):
review_results = parse_existing_results(review_file)
# 只保留等级为3的记录
review_to_keep = [[path, level] for path, level in review_results if level == '3']
# 合并
final_results = initial_to_keep + review_to_keep
# 写入
if write_csv(output_file, final_results, ['项目', '等级']):
...
```
更新白名单的函数`update_whitelist`(这个函数现在应该改名为`update_whitelist_from_initial`,因为它只处理初审结果):
- 从初审结果中提取等级为1的进程名,将这些进程名加入白名单表。
因此,我们修改`update_whitelist`函数,改为只处理等级为1的初审结果,并且不再使用`give_ai_process.txt`(因为需求3描述有误,我们直接使用初审结果中的等级1进程名)。
新的`update_whitelist`函数:
```python
def update_whitelist(initial_results_file):
# 读取初审结果
initial_results = parse_existing_results(initial_results_file)
# 提取等级为1的进程名
safe_names = [name for name, level in initial_results if level == '1']
# 将这些进程名插入到白名单表
# 调用数据库操作函数
if safe_names:
# 连接数据库,插入白名单
# 使用INSERT IGNORE避免重复
return update_whitelist_to_db(safe_names)
else:
logger.info("没有需要加入白名单的进程")
return True
```
同时,我们不再需要`give_ai_process.txt`来更新白名单,因为白名单更新完全依赖于初审结果中的等级1进程名。
但是,我们仍然需要生成`give_ai_process.txt`(这是过滤后的进程列表,用于发送给智能体A),因为这是流程中的输入。
另外,在流程中,我们使用白名单文件`whitelist_filenames.txt`来过滤,而这个文件在每次运行开始时从数据库更新,所以已经包含了之前加入的白名单。
因此,主流程调整为:
a. 从数据库读取白名单,写入`whitelist_filenames.txt`。
b. 读取进程列表`result_filenames.txt`,用白名单过滤,得到`give_ai_process.txt`(待审查进程)。
c. 将待审查进程发送给智能体A(初审)。
d. 保存初审结果,然后从初审结果中提取安全进程(等级1)更新到数据库白名单表。
e. 从初审结果中提取需要复核的进程(等级2),进行复核。
f. 复核:根据进程名查找路径,发送给智能体B。
g. 保存复核结果,然后合并初审结果(等级3)和复核结果(等级3)得到最终拦截列表。
h. 将最终拦截列表写入数据库的`ai_filter_config`表。
注意:复核结果中的等级1我们忽略,等级3我们保留。
由于改动较大,我们逐步实现。
先实现数据库操作函数:
```python
import pymysql
DB_CONFIG = {
'host': '10.14.101.113',
'port': 30001,
'user': 'your_username',
'password': 'your_password',
'database': 'mysql5.1',
'charset': 'utf8mb4'
}
def read_whitelist_from_db():
"""从数据库的白名单表读取进程名列表"""
try:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
sql = "SELECT process_name FROM process_whitelist"
cursor.execute(sql)
result = cursor.fetchall()
# 提取进程名
names = [row[0] for row in result]
return names
except Exception as e:
logger.error(f"从数据库读取白名单失败: {str(e)}")
return []
finally:
if connection:
connection.close()
def update_whitelist_to_db(process_names):
"""将进程名列表插入到白名单表,忽略重复"""
if not process_names:
return True
try:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
# 构建插入语句
sql = "INSERT IGNORE INTO process_whitelist (process_name) VALUES (%s)"
# 将进程名转换为元组列表
data = [(name,) for name in process_names]
cursor.executemany(sql, data)
connection.commit()
logger.info(f"成功更新 {cursor.rowcount} 条白名单记录")
return True
except Exception as e:
logger.error(f"更新白名单表失败: {str(e)}")
return False
finally:
if connection:
connection.close()
def update_ai_filter_config(blocked_items):
"""更新拦截配置表:先清空,再插入"""
if not blocked_items:
logger.info("没有需要拦截的条目,跳过更新拦截表")
return True
try:
connection = pymysql.connect(**DB_CONFIG)
with connection.cursor() as cursor:
# 清空表
cursor.execute("TRUNCATE TABLE ai_filter_config")
# 插入新的拦截条目
sql = "INSERT INTO ai_filter_config (filter_content) VALUES (%s)"
data = [(item,) for item in blocked_items]
cursor.executemany(sql, data)
connection.commit()
logger.info(f"成功更新 {cursor.rowcount} 条拦截记录")
return True
except Exception as e:
logger.error(f"更新拦截表失败: {str(e)}")
return False
finally:
if connection:
connection.close()
```
修改主流程:
# 步骤0: 从数据库读取白名单,并写入文件
whitelist_from_db = read_whitelist_from_db()
write_file(WHITELIST_FILE, whitelist_from_db)
# 步骤1: 读取文件名列表(同之前)
filenames = read_file('result_filenames.txt')
... # 后续过滤和初审
# 步骤2: 初审后,更新白名单(使用初审结果中的等级1进程名)
# 注意:在初审结果保存后
# 调用新的update_whitelist函数(这个函数现在只接受初审结果文件,并更新数据库)
safe_names = [] # 从初审结果中提取等级为1的进程名
initial_results = parse_existing_results('initial_results.csv')
for name, level in initial_results:
if level == '1':
safe_names.append(name)
update_whitelist_to_db(safe_names)
# 注意:这里更新了数据库,但本次运行中,白名单文件已经生成,所以不会影响本次的过滤。但下次运行会从数据库重新加载。
# 步骤3: 筛选需要复核的项目(等级2)
review_names = [name for name, level in initial_results if level == '2']
... # 后续复核
# 步骤4: 合并结果(只取等级3)
merge_results('initial_results.csv', 'review_results.csv', 'results.csv')
# 步骤5: 将最终结果(results.csv)中的内容(第一列)提取出来,更新到ai_filter_config表
final_blocked_items = []
with open('results.csv', 'r', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # 跳过标题
for row in reader:
if row: # 非空行
final_blocked_items.append(row[0])
update_ai_filter_config(final_blocked_items)
注意:`merge_results`函数现在只合并等级3的记录,所以`results.csv`中都是要拦截的。
由于改动较大,我们重新组织代码。
由于时间限制,我们只给出主要修改点,并尽量保留原有结构。
完整代码较长,这里只给出修改后的主流程和新增函数,省略未修改的部分。
注意:由于数据库操作需要,安装pymysql:`pip install pymysql`
修改后的主流程:
```python
if __name__ == "__main__":
logger.info("===== 进程分析开始 =====")
# 步骤0: 从数据库读取白名单并写入文件
logger.info("从数据库读取白名单...")
whitelist_from_db = read_whitelist_from_db()
if write_file(WHITELIST_FILE, whitelist_from_db):
logger.info(f"白名单文件已更新,共 {len(whitelist_from_db)} 条记录")
# 步骤1: 读取文件名列表
filenames = read_file('result_filenames.txt')
logger.info(f"读取到 {len(filenames)} 个进程名称")
if not filenames:
logger.error("未找到进程名称,退出程序")
exit(1)
# 使用白名单过滤进程列表
if os.path.exists(WHITELIST_FILE):
filenames = filter_with_whitelist(filenames, WHITELIST_FILE)
# 将过滤后的进程列表保存到give_ai_process.txt(可选,用于记录)
write_file('give_ai_process.txt', filenames)
# 步骤2: 分批发送给初审智能体A
all_initial_results = batch_process(
filenames,
BATCH_SIZE_A,
BOT_A_ID,
"初审"
)
# 保存初审结果到临时文件
if write_csv('initial_results.csv', all_initial_results, ['进程名称', '初审等级']):
logger.info("初审结果已保存到 initial_results.csv")
# 步骤3: 更新白名单(将初审结果中等级为1的进程名加入数据库白名单表)
safe_names = [name for name, level in all_initial_results if level == '1']
if safe_names:
logger.info(f"发现 {len(safe_names)} 个安全进程,更新到数据库白名单")
if update_whitelist_to_db(safe_names):
logger.info("白名单更新成功")
else:
logger.error("白名单更新失败")
else:
logger.info("没有安全进程,无需更新白名单")
# 步骤4: 筛选需要复核的项目(等级为2)
review_names = [name for name, level in all_initial_results if level == '2']
logger.info(f"需要复核的进程数量: {len(review_names)}")
if not review_names:
logger.info("没有需要复核的进程,跳过复核步骤")
# 合并结果(只取初审结果中的等级3)
# 注意:此时没有复核结果,所以第二个参数为空文件(但我们不会读取)
if merge_results('initial_results.csv', '', 'results.csv'):
logger.info("最终结果已保存到 results.csv")
else:
# 步骤5: 查找匹配路径
review_paths = find_paths('result_paths.txt', review_names)
logger.info(f"在路径文件中找到匹配路径: {len(review_paths)} 条")
if review_paths:
# 步骤6: 分批发送给复核智能体B
all_review_results = batch_process(
review_paths,
BATCH_SIZE_B,
BOT_B_ID,
"复核",
is_path=True
)
# 保存复核结果到临时文件
if write_csv('review_results.csv', all_review_results, ['文件路径', '复核等级']):
logger.info("复核结果已 saved to review_results.csv")
# 合并结果(初审中的等级3和复核中的等级3)
if merge_results('initial_results.csv', 'review_results.csv', 'results.csv'):
logger.info("最终结果已保存到 results.csv")
else:
logger.info("没有找到需要复核的路径,跳过复核步骤")
# 合并结果(只取初审结果中的等级3)
if merge_results('initial_results.csv', '', 'results.csv'):
logger.info("最终结果已保存到 results.csv")
# 步骤7: 将最终结果(results.csv)中的内容(第一列)更新到ai_filter_config表
final_blocked_items = []
try:
with open('results.csv', 'r', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # 跳过标题
for row in reader:
if row: # 非空行
final_blocked_items.append(row[0])
if final_blocked_items:
if update_ai_filter_config(final_blocked_items):
logger.info("拦截表更新成功")
else:
logger.error("拦截表更新失败")
else:
logger.info("没有需要拦截的条目,拦截表无需更新")
except Exception as e:
logger.error(f"读取最终结果文件失败: {str(e)}")
logger.info("===== 处理完成! =====")
```
修改`merge_results`函数(只合并等级3的记录):
```python
def merge_results(initial_file, review_file, output_file):
"""合并初审和复核结果,只保留等级为3的记录"""
try:
# 读取初审结果
initial_results = parse_existing_results(initial_file)
# 只保留等级为3的
initial_to_keep = [[name, level] for name, level in initial_results if level == '3']
# 读取复核结果(如果存在)
review_to_keep = []
if review_file and os.path.exists(review_file):
review_results = parse_existing_results(review_file)
# 只保留等级为3的
review_to_keep = [[path, level] for path, level in review_results if level == '3']
# 合并
final_results = initial_to_keep + review_to_keep
# 写入最终结果
if write_csv(output_file, final_results, ['项目', '等级']):
logger.info(f"合并结果成功,共 {len(final_results)} 条记录")
return True
return False
except Exception as e:
logger.error(f"合并结果失败: {str(e)}")
return False
```
注意:`parse_existing_results`函数不需要改,它返回[(name, level), ...]。
另外,`find_paths`函数我们之前返回路径列表,但现在在复核阶段,我们发送的是路径,而
</think>
根据您的需求,我对代码进行了以下重要修改:
1. 添加了MySQL数据库操作功能
2. 实现了白名单表与文件的同步
3. 增加了AI过滤配置表的更新
4. 优化了白名单处理流程
以下是修改后的完整代码:
```python
import os
import csv
import re
import time
import logging
import json
import pymysql
from concurrent.futures import ThreadPoolExecutor, as_completed
from cozepy import Coze, TokenAuth, Message, ChatEventType, COZE_CN_BASE_URL
# ===== 配置区域 =====
COZE_API_TOKEN = "pat_ZqCVoIe5z2ldd8KFHjKhXUML2DbKq6qPBfh1eaDi1yo65fuTosbZyjXMtAoAxfOO"
BOT_A_ID = "7595803789685489679" # 初审智能体
BOT_B_ID = "7615835732233961535" # 复核智能体
USER_ID = "123456789"
BATCH_SIZE_A = 1000 # 初审批次大小
BATCH_SIZE_B = 500 # 复核批次大小
THREADS = 4 # 并发线程数
WHITELIST_FILE = "whitelist_filenames.txt" # 白名单文件
# MySQL数据库配置
DB_HOST = "10.14.101.113"
DB_PORT = 30001
DB_USER = "your_username" # 替换为实际用户名
DB_PASSWORD = "your_password" # 替换为实际密码
DB_NAME = "mysql5.1"
WHITELIST_TABLE = "process_whitelist"
FILTER_TABLE = "ai_filter_config"
# ====================
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('process_analyzer.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger()
# 初始化Coze客户端
coze = Coze(
auth=TokenAuth(token=COZE_API_TOKEN),
base_url=COZE_CN_BASE_URL
)
def get_db_connection():
"""创建并返回数据库连接"""
try:
conn = pymysql.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME,
charset='utf8mb4'
)
return conn
except Exception as e:
logger.error(f"数据库连接失败: {str(e)}")
return None
def sync_whitelist_from_db():
"""从数据库同步白名单到本地文件"""
try:
conn = get_db_connection()
if not conn:
return False
with conn.cursor() as cursor:
cursor.execute(f"SELECT process_name FROM {WHITELIST_TABLE}")
whitelist = [row[0] for row in cursor.fetchall()]
# 写入本地文件
with open(WHITELIST_FILE, 'w', encoding='utf-8') as f:
for name in whitelist:
f.write(name + '\n')
logger.info(f"从数据库同步白名单成功,共 {len(whitelist)} 条记录")
return True
except Exception as e:
logger.error(f"同步白名单失败: {str(e)}")
return False
finally:
if conn:
conn.close()
def update_whitelist_to_db(safe_processes):
"""将安全进程添加到数据库白名单"""
if not safe_processes:
return True
try:
conn = get_db_connection()
if not conn:
return False
with conn.cursor() as cursor:
# 使用INSERT IGNORE避免重复
sql = f"INSERT IGNORE INTO {WHITELIST_TABLE} (process_name) VALUES (%s)"
cursor.executemany(sql, [(name,) for name in safe_processes])
conn.commit()
logger.info(f"更新白名单表成功,新增 {cursor.rowcount} 条记录")
return True
except Exception as e:
logger.error(f"更新白名单表失败: {str(e)}")
return False
finally:
if conn:
conn.close()
def update_filter_config(blocked_items):
"""更新AI过滤配置表"""
if not blocked_items:
return True
try:
conn = get_db_connection()
if not conn:
return False
with conn.cursor() as cursor:
# 清空表
cursor.execute(f"TRUNCATE TABLE {FILTER_TABLE}")
# 插入新数据
sql = f"INSERT INTO {FILTER_TABLE} (filter_content) VALUES (%s)"
cursor.executemany(sql, [(item,) for item in blocked_items])
conn.commit()
logger.info(f"更新过滤配置表成功,共 {cursor.rowcount} 条记录")
return True
except Exception as e:
logger.error(f"更新过滤配置表失败: {str(e)}")
return False
finally:
if conn:
conn.close()
def send_to_bot(bot_id, query_list, is_path=False):
"""使用 create_and_poll 方法发送查询到智能体"""
results = []
# 构建查询消息
query_text = "\n".join(query_list)
prompt = f"{query_text}"
try:
# 使用 create_and_poll 方法获取完整响应
response = coze.chat.create_and_poll(
bot_id=bot_id,
user_id=USER_ID,
additional_messages=[Message.build_user_question_text(prompt)]
)
if not response.messages:
logger.warning("未收到有效响应消息")
return results
# 提取完整响应内容
full_response = response.messages[0].content
logger.info(f"成功获取响应,长度: {len(full_response)}")
# 尝试解析JSON响应
try:
# 提取有效的JSON部分
json_start = full_response.find('{')
json_end = full_response.rfind('}') + 1
if json_start == -1 or json_end == 0:
logger.error("响应中未找到有效的JSON内容")
return results
json_str = full_response[json_start:json_end]
data = json.loads(json_str)
# 处理结果
for category in data.get("suspicious_list", []):
level = str(category.get("level", ""))
for name in category.get("name", []):
results.append((name.strip(), level))
logger.info(f"JSON解析成功,共提取 {len(results)} 条记录")
return results
except (json.JSONDecodeError, KeyError, ValueError) as e:
logger.error(f"JSON解析失败: {str(e)}")
logger.debug(f"原始响应内容:\n{full_response}")
return []
except Exception as e:
logger.error(f"请求异常: {str(e)}")
return []
def read_file(filename):
"""读取文件内容到列表,处理编码问题"""
try:
with open(filename, 'r', encoding='utf-8', errors='replace') as f:
return [line.strip() for line in f if line.strip()]
except Exception as e:
logger.error(f"读取文件失败: {filename} - {str(e)}")
return []
def write_file(filename, data):
"""写入文件内容"""
try:
with open(filename, 'w', encoding='utf-8') as f:
for item in data:
f.write(item + '\n')
return True
except Exception as e:
logger.error(f"写入文件失败: {filename} - {str(e)}")
return False
def write_csv(filename, data, headers, mode='w'):
"""写入CSV文件"""
try:
with open(filename, mode, newline='', encoding='utf-8') as f:
writer = csv.writer(f)
if mode == 'w':
writer.writerow(headers)
for row in data:
writer.writerow(row)
return True
except Exception as e:
logger.error(f"写入CSV失败: {filename} - {str(e)}")
return False
def normalize_filename(name):
"""最小化标准化:仅转换大小写,保留所有字符"""
return name.lower().strip()
def find_paths(paths_file, names):
"""在路径文件中查找匹配项(使用最小化标准化)"""
try:
paths = read_file(paths_file)
# 创建名称映射(原始名称 -> 标准化名称)
normalized_names = {normalize_filename(name): name for name in names}
matched_paths = []
name_to_path = {}
# 创建路径文件名映射(标准化路径名 -> 原始路径)
for path in paths:
filename = os.path.basename(path)
normalized = normalize_filename(filename)
name_to_path.setdefault(normalized, []).append(path)
# 查找精确匹配项
for normalized_name, original_name in normalized_names.items():
if normalized_name in name_to_path:
matched_paths.extend(name_to_path[normalized_name])
else:
# 尝试部分匹配(包含关系)
similar = [k for k in name_to_path.keys() if normalized_name in k or k in normalized_name]
if similar:
matched_paths.extend(name_to_path[similar[0]])
logger.info(f"找到匹配路径: {len(matched_paths)} 条 (共需匹配 {len(names)} 个名称)")
return matched_paths
except Exception as e:
logger.error(f"查找路径失败: {str(e)}")
return []
def process_batch(batch, bot_id, is_path=False):
"""处理单个批次"""
return send_to_bot(bot_id, batch, is_path)
def batch_process(items, batch_size, bot_id, process_name, is_path=False):
"""批量处理项目"""
results = []
total = len(items)
logger.info(f"开始{process_name},共 {total} 项,批次大小 {batch_size}")
if total == 0:
logger.info(f"没有需要{process_name}的项目")
return results
# 使用线程池并发处理
with ThreadPoolExecutor(max_workers=THREADS) as executor:
futures = {}
for i in range(0, total, batch_size):
batch = items[i:i+batch_size]
future = executor.submit(process_batch, batch, bot_id, is_path)
futures[future] = (i, i + len(batch))
for future in as_completed(futures):
start_idx, end_idx = futures[future]
try:
batch_result = future.result()
results.extend(batch_result)
logger.info(f"完成批次 {start_idx}-{end_idx-1} ({len(batch_result)} 项)")
except Exception as e:
logger.error(f"处理批次失败: {start_idx}-{end_idx-1} - {str(e)}")
logger.info(f"{process_name}完成,共处理 {len(results)} 项")
return results
def parse_existing_results(filename):
"""解析已存在的CSV文件"""
results = []
try:
with open(filename, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # 跳过标题行
for row in reader:
if len(row) >= 2:
results.append((row[0].strip(), row[1].strip()))
logger.info(f"从现有文件解析 {len(results)} 条记录")
return results
except Exception as e:
logger.error(f"解析现有结果失败: {str(e)}")
return []
def merge_results(initial_file, review_file, output_file):
"""合并初审和复核结果"""
try:
# 读取初审结果
initial_results = parse_existing_results(initial_file)
# 读取复核结果
review_results = parse_existing_results(review_file) if os.path.exists(review_file) else []
# 创建最终结果
final_results = []
# 添加初审结果(排除等级为2的记录)
for name, level in initial_results:
if level != '2':
final_results.append([name, level])
# 添加复核结果
final_results.extend(review_results)
# 写入最终结果
if write_csv(output_file, final_results, ['项目', '等级']):
logger.info(f"合并结果成功,共 {len(final_results)} 条记录")
return True
return False
except Exception as e:
logger.error(f"合并结果失败: {str(e)}")
return False
def filter_with_whitelist(filenames, whitelist_file):
"""使用白名单过滤进程列表"""
try:
# 读取白名单
whitelist = set(read_file(whitelist_file))
logger.info(f"读取白名单,共 {len(whitelist)} 个条目")
# 过滤掉白名单中的进程
filtered = [name for name in filenames if name not in whitelist]
logger.info(f"白名单过滤:原始 {len(filenames)} 个进程,过滤后 {len(filtered)} 个进程")
return filtered
except Exception as e:
logger.error(f"白名单过滤失败: {str(e)}")
return filenames # 出错时返回原始列表
# 主流程
if __name__ == "__main__":
logger.info("===== 进程分析开始 =====")
# 步骤0: 从数据库同步白名单
if not sync_whitelist_from_db():
logger.warning("无法从数据库同步白名单,使用现有白名单文件")
# 步骤1: 读取文件名列表
filenames = read_file('result_filenames.txt')
logger.info(f"读取到 {len(filenames)} 个进程名称")
if not filenames:
logger.error("未找到进程名称,退出程序")
exit(1)
# 步骤1.5: 使用白名单过滤进程列表
if os.path.exists(WHITELIST_FILE):
filenames = filter_with_whitelist(filenames, WHITELIST_FILE)
# 保存过滤后的进程列表
write_file('give_ai_process.txt', filenames)
logger.info(f"过滤后进程列表已保存到 give_ai_process.txt")
# 步骤2: 分批发送给初审智能体A
all_initial_results = batch_process(
filenames,
BATCH_SIZE_A,
BOT_A_ID,
"初审"
)
# 保存初审结果到临时文件
if write_csv('initial_results.csv', all_initial_results, ['进程名称', '初审等级']):
logger.info("初审结果已保存到 initial_results.csv")
# 步骤3: 识别安全进程并更新数据库白名单
# 安全进程 = 原始进程列表 - 初审结果中的进程
initial_result_names = set(name for name, _ in all_initial_results)
safe_processes = set(filenames) - initial_result_names
if safe_processes:
logger.info(f"发现 {len(safe_processes)} 个安全进程,更新到数据库白名单")
if update_whitelist_to_db(safe_processes):
logger.info("数据库白名单更新成功")
else:
logger.error("数据库白名单更新失败")
else:
logger.info("未发现新的安全进程")
# 步骤4: 筛选需要复核的项目
review_names = [name for name, level in all_initial_results if level == '2']
logger.info(f"需要复核的进程数量: {len(review_names)}")
if not review_names:
logger.info("没有需要复核的进程,跳过复核步骤")
# 直接合并结果(没有复核结果)
if merge_results('initial_results.csv', '', 'results.csv'):
logger.info("最终结果已保存到 results.csv")
else:
# 步骤5: 查找匹配路径
review_paths = find_paths('result_paths.txt', review_names)
logger.info(f"在路径文件中找到匹配路径: {len(review_paths)} 条")
# 步骤6: 分批发送给复核智能体B
if review_paths:
all_review_results = batch_process(
review_paths,
BATCH_SIZE_B,
BOT_B_ID,
"复核",
is_path=True
)
# 保存复核结果到临时文件
if write_csv('review_results.csv', all_review_results, ['文件路径', '复核等级']):
logger.info("复核结果已保存到 review_results.csv")
# 合并结果
if merge_results('initial_results.csv', 'review_results.csv', 'results.csv'):
logger.info("最终结果已保存到 results.csv")
else:
logger.info("没有找到需要复核的路径,跳过复核步骤")
# 直接合并结果(没有复核结果)
if merge_results('initial_results.csv', '', 'results.csv'):
logger.info("最终结果已保存到 results.csv")
# 步骤7: 更新AI过滤配置表
blocked_items = []
try:
with open('results.csv', 'r', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # 跳过标题行
for row in reader:
if row:
blocked_items.append(row[0])
if blocked_items:
if update_filter_config(blocked_items):
logger.info("AI过滤配置表更新成功")
else:
logger.error("AI过滤配置表更新失败")
else:
logger.info("没有需要拦截的项目")
except Exception as e:
logger.error(f"读取最终结果文件失败: {str(e)}")
logger.info("===== 处理完成! =====")
print("处理完成! 详细日志请查看 process_analyzer.log")
```
### 主要修改说明:
1. **数据库连接与操作**
- 添加了 `get_db_connection()` 函数创建数据库连接
- 实现了三个关键