之前一直使用陶博士的月线反转6.0公式在粪坑A股里做条件选股,以减少每天的复盘工作量。陶博士的模型主要基于欧奈尔的RPS强度结合股价站上重要均线且阶段新高进行选股,我根据自己的使用习惯对相关条件进行了适当调整。但日常使用通达信软件运行陶博士的选股模型有个很麻烦的问题是,每天要不断重复手工做收盘数据下载然后再刷新RPS数据再运行条件选股,大概要花20-30分钟。所以琢磨着是不是通过python程序来构建一个相对简单的命令运行方式。问题是,我几乎是个python dummy,现学也没有这么大精力。于是尝试使用OpenAI的ChatGPT进行实现,结果大超预期,在代码层面几乎不需要我人工修改,无非是花了大概两周的时间进行各种调试。调试也很dummy,就是直接把问题抛给ChatGPT让他帮我解决。再于是就有了以下这段代码,仅供参考,程序不包赚也不包赔。

程序的实现框架

  • 使用Baostock库获取股票数据并缓存到本地,将该程序命名为fetchdata.py
  • 对缓存到本地的数据进行分析,按照我的选股条件进行筛选,将结果打印出来并保存为本地csv文件,将该程序命名为rpstool.py

具体代码

fetchdata.py

#!/usr/bin/env python3

import baostock as bs
import pandas as pd
import os
import json
from datetime import datetime, timedelta

# Initialize the baostock session (runs at import time; logout happens at file end).
bs.login()

# Directory where one JSON file per stock is cached; created if missing.
cache_dir = "stock_cache"
os.makedirs(cache_dir, exist_ok=True)

def fetch_all_stock_codes():
    """Return codes of all listed Shanghai/Shenzhen A-shares with at least one year of history.

    Returns:
        list[str]: baostock codes such as "sh.600000" / "sz.000001".
    """
    rs = bs.query_stock_basic()
    rows = []
    # Use short-circuiting `and`: the original bitwise `&` evaluated rs.next()
    # even after error_code turned non-zero, advancing the cursor on a failed query.
    while rs.error_code == '0' and rs.next():
        rows.append(rs.get_row_data())
    df = pd.DataFrame(rows, columns=rs.fields)

    # Keep instruments that are listed (status '1'), are stocks (type '1'),
    # and whose IPO date is at least one year in the past.
    one_year_ago = datetime.now() - timedelta(days=365)
    df = df[(df['status'] == '1') & (df["type"] == '1') & (pd.to_datetime(df['ipoDate']) <= one_year_ago)]

    # Keep only Shanghai ("sh.") and Shenzhen ("sz.") codes.
    a_share_stocks = df[df['code'].str.startswith(('sh', 'sz'))]

    return a_share_stocks['code'].tolist()

def fetch_data(bs_code, start_date, end_date):
    """Fetch forward-adjusted daily OHLCV bars for one stock via baostock.

    Args:
        bs_code: baostock code, e.g. "sh.600000".
        start_date / end_date: "YYYY-MM-DD" strings (inclusive).

    Returns:
        pd.DataFrame: one row per trading day with string-typed columns;
        empty when the query returns nothing.
    """
    rs = bs.query_history_k_data_plus(
        bs_code,
        "date,code,open,high,low,close,volume,amount",
        start_date=start_date,
        end_date=end_date,
        frequency="d",
        adjustflag="2"  # 2 = forward-adjusted (前复权) prices
    )

    rows = []
    # Short-circuit `and` (was bitwise `&`): stop pulling rows as soon as the
    # API reports an error instead of calling rs.next() regardless.
    while rs.error_code == '0' and rs.next():
        rows.append(rs.get_row_data())
    return pd.DataFrame(rows, columns=rs.fields)

def save_cache(bs_code, df):
    """Persist one stock's DataFrame into the cache directory as JSON records."""
    target = os.path.join(cache_dir, f"{bs_code}.json")
    df.to_json(target, orient="records", date_format="iso")

def load_cache(bs_code):
    """Load one stock's cached DataFrame, or None when missing/unreadable.

    Bug fix: save_cache() writes "<bs_code>.json" (codes already carry the
    "sh."/"sz." prefix, e.g. "sh.600000.json"), so the old lookup for
    "<code>.SH.json" / "<code>.SZ.json" never matched a saved file. Also,
    to_json(orient="records") drops the index, so the records contain a
    "date" column — not the "trade_date" column the old code read.
    """
    cache_file = os.path.join(cache_dir, f"{bs_code}.json")

    if os.path.exists(cache_file):
        try:
            with open(cache_file, "r") as f:
                cached_data = json.load(f)
            df = pd.DataFrame(cached_data)
            # Rebuild the datetime index from the serialized "date" field.
            df["trade_date"] = pd.to_datetime(df["date"])
            df.set_index("trade_date", inplace=True)
            df = df.sort_index()
            return df
        except (json.JSONDecodeError, ValueError, KeyError):
            # KeyError covers an empty/old-format file without a "date" field.
            print(f"Error reading cache for {bs_code}.")
    return None

def fetch_and_cache_stock_data(bs_code, start_date, end_date):
    """Download one stock's daily history and cache it, indexed by trade date.

    Skips caching entirely when the download comes back empty.
    """
    frame = fetch_data(bs_code, start_date, end_date)
    if frame.empty:
        return
    frame["trade_date"] = pd.to_datetime(frame["date"])
    frame.set_index("trade_date", inplace=True)
    save_cache(bs_code, frame.sort_index())

def main():
    """Download and cache two years of daily bars for every eligible A-share."""
    codes = fetch_all_stock_codes()
    today = datetime.today()
    end_date = today.strftime("%Y-%m-%d")
    start_date = (today - timedelta(days=365 * 2)).strftime("%Y-%m-%d")

    total = len(codes)
    for done, code in enumerate(codes, start=1):
        fetch_and_cache_stock_data(code, start_date, end_date)
        # Overwrite the same console line with the running percentage.
        print(f"Progress: {done / total * 100:.2f}% ({done}/{total})", end="\r")

    print("\nData fetching completed.")

if __name__ == "__main__":
    try:
        main()
    finally:
        # Always release the baostock session — the old module-level logout
        # was skipped whenever main() raised, leaving the session open.
        bs.logout()

先在命令行运行python3 fetchdata.py下载数据到本地stock_cache目录。

rpstool.py

#!/usr/bin/env python3

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from tabulate import tabulate

# Cache directory produced by fetchdata.py (one JSON file per stock).
CACHE_DIR = "stock_cache"
# Calendar year boundary used by the "max gain this year" filter.
CURRENT_YEAR = datetime.now().year
YEAR_START_DATE = datetime(CURRENT_YEAR, 1, 1)

def load_cache(bs_code):
    """Load a cached stock frame by bare code, trying "sh." then "sz." prefixes.

    Returns a date-indexed DataFrame with numeric closes, or None when no
    readable cache file exists for either exchange prefix.
    """
    for market in ('sh', 'sz'):
        path = os.path.join(CACHE_DIR, f"{market}.{bs_code}.json")
        if not os.path.exists(path):
            continue
        try:
            with open(path, "r") as fh:
                records = json.load(fh)
            frame = pd.DataFrame(records)
            frame["date"] = pd.to_datetime(frame["date"])
            frame.set_index("date", inplace=True)
            # Serialized values are strings; coerce closes for arithmetic.
            frame['close'] = pd.to_numeric(frame['close'], errors='coerce')
            return frame.sort_index()
        except (json.JSONDecodeError, ValueError):
            print(f"Error reading cache for {bs_code}.")
    print(f"No cache file found for {bs_code}")
    return None

def calculate_moving_averages(df):
    """Add ma40/ma60/ma120/ma250 simple-moving-average columns in place and return df."""
    close = df["close"]
    for span in (40, 60, 120, 250):
        df[f"ma{span}"] = close.rolling(span).mean()
    return df

def calculate_rps(all_stocks_data, period):
    """Attach a cross-sectional RPS score (0-100) over `period` bars to each stock.

    The score is the percentile rank of each stock's latest `period`-bar
    percentage change across all stocks, stored as column "rps<period>".
    """
    closes = pd.DataFrame({code: frame['close'] for code, frame in all_stocks_data.items()})
    latest_change = closes.pct_change(periods=period, fill_method=None).iloc[-1]
    scores = latest_change.rank(pct=True) * 100
    for code, frame in all_stocks_data.items():
        frame[f"rps{period}"] = scores[code]
    return all_stocks_data

def calculate_max_gain_this_year(df):
    """Max percentage gain from the year's first close to the year's highest close.

    Returns None when the cached history does not reach back to Jan 1,
    otherwise the gain rounded to two decimals.
    """
    if df.index[0] > YEAR_START_DATE:
        return None
    ytd_close = df.loc[df.index >= YEAR_START_DATE, 'close']
    base = ytd_close.iloc[0]
    return ((ytd_close.max() - base) / base * 100).round(2)

def filter_criteria(df):
    """Return True when a stock passes all five screening conditions.

    Conditions (evaluated on the latest bar):
      1. Close within the last 30 calendar days made a new high versus the
         year-to-date range before that window.
      2. rps120 + rps250 > 185.
      3. Close above ma40, and ma60 above both ma120 and ma250.
      4. Drawdown from the 30-day high is at most 30%.
      5. Year-to-date max gain is known and at most 50%.
    """
    latest = df.iloc[-1]
    cutoff = datetime.now() - timedelta(days=30)

    recent_max = df.loc[df.index > cutoff, "close"].max()
    prior_high = df.loc[(df.index >= YEAR_START_DATE) & (df.index <= cutoff), "close"].max()
    ytd_gain = calculate_max_gain_this_year(df)

    checks = [
        recent_max >= prior_high,
        latest["rps120"] + latest["rps250"] > 185,
        latest["close"] > latest["ma40"]
        and latest["ma60"] > latest["ma120"]
        and latest["ma60"] > latest["ma250"],
        (recent_max - latest["close"]) / recent_max <= 0.30,
        ytd_gain is not None and ytd_gain <= 50,
    ]
    return all(checks)

def process_stock(bs_code, all_stocks_data):
    """Screen a single stock; return its summary row when it passes, else None."""
    frame = all_stocks_data[bs_code]
    # Need a full year of bars for ma250 / rps250 to be meaningful.
    if len(frame) < 250:
        return None
    frame = calculate_moving_averages(frame)
    if not filter_criteria(frame):
        return None
    last = frame.iloc[-1]
    return {
        "code": bs_code,
        "rps50": round(last["rps50"], 2),
        "rps120": round(last["rps120"], 2),
        "rps250": round(last["rps250"], 2),
        "max_yearly_return": calculate_max_gain_this_year(frame),
    }

def main():
    """Score every cached stock, apply the screen, then print and save survivors."""
    stock_list = [f.split(".")[1] for f in os.listdir(CACHE_DIR) if f.endswith(".json")]

    # Load each cache file exactly once. The previous dict comprehension called
    # load_cache() three times per stock (None check, length check, value),
    # tripling the disk reads and JSON parsing.
    all_stocks_data = {}
    for bs_code in stock_list:
        df = load_cache(bs_code)
        if df is not None and len(df) >= 250:
            all_stocks_data[bs_code] = df

    for period in [50, 120, 250]:
        all_stocks_data = calculate_rps(all_stocks_data, period)

    with ThreadPoolExecutor(max_workers=30) as executor:
        futures = [executor.submit(process_stock, bs_code, all_stocks_data) for bs_code in all_stocks_data.keys()]
        selected_stocks = [result for future in as_completed(futures) if (result := future.result()) is not None]

    selected_stocks_df = pd.DataFrame(selected_stocks)
    timestamp = datetime.today().strftime("%y%m%d")
    selected_stocks_df.to_csv(f"selected_stocks_{timestamp}.csv", index=False)

    print("\nSelected Stocks:")
    print(tabulate(selected_stocks_df, headers="keys", tablefmt="grid", numalign="right"))

# Run the screener only when executed as a script, not on import.
if __name__ == "__main__":
    main()

在命令行运行python3 rpstool.py对数据进行分析,这里采用了多线程处理,我设置为30个线程并行处理以加快运行效率。

以2024年8月14日的运行结果为例,打印结果如下:

python3-rpstool.py
打印结果

2 Responses So Far ↓

  1. druggo:

    赚翻了啊,羡慕

  2. 重新拾起scripting为下岗再就业储备技能

Leave a Reply ↓