<think>我们需要使用Python连接PostgreSQL数据库,并执行以下步骤:
1. 连接数据库
2. 查询表`t_table_size_date_statistics`,按表名和日期统计,获取最近10天的数据(注意:要按日期降序取最近10天,但计算时需要按日期升序排列以便计算差值)
3. 过滤掉`current_size`为0的数据
4. 按表名分组,对每个表分别计算每天与前一天的差值和增长百分比
5. 行转列:将每个表的10天数据转换为一行,每列代表一天的数据(包括差值和增长率)
但是,由于数据是按日期和表名记录的,我们需要先为每个表计算相邻日期的差值。这里可以使用窗口函数LAG来获取前一天的数据。
步骤详细说明:
- 首先,我们获取最近10天的数据(注意:这里最近10天是指数据中日期最大的前10天,但实际计算时,我们可能希望按时间顺序计算差值,所以排序很重要)
- 然后,按表名分组,按日期升序排序,使用LAG函数获取前一天的current_size
- 计算差值:current_size - 前一天的大小 -> diff
- 计算增长百分比:(diff / 前一天的大小) * 100 -> growth_percent
- 注意:第一天(没有前一天)的差值和增长率为NULL,我们可能需要保留为NULL或0,根据需求,这里我们保留NULL
- 过滤掉current_size为0的数据(在查询时就可以过滤)
然后,我们需要行转列:将每个表的多行数据(每个日期一行)转成一行,每个日期对应两列(差值、增长率)。这样,10天就会有20列(10个差值+10个增长率)。但是,我们只取10天,所以列数固定。
但是,行转列操作在SQL中比较繁琐(需要为每个日期创建列),而且我们不知道具体哪10天。因此,我们可以先在SQL中获取最近10天的日期列表,然后动态构建列。但更简单的方法是在Python中进行行转列,因为日期是固定的10天。
另一种思路:我们查询出每个表在最近10个日期的数据(包括差值和增长率),然后在Python中按表名分组,将每个表的数据按日期排序,并将10天的差值和增长率分别展开成一行。
由于日期是固定的10天(比如最近10天),我们可以先确定这10个日期,然后为每个表构建一行,包含这10个日期对应的差值和增长率。如果某一天没有数据,则用NULL填充。
步骤:
1. 获取最近的10个日期(去重,从表中选取最大的10个日期,注意:可能不是连续的)
2. 查询每个表在这10个日期的数据(包括差值和增长率)
3. 行转列:将每个表的数据转成一行,列的顺序为:表名,然后按日期顺序,每个日期对应两个列:diff和growth_percent
但是,我们也可以先计算所有表的每天差值和增长率,然后过滤出最近10天的数据,再进行行转列。
具体步骤(使用SQL和Python结合):
步骤1:使用SQL窗口函数计算差值和增长率
步骤2:在SQL中过滤出最近10天的数据(根据日期字段,取最大的10个日期)
步骤3:在Python中进行行转列
由于行转列在SQL中需要动态列,而我们要固定10天,所以可以在Python中做。
SQL查询语句思路:
```sql
WITH filtered_data AS (
SELECT
table_name,
stat_date,
current_size
FROM t_table_size_date_statistics
WHERE current_size <> 0 -- 过滤掉0
),
ranked_dates AS (
SELECT DISTINCT stat_date
FROM filtered_data
ORDER BY stat_date DESC
LIMIT 10 -- 获取最近的10个日期(注意:这里取的是整个表中最近的10个日期,但可能不是每个表都有这10天)
),
data_with_lag AS (
SELECT
table_name,
stat_date,
current_size,
LAG(current_size) OVER (PARTITION BY table_name ORDER BY stat_date) as prev_size
FROM filtered_data
WHERE stat_date IN (SELECT stat_date FROM ranked_dates) -- 只取最近10天的日期
),
computed AS (
SELECT
table_name,
stat_date,
current_size,
current_size - prev_size AS diff,
CASE
WHEN prev_size IS NULL OR prev_size = 0 THEN NULL
ELSE (current_size - prev_size) * 100.0 / prev_size
END AS growth_percent
FROM data_with_lag
)
SELECT * FROM computed
ORDER BY table_name, stat_date;
```
注意:这个查询中,我们首先过滤掉current_size为0的数据,然后获取最近的10个日期(整个表的),然后只取这些日期的数据。然后使用窗口函数计算前一天的current_size(按表分组,按日期升序)。然后计算差值和增长率。
但是,这里有一个问题:我们取的是整个表中最新的10个日期,但可能某些表在这10个日期中只有部分日期的数据。这样,我们在计算时,对于每个表,只会计算它在这10个日期中连续两天的差值。而且,如果某个表在某一天没有数据,那么就不会出现在结果中。
然后,在Python中,我们需要将每个表的数据行转列。由于我们有10个日期,所以每个表最多有10行(但可能少于10行,因为某些表可能没有某天的数据)。行转列后,我们想要每个表一行,列包括表名,然后按日期顺序,每个日期对应两个列:diff和growth_percent。
由于日期是固定的10天(ranked_dates),我们可以先获取这10个日期,然后按日期排序(升序或降序?这里我们按日期升序排列,这样第一列是最早的一天)。
在Python中,我们可以先获取这10个日期,排序(升序),然后为每个表构建一个包含20列的行(10个diff,10个growth_percent)。如果某个表在某个日期没有数据,则对应的diff和growth_percent为NULL。
步骤:
1. 执行SQL获取数据(包括table_name, stat_date, diff, growth_percent)
2. 在Python中,获取所有不重复的日期(从查询结果中,或者我们可以从ranked_dates中获取),然后排序(升序)
3. 按表名分组,对于每个表,创建一个字典,以日期为键,保存该日期的diff和growth_percent
4. 然后,对于每个表,我们遍历10个日期(按顺序),如果该日期在字典中,则取出diff和growth_percent,否则为None
5. 构建一个DataFrame,列包括'table_name'和每个日期对应的两个列(例如:'2024-01-01_diff', '2024-01-01_growth')
但是,注意:我们只取10天,所以列是固定的。
另一种做法:在SQL中行转列,但需要动态生成列名,比较复杂。我们选择在Python中处理。
代码结构:
1. 连接数据库
2. 执行SQL查询(上面的SQL)
3. 获取查询结果,并获取最近的10个日期(实际上在SQL中已经限制了,但为了行转列,我们需要知道是哪10个日期,所以我们可以从查询结果中获取distinct的stat_date,然后排序)
- 注意:我们只需要在查询结果中出现的日期,然后取前10个(按日期降序取最大的10个,然后升序排列?)实际上在SQL中我们已经取了最大的10个日期,所以这里我们只需要取这10个日期并排序即可。
4. 将日期排序(升序),因为计算差值时是按日期顺序的,行转列后我们希望从左到右是时间顺序。
5. 按表名分组,为每个表创建一行数据(包含20列)
6. 将数据转换为DataFrame并输出或保存
但是,注意:同一个表在同一个日期不应该有多条记录,所以按表名和日期分组。
代码实现:
我们将使用psycopg2连接PostgreSQL,并使用pandas来处理数据(因为行转列用pandas比较方便)。
如果没有安装psycopg2和pandas,需要先安装:
```bash
pip install psycopg2-binary pandas
```
代码步骤:
- 连接数据库
- 执行SQL
- 将结果读入pandas DataFrame
- 在DataFrame中处理行转列
但是,由于我们已经在SQL中计算了差值和增长率,接下来就是行转列。我们可以用pandas的pivot_table,但需要将diff和growth_percent分开成多个列。
我们可以先melt?或者使用两次pivot(一次为diff,一次为growth_percent),然后合并。
更简单的方法:将diff和growth_percent拆成两个DataFrame,然后分别pivot,再按表名合并。
步骤:
1. 从查询结果中,我们有一个DataFrame,列包括:table_name, stat_date, diff, growth_percent
2. 我们获取这10个日期(从DataFrame的stat_date中取唯一值,然后排序)
3. 创建两个新的DataFrame:
- diff_df = df.pivot(index='table_name', columns='stat_date', values='diff')
- growth_df = df.pivot(index='table_name', columns='stat_date', values='growth_percent')
4. 然后,将两个DataFrame按列合并(注意列名加上后缀)
5. 最后,按日期顺序排列列(先按日期升序排列,然后每个日期先diff后growth?或者先所有日期的diff,再所有日期的growth?)
但是,用户要求是行转列后,每个日期对应两个列(diff和growth)。所以,我们希望列的顺序是:table_name, 日期1_diff, 日期1_growth, 日期2_diff, 日期2_growth, ...
因此,我们可以:
- 将diff_df和growth_df的列分别重命名,加上后缀
- 然后按列名排序(按日期和指标类型排序)
另一种方法:使用stack和unstack,但这里我们用简单的方法。
具体代码:
```python
import psycopg2
import pandas as pd
from datetime import datetime
# 数据库连接参数,根据实际情况修改
db_params = {
'host': 'localhost',
'port': 5432,
'database': 'your_database',
'user': 'your_user',
'password': 'your_password'
}
# 连接数据库
conn = psycopg2.connect(**db_params)
# SQL查询
query = """
WITH filtered_data AS (
SELECT
table_name,
stat_date,
current_size
FROM t_table_size_date_statistics
WHERE current_size <> 0 -- 过滤掉0
),
ranked_dates AS (
SELECT DISTINCT stat_date
FROM filtered_data
ORDER BY stat_date DESC
LIMIT 10 -- 获取最近的10个日期
),
data_with_lag AS (
SELECT
table_name,
stat_date,
current_size,
LAG(current_size) OVER (PARTITION BY table_name ORDER BY stat_date) as prev_size
FROM filtered_data
WHERE stat_date IN (SELECT stat_date FROM ranked_dates) -- 只取最近10天的日期
),
computed AS (
SELECT
table_name,
stat_date,
current_size,
current_size - prev_size AS diff,
CASE
WHEN prev_size IS NULL OR prev_size = 0 THEN NULL
ELSE (current_size - prev_size) * 100.0 / prev_size
END AS growth_percent
FROM data_with_lag
)
SELECT * FROM computed
ORDER BY table_name, stat_date;
"""
# 执行查询
df = pd.read_sql_query(query, conn)
conn.close()
# 如果数据为空,则退出
if df.empty:
print("No data found.")
exit()
# 获取这10个日期(从查询结果中,并排序)
# 注意:我们按日期升序排列,这样最早的在前面
dates = sorted(df['stat_date'].unique())
# 创建两个pivot表:一个为diff,一个为growth_percent
diff_df = df.pivot(index='table_name', columns='stat_date', values='diff')
growth_df = df.pivot(index='table_name', columns='stat_date', values='growth_percent')
# 重命名列,加上后缀
diff_df.columns = [f"{col}_diff" for col in diff_df.columns]
growth_df.columns = [f"{col}_growth" for col in growth_df.columns]
# 按日期顺序,我们重新排列列(先按日期顺序,然后每个日期先diff后growth?或者先所有日期的diff再所有growth?)
# 根据需求,要求每个日期对应两个列(diff和growth),所以我们按日期顺序,每个日期先diff列后growth列
# 先创建一个新的列顺序列表:对于每个日期,列名为:日期_diff, 日期_growth
new_columns = []
for date in dates:
new_columns.append(f"{date}_diff")
new_columns.append(f"{date}_growth")
# 但是,注意:diff_df和growth_df的列名已经加了后缀,而且我们上面的dates是日期列表(升序)
# 现在,将两个DataFrame按列合并(横向)
result_df = pd.concat([diff_df, growth_df], axis=1)
# 然后,我们只取new_columns中存在的列(注意:可能不是所有日期都有数据,所以new_columns中的列不一定都在result_df中,但我们的dates是从数据中来的,所以应该都有)
# 但是,由于pivot可能会产生NaN(如果某个表在某天没有数据),所以没问题。
# 重新排列列
result_df = result_df[new_columns]
# 重置索引,将table_name变成列
result_df.reset_index(inplace=True)
# 打印结果
print(result_df)
# 如果需要,可以保存到CSV
# result_df.to_csv('result.csv', index=False)
```
注意:上面的pivot操作,如果某个表在某个日期没有数据,则对应的位置为NaN。
但是,我们之前获取的dates是10个日期,但可能某些日期在某个表中没有数据,那么对应的列就是NaN。
另外,在SQL中,我们使用了窗口函数LAG,它只取前一天的数据(按日期顺序的前一天,不一定是日历上的前一天,而是数据中存在的上一条记录)。所以,如果某个表在某个日期没有数据,则不会出现在计算中,因此不会出现在结果中。这样,在pivot时,该表在该日期就没有行,所以对应列就是NaN。
但是,我们要求的是10个日期(从整个表中取的最新的10个日期),所以每个表最多有10行(但可能少于10行,因为可能某天没有数据)。
这个代码应该可以满足需求。
但是,注意:在SQL中,我们使用了`LAG`,它是按日期顺序的前一条记录,所以如果某个表在某个日期缺失,那么它的前一天可能不是我们期望的(比如,缺失了中间一天,那么就会跳过一天计算)。所以,最好确保数据是连续的。如果数据不连续,那么计算的是与上一条记录(可能是几天前)的差值,而不是与前一天(日历上的前一天)的差值。
如果要求必须是日历上连续的两天,那么我们需要确保数据每天都有记录。如果没有,可能需要填充缺失的日期。但根据问题描述,我们只取最近10天(可能不连续),所以这样计算也是合理的。
另外,在SQL的`ranked_dates`中,我们取的是整个表(过滤后)中最近的10个日期,然后每个表只取这10个日期的数据。所以,如果某个表在这10个日期中只有部分日期有数据,那么其他日期就不会出现在结果中。
因此,在行转列后,对于缺失的日期,对应的列就是NaN。
如果要求每个表必须包含10个日期(即使没有数据也要显示NULL),那么我们需要在SQL中构建一个包含所有表和所有10个日期的笛卡尔积,然后左连接计算结果。但这样会增加复杂度。根据需求,我们只统计有数据的情况,所以当前方法可以接受。
现在,我们输出result_df,列的顺序是:table_name, 日期1_diff, 日期1_growth, 日期2_diff, 日期2_growth, ...
注意:日期顺序是升序(从小到大),即最早的一天在最前面。
如果希望最近的一天在最前面,可以在获取dates时按降序排序,然后构建new_columns时按降序即可。
修改:
```python
# 获取这10个日期(从查询结果中,并降序排列,这样最近的在前面)
dates = sorted(df['stat_date'].unique(), reverse=True) # 降序
```
然后,构建new_columns时,按这个dates顺序。
但是,注意:在计算差值时,我们按日期升序排序,所以差值和增长率是按时间顺序计算的。行转列时,列的顺序可以任意排列。
根据需求,统计当前日期仅10天的数据,并且按表维度统计每天的隔天差值,所以列的顺序按日期排列(通常从左到右时间递增或递减都可以)。这里按递减(最近的一天在最左边)可能更符合习惯。
所以,我们修改为按日期降序排列。
最终代码调整:
```python
# 获取这10个日期(从查询结果中,并降序排列)
dates = sorted(df['stat_date'].unique(), reverse=True) # 降序,最近的在第一个
# 创建列顺序
new_columns = []
for date in dates:
new_columns.append(f"{date}_diff")
new_columns.append(f"{date}_growth")
```
这样,每一行的第一列就是最近一天的diff,然后是最近一天的growth,然后是次近的diff,growth,等等。
注意:在构建diff_df和growth_df时,pivot操作默认是按列名排序(升序),所以我们需要在pivot之后调整列的顺序。
我们上面通过重新索引(result_df = result_df[new_columns])来调整列顺序。
完整代码:
```python
import psycopg2
import pandas as pd
# 数据库连接参数
db_params = {
'host': 'localhost',
'port': 5432,
'database': 'your_database',
'user': 'your_user',
'password': 'your_password'
}
try:
# 连接数据库
conn = psycopg2.connect(**db_params)
# SQL查询
query = """
WITH filtered_data AS (
SELECT
table_name,
stat_date,
current_size
FROM t_table_size_date_statistics
WHERE current_size <> 0 -- 过滤掉0
),
ranked_dates AS (
SELECT DISTINCT stat_date
FROM filtered_data
ORDER BY stat_date DESC
LIMIT 10 -- 获取最近的10个日期
),
data_with_lag AS (
SELECT
table_name,
stat_date,
current_size,
LAG(current_size) OVER (PARTITION BY table_name ORDER BY stat_date) as prev_size
FROM filtered_data
WHERE stat_date IN (SELECT stat_date FROM ranked_dates) -- 只取最近10天的日期
),
computed AS (
SELECT
table_name,
stat_date,
current_size,
current_size - prev_size AS diff,
CASE
WHEN prev_size IS NULL OR prev_size = 0 THEN NULL
ELSE (current_size - prev_size) * 100.0 / prev_size
END AS growth_percent
FROM data_with_lag
)
SELECT * FROM computed;
"""
# 执行查询,读取到DataFrame
df = pd.read_sql_query(query, conn)
# 关闭连接
conn.close()
if df.empty:
print("没有查询到数据")
else:
# 获取唯一的日期,并按日期降序(最近的在前)
dates = sorted(df['stat_date'].unique(), reverse=True)
# 创建两个透视表
diff_df = df.pivot(index='table_name', columns='stat_date', values='diff')
growth_df = df.pivot(index='table_name', columns='stat_date', values='growth_percent')
# 重命名列,加上后缀
diff_df.columns = [f"{col}_diff" for col in diff_df.columns]
growth_df.columns = [f"{col}_growth" for col in growth_df.columns]
# 合并两个DataFrame
combined_df = pd.concat([diff_df, growth_df], axis=1)
# 构建新的列顺序:每个日期先diff后growth,且按日期降序(最近的日期在最前面)
new_columns = []
for date in dates:
new_columns.append(f"{date}_diff")
new_columns.append(f"{date}_growth")
# 按照new_columns的顺序重新排列列
result_df = combined_df[new_columns]
# 重置索引,使表名成为一列
result_df.reset_index(inplace=True)
# 打印结果
print("结果数据:")
print(result_df)
# 可以保存到CSV
# result_df.to_csv('table_growth_stats.csv', index=False)
except Exception as e:
print(f"发生错误: {e}")
```
注意:如果某个表在某个日期没有数据,则对应的列会是NaN。
这个代码实现了需求。
但是,注意:在SQL中,我们使用了`LIMIT 10`来获取最近的10个日期,但如果有多个表,我们确保这10个日期是全局最近的10个日期。这样,每个表的数据都在这10个日期范围内。
如果某个表在这10个日期中只有部分日期有数据,那么其他日期对应的列就是NaN。
另外,在计算增长率时,如果前一天的大小为0,我们设置为NULL(因为除以0无意义)。在结果中,growth列就会是NULL。
最后,我们输出一个DataFrame,每一行代表一个表,每一列代表一个日期的差值和增长率。
</think>
以下是根据您需求编写的Python代码,用于连接PostgreSQL 12数据库并处理表数据:
```python
import psycopg2
import pandas as pd
from datetime import datetime, timedelta
# 1. 连接PostgreSQL数据库
def connect_db():
try:
conn = psycopg2.connect(
host="your_host", # 替换为实际主机名
port="your_port", # 替换为实际端口
database="your_db", # 替换为实际数据库名
user="your_user", # 替换为实际用户名
password="your_password" # 替换为实际密码
)
print("数据库连接成功")
return conn
except Exception as e:
print(f"数据库连接失败: {e}")
return None
# 2. 数据处理主函数
def process_table_stats():
conn = connect_db()
if not conn:
return
try:
# 计算日期范围(最近10天)
end_date = datetime.now().date()
start_date = end_date - timedelta(days=9)
# SQL查询:计算每日差值和增长率
query = f"""
WITH filtered_data AS (
SELECT
table_name,
stat_date,
current_size
FROM t_table_size_date_statistics
WHERE
stat_date BETWEEN '{start_date}' AND '{end_date}'
AND current_size <> 0 -- 过滤零值数据
),
lag_data AS (
SELECT
table_name,
stat_date,
current_size,
LAG(current_size) OVER (
PARTITION BY table_name
ORDER BY stat_date
) AS prev_size
FROM filtered_data
)
SELECT
table_name,
stat_date,
current_size,
(current_size - prev_size) AS size_diff,
CASE
WHEN prev_size = 0 THEN 0 -- 避免除零错误
ELSE ROUND(
(current_size - prev_size) * 100.0 / prev_size,
2
)
END AS growth_percent
FROM lag_data
WHERE prev_size IS NOT NULL -- 排除第一天无前一天数据的情况
ORDER BY table_name, stat_date;
"""
# 执行查询并获取数据
df = pd.read_sql_query(query, conn)
if df.empty:
print("未查询到有效数据")
return
# 3. 行转列处理
# 创建差值透视表
diff_pivot = df.pivot_table(
index='table_name',
columns='stat_date',
values='size_diff',
fill_value=0 # 缺失值填充为0
).add_prefix('diff_')
# 创建增长率透视表
growth_pivot = df.pivot_table(
index='table_name',
columns='stat_date',
values='growth_percent',
fill_value=0
).add_prefix('growth_')
# 4. 合并结果
result_df = pd.concat([diff_pivot, growth_pivot], axis=1)
# 按列名排序(日期顺序)
result_df = result_df.sort_index(axis=1)
# 重置索引
result_df.reset_index(inplace=True)
print("\n处理结果预览:")
print(result_df.head())
# 可选:保存结果到CSV
result_df.to_csv('table_growth_stats.csv', index=False)
print("\n结果已保存到 table_growth_stats.csv")
except Exception as e:
print(f"数据处理错误: {e}")
finally:
conn.close()
print("数据库连接已关闭")
# 执行处理
if __name__ == "__main__":
process_table_stats()
```
### 代码说明:
1. **数据库连接**:
- 使用`psycopg2`连接PostgreSQL
- 需替换连接参数为实际值(主机、端口、数据库名等)
2. **数据处理流程**:
- **过滤数据**:获取最近10天数据并过滤`current_size=0`的记录
- **计算差值**:使用`LAG()`窗口函数获取前一日数据
- **计算增长率**:`(当日值-前日值)/前日值 * 100`
- **处理边界**:
- 避免除零错误
- 排除首日无前日数据的情况
3. **行转列处理**:
- 分别创建差值(`size_diff`)和增长率(`growth_percent`)的透视表
- 列名格式:`diff_日期`和`growth_日期`
- 按日期排序列确保时间顺序
4. **输出结果**:
- 控制台预览前5行结果
- 保存完整结果到CSV文件
### 注意事项:
1. **日期处理**:
- 代码自动计算最近10天日期范围(含当天)
- 如需固定日期范围,修改`start_date`和`end_date`
2. **特殊值处理**:
- 零值数据已过滤(`current_size <> 0`)
- 增长率计算避免除零错误
- 缺失值填充为0
3. **性能优化**:
- 使用窗口函数在数据库内完成核心计算
- 仅传输处理后的结果集,减少网络传输
4. **列排序**:
- 最终结果列按日期升序排列
- 格式示例:
```
table_name | diff_2023-01-01 | growth_2023-01-01 | diff_2023-01-02 | ...
```
执行前请确保:
1. 已安装依赖:`pip install psycopg2-binary pandas`
2. 替换数据库连接参数为实际值
3. 数据库用户有表`t_table_size_date_statistics`的读取权限
[^1]: 使用窗口函数和日期范围过滤实现高效计算
[^2]: 通过透视表(pivot)实现行转列操作
[^3]: 自动处理边界条件和数据异常值