Best posts made by hghhgghdf dfdf
-
RE: ZigZag indicator for live trading.
Thank you for this code. I made an indicator based on it which measures the median/mean length of zigzag moves:
from collections import deque

import backtrader as bt
import numpy as np

__all__ = ['ZigZag', 'ZigZagLen', ]


class ZigZagLen(bt.Indicator):
    '''
    Smoothed median/mean length (in bars) of ZigZag moves.

    Wraps a ``ZigZag`` sub-indicator and, on every bar, summarises the
    lengths it has recorded in ``lenlist`` plus the length of the move that
    is still in progress (``since_last_pivot``).

    Lines:
      - combined: statistic over moves of both biases
      - bull: statistic over moves tagged 'bull' only
      - bear: statistic over moves tagged 'bear' only

    Params:
      - retrace, minbars: forwarded unchanged to ``ZigZag``
      - full_init: when True, ``period`` must be positive and is added to
        the minimum period
      - period: positive number for a rolling series, -1 for an expanding
        series
      - output: 'median' or 'mean'
      - del_outliers: drop lengths outside the 1.5 * IQR fences
      - alpha: weight of the newest value in the exponential smoothing
    '''
    params = (
        # ZigZag inputs
        ('retrace', 0.05),  # in percent
        ('minbars', 14),  # number of bars to skip after the trend change
        # ZigZagLen inputs
        ('full_init', True),
        ('period', 6 * 90,),  # positive number for rolling series, -1 for expanding series
        ('output', 'median'),  # median or mean
        ('del_outliers', True),
        ('alpha', 0.05),  # exponential smoothing input
    )

    lines = ('combined', 'bull', 'bear')

    def __init__(self):
        self.zigzag = ZigZag(retrace=self.p.retrace, minbars=self.p.minbars, )

        assert self.p.output in ['median', 'mean']
        if self.p.output == 'median':
            self.func = np.median
        elif self.p.output == 'mean':
            self.func = np.mean

        # Number of completed moves seen on the previous call to next().
        self.stored_listlen = 0

        if self.p.full_init:
            assert self.p.period > 0, 'when full_init is enabled, we need a minimum period'
            self.addminperiod(self.p.period)

    def next(self):
        '''Update the three output lines for the current bar.'''
        zz = self.zigzag
        # (output line, biases feeding that line). A tuple of pairs keeps the
        # line objects out of dictionary keys.
        targets = (
            (self.l.combined, ('bull', 'bear')),
            (self.l.bull, ('bull',)),
            (self.l.bear, ('bear',)),
        )

        n_moves = len(zz.lenlist)

        # No new values. Copying previous values instead of repeating calculations
        # NOTE(review): ZigZag only ever appends to lenlist, so this `<` test
        # cannot be true as written. `==` may have been intended, but it would
        # also freeze the in-progress move length between pivots, so the
        # comparison is left unchanged -- TODO confirm intent.
        if n_moves < self.stored_listlen:
            for line, _ in targets:
                line[0] = line[-1]
            return
        self.stored_listlen = n_moves

        # Do we have at least something to work with?
        if n_moves < 8:
            return

        if self.p.del_outliers:
            # Fences at 1.5 * IQR around the quartiles of all recorded moves.
            q1 = np.percentile(zz.lenlist, 25)
            q3 = np.percentile(zz.lenlist, 75)
            iqr = q3 - q1
            upperlim = q3 + 1.5 * iqr
            lowerlim = q1 - 1.5 * iqr
        else:
            upperlim = float('inf')
            lowerlim = 0

        # Bias of the most recent completed move. It is read before the
        # rolling window is applied: the window can come out empty when that
        # single move is longer than `period`, and indexing the truncated
        # list would then raise IndexError.
        recent_bias = zz.lenlist[-1].bias

        lenlist = list(reversed(zz.lenlist))  # newest move first
        if self.p.period > 0:  # Rolling series
            if sum(lenlist) > self.p.period:
                # finding index of last value in our moving series
                cumlst = np.cumsum(lenlist)
                idx = np.where(cumlst > self.p.period)[0][0]
                lenlist = lenlist[:idx]

        # Getting the length of current move; its bias is the opposite of the
        # last completed one.
        last_len = InfoInt(zz.since_last_pivot)
        last_len.bias = 'bear' if recent_bias != 'bear' else 'bull'
        lenlist.append(last_len)

        alpha = self.p.alpha
        for line, biases in targets:
            filter_lst = [v for v in lenlist
                          if lowerlim < v < upperlim and v.bias in biases]
            if not filter_lst:
                # Nothing survived the filters: hold the previous value rather
                # than feeding an empty sample (NaN plus a NumPy warning) into
                # the smoothing.
                line[0] = line[-1]
                continue
            output = self.func(filter_lst)
            if np.isnan(line[-1]):
                # No previous value yet: seed the smoothing with the raw statistic.
                line[0] = output
            else:
                line[0] = line[-1] * (1 - alpha) + output * alpha


# inheriting allows us to add variables to an int
class InfoInt(int):
    '''An int that accepts extra attributes (a ``bias`` tag is attached by its users).'''
    pass


class ZigZag(bt.Indicator):
    '''
    ZigZag indicator.

    Lines:
      - trend: 0 while in prenext, 1 in an up trend, -1 in a down trend
      - last_high / last_low: running extremes tracked by the trend logic
      - zigzag: NaN on ordinary bars; set to the pivot price on the bar where
        a confirmed pivot occurred (written retroactively on a trend switch)

    Besides the lines, the instance exposes ``lenlist`` (lengths of completed
    moves as ``InfoInt`` tagged with ``bias``) and ``since_last_pivot``.
    '''
    lines = (
        'trend',
        'last_high',
        'last_low',
        'zigzag',
    )

    plotinfo = dict(
        subplot=False,
        plotlinelabels=True,
        plotlinevalues=True,
        plotvaluetags=True,
    )

    plotlines = dict(
        trend=dict(_plotskip=True),
        last_high=dict(color='green', ls='-', _plotskip=True),
        last_low=dict(color='black', ls='-', _plotskip=True),
        zigzag=dict(_name='zigzag', color='blue', ls='-', _skipnan=True),
    )

    params = (
        ('retrace', 0.05),  # in percent
        ('minbars', 14),  # number of bars to skip after the trend change
    )

    def __init__(self):
        super(ZigZag, self).__init__()
        self.addminperiod(self.p.minbars)

        assert self.p.retrace > 0, 'Retracement should be above zero.'
        assert self.p.minbars >= 0, 'Minimal bars should be >= zero.'

        # NOTE(review): `self.data.close` is used here without a bar index;
        # next() overwrites this attribute with a per-bar value before it is
        # read, so this initial value is never consulted.
        self.retrace_thresh = self.data.close * self.p.retrace / 100
        self.minbars = self.p.minbars
        self.count_bars = 0
        # Bar indices of the two most recent trend switches.
        self.last_pivots = deque([0, 0], maxlen=2)
        self.last_pivot_t = 0
        self.since_last_pivot = 0
        self.lenlist = []
        self.stored_datalen = 0

    def prenext(self):
        '''Seed the lines from the current bar; trend is 0 (undecided).'''
        self.l.trend[0] = 0
        self.l.last_high[0] = self.data.high[0]
        self.l.last_low[0] = self.data.low[0]
        self.l.zigzag[0] = (self.data.high[0] + self.data.low[0]) / 2

    def next(self):
        '''Advance the trend state by one bar.'''
        # No new candle. Got called due to resampling
        if len(self.data) == self.stored_datalen:
            return
        self.stored_datalen = len(self.data)

        curr_idx = len(self.data)
        self.retrace_thresh = self.data.close[0] * self.p.retrace / 100
        self.since_last_pivot = curr_idx - self.last_pivot_t

        # Carry the previous state forward; the branches below override it.
        self.l.trend[0] = self.l.trend[-1]
        self.l.last_high[0] = self.l.last_high[-1]
        self.l.last_low[0] = self.l.last_low[-1]
        self.l.zigzag[0] = float('nan')

        # Initialize trend
        if self.l.trend[-1] == 0:
            # If current candle has higher low and higher high
            if self.l.last_low[0] < self.data.low[0] and self.l.last_high[0] < self.data.high[0]:
                self.l.trend[0] = 1
                self.l.last_high[0] = self.data.high[0]
                self.last_pivot_t = curr_idx
            # If current candle has lower low and lower high
            elif self.l.last_low[0] > self.data.low[0] and self.l.last_high[0] > self.data.high[0]:
                self.l.trend[0] = -1
                self.l.last_low[0] = self.data.low[0]
                self.last_pivot_t = curr_idx

        # Up trend
        elif self.l.trend[-1] == 1:
            # if higher high, move pivot location to current high
            if self.data.high[0] > self.l.last_high[-1]:
                self.l.last_high[0] = self.data.high[0]
                self.count_bars = self.minbars
                self.last_pivot_t = curr_idx
            # elif at least p.minbars since last bull swing and currently in a retrace -> Switch Bearish
            elif self.count_bars <= 0 and self.l.last_high[0] - self.data.low[0] > self.retrace_thresh \
                    and self.data.high[0] < self.l.last_high[0]:
                self.switch_to_bear(curr_idx)
            # elif bearish close
            elif self.count_bars < self.minbars and self.data.close[0] < self.l.last_low[0]:
                self.switch_to_bear(curr_idx)

        # Down trend
        elif self.l.trend[-1] == -1:
            # if lower low, move pivot location to current low
            if self.data.low[0] < self.l.last_low[-1]:
                self.l.last_low[0] = self.data.low[0]
                self.count_bars = self.minbars
                self.last_pivot_t = curr_idx
            # elif we had an established bear swing and currently in a retrace -> Switch Bullish
            elif self.count_bars <= 0 and self.data.high[0] - self.l.last_low[0] > self.retrace_thresh and \
                    self.data.low[0] > self.l.last_low[0]:
                self.switch_to_bull(curr_idx)
            # elif bullish close
            elif self.count_bars < self.minbars and self.data.close[0] > self.l.last_high[-1]:
                self.switch_to_bull(curr_idx)

        # Decrease minbars counter
        self.count_bars -= 1

    def switch_to_bear(self, idx):
        '''Flip to a down trend at bar ``idx`` and record the finished 'bull' move.'''
        self.l.trend[0] = -1
        self.count_bars = self.minbars
        self.l.last_low[0] = self.data.low[0]
        # Mark the confirmed high on the bar where it occurred.
        self.l.zigzag[-self.since_last_pivot] = self.l.last_high[0]
        self.last_pivot_t = idx
        self.record_idx(idx, 'bull', )

    def switch_to_bull(self, idx):
        '''Flip to an up trend at bar ``idx`` and record the finished 'bear' move.'''
        self.l.trend[0] = 1
        self.count_bars = self.minbars
        self.l.last_high[0] = self.data.high[0]
        # Mark the confirmed low on the bar where it occurred.
        self.l.zigzag[-self.since_last_pivot] = self.l.last_low[0]
        self.last_pivot_t = idx
        self.record_idx(idx, 'bear', )

    def record_idx(self, idx, bias, ):
        '''Append the bar count between the last two trend switches to ``lenlist``, tagged with ``bias``.'''
        self.last_pivots.append(idx)
        last_move_len = self.last_pivots[-1] - self.last_pivots[0]
        last_move_len = InfoInt(last_move_len)
        last_move_len.bias = bias
        self.lenlist.append(last_move_len)
-
RE: Genetic Optimization
@Wayne-Filkins-0 pass whatever you want to optimize to the maximize function. So for your case, you could multiply a binary value (profitable/unprofitable) by the percentage of profitable trades and pass that to the gen-opt function
-
RE: Optimizer gives me " only size-1 arrays can be converted to Python scalars" error
@vladisld Had the same issue. The problem is that arange returns numpy datatypes, which causes this operation to return a numpy array:
stddev = self.p.devfactor * StdDev(self.data, ma, period=self.p.period, movav=self.p.movav)
Converting self.p.devfactor to a standard float fixes the problem
-
RE: Final SELL order not created in Quickstart Examples
@Yulem said in Final SELL order not created in Quickstart Examples:
but no SELL order is generated upon exit leaving me with an open position. I believe the logic is to generate a SELL using the final day's CLOSE value which makes sense and is what I saw in the previous examples.
There is no problem with your code. Whether you still have a position open or closed at the end of a backtest is inconsequential. If you want to generate a sell specifically using the final day's close value, you can do so in the strategy's stop method
-
RE: Order.StopTrail turns positive shares into short position
@Adham-Suliman said in Order.StopTrail turns positive shares into short position:
elif self.position.size > 0: self.close(exectype=bt.Order.StopTrail, trailamount = 8)
You're creating a stop order for every next call where your position is open. Assign this order to a variable and only create a new one if it doesn't exist.
-
RE: How to make All-in Market Price in tick-chart
@김도언 said in How to make All-in Market Price in tick-chart:
if self.buy_continue: if self.broker.get_cash() > 0: self.order = self.buy(size=self.market_volume[0])
market volume will be more than your available cash during your last fill.
a much easier alternative would be something like this inside the notify_order method
if order.status in [order.Margin, order.Rejected]:
    # Retry with the largest whole number of units the remaining cash can
    # buy at the latest close: cash / price (the posted price / cash was
    # inverted, and its parentheses were unbalanced).
    self.order = self.buy(size=int(self.broker.get_cash() / self.data.close[0]))
possibly lower size by 1 until you get your fill.
-
RE: Close Order doesn't close whole size and sell order gets rejected
Correct way to write this:
def next(self):
    '''No-op placeholder: does nothing and returns None (the post shows only this stub).'''
    return None
The problem in your code is here:
# Sell stocks no longer meeting ranking filter. for i, d in enumerate(rankings): if self.getposition(d).size: if i > self.p.num_positions: self.close(d)
The check amounts to "if i > 2, close the position", but since Python uses zero-based indexing, i > 2 is first true at i = 3, so a position is only closed once a stock is in 4th spot or lower.
-
RE: Close Order doesn't close whole size and sell order gets rejected
But does it explain why only a share of my shares get sold or the order gets rejected?
self.order_target_percent(d, target=pos_size)
This line sold those 3 shares. You're ordering for the target pct to be hit every candle so as the value of your account fluctuates, this method will add/subtract to your position to match the pct.
-
RE: Close Order doesn't close whole size and sell order gets rejected
not sure either.
I'd suggest cleaning up previous mistakes and walking through the code with breakpoints and more detailed logging. These kinds of issues are always much easier to fix if you have personal access to the code/data.
Latest posts made by hghhgghdf dfdf
-
RE: ZigZag indicator for live trading.
Thank you for this code. I made an indicator based on it which measures the median/mean length of zigzag moves:
from collections import deque

import backtrader as bt
import numpy as np

__all__ = ['ZigZag', 'ZigZagLen', ]


class ZigZagLen(bt.Indicator):
    '''
    Smoothed median/mean length (in bars) of ZigZag moves.

    Wraps a ``ZigZag`` sub-indicator and, on every bar, summarises the
    lengths it has recorded in ``lenlist`` plus the length of the move that
    is still in progress (``since_last_pivot``).

    Lines:
      - combined: statistic over moves of both biases
      - bull: statistic over moves tagged 'bull' only
      - bear: statistic over moves tagged 'bear' only

    Params:
      - retrace, minbars: forwarded unchanged to ``ZigZag``
      - full_init: when True, ``period`` must be positive and is added to
        the minimum period
      - period: positive number for a rolling series, -1 for an expanding
        series
      - output: 'median' or 'mean'
      - del_outliers: drop lengths outside the 1.5 * IQR fences
      - alpha: weight of the newest value in the exponential smoothing
    '''
    params = (
        # ZigZag inputs
        ('retrace', 0.05),  # in percent
        ('minbars', 14),  # number of bars to skip after the trend change
        # ZigZagLen inputs
        ('full_init', True),
        ('period', 6 * 90,),  # positive number for rolling series, -1 for expanding series
        ('output', 'median'),  # median or mean
        ('del_outliers', True),
        ('alpha', 0.05),  # exponential smoothing input
    )

    lines = ('combined', 'bull', 'bear')

    def __init__(self):
        self.zigzag = ZigZag(retrace=self.p.retrace, minbars=self.p.minbars, )

        assert self.p.output in ['median', 'mean']
        if self.p.output == 'median':
            self.func = np.median
        elif self.p.output == 'mean':
            self.func = np.mean

        # Number of completed moves seen on the previous call to next().
        self.stored_listlen = 0

        if self.p.full_init:
            assert self.p.period > 0, 'when full_init is enabled, we need a minimum period'
            self.addminperiod(self.p.period)

    def next(self):
        '''Update the three output lines for the current bar.'''
        zz = self.zigzag
        # (output line, biases feeding that line). A tuple of pairs keeps the
        # line objects out of dictionary keys.
        targets = (
            (self.l.combined, ('bull', 'bear')),
            (self.l.bull, ('bull',)),
            (self.l.bear, ('bear',)),
        )

        n_moves = len(zz.lenlist)

        # No new values. Copying previous values instead of repeating calculations
        # NOTE(review): ZigZag only ever appends to lenlist, so this `<` test
        # cannot be true as written. `==` may have been intended, but it would
        # also freeze the in-progress move length between pivots, so the
        # comparison is left unchanged -- TODO confirm intent.
        if n_moves < self.stored_listlen:
            for line, _ in targets:
                line[0] = line[-1]
            return
        self.stored_listlen = n_moves

        # Do we have at least something to work with?
        if n_moves < 8:
            return

        if self.p.del_outliers:
            # Fences at 1.5 * IQR around the quartiles of all recorded moves.
            q1 = np.percentile(zz.lenlist, 25)
            q3 = np.percentile(zz.lenlist, 75)
            iqr = q3 - q1
            upperlim = q3 + 1.5 * iqr
            lowerlim = q1 - 1.5 * iqr
        else:
            upperlim = float('inf')
            lowerlim = 0

        # Bias of the most recent completed move. It is read before the
        # rolling window is applied: the window can come out empty when that
        # single move is longer than `period`, and indexing the truncated
        # list would then raise IndexError.
        recent_bias = zz.lenlist[-1].bias

        lenlist = list(reversed(zz.lenlist))  # newest move first
        if self.p.period > 0:  # Rolling series
            if sum(lenlist) > self.p.period:
                # finding index of last value in our moving series
                cumlst = np.cumsum(lenlist)
                idx = np.where(cumlst > self.p.period)[0][0]
                lenlist = lenlist[:idx]

        # Getting the length of current move; its bias is the opposite of the
        # last completed one.
        last_len = InfoInt(zz.since_last_pivot)
        last_len.bias = 'bear' if recent_bias != 'bear' else 'bull'
        lenlist.append(last_len)

        alpha = self.p.alpha
        for line, biases in targets:
            filter_lst = [v for v in lenlist
                          if lowerlim < v < upperlim and v.bias in biases]
            if not filter_lst:
                # Nothing survived the filters: hold the previous value rather
                # than feeding an empty sample (NaN plus a NumPy warning) into
                # the smoothing.
                line[0] = line[-1]
                continue
            output = self.func(filter_lst)
            if np.isnan(line[-1]):
                # No previous value yet: seed the smoothing with the raw statistic.
                line[0] = output
            else:
                line[0] = line[-1] * (1 - alpha) + output * alpha


# inheriting allows us to add variables to an int
class InfoInt(int):
    '''An int that accepts extra attributes (a ``bias`` tag is attached by its users).'''
    pass


class ZigZag(bt.Indicator):
    '''
    ZigZag indicator.

    Lines:
      - trend: 0 while in prenext, 1 in an up trend, -1 in a down trend
      - last_high / last_low: running extremes tracked by the trend logic
      - zigzag: NaN on ordinary bars; set to the pivot price on the bar where
        a confirmed pivot occurred (written retroactively on a trend switch)

    Besides the lines, the instance exposes ``lenlist`` (lengths of completed
    moves as ``InfoInt`` tagged with ``bias``) and ``since_last_pivot``.
    '''
    lines = (
        'trend',
        'last_high',
        'last_low',
        'zigzag',
    )

    plotinfo = dict(
        subplot=False,
        plotlinelabels=True,
        plotlinevalues=True,
        plotvaluetags=True,
    )

    plotlines = dict(
        trend=dict(_plotskip=True),
        last_high=dict(color='green', ls='-', _plotskip=True),
        last_low=dict(color='black', ls='-', _plotskip=True),
        zigzag=dict(_name='zigzag', color='blue', ls='-', _skipnan=True),
    )

    params = (
        ('retrace', 0.05),  # in percent
        ('minbars', 14),  # number of bars to skip after the trend change
    )

    def __init__(self):
        super(ZigZag, self).__init__()
        self.addminperiod(self.p.minbars)

        assert self.p.retrace > 0, 'Retracement should be above zero.'
        assert self.p.minbars >= 0, 'Minimal bars should be >= zero.'

        # NOTE(review): `self.data.close` is used here without a bar index;
        # next() overwrites this attribute with a per-bar value before it is
        # read, so this initial value is never consulted.
        self.retrace_thresh = self.data.close * self.p.retrace / 100
        self.minbars = self.p.minbars
        self.count_bars = 0
        # Bar indices of the two most recent trend switches.
        self.last_pivots = deque([0, 0], maxlen=2)
        self.last_pivot_t = 0
        self.since_last_pivot = 0
        self.lenlist = []
        self.stored_datalen = 0

    def prenext(self):
        '''Seed the lines from the current bar; trend is 0 (undecided).'''
        self.l.trend[0] = 0
        self.l.last_high[0] = self.data.high[0]
        self.l.last_low[0] = self.data.low[0]
        self.l.zigzag[0] = (self.data.high[0] + self.data.low[0]) / 2

    def next(self):
        '''Advance the trend state by one bar.'''
        # No new candle. Got called due to resampling
        if len(self.data) == self.stored_datalen:
            return
        self.stored_datalen = len(self.data)

        curr_idx = len(self.data)
        self.retrace_thresh = self.data.close[0] * self.p.retrace / 100
        self.since_last_pivot = curr_idx - self.last_pivot_t

        # Carry the previous state forward; the branches below override it.
        self.l.trend[0] = self.l.trend[-1]
        self.l.last_high[0] = self.l.last_high[-1]
        self.l.last_low[0] = self.l.last_low[-1]
        self.l.zigzag[0] = float('nan')

        # Initialize trend
        if self.l.trend[-1] == 0:
            # If current candle has higher low and higher high
            if self.l.last_low[0] < self.data.low[0] and self.l.last_high[0] < self.data.high[0]:
                self.l.trend[0] = 1
                self.l.last_high[0] = self.data.high[0]
                self.last_pivot_t = curr_idx
            # If current candle has lower low and lower high
            elif self.l.last_low[0] > self.data.low[0] and self.l.last_high[0] > self.data.high[0]:
                self.l.trend[0] = -1
                self.l.last_low[0] = self.data.low[0]
                self.last_pivot_t = curr_idx

        # Up trend
        elif self.l.trend[-1] == 1:
            # if higher high, move pivot location to current high
            if self.data.high[0] > self.l.last_high[-1]:
                self.l.last_high[0] = self.data.high[0]
                self.count_bars = self.minbars
                self.last_pivot_t = curr_idx
            # elif at least p.minbars since last bull swing and currently in a retrace -> Switch Bearish
            elif self.count_bars <= 0 and self.l.last_high[0] - self.data.low[0] > self.retrace_thresh \
                    and self.data.high[0] < self.l.last_high[0]:
                self.switch_to_bear(curr_idx)
            # elif bearish close
            elif self.count_bars < self.minbars and self.data.close[0] < self.l.last_low[0]:
                self.switch_to_bear(curr_idx)

        # Down trend
        elif self.l.trend[-1] == -1:
            # if lower low, move pivot location to current low
            if self.data.low[0] < self.l.last_low[-1]:
                self.l.last_low[0] = self.data.low[0]
                self.count_bars = self.minbars
                self.last_pivot_t = curr_idx
            # elif we had an established bear swing and currently in a retrace -> Switch Bullish
            elif self.count_bars <= 0 and self.data.high[0] - self.l.last_low[0] > self.retrace_thresh and \
                    self.data.low[0] > self.l.last_low[0]:
                self.switch_to_bull(curr_idx)
            # elif bullish close
            elif self.count_bars < self.minbars and self.data.close[0] > self.l.last_high[-1]:
                self.switch_to_bull(curr_idx)

        # Decrease minbars counter
        self.count_bars -= 1

    def switch_to_bear(self, idx):
        '''Flip to a down trend at bar ``idx`` and record the finished 'bull' move.'''
        self.l.trend[0] = -1
        self.count_bars = self.minbars
        self.l.last_low[0] = self.data.low[0]
        # Mark the confirmed high on the bar where it occurred.
        self.l.zigzag[-self.since_last_pivot] = self.l.last_high[0]
        self.last_pivot_t = idx
        self.record_idx(idx, 'bull', )

    def switch_to_bull(self, idx):
        '''Flip to an up trend at bar ``idx`` and record the finished 'bear' move.'''
        self.l.trend[0] = 1
        self.count_bars = self.minbars
        self.l.last_high[0] = self.data.high[0]
        # Mark the confirmed low on the bar where it occurred.
        self.l.zigzag[-self.since_last_pivot] = self.l.last_low[0]
        self.last_pivot_t = idx
        self.record_idx(idx, 'bear', )

    def record_idx(self, idx, bias, ):
        '''Append the bar count between the last two trend switches to ``lenlist``, tagged with ``bias``.'''
        self.last_pivots.append(idx)
        last_move_len = self.last_pivots[-1] - self.last_pivots[0]
        last_move_len = InfoInt(last_move_len)
        last_move_len.bias = bias
        self.lenlist.append(last_move_len)
-
RE: Adding indicator causes order to be rejected when using fractional size
Add commission info before running cerebro. Not inside of the strategy.
-
RE: Overcoming memory limitations in optimization
in the run method from cerebro you can intercept the variable product and choose to run only a certain number of optimizations.
lststrats = list(itertools.product(*self.strats)) random.shuffle(lststrats) lststrats = lststrats[:100] iterstrats = iter(lststrats)
Using 5 years of hourly data I can run roughly 80-160 sessions at a time. (depending on how much memory observers/indicators take up)
Alternatively you could roughly line up your tests with that number in mind: test in batches of 3 parameters with 6-8 variables for each parameter.
-
RE: Optimizing using threading module
fyi the pathos module has a multiprocessing implementation which uses dill instead of pickle. This solves a lot of pickling errors.
-
RE: Problem fetching execution price
just use self.position.price.
self.positions[data].price if using multiple data sets.
-
RE: Optimization leads to worse results than random picking of parameters
You could for each variable, average the return for each parameter and then select the best performing parameter out of that selection
e.g.:
Alternatively, you could include the neighboring parameters as part of the selection process. (fitness value of period=10 is the average of the fitness values of period 8-12)
Many possible solutions.
-
RE: Optimization leads to worse results than random picking of parameters
Export your backtesting data to Excel and analyze it over there.
Having all results lined up makes it plain as day if a parameter combination is the result of dumb luck or alpha
-
RE: How to read data feeds of more than two years?
Would need to see your data. Maybe try removing fromdate & todate arguments. That will use all available data by default