Live feed inplace resampling
-
Does anyone know if it is possible to resample a feed in place in a strategy?
I have a live feed coming into a strategy, but I can only get 1M, 5M, 1H and 1D resolution on the candles, and I need to run on other resolutions, for example 2H. Any ideas on how this is best done?
In other words I guess I need to manipulate the data feed while the strategy is running.
I have also already tried to add the live feed with:
cerebro.resampledata(data, timeframe=bt.TimeFrame.Minutes, compression=120)
But that does not work.
-
@søren-pallesen said in Live feed inplace resampling:
cerebro.resampledata(data, timeframe=bt.TimeFrame.Minutes, compression=120)
You can see similar constructs in the samples for
ib
and oanda
Should work. It would help us if you let us know why it didn't work (or why you believe it didn't) and how the original data feed is being created. An isolated line doesn't tell the world much (it does to you, because the context is known to you)
-
Sorry about that. Of course, here you go @Paska-Houso. Some background and context.
The feed and store implementation is based on @Ed-Bartosh's work from this thread: https://community.backtrader.com/topic/623/anyone-use-backtrader-to-do-live-trading-on-bitcoin-exchange which has resulted in this implementation: https://github.com/bartosh/backtrader/tree/ccxt that utilises the CCXT library to facilitate integration with numerous cryptocurrency exchanges. In other words, the live feed and store are hooked up to CCXT. From the above GitHub repo:
CCXTfeed:
#!/usr/bin/env python # -*- coding: utf-8; py-indent-offset:4 -*- ############################################################################### # # Copyright (C) 2015, 2016, 2017 Daniel Rodriguez # Copyright (C) 2017 Ed Bartosh # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # ############################################################################### from __future__ import (absolute_import, division, print_function, unicode_literals) from collections import deque from datetime import datetime import backtrader as bt from backtrader.feed import DataBase from backtrader.stores.ccxtstore import CCXTStore class CCXT(DataBase): """ CryptoCurrency eXchange Trading Library Data Feed. Params: - ``historical`` (default: ``False``) If set to ``True`` the data feed will stop after doing the first download of data. The standard data feed parameters ``fromdate`` and ``todate`` will be used as reference. - ``backfill_start`` (default: ``True``) Perform backfilling at the start. The maximum possible historical data will be fetched in a single request. 
""" params = ( ('historical', False), # only historical download ('backfill_start', False), # do backfilling at the start ) # States for the Finite State Machine in _load _ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(3) def __init__(self, exchange, symbol, ohlcv_limit=450, config={}, retries=5): self.symbol = symbol self.ohlcv_limit = ohlcv_limit self.store = CCXTStore(exchange, config, retries) self._data = deque() # data queue for price data self._last_id = '' # last processed trade id for ohlcv self._last_ts = 0 # last processed timestamp for ohlcv def start(self, ): DataBase.start(self) if self.p.fromdate: self._state = self._ST_HISTORBACK self.put_notification(self.DELAYED) self._fetch_ohlcv(self.p.fromdate) else: self._state = self._ST_LIVE self.put_notification(self.LIVE) def _load(self): if self._state == self._ST_OVER: return False while True: if self._state == self._ST_LIVE: if self._timeframe == bt.TimeFrame.Ticks: return self._load_ticks() else: self._fetch_ohlcv() return self._load_ohlcv() elif self._state == self._ST_HISTORBACK: ret = self._load_ohlcv() if ret: return ret else: # End of historical data if self.p.historical: # only historical self.put_notification(self.DISCONNECTED) self._state = self._ST_OVER return False # end of historical else: self._state = self._ST_LIVE self.put_notification(self.LIVE) continue def _fetch_ohlcv(self, fromdate=None): """Fetch OHLCV data into self._data queue""" granularity = self.store.get_granularity(self._timeframe, self._compression) if fromdate: since = int((fromdate - datetime(1970, 1, 1)).total_seconds() * 1000) else: if self._last_ts > 0: since = self._last_ts else: since = None limit = self.ohlcv_limit while True: dlen = len(self._data) for ohlcv in self.store.fetch_ohlcv(self.symbol, timeframe=granularity, since=since, limit=limit)[::-1]: if None in ohlcv: continue tstamp = ohlcv[0] if tstamp > self._last_ts: self._data.append(ohlcv) self._last_ts = tstamp since = tstamp + 1 if dlen == len(self._data): break 
def _load_ticks(self): if self._last_id is None: # first time get the latest trade only trades = [self.store.fetch_trades(self.symbol)[-1]] else: trades = self.store.fetch_trades(self.symbol) for trade in trades: trade_id = trade['id'] if trade_id > self._last_id: trade_time = datetime.strptime(trade['datetime'], '%Y-%m-%dT%H:%M:%S.%fZ') self._data.append((trade_time, float(trade['price']), float(trade['amount']))) self._last_id = trade_id try: trade = self._data.popleft() except IndexError: return None # no data in the queue trade_time, price, size = trade self.lines.datetime[0] = bt.date2num(trade_time) self.lines.open[0] = price self.lines.high[0] = price self.lines.low[0] = price self.lines.close[0] = price self.lines.volume[0] = size return True def _load_ohlcv(self): try: ohlcv = self._data.popleft() except IndexError: return None # no data in the queue tstamp, open_, high, low, close, volume = ohlcv dtime = datetime.utcfromtimestamp(tstamp // 1000) self.lines.datetime[0] = bt.date2num(dtime) self.lines.open[0] = open_ self.lines.high[0] = high self.lines.low[0] = low self.lines.close[0] = close self.lines.volume[0] = volume return True def haslivedata(self): return self._state == self._ST_LIVE and self._data def islive(self): return not self.p.historical
And the CCXTstore:
#!/usr/bin/env python # -*- coding: utf-8; py-indent-offset:4 -*- ############################################################################### # # Copyright (C) 2017 Ed Bartosh <bartosh@gmail.com> # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # ############################################################################### from __future__ import (absolute_import, division, print_function, unicode_literals) import time from functools import wraps import ccxt from ccxt.base.errors import NetworkError import backtrader as bt class CCXTStore(object): '''API provider for CCXT feed and broker classes.''' # Supported granularities _GRANULARITIES = { (bt.TimeFrame.Minutes, 1): '1m', (bt.TimeFrame.Minutes, 3): '3m', (bt.TimeFrame.Minutes, 5): '5m', (bt.TimeFrame.Minutes, 15): '15m', (bt.TimeFrame.Minutes, 30): '30m', (bt.TimeFrame.Minutes, 60): '1h', (bt.TimeFrame.Minutes, 90): '90m', (bt.TimeFrame.Minutes, 120): '2h', (bt.TimeFrame.Minutes, 240): '4h', (bt.TimeFrame.Minutes, 360): '6h', (bt.TimeFrame.Minutes, 480): '8h', (bt.TimeFrame.Minutes, 720): '12h', (bt.TimeFrame.Days, 1): '1d', (bt.TimeFrame.Days, 3): '3d', (bt.TimeFrame.Weeks, 1): '1w', (bt.TimeFrame.Weeks, 2): '2w', (bt.TimeFrame.Months, 1): '1M', (bt.TimeFrame.Months, 3): '3M', (bt.TimeFrame.Months, 6): '6M', (bt.TimeFrame.Years, 1): '1y', } def __init__(self, exchange, config, retries): self.exchange = getattr(ccxt, 
exchange)(config) self.retries = retries def get_granularity(self, timeframe, compression): if not self.exchange.hasFetchOHLCV: raise NotImplementedError("'%s' exchange doesn't support fetching OHLCV data" % \ self.exchange.name) granularity = self._GRANULARITIES.get((timeframe, compression)) if granularity is None: raise ValueError("backtrader CCXT module doesn't support fetching OHLCV " "data for time frame %s, comression %s" % \ (bt.TimeFrame.getname(timeframe), compression)) if self.exchange.timeframes and granularity not in self.exchange.timeframes: raise ValueError("'%s' exchange doesn't support fetching OHLCV data for " "%s time frame" % (self.exchange.name, granularity)) return granularity def retry(method): @wraps(method) def retry_method(self, *args, **kwargs): for i in range(self.retries): time.sleep(self.exchange.rateLimit / 1000) try: return method(self, *args, **kwargs) except NetworkError: if i == self.retries - 1: raise return retry_method @retry def getcash(self, currency): return self.exchange.fetch_balance()['free'][currency] @retry def getvalue(self, currency): return self.exchange.fetch_balance()['total'][currency] @retry def getposition(self, currency): return self.getvalue(currency) @retry def create_order(self, symbol, order_type, side, amount, price, params): return self.exchange.create_order(symbol=symbol, type=order_type, side=side, amount=amount, price=price, params=params) @retry def cancel_order(self, order_id): return self.exchange.cancel_order(order_id) @retry def fetch_trades(self, symbol): return self.exchange.fetch_trades(symbol) @retry def fetch_ohlcv(self, symbol, timeframe, since, limit): return self.exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since, limit=limit)
Here is a test script integrating with the crypto exchange Bitmex live with 1-minute bars. My problem is that the implementation only supports the native resolutions from the exchange, which are 1M, 5M, 1H and 1D; however, I need to resample to other resolutions and don't know how to get that done.
Test script:
# !/usr/bin/env python # -*- coding: utf-8; py-indent-offset:4 -*- from __future__ import (absolute_import, division, print_function, unicode_literals) import time from datetime import datetime, timedelta import backtrader as bt class TestStrategy(bt.Strategy): def next(self): print('*' * 5, 'NEXT:', bt.num2date(self.data.datetime[0]), self.data._name, self.data.open[0], self.data.high[0], self.data.low[0], self.data.close[0], self.data.volume[0], bt.TimeFrame.getname(self.data._timeframe), len(self.data)) if __name__ == '__main__': cerebro = bt.Cerebro() hist_start_date = datetime.utcnow() - timedelta(minutes=10) data_ticks = bt.feeds.CCXT(exchange='bitmex', symbol='BTC/USD', name="btc_usd_tick", timeframe=bt.TimeFrame.Minutes, fromdate=hist_start_date, compression=1) cerebro.adddata(data_ticks) cerebro.addstrategy(TestStrategy) cerebro.run()
-
Not to be pedantic, but there is no
cerebro.resampledata
in your code. Testing would be better done if, after adding data
to the system (bt.TimeFrame.Minutes, 1), you did:
cerebro.resampledata(data, timeframe=bt.TimeFrame.Minutes, compression=2)
So you only have to wait 2 minutes until you see if things are working.
And in
TestStrategy
prepare output for as many data feeds as you have added. Start only with close
and build from there onwards:
class TestStrategy(bt.Strategy):
    def next(self):
        for i, d in enumerate(self.datas):
            txt = []
            txt.append('%04d' % len(d))
            txt.append('Data_%d' % i)
            txt.append(d.datetime.datetime().isoformat())
            txt.append('%.4f' % d.close[0])
            print(','.join(txt))
Let us see what happens.
-
@paska-houso said in Live feed inplace resampling:
for as many data feeds as you have added
Yes, I know - I took out cerebro.resampledata to give you the script that actually worked :--)
I got your idea and will try it out. Many thx for the suggestion. I will report back later.
-
Holy smokes - I think that did it:
Test script:
# !/usr/bin/env python # -*- coding: utf-8; py-indent-offset:4 -*- from __future__ import (absolute_import, division, print_function, unicode_literals) import time from datetime import datetime, timedelta import backtrader as bt class TestStrategy(bt.Strategy): def next(self): for i, d in enumerate(self.datas): txt = [] txt.append('%04d' % len(d)) txt.append('Data_%d' % i) txt.append(d.datetime.datetime().isoformat()) txt.append('%.4f' % d.close[0]) print(','.join(txt)) # print('*' * 5, 'NEXT:', bt.num2date(self.data.datetime[0]), self.data._name, self.data.open[0], self.data.high[0], # self.data.low[0], self.data.close[0], self.data.volume[0], # bt.TimeFrame.getname(self.data._timeframe), len(self.data)) if __name__ == '__main__': cerebro = bt.Cerebro() hist_start_date = datetime.utcnow() - timedelta(minutes=10) data_ticks = bt.feeds.CCXT(exchange='bitmex', symbol='BTC/USD', name="btc_usd_tick", timeframe=bt.TimeFrame.Minutes, fromdate=hist_start_date, compression=1) cerebro.adddata(data_ticks) cerebro.resampledata(data_ticks, timeframe=bt.TimeFrame.Minutes, compression=2) cerebro.addstrategy(TestStrategy) cerebro.run()
Output:
0002,Data_0,2018-01-08T19:40:00,15040.0000 0001,Data_1,2018-01-08T19:40:00,15040.0000 0003,Data_0,2018-01-08T19:41:00,15038.0000 0001,Data_1,2018-01-08T19:40:00,15040.0000 0004,Data_0,2018-01-08T19:42:00,15054.5000 0002,Data_1,2018-01-08T19:42:00,15054.5000 0005,Data_0,2018-01-08T19:43:00,15020.5000 0002,Data_1,2018-01-08T19:42:00,15054.5000 0006,Data_0,2018-01-08T19:44:00,15016.0000 0003,Data_1,2018-01-08T19:44:00,15016.0000 0007,Data_0,2018-01-08T19:45:00,15030.0000 0003,Data_1,2018-01-08T19:44:00,15016.0000 0008,Data_0,2018-01-08T19:46:00,15053.5000 0004,Data_1,2018-01-08T19:46:00,15053.5000 0009,Data_0,2018-01-08T19:47:00,15081.0000 0004,Data_1,2018-01-08T19:46:00,15053.5000 0010,Data_0,2018-01-08T19:48:00,15081.5000 0005,Data_1,2018-01-08T19:48:00,15081.5000 0011,Data_0,2018-01-08T19:49:00,15040.5000 0005,Data_1,2018-01-08T19:48:00,15081.5000
Thx a bunch Sir ! :-)
-
I actually have the same issue, and I tested with the same code. It runs well for the history part (10 next) of the data, but fails right after that. Does anybody have an idea?
import sys

import backtrader as bt


class TestStrategy(bt.Strategy):
    """Print data status changes and one summary line per ``next`` call."""

    def notify_data(self, data, status, *args, **kwargs):
        status_name = data._getstatusname(status)
        print('*' * 5, 'DATA NOTIF:', status_name)

    def next(self):
        # Header with the first feed's bar time, then name/length/close
        # for every feed, all on a single line.
        pieces = ['***** NEXT: {} '.format(bt.num2date(self.data.datetime[0]))]
        pieces.extend(
            '{} {} {} '.format(feed._name, len(feed), feed.close[0])
            for feed in self.datas
        )
        print(''.join(pieces))


def run(argv):
    """Run ``TestStrategy`` on a 1-minute feed and its 2-minute resample.

    ``argv`` is accepted but not used.
    """
    cerebro = bt.Cerebro()

    minute_feed = bt.feeds.CCXT(
        exchange='coinbasepro',
        symbol='BTC/USD',
        timeframe=bt.TimeFrame.Minutes,
        compression=1,
        ohlcv_limit=10,
    )

    cerebro.adddata(minute_feed, name='1M')
    cerebro.resampledata(
        minute_feed,
        timeframe=bt.TimeFrame.Minutes,
        compression=2,
        name='2M',
    )

    cerebro.addstrategy(TestStrategy)
    cerebro.run()


if __name__ == '__main__':
    run(sys.argv)
here is the error
***** DATA NOTIF: LIVE ***** NEXT: 2018-06-27 06:34:00 1M 1 6084.04 2M 1 6084.04 ***** NEXT: 2018-06-27 06:35:00 1M 2 6084.05 2M 1 6084.04 ***** NEXT: 2018-06-27 06:36:00 1M 3 6084.05 2M 2 6084.05 ***** NEXT: 2018-06-27 06:37:00 1M 4 6084.05 2M 2 6084.05 ***** NEXT: 2018-06-27 06:38:00 1M 5 6084.04 2M 3 6084.04 ***** NEXT: 2018-06-27 06:39:00 1M 6 6080.75 2M 3 6084.04 ***** NEXT: 2018-06-27 06:40:00 1M 7 6080.74 2M 4 6080.74 ***** NEXT: 2018-06-27 06:41:00 1M 8 6080.74 2M 4 6080.74 ***** NEXT: 2018-06-27 06:42:00 1M 9 6080.75 2M 5 6080.75 ***** NEXT: 2018-06-27 06:43:00 1M 10 6080.74 2M 5 6080.75 ***** NEXT: 2018-06-27 06:44:00 1M 11 6080.75 2M 6 6080.75 ***** NEXT: 2018-06-27 06:45:00 1M 12 6080.75 2M 6 6080.75 Traceback (most recent call last): File "test.py", line 80, in <module> run(sys.argv) File "test.py", line 76, in run cerebro.run() File "c:\Anaconda3\lib\site-packages\backtrader-1.9.64.122-py3.6.egg\backtrader\cerebro.py", line 1127, in run File "c:\Anaconda3\lib\site-packages\backtrader-1.9.64.122-py3.6.egg\backtrader\cerebro.py", line 1298, in runstrategies File "c:\Anaconda3\lib\site-packages\backtrader-1.9.64.122-py3.6.egg\backtrader\cerebro.py", line 1557, in _runnext ValueError: min() arg is an empty sequence
-
@dan-yang said in Live feed inplace resampling:
I actually have the same issue, and I tested with the same code. It runs well for the history part (10 next) of the data, but fails right after that. Does anybody have an idea?
import sys

import backtrader as bt


class TestStrategy(bt.Strategy):
    """Print data status changes and one summary line per ``next`` call."""

    def notify_data(self, data, status, *args, **kwargs):
        status_name = data._getstatusname(status)
        print('*' * 5, 'DATA NOTIF:', status_name)

    def next(self):
        # Header with the first feed's bar time, then name/length/close
        # for every feed, all on a single line.
        pieces = ['***** NEXT: {} '.format(bt.num2date(self.data.datetime[0]))]
        pieces.extend(
            '{} {} {} '.format(feed._name, len(feed), feed.close[0])
            for feed in self.datas
        )
        print(''.join(pieces))


def run(argv):
    """Run ``TestStrategy`` on a 1-minute feed and its 2-minute resample.

    ``argv`` is accepted but not used.
    """
    cerebro = bt.Cerebro()

    minute_feed = bt.feeds.CCXT(
        exchange='coinbasepro',
        symbol='BTC/USD',
        timeframe=bt.TimeFrame.Minutes,
        compression=1,
        ohlcv_limit=10,
    )

    cerebro.adddata(minute_feed, name='1M')
    cerebro.resampledata(
        minute_feed,
        timeframe=bt.TimeFrame.Minutes,
        compression=2,
        name='2M',
    )

    cerebro.addstrategy(TestStrategy)
    cerebro.run()


if __name__ == '__main__':
    run(sys.argv)
here is the error
***** DATA NOTIF: LIVE ***** NEXT: 2018-06-27 06:34:00 1M 1 6084.04 2M 1 6084.04 ***** NEXT: 2018-06-27 06:35:00 1M 2 6084.05 2M 1 6084.04 ***** NEXT: 2018-06-27 06:36:00 1M 3 6084.05 2M 2 6084.05 ***** NEXT: 2018-06-27 06:37:00 1M 4 6084.05 2M 2 6084.05 ***** NEXT: 2018-06-27 06:38:00 1M 5 6084.04 2M 3 6084.04 ***** NEXT: 2018-06-27 06:39:00 1M 6 6080.75 2M 3 6084.04 ***** NEXT: 2018-06-27 06:40:00 1M 7 6080.74 2M 4 6080.74 ***** NEXT: 2018-06-27 06:41:00 1M 8 6080.74 2M 4 6080.74 ***** NEXT: 2018-06-27 06:42:00 1M 9 6080.75 2M 5 6080.75 ***** NEXT: 2018-06-27 06:43:00 1M 10 6080.74 2M 5 6080.75 ***** NEXT: 2018-06-27 06:44:00 1M 11 6080.75 2M 6 6080.75 ***** NEXT: 2018-06-27 06:45:00 1M 12 6080.75 2M 6 6080.75 Traceback (most recent call last): File "test.py", line 80, in <module> run(sys.argv) File "test.py", line 76, in run cerebro.run() File "c:\Anaconda3\lib\site-packages\backtrader-1.9.64.122-py3.6.egg\backtrader\cerebro.py", line 1127, in run File "c:\Anaconda3\lib\site-packages\backtrader-1.9.64.122-py3.6.egg\backtrader\cerebro.py", line 1298, in runstrategies File "c:\Anaconda3\lib\site-packages\backtrader-1.9.64.122-py3.6.egg\backtrader\cerebro.py", line 1557, in _runnext ValueError: min() arg is an empty sequence
Never mind, I found that the issue is with my exchange 'coinbasepro'. It works well on other exchanges like 'bitmex'. I will leave the post here just in case other people run into the same issue and can learn from my case.