
IB data caching into flat files using dataframes



  • I've built a handy class, attached, that caches IB data into single flat files. This gives a speedup when pulling long sequences of historical data from IB, since it avoids downloading the full sequence where possible. It also backfills into the cache, which allows later backtesting while IB is offline.

    This rewrite has the advantage that it uses one cache file per symbol and period (e.g. 1-minute data).
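Concretely, the cache key is just the filename, assembled from the request parameters. A standalone sketch of that naming scheme (the helper name `cache_filename` is mine, not part of the class):

```python
def cache_filename(symbol, exchange, bar_size, what_to_show, use_rth):
    """Build the per-symbol/per-period cache filename; '/' is replaced
    with '.' so symbols like 'EUR/USD' stay filesystem-safe."""
    name = '_'.join([symbol, exchange, bar_size.replace(' ', ''),
                     what_to_show, str(use_rth)]) + '.csv'
    return name.replace('/', '.')

print(cache_filename('AAPL', 'ISLAND', '1 min', 'TRADES', 1))
# AAPL_ISLAND_1min_TRADES_1.csv
```

Because a different bar-size string maps to a different file ('30 mins' vs '1 min'), each period gets its own cache.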

    This is a complete rewrite of another user's work from a previous thread ( https://community.backtrader.com/topic/1007/how-to-cached-data-from-ib-and-use-it-later/4 ). That routine created many flat files, and if the timeframe were changed even slightly the cache would not be used. This version also fixes a few bugs, such as reducing timeout delays.

    The disadvantage is that the backfill duration (not the bar period) is assumed to be specified in days: you can request 2 days of 1-minute data, but you cannot request six hours of 1-minute data. The routine downloads only what it needs of the days specified. It is also unnecessarily disk intensive when it does download, rewriting the entire pandas dataframe back to disk.
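If the full-rewrite disk churn matters, one possible alternative (not what the class below does; `new_rows` is a hypothetical frame holding only the freshly downloaded bars) is to append just the new rows to the CSV:

```python
import pandas as pd
from pathlib import Path

def append_to_cache(filename, new_rows):
    """Append only the newly downloaded bars to the cache file,
    writing the CSV header only when the file does not exist yet."""
    write_header = not Path(filename).is_file()
    new_rows.sort_index().to_csv(filename, mode='a', header=write_header)
```

The caveat is that appending assumes the new bars all come after the cached ones, which matches this routine's end-of-cache backfill behavior.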

    Please note that this routine does not fill gaps in the data. It backfills the specified duration, excluding whatever is already cached. If the cached data is missing a week mid-series, you cannot fill it in by manually running this routine with that week's end date and a one-week duration; it fills solely at the end of the cache.
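To make the days-only behavior concrete, here is the duration trimming from `get_dataframe` distilled into a standalone function (`days_to_download` is my name for it, not part of the class; note that a gap of less than one whole day truncates to zero, so nothing is fetched):

```python
from datetime import datetime, timedelta

def days_to_download(cached_last, endtime, requested_days):
    """Whole days of history to request from IB, given the last cached bar."""
    if cached_last >= endtime:
        return 0                                   # cache is already up to date
    gap = timedelta((endtime - cached_last).days)  # truncates to whole days
    if gap <= timedelta(0):
        return 0                                   # sub-day gap: nothing fetched
    # a gap larger than the requested duration just honors the request,
    # since the goal is to save time, not cost time
    return min(gap.days, requested_days)

print(days_to_download(datetime(2019, 12, 8), datetime(2019, 12, 10, 16, 1), 2))
# 2
```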

    If this helps you, let me know.
    B.

    
    from ib.opt import ibConnection, message
    from ib.ext.Contract import Contract
    from time import time, strftime
    from datetime import datetime, timedelta
    import pandas as pd
    from pathlib import Path
    from random import randrange        #for random connection ID only
    
    
    class IBDataCache_FlatFile(object):
    
        def _reset_data(self, host='127.0.0.1', port=4001, client_id=None):
            self._df = pd.DataFrame(columns=['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'OpenInterest'])
            self._df.set_index('Date', inplace=True)
            self._s = pd.Series()
    
            #Initialize connection as long as it's not already connected:
            if (not hasattr(self, '_conn')) or (not self._conn.isConnected()):
                self._conn = ibConnection(host, port, client_id)
                self._conn.enableLogging()
                # Register the response callback function and type of data to be returned
                self._conn.register(self._error_handler, message.Error)
                self._conn.register(self._historical_data_handler, message.historicalData)
                self._conn.register(self._save_order_id, 'NextValidId')
                self._conn.register(self._nextValidId_handler, message.nextValidId)
                self._conn.connect()
    
    
        def _save_order_id(self, msg):
            self._next_valid_id = msg.orderId
            print('save order id', msg.orderId)
    
        def _nextValidId_handler(self, msg):
            print("nextValidId_handler: ", msg)
    
        def _error_handler(self, msg):
            if not msg:
                print('disconnecting', self._conn.disconnect())
            elif 'No security definition has been found' in str(msg):
                print('ibcachedata: triggering timeout')
                self.timeout = time()  # trigger timeout
    
        def __init__(self, data_path='/docker_stocks/data', date_format='%Y%m%d %H:%M:%S', host='127.0.0.1', port=4001, client_id=None):
            self._data_path = data_path
            self._date_format = date_format
            self._next_valid_id = 1
    
            self._host = host
            self._port = port
            self._client_id = client_id
    
        def _historical_data_handler(self, msg):
            """
                Define historical data handler for IB - this will populate our pandas data frame
            """
            if 'finished' not in str(msg.date):
                try:
                    dateidx = datetime.strptime(msg.date, self._date_format)
                except ValueError:
                    # for dates only, with no time in the string:
                    dateidx = datetime.strptime(msg.date, "%Y%m%d")
                self._s = [msg.open, msg.high, msg.low, msg.close, msg.volume, 0]
                self._df.loc[dateidx] = self._s
    
            else:
                print("ibcachedata: historical_data_handler: Received finish")
                if not self._df.empty:
                    self._df.sort_index(inplace=True)
                    self._df.to_csv(self.filename)
                self._conn.disconnect()
    
        def inner(self,
                  con_Id=0,
                  sec_type='',
                  symbol='',
                  currency='USD',
                  exchange='',
                  primaryExchange='',
                  endtime='',
                  duration='250 D',
                  bar_size='30 mins',
                  what_to_show='TRADES',
                  use_rth='1'):
    
            print ("called inner... setting up req.")
            self.req = Contract()
            self.req.m_conId = con_Id
            self.req.m_secType = sec_type
            self.req.m_symbol = symbol
            self.req.m_currency = currency
            self.req.m_exchange = exchange
            self.req.m_primaryExch = primaryExchange
            self.req.endtime = endtime
            self.req.duration = duration
            self.req.bar_size = bar_size
            self.req.what_to_show = what_to_show
            self.req.use_rth = use_rth
    
    
            self._conn.reqHistoricalData(self._next_valid_id, self.req, endtime, duration,
                                         bar_size, what_to_show, use_rth, 1)
    
        def get_dataframe(self,
                          sec_type=None,
                          symbol=None,
                          currency='USD',
                          exchange=None,
                          PrimaryExchange='NASDAQ',
                          endtime=None,
                          duration='250 D',
                          bar_size='30 mins',
                          what_to_show='TRADES',
                          use_rth='1',
                          timeoutsecs=300):
    
            date_format = '%Y%m%d %H:%M:%S'
            self.timeout = 0
    
            # build filename
            self.filename = symbol + '_' + exchange + '_' + \
                bar_size.replace(' ', '') + '_' + \
                what_to_show + '_' + str(use_rth) + '.csv'
            self.filename = self.filename.replace('/', '.')
            self.filename = self._data_path + '/' + self.filename
            print ("filename:  ", self.filename)
    
            # set up connection:
            self._reset_data(self._host, self._port, self._client_id)
    
            # check if we have this cached
            data_file = Path(self.filename)
            if data_file.is_file():
                self._df = pd.read_csv(self.filename,
                             parse_dates=True,
                             index_col=0)
    
            # get filedt (the last datetime in the file)
            if not self._df.empty:
                filedt = pd.to_datetime(self._df.index[-1])
            else:
                filedt = datetime(1970, 1, 1)
    
            if filedt < datetime.strptime(endtime, date_format):  # DAYS ONLY
                # set duration to endtime - filedt = the new duration to download
                timediff = datetime.strptime(endtime, date_format) - filedt
                timediff = timedelta(timediff.days)
            else:
                timediff = timedelta(0)     #download nothing as the file is up to date
    
            if self._df.empty or timediff > timedelta(0):
                durationint = [int(s) for s in duration.split() if s.isdigit()]
                durationint = durationint[0]
                #test if the date difference to the end of the file exists, if so then scrape the non-cached data into the dataframe
                #if the difference to the last bar is larger than the specified duration, just do the user specified duration -- the goal is to save time, not cost time
                if timediff.days > 0 and filedt != datetime(1970, 1, 1) and timediff.days < durationint:
                    duration = str(timediff.days) + ' D'
    
                # Not cached. Download it.
                # Establish a Contract object and the params for the request
    
                self.inner(sec_type=sec_type, symbol=symbol, currency=currency, exchange=exchange,
                           primaryExchange=PrimaryExchange, endtime=endtime, duration=duration, bar_size=bar_size,
                           what_to_show=what_to_show, use_rth=use_rth)
    
                # Make sure the connection doesn't get disconnected prior the response data return
                self.timeout = time() + timeoutsecs
                while self._conn.isConnected() and time() < self.timeout:
                    pass
                if time() >= self.timeout:
                    self.timeout = 0
                    print ('Timed Out, disconnecting.')
                    self._conn.disconnect()
                    if not self._df.empty:
                        self._df.sort_index(inplace=True)
                        self._df.to_csv(self.filename)
    
            return self._df
    
    
    
    if __name__ == '__main__':
    
        date_format = '%Y%m%d %H:%M:%S'
    
        downloader_kwargs = dict(
            data_path='../data',
            date_format=date_format,
            host='127.0.0.1',
            port=4001,
            client_id=randrange(99999999)
        )
        downloader = IBDataCache_FlatFile(**downloader_kwargs)
    
        stock_kwargs = dict(
            sec_type='STK',
            symbol='AAPL',
            currency='USD',
            exchange='ISLAND',
            PrimaryExchange='NASDAQ',
            endtime=datetime(2019, 12, 10, 16, 1).strftime(date_format),
            duration='2 D',
            bar_size='1 min',
            what_to_show='TRADES',
            use_rth=1
        )
    
        df = downloader.get_dataframe(**stock_kwargs)
    
        print(df)
    


  • Fixed a bug that unnecessarily initialized the IbPy connection before a download:

    from ib.opt import ibConnection, message
    from ib.ext.Contract import Contract
    from time import time, strftime
    from datetime import datetime, timedelta
    import pandas as pd
    from pathlib import Path
    from random import randrange        #for random connection ID only
    
    
    class IBDataCache_FlatFile(object):
    
        def _reset_data(self, host='127.0.0.1', port=4001, client_id=None):
            self._df = pd.DataFrame(columns=['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'OpenInterest'])
            self._df.set_index('Date', inplace=True)
            self._s = pd.Series()
    
            #Initialize connection as long as it's not already connected:
            if (not hasattr(self, '_conn')) or (not self._conn.isConnected()):
                self._conn = ibConnection(host, port, client_id)
                self._conn.enableLogging()
                # Register the response callback function and type of data to be returned
                self._conn.register(self._error_handler, message.Error)
                self._conn.register(self._historical_data_handler, message.historicalData)
                self._conn.register(self._save_order_id, 'NextValidId')
                self._conn.register(self._nextValidId_handler, message.nextValidId)
                self._conn.connect()
    
    
        def _save_order_id(self, msg):
            self._next_valid_id = msg.orderId
            print('save order id', msg.orderId)
    
        def _nextValidId_handler(self, msg):
            print("nextValidId_handler: ", msg)
    
        def _error_handler(self, msg):
            if not msg:
                print('disconnecting', self._conn.disconnect())
            elif 'No security definition has been found' in str(msg):
                print('ibcachedata: triggering timeout')
                self.timeout = time()  # trigger timeout
    
        def __init__(self, data_path='/docker_stocks/data', date_format='%Y%m%d %H:%M:%S', host='127.0.0.1', port=4001, client_id=None):
            self._data_path = data_path
            self._date_format = date_format
            self._next_valid_id = 1
    
            self._host = host
            self._port = port
            self._client_id = client_id
    
        def _historical_data_handler(self, msg):
            """
                Define historical data handler for IB - this will populate our pandas data frame
            """
            if 'finished' not in str(msg.date):
                try:
                    dateidx = datetime.strptime(msg.date, self._date_format)
                except ValueError:
                    # for dates only, with no time in the string:
                    dateidx = datetime.strptime(msg.date, "%Y%m%d")
                self._s = [msg.open, msg.high, msg.low, msg.close, msg.volume, 0]
                self._df.loc[dateidx] = self._s
    
            else:
                print("ibcachedata: historical_data_handler: Received finish")
                if not self._df.empty:
                    self._df.sort_index(inplace=True)
                    self._df.to_csv(self.filename)
                self._conn.disconnect()
    
        def inner(self,
                  con_Id=0,
                  sec_type='',
                  symbol='',
                  currency='USD',
                  exchange='',
                  primaryExchange='',
                  endtime='',
                  duration='250 D',
                  bar_size='30 mins',
                  what_to_show='TRADES',
                  use_rth='1'):
    
            print ("called inner... setting up req.")
            self.req = Contract()
            self.req.m_conId = con_Id
            self.req.m_secType = sec_type
            self.req.m_symbol = symbol
            self.req.m_currency = currency
            self.req.m_exchange = exchange
            self.req.m_primaryExch = primaryExchange
            self.req.endtime = endtime
            self.req.duration = duration
            self.req.bar_size = bar_size
            self.req.what_to_show = what_to_show
            self.req.use_rth = use_rth
    
    
            self._conn.reqHistoricalData(self._next_valid_id, self.req, endtime, duration,
                                         bar_size, what_to_show, use_rth, 1)
    
        def get_dataframe(self,
                          sec_type=None,
                          symbol=None,
                          currency='USD',
                          exchange=None,
                          PrimaryExchange='NASDAQ',
                          endtime=None,
                          duration='250 D',
                          bar_size='30 mins',
                          what_to_show='TRADES',
                          use_rth='1',
                          timeoutsecs=300):
    
            date_format = '%Y%m%d %H:%M:%S'
            self.timeout = 0
    
            # build filename
            self.filename = symbol + '_' + exchange + '_' + \
                bar_size.replace(' ', '') + '_' + \
                what_to_show + '_' + str(use_rth) + '.csv'
            self.filename = self.filename.replace('/', '.')
            self.filename = self._data_path + '/' + self.filename
            print ("filename:  ", self.filename)
    
            # check if we have this cached
            data_file = Path(self.filename)
            if data_file.is_file():
                self._df = pd.read_csv(self.filename,
                                       parse_dates=True,
                                       index_col=0)
            else:
                self._df = pd.DataFrame()       # just create an empty dataframe

            # get filedt (the last datetime in the file)
            if not self._df.empty:
                filedt = pd.to_datetime(self._df.index[-1])
            else:
                filedt = datetime(1970, 1, 1)

            if filedt < datetime.strptime(endtime, date_format):  # DAYS ONLY
                # set duration to endtime - filedt = the new duration to download
                timediff = datetime.strptime(endtime, date_format) - filedt
                timediff = timedelta(timediff.days)
            else:
                timediff = timedelta(0)     # download nothing; the file is up to date

            if self._df.empty or timediff > timedelta(0):
                durationint = [int(s) for s in duration.split() if s.isdigit()]
                durationint = durationint[0]
                # if there is a gap between the cache end and the requested end,
                # scrape only the non-cached data; if the gap is larger than the
                # specified duration, just do the user-specified duration --
                # the goal is to save time, not cost time
                if timediff.days > 0 and filedt != datetime(1970, 1, 1) and timediff.days < durationint:
                    duration = str(timediff.days) + ' D'

                # Not cached. Download it.
                # Establish a Contract object and the params for the request
                # set up connection:
                self._reset_data(self._host, self._port, self._client_id)

                self.inner(sec_type=sec_type, symbol=symbol, currency=currency, exchange=exchange,
                           primaryExchange=PrimaryExchange, endtime=endtime, duration=duration, bar_size=bar_size,
                           what_to_show=what_to_show, use_rth=use_rth)
    
                # Make sure the connection doesn't get disconnected prior the response data return
                self.timeout = time() + timeoutsecs
                while self._conn.isConnected() and time() < self.timeout:
                    pass
                if time() >= self.timeout:
                    self.timeout = 0
                    print ('Timed Out, disconnecting.')
                    self._conn.disconnect()
                    if not self._df.empty:
                        self._df.sort_index(inplace=True)
                        self._df.to_csv(self.filename)
    
            return self._df
    
    
    
    if __name__ == '__main__':
    
        date_format = '%Y%m%d %H:%M:%S'
    
        downloader_kwargs = dict(
            data_path='../data',
            date_format=date_format,
            host='127.0.0.1',
            port=4001,
            client_id=randrange(99999999)
        )
        downloader = IBDataCache_FlatFile(**downloader_kwargs)
    
        stock_kwargs = dict(
            sec_type='STK',
            symbol='AAPL',
            currency='USD',
            exchange='ISLAND',
            PrimaryExchange='NASDAQ',
            endtime=datetime(2019, 12, 10, 16, 1).strftime(date_format),
            duration='2 D',
            bar_size='1 min',
            what_to_show='TRADES',
            use_rth=1
        )
    
        df = downloader.get_dataframe(**stock_kwargs)
    
        print(df)
    


  • No idea why I cannot edit or delete old posts, but I found another bug which would wipe out the historical data cache during certain downloads:

    from ib.opt import ibConnection, message
    from ib.ext.Contract import Contract
    from time import time, strftime, sleep
    from datetime import datetime, timedelta
    import pandas as pd
    from pathlib import Path
    from random import randrange        # for random connection ID only


    class IBDataCache_FlatFile(object):
    
        def _reset_data(self, host='127.0.0.1', port=4001, client_id=None):
    
            #Initialize connection as long as it's not already connected:
            if (not hasattr(self, '_conn')) or (not self._conn.isConnected()):
                self._conn = ibConnection(host, port, client_id)
                self._conn.enableLogging()
                # Register the response callback function and type of data to be returned
                self._conn.register(self._error_handler, message.Error)
                self._conn.register(self._historical_data_handler, message.historicalData)
                self._conn.register(self._save_order_id, 'NextValidId')
                self._conn.register(self._nextValidId_handler, message.nextValidId)
                self._conn.connect()
    
    
    
        def _save_order_id(self, msg):
            self._next_valid_id = msg.orderId
            print('save order id', msg.orderId)
    
        def _nextValidId_handler(self, msg):
            print("nextValidId_handler: ", msg)
            '''
            self.inner(con_Id=self.req.m_conId, sec_type=self.req.m_secType, symbol=self.req.m_symbol, currency=self.req.m_currency, exchange=self.req.m_exchange, \
                       primaryExchange=self.req.m_primaryExch, endtime=self.req.endtime, duration=self.req.duration, \
                       bar_size=self.req.bar_size, what_to_show=self.req.what_to_show, use_rth=self.req.use_rth)
            '''
    
        def _error_handler(self, msg):
            #print("_error_handler error: ", msg)
            if not msg:
                print('disconnecting', self._conn.disconnect())
            elif 'No security definition has been found' in str(msg):
                print('ibcachedata: triggering timeout')
                self.timeout = time()  # trigger timeout
    
        def __init__(self, data_path='/docker_stocks/data', date_format='%Y%m%d %H:%M:%S', host='127.0.0.1', port=4001, client_id=None):
            self._data_path = data_path
            self._date_format = date_format
            self._next_valid_id = 1
    
            self._host = host
            self._port = port
            self._client_id = client_id
    
        def _historical_data_handler(self, msg):
            """
                Define historical data handler for IB - this will populate our pandas data frame
            """
    
            # print (msg.reqId, msg.date, msg.open, msg.close, msg.high, msg.low)
            #print("historical_data_handler: ", msg)
            if 'finished' not in str(msg.date):
                try:
                    dateidx = datetime.strptime(msg.date, self._date_format)
                except ValueError:
                    # for dates only, with no time in the string:
                    dateidx = datetime.strptime(msg.date, "%Y%m%d")
                self._s = [msg.open, msg.high, msg.low, msg.close, msg.volume, 0]
                self._df.loc[dateidx] = self._s
    
            else:
                print("ibcachedata: historical_data_handler: Received finish")
                if not self._df.empty:
                    self._df.sort_index(inplace=True)
                    self._df.to_csv(self.filename)
                self._conn.disconnect()
    
        def setAllWithKwArgs(self, **kwargs):
            #set all attributes to the kwargs to pass along
            for key, value in kwargs.items():
                setattr(self, key, value)
    
        def inner(self,
                  con_Id=0,
                  sec_type='',
                  symbol='',
                  currency='USD',
                  exchange='',
                  primaryExchange='',
                  endtime='',
                  duration='250 D',
                  bar_size='30 mins',
                  what_to_show='TRADES',
                  use_rth='1'):
    
    
    
            print ("called inner... setting up req.")
            self.req = Contract()
            self.req.m_conId = con_Id
            self.req.m_secType = sec_type
            self.req.m_symbol = symbol
            self.req.m_currency = currency
            self.req.m_exchange = exchange
            self.req.m_primaryExch = primaryExchange
            self.req.endtime = endtime
            self.req.duration = duration
            self.req.bar_size = bar_size
            self.req.what_to_show = what_to_show
            self.req.use_rth = use_rth
    
            self._conn.reqHistoricalData(self._next_valid_id, self.req, endtime, duration,
                                         bar_size, what_to_show, use_rth, 1)
    
        def get_dataframe(self,
                          sec_type=None,
                          symbol=None,
                          currency='USD',
                          exchange=None,
                          PrimaryExchange='NASDAQ',
                          endtime=None,
                          duration='250 D',
                          bar_size='30 mins',
                          what_to_show='TRADES',
                          use_rth='1',
                          timeoutsecs=300):
    
            date_format = '%Y%m%d %H:%M:%S'
            self.timeout = 0
    
            # build filename
            self.filename = symbol + '_' + exchange + '_' + \
                bar_size.replace(' ', '') + '_' + \
                what_to_show + '_' + str(use_rth) + '.csv'
            self.filename = self.filename.replace('/', '.')
            self.filename = self._data_path + '/' + self.filename
            print ("filename:  ", self.filename)
    
            # check if we have this cached
            data_file = Path(self.filename)
            if data_file.is_file():
                self._df = pd.read_csv(self.filename,
                             parse_dates=True,
                             index_col=0)
            else:
                self._df = pd.DataFrame(columns=['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'OpenInterest'])
                self._df.set_index('Date', inplace=True)
                self._s = pd.Series()
    
            # get filedt (the last datetime in the file)
            if not self._df.empty:
                filedt = pd.to_datetime(self._df.index[-1])
            else:
                filedt = datetime(1970, 1, 1)
    
    
            if filedt < datetime.strptime(endtime, date_format):  # DAYS ONLY
                # set duration to endtime - filedt = the new duration to download
                timediff = datetime.strptime(endtime, date_format) - filedt
                timediff = timedelta(timediff.days)
            else:
                timediff = timedelta(0)     #download nothing as the file is up to date
    
            if self._df.empty or timediff > timedelta(0):
                durationint = [int(s) for s in duration.split() if s.isdigit()]
                durationint = durationint[0]
                #test if the date difference to the end of the file exists, if so then scrape the non-cached data into the dataframe
                #if the difference to the last bar is larger than the specified duration, just do the user specified duration -- the goal is to save time, not cost time
                if timediff.days > 0 and filedt != datetime(1970, 1, 1) and timediff.days < durationint:
                    duration = str(timediff.days) + ' D'
    
                # Not cached. Download it.
                # Establish a Contract object and the params for the request
                # set up connection:
                self._reset_data(self._host, self._port, self._client_id)
    
                self.inner(sec_type=sec_type, symbol=symbol, currency=currency, exchange=exchange,
                           primaryExchange=PrimaryExchange, endtime=endtime, duration=duration, bar_size=bar_size,
                           what_to_show=what_to_show, use_rth=use_rth)
    
                # Make sure the connection doesn't get disconnected prior the response data return
                self.timeout = time() + timeoutsecs
                while self._conn.isConnected() and time() < self.timeout:
                    #print(".", end="", flush=True)
                    pass
                if time() >= self.timeout:
                    self.timeout = 0
                    print ('Timed Out, disconnecting.')
                    self._conn.disconnect()
                    if not self._df.empty:
                        self._df.sort_index(inplace=True)
                        self._df.to_csv(self.filename)
    
            return self._df
    
    
    
    if __name__ == '__main__':
    
        date_format = '%Y%m%d %H:%M:%S'
    
        downloader_kwargs = dict(
            data_path='../data',
            date_format=date_format,
            host='127.0.0.1',
            port=4001,
            client_id=randrange(99999999)
        )
        downloader = IBDataCache_FlatFile(**downloader_kwargs)
        '''
        stock_kwargs = dict(
            sec_type='STK',
            symbol='AAPL',
            currency='USD',
            exchange='ISLAND',
            PrimaryExchange='NASDAQ',
            endtime=datetime(2019, 8, 26, 15, 59).strftime(date_format),
            duration='2 D',
            bar_size='30 mins',
            what_to_show='TRADES',
            use_rth=1
        )
    
        df = downloader.get_dataframe(**stock_kwargs)
        print(df)
        '''
    
        stock_kwargs = dict(
            sec_type='STK',
            symbol='AAPL',
            currency='USD',
            exchange='ISLAND',
            PrimaryExchange='NASDAQ',
            endtime=datetime(2019, 12, 10, 16, 1).strftime(date_format),
            duration='2 D',
            bar_size='1 min',
            what_to_show='TRADES',
            use_rth=1
        )
    
        df = downloader.get_dataframe(**stock_kwargs)
    
