@asus-rog Does anyone have an idea why adding a HeikinAshi indicator breaks this script?

Could it be related to HeikinAshi (HA) seeding its recursive ha_open value in prenext?

See the screenshots at the end of this post for the results data produced with HA commented out and with HA uncommented.

def prenext(self):
    '''
    Seed the recursive ``ha_open`` line for the current bar.

    Writes the midpoint of the current bar's open and close prices into
    ``self.lines.ha_open[0]``.

    NOTE(review): presumably backtrader calls this only for bars that come
    before the indicator's minimum period is reached - confirm against the
    backtrader indicator documentation.
    '''
    # seed recursive value
    self.lines.ha_open[0] = (self.data.open[0] + self.data.close[0]) / 2.0

If you want to reproduce it, here's a minimal example script. Run it once with these lines commented out and once uncommented:

# These two statements are the ones found in TestStrategy.__init__ in the
# script below; comment both out (or in) together to compare the two runs.
# NOTE(review): `csv = True` presumably asks the writer to include this
# indicator's lines in the CSV output - confirm against the backtrader docs.
self.ha = bt.ind.HeikinAshi(self.datas[0])
self.ha.csv = True

IMPORTANT: YOU NEED TO CREATE A DATA FEED WITH YOUR DATA FOR IT TO WORK!

Please adapt this accordingly:

# Create a Data Feed
# Reads a ';'-separated CSV file of minute bars, restricted to May 2012.
# NOTE(review): the integer arguments (time, datetime, high, low, open, close,
# volume, openinterest) are presumably zero-based column positions in the CSV,
# with -1 presumably meaning "column not present" - confirm against the
# backtrader GenericCSVData reference and adjust them to your own file layout.
data = bt.feeds.GenericCSVData(
dataname='Historical data/es-1m.csv',
separator=";",
fromdate=datetime.datetime(2012, 5, 1),
todate=datetime.datetime(2012, 5, 31),
# date and time live in separate columns, each with its own format string
dtformat=("%d/%m/%Y"),
timeformat=("%H:%M:%S"),
time=1,
datetime=0,
high=3,
low=4,
open=2,
close=5,
volume=6,
openinterest=-1,
timeframe=bt.TimeFrame.Minutes
)
import datetime # For datetime objects
import backtrader as bt
import math
import matplotlib
matplotlib.use('QT5Agg')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
class ZigZag(bt.ind.PeriodN):
    '''
    Identifies Peaks/Troughs of a timeseries

    The indicator tracks a running pivot (the most extreme price seen in the
    current trend). Once price has moved away from that pivot by at least
    ``up_retrace`` / ``dn_retrace`` percent, the pivot is confirmed and
    written *retroactively* onto the bar where it occurred, the trend flips
    and the current bar becomes the new running pivot.

    Lines:
      - trend: 0 while no direction is established, 1 for up, -1 for down
      - last_pivot_t: bar index (``len(data) - 1``) of the running pivot
      - last_pivot_x: price of the running pivot
      - last_pivot_ago: distance in bars from the current bar to the pivot
        as it was known at the start of the bar
      - zigzag_peak / zigzag_valley: marker price on a confirmed pivot bar,
        pushed away from the pivot price by the ``bardist`` fraction
      - zigzag: pivot price on confirmed pivot bars, NaN everywhere else
      - last_zigzag: the current bar's price plus the most recent confirmed
        pivot, NaN in between (the still-open last leg)

    Params:
      - period: inherited ``PeriodN`` period (default 2).
        NOTE(review): presumably the number of bars needed before ``next``
        is first called - confirm against the backtrader docs.
      - up_retrace: upward move, in percent, that confirms a valley
      - dn_retrace: downward move, in percent, that confirms a peak
      - bardist: fraction of the pivot price by which markers are offset

    NOTE(review): peaks, valleys and zigzag values are assigned to *past*
    bars. Anything that reads the lines bar by bar while the run is in
    progress (for example a CSV writer) will presumably only see those
    values when Cerebro precomputes all indicators up front (``runonce``
    mode). If something forces bar-by-bar ("next") mode - presumably the
    case for indicators flagged ``_nextforce``, such as HeikinAshi - those
    columns would still be NaN at the time each row is emitted. Confirm
    against the backtrader Cerebro documentation.
    '''
    lines = (
        'trend', 'last_pivot_t', 'last_pivot_x', 'last_pivot_ago',
        'zigzag_peak', 'zigzag_valley', 'zigzag', 'last_zigzag',
    )

    plotinfo = dict(
        subplot=False,
        plotlinelabels=True, plotlinevalues=True, plotvaluetags=True,
    )

    plotlines = dict(
        # bookkeeping lines are not meant to be drawn
        trend=dict(marker='', markersize=0.0, ls='', _plotskip=True),
        last_pivot_t=dict(marker='', markersize=0.0, ls='', _plotskip=True),
        last_pivot_x=dict(marker='', markersize=0.0, ls='', _plotskip=True),
        last_pivot_ago=dict(marker='', markersize=0.0, ls='', _plotskip=True),
        zigzag_peak=dict(marker='v', markersize=4.0, color='red',
                         fillstyle='full', ls=''),
        zigzag_valley=dict(marker='^', markersize=4.0, color='red',
                           fillstyle='full', ls=''),
        zigzag=dict(_name='zigzag', color='blue', ls='-', _skipnan=True),
        last_zigzag=dict(_name='last_zigzag', color='blue', ls='--',
                         _skipnan=True),
    )

    params = (
        ('period', 2),
        ('up_retrace', 0.1),  # percent
        ('dn_retrace', 0.1),  # percent
        ('bardist', 0.015),  # distance to max/min in absolute perc
    )

    def __init__(self):
        '''
        Validate and normalise the retracement parameters.

        After this call ``p.up_retrace`` is a positive fraction and
        ``p.dn_retrace`` a negative fraction, whatever sign the caller used.

        Raises:
            ValueError: if either retracement is zero.
        '''
        super(ZigZag, self).__init__()

        if not self.p.up_retrace:
            raise ValueError('Upward retracement should not be zero.')
        if not self.p.dn_retrace:
            raise ValueError('Downward retracement should not be zero.')

        # Percent -> signed fraction. Dividing by a float keeps the result
        # fractional even for integer params under Python 2 division rules.
        self.p.up_retrace = abs(self.p.up_retrace) / 100.0
        self.p.dn_retrace = -abs(self.p.dn_retrace) / 100.0

        self.missing_val = float('nan')

    def prenext(self):
        '''Seed the state: no trend yet, pivot pinned to the current value.'''
        self.lines.trend[0] = 0
        self.lines.last_pivot_t[0] = 0
        self.lines.last_pivot_x[0] = self.data[0]
        self.lines.last_pivot_ago[0] = 0
        self.lines.zigzag_peak[0] = self.missing_val
        self.lines.zigzag_valley[0] = self.missing_val
        self.lines.zigzag[0] = self.missing_val
        self.lines.last_zigzag[0] = self.missing_val

    def _confirm_pivot(self, new_trend, price, bar_idx):
        '''Stamp the running pivot onto its own bar and flip to new_trend.'''
        lines = self.lines
        pivot_price = lines.last_pivot_x[0]
        # negative offset = bars back from the current bar to the pivot bar
        offset = -int(lines.last_pivot_ago[0])

        if new_trend > 0:
            # turning up: the old pivot was a valley, marker goes below it
            lines.zigzag_valley[offset] = pivot_price * (1 - self.p.bardist)
        else:
            # turning down: the old pivot was a peak, marker goes above it
            lines.zigzag_peak[offset] = pivot_price * (1 + self.p.bardist)
        lines.zigzag[offset] = pivot_price

        lines.trend[0] = new_trend
        lines.last_pivot_x[0] = price
        lines.last_pivot_t[0] = bar_idx

    def _move_pivot(self, price, bar_idx):
        '''Advance the running pivot to the current bar (new extreme).'''
        self.lines.last_pivot_x[0] = price
        self.lines.last_pivot_t[0] = bar_idx

    def next(self):
        '''
        Update trend/pivot state for the current bar.

        Confirms the running pivot when price has retraced far enough from
        it, otherwise extends the pivot on a new extreme, then refreshes the
        open last leg in ``last_zigzag``.
        '''
        data = self.data
        trend = self.lines.trend
        last_pivot_t = self.lines.last_pivot_t
        last_pivot_x = self.lines.last_pivot_x
        last_pivot_ago = self.lines.last_pivot_ago
        zigzag = self.lines.zigzag
        last_zigzag = self.lines.last_zigzag

        prev_trend = trend[-1]
        if math.isnan(prev_trend):
            # No seeded state behind this bar (presumably only possible when
            # prenext never ran, e.g. period=1). Without this, NaN would be
            # copied forward forever and no branch below could ever match.
            self.prenext()
            return

        x = data[0]
        prev_pivot_x = last_pivot_x[-1]
        r = x / prev_pivot_x - 1  # relative move away from the running pivot
        curr_idx = len(data) - 1

        # carry the previous state forward; branches below may override it
        trend[0] = prev_trend
        last_pivot_x[0] = prev_pivot_x
        last_pivot_t[0] = last_pivot_t[-1]
        last_pivot_ago[0] = curr_idx - last_pivot_t[0]
        self.lines.zigzag_peak[0] = self.missing_val
        self.lines.zigzag_valley[0] = self.missing_val
        zigzag[0] = self.missing_val
        last_zigzag[0] = x

        turned_up = r >= self.p.up_retrace
        turned_down = r <= self.p.dn_retrace

        if prev_trend == 0:
            # no direction yet: the first sufficiently large move decides
            if turned_up:
                self._confirm_pivot(1, x, curr_idx)
            elif turned_down:
                self._confirm_pivot(-1, x, curr_idx)
        elif prev_trend == -1:
            if turned_up:
                self._confirm_pivot(1, x, curr_idx)
            elif x < prev_pivot_x:
                self._move_pivot(x, curr_idx)  # new low extends the down leg
        elif prev_trend == 1:
            if turned_down:
                self._confirm_pivot(-1, x, curr_idx)
            elif x > prev_pivot_x:
                self._move_pivot(x, curr_idx)  # new high extends the up leg

        # Walk back to the most recent confirmed pivot, blanking last_zigzag
        # on the way, so that only that pivot and the current bar remain set.
        n_bars = len(zigzag)
        idx = 1
        while idx < n_bars and math.isnan(zigzag[-idx]):
            last_zigzag[-idx] = self.missing_val
            idx += 1

        if idx < len(data):
            last_zigzag[-idx] = zigzag[-idx]
class TestStrategy(bt.Strategy):
    '''Strategy that places no orders; it only attaches the indicators.'''

    def log(self, txt, dt=None):
        '''Print ``txt`` prefixed with an ISO timestamp.

        Falls back to the current datetime of the first data feed when
        ``dt`` is not given.
        '''
        if not dt:
            dt = self.datas[0].datetime.datetime(0)
        stamp = dt.isoformat()
        print('%s, %s' % (stamp, txt))

    def __init__(self):
        primary_feed = self.datas[0]

        # Handle on the "close" line of the first data feed
        self.dataclose = primary_feed.close

        # Indicators (drawn on the main plot); csv=True is set on each one
        zigzag_options = dict(
            plotname='ZZ',
            period=2,
            up_retrace=0.2,
            dn_retrace=0.2,
        )
        self.zz = ZigZag(self.data, **zigzag_options)
        self.zz.csv = True

        self.ha = bt.ind.HeikinAshi(primary_feed)
        self.ha.csv = True
if __name__ == '__main__':
    # Build the engine, then register the observer and the strategy
    engine = bt.Cerebro()
    engine.addobserver(bt.observers.BuySell, barplot=True, bardist=0.0005)
    engine.addstrategy(TestStrategy)

    # Describe the CSV data feed (adapt the path and columns to your data)
    feed_options = dict(
        dataname='Historical data/es-1m.csv',
        separator=";",
        fromdate=datetime.datetime(2012, 5, 1),
        todate=datetime.datetime(2012, 5, 31),
        dtformat=("%d/%m/%Y"),
        timeformat=("%H:%M:%S"),
        time=1,
        datetime=0,
        high=3,
        low=4,
        open=2,
        close=5,
        volume=6,
        openinterest=-1,
        timeframe=bt.TimeFrame.Minutes,
    )
    price_feed = bt.feeds.GenericCSVData(**feed_options)
    engine.adddata(price_feed)

    # Starting cash
    engine.broker.setcash(100000.0)

    # Writer that dumps the run as CSV into the file 'strategy_results'
    engine.addwriter(bt.WriterFile, csv=True, out='strategy_results')

    # Execute the backtest, then show the chart
    engine.run()
    engine.plot(style='candlestick', iplot=False)

    # Save result files: split the writer output at the first row whose 'Id'
    # cell contains '=====' into a per-bar data part and a summary part
    report = pd.read_csv("strategy_results", sep=",", header=1)
    is_divider = report['Id'].str.contains('=====', na=False)
    divider_row = report.index[is_divider][0]

    per_bar_rows = report.iloc[:divider_row, :]
    per_bar_rows.to_csv("results_data")

    summary_rows = report.iloc[divider_row:, :]
    summary_rows.to_csv("summary")

Note how it seems to break the columns zigzag_peak, zigzag_valley, zigzag and last_zigzag.

As a result, zigzag_peak, zigzag_valley and zigzag are all NaN (see below).

I presume that it breaks the first row output for the ZZ script, breaking all subsequent lines.

WITHOUT HeikinAshi, it works:

Screenshot 2022-05-20 174413.png

WITH HeikinAshi, it's broken:

Screenshot 2022-05-20 174621_2.png

Any help is greatly appreciated :)