This commit is contained in:
guoyz 2025-04-02 23:06:11 +08:00
parent c30c43a325
commit f0814c7a27
9 changed files with 1969 additions and 0 deletions

1
回测/CreateConbinations.py Executable file
View File

@ -0,0 +1 @@
# 创建组合

88
回测/EmailTest.py Executable file
View File

@ -0,0 +1,88 @@
import imaplib
import smtplib
import email
from email.header import decode_header
from email.mime.text import MIMEText
from imapclient import IMAPClient
def send_email(subject, body, to_email):
# 这个函数名为send_email它接受三个参数
# subject邮件主题、body邮件内容和to_email收件人的电子邮件地址
from_email = 'yizeguo1@126.com'
mail_pass = 'CHRIZKWQSRWYLBOL'# 126授权码
# mail_pass = 'pwvzuqbiysqshgha'# qq授权码
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = from_email
msg['To'] = to_email
server = smtplib.SMTP('smtp.126.com', 25)
# server = smtplib.SMTP('smtp.qq.com', 465)
server.starttls()
server.login(from_email, mail_pass)
server.sendmail(from_email, to_email, msg.as_string())
server.quit()
def get_latest_email_body(to_email):
# 连接到126邮箱的IMAP服务器
mail = IMAPClient("imap.126.com")
from_email = 'yizeguo1@126.com'
mail_pass = 'CHRIZKWQSRWYLBOL'# 126授权码
mail.login(from_email, mail_pass)
mail.id_({"name": "IMAPClient", "version": "2.1.0"})
# mail.list_folders()
print(mail.list_folders())
# 选择收件箱
messages = mail.select_folder(folder="inbox", readonly=True)
# 搜索发件人为指定邮箱的所有邮件
messages = mail.search(None, f'(FROM "{to_email}")')
# 获取邮件ID列表
mail_ids = messages[0].split()
if not mail_ids:
print("No emails found.")
return None
# 获取最新一封邮件的ID
latest_email_id = mail_ids[-1]
# 获取最新一封邮件的数据
status, msg_data = mail.fetch(latest_email_id, "(RFC822)")
for response_part in msg_data:
if isinstance(response_part, tuple):
# 解析邮件
msg = email.message_from_bytes(response_part[1])
# 获取纯文本邮件正文
if msg.get_content_type() == "text/plain":
body = msg.get_payload(decode=True).decode()
print("Body:", body)
return body
# 关闭连接并登出
mail.close()
mail.logout()
return None
if __name__ == "__main__":
# send_email("测试", 'test', "guoyize2209@163.com")
# id_info = {
# 'name': 'myname',
# 'version': '1.0.0',
# 'vendor': 'myclient',
# 'support-email': 'yizeguo1@126.com'
# }
body = get_latest_email_body("guoyize2209@163.com")
if body:
print("Latest email body:", body)
else:
print("No plain text email found.")

861
回测/TurtleClassNew.py Executable file
View File

@ -0,0 +1,861 @@
import numpy as np
import math
import akshare as ak
import os
from datetime import datetime, timedelta
import pandas as pd
import mplfinance as mpf
def CalTrueFluc(data, day):
H_L = data.iloc[day]['最高'] - data.iloc[day]['最低']
H_PDC = data.iloc[day]['最高'] - data.iloc[day-1]['收盘']
PDC_L = data.iloc[day-1]['收盘'] - data.iloc[day]['最低']
TrueFluc = np.max([H_L, H_PDC, PDC_L])
print('high', data.iloc[day]['最高'], 'low', data.iloc[day]['最低'], 'TrueRange', TrueFluc)
return TrueFluc
def calc_sma_atr_pd(kdf,period):
"""计算TR与ATR
Args:
kdf (_type_): 历史数据
period (_type_): ATR周期
Returns:
_type_: 返回kdf增加TR与ATR列
"""
kdf['HL'] = kdf['最高'] - kdf['最低']
kdf['HC'] = np.abs(kdf['最高'] - kdf['收盘'].shift(1))
kdf['LC'] = np.abs(kdf['最低'] - kdf['收盘'].shift(1))
kdf['TR'] = np.round(kdf[['HL','HC','LC']].max(axis=1), 3)
# ranges = pd.concat([high_low, high_close, low_close], axis=1)
# true_range = np.max(ranges, axis=1)
kdf['ATR'] = np.round(kdf['TR'].rolling(period).mean(), 3)
return kdf.drop(['HL','HC','LC'], axis = 1)
# A股数据 东方财富网
# all_data = ak.stock_zh_a_spot_em()
# 基金实时数据 东方财富网
# fund_etf_spot_em_df = ak.fund_etf_spot_em()
# 后复权历史数据
# fund_etf_hist_em_df = ak.fund_etf_hist_em(symbol="513300", period="daily", start_date="20130101", end_date="20240408", adjust="hfq")
# fund_etf_hist_em_df.to_csv('513300data.csv', index=False)
# data = pd.read_csv('513300data.csv')
# # 一、计算头寸规模
# # 真实波动幅度 = max (H-L, H-pdc, pdc-L)
# today = datetime.today()
# # print(today)
# # print(data.iloc[-1]['成交额'])
# TrueFlucs = []
# Nserious = np.zeros(101)
# last120days = np.arange(-120, -100)
# for i in last120days:
# H_L = data.iloc[i]['最高'] - data.iloc[i]['最低']
# H_PDC = data.iloc[i]['最高'] - data.iloc[i-1]['收盘']
# PDC_L = data.iloc[i-1]['收盘'] - data.iloc[i]['最低']
# TrueFlucs.append(np.max([H_L, H_PDC, PDC_L]))
# # 求简单平均,放入N序列第一个
# Nsimple = np.average(TrueFlucs)
# Nserious[0] = Nsimple
# # 计算-21到-1的N
# last100days = np.arange(-100, 0)
# for i in range(0,100):
# day = last100days[i]
# H_L = data.iloc[day]['最高'] - data.iloc[day]['最低']
# H_PDC = data.iloc[day]['最高'] - data.iloc[day-1]['收盘']
# PDC_L = data.iloc[day-1]['收盘'] - data.iloc[day]['最低']
# TrueFluc = np.max([H_L, H_PDC, PDC_L])
# Ntemp = (19 * Nserious[i] + TrueFluc)/20
# Nserious[i+1] = Ntemp
# # print(Nserious)
# total_rows = len(data)
# Ndata = np.zeros(total_rows)
# Ndata[total_rows-101:] = Nserious
# # NewColumn = [0]*(total_rows-101) + Nserious
# data['N'] = Ndata
# data.to_csv('513300data-N.csv', index=False)
# pass
# -----------------------更新atr----------------------
"""已有数据与新数据对比补充新的N,同时更新数据库
"""
# Today = datetime.today()
# # print(Today)
# formatted_date = Today.strftime("%Y%m%d")
# # print(formatted_date)
# CurrentData = ak.fund_etf_hist_em(symbol="513300", period="daily", start_date="20130101", end_date=formatted_date, adjust="hfq")
# CurrentData = calc_sma_atr_pd(CurrentData, 20)
# CurrentData.to_csv('513300data-N.csv', index=False)
# pass
# ------------------计算头寸规模 资金10w, 1%波动------------
# money = 100000
# OldData = pd.read_csv('513300data-N.csv')
# N = OldData.iloc[-1]['ATR']
# # N = 0.473
# Price = OldData.iloc[-1]['收盘']
# # Price = 5.60
# EveryUnit = 0.0025 * money /(N*100*Price)
# print('单位',EveryUnit)
# print(113*100*Price)
class TurtleTrading(object):
"""对象范围较小对某一个标的创建一个海龟如513300
计算ATR
Position Size
买入卖出加仓等行为
Args:
object (_type_): _description_
"""
def __init__(self, TradeCode) -> None:
self.TradeCode = TradeCode
def GetRecentData(self):
Today = datetime.today()
# print(Today)
formatted_date = Today.strftime("%Y%m%d")
# print(formatted_date)
Code = f"{self.TradeCode}"
CurrentData = ak.fund_etf_hist_em(symbol=Code, period="daily", start_date="20130101", end_date=formatted_date, adjust="")
return CurrentData
def CalATR(self, data, ATRday, SaveOrNot):
"""计算某个标的的ATR从上市日到今天, 计算后的数据保存在self.CurrentData
Args:
ATRday: 多少日ATR
SaveOrNot (_type_): 是否保存.csv数据
"""
self.CurrentData = calc_sma_atr_pd(data, ATRday)
self.N = self.CurrentData['ATR']
if SaveOrNot:
self.CurrentData.to_csv('513300data-N.csv', index=False)
print("csv保存成功")
return self.N
def CalPositionSize(self, RiskCoef, Capital):
"""计算PosizionSize 持有的单位该单位某标的1N波动对应RiskCoef * Capital资金
Args:
RiskCoef (_type_): 风险系数
Capital (_type_): 资金
"""
N = self.CurrentData.iloc[-1]['ATR']
# N = 0.473
Price = self.CurrentData.iloc[-1]['收盘']
# Price = 5.60
self.PositionSize = RiskCoef * Capital /( N*100*Price) # 默认用股票形式了 100
return self.PositionSize
def ReadExistData(self, data):
"""除了通过发请求获取数据也可以读本地的数据库赋值给self.CurrentData
Args:
data (_type_): 本地csv名称
"""
self.CurrentData = pd.read_csv(data)
def DrawKLine(self, days):
"""画出k线图看看,画出最近days天的K线图
"""
# 日期部分
dates = pd.to_datetime(self.CurrentData['日期'][-days:])
# Klinedf['Data'] = pd.to_datetime(self.CurrentData['日期'])
Klinedf = pd.DataFrame()
# Klinedf.set_index = Klinedf['Data']
# 其他数据
Klinedf['Open'] = self.CurrentData['开盘'][-days:]
Klinedf['High'] = self.CurrentData['最高'][-days:]
Klinedf['Low'] = self.CurrentData['最低'][-days:]
Klinedf['Close'] = self.CurrentData['收盘'][-days:]
Klinedf['Volume'] = self.CurrentData['成交量'][-days:]
Klinedf.set_index(dates, inplace=True)
# 画图
mpf.plot(Klinedf, type='candle', style='yahoo', volume=False, mav=(5,), addplot=[mpf.make_addplot(self.Donchian[['Upper', 'Lower']])])
def calculate_donchian_channel(self, days, n):
"""
计算唐奇安通道days一共多少日 n多少日唐奇安
参数:
self.CurrentData (DataFrame): 包含价格数据的Pandas DataFrame至少包含"High""Low"
n (int): 时间周期
返回:self.Donchian
DataFrame: 唐奇安通道的DataFrame包含"Upper", "Lower", "Middle"
"""
Donchian = pd.DataFrame()
# 计算最高价和最低价的N日移动平均线
Donchian['Upper'] = self.CurrentData['最高'][-days:].rolling(n).max()
Donchian['Lower'] = self.CurrentData['最低'][-days:].rolling(n).min()
# # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2
return Donchian
class Trade(object):
"""具有以下功能:
接收Turtle Class作为输入
1 数据准备
a 获取数据
b 计算atr
c 计算55日2010日Donchian
2 总持有单位判断
3 定义系统2 超过55日xxx分段加仓
4 回测功能
5 实时功能 问题实时功能如何同时监控几个item
78
Args:
object (_type_): _description_
"""
def __init__(self, turtles, riskcoe, cash, StartTime, EndTime) -> None:
"""接收所有的turtles
Args:
turtles (_type_): _description_
"""
self.turtles = turtles
self.riskcoe = riskcoe
self.cash = cash
self.Capital = cash
self.StartTime = StartTime
self.EndTime = EndTime
self.TrigerTime = 0
# self.BuyStates = {
# "trigger_count": 0, # 触发次数
# "BuyPrice": None, # 买入/持有价格
# "StopPrice":None, # 止损价格
# "quantity": 0, # 持有份数
# "N": 0, # ATR
# "available_cash": self.cash # 可用资金
# }
# 0"trigger_count", 1"BuyPrice", 2"StopPrice", 3"quantity", 4"N", 5"available_cash"
self.BuyStates = [[0, None, None, 0, 0, self.cash]]
self.tradeslog = [] # 交易记录
self.current_week = None # 当前周数
# def TurtleDataPre(self):
# for turtle in self.turtles:
# turtle.CalATR(20, True)
# turtle.Donchian20 = turtle.calculate_donchian_channel(500, 20)
# turtle.Donchian10 = turtle.calculate_donchian_channel(500, 10)
# turtle.Donchian55 = turtle.calculate_donchian_channel(500, 55)
# pass
def PortfolioPositon(self):
"""总共能持有多少个单位
"""
pass
def TestBuyStocks(self, PriceNow, date):
# 回测中的买入函数 如果开盘价大于55日最高价大于四份价格执行加满
# 返回self.BuyStates
# 实盘中应该是触发买入信号发送买入邮件价格份数。当前Turtle程序暂停收到邮件返回更新买入价格份数
# 更新BuyStates:"触发次数"、"买入/持有价格"、"持有份数"、"N"、"可用资金"
N = self.ThisWeekN
Shares = self.ThisWeekPosizionSize
# 更新 BuyStates
if self.TrigerTime == 0: # 第一次买入是直接修改
# if self.BuyStates[0] == 0:
self.TrigerTime += 1
BuyPrice = PriceNow
AddPrice = PriceNow + 1/2 * N
StopPrice = PriceNow - 2 * N
available_cash = self.cash - Shares * BuyPrice
# 0"trigger_count", 1"BuyPrice", 2"AddPrice", 3"StopPrice", 4"quantity", 5"N", 6"available_cash"
self.BuyStates = [[self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash]]
# 更新log
#
AllShares = sum(row[4] for row in self.BuyStates)
NetValue = available_cash + AllShares * BuyPrice
self.tradeslog.append([date, 'Buy', Shares, PriceNow, N, available_cash, NetValue])
else: # 加仓的操作在BuyStates后边追加
self.TrigerTime += 1
BuyPrice = PriceNow
AddPrice = PriceNow + 1/2 * N
StopPrice = PriceNow - 2 * N
available_cash = self.BuyStates[self.TrigerTime-2][6] - Shares * BuyPrice
self.BuyStates.append([self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash])
# 更新log
AllShares = sum(row[4] for row in self.BuyStates)
NetValue = available_cash + AllShares * BuyPrice
self.tradeslog.append([date, 'Buy', Shares, PriceNow, N, available_cash, NetValue])
pass
def TestStopSaleStocks(self, PriceNow, date):
# 回测中的卖出函数,仓位全卖
N = self.ThisWeekN
# Shares应该等于所有持仓的和
Shares = sum(row[4] for row in self.BuyStates)
# Shares = sum(self.BuyStates[:, 4])
available_cash = self.BuyStates[-1][6] + Shares * PriceNow
# 更新log
NetValue = available_cash
self.tradeslog.append([date, 'StopSale', Shares, PriceNow, N, available_cash, NetValue])
# self.trades.append((date, 'Sale', Shares, PriceNow, N, available_cash))
# TrigerTime归0
self.TrigerTime = 0
# self.cash更新
self.cash = available_cash
# 回到初始状态
self.BuyStates = [[0, None, None, None, 0, 0, self.cash]]
def TestOutSaleStocks(self, PriceNow, date):
# 回测中的卖出函数,仓位全卖
N = self.ThisWeekN
# Shares应该等于所有持仓的和
Shares = sum(row[4] for row in self.BuyStates)
# Shares = sum(self.BuyStates[:, 4])
available_cash = self.BuyStates[-1][6] + Shares * PriceNow
# 更新log
NetValue = available_cash
self.tradeslog.append([date, 'OutSale', Shares, PriceNow, N, available_cash, NetValue])
# self.trades.append((date, 'Sale', Shares, PriceNow, N, available_cash))
# TrigerTime归0
self.TrigerTime = 0
# self.cash更新
self.cash = available_cash
# 回到初始状态
self.BuyStates = [[0, None, None, None, 0, 0, self.cash]]
def system2Enter(self, PriceNow, TempDonchian55Upper):
"""以50日突破为基础的较简单的长线系统
入市价格超过了前55日的最高价或最低价就建立头寸
- 如果价格超过55日最高价买入一个单位建立多头头寸
- 如果价格跌破55日最低价卖出一个单位建立空头头寸
退出
增加单位突破时只建立一个单位建立后以1/2N的间隔增加头寸以前面指令的实际成交价为基础实际成交价+1/2N
单个品种最大4个单位
以多头编写
"""
# # 触发次数
# Trigertime = 0
# 只有没买入过,且 现价超过55日最高价是一次突破直接以突破价格买入--没有持仓且价格向上突破
if self.TrigerTime == 0 and PriceNow > TempDonchian55Upper[-1]:
# 买入
return True
else:
return False
def system1EnterNormal(self, PriceNow, TempDonchian20Upper, BreakOutLog):
# 没有持仓且价格向上突破---此时包含两种情形1 对某标的首次使用系统2 已发生过突破,此时上次突破天然是失败的
if self.TrigerTime == 0 and PriceNow > TempDonchian20Upper[-1]:
# 买入
return True
elif self.TrigerTime != 0 and PriceNow > TempDonchian20Upper[-1]:
self.system1BreakoutValid(PriceNow)
if BreakOutLog[-1][5] == 'Lose': # TT!= 0且突破且上一次突破unseccessful
return True
else:
return False
else:
return False
def system1EnterSafe(self, PriceNow, TempDonchian55Upper):
if PriceNow > TempDonchian55Upper[-1]: # 保底的55日突破
return True
else:
return False
def system1BreakoutValid(self, priceNow):
"""判断前一次突破是否成功是log[-1][5]写入“win”否则写入“Lose”
"""
if priceNow < self.BreakOutLog[-1][3]:
self.BreakOutLog[-1][5] = 'Lose'
else:
self.BreakOutLog[-1][5] = 'None'
pass
def system2Out(self, PriceNow, TempDonchian20Lower):
# 退出:低于20日最低价多头方向,空头以突破20日最高价为止损价格--有持仓且价格向下突破
if self.TrigerTime != 0 and PriceNow < TempDonchian20Lower[-1]:
# 退出
return True
else:
return False
def system1Out(self, PriceNow, TempDonchian10Lower):
# 退出:低于20日最低价多头方向,空头以突破20日最高价为止损价格--有持仓且价格向下突破
if self.TrigerTime != 0 and PriceNow < TempDonchian10Lower[-1]:
# 退出
return True
else:
return False
def system2Add(self, PriceNow):
"""加仓判断:如果当前价格>上一次买入后的加仓价格则加仓
"""
if self.TrigerTime < 4 and PriceNow > self.BuyStates[self.TrigerTime - 1][2]:
# 买入
return True
else:
return False
def system2Stop(self, PriceNow):
"""止损判断:如果当前价格<上一次买入后的止损价格则止损
"""
if PriceNow < self.BuyStates[self.TrigerTime - 1][3]:
# 买入
return True
else:
return False
def system2CalDonchian(self):
# 按照system2的要求计算唐奇安通道上下边界--55日上界20日下界
Donchian = pd.DataFrame()
# 计算最高价和最低价的N日移动平均线
Donchian['Upper'] = self.RecentAlldata['最高'].rolling(55).max()
Donchian['Lower'] = self.RecentAlldata['最低'].rolling(20).min()
# # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2
return Donchian
def system1CalDonchian(self):
# 按照system2的要求计算唐奇安通道上下边界--55日上界20日下界
Donchian = pd.DataFrame()
# 计算最高价和最低价的N日移动平均线
Donchian['Upper55'] = self.RecentAlldata['最高'].rolling(55).max()
Donchian['Upper20'] = self.RecentAlldata['最高'].rolling(20).max()
Donchian['Lower'] = self.RecentAlldata['最低'].rolling(10).min()
# # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2
return Donchian
def CalPositionSize(self, N, price):
PositionSize = self.riskcoe * self.Capital /(N) # 默认用股票形式了 100
IntPositionSize = int(PositionSize // 100) * 100
return IntPositionSize
def TestSys2Function(self):
"""回测函数,对于某个标的,
准备:获取价格数据计算ATR,N,头寸单位唐奇安通道上下边界
运行过程:按时间推进开始时间-结束时间按照系统1或2或组合形式执行买入加仓退出止损
操作记录:形成一个log
"""
# ------------------准备阶段----------------
# 1、获取标的价格
self.RecentAlldata = self.turtles.GetRecentData()
self.RecentAlldata['日期'] = pd.to_datetime(self.RecentAlldata['日期'])
self.RecentAlldata.set_index('日期', inplace=True)
# 2、计算ATR
self.N = self.turtles.CalATR(self.RecentAlldata, 20, None)
# self.PositionSize = self.turtles.CalPositionSize(self.riskcoe, self.cash)
# 3、计算唐奇安通道上下边界--system2
self.Donchian = self.system2CalDonchian()
# 开始、结束日期
RowStart = self.RecentAlldata.index.get_loc(self.StartTime)
RowEnd = self.RecentAlldata.index.get_loc(self.EndTime)
# 准备迭代使用的数据
TempData = pd.DataFrame() # 存储最高、最低价等信息
TempN = []
TempDonchian55Upper = []
TempDonchian20Lower = []
# ------------------运行阶段----------------
for date, row in self.RecentAlldata[RowStart:RowEnd].iterrows():
# 增加一个迭代更新的数据
TempData = pd.concat([TempData, row.to_frame().T], ignore_index=True)
# day <=55不执行任何操作跳过,等待
# day < 55,<20刚开始运行N、Donchian数据从总数据拿, 不用框时间,直接从总数据拿
# if TempData.shape[0] < 20:
# 每天更新上下边界
TempDonchian20Lower.append(self.Donchian['Lower'].iloc[RowStart + TempData.shape[0]-2])
# if TempData.shape[0] < 55:
TempDonchian55Upper.append(self.Donchian['Upper'].iloc[RowStart + TempData.shape[0]-2])
# 检查是否是新的一周
if self.current_week is None or date.week != self.current_week:
self.current_week = date.week
# 更新atr、头寸规模
self.ThisWeekN = self.N.iloc[RowStart + TempData.shape[0]-2]
TempN.append(self.ThisWeekN)
self.ThisWeekPosizionSize = self.CalPositionSize(self.ThisWeekN, TempData.iloc[-1]['收盘'])
# 如果空仓,判断当天开盘价是否突破,收盘价是否突破,突破则买入
if self.TrigerTime == 0:
if self.system2Enter(row["开盘"], TempDonchian55Upper):
self.TestBuyStocks(row["开盘"], date)
# 开盘突破后,最高价能否加仓?
# 价格差取整是能加仓几手,循环加仓
delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 3:
for i in range(delta):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
elif 3 < delta:
for i in range(3):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
# 最高价突破,买入
elif self.system2Enter(row["最高"], TempDonchian55Upper):
self.TestBuyStocks(TempDonchian55Upper[-1]+0.001, date)
delta = math.floor((row['最高']-TempDonchian55Upper[-1]) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 3:
for i in range(delta):
self.TestBuyStocks((TempDonchian55Upper[-1] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
elif 3 < delta:
for i in range(3):
self.TestBuyStocks((TempDonchian55Upper[-1] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
# 0"trigger_count", 1"BuyPrice", 2"AddPrice", 3"StopPrice", 4"quantity", 5"N", 6"available_cash"
# self.BuyStates = [self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash]
elif 1 <= self.TrigerTime < 4: # 加仓1-3次考虑止损和加仓
# 开盘价加仓
if self.system2Add(row["开盘"]):
self.TestBuyStocks(row["开盘"], date)
if self.TrigerTime == 4:
# 满仓了,不能再加
pass
# 开盘加仓后,最高价能否继续加仓?
else:
delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 4-self.TrigerTime:
for i in range(delta):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
# 最高价加仓
elif self.system2Add(row["最高"]):
self.TestBuyStocks(self.BuyStates[self.TrigerTime - 1][2] + 0.001, date)
if self.TrigerTime == 4:
# 满仓了,不能再加
pass
# 开盘加仓后,最高价能否继续加仓?
else:
delta = math.floor((row['最高']-self.BuyStates[self.TrigerTime - 1][2]) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 4-self.TrigerTime:
for i in range(delta):
self.TestBuyStocks((self.BuyStates[self.TrigerTime - 1][2] + (i+1) * 1/2 * self.ThisWeekN +0.001), date)
# 止损
elif self.system2Stop(row["收盘"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
elif self.system2Stop(row["最低"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
# 止盈
elif self.system2Out(row["收盘"], TempDonchian20Lower):
self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date)
elif self.system2Out(row["最低"], TempDonchian20Lower):
self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date)
elif self.TrigerTime == 4: # 满仓 考虑止损和退出
# 止损
if self.system2Stop(row["收盘"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
elif self.system2Stop(row["最低"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
# 止盈
elif self.system2Out(row["收盘"], TempDonchian20Lower):
self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date)
elif self.system2Out(row["最低"], TempDonchian20Lower):
self.TestOutSaleStocks(TempDonchian20Lower[-1] - 0.001, date)
# 交易日结束,重新计算高低突破点:
print("---------------回测结束,回测日志如下----------------")
for sublist in self.tradeslog:
print(sublist)
def TestSys1Function(self):
"""回测函数,对于某个标的,
准备:获取价格数据计算ATR,N,头寸单位唐奇安通道上下边界
运行过程:按时间推进开始时间-结束时间按照系统1或2或组合形式执行买入加仓退出止损
操作记录:形成一个log
"""
# ------------------准备阶段----------------
# 1、获取标的价格
self.RecentAlldata = self.turtles.GetRecentData()
self.RecentAlldata['日期'] = pd.to_datetime(self.RecentAlldata['日期'])
self.RecentAlldata.set_index('日期', inplace=True)
# 2、计算ATR
self.N = self.turtles.CalATR(self.RecentAlldata, 20, None)
# self.PositionSize = self.turtles.CalPositionSize(self.riskcoe, self.cash)
# 3、计算唐奇安通道上下边界--system2
self.Donchian = self.system1CalDonchian()
# 开始、结束日期
RowStart = self.RecentAlldata.index.get_loc(self.StartTime)
RowEnd = self.RecentAlldata.index.get_loc(self.EndTime)
# 准备迭代使用的数据
TempData = pd.DataFrame() # 存储最高、最低价等信息
TempN = []
TempDonchian55Upper = []
TempDonchian20Upper = []
TempDonchian10Lower = []
self.BreakOutLog = [['date', 'breakout', 'BOprice', 'LosePrice', 'ValidOrNot', 'WinOrLose']]
# ------------------运行阶段----------------
for date, row in self.RecentAlldata[RowStart:RowEnd].iterrows():
# 增加一个迭代更新的数据
TempData = pd.concat([TempData, row.to_frame().T], ignore_index=True)
# day <=55不执行任何操作跳过,等待
# day < 55,<20刚开始运行N、Donchian数据从总数据拿, 不用框时间,直接从总数据拿
# if TempData.shape[0] < 20:
# 每天更新上下边界
TempDonchian10Lower.append(self.Donchian['Lower'].iloc[RowStart + TempData.shape[0]-2])
# if TempData.shape[0] < 55:
TempDonchian55Upper.append(self.Donchian['Upper55'].iloc[RowStart + TempData.shape[0]-2])
TempDonchian20Upper.append(self.Donchian['Upper20'].iloc[RowStart + TempData.shape[0]-2])
# 检查是否是新的一周
if self.current_week is None or date.week != self.current_week:
self.current_week = date.week
# 更新atr、头寸规模
self.ThisWeekN = self.N.iloc[RowStart + TempData.shape[0]-2]
TempN.append(self.ThisWeekN)
self.ThisWeekPosizionSize = self.CalPositionSize(self.ThisWeekN, TempData.iloc[-1]['收盘'])
# 如果空仓,判断当天开盘价是否突破,收盘价是否突破,突破则买入
if self.TrigerTime == 0:
if self.system1EnterNormal(row["开盘"], TempDonchian20Upper, self.BreakOutLog):
self.TestBuyStocks(row["开盘"], date)
# 写入BreakOut状态
self.BreakOutLog.append([date, 'breakout', row["开盘"], row["开盘"] - 2*self.ThisWeekN, 'Valid','None'])
# 开盘突破后,最高价能否加仓?
# 价格差取整是能加仓几手,循环加仓
delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 3:
for i in range(delta):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
elif 3 < delta:
for i in range(3):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
elif self.system1EnterSafe(row["开盘"], TempDonchian55Upper):
self.TestBuyStocks(row["开盘"], date)
# 价格差取整是能加仓几手,循环加仓
delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 3:
for i in range(delta):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
elif 3 < delta:
for i in range(3):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
# 最高价突破买入
elif self.system1EnterNormal(row["最高"], TempDonchian20Upper, self.BreakOutLog):
self.TestBuyStocks(TempDonchian20Upper[-1] + 0.001, date)
# 写入BreakOut状态
self.BreakOutLog.append([date, 'breakout', TempDonchian20Upper[-1] + 0.001, TempDonchian20Upper[-1] + 0.001 - 2*self.ThisWeekN, 'Valid','None'])
# 价格差取整是能加仓几手,循环加仓
delta = math.floor((row['最高']-TempDonchian20Upper[-1]) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 3:
for i in range(delta):
self.TestBuyStocks((TempDonchian20Upper[-1] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
elif 3 < delta:
for i in range(3):
self.TestBuyStocks((TempDonchian20Upper[-1] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
elif self.system1EnterSafe(row["最高"], TempDonchian55Upper):
self.TestBuyStocks(TempDonchian55Upper[-1] + 0.001, date)
# 0"trigger_count", 1"BuyPrice", 2"AddPrice", 3"StopPrice", 4"quantity", 5"N", 6"available_cash"
# self.BuyStates = [self.TrigerTime, BuyPrice, AddPrice, StopPrice, Shares, N, available_cash]
elif 1 <= self.TrigerTime < 4: # 加仓1-3次考虑止损和加仓
# 开盘价突破
if self.system1EnterNormal(row["开盘"], TempDonchian20Upper, self.BreakOutLog):
self.TestBuyStocks(row["开盘"], date)
self.BreakOutLog.append([date, 'breakout', TempDonchian20Upper[-1] + 0.001, TempDonchian20Upper[-1] + 0.001 - 2*self.ThisWeekN, 'Valid','None'])
elif self.system1EnterSafe(row["开盘"], TempDonchian55Upper):
self.TestBuyStocks(row["开盘"], date)
# 开盘价加仓
if self.system2Add(row["开盘"]):
self.TestBuyStocks(row["开盘"], date)
if self.TrigerTime == 4:
# 满仓了,不能再加
pass
# 开盘加仓后,最高价能否继续加仓?
else:
delta = math.floor((row['最高']-row['开盘']) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 4-self.TrigerTime:
for i in range(delta):
self.TestBuyStocks((row["开盘"] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
# 最高价加仓
elif self.system2Add(row["最高"]):
self.TestBuyStocks(self.BuyStates[self.TrigerTime - 1][2] + 0.001, date)
if self.TrigerTime == 4:
# 满仓了,不能再加
pass
# 开盘加仓后,最高价能否继续加仓?
else:
delta = math.floor((row['最高']-self.BuyStates[self.TrigerTime - 1][2]) / (1/2 * self.ThisWeekN))
if 1 <= delta <= 4-self.TrigerTime:
for i in range(delta):
self.TestBuyStocks((self.BuyStates[self.TrigerTime - 1][2] + (i+1) * 1/2 * self.ThisWeekN + 0.001), date)
# 止损
elif self.system2Stop(row["收盘"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
elif self.system2Stop(row["最低"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
# 止盈
elif self.system2Out(row["收盘"], TempDonchian10Lower):
self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date)
elif self.system2Out(row["最低"], TempDonchian10Lower):
self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date)
elif self.TrigerTime == 4: # 满仓 考虑止损和退出
# 止损
if self.system2Stop(row["收盘"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
elif self.system2Stop(row["最低"]):
self.TestStopSaleStocks(self.BuyStates[self.TrigerTime - 1][3] - 0.001, date)
# 止盈
elif self.system2Out(row["收盘"], TempDonchian10Lower):
self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date)
elif self.system2Out(row["最低"], TempDonchian10Lower):
self.TestOutSaleStocks(TempDonchian10Lower[-1] - 0.001, date)
# 交易日结束,重新计算高低突破点:
print("---------------回测结束,回测日志如下----------------")
for sublist in self.tradeslog:
print(sublist)
for sublist in self.BreakOutLog:
print(sublist)
# nsdk513300 = TurtleTrading(513300)
# # 每周更新
# nsdk513300.GetRecentData()
# nsdk513300.CalATR(20, True)
# nsdk513300.ReadExistData('513300data-N.csv')
# nsdk513300.CalPositionSize(0.0025, 100000)
# # 每天更新
# nsdk513300.Donchian20 = nsdk513300.calculate_donchian_channel(500, 20)
# nsdk513300.Donchian10 = nsdk513300.calculate_donchian_channel(500, 10)
# nsdk513300.Donchian55 = nsdk513300.calculate_donchian_channel(500, 55)
# nsdk513300.DrawKLine(500)
# print(nsdk513300.PositionSize)
if __name__ == "__main__":
nsdk513300 = TurtleTrading(513300)
# nsdk513300test = Trade(nsdk513300, 0.0025, 100000, '2023-1-3', '2024-05-9')
nsdk513300test = Trade(nsdk513300, 0.0025, 100000, '2014-1-15', '2025-02-18')
nsdk513300test.TestSys2Function()
# nsdk513300test.TestSys1Function()

235
回测/TurtleClass_old.py Executable file
View File

@ -0,0 +1,235 @@
import numpy as np
import akshare as ak
import os
from datetime import datetime, timedelta
import pandas as pd
import mplfinance as mpf
def CalTrueFluc(data, day):
H_L = data.iloc[day]['最高'] - data.iloc[day]['最低']
H_PDC = data.iloc[day]['最高'] - data.iloc[day-1]['收盘']
PDC_L = data.iloc[day-1]['收盘'] - data.iloc[day]['最低']
TrueFluc = np.max([H_L, H_PDC, PDC_L])
print('high', data.iloc[day]['最高'], 'low', data.iloc[day]['最低'], 'TrueRange', TrueFluc)
return TrueFluc
def calc_sma_atr_pd(kdf,period):
"""计算TR与ATR
Args:
kdf (_type_): 历史数据
period (_type_): ATR周期
Returns:
_type_: 返回kdf增加TR与ATR列
"""
kdf['HL'] = kdf['最高'] - kdf['最低']
kdf['HC'] = np.abs(kdf['最高'] - kdf['收盘'].shift(1))
kdf['LC'] = np.abs(kdf['最低'] - kdf['收盘'].shift(1))
kdf['TR'] = np.round(kdf[['HL','HC','LC']].max(axis=1), 3)
# ranges = pd.concat([high_low, high_close, low_close], axis=1)
# true_range = np.max(ranges, axis=1)
kdf['ATR'] = np.round(kdf['TR'].rolling(period).mean(), 3)
return kdf.drop(['HL','HC','LC'], axis = 1)
# A股数据 东方财富网
# all_data = ak.stock_zh_a_spot_em()
# 基金实时数据 东方财富网
# fund_etf_spot_em_df = ak.fund_etf_spot_em()
# 后复权历史数据
# fund_etf_hist_em_df = ak.fund_etf_hist_em(symbol="513300", period="daily", start_date="20130101", end_date="20240408", adjust="hfq")
# fund_etf_hist_em_df.to_csv('513300data.csv', index=False)
# data = pd.read_csv('513300data.csv')
# # 一、计算头寸规模
# # 真实波动幅度 = max (H-L, H-pdc, pdc-L)
# today = datetime.today()
# # print(today)
# # print(data.iloc[-1]['成交额'])
# TrueFlucs = []
# Nserious = np.zeros(101)
# last120days = np.arange(-120, -100)
# for i in last120days:
# H_L = data.iloc[i]['最高'] - data.iloc[i]['最低']
# H_PDC = data.iloc[i]['最高'] - data.iloc[i-1]['收盘']
# PDC_L = data.iloc[i-1]['收盘'] - data.iloc[i]['最低']
# TrueFlucs.append(np.max([H_L, H_PDC, PDC_L]))
# # 求简单平均,放入N序列第一个
# Nsimple = np.average(TrueFlucs)
# Nserious[0] = Nsimple
# # 计算-21到-1的N
# last100days = np.arange(-100, 0)
# for i in range(0,100):
# day = last100days[i]
# H_L = data.iloc[day]['最高'] - data.iloc[day]['最低']
# H_PDC = data.iloc[day]['最高'] - data.iloc[day-1]['收盘']
# PDC_L = data.iloc[day-1]['收盘'] - data.iloc[day]['最低']
# TrueFluc = np.max([H_L, H_PDC, PDC_L])
# Ntemp = (19 * Nserious[i] + TrueFluc)/20
# Nserious[i+1] = Ntemp
# # print(Nserious)
# total_rows = len(data)
# Ndata = np.zeros(total_rows)
# Ndata[total_rows-101:] = Nserious
# # NewColumn = [0]*(total_rows-101) + Nserious
# data['N'] = Ndata
# data.to_csv('513300data-N.csv', index=False)
# pass
# -----------------------更新atr----------------------
"""已有数据与新数据对比补充新的N,同时更新数据库
"""
# Today = datetime.today()
# # print(Today)
# formatted_date = Today.strftime("%Y%m%d")
# # print(formatted_date)
# CurrentData = ak.fund_etf_hist_em(symbol="513300", period="daily", start_date="20130101", end_date=formatted_date, adjust="hfq")
# CurrentData = calc_sma_atr_pd(CurrentData, 20)
# CurrentData.to_csv('513300data-N.csv', index=False)
# pass
# ------------------计算头寸规模 资金10w, 1%波动------------
# money = 100000
# OldData = pd.read_csv('513300data-N.csv')
# N = OldData.iloc[-1]['ATR']
# # N = 0.473
# Price = OldData.iloc[-1]['收盘']
# # Price = 5.60
# EveryUnit = 0.0025 * money /(N*100*Price)
# print('单位',EveryUnit)
# print(113*100*Price)
class TurtleTrading(object):
"""对象范围较小对某一个标的创建一个海龟如513300
计算ATR
Position Size
买入卖出加仓等行为
Args:
object (_type_): _description_
"""
def __init__(self, TradeCode) -> None:
self.TradeCode = TradeCode
def CalATR(self, ATRday, SaveOrNot):
"""计算某个标的的ATR从上市日到今天, 计算后的数据保存在self.CurrentData
Args:
ATRday: 多少日ATR
SaveOrNot (_type_): 是否保存.csv数据
"""
Today = datetime.today()
# print(Today)
formatted_date = Today.strftime("%Y%m%d")
# print(formatted_date)
Code = f"{self.TradeCode}"
CurrentData = ak.fund_etf_hist_em(symbol=Code, period="daily", start_date="20130101", end_date=formatted_date, adjust="hfq")
self.CurrentData = calc_sma_atr_pd(CurrentData, ATRday)
if SaveOrNot:
self.CurrentData.to_csv('513300data-N.csv', index=False)
print("csv保存成功")
def CalPositionSize(self, RiskCoef, Capital):
"""计算PosizionSize 持有的单位该单位某标的1N波动对应RiskCoef * Capital资金
Args:
RiskCoef (_type_): 风险系数
Capital (_type_): 资金
"""
N = self.CurrentData.iloc[-1]['ATR']
# N = 0.473
Price = self.CurrentData.iloc[-1]['收盘']
# Price = 5.60
self.PositionSize = RiskCoef * Capital /( N*100*Price) # 默认用股票形式了 100
def ReadExistData(self, data):
"""除了通过发请求获取数据也可以读本地的数据库赋值给self.CurrentData
Args:
data (_type_): 本地csv名称
"""
self.CurrentData = pd.read_csv(data)
def DrawKLine(self, days):
"""画出k线图看看,画出最近days天的K线图
"""
# 日期部分
dates = pd.to_datetime(self.CurrentData['日期'][-days:])
# Klinedf['Data'] = pd.to_datetime(self.CurrentData['日期'])
Klinedf = pd.DataFrame()
# Klinedf.set_index = Klinedf['Data']
# 其他数据
Klinedf['Open'] = self.CurrentData['开盘'][-days:]
Klinedf['High'] = self.CurrentData['最高'][-days:]
Klinedf['Low'] = self.CurrentData['最低'][-days:]
Klinedf['Close'] = self.CurrentData['收盘'][-days:]
Klinedf['Volume'] = self.CurrentData['成交量'][-days:]
Klinedf.set_index(dates, inplace=True)
# 画图
mpf.plot(Klinedf, type='candle', style='yahoo', volume=False, mav=(5,), addplot=[mpf.make_addplot(self.Donchian[['Upper', 'Lower']])])
def calculate_donchian_channel(self, days, n):
"""
计算唐奇安通道days一共多少日 n多少日唐奇安
参数:
self.CurrentData (DataFrame): 包含价格数据的Pandas DataFrame至少包含"High""Low"
n (int): 时间周期
返回:self.Donchian
DataFrame: 唐奇安通道的DataFrame包含"Upper", "Lower", "Middle"
"""
self.Donchian = pd.DataFrame()
# 计算最高价和最低价的N日移动平均线
self.Donchian['Upper'] = self.CurrentData['最高'][-days:].rolling(n).max()
self.Donchian['Lower'] = self.CurrentData['最低'][-days:].rolling(n).min()
# 计算中间线
self.Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2
# return data[['Upper', 'Lower', 'Middle']]
nsdk513300 = TurtleTrading(513300)
# nsdk513300.CalATR(20, True)
nsdk513300.ReadExistData('513300data-N.csv')
# nsdk513300.CalPositionSize(0.0025, 100000)
nsdk513300.calculate_donchian_channel(500, 20)
nsdk513300.DrawKLine(500)
# print(nsdk513300.PositionSize)

139
回测/TurtleOnTime.py Normal file
View File

@ -0,0 +1,139 @@
import numpy as np
import math
import akshare as ak
import os
from datetime import datetime, timedelta, date
import pandas as pd
import mplfinance as mpf
def calc_sma_atr_pd(kdf,period):
"""计算TR与ATR
Args:
kdf (_type_): 历史数据
period (_type_): ATR周期
Returns:
_type_: 返回kdf增加TR与ATR列
"""
kdf['HL'] = kdf['最高'] - kdf['最低']
kdf['HC'] = np.abs(kdf['最高'] - kdf['收盘'].shift(1))
kdf['LC'] = np.abs(kdf['最低'] - kdf['收盘'].shift(1))
kdf['TR'] = np.round(kdf[['HL','HC','LC']].max(axis=1), 3)
# ranges = pd.concat([high_low, high_close, low_close], axis=1)
# true_range = np.max(ranges, axis=1)
kdf['ATR'] = np.round(kdf['TR'].rolling(period).mean(), 3)
return kdf.drop(['HL','HC','LC'], axis = 1)
class TurtleTrading(object):
"""对象范围较小对某一个标的创建一个海龟如513300
计算ATR
Position Size
买入卖出加仓等行为
Args:
object (_type_): _description_
"""
def __init__(self, TradeCode) -> None:
self.TradeCode = TradeCode
def GetRecentData(self):
"""获取某个标的的最近数据,从两年前到今天, 计算后的数据保存在self.CurrentData
Returns:
_type_: _description_
"""
Today = datetime.today()
# print(Today)
formatted_date = Today.strftime("%Y%m%d")
two_years_ago = date.today() - timedelta(days=365*2).strftime("%Y%m%d")
# print(formatted_date)
Code = f"{self.TradeCode}"
self.CurrentData = ak.fund_etf_hist_em(symbol=Code, period="daily", start_date=two_years_ago, end_date=formatted_date, adjust="")
# return CurrentData
def CalATR(self, data, ATRday, SaveOrNot):
"""计算某个标的的ATR从上市日到今天, 计算后的数据保存在self.CurrentData
Args:
ATRday: 多少日ATR
SaveOrNot (_type_): 是否保存.csv数据
"""
self.CurrentData = calc_sma_atr_pd(data, ATRday)
self.N = self.CurrentData['ATR']
if SaveOrNot:
self.CurrentData.to_csv('513300data-N.csv', index=False)
print("csv保存成功")
return self.N
def CalPositionSize(self, RiskCoef, Capital):
"""计算PosizionSize 持有的单位该单位某标的1N波动对应RiskCoef * Capital资金
Args:
RiskCoef (_type_): 风险系数
Capital (_type_): 资金
"""
N = self.CurrentData.iloc[-1]['ATR']
# N = 0.473
Price = self.CurrentData.iloc[-1]['收盘']
# Price = 5.60
self.PositionSize = RiskCoef * Capital /( N*100*Price) # 默认用股票形式了 100
return self.PositionSize
def ReadExistData(self, data):
"""除了通过发请求获取数据也可以读本地的数据库赋值给self.CurrentData
Args:
data (_type_): 本地csv名称
"""
self.CurrentData = pd.read_csv(data)
def DrawKLine(self, days):
"""画出k线图看看,画出最近days天的K线图
"""
# 日期部分
dates = pd.to_datetime(self.CurrentData['日期'][-days:])
# Klinedf['Data'] = pd.to_datetime(self.CurrentData['日期'])
Klinedf = pd.DataFrame()
# Klinedf.set_index = Klinedf['Data']
# 其他数据
Klinedf['Open'] = self.CurrentData['开盘'][-days:]
Klinedf['High'] = self.CurrentData['最高'][-days:]
Klinedf['Low'] = self.CurrentData['最低'][-days:]
Klinedf['Close'] = self.CurrentData['收盘'][-days:]
Klinedf['Volume'] = self.CurrentData['成交量'][-days:]
Klinedf.set_index(dates, inplace=True)
# 画图
mpf.plot(Klinedf, type='candle', style='yahoo', volume=False, mav=(5,), addplot=[mpf.make_addplot(self.Donchian[['Upper', 'Lower']])])
def calculate_donchian_channel(self, days, n):
"""
计算唐奇安通道days一共多少日 n多少日唐奇安
参数:
self.CurrentData (DataFrame): 包含价格数据的Pandas DataFrame至少包含"High""Low"
n (int): 时间周期
返回:self.Donchian
DataFrame: 唐奇安通道的DataFrame包含"Upper", "Lower", "Middle"
"""
Donchian = pd.DataFrame()
# 计算最高价和最低价的N日移动平均线
Donchian['Upper'] = self.CurrentData['最高'][-days:].rolling(n).max()
Donchian['Lower'] = self.CurrentData['最低'][-days:].rolling(n).min()
# # 计算中间线
# Donchian['Middle'] = (self.Donchian['Upper'] + self.Donchian['Lower']) / 2
return Donchian

645
回测/TurtleOnTime_ai.py Executable file
View File

@ -0,0 +1,645 @@
"""海龟实时
"""
import numpy as np
import math
import akshare as ak
import os
from datetime import datetime, timedelta
import pandas as pd
import mplfinance as mpf
import TurtleClassNew
# -----------------------------------
# 创建组合,先用一个测试
conbinations = []
# 我是否需要当前组合的信息:需要
# 什么东西 股票还是etf
# 风险系数risk_coef; atr头寸单位
# 系数是多少每1%波动多少钱 atr 买4份一共多少钱
# 组合总共会花掉多少钱
# 每个item应该具有的属性
# code
# ATR
# price
# risk_coef
# capital
# 每个月调整risk_coef和captial
# 初始化函数
# 初始化conbinations中的数据
# 监盘函数:
# 数据整理保存函数,收盘后开始
for item in conbinations:
# 创建Turtle实例ETF与股票获取数据代码不同
pass
# 获取数据每5分钟获取一次
# 计算唐奇安通道 每天收盘计算
#
# https://akshare.akfamily.xyz/data/stock/stock.html#id9
import akshare as ak
import pandas as pd
import numpy as np
import sqlite3
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import json
from typing import Dict, List, Tuple
import logging
from decimal import Decimal
class DatabaseManager:
def __init__(self, db_path: str = "turtle_trading.db"):
self.conn = sqlite3.connect(db_path)
self.create_tables()
def create_tables(self):
"""创建必要的数据表"""
# 交易信号记录表
self.conn.execute('''
CREATE TABLE IF NOT EXISTS signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
stock_code TEXT,
signal_type TEXT,
suggested_price REAL,
suggested_quantity INTEGER,
timestamp DATETIME,
status TEXT
)''')
# 实际交易记录表
self.conn.execute('''
CREATE TABLE IF NOT EXISTS trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
signal_id INTEGER,
actual_price REAL,
actual_quantity INTEGER,
timestamp DATETIME,
FOREIGN KEY (signal_id) REFERENCES signals (id)
)''')
# 持仓状态表
self.conn.execute('''
CREATE TABLE IF NOT EXISTS positions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
stock_code TEXT,
quantity INTEGER,
avg_cost REAL,
entry_price REAL,
last_price REAL,
stop_loss REAL,
target_price REAL,
position_type TEXT,
timestamp DATETIME,
status TEXT
)''')
def get_position(self, stock_code: str) -> Dict:
"""获取股票当前持仓信息"""
cursor = self.conn.execute('''
SELECT * FROM positions
WHERE stock_code = ? AND status = 'ACTIVE'
''', (stock_code,))
position = cursor.fetchone()
if position:
return {
'id': position[0],
'stock_code': position[1],
'quantity': position[2],
'avg_cost': position[3],
'entry_price': position[4],
'last_price': position[5],
'stop_loss': position[6],
'target_price': position[7],
'position_type': position[8],
'timestamp': position[9],
'status': position[10]
}
return None
def update_position(self, stock_code: str, last_price: float):
"""更新持仓的最新价格"""
self.conn.execute('''
UPDATE positions
SET last_price = ?, timestamp = ?
WHERE stock_code = ? AND status = 'ACTIVE'
''', (last_price, datetime.now(), stock_code))
self.conn.commit()
def create_position(self, stock_code: str, quantity: int,
entry_price: float, position_type: str,
stop_loss: float, target_price: float):
"""创建新持仓"""
self.conn.execute('''
INSERT INTO positions (
stock_code, quantity, avg_cost, entry_price, last_price,
stop_loss, target_price, position_type, timestamp, status
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (stock_code, quantity, entry_price, entry_price, entry_price,
stop_loss, target_price, position_type, datetime.now(), 'ACTIVE'))
self.conn.commit()
def close_position(self, stock_code: str):
"""关闭持仓"""
self.conn.execute('''
UPDATE positions
SET status = 'CLOSED', timestamp = ?
WHERE stock_code = ? AND status = 'ACTIVE'
''', (datetime.now(), stock_code))
self.conn.commit()
class TurtleStrategy:
def __init__(self, lookback_days: int = 20):
self.lookback_days = lookback_days
def calculate_signals(self, df: pd.DataFrame,
current_position: Dict = None) -> Dict:
"""计算交易信号,考虑当前持仓状态"""
# 计算技术指标
df['high_20'] = df['high'].rolling(20).max()
df['low_10'] = df['low'].rolling(10).min()
df['atr'] = self._calculate_atr(df)
current = df.iloc[-1]
prev = df.iloc[-2]
signal = {
'type': None,
'price': None,
'quantity': 0,
'stop_loss': None,
'target_price': None
}
if current_position:
# 持仓状态下的信号计算
return self._calculate_position_signals(
current_position, current, prev, df
)
else:
# 无持仓状态下的信号计算
return self._calculate_entry_signals(current, prev, df)
def _calculate_position_signals(self, position: Dict,
current: pd.Series,
prev: pd.Series,
df: pd.DataFrame) -> Dict:
"""计算持仓状态下的信号"""
signal = {
'type': None,
'price': current['close'],
'quantity': 0,
'stop_loss': position['stop_loss'],
'target_price': position['target_price']
}
# 检查止损
if current['low'] <= position['stop_loss']:
signal['type'] = 'STOP_LOSS'
signal['quantity'] = position['quantity']
return signal
# 检查获利目标
if current['high'] >= position['target_price']:
signal['type'] = 'TAKE_PROFIT'
signal['quantity'] = position['quantity']
return signal
# 检查加仓条件
if position['position_type'] == 'LONG':
if current['close'] > position['entry_price'] * 1.05: # 5%盈利时考虑加仓
signal['type'] = 'ADD'
signal['quantity'] = self._calculate_position_size(
current['close'], df['atr'].iloc[-1]
)
# 更新止损为前低
signal['stop_loss'] = df['low'].rolling(5).min().iloc[-1]
# 检查减仓条件
elif position['position_type'] == 'SHORT':
if current['close'] < position['entry_price'] * 0.95: # 5%盈利时考虑加仓
signal['type'] = 'REDUCE'
signal['quantity'] = self._calculate_position_size(
current['close'], df['atr'].iloc[-1]
)
# 更新止损为前高
signal['stop_loss'] = df['high'].rolling(5).max().iloc[-1]
return signal
def _calculate_entry_signals(self, current: pd.Series,
prev: pd.Series,
df: pd.DataFrame) -> Dict:
"""计算入场信号"""
signal = {
'type': None,
'price': current['close'],
'quantity': 0,
'stop_loss': None,
'target_price': None
}
atr = df['atr'].iloc[-1]
# 多头入场
if current['close'] > prev['high_20']:
signal['type'] = 'BUY'
signal['quantity'] = self._calculate_position_size(
current['close'], atr
)
signal['stop_loss'] = current['close'] - 2 * atr
signal['target_price'] = current['close'] + 4 * atr
# 空头入场
elif current['close'] < prev['low_10']:
signal['type'] = 'SELL'
signal['quantity'] = self._calculate_position_size(
current['close'], atr
)
signal['stop_loss'] = current['close'] + 2 * atr
signal['target_price'] = current['close'] - 4 * atr
return signal
def _calculate_atr(self, df: pd.DataFrame) -> pd.Series:
"""计算ATR指标"""
df['tr'] = np.maximum(
df['high'] - df['low'],
np.maximum(
abs(df['high'] - df['close'].shift(1)),
abs(df['low'] - df['close'].shift(1))
)
)
return df['tr'].rolling(20).mean()
def _calculate_position_size(self, price: float, atr: float) -> int:
"""计算持仓规模"""
risk_per_trade = 100000 * 0.01 # 假设账户规模100000每次风险1%
return int(risk_per_trade / (atr * 100))
class EmailManager:
def __init__(self, config_path: str = "email_config.json"):
with open(config_path) as f:
self.config = json.load(f)
def send_signal(self, stock_code: str, signal_type: str,
suggested_price: float, suggested_quantity: int,
stop_loss: float = None, target_price: float = None) -> bool:
"""发送交易信号邮件"""
subject = f"交易信号: {stock_code} - {signal_type}"
content = f"""
股票代码: {stock_code}
信号类型: {signal_type}
建议价格: {suggested_price}
建议数量: {suggested_quantity}
止损价位: {stop_loss if stop_loss else ''}
目标价位: {target_price if target_price else ''}
请回复实际成交价格和数量, 格式:
价格,数量
例如: 10.5,100
"""
return self._send_email(subject, content)
def send_position_update(self, position: Dict,
current_price: float) -> bool:
"""发送持仓更新邮件"""
subject = f"持仓更新: {position['stock_code']}"
# 计算收益
profit = (current_price - position['avg_cost']) * position['quantity']
profit_pct = (current_price / position['avg_cost'] - 1) * 100
content = f"""
股票代码: {position['stock_code']}
当前价格: {current_price}
持仓数量: {position['quantity']}
平均成本: {position['avg_cost']}
止损价位: {position['stop_loss']}
目标价位: {position['target_price']}
当前收益: {profit:.2f} ({profit_pct:.2f}%)
持仓类型: {position['position_type']}
"""
return self._send_email(subject, content)
def _send_email(self, subject: str, content: str) -> bool:
"""发送邮件的具体实现"""
try:
msg = MIMEText(content, 'plain', 'utf-8')
msg['Subject'] = Header(subject, 'utf-8')
msg['From'] = self.config['sender']
msg['To'] = self.config['receiver']
with smtplib.SMTP_SSL(self.config['smtp_server'],
self.config['smtp_port']) as server:
server.login(self.config['username'], self.config['password'])
server.sendmail(self.config['sender'],
[self.config['receiver']],
msg.as_string())
return True
except Exception as e:
logging.error(f"发送邮件失败: {str(e)}")
return False
class TurtleTrader:
def __init__(self, config_path: str = "config.json"):
self.db = DatabaseManager()
self.strategy = TurtleStrategy()
self.email = EmailManager()
# 加载配置
with open(config_path) as f:
self.config = json.load(f)
def process_stock(self, stock_code: str):
"""处理单个股票"""
try:
# 获取当前持仓状态
position = self.db.get_position(stock_code)
# 获取股票数据
df = ak.stock_zh_a_hist(
symbol=stock_code,
period="daily",
start_date="20230101",
end_date=datetime.now().strftime("%Y%m%d"),
adjust="qfq"
)
# 计算信号
signal = self.strategy.calculate_signals(df, position)
current_price = df['close'].iloc[-1]
# 更新持仓的最新价格
if position:
self.db.update_position(stock_code, current_price)
# 定期发送持仓更新
self.email.send_position_update(position, current_price)
if signal['type']:
# 保存信号
signal_id = self.db.save_signal(
stock_code, signal['type'],
signal['price'], signal['quantity']
)
# 发送邮件
self.email.send_signal(
stock_code, signal['type'],
signal['price'], signal['quantity'],
signal['stop_loss'], signal['target_price']
)
except Exception as e:
logging.error(f"处理股票 {stock_code} 时发生错误: {str(e)}")
def process_feedback(self, signal_id: int, actual_price: float,
actual_quantity: int):
"""处理交易反馈并更新持仓状态"""
try:
# 获取原始信号
cursor = self.db.conn.execute('''
SELECT stock_code, signal_type, suggested_price
FROM signals WHERE id = ?
''', (signal_id,))
signal = cursor.fetchone()
if not signal:
raise ValueError(f"Signal ID {signal_id} not found")
stock_code, signal_type, suggested_price = signal
# 保存实际交易记录
self.db.save_trade(signal_id, actual_price, actual_quantity)
# 更新持仓状态
current_position = self.db.get_position(stock_code)
if signal_type in ['BUY', 'ADD']:
if current_position:
# 计算新的平均成本
total_cost = (current_position['avg_cost'] *
current_position['quantity'] +
actual_price * actual_quantity)
total_quantity = (current_position['quantity'] +
actual_quantity)
new_avg_cost = total_cost / total_quantity
# 更新持仓
self.db.conn.execute('''
UPDATE positions
SET quantity = ?, avg_cost = ?, last_price = ?,
timestamp = ?
WHERE id = ?
''', (total_quantity, new_avg_cost, actual_price,
datetime.now(), current_position['id']))
else:
# 创建新持仓
# 使用ATR计算止损和目标价位
df = self._get_stock_data(stock_code)
atr = self.strategy._calculate_atr(df).iloc[-1]
stop_loss = actual_price - 2 * atr
target_price = actual_price + 4 * atr
self.db.create_position(
stock_code, actual_quantity, actual_price,
'LONG', stop_loss, target_price
)
elif signal_type in ['SELL', 'REDUCE', 'STOP_LOSS', 'TAKE_PROFIT']:
if current_position:
remaining_quantity = (current_position['quantity'] -
actual_quantity)
if remaining_quantity > 0:
# 部分平仓
self.db.conn.execute('''
UPDATE positions
SET quantity = ?, last_price = ?, timestamp = ?
WHERE id = ?
''', (remaining_quantity, actual_price,
datetime.now(), current_position['id']))
else:
# 完全平仓
self.db.close_position(stock_code)
self.db.conn.commit()
except Exception as e:
logging.error(f"处理交易反馈时发生错误: {str(e)}")
raise
def _get_stock_data(self, stock_code: str, days: int = 30) -> pd.DataFrame:
"""获取股票历史数据"""
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
df = ak.stock_zh_a_hist(
symbol=stock_code,
period="daily",
start_date=start_date.strftime("%Y%m%d"),
end_date=end_date.strftime("%Y%m%d"),
adjust="qfq"
)
return df
class PerformanceAnalyzer:
def __init__(self, db_manager: DatabaseManager):
self.db = db_manager
def analyze(self) -> Dict:
"""分析交易和持仓表现"""
# 分析交易执行质量
trade_metrics = self._analyze_trade_execution()
# 分析持仓表现
position_metrics = self._analyze_positions()
return {
"trade_execution": trade_metrics,
"position_performance": position_metrics
}
def _analyze_trade_execution(self) -> Dict:
"""分析交易执行质量"""
cursor = self.db.conn.execute('''
SELECT s.stock_code, s.signal_type, s.suggested_price,
s.suggested_quantity, t.actual_price, t.actual_quantity
FROM signals s
JOIN trades t ON s.id = t.signal_id
WHERE s.status = 'EXECUTED'
''')
trades = cursor.fetchall()
if not trades:
return {"message": "没有足够的交易数据进行分析"}
# 计算关键指标
price_slippage = []
quantity_fill = []
execution_delay = []
for trade in trades:
price_diff = (trade[4] - trade[2]) / trade[2] * 100
quantity_diff = trade[5] / trade[3] * 100
price_slippage.append(price_diff)
quantity_fill.append(quantity_diff)
return {
"total_trades": len(trades),
"avg_price_slippage": np.mean(price_slippage),
"max_price_slippage": max(price_slippage),
"avg_quantity_fill": np.mean(quantity_fill),
"price_slippage_std": np.std(price_slippage)
}
def _analyze_positions(self) -> Dict:
"""分析持仓表现"""
cursor = self.db.conn.execute('''
SELECT stock_code, quantity, avg_cost, entry_price,
last_price, stop_loss, target_price, position_type,
timestamp, status
FROM positions
''')
positions = cursor.fetchall()
if not positions:
return {"message": "没有持仓数据进行分析"}
active_positions = []
closed_positions = []
total_profit = 0
win_count = 0
for pos in positions:
profit = (pos[4] - pos[2]) * pos[1] # (last_price - avg_cost) * quantity
profit_pct = (pos[4] / pos[2] - 1) * 100
if pos[9] == 'ACTIVE':
active_positions.append({
'stock_code': pos[0],
'profit': profit,
'profit_pct': profit_pct
})
else:
closed_positions.append({
'stock_code': pos[0],
'profit': profit,
'profit_pct': profit_pct
})
if profit > 0:
win_count += 1
total_profit += profit
return {
"active_positions": len(active_positions),
"closed_positions": len(closed_positions),
"total_profit": total_profit,
"win_rate": win_count / len(closed_positions) if closed_positions else 0,
"avg_profit_active": np.mean([p['profit'] for p in active_positions]) if active_positions else 0,
"avg_profit_closed": np.mean([p['profit'] for p in closed_positions]) if closed_positions else 0,
"best_position": max([p['profit_pct'] for p in active_positions + closed_positions]) if positions else 0,
"worst_position": min([p['profit_pct'] for p in active_positions + closed_positions]) if positions else 0
}
def main():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='turtle_trader.log'
)
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logging.getLogger().addHandler(console_handler)
try:
trader = TurtleTrader()
analyzer = PerformanceAnalyzer(trader.db)
while True:
# 处理所有配置的股票
for stock_code in trader.config['stock_codes']:
trader.process_stock(stock_code)
# 定期进行性能分析
if datetime.now().hour == 15: # 每天收盘后进行分析
analysis = analyzer.analyze()
logging.info(f"每日性能分析报告: {json.dumps(analysis, indent=2)}")
# 等待下一个检查周期
time.sleep(trader.config['check_interval'])
except KeyboardInterrupt:
logging.info("系统正常关闭")
except Exception as e:
logging.error(f"系统运行出错: {str(e)}")
raise
if __name__ == "__main__":
main()

BIN
回测/海龟.xmind Executable file

Binary file not shown.

BIN
回测/海龟回测 1.xmind Executable file

Binary file not shown.

BIN
回测/海龟实时.xmind Executable file

Binary file not shown.