    How to cache data from IB and use it later

    General Code/Help
      brettelliot last edited by

      I wanted to download historical data from IB, write it to a CSV and then use it later. Here's a simple example that does that. Hope this helps people!

      from ib.opt import ibConnection, message
      from ib.ext.Contract import Contract
      from time import sleep, strftime
      from datetime import datetime
      import pandas as pd
      
      
      class IBDataCache(object):
      
          def __init__(self, data_path='.', date_format='%Y%m%d %H:%M:%S', host='127.0.0.1', port=7497, client_id=None):
              self._data_path = data_path
              self._date_format = date_format
              self._reset_data()
              self._next_valid_id = 1
      
              self._conn = ibConnection(host, port, client_id)
              self._conn.enableLogging()
              # Register the response callback function and type of data to be returned
              self._conn.register(self._historical_data_handler, message.historicalData)
              self._conn.register(self._save_order_id, 'NextValidId')
              self._conn.connect()
      
      
          def _reset_data(self):
              self._df = pd.DataFrame(columns=['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'OpenInterest'])
              self._s = pd.Series()
      
      
          def _save_order_id(self, msg):
              self._next_valid_id = msg.orderId
      
      
          def _historical_data_handler(self, msg):
              """
                  Define historical data handler for IB - this will populate our pandas data frame
              """
      
              # print (msg.reqId, msg.date, msg.open, msg.close, msg.high, msg.low)
              if 'finished' not in str(msg.date):
                  self._s = [datetime.strptime(msg.date, self._date_format),
                             msg.open, msg.high, msg.low, msg.close, msg.volume, 0]
                  self._df.loc[len(self._df)] = self._s
          
              else:
                  self._df.set_index('Date', inplace=True)
      
      
          def get_dataframe(self, sec_type, symbol, currency, exchange, endtime, duration, bar_size, what_to_show, use_rth):
              self._reset_data()
      
              # build filename
              filename = self._data_path + '/' + symbol + '_' + sec_type + '_' + exchange + '_' + currency + '_' + \
                  endtime.replace(' ', '') + '_' + duration.replace(' ', '') + '_' + bar_size.replace(' ', '') + '_' + \
                  what_to_show + '_' + str(use_rth) + '.csv'
      
              try:
      
                  # check if we have this cached
                  self._df = pd.read_csv(filename,
                               parse_dates=True,
                               index_col=0)
              except IOError:
      
                  # Not cached. Download it.
                  # Establish a Contract object and the params for the request
                  req = Contract()
                  req.m_secType = sec_type
                  req.m_symbol = symbol
                  req.m_currency = currency
                  req.m_exchange = exchange
      
                  self._conn.reqHistoricalData(self._next_valid_id, req, endtime, duration,
                                               bar_size, what_to_show, use_rth, 1)
      
                  # Make sure the connection doesn't get disconnected before the response data is returned
                  sleep(5)
                  self._conn.disconnect()
      
                  # Cache dataframe to CSV
                  self._df.to_csv(filename)
      
              return self._df
      
      
      if __name__ == '__main__':
      
          date_format = '%Y%m%d %H:%M:%S'
      
          downloader_kwargs = dict(
              data_path='../data',
              date_format=date_format,
              host='127.0.0.1',
              port=7497,
              client_id=999
          )
          downloader = IBDataCache(**downloader_kwargs)
      
          stock_kwargs = dict(
              sec_type='STK',
              symbol='AAPL',
              currency='USD',
              exchange='SMART',
              endtime=datetime(2018, 2, 14, 15, 59).strftime(date_format),
              duration='1 D',
              bar_size='1 min',
              what_to_show='TRADES',
              use_rth=1
          )
      
          df = downloader.get_dataframe(**stock_kwargs)
          print(df)
      
      
          stock_kwargs = dict(
              sec_type='STK',
              symbol='MSFT',
              currency='USD',
              exchange='SMART',
              endtime=datetime(2018, 2, 14, 15, 59).strftime(date_format),
              duration='1 D',
              bar_size='1 min',
              what_to_show='TRADES',
              use_rth=1
          )
      
          df = downloader.get_dataframe(**stock_kwargs)
          print(df)
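
      Once cached, the DataFrame can go straight into backtrader. Here is a minimal sketch of that step (it assumes the default column auto-detection of bt.feeds.PandasData, which matches the Open/High/Low/Close/Volume/OpenInterest names used above):

      import backtrader as bt

      cerebro = bt.Cerebro()

      # df is the DataFrame returned by IBDataCache.get_dataframe() above; PandasData
      # reads the datetimes from the index and picks up the OHLCV columns by name.
      data = bt.feeds.PandasData(dataname=df)
      cerebro.adddata(data)
      cerebro.run()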
      
        backtrader administrators last edited by

        Thanks for sharing.

          bigdavediode last edited by

          Your code is extremely useful when dealing with IB's down times. And it's a nice speedup as well.

          Just one quick suggestion/bug fix -- I've found that sometimes there's a numpy int64 error generated by line 268 in pandafeed (AttributeError: 'numpy.int64' object has no attribute 'to_pydatetime') because some datetimes are stored as int64 in pandas and need to be converted manually. The line I've added to alleviate this is:

                  try:
          
                      # check if we have this cached
                      self._df = pd.read_csv(filename,
                                   parse_dates=True,
                                   index_col=0)
                    # added line: force the index to a proper DatetimeIndex
                    self._df.index = pd.to_datetime(self._df.index, format='%Y-%m-%dT%H:%M:%S')
          

          More information on this datetime conversion can be found here: https://community.backtrader.com/topic/676/bug-using-pandas-hdf
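
          In isolation, the conversion boils down to this (a standalone sketch; 'cached_bars.csv' is just a placeholder name and the format string, if given, must match how the CSV was written):

          import pandas as pd

          df = pd.read_csv('cached_bars.csv', parse_dates=True, index_col=0)

          # If parse_dates could not produce datetimes (e.g. the index came back as
          # int64 or object), convert it so backtrader can call to_pydatetime() on it.
          if not isinstance(df.index, pd.DatetimeIndex):
              df.index = pd.to_datetime(df.index)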

          Again, great work brettelliot.

            bigdavediode last edited by bigdavediode

            Although this routine is great for its simple flat-file data caching, its sleep statements create an implementation problem: the connection can be shut down before the download of data is complete. Another issue is that, even for small downloads, the sleep statements hold up processing; when one is trying to analyze 100-200 securities per hour, they have to be removed, as below.

            Ideally, the callback just terminates the connection correctly when the "finished" response is received from the API, so I've altered the code below. It's a bit hard to follow due to the callbacks, but it appears to work effectively. Please note that the timeoutsecs argument should be specified, as it is the maximum time the routine will wait for the download.

            from ib.opt import ibConnection, message
            from ib.ext.Contract import Contract
            from time import time, strftime
            from datetime import datetime
            import pandas as pd
            
            
            class IBDataCache(object):
            
                def _reset_data(self, host='127.0.0.1', port=4001, client_id=None):
                    self._df = pd.DataFrame(columns=['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'OpenInterest'])
                    self._s = pd.Series()
            
                    #Initialize connection as long as it's not already connected:
                    if (not hasattr(self, '_conn')) or (not self._conn.isConnected()):
                        self._conn = ibConnection(host, port, client_id)
                        self._conn.enableLogging()
                        # Register the response callback function and type of data to be returned
                        self._conn.register(self._error_handler, message.Error)
                        self._conn.register(self._historical_data_handler, message.historicalData)
                        self._conn.register(self._save_order_id, 'NextValidId')
                        self._conn.register(self._nextValidId_handler, message.nextValidId)
                        self._conn.connect()
            
            
            
                def _save_order_id(self, msg):
                    self._next_valid_id = msg.orderId
                    print('save order id',msg.orderId)
            
                def _nextValidId_handler(self, msg):
                    print("nextValidId_handler: ", msg)
                    self.inner(sec_type=self.req.m_secType, symbol=self.req.m_symbol, currency=self.req.m_currency, exchange=self.req.m_exchange, \
                               primaryExchange=self.req.m_primaryExch, endtime=self.req.endtime, duration=self.req.duration, \
                               bar_size=self.req.bar_size, what_to_show=self.req.what_to_show, use_rth=self.req.use_rth)
            
                def _error_handler(self, msg):
                    print("error: ", msg)
                    if not msg:
                        print('disconnecting', self._conn.disconnect())
            
            
            
                def __init__(self, data_path='/docker_stocks/data', date_format='%Y%m%d %H:%M:%S', host='127.0.0.1', port=4001, client_id=None):
                    self._data_path = data_path
                    self._date_format = date_format
                    self._next_valid_id = 1
            
                    self._host = host
                    self._port = port
                    self._client_id = client_id
            
            
                def _historical_data_handler(self, msg):
                    """
                        Define historical data handler for IB - this will populate our pandas data frame
                    """
            
                    # print (msg.reqId, msg.date, msg.open, msg.close, msg.high, msg.low)
                    if 'finished' not in str(msg.date):
                        #print ("historical_data_handler: ", msg)
                        try:
                            self._s = ([datetime.strptime(msg.date, self._date_format),
                                        msg.open, msg.high, msg.low, msg.close, msg.volume, 0])
                        except ValueError:
                            # daily bars: the date comes back with no time component
                            self._s = ([datetime.strptime(msg.date, "%Y%m%d"),
                                        msg.open, msg.high, msg.low, msg.close, msg.volume, 0])
                        self._df.loc[len(self._df)] = self._s
            
                    else:
                        self._df.set_index('Date', inplace=True)
            
            
            
                def setAllWithKwArgs(self, **kwargs):
                    #set all attributes to the kwargs to pass along
                    for key, value in kwargs.items():
                        setattr(self, key, value)
            
                def inner(self, sec_type, symbol, currency, exchange, primaryExchange, endtime, duration, bar_size, what_to_show, use_rth):
                    print ("calling inner... setting up req.")
                    self.req = Contract()
                    self.req.m_secType = sec_type
                    self.req.m_symbol = symbol
                    self.req.m_currency = currency
                    self.req.m_exchange = exchange
                    self.req.m_primaryExch = primaryExchange
                    self.req.endtime = endtime
                    self.req.duration = duration
                    self.req.bar_size = bar_size
                    self.req.what_to_show = what_to_show
                    self.req.use_rth = use_rth
            
            
                    self._conn.reqHistoricalData(self._next_valid_id, self.req, endtime, duration,
                                                 bar_size, what_to_show, use_rth, 1)
            
            
                def get_dataframe(self, sec_type, symbol, currency, exchange, primaryExchange, endtime, duration, bar_size, what_to_show, use_rth, timeoutsecs):
            
            
            
                    # build filename
                    self.filename = symbol + '_' + sec_type + '_' + exchange + '_' + currency + '_' + \
                        endtime.replace(' ', '') + '_' + duration.replace(' ', '') + '_' + bar_size.replace(' ', '') + '_' + \
                        what_to_show + '_' + str(use_rth) + '.csv'
                    self.filename = self.filename.replace('/', '.')
                    self.filename = self._data_path + '/' + self.filename
                    print ("filename:  ", self.filename)
            
            
                    try:
            
                        # check if we have this cached
                        self._df = pd.read_csv(self.filename,
                                     parse_dates=True,
                                     index_col=0)
                        self._df.index = pd.to_datetime(self._df.index, format='%Y-%m-%d %H:%M:%S')
                    except IOError:
                        #set up connection:
                        self._reset_data(self._host, self._port, self._client_id)
            
                        # Not cached. Download it.
                        # Establish a Contract object and the params for the request
                        self.inner(sec_type, symbol, currency, exchange,
                              primaryExchange, endtime, duration, bar_size,
                              what_to_show, use_rth)
            
                        # Make sure the connection doesn't get disconnected prior the response data return
                        timeout = time() + timeoutsecs
                        while self._conn.isConnected() and time() < timeout:
                            #print(".", end="", flush=True)
                            pass
                        self._conn.disconnect()
                        self._df.to_csv(self.filename)
            
            
                    return self._df
            
            
            if __name__ == '__main__':
            
                date_format = '%Y%m%d %H:%M:%S'
            
                downloader_kwargs = dict(
                    data_path='../data',
                    date_format=date_format,
                    host='127.0.0.1',
                    port=4001,
                    client_id=992
                )
                downloader = IBDataCache(**downloader_kwargs)
            
                stock_kwargs = dict(
                    sec_type='STK',
                    symbol='AAPL',
                    currency='USD',
                    exchange='SMART',
                    primaryExchange='NASDAQ',
                    endtime=datetime(2018, 10, 26, 15, 59).strftime(date_format),
                    duration='2 D',
                    bar_size='30 mins',
                    what_to_show='TRADES',
                    use_rth=1,
                    timeoutsecs=60  # maximum seconds to wait for the download (required)
                )
            
                df = downloader.get_dataframe(**stock_kwargs)
                print(df)
            
            
                stock_kwargs = dict(
                    sec_type='STK',
                    symbol='MSFT',
                    currency='USD',
                    exchange='SMART',
                    primaryExchange='NASDAQ',
                    endtime=datetime(2018, 10, 26, 15, 59).strftime(date_format),
                    duration='1 D',
                    bar_size='1 min',
                    what_to_show='TRADES',
                    use_rth=1,
                    timeoutsecs=60  # maximum seconds to wait for the download (required)
                )
            
                df = downloader.get_dataframe(**stock_kwargs)
            
                print ("IBCacheData")
            
                print(df)
            
            

            (As a side note, the int64 error mentioned above is caused by the sleep statement in the original code returning before the download is completed.)
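
            If the polling loop above bothers you, a threading.Event can block until the "finished" callback fires instead (a rough sketch, not the posted code; a Timer stands in here for the IB callback that would call done.set()):

            from threading import Event, Timer

            done = Event()

            # In the real handler, done.set() would be called once 'finished'
            # shows up in msg.date; the Timer below just simulates that callback.
            Timer(1.0, done.set).start()

            # Block until the callback fires or the timeout expires, then disconnect.
            if done.wait(timeout=30):
                print('download finished')
            else:
                print('timed out waiting for historical data')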
