Backtrader Community

    • Login
    • Search
    • Categories
    • Recent
    • Tags
    • Popular
    • Users
    • Groups
    • Search
    For code/output blocks: Use ``` (aka backtick or grave accent) in a single line before and after the block. See: http://commonmark.org/help/

    World Trading Data Live Feed is not getting resampled into backtrader

    Indicators/Strategies/Analyzers
    1
    3
    424
    Loading More Posts
    • Oldest to Newest
    • Newest to Oldest
    • Most Votes
    Reply
    • Reply as topic
    Log in to reply
    This topic has been deleted. Only users with topic management privileges can see it.
    • sullyai
      sullyai last edited by

      Hello, I have been using backtrader for a while now. I enjoy the platform and think it's a great place to develop, but I am having some trouble with a custom store/feed I made for World Trading Data, and I would really appreciate some help since I am getting stuck.

      My store is here:

      
      #Python Imports
      from __future__ import (absolute_import, division, print_function,
                              unicode_literals)
      
      import collections
      from datetime import datetime, timedelta
      import time as _time
      import json
      import threading
      
      
      #Backtrader imports
      import backtrader as bt
      from backtrader import TimeFrame, date2num, num2date
      from backtrader.metabase import MetaParams
      from backtrader.utils.py3 import queue, with_metaclass
      from backtrader.utils import AutoDict
      
      import requests
      import pandas as pd
      import time
      
      
      class MetaSingleton(MetaParams):
          '''Metaclass which turns any class created with it into a singleton

          Derives from ``MetaParams`` (imported from the backtrader framework).
          The first call to the class builds the instance and caches it in
          ``_singleton``; every later call returns that cached instance.
          '''
          def __init__(cls, name, bases, dct):
              super(MetaSingleton, cls).__init__(name, bases, dct)
              # Nothing is cached until the class is called for the first time
              cls._singleton = None

          def __call__(cls, *args, **kwargs):
              instance = cls._singleton
              if instance is not None:
                  return instance

              # First call: build the instance and remember it for later calls
              instance = super(MetaSingleton, cls).__call__(*args, **kwargs)
              cls._singleton = instance
              return instance
      
      class WTDStore(with_metaclass(MetaSingleton, object)):
          '''Singleton store which polls the World Trading Data REST API.

          A background thread per requested symbol fetches the latest quote and
          pushes it (as a ``pandas.DataFrame``) into a queue consumed by the
          data feed. A ``None`` pushed into the queue tells the consumer that
          polling has failed and stopped.

          Params:

            - ``api_key``: token sent as ``api_token`` with every request
            - ``poll_interval``: seconds to sleep between two requests
            - ``symbol_list``, ``currency_code``, ``practice``,
              ``account_tmout``: declared but not read by the code in this class
          '''
          DataCls = None  # data class will auto register

          params = (
              ('api_key', ''),
              ('symbol_list', ''),
              ('currency_code', 'GBP'),  # The currency code of the account
              ('practice', False),
              ('account_tmout', 10.0),  # account balance refresh timeout
              ('poll_interval', 75.0),  # seconds between two quote requests
          )

          @classmethod
          def getdata(cls, *args, **kwargs):
              '''Returns ``DataCls`` with args, kwargs'''
              return cls.DataCls(*args, **kwargs)

          def __init__(self):
              super(WTDStore, self).__init__()

              self.notifs = collections.deque()  # store notifications for cerebro
              self._env = None  # reference to cerebro for general notifications
              self.datas = list()  # datas that have registered over start

              self._cash = 0.0
              self._value = 0.0

          def start(self, data=None):
              '''Register ``data`` with the store and adopt its environment.

              Calling it without a data is a no-op.
              '''
              if data is None:
                  return

              self._env = data._env
              self.datas.append(data)

          def stop(self):
              '''Nothing to release: the polling threads are daemon threads'''
              pass

          def streaming_prices(self, dataname, tmout=None):
              '''Start polling quotes for ``dataname`` in a daemon thread.

              Args:
                  dataname: symbol sent to the API
                  tmout: timeout in seconds for each HTTP request (``None``
                      waits indefinitely)

              Returns:
                  The queue into which the thread puts one ``DataFrame`` per
                  poll, or ``None`` once polling has failed.
              '''
              q = queue.Queue()
              kwargs = {'q': q, 'dataname': dataname, 'tmout': tmout}
              t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs)
              t.daemon = True
              t.start()
              return q

          def _t_streaming_prices(self, dataname, q, tmout):
              '''Thread body: request the quote, queue it, sleep, repeat.

              On a request/decoding failure the error is queued as a store
              notification, ``None`` is put into ``q`` and the thread ends.
              '''
              url = 'https://api.worldtradingdata.com/api/v1/stock'
              # Passed through ``params`` so that the values get URL-encoded
              query = [
                  ('symbol', str(dataname)),
                  ('api_token', str(self.p.api_key)),
                  ('sort', 'asc'),
              ]

              while True:
                  try:
                      r = requests.get(url, params=query, timeout=tmout)
                      data = pd.DataFrame(r.json()['data'])
                  except (requests.RequestException, ValueError, KeyError) as e:
                      # Don't die silently: tell cerebro and wake the consumer
                      self.put_notification(str(e))
                      q.put(None)
                      return

                  q.put(data)
                  time.sleep(self.p.poll_interval)

          def put_notification(self, msg, *args, **kwargs):
              '''Queue a ``(msg, args, kwargs)`` notification'''
              self.notifs.append((msg, args, kwargs))

          def get_notifications(self):
              '''Return the pending notifications and remove them from the queue'''
              # The ``None`` marks the end of what is pending right now; other
              # threads may keep appending behind it while the queue is drained
              self.notifs.append(None)
              return list(iter(self.notifs.popleft, None))
      

      and my Data feed is here:

      import datetime as dt
      from backtrader.feed import DataBase
      from backtrader.utils.py3 import (integer_types, queue, string_types,
                                        with_metaclass)
      from backtrader.metabase import MetaParams
      import glob
      import csv
      import pandas as pd
      import requests
      from . import wtdstore
      import time
      import datetime
      import backtrader as bt
      
      class MetaWTDData(type(DataBase)):
          '''Metaclass which registers the data class it creates with the store'''
          def __init__(cls, name, bases, dct):
              # Let the parent metaclass finish setting up the class
              super(MetaWTDData, cls).__init__(name, bases, dct)

              # Tell the store which class to use as its data class
              wtdstore.WTDStore.DataCls = cls
      
      class WTDData(with_metaclass(MetaWTDData, DataBase)):
          '''Live data feed fed by the quotes polled by ``WTDStore``.

          Each quote taken from the store queue is loaded as one bar in which
          open, high, low and close all carry the quoted price.
          NOTE(review): presumably the feed is meant to be added with
          ``cerebro.resampledata`` so that ticks are aggregated into bars of
          the wanted timeframe - confirm against the calling script.

          Params:

            - ``backfill_from``: another data feed delivered bar by bar before
              the live quotes start
            - ``qcheck``: overrides the default of the base class
          '''

          params = (
              ('backfill_from', None),
              ('qcheck', 0),
          )

          # States for the Finite State Machine in _load
          _ST_FROM, _ST_START, _ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(5)

          _store = wtdstore.WTDStore

          def islive(self):
              '''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
              should be deactivated'''
              return True

          def live(self):
              '''Always returns ``True``'''
              return True

          def __init__(self, **kwargs):
              # The store is a singleton: the kwargs reach it on first creation
              self.o = self._store(**kwargs)

          def setenvironment(self, env):
              '''Receives an environment (cerebro) and passes it over to the store it
              belongs to'''
              super(WTDData, self).setenvironment(env)
              env.addstore(self.o)

          def start(self):
              '''Register with the store and set the initial state for ``_load``.

              With ``backfill_from`` the feed starts in the backfilling state and
              polling is started by ``_load`` once the backfill is exhausted;
              without it polling starts right away.
              '''
              super(WTDData, self).start()

              self._statelivereconn = False  # if reconnecting in live state
              self._storedmsg = dict()
              self.qlive = queue.Queue()
              self._state = self._ST_OVER

              self.o.start(data=self)

              self._start_finish()

              if self.p.backfill_from is not None:
                  # Polling is NOT started here: _load calls _st_start after the
                  # backfill, and starting it twice would leave a second thread
                  # polling into a queue nobody reads
                  self._state = self._ST_FROM
                  self.p.backfill_from.setenvironment(self._env)
                  self.p.backfill_from._start()
              else:
                  self._state = self._ST_START
                  self._st_start()

          def _st_start(self, instart=True, tmout=None):
              '''Ask the store to start polling and switch to the live state.

              Always returns ``True``.
              '''
              self.qlive = self.o.streaming_prices(self.p.dataname, tmout=tmout)
              self._state = self._ST_LIVE

              return True

          def stop(self):
              '''Stop the feed and the store'''
              super(WTDData, self).stop()
              self.o.stop()

          def haslivedata(self):
              '''Returns ``True`` if quotes are waiting in the live queue'''
              # bool() of a Queue is always True, the content has to be checked
              return not self.qlive.empty()

          def _load(self):
              '''Deliver the next bar according to the current state.

              Returns:
                  ``True`` if a bar was loaded, ``None`` if no quote arrived
                  within ``_qcheck`` seconds and ``False`` if the feed is over.
              '''
              if self._state == self._ST_OVER:
                  return False

              while True:
                  if self._state == self._ST_LIVE:
                      try:
                          msg = self.qlive.get(timeout=self._qcheck)
                      except queue.Empty:
                          return None  # nothing yet, let cerebro come back later

                      if msg is None:  # the producer signalled a broken connection
                          self.put_notification(self.CONNBROKEN)
                          self.put_notification(self.DISCONNECTED)
                          self._state = self._ST_OVER
                          return False  # failed

                      if self._load_tick(msg):
                          self.put_notification(self.LIVE)
                          return True

                      # The quote was discarded: go for the next one
                      continue

                  elif self._state == self._ST_FROM:
                      if not self.p.backfill_from.next():
                          # Backfill exhausted: move on to the live quotes
                          self._state = self._ST_START
                          continue

                      # Copy the bar of the backfill feed line by line
                      for alias in self.lines.getlinealiases():
                          lsrc = getattr(self.p.backfill_from.lines, alias)
                          ldst = getattr(self.lines, alias)

                          ldst[0] = lsrc[0]

                      return True

                  elif self._state == self._ST_HISTORBACK:
                      self._state = self._ST_LIVE
                      continue

                  elif self._state == self._ST_START:
                      if not self._st_start(instart=False):
                          self._state = self._ST_OVER
                          return False
                      # On success _st_start has set the live state: loop again

          def _load_tick(self, msg):
              '''Fill the lines from the first row of the quote in ``msg``.

              Returns:
                  ``False`` if the quote is not newer than the previous bar
                  (a poll returned an unchanged quote), ``True`` otherwise.
              '''
              dtobj = datetime.datetime.strptime(
                  msg['last_trade_time'][0], '%Y-%m-%d %H:%M:%S')
              dtnum = bt.date2num(dtobj)
              if dtnum <= self.lines.datetime[-1]:
                  return False  # time already seen

              print('dtobj: {}'.format(dtobj))
              try:
                  vol = int(msg['volume'][0])
              except (TypeError, ValueError):  # missing or non-numeric volume
                  vol = 0

              # Common fields
              self.lines.datetime[0] = dtnum
              tick = float(msg['price'][0])
              self.lines.open[0] = tick
              self.lines.high[0] = tick
              self.lines.low[0] = tick
              self.lines.close[0] = tick
              self.lines.volume[0] = vol
              print('last tick {}'.format(tick))
              return True
      

      In my output I can see that it sends me live data points but they are never sampled into bars I can work with.

      I feel like the answer is staring me in the face so any direction would really be appreciated.

      Thank you so much in advance.

      1 Reply Last reply Reply Quote 0
      • sullyai
        sullyai last edited by

        Another thing I would like to add: I save the data in the strategy's next method, and I believe the strategy never reaches that method.

        1 Reply Last reply Reply Quote 0
        • sullyai
          sullyai last edited by

          I have resolved the issue, but now I have a problem with backtrader turning every tick I get into a completed bar. How would I make it wait for the full bar?

          1 Reply Last reply Reply Quote 0
          • 1 / 1
          • First post
            Last post
          Copyright © 2016, 2017, 2018, 2019, 2020, 2021 NodeBB Forums | Contributors