Thanks. Here it is:
optimize_all.py
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import os.path
import sys
import argparse
import datetime
from os import listdir
import numpy as np
import backtrader as bt
from backtrader.analyzers import (SQN, AnnualReturn, TimeReturn, SharpeRatio,
TradeAnalyzer)
from strategies.strategies import *
from sizers import MaxRiskSizer
from commissions import DegiroCommission
from heatmap import my_heatmap
def runstrategy():
    """Optimize ``EmaCrossLongShort`` on every data file and average the results.

    A fresh ``Cerebro`` is built for each file in the data directory. The
    strategy is optimized over a small grid of fast/slow EMA periods and one
    ``[fast, slow, PnL, average return %]`` row is collected per parameter
    combination. The rows are then averaged across all files and, when
    ``--plot`` is given, passed to ``my_heatmap``.

    Returns:
        list: the averaged rows, one per parameter combination.

    Raises:
        ValueError: if the data directory contains no files.
    """
    args = parse_args()

    # Get the dates from the args
    fromdate = datetime.datetime.strptime(args.fromdate, '%Y-%m-%d')
    todate = datetime.datetime.strptime(args.todate, '%Y-%m-%d')

    # The data directory is resolved relative to the script location
    modpath = os.path.dirname(os.path.abspath(sys.argv[0]))
    datapath = os.path.join(modpath, args.data)

    # Loop-invariant lookup from the --tframe choice to the backtrader constant
    tframes = dict(
        days=bt.TimeFrame.Days,
        weeks=bt.TimeFrame.Weeks,
        months=bt.TimeFrame.Months,
        years=bt.TimeFrame.Years)

    final_results_list = []
    # Sorted because listdir() returns entries in arbitrary order
    for file in sorted(listdir(datapath)):
        path = os.path.join(datapath, file)
        if not os.path.isfile(path):
            continue  # a sub-directory cannot be read as a CSV feed

        data = bt.feeds.YahooFinanceCSVData(
            dataname=path,
            name=file,
            fromdate=fromdate,
            todate=todate
        )

        cerebro = bt.Cerebro(optreturn=False)
        cerebro.adddata(data)

        # The optimization grid is fixed here; --fast_period/--slow_period
        # are not used by this function.
        cerebro.optstrategy(
            EmaCrossLongShort,
            fast=range(12, 14),
            slow=range(47, 49),
            longonly=args.longonly
        )

        cerebro.broker.set_coc(True)
        cerebro.broker.setcash(args.cash)
        cerebro.addsizer(MaxRiskSizer)
        comminfo = DegiroCommission()
        cerebro.broker.addcommissioninfo(comminfo)

        # Add the Analyzers
        cerebro.addanalyzer(SQN)
        if args.legacyannual:
            cerebro.addanalyzer(AnnualReturn, _name='time_return')
            cerebro.addanalyzer(SharpeRatio, legacyannual=True)
        else:
            cerebro.addanalyzer(TimeReturn, _name='time_return',
                                timeframe=tframes[args.tframe])
            cerebro.addanalyzer(SharpeRatio, timeframe=tframes[args.tframe])
        cerebro.addanalyzer(TradeAnalyzer)

        optimized_runs = cerebro.run()

        results_list = []
        for run in optimized_runs:
            for strategy in run:
                pnl = round(strategy.broker.get_value() - args.cash, 2)
                returns = list(
                    strategy.analyzers.time_return.get_analysis().values())
                # Guard against an analyzer that produced no periods at all
                average_return = sum(returns) / len(returns) if returns else 0.0
                results_list.append([
                    strategy.params.fast,
                    strategy.params.slow,
                    pnl,
                    round(average_return * 100, 2)
                ])
        final_results_list.append(results_list)

    if not final_results_list:
        raise ValueError(f'No data files found in {datapath}')

    # Average results for the different data feeds (axis 0 indexes the files)
    arr = np.array(final_results_list)
    final_results_list = [
        [int(val) if val.is_integer() else round(val, 2) for val in row]
        for row in arr.mean(0)
    ]

    if args.plot:
        my_heatmap(final_results_list)

    return final_results_list
def parse_args(pargs=None):
    """Parse the command-line options of the optimization script.

    Args:
        pargs: optional list of argument strings. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Returns:
        argparse.Namespace: the parsed options.
    """
    parser = argparse.ArgumentParser(description='TimeReturn')

    parser.add_argument('--data', '-d',
                        default='data/indexes/',
                        help='data to add to the system')

    parser.add_argument('--fromdate', '-f',
                        default='2005-01-01',
                        help='Starting date in YYYY-MM-DD format')

    parser.add_argument('--todate', '-t',
                        default='2006-12-31',
                        help='Ending date in YYYY-MM-DD format')

    parser.add_argument('--fast_period', default=13, type=int,
                        help='Period to apply to the fast Exponential '
                             'Moving Average')

    parser.add_argument('--slow_period', default=48, type=int,
                        help='Period to apply to the slow Exponential '
                             'Moving Average')

    parser.add_argument('--longonly', '-lo', action='store_true',
                        help='Do only long operations')

    # --tframe and --legacyannual select alternative return analyzers,
    # so they cannot be combined.
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--tframe', default='years', required=False,
                       choices=['days', 'weeks', 'months', 'years'],
                       help='TimeFrame for the returns/Sharpe calculations')

    group.add_argument('--legacyannual', action='store_true',
                       help='Use legacy annual return analyzer')

    parser.add_argument('--cash', default=1500, type=int,
                        help='Starting Cash')

    parser.add_argument('--plot', '-p', action='store_true',
                        help='Plot the read data')

    # type=int so a value given on the command line matches the int default
    parser.add_argument('--numfigs', '-n', default=1, type=int,
                        help='Plot using numfigs figures')

    parser.add_argument('--optimize', '-opt', default=1,
                        help='Optimization option (currently unused)')

    return parser.parse_args(pargs)
# Run the optimization only when executed as a script, not when imported
if __name__ == "__main__":
    runstrategy()
strategies.py
import backtrader as bt
import backtrader.indicators as btind
class TestStrategy(bt.Strategy):
    '''
    Buy after two consecutive lower closes and sell once five bars have
    passed since the last completed order.
    '''

    def __init__(self):
        # Pending order (None when nothing is outstanding) and last buy fill
        self.order = None
        self.buyprice = None
        self.buycomm = None

        # Shortcut to the "close" line of the first data feed
        self.dataclose = self.datas[0].close

    def log(self, txt, dt=None):
        '''Print ``txt`` prefixed with ``dt`` (default: the current bar's date)'''
        stamp = dt or self.datas[0].datetime.date(0)
        print('%s, %s' % (stamp.isoformat(), txt))

    def notify_order(self, order):
        '''Log fills and failures, then clear the pending-order marker'''
        status = order.status
        if status in (order.Submitted, order.Accepted):
            # Still travelling to/through the broker - nothing to do yet
            return

        # Attention: broker could reject order if not enough cash
        if status == order.Completed:
            executed = order.executed
            details = (executed.price, executed.value, executed.comm)

            if order.isbuy():
                self.log('BUY EXECUTED, Price: %.2f, Cost: %.2f, Commission: %.2f' %
                         details)
                self.buyprice = executed.price
                self.buycomm = executed.comm
            elif order.issell():
                self.log('SELL EXECUTED, Price: %.2f, Cost: %.2f, Commission: %.2f' %
                         details)

            # Remember the bar of the fill; next() counts five bars from here
            self.bar_executed = len(self)

        elif status in (order.Canceled, order.Margin, order.Rejected):
            self.log('Order Canceled/Margin/Rejected')

        self.order = None

    def notify_trade(self, trade):
        '''Log gross and net profit once a trade is closed'''
        if trade.isclosed:
            self.log('OPERATION PROFIT, GROSS %.2f, NET %.2f' %
                     (trade.pnl, trade.pnlcomm))

    def next(self):
        '''Create at most one order per bar while none is pending'''
        if self.order:
            return

        close = self.dataclose

        if self.position:
            # Already in the market... sell five bars after the last fill
            if len(self) >= (self.bar_executed + 5):
                self.log('SELL CREATE, %.2f' % close[0])
                # Keep track of the created order to avoid a 2nd order
                self.order = self.sell()
            return

        # Not yet in the market... buy after two consecutive lower closes
        if close[0] < close[-1] < close[-2]:
            self.log('BUY CREATE, %.2f' % close[0])
            self.order = self.buy()
class SMAcrossover(bt.Strategy):
    '''Buy when the fast SMA crosses above the slow SMA and sell when it
    crosses below.

    Params:
      - fast: period of the fast Simple Moving Average
      - slow: period of the slow Simple Moving Average
      - printout: print log lines when True. It defaults to False so that
        optimization runs stay quiet without editing the source.
    '''
    params = (('fast', 20), ('slow', 50), ('printout', False),)

    def log(self, txt, dt=None):
        '''Print ``txt`` prefixed with ``dt`` (default: the current bar's
        date) when the ``printout`` param is set'''
        if not self.p.printout:
            return
        dt = dt or self.datas[0].datetime.date(0)
        print(f'{dt.isoformat()} {txt}')

    def __init__(self):
        self.dataclose = self.datas[0].close
        self.order = None  # pending order, None when nothing is outstanding

        fast_sma = bt.ind.SMA(period=self.p.fast)
        slow_sma = bt.ind.SMA(period=self.p.slow)
        # next() buys on positive and sells on negative crossover values
        self.crossover = bt.indicators.CrossOver(fast_sma, slow_sma)

    def notify_trade(self, trade):
        '''Log gross and net profit once a trade is closed'''
        if not trade.isclosed:
            return
        self.log(f'GROSS {trade.pnl:.2f}, NET {trade.pnlcomm:.2f}')

    def notify_order(self, order):
        '''Log fills and failures, then clear the pending-order marker'''
        if order.status in [order.Submitted, order.Accepted]:
            # An active Buy/Sell order has been submitted/accepted - Nothing to do
            return

        # Check if an order has been completed
        # Attention: broker could reject order if not enough cash
        if order.status in [order.Completed]:
            if order.isbuy():
                self.log(f'BUY EXECUTED, {order.executed.price:.2f}')
            elif order.issell():
                self.log(f'SELL EXECUTED, {order.executed.price:.2f}')
            self.bar_executed = len(self)
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log('Order Canceled/Margin/Rejected')

        # Reset orders
        self.order = None

    def next(self):
        '''Create an order on a crossover unless one is still pending'''
        # Check for open orders
        if self.order:
            return

        if self.crossover > 0:
            self.log(f'BUY CREATE {self.dataclose[0]:.2f}')
            self.order = self.buy()
        elif self.crossover < 0:
            self.log(f'SELL CREATE {self.dataclose[0]:.2f}')
            self.order = self.sell()
class EmaCrossLongShort(bt.Strategy):
    '''This strategy buys/sells upon the fast Exponential Moving Average
    crossing upwards/downwards the slow Exponential Moving Average.

    It can be a long-only strategy by setting the param "longonly" to True

    Params:
      - fast: period of the fast EMA
      - slow: period of the slow EMA
      - printout: print log lines when True
      - longonly: close longs on a downward cross but never open shorts
    '''
    params = dict(
        fast=13,
        slow=48,
        printout=True,
        longonly=False,
    )

    def log(self, txt, dt=None):
        '''Print ``txt`` with a timestamp when the ``printout`` param is set.

        ``dt`` is a numeric datetime converted with ``bt.num2date``; it
        defaults to the current bar of the data feed.
        '''
        if self.p.printout:
            dt = dt or self.data.datetime[0]
            dt = bt.num2date(dt)
            print(f'{dt.isoformat()}, {txt}')

    def __init__(self):
        self.orderid = None  # to control operation entries

        fast_ema = btind.MovAv.EMA(period=self.p.fast)
        slow_ema = btind.MovAv.EMA(period=self.p.slow)
        # next() goes long on positive and short/flat on negative values
        self.signal = btind.CrossOver(fast_ema, slow_ema)

        self.log(f'Initial portfolio value of {self.broker.get_value():.2f}\n')

    def start(self):
        pass

    def next(self):
        '''Close any open position and open a new one on a crossover'''
        # NOTE(review): nothing in this class ever assigns a truthy value to
        # self.orderid, so this guard never fires as written.
        if self.orderid:
            return  # if an order is active, no new orders are allowed

        if self.signal > 0.0:  # cross upwards
            if self.position:
                self.log(f'CLOSE SHORT position of {abs(self.position.size)} shares '
                         f'at {self.data.close[0]:.2f}')
                self.close()

            self.log(f'BUY {self.getsizing()} shares at {self.data.close[0]}')
            self.buy()

        elif self.signal < 0.0:  # cross downwards
            if self.position:
                self.log(f'CLOSE LONG position of {self.position.size} shares '
                         f'at {self.data.close[0]:.2f}')
                self.close()

            if not self.p.longonly:
                # Ask the sizer for the *sell* size so the logged amount
                # matches what sell() will request
                self.log(f'SELL {abs(self.getsizing(isbuy=False))} shares at '
                         f'{self.data.close[0]}')
                self.sell()

    def notify_order(self, order):
        '''Log completed and failed orders, then allow new orders'''
        if order.status in [bt.Order.Submitted, bt.Order.Accepted]:
            return  # Await further notifications

        if order.status == order.Completed:
            if order.isbuy():
                buytxt = f'BUY COMPLETED. ' \
                         f'Size: {order.executed.size}, ' \
                         f'Price: {order.executed.price:.2f}, ' \
                         f'Commission: {order.executed.comm:.2f}'
                self.log(buytxt, order.executed.dt)
            else:
                selltxt = 'SELL COMPLETED. ' \
                          f'Size: {abs(order.executed.size)}, ' \
                          f'Price: {order.executed.price:.2f}, ' \
                          f'Commission: {order.executed.comm:.2f}'
                self.log(selltxt, order.executed.dt)

        elif order.status in [order.Expired, order.Canceled, order.Margin,
                              order.Rejected]:
            # Simply log the status name
            self.log(f'{order.Status[order.status]}')

        # Allow new orders
        self.orderid = None

    def notify_trade(self, trade):
        '''Log portfolio value and gross/net profit once a trade is closed'''
        if trade.isclosed:
            self.log(f'TRADE COMPLETED, '
                     f'Portfolio: {self.broker.get_value():.2f}, '
                     f'Gross: {trade.pnl:.2f}, '
                     f'Net: {trade.pnlcomm:.2f}')
        elif trade.justopened:
            # Newly opened trades are deliberately not logged
            pass
sizers.py
import backtrader as bt
class MaxRiskSizer(bt.Sizer):
    '''Size every order as a fraction of the current portfolio value.

    Params:
      - risk: fraction of the portfolio value to commit, between 0 and 1
    '''
    params = (('risk', 0.95),)

    def __init__(self):
        if not 0 <= self.p.risk <= 1:
            # The trailing space keeps the two literals from running together
            raise ValueError('The risk parameter is a percentage which must be '
                             'entered as a float. e.g. 0.5')

    def _getsizing(self, comminfo, cash, data, isbuy):
        '''Return the rounded number of shares that ``risk`` times the
        portfolio value buys at the current close'''
        return round(self.broker.get_value() * self.p.risk / data.close[0])
class MaxRiskSizer2(bt.Sizer):
    '''Open positions with a fraction of the available cash and reuse the
    size of the open position otherwise.

    Params:
      - risk: fraction of the cash to commit, between 0 and 1
    '''
    params = (('risk', 0.98),)

    def __init__(self):
        if not 0 <= self.p.risk <= 1:
            # The trailing space keeps the two literals from running together
            raise ValueError('The risk parameter is a percentage which must be '
                             'entered as a float. e.g. 0.5')

    def _getsizing(self, comminfo, cash, data, isbuy):
        '''Return the size for a new position, or the size of the open one'''
        position = self.broker.getposition(data)
        if not position:
            # Nothing open: let the commission scheme size the order from
            # the cash share we are willing to commit
            size = comminfo.getsize(data.close[0], cash * self.p.risk)
        else:
            size = position.size
        return size
class FixedRerverser(bt.Sizer):
    '''This would remove the burden from the Strategy to decide if a position
    has to be reversed or opened, the Sizer is in control and can at any time
    be replaced without affecting the logic.

    Params:
      - stake: number of units for a fresh position; twice that amount is
        returned while a position is open so the order reverses it
    '''
    # _getsizing reads self.p.stake, so the param has to be declared
    params = (('stake', 1),)

    def _getsizing(self, comminfo, cash, data, isbuy):
        '''Return ``stake`` when flat and ``2 * stake`` when in the market'''
        position = self.broker.getposition(data)
        # The bool adds 1 only while a position is open
        size = self.p.stake * (1 + (position.size != 0))
        return size


# Correctly spelled alias; the original name is kept for existing callers
FixedReverser = FixedRerverser
class LongOnly(bt.Sizer):
    '''Fixed-stake sizer that never sells without an open position.

    Params:
      - stake: number of units for every order
    '''
    params = (
        ('stake', 1),
    )

    def _getsizing(self, comminfo, cash, data, isbuy):
        '''Return ``stake``, or 0 for a sell while nothing is open'''
        # The position is only looked up for sells, as buys always go ahead
        if not isbuy and not self.broker.getposition(data).size:
            return 0  # do not sell if nothing is open
        return self.p.stake
commissions.py
import backtrader as bt
class DegiroCommission(bt.CommInfoBase):
    '''Commission made of a flat fee plus a fee per share.

    Params:
      - per_share: fee charged for each share in the operation
      - flat: fixed fee added to every operation
    '''
    params = (
        ('per_share', 0.004),
        ('flat', 0.5),
    )

    def _getcommission(self, size, price, pseudoexec):
        '''Return the flat fee plus ``per_share`` times the absolute size;
        ``price`` and ``pseudoexec`` are not used'''
        flat_fee = self.p.flat
        share_count = abs(size)
        return flat_fee + share_count * self.p.per_share
heatmap.py
import datetime
import os.path
import sys
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from matplotlib.colors import LinearSegmentedColormap
from matplotlib.patches import Rectangle
def my_heatmap(data):
    """Show column 3 of ``data`` as an annotated heatmap.

    Args:
        data: 2-D array-like of result rows. Column 0 supplies the row
            labels, column 1 the column labels and column 3 the plotted
            value. The caller in this project appears to pass
            ``[fast, slow, PnL, average return %]`` rows; confirm there.

    The cells holding the smallest and the largest value are outlined in
    blue. Rows may arrive in any order; a label combination that is missing
    from ``data`` is left as NaN.
    """
    data = np.array(data)

    # return_inverse gives, for every row, its position among the sorted
    # unique labels, so each value lands in the right cell regardless of
    # row order
    xs, col_ndx = np.unique(data[:, 1].astype(int), return_inverse=True)
    ys, row_ndx = np.unique(data[:, 0].astype(int), return_inverse=True)
    vals = np.full((len(ys), len(xs)), np.nan)
    vals[row_ndx, col_ndx] = data[:, 3]

    # NaN-aware so a missing cell is never picked as an extreme
    min_val_ndx = np.unravel_index(np.nanargmin(vals), vals.shape)
    max_val_ndx = np.unravel_index(np.nanargmax(vals), vals.shape)

    cmap = LinearSegmentedColormap.from_list('', ['red', 'orange', 'yellow', 'chartreuse', 'limegreen'])
    ax = sns.heatmap(vals, xticklabels=xs, yticklabels=ys, cmap=cmap, annot=True, fmt='.2f')

    # Indices are (row, col) while Rectangle wants (x, y), hence the reversal
    ax.add_patch(Rectangle(min_val_ndx[::-1], 1, 1, fill=False, edgecolor='blue', lw=3, clip_on=False))
    ax.add_patch(Rectangle(max_val_ndx[::-1], 1, 1, fill=False, edgecolor='blue', lw=3, clip_on=False))

    plt.tight_layout()
    plt.show()