For code/output blocks: Use ``` (aka backtick or grave accent) in a single line before and after the block. See: http://commonmark.org/help/

Daubechies D4 Wavelet Transform via Lifting with Donoho thresholding

• Here's an indicator (bbdaubechieslift) that provides a Daubechies D4 wavelet transform via lifting for deconstructing a line into frequency sub-bands. It provides calculation of the Donoho threshold amount to minimize theoretical noise. Again, as with the other wavelet indicators, my general goal has been to comply with Daniel's R. vision of minimization of libraries. There is a math.floor library reference but this is easily coded around if desired.

Because this routine pads outwards to nearest 2**n bars, rather than getting a slice of the line, an incremental option isn't included. However, this padding out with symmetrical data does reduce edge effects/discontinuities in the Daubechies D4.

Example usage:

x = bbdaubechieslift(self.data.close, threshall=threshamount, threshtype = "S", donoholev = 3, donflag=True, subplot=True)

or, another way to threshold individual subbands:

x = bbdaubechieslift(self.data.close, threshlist = [0.5, 0.5, 0.5, 0.5, 0.5], threshtype = "S", donoholev = 3, donflag=True, subplot=True)

Threshlist and threshall are mutually exclusive.

donoholev specifies the subband to use for calculating the "optimum" theoretical noise filter -- usually the highest detail subband. Normally donoholev=1 however that doesn't always provide for the optimum noise reduction depending on the signal and the time frames of the forecast intended.

Threshall is the threshold multiplier for the calculated Donoho value which blanket thresholds all wavelet coefficients. The threshlist parameter can be supplied to threshold out individual coefficients for sub-bands, ie. threshlist = [0.5, 0.5, 0.5, 0.5, 0.5]. Please understand that in Daubechies D4 there is some overlap in signal between the bands and thresholding some bands and not others can result in erratic results. The Haar wavelet indicator I previously posted is fully orthogonal as an alternative.

It is extremely easy with this indicator to give it nonsensical parameters, for example a threshlist that is longer than the n in the data len of 2**n. Or a Donoho multiplier that is too low or too high. You may wish to experiment by monitoring the print output showing the threshval calculated by calcdonoho routine. A donoholev of 1 (the first level) will result in small noise reductions, whereas a donoho level calculated from higher levels results in much larger thresholding. If this is your first time, do not use hard thresholding (threshtype = "H") as it will be more difficult to form a coherent result without discontinuities.

I will have open hours available for contract work starting at the end of August. My inundated spam mailbox is bigdavediode2 at the email provider yahoo.com. Apologies for any bugs, but should there be some, and there probably are, at least this gives a good starting point for others to improve upon.

Typical output at multiple subband levels: • '''
Author: B. Bradford adapted from instructional information at
bearcave.com/Ian Kaplan.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''

class bbdaubechieslift(bt.indicators.PeriodN):

sqrt3 = math.sqrt(3)
sqrt2 = math.sqrt(2)

plotinfo = dict(subplot=True)
lines = ('dblift', )
plotlines = dict(haarlift=dict(alpha=0.75, ls='-', linewidth=1.0))
params = (('coeffpick', None),
('donoholev', 1),
('donflag', True),
('threshlist', []),
('threshall', None),
('threshtype', "H"), )      #Hard or soft thresholding rules  2**coefflev data points required or 2**coefflev

@staticmethod
def next_power_of_2(x):
return 1 if x == 0 else 2 ** (x.bit_length() -1)

def init(self):
self.addminperiod(2 ** self.params.coefflev + 1)

def once(self, start, end):
self.blnforward = 1
self.blninverse = 2

#create dummy line to manipulate and desired 2** data length (max or selected)
lstinputdata = self.data.array[:]                                   #make a copy of the list/array

#db is padded out in the case of None coefflev rather than truncated
pow2 = self.next_power_of_2(len(lstinputdata)) * 2  # input needs to be power of two data points pad up to next pow2

#Count backwards through line to leave most recent value in place if incremental: -- not provided here

inlist = lstinputdata[:]

#mirror inlist to avoid edge discontinuities -- make sure it's even length for daubechies:

mirlistlen = len(self.data.array)
mirpostlen = int((pow2 - len(lstinputdata))/2)
mirprelen = pow2 - mirlistlen - mirpostlen
mirlist =  inlist[:mirprelen][::-1] + inlist[:mirlistlen] + inlist[:-mirpostlen-1:-1]

ftresult = bbdaubechieslift.forwardtrans(self, vec=mirlist)            #forward transform
#threshedresult = ftresult
threshedresult = bbdaubechieslift.wavshrink(self, coeffary=ftresult, threshall=self.p.threshall, threshtype=self.p.threshtype, threshlist=self.params.threshlist, donoholev = self.p.donoholev, donflag=self.p.donflag)
#threshedresult = ftresult[:]       #if you don't want to threshold/wavshrink/donoho
itresult = bbdaubechieslift.inversetrans(self, vec=threshedresult)    #inverse transform

#demirror array:
dummyaryout = itresult[mirprelen:mirlistlen + mirprelen]

lstinputdata = dummyaryout[:]                                      # self.lines.array[pow2padding:] + itresult

#Only run once if not doing incremental -- incremental not provided here.

self.lines.dblift.array = lstinputdata       #arr.array('d', [lstinputdata[lstinphalf]] * (len(lstinputdata) - lstinphalf)) + lstinputdata[lstinphalf:]

def predict(self, vec, N, direction):
half = N >> 1

if (direction == self.blnforward):
#unnecessarily aggressive floats to avoid casting as int or concatenating strings:
vec[half] = float(vec[half]) - (self.sqrt3/4.0)*float(vec) - (((self.sqrt3-2.0)/4.0)*float(vec[half-1]))
elif (direction == self.blninverse):
vec[half] = float(vec[half]) + (self.sqrt3/4.0)*float(vec) + (((self.sqrt3-2.0)/4.0)*float(vec[half-1]))
else:
debug.print("daubechies:predict: bad direction value")

# predict, forward
for n in range (0, half, 1):
if (direction == self.blnforward):
vec[half+n] = float(vec[half+n]) - (self.sqrt3/4.0)*float(vec[n]) - (((self.sqrt3-2.0)/4.0)*float(vec[n-1]))
elif (direction == self.blninverse):
vec[half+n] = float(vec[half+n]) + (self.sqrt3/4.0)*float(vec[n]) + (((self.sqrt3-2.0)/4.0)*float(vec[n-1]))
else:
debug.print("daubechies: predict: bad direction value")
break
return vec

def updateone(self, vec, N, direction):
half = N >> 1
for n in range(0, half):
updateVal = bbdaubechieslift.sqrt3 * float(vec[half+n])

if (direction == self.blnforward):
vec[n] = float(vec[n]) + updateVal
elif (direction == self.blninverse):
vec[n] = float(vec[n]) - updateVal
else:
debug.print("daubechies:updateone: bad direction value")
break
return vec

def update(self, vec, N, direction):
half = N >> 1

for i in range (0, half-1, 1):
if (direction == self.blnforward):
vec[i] = float(vec[i]) - float(vec[half + i + 1])
elif (direction == self.blninverse):
vec[i] = float(vec[i]) + float(vec[half + i + 1])
else:
debug.print("haar update: bad direction value")
break

if (direction == self.blnforward):
vec[half-1] = float(vec[half-1]) - float(vec[half])
elif (direction == self.blninverse):
vec[half-1] = float(vec[half-1]) + float(vec[half])
return vec

def forwardtrans(self, vec):

#The result of forwardTrans is a set of wavelet coefficients
#ordered by increasing frequency and an approximate average
#of the input data set in vec.

N = len(vec)

# set for counter:
n = N
while n > 1:
# beware, vec is altered in-place
bbdaubechieslift.split(vec, n)
bbdaubechieslift.updateone(self, vec, n, self.blnforward)
bbdaubechieslift.predict(self, vec, n, self.blnforward)
bbdaubechieslift.update(self, vec, n, self.blnforward)
bbdaubechieslift.normalize(self, vec, n, self.blnforward)
n = n >> 1
return vec

def inversetrans(self, vec):
N = len(vec)
n = 2
while n <= N:
# beware, vec is altered in-place
bbdaubechieslift.normalize(self, vec, n, self.blninverse)
bbdaubechieslift.update(self, vec, n, self.blninverse)
bbdaubechieslift.predict(self, vec, n, self.blninverse)
bbdaubechieslift.updateone(self, vec, n, self.blninverse)
bbdaubechieslift.merge(self, vec, n)
n = n << 1
return vec

def calcdonoho(self, coeffary, donoholev, accpos):

n = len(coeffary)
#donoholev -- Get donoho coeffs to threshold from this wavelet level

dcoeffstart = -int(sum(accpos[-donoholev:]))
if -donoholev + 1 == 0:
coefflist = coeffary[dcoeffstart:]
else:
dcoeffend = -int(sum(accpos[-donoholev + 1:]))
coefflist = coeffary[dcoeffstart:dcoeffend]

# noise variance sigma from dh is a numeric scalar representing (an estimate of) the additive Gaussian white noise variance.
# Estimate the noise variance based on the median absolute deviation (MAD) of the scale one wavelet coefficients.
n = len(coefflist)
dh = median(coefflist)
mad = median([abs(x - dh) for x in
coefflist])  # list(map(lambda x: abs(x-dh), coefflist)))      #difference from median for 1st scale coeffs
# sigma = list(map(abs, diff))
sigma = mad / 0.6745  # noise standard deviation
donoho = math.sqrt(2 * math.log(n)) * sigma
print ('donoho is:  ', donoho)
return donoho

def normalize(self, vec, N, direction):
half = N >> 1
for n in range(0, half):
if direction == self.blnforward:
vec[half + n] = float(vec[half + n]) - (bbdaubechieslift.sqrt3 / 4.0) * float(vec[n]) - (
((bbdaubechieslift.sqrt3 - 2) / 4.0) * float(vec[n - 1]))
elif (direction == self.blninverse):
vec[half + n] = float(vec[half + n]) + (bbdaubechieslift.sqrt3 / 4.0) * float(vec[n]) + (
((bbdaubechieslift.sqrt3 - 2.0) / 4.0) * float(vec[n - 1]))
else:
print("daubechies:normalize: bad direction value")
break
return vec

def wavshrink(self, coeffary, threshlist, threshall, threshtype, donoholev, donflag=True):

accpos = list()
accpos += [math.floor((len(coeffary) - 1) / 2) + 2]

while sum(accpos) <= len(coeffary):
accpos.insert(0, math.floor((accpos - 1) / 2) + 2)
accpos = accpos[1:]

donoho = self.calcdonoho(coeffary, donoholev = self.p.donoholev, accpos=accpos)

if threshlist:
for i in range(len(threshlist)-1,-1,-1):
sumaccpos = -int(sum(accpos[i:]))
sumaccposend = -int(sum(accpos[i + 1:]))
if donflag:
donohomult = donoho * threshlist[i]
if donohomult:
for x in range(sumaccpos, sumaccposend):
print ("x is : ", x)

if threshtype.upper() == "S":
if abs(coeffary[x]) < donohomult:
coeffary[x] = 0
elif coeffary[x] < -donohomult:
coeffary[x] += donohomult
elif coeffary[x] > donohomult:
coeffary[x] -= donohomult
elif threshtype.upper() == "H":
if abs(coeffary[x]) < donohomult:
coeffary[x] = 0
else:
for x in range(sumaccpos, sumaccposend):
coeffary[x] = float(coeffary[x]*threshlist[i])

threshedresult = coeffary[:]

#versus thresholding all coefficients:

if threshall is not None:
if donflag:
threshval = donoho * threshall
else:
threshval = threshall

if threshval:
print("threshval is: ", threshval)
for x in range(len(coeffary)):
if threshtype.upper() == "S":
if abs(coeffary[x]) < threshval:
coeffary[x] = 0
if coeffary[x] < -threshval:
coeffary[x] += threshval
elif coeffary[x] > threshval:
coeffary[x] -= threshval
elif threshtype.upper() == "H":
if abs(coeffary[x]) < threshval:
coeffary[x] = 0
threshedresult = coeffary[:]

return threshedresult

def split(vec, N):
start = 1
end = N - 1

while (start < end):
#Could do this with list comprehension, but want to stick to the original structure:
for i in range(start, end, 2):
tmp = vec[i]
vec[i] = vec[i + 1]
vec[i+1] = tmp
start += 1
end -= 1

return vec

def merge(self, vec, N):
half = N >> 1
start = half-1
end = half

while (start > 0):
for i in range(start, end, 2):
tmp = vec[i]
vec[i] = vec[i+1]
vec[i+1] = tmp
start -= 1
end += 1

return vec

• And a simple test script to run it:

import backtrader as bt
import sys

sys.path.append('/docker_stocks/bbwavelet')

from datetime import datetime
import backtrader.indicators as btind
from bbwaveletlifting import bbhaarlift, bbdaubechieslift

class firstStrategy(bt.Strategy):
def __init__(self):
self.startlevel = 1
self.endlevel = 5
self.dbsig = [None] * 10

self.barcount = bbbarcounter(largedist=500, majordist=100, minordist=10, plotdistance=0.07)

for i in range(self.startlevel, self.endlevel+1):
threshamount = (self.endlevel+1-i)/1000
threshlist = [threshamount] * (self.endlevel)

self.dbsig[i] = bbdaubechieslift(self.data.close, threshall=threshamount, threshtype = "S", donoholev = 3, donflag=True, subplot=True)

#Another example usage:  self.dbsig[i] = bbdaubechieslift(self.data.close, threshlist = [0.5, 0.5, 0.5, 0.5, 0.5], threshtype = "S", donoholev = 3, donflag=True, subplot=True)

# Variable for our starting cash
startcash = 10000

# Create an instance of cerebro
cerebro = bt.Cerebro()

# Add our strategy

# Get Apple data from Yahoo Finance.
data = bt.feeds.YahooFinanceData(
dataname='AAPL',
fromdate=datetime(2017, 4, 1),
todate=datetime(2019, 4, 10),
buffered=True
)

# Add the data to Cerebro

# Set our desired cash start
cerebro.broker.setcash(startcash)

# Run over everything
cerebro.run()

# Get final portfolio Value
portvalue = cerebro.broker.getvalue()
pnl = portvalue - startcash

# Print out the final result
print('Final Portfolio Value: \${}'.format(portvalue))
print('P/L: \${}'.format(pnl))

# Finally plot the end results
cerebro.plot(style='candlestick')

• Thanks for sharing!

});