Unable To Pickle Metaclasses - Willing to pay for help
-
Howdy!
I am currently preloading indicators into a dataframe as a means of saving CPU cycles when running `optstrategy`. Naturally, I ran into the same issue discussed here with multiprocessing and pickling, whereby one needs a class type defined at module level in order to pickle something. So, I created my own means of doing this and tested it with pickle (with success), but am now getting the following error:
```
_pickle.PicklingError: Can't pickle <class 'backtrader.lineseries.Lines_LineSeries_DataSeries_OHLC_OHLCDateTime_AbstractDataBase_DataBase_PandasData_AutoSub'>: it's not the same object as backtrader.lineseries.Lines_LineSeries_DataSeries_OHLC_OHLCDateTime_AbstractDataBase_DataBase_PandasData_AutoSub
```
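As I understand it (and this is just my reading of the error, not gospel), pickle stores a class by its module-qualified name and, while dumping, verifies that the name still resolves to the identical class object; backtrader's metaclass machinery appears to rebind the generated `Lines_..._AutoSub` name to a fresh class, so the identity check fails. Here is a minimal, backtrader-free sketch of that failure mode (`make_class` and `Dynamic` are just illustrative names):

```python
import pickle

def make_class():
    class Dynamic(object):
        pass
    Dynamic.__qualname__ = 'Dynamic'  # pretend it was defined at module level
    return Dynamic

Dynamic = make_class()
obj = Dynamic()
pickle.dumps(obj)       # fine: __main__.Dynamic is obj.__class__

Dynamic = make_class()  # rebind the name to a *new* class object
pickle.dumps(obj)       # PicklingError: "it's not the same object as __main__.Dynamic"
```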
My code is below:
```python
#!/usr/bin/env python
# -*- coding: utf-8; py-indent-offset:4 -*-
###############################################################################
#
# Copyright (C) 2015-2020 Daniel Rodriguez
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from pickle import dumps, loads

import pandas as pd

import backtrader as bt
import backtrader.feed as feed
from backtrader import date2num
from backtrader.utils.py3 import filter, string_types, integer_types

from storm.strategy import Storm


class PandasData(feed.DataBase):
    params = (
        ('nocase', True),

        # Possible values for datetime (must always be present)
        #  None : datetime is the "index" in the Pandas Dataframe
        #  -1 : autodetect position or case-wise equal name
        #  >= 0 : numeric index to the column in the pandas dataframe
        #  string : column name (as index) in the pandas dataframe
        ('datetime', None),

        # Possible values below:
        #  None : column not present
        #  -1 : autodetect position or case-wise equal name
        #  >= 0 : numeric index to the column in the pandas dataframe
        #  string : column name (as index) in the pandas dataframe
        ('open', -1),
        ('high', -1),
        ('low', -1),
        ('close', -1),
        ('volume', -1),
        ('openinterest', -1),
    )

    datafields = [
        'datetime', 'open', 'high', 'low', 'close', 'volume', 'openinterest'
    ]

    def start(self):
        super(PandasData, self).start()

        # reset the length with each start
        self._idx = -1

        # Transform names (valid for .ix) into indices (good for .iloc)
        if self.p.nocase:
            colnames = [x.lower() for x in self.p.dataname.columns.values]
        else:
            colnames = [x for x in self.p.dataname.columns.values]

        for k, v in self._colmapping.items():
            if v is None:
                continue  # special marker for datetime
            if isinstance(v, string_types):
                try:
                    if self.p.nocase:
                        v = colnames.index(v.lower())
                    else:
                        v = colnames.index(v)
                except ValueError as e:
                    defmap = getattr(self.params, k)
                    if isinstance(defmap, integer_types) and defmap < 0:
                        v = None
                    else:
                        raise e  # let user know something failed

            self._colmapping[k] = v

    def _load(self):
        self._idx += 1

        if self._idx >= len(self.p.dataname):
            # exhausted all rows
            return False

        # Set the standard datafields
        for datafield in self.getlinealiases():
            if datafield == 'datetime':
                continue

            colindex = self._colmapping[datafield]
            if colindex is None:
                # datafield signaled as missing in the stream: skip it
                continue

            # get the line to be set
            line = getattr(self.lines, datafield)

            # indexing for pandas: 1st is column, then row
            line[0] = self.p.dataname.iloc[self._idx, colindex]

        # datetime conversion
        coldtime = self._colmapping['datetime']

        if coldtime is None:
            # standard index in the datetime
            tstamp = self.p.dataname.index[self._idx]
        else:
            # it's in a different column ... use standard column index
            tstamp = self.p.dataname.iloc[self._idx, coldtime]

        # convert to float via datetime and store it
        dt = tstamp.to_pydatetime()
        dtnum = date2num(dt)
        self.lines.datetime[0] = dtnum

        # Done ... return
        return True

    def __reduce__(self):
        print("hi")
        return (_InitializeParameterized(), (self.lines, self.params),
                self.__dict__)


def make_parameterized(lines, params):
    class PandasIndicatorData(PandasData):
        lines = lines
        params = params
        datafields = ['datetime', 'open', 'high', 'low', 'close',
                      'volume', 'openinterest'] + list(lines)

        def __init__(self):
            super(PandasData, self).__init__()

            # these "colnames" can be strings or numeric types
            colnames = list(self.p.dataname.columns.values)
            if self.p.datetime is None:
                # datetime is expected as index col and hence not returned
                pass

            # try to autodetect if all columns are numeric
            cstrings = filter(lambda x: isinstance(x, string_types), colnames)
            colsnumeric = not len(list(cstrings))

            # Where each datafield finds its value
            self._colmapping = dict()

            # Build the column mappings to internal fields in advance
            for datafield in self.getlinealiases():
                defmapping = getattr(self.params, datafield)

                if isinstance(defmapping, integer_types) and defmapping < 0:
                    # autodetection requested
                    for colname in colnames:
                        if isinstance(colname, string_types):
                            if self.p.nocase:
                                found = datafield.lower() == colname.lower()
                            else:
                                found = datafield == colname

                            if found:
                                self._colmapping[datafield] = colname
                                break

                    if datafield not in self._colmapping:
                        # autodetection requested and not found
                        self._colmapping[datafield] = None
                        continue
                else:
                    # all other cases -- use given index
                    self._colmapping[datafield] = defmapping

    return PandasIndicatorData


class PandasIndicatorData(PandasData):
    pass


class _InitializeParameterized(object):
    def __call__(self, lines, params):
        obj = _InitializeParameterized()
        obj.__class__ = make_parameterized(lines, params)
        return obj


if __name__ == "__main__":
    lines = ('hma_5', 'hma_6', 'hma_7', 'hma_8', 'hma_9')
    base_params = (
        ('nocase', True),
        ('datetime', None),
        ('open', -1),
        ('high', -1),
        ('low', -1),
        ('close', -1),
        ('volume', -1),
        ('openinterest', -1),
    )
    params = (('hma_5', -1), ('hma_6', -1), ('hma_7', -1),
              ('hma_8', -1), ('hma_9', -1))

    df = pd.read_csv("data.csv", delimiter=",", header=0, squeeze=False,
                     error_bad_lines=False, index_col=0,
                     parse_dates=['datetime'])

    data = make_parameterized(lines, base_params + params)(dataname=df)

    print(data.lines)
    print(data.params)
    print(data.datafields)

    data_p = dumps(data)
    del data
    data = loads(data_p)

    print(data.lines)
    print(data.params)
    print(data.datafields)

    setattr(bt.metabase, "PandasIndicatorData", data)

    cerebro = bt.Cerebro()
    cerebro.adddata(data)
    cerebro.optstrategy(Storm, period=range(5, 10), data_period="8h")
    cerebro.broker = bt.brokers.BackBroker(slip_perc=0.0025, slip_open=True)
    cerebro.broker.set_cash(10000)
    strats = cerebro.run()
```
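The `__reduce__` / `_InitializeParameterized` pair above follows the usual recipe for pickling instances of dynamically created classes: `__reduce__` hands pickle a module-level callable plus the arguments needed to rebuild the class. My suspicion (unverified) is that the recipe only works if every element of that tuple is itself plain picklable data, whereas `self.lines` and `self.params` (and the `lines` instance that backtrader stores inside `self.__dict__`) are instances of metaclass-generated classes, so pickle runs straight back into the same by-name lookup. A sketch of what I think the reduce tuple would need to look like, where `_line_names` and `_param_pairs` are hypothetical plain tuples stashed on the instance at construction time:

```python
def __reduce__(self):
    # the callable must be reachable at module level, and every element of
    # the args tuple must be plain picklable data (no generated classes)
    state = {k: v for k, v in self.__dict__.items()
             if k not in ('lines', 'params', 'p')}  # drop generated-class state
    return (_InitializeParameterized(),
            (self._line_names, self._param_pairs),  # hypothetical plain tuples
            state)
```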
And this is the storm strategy referenced in the code:
```python
#!/usr/bin/env python
# -*- coding: utf-8; py-indent-offset:4 -*-
import logging
import datetime as datetime

import backtrader as bt


class Storm(Strategy):
    params = (
        ('log_data', False),
        ('log_trades', False),
        ('start_date', datetime.datetime(2019, 1, 1).date()),
        ('end_date', datetime.datetime(2019, 12, 31).date()),
        ('buy_target', 0.5),
        ('sell_target', 0.0),
        ('data_period', 50),
        ('period', 25),
        ('ma', 'hma'),
    )

    def __init__(self):
        # cross the close against the preloaded indicator column, e.g. hma_5
        cross = bt.ind.CrossOver(
            self.data.close,
            getattr(self.data, self.p.ma + '_' + str(self.p.period)))
        self.buysig = cross > 0
        self.sellsig = cross < 0

    def next(self):
        if self.datetime.date(ago=0) >= self.p.end_date:
            self.order_target_percent(target=0.0)
        elif self.datetime.date(ago=0) >= self.p.start_date:
            if self.buysig and self.position.size <= 0:
                if self.p.log_trades:
                    self.loginfo('Enter Long')
                self.order_target_percent(target=self.p.buy_target)
            elif self.sellsig and self.position.size >= 0:
                if self.p.log_trades:
                    self.loginfo('Enter Short')
                self.order_target_percent(target=self.p.sell_target)

    def notify_trade(self, trade):
        if not self.p.log_trades:
            return
        if trade.justopened:
            self.loginfo('Trade Opened  - Size {} @Price {}',
                         trade.size, trade.price)
        elif trade.isclosed:
            self.loginfo('Trade Closed  - Profit {}', trade.pnlcomm)
        else:  # trade updated
            self.loginfo('Trade Updated - Size {} @Price {}',
                         trade.size, trade.price)

    def notify_order(self, order):
        if order.alive():
            return

        otypetxt = 'Buy ' if order.isbuy() else 'Sell'
        if order.status == order.Completed:
            if self.p.log_trades:
                self.loginfo(
                    ('{} Order Completed - '
                     'Size: {} @Price: {} '
                     'Value: {:.2f} Comm: {:.2f}'),
                    otypetxt, order.executed.size, order.executed.price,
                    order.executed.value, order.executed.comm
                )
        else:
            if self.p.log_trades:
                self.loginfo('{} Order rejected', otypetxt)

    def loginfo(self, txt, *args):
        out = [self.datetime.date().isoformat(), txt.format(*args)]
        logging.info(','.join(out))

    def logerror(self, txt, *args):
        out = [self.datetime.date().isoformat(), txt.format(*args)]
        logging.error(','.join(out))

    def logdebug(self, txt, *args):
        out = [self.datetime.date().isoformat(), txt.format(*args)]
        logging.debug(','.join(out))

    def logdata(self):
        txt = []
        txt += ['{:.2f}'.format(self.data.open[0])]
        txt += ['{:.2f}'.format(self.data.high[0])]
        txt += ['{:.2f}'.format(self.data.low[0])]
        txt += ['{:.2f}'.format(self.data.close[0])]
        txt += ['{:.2f}'.format(self.data.volume[0])]
        if self.p.log_data:
            self.loginfo(','.join(txt))

    def stop(self):
        if self.p.log_data:
            self.loginfo('Ending Value: {:8.2f}'.format(self.broker.getvalue()))
```
If someone could take a look at this and provide some insight or help, I would gladly reimburse you for your time. This is something I have been toiling away at for a few days now, and I know it would make a great example to add to the backtrader repository.
I think @ab_trader or @backtrader might have done something similar to this not too long ago.
Thank you so much for your time.
-
Oh, and you'll want to remove the `(Strategy)` superclass from the Storm file.
-
I'm just going to use `maxcpus=1` and wrap everything in multiprocessing, instead.
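The rough shape of that workaround, as a sketch: it assumes the feed script above is refactored so that `make_parameterized`, `lines`, `base_params` and `params` live at module level in a (hypothetical) `feed_factory` module. Each worker process rebuilds the CSV feed itself, so only plain ints and floats ever cross a process boundary and nothing backtrader-generated has to be pickled.

```python
import multiprocessing as mp

import pandas as pd
import backtrader as bt

from feed_factory import make_parameterized, lines, base_params, params  # hypothetical module
from storm.strategy import Storm


def run_one(period):
    # each worker loads the CSV and builds its own feed, so no backtrader
    # object needs to be pickled across processes
    df = pd.read_csv("data.csv", index_col=0, parse_dates=['datetime'])
    data = make_parameterized(lines, base_params + params)(dataname=df)

    cerebro = bt.Cerebro(maxcpus=1)  # keep each backtrader run single-process
    cerebro.adddata(data)
    cerebro.addstrategy(Storm, period=period, data_period="8h")
    cerebro.broker = bt.brokers.BackBroker(slip_perc=0.0025, slip_open=True)
    cerebro.broker.set_cash(10000)
    cerebro.run()
    return period, cerebro.broker.getvalue()


if __name__ == "__main__":
    with mp.Pool() as pool:
        for period, value in pool.map(run_one, range(5, 10)):
            print(period, value)
```
-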
Did you get this sorted?