
Unable To Pickle Metaclasses - Willing to pay for help



  • Howdy!

    I am currently preloading indicators into a DataFrame to save CPU cycles when running optstrategy. Naturally, I ran into the same multiprocessing and pickling issue discussed here: pickle can only serialize an object if its class is defined somewhere it can be looked up by name.

    So I wrote my own way of doing this and tested it successfully with pickle on its own, but I am now getting the following error:

    _pickle.PicklingError: Can't pickle <class 'backtrader.lineseries.Lines_LineSeries_DataSeries_OHLC_OHLCDateTime_AbstractDataBase_DataBase_PandasData_AutoSub'>: it's not the same object as backtrader.lineseries.Lines_LineSeries_DataSeries_OHLC_OHLCDateTime_AbstractDataBase_DataBase_PandasData_AutoSub
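
    As far as I understand it, pickle stores a class by reference (module name plus class name) and, when dumping, checks that looking that name up again returns the very same class object. Below is a minimal sketch in plain Python, without backtrader, that reproduces the same kind of message. The module name `pickle_demo`, the class name `AutoSub`, and the helpers `make_class` and `try_pickle` are made up for illustration. My assumption is that something similar happens when a backtrader lines class gets generated more than once under the same name.

```python
import pickle
import sys
import types

# Hypothetical stand-in module, so the demo does not depend on how it is run.
demo = types.ModuleType("pickle_demo")
sys.modules["pickle_demo"] = demo


def make_class(tag):
    """Build a brand-new class object on every call, always with the same name."""
    cls = type("AutoSub", (object,), {"tag": tag})
    cls.__module__ = "pickle_demo"
    return cls


first = make_class("first")
second = make_class("second")

# Only `first` is reachable under the name pickle will look up.
demo.AutoSub = first


def try_pickle(obj):
    """Return "ok" if obj can be pickled, otherwise the error text."""
    try:
        pickle.dumps(obj)
        return "ok"
    except pickle.PicklingError as exc:
        return str(exc)


result_first = try_pickle(first())
result_second = try_pickle(second())
print(result_first)
print(result_second)
```

    Only the instance of `first` can be dumped, because `first` is the object actually stored as `pickle_demo.AutoSub`; `second` has the same name but is a different class object.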

    My code is below:

    #!/usr/bin/env python
    # -*- coding: utf-8; py-indent-offset:4 -*-
    ###############################################################################
    #
    # Copyright (C) 2015-2020 Daniel Rodriguez
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    #
    ###############################################################################
    from __future__ import (absolute_import, division, print_function,
                            unicode_literals)
    
    from backtrader.utils.py3 import filter, string_types, integer_types
    
    import pandas as pd
    import backtrader as bt
    from backtrader import date2num
    import backtrader.feed as feed
    from pickle import dumps, loads
    from storm.strategy import Storm
    
    class PandasData(feed.DataBase):
    
        params = (
            ('nocase', True),
    
            # Possible values for datetime (must always be present)
            #  None : datetime is the "index" in the Pandas Dataframe
            #  -1 : autodetect position or case-wise equal name
            #  >= 0 : numeric index to the column in the pandas dataframe
            #  string : column name (as index) in the pandas dataframe
            ('datetime', None),
    
            # Possible values below:
            #  None : column not present
            #  -1 : autodetect position or case-wise equal name
            #  >= 0 : numeric index to the column in the pandas dataframe
            #  string : column name (as index) in the pandas dataframe
            ('open', -1),
            ('high', -1),
            ('low', -1),
            ('close', -1),
            ('volume', -1),
            ('openinterest', -1),
        )
    
        datafields = [
            'datetime', 'open', 'high', 'low', 'close', 'volume', 'openinterest'
        ]
    
        def start(self):
            super(PandasData, self).start()
    
            # reset the length with each start
            self._idx = -1
    
            # Transform names (valid for .ix) into indices (good for .iloc)
            if self.p.nocase:
                colnames = [x.lower() for x in self.p.dataname.columns.values]
            else:
                colnames = [x for x in self.p.dataname.columns.values]
    
            for k, v in self._colmapping.items():
                if v is None:
                    continue  # special marker for datetime
                if isinstance(v, string_types):
                    try:
                        if self.p.nocase:
                            v = colnames.index(v.lower())
                        else:
                            v = colnames.index(v)
                    except ValueError as e:
                        defmap = getattr(self.params, k)
                        if isinstance(defmap, integer_types) and defmap < 0:
                            v = None
                        else:
                            raise e  # let user know something failed
    
                self._colmapping[k] = v
    
        def _load(self):
            self._idx += 1
    
            if self._idx >= len(self.p.dataname):
                # exhausted all rows
                return False
    
            # Set the standard datafields
            for datafield in self.getlinealiases():
                if datafield == 'datetime':
                    continue
    
                colindex = self._colmapping[datafield]
                if colindex is None:
                    # datafield signaled as missing in the stream: skip it
                    continue
    
                # get the line to be set
                line = getattr(self.lines, datafield)
    
                # indexing for pandas .iloc: 1st is row, then column
                line[0] = self.p.dataname.iloc[self._idx, colindex]
    
            # datetime conversion
            coldtime = self._colmapping['datetime']
    
            if coldtime is None:
                # standard index in the datetime
                tstamp = self.p.dataname.index[self._idx]
            else:
                # it's in a different column ... use standard column index
                tstamp = self.p.dataname.iloc[self._idx, coldtime]
    
            # convert to float via datetime and store it
            dt = tstamp.to_pydatetime()
            dtnum = date2num(dt)
            self.lines.datetime[0] = dtnum
    
            # Done ... return
            return True
    
        def __reduce__(self):
            print("hi")
            return (_InitializeParameterized(), (self.lines, self.params), self.__dict__)
    
    
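    def make_parameterized(line_names, param_defs):
    
        class PandasIndicatorData(PandasData):
            
            # A class body cannot read an enclosing function's variable that has
            # the same name as the attribute being assigned, so the arguments
            # are named differently from the class attributes.
            lines = line_names
            params = param_defs
            datafields = ['datetime', 'open', 'high', 'low', 'close', 'volume', 'openinterest'] + list(lines)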
    
            def __init__(self):
    
                super(PandasData, self).__init__()
    
                # these "colnames" can be strings or numeric types
                colnames = list(self.p.dataname.columns.values)
                if self.p.datetime is None:
                    # datetime is expected as index col and hence not returned
                    pass
    
                # try to autodetect if all columns are numeric
                cstrings = filter(lambda x: isinstance(x, string_types), colnames)
                colsnumeric = not len(list(cstrings))
    
                # Where each datafield find its value
                self._colmapping = dict()
    
                # Build the column mappings to internal fields in advance
                for datafield in self.getlinealiases():
                    defmapping = getattr(self.params, datafield)
    
                    if isinstance(defmapping, integer_types) and defmapping < 0:
                        # autodetection requested
                        for colname in colnames:
                            if isinstance(colname, string_types):
                                if self.p.nocase:
                                    found = datafield.lower() == colname.lower()
                                else:
                                    found = datafield == colname
    
                                if found:
                                    self._colmapping[datafield] = colname
                                    break
    
                        if datafield not in self._colmapping:
                            # autodetection requested and not found
                            self._colmapping[datafield] = None
                            continue
                    else:
                        # all other cases -- used given index
                        self._colmapping[datafield] = defmapping
        
        return PandasIndicatorData
    
    
    class PandasIndicatorData(PandasData):
        pass
    
    class _InitializeParameterized(object):
        def __call__(self, lines, params):
            obj = _InitializeParameterized()
            obj.__class__ = make_parameterized(lines, params)
            return obj
    
    if __name__ == "__main__":
    
        lines = ('hma_5','hma_6','hma_7','hma_8','hma_9')
    
        base_params = (
            ('nocase', True),
            ('datetime', None),
            ('open', -1),
            ('high', -1),
            ('low', -1),
            ('close', -1),
            ('volume', -1),
            ('openinterest', -1),
        )
        params = (('hma_5', -1), ('hma_6', -1), ('hma_7', -1), ('hma_8', -1), ('hma_9', -1))
    
        df = pd.read_csv("data.csv", delimiter=",", header=0, squeeze=False, error_bad_lines=False, index_col=0, parse_dates=['datetime'])
    
        data = make_parameterized(
            lines, 
            base_params + params
        )(dataname=df)
    
        print(data.lines)
        print(data.params)
        print(data.datafields)
        data_p = dumps(data)
    
        del data
        data = loads(data_p)
    
        print(data.lines)
        print(data.params)
        print(data.datafields)
    
        setattr(bt.metabase, "PandasIndicatorData", data)
        cerebro = bt.Cerebro()
        cerebro.adddata(data)
    
        cerebro.optstrategy(Storm, period=range(5,10), data_period="8h")
    
        cerebro.broker = bt.brokers.BackBroker(slip_perc=0.0025, slip_open=True)
        cerebro.broker.set_cash(10000)
    
        strats = cerebro.run()
    

    And this is the storm strategy referenced in the code:

    #!/usr/bin/env python
    # -*- coding: utf-8; py-indent-offset:4 -*-
    
    import logging
    import datetime
    import backtrader as bt
    
    class Storm(Strategy):
    
        params = (
            ('log_data',False),
            ('log_trades',False),
            ("start_date",datetime.datetime(2019, 1, 1).date()),
            ("end_date",datetime.datetime(2019, 12, 31).date()),
            ("buy_target",0.5),
            ("sell_target",0.0),
            ("data_period",50),
            ("period",25),
            ("ma", "hma"),
        )
    
        def __init__(self):
            cross = bt.ind.CrossOver(self.data.close, getattr(self.data, self.p.ma + "_" + str(self.p.period)))
            self.buysig = cross > 0
            self.sellsig = cross < 0
    
        def next(self):
            if self.datetime.date(ago=0) >= self.p.end_date:
                self.order_target_percent(target=0.0)
            elif self.datetime.date(ago=0) >= self.p.start_date:
                if self.buysig and self.position.size <=0:
                    if self.p.log_trades:
                        self.loginfo('Enter Long')
                    self.order_target_percent(target=self.p.buy_target)
                elif self.sellsig and self.position.size >=0:
                    if self.p.log_trades:
                        self.loginfo('Enter Short')
                    self.order_target_percent(target=self.p.sell_target)
    
        def notify_trade(self, trade):
            if not self.p.log_trades:
                return
            
            if trade.justopened:
                self.loginfo('Trade Opened  - Size {} @Price {}',
                             trade.size, trade.price)
            elif trade.isclosed:
                self.loginfo('Trade Closed  - Profit {}', trade.pnlcomm)
    
            else:  # trade updated
                self.loginfo('Trade Updated - Size {} @Price {}',
                             trade.size, trade.price)
    
        def notify_order(self, order):
            if order.alive():
                return
    
            otypetxt = 'Buy ' if order.isbuy() else 'Sell'
            if order.status == order.Completed:
    
                if self.p.log_trades:
                    self.loginfo(
                        ('{} Order Completed - '
                        'Size: {} @Price: {} '
                        'Value: {:.2f} Comm: {:.2f}'),
                        otypetxt, order.executed.size, order.executed.price,
                        order.executed.value, order.executed.comm
                    )
            else:
                if self.p.log_trades:
                    self.loginfo('{} Order rejected', otypetxt)
    
        def loginfo(self, txt, *args):
            out = [self.datetime.date().isoformat(), txt.format(*args)]
            logging.info(','.join(out))
    
        def logerror(self, txt, *args):
            out = [self.datetime.date().isoformat(), txt.format(*args)]
            logging.error(','.join(out))
    
        def logdebug(self, txt, *args):
            out = [self.datetime.date().isoformat(), txt.format(*args)]
            logging.debug(','.join(out))
    
        def logdata(self):
            txt = []
            txt += ['{:.2f}'.format(self.data.open[0])]
            txt += ['{:.2f}'.format(self.data.high[0])]
            txt += ['{:.2f}'.format(self.data.low[0])]
            txt += ['{:.2f}'.format(self.data.close[0])]
            txt += ['{:.2f}'.format(self.data.volume[0])]
    
            if (self.p.log_data):
                self.loginfo(','.join(txt))
    
        def stop(self):
            if (self.p.log_data):
                # loginfo() applies str.format itself, so pass the value as an argument
                self.loginfo('Ending Value: {:8.2f}',
                             self.broker.getvalue())
    

    If someone could take a look at this and provide some insight or help, I would gladly reimburse you for your time. I have been toiling away at this for a few days now, and I think it would make a great example to add to the backtrader repository.

    I think @ab_trader or @backtrader might have done something similar to this not too long ago.

    Thank you so much for your time.



  • Oh, and you'll want to remove the (Strategy) superclass from the Storm file.



  • I'm just going to use maxcpus=1 and wrap everything in multiprocessing, instead.
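
    Roughly what I have in mind, as a sketch rather than tested backtrader code: each worker process builds its own data feed and Cerebro and runs it with `maxcpus=1`, so only plain values (the period going in, a small dict coming out) ever get pickled. `run_one`, `run_parallel`, and the stand-in result are placeholders of mine, not part of backtrader.

```python
import multiprocessing as mp


def run_one(period):
    """Run a single backtest and return only plain, picklable values."""
    # Placeholder body (assumption): in the real script this is where the
    # DataFrame is loaded, the feed class is built, the strategy is added
    # with this period, and cerebro.run(maxcpus=1) is called, all inside
    # the worker process.
    final_value = 10000.0 + period  # stand-in for cerebro.broker.getvalue()
    return {"period": period, "final_value": final_value}


def run_parallel(periods, workers=2):
    """Fan the periods out over a process pool, one backtest per task."""
    with mp.Pool(processes=workers) as pool:
        return pool.map(run_one, list(periods))
```

    It would be called as `run_parallel(range(5, 10))` from under an `if __name__ == "__main__":` guard, so that child processes do not start the pool again when they import the script.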



  • Did you get this sorted?


