网站首页 > 技术文章 正文
我现在有一个交易想法,通过KDJ的金叉和死叉来完成交易。那应该怎么操作呢? 接下来,我们来完成量化炒股的另一个重要的方面----构建模型。 废话不多说,直接上代码:
构建模型如下:
- base_strategy.py
import os, json
# 策略接口基类
class Strategy:
def __init__(self, strategy_name):
self.strategy_name = strategy_name #策略名称
self.strategy_para = self.load_strategy_config(strategy_name)
def load_strategy_config(self, strategy_name):
"""加载策略配置文件内容"""
try:
strategy_path = "strategy_config"
file_path = os.path.join(strategy_path, f"{strategy_name}.json") # 修改为.json扩展名
# 检查文件是否存在
with open(file_path, "r", encoding="utf-8") as file:
content = json.load(file)
return json.dumps(content, ensure_ascii=False) # 转换为单行JSON字符串
except FileNotFoundError:
print("未找到策略配置文件")
except Exception as e:
print(f"加载策略配置文件时出错: {e}")
return None
def execute(self, df):
raise NotImplementedError("子类必须实现 execute 方法")
2.kdj_strategy.py
import json
import pandas as pd
from strategy.base_strategy import Strategy
default_strategy_para = {
"kdj_strategy": '''
{
"策略配置": {
"策略模式": "Higt",
"缠绕下门限": 35,
"缠绕上门限": 65,
"超买门限": 80,
"超卖门限": 20
},
"应用配置": {
"策略名称": "kdj_strategy",
"保存报告": "yes",
"保存代码": "yes"
}
}
'''
}
class KDJStrategy(Strategy):
def __init__(self, strategy_name):
# 调用父类的初始化方法
super().__init__(strategy_name)
if self.strategy_para is None:
self.strategy_para = json.loads(default_strategy_para[strategy_name])
# 如果传入的配置为空,则使用默认配置
self.config = self.strategy_para["策略配置"]
def calculate_kdj(self, data, n=9, m1=3, m2=3):
"""计算KDJ指标,并直接在传入的data上修改"""
low_list = data['Low'].rolling(window=n, min_periods=1).min()
high_list = data['High'].rolling(window=n, min_periods=1).max()
rsv = (data['Close'] - low_list) / (high_list - low_list) * 100
data['K'] = rsv.ewm(com=m1 - 1, adjust=False).mean()
data['D'] = data['K'].ewm(com=m2 - 1, adjust=False).mean()
data['J'] = 3 * data['K'] - 2 * data['D']
def calculate_ma(self, data, window=5):
"""计算简单移动平均线,并直接在传入的data上修改"""
data[f'MA_{window}'] = data['Close'].rolling(window=window).mean()
def calculate_rsi(self, data, window=3):
"""计算RSI指标,并直接在传入的data上修改"""
delta = data['Close'].diff(1)
gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
rs = gain / loss
data['RSI'] = 100 - (100 / (1 + rs))
def detect_kdj_signals(self, data):
"""检测KDJ指标的买入决策和卖出信号,结合趋势判断和其他过滤条件"""
all_signals = []
processed_dates = set() # 用于记录已经处理过的日期
threshold = 5 # 设置阈值范围为45到65
overbought_threshold = 80 # KDJ 超买阈值
oversold_threshold = 20 # KDJ 超卖阈值
rsi_overbought_threshold = 60 # RSI 超买阈值(短期)
rsi_oversold_threshold = 40 # RSI 超卖阈值(短期)
last_buy_date = None # 用于记录最后一次买入的日期
ma_window = 3 # 移动平均线窗口大小
# 检查 data 是否为空
if data is None or data.empty:
return []
try:
# 计算KDJ指标
self.calculate_kdj(data)
# 计算移动平均线
self.calculate_ma(data, window=ma_window)
# 计算RSI指标(短期)
self.calculate_rsi(data, window=3)
# 遍历每一行数据
for date, row in data.iterrows():
date_str = date.strftime('%Y-%m-%d')
k_value = row['K']
d_value = row['D']
rsi_value = row['RSI']
close_price = row['Close']
ma_value = row[f'MA_{ma_window}']
# 如果当前日期已经处理过,则跳过
if date in processed_dates:
continue
# 判断是否在50附近反复缠绕
if 50 - threshold <= k_value <= 50 + threshold and 50 - threshold <= d_value <= 50 + threshold:
continue # 如果在50附近,跳过当前日期
# 判断是否处于超买状态
if k_value > overbought_threshold or d_value > overbought_threshold or rsi_value > rsi_overbought_threshold:
continue # 如果处于超买状态,跳过当前日期
# 判断是否处于超卖状态
if k_value < oversold_threshold or d_value < oversold_threshold or rsi_value < rsi_oversold_threshold:
continue # 如果处于超卖状态,跳过当前日期
# 判断金叉买入信号
if date > data.index[0]: # 确保有前一天的数据
prev_row = data.loc[data.index[data.index < date][-1]]
if prev_row['K'] < prev_row['D'] and k_value > d_value:
# 结合趋势判断:只有当价格在MA上方且RSI未超买时才买入
if close_price > ma_value and rsi_value < rsi_overbought_threshold:
buy_price = row['Close']
all_signals.append((date_str, ('Close', '^', 'green', '买', '买入决策', buy_price)))
processed_dates.add(date) # 标记当前日期为已处理
last_buy_date = date # 记录最后一次买入的日期
# 判断死叉卖出信号
if last_buy_date is not None and date > last_buy_date: # 确保有买入后再考虑卖出
if date > data.index[0]: # 确保有前一天的数据
prev_row = data.loc[data.index[data.index < date][-1]]
if prev_row['K'] > prev_row['D'] and k_value < d_value:
# 结合趋势判断:只有当价格在MA下方且RSI未超卖时才卖出
#if close_price < ma_value and rsi_value > rsi_oversold_threshold
if close_price < ma_value:
sell_price = row['Close']
all_signals.append((date_str, ('Close', 'v', 'red', '卖', '卖出决策', sell_price)))
processed_dates.add(date) # 标记当前日期为已处理
last_buy_date = None # 重置最后一次买入的日期
except Exception as e:
print(f"在处理数据时发生错误: {e}")
return all_signals
def execute(self, df):
# 执行策略并返回信号
return self.detect_kdj_signals(df)
3.factroy_strategy.py
from strategy.kdj_strategy import KDJStrategy
# 工厂类:StrategyFactory
class StrategyFactory:
def __init__(self, strategy_name):
pass
@staticmethod
def create_strategy(strategy_name):
# 根据策略名称创建对应的策略对象
if strategy_name == "kdj_strategy":
return KDJStrategy(strategy_name)
else:
raise ValueError(f"未知的策略名称: {strategy_name}")
至此,我们的KDJ交易模型已经搭建完毕。 在这里,我使用了软件里的设计模式----工厂模式和策略模式。 这里不深入讨论软件设计模式的应用。但是,我之所以这样设计,是因为后续我们还会加入搭建的其他交易模型,而使用软件的设计模式,可以大大简化工作量。在后面的使用,大家能体会到这种设计的便利性。
- 上一篇: Excel函数解答统计指定费用问题_如何统计各种费用表
- 下一篇: 笔试编程 | 二分查找、数组、排序
猜你喜欢
- 2025-09-14 微信小程序实现蓝牙打印小票功能_蓝牙小票打印软件
- 2025-09-14 一篇文章就够!ArrayList源码分析及扩容机制
- 2025-09-14 Day261:DataLoader 和 Dataset_dataloader函数
- 2025-09-14 为啥count(*)会这么慢?别再被公众号误导了!
- 2025-09-14 电子表格函数入门(5)- 查找函数INDEX和MATCH组合
- 2025-09-14 Python数据分析笔记#8.1 层次化索引
- 2025-09-14 看完这篇ArrayList,offer还不来?
- 2025-09-14 选数【Python和C++实现】_选择语句python
- 2025-09-14 笔试编程 | 二分查找、数组、排序
- 2025-09-14 Excel函数解答统计指定费用问题_如何统计各种费用表
- 最近发表
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- apt-getinstall-y (100)
- node_modules怎么生成 (87)
- chromepost (71)
- flexdirection (73)
- c++int转char (80)
- mysqlany_value (79)
- static函数和普通函数 (84)
- el-date-picker开始日期早于结束日期 (76)
- js判断是否是json字符串 (75)
- c语言min函数头文件 (77)
- asynccallback (71)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 无效的列索引 (74)