--------------------------------------------------------------------------------------------------------------------------------------------------
尝试做为指标用在期货量化交易,效果不怎么理想。不过作者用DataFrame作为容器来分析指标,根据行情数据,分析指标效果,并进行验证让我学到不少。之前做量化交易策略设计时候只是靠第三方平台或者交易软件来验证,很不专业。
就尝试着用DataFrame作为容器分析市场行情指标,比如MACD,KDJ,CCI等等行情指标效用。这些行情指标,都可以从金融交易常用talib库获得。调出这样指标,结合行情数据分析,用DataFrame做为容器,充分利用python的数据分析和图片功能。
定义了一个类DataAnalyzer初始化的时候需要输入生成csv文件导出文件夹地址,和数据格式
方法db2df,输入vnpy使用Mongodb行情数据库信息和读取品种信息collection,程序会读取指定品种的开始时间到结束时间段的一分钟k线数据;按照初始化格式返回生成DataFrame供分析;如果expeort2csv为True的话,会生成一个csv文件到指定地址.
方法csv2df,输入指定路径的csv行情文件;程序读取csv文件;按照格式返回生成DataFrame供分析;如果expeort2csv为True的话,会生成一个csv文件到指定地址。程序把导入的字符串转换datetime格式,此时可能会有warning信息。
方法df2Barmin,输入DataFrame格式1分钟行情数据,和指定输出的多分钟k线,程序整合出对应多分钟k线数据。比如输出1分钟行情数据,要求输出5分钟k先数据;程序输出5分钟K线信息DataFrame供分析。这里有点地方要注意,如果数据中一天开始第一个bar是9点,那么crossmin为1; 如果第一个是9点1分,此处为0。如果expeort2csv为True的话,会生成一个csv文件到指定地址。
方法dfcci,其实是一个示例方法,输入DataFrame格式分钟行情数据,此处可以是1分钟也可以是多分钟的,和参数cciWindows;程序调用talib的cci方法,进行计算。返回带有新的一列cci数据的DataFrame。用来分析。如果expeort2csv为True的话,会生成一个csv文件到指定地址。打开就是这样一个东西。其他talib方法,其实调用原理差不多
通过上面的类,可以获得一个带有指标数据的DataFrame,后面就可以对这个进行分析,后面是一系列演示分析,其实DataFrame对应的分析工具很多多。
通过类的方法,读取一个rb1905行情数据,按照聚合出5分钟K线,在按照cci周期为15条K线计算cci值
画出cci的柱状分布图,CCI(Commodity Channel lndex)顺势指标是测量股价是否已超出常态分布范围的一个指数,波动于正无限大和负无限小之间。如下图x轴是cci值,y轴是出现次数。从图中可以看出cci数据是两个正太分布叠加,波峰在-80和+80两个值,正负200之后,cci出现就变的很少,此时可以用DataFrame的数字分析功能找到更多数据。
计算每个时间点的当前价格,和之后第5根K线结束价格差,和cci的值做成散点图,看看cci值和价格波动是否有联系。如下图,图象上看并没有上面关联。
cci值在正负(100-200)区间,和(200-300)区间算是出现比较少,计算在这个区间出现时,之后第2,第4,和第6根K线结束价格增多还是减少概率。从下图数据来看,似乎在(100,200)区间,第2个k线上涨概率大点点,但是也说明不了什么。
# encoding: UTF-8
from pymongo import MongoClient, ASCENDING
import pandas as pd
import numpy as np
from datetime import datetime
import talib
import matplotlib.pyplot as plt
import scipy.stats as scs
class DataAnalyzer(object):
def __init__(self, exportpath="C:\Project\\", datformat=[datetime, high, low, open, close,volume]):
self.mongohost = None
self.mongoport = None
self.db = None
self.collection = None
self.df = pd.DataFrame()
self.exportpath = exportpath
self.datformat = datformat
def db2df(self, db, collection, start, end, mongohost="localhost", mongoport=27017, export2csv=True):
"""读取MongoDB数据库行情记录,输出到Dataframe中"""
self.mongohost = mongohost
self.mongoport = mongoport
self.db = db
self.collection = collection
dbClient = MongoClient(self.mongohost, self.mongoport, connectTimeoutMS=500)
db = dbClient[self.db]
cursor = db[self.collection].find({datetime: {$gte: start}, datetime: {$lt: end}}).sort("datetime",
ASCENDING)
self.df = pd.DataFrame(list(cursor))
self.df = self.df[self.datformat]
self.df = self.df.reset_index(drop=True)
path = self.exportpath + self.collection + ".csv"
if export2csv == True:
self.df.to_csv(path, index=True, header=True)
return self.df
def csv2df(self, csvpath, dataname="csv_data", export2csv=True):
"""读取csv行情数据,输入到Dataframe中"""
csv_df = pd.read_csv(csvpath)
self.df = csv_df[self.datformat]
self.df["datetime"] = pd.to_datetime(self.df[datetime])
# self.df["high"] = self.df[high].astype(float)
# self.df["low"] = self.df[low].astype(float)
# self.df["open"] = self.df[open].astype(float)
# self.df["close"] = self.df[close].astype(float)
# self.df["volume"] = self.df[volume].astype(int)
self.df = self.df.reset_index(drop=True)
path = self.exportpath + dataname + ".csv"
if export2csv == True:
self.df.to_csv(path, index=True, header=True)
return self.df
def df2Barmin(self, inputdf, barmins, crossmin=1, export2csv=True):
"""输入分钟k线dataframe数据,合并多多种数据,例如三分钟/5分钟等,如果开始时间是9点1分,crossmin = 0;如果是9点1分,crossmin为1"""
dfbarmin = pd.DataFrame()
highBarMin = 0
lowBarMin = 0
openBarMin = 0
volumeBarmin = 0
datetime = 0
for i in range(0, len(inputdf) - 1):
bar = inputdf.iloc[i, :].to_dict()
if openBarMin == 0:
openBarmin = bar["open"]
if highBarMin == 0:
highBarMin = bar["high"]
else:
highBarMin = max(bar["high"], highBarMin)
if lowBarMin == 0:
lowBarMin = bar["low"]
else:
lowBarMin = min(bar["low"], lowBarMin)
closeBarMin = bar["close"]
datetime = bar["datetime"]
volumeBarmin += int(bar["volume"])
# X分钟已经走完
if not (bar["datetime"].minute + crossmin) % barmins: # 可以用X整除
# 生成上一X分钟K线的时间戳
barMin = {datetime: datetime, high: highBarMin, low: lowBarMin, open: openBarmin,
close: closeBarMin, volume : volumeBarmin}
dfbarmin = dfbarmin.append(barMin, ignore_index=True)
highBarMin = 0
lowBarMin = 0
openBarMin = 0
volumeBarmin = 0
if export2csv == True:
dfbarmin.to_csv(self.exportpath + "bar" + str(barmins) + ".csv", index=True, header=True)
return dfbarmin
def dfcci(self, inputdf, n, export2csv=True):
"""调用talib方法计算CCI指标,写入到df并输出"""
dfcci = inputdf
dfcci["cci"] = None
for i in range(n, len(inputdf)):
df_ne = inputdf.loc[i - n + 1:i, :]
cci = talib.CCI(np.array(df_ne["high"]), np.array(df_ne["low"]), np.array(df_ne["close"]), n)
dfcci.loc[i, "cci"] = cci[-1]
dfcci = dfcci.fillna(0)
dfcci = dfcci.replace(np.inf, 0)
if export2csv == True:
dfcci.to_csv(self.exportpath + "dfcci" + ".csv", index=True, header=True)
return dfcci
if __name__ == __main__:
DA = DataAnalyzer()
#数据库导入
# start = datetime(year=2019, month=3, day=1, hour=0, minute=0, second=0, microsecond=0)
# end = datetime.today()
# df = DA.db2df(db="VnTrader_1Min_Db", collection="rb1905", start = start, end = end)
#csv导入
df = DA.csv2df("rb1905.csv")
df10min = DA.df2Barmin(df,5)
dfaftercci = DA.dfcci(df10min, 15)
dfaftercci = dfaftercci.loc[15:,:]
dfaftercci = dfaftercci.reset_index(drop=True)
#######################################分析cci分布########################################
plt.figure(figsize=(15,5))
plt.hist(dfaftercci[cci],bins=100,histtype=bar,align=mid,orientation=vertical,color=r)
plt.show()
sta = scs.describe(dfaftercci.cci)
stew = sta[4]
kurtosis = sta[5]
print(cci的偏度:%s % (stew))
print(cci的峰度:%s % (kurtosis))
#######cci在(100 - 200),(200 -300)后的第2根,第4根,第6根的价格走势######################
dfaftercci["next2BarClose"] = None
dfaftercci["next4BarClose"] = None
dfaftercci["next6BarClose"] = None
dfaftercci["next5BarCloseMakrup"] = None
for i in range(1, len(dfaftercci)-6):
if dfaftercci.loc[i,"close"] > dfaftercci.loc[i+2,"close"]:
dfaftercci.loc[i,"next2BarClose"] = -1
else:
dfaftercci.loc[i, "next2BarClose"] =1
if dfaftercci.loc[i,"close"] > dfaftercci.loc[i+4,"close"]:
dfaftercci.loc[i, "next4BarClose"] = -1
else:
dfaftercci.loc[i, "next4BarClose"] = 1
if dfaftercci.loc[i,"close"] > dfaftercci.loc[i+6,"close"]:
dfaftercci.loc[i, "next6BarClose"] = -1
else:
dfaftercci.loc[i, "next6BarClose"] = 1
#######计算######################
dfaftercci.loc[i,"next5BarCloseMakrup"] = dfaftercci.loc[i+5,"close"] - dfaftercci.loc[i,"close"]
dfaftercci = dfaftercci.fillna(0)
plt.figure(figsize=(15, 3))
plt.scatter(dfaftercci["cci"], dfaftercci["next5BarCloseMakrup"])
plt.show()
for cciValue in [100,200]:
de_anaylsis = dfaftercci.loc[(dfaftercci["cci"]>= cciValue)& (dfaftercci["cci"]< cciValue + 100)]
percebtage = de_anaylsis[de_anaylsis["next2BarClose"]>0]["next2BarClose"].count()*100.000/de_anaylsis[cci].count()
print(在cci 区间(%s , %s) 时候,第二根K线结束价格上涨概率为 %s%% %(cciValue,cciValue + 100,percebtage))
percebtage = de_anaylsis[de_anaylsis["next2BarClose"]<0]["next2BarClose"].count()*100.000/de_anaylsis[cci].count()
print(在cci 区间-(%s , %s) 时候,第二根K线结束价格下跌概率为 %s%% %(cciValue,cciValue + 100,percebtage))
percebtage = de_anaylsis[de_anaylsis["next4BarClose"] > 0]["next2BarClose"].count() * 100.000 / de_anaylsis[
cci].count()
print(在cci 区间(%s , %s) 时候,第四根K线结束价格上涨概率为 %s%% % (cciValue, cciValue + 100, percebtage))
percebtage = de_anaylsis[de_anaylsis["next4BarClose"] < 0]["next2BarClose"].count() * 100.000 / de_anaylsis[
cci].count()
print(在cci 区间-(%s , %s) 时候,第四根K线结束价格下跌概率为 %s%% % (cciValue, cciValue + 100, percebtage))
percebtage = de_anaylsis[de_anaylsis["next6BarClose"] > 0]["next2BarClose"].count() * 100.000 / de_anaylsis[
cci].count()
print(在cci 区间(%s , %s) 时候,第六根K线结束价格上涨概率为 %s%% % (cciValue, cciValue + 100, percebtage))
percebtage = de_anaylsis[de_anaylsis["next6BarClose"] < 0]["next2BarClose"].count() * 100.000 / de_anaylsis[
cci].count()
print(在cci 区间-(%s , %s) 时候,第六根K线结束价格下跌概率为 %s%% % (cciValue, cciValue + 100, percebtage))