
Interest in implementing an observer to store data during live trading



  • Hi,

    I was wondering if this could be generally useful. I am working on persisting data during live trading for inspection and comparison against backtests. I also plan to use it to ensure that live data is being processed on time.

    The idea is that you add an observer like this:

    cerebro.addobserver(PersistingObserver,
                        connectionstring='sqlite:///path/to/sqlite.sqlite',
                        observers=[BuySellObserver, Broker, Trades, LiveDataObserver])
    

    The workflow would be that we read the observer's lines, dynamically create a SQLAlchemy model class per object, "migrate" the schema if needed, and then write all the line values on each next() call.
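
    As a rough illustration, the dynamic-class step could look like this (a minimal sketch; make_model is a hypothetical helper, and it assumes the object carries a usable _name attribute):

    from sqlalchemy import Column, Float, TIMESTAMP
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    def make_model(data):
        # One nullable Float column per line, keyed by the bar timestamp
        columns = {'datetime': Column(TIMESTAMP, primary_key=True)}
        for line in data.lines.getlinealiases():
            if line != 'datetime':
                columns[line] = Column(Float, nullable=True)
        # type() builds the declarative model class at runtime
        return type(data._name, (Base,), dict(__tablename__=data._name, **columns))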

    If this would be useful to you, let me know.



  • Hi,

    It would be interesting to compare the backtested results against live results (at least to calculate the slippage).

    Thanks



  • @laurent-michelizza Here's what I've got so far. Suggestions welcome.



  • import backtrader as bt
    import sqlalchemy as sqa
    import sqlalchemy.ext.declarative
    import sqlalchemy.orm
    import sqlalchemy.sql.sqltypes
    from sqlalchemy import Column, Integer, String, TIMESTAMP, Float
    import re
    import os
    import os.path
    from typing import List
    import datetime as dt
    import logging
    
    def getTableName(data):
        name = getattr(data,'_name',data.__class__.__name__).strip()
    
        # Replace spaces with underscores
        name = re.sub(r'\s+','_',name)
        # Remove invalid characters
        name = re.sub('[^0-9a-zA-Z_]', '', name)
        # Remove leading characters until we find a letter or underscore
        name = re.sub('^[^a-zA-Z_]+', '', name)
    
        return name
    
    def getTableColumns(data):
        # One TIMESTAMP primary key plus one nullable Float column per line
        columns = dict(
            datetime = Column(TIMESTAMP,primary_key=True),
        )
        for line in data.lines.getlinealiases():
            if line == 'datetime':
                continue
            columns[line] = Column(Float,nullable=True)
        return columns
    
    class PersistToDatabase(bt.Observer):
        lines = ('ignored',)
        params = dict(
            debug = False,
            dbpath = '/path/to/sqlite.db'
        )
    
        def __init__(self):
            bt.Observer.__init__(self)
            if self.p.dbpath != ":memory:":
                path = os.path.abspath(self.p.dbpath)
                dirname = os.path.dirname(path)
                if not os.path.isdir(dirname):
                    os.makedirs(dirname)
            else:
                path = self.p.dbpath
    
            self._engine = sqa.create_engine("sqlite:///{}".format(path),echo=self.p.debug)
            self._Base = sqa.ext.declarative.declarative_base()
            self._classes:List[type] = []

            # One table per data feed and per fellow observer
            for lines in self._owner.datas + list(self._owner.observers):
                self._classes.append(self._makeSqlClass(lines))

            self._Base.metadata.create_all(self._engine)
            self._Session = sqa.orm.sessionmaker(bind=self._engine)
    
        def _makeSqlClass(self,data):
            name = getTableName(data)
            columns = getTableColumns(data)

            # Build a repr template of the form '#<Name col = {col}, ... >'
            s = '#<{name} '.format(name=name)
            for col in columns:
                s = s + '{name} = {value}, '.format(name=col,
                                                    value='{' + col + '}')
            s = s + '>'

            def classRepr(self2):
                return s.format(**self2.__dict__)

            # Dynamically create a declarative model class for this table
            return type(name,(self._Base,),
                        dict(__tablename__=name,**columns,__repr__=classRepr))
    
        def next(self):
            try:
                self._next()
            except Exception:
                logging.exception("EXCEPTION OCCURRED WHILE PERSISTING DATA")

        def _next(self):
            session = self._Session()
            with session.begin_nested():
                for klass,data in zip(self._classes,self._owner.datas + list(self._owner.observers)):
                    instance = klass()
                    for column in klass.__table__.columns:
                        value = getattr(data.lines,column.name,None)
                        if value is not None:
                            if type(column.type) == sqa.sql.sqltypes.TIMESTAMP:
                                value = data.datetime.datetime(0)
                            else:
                                value = value[0]
                            setattr(instance,column.name,value)
                    # Observers carry no datetime line of their own: borrow
                    # the timestamp of the primary data feed instead
                    if len(getattr(data,'datetime',[])) <= 1:
                        setattr(instance,'datetime',self._owner.data0.datetime.datetime(0))
                    session.merge(instance)

            session.commit()

        def stop(self):
            # Engine has no close(); dispose() releases the connection pool
            self._engine.dispose()
    

    I'll keep editing this comment with the latest version.
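
    For reference, wiring it up could look like this (a hypothetical usage sketch; the feed and strategy are placeholders):

    import backtrader as bt

    cerebro = bt.Cerebro()
    cerebro.adddata(bt.feeds.YahooFinanceCSVData(dataname='data.csv'))  # placeholder feed
    cerebro.addstrategy(bt.Strategy)  # placeholder do-nothing strategy
    cerebro.addobserver(PersistToDatabase, dbpath='/tmp/backtest.db')
    cerebro.run()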

    Note: this slows things down A LOT during backtesting.



  • Yes, having a database would be great! I remember when I serialized data to JSON, saved it to a file, and later retrieved and deserialized it. So convoluted.

    Why does this slow down backtesting?



  • @duality_ I haven't profiled it, but I'm assuming it's because on every next() call we synchronously write to the SQLite database in the main thread. The solution is pretty simple: create a dedicated database thread and do the writes there.
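
    A minimal sketch of that queue-plus-worker approach (the DbWriter name, table layout, and write_row hook are made up for illustration):

    import queue
    import sqlite3
    import threading

    class DbWriter:
        """next() pushes rows onto a queue; a worker thread drains it."""

        def __init__(self, dbpath, tablename='bars'):
            self._queue = queue.Queue()
            self._thread = threading.Thread(target=self._run,
                                            args=(dbpath, tablename), daemon=True)
            self._thread.start()

        def write_row(self, row):  # called from next(); never blocks on I/O
            self._queue.put(row)

        def close(self):  # called from stop()
            self._queue.put(None)  # sentinel: drain the queue and exit
            self._thread.join()

        def _run(self, dbpath, tablename):
            conn = sqlite3.connect(dbpath)  # connection lives in the worker thread
            conn.execute('CREATE TABLE IF NOT EXISTS {} (datetime TEXT PRIMARY KEY, close REAL)'.format(tablename))
            while True:
                row = self._queue.get()
                if row is None:
                    break
                conn.execute('INSERT OR REPLACE INTO {} VALUES (?, ?)'.format(tablename),
                             (row['datetime'], row['close']))
                conn.commit()
            conn.close()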

    Alternatively, we can write out CSV files and then import them into a SQLite database: http://www.sqlitetutorial.net/sqlite-import-csv/
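
    For the CSV route, the import can also be scripted with nothing but the standard library (a sketch; the paths and two-column layout are placeholders):

    import csv
    import sqlite3

    conn = sqlite3.connect('bars.db')  # placeholder path
    conn.execute('CREATE TABLE IF NOT EXISTS bars (datetime TEXT, close REAL)')
    with open('bars.csv') as f:  # placeholder path
        reader = csv.DictReader(f)
        conn.executemany('INSERT INTO bars VALUES (?, ?)',
                         ((row['datetime'], row['close']) for row in reader))
    conn.commit()
    conn.close()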


  • Remember that threads in Python are GIL-blocked. There will be some improvement because you are entering an I/O operation rather than just crunching numbers, but performance is still bound to suffer.



  • @backtrader Correct. I'm currently working on implementing an equivalent PersistToCSV.



  • Here is the CSV-based version; it runs much faster. However, I can't figure out why, but sometimes the file isn't completely flushed.

    import backtrader as bt
    import re
    import os
    import os.path
    from typing import List
    import datetime as dt
    import logging
    import math
    import pytz
    import csv
    
    def getTableName(data):
        name = getattr(data,'_name',data.__class__.__name__).strip()
    
        # Replace spaces with underscores
        name = re.sub(r'\s+','_',name)
        # Remove invalid characters
        name = re.sub('[^0-9a-zA-Z_]', '', name)
        # Remove leading characters until we find a letter or underscore
        name = re.sub('^[^a-zA-Z_]+', '', name)
    
        return name
    
    def getTableColumns(data):
        columns = dict(
            datetime = dt.datetime
        )
        for line in data.lines.getlinealiases():
            if line == 'datetime':
                continue
            columns[line] = float
        return columns
    
    class PersistToCSV(bt.Observer):
        lines = ('ignored',)
        params = dict(
            debug = False,
            rootpath = '/path/to/csv/output/root/'
        )
    
        def __init__(self):
            bt.Observer.__init__(self)
            if not os.path.isdir(self.p.rootpath):
                os.makedirs(self.p.rootpath)
    
            self._csvs:List[csv.DictWriter] = []
            self._metadata = []
            self._objects = self._owner.datas + list(self._owner.observers)
    
            for obj in self._objects:
                name = getTableName(obj)
                columns = getTableColumns(obj)
                path = os.path.join(self.p.rootpath,name+'.csv')
                # Test for the file BEFORE opening it: open(path,'a') creates
                # it, so checking afterwards would always skip the header
                exists = os.path.isfile(path)
                metadata = (name,columns,open(path,'a'),)
                writer = csv.DictWriter(metadata[2],list(columns))
                if not exists:
                    writer.writeheader()
                self._metadata.append(metadata)
                self._csvs.append(writer)
    
        def next(self):
            for (obj,meta,writer) in zip(self._objects,self._metadata,self._csvs):
                try:
                    self._persist(obj,meta,writer)
                except Exception:
                    logging.exception("EXCEPTION OCCURRED WHILE WRITING TO CSV")

        def _persist(self,data,meta,writer:csv.DictWriter):
            columns = meta[1]
            row = dict()
            foundValue = False
            for colname,coltype in columns.items():
                value = getattr(data.lines,colname,None)
                if value is not None:
                    if coltype == dt.datetime:
                        value = data.datetime.datetime(0)
                        # Convert to UTC using the contract's timezone, if the
                        # feed exposes one (e.g. Interactive Brokers feeds)
                        contractdetails = getattr(data,'contractdetails',None)
                        if contractdetails is not None:
                            tz = pytz.timezone(contractdetails.m_timeZoneId)
                            value = tz.normalize(tz.localize(value)).astimezone(pytz.utc)
                    else:
                        value = value[0]
                    if type(value) == float and math.isnan(value):
                        continue  # no value for this line on this bar
                    foundValue = True
                    row[colname] = value
            # Observers carry no datetime line of their own: borrow the
            # timestamp of the primary data feed instead
            if len(getattr(data,'datetime',[])) <= 1:
                data0 = self._owner.data0
                value = data0.datetime.datetime(0)
                contractdetails = getattr(data0,'contractdetails',None)
                if contractdetails is not None:
                    tz = pytz.timezone(contractdetails.m_timeZoneId)
                    value = tz.normalize(tz.localize(value)).astimezone(pytz.utc)
                row['datetime'] = value
            if foundValue:
                writer.writerow(row)
    
        def stop(self):
            # Flush and close every CSV so buffered rows reach disk
            for (name,columns,fd) in self._metadata:
                fd.flush()
                fd.close()
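
    Once both a backtest run and a live run have been persisted, comparing any shared column is straightforward (a sketch; the paths and the 'close' column are placeholders):

    import csv

    def load(path, column='close'):
        # Map bar timestamp -> column value, skipping rows without a value
        out = {}
        with open(path) as f:
            for row in csv.DictReader(f):
                if row.get(column):
                    out[row['datetime']] = float(row[column])
        return out

    backtest = load('/tmp/backtest/MyData.csv')  # placeholder paths
    live = load('/tmp/live/MyData.csv')
    for ts in sorted(set(backtest) & set(live)):
        print(ts, live[ts] - backtest[ts])  # per-bar divergence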