World Trading Data Live Feed is not getting resampled into backtrader
-
Hello, I have been using backtrader for a while now; I enjoy the platform and think it's a great place to develop. However, I am having some trouble with a custom store/feed I made for World Trading Data, and I would really appreciate some help since I am really stuck.
My store is here:
#Python Imports from __future__ import (absolute_import, division, print_function, unicode_literals) import collections from datetime import datetime, timedelta import time as _time import json import threading #Backtrader imports import backtrader as bt from backtrader import TimeFrame, date2num, num2date from backtrader.metabase import MetaParams from backtrader.utils.py3 import queue, with_metaclass from backtrader.utils import AutoDict import requests import pandas as pd import time class MetaSingleton(MetaParams): '''Metaclass to make a metaclassed class a singleton MetaParams - Imported from backtrader framework ''' def __init__(cls, name, bases, dct): super(MetaSingleton, cls).__init__(name, bases, dct) cls._singleton = None def __call__(cls, *args, **kwargs): if cls._singleton is None: cls._singleton = ( super(MetaSingleton, cls).__call__(*args, **kwargs)) return cls._singleton class WTDStore(with_metaclass(MetaSingleton, object)): ''' The IG store class should inherit from the the metaclass and add some extensions to it. 
''' DataCls = None # data class will auto register params = ( ('api_key', ''), ('symbol_list', ''), ('currency_code', 'GBP'), #The currency code of the account ('practice', False), ('account_tmout', 10.0), # account balance refresh timeout ) @classmethod def getdata(cls, *args, **kwargs): '''Returns ``DataCls`` with args, kwargs''' return cls.DataCls(*args, **kwargs) def __init__(self): super(WTDStore, self).__init__() self.notifs = collections.deque() # store notifications for cerebro self._env = None # reference to cerebro for general notifications self.datas = list() # datas that have registered over start self._cash = 0.0 self._value = 0.0 def start(self, data = None): # Datas require some processing to kickstart data reception if data is None: return if data is not None: self._env = data._env # For datas simulate a queue with None to kickstart co self.datas.append(data) def stop(self): pass def streaming_prices(self, dataname, tmout = None): q = queue.Queue() kwargs = {'q': q, 'dataname': dataname, 'tmout': tmout} t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs) t.daemon = True t.start() return q def _t_streaming_prices(self, dataname, q, tmout): while True: # we need to log in to WTD for the given asset url = 'https://api.worldtradingdata.com/api/v1/stock' url = url + str('?symbol=') + str(dataname) url = url + str('&api_token=') + str(self.p.api_key) url = url + str('&sort=') + str('asc') r = requests.get(url) data = pd.DataFrame(r.json()['data']) q.put(data) time.sleep(75) def put_notification(self, msg, *args, **kwargs): self.notifs.append((msg, args, kwargs)) def get_notifications(self): return []
and my Data feed is here:
import datetime as dt

from backtrader.feed import DataBase
from backtrader.utils.py3 import (integer_types, queue, string_types,
                                  with_metaclass)
from backtrader.metabase import MetaParams
import glob
import csv
import pandas as pd
import requests
from . import wtdstore
import time
import datetime
import backtrader as bt


class MetaWTDData(DataBase.__class__):
    '''Metaclass that registers the data class with ``WTDStore``.'''

    def __init__(cls, name, bases, dct):
        # Initialize the class
        super(MetaWTDData, cls).__init__(name, bases, dct)

        # Register with the store
        wtdstore.WTDStore.DataCls = cls


class WTDData(with_metaclass(MetaWTDData, DataBase)):
    '''Live data feed built on the price queue provided by ``WTDStore``.

    Params:

      - ``backfill_from``: optional data feed that is consumed completely
        before live prices are requested
      - ``qcheck``: passed through to the base class; presumably the source
        of ``self._qcheck``, the timeout used when waiting for a live
        message in ``_load`` -- TODO confirm against ``DataBase``
    '''

    params = (
        ('backfill_from', None),
        ('qcheck', 0),
        # ('api_key',None),
        # ('symbol',None),
    )

    # States for the Finite State Machine in _load
    _ST_FROM, _ST_START, _ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(5)

    _store = wtdstore.WTDStore

    def islive(self):
        '''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
        should be deactivated'''
        return True

    def live(self):
        return True

    def __init__(self, **kwargs):
        self.o = self._store(**kwargs)

    def setenvironment(self, env):
        '''Receives an environment (cerebro) and passes it over to the store
        it belongs to'''
        super(WTDData, self).setenvironment(env)
        env.addstore(self.o)

    def start(self):
        '''Register with the store and enter the initial state for ``_load``.

        With ``backfill_from`` the feed starts in ``_ST_FROM`` and live
        polling is only requested once the backfill source is consumed;
        otherwise live polling is requested right away.
        '''
        super(WTDData, self).start()

        self._statelivereconn = False  # if reconnecting in live state
        self._storedmsg = dict()
        self.qlive = queue.Queue()
        self._state = self._ST_OVER

        self.o.start(data=self)
        self._start_finish()

        if self.p.backfill_from is not None:
            # Do not call _st_start here: _load does it when the backfill
            # source runs dry, and calling it twice would leave a first
            # polling thread running against an abandoned queue.
            self._state = self._ST_FROM
            self.p.backfill_from.setenvironment(self._env)
            self.p.backfill_from._start()
        else:
            self._state = self._ST_START  # initial state for _load
            self._st_start()

    def _st_start(self, instart=True, tmout=None):
        '''Ask the store for a live price queue and switch to ``_ST_LIVE``.

        Always returns ``True``.
        '''
        self.qlive = self.o.streaming_prices(self.p.dataname, tmout=tmout)
        self._state = self._ST_LIVE
        return True

    def stop(self):
        super(WTDData, self).stop()
        self.o.stop()

    def haslivedata(self):
        # NOTE(review): this is the truth value of the queue object itself,
        # not of its content -- confirm that this is what is wanted.
        return bool(self.qlive)

    def _load(self):
        '''Run the state machine until a bar is delivered or nothing is left.

        Returns:
            ``True`` when the lines hold a new value, ``None`` when no live
            message arrived within the timeout, ``False`` when the feed is
            over.
        '''
        if self._state == self._ST_OVER:
            return False

        while True:
            if self._state == self._ST_LIVE:
                try:
                    msg = self.qlive.get(timeout=self._qcheck)
                except queue.Empty:
                    return None  # nothing yet, let the caller retry

                if msg is None:
                    # The store signals a broken connection with None
                    self.put_notification(self.CONNBROKEN)
                    self.put_notification(self.DISCONNECTED)
                    self._state = self._ST_OVER
                    return False  # failed

                ret = self._load_tick(msg)
                if ret:
                    self.put_notification(self.LIVE)
                    return True

                # tick was not loaded ... go and get a new one
                continue

            elif self._state == self._ST_FROM:
                if not self.p.backfill_from.next():
                    # backfill source is consumed, move on to live data
                    self._state = self._ST_START
                    continue

                # copy lines of the same name
                for alias in self.lines.getlinealiases():
                    lsrc = getattr(self.p.backfill_from.lines, alias)
                    ldst = getattr(self.lines, alias)
                    ldst[0] = lsrc[0]

                return True

            elif self._state == self._ST_HISTORBACK:
                self._state = self._ST_LIVE
                continue

            elif self._state == self._ST_START:
                if not self._st_start(instart=False):
                    self._state = self._ST_OVER
                    return False

    def _load_tick(self, msg):
        '''Fill the lines from one polled quote.

        Args:
            msg: mapping-like object whose ``last_trade_time``, ``volume``
                and ``price`` entries are read at position ``0``
                (the store delivers a ``pandas.DataFrame``).

        Returns:
            ``True`` if the tick was loaded, ``False`` if its trade time is
            not newer than the previously delivered one.
        '''
        dtobj = datetime.datetime.strptime(
            msg['last_trade_time'][0], '%Y-%m-%d %H:%M:%S')
        dtnum = bt.date2num(dtobj)
        print('dtobj: {}'.format(dtobj))

        # The API is polled on a timer, so the same last trade can come back
        # several times; do not deliver it again as a new tick.
        if dtnum <= self.lines.datetime[-1]:
            return False  # time already seen

        try:
            vol = int(msg['volume'][0])
        except (TypeError, ValueError):
            vol = 0  # missing or non-numeric volume

        # Common fields
        self.lines.datetime[0] = dtnum

        # A quote carries a single price: use it for all OHLC fields
        tick = float(msg['price'][0])
        self.lines.open[0] = tick
        self.lines.high[0] = tick
        self.lines.low[0] = tick
        self.lines.close[0] = tick
        self.lines.volume[0] = vol
        print('last tick {}'.format(tick))

        return True
In my output I can see that the feed delivers live data points, but they are never resampled into bars I can work with.
I feel like the answer is staring me in the face so any direction would really be appreciated.
Thank you so much in advance.
-
Another thing I would like to add: I save the data in the strategy's `next` method, and I believe the strategy never reaches that method.
-
I have resolved the issue, but now I am getting an error where backtrader turns every tick I get into a completed bar. How would I make it wait for the full bar?