JSON data sources



  • I was just taking a look to see about adding InfluxDB as a data feed source. I see a lot of code specific to CSV data sources in backtrader/feeds.py. @backtrader any thoughts on providing the basic structure to add JSON as a data feed format?

    I think once that structure is there, it would be very simple to add something like InfluxDB or other JSON-formatted data sources. The metaprogramming stuff is a bit of a high hurdle for me to grok right now...

    The current thought and application for this is to take advantage of Grafana to provide some dynamic interfaces to the data stored in InfluxDB. Ultimately I would like to create a writer that writes backtest results, or to use BT to process some of this data and augment equity data with other indicator values.

    Example below of what can easily be thrown together in Grafana once you have the data in a supported database.

    [Grafana dashboard screenshot]



  • BTW, I am happy to take a stab at adding this if you are willing to answer a few noob questions along the way. :smile:



  • Interesting idea. So Grafana would replace the standard plot with fancier diagrams.

    Is this a correct data flow?
    Backtrader > data in json > InfluxDB > Grafana



  • @ab_trader

    There are actually several data flows, and they are a bit different from what you show. A few points to help clarify:

    1. InfluxDB is a time-series-optimized database, which would be an alternative to storing data in CSV files.
    2. InfluxDB provides the ability to resample the data in the query itself. As I am currently configured, I store tick or 1-minute resolution data in InfluxDB and can request a sample from the DB at 5min, 1Day, 1Week, 1Year, etc. (see the query sketch after this list).
    3. InfluxDB is screaming fast and provides some other interesting options to query across other datasets.
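
    For point 2, a rough sketch of what I mean by resampling in the query (untested as written; the field names and the 'equitys' database are just my schema):

    from influxdb import InfluxDBClient

    client = InfluxDBClient('127.0.0.1', 8086, database='equitys')

    # ask InfluxDB to roll the stored 1-minute data up into 5-minute bars
    q = ('SELECT mean("open_p") AS "open", mean("close_p") AS "close", '
         'mean("volume") AS "volume" FROM "autogen"."SPY" '
         'WHERE time > now() - 1d GROUP BY time(5m) fill(linear)')

    for point in client.query(q).get_points():
        print(point['time'], point['close'])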

    Data flows for me are currently:

    Historical data from DTN --> stored as a pandas DataFrame (json format) in InfluxDB --> Grafana for automatically updated charts.

    With the data in InfluxDB, it is now possible to do something like the following: augment the data in InfluxDB with ADF or some other expensive calculation and store it back to InfluxDB, to later take advantage of BT's extended data feed capability; or store backtest results to be analyzed in Grafana. Grafana provides a REST API that would allow building dashboards on the fly as you write results to InfluxDB.

    InfluxDB (json) --> BT --> (json) InfluxDB
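
    Something like this rough sketch is what I have in mind for the store-back half (the BT step in the middle is omitted, and the ADF function is just a stand-in):

    from influxdb import InfluxDBClient


    def my_expensive_adf_calc(close):
        return close  # stand-in for the real (expensive) calculation


    client = InfluxDBClient('127.0.0.1', 8086, database='equitys')

    points = client.query('SELECT "close_p" FROM "autogen"."SPY" '
                          'WHERE time > now() - 30d').get_points()

    # write the augmented values back as a separate measurement
    augmented = [{'measurement': 'SPY_indicators',
                  'time': p['time'],
                  'fields': {'adf': my_expensive_adf_calc(p['close_p'])}}
                 for p in points]

    client.write_points(augmented)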

    The other project I am actively looking at here is to persist data about positions, cash, orders, etc. from IB back to InfluxDB, where I can have a trading console view on IB. (I run IBgateway via IBcontroller, which prevents running TWS at the same time.)


  • administrators

    @RandyT said in JSON data sources:

    I was just taking a look to see about adding InfluxDB as a data feed source. I see a lot of code specific to CSV data sources in backtrader/feeds.py. @backtrader any thoughts on providing the basic structure to add JSON as a data feed format?

    The thing is that CSV-based formats all share a common structure, and that's why a base class can provide a lot of functionality to (super)simplify what each CSV data feed has to do (to the point that GenericCSVData could actually handle all the cases with the proper configuration)

    Supporting a JSON-based data feed doesn't pose any real problem, but each JSON format will surely be different: in one the prices will be in a list of dict items, in another lists will be embedded in lists with prices identified by position.
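
    For example, two hypothetical shapes for the same bar might be:

    # 1. a list of dict items, prices identified by key
    bars_as_dicts = [
        {'time': '2017-02-01T00:00:00Z', 'open': 227.55, 'close': 227.56, 'volume': 32954027},
    ]

    # 2. lists embedded in a list, prices identified by position
    bars_as_lists = [
        ['2017-02-01T00:00:00Z', 227.55, 227.56, 32954027],
    ]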

    Any sample format?



  • Here are a few tidbits. Happy to dig more.

    There are a few examples in the influxdb module showing how to interact with the database. You can request a pandas-formatted chunk of data or a standard dict() response.

    https://github.com/influxdata/influxdb-python/tree/master/examples
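
    Roughly, the two flavors look like this (just a sketch, untested; 'equitys' is simply my database name):

    from influxdb import InfluxDBClient, DataFrameClient

    # standard client: query() returns a ResultSet of plain dicts
    ndb = InfluxDBClient('127.0.0.1', 8086, database='equitys')
    rs = ndb.query('SELECT "close_p" FROM "autogen"."SPY" LIMIT 5')
    print(list(rs.get_points()))

    # pandas client: query() returns DataFrames keyed by measurement
    dfdb = DataFrameClient('127.0.0.1', 8086, database='equitys')
    frames = dfdb.query('SELECT "close_p" FROM "autogen"."SPY" LIMIT 5')
    print(frames['SPY'].head())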

    A query looks as follows. You can specify the time range and the time compression as well as the fields. I'm currently set up to store each symbol in its own "record". The query below requests data 5 days back from now(). I'm a bit suspicious of those volume values, but that is another issue...

    >>> q = 'SELECT mean("close_p") AS "close", mean("open_p") AS "open", mean("volume") AS "volume" FROM "autogen"."SPY" WHERE time > now() - 5d GROUP BY time(1d) fill(linear)'
    >>> bar = ndb.query(q)
    >>> bar
    ResultSet({'('SPY', None)': [{'time': '2017-01-30T00:00:00Z', 'close': 227.0515876923076, 'open': 227.0525000000001, 'volume': 37294770.28461538}, {'time': '2017-01-31T00:00:00Z', 'close': 226.81787179487176, 'open': 226.81632641025658, 'volume': 31521417.551282052}, {'time': '2017-02-01T00:00:00Z', 'close': 227.555212051282, 'open': 227.55685282051283, 'volume': 32954027.492307693}, {'time': '2017-02-02T00:00:00Z', 'close': 227.53784871794895, 'open': 227.5361712820515, 'volume': 34967991.24615385}, {'time': '2017-02-03T00:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-04T00:00:00Z', 'close': None, 'open': None, 'volume': None}]})
    >>> type(bar)
    <class 'influxdb.resultset.ResultSet'>
    

    Here is a query showing 2 days of 2hr bars.

    >>> q = 'SELECT mean("close_p") AS "close", mean("open_p") AS "open", mean("volume") AS "volume" FROM "autogen"."SPY" WHERE time > now() - 2d GROUP BY time(2h) fill(linear)'
    >>> bar = ndb.query(q)
    >>> bar
    ResultSet({'('SPY', None)': [{'time': '2017-02-02T00:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-02T02:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-02T04:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-02T06:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-02T08:00:00Z', 'close': 227.15747000000002, 'open': 227.14870000000005, 'volume': 6508692.833333333}, {'time': '2017-02-02T10:00:00Z', 'close': 227.62730583333325, 'open': 227.6283566666666, 'volume': 21500074.783333335}, {'time': '2017-02-02T12:00:00Z', 'close': 227.55091916666683, 'open': 227.54574666666667, 'volume': 38636595.541666664}, {'time': '2017-02-02T14:00:00Z', 'close': 227.53041583333345, 'open': 227.5312783333334, 'volume': 51882128.016666666}, {'time': '2017-02-02T16:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-02T18:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-02T20:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-02T22:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T00:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T02:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T04:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T06:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T08:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T10:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T12:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T14:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T16:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T18:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T20:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-03T22:00:00Z', 'close': None, 'open': None, 'volume': None}, {'time': '2017-02-04T00:00:00Z', 'close': None, 'open': None, 'volume': None}]})
    


  • So there is not much about it that is json. Just a key pointing to a bunch of tuples.



  • @RandyT you can write your data to .csv with a couple of lines of Python code, I guess.



  • @Ed Bartosh

    The goal isn't really to get to CSV, although that would be a way to feed it into the current CSV support. As to the issues you have raised about speed, I'd like to pull this data directly from InfluxDB and avoid putting it on disk.

    Some other changes would be needed here to let you define the query sent to InfluxDB, specifying what data you get and at what timeframe. But I will look at the idea of munging that into CSV in order to feed it into BT.
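
    If I do go the CSV route first, a couple of lines along these lines should be enough (a sketch; the field names are just my schema):

    import csv

    from influxdb import InfluxDBClient

    client = InfluxDBClient('127.0.0.1', 8086, database='equitys')
    q = ('SELECT mean("open_p") AS "open", mean("high_p") AS "high", '
         'mean("low_p") AS "low", mean("close_p") AS "close", '
         'mean("volume") AS "volume" FROM "autogen"."SPY" '
         'WHERE time > now() - 365d GROUP BY time(1d) fill(none)')

    # dump the query result into a CSV file that GenericCSVData could read
    with open('SPY.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['time', 'open', 'high',
                                               'low', 'close', 'volume'])
        writer.writeheader()
        for point in client.query(q).get_points():
            writer.writerow(point)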



  • @RandyT said in JSON data sources:

    The goal isn't really to get to CSV, although that would be a way to feed it into the current CSV support. As to the issues you have raised about speed, I'd like to pull this data directly from InfluxDB and avoid putting it on disk.

    According to the explanations I was given about the reason for that slowness, this wouldn't work, I believe.


  • administrators

    This is a snippet that shows how it could be done ...

    import backtrader as bt
    from backtrader.utils import date2num
    from datetime import datetime


    class InfluxDB(bt.DataBase):
        def start(self):
            super(InfluxDB, self).start()

            # placeholder for whatever establishes the real connection
            self.ndb = GetConnectionToInflux()

            # The query could already consider parameters like fromdate and todate
            # to have the database skip them and not the internal code
            q = ('SELECT mean("open_p") AS "open", mean("high_p") AS "high", '
                 'mean("low_p") AS "low", mean("close_p") AS "close", '
                 'mean("volume") AS "volume" FROM "autogen".'
                 '"{dataname}" WHERE time > now() - 5d GROUP BY time(1d) '
                 'fill(linear)')

            dbars = self.ndb.query(q.format(dataname=self.p.dataname))

            # The sample returns something called ResultSet which seems to contain
            # a dict with a single key and a list of dictionaries with the values
            # Let's assume for the sake of the example that it can be addressed
            # like a dict
            self.bars = dbars[(self.p.dataname, None)]
            self.biter = iter(self.bars)

        def _load(self):
            try:
                bar = next(self.biter)
            except StopIteration:
                return False  # no more bars to deliver

            if bar['close'] is None:
                return False

            # the line buffer stores floats, hence the date2num conversion
            self.l.datetime[0] = date2num(
                datetime.strptime(bar['time'], '%Y-%m-%dT%H:%M:%SZ'))

            self.l.open[0] = bar['open']
            self.l.high[0] = bar['high']
            self.l.low[0] = bar['low']
            self.l.close[0] = bar['close']
            self.l.volume[0] = bar['volume']

            return True
    

    Real operational details pertaining to the connection to the InfluxDB and the result object ResultSet need to be worked out.
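
    For what it's worth, the influxdb-python ResultSet can apparently also be flattened with get_points(), which would sidestep the (name, tags) keying used above (again, just a sketch of the relevant lines inside start()):

    dbars = self.ndb.query(q.format(dataname=self.p.dataname))
    self.bars = list(dbars.get_points())  # plain dicts: time/open/close/volume
    self.biter = iter(self.bars)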



  • @backtrader Just FYI, I am chipping away at this and should have something that at least connects to the database. May need your assistance wiring it into the buffer mechanisms.



  • @backtrader
    The following bit of code works for the most part, with one big exception: I cannot seem to get the system to hit start() here. I've debugged this by putting the logic in __init__(). Perhaps you can do the magic to plug it in. Happy to provide this in a PR to the development branch if you would prefer.

    FWIW, I was trying to create a data feed with the following bit of code.

        data0 = btfeed.InfluxDB(dataname='SPY',  database='equitys', fromdate='2016-06-01')
    

    Here is the current code, which supports fromdate in RFC3339 format.

    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    
    
    import backtrader as bt
    import backtrader.feed as feed
    from backtrader.utils import date2num
    from influxdb import InfluxDBClient as idbclient
    from influxdb.exceptions import InfluxDBClientError
    import datetime as dt
    
    TIMEFRAMES = dict(
        (
            (bt.TimeFrame.MicroSeconds, '1ms'),
            (bt.TimeFrame.Seconds, '1s'),
            (bt.TimeFrame.Minutes, '1m'),
            (bt.TimeFrame.Days, '1d'),
            (bt.TimeFrame.Weeks, '1w'),
        (bt.TimeFrame.Months, '1m'),  # note: same string as Minutes; InfluxQL durations don't seem to have a month unit
            (bt.TimeFrame.Years, '1y'),
        )
    )
    
    
    class InfluxDB(feed.DataBase):
        params = (
            ('host', '127.0.0.1'),
            ('port', '8086'),
            ('username', None),
            ('password', None),
            ('database', None),
            ('timeframe', bt.TimeFrame.Days),
            ('startdate', None),
            ('high', 'high_p'),
            ('low', 'low_p'),
            ('open', 'open_p'),
            ('close', 'close_p'),
            ('volume', 'volume'),
            ('ointerest', 'oi'),
        )
    
        def start(self):
            super(InfluxDB, self).start()
            try:
                self.ndb = idbclient(self.p.host, self.p.port, self.p.username,
                                     self.p.password, self.p.database)
            except InfluxDBClientError as err:
                print('Failed to establish connection to InfluxDB: %s' % err)
    
            tf = TIMEFRAMES.get(self.p.timeframe, '1d')
            if not self.p.startdate:
                st = '<= now()'
            else:
                st = '>= \'%s\'' % self.p.startdate
    
            # The query could already consider parameters like fromdate and todate
            # to have the database skip them and not the internal code
            qstr = ('SELECT mean("{open_f}") AS "open", mean("{high_f}") AS "high", '
                    'mean("{low_f}") AS "low", mean("{close_f}") AS "close", '
                    'mean("{vol_f}") AS "volume", mean("{oi_f}") AS "openinterest" '
                    'FROM "{dataname}" '
                    'WHERE time {begin} '
                    'GROUP BY time({timeframe}) fill(none)').format(
                        open_f=self.p.open, high_f=self.p.high,
                        low_f=self.p.low, close_f=self.p.close,
                        vol_f=self.p.volume, oi_f=self.p.ointerest,
                        timeframe=tf, begin=st, dataname=self.p.dataname)
    
            try:
                dbars = list(self.ndb.query(qstr).get_points())
            except InfluxDBClientError as err:
                print('InfluxDB query failed: %s' % err)
                dbars = []  # fall back to an empty feed rather than a NameError below
    
            self.biter = iter(dbars)
    
        def _load(self):
            try:
                bar = next(self.biter)
            except StopIteration:
                # iterator exhausted: no more bars to deliver
                return False

            if bar['close'] is None:
                return False
    
            self.l.datetime[0] = date2num(dt.datetime.strptime(bar['time'],
                                             '%Y-%m-%dT%H:%M:%SZ'))
    
            self.l.open[0] = bar['open']
            self.l.high[0] = bar['high']
            self.l.low[0] = bar['low']
            self.l.close[0] = bar['close']
            self.l.volume[0] = bar['volume']
    
            return True
    
    


  • Updated the above; it now actually works, with the exception of some problem with the date format as it parses bars in _load(). I had failed to do a cerebro.resampledata(), which is why this feed was not starting.

    Will work this out...

    Getting the following in the assignment of the datetime.strptime() result:

     File "/home/randy/.virtualenvs/backtrader3/lib/python3.6/site-packages/backtrader/linebuffer.py", line 222, in __setitem__
        self.array[self.idx + ago] = value
    TypeError: must be real number, not datetime.datetime
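
    For the record, the error simply means the line buffer wants a plain float, so the strptime() result has to go through date2num() (as in the updated code above):

    import datetime as dt

    from backtrader.utils import date2num

    # line buffers store floats, not datetime objects
    stamp = dt.datetime.strptime('2017-02-04T00:00:00Z', '%Y-%m-%dT%H:%M:%SZ')
    print(date2num(stamp))  # 736130.0, a plain float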
    




  • I'm not seeing all bars in strategy.next() here. @backtrader perhaps you can shed some light?

    In this bit of code from influxfeed.py:

        def _load(self):
            self.i += 1
            try:
                bar = next(self.biter)
            except StopIteration:
                return False
            import ipdb; ipdb.set_trace()
            print('self: %d\tloadcount: %d\tdatecount: %d' % (len(self), self.i, len(self.l.datetime)))
            self.l.datetime[0] = date2num(dt.datetime.strptime(bar['time'],
                                                               '%Y-%m-%dT%H:%M:%SZ'))
    

    At the debug breakpoint, I am seeing all the expected bars coming from self.biter.

    However, I seem to be re-entering _load() without len(self) incrementing, and I don't see all bars in strategy.next(), as mentioned.

    The debugger shows interesting values for self.l.datetime, yet date2num seems to be returning a whole-number float.

    ipdb> c
    > /home/randy/Develop/backtrader-workspace/backtrader_local/feeds/influxfeed.py(103)_load()
        102         import ipdb; ipdb.set_trace()
    --> 103         self.l.datetime[0] = date2num(dt.datetime.strptime(bar['time'],
        104                                                            '%Y-%m-%dT%H:%M:%SZ'))
    
    ipdb> len(self.l.datetime)
    5
    ipdb> len(self)
    5
    ipdb> self.l.datetime[0]
    nan
    ipdb> self.l.datetime[-1]
    736125.9999999999
    ipdb> self.l.datetime[-2]
    736123.9999999999
    ipdb> self.l.datetime[0]
    nan
    ipdb> self.l.datetime[-1]
    736125.9999999999
    ipdb> date2num(dt.datetime.strptime(bar['time'], '%Y-%m-%dT%H:%M:%SZ'))
    736130.0
    ipdb> c
    ipdb> self.l.datetime[-1]
    736129.9999999999
    

    I am passing in 171 daily bars via self.biter, and the count of the number of times I enter _load() agrees with that, but the counts for len(self) and len(self.l.datetime) show that not all values are saved.

    self: 83	loadcount: 166	datecount: 83
    self: 84	loadcount: 167	datecount: 84
    self: 84	loadcount: 168	datecount: 84
    self: 85	loadcount: 169	datecount: 85
    self: 85	loadcount: 170	datecount: 85
    self: 86	loadcount: 171	datecount: 86
    

  • administrators

    To try to understand what's happening the following would be needed:

    • Each timestamp which is being parsed from the InfluxDB returned results

    If you are resampling, the resampling filter will sometimes remove bars from the stream while it is accumulating samples before delivering.

    In any case it would seem sensible to check a new data feed without resampling first, even more so if, as seems to be the case with InfluxDB, the data is frozen.



  • I am requesting daily data from InfluxDB so there should be no resampling happening within BT. At least not that I have specified. I suspect I am missing some nuance of how BT works here.

    Below is a snippet of the data. We seem to be dropping every other bar.

    2017-01-20T00:00:00Z
    self: 81	loadcount: 162	datecount: 81
    2017-01-23T00:00:00Z
    self: 82	loadcount: 163	datecount: 82
    2017-01-24T00:00:00Z
    self: 82	loadcount: 164	datecount: 82
    2017-01-25T00:00:00Z
    self: 83	loadcount: 165	datecount: 83
    2017-01-26T00:00:00Z
    self: 83	loadcount: 166	datecount: 83
    2017-01-27T00:00:00Z
    self: 84	loadcount: 167	datecount: 84
    2017-01-30T00:00:00Z
    self: 84	loadcount: 168	datecount: 84
    2017-01-31T00:00:00Z
    self: 85	loadcount: 169	datecount: 85
    2017-02-01T00:00:00Z
    self: 85	loadcount: 170	datecount: 85
    2017-02-02T00:00:00Z
    self: 86	loadcount: 171	datecount: 86
    2017-02-03T00:00:00Z
    self: 86	loadcount: 172	datecount: 86
    2017-02-06T00:00:00Z
    self: 87	loadcount: 173	datecount: 87
    2017-02-07T00:00:00Z
    

    Datafeed is setup as follows:

        # Create the InfluxDB data feed for SPY
        data0 = btlfeed.InfluxDB(dataname='SPY', timeframe=bt.TimeFrame.Days,
                                 **feedkwargs)
        # cerebro.resampledata(data0, timeframe=bt.TimeFrame.Days)
        cerebro.adddata(data0)
    

    And I discover now that if I replace the .resampledata() with just .adddata(), I get the expected results.

    What is .resampledata() doing that would cause it to drop every other bar when resampling an input that is the same timeframe?


  • administrators

    The original design, which is still lurking more or less everywhere, doesn't consider the declared timeframe/compression pairs; the reason is that the end user may not even know what the actual combination of the input data is, only what the output should look like.

    As such, even if the original data declares Days/1 and the same output is requested in resampledata, the resampler will still try to resample it.

    The observed effect is probably (without due investigation yet) a consequence of the usage of sessionend to control when sub-daily data delivers the latest bar of the day and, with it, the resampled daily bar.


  • administrators

    Yes, the fact that you are missing every 2nd day is an effect of the new algorithm: to cover any timeframe when no sessionend is in place, the target is set at 23:59:59.999999, whereas the daily data is less generous, ending at 23:59:59.

    If you set an explicit sessionend for the data source like 23:59:59 (or any other value actually), you will not miss any bar.
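
    For example, something along these lines (a sketch reusing the names from your snippet above) should be enough:

    import datetime

    # an explicit sessionend lets the daily bars reach the resampler's target
    data0 = btlfeed.InfluxDB(dataname='SPY', timeframe=bt.TimeFrame.Days,
                             sessionend=datetime.time(23, 59, 59),
                             **feedkwargs)
    cerebro.resampledata(data0, timeframe=bt.TimeFrame.Days)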

