For code/output blocks: Use ``` (aka backtick or grave accent) in a single line before and after the block. See: http://commonmark.org/help/

Daubechies D4 Wavelet Transform via Lifting with Donoho thresholding



  • Here's an indicator (bbdaubechieslift) that provides a Daubechies D4 wavelet transform via lifting for deconstructing a line into frequency sub-bands. It provides calculation of the Donoho threshold amount to minimize theoretical noise. Again, as with the other wavelet indicators, my general goal has been to comply with Daniel's R. vision of minimization of libraries. There is a math.floor library reference but this is easily coded around if desired.

    Because this routine pads outwards to nearest 2**n bars, rather than getting a slice of the line, an incremental option isn't included. However, this padding out with symmetrical data does reduce edge effects/discontinuities in the Daubechies D4.

    Example usage:

    x = bbdaubechieslift(self.data.close, threshall=threshamount, threshtype = "S", donoholev = 3, donflag=True, subplot=True)

    or, another way to threshold individual subbands:

    x = bbdaubechieslift(self.data.close, threshlist = [0.5, 0.5, 0.5, 0.5, 0.5], threshtype = "S", donoholev = 3, donflag=True, subplot=True)

    Threshlist and threshall are mutually exclusive.

    donoholev specifies the subband to use for calculating the "optimum" theoretical noise filter -- usually the highest detail subband. Normally donoholev=1 however that doesn't always provide for the optimum noise reduction depending on the signal and the time frames of the forecast intended.

    Threshall is the threshold multiplier for the calculated Donoho value which blanket thresholds all wavelet coefficients. The threshlist parameter can be supplied to threshold out individual coefficients for sub-bands, ie. threshlist = [0.5, 0.5, 0.5, 0.5, 0.5]. Please understand that in Daubechies D4 there is some overlap in signal between the bands and thresholding some bands and not others can result in erratic results. The Haar wavelet indicator I previously posted is fully orthogonal as an alternative.

    It is extremely easy with this indicator to give it nonsensical parameters, for example a threshlist that is longer than the n in the data len of 2**n. Or a Donoho multiplier that is too low or too high. You may wish to experiment by monitoring the print output showing the threshval calculated by calcdonoho routine. A donoholev of 1 (the first level) will result in small noise reductions, whereas a donoho level calculated from higher levels results in much larger thresholding. If this is your first time, do not use hard thresholding (threshtype = "H") as it will be more difficult to form a coherent result without discontinuities.

    I will have open hours available for contract work starting at the end of August. My inundated spam mailbox is bigdavediode2 at the email provider yahoo.com. Apologies for any bugs, but should there be some, and there probably are, at least this gives a good starting point for others to improve upon.

    Typical output at multiple subband levels:

    Screenshot dblift from 2019-08-07 19-02-45.png



  • '''
    Author: B. Bradford adapted from instructional information at 
            bearcave.com/Ian Kaplan.
    
    MIT License
    
    Copyright (c) B. Bradford
    
    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:
    
    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.
    
    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
    '''
    
    class bbdaubechieslift(bt.indicators.PeriodN):
    
        sqrt3 = math.sqrt(3)
        sqrt2 = math.sqrt(2)
    
        plotinfo = dict(subplot=True)
        lines = ('dblift', )
        plotlines = dict(haarlift=dict(alpha=0.75, ls='-', linewidth=1.0))
        params = (('coeffpick', None),
                  ('donoholev', 1),
                  ('donflag', True),
                  ('threshlist', []),
                  ('threshall', None),
                  ('threshtype', "H"), )      #Hard or soft thresholding rules  2**coefflev data points required or 2**coefflev
    
        @staticmethod
        def next_power_of_2(x):
            return 1 if x == 0 else 2 ** (x.bit_length() -1)
    
    
    
        def init(self):
            self.addminperiod(2 ** self.params.coefflev + 1)
    
        def once(self, start, end):
            self.blnforward = 1
            self.blninverse = 2
    
            #create dummy line to manipulate and desired 2** data length (max or selected)
            lstinputdata = self.data.array[:]                                   #make a copy of the list/array
    
            #db is padded out in the case of None coefflev rather than truncated
            pow2 = self.next_power_of_2(len(lstinputdata)) * 2  # input needs to be power of two data points pad up to next pow2
            
    
            #Count backwards through line to leave most recent value in place if incremental: -- not provided here
            
            inlist = lstinputdata[:]
    
            #mirror inlist to avoid edge discontinuities -- make sure it's even length for daubechies:
            
            mirlistlen = len(self.data.array)
            mirpostlen = int((pow2 - len(lstinputdata))/2)
            mirprelen = pow2 - mirlistlen - mirpostlen
            mirlist =  inlist[:mirprelen][::-1] + inlist[:mirlistlen] + inlist[:-mirpostlen-1:-1]
    
            ftresult = bbdaubechieslift.forwardtrans(self, vec=mirlist)            #forward transform
            #threshedresult = ftresult
            threshedresult = bbdaubechieslift.wavshrink(self, coeffary=ftresult, threshall=self.p.threshall, threshtype=self.p.threshtype, threshlist=self.params.threshlist, donoholev = self.p.donoholev, donflag=self.p.donflag)
            #threshedresult = ftresult[:]       #if you don't want to threshold/wavshrink/donoho
            itresult = bbdaubechieslift.inversetrans(self, vec=threshedresult)    #inverse transform
    
            #demirror array:
            dummyaryout = itresult[mirprelen:mirlistlen + mirprelen]
    
    
            lstinputdata = dummyaryout[:]                                      # self.lines[0].array[pow2padding:] + itresult
    
            #Only run once if not doing incremental -- incremental not provided here.
    
            self.lines.dblift.array = lstinputdata       #arr.array('d', [lstinputdata[lstinphalf]] * (len(lstinputdata) - lstinphalf)) + lstinputdata[lstinphalf:]
    
    
    
        def predict(self, vec, N, direction):
            half = N >> 1
    
            if (direction == self.blnforward):
                #unnecessarily aggressive floats to avoid casting as int or concatenating strings:
                vec[half] = float(vec[half]) - (self.sqrt3/4.0)*float(vec[0]) - (((self.sqrt3-2.0)/4.0)*float(vec[half-1]))
            elif (direction == self.blninverse):
                vec[half] = float(vec[half]) + (self.sqrt3/4.0)*float(vec[0]) + (((self.sqrt3-2.0)/4.0)*float(vec[half-1]))
            else:
                debug.print("daubechies:predict: bad direction value")
    
    
            # predict, forward
            for n in range (0, half, 1):
                if (direction == self.blnforward):
                    vec[half+n] = float(vec[half+n]) - (self.sqrt3/4.0)*float(vec[n]) - (((self.sqrt3-2.0)/4.0)*float(vec[n-1]))
                elif (direction == self.blninverse):
                    vec[half+n] = float(vec[half+n]) + (self.sqrt3/4.0)*float(vec[n]) + (((self.sqrt3-2.0)/4.0)*float(vec[n-1]))
                else:
                    debug.print("daubechies: predict: bad direction value")
                    break
            return vec
    
    
        def updateone(self, vec, N, direction):
            half = N >> 1
            for n in range(0, half):
                updateVal = bbdaubechieslift.sqrt3 * float(vec[half+n])
    
                if (direction == self.blnforward):
                    vec[n] = float(vec[n]) + updateVal
                elif (direction == self.blninverse):
                    vec[n] = float(vec[n]) - updateVal
                else:
                    debug.print("daubechies:updateone: bad direction value")
                    break
            return vec
    
    
        def update(self, vec, N, direction):
            half = N >> 1
    
            for i in range (0, half-1, 1):
                if (direction == self.blnforward):
                    vec[i] = float(vec[i]) - float(vec[half + i + 1])
                elif (direction == self.blninverse):
                    vec[i] = float(vec[i]) + float(vec[half + i + 1])
                else:
                    debug.print("haar update: bad direction value")
                    break
    
            if (direction == self.blnforward):
                vec[half-1] = float(vec[half-1]) - float(vec[half])
            elif (direction == self.blninverse):
                vec[half-1] = float(vec[half-1]) + float(vec[half])
            return vec
    
    
        def forwardtrans(self, vec):
            
            #The result of forwardTrans is a set of wavelet coefficients
            #ordered by increasing frequency and an approximate average
            #of the input data set in vec.  
            
    
            N = len(vec)
           
            # set for counter:
            n = N
            while n > 1:
                # beware, vec is altered in-place
                bbdaubechieslift.split(vec, n)
                bbdaubechieslift.updateone(self, vec, n, self.blnforward)
                bbdaubechieslift.predict(self, vec, n, self.blnforward)
                bbdaubechieslift.update(self, vec, n, self.blnforward)
                bbdaubechieslift.normalize(self, vec, n, self.blnforward)
                n = n >> 1
            return vec
    
        def inversetrans(self, vec):
            N = len(vec)
            n = 2
            while n <= N:
                # beware, vec is altered in-place
                bbdaubechieslift.normalize(self, vec, n, self.blninverse)
                bbdaubechieslift.update(self, vec, n, self.blninverse)
                bbdaubechieslift.predict(self, vec, n, self.blninverse)
                bbdaubechieslift.updateone(self, vec, n, self.blninverse)
                bbdaubechieslift.merge(self, vec, n)
                n = n << 1
            return vec
    
    
        def calcdonoho(self, coeffary, donoholev, accpos):
    
            n = len(coeffary)
            #donoholev -- Get donoho coeffs to threshold from this wavelet level
    
            dcoeffstart = -int(sum(accpos[-donoholev:]))
            if -donoholev + 1 == 0:
                coefflist = coeffary[dcoeffstart:]
            else:
                dcoeffend = -int(sum(accpos[-donoholev + 1:]))
                coefflist = coeffary[dcoeffstart:dcoeffend]
    
            # noise variance sigma from dh is a numeric scalar representing (an estimate of) the additive Gaussian white noise variance.
            # Estimate the noise variance based on the median absolute deviation (MAD) of the scale one wavelet coefficients.
            n = len(coefflist)
            dh = median(coefflist)
            mad = median([abs(x - dh) for x in
                          coefflist])  # list(map(lambda x: abs(x-dh), coefflist)))      #difference from median for 1st scale coeffs
            # sigma = list(map(abs, diff))
            sigma = mad / 0.6745  # noise standard deviation
            donoho = math.sqrt(2 * math.log(n)) * sigma
            print ('donoho is:  ', donoho)
            return donoho
    
        def normalize(self, vec, N, direction):
            half = N >> 1
            for n in range(0, half):
                if direction == self.blnforward:
                    vec[half + n] = float(vec[half + n]) - (bbdaubechieslift.sqrt3 / 4.0) * float(vec[n]) - (
                    ((bbdaubechieslift.sqrt3 - 2) / 4.0) * float(vec[n - 1]))
                elif (direction == self.blninverse):
                    vec[half + n] = float(vec[half + n]) + (bbdaubechieslift.sqrt3 / 4.0) * float(vec[n]) + (
                    ((bbdaubechieslift.sqrt3 - 2.0) / 4.0) * float(vec[n - 1]))
                else:
                    print("daubechies:normalize: bad direction value")
                    break
            return vec
    
    
        def wavshrink(self, coeffary, threshlist, threshall, threshtype, donoholev, donflag=True):
           
            accpos = list()
            accpos += [math.floor((len(coeffary) - 1) / 2) + 2]
    
            while sum(accpos) <= len(coeffary):
                accpos.insert(0, math.floor((accpos[0] - 1) / 2) + 2)
            accpos = accpos[1:]
    
    
            donoho = self.calcdonoho(coeffary, donoholev = self.p.donoholev, accpos=accpos)
    
            if threshlist:
                for i in range(len(threshlist)-1,-1,-1):
                    sumaccpos = -int(sum(accpos[i:]))
                    sumaccposend = -int(sum(accpos[i + 1:]))
                    if donflag:
                        donohomult = donoho * threshlist[i]
                        if donohomult:
                            for x in range(sumaccpos, sumaccposend):
                                print ("x is : ", x)
    
                                if threshtype.upper() == "S":
                                    if abs(coeffary[x]) < donohomult:
                                        coeffary[x] = 0
                                    elif coeffary[x] < -donohomult:
                                        coeffary[x] += donohomult
                                    elif coeffary[x] > donohomult:
                                        coeffary[x] -= donohomult
                                elif threshtype.upper() == "H":
                                    if abs(coeffary[x]) < donohomult:
                                        coeffary[x] = 0
                    else:
                        for x in range(sumaccpos, sumaccposend):
                            coeffary[x] = float(coeffary[x]*threshlist[i])
    
    
                threshedresult = coeffary[:]
    
    
            #versus thresholding all coefficients:
    
            if threshall is not None:
                if donflag:
                    threshval = donoho * threshall
                else:
                    threshval = threshall
                    
                if threshval:
                    print("threshval is: ", threshval)
                    for x in range(len(coeffary)):
                        if threshtype.upper() == "S":
                            if abs(coeffary[x]) < threshval:
                                coeffary[x] = 0
                            if coeffary[x] < -threshval:
                                coeffary[x] += threshval
                            elif coeffary[x] > threshval:
                                coeffary[x] -= threshval
                        elif threshtype.upper() == "H":
                            if abs(coeffary[x]) < threshval:
                                coeffary[x] = 0
                threshedresult = coeffary[:]
    
    
    
            return threshedresult
    
    
    
    
        def split(vec, N):
            start = 1
            end = N - 1
    
            while (start < end):
                #Could do this with list comprehension, but want to stick to the original structure:
                for i in range(start, end, 2):
                    tmp = vec[i]
                    vec[i] = vec[i + 1]
                    vec[i+1] = tmp
                start += 1
                end -= 1
    
            return vec
    
        def merge(self, vec, N):
            half = N >> 1
            start = half-1
            end = half
    
            while (start > 0):
                for i in range(start, end, 2):
                    tmp = vec[i]
                    vec[i] = vec[i+1]
                    vec[i+1] = tmp
                start -= 1
                end += 1
    
            return vec
    
    


  • And a simple test script to run it:

    import backtrader as bt
    import sys
    
    sys.path.append('/docker_stocks/bbwavelet')
    
    from datetime import datetime
    import backtrader.indicators as btind
    from bbwaveletlifting import bbhaarlift, bbdaubechieslift
    
    class firstStrategy(bt.Strategy):
        def __init__(self):
            self.startlevel = 1
            self.endlevel = 5
            self.dbsig = [None] * 10
    
            self.barcount = bbbarcounter(largedist=500, majordist=100, minordist=10, plotdistance=0.07)
    
    
            for i in range(self.startlevel, self.endlevel+1):
                threshamount = (self.endlevel+1-i)/1000
                threshlist = [threshamount] * (self.endlevel)
    
                self.dbsig[i] = bbdaubechieslift(self.data.close, threshall=threshamount, threshtype = "S", donoholev = 3, donflag=True, subplot=True)
    
                #Another example usage:  self.dbsig[i] = bbdaubechieslift(self.data.close, threshlist = [0.5, 0.5, 0.5, 0.5, 0.5], threshtype = "S", donoholev = 3, donflag=True, subplot=True)
    
    
    
    
    
    # Variable for our starting cash
    startcash = 10000
    
    # Create an instance of cerebro
    cerebro = bt.Cerebro()
    
    # Add our strategy
    cerebro.addstrategy(firstStrategy)
    
    # Get Apple data from Yahoo Finance.
    data = bt.feeds.YahooFinanceData(
        dataname='AAPL',
        fromdate=datetime(2017, 4, 1),
        todate=datetime(2019, 4, 10),
        buffered=True
    )
    
    # Add the data to Cerebro
    cerebro.adddata(data)
    
    # Set our desired cash start
    cerebro.broker.setcash(startcash)
    
    # Run over everything
    cerebro.run()
    
    # Get final portfolio Value
    portvalue = cerebro.broker.getvalue()
    pnl = portvalue - startcash
    
    # Print out the final result
    print('Final Portfolio Value: ${}'.format(portvalue))
    print('P/L: ${}'.format(pnl))
    
    # Finally plot the end results
    cerebro.plot(style='candlestick')
    
    


  • Thanks for sharing!


Log in to reply
 

});