Anyone use backtrader to do live trading on Bitcoin exchange?



  • @xord37 Thanks for the fix! I've just merged it.

    Are you able to use feed in minute time frame after the fix?



  • Thanks Ed for your work!
    I've taken your data feed and broker classes and tried to make them more compliant with the backtrader 'store' paradigm. In particular, I've moved all 'fetch' methods to the store. I've also added an 'account' thread to the broker, which speeds up all cash/value/position polling, and a method for building OHLCV structures from ticks when the exchange does not provide them through ccxt.

    Not being much of a git expert, I'm not sure what to do with the code. I'll be happy to post it here or anywhere else.



  • @sutpen Hi, that sounds great! Can you post your changes here please?



  • @Ed-Bartosh Sure. It later occurred to me that this may not be in line with the backtrader philosophy, because I use pandas to resample ticks to candles, which may be a no-no here.
    Another thing to note is that ohlcv candles are only published when they are closed.
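
A minimal sketch of the tick-to-candle resampling described in this post, assuming ticks arrive as `(timestamp_ms, price, amount)` tuples as in the store code that follows. The function name `ticks_to_closed_candles` and the `now` argument are hypothetical and only serve the illustration. It uses pandas' built-in `ohlc()` aggregation rather than the lambdas in the posted code, and drops any candle whose interval has not ended yet, so only closed candles are returned.

```python
import pandas as pd


def ticks_to_closed_candles(ticks, rule, now):
    """Resample (timestamp_ms, price, amount) ticks into closed OHLCV candles.

    rule is a pandas frequency string such as "1min";
    now is a naive UTC pd.Timestamp used to decide which candles are closed.
    """
    index = pd.to_datetime([t[0] for t in ticks], unit="ms")
    prices = pd.Series([t[1] for t in ticks], index=index)
    amounts = pd.Series([t[2] for t in ticks], index=index)

    # Candles are labelled by the start of their interval
    candles = prices.resample(rule, closed="left", label="left").ohlc()
    candles["volume"] = amounts.resample(rule, closed="left", label="left").sum()

    # Drop intervals without trades, then drop the candle that is still forming
    candles = candles.dropna()
    return candles[candles.index + pd.Timedelta(rule) <= now]


base = 1512345600000  # 2017-12-04 00:00:00 UTC in milliseconds
ticks = [(base + 10000, 100.0, 1.0),
         (base + 50000, 102.0, 2.0),
         (base + 80000, 101.0, 1.0)]
closed = ticks_to_closed_candles(ticks, "1min", pd.Timestamp("2017-12-04 00:01:30"))
print(closed)
```

With `now` at 00:01:30, the third tick belongs to the 00:01 candle, which is still open and is therefore held back.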

    ccxtstore:

    #!/usr/bin/env python
    # -*- coding: utf-8; py-indent-offset:4 -*-
    ###############################################################################
    #
    # Copyright (C) 2015, 2016, 2017 Daniel Rodriguez
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    #
    ###############################################################################
    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    
    import collections
    from datetime import datetime, timedelta
    import time as _time
    import json
    import threading
    import numpy as np
    import pandas as pd
    from collections import deque
    
    import ccxt
    
    import backtrader as bt
    from backtrader.metabase import MetaParams
    
    from backtrader.utils.py3 import queue, with_metaclass
    from backtrader.utils import AutoDict
    
    class CCXTqueue(deque):
        def __init__(self):
            super(CCXTqueue, self).__init__()
        def put(self, x):
            return(self.append(x))
        def get(self):
            return(self.popleft())
    
    class MetaCCXTStore(MetaParams):
        def __init__(cls, name, bases, dct):
            '''Class has already been created ... register'''
            # Initialize the class
            super(MetaCCXTStore, cls).__init__(name, bases, dct)
    
    
    class CCXTStore(with_metaclass(MetaCCXTStore, object)):
        '''Class wrapping to control the connections to CCXT.
    
        Params:
        exchange
        config
        symbol
        ohlcv_limit
        '''
    
        BrokerCls = None  # broker class will autoregister
        DataCls = None  # data class will auto register
    
        params = (
            ('exchange', 'hitbtc2'),
            ('config', {}),
            ('symbol', 'BTC/USD'),
            ('ohlcv_limit', 10),
            ('account_tmout', 10.0),  # account balance refresh timeout
        )
    
        @classmethod
        def getdata(cls, *args, **kwargs):
            '''Returns ``DataCls`` with args, kwargs'''
            return cls.DataCls(*args, **kwargs)
    
        @classmethod
        def getbroker(cls, *args, **kwargs):
            '''Returns broker with *args, **kwargs from registered ``BrokerCls``'''
            return cls.BrokerCls(*args, **kwargs)
    
        def __init__(self):
            super(CCXTStore, self).__init__()
    
            self.notifs = collections.deque()  # store notifications for cerebro
    
            self._env = None  # reference to cerebro for general notifications
            self.broker = None  # broker instance
            self.datas = list()  # datas that have registered over start
    
            self.exchange = getattr(ccxt, self.p.exchange)(self.p.config)
            self.symbol = self.p.symbol
            self.currency = self.symbol.split('/')[1]
            self.coin = self.symbol.split('/')[0]
    
            self._last_id = None  # last processed data id (trade id or timestamp for ohlcv)
            self._tmp_queue = CCXTqueue()
    
            self._cash = 0.0
            self._value = 0.0
            self._evt_acct = threading.Event()
    
            return
    
        def start(self, data=None, broker=None):
            # Datas require some processing to kickstart data reception
            if data is None and broker is None:
                self.cash = None
                return
    
            if data is not None:
                self._env = data._env
                # For datas simulate a queue with None to kickstart co
                self.datas.append(data)
    
                if self.broker is not None:
                    self.broker.data_started(data)
    
            if broker is not None:
                self.broker = broker
                self.broker_threads()
    
        def stop(self):
            # signal end of thread
            if self.broker is not None:
                #self.q_ordercreate.put(None)
                #self.q_orderclose.put(None)
                self.q_account.put(None)
            return
    
        def put_notification(self, msg, *args, **kwargs):
            self.notifs.append((msg, args, kwargs))
    
        def get_notifications(self):
            '''Return the pending "store" notifications'''
            self.notifs.append(None)  # put a mark / threads could still append
            return [x for x in iter(self.notifs.popleft, None)]
    
        # Granularities supported by CCXT
        _GRANULARITIES = {
            (bt.TimeFrame.Minutes, 1): '1m',
            (bt.TimeFrame.Minutes, 3): '3m',
            (bt.TimeFrame.Minutes, 5): '5m',
            (bt.TimeFrame.Minutes, 15): '15m',
            (bt.TimeFrame.Minutes, 30): '30m',
            (bt.TimeFrame.Minutes, 60): '1h',
            (bt.TimeFrame.Minutes, 90): '90m',
            (bt.TimeFrame.Minutes, 120): '2h',
            (bt.TimeFrame.Minutes, 240): '4h',
            (bt.TimeFrame.Minutes, 360): '6h',
            (bt.TimeFrame.Minutes, 480): '8h',
            (bt.TimeFrame.Minutes, 720): '12h',
            (bt.TimeFrame.Days, 1): '1d',
            (bt.TimeFrame.Days, 3): '3d',
            (bt.TimeFrame.Weeks, 1): '1w',
            (bt.TimeFrame.Weeks, 2): '2w',
            (bt.TimeFrame.Months, 1): '1M',
            (bt.TimeFrame.Months, 3): '3M',
            (bt.TimeFrame.Months, 6): '6M',
            (bt.TimeFrame.Years, 1): '1y',
        }
    
        def get_granularity(self, timeframe, compression):
            return self._GRANULARITIES.get((timeframe, compression), None)
    
        def _fetch_ticks(self, queue, **kwds):
            _time.sleep(self.exchange.rateLimit / 1000)  # time.sleep wants seconds
            if self._last_id is None:
                # first time get the latest trade only
                try:
                    ex_reply = [self.exchange.fetch_trades(self.symbol)[-1]]
                except Exception as e:
                    error_response = ValueError("Error reading OHLC from exchange")
                    queue.put(e)
                    return
    
            else:
                try:
                    ex_reply = self.exchange.fetch_trades(self.symbol)
                except Exception as e:
                    error_response = ValueError("Error reading OHLC from exchange")
                    queue.put(e)
                    return
    
            # returned values in ascending or descending order
            direction = -1 if ((len(ex_reply)>1) and (ex_reply[0]['id']>ex_reply[1]['id'])) else 1
            # treat "nothing seen yet" as 0 so the comparisons also work on Python 3
            last_id = self._last_id or 0
            ex_reply = [r for r in ex_reply if r['timestamp'] > last_id]
            if len(ex_reply)>0:
                self._last_id = max(last_id, max(int(r['timestamp']) for r in ex_reply))
    
            for trade in ex_reply[::direction]:
                #trade_time = datetime.strptime(trade['time'], '%Y-%m-%dT%H:%M:%S.%fZ')
                queue.put((trade['timestamp'], float(trade['price']), float(trade['amount'])))
    
            return
    
        def fetch_ohlcv_generic(self, queue, dtbegin, timeframe, compression, **kwds):
            if self.exchange.hasFetchOHLCV:
                retval =  self._fetch_ohlcv(queue, dtbegin, timeframe, compression)
            else:
                retval = self._fetch_ohlcv_from_ticks(queue, dtbegin, timeframe, compression)
            return retval
    
        def _fetch_ohlcv(self, queue, dtbegin, timeframe, compression, **kwds):
            granularity = self._GRANULARITIES.get((timeframe, compression))
            if granularity is None:
                error_response = ValueError("'%s' exchange doesn't support fetching OHLCV data for "
                                 "time frame %s, compression %s" % \
                                 (self.exchange.name, bt.TimeFrame.getname(timeframe),compression))
                queue.put(error_response)
                return
    
    
            if dtbegin is not None:
                since = int((dtbegin - datetime(1970, 1, 1)).total_seconds() * 1000)
                limit = None
            else:
                since = None
                limit = self.p.ohlcv_limit
    
            _time.sleep(self.exchange.rateLimit / 1000) # time.sleep wants seconds
            upto = int((datetime.utcnow().replace(second=0, microsecond=0) - datetime(1970, 1, 1)).total_seconds() * 1000)
            try:
                ex_reply = self.exchange.fetch_ohlcv(self.symbol, timeframe=granularity,since=since, limit=limit)
            except Exception as e:
                error_response = ValueError("Error reading OHLC from exchange")
                queue.put(e)
                return
    
            # returned values in ascending or descending order
            direction = -1 if ((len(ex_reply)>1) and (ex_reply[0][0]>ex_reply[1][0])) else 1
            # remove ongoing and previously seen candles
            # (treat "nothing seen yet" as 0 so the comparisons also work on Python 3)
            last_id = self._last_id or 0
            ex_reply = [r for r in ex_reply if ((r[0]<upto) and (r[0]>last_id))]
            if len(ex_reply)>0:
                self._last_id = max(last_id, max(int(r[0]) for r in ex_reply))
            # remove extras based on limit or from date
            if limit and len(ex_reply)>limit:
                ex_reply = ex_reply[:limit] if direction ==1 else ex_reply[(-1*limit):]
            elif since:
                ex_reply  = [r for r in ex_reply if r[0]>=since]
            for ohlcv in ex_reply[::direction]:
                queue.put(ohlcv)
            return
    
        def _fetch_ohlcv_from_ticks(self, queue, dtbegin, timeframe, compression, **kwds):
            # granularity
            granularity = self._GRANULARITIES.get((timeframe, compression))
            if granularity is None:
                error_response = ValueError("'%s' exchange doesn't support fetching OHLCV data for "
                                 "time frame %s, compression %s" % \
                                 (self.exchange.name, bt.TimeFrame.getname(timeframe),compression))
                queue.put(error_response)
                return
            pd_granularity = granularity.replace('m','T')
    
            if dtbegin is not None:
                since = int((dtbegin - datetime(1970, 1, 1)).total_seconds() * 1000)
                limit = None
            else:
                since = None
                limit = self.p.ohlcv_limit
    
            # push ticks into temporary queue
            self._fetch_ticks(self._tmp_queue, **kwds)
            if len(self._tmp_queue) == 0:
                return  # no new ticks yet
            if isinstance(self._tmp_queue[0], Exception):
                queue.put(self._tmp_queue.get())
                return
    
            # get ticks up to the last minute that is already 'closed'
            upto = int((datetime.utcnow().replace(second=0, microsecond=0) - datetime(1970, 1, 1)).total_seconds() * 1000)
            timestamps = np.asarray(self._tmp_queue)[:, 0]
            since_ms = since or 0  # no start date means "accept every tick"
            if ((len(timestamps[(timestamps>=since_ms) & (timestamps<upto)])==0) or
                (len(timestamps[timestamps>=upto])==0)):
                return
    
            # pop the relevant ticks
            ex_reply = []
            finished = len(self._tmp_queue)==0
            while not finished:
                res  = self._tmp_queue[0] if len(self._tmp_queue)>0 else None
                if (res is None) or (res[0] >=upto):
                    finished = True
                    break
                ex_reply.append(self._tmp_queue.popleft())
    
    
            # trades to candles
            ex_reply = np.asarray(ex_reply)
            df = pd.DataFrame(data={'timestamp':ex_reply[:, 0],
                                    'datetime':[datetime.utcfromtimestamp(tt // 1000) for tt in ex_reply[:,0]],
                                    'open':ex_reply[:, 1],
                                    'high':ex_reply[:, 1],'low':ex_reply[:, 1],
                                    'close':ex_reply[:, 1], 'volume':ex_reply[:, 2]})
    
            df = df.set_index(pd.to_datetime(df.datetime))
            ohlcv_df = df.resample(pd_granularity, closed='left', label='left').agg({
                                        'open': lambda s: s.dropna().iloc[0] if len(s.dropna()) > 0 else np.nan,
                                        'high': np.max,
                                        'low': np.min,
                                        'close': lambda s: s.dropna().iloc[-1] if len(s.dropna()) > 0 else np.nan,
                                        'volume': np.sum})
    
    
    
            # remove extras based on limit or from date
            if limit and len(ohlcv_df)>limit:
                ohlcv_df = ohlcv_df.head(limit)
            for i, ohlcv in ohlcv_df.iterrows():
                queue.put([int((ohlcv.name - datetime(1970, 1, 1)).total_seconds() * 1000), ohlcv.open, ohlcv.high, ohlcv.low, ohlcv.close, ohlcv.volume])
            return
    
        def get_cash(self):
            return self._cash
    
        def get_value(self):
            return self._value
    
        def get_position(self):
            try:
                accinfo = self.exchange.fetch_balance()
            except Exception as e:
                self.put_notification(e)
                return None
    
            self._position = accinfo['total'][self.coin]
            return self._position
    
        def broker_threads(self):
            self.q_account = queue.Queue()
            self.q_account.put(True)  # force an immediate update
            t = threading.Thread(target=self._t_account)
            t.daemon = True
            t.start()
    
            # self.q_ordercreate = queue.Queue()
            # t = threading.Thread(target=self._t_order_create)
            # t.daemon = True
            # t.start()
            #
            # self.q_orderclose = queue.Queue()
            # t = threading.Thread(target=self._t_order_cancel)
            # t.daemon = True
            # t.start()
    
            # Wait once for the values to be set
            self._evt_acct.wait(self.p.account_tmout)
    
        def _t_account(self):
            while True:
                try:
                    msg = self.q_account.get(timeout=self.p.account_tmout)
                    if msg is None:
                        break  # end of thread
                except queue.Empty:  # tmout -> time to refresh
                    pass
    
                try:
                    accinfo = self.exchange.fetch_balance()
                except Exception as e:
                    self.put_notification(e)
                    continue
    
                try:
                    self._cash = accinfo['free'][self.currency]
                    self._value = accinfo['total'][self.currency]
                    #self._position = accinfo['total'][self.coin]
                except KeyError:
                    pass
    
                self._evt_acct.set()
    

    ccxtbroker:

    #!/usr/bin/env python
    # -*- coding: utf-8; py-indent-offset:4 -*-
    ###############################################################################
    #
    # Copyright (C) 2015, 2016, 2017 Daniel Rodriguez
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    #
    ###############################################################################
    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    
    import collections
    from copy import copy
    from datetime import date, datetime, timedelta
    import threading
    
    from backtrader.feed import DataBase
    from backtrader import (TimeFrame, num2date, date2num, BrokerBase,
                            Order, BuyOrder, SellOrder, OrderBase, OrderData)
    from backtrader.utils.py3 import bytes, with_metaclass, MAXFLOAT, queue
    from backtrader.metabase import MetaParams
    from backtrader.comminfo import CommInfoBase
    from backtrader.position import Position
    from backtrader.stores import ccxtstore
    from backtrader.utils import AutoDict, AutoOrderedDict
    from backtrader.comminfo import CommInfoBase
    
    import ccxt
    
    class CCXTOrder(OrderBase):
        def __init__(self, owner, data, ccxt_order):
            self.owner = owner
            self.data = data
            self.ccxt_order = ccxt_order
            self.ordtype = self.Buy if ccxt_order['info']['side'] == 'buy' else self.Sell
            self.size = float(ccxt_order['info']['original_amount'])
    
            super(CCXTOrder, self).__init__()
    
    class MetaCCXTBroker(BrokerBase.__class__):
        def __init__(cls, name, bases, dct):
            '''Class has already been created ... register'''
            # Initialize the class
            super(MetaCCXTBroker, cls).__init__(name, bases, dct)
            ccxtstore.CCXTStore.BrokerCls = cls
    
    
    class CCXTBroker(with_metaclass(MetaCCXTBroker, BrokerBase)):
        '''Broker implementation for CCXT cryptocurrency trading library.
    
        This class maps the orders/positions from CCXT to the
        internal API of ``backtrader``.
        '''
    
    
        order_types = {Order.Market: 'market',
                       Order.Limit: 'limit',
                       Order.Stop: 'stop',
                       Order.StopLimit: 'stop limit'}
    
    
        def __init__(self,   **kwargs):
            super(CCXTBroker, self).__init__()
    
            self.ccxt = ccxtstore.CCXTStore(**kwargs)
            self.notifs = queue.Queue()  # holds orders which are notified
    
            self.startingcash = self.ccxt.get_cash()
            self.startingvalue = self.ccxt.get_value()
    
        def getcash(self):
            return self.ccxt.get_cash()
    
    
        def getvalue(self):
            return self.ccxt.get_value()
    
    
        def get_notification(self):
            try:
                return self.notifs.get(False)
            except queue.Empty:
                return None
    
    
        def notify(self, order):
            self.notifs.put(order)
    
    
        def getposition(self, data):
            return self.ccxt.get_position()
    
    
        def _submit(self, owner, data, exectype, side, amount, price, params):
            order_type = self.order_types.get(exectype)
            ccxt_order = self.ccxt.exchange.create_order(symbol=data.ccxt.symbol, type=order_type, side=side,
                                                    amount=amount, price=price, params=params)
            order = CCXTOrder(owner, data, ccxt_order)
            self.notify(order)
            return order
    
    
        def buy(self, owner, data, size, price=None, plimit=None,
                exectype=None, valid=None, tradeid=0, oco=None,
                trailamount=None, trailpercent=None,
                **kwargs):
            return self._submit(owner, data, exectype, 'buy', size, price, kwargs)
    
    
        def sell(self, owner, data, size, price=None, plimit=None,
                 exectype=None, valid=None, tradeid=0, oco=None,
                 trailamount=None, trailpercent=None,
                 **kwargs):
            return self._submit(owner, data, exectype, 'sell', size, price, kwargs)
    
    
        def cancel(self, order):
            # cancel_order expects the exchange order id, not the broker instance
            return self.ccxt.exchange.cancel_order(order.ccxt_order['id'])
    

    ccxtdata:

    #!/usr/bin/env python
    # -*- coding: utf-8; py-indent-offset:4 -*-
    ###############################################################################
    #
    # Copyright (C) 2015, 2016, 2017 Daniel Rodriguez
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    #
    ###############################################################################
    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    
    from datetime import datetime, timedelta
    from time import sleep
    
    from backtrader.feed import DataBase
    from backtrader import TimeFrame, date2num, num2date
    from backtrader.utils.py3 import (integer_types, queue, string_types,with_metaclass)
    from backtrader.metabase import MetaParams
    from backtrader.stores import ccxtstore
    
    
    class MetaCCXTData(DataBase.__class__):
        def __init__(cls, name, bases, dct):
            '''Class has already been created ... register'''
            # Initialize the class
            super(MetaCCXTData, cls).__init__(name, bases, dct)
    
            # Register with the store
            ccxtstore.CCXTStore.DataCls = cls
    
    
    class CCXTData(with_metaclass(MetaCCXTData, DataBase)):
        '''CCXT Data Feed.
    
        Params:
          - ``qcheck`` (default: ``0.5``)
    
            Time in seconds to wake up if no data is received to give a chance to
            resample/replay packets properly and pass notifications up the chain
    
          - ``historical`` (default: ``False``)
    
            If set to ``True`` the data feed will stop after doing the first
            download of data.
    
            The standard data feed parameters ``fromdate`` and ``todate`` will be
            used as reference.
    
            The data feed will make multiple requests if the requested duration is
            larger than the one allowed by IB given the timeframe/compression
            chosen for the data.
    
        '''
        params = (
            ('qcheck', 0.5),
            ('historical', False),  # only history
        )
    
        _store = ccxtstore.CCXTStore
    
        # States for the Finite State Machine in _load
        _ST_START, _ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(4)
    
        def islive(self):
            '''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
            should be deactivated'''
            return not self.p.historical
    
    
        def __init__(self, **kwargs):
            self.ccxt = self._store(**kwargs)
    
    
        def setenvironment(self, env):
            '''Receives an environment (cerebro) and passes it over to the store it
            belongs to'''
            super(CCXTData, self).setenvironment(env)
            env.addstore(self.ccxt)
    
        def start(self):
            '''Starts the CCXT connection'''
            super(CCXTData, self).start()
    
            # Create attributes as soon as possible
            self._ex_queue = queue.Queue()
            self._state = self._ST_OVER
    
            # Kickstart store and get queue to wait on
            self.ccxt.start(data=self)
    
            self._start_finish()
            self._state = self._ST_START  # initial state for _load
            self._st_start()
            return
    
        def _st_start(self, instart=True, tmout=None):
            if self.p.fromdate:
                self.put_notification(self.DELAYED)
                self._state = self._ST_HISTORBACK
                if self._timeframe != TimeFrame.Ticks:
                    self.ccxt.fetch_ohlcv_generic(self._ex_queue, self.p.fromdate, self._timeframe, self._compression)
            else:
                self._state = self._ST_LIVE
                self.put_notification(self.LIVE)
            return
    
    
        def stop(self):
            '''Stops and tells the store to stop'''
            super(CCXTData, self).stop()
            self.ccxt.stop()
    
        def haslivedata(self):
            return self._state == self._ST_LIVE and self._ex_queue
    
        def _load(self):
            if self._state == self._ST_OVER:
                return False
    
            if self._timeframe == TimeFrame.Ticks:
                fetch_f = self.ccxt._fetch_ticks
                load_f = self._load_ticks
            else:
                fetch_f = self.ccxt.fetch_ohlcv_generic
                load_f = self._load_ohlcv
    
            while True:
                if self._state == self._ST_LIVE:
                    try:
                        if self._ex_queue.qsize()==0:
                            raise queue.Empty
                        msg = (self._ex_queue.get(timeout=self._qcheck))
                    except queue.Empty:
                        fetch_f(queue=self._ex_queue, dtbegin=None, timeframe=self._timeframe, compression=self._compression)
                        continue
    
                    if (msg is None) or (isinstance(msg, Exception)):
                        self.put_notification(self.DISCONNECTED, msg=str(msg))
                        self._state = self._ST_OVER
                        return False  # failed
    
                elif self._state == self._ST_HISTORBACK:
                    try:
                        if self._ex_queue.qsize()==0:
                            raise queue.Empty
                        msg = (self._ex_queue.get(timeout=self._qcheck))
                    except queue.Empty:
                        # End of historical data
                        if self.p.historical:  # only historical
                            self.put_notification(self.DISCONNECTED, msg='End of historical')
                            self._state = self._ST_OVER
                            return False  # end of historical
                        else:
                            self._state = self._ST_LIVE
                            self.put_notification(self.LIVE)
                            continue
    
                    if (msg is None) or (isinstance(msg, Exception)):  # Conn broken during historical/backfilling
                        self.put_notification(self.DISCONNECTED, msg=str(msg))
                        self._state = self._ST_OVER
                        return False  # failed
    
                return load_f(msg)
    
        def _load_ticks(self, msg):
            trade_time, price, amount = msg
            dtobj = datetime.utcfromtimestamp(trade_time // 1000)
            dt = date2num(dtobj)
    
            self.lines.datetime[0] = dt
            self.lines.open[0] = price
            self.lines.high[0] = price
            self.lines.low[0] = price
            self.lines.close[0] = price
            self.lines.volume[0] = amount
    
            print("%s: loaded tick: time: %s, price: %s, size: %s" % (self._name, dtobj.strftime('%Y-%m-%d %H:%M:%S'), price, amount))
    
            return True
    
        def _load_ohlcv(self, msg):
            tstamp, open_, high, low, close, volume = msg
            dtobj = datetime.utcfromtimestamp(tstamp // 1000)
            dt = date2num(dtobj)
    
            self.lines.datetime[0] = dt
            self.lines.open[0] = open_
            self.lines.high[0] = high
            self.lines.low[0] = low
            self.lines.close[0] = close
            self.lines.volume[0] = volume
    
            print("%s: loaded ohlcv:  time: %s, open: %s, high: %s, low: %s, close: %s, volume: %s, num_lines: %i" % \
                  (self._name, dtobj.strftime('%Y-%m-%d %H:%M:%S'), open_, high, low, close, volume, len(self.lines)))
    
            return True
    


  • @Ed-Bartosh
    Yeah. After the fix, the minute time frame works, but now I have trouble with the day time frame /:

    When I try to get the feed for the last 25 days, it gets stuck after just 12 days.
    It could be that there is no historical data for the remaining days, but I don't think it should hang.

    hist_start_date = pandas.bdate_range(end=(datetime.datetime.now() - datetime.timedelta(days=25)), periods=1)[0].to_pydatetime()
    data_days = bt.feeds.CCXT(exchange="gdax", symbol="BTC/USD", timeframe=bt.TimeFrame.Days,
                              fromdate=hist_start_date)
    

    It tries to fetch the OHLCV data but it can't get past the 1512345600000 timestamp. In the end it returns from _fetch_ohlcv with no results, and then it seems to try again (it calls "_load" again) in an endless loop.
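
When debugging a stall like this, it can help to decode the timestamp first. ccxt reports candle times as milliseconds since the Unix epoch, which is also why the data feed above divides by 1000. A small sketch (the helper name `ms_to_utc` is hypothetical):

```python
from datetime import datetime, timezone


def ms_to_utc(ms):
    """Convert a millisecond epoch timestamp into a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


stuck_at = ms_to_utc(1512345600000)
print(stuck_at.isoformat())  # 2017-12-04T00:00:00+00:00
```

The value decodes to midnight UTC on 2017-12-04, i.e. the start of a daily candle.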



  • Super initiative. Any tips on best practice for installing this?



  • Hi,

    I love both backtrader and ccxt so I'm really pleased that this is happening :)
    For IB and other feeds, setting historical=True gives the following behaviour (taken from the docs):

    The data feed will make multiple requests if the requested duration is larger than the one allowed by IB given the
    timeframe/compression chosen for the data.

    But it appears that CCXT feeds only make one request. E.g.:

    hist_start_date = datetime.utcnow() - timedelta(days=50)
    data_min = bt.feeds.CCXT(exchange="gdax", symbol="BTC/USD", name="btc_usd_min",
                             timeframe=bt.TimeFrame.Minutes, fromdate=hist_start_date, historical=True)
    

    This only returns the first 200 minutes, starting from 50 days ago. I may have completely misunderstood what's occurring, but I'd love to help out so feel free to point me in the right direction :)
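
One way to cover a longer range is to keep requesting pages until the end of the range is reached or the exchange returns nothing new. The sketch below is not the code from the ccxt branch. It assumes a hypothetical `fetch(since_ms, limit)` callable that returns candles as `[timestamp_ms, open, high, low, close, volume]` lists starting at `since_ms`, and the function name `fetch_ohlcv_paged` is made up for the illustration. Stopping on an empty page also avoids retrying forever when no more history is available.

```python
def fetch_ohlcv_paged(fetch, since_ms, until_ms, candle_ms, limit=200):
    """Collect candles in [since_ms, until_ms) by issuing repeated requests.

    fetch(since_ms, limit) is a hypothetical callable returning a list of
    [timestamp_ms, open, high, low, close, volume] candles.
    candle_ms is the candle length in milliseconds.
    """
    candles = []
    while since_ms < until_ms:
        # Keep only candles inside the requested window, oldest first
        batch = sorted(c for c in fetch(since_ms, limit)
                       if since_ms <= c[0] < until_ms)
        if not batch:
            break  # nothing new: stop instead of asking again forever
        candles.extend(batch)
        # Next request starts right after the last candle received
        since_ms = batch[-1][0] + candle_ms
    return candles


# Stand-in for an exchange: 1000 one-minute candles, at most `limit` per call
MINUTE = 60000
history = [[i * MINUTE, 1.0, 1.0, 1.0, 1.0, 1.0] for i in range(1000)]


def fake_fetch(since_ms, limit):
    return [c for c in history if c[0] >= since_ms][:limit]


result = fetch_ohlcv_paged(fake_fetch, 0, 500 * MINUTE, MINUTE, limit=200)
print(len(result))  # 500
```

With a real exchange object, `fetch` could be something like `lambda since, limit: exchange.fetch_ohlcv('BTC/USD', '1m', since, limit)`, assuming the exchange honours `since`. A real loop should also pause for `exchange.rateLimit` milliseconds between requests, as the store code above does.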



  • @Søren-Pallesen off the top of my head:

    1. install ccxt (it's better to do it in a virtualenv)
    pip install ccxt
    
    2. clone backtrader with ccxt support and switch the source tree to the ccxt branch:
    git clone https://github.com/bartosh/backtrader.git
    cd backtrader
    git checkout -b ccxt origin/ccxt
    
    3. install backtrader:
    python ./setup.py install
    
    4. copy & paste the test strategy code below into some file, e.g. bt-ccxt-test.py
    #!/usr/bin/env python
    # -*- coding: utf-8; py-indent-offset:4 -*-
    ###############################################################################
    #
    # Copyright (C) 2017 Ed Bartosh
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    #
    ###############################################################################
    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    
    import sys
    import time
    
    from datetime import datetime, timedelta
    
    import backtrader as bt
    
    class TestStrategy(bt.Strategy):
        def next(self):
            for data in self.datas:
                print('*' * 5, 'NEXT:', bt.num2date(data.datetime[0]), data._name, data.open[0], data.high[0],
                      data.low[0], data.close[0], data.volume[0],
                      bt.TimeFrame.getname(data._timeframe), len(data))
                if not self.getposition(data):
                    order = self.buy(data, exectype=bt.Order.Limit, size=10, price=data.close[0])
                else:
                    order = self.sell(data, exectype=bt.Order.Limit, size=10, price=data.close[0])
    
        def notify_order(self, order):
            print('*' * 5, "NOTIFY ORDER", order)
    
    def runstrategy(argv):
        # Create a cerebro
        cerebro = bt.Cerebro()
    
        # Create broker
        broker_config = {'urls': {'api': 'https://api.sandbox.gemini.com'},
                         'apiKey': '<your api key>',
                         'secret': '<your secret key>',
                         'nonce': lambda: str(int(time.time() * 1000))
                        }
        broker = bt.brokers.CCXTBroker(exchange='gemini', currency='USD', config=broker_config)
        cerebro.setbroker(broker)
    
        # Create data feeds
        data_ticks = bt.feeds.CCXT(exchange='gdax', symbol='BTC/USD', name="btc_usd_tick",
                                   timeframe=bt.TimeFrame.Ticks, compression=1)
        cerebro.adddata(data_ticks)
    
        hist_start_date = datetime.utcnow() - timedelta(minutes=30)
        data_min = bt.feeds.CCXT(exchange="gdax", symbol="BTC/USD", name="btc_usd_min",
                                 timeframe=bt.TimeFrame.Minutes, fromdate=hist_start_date)
        cerebro.adddata(data_min)
    
        # Add the strategy
        cerebro.addstrategy(TestStrategy)
    
        # Run the strategy
        cerebro.run()
    
    if __name__ == '__main__':
        sys.exit(runstrategy(sys.argv))
    
    
    5. Understand the strategy. It's quite a silly one: it creates buy or sell orders for every tick. If you don't like it, feel free to change it.
    6. replace the <your api key> and <your secret key> placeholders with your gemini keys.
    7. run the strategy:
    python ./bt-ccxt-test.py
    
    8. complain here if it doesn't work :)


  • @theref said:

    For IB and other feeds, setting historical=True gives the following behaviour (taken from the docs):

    The data feed will make multiple requests if the requested duration is larger than the one allowed by IB given the
    timeframe/compression chosen for the data.

    But it appears that CCXT feeds are only making one request.

    That's because I assumed CCXT doesn't have limitations similar to IB's.
    However, not every exchange is able to return this data. Here is a quote from the CCXT documentation:

    There's a limit on how far back in time your requests can go. Most of exchanges will not allow to query detailed candlestick history (like those for 1-minute and 5-minute timeframes) too far in the past. They usually keep a reasonable amount of most recent candles, like 1000 last candles for any timeframe is more than enough for most of needs. You can work around that limitation by continuously fetching (aka REST polling) latest OHLCVs and storing them in a CSV file or in a database.
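
    The "store them in a CSV file" workaround from the quote could look roughly like this. It is a sketch under assumptions, not something the ccxt branch does: `append_new_candles` is a hypothetical helper, and it assumes candles arrive oldest first as `[timestamp_ms, open, high, low, close, volume]` rows. It targets Python 3.

```python
import csv
import os


def append_new_candles(path, candles):
    """Append only candles newer than the last stored one; return how many."""
    last_ts = None
    if os.path.exists(path):
        with open(path, newline='') as f:
            for row in csv.reader(f):
                if row:
                    # The first column holds the candle timestamp in ms.
                    last_ts = int(row[0])
    fresh = [c for c in candles if last_ts is None or c[0] > last_ts]
    with open(path, 'a', newline='') as f:
        csv.writer(f).writerows(fresh)
    return len(fresh)
```

    Calling it after every poll keeps the file free of duplicates even when consecutive polls overlap.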
    
    hist_start_date = datetime.utcnow() - timedelta(days=50)
    data_min = bt.feeds.CCXT(exchange="gdax", symbol="BTC/USD", name="btc_usd_min",
                             timeframe=bt.TimeFrame.Minutes, fromdate=hist_start_date, historical=True)
    

    This only returns the first 200 minutes, starting from 50 days ago. I may have completely misunderstood what's occurring, but I'd love to help out so feel free to point me in the right direction :)

    I'm trying to reproduce this and so far I can't. The script has been running for around 20 minutes and still works. I'll post how it went when it stops.

    You're probably right. It should be implemented in a more sophisticated way. It's possible using the 'since' and 'limit' parameters in a loop. I'm just wondering whether it makes sense, as it's so slow.

    You can see the current implementation in the _fetch_ohlcv function in backtrader/feeds/ccxt.py
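
    A loop over 'since' and 'limit' might be sketched as below. This is not the code in backtrader/feeds/ccxt.py; `fetch_ohlcv_paged` and `fake_fetch_page` are hypothetical names, and the page size of 4 is arbitrary. The loop stops on an empty page rather than asking again, which is a design choice: it avoids an endless loop but would also stop at a gap in the exchange's history.

```python
def fetch_ohlcv_paged(fetch_page, since_ms, until_ms, step_ms, limit=200):
    """Collect candles in [since_ms, until_ms) by calling fetch_page repeatedly.

    fetch_page(since_ms, limit) must return a list of
    [timestamp_ms, open, high, low, close, volume] rows, oldest first.
    """
    candles = []
    while since_ms < until_ms:
        batch = fetch_page(since_ms, limit)
        if not batch:
            # Nothing returned: stop instead of requesting the same page forever.
            break
        candles.extend(row for row in batch if since_ms <= row[0] < until_ms)
        next_since = batch[-1][0] + step_ms
        if next_since <= since_ms:
            # The exchange did not move forward; bail out to avoid a loop.
            break
        since_ms = next_since
    return candles


# Stand-in for an exchange that serves at most `limit` one-minute candles.
def fake_fetch_page(since_ms, limit, last_ms=10 * 60000):
    rows = []
    ts = since_ms
    while ts <= last_ms and len(rows) < limit:
        rows.append([ts, 1.0, 1.0, 1.0, 1.0, 0.0])
        ts += 60000
    return rows


candles = fetch_ohlcv_paged(fake_fetch_page, 0, 10 * 60000, 60000, limit=4)
print(len(candles))  # 10
```

    With a real ccxt exchange object, `fetch_page` could be `lambda since, limit: exchange.fetch_ohlcv('BTC/USD', '1m', since=since, limit=limit)`, with a pause between calls to respect the exchange's rate limit.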



  • @theref my test failed after one hour of running:

    $ ./ccxttest.py 
    fetching ohlcv: symbol: BTC/USD, fromdate: 2017-11-12 20:50:36.325494, timeframe=1m, since=1510519836325, limit=None
    btc_usd_tick: loaded tick: time: 2017-12-12 20:50:41.363000, price: 17619.99, size: 0.15403533
    btc_usd_tick: loaded tick: time: 2017-12-12 20:50:47.907000, price: 17623.28, size: 0.02017904
    btc_usd_tick: loaded tick: time: 2017-12-12 20:50:52.978000, price: 17623.27, size: 0.08016539
    btc_usd_tick: loaded tick: time: 2017-12-12 20:50:55.855000, price: 17624.27, size: 0.8111514
    btc_usd_tick: loaded tick: time: 2017-12-12 20:51:07.144000, price: 17623.0, size: 0.01609553
    btc_usd_tick: loaded tick: time: 2017-12-12 20:51:13.503000, price: 17623.01, size: 5.66e-05
    btc_usd_tick: loaded tick: time: 2017-12-12 20:51:18.639000, price: 17623.0, size: 0.05
    btc_usd_tick: loaded tick: time: 2017-12-12 20:51:26.780000, price: 17623.0, size: 1.00178787
    ...
    btc_usd_tick: loaded tick: time: 2017-12-12 21:51:19.265000, price: 17552.38, size: 0.01497022
    btc_usd_tick: loaded tick: time: 2017-12-12 21:51:23.194000, price: 17552.37, size: 0.001136
    Traceback (most recent call last):
      File "./ccxttest.py", line 79, in <module>
        sys.exit(runstrategy(sys.argv))
      File "./ccxttest.py", line 76, in runstrategy
        cerebro.run()
      File "/Users/ebartosh/git/backtrader/backtrader/cerebro.py", line 1127, in run
        runstrat = self.runstrategies(iterstrat)
      File "/Users/ebartosh/git/backtrader/backtrader/cerebro.py", line 1295, in runstrategies
        self._runnext(runstrats)
      File "/Users/ebartosh/git/backtrader/backtrader/cerebro.py", line 1538, in _runnext
        drets.append(d.next(ticks=False))
      File "/Users/ebartosh/git/backtrader/backtrader/feed.py", line 404, in next
        ret = self.load()
      File "/Users/ebartosh/git/backtrader/backtrader/feed.py", line 476, in load
        _loadret = self._load()
      File "/Users/ebartosh/git/backtrader/backtrader/feeds/ccxt.py", line 113, in _load
        return self._load_ticks()
      File "/Users/ebartosh/git/backtrader/backtrader/feeds/ccxt.py", line 169, in _load_ticks
        trades = self.exchange.fetch_trades(self.symbol)
      File "/Users/ebartosh/.virtualenvs/backtrader-crypto/lib/python2.7/site-packages/ccxt/gdax.py", line 264, in fetch_trades
        }, params))
      File "/Users/ebartosh/.virtualenvs/backtrader-crypto/lib/python2.7/site-packages/ccxt/gdax.py", line 495, in request
        response = self.fetch2(path, api, method, params, headers, body)
      File "/Users/ebartosh/.virtualenvs/backtrader-crypto/lib/python2.7/site-packages/ccxt/base/exchange.py", line 282, in fetch2
        return self.fetch(request['url'], request['method'], request['headers'], request['body'])
      File "/Users/ebartosh/.virtualenvs/backtrader-crypto/lib/python2.7/site-packages/ccxt/base/exchange.py", line 340, in fetch
        self.handle_rest_errors(e, e.code, message if message else text, url, method)
      File "/Users/ebartosh/.virtualenvs/backtrader-crypto/lib/python2.7/site-packages/ccxt/base/exchange.py", line 368, in handle_rest_errors
        self.raise_error(error, url, method, exception if exception else http_status_code, response)
      File "/Users/ebartosh/.virtualenvs/backtrader-crypto/lib/python2.7/site-packages/ccxt/base/exchange.py", line 264, in raise_error
        details,
    ccxt.base.errors.RequestTimeout: gdax GET https://api.gdax.com/products/BTC-USD/trades 504 Gateway Time-out  <!DOCTYPE html>
    

    I'll reimplement this functionality when I have time. Any kind of help is appreciated.
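
    Until that is reimplemented, one way to survive a transient 504 like the one in the traceback is to retry the call with a growing delay. The sketch below is generic and hypothetical (`call_with_retries` is not part of backtrader or ccxt) and targets Python 3; in the feed, the wrapped call would be the `fetch_trades` call shown in the traceback and the exception to retry on would be `ccxt.base.errors.RequestTimeout`.

```python
import time


def call_with_retries(func, retry_on, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call func(); on an exception in retry_on, wait and try again.

    The delay doubles after each failure. The last failure is re-raised.
    """
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Demo with a stand-in that times out twice before succeeding.
calls = {'n': 0}


def flaky_fetch_trades():
    calls['n'] += 1
    if calls['n'] < 3:
        raise TimeoutError('504 Gateway Time-out')
    return ['trade']


# The sleep is replaced by a no-op so the demo finishes immediately.
result = call_with_retries(flaky_fetch_trades, TimeoutError, sleep=lambda s: None)
print(result, calls['n'])  # ['trade'] 3
```

    Whether five attempts and a doubling delay are sensible depends on the exchange; the numbers here are placeholders.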



  • @Søren-Pallesen @Ed-Bartosh

    Hello,

    Thanks very much for the share on this. I am trying to get this going (ccxt integrated with backtrader that is), and having a hard time. I have been using backtrader for a while to run some ML algos, but new to gemini and ccxt. I have successfully installed ccxt using pip install. But now stuck on part 2, which is to 'Clone backtrader with ccxt support and reset source tree to ccxt branch'. I am not sure how to do this. I know you guys say:

    git clone https://github.com/bartosh/backtrader.git
    cd backtrader
    git checkout -b ccxt origin/ccxt

    However, I am not really sure how to do the 3 lines mentioned above. Exactly, what do I need to type into my command line to do the 3 steps above? My working directory path for python is:

    C:\Users\Sam\Anaconda64Best\Scripts\

    However, when I type into the command line:

    C:\Users\Sam>C:\Users\Sam\Anaconda64Best\Scripts\conda.exe install git clone https://github.com/bartosh/backtrader.git

    I get the following error:

    PackageNotFoundError: Packages missing in current channels:

    • clone

    I'm curious what I am doing wrong here. Is it a syntax issue with how I am trying to run git clone? Do I need to change the way I am typing it out?

    Please let me know what you think when you guys have a chance. I very much appreciate the assistance and hope to speak soon.

    Best,

    Sam



  • @Søren-Pallesen @Ed-Bartosh

    Hi Guys,

    Think I made some progress, yet still stuck here. As mentioned, my python directory with all created modules and files is:

    C:\Users\Sam\Anaconda64Best\Scripts\

    Thus, for part 1, I went ahead and tried:

    C:\Users\Sam\Anaconda64Best\Scripts\conda.exe -forge git clone https://github.com/bartosh/backtrader.git

    And it seemed to work fine as I got no errors. I then attempted to change directory to where backtrader is, which was the command:

    cd C:\Users\Sam\Anaconda64Best\Lib\site-packages\backtrader

    That worked fine too. But the last part is still not working. I am attempting to run command:

    C:\Users\Sam\Anaconda64Best\Lib\site-packages\backtrader> git checkout -b ccxt origin/ccxt

    However, still getting error that says: 'git' is not recognized as an internal or external command,
    operable program or batch file.

    Any idea what is going wrong here? Appreciate the help.



  • @Søren-Pallesen said in Anyone use backtrader to do live trading on Bitcoin exchange?:

    python ./setup.py install

    @Ed-Bartosh I followed your instructions to a tee, however I get:

    AttributeError: module 'backtrader.brokers' has no attribute 'CCXTBroker'

    Any ideas?



  • @Søren-Pallesen @Ed-Bartosh I am getting the same error.



  • @Søren-Pallesen Can you run this command and show the output here?

    python -c 'from backtrader.brokers.ccxtbroker import CCXTBroker'
    


  • @samk said in Anyone use backtrader to do live trading on Bitcoin exchange?:

    Thus, for part 1, I went ahead and tried:

    C:\Users\Sam\Anaconda64Best\Scripts\conda.exe -forge git clone https://github.com/bartosh/backtrader.git

    I don't know what conda -forge does, but you need to have git installed on your system to be able to clone a git repository.

    I usually install git from here https://git-scm.com/download/win when I have to work on Windows.



  • If you have git installed, you can install backtrader with just one command:

    pip install git+https://github.com/bartosh/backtrader.git@ccxt
    


  • @Ed-Bartosh I'm getting:

    (C:\Users\Soren\Anaconda3\envs\CCXTwBT2) C:\Users\Soren>python -c 'from backtrader.brokers.ccxtbroker import CCXTBroker'
      File "<string>", line 1
        'from
            ^
    SyntaxError: EOL while scanning string literal
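
    The SyntaxError above comes from the shell rather than from backtrader: cmd.exe does not treat single quotes as quoting characters, so Python receives only `'from` as its program. Wrapping the statement in double quotes avoids that. Alternatively, the same check can live in a small script, which sidesteps quoting entirely. `locate_attr` below is a hypothetical helper; it also prints where the module was loaded from, which would show whether a stock backtrader install is being picked up instead of the ccxt branch (a guess at the cause of the AttributeError, not something confirmed in this thread).

```python
import importlib


def locate_attr(module_name, attr_name):
    """Return (module file or None, whether the attribute exists)."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return None, False
    return getattr(module, '__file__', None), hasattr(module, attr_name)


if __name__ == '__main__':
    # Prints the path of backtrader.brokers and whether CCXTBroker is there.
    print(locate_attr('backtrader.brokers', 'CCXTBroker'))
```

    Save it as a file and run it with the same python that runs the strategy.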


  • @Ed-Bartosh said in Anyone use backtrader to do live trading on Bitcoin exchange?:

    pip install git+https://github.com/bartosh/backtrader.git@ccxt

    I installed git through your link just fine and installed it in: C:\Users\Sam\Anaconda64Best

    I already have backtrader installed on my windows pc and it is saved in:

    C:\Users\Sam\Anaconda64Best\Lib\site-packages\backtrader

    Thus, I went and tried:

    1. C:\Users\Sam>cd C:\Users\Sam\Anaconda64Best\Lib\site-packages\backtrader (to change directory to backtrader first).

    2. Then I tried: pip install git+https://github.com/bartosh/backtrader.git@ccxt

    But I am getting the error:

    Collecting git+https://github.com/bartosh/backtrader.git@ccxt
    Cloning https://github.com/bartosh/backtrader.git (to ccxt) to c:\users\sam\appdata\local\temp\pip-kh91l0-build
    Error [Error 2] The system cannot find the file specified while executing command git clone -q https://github.com/bartosh/backtrader.git c:\users\sam\appdata\local\temp\pip-kh91l0-build
    Cannot find command 'git'

    Any idea what I am doing wrong here? Thanks in advance Ed......
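
    Both "'git' is not recognized" and pip's "Cannot find command 'git'" mean that the git executable is not on the PATH of that console. A quick way to confirm this from Python is sketched below; `find_on_path` is a made-up name, and `shutil.which` needs Python 3.3 or newer (on Python 2, `distutils.spawn.find_executable` serves a similar purpose).

```python
import os
import shutil


def find_on_path(tool):
    """Return the full path of an executable found on PATH, or None."""
    return shutil.which(tool)


if __name__ == '__main__':
    location = find_on_path('git')
    if location is None:
        print('git is not on PATH; directories searched:')
        for entry in os.environ.get('PATH', '').split(os.pathsep):
            print('  ', entry)
    else:
        print('git found at', location)
```

    If git was installed only a moment ago, opening a new console before retrying is worth a try, since an already open console usually keeps its old PATH.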

