I haven't looked at this one, but perhaps this one will help? I tried to tighten an existing zigzag indicator up:
Posts made by bigdavediode
RE: Zigzag indicator
RE: How to cached data from IB and use it later
Although this routine is great for its simple flat-file data caching, it has an implementation problem: its sleep statements can shut down the connection before the data download is complete. Another issue is that, even for small downloads, the sleep statements hold up processing; when one is trying to analyze 100-200 securities per hour, these sleep statements have to be removed, as below.
Ideally, the callback will just terminate the connection correctly when the "finished" response is received from the api. So I've altered the code below. It's a bit hard to follow due to the callbacks but appears to work effectively. Please note the timeoutsecs argument should be specified as it is the maximum that the routine will wait for the download.
from ib.opt import ibConnection, message
from ib.ext.Contract import Contract
from time import time, sleep, strftime
from datetime import datetime
import pandas as pd


class IBDataCache(object):
    """Download IB historical bars into a pandas DataFrame, caching each
    request to a flat CSV file so repeat requests never hit the API.

    Instead of sleeping a fixed amount of time, the connection is dropped
    by the callbacks as soon as IB reports the download is finished;
    ``timeoutsecs`` is only an upper bound on the wait.
    """

    def __init__(self, data_path='/docker_stocks/data',
                 date_format='%Y%m%d %H:%M:%S',
                 host='127.0.0.1', port=4001, client_id=None):
        # No connection is opened here; _reset_data() connects lazily on
        # the first cache miss.
        self._data_path = data_path
        self._date_format = date_format
        self._next_valid_id = 1
        self._host = host
        self._port = port
        self._client_id = client_id

    def _reset_data(self, host='127.0.0.1', port=4001, client_id=None):
        """Clear the result frame and (re)open the IB connection,
        registering all message callbacks."""
        self._df = pd.DataFrame(columns=['Date', 'Open', 'High', 'Low',
                                         'Close', 'Volume', 'OpenInterest'])
        self._s = pd.Series()
        # Initialize connection as long as it's not already connected:
        if (not hasattr(self, '_conn')) or (not self._conn.isConnected()):
            self._conn = ibConnection(host, port, client_id)
            self._conn.enableLogging()
            # Register the response callback functions and the type of data
            # to be returned
            self._conn.register(self._error_handler, message.Error)
            self._conn.register(self._historical_data_handler,
                                message.historicalData)
            self._conn.register(self._save_order_id, 'NextValidId')
            self._conn.register(self._nextValidId_handler,
                                message.nextValidId)
            self._conn.connect()

    def _save_order_id(self, msg):
        """Remember the next valid request id announced by TWS."""
        self._next_valid_id = msg.orderId
        print('save order id', msg.orderId)

    def _nextValidId_handler(self, msg):
        """Once TWS hands out a valid id, (re)issue the pending request
        built from the stored Contract."""
        print("nextValidId_handler: ", msg)
        self.inner(sec_type=self.req.m_secType, symbol=self.req.m_symbol,
                   currency=self.req.m_currency,
                   exchange=self.req.m_exchange,
                   primaryExchange=self.req.m_primaryExch,
                   endtime=self.req.endtime, duration=self.req.duration,
                   bar_size=self.req.bar_size,
                   what_to_show=self.req.what_to_show,
                   use_rth=self.req.use_rth)

    def _error_handler(self, msg):
        """Log IB error messages; an empty message is treated as
        end-of-conversation, so drop the connection (this unblocks the
        wait loop in get_dataframe)."""
        print("error: ", msg)
        if not msg:
            print('disconnecting', self._conn.disconnect())

    def _historical_data_handler(self, msg):
        """Define historical data handler for IB - this will populate our
        pandas DataFrame, one bar per callback.

        IB signals completion with a pseudo-bar whose date field contains
        the string 'finished'; at that point the Date column becomes the
        index.
        """
        if 'finished' not in str(msg.date):
            try:
                self._s = ([datetime.strptime(msg.date, self._date_format),
                            msg.open, msg.high, msg.low, msg.close,
                            msg.volume, 0])
            except ValueError:
                # Daily bars carry a date only, with no time component
                self._s = ([datetime.strptime(msg.date, "%Y%m%d"),
                            msg.open, msg.high, msg.low, msg.close,
                            msg.volume, 0])
            self._df.loc[len(self._df)] = self._s
        else:
            self._df.set_index('Date', inplace=True)

    def setAllWithKwArgs(self, **kwargs):
        """Set all keyword arguments as attributes, to pass them along."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    def inner(self, sec_type, symbol, currency, exchange, primaryExchange,
              endtime, duration, bar_size, what_to_show, use_rth):
        """Build the Contract and fire the reqHistoricalData request."""
        print("calling inner... setting up req.")
        self.req = Contract()
        self.req.m_secType = sec_type
        self.req.m_symbol = symbol
        self.req.m_currency = currency
        self.req.m_exchange = exchange
        # BUGFIX: was "self.primaryExch = primaryExchange", which left
        # m_primaryExch unset on the contract and broke the re-issue in
        # _nextValidId_handler (it reads self.req.m_primaryExch)
        self.req.m_primaryExch = primaryExchange
        self.req.endtime = endtime
        self.req.duration = duration
        self.req.bar_size = bar_size
        self.req.what_to_show = what_to_show
        self.req.use_rth = use_rth
        self._conn.reqHistoricalData(self._next_valid_id, self.req,
                                     endtime, duration, bar_size,
                                     what_to_show, use_rth, 1)

    def get_dataframe(self, sec_type, symbol, currency, exchange,
                      primaryExchange, endtime, duration, bar_size,
                      what_to_show, use_rth, timeoutsecs=60):
        """Return the requested bars, serving from the CSV cache when
        possible and downloading (then caching) otherwise.

        timeoutsecs is the maximum time to wait for the download; the
        callbacks normally disconnect earlier, as soon as IB reports
        'finished'.  (Default added so the __main__ demo below works;
        callers that passed it explicitly are unaffected.)
        """
        # build a cache filename that encodes every request parameter
        self.filename = symbol + '_' + sec_type + '_' + exchange + '_' + \
            currency + '_' + endtime.replace(' ', '') + '_' + \
            duration.replace(' ', '') + '_' + bar_size.replace(' ', '') + \
            '_' + what_to_show + '_' + str(use_rth) + '.csv'
        self.filename = self.filename.replace('/', '.')
        self.filename = self._data_path + '/' + self.filename
        print("filename: ", self.filename)

        try:
            # check if we have this cached
            self._df = pd.read_csv(self.filename, parse_dates=True,
                                   index_col=0)
            # convert the index to real datetimes -- pandas may otherwise
            # hand back int64 values (see the pandafeed int64 note)
            self._df.index = pd.to_datetime(self._df.index,
                                            format='%Y-%m-%d %H:%M:%S')
        except IOError:
            # Not cached. Download it.
            # set up connection:
            self._reset_data(self._host, self._port, self._client_id)
            # Establish a Contract object and the params for the request
            self.inner(sec_type, symbol, currency, exchange,
                       primaryExchange, endtime, duration, bar_size,
                       what_to_show, use_rth)
            # Make sure the connection doesn't get disconnected prior to
            # the response data return.  Sleep briefly per iteration
            # instead of busy-spinning, so we don't peg a CPU core.
            timeout = time() + timeoutsecs
            while self._conn.isConnected() and time() < timeout:
                sleep(0.1)
            self._conn.disconnect()
            self._df.to_csv(self.filename)
        return self._df


if __name__ == '__main__':
    date_format = '%Y%m%d %H:%M:%S'
    downloader_kwargs = dict(
        data_path='../data',
        date_format=date_format,
        host='127.0.0.1',
        port=4001,
        client_id=992
    )
    downloader = IBDataCache(**downloader_kwargs)

    stock_kwargs = dict(
        sec_type='STK',
        symbol='AAPL',
        currency='USD',
        exchange='SMART',
        primaryExchange='NASDAQ',
        endtime=datetime(2018, 10, 26, 15, 59).strftime(date_format),
        duration='2 D',
        bar_size='30 mins',
        what_to_show='TRADES',
        use_rth=1
    )
    df = downloader.get_dataframe(**stock_kwargs)
    print(df)

    stock_kwargs = dict(
        sec_type='STK',
        symbol='MSFT',
        currency='USD',
        exchange='SMART',
        primaryExchange='NASDAQ',
        endtime=datetime(2018, 10, 26, 15, 59).strftime(date_format),
        duration='1 D',
        bar_size='1 min',
        what_to_show='TRADES',
        use_rth=1
    )
    df = downloader.get_dataframe(**stock_kwargs)
    print("IBCacheData")
    print(df)
(As a side note, the int64 error mentioned above is due to the sleep statement from the original code terminating before the downloading is completed.)
RE: Help with Volume-Weighted Moving Average (VWMA) indicator needed
Hi, yes, among many other items I've created a rolling VWAP for backtrader which I've found useful. And as a challenge to myself I did it in four actual lines of code. If you use this you need to pay me by shipping me a box of your local delicacies (although not dried fish or lutefisk).
Investopedia has some good entries on VWAP and MVWAP as I recall.
'''
Author: B. Bradford

MIT License

Copyright (c) B. Bradford

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

That they contact me for shipping information for the purpose of sending a
local delicacy of their choice native to whatever region they are domiciled.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''


class VolumeWeightedAveragePrice(bt.Indicator):
    """Rolling Volume-Weighted Average Price (VWAP) over the last
    ``period`` bars: SumN(typical_price * volume) / SumN(volume), with the
    typical price being (close + high + low) / 3.
    """
    plotinfo = dict(subplot=False)
    params = (('period', 30), )

    alias = ('VWAP', 'VolumeWeightedAveragePrice',)
    lines = ('VWAP',)
    plotlines = dict(VWAP=dict(alpha=0.50, linestyle='-.', linewidth=2.0))

    def __init__(self):
        # Before super to ensure mixins (right-hand side in subclassing)
        # can see the assignment operation and operate on the line
        cumvol = bt.ind.SumN(self.data.volume, period=self.p.period)
        typprice = ((self.data.close + self.data.high + self.data.low) / 3) \
            * self.data.volume
        cumtypprice = bt.ind.SumN(typprice, period=self.p.period)
        # BUGFIX: bind the declared line by name.  The original assigned to
        # self.lines itself, which rebinds the whole Lines holder and breaks
        # the declared 'VWAP' line, its alias and the plotlines styling.
        self.lines.VWAP = cumtypprice / cumvol
        super(VolumeWeightedAveragePrice, self).__init__()
RE: More IB bugs in live trading
@ard9000 Without putting in an hour on your code I can tell you two things that I've found in my experience have been difficult for me. The first is that stop losses are tricky to implement properly and often they won't show any trades at all, for example, if a position is already opened and the code, hypothetically, only allows one open position at a time. The stop trail would not update when, as one example, there was an issue being called during next at a bar, and thus no trades would subsequently open as there was already a position open.
Another issue was capturing the errors from the data provider, in my case, IB. When I used the standard method of retrieving data, returned data provider error codes were not broken out in debug mode. For example, in your case this could be because you haven't sufficiently specified the exchange/primaryexchange. In that event this could mask the problem, or perhaps obtain varying data from different exchanges on the same instrument due to smart routing.
In my case (with IB) I cache data directly from an IBDataCache object to a pandas dataframe (just search for IB data cache on here if you want to look at this) which facilitated monitoring data provider errors/return codes, such as exchange, and then adjust the primaryexchange or other required data provider api arguments appropriately to specify a particular exchange.
RE: Interactive Broker order OutsideRth attribute
Hi Kevin --
Another option for you is to use the IB data cache code that brettelliot wrote at https://community.backtrader.com/topic/1007/how-to-cached-data-from-ib-and-use-it-later/2
It exposes the IB connection so that the ib api parameter for RTH is available (use_rth).
Also gets you a nice cache and a bit of a speedup as well.
RE: How to cached data from IB and use it later
Your code is extremely useful when dealing with IB's down times. And it's a nice speedup as well.
Just one quick suggestion/bug fix -- I've found that sometimes there's a numpy int64 error generated by line 268 in pandafeed (AttributeError: 'numpy.int64' object has no attribute 'to_pydatetime') due to the fact that some datetimes are stored in int64 in pandas and need to be converted manually. The line I've added to alleviate this is:
try: # check if we have this cached self._df = pd.read_csv(filename, parse_dates=True, index_col=0) self._df.index = pd.to_datetime(self._df.index, format='%Y-%m-%dT%H:%M:%S')
More information on this datetime conversion can be found here: https://community.backtrader.com/topic/676/bug-using-pandas-hdf
Again, great work brettelliot.
RE: Zigzag Indicator
@backtrader My pleasure Daniel - I'm probably being too cagey with the above so here's something that might (or might not) help you and what I am working towards. The below is my use of a zigzag indicator with very minimal filtering to identify wavelet inflection points at various increasing frequencies (decomposition level edge detection):
So far so good.
RE: Zigzag Indicator
You can also specify a minimum threshold for the difference between the current bar and the last bar for it to qualify as a recognized peak/valley, as well as a threshold for the minimum distance to the last peak/valley. Naturally, as in the case below, if you set these thresholds to non-zero values you can have peaks without valleys and valleys without peaks in some instances.
If you find a bug please let me know and any feedback is appreciated.
And here's a screenshot of this on a randomly (and poorly) selected SMA and the thresholds were just set arbitrarily just to show an example:
Re: Zigzag indicator
The previously posted zigzag study above was quite useful and my compliments to the authors. I needed something slightly different, however, more suitable for real-time identification and so here's a slightly tighter version that's about 80% smaller (from 100 lines of code down to about 18).
This zigzag indicator is more of an indicator than a study and therefore it identifies peaks one bar "late" which is the only way to behave for real time peak identification. Most importantly, it should (in theory) not set peaks/valleys retroactively. It does not offer the various lines of the other zigzag -- I had no need to know the bars since the last peak or valley but these can be added after the fact in a single line by counting a list comprehension or by using a ternary iteration.
'''
Author: B. Bradford

MIT License

Copyright (c) B. Bradford

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''


class bbzigzag(bt.Indicator):
    """Real-time zigzag indicator.

    Marks a peak/valley one bar after the trend flips -- the only honest
    behavior for real-time identification -- and does not set extremes
    retroactively.  A candidate extreme is accepted only if it lies further
    than ``bardist`` (as a fraction of price) from the last accepted one.
    """

    plotinfo = dict(
        subplot=True,
        zigzag=dict(_name='zigzag', color='lightblue', ls='--',
                    _skipnan=True),
    )
    plotlines = dict(
        zigzag_peak=dict(marker='v', markersize=4.0, color='red',
                         fillstyle='full', ls=''),
        zigzag_valley=dict(marker='^', markersize=4.0, color='red',
                           fillstyle='full', ls=''),
        zigzag=dict(_name='zigzag', color='red', ls='--', _skipnan=True),
    )
    params = (
        ('up_retrace', 0.015),
        ('dn_retrace', 0.015),
        ('bardist', 0.015),      # distance to last max/min (perc/100)
        ('plotdistance', 0.03),  # distance to plot arrows (alters high/low
                                 # indicator lines but not zigzag line)
    )
    lines = ('zigzag', 'zigzag_peak', 'zigzag_valley')

    def __init__(self):
        self.setminperiod(2)
        # History of accepted peak/valley prices.  BUGFIX: the list literal
        # was lost in the posted code ("self.pks = "); restored here, and
        # the empty-list case is guarded in once().
        self.pks = []

    def once(self, start, end):
        # BUGFIX: self.datas is a *list* of feeds and has no .array; the
        # source line lives on self.data (the first feed).
        lstinputdata = self.data.array[:]
        # Per-bar direction: 0 flat, 1 if the bar dropped, -1 if it rose
        # (orientation preserved from the original post).
        self.trnd = [0 if lstinputdata[i] == lstinputdata[i - 1]
                     else 1 if lstinputdata[i - 1] > lstinputdata[i]
                     else -1
                     for i in range(1, len(lstinputdata))]
        # Carry the previous direction through flat bars so every entry is
        # +/-1.  NOTE(review): at i == 0 this reads trnd[-1] (wrap-around),
        # exactly as the original did -- confirm this is intended.
        self.nogaps = [self.trnd[i - 1] if self.trnd[i] == 0
                       else self.trnd[i]
                       for i in range(len(self.trnd))]
        # BUGFIX: start from the first direction value.  The original
        # compared the whole list against a single element, which is always
        # unequal and falsely flagged bar 0 as an extreme.
        nogapslast = self.nogaps[0] if self.nogaps else 0
        for i in range(len(self.nogaps)):
            if nogapslast != self.nogaps[i]:
                # Direction flip: candidate peak/valley.  Accept it only if
                # it is far enough from the last accepted extreme (BUGFIX:
                # also accept the very first extreme, when pks is empty).
                if not self.pks or \
                        abs(lstinputdata[i] - self.pks[-1]) > \
                        self.params.bardist * self.pks[-1]:
                    self.pks.append(lstinputdata[i])
                    # Negative offset of this bar from the end of the data
                    idx = -(len(self.nogaps) - i - 1)
                    absplotdist = lstinputdata[i] * self.params.plotdistance
                    if self.nogaps[i] > nogapslast:
                        self.lines.zigzag_peak[idx] = \
                            lstinputdata[i] + absplotdist
                    if self.nogaps[i] < nogapslast:
                        self.lines.zigzag_valley[idx] = \
                            lstinputdata[i] - absplotdist
                    # NaN != NaN, so "x == x" tests whether a marker was
                    # set; the zigzag line carries the undisplaced price.
                    self.lines.zigzag[idx] = \
                        (self.zigzag_peak[idx] - absplotdist) \
                        if self.zigzag_peak[idx] == self.zigzag_peak[idx] \
                        else (self.zigzag_valley[idx] + absplotdist) \
                        if self.zigzag_valley[idx] == self.zigzag_valley[idx] \
                        else float('NaN')
                nogapslast = self.nogaps[i]
RE: Haar Wavelet with Lifting and Incremental Option
@backtrader Thanks -- in the same vein I greatly admire your work on Backtrader. It's excellent.