Hello, I have been using backtrader for a while now and I enjoy the platform and think its a great place to develop but I am having some trouble with a custom store/feed I made with World Trading Data and I would really appreciate some help since I am really getting stuck.
My store is here:
#Python Imports
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import collections
from datetime import datetime, timedelta
import time as _time
import json
import threading
#Backtrader imports
import backtrader as bt
from backtrader import TimeFrame, date2num, num2date
from backtrader.metabase import MetaParams
from backtrader.utils.py3 import queue, with_metaclass
from backtrader.utils import AutoDict
import requests
import pandas as pd
import time
class MetaSingleton(MetaParams):
'''Metaclass to make a metaclassed class a singleton
MetaParams - Imported from backtrader framework
'''
def __init__(cls, name, bases, dct):
super(MetaSingleton, cls).__init__(name, bases, dct)
cls._singleton = None
def __call__(cls, *args, **kwargs):
if cls._singleton is None:
cls._singleton = (
super(MetaSingleton, cls).__call__(*args, **kwargs))
return cls._singleton
class WTDStore(with_metaclass(MetaSingleton, object)):
'''
The IG store class should inherit from the the metaclass and add some
extensions to it.
'''
DataCls = None # data class will auto register
params = (
('api_key', ''),
('symbol_list', ''),
('currency_code', 'GBP'), #The currency code of the account
('practice', False),
('account_tmout', 10.0), # account balance refresh timeout
)
@classmethod
def getdata(cls, *args, **kwargs):
'''Returns ``DataCls`` with args, kwargs'''
return cls.DataCls(*args, **kwargs)
def __init__(self):
super(WTDStore, self).__init__()
self.notifs = collections.deque() # store notifications for cerebro
self._env = None # reference to cerebro for general notifications
self.datas = list() # datas that have registered over start
self._cash = 0.0
self._value = 0.0
def start(self, data = None):
# Datas require some processing to kickstart data reception
if data is None:
return
if data is not None:
self._env = data._env
# For datas simulate a queue with None to kickstart co
self.datas.append(data)
def stop(self):
pass
def streaming_prices(self, dataname, tmout = None):
q = queue.Queue()
kwargs = {'q': q, 'dataname': dataname, 'tmout': tmout}
t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs)
t.daemon = True
t.start()
return q
def _t_streaming_prices(self, dataname, q, tmout):
while True:
# we need to log in to WTD for the given asset
url = 'https://api.worldtradingdata.com/api/v1/stock'
url = url + str('?symbol=') + str(dataname)
url = url + str('&api_token=') + str(self.p.api_key)
url = url + str('&sort=') + str('asc')
r = requests.get(url)
data = pd.DataFrame(r.json()['data'])
q.put(data)
time.sleep(75)
def put_notification(self, msg, *args, **kwargs):
self.notifs.append((msg, args, kwargs))
def get_notifications(self):
return []
and my Data feed is here:
import datetime as dt
from backtrader.feed import DataBase
from backtrader.utils.py3 import (integer_types, queue, string_types,
with_metaclass)
from backtrader.metabase import MetaParams
import glob
import csv
import pandas as pd
import requests
from . import wtdstore
import time
import datetime
import backtrader as bt
class MetaWTDData(DataBase.__class__):
def __init__(cls, name, bases, dct):
# Initialize the class
super(MetaWTDData, cls).__init__(name, bases, dct)
# Register with the store
wtdstore.WTDStore.DataCls = cls
class WTDData(with_metaclass(MetaWTDData, DataBase)):
params = (
('backfill_from',None),
('qcheck', 0),
#('api_key',None),
#('symbol',None),
)
# States for the Finite State Machine in _load
_ST_FROM, _ST_START, _ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(5)
_store = wtdstore.WTDStore
def islive(self):
'''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
should be deactivated'''
return True
def live(self):
return True
def __init__(self, **kwargs):
self.o = self._store(**kwargs)
def setenvironment(self, env):
'''Receives an environment (cerebro) and passes it over to the store it
belongs to'''
super(WTDData, self).setenvironment(env)
env.addstore(self.o)
def start(self):
super(WTDData,self).start()
self._statelivereconn = False # if reconnecting in live state
self._storedmsg = dict()
self.qlive = queue.Queue()
self._state = self._ST_OVER
self.o.start(data = self)
self._start_finish()
self._state = self._ST_START
self._st_start()
if self.p.backfill_from is not None:
self._state = self._ST_FROM
self.p.backfill_from.setenvironment(self._env)
self.p.backfill_from._start()
def _st_start(self, instart = True, tmout = None):
# in the IG version there is a call to its corresponding store
# streaming prices method but we are going to do that in house here
# we need to log in to WTD for the given asset
self.qlive = self.o.streaming_prices(self.p.dataname, tmout = tmout)
self._state = self._ST_LIVE
return True
def stop(self):
super(WTDData, self).stop()
self.o.stop()
def haslivedata(self):
return bool(self.qlive)
def _load(self):
if self._state == self._ST_OVER:
return False
while True:
if self._state == self._ST_LIVE:
# if time is the preferred interval then send a get request
try:
msg = self.qlive.get(timeout = self._qcheck)
except queue.Empty:
return None
if msg is None: # Conn broken during historical/backfilling
self.put_notification(self.CONNBROKEN)
self.put_notification(self.DISCONNECTED)
self._state = self._ST_OVER
return False # failed
ret = self._load_tick(msg)
if ret:
self.put_notification(self.LIVE)
return True
continue # don't break the loop
elif self._state == self._ST_FROM:
if not self.p.backfill_from.next():
self._state = self._ST_START
continue
for alias in self.lines.getlinealiases():
lsrc = getattr(self.p.backfill_from.lines, alias)
ldst = getattr(self.lines, alias)
ldst[0] = lsrc[0]
return True
elif self._state == self._ST_HISTORBACK:
self._state = self._ST_LIVE
continue
elif self._state == self._ST_START:
if not self._st_start(instart=False):
self._state = self._ST_OVER
return False
def _load_tick(self, msg):
dtobj = datetime.datetime.strptime(msg['last_trade_time'][0],'%Y-%m-%d %H:%M:%S')
dt = bt.date2num(dtobj)
print('dtobj: {}'.format(dtobj))
try:
vol = int(msg['volume'][0])
except TypeError:
vol = 0
# Common fields
self.lines.datetime[0] = dt
tick = float(msg['price'][0])
self.lines.open[0] = tick
self.lines.high[0] = tick
self.lines.low[0] = tick
self.lines.close[0] = tick
self.lines.volume[0] = vol
print('last tick {}'.format(tick))
return True
In my output I can see that it sends me live data points but they are never sampled into bars I can work with.
I feel like the answer is staring me in the face so any direction would really be appreciated.
Thank you so much in advance.