    Backtrader Community

    Unable To Pickle Metaclasses - Willing to pay for help

    General Code/Help
    multiprocessing optstrategy preloading
    • P
      pomnl last edited by

      Howdy!

      I am currently preloading indicators into a dataframe to save CPU cycles when running optstrategy. Naturally, I ran into the same issue discussed here with multiprocessing and pickling, whereby pickle needs to be able to find an object's class at a module-level name in order to serialize it.
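
      For reference, the preloading itself is nothing exotic; it is roughly along these lines (this is just my rough pandas take on a Hull MA, and the lowercase 'close' column name is only illustrative):

      import numpy as np
      import pandas as pd

      def wma(series, n):
          # weighted moving average with linearly increasing weights
          weights = np.arange(1, n + 1)
          return series.rolling(n).apply(lambda x: np.dot(x, weights) / weights.sum(), raw=True)

      def hma(series, n):
          # Hull MA: WMA(2 * WMA(n/2) - WMA(n)) over a sqrt(n)-length window
          return wma(2 * wma(series, n // 2) - wma(series, n), int(np.sqrt(n)))

      df = pd.read_csv("data.csv", index_col=0, parse_dates=['datetime'])
      for n in range(5, 10):
          df['hma_%d' % n] = hma(df['close'], n)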

      So, I created my own means of making the dynamically built feed class picklable and tested it with pickle directly (with success), but am now getting the following error:

      _pickle.PicklingError: Can't pickle <class 'backtrader.lineseries.Lines_LineSeries_DataSeries_OHLC_OHLCDateTime_AbstractDataBase_DataBase_PandasData_AutoSub'>: it's not the same object as backtrader.lineseries.Lines_LineSeries_DataSeries_OHLC_OHLCDateTime_AbstractDataBase_DataBase_PandasData_AutoSub
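
      As far as I can tell, pickle stores a class by module and qualified name and looks it up again when loading, so this error means the class object attached to my data feed is not the same object that lives under that name in backtrader.lineseries. A stripped-down, non-backtrader sketch of the same failure mode (throwaway names):

      import pickle

      def make_class():
          # every call builds a brand-new class object with the same name
          return type('Dynamic', (object,), {})

      Dynamic = make_class()   # this one is reachable as __main__.Dynamic
      Other = make_class()     # same name, but a different class object

      pickle.dumps(Other())    # PicklingError: ... it's not the same object as __main__.Dynamic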

      My code is below:

      #!/usr/bin/env python
      # -*- coding: utf-8; py-indent-offset:4 -*-
      ###############################################################################
      #
      # Copyright (C) 2015-2020 Daniel Rodriguez
      #
      # This program is free software: you can redistribute it and/or modify
      # it under the terms of the GNU General Public License as published by
      # the Free Software Foundation, either version 3 of the License, or
      # (at your option) any later version.
      #
      # This program is distributed in the hope that it will be useful,
      # but WITHOUT ANY WARRANTY; without even the implied warranty of
      # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      # GNU General Public License for more details.
      #
      # You should have received a copy of the GNU General Public License
      # along with this program.  If not, see <http://www.gnu.org/licenses/>.
      #
      ###############################################################################
      from __future__ import (absolute_import, division, print_function,
                              unicode_literals)
      
      from backtrader.utils.py3 import filter, string_types, integer_types
      
      import pandas as pd
      import backtrader as bt
      from backtrader import date2num
      import backtrader.feed as feed
      from pickle import dumps, loads
      from storm.strategy import Storm
      
      class PandasData(feed.DataBase):
      
          params = (
              ('nocase', True),
      
              # Possible values for datetime (must always be present)
              #  None : datetime is the "index" in the Pandas Dataframe
              #  -1 : autodetect position or case-wise equal name
              #  >= 0 : numeric index to the column in the pandas dataframe
              #  string : column name (as index) in the pandas dataframe
              ('datetime', None),
      
              # Possible values below:
              #  None : column not present
              #  -1 : autodetect position or case-wise equal name
              #  >= 0 : numeric index to the column in the pandas dataframe
              #  string : column name (as index) in the pandas dataframe
              ('open', -1),
              ('high', -1),
              ('low', -1),
              ('close', -1),
              ('volume', -1),
              ('openinterest', -1),
          )
      
          datafields = [
              'datetime', 'open', 'high', 'low', 'close', 'volume', 'openinterest'
          ]
      
          def start(self):
              super(PandasData, self).start()
      
              # reset the length with each start
              self._idx = -1
      
              # Transform names (valid for .ix) into indices (good for .iloc)
              if self.p.nocase:
                  colnames = [x.lower() for x in self.p.dataname.columns.values]
              else:
                  colnames = [x for x in self.p.dataname.columns.values]
      
              for k, v in self._colmapping.items():
                  if v is None:
                      continue  # special marker for datetime
                  if isinstance(v, string_types):
                      try:
                          if self.p.nocase:
                              v = colnames.index(v.lower())
                          else:
                              v = colnames.index(v)
                      except ValueError as e:
                          defmap = getattr(self.params, k)
                          if isinstance(defmap, integer_types) and defmap < 0:
                              v = None
                          else:
                          raise e  # let the user know something failed
      
                  self._colmapping[k] = v
      
          def _load(self):
              self._idx += 1
      
              if self._idx >= len(self.p.dataname):
                  # exhausted all rows
                  return False
      
              # Set the standard datafields
              for datafield in self.getlinealiases():
                  if datafield == 'datetime':
                      continue
      
                  colindex = self._colmapping[datafield]
                  if colindex is None:
                      # datafield signaled as missing in the stream: skip it
                      continue
      
                  # get the line to be set
                  line = getattr(self.lines, datafield)
      
                  # indexing for pandas iloc: first the row, then the column
                  line[0] = self.p.dataname.iloc[self._idx, colindex]
      
              # datetime conversion
              coldtime = self._colmapping['datetime']
      
              if coldtime is None:
                  # standard index in the datetime
                  tstamp = self.p.dataname.index[self._idx]
              else:
                  # it's in a different column ... use standard column index
                  tstamp = self.p.dataname.iloc[self._idx, coldtime]
      
              # convert to float via datetime and store it
              dt = tstamp.to_pydatetime()
              dtnum = date2num(dt)
              self.lines.datetime[0] = dtnum
      
              # Done ... return
              return True
      
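          # __reduce__ hands pickle a (callable, args, state) triple: the callable below
          # is meant to rebuild the dynamically generated class on unpickling instead of
          # letting pickle look the class up by name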
          def __reduce__(self):
              print("hi")
              return (_InitializeParameterized(), (self.lines, self.params), self.__dict__)
      
      
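      # builds a PandasData subclass on the fly whose lines/params/datafields include
      # the extra preloaded indicator columns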
      def make_parameterized(lines, params):
      
          class PandasIndicatorData(PandasData):
              
              lines = lines
              params = params
              datafields = ['datetime', 'open', 'high', 'low', 'close', 'volume', 'openinterest'] + list(lines)
      
              def __init__(self):
      
                  super(PandasData, self).__init__()
      
                  # these "colnames" can be strings or numeric types
                  colnames = list(self.p.dataname.columns.values)
                  if self.p.datetime is None:
                      # datetime is expected as index col and hence not returned
                      pass
      
                  # try to autodetect if all columns are numeric
                  cstrings = filter(lambda x: isinstance(x, string_types), colnames)
                  colsnumeric = not len(list(cstrings))
      
                  # Where each datafield find its value
                  self._colmapping = dict()
      
                  # Build the column mappings to internal fields in advance
                  for datafield in self.getlinealiases():
                      defmapping = getattr(self.params, datafield)
      
                      if isinstance(defmapping, integer_types) and defmapping < 0:
                          # autodetection requested
                          for colname in colnames:
                              if isinstance(colname, string_types):
                                  if self.p.nocase:
                                      found = datafield.lower() == colname.lower()
                                  else:
                                      found = datafield == colname
      
                                  if found:
                                      self._colmapping[datafield] = colname
                                      break
      
                          if datafield not in self._colmapping:
                              # autodetection requested and not found
                              self._colmapping[datafield] = None
                              continue
                      else:
                          # all other cases -- used given index
                          self._colmapping[datafield] = defmapping
          
          return PandasIndicatorData
      
      
      class PandasIndicatorData(PandasData):
          pass
      
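      # the callable returned by __reduce__: on unpickling it is called with
      # (lines, params), regenerates the parameterized feed class via
      # make_parameterized and swaps it in as the new object's class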
      class _InitializeParameterized(object):
          def __call__(self, lines, params):
              obj = _InitializeParameterized()
              obj.__class__ = make_parameterized(lines, params)
              return obj
      
      if __name__ == "__main__":
      
          lines = ('hma_5','hma_6','hma_7','hma_8','hma_9')
      
          base_params = (
              ('nocase', True),
              ('datetime', None),
              ('open', -1),
              ('high', -1),
              ('low', -1),
              ('close', -1),
              ('volume', -1),
              ('openinterest', -1),
          )
          params = (('hma_5', -1), ('hma_6', -1), ('hma_7', -1), ('hma_8', -1), ('hma_9', -1))
      
          df = pd.read_csv("data.csv", delimiter=",", header=0, squeeze=False, error_bad_lines=False, index_col=0, parse_dates=['datetime'])
      
          data = make_parameterized(
              lines, 
              base_params + params
          )(dataname=df)
      
          print(data.lines)
          print(data.params)
          print(data.datafields)
          data_p = dumps(data)
      
          del data
          data = loads(data_p)
      
          print(data.lines)
          print(data.params)
          print(data.datafields)
      
          setattr(bt.metabase, "PandasIndicatorData", data)
          cerebro = bt.Cerebro()
          cerebro.adddata(data)
      
          cerebro.optstrategy(Storm, period=range(5,10), data_period="8h")
      
          cerebro.broker = bt.brokers.BackBroker(slip_perc=0.0025, slip_open=True)
          cerebro.broker.set_cash(10000)
      
          strats = cerebro.run()
      

      And this is the storm strategy referenced in the code:

      #!/usr/bin/env python
      # -*- coding: utf-8; py-indent-offset:4 -*-
      
      import logging
      import datetime as datetime
      import backtrader as bt
      
      class Storm(Strategy):
      
          params = (
              ('log_data',False),
              ('log_trades',False),
              ("start_date",datetime.datetime(2019, 1, 1).date()),
              ("end_date",datetime.datetime(2019, 12, 31).date()),
              ("buy_target",0.5),
              ("sell_target",0.0),
              ("data_period",50),
              ("period",25),
              ("ma", "hma"),
          )
      
          def __init__(self):
              cross = bt.ind.CrossOver(self.data.close, getattr(self.data, self.p.ma + "_" + str(self.p.period)))
              self.buysig = cross > 0
              self.sellsig = cross < 0
      
          def next(self):
              if self.datetime.date(ago=0) >= self.p.end_date:
                  self.order_target_percent(target=0.0)
              elif self.datetime.date(ago=0) >= self.p.start_date:
                  if self.buysig and self.position.size <=0:
                      if self.p.log_trades:
                          self.loginfo('Enter Long')
                      self.order_target_percent(target=self.p.buy_target)
                  elif self.sellsig and self.position.size >=0:
                      if self.p.log_trades:
                          self.loginfo('Enter Short')
                      self.order_target_percent(target=self.p.sell_target)
      
          def notify_trade(self, trade):
              if not self.p.log_trades:
                  return
              
              if trade.justopened:
                  self.loginfo('Trade Opened  - Size {} @Price {}',
                               trade.size, trade.price)
              elif trade.isclosed:
                  self.loginfo('Trade Closed  - Profit {}', trade.pnlcomm)
      
              else:  # trade updated
                  self.loginfo('Trade Updated - Size {} @Price {}',
                               trade.size, trade.price)
      
          def notify_order(self, order):
              if order.alive():
                  return
      
              otypetxt = 'Buy ' if order.isbuy() else 'Sell'
              if order.status == order.Completed:
      
                  if self.p.log_trades:
                      self.loginfo(
                          ('{} Order Completed - '
                          'Size: {} @Price: {} '
                          'Value: {:.2f} Comm: {:.2f}'),
                          otypetxt, order.executed.size, order.executed.price,
                          order.executed.value, order.executed.comm
                      )
              else:
                  if self.p.log_trades:
                      self.loginfo('{} Order rejected', otypetxt)
      
          def loginfo(self, txt, *args):
              out = [self.datetime.date().isoformat(), txt.format(*args)]
              logging.info(','.join(out))
      
          def logerror(self, txt, *args):
              out = [self.datetime.date().isoformat(), txt.format(*args)]
              logging.error(','.join(out))
      
          def logdebug(self, txt, *args):
              out = [self.datetime.date().isoformat(), txt.format(*args)]
              logging.debug(','.join(out))
      
          def logdata(self):
              txt = []
              txt += ['{:.2f}'.format(self.data.open[0])]
              txt += ['{:.2f}'.format(self.data.high[0])]
              txt += ['{:.2f}'.format(self.data.low[0])]
              txt += ['{:.2f}'.format(self.data.close[0])]
              txt += ['{:.2f}'.format(self.data.volume[0])]
      
              if (self.p.log_data):
                  self.loginfo(','.join(txt))
      
          def stop(self):
              if (self.p.log_data):
                  self.loginfo('Ending Value: {:8.2f}'.format(
                      self.broker.getvalue()))
      

      If someone could take a look at this and provide some insight or help, I would gladly reimburse you for your time. This is something I have been toiling away at for a few days now, and I know it would make a great example to add to the backtrader repository.

      I think @ab_trader or @backtrader might have done something similar to this not too long ago.

      Thank you so much for your time.

      • P
        pomnl last edited by

        Oh, and you'll want to remove the (Strategy) superclass from the Storm file.

        • P
          pomnl last edited by

          I'm just going to use maxcpus=1 and wrap everything in multiprocessing, instead.
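
          Something like this is what I have in mind (untested sketch; a throwaway SMA strategy stands in for Storm so it is self-contained, and data.csv is the same file as above):

          import multiprocessing as mp

          import backtrader as bt
          import pandas as pd


          class Throwaway(bt.Strategy):
              # stand-in for Storm, just so the sketch runs on its own
              params = (('period', 5),)

              def __init__(self):
                  self.sma = bt.ind.SMA(self.data.close, period=self.p.period)


          def run_one(period):
              # build everything inside the worker so nothing unpicklable has to
              # cross the process boundary
              df = pd.read_csv("data.csv", index_col=0, parse_dates=['datetime'])
              cerebro = bt.Cerebro(maxcpus=1)  # make sure cerebro never spawns its own workers
              cerebro.adddata(bt.feeds.PandasData(dataname=df))
              cerebro.addstrategy(Throwaway, period=period)
              cerebro.broker.set_cash(10000)
              cerebro.run()
              return period, cerebro.broker.getvalue()


          if __name__ == "__main__":
              with mp.Pool() as pool:
                  for period, value in pool.map(run_one, range(5, 10)):
                      print(period, value)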

          • run-out
            run-out last edited by

            Did you get this sorted?
