I had long been using Dr. Tao's (陶博士) Monthly Reversal 6.0 formula to run conditional stock screens on the cesspit that is the A-share market, mainly to cut down the daily chart-review workload. Dr. Tao's model picks stocks by combining O'Neil-style RPS (relative price strength) with the price standing above key moving averages and making a new periodic high; I adjusted the conditions to suit my own habits. The annoying part of running this model in the Tongdaxin (通达信) client is the daily ritual: manually download the closing data, refresh the RPS values, then run the conditional screen, which takes 20-30 minutes every day. So I wondered whether a Python program could turn it into one simple command.

The problem is that I am pretty much a Python dummy, and learning it properly would take more energy than I have. So I tried building it with OpenAI's ChatGPT, and the result far exceeded my expectations: at the code level I barely had to change anything by hand, and I only spent about two weeks on various rounds of debugging. The debugging was equally dummy-grade: throw each problem back at ChatGPT and let it fix it. The result is the code below, offered for reference only; the program guarantees neither profit nor loss.
Program structure
- fetchdata.py: uses the Baostock library to fetch stock data and cache it locally.
- rpstool.py: analyzes the locally cached data, screens it against my selection criteria, prints the results, and saves them to a local CSV file (the RPS it relies on is illustrated right after this list).
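For context, the RPS that rpstool.py computes is O'Neil-style relative price strength: each stock's N-day return is percentile-ranked against the whole market and scaled to 0-100. A toy example of the same calculation (three made-up stocks, N = 2), mirroring the calculate_rps function further down:

    import pandas as pd

    # Percentile-rank each stock's N-day return across the market, scaled to 0-100
    closes = pd.DataFrame({"A": [10, 11, 13], "B": [10, 10, 9], "C": [10, 12, 12]})
    returns_n = closes.pct_change(periods=2).iloc[-1]  # 2-day return per stock
    rps_n = returns_n.rank(pct=True) * 100             # A=100.0, C≈66.7, B≈33.3
    print(rps_n)

An RPS250 of 98, for example, means the stock outperformed 98% of all stocks over the past 250 trading days.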
The code
fetchdata.py
#!/usr/bin/env python3
import baostock as bs
import pandas as pd
import os
import json
from datetime import datetime, timedelta

# Initialize baostock
bs.login()

# Local cache directory
cache_dir = "stock_cache"
os.makedirs(cache_dir, exist_ok=True)

def fetch_all_stock_codes():
    """Fetch the codes of all A-share stocks."""
    rs = bs.query_stock_basic()
    data_list = []
    while (rs.error_code == '0') & rs.next():
        data_list.append(rs.get_row_data())
    df = pd.DataFrame(data_list, columns=rs.fields)
    # Keep stocks that are currently listed (status == '1'), are ordinary
    # stocks (type == '1'), and have been listed for at least one year
    one_year_ago = datetime.now() - timedelta(days=365)
    df = df[(df['status'] == '1') & (df["type"] == '1') & (pd.to_datetime(df['ipoDate']) <= one_year_ago)]
    # Filter for A-shares in Shanghai and Shenzhen
    a_share_stocks = df[df['code'].str.startswith(('sh', 'sz'))]
    return a_share_stocks['code'].tolist()

def fetch_data(bs_code, start_date, end_date):
    """Fetch daily bars for one stock via baostock."""
    rs = bs.query_history_k_data_plus(
        bs_code,
        "date,code,open,high,low,close,volume,amount",
        start_date=start_date,
        end_date=end_date,
        frequency="d",
        adjustflag="2"  # forward-adjusted prices
    )
    data_list = []
    while (rs.error_code == '0') & rs.next():
        data_list.append(rs.get_row_data())
    df = pd.DataFrame(data_list, columns=rs.fields)
    return df

def save_cache(bs_code, df):
    """Save one stock's data to the cache."""
    cache_file = os.path.join(cache_dir, f"{bs_code}.json")
    df.to_json(cache_file, orient="records", date_format="iso")

def load_cache(bs_code):
    """Load a cached file; return None if it is missing or unreadable."""
    # save_cache writes {bs_code}.json, where bs_code already carries the
    # sh./sz. prefix, so look for exactly that file
    cache_file = os.path.join(cache_dir, f"{bs_code}.json")
    if os.path.exists(cache_file):
        try:
            with open(cache_file, "r") as f:
                cached_data = json.load(f)
            df = pd.DataFrame(cached_data)
            df["trade_date"] = pd.to_datetime(df["date"])
            df.set_index("trade_date", inplace=True)
            return df.sort_index()
        except (json.JSONDecodeError, ValueError):
            print(f"Error reading cache for {bs_code}.")
    return None

def fetch_and_cache_stock_data(bs_code, start_date, end_date):
    """Fetch one stock's data and write it to the cache."""
    df = fetch_data(bs_code, start_date, end_date)
    if not df.empty:
        df["trade_date"] = pd.to_datetime(df["date"])
        df.set_index("trade_date", inplace=True)
        df = df.sort_index()
        save_cache(bs_code, df)

def main():
    # Fetch all A-share stock codes
    stock_list = fetch_all_stock_codes()
    end_date = datetime.today().strftime("%Y-%m-%d")
    start_date = (datetime.today() - timedelta(days=365 * 2)).strftime("%Y-%m-%d")
    for idx, bs_code in enumerate(stock_list):
        fetch_and_cache_stock_data(bs_code, start_date, end_date)
        # Show progress
        progress = (idx + 1) / len(stock_list) * 100
        print(f"Progress: {progress:.2f}% ({idx + 1}/{len(stock_list)})", end="\r")
    print("\nData fetching completed.")

if __name__ == "__main__":
    main()
    # Log out of baostock
    bs.logout()
First, run python3 fetchdata.py from the command line to download the data into the local stock_cache directory.
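Note that fetch_and_cache_stock_data re-downloads the full two-year window for every stock on every run. If that becomes too slow, an incremental variant is possible; the sketch below is a rough, untested suggestion that reuses the helpers above (update_stock_data is a hypothetical name, not part of the script). One caveat: forward-adjusted prices are re-based after a dividend or split, so an occasional full refresh is still a good idea.

    def update_stock_data(bs_code, end_date):
        """Hypothetical incremental update: fetch only days newer than the cache."""
        cached = load_cache(bs_code)
        if cached is None or cached.empty:
            # No usable cache yet: fall back to the full two-year download
            start = (datetime.today() - timedelta(days=365 * 2)).strftime("%Y-%m-%d")
            fetch_and_cache_stock_data(bs_code, start, end_date)
            return
        start_date = (cached.index.max() + timedelta(days=1)).strftime("%Y-%m-%d")
        new_df = fetch_data(bs_code, start_date, end_date)
        if not new_df.empty:
            new_df["trade_date"] = pd.to_datetime(new_df["date"])
            new_df.set_index("trade_date", inplace=True)
            # Append the new rows, dropping any duplicated trading days
            merged = pd.concat([cached, new_df])
            merged = merged[~merged.index.duplicated(keep="last")].sort_index()
            save_cache(bs_code, merged)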
rpstool.py
#!/usr/bin/env python3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from tabulate import tabulate

# Local cache directory
CACHE_DIR = "stock_cache"
# Start of the current year, used for year-to-date calculations
CURRENT_YEAR = datetime.now().year
YEAR_START_DATE = datetime(CURRENT_YEAR, 1, 1)

def load_cache(bs_code):
    # Cache files are named like sh.600000.json, so try both exchange prefixes
    for prefix in ['sh', 'sz']:
        cache_file = os.path.join(CACHE_DIR, f"{prefix}.{bs_code}.json")
        if os.path.exists(cache_file):
            try:
                with open(cache_file, "r") as f:
                    cached_data = json.load(f)
                df = pd.DataFrame(cached_data)
                df["date"] = pd.to_datetime(df["date"])
                df.set_index("date", inplace=True)
                df['close'] = pd.to_numeric(df['close'], errors='coerce')
                return df.sort_index()
            except (json.JSONDecodeError, ValueError):
                print(f"Error reading cache for {bs_code}.")
    print(f"No cache file found for {bs_code}")
    return None

def calculate_moving_averages(df):
    for window in [40, 60, 120, 250]:
        df[f"ma{window}"] = df["close"].rolling(window=window).mean()
    return df

def calculate_rps(all_stocks_data, period):
    # Percentile-rank every stock's N-day return across the whole market
    all_closes = pd.DataFrame({code: data['close'] for code, data in all_stocks_data.items()})
    pct_changes = all_closes.pct_change(periods=period, fill_method=None)
    ranks = pct_changes.iloc[-1].rank(pct=True)
    rps = ranks * 100
    for code in all_stocks_data.keys():
        all_stocks_data[code][f"rps{period}"] = rps[code]
    return all_stocks_data

def calculate_max_gain_this_year(df):
    if df.index[0] > YEAR_START_DATE:
        return None
    year_data = df[df.index >= YEAR_START_DATE]
    year_start_price = year_data['close'].iloc[0]
    max_price_this_year = year_data['close'].max()
    return ((max_price_this_year - year_start_price) / year_start_price * 100).round(2)

def filter_criteria(df):
    latest = df.iloc[-1]
    # Made a new high for the year within the last 30 calendar days (~20 trading days)
    twenty_days_ago = datetime.now() - timedelta(days=30)
    year_high = df[(df.index >= YEAR_START_DATE) & (df.index <= twenty_days_ago)]["close"].max()
    recent_high_condition = df[df.index > twenty_days_ago]["close"].max() >= year_high
    # RPS120 + RPS250 greater than 185
    rps_condition = (latest["rps120"] + latest["rps250"]) > 185
    # Price above the 40-day MA, with the 60-day MA above both the 120-day and 250-day MAs
    ma_condition = (latest["close"] > latest["ma40"]) and (latest["ma60"] > latest["ma120"]) and (latest["ma60"] > latest["ma250"])
    # Drawdown from the last 30 days' high no more than 30%
    current_price = latest["close"]
    max_price = df[df.index > twenty_days_ago]["close"].max()
    drawdown_condition = (max_price - current_price) / max_price <= 0.30
    # Year-to-date maximum gain no more than 50%
    max_gain_this_year = calculate_max_gain_this_year(df)
    max_gain_condition = max_gain_this_year is not None and max_gain_this_year <= 50
    return all([recent_high_condition, rps_condition, ma_condition, drawdown_condition, max_gain_condition])

def process_stock(bs_code, all_stocks_data):
    hist_data = all_stocks_data[bs_code]
    if len(hist_data) >= 250:
        hist_data = calculate_moving_averages(hist_data)
        if filter_criteria(hist_data):
            latest = hist_data.iloc[-1]
            return {
                "code": bs_code,
                "rps50": round(latest["rps50"], 2),
                "rps120": round(latest["rps120"], 2),
                "rps250": round(latest["rps250"], 2),
                "max_yearly_return": calculate_max_gain_this_year(hist_data),
            }
    return None

def main():
    # File names look like sh.600000.json; the middle part is the bare code
    stock_list = [f.split(".")[1] for f in os.listdir(CACHE_DIR) if f.endswith(".json")]
    # Load each cache once and keep stocks with at least 250 trading days of history
    all_stocks_data = {}
    for bs_code in stock_list:
        df = load_cache(bs_code)
        if df is not None and len(df) >= 250:
            all_stocks_data[bs_code] = df
    for period in [50, 120, 250]:
        all_stocks_data = calculate_rps(all_stocks_data, period)
    with ThreadPoolExecutor(max_workers=30) as executor:
        futures = [executor.submit(process_stock, bs_code, all_stocks_data) for bs_code in all_stocks_data.keys()]
        selected_stocks = [result for future in as_completed(futures) if (result := future.result()) is not None]
    selected_stocks_df = pd.DataFrame(selected_stocks)
    timestamp = datetime.today().strftime("%y%m%d")
    selected_stocks_df.to_csv(f"selected_stocks_{timestamp}.csv", index=False)
    print("\nSelected Stocks:")
    print(tabulate(selected_stocks_df, headers="keys", tablefmt="grid", numalign="right"))

if __name__ == "__main__":
    main()
Then run python3 rpstool.py from the command line to analyze the data. The screening step is multithreaded via ThreadPoolExecutor; I set it to 30 worker threads to speed things up.
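Because the results land in a dated CSV, further slicing is easy. A minimal follow-up sketch (the file name is what a run on August 14, 2024 would produce, matching the example below; the columns come from process_stock):

    import pandas as pd

    # Load one day's screening output and rank by combined long-horizon RPS
    df = pd.read_csv("selected_stocks_240814.csv")  # date in the file name assumed
    df["rps_sum"] = df["rps120"] + df["rps250"]
    print(df.sort_values("rps_sum", ascending=False).head(10))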
As an example, the run on August 14, 2024 printed a grid table of the selected stocks (code, rps50, rps120, rps250, max_yearly_return) and saved it as selected_stocks_240814.csv.