### 问题解构
针对您的需求,我们需要构建一个脱离 TradingView 平台的独立脚本。核心任务是将缠论中的几何形态逻辑转化为程序代码。我们将采用 **Python** 语言,结合 **Pandas** 进行数据处理,利用 **Matplotlib** 进行可视化绘图。
主要逻辑解构如下:
1. **数据预处理**:清洗K线数据,确保包含高开低收(OHLC)信息。
2. **分型识别**:根据缠论定义,识别顶分型和底分型。
3. **笔的生成**:处理分型之间的包含关系,连接相邻的顶底分型形成笔。
4. **线段的生成**:基于笔的特征序列,判断线段的破坏与延伸。
5. **中枢识别**:根据线段的重合区间确定中枢。
6. **可视化绘制**:将计算出的几何对象绘制在图表上。
### 方案推演
在编程实现中,我们需要处理缠论中复杂的“包含处理”逻辑。为了代码的清晰度和可运行性,本方案将采用标准化的分型识别算法,并构建一个面向对象的结构来管理笔、线段和中枢。
* **顶底分型**:顶分型要求中间K线高点最高、低点最高;底分型要求中间K线低点最低、高点最低。
* **笔的生成**:从底分型到顶分型为向上笔,反之亦然。必须满足“顶底分型之间至少隔着一根独立K线”的原则。
* **线段**:线段由至少三笔构成。这里为了演示,我们简化逻辑,采用“特征序列”的破坏来判断线段的端点。
* **中枢**:由至少三个连续的有重叠区间的线段(或笔)构成。中枢区间为重叠部分的最高点下沿和最低点上沿。
### 具体代码实现
以下是一个完整的 Python 脚本示例。该脚本生成模拟的K线数据,计算缠论结构,并绘制出笔、线段和中枢。
```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime
# ==========================================
# 1. 数据生成模块
# ==========================================
def generate_mock_data(days=100):
"""生成模拟的K线数据"""
np.random.seed(42)
dates = pd.date_range(start=datetime.now(), periods=days)
price = 100
prices = []
for _ in range(days):
change = np.random.normal(0, 2) # 随机波动
price += change
high = price + abs(np.random.normal(0, 1))
low = price - abs(np.random.normal(0, 1))
open_p = price + np.random.normal(0, 0.5)
close_p = price + np.random.normal(0, 0.5)
# 确保 high max, low min
high = max(open_p, close_p, high)
low = min(open_p, close_p, low)
prices.append([dates[_], open_p, high, low, close_p])
df = pd.DataFrame(prices, columns=['Date', 'Open', 'High', 'Low', 'Close'])
return df
# ==========================================
# 2. 缠论核心算法模块
# ==========================================
def find_fractals(df):
"""识别顶分型和底分型"""
fractals = []
# 遍历中间的K线,忽略头尾
for i in range(1, len(df) - 1):
curr = df.iloc[i]
prev = df.iloc[i-1]
next_ = df.iloc[i+1]
# 顶分型:High最高,且Low最高(或至少不比两边低)
is_top = (curr['High'] > prev['High']) and (curr['High'] > next_['High']) and \
(curr['Low'] > prev['Low']) and (curr['Low'] > next_['Low'])
# 底分型:Low最低,且High最低(或至少不比两边高)
is_bottom = (curr['Low'] < prev['Low']) and (curr['Low'] < next_['Low']) and \
(curr['High'] < prev['High']) and (curr['High'] < next_['High'])
if is_top:
fractals.append({'index': i, 'type': 'top', 'price': curr['High'], 'date': curr['Date']})
elif is_bottom:
fractals.append({'index': i, 'type': 'bottom', 'price': curr['Low'], 'date': curr['Date']})
return pd.DataFrame(fractals)
def build_bi(fractals_df, min_k_bars=3):
"""构建笔:连接顶底分型,过滤掉相邻的同类分型"""
if fractals_df.empty:
return pd.DataFrame()
bis = []
valid_fractals = [fractals_df.iloc[0].to_dict()]
# 1. 合并相邻的同类型分型(只保留极端的)
for i in range(1, len(fractals_df)):
curr = fractals_df.iloc[i]
last = valid_fractals[-1]
if curr['type'] == last['type']:
# 同类型,保留更极端的(顶取更高,底取更低)
if (curr['type'] == 'top' and curr['price'] > last['price']) or \
(curr['type'] == 'bottom' and curr['price'] < last['price']):
valid_fractals[-1] = curr.to_dict()
else:
# 检查间隔是否满足要求
if curr['index'] - last['index'] >= min_k_bars:
valid_fractals.append(curr.to_dict())
else:
# 间隔太近,覆盖前一个(简化处理逻辑)
valid_fractals[-1] = curr.to_dict()
# 2. 配对连接成笔
bi_list = []
for i in range(len(valid_fractals) - 1):
start = valid_fractals[i]
end = valid_fractals[i+1]
# 必须是一顶一底
if start['type'] != end['type']:
bi_list.append({
'start_index': start['index'],
'start_price': start['price'],
'end_index': end['index'],
'end_price': end['price'],
'direction': 'up' if start['type'] == 'bottom' else 'down'
})
return pd.DataFrame(bi_list)
def build_segments(bi_df):
"""构建线段:简化逻辑,基于笔的破坏"""
if bi_df.empty:
return pd.DataFrame()
segments = []
# 初始线段
current_segment = [bi_df.iloc[0].to_dict()]
for i in range(1, len(bi_df)):
curr_bi = bi_df.iloc[i].to_dict()
last_bi = current_segment[-1]
# 如果当前笔方向与线段方向相反,且击穿了线段起点(或关键点),则线段可能结束
# 这里采用简化的特征序列破坏逻辑:
# 如果向上线段中出现向下笔跌破前向上笔的低点,则原线段结束
if curr_bi['direction'] != last_bi['direction']:
# 检查破坏关系
is_broken = False
if current_segment[0]['direction'] == 'up':
# 向上线段,看是否跌破前低
if curr_bi['end_price'] < last_bi['start_price']:
is_broken = True
else:
# 向下线段,看是否升破前高
if curr_bi['end_price'] > last_bi['start_price']:
is_broken = True
if is_broken:
# 结束当前线段
segments.append({
'start_index': current_segment[0]['start_index'],
'start_price': current_segment[0]['start_price'],
'end_index': last_bi['end_index'],
'end_price': last_bi['end_price'],
'direction': current_segment[0]['direction']
})
# 开始新线段
current_segment = [curr_bi]
else:
# 线段延续,加入笔
current_segment.append(curr_bi)
else:
# 同向笔,通常意味着线段延续(忽略包含关系的复杂处理)
current_segment.append(curr_bi)
# 添加最后一段
if len(current_segment) >= 1:
start = current_segment[0]
end = current_segment[-1]
segments.append({
'start_index': start['start_index'],
'start_price': start['start_price'],
'end_index': end['end_index'],
'end_price': end['end_price'],
'direction': start['direction']
})
return pd.DataFrame(segments)
def find_zhongshu(segments_df, df):
"""识别中枢:寻找至少三个连续线段的重叠区间"""
if len(segments_df) < 3:
return pd.DataFrame()
zhongshus = []
# 简单滑动窗口检测
for i in range(len(segments_df) - 2):
s1 = segments_df.iloc[i]
s2 = segments_df.iloc[i+1]
s3 = segments_df.iloc[i+2]
# 计算三个线段的高低区间
# 注意:线段也是有方向的,中枢计算的是价格区间的交集
range1 = [min(s1['start_price'], s1['end_price']), max(s1['start_price'], s1['end_price'])]
range2 = [min(s2['start_price'], s2['end_price']), max(s2['start_price'], s2['end_price'])]
range3 = [min(s3['start_price'], s3['end_price']), max(s3['start_price'], s3['end_price'])]
# 计算交集
low_max = max(range1[0], range2[0], range3[0])
high_min = min(range1[1], range2[1], range3[1])
if low_max < high_min:
# 存在重叠,形成中枢
zhongshus.append({
'low': low_max,
'high': high_min,
'start_index': s1['start_index'], # 中枢开始时间
'end_index': s3['end_index'], # 中枢结束时间
'start_date': df.iloc[s1['start_index']]['Date'],
'end_date': df.iloc[s3['end_index']]['Date']
})
return pd.DataFrame(zhongshus)
# ==========================================
# 3. 可视化绘制模块
# ==========================================
def plot_chanlun(df, fractals, bis, segments, zhongshus):
fig, ax = plt.subplots(figsize=(14, 8))
# 绘制K线
for i in range(len(df)):
curr = df.iloc[i]
# 颜色:涨绿跌红 (国内习惯) 或 涨红跌绿 (国际),这里用红涨绿跌
color = 'red' if curr['Close'] >= curr['Open'] else 'green'
# 绘制影线
ax.plot([i, i], [curr['Low'], curr['High']], color=color, linewidth=1)
# 绘制实体
ax.plot([i, i], [curr['Open'], curr['Close']], color=color, linewidth=3)
# 绘制分型
for _, f in fractals.iterrows():
ax.scatter(f['index'], f['price'], color='blue', marker='^' if f['type']=='top' else 'v', s=50, zorder=5)
# 绘制笔 (黄色粗线)
for _, bi in bis.iterrows():
ax.plot([bi['start_index'], bi['end_index']],
[bi['start_price'], bi['end_price']],
color='orange', linewidth=1.5, linestyle='-', alpha=0.8, label='Bi')
# 绘制线段 (青色线)
for _, seg in segments.iterrows():
ax.plot([seg['start_index'], seg['end_index']],
[seg['start_price'], seg['end_price']],
color='cyan', linewidth=2.5, linestyle='-', alpha=0.9, label='Segment')
# 绘制中枢 (灰色矩形框)
for _, zs in zhongshus.iterrows():
ax.fill_between([zs['start_index'], zs['end_index']], zs['low'], zs['high'],
color='gray', alpha=0.2, label='Zhongshu')
ax.text(zs['start_index'], zs['high'], 'ZS', fontsize=10, color='gray')
# 格式化X轴
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
ax.xaxis.set_major_formatter(mdates.DateFormatter('%m-%d'))
fig.autofmt_xdate()
plt.title("ChanLun Theory: Fractals, Bi, Segments and Zhongshu")
plt.grid(True, alpha=0.3)
# 处理图例重复
handles, labels = plt.gca().get_legend_handles_labels()
by_label = dict(zip(labels, handles))
plt.legend(by_label.values(), by_label.keys())
plt.show()
# ==========================================
# 4. 主程序执行
# ==========================================
if __name__ == "__main__":
# 1. 获取数据
df = generate_mock_data(days=100)
# 2. 计算缠论结构
# 找分型
fractals_df = find_fractals(df)
# 生成笔 (间隔至少3根K线)
bi_df = build_bi(fractals_df, min_k_bars=3)
# 生成线段
segments_df = build_segments(bi_df)
# 生成中枢
zhongshu_df = find_zhongshu(segments_df, df)
# 3. 打印结果信息
print(f"识别到顶底分型数量: {len(fractals_df)}")
print(f"生成笔的数量: {len(bi_df)}")
print(f"生成线段数量: {len(segments_df)}")
print(f"生成中枢数量: {len(zhongshu_df)}")
# 4. 绘图
plot_chanlun(df, fractals_df, bi_df, segments_df, zhongshu_df)
```
### 代码功能详解
1. **数据生成 (`generate_mock_data`)**:
为了演示脚本的独立性,使用 `numpy` 生成随机漫步的模拟K线数据,包含日期、开盘价、最高价、最低价和收盘价。
2. **分型识别 (`find_fractals`)**:
遍历K线,判断当前K线的高点是否比前后两根都高(顶分型),或者低点是否比前后两根都低(底分型)。这是缠论几何形态的基石 [ref_2]。
3. **构建笔 (`build_bi`)**:
* **合并同类项**:如果连续出现两个顶分型,只保留更高的那个;连续底分型,只保留更低的那个。
* **间隔过滤**:缠论要求顶底分型之间至少有一根非共用K线,代码中通过 `min_k_bars` 参数控制。
* **连接**:将符合条件的顶和底依次连接,形成向上笔或向下笔。
4. **构建线段 (`build_segments`)**:
线段由笔构成。代码实现了一个简化的特征序列破坏逻辑:如果反向的笔击穿了前一笔的端点,则认为原线段结束,新线段开始。这模拟了线段生成的动态过程。
5. **中枢识别 (`find_zhongshu`)**:
中枢本质上是价格区间的重叠。代码计算连续三个线段的高低点范围,求出交集。如果交集存在(即最高点的下沿 > 最低点的上沿),则绘制出一个灰色的矩形框代表中枢。
6. **可视化 (`plot_chanlun`)**:
使用 Matplotlib 绘制K线图,并在其上叠加绘制:
* **蓝色三角**:顶底分型。
* **橙色细线**:笔。
* **青色粗线**:线段。
* **灰色矩形**:中枢。
该脚本完全独立运行,不依赖任何特定的交易平台数据接口,展示了从原始数据到缠论形态识别的完整流程。