Best posts made by dasch
-
RE: Security Concerns
@tsguo3 if you want to be sure, you can always read the source code.
-
BollingerBands Squeeze
If anyone needs this, here is a BBSqueeze indicator.
The implementation is based on this: https://www.netpicks.com/squeeze-out-the-chop/
BBSqueeze
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import backtrader as bt

from . import KeltnerChannel


class BBSqueeze(bt.Indicator):
    '''
    https://www.netpicks.com/squeeze-out-the-chop/

    Both indicators are symmetrical, meaning that the upper and lower bands
    or channel lines are the same distance from the moving average. That
    means that we can focus on only one side in developing our indicator.
    In our case, we’ll just consider the upper lines.

    The basic formulas we need are:

        Bollinger Band = Moving Average + (Number of standard deviations X Standard Deviation)
        Keltner Channel = Moving Average + (Number of ATR’s X ATR)

    Or if we translate this into pseudo-code:

        BBUpper = Avg(close, period) + (BBDevs X StdDev(close, period))
        KCUpper = Avg(close, period) + (KCDevs X ATR(period))

    The squeeze is calculated by taking the difference between these two
    values:

        Squeeze = BBUpper – KCUpper

    Which simplifies down to this:

        Squeeze = (BBDevs X StdDev(close, period)) – (KCDevs X ATR(period))
    '''

    lines = ('squeeze',)
    params = (('period', 20), ('devfactor', 2.0), ('movav', bt.ind.MovAv.Simple),)

    plotinfo = dict(subplot=True)

    def _plotlabel(self):
        plabels = [self.p.period, self.p.devfactor]
        plabels += [self.p.movav] * self.p.notdefault('movav')
        return plabels

    def __init__(self):
        bb = bt.ind.BollingerBands(
            period=self.p.period, devfactor=self.p.devfactor, movav=self.p.movav)
        kc = KeltnerChannel(
            period=self.p.period, devfactor=self.p.devfactor, movav=self.p.movav)
        self.lines.squeeze = bb.top - kc.top
KeltnerChannel
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import backtrader as bt


class KeltnerChannel(bt.Indicator):

    lines = ('mid', 'top', 'bot',)
    params = (('period', 20), ('devfactor', 1.5), ('movav', bt.ind.MovAv.Simple),)

    plotinfo = dict(subplot=False)
    plotlines = dict(
        mid=dict(ls='--'),
        top=dict(_samecolor=True),
        bot=dict(_samecolor=True),
    )

    def _plotlabel(self):
        plabels = [self.p.period, self.p.devfactor]
        plabels += [self.p.movav] * self.p.notdefault('movav')
        return plabels

    def __init__(self):
        # middle line: moving average of the data
        self.lines.mid = ma = self.p.movav(self.data, period=self.p.period)
        # channel width: devfactor times the ATR
        atr = self.p.devfactor * bt.ind.ATR(self.data, period=self.p.period)
        self.lines.top = ma + atr
        self.lines.bot = ma - atr
-
RE: Resolve this error!
@curious_one your datetime is a string, try converting it to datetime objects. Look at pandas to_datetime() method.
df.Datetime = pandas.to_datetime( df.Datetime, format='%Y-%m-%d %H:%M:%S')
-
RE: BollingerBands Squeeze
BBSqueeze with configurable multipliers for the Bollinger Bands and the Keltner Channel
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import backtrader as bt

from . import KeltnerChannel


class BBSqueeze(bt.Indicator):
    '''
    https://www.netpicks.com/squeeze-out-the-chop/

    Both indicators are symmetrical, meaning that the upper and lower bands
    or channel lines are the same distance from the moving average. That
    means that we can focus on only one side in developing our indicator.
    In our case, we’ll just consider the upper lines.

    The basic formulas we need are:

        Bollinger Band = Moving Average + (Number of standard deviations X Standard Deviation)
        Keltner Channel = Moving Average + (Number of ATR’s X ATR)

    Or if we translate this into pseudo-code:

        BBUpper = Avg(close, period) + (BBDevs X StdDev(close, period))
        KCUpper = Avg(close, period) + (KCDevs X ATR(period))

    The squeeze is calculated by taking the difference between these two
    values:

        Squeeze = BBUpper – KCUpper

    Which simplifies down to this:

        Squeeze = (BBDevs X StdDev(close, period)) – (KCDevs X ATR(period))
    '''

    lines = ('squeeze',)
    params = (('period', 20), ('bbdevs', 2.0), ('kcdevs', 1.5), ('movav', bt.ind.MovAv.Simple),)

    plotinfo = dict(subplot=True)

    def _plotlabel(self):
        plabels = [self.p.period, self.p.bbdevs, self.p.kcdevs]
        plabels += [self.p.movav] * self.p.notdefault('movav')
        return plabels

    def __init__(self):
        bb = bt.ind.BollingerBands(
            period=self.p.period, devfactor=self.p.bbdevs, movav=self.p.movav)
        kc = KeltnerChannel(
            period=self.p.period, devfactor=self.p.kcdevs, movav=self.p.movav)
        self.lines.squeeze = bb.top - kc.top
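To make the simplified formula from the docstring concrete, here is a minimal plain-Python sketch that computes one squeeze value by hand. It does not use backtrader; the helper names `true_range` and `squeeze` are made up for this example, and the ATR is assumed to be a simple average of the true range (backtrader's own ATR uses a smoothed average, so values will differ slightly).

```python
from statistics import mean, pstdev


def true_range(high, low, prev_close):
    # largest of: bar range, gap up from previous close, gap down
    return max(high - low, abs(high - prev_close), abs(low - prev_close))


def squeeze(highs, lows, closes, period=20, bbdevs=2.0, kcdevs=1.5):
    """Squeeze = (BBDevs x StdDev(close, period)) - (KCDevs x ATR(period))
    for the most recent bar. Needs period + 1 bars because the true range
    looks at the previous close."""
    if len(closes) < period + 1:
        raise ValueError("need at least period + 1 bars")
    stddev = pstdev(closes[-period:])
    first = len(closes) - period
    trs = [true_range(highs[i], lows[i], closes[i - 1])
           for i in range(first, len(closes))]
    atr = mean(trs)  # assumption: simple average instead of a smoothed one
    return bbdevs * stddev - kcdevs * atr


# flat closes with a constant 2-point bar range
closes = [10.0, 10.0, 10.0, 10.0]
highs = [c + 1.0 for c in closes]
lows = [c - 1.0 for c in closes]
value = squeeze(highs, lows, closes, period=3)
print(value)
```

A negative value means the upper Bollinger Band is below the upper Keltner line, so the bands sit inside the channel; a positive value means they have expanded outside it.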
-
Chandelier Exit
Another indicator, Chandelier Exit
import backtrader as bt


class ChandelierExit(bt.Indicator):
    '''
    https://corporatefinanceinstitute.com/resources/knowledge/trading-investing/chandelier-exit/
    '''

    lines = ('long', 'short')
    params = (('period', 22), ('multip', 3),)

    plotinfo = dict(subplot=False)

    def __init__(self):
        highest = bt.ind.Highest(self.data.high, period=self.p.period)
        lowest = bt.ind.Lowest(self.data.low, period=self.p.period)
        atr = self.p.multip * bt.ind.ATR(self.data, period=self.p.period)
        # long exit trails below the highest high, short exit above the lowest low
        self.lines.long = highest - atr
        self.lines.short = lowest + atr
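For a quick sanity check of the two lines, the same arithmetic can be written in plain Python. This is only a sketch: the function name `chandelier_exit` is made up for the example and the ATR is passed in as a ready-made number instead of being computed.

```python
def chandelier_exit(highs, lows, atr, period=22, multip=3):
    """Return (long_exit, short_exit) for the most recent bar."""
    highest = max(highs[-period:])
    lowest = min(lows[-period:])
    return highest - multip * atr, lowest + multip * atr


long_exit, short_exit = chandelier_exit(
    highs=[10.0, 12.0, 11.0], lows=[8.0, 9.0, 7.0], atr=1.0, period=3)
print(long_exit, short_exit)
```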
-
RE: Group orders with transmit but with 2 orders instead of 3?
I fixed the issue with the oanda broker. It will now assume there is no stopside.
-
RE: Incorrect `_timeframe` after loading Pandas dataframe to a data feed
When you create the datafeed, you need to specify the timeframe and compression your data is using. Backtrader will not identify it by itself.
So when you do:
data_feed = CryptoPandasData(dataname=time_bars)
you should do it this way instead:
data_feed = CryptoPandasData(dataname=time_bars, timeframe=bt.TimeFrame.Minutes, compression=1)
-
converting pinescript indicators
Hi
I have worked on converting a Pinescript indicator to backtrader.
I will describe some findings from the process, give some tips, show an example and so on.
variables
In pinescript, variables seem to work like lines in backtrader. So for every variable used in the script I have added a line:
pinescript:
setupCountUp = na
backtrader ind.__init__:
self.setup_sell = bt.LineNum(float('nan'))
conditions
To use methods like barssince and valuewhen from pinescript, which allow conditions like:
setupCountUp == self.p.setup_bars
the easiest method I found is to use numpy.
- convert a line to a numpy array (using get(size=SIZE) to work only on a defined range of data instead of the complete line)
import numpy as np


def line2arr(line, size=-1):
    # size <= 0: convert the complete line, otherwise only the last `size` values
    if size <= 0:
        return np.array(line.array)
    else:
        return np.array(line.get(size=size))
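- provide methods which work like the methods from pinescript; I wrote the ones I needed.
def na(val):
    # NaN is the only value that is not equal to itself
    return val != val


def nz(x, y=None):
    # replace NaN with y (or 0 if y is not given)
    # numpy arrays and scalars have no fillna(): arrays are handled with
    # np.where, scalars fall through to the NaN check below
    if isinstance(x, np.ndarray):
        return np.where(np.isnan(x), y or 0, x)
    if x != x:
        if y is not None:
            return y
        return 0
    return x


def barssince(condition, occurrence=0):
    # number of bars since condition was true (0 = current bar),
    # NaN if it was never true
    cond_len = len(condition)
    occ = 0
    since = 0
    res = float('nan')
    while cond_len - (since+1) >= 0:
        cond = condition[cond_len-(since+1)]
        if cond and not cond != cond:
            if occ == occurrence:
                res = since
                break
            occ += 1
        since += 1
    return res


def valuewhen(condition, source, occurrence=0):
    res = float('nan')
    since = barssince(condition, occurrence)
    # barssince returns NaN (not None) when there is no match
    if since == since:
        res = source[-(since+1)]
    return res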
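To show what these helpers return, here is a small self-contained sketch on plain Python lists. The two functions are compact restatements written for this example (same behaviour as described above: 0 means the current bar, NaN means no match), not the exact code from the post, and the sample data is invented.

```python
def barssince(condition, occurrence=0):
    """Bars since condition was true (0 = current bar), NaN if never."""
    hits = 0
    for since, cond in enumerate(reversed(condition)):
        if cond and cond == cond:  # truthy and not NaN
            if hits == occurrence:
                return since
            hits += 1
    return float('nan')


def valuewhen(condition, source, occurrence=0):
    """Value of source on the bar where condition was true."""
    since = barssince(condition, occurrence)
    if since != since:  # NaN: condition was never true
        return float('nan')
    return source[-(since + 1)]


closes = [10.0, 11.0, 12.0, 11.5, 11.0]
rising = [False, True, True, False, False]

last_rise = barssince(rising)               # most recent rising bar
close_at_rise = valuewhen(rising, closes)   # close on that bar
prev_rise_close = valuewhen(rising, closes, occurrence=1)
print(last_rise, close_at_rise, prev_rise_close)
```

With arrays from line2arr the calls look the same, since only len(), indexing and iteration are used.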
rewriting
After this, rewriting the pinescript in Python using backtrader was simple. Basically it can be split up into:
- look for vars, params, signals used in pinescript
- define vars, params, signals in the indicator
- strip all plot related code from pinescript
- rewrite the pinescript code in the indicator's next method
plotting
Since I only want to have signals as lines in the indicator, I did not put all vars used in pinescript into lines, but set them in __init__. That gave me better control over what the indicator plots. But the indicator had more values than just the signals, so how to handle them?
The solution was an observer, which takes care of plotting all the other information the indicator has. Here you can set up all the plot related code from pinescript.
some remaining issues
Maybe some of you know answers to these:
Questions:
- Is it possible to create lines with conditions (outside of __init__) like: self.line1 > val ?
- Is there a way to get something like barssince, valuewhen natively in backtrader?
Issues:
- the code is not very good, since a lot of it could be done better in Python, but if you don't want to fiddle too much with it, this should be fine
- refactoring would make the code more readable
I hope this will help some of you!
I will post the code of an indicator, an observer and the pinescript methods in the next post below.
-
RE: Problem with resampling 1m data to 5m/15m
I fixed issues with my datafeeds by adjusting the time to the end of the period they cover. Backtrader expects the end time of a period, while most data will have the start time of the period as the datetime.
A custom CSV feed with time adjustment which can be used to fix that:
import backtrader as bt
from backtrader.utils import date2num
from datetime import datetime, time, timedelta


def getstarttime(timeframe, compression, dt, sessionstart=None, offset=0):
    '''
    This method will return the start of the period based on current
    time (or provided time).
    '''
    if sessionstart is None:
        # use UTC 22:00 (5:00 pm New York) as default
        sessionstart = time(hour=22, minute=0, second=0)
    if dt is None:
        dt = datetime.utcnow()
    if timeframe == bt.TimeFrame.Seconds:
        dt = dt.replace(
            second=(dt.second // compression) * compression,
            microsecond=0)
        if offset:
            dt = dt - timedelta(seconds=compression*offset)
    elif timeframe == bt.TimeFrame.Minutes:
        if compression >= 60:
            hours = 0
            minutes = 0
            # get start of day
            dtstart = getstarttime(bt.TimeFrame.Days, 1, dt, sessionstart)
            # diff start of day with current time to get seconds
            # since start of day
            dtdiff = dt - dtstart
            hours = dtdiff.seconds//((60*60)*(compression//60))
            minutes = compression % 60
            dt = dtstart + timedelta(hours=hours, minutes=minutes)
        else:
            dt = dt.replace(
                minute=(dt.minute // compression) * compression,
                second=0,
                microsecond=0)
        if offset:
            dt = dt - timedelta(minutes=compression*offset)
    elif timeframe == bt.TimeFrame.Days:
        if dt.hour < sessionstart.hour:
            dt = dt - timedelta(days=1)
        if offset:
            dt = dt - timedelta(days=offset)
        dt = dt.replace(
            hour=sessionstart.hour,
            minute=sessionstart.minute,
            second=sessionstart.second,
            microsecond=sessionstart.microsecond)
    elif timeframe == bt.TimeFrame.Weeks:
        if dt.weekday() != 6:
            # sunday is start of week at 5pm new york
            dt = dt - timedelta(days=dt.weekday() + 1)
        if offset:
            dt = dt - timedelta(days=offset * 7)
        dt = dt.replace(
            hour=sessionstart.hour,
            minute=sessionstart.minute,
            second=sessionstart.second,
            microsecond=sessionstart.microsecond)
    elif timeframe == bt.TimeFrame.Months:
        if offset:
            dt = dt - timedelta(days=(min(28 + dt.day, 31)))
        # last day of month
        last_day_of_month = dt.replace(day=28) + timedelta(days=4)
        last_day_of_month = last_day_of_month - timedelta(
            days=last_day_of_month.day)
        last_day_of_month = last_day_of_month.day
        # start of month
        if dt.day < last_day_of_month:
            dt = dt - timedelta(days=dt.day)
        dt = dt.replace(
            hour=sessionstart.hour,
            minute=sessionstart.minute,
            second=sessionstart.second,
            microsecond=sessionstart.microsecond)
    return dt


class CSVAdjustTime(bt.feeds.GenericCSVData):

    params = dict(
        adjstarttime=False,
    )

    def _loadline(self, linetokens):
        res = super(CSVAdjustTime, self)._loadline(linetokens)
        if self.p.adjstarttime:
            # move time to start time of next candle
            # and subtract 0.1 milliseconds (ensures no
            # rounding issues, 10 microseconds is minimum)
            new_date = getstarttime(
                self._timeframe,
                self._compression,
                self.datetime.datetime(0),
                self.sessionstart,
                -1) - timedelta(microseconds=100)
            self.datetime[0] = date2num(new_date)
        return res
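The core idea of the adjustment (take the start of the bar, jump to the start of the next bar, step back 100 microseconds) can be sketched with the standard library alone. The helper names `floor_to_period` and `bar_end_time` are made up for this example and it only covers intraday bars with a compression below 60 minutes.

```python
from datetime import datetime, timedelta


def floor_to_period(dt, minutes):
    """Start of the intraday bar (compression < 60 minutes) containing dt."""
    return dt.replace(minute=(dt.minute // minutes) * minutes,
                      second=0, microsecond=0)


def bar_end_time(start, minutes, epsilon=timedelta(microseconds=100)):
    """Shift a bar's start timestamp to just before the next bar begins."""
    return start + timedelta(minutes=minutes) - epsilon


# a tick at 10:07:30 belongs to the 5-minute bar that starts at 10:05
start = floor_to_period(datetime(2020, 1, 1, 10, 7, 30), 5)
end = bar_end_time(start, 5)
print(start, end)
```

Stamping the bar with the end time keeps it inside its own period, so a 10:05 bar is not merged into the wrong 15-minute bucket when resampling.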
-
RE: How does live mode reference old candle data?
@Wayne-Filkins I checked the code for the alpaca data. The data feeds support backfill. Check out the params:
- ``backfill_start`` (default: ``True``)
  Perform backfilling at the start. The maximum possible historical data will be fetched in a single request.
- ``backfill`` (default: ``True``)
  Perform backfilling after a disconnection/reconnection cycle. The gap duration will be used to download the smallest possible amount of data
- ``backfill_from`` (default: ``None``)
  An additional data source can be passed to do an initial layer of backfilling. Once the data source is depleted and if requested, backfilling from IB will take place. This is ideally meant to backfill from already stored sources like a file on disk, but not limited to.
I did not look up the maximum size of historical data per request.
But it will backfill by default in live trading. You should check their API description to see how large it is (how many historical candles it will prefill on start).
-
RE: Problem with resampling 1m data to 5m/15m
With the adjustment above, there is no need to alter any settings like
rightedge=True, bar2edge=True, adjbartime=True, boundoff=0,
@Nam-Nguyen for the tick question: you will not experience any issues with backtrader when using tick data, since ticks are just events at a given time. They will resample correctly.
-
RE: Resampling issues + adding multiple timeframes
You should check the length of the data to see if it really did advance.
next will be called for every data source. So if the 5 minute data advances, next will be called again (this is why, between 4 and 6, 4 appears twice).
-
RE: Problem with resampling 1m data to 5m/15m
@Nam-Nguyen you can use my code with the monkey patching of _load to have a common solution for this.
@run-out if the metatrader datafeeds have a datetime which is the start of the period then my solution will work with metatrader datafeeds
-
RE: Resampling issues + adding multiple timeframes
When using multiple data sources, you will get notified of every change: when resampling, whenever data advances; when replaying, whenever data changes (in that case, the data will not necessarily advance). So to know whether a data source advanced, you need to check the length of the data.
example:
if len(self.datas[1]) > self._last_len:
    self.log(self.datas[1].close[0])
self._last_len = len(self.datas[1])
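The length check can be tried without backtrader. In this sketch a plain list stands in for the 5-minute data (self.datas[1]) and a loop stands in for the calls to next; the class name `LengthWatcher` and the simulated feed are made up for the example.

```python
class LengthWatcher:
    """Remembers the last seen length of a data series."""

    def __init__(self):
        self._last_len = 0

    def advanced(self, data):
        # True only on the call where the series got longer
        current = len(data)
        if current > self._last_len:
            self._last_len = current
            return True
        return False


bars_5m = []                 # stands in for self.datas[1]
watcher = LengthWatcher()
new_bar_minutes = []

for minute in range(1, 11):  # ten calls to "next", one per 1-minute bar
    if minute % 5 == 0:
        bars_5m.append(minute)   # the 5-minute data delivers a bar
    if watcher.advanced(bars_5m):
        new_bar_minutes.append(minute)

print(new_bar_minutes)
```

The slower data is only acted on in the two calls where its length grew; in all other calls it still shows the same last bar.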
-
RE: Problem with resampling 1m data to 5m/15m
The datetime of the resampled data is the end of the period; there you may use the settings
rightedge=True, bar2edge=True, adjbartime=True, boundoff=0,
to adjust the time to be the start of a period
-
RE: Resampling issues + adding multiple timeframes
here is a cleaned up version of the indicator.
class Trend_Indicator(bt.Indicator):

    lines = ('trend',)
    params = (
        ('slow_ema', 21),
        ('fast_ema', 8),
    )

    def __init__(self):
        # Add Bollinger bands
        self.bb = bt.indicators.BollingerBands()
        self.slow_ema = bt.indicators.EMA(period=self.params.slow_ema)
        self.fast_ema = bt.indicators.EMA(period=self.params.fast_ema)
        self.ohlc = {"o": None, "h": None, "l": None, "c": None}
        self.trend_bollinger_touch = 0
        self.trend_status = 0
        self.trend_enter = 0
        self.trend_exit_params = {
            "20BarRule": False,
            "EMARule": False,
            "OppositeBollingerTouch": False}
        self.trend_ema_rule = {
            "Close1": False,
            "Close2": False}
        self.trend_position = 0
        self.trend_test = 0

    def nextstart(self):
        self._update_ohlc()

    def next(self):
        if (self.trend_bollinger_touch == 1
                and self.data.low[0] > self.bb.lines.bot[0]
                and self.trend_enter == 0):
            # if the upper bollinger band was touched and the candle
            # we have now makes the highest high and highest close,
            # we enter in an uptrend
            if (self.data.high[0] > self.ohlc["h"]
                    and self.data.close[0] > self.ohlc["c"]):
                self._update_ohlc(["h"])
                self.trend_enter = 1
                self.trend_status = 1
                self.trend_exit_params["OppositeBollingerTouch"] = False
                self.trend_exit_params["20BarRule"] = False
                self.trend_exit_params["EMARule"] = False
                # NOTE: len(self.ohlc) is the dict size (always 4);
                # len(self.data.high) may have been intended
                self.trend_position = len(self.ohlc)
                self.trend_test = len(self.data.high)
            if self.data.high[0] > self.ohlc["h"]:
                self._update_ohlc(["h"])
            if self.data.close[0] > self.ohlc["c"]:
                self._update_ohlc(["c"])
        # we enter in a downtrend
        elif (self.trend_bollinger_touch == -1
                and self.data.high[0] < self.bb.lines.top[0]
                and self.trend_enter == 0):
            # if the bottom bollinger band was touched and the candle we
            # have now makes the lowest low and lowest close, we enter
            # in a downtrend
            if (self.data.low[0] < self.ohlc["l"]
                    and self.data.close[0] < self.ohlc["c"]):
                self._update_ohlc(["l"])
                self.trend_enter = -1
                self.trend_status = -1
                self.trend_exit_params["OppositeBollingerTouch"] = False
                self.trend_exit_params["20BarRule"] = False
                self.trend_exit_params["EMARule"] = False
                # NOTE: same as above, always 4
                self.trend_position = len(self.ohlc)
                self.trend_test = len(self.data.low)
            if self.data.low[0] < self.ohlc["l"]:
                self._update_ohlc(["l"])
            if self.data.close[0] < self.ohlc["c"]:
                self._update_ohlc(["c"])
        elif self.trend_bollinger_touch == 0:
            # look for a top bollinger touch, then store that bar's data
            if (self.data.high[0] >= self.bb.lines.top[0]
                    and self.data.low[0] > self.bb.lines.bot[0]):
                self._update_ohlc()
                self.trend_bollinger_touch = 1
            elif (self.data.low[0] <= self.bb.lines.bot[0]
                    and self.data.high[0] < self.bb.lines.top[0]):
                self._update_ohlc()
                self.trend_bollinger_touch = -1

        # if the upper bollinger band was touched but now the lower bollinger
        # band got touched, the upper bollinger touch resets
        if (self.trend_bollinger_touch == 1
                and self.data.low[0] <= self.bb.lines.bot[0]):
            self._update_ohlc()
            self.trend_bollinger_touch = -1
        # if the bottom bollinger band was touched but now the top bollinger
        # band got touched, the bottom bollinger touch resets
        elif (self.trend_bollinger_touch == -1
                and self.data.high[0] >= self.bb.lines.top[0]):
            self._update_ohlc()
            self.trend_bollinger_touch = 1

        # we are in an uptrend
        if self.trend_status == 1:
            # if the opposite bollinger band was touched, we exit the uptrend
            if self.data.low[0] <= self.bb.lines.bot[0]:
                self.trend_status = 0
                self.trend_enter = 0
                self.trend_exit_params["OppositeBollingerTouch"] = True
                self.trend_bollinger_touch = -1
                self.trend_ema_rule['Close1'] = False
                self.trend_ema_rule['Close2'] = False
                self._update_ohlc()
            # if a fresh high is made, keep track of the position
            if self.data.high[0] > self.ohlc["h"]:
                self._update_ohlc(["h"])
                self.trend_position = len(self.data.high)
            # after 20 bars (excluding the first) no new high is made, we are
            # not in a trend anymore
            elif self.data.high[0] < self.ohlc["h"]:
                if len(self.data.high) - self.trend_position >= 21:
                    self.trend_status = 0
                    self.trend_enter = 0
                    self.trend_bollinger_touch = -1
                    self.trend_exit_params["20BarRule"] = True
                    self.trend_ema_rule['Close1'] = False
                    self.trend_ema_rule['Close2'] = False
            # if the top bollinger band was touched, reset the EMA rule
            if self.data.high[0] >= self.bb.lines.top[0]:
                self.trend_ema_rule['Close1'] = False
                self.trend_ema_rule['Close2'] = False
            if (self.data.close[0] < self.slow_ema[0]
                    and self.data.close[0] < self.fast_ema[0]):
                self.trend_ema_rule['Close1'] = True
            if (self.trend_ema_rule['Close1'] is True
                    and self.data.close[0] > self.slow_ema[0]
                    and self.data.close[0] > self.fast_ema[0]):
                self.trend_ema_rule['Close2'] = True
            if (self.trend_ema_rule['Close1'] is True
                    and self.trend_ema_rule['Close2'] is True
                    and self.data.close[0] < self.slow_ema[0]
                    and self.data.close[0] < self.fast_ema[0]):
                self.trend_ema_rule['Close1'] = False
                self.trend_ema_rule['Close2'] = False
                self.trend_exit_params['EMARule'] = True
                self.trend_enter = 0
                self.trend_status = 0
                self.trend_bollinger_touch = 0

        # we are in a downtrend
        if self.trend_status == -1:
            # if the opposite bollinger band was touched, we exit the downtrend
            if self.data.high[0] >= self.bb.lines.top[0]:
                self.trend_status = 0
                self.trend_enter = 0
                self.trend_bollinger_touch = 1
                self.trend_exit_params["OppositeBollingerTouch"] = True
                self.trend_ema_rule['Close1'] = False
                self.trend_ema_rule['Close2'] = False
                self._update_ohlc()
            # if a fresh low is made, keep track of the position
            # (compare against the stored low; the original compared
            # against the stored high)
            if self.data.low[0] < self.ohlc["l"]:
                self._update_ohlc(["l"])
                self.trend_position = len(self.data.low)
            # after 20 bars (excluding the first) no new low is made, we are
            # not in a trend anymore
            elif self.data.low[0] > self.ohlc["l"]:
                if len(self.data.low) - self.trend_position >= 21:
                    self.trend_status = 0
                    self.trend_enter = 0
                    self.trend_bollinger_touch = 0
                    self.trend_exit_params["20BarRule"] = True
                    self.trend_ema_rule['Close1'] = False
                    self.trend_ema_rule['Close2'] = False
            # if the bottom bollinger band was touched, reset the EMA rule
            if self.data.low[0] <= self.bb.lines.bot[0]:
                self.trend_ema_rule['Close1'] = False
                self.trend_ema_rule['Close2'] = False
            if (self.data.close[0] > self.slow_ema[0]
                    and self.data.close[0] > self.fast_ema[0]):
                self.trend_ema_rule['Close1'] = True
            if (self.trend_ema_rule['Close1'] is True
                    and self.data.close[0] < self.slow_ema[0]
                    and self.data.close[0] < self.fast_ema[0]):
                self.trend_ema_rule['Close2'] = True
            if (self.trend_ema_rule['Close1'] is True
                    and self.trend_ema_rule['Close2'] is True
                    and self.data.close[0] > self.slow_ema[0]
                    and self.data.close[0] > self.fast_ema[0]):
                self.trend_status = 0
                self.trend_enter = 0
                self.trend_bollinger_touch = 0
                self.trend_ema_rule['Close1'] = False
                self.trend_ema_rule['Close2'] = False
                self.trend_exit_params['EMARule'] = True

        # update line
        if (self.trend_enter == -1
                and True not in self.trend_exit_params.values()):
            self.lines.trend[0] = -1
        elif (self.trend_enter == 1
                and True not in self.trend_exit_params.values()):
            self.lines.trend[0] = 1

    def _update_ohlc(self, values=("o", "h", "l", "c")):
        # store the requested prices of the current bar
        if "o" in values:
            self.ohlc["o"] = self.data.open[0]
        if "h" in values:
            self.ohlc["h"] = self.data.high[0]
        if "l" in values:
            self.ohlc["l"] = self.data.low[0]
        if "c" in values:
            self.ohlc["c"] = self.data.close[0]
-
RE: Resampling issues + adding multiple timeframes
@ab_trader here: https://www.backtrader.com/docu/inddev/
Let’s recall that self.lines.dummyline is the long notation and that it can be shortened to:
self.l.dummyline
and even to:
self.dummyline
The latter being only possible if the code has not obscured this with a member attribute.
-
Multiple data sources using replay data
I noticed that when using replaydata with a live data source which is not itself added to cerebro, the clock of the first data is used for the other data sources.
So for example, if you have tick data which is not added to cerebro, replay it to some timeframe and then add another data source the same way, the second data source will only be updated when the first replayed data source moves forward.
some code to visualize this:
# create some data feed with live data
data = DataFeed()
# add replayed data
cerebro.replaydata(data, timeframe=bt.TimeFrame.Minutes, compression=1, name="primary")
cerebro.replaydata(data, timeframe=bt.TimeFrame.Minutes, compression=30, name="secondary")
This will go this way:
- data source primary will work as expected and will be updated every tick that comes in
- data source secondary will be updated when primary moves one candle forward (in the example above every 1 minute)
When adding data this way:
# create some data feed with live data
data = DataFeed()
# add live data
cerebro.adddata(data)
# add replayed data
cerebro.replaydata(data, timeframe=bt.TimeFrame.Minutes, compression=1, name="primary")
cerebro.replaydata(data, timeframe=bt.TimeFrame.Minutes, compression=30, name="secondary")
This will go this way:
- data source primary will work as expected and will be updated every tick that comes in
- data source secondary will work as expected and will be updated every tick that comes in
Just in case anyone runs into this situation when using replaydata.
-
RE: How to convert pips to price?
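You need to know the pip location. The pip location says where the pip sits relative to the decimal point, as a power of ten: for EUR_USD one pip is 0.0001, so the functions below expect -4 (10 ** -4). Below are two functions to calculate pips from a price difference and the price difference from pips.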
def get_pips_from_value(value, pip_location, precision=0):
    '''
    Returns pips from a value
    '''
    div = float(10 ** pip_location)
    pips = value / div
    return round(pips, precision)


def get_value_from_pips(pips, pip_location):
    '''
    Returns price diff from pips
    '''
    mult = float(10 ** pip_location)
    return float(pips * mult)
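A short worked example, assuming the pip location is given as a negative exponent (-4 for EUR_USD, -2 for a JPY pair such as USD_JPY). The two functions are repeated in compact form so the snippet runs on its own; the price differences are invented.

```python
def get_pips_from_value(value, pip_location, precision=0):
    """Returns pips from a price difference."""
    return round(value / float(10 ** pip_location), precision)


def get_value_from_pips(pips, pip_location):
    """Returns price difference from pips."""
    return float(pips * float(10 ** pip_location))


# EUR_USD: one pip is 10 ** -4
eur_pips = get_pips_from_value(0.0025, -4, precision=1)
eur_diff = get_value_from_pips(25, -4)

# USD_JPY: one pip is 10 ** -2
jpy_pips = get_pips_from_value(0.35, -2, precision=1)

print(eur_pips, round(eur_diff, 6), jpy_pips)
```

Because of floating point, the value returned by get_value_from_pips is best rounded to the instrument's price precision before it is used in an order.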
-
RE: Entering and taking profit on the same bar
@run-out you are right. We went through this yesterday via email, and the result was as you explained: the child order gets activated after the parent order is executed and will be checked in the next cycle.
One way to manually allow execution on the same bar is to activate the child order by hand.
After the order has been created:
self.target.activate()
This is not a good idea, though, since you then don't know whether the parent order was executed, too.