Navigation

    Backtrader Community

    • Register
    • Login
    • Search
    • Categories
    • Recent
    • Tags
    • Popular
    • Users
    • Groups
    • Search
    For code/output blocks: Use ``` (aka backtick or grave accent) in a single line before and after the block. See: http://commonmark.org/help/

    Daubechies D4 Wavelet Transform via Lifting with Donoho thresholding

    Indicators/Strategies/Analyzers
    2
    4
    427
    Loading More Posts
    • Oldest to Newest
    • Newest to Oldest
    • Most Votes
    Reply
    • Reply as topic
    Log in to reply
    This topic has been deleted. Only users with topic management privileges can see it.
    • B
      bigdavediode last edited by bigdavediode

      Here's an indicator (bbdaubechieslift) that provides a Daubechies D4 wavelet transform via lifting for deconstructing a line into frequency sub-bands. It provides calculation of the Donoho threshold amount to minimize theoretical noise. Again, as with the other wavelet indicators, my general goal has been to comply with Daniel's R. vision of minimization of libraries. There is a math.floor library reference but this is easily coded around if desired.

      Because this routine pads outwards to nearest 2**n bars, rather than getting a slice of the line, an incremental option isn't included. However, this padding out with symmetrical data does reduce edge effects/discontinuities in the Daubechies D4.

      Example usage:

      x = bbdaubechieslift(self.data.close, threshall=threshamount, threshtype = "S", donoholev = 3, donflag=True, subplot=True)

      or, another way to threshold individual subbands:

      x = bbdaubechieslift(self.data.close, threshlist = [0.5, 0.5, 0.5, 0.5, 0.5], threshtype = "S", donoholev = 3, donflag=True, subplot=True)

      Threshlist and threshall are mutually exclusive.

      donoholev specifies the subband to use for calculating the "optimum" theoretical noise filter -- usually the highest detail subband. Normally donoholev=1 however that doesn't always provide for the optimum noise reduction depending on the signal and the time frames of the forecast intended.

      Threshall is the threshold multiplier for the calculated Donoho value which blanket thresholds all wavelet coefficients. The threshlist parameter can be supplied to threshold out individual coefficients for sub-bands, ie. threshlist = [0.5, 0.5, 0.5, 0.5, 0.5]. Please understand that in Daubechies D4 there is some overlap in signal between the bands and thresholding some bands and not others can result in erratic results. The Haar wavelet indicator I previously posted is fully orthogonal as an alternative.

      It is extremely easy with this indicator to give it nonsensical parameters, for example a threshlist that is longer than the n in the data len of 2**n. Or a Donoho multiplier that is too low or too high. You may wish to experiment by monitoring the print output showing the threshval calculated by calcdonoho routine. A donoholev of 1 (the first level) will result in small noise reductions, whereas a donoho level calculated from higher levels results in much larger thresholding. If this is your first time, do not use hard thresholding (threshtype = "H") as it will be more difficult to form a coherent result without discontinuities.

      I will have open hours available for contract work starting at the end of August. My inundated spam mailbox is bigdavediode2 at the email provider yahoo.com. Apologies for any bugs, but should there be some, and there probably are, at least this gives a good starting point for others to improve upon.

      Typical output at multiple subband levels:

      Screenshot dblift from 2019-08-07 19-02-45.png

      1 Reply Last reply Reply Quote 1
      • B
        bigdavediode last edited by bigdavediode

        '''
        Author: B. Bradford adapted from instructional information at 
                bearcave.com/Ian Kaplan.
        
        MIT License
        
        Copyright (c) B. Bradford
        
        Permission is hereby granted, free of charge, to any person obtaining a copy
        of this software and associated documentation files (the "Software"), to deal
        in the Software without restriction, including without limitation the rights
        to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
        copies of the Software, and to permit persons to whom the Software is
        furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all
        copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
        IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
        FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
        AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
        LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
        OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
        SOFTWARE.
        '''
        
        class bbdaubechieslift(bt.indicators.PeriodN):
        
            sqrt3 = math.sqrt(3)
            sqrt2 = math.sqrt(2)
        
            plotinfo = dict(subplot=True)
            lines = ('dblift', )
            plotlines = dict(haarlift=dict(alpha=0.75, ls='-', linewidth=1.0))
            params = (('coeffpick', None),
                      ('donoholev', 1),
                      ('donflag', True),
                      ('threshlist', []),
                      ('threshall', None),
                      ('threshtype', "H"), )      #Hard or soft thresholding rules  2**coefflev data points required or 2**coefflev
        
            @staticmethod
            def next_power_of_2(x):
                return 1 if x == 0 else 2 ** (x.bit_length() -1)
        
        
        
            def init(self):
                self.addminperiod(2 ** self.params.coefflev + 1)
        
            def once(self, start, end):
                self.blnforward = 1
                self.blninverse = 2
        
                #create dummy line to manipulate and desired 2** data length (max or selected)
                lstinputdata = self.data.array[:]                                   #make a copy of the list/array
        
                #db is padded out in the case of None coefflev rather than truncated
                pow2 = self.next_power_of_2(len(lstinputdata)) * 2  # input needs to be power of two data points pad up to next pow2
                
        
                #Count backwards through line to leave most recent value in place if incremental: -- not provided here
                
                inlist = lstinputdata[:]
        
                #mirror inlist to avoid edge discontinuities -- make sure it's even length for daubechies:
                
                mirlistlen = len(self.data.array)
                mirpostlen = int((pow2 - len(lstinputdata))/2)
                mirprelen = pow2 - mirlistlen - mirpostlen
                mirlist =  inlist[:mirprelen][::-1] + inlist[:mirlistlen] + inlist[:-mirpostlen-1:-1]
        
                ftresult = bbdaubechieslift.forwardtrans(self, vec=mirlist)            #forward transform
                #threshedresult = ftresult
                threshedresult = bbdaubechieslift.wavshrink(self, coeffary=ftresult, threshall=self.p.threshall, threshtype=self.p.threshtype, threshlist=self.params.threshlist, donoholev = self.p.donoholev, donflag=self.p.donflag)
                #threshedresult = ftresult[:]       #if you don't want to threshold/wavshrink/donoho
                itresult = bbdaubechieslift.inversetrans(self, vec=threshedresult)    #inverse transform
        
                #demirror array:
                dummyaryout = itresult[mirprelen:mirlistlen + mirprelen]
        
        
                lstinputdata = dummyaryout[:]                                      # self.lines[0].array[pow2padding:] + itresult
        
                #Only run once if not doing incremental -- incremental not provided here.
        
                self.lines.dblift.array = lstinputdata       #arr.array('d', [lstinputdata[lstinphalf]] * (len(lstinputdata) - lstinphalf)) + lstinputdata[lstinphalf:]
        
        
        
            def predict(self, vec, N, direction):
                half = N >> 1
        
                if (direction == self.blnforward):
                    #unnecessarily aggressive floats to avoid casting as int or concatenating strings:
                    vec[half] = float(vec[half]) - (self.sqrt3/4.0)*float(vec[0]) - (((self.sqrt3-2.0)/4.0)*float(vec[half-1]))
                elif (direction == self.blninverse):
                    vec[half] = float(vec[half]) + (self.sqrt3/4.0)*float(vec[0]) + (((self.sqrt3-2.0)/4.0)*float(vec[half-1]))
                else:
                    debug.print("daubechies:predict: bad direction value")
        
        
                # predict, forward
                for n in range (0, half, 1):
                    if (direction == self.blnforward):
                        vec[half+n] = float(vec[half+n]) - (self.sqrt3/4.0)*float(vec[n]) - (((self.sqrt3-2.0)/4.0)*float(vec[n-1]))
                    elif (direction == self.blninverse):
                        vec[half+n] = float(vec[half+n]) + (self.sqrt3/4.0)*float(vec[n]) + (((self.sqrt3-2.0)/4.0)*float(vec[n-1]))
                    else:
                        debug.print("daubechies: predict: bad direction value")
                        break
                return vec
        
        
            def updateone(self, vec, N, direction):
                half = N >> 1
                for n in range(0, half):
                    updateVal = bbdaubechieslift.sqrt3 * float(vec[half+n])
        
                    if (direction == self.blnforward):
                        vec[n] = float(vec[n]) + updateVal
                    elif (direction == self.blninverse):
                        vec[n] = float(vec[n]) - updateVal
                    else:
                        debug.print("daubechies:updateone: bad direction value")
                        break
                return vec
        
        
            def update(self, vec, N, direction):
                half = N >> 1
        
                for i in range (0, half-1, 1):
                    if (direction == self.blnforward):
                        vec[i] = float(vec[i]) - float(vec[half + i + 1])
                    elif (direction == self.blninverse):
                        vec[i] = float(vec[i]) + float(vec[half + i + 1])
                    else:
                        debug.print("haar update: bad direction value")
                        break
        
                if (direction == self.blnforward):
                    vec[half-1] = float(vec[half-1]) - float(vec[half])
                elif (direction == self.blninverse):
                    vec[half-1] = float(vec[half-1]) + float(vec[half])
                return vec
        
        
            def forwardtrans(self, vec):
                
                #The result of forwardTrans is a set of wavelet coefficients
                #ordered by increasing frequency and an approximate average
                #of the input data set in vec.  
                
        
                N = len(vec)
               
                # set for counter:
                n = N
                while n > 1:
                    # beware, vec is altered in-place
                    bbdaubechieslift.split(vec, n)
                    bbdaubechieslift.updateone(self, vec, n, self.blnforward)
                    bbdaubechieslift.predict(self, vec, n, self.blnforward)
                    bbdaubechieslift.update(self, vec, n, self.blnforward)
                    bbdaubechieslift.normalize(self, vec, n, self.blnforward)
                    n = n >> 1
                return vec
        
            def inversetrans(self, vec):
                N = len(vec)
                n = 2
                while n <= N:
                    # beware, vec is altered in-place
                    bbdaubechieslift.normalize(self, vec, n, self.blninverse)
                    bbdaubechieslift.update(self, vec, n, self.blninverse)
                    bbdaubechieslift.predict(self, vec, n, self.blninverse)
                    bbdaubechieslift.updateone(self, vec, n, self.blninverse)
                    bbdaubechieslift.merge(self, vec, n)
                    n = n << 1
                return vec
        
        
            def calcdonoho(self, coeffary, donoholev, accpos):
        
                n = len(coeffary)
                #donoholev -- Get donoho coeffs to threshold from this wavelet level
        
                dcoeffstart = -int(sum(accpos[-donoholev:]))
                if -donoholev + 1 == 0:
                    coefflist = coeffary[dcoeffstart:]
                else:
                    dcoeffend = -int(sum(accpos[-donoholev + 1:]))
                    coefflist = coeffary[dcoeffstart:dcoeffend]
        
                # noise variance sigma from dh is a numeric scalar representing (an estimate of) the additive Gaussian white noise variance.
                # Estimate the noise variance based on the median absolute deviation (MAD) of the scale one wavelet coefficients.
                n = len(coefflist)
                dh = median(coefflist)
                mad = median([abs(x - dh) for x in
                              coefflist])  # list(map(lambda x: abs(x-dh), coefflist)))      #difference from median for 1st scale coeffs
                # sigma = list(map(abs, diff))
                sigma = mad / 0.6745  # noise standard deviation
                donoho = math.sqrt(2 * math.log(n)) * sigma
                print ('donoho is:  ', donoho)
                return donoho
        
            def normalize(self, vec, N, direction):
                half = N >> 1
                for n in range(0, half):
                    if direction == self.blnforward:
                        vec[half + n] = float(vec[half + n]) - (bbdaubechieslift.sqrt3 / 4.0) * float(vec[n]) - (
                        ((bbdaubechieslift.sqrt3 - 2) / 4.0) * float(vec[n - 1]))
                    elif (direction == self.blninverse):
                        vec[half + n] = float(vec[half + n]) + (bbdaubechieslift.sqrt3 / 4.0) * float(vec[n]) + (
                        ((bbdaubechieslift.sqrt3 - 2.0) / 4.0) * float(vec[n - 1]))
                    else:
                        print("daubechies:normalize: bad direction value")
                        break
                return vec
        
        
            def wavshrink(self, coeffary, threshlist, threshall, threshtype, donoholev, donflag=True):
               
                accpos = list()
                accpos += [math.floor((len(coeffary) - 1) / 2) + 2]
        
                while sum(accpos) <= len(coeffary):
                    accpos.insert(0, math.floor((accpos[0] - 1) / 2) + 2)
                accpos = accpos[1:]
        
        
                donoho = self.calcdonoho(coeffary, donoholev = self.p.donoholev, accpos=accpos)
        
                if threshlist:
                    for i in range(len(threshlist)-1,-1,-1):
                        sumaccpos = -int(sum(accpos[i:]))
                        sumaccposend = -int(sum(accpos[i + 1:]))
                        if donflag:
                            donohomult = donoho * threshlist[i]
                            if donohomult:
                                for x in range(sumaccpos, sumaccposend):
                                    print ("x is : ", x)
        
                                    if threshtype.upper() == "S":
                                        if abs(coeffary[x]) < donohomult:
                                            coeffary[x] = 0
                                        elif coeffary[x] < -donohomult:
                                            coeffary[x] += donohomult
                                        elif coeffary[x] > donohomult:
                                            coeffary[x] -= donohomult
                                    elif threshtype.upper() == "H":
                                        if abs(coeffary[x]) < donohomult:
                                            coeffary[x] = 0
                        else:
                            for x in range(sumaccpos, sumaccposend):
                                coeffary[x] = float(coeffary[x]*threshlist[i])
        
        
                    threshedresult = coeffary[:]
        
        
                #versus thresholding all coefficients:
        
                if threshall is not None:
                    if donflag:
                        threshval = donoho * threshall
                    else:
                        threshval = threshall
                        
                    if threshval:
                        print("threshval is: ", threshval)
                        for x in range(len(coeffary)):
                            if threshtype.upper() == "S":
                                if abs(coeffary[x]) < threshval:
                                    coeffary[x] = 0
                                if coeffary[x] < -threshval:
                                    coeffary[x] += threshval
                                elif coeffary[x] > threshval:
                                    coeffary[x] -= threshval
                            elif threshtype.upper() == "H":
                                if abs(coeffary[x]) < threshval:
                                    coeffary[x] = 0
                    threshedresult = coeffary[:]
        
        
        
                return threshedresult
        
        
        
        
            def split(vec, N):
                start = 1
                end = N - 1
        
                while (start < end):
                    #Could do this with list comprehension, but want to stick to the original structure:
                    for i in range(start, end, 2):
                        tmp = vec[i]
                        vec[i] = vec[i + 1]
                        vec[i+1] = tmp
                    start += 1
                    end -= 1
        
                return vec
        
            def merge(self, vec, N):
                half = N >> 1
                start = half-1
                end = half
        
                while (start > 0):
                    for i in range(start, end, 2):
                        tmp = vec[i]
                        vec[i] = vec[i+1]
                        vec[i+1] = tmp
                    start -= 1
                    end += 1
        
                return vec
        
        
        1 Reply Last reply Reply Quote 1
        • B
          bigdavediode last edited by

          And a simple test script to run it:

          import backtrader as bt
          import sys
          
          sys.path.append('/docker_stocks/bbwavelet')
          
          from datetime import datetime
          import backtrader.indicators as btind
          from bbwaveletlifting import bbhaarlift, bbdaubechieslift
          
          class firstStrategy(bt.Strategy):
              def __init__(self):
                  self.startlevel = 1
                  self.endlevel = 5
                  self.dbsig = [None] * 10
          
                  self.barcount = bbbarcounter(largedist=500, majordist=100, minordist=10, plotdistance=0.07)
          
          
                  for i in range(self.startlevel, self.endlevel+1):
                      threshamount = (self.endlevel+1-i)/1000
                      threshlist = [threshamount] * (self.endlevel)
          
                      self.dbsig[i] = bbdaubechieslift(self.data.close, threshall=threshamount, threshtype = "S", donoholev = 3, donflag=True, subplot=True)
          
                      #Another example usage:  self.dbsig[i] = bbdaubechieslift(self.data.close, threshlist = [0.5, 0.5, 0.5, 0.5, 0.5], threshtype = "S", donoholev = 3, donflag=True, subplot=True)
          
          
          
          
          
          # Variable for our starting cash
          startcash = 10000
          
          # Create an instance of cerebro
          cerebro = bt.Cerebro()
          
          # Add our strategy
          cerebro.addstrategy(firstStrategy)
          
          # Get Apple data from Yahoo Finance.
          data = bt.feeds.YahooFinanceData(
              dataname='AAPL',
              fromdate=datetime(2017, 4, 1),
              todate=datetime(2019, 4, 10),
              buffered=True
          )
          
          # Add the data to Cerebro
          cerebro.adddata(data)
          
          # Set our desired cash start
          cerebro.broker.setcash(startcash)
          
          # Run over everything
          cerebro.run()
          
          # Get final portfolio Value
          portvalue = cerebro.broker.getvalue()
          pnl = portvalue - startcash
          
          # Print out the final result
          print('Final Portfolio Value: ${}'.format(portvalue))
          print('P/L: ${}'.format(pnl))
          
          # Finally plot the end results
          cerebro.plot(style='candlestick')
          
          
          1 Reply Last reply Reply Quote 1
          • A
            ab_trader last edited by

            Thanks for sharing!

            • If my answer helped, hit reputation up arrow at lower right corner of the post.
            • Python Debugging With Pdb
            • New to python and bt - check this out
            1 Reply Last reply Reply Quote 0
            • 1 / 1
            • First post
              Last post
            Copyright © 2016, 2017, 2018, 2019, 2020, 2021 NodeBB Forums | Contributors