<think>我们将在原有代码基础上进行扩展:
1. 添加一个逻辑为“与”的模型组 `and_model_groups`,其命中逻辑是:一个订单必须命中该组内所有的模型(值)才算该组命中。
2. 在输出明细表(外包明细数据和自建明细数据)时,为每个模型组(包括原有的`model_groups`和新增的`and_model_groups`)添加一列,列名为模型组的键,标记该订单是否命中该模型组(对于`model_groups`是“或”逻辑,对于`and_model_groups`是“与”逻辑)。
步骤:
1. 定义`and_model_groups`字典,结构与`model_groups`类似,但命中逻辑不同。
2. 修改`calculate_model_hits`函数,使其能够支持两种逻辑(但注意,原函数只用于统计旧模型组(或逻辑),现在我们需要为新的与逻辑也写一个统计函数?或者整合?但是题目要求明细表要添加列,所以我们需要先修改明细表,然后再考虑统计部分)。
但是,由于原函数`calculate_model_hits`是专门为或逻辑设计的,且我们新增的与逻辑统计方式不同,所以我们可以:
- 保留原有的`model_groups`(或逻辑)的统计方式,使用原来的`calculate_model_hits`函数。
- 为`and_model_groups`(与逻辑)写一个新的统计函数,或者修改原函数使其可以接受逻辑参数?但考虑到时间,我们可以在明细表中先添加列,然后统计部分可以单独为与逻辑写一个函数。
不过,题目要求输出明细表时添加列,所以我们先处理明细表。
具体修改如下:
第一步:定义`and_model_groups`(示例,具体内容由用户定,这里我们先定义,但用户可能需要调整,所以可以放在代码顶部,与`model_groups`并列)
第二步:修改明细表(外包和自建)的DataFrame,为每个模型组(包括原有的和新增的)添加列,标记该订单是否命中该模型组。
注意:一个订单可能有多条记录(多行),但我们要判断的是整个订单(同一个订单号)的命中模型组合情况。所以我们需要先按订单号分组,然后检查每个订单的命中模型字符串(将所有命中模型合并成一个字符串)是否满足模型组的条件。
但是,明细表是每一行数据,我们需要在每一行上标记该订单是否命中某个模型组(即同一个订单号的所有行,对于同一个模型组,标记应该相同)。因此,我们可以先计算每个订单的模型组命中情况,然后映射到每一行。
第三步:在输出明细表之前,为`df_外包`和`df_自建`添加列,列名为每个模型组的键(包括原有的`model_groups`和新增的`and_model_groups`),值为布尔值(或者用'是'/'否'),表示该订单是否命中该模型组。
第四步:在统计部分(生成模型命中统计表)时,我们也要为`and_model_groups`进行统计,并输出到Excel的对应工作表中。
由于原代码中,统计部分只处理了`model_groups`(或逻辑),现在我们需要同时处理两种模型组,并且分别输出到两个不同的工作表中(或者在同一工作表但分开?)。原代码中,外包和自建分别有两个工作表:外包旧模型命中、自建旧模型命中。现在我们需要新增工作表来存放与逻辑的统计结果?或者将旧模型命中改为包含两种?但是原要求是“旧模型”,所以我们可能需要创建新的工作表。
但是,题目要求是添加功能,所以我们可以:
- 在原有工作表后面继续输出新增的与逻辑模型组的统计结果?但这样会破坏原有结构。
- 或者为与逻辑模型组创建新的工作表,比如“外包新模型命中”(与逻辑)和“自建新模型命中”。
不过,原问题描述中并没有明确,但为了保持原有结构,我们可以在原有工作表后面继续输出与逻辑模型组的统计结果(在同一工作表,但换行隔开)。这样,原有工作表“外包旧模型命中”和“自建旧模型命中”就变成了包含两种模型组(或逻辑和与逻辑)的命中统计。
但是,原问题要求是“添加功能”,所以我们按照以下方式调整:
在原有工作表中,在原有或逻辑模型组统计完之后,紧接着输出与逻辑模型组的统计结果(用同样的格式)。
因此,我们需要修改统计部分(主处理逻辑)的代码,将`and_model_groups`也纳入统计,并输出到同一个工作表中。
具体步骤:
1. 在数据预处理部分,定义`and_model_groups`(内容需要根据实际需求填写,这里我们先定义一个示例)。
2. 修改`df_外包`和`df_自建`,为它们添加模型组标记列(这一步是为了输出明细表)。
3. 修改统计函数,使其能够处理两种逻辑(或与)。但是原函数`calculate_model_hits`只适用于或逻辑,所以我们需要写一个新的函数来处理与逻辑,或者修改该函数使其能够接受一个参数来指定逻辑。
考虑到代码复用,我们可以重构`calculate_model_hits`函数,增加一个参数`logic`(可以是'or'或'and'),然后根据逻辑不同使用不同的判断方式。
但是,由于原函数已经存在且功能明确,我们可以先保持原函数不变,然后为与逻辑写一个类似的函数`calculate_model_hits_and`。
4. 在输出到Excel时,先输出或逻辑模型组(原来的`model_groups`),然后输出与逻辑模型组(`and_model_groups`)。
5. 在明细表中,我们为每个模型组(包括或与)都添加一列,列名为组名,值为该订单是否命中。
下面开始修改代码:
注意:由于原代码较长,我们只修改关键部分。
第一步:定义`and_model_groups`(放在`model_groups`后面)
第二步:编写一个函数,用于判断一个订单的命中模型字符串是否命中一个模型组(根据逻辑)。这个函数将用于为明细表添加列,同时也用于统计。
第三步:为明细表(`df_外包`和`df_自建`)添加列,每一列对应一个模型组(包括或逻辑和与逻辑),标记该订单是否命中。
第四步:修改统计部分,使用两个函数(或一个通用函数)分别统计或逻辑和与逻辑模型组。
第五步:在输出Excel时,将两种模型组的统计结果都输出到同一个工作表(外包旧模型命中和自建旧模型命中)中,用空行隔开。
由于代码较长,我们逐步修改:
1. 定义`and_model_groups`(示例):
and_model_groups = {
'新组1': ['模型A', '模型B'],
'新组2': ['模型C', '模型D', '模型E']
}
2. 编写一个通用函数,用于判断一个字符串(模型列表连接成的字符串)是否满足一个模型组的命中条件(根据逻辑):
- 对于或逻辑:只要包含任意一个模型(值)就算命中。
- 对于与逻辑:必须包含所有模型(值)才算命中。
我们可以写一个函数:
def check_group_hit(models_str, group_models, logic='or'):
if logic == 'or':
pattern = re.compile('|'.join(group_models))
return bool(pattern.search(models_str))
elif logic == 'and':
# 检查是否包含所有模型
return all(model in models_str for model in group_models)
但是注意:原或逻辑使用的是正则表达式,而正则表达式可能因为特殊字符导致问题,但原模型名称是普通字符串。为了统一,我们可以对或逻辑也用同样的循环?或者都使用字符串包含?但是或逻辑用循环效率低。我们这里为了简单,或逻辑仍然用正则,与逻辑用循环。
但是,由于我们之前已经预编译了或逻辑的正则,所以我们可以:
对于或逻辑,使用预编译的正则(在之前已经做了,但现在我们要为每个模型组动态判断,所以预编译需要分开)。
不过,在明细表处理时,我们不需要预编译,因为每个订单的模型字符串我们都要用两种逻辑分别判断多个组。
因此,我们为每个模型组(包括或与)都写一个判断函数不太现实,所以我们采用:
在添加明细列时,对于或逻辑的模型组,使用正则匹配(可以预先编译好);对于与逻辑的模型组,使用循环判断。
预编译:
我们可以为所有模型组(包括或与)都预编译正则?但注意与逻辑不能用正则做“与”关系(正则本身不支持与,除非用复杂写法),所以与逻辑不能用正则,必须用循环。
因此,我们只对或逻辑模型组预编译正则,与逻辑则不用。
3. 为明细表添加列:
步骤:
a. 将`df_外包`和`df_自建`按订单号分组,对每个订单,将其所有行的“命中模型”字段连接成一个字符串(用逗号连接,和原来一样)。
b. 然后,我们为每个模型组(先或逻辑,再与逻辑)创建一个字典,键为订单号,值为该订单是否命中该组。
c. 然后,将这个字典映射到明细表的每一行(根据订单号)。
但是,同一个订单号在明细表中出现多次,我们只需要为这个订单号的所有行添加相同的标记。
具体代码:
首先,我们为每个数据框(外包和自建)做如下操作:
# 创建一个字典,记录每个订单的命中模型字符串(用逗号连接)
order_models_str = df.groupby('订单号')['命中模型'].apply(lambda x: ','.join(x.astype(str))).to_dict()
# 然后,为每个模型组(包括model_groups和and_model_groups)创建一个字典,记录每个订单在该组的命中情况
group_hits = {}
for group_name, models in model_groups.items():
pattern = group_patterns[group_name] # 这里我们之前已经预编译了或逻辑的正则
group_hits[group_name] = {order_id: bool(pattern.search(models_str)) for order_id, models_str in order_models_str.items()}
for group_name, models in and_model_groups.items():
# 对于与逻辑,我们使用循环判断
group_hits[group_name] = {order_id: all(model in models_str for model in models) for order_id, models_str in order_models_str.items()}
# 然后,为数据框添加列
for group_name in list(model_groups.keys()) + list(and_model_groups.keys()):
# 添加新列,列名为group_name,根据订单号映射
df[group_name] = df['订单号'].map(group_hits[group_name])
这样,我们就为明细表添加了多个列。
4. 修改统计函数,使其能够处理与逻辑模型组:
原`calculate_model_hits`函数是专门为或逻辑写的,现在我们写一个通用的函数,可以指定逻辑:
def calculate_model_hits_general(df, group_dict, logic='or'):
# 先筛选类型5
df_type5 = df[df['类型'] == 5].copy()
# 创建订单到项目的映射
order_project_map = df.drop_duplicates('订单号').set_index('订单号')['项目'].to_dict()
# 获取每个订单的命中模型字符串(按订单号分组连接)
order_models_str = df.groupby('订单号')['命中模型'].apply(lambda x: ','.join(x.astype(str))).to_dict()
# 为每个模型组创建订单命中标记
order_hits = set()
# 遍历模型组
for group_name, models in group_dict.items():
if logic == 'or':
pattern = re.compile('|'.join(models))
for order_id, models_str in order_models_str.items():
if pattern.search(models_str):
order_hits.add(order_id)
else: # 'and'
for order_id, models_str in order_models_str.items():
if all(model in models_str for model in models):
order_hits.add(order_id)
# 注意:上面的循环会导致不同模型组的命中订单都合并到order_hits中,但这不是我们想要的。因为我们是按模型组分别统计的。所以上面这样写不对。
所以,我们需要按模型组分别统计,而不是把所有模型组合并。因此,我们重构:每个模型组单独统计,然后分别输出结果表。
因此,我们修改函数,使其一次只处理一个模型组,并返回该模型组的统计结果(即原来函数内部循环一个模型组,现在改为从外部循环,函数只处理一个模型组)。
所以,我们定义一个新函数,用于计算一个模型组的命中情况:
def calculate_one_group_hits(df, group_name, models, logic='or'):
# 筛选类型5的订单(因为总量是按类型5的订单数)
df_type5 = df[df['类型'] == 5].copy()
# 创建订单到项目的映射(去重,每个订单只对应一个项目)
order_project_map = df.drop_duplicates('订单号').set_index('订单号')['项目'].to_dict()
# 获取每个订单的命中模型字符串(按订单号分组连接)
order_models_str = df.groupby('订单号')['命中模型'].apply(lambda x: ','.join(x.astype(str))).to_dict()
# 找出该模型组命中的订单
hit_orders = set()
if logic == 'or':
pattern = re.compile('|'.join(models))
for order_id, models_str in order_models_str.items():
if pattern.search(models_str):
hit_orders.add(order_id)
else: # 'and'
for order_id, models_str in order_models_str.items():
if all(model in models_str for model in models):
hit_orders.add(order_id)
# 统计每个项目的订单情况(类型5的订单总数,以及命中订单数)
# 注意:总量是项目下类型5的订单数(去重),命中量是这些订单中在hit_orders中的数量
# 先得到每个项目下的所有订单(类型5的订单)
project_orders = df_type5.groupby('项目')['订单号'].unique().apply(set) # 每个项目对应的订单集合
result_data = []
for project, orders in project_orders.items():
total = len(orders)
hit_count = len(orders & hit_orders) # 交集
hit_rate = hit_count / total if total > 0 else 0
result_data.append({
'项目': project,
'总量': total,
'命中量': hit_count,
'命中率': hit_rate
})
result = pd.DataFrame(result_data)
result['命中率'] = result['命中率'].apply(lambda x: f"{x:.2%}")
result.sort_values('命中率', ascending=True, inplace=True)
return result
然后,在主处理逻辑中,我们分别对每个模型组调用这个函数。
5. 修改主处理逻辑:
在原有输出工作表的循环中,我们先处理或逻辑模型组(model_groups):
for group_name, models in model_groups.items():
result = calculate_one_group_hits(df, group_name, models, logic='or')
... # 写入工作表
然后,再处理与逻辑模型组(and_model_groups):
for group_name, models in and_model_groups.items():
result = calculate_one_group_hits(df, group_name, models, logic='and')
... # 写入工作表,在同一工作表后面
6. 注意:在添加明细列时,我们使用的是同一个数据框(df_外包和df_自建),所以先添加列,然后再进行统计(因为统计函数中需要用到命中模型字符串,而添加列不会影响这些)。
由于修改较大,我们逐步调整代码。
考虑到时间,我们可能会遇到性能问题(与逻辑需要循环每个订单的每个模型),但数据量不大时可以接受。
下面我们开始修改代码。注意:由于原代码较长,我们只展示修改的关键部分。
注意:在添加明细列时,我们使用了两个循环(先循环订单计算每个模型组的命中情况,然后再映射到每一行)。如果数据量很大,可能会慢,但一般可以接受。
我们开始修改:
</think>
我将按照您的要求修改代码,添加"与"逻辑模型组功能并在明细表中添加命中标记列。以下是修改后的代码:
```python
import os
import pandas as pd
import numpy as np
import re
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter
# 1. 读取文件并筛选日期范围
path = "D:/外包每日数据"
files = [f for f in os.listdir(path) if f.endswith('.xlsx')]
start_date = input("请输入开始日期(格式如1106): ").strip()
end_date = input("请输入结束日期(格式如1106): ").strip()
selected_files = []
for f in files:
if len(f) >= 4 and start_date <= f[:4] <= end_date:
selected_files.append(os.path.join(path, f))
# 合并数据
dfs = []
for file in selected_files:
df = pd.read_excel(file)
dfs.append(df)
if not dfs:
raise ValueError("未找到符合日期范围的文件")
combined_df = pd.concat(dfs, ignore_index=True)
# 2. 数据预处理
combined_df['坐席工号'] = combined_df['坐席工号'].astype(str)
combined_df['组织'] = combined_df['组织'].astype(str)
ta_mask = ~combined_df['坐席工号'].str.startswith('TA')
org_mask = ~combined_df['组织'].str.startswith('多元营销')
df_外包 = combined_df[ta_mask & org_mask].copy()
df_自建 = combined_df[~ta_mask | ~org_mask].copy()
# 3. 提取项目列
df_外包['项目'] = df_外包['组织'].apply(lambda x: x.split('_')[0] if '_' in x else x)
df_自建['项目'] = df_自建['组织'].apply(lambda x: x.split('_')[0] if '_' in x else x)
# 4. 使用字典定义模型组 - 或逻辑
model_groups = {
'金融违规': ['误导免息', '暗示提额', '夸大收益'],
'营销违规': ['虚假宣传', '承诺效果', '误导促销'],
'服务违规': ['服务承诺', '推诿责任', '态度恶劣']
}
# 5. 添加与逻辑模型组
and_model_groups = {
'复合违规A': ['误导免息', '虚假宣传'],
'复合违规B': ['承诺效果', '态度恶劣'],
# 可根据需要添加更多组合
}
# 预编译正则表达式
group_patterns = {group: re.compile('|'.join(models)) for group, models in model_groups.items()}
# 创建统一的统计函数
def calculate_model_hits(df, group_name, logic='or'):
"""计算指定模型组的命中情况(支持或/与逻辑)"""
df_type5 = df[df['类型'] == 5].copy()
# 创建订单到项目的映射
order_project_map = df.drop_duplicates('订单号').set_index('订单号')['项目'].to_dict()
# 为每个模型组创建订单命中标记
order_hits = set()
for order_id, group in df.groupby('订单号'):
models_str = group['命中模型'].str.cat(sep=',')
if logic == 'or':
if group_patterns[group_name].search(models_str):
order_hits.add(order_id)
else: # 与逻辑
models_list = and_model_groups[group_name]
if all(model in models_str for model in models_list):
order_hits.add(order_id)
# 统计每个项目的命中情况
project_order_counts = df_type5.groupby('项目')['订单号'].nunique()
hit_counts = {}
for order in order_hits:
project = order_project_map.get(order)
if project:
hit_counts[project] = hit_counts.get(project, 0) + 1
# 构建结果DataFrame
result = pd.DataFrame({
'项目': project_order_counts.index,
'总量': project_order_counts.values,
'命中量': [hit_counts.get(p, 0) for p in project_order_counts.index]
})
result['命中率'] = result['命中量'] / result['总量']
result['命中率'] = result['命中率'].apply(lambda x: f"{x:.2%}")
result.sort_values('命中率', ascending=True, inplace=True)
return result
# 6. 添加模型组命中标记到明细表
def add_model_hit_columns(df):
"""为明细表添加模型组命中标记列"""
# 为每个订单创建模型字符串
order_models = df.groupby('订单号')['命中模型'].apply(lambda x: ','.join(x.astype(str))).to_dict()
# 添加或逻辑模型组标记
for group_name, models in model_groups.items():
pattern = group_patterns[group_name]
df[group_name] = df['订单号'].map(lambda x: '是' if pattern.search(order_models.get(x, '')) else '否')
# 添加与逻辑模型组标记
for group_name, models in and_model_groups.items():
df[group_name] = df['订单号'].map(
lambda x: '是' if all(m in order_models.get(x, '') for m in models) else '否'
)
return df
# 应用命中标记列
df_外包 = add_model_hit_columns(df_外包)
df_自建 = add_model_hit_columns(df_自建)
# 主处理逻辑
output_file = "模型命中统计.xlsx"
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
workbook = writer.book
# 外包数据统计
sheet_name = "外包模型命中"
if sheet_name in workbook.sheetnames:
del workbook[sheet_name]
sheet = workbook.create_sheet(sheet_name)
start_row = 1
# 或逻辑模型组统计
sheet.cell(row=start_row, column=1, value="或逻辑模型组")
sheet.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=4)
start_row += 1
for group_name in model_groups:
result = calculate_model_hits(df_外包, group_name, logic='or')
# 写入工作表
sheet.cell(row=start_row, column=1, value=group_name)
sheet.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=4)
start_row += 1
headers = ["项目", "总量", "命中量", "命中率"]
for col_idx, header in enumerate(headers, 1):
sheet.cell(row=start_row, column=col_idx, value=header)
for row_idx, row_data in enumerate(result.values, start_row + 1):
for col_idx, value in enumerate(row_data, 1):
sheet.cell(row=row_idx, column=col_idx, value=value)
start_row += len(result) + 2
# 添加空行分隔
start_row += 1
# 与逻辑模型组统计
sheet.cell(row=start_row, column=1, value="与逻辑模型组")
sheet.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=4)
start_row += 1
for group_name in and_model_groups:
result = calculate_model_hits(df_外包, group_name, logic='and')
# 写入工作表
sheet.cell(row=start_row, column=1, value=group_name)
sheet.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=4)
start_row += 1
headers = ["项目", "总量", "命中量", "命中率"]
for col_idx, header in enumerate(headers, 1):
sheet.cell(row=start_row, column=col_idx, value=header)
for row_idx, row_data in enumerate(result.values, start_row + 1):
for col_idx, value in enumerate(row_data, 1):
sheet.cell(row=row_idx, column=col_idx, value=value)
start_row += len(result) + 2
# 自建数据统计
sheet_name = "自建模型命中"
if sheet_name in workbook.sheetnames:
del workbook[sheet_name]
sheet = workbook.create_sheet(sheet_name)
start_row = 1
# 或逻辑模型组统计
sheet.cell(row=start_row, column=1, value="或逻辑模型组")
sheet.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=4)
start_row += 1
for group_name in model_groups:
result = calculate_model_hits(df_自建, group_name, logic='or')
# 写入工作表
sheet.cell(row=start_row, column=1, value=group_name)
sheet.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=4)
start_row += 1
headers = ["项目", "总量", "命中量", "命中率"]
for col_idx, header in enumerate(headers, 1):
sheet.cell(row=start_row, column=col_idx, value=header)
for row_idx, row_data in enumerate(result.values, start_row + 1):
for col_idx, value in enumerate(row_data, 1):
sheet.cell(row=row_idx, column=col_idx, value=value)
start_row += len(result) + 2
# 添加空行分隔
start_row += 1
# 与逻辑模型组统计
sheet.cell(row=start_row, column=1, value="与逻辑模型组")
sheet.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=4)
start_row += 1
for group_name in and_model_groups:
result = calculate_model_hits(df_自建, group_name, logic='and')
# 写入工作表
sheet.cell(row=start_row, column=1, value=group_name)
sheet.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=4)
start_row += 1
headers = ["项目", "总量", "命中量", "命中率"]
for col_idx, header in enumerate(headers, 1):
sheet.cell(row=start_row, column=col_idx, value=header)
for row_idx, row_data in enumerate(result.values, start_row + 1):
for col_idx, value in enumerate(row_data, 1):
sheet.cell(row=row_idx, column=col_idx, value=value)
start_row += len(result) + 2
# 外包明细数据
df_外包.to_excel(writer, sheet_name="外包明细数据", index=False)
# 自建明细数据
df_自建.to_excel(writer, sheet_name="自建明细数据", index=False)
# 调整列宽
wb = load_workbook(output_file)
for sheetname in wb.sheetnames:
sheet = wb[sheetname]
for col in sheet.columns:
max_length = max(len(str(cell.value)) for cell in col)
sheet.column_dimensions[get_column_letter(col[0].column)].width = max_length + 5
wb.save(output_file)
print(f"处理完成! 结果已保存至: {output_file}")
```
### 主要修改内容:
1. **添加与逻辑模型组**:
- 新增 `and_model_groups` 字典,包含需要所有值都命中的模型组合
- 示例:`'复合违规A': ['误导免息', '虚假宣传']` 表示必须同时命中这两个模型才算命中该组
2. **修改统计函数**:
- `calculate_model_hits` 函数增加 `logic` 参数('or'/'and')
- 或逻辑:使用正则匹配(任一值命中即算命中)
- 与逻辑:使用 `all()` 检查(所有值都必须命中)
3. **明细表添加命中标记列**:
- 新增 `add_model_hit_columns` 函数
- 为每个模型组(包括或逻辑和与逻辑)添加新列
- 列名为模型组名称,值为"是"/"否"表示该订单是否命中
4. **输出结构优化**:
- 工作表重命名为"外包模型命中"和"自建模型命中"
- 先输出或逻辑模型组统计,再输出与逻辑模型组统计
- 添加分组标题区分两种逻辑类型
### 使用说明:
1. 或逻辑模型组:命中任一值即算命中
2. 与逻辑模型组:必须命中所有值才算命中
3. 明细表中新增列显示每个订单的命中情况
4. 输出文件包含完整的统计结果和明细数据