For code/output blocks: Use ``` (aka backtick or grave accent) in a single line before and after the block. See: http://commonmark.org/help/

World Trading Data Live Feed is not getting resampled into backtrader



  • Hello, I have been using backtrader for a while now. I enjoy the platform and think it's a great place to develop, but I am having some trouble with a custom store/feed I made for World Trading Data. I would really appreciate some help, since I am really stuck.

    My store is here:

    
    #Python Imports
    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    
    import collections
    from datetime import datetime, timedelta
    import time as _time
    import json
    import threading
    
    
    #Backtrader imports
    import backtrader as bt
    from backtrader import TimeFrame, date2num, num2date
    from backtrader.metabase import MetaParams
    from backtrader.utils.py3 import queue, with_metaclass
    from backtrader.utils import AutoDict
    
    import requests
    import pandas as pd
    import time
    
    
    class MetaSingleton(MetaParams):
        '''Metaclass that makes every class built with it a singleton.

        Derives from ``MetaParams`` (imported from ``backtrader.metabase``).
        The first call to the class builds the instance; every later call
        returns that same instance and does not use the arguments passed.
        '''
        def __init__(cls, name, bases, dct):
            # Every class created through this metaclass starts with an
            # empty instance slot of its own.
            super(MetaSingleton, cls).__init__(name, bases, dct)
            cls._singleton = None

        def __call__(cls, *args, **kwargs):
            instance = cls._singleton
            if instance is None:
                # First instantiation: build the object and remember it.
                instance = super(MetaSingleton, cls).__call__(*args, **kwargs)
                cls._singleton = instance

            return instance
    
    class WTDStore(with_metaclass(MetaSingleton, object)):
        '''Singleton store that polls the World Trading Data REST API.

        The store keeps the data feeds registered through ``start`` and, for
        each call to ``streaming_prices``, runs a daemon thread that requests
        the latest quote at a fixed interval and pushes it into a queue.

        Params:

          - ``api_key``: token sent as ``api_token`` with every request
          - ``poll_interval`` (default: ``75.0``): seconds slept between two
            consecutive price requests
          - ``symbol_list``, ``currency_code``, ``practice``,
            ``account_tmout``: declared but not read by the code in this
            class
        '''
        DataCls = None  # data class will auto register

        params = (
            ('api_key', ''),
            ('symbol_list', ''),
            ('currency_code', 'GBP'),  # The currency code of the account
            ('practice', False),
            ('account_tmout', 10.0),  # account balance refresh timeout
            ('poll_interval', 75.0),  # seconds between two price requests
        )

        @classmethod
        def getdata(cls, *args, **kwargs):
            '''Returns ``DataCls`` with args, kwargs'''
            return cls.DataCls(*args, **kwargs)

        def __init__(self):
            super(WTDStore, self).__init__()

            self.notifs = collections.deque()  # store notifications for cerebro
            self._env = None  # reference to cerebro for general notifications
            self.datas = list()  # datas that have registered over start

            self._cash = 0.0
            self._value = 0.0

        def start(self, data=None):
            '''Register ``data`` with the store.

            Does nothing when ``data`` is ``None``. Otherwise the environment
            of the data (``data._env``) is remembered and the data is added
            to ``self.datas``.
            '''
            if data is None:
                return

            self._env = data._env
            self.datas.append(data)

        def stop(self):
            '''Nothing to release: the polling threads are daemon threads.'''
            pass

        def streaming_prices(self, dataname, tmout=None):
            '''Start polling prices for ``dataname`` in a daemon thread.

            Args:
                dataname: symbol to request
                tmout: timeout (seconds) for each HTTP request; ``None``
                    waits indefinitely

            Returns:
                The queue the thread fills. Each item is a
                ``pandas.DataFrame`` built from the ``data`` entry of the
                response, or ``None`` once polling has failed and stopped.
            '''
            q = queue.Queue()
            kwargs = {'q': q, 'dataname': dataname, 'tmout': tmout}
            t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs)
            t.daemon = True
            t.start()
            return q

        def _t_streaming_prices(self, dataname, q, tmout):
            '''Thread target: poll the quote endpoint and feed ``q``.

            On any request, HTTP status or payload error the exception is
            queued as a store notification, ``None`` is put into ``q`` so
            that the consumer can detect the broken connection, and the
            thread ends instead of dying silently.
            '''
            url = 'https://api.worldtradingdata.com/api/v1/stock'
            # Passed through ``params`` so that the values are URL-encoded
            query = {
                'symbol': str(dataname),
                'api_token': str(self.p.api_key),
                'sort': 'asc',
            }

            while True:
                try:
                    r = requests.get(url, params=query, timeout=tmout)
                    r.raise_for_status()
                    data = pd.DataFrame(r.json()['data'])
                except (requests.RequestException,
                        KeyError, TypeError, ValueError) as e:
                    self.put_notification(e)
                    q.put(None)  # tell the consumer the stream is over
                    return

                q.put(data)
                time.sleep(self.p.poll_interval)

        def put_notification(self, msg, *args, **kwargs):
            '''Queue ``(msg, args, kwargs)`` for ``get_notifications``.'''
            self.notifs.append((msg, args, kwargs))

        def get_notifications(self):
            '''Return the pending notifications and remove them from the queue.

            Returns:
                A list of ``(msg, args, kwargs)`` tuples, oldest first.
            '''
            # ``None`` marks the end of what is pending right now: a thread
            # may append further notifications while the deque is drained.
            self.notifs.append(None)
            return list(iter(self.notifs.popleft, None))
    

    and my Data feed is here:

    import datetime as dt
    from backtrader.feed import DataBase
    from backtrader.utils.py3 import (integer_types, queue, string_types,
                                      with_metaclass)
    from backtrader.metabase import MetaParams
    import glob
    import csv
    import pandas as pd
    import requests
    from . import wtdstore
    import time
    import datetime
    import backtrader as bt
    
    class MetaWTDData(DataBase.__class__):
        '''Metaclass which registers each class it creates with the store.'''
        def __init__(cls, name, bases, dct):
            # Let the metaclass of DataBase finish setting up the class
            super(MetaWTDData, cls).__init__(name, bases, dct)

            # The store hands out this class as its data class from now on
            store_cls = wtdstore.WTDStore
            store_cls.DataCls = cls
    
    class WTDData(with_metaclass(MetaWTDData, DataBase)):
        '''Live data feed fed by the queue of ``WTDStore.streaming_prices``.

        Each message taken from the queue becomes one bar in which open,
        high, low and close all carry the quoted price.

        Params:

          - ``backfill_from`` (default: ``None``): another data feed whose
            bars are delivered first; the live stream is started once that
            feed has no more bars
          - ``qcheck`` (default: ``0``): not read directly by the code in
            this class
        '''

        params = (
            ('backfill_from', None),
            ('qcheck', 0),
            #('api_key',None),
            #('symbol',None),
        )

        # States for the Finite State Machine in _load
        _ST_FROM, _ST_START, _ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(5)

        _store = wtdstore.WTDStore

        def islive(self):
            '''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
            should be deactivated'''
            return True

        def live(self):
            '''Always returns ``True``.'''
            return True

        def __init__(self, **kwargs):
            # Keyword arguments are forwarded untouched to the store
            self.o = self._store(**kwargs)

        def setenvironment(self, env):
            '''Receives an environment (cerebro) and passes it over to the store it
            belongs to'''
            super(WTDData, self).setenvironment(env)
            env.addstore(self.o)

        def start(self):
            '''Register with the store and select the initial state.

            With ``backfill_from`` the feed starts in ``_ST_FROM`` and the
            live stream is only opened by ``_load`` after the backfill feed
            is exhausted. Without it the stream is opened right away.
            '''
            super(WTDData, self).start()

            self._statelivereconn = False  # if reconnecting in live state
            self._storedmsg = dict()
            self.qlive = queue.Queue()
            self._state = self._ST_OVER

            self.o.start(data=self)

            self._start_finish()

            if self.p.backfill_from is not None:
                # Do not open the stream yet: _load goes through _ST_START
                # (and therefore _st_start) when the backfill is over, and
                # opening it here as well would leave a second polling
                # thread running against a queue nobody reads.
                self._state = self._ST_FROM
                self.p.backfill_from.setenvironment(self._env)
                self.p.backfill_from._start()
            else:
                self._state = self._ST_START
                self._st_start()

        def _st_start(self, instart=True, tmout=None):
            '''Open the price stream in the store and switch to ``_ST_LIVE``.

            ``instart`` is accepted but not used. Always returns ``True``.
            '''
            self.qlive = self.o.streaming_prices(self.p.dataname, tmout=tmout)
            self._state = self._ST_LIVE

            return True

        def stop(self):
            '''Stop the feed and then the store.'''
            super(WTDData, self).stop()
            self.o.stop()

        def haslivedata(self):
            # NOTE(review): ``self.qlive`` is a queue object; the stdlib
            # ``Queue`` defines neither ``__bool__`` nor ``__len__``, so this
            # looks like it is always ``True`` - confirm whether
            # ``not self.qlive.empty()`` was intended.
            return bool(self.qlive)

        def _load(self):
            '''Deliver the next bar according to the current state.

            Returns:
                ``True`` when a bar was loaded, ``None`` when no message
                arrived within the queue timeout, ``False`` when the feed is
                over (including a ``None`` message from the stream).
            '''
            if self._state == self._ST_OVER:
                return False

            while True:
                if self._state == self._ST_LIVE:
                    # ``_qcheck`` is not assigned in this class; presumably
                    # it is maintained by the DataBase machinery - confirm.
                    try:
                        msg = self.qlive.get(timeout=self._qcheck)
                    except queue.Empty:
                        return None

                    if msg is None:  # Conn broken during historical/backfilling
                        self.put_notification(self.CONNBROKEN)
                        self.put_notification(self.DISCONNECTED)
                        self._state = self._ST_OVER
                        return False  # failed

                    ret = self._load_tick(msg)
                    if ret:
                        self.put_notification(self.LIVE)
                        return True

                    continue  # tick not loaded: wait for the next message

                elif self._state == self._ST_FROM:
                    if not self.p.backfill_from.next():
                        # Backfill source exhausted: open the live stream
                        self._state = self._ST_START
                        continue

                    # Copy the current bar of the backfill source line by line
                    for alias in self.lines.getlinealiases():
                        lsrc = getattr(self.p.backfill_from.lines, alias)
                        ldst = getattr(self.lines, alias)

                        ldst[0] = lsrc[0]

                    return True

                elif self._state == self._ST_HISTORBACK:
                    self._state = self._ST_LIVE
                    continue

                elif self._state == self._ST_START:
                    if not self._st_start(instart=False):
                        self._state = self._ST_OVER
                        return False
                    # On success _st_start has set _ST_LIVE: loop again

        def _load_tick(self, msg):
            '''Fill the current bar from one message of the price queue.

            ``msg`` is indexed as ``msg[field][0]`` for the fields
            ``last_trade_time``, ``volume`` and ``price``. A volume which
            cannot be converted to ``int`` is stored as ``0``.

            Returns:
                Always ``True``.
            '''
            dtobj = datetime.datetime.strptime(
                msg['last_trade_time'][0], '%Y-%m-%d %H:%M:%S')
            dtnum = bt.date2num(dtobj)
            print('dtobj: {}'.format(dtobj))
            try:
                vol = int(msg['volume'][0])
            except (TypeError, ValueError):
                # ``None`` raises TypeError, a non-numeric string ValueError
                vol = 0

            # Common fields
            self.lines.datetime[0] = dtnum
            tick = float(msg['price'][0])
            self.lines.open[0] = tick
            self.lines.high[0] = tick
            self.lines.low[0] = tick
            self.lines.close[0] = tick
            self.lines.volume[0] = vol
            print('last tick {}'.format(tick))
            return True
    

    In my output I can see that it sends me live data points but they are never sampled into bars I can work with.

    I feel like the answer is staring me in the face so any direction would really be appreciated.

    Thank you so much in advance.



  • Another thing I would like to add: I save the data in the strategy's next method, and I believe the strategy never reaches that method.



  • I have resolved the issue, but now I am running into a problem where backtrader turns every tick I receive into a completed bar. How would I make it wait for the full bar?


Log in to reply
 

});