Backtrader Community

    Interest in implementing observer to store data when live trading

    General Discussion
    • ?
      A Former User last edited by

      Hi,

      I was wondering if this could be generally useful. I am working on persisting data during live trading for inspection and comparison against backtests. I also plan to use it to ensure that live data is being processed on time.

      The idea is that you add an observer like this:

      cerebro.addobserver(PersistingObserver,
                          connectionstring='sqlite:///path/to/sqlite.sqlite',
                          observers=[BuySellObserver, Broker, Trades, LiveDataObserver])
      

      The workflow would be that we read the observer's lines, create a SQLAlchemy metaclass, "migrate" the schema if needed, and then write all the lines in each next() call.

      If this would be interesting to you, let me know.
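
      That workflow can be sketched with the stdlib sqlite3 module (SQLAlchemy swapped out to keep the sketch dependency-free); the line names, table name, and sample values below are made up for illustration, not real backtrader output:

```python
import sqlite3

# Hypothetical line names read from an observer (made up for this sketch)
line_names = ['datetime', 'buy', 'sell']

conn = sqlite3.connect(':memory:')

# "Migrate" the schema: build the table from the observer's lines
cols = ', '.join(
    '{} TIMESTAMP PRIMARY KEY'.format(n) if n == 'datetime' else '{} REAL'.format(n)
    for n in line_names
)
conn.execute('CREATE TABLE IF NOT EXISTS buysell ({})'.format(cols))

# What would happen inside each next() call: write the current line values
row = ('2018-01-02 09:30:00', 100.5, None)  # made-up sample bar
conn.execute('INSERT OR REPLACE INTO buysell VALUES (?, ?, ?)', row)
conn.commit()

print(conn.execute('SELECT COUNT(*) FROM buysell').fetchone()[0])
```

      The real implementation would derive the columns from `lines.getlinealiases()` instead of a hard-coded list.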

      • L
        Laurent Michelizza last edited by

        Hi,

        It would be interesting to compare the backtested results against live results (at least to calculate the slippage).

        Thanks
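
        Once both runs are persisted, per-trade slippage is just the signed difference between the live and backtested fill prices; a minimal sketch with made-up fills:

```python
# Made-up fill prices keyed by trade id; in practice these would be read
# back from the tables persisted for the live run and the backtest run
backtest_fills = {1: 100.00, 2: 101.50, 3: 99.75}
live_fills     = {1: 100.05, 2: 101.45, 3: 99.90}

# Per-trade slippage: live price minus backtested price (sign matters)
slippage = {k: live_fills[k] - backtest_fills[k] for k in backtest_fills}
avg_slippage = sum(slippage.values()) / len(slippage)

print(round(avg_slippage, 4))
```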

        • ?
          A Former User @Laurent Michelizza last edited by A Former User

          @laurent-michelizza Here's what I've got so far. Suggestions welcome.

          • ?
            A Former User last edited by A Former User

            import backtrader as bt
            import sqlalchemy as sqa
            import sqlalchemy.ext.declarative
            import sqlalchemy.orm  # needed for sessionmaker below
            import sqlalchemy.sql.sqltypes
            from sqlalchemy import Column, Integer, String, TIMESTAMP, Float
            import re
            import os
            import os.path
            from typing import List
            import datetime as dt
            import logging
            
            def getTableName(data):
                name = getattr(data,'_name',data.__class__.__name__).strip()
            
                # Replace runs of whitespace with underscores
                name = re.sub(r'\s+', '_', name)
                # Remove invalid characters
                name = re.sub(r'[^0-9a-zA-Z_]', '', name)
                # Remove leading characters until we find a letter or underscore
                name = re.sub(r'^[^a-zA-Z_]+', '', name)
            
                return name
            
            def getTableColumns(data):
                columns = dict(
                    datetime = Column(TIMESTAMP,primary_key=True),
                )
                for line in data.lines.getlinealiases():
                    if line == 'datetime':
                        continue
                    columns[line] = Column(Float,nullable=True)
                return columns
            
            class PersistToDatabase(bt.Observer):
                lines = ('ignored',)
                params = dict(
                    debug = False,
                    dbpath = '/path/to/sqlite.db'
                )
            
                def __init__(self):
                    bt.Observer.__init__(self)
                    if self.p.dbpath != ":memory:":
                        path = os.path.abspath(self.p.dbpath)
                        dirname = os.path.dirname(path)
                        if not os.path.isdir(dirname):
                            os.makedirs(dirname)
                    else:
                        path = self.p.dbpath
            
                    self._engine = sqa.create_engine("sqlite:///{}".format(path),echo=self.p.debug)
                    self._Base = sqa.ext.declarative.declarative_base()
                    self._classes = []  # declarative classes, created dynamically below
            
                    for lines in self._owner.datas + list(self._owner.observers):
                        self._classes.append(self._makeSqlClass(lines))
            
                    self._Base.metadata.create_all(self._engine)
                    self._Session = sqlalchemy.orm.sessionmaker(bind=self._engine)
            
                def _makeSqlClass(self,data):
                    name = getTableName(data)
                    columns = getTableColumns(data)
            
                    s = '#<{name} '.format(name=name)
                    for col in columns:
                        col:Column
                        s = s + '{name} = {value}, '.format(name=col,
                                                            value='{' + col + '}')
                    s = s + '>'
            
                    def classRepr(self2):
                        return s.format(**self2.__dict__)
            
                    return type(name,(self._Base,),
                                dict(__tablename__=name,**columns,__repr__=classRepr))
            
                def next(self):
                    try:
                        self._next()
                    except Exception as e:
                        logging.exception("EXCEPTION OCCURRED WHILE PERSISTING DATA")
            
                def _next(self):
                    session = self._Session()
                    with session.begin_nested():
                        for klass,data in zip(self._classes,self._owner.datas + list(self._owner.observers)):
                            instance = klass()
                            for column in klass.__table__.columns:
                                column:Column
                                data:bt.DataBase
                                value = getattr(data.lines,column.name,None)
                                if value is not None:
                                    if type(column.type) == sqa.sql.sqltypes.TIMESTAMP:
                                        value = data.datetime.datetime(0)
                                    else:
                                        value = value[0]
                                    setattr(instance,column.name,value)
                                if len(getattr(data,'datetime',[])) <= 1:
                                    setattr(instance,'datetime',self._owner.data0.datetime.datetime())
                            session.merge(instance)
            
                    session.commit()
            
                def stop(self):
                    # Engine has no close(); dispose() releases the connection pool
                    self._engine.dispose()
            

            Will keep editing this comment with latest version.

            Note: this slows things down A LOT during backtesting.

            • D
              duality_ last edited by

              Yes, having a database would be great! I remember when I serialized data to JSON, saved it to a file, and later retrieved and deserialized it. So convoluted.

              Why does this slow down backtesting?

              • ?
                A Former User @duality_ last edited by

                @duality_ I haven't profiled it, but I'm assuming that this is because on every next() call, we synchronously write to the SQLite database in the main thread. The solution is pretty simple: all we do is create a database thread and do the writes there.
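
                A background writer along those lines can be sketched with the stdlib queue module: next() only enqueues the row and returns, while a worker thread drains the queue (writing into a list here in place of a real database):

```python
import queue
import threading

rows = queue.Queue()
written = []  # stands in for the database

def writer():
    # Drain the queue until the None sentinel arrives
    while True:
        row = rows.get()
        if row is None:
            break
        written.append(row)  # a real version would INSERT here
        rows.task_done()

t = threading.Thread(target=writer, daemon=True)
t.start()

# next() calls just enqueue and return immediately
for bar in [{'close': 100.0}, {'close': 100.5}]:
    rows.put(bar)

rows.put(None)  # stop() would send the sentinel and join the thread
t.join()
print(len(written))
```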

                Alternatively, we can write out to CSV and then import them into a SQLite database: http://www.sqlitetutorial.net/sqlite-import-csv/
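
                The linked page uses the sqlite3 command-line shell; the same bulk import can be done from Python with the stdlib sqlite3 and csv modules (file contents and column names here are made up):

```python
import csv
import io
import sqlite3

# Stand-in for a CSV file written by the observer (made-up columns/values)
csv_text = "datetime,close\n2018-01-02 09:30:00,100.0\n2018-01-02 09:31:00,100.5\n"
reader = csv.DictReader(io.StringIO(csv_text))

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE bars (datetime TIMESTAMP PRIMARY KEY, close REAL)')
# Named placeholders accept the dicts produced by DictReader directly;
# SQLite's REAL column affinity converts the numeric strings to floats
conn.executemany('INSERT INTO bars VALUES (:datetime, :close)', reader)
conn.commit()

print(conn.execute('SELECT COUNT(*) FROM bars').fetchone()[0])
```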

                • B
                  backtrader administrators last edited by

                  Remember that threads in Python are GIL-blocked. There will be some improvement because you are entering an I/O operation rather than just crunching numbers, but performance is still bound to suffer.

                  • ?
                    A Former User @backtrader last edited by

                    @backtrader Correct. I'm currently working on implementing an equivalent PersistToCSV.

                    • ?
                      A Former User @Guest last edited by A Former User

                      Here is the CSV-based version. It runs much faster, though sometimes the entire file isn't flushed and I can't figure out why.
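
                      One likely cause of the missing data is buffered file I/O: rows sit in the process buffer until it fills, so an unclean shutdown loses whatever hasn't reached disk. A hedged sketch of the fix, flushing after each row (at some throughput cost), using an in-memory buffer in place of the real file:

```python
import csv
import io

buf = io.StringIO()  # stands in for the open CSV file object
writer = csv.DictWriter(buf, ['datetime', 'close'])
writer.writeheader()

# What next() would do: write the row, then flush immediately so a crash
# cannot lose rows still sitting in the process buffer
writer.writerow({'datetime': '2018-01-02 09:30:00', 'close': 100.0})
buf.flush()  # with a real file, os.fsync(fd.fileno()) is stronger still

print(len(buf.getvalue().splitlines()))
```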

                      import backtrader as bt
                      import re
                      import os
                      import os.path
                      from typing import List
                      import datetime as dt
                      import logging
                      import math
                      import pytz
                      import csv
                      
                      def getTableName(data):
                          name = getattr(data,'_name',data.__class__.__name__).strip()
                      
                          # Replace runs of whitespace with underscores
                          name = re.sub(r'\s+', '_', name)
                          # Remove invalid characters
                          name = re.sub(r'[^0-9a-zA-Z_]', '', name)
                          # Remove leading characters until we find a letter or underscore
                          name = re.sub(r'^[^a-zA-Z_]+', '', name)
                      
                          return name
                      
                      def getTableColumns(data):
                          columns = dict(
                              datetime = dt.datetime
                          )
                          for line in data.lines.getlinealiases():
                              if line == 'datetime':
                                  continue
                              columns[line] = float
                          return columns
                      
                      class PersistToCSV(bt.Observer):
                          lines = ('ignored',)
                          params = dict(
                              debug = False,
                              rootpath = '/path/to/csv/output/root/'
                          )
                      
                          def __init__(self):
                              bt.Observer.__init__(self)
                              if not os.path.isdir(self.p.rootpath):
                                  os.makedirs(self.p.rootpath)
                      
                              self._csvs:List[csv.DictWriter] = []
                              self._metadata = []
                              self._objects = self._owner.datas + list(self._owner.observers)
                      
                              for obj in self._objects:
                                  name = getTableName(obj)
                                  columns = getTableColumns(obj)
                                  path = os.path.join(self.p.rootpath,name+'.csv')
                                  metadata = (name,columns,open(path,'a'),)
                                  exists = os.path.isfile(path)
                                  csvfile = csv.DictWriter(metadata[2],[c for (c,_) in columns.items()])
                                  if not exists:
                                      csvfile.writeheader()
                                  self._metadata.append(metadata)
                                  self._csvs.append(csvfile)
                      
                          def next(self):
                              # 'writer' instead of 'csv' so the csv module isn't shadowed
                              for (obj,meta,writer) in zip(self._objects,self._metadata,self._csvs):
                                  try:
                                      self._persist(obj,meta,writer)
                                  except Exception:
                                      logging.exception("EXCEPTION OCCURRED WHILE WRITING TO CSV")
                      
                          def _persist(self,data:bt.DataBase,meta,writer:csv.DictWriter):
                              columns = meta[1]
                              row = dict()
                              foundValue = False
                              for colname,coltype in columns.items():
                                  value = getattr(data.lines,colname,None)
                                  if value is not None:
                                      if coltype == dt.datetime:
                                          value = data.datetime.datetime(0)
                                          # Convert to UTC (contractdetails only exists on IB feeds)
                                          if getattr(data,'contractdetails',None) is not None:
                                              tz = pytz.timezone(data.contractdetails.m_timeZoneId)
                                              value = tz.normalize(tz.localize(value)).astimezone(pytz.utc)
                                      else:
                                          value = value[0]
                                      if type(value) == float and math.isnan(value):
                                          continue
                                      foundValue = True
                                      row[colname] = value
                              # Observers have no datetime line; borrow data0's timestamp
                              if len(getattr(data,'datetime',[])) <= 1:
                                  data = self._owner.data0
                                  value = data.datetime.datetime(0)
                                  if getattr(data,'contractdetails',None) is not None:
                                      tz = pytz.timezone(data.contractdetails.m_timeZoneId)
                                      value = tz.normalize(tz.localize(value)).astimezone(pytz.utc)
                                  row['datetime'] = value
                              if foundValue:
                                  writer.writerow(row)
                      
                          def stop(self):
                              for (name,columns,fd) in self._metadata:
                                  fd.flush()
                                  fd.close() 
                      