Interest in implementing observer to store data when live trading
-
Hi,
I was wondering if this could be generally useful. I am working on persisting data during live trading for inspection and comparison against backtests. I also plan to use it to ensure that live data is being processed on time.
The idea is that you add an observer like this:
cerebro.addobserver(PersistingObserver, connectionstring='sqlite:///path/to/sqlite.sqlite', observers=[BuySellObserver, Broker, Trades, LiveDataObserver])
The workflow would be that we read the observer's lines, create a SQLAlchemy metaclass, "migrate" the schema if needed, and then write all the lines in each next() call. If this would be interesting for you, let me know.
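To make the idea a bit more concrete, here is a rough sketch of what I have in mind (not a final implementation; make_model and the column mapping are just placeholders) for building a SQLAlchemy model dynamically from an object's lines:

import sqlalchemy as sqa
from sqlalchemy import Column, Float, TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

def make_model(data):
    # One TIMESTAMP primary key plus one nullable Float column per line alias
    columns = {'datetime': Column(TIMESTAMP, primary_key=True)}
    for line in data.lines.getlinealiases():
        if line != 'datetime':
            columns[line] = Column(Float, nullable=True)
    name = getattr(data, '_name', data.__class__.__name__)
    return type(name, (Base,), dict(__tablename__=name, **columns))

Each next() call would then instantiate one row object per table, fill it from the current line values, and write it out through a session.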
-
Hi,
Would be interesting to compare the backtested results against live results (at least to calculate the slippage).
Thanks
-
@laurent-michelizza Here's what I've got so far. Suggestions welcome
-
import backtrader as bt
import sqlalchemy as sqa
import sqlalchemy.ext.declarative
import sqlalchemy.orm
import sqlalchemy.sql.sqltypes
from sqlalchemy import Column, TIMESTAMP, Float
import re
import os
import os.path
import logging


def getTableName(data):
    """Derive a valid SQL table name from a data feed or observer."""
    name = getattr(data, '_name', data.__class__.__name__).strip()
    # Replace spaces with underscores
    name = re.sub(r'\s+', '_', name)
    # Remove invalid characters
    name = re.sub(r'[^0-9a-zA-Z_]', '', name)
    # Remove leading characters until we find a letter or underscore
    name = re.sub(r'^[^a-zA-Z_]+', '', name)
    return name


def getTableColumns(data):
    """One TIMESTAMP primary key plus a nullable Float column per line alias."""
    columns = dict(
        datetime=Column(TIMESTAMP, primary_key=True),
    )
    for line in data.lines.getlinealiases():
        if line == 'datetime':
            continue
        columns[line] = Column(Float, nullable=True)
    return columns


class PersistToDatabase(bt.Observer):
    lines = ('ignored',)
    params = dict(
        debug=False,
        dbpath='/path/to/sqlite.db',
    )

    def __init__(self):
        bt.Observer.__init__(self)
        if self.p.dbpath != ':memory:':
            path = os.path.abspath(self.p.dbpath)
            dirname = os.path.dirname(path)
            if not os.path.isdir(dirname):
                os.makedirs(dirname)
        else:
            path = self.p.dbpath
        self._engine = sqa.create_engine('sqlite:///{}'.format(path), echo=self.p.debug)
        self._Base = sqa.ext.declarative.declarative_base()
        # One dynamically generated mapped class per data feed / observer
        self._classes = []
        for lines in self._owner.datas + list(self._owner.observers):
            self._classes.append(self._makeSqlClass(lines))
        self._Base.metadata.create_all(self._engine)
        self._Session = sqlalchemy.orm.sessionmaker(bind=self._engine)

    def _makeSqlClass(self, data):
        name = getTableName(data)
        columns = getTableColumns(data)
        # Build a readable __repr__ such as '#<Name col = value, ...>'
        s = '#<{name} '.format(name=name)
        for col in columns:
            s = s + '{name} = {value}, '.format(name=col, value='{' + col + '}')
        s = s + '>'

        def classRepr(self2):
            return s.format(**self2.__dict__)

        return type(name, (self._Base,),
                    dict(__tablename__=name, **columns, __repr__=classRepr))

    def next(self):
        try:
            self._next()
        except Exception:
            logging.exception('EXCEPTION OCCURRED WHILE PERSISTING DATA')

    def _next(self):
        session = self._Session()
        with session.begin_nested():
            for klass, data in zip(self._classes,
                                   self._owner.datas + list(self._owner.observers)):
                instance = klass()
                for column in klass.__table__.columns:
                    value = getattr(data.lines, column.name, None)
                    if value is not None:
                        if type(column.type) == sqa.sql.sqltypes.TIMESTAMP:
                            value = data.datetime.datetime(0)
                        else:
                            value = value[0]
                        setattr(instance, column.name, value)
                # Observers have no datetime line of their own: timestamp with data0
                if len(getattr(data, 'datetime', [])) <= 1:
                    setattr(instance, 'datetime', self._owner.data0.datetime.datetime())
                session.merge(instance)
        session.commit()

    def stop(self):
        self._engine.dispose()  # Engine has no close(); dispose() releases the pool
Will keep editing this comment with latest version.
Note: this slows things down A LOT during backtesting.
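For reference, a rough usage sketch (the paths and the strategy setup are placeholders): register the observer before running, then query the resulting SQLite file afterwards.

import backtrader as bt
import sqlite3

cerebro = bt.Cerebro()
# ... add data feeds and strategy here ...
cerebro.addobserver(PersistToDatabase, dbpath='/path/to/sqlite.db')
cerebro.run()

# Inspect what was persisted: one table per data feed / observer
conn = sqlite3.connect('/path/to/sqlite.db')
tables = [t for (t,) in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")]
for table in tables:
    count = conn.execute('SELECT COUNT(*) FROM "{}"'.format(table)).fetchone()[0]
    print(table, count)
conn.close()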
-
Yes, having a database would be great! I remember when I serialized data to JSON, saved it to a file and later retrieved and deserialized it. So convoluted.
Why does this slow down backtesting?
-
@duality_ I haven't profiled it, but I'm assuming that this is because on every next() call, we synchronously write to the SQLite database in the main thread. The solution is pretty simple: all we do is create a database thread and do the writes there. Alternatively, we can write out to CSV files and then import them into a SQLite database: http://www.sqlitetutorial.net/sqlite-import-csv/
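Something like this is what I mean by a database thread (just a sketch; AsyncWriter is an illustrative name, and note that the engine/session should also be created on the writer thread, since SQLite connections are thread-bound by default):

import queue
import threading

class AsyncWriter:
    # A single daemon thread drains a queue of (callable, args) work items,
    # so the actual database writes happen off the main next() loop.
    def __init__(self):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, func, *args):
        self._queue.put((func, args))

    def _run(self):
        while True:
            func, args = self._queue.get()
            if func is None:  # sentinel pushed by close()
                break
            func(*args)

    def close(self):
        self._queue.put((None, ()))
        self._thread.join()

next() would then only collect the values and call writer.submit(...), and stop() would call writer.close() to drain the queue before the run ends.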
-
Remember that threads in Python are GIL-blocked. There will be some improvement because you are entering an I/O operation rather than just crunching numbers, but performance is still bound to suffer.
-
@backtrader Correct. I'm currently working on implementing an equivalent PersistToCSV.
-
Here is the CSV-based version. I can't figure out why, but sometimes the entire file isn't flushed. This version works much faster.
import backtrader as bt
import re
import os
import os.path
from typing import List
import datetime as dt
import logging
import math
import pytz
import csv


def getTableName(data):
    """Derive a valid file/table name from a data feed or observer."""
    name = getattr(data, '_name', data.__class__.__name__).strip()
    # Replace spaces with underscores
    name = re.sub(r'\s+', '_', name)
    # Remove invalid characters
    name = re.sub(r'[^0-9a-zA-Z_]', '', name)
    # Remove leading characters until we find a letter or underscore
    name = re.sub(r'^[^a-zA-Z_]+', '', name)
    return name


def getTableColumns(data):
    """A datetime column plus a float column per line alias."""
    columns = dict(
        datetime=dt.datetime,
    )
    for line in data.lines.getlinealiases():
        if line == 'datetime':
            continue
        columns[line] = float
    return columns


class PersistToCSV(bt.Observer):
    lines = ('ignored',)
    params = dict(
        debug=False,
        rootpath='/path/to/csv/output/root/',
    )

    def __init__(self):
        bt.Observer.__init__(self)
        if not os.path.isdir(self.p.rootpath):
            os.makedirs(self.p.rootpath)
        self._csvs: List[csv.DictWriter] = []
        self._metadata = []
        self._objects = self._owner.datas + list(self._owner.observers)
        for obj in self._objects:
            name = getTableName(obj)
            columns = getTableColumns(obj)
            path = os.path.join(self.p.rootpath, name + '.csv')
            # Check for the file *before* opening in append mode creates it,
            # otherwise the header would never be written for new files
            exists = os.path.isfile(path)
            metadata = (name, columns, open(path, 'a'))
            writer = csv.DictWriter(metadata[2], [c for (c, _) in columns.items()])
            if not exists:
                writer.writeheader()
            self._metadata.append(metadata)
            self._csvs.append(writer)

    def next(self):
        for (obj, meta, writer) in zip(self._objects, self._metadata, self._csvs):
            try:
                self._persist(obj, meta, writer)
            except Exception:
                logging.exception('EXCEPTION OCCURRED WHILE WRITING TO CSV')

    def _persist(self, data, meta, writer):
        columns = meta[1]
        row = dict()
        foundValue = False
        for colname, column in columns.items():
            value = getattr(data.lines, colname, None)
            if value is not None:
                if column == dt.datetime:
                    value = data.datetime.datetime(0)
                    # Convert to UTC (contractdetails is only present on IB feeds)
                    details = getattr(data, 'contractdetails', None)
                    if details is not None:
                        tz = pytz.timezone(details.m_timeZoneId)
                        value = tz.normalize(tz.localize(value)).astimezone(pytz.utc)
                else:
                    value = value[0]
                    if type(value) == float and math.isnan(value):
                        continue
                    foundValue = True
                row[colname] = value
        # Observers have no datetime line of their own: timestamp with data0
        if len(getattr(data, 'datetime', [])) <= 1:
            data = self._owner.data0
            value = data.datetime.datetime(0)
            details = getattr(data, 'contractdetails', None)
            if details is not None:
                tz = pytz.timezone(details.m_timeZoneId)
                value = tz.normalize(tz.localize(value)).astimezone(pytz.utc)
            row['datetime'] = value
        if foundValue:
            writer.writerow(row)

    def stop(self):
        for (name, columns, fd) in self._metadata:
            fd.flush()
            fd.close()
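If the CSVs ultimately need to end up in SQLite anyway (as per the import link above), a small post-processing step can do it after the run. A sketch assuming pandas is available and the column layout written by PersistToCSV:

import glob
import os
import sqlite3

import pandas as pd

def csvs_to_sqlite(rootpath, dbpath):
    # Copy every CSV written by PersistToCSV into one SQLite table per file
    conn = sqlite3.connect(dbpath)
    for path in glob.glob(os.path.join(rootpath, '*.csv')):
        table = os.path.splitext(os.path.basename(path))[0]
        df = pd.read_csv(path, parse_dates=['datetime'])
        df.to_sql(table, conn, if_exists='replace', index=False)
    conn.close()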