Where is the trigger point for the next method in a strategy using the OandaData feed?
-
Dear all, I am studying the OandaData feed. So far, I understand how OandaData gets data from the remote endpoint. Please see this.
The data is put into a queue q. Now, I wonder how backtrader fetches the data from the queue and triggers the next method in the strategy.
Any help is appreciated. -
take a look at the
_load
method of any feed class inherited from AbstractDataBase
(including the OandaData
class). In this method the new bar is extracted from the queue assigned to a particular data feed.
The
_load
method is eventually called from the Cerebro engine's run
method, where all the data feeds are iterated in order to get their new bars. -
@vladisld Thank you. Below is the _load method; could you point out which line of code fetches the bar data from the queue?
def _load(self):
    """Try to deliver the next bar for this feed, driven by ``self._state``.

    The method loops until it can return, switching between the states
    ``_ST_LIVE``, ``_ST_HISTORBACK``, ``_ST_FROM`` and ``_ST_START``.

    Returns:
        True  -- a bar was loaded (from a live message, a historical
                 message, or copied from ``self.p.backfill_from``).
        None  -- the live queue produced nothing within the timeout.
        False -- the feed is over (``_ST_OVER``): disconnected, an
                 unrecoverable error, or end of a historical-only request.
    """
    if self._state == self._ST_OVER:
        return False

    while True:
        if self._state == self._ST_LIVE:
            try:
                # This is where queued live data is fetched: a message
                # stashed under key None (stored by the backfill path
                # further down) is consumed first, otherwise the live
                # queue is read with a timeout of self._qcheck.
                msg = (self._storedmsg.pop(None, None) or
                       self.qlive.get(timeout=self._qcheck))
            except queue.Empty:
                return None  # indicate timeout situation

            if msg is None:  # Conn broken during historical/backfilling
                self.put_notification(self.CONNBROKEN)
                # Try to reconnect
                if not self.p.reconnect or self._reconns == 0:
                    # Can no longer reconnect
                    self.put_notification(self.DISCONNECTED)
                    self._state = self._ST_OVER
                    return False  # failed

                self._reconns -= 1
                self._st_start(instart=False, tmout=self.p.reconntimeout)
                continue

            if 'code' in msg:
                self.put_notification(self.CONNBROKEN)
                code = msg['code']
                # Only these codes are eligible for a reconnect attempt;
                # any other code ends the feed immediately.
                # NOTE(review): meaning of 599/598/596 is not visible
                # here - confirm against the store implementation.
                if code not in [599, 598, 596]:
                    self.put_notification(self.DISCONNECTED)
                    self._state = self._ST_OVER
                    return False  # failed

                if not self.p.reconnect or self._reconns == 0:
                    # Can no longer reconnect
                    self.put_notification(self.DISCONNECTED)
                    self._state = self._ST_OVER
                    return False  # failed

                # Can reconnect
                self._reconns -= 1
                self._st_start(instart=False, tmout=self.p.reconntimeout)
                continue

            # A usable message arrived: restore the reconnection budget
            self._reconns = self.p.reconnections

            # Process the message according to expected return type
            if not self._statelivereconn:
                if self._laststatus != self.LIVE:
                    if self.qlive.qsize() <= 1:  # very short live queue
                        self.put_notification(self.LIVE)

                # The fetched message is handed to _load_tick; a truthy
                # result is reported to the caller as "bar loaded".
                ret = self._load_tick(msg)
                if ret:
                    return True

                # could not load bar ... go and get new one
                continue

            # Fall through to processing reconnect - try to backfill
            self._storedmsg[None] = msg  # keep the msg

            # else do a backfill
            if self._laststatus != self.DELAYED:
                self.put_notification(self.DELAYED)

            dtend = None
            if len(self) > 1:
                # len == 1 ... forwarded for the 1st time
                dtbegin = self.datetime.datetime(-1)
            elif self.fromdate > float('-inf'):
                dtbegin = num2date(self.fromdate)
            else:  # 1st bar and no begin set
                # passing None to fetch max possible in 1 request
                dtbegin = None

            # msg['time'] is divided by 10 ** 6 before conversion
            # (presumably a microsecond timestamp - TODO confirm)
            dtend = datetime.utcfromtimestamp(int(msg['time']) / 10 ** 6)

            # Request candles between dtbegin and dtend; the result is
            # consumed via .get() in the _ST_HISTORBACK branch below.
            self.qhist = self.o.candles(
                self.p.dataname, dtbegin, dtend,
                self._timeframe, self._compression,
                candleFormat=self._candleFormat,
                includeFirst=self.p.includeFirst)

            self._state = self._ST_HISTORBACK
            self._statelivereconn = False  # no longer in live
            continue

        elif self._state == self._ST_HISTORBACK:
            msg = self.qhist.get()
            if msg is None:  # Conn broken during historical/backfilling
                # Situation not managed. Simply bail out
                self.put_notification(self.DISCONNECTED)
                self._state = self._ST_OVER
                return False  # error management cancelled the queue

            elif 'code' in msg:  # Error
                self.put_notification(self.NOTSUBSCRIBED)
                self.put_notification(self.DISCONNECTED)
                self._state = self._ST_OVER
                return False

            # A falsy (but not None) message marks the end of the
            # historical data; a truthy one is a candidate bar.
            if msg:
                if self._load_history(msg):
                    return True  # loading worked

                continue  # not loaded ... date may have been seen
            else:
                # End of histdata
                if self.p.historical:  # only historical
                    self.put_notification(self.DISCONNECTED)
                    self._state = self._ST_OVER
                    return False  # end of historical

            # Live is also wished - go for it
            self._state = self._ST_LIVE
            continue

        elif self._state == self._ST_FROM:
            if not self.p.backfill_from.next():
                # additional data source is consumed
                self._state = self._ST_START
                continue

            # copy lines of the same name
            for alias in self.lines.getlinealiases():
                lsrc = getattr(self.p.backfill_from.lines, alias)
                ldst = getattr(self.lines, alias)

                ldst[0] = lsrc[0]

            return True

        elif self._state == self._ST_START:
            # On success the loop simply iterates again with whatever
            # state _st_start left in self._state.
            if not self._st_start(instart=False):
                self._state = self._ST_OVER
                return False
-
try: msg = (self._storedmsg.pop(None, None) or self.qlive.get(timeout=self._qcheck))
This fetches the data from the queue; the fetched message is then loaded into a bar using this:
ret = self._load_tick(msg)
-
@li-mike said in Where is the trigger point for the next method in a strategy using the OandaData feed?:
self.qlive.get(timeout=self._qcheck))