Zigzag indicator
-
I'm new, Can anyone help me ? /zigzag indicator/
http://stockcharts.com/school/doku.php?id=chart_school:technical_indicators:zigzag
https://github.com/joshuaulrich/TTR/blob/master/src/zigzag.c
@backtrader @ab_traderclass Zigzag(bt.Indicator): lines = ('zigzag',) plotinfo = dict(subplot=False) params = ( ('change', 40), ('use_percent', False), ('use_retrace', False), ('use_last_ex', False), ) def __init__(self): self.reference = { 'price': None, 'index': None, } self.inflection = { 'price': None, 'index': None, } if self.params.use_percent: self.change = self.params.change / 100.0 else: self.change = self.params.change self.reference['price'] = (self.data.high.array[0] + self.data.low.array[0]) / 2 self.reference['index'] = 0 self.inflection['price'] = (self.data.high.array[0] + self.data.low.array[0]) / 2 self.inflection['index'] = 1 self.extreme_min = 0.0 self.extreme_max = 0.0 self.local_min = 0.0 self.local_max = 0.0 self.signal = 0 self.i = 0 def next(self): self.i = len(self.data.open)-1 # Initialize all zigzag values to NA self.zigzag.array.append(NAN) if self.params.use_percent: #If % change given (absolute move) self.extreme_min = self.inflection['price'] * (1.0 - self.change) self.extreme_max = self.inflection['price'] * (1.0 + self.change) else: #If $ change given (only absolute moves make sense) self.extreme_min = self.inflection['price'] - self.change self.extreme_max = self.inflection['price'] + self.change #Find local maximum and minimum self.local_max = self.inflection['price'] if self.inflection['price'] > self.data.high.array[self.i] else self.data.high.array[self.i] self.local_min = self.inflection['price'] if self.inflection['price'] < self.data.low.array[self.i] else self.data.low.array[self.i] ##Find first trend if self.signal == 0: if self.params.use_retrace: #Retrace prior move self.signal = 1 if (self.inflection['price'] >= self.reference['price']) else -1 else: #Absolute move if self.local_min <= self.extreme_min: #Confirmed Downtrend self.signal = -1 if self.local_max >= self.extreme_max: #Confirmed Uptrend self.signal = 1 ##Downtrend if self.signal == -1: #New Minimum if self.data.low.array[self.i] == self.local_min: #Last Extreme if self.params.use_last_ex: self.inflection['price'] = self.data.low.array[self.i] self.inflection['index'] = self.i else: #First Extreme if (self.data.low.array[self.i] != self.data.low[-1]): self.inflection['price'] = self.data.low.array[self.i] self.inflection['index'] = self.i #Retrace prior move if self.params.use_retrace: self.extreme_max = self.inflection['price'] +((self.reference['price'] - self.inflection['price']) * self.change) #Trend Reversal if self.data.high.array[self.i] >= self.extreme_max: self.zigzag[self.reference['index']] = self.reference['price'] self.reference = copy.copy(self.inflection) self.inflection['price'] = self.data.high.array[self.i] self.inflection['index'] = self.i self.signal = 1 #Uptrend if self.signal == 1: #New Maximum if self.data.high.array[self.i] == self.local_max: #Last Extreme if self.params.use_last_ex: self.inflection['price'] = self.data.high.array[self.i] self.inflection['index'] = self.i else: #First Extreme if (self.data.high.array[self.i] != self.data.high[-1]): self.inflection['price'] = self.data.high.array[self.i] self.inflection['index'] = self.i #Retrace prior move if self.params.use_retrace: self.extreme_min = self.inflection['price'] -((self.inflection['price'] - self.reference['price']) * self.change) #Trend Reversal if (self.data.low.array[self.i] <= self.extreme_min): self.zigzag[self.reference['index']] = self.reference['price'] self.reference = copy.copy(self.inflection) self.inflection['price'] = self.data.low.array[self.i] self.inflection['index'] = self.i self.signal = -1 self.i = self.i + 1 self.zigzag[self.reference['index']] = self.reference['price'] self.zigzag[self.inflection['index']] = self.inflection['price']
-
Here is some code that implements the indicator. I couldn't figure out how to draw the lines between the turning points though. Instead, the indicator is displaying a light blue symbol above/below the turning points.
Note: This should be considered as a study, not an indicator (similar to the Fractal study), as it peaks into the future.
import backtrader.indicators as btind import numpy as np class ZigZag(btind.Indicator): ''' Identifies Peaks/Troughs of a timeseries ''' lines = ('trend', 'last_pivot_t', 'last_pivot_x', 'last_pivot_ago', 'zigzag_peak', 'zigzag_valley', 'zigzag') # Fancy plotting name # plotlines = dict(logreturn=dict(_name='log_ret')) plotinfo = dict(subplot=False, plotlinelabels=False, plotlinevalues=False, plotvaluetags=False, plot=True) plotlines = dict( trend=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_t=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_x=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_ago=dict(marker='', markersize=0.0, ls='', _plotskip=True), zigzag_peak=dict(marker='v', markersize=4.0, color='lightblue', fillstyle='full', ls=''), zigzag_valley=dict(marker='^', markersize=4.0, color='lightblue', fillstyle='full', ls=''), zigzag=dict(color='lightblue', ls='--') ) # update value to standard for Moving Averages params = ( ('up_retrace', 0.1), ('dn_retrace', 0.1), ('bardist', 0.015), # distance to max/min in absolute perc ) def __init__(self): super(ZigZag, self).__init__() if self.p.up_retrace == 0: raise ValueError('Upward retracement should not be zero.') if self.p.dn_retrace == 0: raise ValueError('Downward retracement should not be zero.') if self.p.up_retrace < 0: self.p.up_retrace = -self.p.up_retrace if self.p.dn_retrace > 0: self.p.dn_retrace = -self.p.dn_retrace self.p.up_retrace = self.p.up_retrace / 100 self.p.dn_retrace = self.p.dn_retrace / 100 self.addminperiod(2) self.missing_val = np.nan def prenext(self): self.lines.trend[0] = 0 self.lines.last_pivot_t[0] = 0 self.lines.last_pivot_x[0] = self.data[0] self.lines.last_pivot_ago[0] = 0 self.lines.zigzag_peak[0] = self.missing_val self.lines.zigzag_valley[0] = self.missing_val self.lines.zigzag[0] = self.missing_val def next(self): data = self.data trend = self.lines.trend last_pivot_t = self.lines.last_pivot_t last_pivot_x = self.lines.last_pivot_x last_pivot_ago = self.lines.last_pivot_ago zigzag_peak = self.lines.zigzag_peak zigzag_valley = self.lines.zigzag_valley zigzag = self.lines.zigzag x = data[0] r = x / last_pivot_x[-1] - 1 curr_idx = len(data) - 1 trend[0] = trend[-1] last_pivot_x[0] = last_pivot_x[-1] last_pivot_t[0] = last_pivot_t[-1] last_pivot_ago[0] = curr_idx - last_pivot_t[0] zigzag_peak[0] = self.missing_val zigzag_valley[0] = self.missing_val zigzag[0] = self.missing_val if trend[-1] == 0: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = piv trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = piv trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == -1: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = piv trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x < last_pivot_x[-1]: last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == 1: if r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = piv trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x > last_pivot_x[-1]: last_pivot_x[0] = x last_pivot_t[0] = curr_idx
Use this code to create the study:
self.zigzag = ZigZag(self.datas[0], up_retrace=5, dn_retrace=-5, plot=True)
The following lines are created:
- trend: {-1, 0, 1}
- last_pivot_t: index of last turning point
- last_pivot_x: price of last turning point
- last_pivot_ago: number of bars since last turning point
- zigzag_peak: contains value at each peak (displayed above the turning point)
- zigzag_valley: contains value at each valley (displayed under the turning point)
- zigzag: zigzag_peak + zigzag_valley (points to connect with lines to display the zigzag, but couldn't figure out how to plot it)
-
@usmacscientist said in Zigzag indicator:
I couldn't figure out how to draw the lines between the turning points though
It wasn't possible (using the integrated features) until today. See the result below (although with
lightblue
things are difficult to see and that's why I marked the turning points withred
for the sample)The key is adding now
_skipnan = True
, to the options of a plotline. This is becausenan
values simply interrupt plotting. This is actually adequate for many things, but prevents things like having just two points which have to be joined together like in this case.Some cosmetic changes including the
_skipnan
for the linezigzag
(and removing thenumpy
dependency)Available in the
development
branch with this commit: https://github.com/backtrader/backtrader/commit/0b8d6aea7ba009d9d77e8d397bb696fb40f75d2eclass ZigZag(bt.ind.PeriodN): ''' Identifies Peaks/Troughs of a timeseries ''' lines = ( 'trend', 'last_pivot_t', 'last_pivot_x', 'last_pivot_ago', 'zigzag_peak', 'zigzag_valley', 'zigzag', ) # Fancy plotting name # plotlines = dict(logreturn=dict(_name='log_ret')) plotinfo = dict( subplot=False, plotlinelabels=True, plotlinevalues=True, plotvaluetags=True, ) plotlines = dict( trend=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_t=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_x=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_ago=dict(marker='', markersize=0.0, ls='', _plotskip=True), zigzag_peak=dict(marker='v', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag_valley=dict(marker='^', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag=dict(_name='zigzag', color='lightblue', ls='--', _skipnan=True), ) # update value to standard for Moving Averages params = ( ('period', 2), ('up_retrace', 0.1), ('dn_retrace', 0.1), ('bardist', 0.015), # distance to max/min in absolute perc ) def __init__(self): super(ZigZag, self).__init__() if not self.p.up_retrace: raise ValueError('Upward retracement should not be zero.') if not self.p.dn_retrace: raise ValueError('Downward retracement should not be zero.') if self.p.up_retrace < 0: self.p.up_retrace = -self.p.up_retrace if self.p.dn_retrace > 0: self.p.dn_retrace = -self.p.dn_retrace self.p.up_retrace = self.p.up_retrace / 100 self.p.dn_retrace = self.p.dn_retrace / 100 self.missing_val = float('NaN') def prenext(self): self.lines.trend[0] = 0 self.lines.last_pivot_t[0] = 0 self.lines.last_pivot_x[0] = self.data[0] self.lines.last_pivot_ago[0] = 0 self.lines.zigzag_peak[0] = self.missing_val self.lines.zigzag_valley[0] = self.missing_val self.lines.zigzag[0] = self.missing_val def next(self): data = self.data trend = self.lines.trend last_pivot_t = self.lines.last_pivot_t last_pivot_x = self.lines.last_pivot_x last_pivot_ago = self.lines.last_pivot_ago zigzag_peak = self.lines.zigzag_peak zigzag_valley = self.lines.zigzag_valley zigzag = self.lines.zigzag x = data[0] r = x / last_pivot_x[-1] - 1 curr_idx = len(data) - 1 trend[0] = trend[-1] last_pivot_x[0] = last_pivot_x[-1] last_pivot_t[0] = last_pivot_t[-1] last_pivot_ago[0] = curr_idx - last_pivot_t[0] zigzag_peak[0] = self.missing_val zigzag_valley[0] = self.missing_val zigzag[0] = self.missing_val if trend[-1] == 0: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = piv trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = piv trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == -1: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = piv trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x < last_pivot_x[-1]: last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == 1: if r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = piv trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x > last_pivot_x[-1]: last_pivot_x[0] = x last_pivot_t[0] = curr_id
-
@backtrader Great! Thanks.
-
Modified version of the code so the lines are drawn on the peak/valley points:
class ZigZag(bt.ind.PeriodN): ''' Identifies Peaks/Troughs of a timeseries ''' lines = ( 'trend', 'last_pivot_t', 'last_pivot_x', 'last_pivot_ago', 'zigzag_peak', 'zigzag_valley', 'zigzag', ) # Fancy plotting name # plotlines = dict(logreturn=dict(_name='log_ret')) plotinfo = dict( subplot=False, plotlinelabels=True, plotlinevalues=True, plotvaluetags=True, ) plotlines = dict( trend=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_t=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_x=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_ago=dict(marker='', markersize=0.0, ls='', _plotskip=True), zigzag_peak=dict(marker='v', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag_valley=dict(marker='^', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag=dict(_name='zigzag', color='blue', ls='-', _skipnan=True), ) # update value to standard for Moving Averages params = ( ('period', 2), ('up_retrace', 0.1), ('dn_retrace', 0.1), ('bardist', 0.015), # distance to max/min in absolute perc ) def __init__(self): super(ZigZag, self).__init__() if not self.p.up_retrace: raise ValueError('Upward retracement should not be zero.') if not self.p.dn_retrace: raise ValueError('Downward retracement should not be zero.') if self.p.up_retrace < 0: self.p.up_retrace = -self.p.up_retrace if self.p.dn_retrace > 0: self.p.dn_retrace = -self.p.dn_retrace self.p.up_retrace = self.p.up_retrace / 100 self.p.dn_retrace = self.p.dn_retrace / 100 self.missing_val = float('NaN') def prenext(self): self.lines.trend[0] = 0 self.lines.last_pivot_t[0] = 0 self.lines.last_pivot_x[0] = self.data[0] self.lines.last_pivot_ago[0] = 0 self.lines.zigzag_peak[0] = self.missing_val self.lines.zigzag_valley[0] = self.missing_val self.lines.zigzag[0] = self.missing_val def next(self): data = self.data trend = self.lines.trend last_pivot_t = self.lines.last_pivot_t last_pivot_x = self.lines.last_pivot_x last_pivot_ago = self.lines.last_pivot_ago zigzag_peak = self.lines.zigzag_peak zigzag_valley = self.lines.zigzag_valley zigzag = self.lines.zigzag x = data[0] r = x / last_pivot_x[-1] - 1 curr_idx = len(data) - 1 trend[0] = trend[-1] last_pivot_x[0] = last_pivot_x[-1] last_pivot_t[0] = last_pivot_t[-1] last_pivot_ago[0] = curr_idx - last_pivot_t[0] zigzag_peak[0] = self.missing_val zigzag_valley[0] = self.missing_val zigzag[0] = self.missing_val if trend[-1] == 0: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == -1: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x < last_pivot_x[-1]: last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == 1: if r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x > last_pivot_x[-1]: last_pivot_t[0] = curr_idx
Is there a way to plot the last line (between last validated turning point and last price)? I can't figure out how to set the value of the indictor on the last bar to the last price.
-
@usmacscientist said in Zigzag indicator:
Is there a way to plot the last line (between last validated turning point and last price)? I can't figure out how to set the value of the indictor on the last bar to the last price.
There is no line because the indicator sets no value. And that's because the indicator has no way of knowing that the data feed has finished.
Indicators are meant to be as stupid as possible and simply perform an operation (and do so in an idempotent manner). This is the reason, for example, why indicators carry no
datetime
payload.A potential trick: set always
zigzag
to the lastprice and invalidate it in the next cycle if not valid. At the end of the stream there will be no next chance to invalidate it. -
I managed to get it to work. Here is the code:
class ZigZag(bt.ind.PeriodN): ''' Identifies Peaks/Troughs of a timeseries ''' lines = ( 'trend', 'last_pivot_t', 'last_pivot_x', 'last_pivot_ago', 'zigzag_peak', 'zigzag_valley', 'zigzag', 'last_zigzag', ) # Fancy plotting name # plotlines = dict(logreturn=dict(_name='log_ret')) plotinfo = dict( subplot=False, plotlinelabels=True, plotlinevalues=True, plotvaluetags=True, ) plotlines = dict( trend=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_t=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_x=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_ago=dict(marker='', markersize=0.0, ls='', _plotskip=True), zigzag_peak=dict(marker='v', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag_valley=dict(marker='^', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag=dict(_name='zigzag', color='blue', ls='-', _skipnan=True), last_zigzag=dict(_name='last_zigzag', color='blue', ls='--', _skipnan=True), ) # update value to standard for Moving Averages params = ( ('period', 2), ('up_retrace', 0.1), ('dn_retrace', 0.1), ('bardist', 0.015), # distance to max/min in absolute perc ) def __init__(self): super(ZigZag, self).__init__() if not self.p.up_retrace: raise ValueError('Upward retracement should not be zero.') if not self.p.dn_retrace: raise ValueError('Downward retracement should not be zero.') if self.p.up_retrace < 0: self.p.up_retrace = -self.p.up_retrace if self.p.dn_retrace > 0: self.p.dn_retrace = -self.p.dn_retrace self.p.up_retrace = self.p.up_retrace / 100 self.p.dn_retrace = self.p.dn_retrace / 100 self.missing_val = float('nan') def prenext(self): self.lines.trend[0] = 0 self.lines.last_pivot_t[0] = 0 self.lines.last_pivot_x[0] = self.data[0] self.lines.last_pivot_ago[0] = 0 self.lines.zigzag_peak[0] = self.missing_val self.lines.zigzag_valley[0] = self.missing_val self.lines.zigzag[0] = self.missing_val self.lines.last_zigzag[0] = self.missing_val def next(self): data = self.data trend = self.lines.trend last_pivot_t = self.lines.last_pivot_t last_pivot_x = self.lines.last_pivot_x last_pivot_ago = self.lines.last_pivot_ago zigzag_peak = self.lines.zigzag_peak zigzag_valley = self.lines.zigzag_valley zigzag = self.lines.zigzag last_zigzag = self.lines.last_zigzag x = data[0] r = x / last_pivot_x[-1] - 1 curr_idx = len(data) - 1 trend[0] = trend[-1] last_pivot_x[0] = last_pivot_x[-1] last_pivot_t[0] = last_pivot_t[-1] last_pivot_ago[0] = curr_idx - last_pivot_t[0] zigzag_peak[0] = self.missing_val zigzag_valley[0] = self.missing_val zigzag[0] = self.missing_val last_zigzag[0] = x if trend[-1] == 0: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == -1: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x < last_pivot_x[-1]: last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == 1: if r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x > last_pivot_x[-1]: last_pivot_t[0] = curr_idx last_pivot_x[0] = x idx = 1 while idx < len(self.zigzag) and math.isnan(zigzag[-idx]): last_zigzag[-idx] = self.missing_val idx += 1 if idx < len(self.data): last_zigzag[-idx] = zigzag[-idx]
-
Reaaaally impressed ! Nice contribution guys :D
-
I try to use Zigzag function to check W-bottom, M-top. A quick question, it seems you are setting indicator value by future data. For example, at day 11, you detected day 10 is valley-pivot and mark day 10 as valley.
-
i try to run you refer code above, but this python code return bellow. any suggestion?
RecursionError: maximum recursion depth exceeded while calling a Python object
this my source code:
from datetime import datetime
import backtrader as btclass ZigZag(bt.ind.PeriodN):
lines = (
'trend', 'last_pivot_t', 'last_pivot_x', 'last_pivot_ago',
'zigzag_peak', 'zigzag_valley', 'zigzag', 'last_zigzag',
)# Fancy plotting name # plotlines = dict(logreturn=dict(_name='log_ret')) plotinfo = dict( subplot=False, plotlinelabels=True, plotlinevalues=True, plotvaluetags=True, ) plotlines = dict( trend=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_t=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_x=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_ago=dict(marker='', markersize=0.0, ls='', _plotskip=True), zigzag_peak=dict(marker='v', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag_valley=dict(marker='^', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag=dict(_name='zigzag', color='blue', ls='-', _skipnan=True), last_zigzag=dict(_name='last_zigzag', color='blue', ls='--', _skipnan=True), ) # update value to standard for Moving Averages params = ( ('period', 2), ('up_retrace', 0.1), ('dn_retrace', 0.1), ('bardist', 0.015), # distance to max/min in absolute perc ) def __init__(self): super(ZigZag, self).__init__() if not self.p.up_retrace: raise ValueError('Upward retracement should not be zero.') if not self.p.dn_retrace: raise ValueError('Downward retracement should not be zero.') if self.p.up_retrace < 0: self.p.up_retrace = -self.p.up_retrace if self.p.dn_retrace > 0: self.p.dn_retrace = -self.p.dn_retrace self.p.up_retrace = self.p.up_retrace / 100 self.p.dn_retrace = self.p.dn_retrace / 100 self.missing_val = float('nan') self.zigzag = ZigZag() def prenext(self): self.lines.trend[0] = 0 self.lines.last_pivot_t[0] = 0 self.lines.last_pivot_x[0] = self.data[0] self.lines.last_pivot_ago[0] = 0 self.lines.zigzag_peak[0] = self.missing_val self.lines.zigzag_valley[0] = self.missing_val self.lines.zigzag[0] = self.missing_val self.lines.last_zigzag[0] = self.missing_val def next(self): data = self.data trend = self.lines.trend last_pivot_t = self.lines.last_pivot_t last_pivot_x = self.lines.last_pivot_x last_pivot_ago = self.lines.last_pivot_ago zigzag_peak = self.lines.zigzag_peak zigzag_valley = self.lines.zigzag_valley zigzag = self.lines.zigzag last_zigzag = self.lines.last_zigzag x = data[0] r = x / last_pivot_x[-1] - 1 curr_idx = len(data) - 1 trend[0] = trend[-1] last_pivot_x[0] = last_pivot_x[-1] last_pivot_t[0] = last_pivot_t[-1] last_pivot_ago[0] = curr_idx - last_pivot_t[0] zigzag_peak[0] = self.missing_val zigzag_valley[0] = self.missing_val zigzag[0] = self.missing_val last_zigzag[0] = x if trend[-1] == 0: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == -1: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x < last_pivot_x[-1]: last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == 1: if r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x > last_pivot_x[-1]: last_pivot_t[0] = curr_idx last_pivot_x[0] = x idx = 1 while idx < len(self.zigzag) and math.isnan(zigzag[-idx]): last_zigzag[-idx] = self.missing_val idx += 1 if idx < len(self.data): last_zigzag[-idx] = zigzag[-idx]
cerebro = bt.Cerebro()
data = bt.feeds.YahooFinanceData(dataname='MSFT', fromdate=datetime(2011, 1, 1),
todate=datetime(2012, 12, 31))
cerebro.adddata(data)cerebro.addstrategy(ZigZag)
cerebro.run()
cerebro.plot() -
or any one,
can you share complete code of the zigzag indicator i really needit to try and evaluate my trades history using python and backtrader? please
Than you -
I haven't looked at this one, but perhaps this one will help? I tried to tighten an existing zigzag indicator up:
https://community.backtrader.com/topic/1390/zigzag-indicator
-
@yogi-wahyu said in Zigzag indicator:
any suggestion?
Yes. Read and pay attention to the details.
For code/output blocks: Use ``` (aka backtick or grave accent) in a single line before and after the block. See: http://commonmark.org/help/
This is at the top of each page and if not used, renders your code basically as something which cannot be read.
In any case ...
@yogi-wahyu said in Zigzag indicator:
cerebro.addstrategy(ZigZag)
An
Indicator
is NOT aStrategy
. You probably want to look at the Quickstart to see how things work. -
oke, Thans for helping me, and it work complete.
:)this is my fully code:
from datetime import datetime
import backtrader as bt
import math
%matplotlib notebookclass ZigZag(bt.ind.PeriodN):
'''
Identifies Peaks/Troughs of a timeseries
'''
lines = (
'trend', 'last_pivot_t', 'last_pivot_x', 'last_pivot_ago',
'zigzag_peak', 'zigzag_valley', 'zigzag', 'last_zigzag',
)# Fancy plotting name # plotlines = dict(logreturn=dict(_name='log_ret')) plotinfo = dict( subplot=False, plotlinelabels=True, plotlinevalues=True, plotvaluetags=True, ) plotlines = dict( trend=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_t=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_x=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_ago=dict(marker='', markersize=0.0, ls='', _plotskip=True), zigzag_peak=dict(marker='v', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag_valley=dict(marker='^', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag=dict(_name='zigzag', color='blue', ls='-', _skipnan=True), last_zigzag=dict(_name='last_zigzag', color='blue', ls='--', _skipnan=True), ) # update value to standard for Moving Averages params = ( ('period', 2), ('up_retrace', 0.1), ('dn_retrace', 0.1), ('bardist', 0.015), # distance to max/min in absolute perc ) def __init__(self): super(ZigZag, self).__init__() if not self.p.up_retrace: raise ValueError('Upward retracement should not be zero.') if not self.p.dn_retrace: raise ValueError('Downward retracement should not be zero.') if self.p.up_retrace < 0: self.p.up_retrace = -self.p.up_retrace if self.p.dn_retrace > 0: self.p.dn_retrace = -self.p.dn_retrace self.p.up_retrace = self.p.up_retrace / 100 self.p.dn_retrace = self.p.dn_retrace / 100 self.missing_val = float('nan') def prenext(self): self.lines.trend[0] = 0 self.lines.last_pivot_t[0] = 0 self.lines.last_pivot_x[0] = self.data[0] self.lines.last_pivot_ago[0] = 0 self.lines.zigzag_peak[0] = self.missing_val self.lines.zigzag_valley[0] = self.missing_val self.lines.zigzag[0] = self.missing_val self.lines.last_zigzag[0] = self.missing_val def next(self): data = self.data trend = self.lines.trend last_pivot_t = self.lines.last_pivot_t last_pivot_x = self.lines.last_pivot_x last_pivot_ago = self.lines.last_pivot_ago zigzag_peak = self.lines.zigzag_peak zigzag_valley = self.lines.zigzag_valley zigzag = self.lines.zigzag last_zigzag = self.lines.last_zigzag x = data[0] r = x / last_pivot_x[-1] - 1 curr_idx = len(data) - 1 trend[0] = trend[-1] last_pivot_x[0] = last_pivot_x[-1] last_pivot_t[0] = last_pivot_t[-1] last_pivot_ago[0] = curr_idx - last_pivot_t[0] zigzag_peak[0] = self.missing_val zigzag_valley[0] = self.missing_val zigzag[0] = self.missing_val last_zigzag[0] = x if trend[-1] == 0: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == -1: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x < last_pivot_x[-1]: last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == 1: if r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x > last_pivot_x[-1]: last_pivot_t[0] = curr_idx last_pivot_x[0] = x idx = 1 while idx < len(self.zigzag) and math.isnan(zigzag[-idx]): last_zigzag[-idx] = self.missing_val idx += 1 if idx < len(self.data): last_zigzag[-idx] = zigzag[-idx]
class SmaCross(bt.SignalStrategy):
params = (('pfast', 10), ('pslow', 30),)
def init(self):
sma1, sma2 = bt.ind.SMA(period=self.p.pfast), bt.ind.SMA(period=self.p.pslow)
self.signal_add(bt.SIGNAL_LONG, bt.ind.CrossOver(sma1, sma2))cerebro = bt.Cerebro()
data = bt.feeds.YahooFinanceData(dataname='MSFT', fromdate=datetime(2011, 1, 1),
todate=datetime(2012, 12, 31))
cerebro.adddata(data)cerebro.addstrategy(SmaCross)
cerebro.addindicator(ZigZag)
cerebro.run()
cerebro.plot(style='candlestick') -
@USMacScientist HI! I am few days new to this.
I have your zig zag called in strategy class like any other indicator in quickstart guide. The chart produced is great! I appreciate you sharing your code. I produce the chart as well which is great.
self.zz = ZigZag(self.data, plotname='ZZ')
I like to humbly ask you how do I access your other lines in this study such as last_pivor_t etc?
I am trying to access the values stored in these lines. Thanks! -
@asus-rog Have you tried to use it for 2 timeframes? I've tried with resampling in backtrader, resampling in pandas, but nothing helps. Everytime I've got just Nan.
-
@asus-rog Does anyone have an idea why adding a HeikinAshi indicator breaks this script?
Does it have to do with HA setting a seed?
For data produced with HA commented out and HA uncommented, see results data screenshots at the end of this post.def prenext(self): # seed recursive value self.lines.ha_open[0] = (self.data.open[0] + self.data.close[0]) / 2.0
If you want to reproduce it, here's a minimal example script. Run it once with these lines commented out and once uncommented:
self.ha = bt.ind.HeikinAshi(self.datas[0]) self.ha.csv = True
IMPORTANT: YOU NEED TO CREATE A DATA FEED WITH YOUR DATA FOR IT TO WORK!
Please adapt this accordingly:# Create a Data Feed data = bt.feeds.GenericCSVData( dataname='Historical data/es-1m.csv', separator=";", fromdate=datetime.datetime(2012, 5, 1), todate=datetime.datetime(2012, 5, 31), dtformat=("%d/%m/%Y"), timeformat=("%H:%M:%S"), time=1, datetime=0, high=3, low=4, open=2, close=5, volume=6, openinterest=-1, timeframe=bt.TimeFrame.Minutes )
import datetime # For datetime objects import backtrader as bt import math import matplotlib matplotlib.use('QT5Agg') import matplotlib.pyplot as plt import pandas as pd import numpy as np class ZigZag(bt.ind.PeriodN): ''' Identifies Peaks/Troughs of a timeseries ''' lines = ( 'trend', 'last_pivot_t', 'last_pivot_x', 'last_pivot_ago', 'zigzag_peak', 'zigzag_valley', 'zigzag', 'last_zigzag', ) # Fancy plotting name # plotlines = dict(logreturn=dict(_name='log_ret')) plotinfo = dict( subplot=False, plotlinelabels=True, plotlinevalues=True, plotvaluetags=True, ) plotlines = dict( trend=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_t=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_x=dict(marker='', markersize=0.0, ls='', _plotskip=True), last_pivot_ago=dict(marker='', markersize=0.0, ls='', _plotskip=True), zigzag_peak=dict(marker='v', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag_valley=dict(marker='^', markersize=4.0, color='red', fillstyle='full', ls=''), zigzag=dict(_name='zigzag', color='blue', ls='-', _skipnan=True), last_zigzag=dict(_name='last_zigzag', color='blue', ls='--', _skipnan=True), ) # update value to standard for Moving Averages params = ( ('period', 2), ('up_retrace', 0.1), ('dn_retrace', 0.1), ('bardist', 0.015), # distance to max/min in absolute perc ) def __init__(self): super(ZigZag, self).__init__() if not self.p.up_retrace: raise ValueError('Upward retracement should not be zero.') if not self.p.dn_retrace: raise ValueError('Downward retracement should not be zero.') if self.p.up_retrace < 0: self.p.up_retrace = -self.p.up_retrace if self.p.dn_retrace > 0: self.p.dn_retrace = -self.p.dn_retrace self.p.up_retrace = self.p.up_retrace / 100 self.p.dn_retrace = self.p.dn_retrace / 100 self.missing_val = float('nan') def prenext(self): self.lines.trend[0] = 0 self.lines.last_pivot_t[0] = 0 self.lines.last_pivot_x[0] = self.data[0] self.lines.last_pivot_ago[0] = 0 self.lines.zigzag_peak[0] = self.missing_val self.lines.zigzag_valley[0] = self.missing_val self.lines.zigzag[0] = self.missing_val self.lines.last_zigzag[0] = self.missing_val def next(self): data = self.data trend = self.lines.trend last_pivot_t = self.lines.last_pivot_t last_pivot_x = self.lines.last_pivot_x last_pivot_ago = self.lines.last_pivot_ago zigzag_peak = self.lines.zigzag_peak zigzag_valley = self.lines.zigzag_valley zigzag = self.lines.zigzag last_zigzag = self.lines.last_zigzag x = data[0] r = x / last_pivot_x[-1] - 1 curr_idx = len(data) - 1 trend[0] = trend[-1] last_pivot_x[0] = last_pivot_x[-1] last_pivot_t[0] = last_pivot_t[-1] last_pivot_ago[0] = curr_idx - last_pivot_t[0] zigzag_peak[0] = self.missing_val zigzag_valley[0] = self.missing_val zigzag[0] = self.missing_val last_zigzag[0] = x if trend[-1] == 0: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == -1: if r >= self.p.up_retrace: piv = last_pivot_x[0] * (1 - self.p.bardist) zigzag_valley[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = 1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x < last_pivot_x[-1]: last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif trend[-1] == 1: if r <= self.p.dn_retrace: piv = last_pivot_x[0] * (1 + self.p.bardist) zigzag_peak[-int(last_pivot_ago[0])] = piv zigzag[-int(last_pivot_ago[0])] = last_pivot_x[0] trend[0] = -1 last_pivot_x[0] = x last_pivot_t[0] = curr_idx elif x > last_pivot_x[-1]: last_pivot_t[0] = curr_idx last_pivot_x[0] = x idx = 1 while idx < len(self.zigzag) and math.isnan(zigzag[-idx]): last_zigzag[-idx] = self.missing_val idx += 1 if idx < len(self.data): last_zigzag[-idx] = zigzag[-idx] class TestStrategy(bt.Strategy): def log(self, txt, dt=None): ''' Logging function fot this strategy''' dt = dt or self.datas[0].datetime.datetime(0) print('%s, %s' % (dt.isoformat(), txt)) def __init__(self): # Keep a reference to the "close" line in the data[0] dataseries self.dataclose = self.datas[0].close # Add indicators # In main plot self.zz = ZigZag(self.data, plotname='ZZ', period=2, up_retrace=0.2, dn_retrace=0.2) self.zz.csv = True self.ha = bt.ind.HeikinAshi(self.datas[0]) self.ha.csv = True if __name__ == '__main__': cerebro = bt.Cerebro() cerebro.addobserver(bt.observers.BuySell, barplot=True, bardist=0.0005) # Add a strategy cerebro.addstrategy(TestStrategy) # Create a Data Feed data = bt.feeds.GenericCSVData( dataname='Historical data/es-1m.csv', separator=";", fromdate=datetime.datetime(2012, 5, 1), todate=datetime.datetime(2012, 5, 31), dtformat=("%d/%m/%Y"), timeformat=("%H:%M:%S"), time=1, datetime=0, high=3, low=4, open=2, close=5, volume=6, openinterest=-1, timeframe=bt.TimeFrame.Minutes ) # Add the Data Feed to Cerebro cerebro.adddata(data) # Set our desired cash start cerebro.broker.setcash(100000.0) # Add a writer to put out csv file cerebro.addwriter(bt.WriterFile, csv=True, out='strategy_results') # Run over everything cerebro.run() # Plot the result cerebro.plot(style='candlestick', iplot = False) # Safe result files results = pd.read_csv("strategy_results", sep=",", header=1) split_row = results.index[results['Id'].str.contains('=====', na=False)][0] results_data = results.iloc[:split_row,:] results_data.to_csv("results_data") summary = results.iloc[split_row:,:] summary.to_csv("summary")
Note how it seems to break the columns zigzag_peak, zigzag_valley, zigzag and last_zigzag.
Due to this zigzag_peak, zigzag_valley, zigzag become all nan, see below.
I presume that it breaks the first row output for the ZZ script, breaking all subsequent lines.WITHOUT HeikinAshi, it works:
WITH HeikinAshi, it's broken:
Any help is greatly appreciated :)