For code/output blocks: Use ``` (aka backtick or grave accent) in a single line before and after the block. See: http://commonmark.org/help/

Live feed inplace resampling



  • Any one know if it is possible to resample a feed inplace in a strategy?

    I have a live feed coming into a strategy but i can only get 1M, 5M, 1H and 1D resolution on the candles but I need to run on other resolutions like for example 2H. Any ideas to how this is done best?

    In other words I guess I need to manipulate the data feed while the strategy is running.

    I also ready tries to add the live feed with:

    cerebro.resampledata(data, timeframe=bt.TimeFrame.Minutes, compression=120)
    

    But that does not work.



  • @søren-pallesen said in Live feed inplace resampling:

    cerebro.resampledata(data, timeframe=bt.TimeFrame.Minutes, compression=120)
    

    You can see similar constructs in the samples for ib and oanda

    Should work. It would help us if you let us know why it didn't work (or why you believe it didn't) and how the original data feed is being created. An isolated line doesn't tell the world much (it does to you, because the context is known to you)



  • Sorry about that. Offcause here you go @Paska-Houso . Some background and context.

    The feed and store implementation is based on @Ed-Bartosh work from thie thread: https://community.backtrader.com/topic/623/anyone-use-backtrader-to-do-live-trading-on-bitcoin-exchange which has resulted in this implementation: https://github.com/bartosh/backtrader/tree/ccxt that utilises the CCXT library to facilitate integration against numerus crypto currency exchanges. In other words the live feed and store is hooked up to CCXT. From above Github repo:

    CCXTfeed:

    #!/usr/bin/env python
    # -*- coding: utf-8; py-indent-offset:4 -*-
    ###############################################################################
    #
    # Copyright (C) 2015, 2016, 2017 Daniel Rodriguez
    # Copyright (C) 2017 Ed Bartosh
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    #
    ###############################################################################
    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    
    from collections import deque
    from datetime import datetime
    
    import backtrader as bt
    from backtrader.feed import DataBase
    from backtrader.stores.ccxtstore import CCXTStore
    
    class CCXT(DataBase):
        """
        CryptoCurrency eXchange Trading Library Data Feed.
        Params:
          - ``historical`` (default: ``False``)
            If set to ``True`` the data feed will stop after doing the first
            download of data.
            The standard data feed parameters ``fromdate`` and ``todate`` will be
            used as reference.
          - ``backfill_start`` (default: ``True``)
            Perform backfilling at the start. The maximum possible historical data
            will be fetched in a single request.
        """
    
        params = (
            ('historical', False),  # only historical download
            ('backfill_start', False),  # do backfilling at the start
        )
    
        # States for the Finite State Machine in _load
        _ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(3)
    
        def __init__(self, exchange, symbol, ohlcv_limit=450, config={}, retries=5):
            self.symbol = symbol
            self.ohlcv_limit = ohlcv_limit
    
            self.store = CCXTStore(exchange, config, retries)
    
            self._data = deque() # data queue for price data
            self._last_id = '' # last processed trade id for ohlcv
            self._last_ts = 0 # last processed timestamp for ohlcv
    
        def start(self, ):
            DataBase.start(self)
    
            if self.p.fromdate:
                self._state = self._ST_HISTORBACK
                self.put_notification(self.DELAYED)
    
                self._fetch_ohlcv(self.p.fromdate)
            else:
                self._state = self._ST_LIVE
                self.put_notification(self.LIVE)
    
        def _load(self):
            if self._state == self._ST_OVER:
                return False
    
            while True:
                if self._state == self._ST_LIVE:
                    if self._timeframe == bt.TimeFrame.Ticks:
                        return self._load_ticks()
                    else:
                        self._fetch_ohlcv()
                        return self._load_ohlcv()
                elif self._state == self._ST_HISTORBACK:
                    ret = self._load_ohlcv()
                    if ret:
                        return ret
                    else:
                        # End of historical data
                        if self.p.historical:  # only historical
                            self.put_notification(self.DISCONNECTED)
                            self._state = self._ST_OVER
                            return False  # end of historical
                        else:
                            self._state = self._ST_LIVE
                            self.put_notification(self.LIVE)
                            continue
    
        def _fetch_ohlcv(self, fromdate=None):
            """Fetch OHLCV data into self._data queue"""
            granularity = self.store.get_granularity(self._timeframe, self._compression)
    
            if fromdate:
                since = int((fromdate - datetime(1970, 1, 1)).total_seconds() * 1000)
            else:
                if self._last_ts > 0:
                    since = self._last_ts
                else:
                    since = None
    
            limit = self.ohlcv_limit
    
            while True:
                dlen = len(self._data)
                for ohlcv in self.store.fetch_ohlcv(self.symbol, timeframe=granularity,
                                                    since=since, limit=limit)[::-1]:
                    if None in ohlcv:
                        continue
    
                    tstamp = ohlcv[0]
                    if tstamp > self._last_ts:
                        self._data.append(ohlcv)
                        self._last_ts = tstamp
                        since = tstamp + 1
    
                if dlen == len(self._data):
                    break
    
        def _load_ticks(self):
            if self._last_id is None:
                # first time get the latest trade only
                trades = [self.store.fetch_trades(self.symbol)[-1]]
            else:
                trades = self.store.fetch_trades(self.symbol)
    
            for trade in trades:
                trade_id = trade['id']
    
                if trade_id > self._last_id:
                    trade_time = datetime.strptime(trade['datetime'], '%Y-%m-%dT%H:%M:%S.%fZ')
                    self._data.append((trade_time, float(trade['price']), float(trade['amount'])))
                    self._last_id = trade_id
    
            try:
                trade = self._data.popleft()
            except IndexError:
                return None # no data in the queue
    
            trade_time, price, size = trade
    
            self.lines.datetime[0] = bt.date2num(trade_time)
            self.lines.open[0] = price
            self.lines.high[0] = price
            self.lines.low[0] = price
            self.lines.close[0] = price
            self.lines.volume[0] = size
    
            return True
    
        def _load_ohlcv(self):
            try:
                ohlcv = self._data.popleft()
            except IndexError:
                return None # no data in the queue
    
            tstamp, open_, high, low, close, volume = ohlcv
    
            dtime = datetime.utcfromtimestamp(tstamp // 1000)
    
            self.lines.datetime[0] = bt.date2num(dtime)
            self.lines.open[0] = open_
            self.lines.high[0] = high
            self.lines.low[0] = low
            self.lines.close[0] = close
            self.lines.volume[0] = volume
    
            return True
    
        def haslivedata(self):
            return self._state == self._ST_LIVE and self._data
    
        def islive(self):
            return not self.p.historical
    

    And the CCXTstore:

    #!/usr/bin/env python
    # -*- coding: utf-8; py-indent-offset:4 -*-
    ###############################################################################
    #
    # Copyright (C) 2017 Ed Bartosh <bartosh@gmail.com>
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    #
    ###############################################################################
    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    
    import time
    from functools import wraps
    
    import ccxt
    from ccxt.base.errors import NetworkError
    
    import backtrader as bt
    
    
    class CCXTStore(object):
        '''API provider for CCXT feed and broker classes.'''
    
        # Supported granularities
        _GRANULARITIES = {
            (bt.TimeFrame.Minutes, 1): '1m',
            (bt.TimeFrame.Minutes, 3): '3m',
            (bt.TimeFrame.Minutes, 5): '5m',
            (bt.TimeFrame.Minutes, 15): '15m',
            (bt.TimeFrame.Minutes, 30): '30m',
            (bt.TimeFrame.Minutes, 60): '1h',
            (bt.TimeFrame.Minutes, 90): '90m',
            (bt.TimeFrame.Minutes, 120): '2h',
            (bt.TimeFrame.Minutes, 240): '4h',
            (bt.TimeFrame.Minutes, 360): '6h',
            (bt.TimeFrame.Minutes, 480): '8h',
            (bt.TimeFrame.Minutes, 720): '12h',
            (bt.TimeFrame.Days, 1): '1d',
            (bt.TimeFrame.Days, 3): '3d',
            (bt.TimeFrame.Weeks, 1): '1w',
            (bt.TimeFrame.Weeks, 2): '2w',
            (bt.TimeFrame.Months, 1): '1M',
            (bt.TimeFrame.Months, 3): '3M',
            (bt.TimeFrame.Months, 6): '6M',
            (bt.TimeFrame.Years, 1): '1y',
        }
    
        def __init__(self, exchange, config, retries):
            self.exchange = getattr(ccxt, exchange)(config)
            self.retries = retries
    
        def get_granularity(self, timeframe, compression):
            if not self.exchange.hasFetchOHLCV:
                raise NotImplementedError("'%s' exchange doesn't support fetching OHLCV data" % \
                                          self.exchange.name)
    
            granularity = self._GRANULARITIES.get((timeframe, compression))
            if granularity is None:
                raise ValueError("backtrader CCXT module doesn't support fetching OHLCV "
                                 "data for time frame %s, comression %s" % \
                                 (bt.TimeFrame.getname(timeframe), compression))
    
            if self.exchange.timeframes and granularity not in self.exchange.timeframes:
                raise ValueError("'%s' exchange doesn't support fetching OHLCV data for "
                                 "%s time frame" % (self.exchange.name, granularity))
    
            return granularity
    
        def retry(method):
            @wraps(method)
            def retry_method(self, *args, **kwargs):
                for i in range(self.retries):
                    time.sleep(self.exchange.rateLimit / 1000)
                    try:
                        return method(self, *args, **kwargs)
                    except NetworkError:
                        if i == self.retries - 1:
                            raise
    
            return retry_method
    
        @retry
        def getcash(self, currency):
            return self.exchange.fetch_balance()['free'][currency]
    
        @retry
        def getvalue(self, currency):
            return self.exchange.fetch_balance()['total'][currency]
    
        @retry
        def getposition(self, currency):
            return self.getvalue(currency)
    
        @retry
        def create_order(self, symbol, order_type, side, amount, price, params):
            return self.exchange.create_order(symbol=symbol, type=order_type, side=side,
                                              amount=amount, price=price, params=params)
    
        @retry
        def cancel_order(self, order_id):
            return self.exchange.cancel_order(order_id)
    
        @retry
        def fetch_trades(self, symbol):
            return self.exchange.fetch_trades(symbol)
    
        @retry
        def fetch_ohlcv(self, symbol, timeframe, since, limit):
            return self.exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since, limit=limit)
    

    Here is a test script integrating with the crypto exchange Bitmex live with 1 minute bars. My problem is the implementation only support the native resolutions from the exchange which are 1M, 5M, 1H and 1D, however i need to resample to other resolutions and dont know how to get that done.

    Test script:

    # !/usr/bin/env python
    # -*- coding: utf-8; py-indent-offset:4 -*-
    
    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    import time
    from datetime import datetime, timedelta
    import backtrader as bt
    
    class TestStrategy(bt.Strategy):
        def next(self):
            print('*' * 5, 'NEXT:', bt.num2date(self.data.datetime[0]), self.data._name, self.data.open[0], self.data.high[0],
                  self.data.low[0], self.data.close[0], self.data.volume[0],
                  bt.TimeFrame.getname(self.data._timeframe), len(self.data))
    
    if __name__ == '__main__':
        cerebro = bt.Cerebro()
        
        hist_start_date = datetime.utcnow() - timedelta(minutes=10)
        data_ticks = bt.feeds.CCXT(exchange='bitmex', symbol='BTC/USD', name="btc_usd_tick",
                               timeframe=bt.TimeFrame.Minutes, fromdate=hist_start_date, compression=1)
    
        cerebro.adddata(data_ticks)
        cerebro.addstrategy(TestStrategy)
        cerebro.run()
    


  • Not to be pedantic, but there is no cerebro.resampledata in your code. Testing would be better done if after data to the system (bt.TimeFrame.Minutes, 1), you did:

    cerebro.resampledata(data, timeframe=bt.TimeFrame.Minutes, compression=2)
    

    So you only have to wait 2 minutes until you see if things are working.

    And in TestStrategy prepare output for as many data feeds as you have added. Start only with close and build from there onwards.

    class TestStrategy(bt.Strategy):
        def next(self):     
            for i, d in enumerate(self.datas):
                txt = []
                txt.append('%04d' % len(d))
                txt.append('Data_%d' % i)
                txt.append(d.datetime.datetime().isoformat())
                txt.append('%.4f' % d.close[0])
                print(','.join(txt))
    

    Let us see what happens.



  • @paska-houso said in Live feed inplace resampling:

    for as many data feeds as you have added

    Yes i know - i took out cerebro.resampledata to give you the script that actually worked :--)

    I got your idea and will try it out. Many thx for the suggestion. I will report back later.



  • Holly smokes - I think that did it:

    Test script:

    # !/usr/bin/env python
    # -*- coding: utf-8; py-indent-offset:4 -*-
    
    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    import time
    from datetime import datetime, timedelta
    import backtrader as bt
    
    class TestStrategy(bt.Strategy):
        def next(self):
            for i, d in enumerate(self.datas):
                txt = []
                txt.append('%04d' % len(d))
                txt.append('Data_%d' % i)
                txt.append(d.datetime.datetime().isoformat())
                txt.append('%.4f' % d.close[0])
                print(','.join(txt))
                
    #         print('*' * 5, 'NEXT:', bt.num2date(self.data.datetime[0]), self.data._name, self.data.open[0], self.data.high[0],
    #               self.data.low[0], self.data.close[0], self.data.volume[0],
    #               bt.TimeFrame.getname(self.data._timeframe), len(self.data))
    
    
    if __name__ == '__main__':
        cerebro = bt.Cerebro()
        
        hist_start_date = datetime.utcnow() - timedelta(minutes=10)
        data_ticks = bt.feeds.CCXT(exchange='bitmex', symbol='BTC/USD', name="btc_usd_tick",
                               timeframe=bt.TimeFrame.Minutes, fromdate=hist_start_date, compression=1)
    
    
        cerebro.adddata(data_ticks)
        cerebro.resampledata(data_ticks, timeframe=bt.TimeFrame.Minutes, compression=2)
        cerebro.addstrategy(TestStrategy)
        cerebro.run()
    

    Output:

    0002,Data_0,2018-01-08T19:40:00,15040.0000
    0001,Data_1,2018-01-08T19:40:00,15040.0000
    0003,Data_0,2018-01-08T19:41:00,15038.0000
    0001,Data_1,2018-01-08T19:40:00,15040.0000
    0004,Data_0,2018-01-08T19:42:00,15054.5000
    0002,Data_1,2018-01-08T19:42:00,15054.5000
    0005,Data_0,2018-01-08T19:43:00,15020.5000
    0002,Data_1,2018-01-08T19:42:00,15054.5000
    0006,Data_0,2018-01-08T19:44:00,15016.0000
    0003,Data_1,2018-01-08T19:44:00,15016.0000
    0007,Data_0,2018-01-08T19:45:00,15030.0000
    0003,Data_1,2018-01-08T19:44:00,15016.0000
    0008,Data_0,2018-01-08T19:46:00,15053.5000
    0004,Data_1,2018-01-08T19:46:00,15053.5000
    0009,Data_0,2018-01-08T19:47:00,15081.0000
    0004,Data_1,2018-01-08T19:46:00,15053.5000
    0010,Data_0,2018-01-08T19:48:00,15081.5000
    0005,Data_1,2018-01-08T19:48:00,15081.5000
    0011,Data_0,2018-01-08T19:49:00,15040.5000
    0005,Data_1,2018-01-08T19:48:00,15081.5000
    

    Thx a bunch Sir ! :-)