How to cache data from IB and use it later
-
I wanted to download historical data from IB, write it to a csv and then use it later. Here's a simple example that does that. Hope this helps people!
"""Download historical data from Interactive Brokers once, cache it to a
CSV file, and serve repeat requests from disk."""

from ib.opt import ibConnection, message
from ib.ext.Contract import Contract
from time import sleep
from datetime import datetime

import pandas as pd


class IBDataCache(object):
    """Fetch IB historical bars and cache each request as a CSV file.

    A request is identified by its full parameter set; issuing the same
    request again is served from disk instead of hitting the IB API.
    """

    def __init__(self, data_path='.', date_format='%Y%m%d %H:%M:%S',
                 host='127.0.0.1', port=7497, client_id=None):
        self._data_path = data_path
        self._date_format = date_format
        self._reset_data()
        self._next_valid_id = 1
        self._conn = ibConnection(host, port, client_id)
        self._conn.enableLogging()
        # Register the response callbacks and the type of data returned.
        self._conn.register(self._historical_data_handler,
                            message.historicalData)
        self._conn.register(self._save_order_id, 'NextValidId')
        self._conn.connect()

    def _reset_data(self):
        # Fresh frame for the next request; 'Date' becomes the index once
        # the download finishes.
        self._df = pd.DataFrame(columns=['Date', 'Open', 'High', 'Low',
                                         'Close', 'Volume', 'OpenInterest'])

    def _save_order_id(self, msg):
        # Remember the next valid request id handed out by TWS/Gateway.
        self._next_valid_id = msg.orderId

    def _historical_data_handler(self, msg):
        """Historical data callback: accumulate bars into the DataFrame."""
        if 'finished' not in str(msg.date):
            try:
                bar_date = datetime.strptime(msg.date, self._date_format)
            except ValueError:
                # Daily (and larger) bars arrive as plain dates, no time.
                bar_date = datetime.strptime(msg.date, '%Y%m%d')
            self._df.loc[len(self._df)] = [bar_date, msg.open, msg.high,
                                           msg.low, msg.close, msg.volume, 0]
        else:
            # The 'finished' sentinel marks the end of the download.
            self._df.set_index('Date', inplace=True)

    def _cache_filename(self, sec_type, symbol, currency, exchange, endtime,
                        duration, bar_size, what_to_show, use_rth):
        # One file per unique request; spaces stripped to keep names clean.
        return (self._data_path + '/' + symbol + '_' + sec_type + '_' +
                exchange + '_' + currency + '_' + endtime.replace(' ', '') +
                '_' + duration.replace(' ', '') + '_' +
                bar_size.replace(' ', '') + '_' + what_to_show + '_' +
                str(use_rth) + '.csv')

    def get_dataframe(self, sec_type, symbol, currency, exchange, endtime,
                      duration, bar_size, what_to_show, use_rth):
        """Return the requested bars, downloading only on a cache miss."""
        self._reset_data()
        filename = self._cache_filename(sec_type, symbol, currency, exchange,
                                        endtime, duration, bar_size,
                                        what_to_show, use_rth)
        try:
            # Cache hit: read from disk.
            self._df = pd.read_csv(filename, parse_dates=True, index_col=0)
            # Force a proper DatetimeIndex; read_csv can leave int64
            # timestamps behind, which breaks downstream consumers that
            # call .to_pydatetime() on index entries.
            self._df.index = pd.to_datetime(self._df.index)
        except IOError:
            # Cache miss: build the Contract and request the data.
            req = Contract()
            req.m_secType = sec_type
            req.m_symbol = symbol
            req.m_currency = currency
            req.m_exchange = exchange
            self._conn.reqHistoricalData(self._next_valid_id, req, endtime,
                                         duration, bar_size, what_to_show,
                                         use_rth, 1)
            # Keep the connection alive until the response has arrived.
            # NOTE(review): a fixed sleep can cut off larger downloads;
            # prefer disconnecting from the 'finished' callback instead.
            sleep(5)
            self._conn.disconnect()
            # Cache the freshly-downloaded frame to CSV.
            self._df.to_csv(filename)
        return self._df


if __name__ == '__main__':
    date_format = '%Y%m%d %H:%M:%S'
    downloader = IBDataCache(data_path='../data', date_format=date_format,
                             host='127.0.0.1', port=7497, client_id=999)
    for symbol in ('AAPL', 'MSFT'):
        df = downloader.get_dataframe(
            sec_type='STK', symbol=symbol, currency='USD', exchange='SMART',
            endtime=datetime(2018, 2, 14, 15, 59).strftime(date_format),
            duration='1 D', bar_size='1 min', what_to_show='TRADES',
            use_rth=1)
        print(df)
-
Thanks for sharing.
-
Your code is extremely useful when dealing with IB's down times. And it's a nice speedup as well.
Just one quick suggestion/bug fix -- I've found that sometimes there's a numpy int64 error generated by line 268 in pandafeed (AttributeError: 'numpy.int64' object has no attribute 'to_pydatetime') due to the fact that some datetimes are stored in int64 in pandas and need to be converted manually. The line I've added to alleviate this is:
try:
    # check if we have this cached
    self._df = pd.read_csv(filename, parse_dates=True, index_col=0)
    # Normalize the index to real datetimes: read_csv can leave int64
    # timestamps behind, which later break .to_pydatetime() downstream.
    self._df.index = pd.to_datetime(self._df.index, format='%Y-%m-%dT%H:%M:%S')
More information on this datetime conversion can be found here: https://community.backtrader.com/topic/676/bug-using-pandas-hdf
Again, great work brettelliot.
-
Although this routine is great for its simple flat-file data caching, its fixed sleep statements cause an implementation problem: the connection can be shut down before the data download is complete. Another issue is that, even for small downloads, the sleep statements hold up processing; when one is trying to analyze 100-200 securities per hour, these sleep statements have to be removed, as below.
Ideally, the callback will just terminate the connection correctly when the "finished" response is received from the api. So I've altered the code below. It's a bit hard to follow due to the callbacks but appears to work effectively. Please note the timeoutsecs argument should be specified as it is the maximum that the routine will wait for the download.
"""IB historical-data cache that disconnects from the API callbacks as soon
as the download finishes (no fixed sleep), with a hard timeout as a safety
net."""

from ib.opt import ibConnection, message
from ib.ext.Contract import Contract
from time import time, sleep
from datetime import datetime

import pandas as pd


class IBDataCache(object):
    """Fetch IB historical bars and cache each request as a CSV file.

    Unlike the sleep-based variant, the connection is torn down from the
    API callbacks once the 'finished' sentinel (or a terminal error)
    arrives; ``timeoutsecs`` bounds the worst-case wait.
    """

    def __init__(self, data_path='/docker_stocks/data',
                 date_format='%Y%m%d %H:%M:%S', host='127.0.0.1',
                 port=4001, client_id=None):
        self._data_path = data_path
        self._date_format = date_format
        self._next_valid_id = 1
        self._host = host
        self._port = port
        self._client_id = client_id

    def _reset_data(self, host='127.0.0.1', port=4001, client_id=None):
        """Clear the working DataFrame and (re)establish the connection."""
        self._df = pd.DataFrame(columns=['Date', 'Open', 'High', 'Low',
                                         'Close', 'Volume', 'OpenInterest'])
        # Only build a new connection if we don't already have a live one.
        if (not hasattr(self, '_conn')) or (not self._conn.isConnected()):
            self._conn = ibConnection(host, port, client_id)
            self._conn.enableLogging()
            # Register the response callbacks and data types returned.
            self._conn.register(self._error_handler, message.Error)
            self._conn.register(self._historical_data_handler,
                                message.historicalData)
            self._conn.register(self._save_order_id, 'NextValidId')
            self._conn.register(self._nextValidId_handler,
                                message.nextValidId)
            self._conn.connect()

    def _save_order_id(self, msg):
        # Remember the next valid request id handed out by TWS/Gateway.
        self._next_valid_id = msg.orderId
        print('save order id', msg.orderId)

    def _nextValidId_handler(self, msg):
        """Re-submit the staged request once the API hands out a fresh id."""
        print("nextValidId_handler: ", msg)
        # Guard: this callback can fire on connect, before any request has
        # been staged in self.req (would otherwise raise AttributeError).
        if not hasattr(self, 'req'):
            return
        self.inner(sec_type=self.req.m_secType, symbol=self.req.m_symbol,
                   currency=self.req.m_currency,
                   exchange=self.req.m_exchange,
                   primaryExchange=self.req.m_primaryExch,
                   endtime=self.req.endtime, duration=self.req.duration,
                   bar_size=self.req.bar_size,
                   what_to_show=self.req.what_to_show,
                   use_rth=self.req.use_rth)

    def _error_handler(self, msg):
        print("error: ", msg)
        # An empty message signals the connection is gone; tidy up our side.
        if not msg:
            print('disconnecting', self._conn.disconnect())

    def _historical_data_handler(self, msg):
        """Historical-data callback: accumulate bars, finish on sentinel."""
        if 'finished' not in str(msg.date):
            try:
                bar_date = datetime.strptime(msg.date, self._date_format)
            except ValueError:
                # Daily (and larger) bars arrive as plain dates, no time.
                bar_date = datetime.strptime(msg.date, '%Y%m%d')
            self._df.loc[len(self._df)] = [bar_date, msg.open, msg.high,
                                           msg.low, msg.close, msg.volume, 0]
        else:
            # 'finished' sentinel: index the frame; the wait loop in
            # get_dataframe exits once the connection drops or times out.
            self._df.set_index('Date', inplace=True)

    def setAllWithKwArgs(self, **kwargs):
        """Set every kwarg as an instance attribute (pass-along helper)."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    def inner(self, sec_type, symbol, currency, exchange, primaryExchange,
              endtime, duration, bar_size, what_to_show, use_rth):
        """Build the Contract and fire the historical-data request."""
        print("calling inner... setting up req.")
        self.req = Contract()
        self.req.m_secType = sec_type
        self.req.m_symbol = symbol
        self.req.m_currency = currency
        self.req.m_exchange = exchange
        # BUG FIX: was 'self.primaryExch = primaryExchange', which never
        # reached the contract even though _nextValidId_handler reads
        # self.req.m_primaryExch when re-submitting.
        self.req.m_primaryExch = primaryExchange
        # Stash the request parameters on the contract for a re-submit.
        self.req.endtime = endtime
        self.req.duration = duration
        self.req.bar_size = bar_size
        self.req.what_to_show = what_to_show
        self.req.use_rth = use_rth
        self._conn.reqHistoricalData(self._next_valid_id, self.req, endtime,
                                     duration, bar_size, what_to_show,
                                     use_rth, 1)

    def get_dataframe(self, sec_type, symbol, currency, exchange,
                      primaryExchange, endtime, duration, bar_size,
                      what_to_show, use_rth, timeoutsecs=60):
        """Return the requested bars, downloading only on a cache miss.

        ``timeoutsecs`` is the maximum time to wait for the download; the
        callbacks normally disconnect earlier, when 'finished' arrives.
        (Defaulted to 60 so callers that omit it keep working.)
        """
        # build filename: one file per unique request, spaces stripped.
        self.filename = (symbol + '_' + sec_type + '_' + exchange + '_' +
                         currency + '_' + endtime.replace(' ', '') + '_' +
                         duration.replace(' ', '') + '_' +
                         bar_size.replace(' ', '') + '_' + what_to_show +
                         '_' + str(use_rth) + '.csv')
        # Parameter values may contain '/', which would break the path.
        self.filename = self.filename.replace('/', '.')
        self.filename = self._data_path + '/' + self.filename
        print("filename: ", self.filename)
        try:
            # Cache hit: read from disk and force a proper DatetimeIndex
            # (read_csv can leave int64 timestamps behind).
            self._df = pd.read_csv(self.filename, parse_dates=True,
                                   index_col=0)
            self._df.index = pd.to_datetime(self._df.index,
                                            format='%Y-%m-%d %H:%M:%S')
        except IOError:
            # Cache miss: set up the connection and request the data.
            self._reset_data(self._host, self._port, self._client_id)
            self.inner(sec_type, symbol, currency, exchange,
                       primaryExchange, endtime, duration, bar_size,
                       what_to_show, use_rth)
            # Wait until the callbacks disconnect or the timeout expires.
            deadline = time() + timeoutsecs
            while self._conn.isConnected() and time() < deadline:
                sleep(0.1)  # brief nap instead of busy-spinning a core
            self._conn.disconnect()
            # Cache the freshly-downloaded frame to CSV.
            self._df.to_csv(self.filename)
        return self._df


if __name__ == '__main__':
    date_format = '%Y%m%d %H:%M:%S'
    downloader = IBDataCache(data_path='../data', date_format=date_format,
                             host='127.0.0.1', port=4001, client_id=992)
    requests = [
        dict(sec_type='STK', symbol='AAPL', currency='USD',
             exchange='SMART', primaryExchange='NASDAQ',
             endtime=datetime(2018, 10, 26, 15, 59).strftime(date_format),
             duration='2 D', bar_size='30 mins', what_to_show='TRADES',
             use_rth=1, timeoutsecs=60),
        dict(sec_type='STK', symbol='MSFT', currency='USD',
             exchange='SMART', primaryExchange='NASDAQ',
             endtime=datetime(2018, 10, 26, 15, 59).strftime(date_format),
             duration='1 D', bar_size='1 min', what_to_show='TRADES',
             use_rth=1, timeoutsecs=60),
    ]
    for kwargs in requests:
        df = downloader.get_dataframe(**kwargs)
        print("IBCacheData")
        print(df)
(As a side note, the int64 error mentioned above is due to the sleep statement from the original code terminating before the downloading is completed.)