For code/output blocks: Use ``` (aka backtick or grave accent) in a single line before and after the block. See: http://commonmark.org/help/

How to synchronize callbacks from strategy.stop



  • My class that runs an optimization passes a callback function to my strategy for it to call from strategy.stop. When the callback is invoked for each instance of the strategy (I have 4 because I'm using 2 parameters with 2 values each) my class wants to accumulate some data. I had unexpected behavior unless I set maxcpus=1. I wanted to ask if anyone has a recommended synchronization pattern? I tried creating a multiprocessing.Lock just before cerebro.run, and then acquire/release in the callback function, but that gives RuntimeError: Lock objects should only be shared between processes through inheritance.



  • Here's a short program that gives the RuntimeError: Lock objects should only be shared between processes through inheritance. Of course this is just a Python error, not a Backtrader error, but I'm not seeing how this usage of Lock is different from the Lock example here: https://www.blog.pythonlibrary.org/2016/08/02/python-201-a-multiprocessing-tutorial/

    If I run this example without the lock, the output shows that instance_counter is corrupted.

    import datetime
    from multiprocessing import Lock
    import backtrader as bt
    
    class TestStrategy(bt.Strategy):
        params = (
            ('x', 10),
            ('y', 20),
            ('callback', None),
            ('lock', None),
        )
        def __init__(self):
            print(f'Init  x: {self.p.x}  y: {self.p.y}')
    
        def stop(self):
            self.p.callback(self.p.x, self.p.y, self.p.lock)
    
    instance_counter = 0
    
    # The callback function gets the parameters and the lock from the strategy.
    # It increments the instance_counter and prints it.
    def mycallback(param1, param2, lock):
        global instance_counter
        lock.acquire()
        instance_counter += 1
        print(f"Callback, instance_counter: {instance_counter}  p1: {param1}, p2: {param2}")
        lock.release()
    
    if __name__ == '__main__':
        cerebro = bt.Cerebro()
        # pass a callback and a lock object into the strategy
        lock = Lock()
        strats = cerebro.optstrategy(TestStrategy, x=(10, 11), y=(20, 21), callback=(mycallback), lock=(lock))
    
        data = bt.feeds.YahooFinanceCSVData(dataname='/code/backtrader/data/orcl-1995-2014.txt',
            fromdate=datetime.datetime(2000, 1, 1), todate=datetime.datetime(2000, 2, 1),  reverse=False)
        cerebro.adddata(data)
    
        cerebro.run(maxcpus=None)
    


  • I can avoid the python error by constructing the Lock object inside the strategy (then I guess it's the same process Backtrader uses). However, I'm surprised it doesn't synchronize access to the critical section where the counter is incremented! The counter in this simple snippet is only an example, my real code needs to update a list. I guess I'll have to look into a thread-safe list.
    Here's the output I get with the lock moved into the strategy. Notice that counter == 1 each time while the other params vary as expected...
    Init x: 10 y: 20
    Init x: 10 y: 21
    Callback, instance_counter: 1 p1: 10, p2: 20
    Init x: 11 y: 20
    Init x: 11 y: 21
    Callback, instance_counter: 1 p1: 10, p2: 21
    Callback, instance_counter: 1 p1: 11, p2: 20
    Callback, instance_counter: 1 p1: 11, p2: 21

    Any feedback is welcome. Here's the code...

    import datetime
    from multiprocessing import Lock
    import backtrader as bt
    
    class TestStrategy(bt.Strategy):
        params = (
            ('x', 10),
            ('y', 20),
            ('callback', None),
        )
        def __init__(self):
            print(f'Init  x: {self.p.x}  y: {self.p.y}')
            self.lock = Lock()
    
        def stop(self):
            with self.lock:
                self.p.callback(self.p.x, self.p.y)
    
    class Runner:
        def run(self):
            self.instance_counter = 0
            cerebro = bt.Cerebro()
            # pass a callback into the strategy
            cerebro.optstrategy(TestStrategy, x=(10, 11), y=(20, 21), callback=self.callback)
    
            data = bt.feeds.YahooFinanceCSVData(dataname='/code/backtrader/data/orcl-1995-2014.txt',
                                                fromdate=datetime.datetime(2000, 1, 1),
                                                todate=datetime.datetime(2000, 2, 1), reverse=False)
            cerebro.adddata(data)
            cerebro.run(maxcpus=None)
    
        # The callback function assumes strategy.stop implements a critical section lock.
        def callback(self, param1, param2):
            self.instance_counter += 1
            print(f"Callback, instance_counter: {self.instance_counter}  p1: {param1}, p2: {param2}")
    
    if __name__ == '__main__':
        r = Runner()
        r.run()
    

  • administrators

    @scottz1 said in How to synchronize callbacks from strategy.stop:

    I can avoid the python error by constructing the Lock object inside the strategy (then I guess it's the same process Backtrader uses)

    Wrong guess. There is no synchronization of anything, because nothing communicates.

    You are creating individual locks in sub-processes, and that's why there is no error ... and why there is no synchronization.

    Your best bet is to follow the recommended approach, i.e.: gather the results as returned by cerebro.run and examine each instance as returned.


Log in to reply
 

});