利用粒子群优化算法(PSO)来优化vnpy的量化策略参数

发布时间:2025-08-31 21:03:26 作者:益华网络 来源:undefined 浏览量(0) 点赞(0)
摘要:上一篇文章中说要用PSO算法来做量化策略优化,模仿DEAP的示例代码,做了一个单线程的类,实现优化。里面代码结构是和之前做利用遗传算法优化的基本一样。 粒子全算法可以看

上一篇文章中说要用PSO算法来做量化策略优化,模仿DEAP的示例代码,做了一个单线程的类,实现优化。里面代码结构是和之前做利用遗传算法优化的基本一样。

粒子全算法可以看看我之前文章,抄的分析结果:

粒子群算法比遗传算法具有更高效的信息共享机制,更新群体极值使得信息实现全局范围共享,但遗传算法通过交叉和变异拥有比粒子群算法更有效的逃离局部最优解的概率。

下面简单说说代码:

1、静态函数object_func,本函数为优化目标函数,根据随机生成的策略参数,运行回测后自动返回1个结果指标:夏普比率 这个是直接抄GenticOptimize2V的。这个独立出来是为了之后多线程实现。这里传入的是一个[{key1:para},{key2,para}] 这样结构的参数队列。

2、类PSOOptimize,放置主要函数如下:

2.1、__init__ 对象初始化,需要传入的参数是Strategy量化策略, Symbollist回测品种, Parameterlist参数范围。

2.2、creator.create("FitnessMax", base.Fitness, weights=(1.0,)) 定义优化方向

        creator.create("Particle", list, fitness=creator.FitnessMax, speed=list, pmin = list, pmax = list,smin=list, smax=list, parameterPackage = dict, best=None)

         定义粒子类,其中包括参数list,回测返回结果,速度list,所在范围上下限队列,和速度 上下限

队列;还有一个打包的参数字典,主要为了之后多线程调用。和示例代码对比最大区别就是多了所在范围上线,因为如果没有,会出现负参数的情况报错。

2.3、particle_generate, 生成粒子,包含随机位置,和速度;这里面只支持(start,end,pace) 这样一种参数范围赋值。start和end就是位置范围上下限。步长pace就是速度上限,负pace就是速度下限。

2.4、updateParticle,更新粒子信息,根据粒子群最佳位置best,去更新粒子part的位置和速度。如果速度或位置在上下限,就去上下限值。如果位置值是整数,比是MAwindow这样,就会更新值也是整数。

        速度公式:

        v[] = v[] + c1 * rand() * (pbest[] - present[]) + c2 * rand() * (gbest[] - present[])

        位置公式:

        present[] = persent[] + v[]

2.5、optimize,这个没说明好说,主要就是绑定函数到toolbox,然后定义粒子群的粒子数量,和寻觅次数。优化出最有结果。    使用pool.map实现了多线程

2.6、mutated, 自变异函数

示例代码运行结果如下,result是夏普值。

best para: [{slMultiplier: 2.0}, {bollDev: 2.0}, {bollWindow: 53.0}, {atrWindow: 37.0}, {cciWindow: 8.0}], result:(1.194,)

------------------------------------------------------------------2019.4.15--------------------------------------------------------------------------------

增加自适应变异函数mutated,使用pool.map实现多线程。

可以在我的github的 PSOOptimize文件夹下载。

https://github.com/BillyZhangGuoping/MarketDataAnaylzerbyDataFrame

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# encoding: UTF-8
"""
展示如何实现PSO粒子群优化VNPY策略参数
1、静态函数object_func,本函数为优化目标函数,根据随机生成的策略参数,运行回测后自动返回1个结果指标:夏普比率 这个是直接抄GenticOptimize2V的。这个独立出来是为了之后多线程实现。这里传入的是一个[{key1:para},{key2,para}] 这样结构的参数队列。
2、类PSOOptimize,放置主要函数如下:
2.1、__init__ 对象初始化,需要传入的参数是Strategy量化策略, Symbollist回测品种, Parameterlist参数范围。
2.2、creator.create("FitnessMax", base.Fitness, weights=(1.0,)) 定义优化方向
creator.create("Particle", list, fitness=creator.FitnessMax, speed=list, pmin = list, pmax = list,smin=list, smax=list, parameterPackage = dict, best=None)
定义粒子类,其中包括参数list,回测返回结果,速度list,所在范围上下限队列,和速度 上下限 队列;还有一个打包的参数字典,主要为了之后多线程调用。和示例代码对比最大区别就是多了所在范围上线,因为如果没有,会出现负参数的情况报错。
2.3、particle_generate, 生成粒子,包含随机位置,和速度;这里面只支持(start,end,pace) 这样一种参数范围赋值。start和end就是位置范围上下限。步长pace就是速度上限,负pace就是速度下限。
2.4、updateParticle,更新粒子信息,根据粒子群最佳位置best,去更新粒子part的位置和速度。如果速度或位置在上下限,就去上下限值。如果位置值是整数,比是MAwindow这样,就会更新值也是整数。
速度公式:
v[] = v[] + c1 * rand() * (pbest[] - present[]) + c2 * rand() * (gbest[] - present[])
位置公式:
present[] = persent[] + v[]
2.5、optimize,这个没说明好说,主要就是绑定函数到toolbox,然后定义粒子群的粒子数量,和寻觅次数。优化出最有结果。
使用pool.map实现了多线程
2.6、mutated, 自变异函数
"""
from __future__ import division
from __future__ import print_function
import operator
import random
import numpy
from deap import base
from deap import creator
from deap import tools
from vnpy.trader.app.ctaStrategy.ctaBacktesting import BacktestingEngine, MINUTE_DB_NAME, OptimizationSetting
from vnpy.trader.app.ctaStrategy.strategy.strategyBollChannel import BollChannelStrategy
import multiprocessing
import pandas as pd
import datetime
def object_func(strategy_avgTuple):
"""
本函数为优化目标函数,根据随机生成的策略参数,运行回测后自动返回1个结果指标:夏普比率
这个是直接赋值GenticOptimize2V的
"""
strategy_avg = strategy_avgTuple
paraSet = strategy_avgTuple.parameterPackage
symbol = paraSet["symbol"]
strategy = paraSet["strategy"]
# 创建回测引擎对象
engine = BacktestingEngine()
# 设置回测使用的数据
engine.setBacktestingMode(engine.BAR_MODE)  # 设置引擎的回测模式为K线
engine.setDatabase("VnTrader_1Min_Db", symbol["vtSymbol"])  # 设置使用的历史数据库
engine.setStartDate(symbol["StartDate"])  # 设置回测用的数据起始日期
engine.setEndDate(symbol["EndDate"])  # 设置回测用的数据起始日期
# 配置回测引擎参数
engine.setSlippage(symbol["Slippage"])  # 1跳
engine.setRate(symbol["Rate"])  # 佣金大小
engine.setSize(symbol["Size"])  # 合约大小
engine.setPriceTick(symbol["Slippage"])  # 最小价格变动
engine.setCapital(symbol["Capital"])
setting = {}
for item in range(len(strategy_avg)):
setting.update(strategy_avg[item])
engine.clearBacktestingResult()
# 加载策略
engine.initStrategy(strategy, setting)
# 运行回测,返回指定的结果指标
engine.runBacktesting()  # 运行回测
# 逐日回测
# engine.calculateDailyResult()
backresult = engine.calculateBacktestingResult()
try:
sharpeRatio = round(backresult[sharpeRatio], 3)
totalResultCount = round(backresult[totalResult],3)
except Exception, e:
print("Error: %s, %s" %(str(Exception),str(e)))
sharpeRatio = 0
return sharpeRatio,
class PSOOptimize(object):
strategy = None
symbol = {}
parameterlist = {}
parameterPackage = {}
# ------------------------------------------------------------------------
def __init__(self, Strategy, Symbollist, Parameterlist):
self.strategy = Strategy
self.symbol = Symbollist
self.parameterlist = Parameterlist
self.parameterPackage = {
"strategy":self.strategy,
"symbol":self.symbol
}
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Particle", list, fitness=creator.FitnessMax, speed=list,
pmin = list, pmax = list,smin=list, smax=list, parameterPackage = dict, best=None)
def particle_generate(self):
"""
生产particle粒子,根据传入设置的起始值,终止值随机生成位置,和位置最大最小值,根据步进生成速度,和速度最大最小值
支持两种参数设置,一个是三个元素start,end 和pace
还有一个单一不变点
"""
position_list = []
speed_list = []
pmin_list = []
pmax_list = []
smin_list = []
smax_list = []
for key, value in self.parameterlist.items():
if isinstance(value, tuple):
if len(value) == 3:
if isinstance(value[0],int):
position_list.append({key:random.randrange(value[0], value[1])})
else:
position_list.append({key: random.uniform(value[0], value[1])})
pmin_list.append(value[0])
pmax_list.append(value[1])
speed_list.append(random.uniform(-value[2], value[2]))
smin_list.append(-value[2])
smax_list.append(value[2])
else:
print("Paramerte list incorrect")
elif isinstance(value, int) or isinstance(value, float):
position_list.append({key: value})
pmin_list.append(value)
pmax_list.append(value)
speed_list.append(0)
smin_list.append(0)
smax_list.append(0)
else:
print("Paramerte list incorrect")
particle = creator.Particle(position_list)
particle.speed = speed_list
particle.pmin = pmin_list
particle.pmax = pmax_list
particle.smin = smin_list
particle.smax = smax_list
particle.parameterPackage = self.parameterPackage
return particle
def updateParticle(self,part, best, phi1, phi2):
"""
根据粒子群最佳位置best,去更新粒子part的位置和速度,
速度公式:
v[] = v[] + c1 * rand() * (pbest[] - present[]) + c2 * rand() * (gbest[] - present[])
位置公式:
present[] = persent[] + v[]
"""
u1 = (random.uniform(0, phi1) for _ in range(len(part)))
u2 = (random.uniform(0, phi2) for _ in range(len(part)))
v_u1 = map(operator.mul, u1, map(self.sub, part.best, part))# c1 * rand() * (pbest[] - present[])
v_u2 = map(operator.mul, u2, map(self.sub, best, part)) # c2 * rand() * (gbest[] - present[])
part.speed = list(map(operator.add, part.speed, map(operator.add, v_u1, v_u2)))
for i, speed in enumerate(part.speed):
if speed < part.smin:
part.speed[i] = part.smin[i]
elif speed > part.smax:
part.speed[i] = part.smax[i]
#返回现在位置,如果原来是整数,返回也是整数,否则这是浮点数; 如果超过上下限,用上下限数值
for i,item in enumerate(part):
if isinstance(item.values()[0],int):
positionV = round(item.values()[0] + part.speed[i])
else:
positionV = item.values()[0] + part.speed[i]
if positionV <= part.pmin[i] :
part[i][item.keys()[0]] = part.pmin[i]
elif positionV >= part.pmax[i]:
part[i][item.keys()[0]] = part.pmax[i]
else:
part[i][item.keys()[0]] = positionV
def sub(self,a,b):
return a.values()[0] - b.values()[0]
def mutated(self,particle,MUTPB):
"""
自适应变异
:param particle: 传入
:param MUTPB:
:return:
"""
size = len(particle)
newPart = self.particle_generate()
for i in xrange(size):
if random.random() < MUTPB:
particle[i] = newPart[i]
return particle
def optimize(self):
toolbox = base.Toolbox()
pool = multiprocessing.Pool(processes=(multiprocessing.cpu_count()))
toolbox.register("map", pool.map)
toolbox.register("particle", self.particle_generate)
toolbox.register("population", tools.initRepeat, list, toolbox.particle)
toolbox.register("update", self.updateParticle, phi1=2.0, phi2=2.0)
toolbox.register("evaluate", object_func)
pop = toolbox.population(n=20) #粒子群有5个粒子
GEN = 20 #更新一千次
MUTPB = 0.1 #自适应变异概率
best = None
for g in range(GEN):
# for part in pop: #每次更新,计算粒子群中最优参数,并把最优值写入best
#     part.fitness.values = toolbox.evaluate(part)
#     if not part.best or part.best.fitness < part.fitness:
#         part.best = creator.Particle(part)
#         part.best.fitness.values = part.fitness.values
#     if not best or best.fitness < part.fitness:
#         best = creator.Particle(part)
#         best.fitness.values = part.fitness.values
fitnesses = toolbox.map(toolbox.evaluate, pop) #利用pool.map,实现多线程
for part, fit in zip(pop, fitnesses):
part.fitness.values = fit
if not part.best or part.best.fitness < part.fitness:
part.best = creator.Particle(part)
part.best.fitness.values = part.fitness.values
if not best or best.fitness < part.fitness:
best = creator.Particle(part)
best.fitness.values = part.fitness.values
if g < GEN - 1: #在最后一轮之前,每轮进行位置更新和变异
for i,part in enumerate(pop):
toolbox.update(part, best)#更新粒子位置
if random.random() < MUTPB: #自适应变异
self.mutated(part,MUTPB)
# Gather all the fitnesses in one list and print the stats
return pop, best
if __name__ == "__main__":
Strategy = BollChannelStrategy
Symbol = {
"vtSymbol": rb1905,
"StartDate": "20181001",
"EndDate": "20190301",
"Slippage": 1,
"Size": 10,
"Rate": 2 / 10000,
"Capital": 10000
}
Parameterlist = {
bollWindow:(10,50,5),
bollDev:(3.0,6.0,0.6),
cciWindow:(10,50,5),
atrWindow:(10,50,5),
slMultiplier:(3.5,5.5,0.5),
fixedSize:1
}
parameterPackage = {
"symbol": Symbol,
"parameterlist": Parameterlist,
"strategy": Strategy
}
PSO = PSOOptimize(Strategy, Symbol, Parameterlist)
pop,best = PSO.optimize()
print ("best para: %s, result:%s" %(best,best.fitness.values))
print(pop[:20])
print("-- End of (successful) %s evolution --", Symbol["vtSymbol"])

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!