Daubechies D4 Wavelet Transform via Lifting with Donoho thresholding
-
Here's an indicator (bbdaubechieslift) that provides a Daubechies D4 wavelet transform via lifting for deconstructing a line into frequency sub-bands. It provides calculation of the Donoho threshold amount to minimize theoretical noise. Again, as with the other wavelet indicators, my general goal has been to comply with Daniel's R. vision of minimization of libraries. There is a math.floor library reference but this is easily coded around if desired.
Because this routine pads outwards to nearest 2**n bars, rather than getting a slice of the line, an incremental option isn't included. However, this padding out with symmetrical data does reduce edge effects/discontinuities in the Daubechies D4.
Example usage:
x = bbdaubechieslift(self.data.close, threshall=threshamount, threshtype = "S", donoholev = 3, donflag=True, subplot=True)
or, another way to threshold individual subbands:
x = bbdaubechieslift(self.data.close, threshlist = [0.5, 0.5, 0.5, 0.5, 0.5], threshtype = "S", donoholev = 3, donflag=True, subplot=True)
Threshlist and threshall are mutually exclusive.
donoholev specifies the subband to use for calculating the "optimum" theoretical noise filter -- usually the highest detail subband. Normally donoholev=1 however that doesn't always provide for the optimum noise reduction depending on the signal and the time frames of the forecast intended.
Threshall is the threshold multiplier for the calculated Donoho value which blanket thresholds all wavelet coefficients. The threshlist parameter can be supplied to threshold out individual coefficients for sub-bands, ie. threshlist = [0.5, 0.5, 0.5, 0.5, 0.5]. Please understand that in Daubechies D4 there is some overlap in signal between the bands and thresholding some bands and not others can result in erratic results. The Haar wavelet indicator I previously posted is fully orthogonal as an alternative.
It is extremely easy with this indicator to give it nonsensical parameters, for example a threshlist that is longer than the n in the data len of 2**n. Or a Donoho multiplier that is too low or too high. You may wish to experiment by monitoring the print output showing the threshval calculated by calcdonoho routine. A donoholev of 1 (the first level) will result in small noise reductions, whereas a donoho level calculated from higher levels results in much larger thresholding. If this is your first time, do not use hard thresholding (threshtype = "H") as it will be more difficult to form a coherent result without discontinuities.
I will have open hours available for contract work starting at the end of August. My inundated spam mailbox is bigdavediode2 at the email provider yahoo.com. Apologies for any bugs, but should there be some, and there probably are, at least this gives a good starting point for others to improve upon.
Typical output at multiple subband levels:
-
''' Author: B. Bradford adapted from instructional information at bearcave.com/Ian Kaplan. MIT License Copyright (c) B. Bradford Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' class bbdaubechieslift(bt.indicators.PeriodN): sqrt3 = math.sqrt(3) sqrt2 = math.sqrt(2) plotinfo = dict(subplot=True) lines = ('dblift', ) plotlines = dict(haarlift=dict(alpha=0.75, ls='-', linewidth=1.0)) params = (('coeffpick', None), ('donoholev', 1), ('donflag', True), ('threshlist', []), ('threshall', None), ('threshtype', "H"), ) #Hard or soft thresholding rules 2**coefflev data points required or 2**coefflev @staticmethod def next_power_of_2(x): return 1 if x == 0 else 2 ** (x.bit_length() -1) def init(self): self.addminperiod(2 ** self.params.coefflev + 1) def once(self, start, end): self.blnforward = 1 self.blninverse = 2 #create dummy line to manipulate and desired 2** data length (max or selected) lstinputdata = self.data.array[:] #make a copy of the list/array #db is padded out in the case of None coefflev rather than truncated pow2 = self.next_power_of_2(len(lstinputdata)) * 2 # input needs to be power of two data points pad up to next pow2 #Count backwards through line to leave most recent value in place if incremental: -- not provided here inlist = lstinputdata[:] #mirror inlist to avoid edge discontinuities -- make sure it's even length for daubechies: mirlistlen = len(self.data.array) mirpostlen = int((pow2 - len(lstinputdata))/2) mirprelen = pow2 - mirlistlen - mirpostlen mirlist = inlist[:mirprelen][::-1] + inlist[:mirlistlen] + inlist[:-mirpostlen-1:-1] ftresult = bbdaubechieslift.forwardtrans(self, vec=mirlist) #forward transform #threshedresult = ftresult threshedresult = bbdaubechieslift.wavshrink(self, coeffary=ftresult, threshall=self.p.threshall, threshtype=self.p.threshtype, threshlist=self.params.threshlist, donoholev = self.p.donoholev, donflag=self.p.donflag) #threshedresult = ftresult[:] #if you don't want to threshold/wavshrink/donoho itresult = bbdaubechieslift.inversetrans(self, vec=threshedresult) #inverse transform #demirror array: dummyaryout = itresult[mirprelen:mirlistlen + mirprelen] lstinputdata = dummyaryout[:] # self.lines[0].array[pow2padding:] + itresult #Only run once if not doing incremental -- incremental not provided here. self.lines.dblift.array = lstinputdata #arr.array('d', [lstinputdata[lstinphalf]] * (len(lstinputdata) - lstinphalf)) + lstinputdata[lstinphalf:] def predict(self, vec, N, direction): half = N >> 1 if (direction == self.blnforward): #unnecessarily aggressive floats to avoid casting as int or concatenating strings: vec[half] = float(vec[half]) - (self.sqrt3/4.0)*float(vec[0]) - (((self.sqrt3-2.0)/4.0)*float(vec[half-1])) elif (direction == self.blninverse): vec[half] = float(vec[half]) + (self.sqrt3/4.0)*float(vec[0]) + (((self.sqrt3-2.0)/4.0)*float(vec[half-1])) else: debug.print("daubechies:predict: bad direction value") # predict, forward for n in range (0, half, 1): if (direction == self.blnforward): vec[half+n] = float(vec[half+n]) - (self.sqrt3/4.0)*float(vec[n]) - (((self.sqrt3-2.0)/4.0)*float(vec[n-1])) elif (direction == self.blninverse): vec[half+n] = float(vec[half+n]) + (self.sqrt3/4.0)*float(vec[n]) + (((self.sqrt3-2.0)/4.0)*float(vec[n-1])) else: debug.print("daubechies: predict: bad direction value") break return vec def updateone(self, vec, N, direction): half = N >> 1 for n in range(0, half): updateVal = bbdaubechieslift.sqrt3 * float(vec[half+n]) if (direction == self.blnforward): vec[n] = float(vec[n]) + updateVal elif (direction == self.blninverse): vec[n] = float(vec[n]) - updateVal else: debug.print("daubechies:updateone: bad direction value") break return vec def update(self, vec, N, direction): half = N >> 1 for i in range (0, half-1, 1): if (direction == self.blnforward): vec[i] = float(vec[i]) - float(vec[half + i + 1]) elif (direction == self.blninverse): vec[i] = float(vec[i]) + float(vec[half + i + 1]) else: debug.print("haar update: bad direction value") break if (direction == self.blnforward): vec[half-1] = float(vec[half-1]) - float(vec[half]) elif (direction == self.blninverse): vec[half-1] = float(vec[half-1]) + float(vec[half]) return vec def forwardtrans(self, vec): #The result of forwardTrans is a set of wavelet coefficients #ordered by increasing frequency and an approximate average #of the input data set in vec. N = len(vec) # set for counter: n = N while n > 1: # beware, vec is altered in-place bbdaubechieslift.split(vec, n) bbdaubechieslift.updateone(self, vec, n, self.blnforward) bbdaubechieslift.predict(self, vec, n, self.blnforward) bbdaubechieslift.update(self, vec, n, self.blnforward) bbdaubechieslift.normalize(self, vec, n, self.blnforward) n = n >> 1 return vec def inversetrans(self, vec): N = len(vec) n = 2 while n <= N: # beware, vec is altered in-place bbdaubechieslift.normalize(self, vec, n, self.blninverse) bbdaubechieslift.update(self, vec, n, self.blninverse) bbdaubechieslift.predict(self, vec, n, self.blninverse) bbdaubechieslift.updateone(self, vec, n, self.blninverse) bbdaubechieslift.merge(self, vec, n) n = n << 1 return vec def calcdonoho(self, coeffary, donoholev, accpos): n = len(coeffary) #donoholev -- Get donoho coeffs to threshold from this wavelet level dcoeffstart = -int(sum(accpos[-donoholev:])) if -donoholev + 1 == 0: coefflist = coeffary[dcoeffstart:] else: dcoeffend = -int(sum(accpos[-donoholev + 1:])) coefflist = coeffary[dcoeffstart:dcoeffend] # noise variance sigma from dh is a numeric scalar representing (an estimate of) the additive Gaussian white noise variance. # Estimate the noise variance based on the median absolute deviation (MAD) of the scale one wavelet coefficients. n = len(coefflist) dh = median(coefflist) mad = median([abs(x - dh) for x in coefflist]) # list(map(lambda x: abs(x-dh), coefflist))) #difference from median for 1st scale coeffs # sigma = list(map(abs, diff)) sigma = mad / 0.6745 # noise standard deviation donoho = math.sqrt(2 * math.log(n)) * sigma print ('donoho is: ', donoho) return donoho def normalize(self, vec, N, direction): half = N >> 1 for n in range(0, half): if direction == self.blnforward: vec[half + n] = float(vec[half + n]) - (bbdaubechieslift.sqrt3 / 4.0) * float(vec[n]) - ( ((bbdaubechieslift.sqrt3 - 2) / 4.0) * float(vec[n - 1])) elif (direction == self.blninverse): vec[half + n] = float(vec[half + n]) + (bbdaubechieslift.sqrt3 / 4.0) * float(vec[n]) + ( ((bbdaubechieslift.sqrt3 - 2.0) / 4.0) * float(vec[n - 1])) else: print("daubechies:normalize: bad direction value") break return vec def wavshrink(self, coeffary, threshlist, threshall, threshtype, donoholev, donflag=True): accpos = list() accpos += [math.floor((len(coeffary) - 1) / 2) + 2] while sum(accpos) <= len(coeffary): accpos.insert(0, math.floor((accpos[0] - 1) / 2) + 2) accpos = accpos[1:] donoho = self.calcdonoho(coeffary, donoholev = self.p.donoholev, accpos=accpos) if threshlist: for i in range(len(threshlist)-1,-1,-1): sumaccpos = -int(sum(accpos[i:])) sumaccposend = -int(sum(accpos[i + 1:])) if donflag: donohomult = donoho * threshlist[i] if donohomult: for x in range(sumaccpos, sumaccposend): print ("x is : ", x) if threshtype.upper() == "S": if abs(coeffary[x]) < donohomult: coeffary[x] = 0 elif coeffary[x] < -donohomult: coeffary[x] += donohomult elif coeffary[x] > donohomult: coeffary[x] -= donohomult elif threshtype.upper() == "H": if abs(coeffary[x]) < donohomult: coeffary[x] = 0 else: for x in range(sumaccpos, sumaccposend): coeffary[x] = float(coeffary[x]*threshlist[i]) threshedresult = coeffary[:] #versus thresholding all coefficients: if threshall is not None: if donflag: threshval = donoho * threshall else: threshval = threshall if threshval: print("threshval is: ", threshval) for x in range(len(coeffary)): if threshtype.upper() == "S": if abs(coeffary[x]) < threshval: coeffary[x] = 0 if coeffary[x] < -threshval: coeffary[x] += threshval elif coeffary[x] > threshval: coeffary[x] -= threshval elif threshtype.upper() == "H": if abs(coeffary[x]) < threshval: coeffary[x] = 0 threshedresult = coeffary[:] return threshedresult def split(vec, N): start = 1 end = N - 1 while (start < end): #Could do this with list comprehension, but want to stick to the original structure: for i in range(start, end, 2): tmp = vec[i] vec[i] = vec[i + 1] vec[i+1] = tmp start += 1 end -= 1 return vec def merge(self, vec, N): half = N >> 1 start = half-1 end = half while (start > 0): for i in range(start, end, 2): tmp = vec[i] vec[i] = vec[i+1] vec[i+1] = tmp start -= 1 end += 1 return vec
-
And a simple test script to run it:
import backtrader as bt import sys sys.path.append('/docker_stocks/bbwavelet') from datetime import datetime import backtrader.indicators as btind from bbwaveletlifting import bbhaarlift, bbdaubechieslift class firstStrategy(bt.Strategy): def __init__(self): self.startlevel = 1 self.endlevel = 5 self.dbsig = [None] * 10 self.barcount = bbbarcounter(largedist=500, majordist=100, minordist=10, plotdistance=0.07) for i in range(self.startlevel, self.endlevel+1): threshamount = (self.endlevel+1-i)/1000 threshlist = [threshamount] * (self.endlevel) self.dbsig[i] = bbdaubechieslift(self.data.close, threshall=threshamount, threshtype = "S", donoholev = 3, donflag=True, subplot=True) #Another example usage: self.dbsig[i] = bbdaubechieslift(self.data.close, threshlist = [0.5, 0.5, 0.5, 0.5, 0.5], threshtype = "S", donoholev = 3, donflag=True, subplot=True) # Variable for our starting cash startcash = 10000 # Create an instance of cerebro cerebro = bt.Cerebro() # Add our strategy cerebro.addstrategy(firstStrategy) # Get Apple data from Yahoo Finance. data = bt.feeds.YahooFinanceData( dataname='AAPL', fromdate=datetime(2017, 4, 1), todate=datetime(2019, 4, 10), buffered=True ) # Add the data to Cerebro cerebro.adddata(data) # Set our desired cash start cerebro.broker.setcash(startcash) # Run over everything cerebro.run() # Get final portfolio Value portvalue = cerebro.broker.getvalue() pnl = portvalue - startcash # Print out the final result print('Final Portfolio Value: ${}'.format(portvalue)) print('P/L: ${}'.format(pnl)) # Finally plot the end results cerebro.plot(style='candlestick')
-
Thanks for sharing!