For code/output blocks: Use ``` (aka backtick or grave accent) in a single line before and after the block. See:

Resampling in InfluxDB Data Feed

  • I have been looking at the code of InfluxDB data feed and how it performs resampling of data. I noticed that the OHLC data is averaged. I don't have much experience with influxdb but shouldn't OHLC should be extracted using the following formula:

    1. Open: First
    2. High: Max
    3. Low: Min
    4. Close: Last

    The code here averages the data in backtrader code base.

    qstr = ('SELECT mean("{open_f}") AS "open", mean("{high_f}") AS "high", '
                    'mean("{low_f}") AS "low", mean("{close_f}") AS "close", '
                    'mean("{vol_f}") AS "volume", mean("{oi_f}") AS "openinterest" '
                    'FROM "{dataname}" '
                    'WHERE time {begin} '
                    'GROUP BY time({timeframe}) fill(none)').format(
              , high_f=self.p.high,
                        low_f=self.p.low, close_f=self.p.close,
                        vol_f=self.p.volume, oi_f=self.p.ointerest,
                        timeframe=tf, begin=st, dataname=self.p.dataname)

  • To fix some of these issues, I have modified the code. The changes:

    1. The current implementation doesn't use fromdate, todate to range the data. This isn't a big issue when working on localhost, but it is when the database is on a remote server and you need to pull lots of data. Added support for this.

    2. self.p.startdate in the current implementation is inconsistent with other data feeds so removed that.

    3. Used first("open"), max("high"), min("low"), last("close") in resampling. I still don't understand why is mean aggregator being used in the current implementation. As I see it, it can distort the data.

    4. Open Interest was queried but never inserted in line.

    5. Added timestamp precision to be seconds instead of the default nanoseconds. It helps with some weird date errors.

    from __future__ import (absolute_import, division, print_function,
    from influxdb import InfluxDBClient
    from influxdb.exceptions import InfluxDBClientError
    import backtrader as bt
    import backtrader.feed as feed
    from backtrader.utils import date2num
    from datetime import datetime
    TIMEFRAMES = dict(
            (bt.TimeFrame.Seconds, "s"),
            (bt.TimeFrame.Minutes, "m"),
            (bt.TimeFrame.Days, "d"),
            (bt.TimeFrame.Weeks, "w"),
            (bt.TimeFrame.Months, "m"),
            (bt.TimeFrame.Years, "y"),
    class InfluxData(feed.DataBase):
        params = (
            ("host", "localhost"),
            ("port", 8086),
            ("username", None),
            ("password", None),
            ("database", None),
            ("timeframe", bt.TimeFrame.Days),
            ("high", "high"),
            ("low", "low"),
            ("open", "open"),
            ("close", "close"),
            ("volume", "volume"),
            ("openinterest", "oi"),
        def start(self):
            super(InfluxData, self).start()
                self.ndb = InfluxDBClient(, self.p.port, self.p.username,
                                          self.p.password, self.p.database)
            except InfluxDBClientError as err:
                print("Failed to establish connection to InfluxDB: %s" % err)
            tf = "{multiple}{timeframe}".format(
                multiple=(self.p.compression if self.p.compression else 1),
                timeframe=TIMEFRAMES.get(self.p.timeframe, "d"))
            if self.p.fromdate and self.p.todate:
                tcond = "time >= '{fromdate}' AND time <= '{todate}'".format(fromdate=self.p.fromdate, todate=self.p.todate)
            elif self.p.fromdate:
                tcond = "time >= '{fromdate}'".format(fromdate=self.p.fromdate)
            elif self.p.todate:
                tcond = "time <= '{todate}'".format(todate=self.p.todate)
                tcond = "time <= now()"
            qstr = 'SELECT FIRST("{open_f}") AS "open", MAX("{high_f}") as "high", MIN("{low_f}") as "low", ' \
                   'LAST("{close_f}") AS "close", SUM("{vol_f}") as "volume", SUM("{oi_f}") as "openinterest" ' \
                   'FROM "{dataname}" ' \
                   'WHERE {tcond} ' \
                   'GROUP BY time({timeframe}) fill(none) ' \
                   'ORDER BY time ASC'.format(, high_f=self.p.high,
                                              low_f=self.p.low, close_f=self.p.close,
                                              vol_f=self.p.volume, oi_f=self.p.openinterest,
                                              dataname=self.p.dataname, tcond=tcond, timeframe=tf)
                dbars = list(self.ndb.query(qstr, epoch="s").get_points())
            except InfluxDBClientError as err:
                print("InfluxDB query failed: %s" % err)
                dbars = []
            self.biter = iter(dbars)
        def _load(self):
                bar = next(self.biter)
            except StopIteration:
                return False
            self.l.datetime[0] = date2num(datetime.fromtimestamp(bar["time"]))
            vol = bar["volume"]
            oi = bar["openinterest"]
  [0] = bar["open"]
            self.l.high[0] = bar["high"]
            self.l.low[0] = bar["low"]
            self.l.close[0] = bar["close"]
            self.l.volume[0] = vol if vol else 0.0
            self.l.openinterest[0] = oi if oi else 0.0
            return True

    @RandyT have a look into this and see if its suitable for pull

  • @kausality Sorry for lack of response here. Had not been tracking BT for awhile and was just visiting to see about restarting.

    The changes look good to me. Clearly not too many people using this functionality it seems. I cannot recall why I used mean and may have just been my lack of familiarity with influxdb.

    I would encourage you to submit the PR.

Log in to reply