优秀的编程知识分享平台

网站首页 > 技术文章 正文

跟我学量化选股之5:构建模型_股票量化建模

nanyue 2025-09-14 23:36:11 技术文章 1 ℃

我现在有一个交易想法,通过KDJ的金叉和死叉来完成交易。那应该怎么操作呢? 接下来,我们来完成量化炒股的另一个重要的方面----构建模型。 废话不多说,直接上代码:

构建模型如下:

  1. base_strategy.py
import os, json

# 策略接口基类
class Strategy:
    def __init__(self, strategy_name):
        self.strategy_name = strategy_name #策略名称
        self.strategy_para = self.load_strategy_config(strategy_name)

    def load_strategy_config(self, strategy_name):
        """加载策略配置文件内容"""
        try:
            strategy_path = "strategy_config"
            file_path = os.path.join(strategy_path, f"{strategy_name}.json")  # 修改为.json扩展名
            # 检查文件是否存在
            with open(file_path, "r", encoding="utf-8") as file:
                content = json.load(file)
                return json.dumps(content, ensure_ascii=False)  # 转换为单行JSON字符串
        except FileNotFoundError:
            print("未找到策略配置文件")
        except Exception as e:
            print(f"加载策略配置文件时出错: {e}")
        return None


    def execute(self, df):
        raise NotImplementedError("子类必须实现 execute 方法")


2.kdj_strategy.py

import json
import pandas as pd
from strategy.base_strategy import Strategy

default_strategy_para = {
    "kdj_strategy": '''
    {
        "策略配置": {
            "策略模式": "Higt",
            "缠绕下门限": 35,
            "缠绕上门限": 65,
            "超买门限": 80,
            "超卖门限": 20
        },
        "应用配置": {
            "策略名称": "kdj_strategy",
            "保存报告": "yes",
            "保存代码": "yes"
        }
    }
    '''
}
class KDJStrategy(Strategy):
    def __init__(self, strategy_name):
        # 调用父类的初始化方法
        super().__init__(strategy_name)
        if self.strategy_para is None:
            self.strategy_para = json.loads(default_strategy_para[strategy_name])

        # 如果传入的配置为空,则使用默认配置
        self.config = self.strategy_para["策略配置"]

    def calculate_kdj(self, data, n=9, m1=3, m2=3):
        """计算KDJ指标,并直接在传入的data上修改"""
        low_list = data['Low'].rolling(window=n, min_periods=1).min()
        high_list = data['High'].rolling(window=n, min_periods=1).max()
        rsv = (data['Close'] - low_list) / (high_list - low_list) * 100
        data['K'] = rsv.ewm(com=m1 - 1, adjust=False).mean()
        data['D'] = data['K'].ewm(com=m2 - 1, adjust=False).mean()
        data['J'] = 3 * data['K'] - 2 * data['D']

    def calculate_ma(self, data, window=5):
        """计算简单移动平均线,并直接在传入的data上修改"""
        data[f'MA_{window}'] = data['Close'].rolling(window=window).mean()

    def calculate_rsi(self, data, window=3):
        """计算RSI指标,并直接在传入的data上修改"""
        delta = data['Close'].diff(1)
        gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        rs = gain / loss
        data['RSI'] = 100 - (100 / (1 + rs))

    def detect_kdj_signals(self, data):
        """检测KDJ指标的买入决策和卖出信号,结合趋势判断和其他过滤条件"""
        all_signals = []
        processed_dates = set()  # 用于记录已经处理过的日期
        threshold = 5  # 设置阈值范围为45到65
        overbought_threshold = 80  # KDJ 超买阈值
        oversold_threshold = 20  # KDJ 超卖阈值
        rsi_overbought_threshold = 60  # RSI 超买阈值(短期)
        rsi_oversold_threshold = 40  # RSI 超卖阈值(短期)
        last_buy_date = None  # 用于记录最后一次买入的日期
        ma_window = 3  # 移动平均线窗口大小

        # 检查 data 是否为空
        if data is None or data.empty:
            return []

        try:
            # 计算KDJ指标
            self.calculate_kdj(data)
            # 计算移动平均线
            self.calculate_ma(data, window=ma_window)
            # 计算RSI指标(短期)
            self.calculate_rsi(data, window=3)

            # 遍历每一行数据
            for date, row in data.iterrows():
                date_str = date.strftime('%Y-%m-%d')
                k_value = row['K']
                d_value = row['D']
                rsi_value = row['RSI']
                close_price = row['Close']
                ma_value = row[f'MA_{ma_window}']

                # 如果当前日期已经处理过,则跳过
                if date in processed_dates:
                    continue

                # 判断是否在50附近反复缠绕
                if 50 - threshold <= k_value <= 50 + threshold and 50 - threshold <= d_value <= 50 + threshold:
                    continue  # 如果在50附近,跳过当前日期

                # 判断是否处于超买状态
                if k_value > overbought_threshold or d_value > overbought_threshold or rsi_value > rsi_overbought_threshold:
                    continue  # 如果处于超买状态,跳过当前日期

                # 判断是否处于超卖状态
                if k_value < oversold_threshold or d_value < oversold_threshold or rsi_value < rsi_oversold_threshold:
                    continue  # 如果处于超卖状态,跳过当前日期

                # 判断金叉买入信号
                if date > data.index[0]:  # 确保有前一天的数据
                    prev_row = data.loc[data.index[data.index < date][-1]]
                    if prev_row['K'] < prev_row['D'] and k_value > d_value:
                        # 结合趋势判断:只有当价格在MA上方且RSI未超买时才买入
                        if close_price > ma_value and rsi_value < rsi_overbought_threshold:
                            buy_price = row['Close']
                            all_signals.append((date_str, ('Close', '^', 'green', '买', '买入决策', buy_price)))
                            processed_dates.add(date)  # 标记当前日期为已处理
                            last_buy_date = date  # 记录最后一次买入的日期

                # 判断死叉卖出信号
                if last_buy_date is not None and date > last_buy_date:  # 确保有买入后再考虑卖出
                    if date > data.index[0]:  # 确保有前一天的数据
                        prev_row = data.loc[data.index[data.index < date][-1]]
                        if prev_row['K'] > prev_row['D'] and k_value < d_value:
                            # 结合趋势判断:只有当价格在MA下方且RSI未超卖时才卖出
                            #if close_price < ma_value and rsi_value > rsi_oversold_threshold
                            if close_price < ma_value:
                                sell_price = row['Close']
                                all_signals.append((date_str, ('Close', 'v', 'red', '卖', '卖出决策', sell_price)))
                                processed_dates.add(date)  # 标记当前日期为已处理
                                last_buy_date = None  # 重置最后一次买入的日期

        except Exception as e:
            print(f"在处理数据时发生错误: {e}")

        return all_signals

    def execute(self, df):
        # 执行策略并返回信号
        return self.detect_kdj_signals(df)

3.factroy_strategy.py

from strategy.kdj_strategy import  KDJStrategy


# 工厂类:StrategyFactory
class StrategyFactory:
    def __init__(self, strategy_name):
        pass

    @staticmethod
    def create_strategy(strategy_name):
        # 根据策略名称创建对应的策略对象
        if strategy_name == "kdj_strategy":
            return KDJStrategy(strategy_name)
        else:
            raise ValueError(f"未知的策略名称: {strategy_name}")


至此,我们的KDJ交易模型已经搭建完毕。 在这里,我使用了软件里的设计模式----工厂模式和策略模式。 这里不深入讨论软件设计模式的应用。但是,我之所以这样设计,是因为后续我们还会加入搭建的其他交易模型,而使用软件的设计模式,可以大大简化工作量。在后面的使用,大家能体会到这种设计的便利性。

Tags:

最近发表
标签列表