For code/output blocks: Use ``` (aka backtick or grave accent) in a single line before and after the block. See: http://commonmark.org/help/

Poll live data - how to make this better



  • Hi

    in the backtrader oanda v20 store, I added a method to use poll instead of stream. This provides volume in live data (which is delayed by the interval). I poll for data based on qcheck, but this will do a lot of unnecessary requests.

    How would I best check if interval is about to end or how to minimize the amount of requests when no new data is expected?

    For example when the timeframe is 15 Minutes, the feed should poll shortly after the timeframe is complete (0:00, 0:15, 0:30, 0:45 ... + a few microseconds)

    Is there some functionality in backtrader to give me that info in a feed?

    The oanda v20 store is located here:
    https://github.com/ftomassetti/backtrader-oandav20

    The last commit added the candles support

    Thanks in advance for any ideas or help :)



  • Maybe an example of data I need would explain it a bit better:

    Assume:
    Now: 4:13
    Period: 5 Min

    Required time / data:
    Previous complete period start: 4:05
    Current period start: 4:10

    I would then use previous period time for start time,
    current period time stored, to check if it is different from current period start of current time (at 4:16 it would be 4:15 as start, so I would fetch new data if not already done and current stored period start is 4:15)

    I hope that is more understandable


  • administrators

    @dasch said in Poll live data - how to make this better:

    Is there some functionality in backtrader to give me that info in a feed?

    Wrong area.

    Your best bet is to use a: queue.Queue which can timeout if nothing has been put in the queue and is thread-safe: Python 3 Docs - queue - A synchronized queue class

    Of course, if you are running in a independent thread you can simply call time.sleep on that thread if there is no communication with the outer world for thread control or message passing during the life of the thread.

    You need to wait for the 1st period and then calculate an interval which leaves you a few micro/milli-seconds away from the next target.



  • I added a method to calculate the start time of the period.

    It's for oanda, so i set the day, week and month to UTC 22:00 as the start time. Maybe someone will need this code, so here is how I do it.

    def _getstarttime(self, timeframe, compression, dt = None, offset = 0):
           if dt == None:
               dt = datetime.utcnow()
           if timeframe == TimeFrame.Seconds:
               dt = dt.replace(second=dt.second//compression, microsecond=0)
               if offset > 0:
                   dt = dt - timedelta(seconds=compression*offset)
           elif timeframe == TimeFrame.Minutes:
               hour = 0
               if compression >= 60:
                   hour = compression//60
               minute = compression % 60
               dt = dt - timedelta(hours=hour, minutes=minute)
               dt = dt.replace(minute=dt.minute//minute, second=0, microsecond=0)
               if offset > 0:
                   dt = dt - timedelta(minutes=compression*offset)
           elif timeframe == TimeFrame.Days:
               # start of day is UTC 22 (5pm new york)
               if dtnow.hour < 22:
                   dt = dt - timedelta(days=1)
               if offset:
                   dt = dt - timedelta(days=offset)
               dt = dt.replace(hour=22, minute=0, second=0, microsecond=0)
           elif timeframe == TimeFrame.Weeks:
               if dt.weekday() != 6:
                   # sunday is start of week at 5pm new york
                   dt = dt - timedelta(days=dt.weekday() + 1)
               if offset:
                   dt = dt - timedelta(days=offset * 7)
               dt = dt.replace(hour=22, minute=0, second=0, microsecond=0)
           elif timeframe == TimeFrame.Months:
               if offset > 0:
                   dt.replace(month=dt.month - offset)
               # last day of month
               last_day_of_month = dt.replace(day=28) + timedelta(days=4)
               last_day_of_month = last_day_of_month - timedelta(days=last_day_of_month.day)
               last_day_of_month = last_day_of_month.day
               # start of month (1 at 0, 22 last day of prev month)
               if dt.day < last_day_of_month:
                   dt = dt - timedelta(days=dt.day)
               dt = dt.replace(hour=22, minute=0, second=0, microsecond=0)
           return dt
    

    Edit:

    This method above will not have correct start times for intervals > 60 minutes for intraday. Will post an updated version, which fixes that issue.



  • This method will return the start of the period based on current time (or provided time). It is using UTC 22:00 (5:00 pm New York) as the start of the day.

    Fixed version:

       def _getstarttime(self, timeframe, compression, dt = None, offset = 0):
           if dt == None:
               dt = datetime.utcnow()
           if timeframe == TimeFrame.Seconds:
               dt = dt.replace(second=dt.second//compression, microsecond=0)
               if offset > 0:
                   dt = dt - timedelta(seconds=compression*offset)
           elif timeframe == TimeFrame.Minutes:
               if compression >= 60:
                   hours = 0
                   minutes = 0
                   # get start of day
                   dtstart = self._getstarttime(TimeFrame.Days, 1, dt)
                   #diff start of day with current time to get seconds since start of day
                   dtdiff = dt - dtstart
                   hours = dtdiff.seconds//((60*60)*(compression//60))
                   minutes = compression % 60
                   dt = dtstart + timedelta(hours=hours, minutes=minutes)
               else:
                   dt = dt.replace(minute=compression, second=0, microsecond=0)
               if offset > 0:
                   dt = dt - timedelta(minutes=compression*offset)
           elif timeframe == TimeFrame.Days:
               # start of day is UTC 22 (5pm new york)
               if dt.hour < 22:
                   dt = dt - timedelta(days=1)
               if offset:
                   dt = dt - timedelta(days=offset)
               dt = dt.replace(hour=22, minute=0, second=0, microsecond=0)
           elif timeframe == TimeFrame.Weeks:
               if dt.weekday() != 6:
                   # sunday is start of week at 5pm new york
                   dt = dt - timedelta(days=dt.weekday() + 1)
               if offset:
                   dt = dt - timedelta(days=offset * 7)
               dt = dt.replace(hour=22, minute=0, second=0, microsecond=0)
           elif timeframe == TimeFrame.Months:
               if offset > 0:
                   dt= dt - timedelta(days=(min(28+dt.day, 31)))
               # last day of month
               last_day_of_month = dt.replace(day=28) + timedelta(days=4)
               last_day_of_month = last_day_of_month - timedelta(days=last_day_of_month.day)
               last_day_of_month = last_day_of_month.day
               # start of month (1 at 0, 22 last day of prev month)
               if dt.day < last_day_of_month:
                   dt = dt - timedelta(days=dt.day)
               dt = dt.replace(hour=22, minute=0, second=0, microsecond=0)
           return dt
    


  • I finished the polling, I am using sleep in a thread, the time diff I calculate by start of next period and current time.

    Thanks again for your help @backtrader

       def poll_thread(self):
           t = threading.Thread(target=self._t_poll)
           t.daemon = True
           t.start()
    
       def _t_poll(self):
           dtstart = self._getstarttime(self._timeframe, self._compression, offset=2)
           qcandles = queue.Queue()
           while True:
               dtcurr = self._getstarttime(self._timeframe, self._compression)
               # request candles in live instead of stream
               if dtcurr > dtstart:
                   if len(self) > 1:
                       # len == 1 ... forwarded for the 1st time
                       dtbegin = self.datetime.datetime(-1)
                   elif self.fromdate > float('-inf'):
                       dtbegin = num2date(self.fromdate)
                   else:  # 1st bar and no begin set
                       dtbegin = dtstart
                   self.qlive = self.o.candles(
                       self.p.dataname, dtbegin, None,
                       self._timeframe, self._compression,
                       candleFormat=self._candleFormat,
                       onlyComplete=True,
                       includeFirst=False)
                   dtstart = dtbegin
                   # sleep until next call
                   dtnow = datetime.utcnow()
                   dtnext = self._getstarttime(self._timeframe, self._compression, dt=dtnow, offset=-1)
                   dtdiff = dtnext - dtnow
                   tmout = (dtdiff.days*24*60*60) + dtdiff.seconds + 1
                   if tmout <= 0: tmout = 5
                   _time.sleep(tmout)
    
       def _getstarttime(self, timeframe, compression, dt = None, offset = 0):
           '''This method will return the start of the period based on current 
           time (or provided time). It is using UTC 22:00 (5:00 pm New York) 
           as the start of the day.'''
           if dt == None:
               dt = datetime.utcnow()
           if timeframe == TimeFrame.Seconds:
               dt = dt.replace(second=(dt.second//compression)*compression, microsecond=0)
               if offset:
                   dt = dt - timedelta(seconds=compression*offset)
           elif timeframe == TimeFrame.Minutes:
               if compression >= 60:
                   hours = 0
                   minutes = 0
                   # get start of day
                   dtstart = self._getstarttime(TimeFrame.Days, 1, dt)
                   #diff start of day with current time to get seconds since start of day
                   dtdiff = dt - dtstart
                   hours = dtdiff.seconds//((60*60)*(compression//60))
                   minutes = compression % 60
                   dt = dtstart + timedelta(hours=hours, minutes=minutes)
               else:
                   dt = dt.replace(minute=(dt.minute//compression)*compression, second=0, microsecond=0)
               if offset:
                   dt = dt - timedelta(minutes=compression*offset)
           elif timeframe == TimeFrame.Days:
               # start of day is UTC 22 (5pm new york)
               if dt.hour < 22:
                   dt = dt - timedelta(days=1)
               if offset:
                   dt = dt - timedelta(days=offset)
               dt = dt.replace(hour=22, minute=0, second=0, microsecond=0)
           elif timeframe == TimeFrame.Weeks:
               if dt.weekday() != 6:
                   # sunday is start of week at 5pm new york
                   dt = dt - timedelta(days=dt.weekday() + 1)
               if offset:
                   dt = dt - timedelta(days=offset * 7)
               dt = dt.replace(hour=22, minute=0, second=0, microsecond=0)
           elif timeframe == TimeFrame.Months:
               if offset:
                   dt= dt - timedelta(days=(min(28+dt.day, 31)))
               # last day of month
               last_day_of_month = dt.replace(day=28) + timedelta(days=4)
               last_day_of_month = last_day_of_month - timedelta(days=last_day_of_month.day)
               last_day_of_month = last_day_of_month.day
               # start of month (1 at 0, 22 last day of prev month)
               if dt.day < last_day_of_month:
                   dt = dt - timedelta(days=dt.day)
               dt = dt.replace(hour=22, minute=0, second=0, microsecond=0)
           return dt