DEV

Bitflyerの強化学習BOTをつくってみる

下記などを参考にしながら強化学習BOTを作ってみたいと思います。

http://www.wildml.com/2018/02/introduction-to-learning-to-trade-with-reinforcement-learning/www.wildml.com

GCPの環境で、Open AI Gymを使ってやりたいと思っています。データ取得からcsv生成まではnode.jsでやって、gcsにあげてbigQueryにインポートして、それをGCMLでごにょごにょして、それをGymで使うみたいになるのかなあと昨日調べて思いました。 細かいことまで全部調べて考えてると頭がこんがらがるので、とりあえず適当な感じで一通りを進めてみようと思っております。昨日データ取得のコード作って夜中中動かして、750万件くらいデータとったんだけど、rest apiで取ってるので、いわゆる遅延的な情報がないので、mmbot的なものにはあんまりよろしくないのかもなあと思ったのですが、せっかくなので、取得した2月13日の24時間分データを使って、シミュレーションしたいと思います。

まずは、思いついたものをすぐ具現化出来る状態にしたいので、ルール的なものは適当に決めていきたいと思います。

BOTの基本ルール

  • 使うデータ:上記の24時間分データ
  • データの整形: 1秒足データを作ってそれを使う(始値、終値、安値、高値、買サイズ、売サイズ)
  • 売買ルール:5秒に1回アクションをとる。(4連続同一サイドで約定すると強制解除。)
  • アクション:買う、売る、なにもしない(注文の有効期限は5分)
  • 売買価格:直近1秒足の高値と安値の真ん中
  • 報酬:約定時の利益変動

データ取得

下記のようなnode.jsコードを作って取り込みました。非常に適当に作っております。firestoreに入れています。firestoreにbatch()というのを使って一括で保存しているのですが、3分くらい経つとこれの途中でとまります。なので、2分したら強制的にプログラムを止めて、foreverで死活監視して自動再起動することでエンドレス取り込みをしております。でもなぜか750万位取り込んだ時点でプログラムが動いていませんでした。でも十分とれたからいいです。

const bf = require('./api/bitflyer');
const {db} = require('./api/firebase');
const finTime = 2; //終了させる時間(分)
const interval = 330; //api呼び出し間隔(ミリ秒)
let loop = true; //繰り返しフラグ
let callNum = 0; //api呼び出し回数
let oldestId = 816700061; //取得した最も古いID
let emptyCount = 0; //約定データが空だった回数
let emptyLimit = 100; //約定データが何回空だったらシステムを停止させるか
let executions = null;
let data = null;
let startTime = 0;
const main = async () => {
startTime = new Date().getTime();
console.log('start bf-exec');
const snap = await db.collection('oldestId').doc('1').get();
if (snap.data()) oldestId = snap.data().oldestId + 501;
await getLoop();
};
const getLoop = async () => {
if ((new Date().getTime() - startTime) > finTime * 60 * 1000) {
console.log('time is over');
process.exit(0);
} else {
console.log('loop');
if (!loop) return;
executions = await getExecutions();
await save();
setTimeout(() => getLoop(), interval);
}
};
const getExecutions = async () => {
console.log('getExecutions');
callNum += 1;
let before = null;
if (oldestId && oldestId > 0) before = oldestId;
console.log(oldestId);
return await bf.getExecutions(500, 'FX_BTC_JPY', before);
};
const save = async () => {
console.log('save');
if (!(executions && executions.data && executions.data.length > 0)) {
console.log('データがありません');
emptyCount += 1;
if (emptyCount >= emptyLimit) loop = false;
return;
}
data = executions.data;
oldestId = data[data.length - 1].id;
console.log('start batch')
let batch = db.batch();
let ref = null;
data.forEach(exec => {
ref = db.collection('raw_data').doc(exec.id.toString());
batch.set(ref, exec);
});
await batch.commit();
console.log('fin batch')
await db.collection('oldestId').doc('1').set({oldestId: oldestId});
};
main();

データ整形

1秒足データを作って保存します。node.jsで下記の様なものを作って保存しました。

const {db} = require('./api/firebase');
const moment = require('moment');
const getNum = 8000; //1回に取得するデータ数
let oldestId = null;
let loop = true;
let batch = null;
const main = async () => {
//oldestID取得(最も古いIDがFirestoreに登録済みか)
const snap = await db.collection('oldestId').doc('1').get();
if (snap.data()) {
//登録済みならその値をセット
oldestId = snap.data().oldestId;
} else {
//未登録ならraw_dataの最新IDをセット
const rawDataSnap = await db.collection('raw_data')
.orderBy('id', 'desc')
.limit(1)
.get();
oldestId = rawDataSnap.docs[0].data().id + 1;
}
return await createData();
};
//1秒足データ生成処理のループ
const createData = async () => {
try {
while (loop) {
//orderIdより古いデータをgetNum個取得する
const rawDataSnap = await db.collection('raw_data')
.where('id', '<=', oldestId)
.orderBy('id', 'desc')
.limit(getNum)
.get();
if (!rawDataSnap || !rawDataSnap.docs) {
console.log('empty');
return loop = false;
}
//最初と最後のデータは取得漏れ可能性を考慮して、下記でデータ取得・登録を実施
batch = db.batch();
await getSameSecData(rawDataSnap.docs[0].data().exec_date);
let firstStartTime = null; //取得データの最初のstartTime
let targetTime = null; //現在集計対象のstartTime
let sameSecData = []; //現在集計対象の同一秒データの配列
//取得した全データをチェックして、秒速足を作成していく
rawDataSnap.docs.forEach(async data => {
data = data.data();
let start = _startTime(data.exec_date);
if (!firstStartTime) firstStartTime = start;
//最初のデータは上記で登録済みのため排除
if (firstStartTime !== start) {
if (!targetTime) targetTime = start;
if (targetTime !== start) {
console.log('---- start ---')
console.log(sameSecData);
console.log('---- end ---')
const oneSecData = createOneSecData(sameSecData);
sameSecData = [];
sameSecData.unshift(data);
targetTime = start;
await save(oneSecData);
} else {
sameSecData.unshift(data);
}
}
});
const docLength = rawDataSnap.docs.length;
await getSameSecData(rawDataSnap.docs[docLength - 1].data().exec_date);
await batch.commit();
console.log('saved!');
batch = null;
oldestId = rawDataSnap.docs[docLength - 1].data().id;
await db.collection('oldestId').doc('1').set({oldestId: oldestId});
}
} catch (e) {
return console.log(e.code, e.message);
}
};
//exec_dateと同じ秒のデータを全部取得する
const getSameSecData = async (exec_date) => {
const start = _startTime(exec_date);
const end = _endTime(exec_date);
const timeSnap = await db.collection('raw_data')
.where('exec_date', '>=', start)
.where('exec_date', '<', end)
.orderBy('exec_date', 'asc')
.get();
let sameSecData = [];
timeSnap.docs.forEach(data => {
sameSecData.push(data.data());
});
const oneSecData = createOneSecData(sameSecData);
return await save(oneSecData);
};
const createOneSecData = data => {
let info = initInfo();
info.time = _startTime(data[0].exec_date);
info.startPrice = data[0].price;
info.endPrice = data[data.length - 1].price;
data.forEach(d => {
info.count += 1;
if (d.side === 'BUY') {
info.buySize += d.size;
} else {
info.sellSize += d.size;
}
if (info.maxPrice < d.price) info.maxPrice = d.price;
if (info.minPrice > d.price) info.minPrice = d.price;
});
return info;
};
const save = async (data) => {
console.log(data);
const timeId = createTimeId(data.time);
let ref = db.collection('oneSecData').doc(timeId);
batch.set(ref, data);
};
const createTimeId = (exec_date) => {
return moment(exec_date).format('YYYY-MM-DD-HH-mm-ss');
};
const _startTime = (exec_date) => {
return moment(exec_date).set({'millisecond': 0}).format().replace(/\+09:00/, '');
};
const _endTime = (exec_date) => {
return moment(exec_date).add(1, 's').set({'millisecond': 0}).format().replace(/\+09:00/, '');
};
const initInfo = () => ({
time: null,
startPrice: 0,
endPrice: 0,
maxPrice: 0,
minPrice: 100000000,
buySize: 0,
sellSize: 0,
count: 0,
});
main();

感想

ここまでやって思ったのは、Firestoreに入れるのはよくないなあとおもった。バックアップとか過不足なくデータを入れたいのでとりあえずFirestoreにいれようと思ったんだけど、金額的に厳しそうだから、普通にCSVとか作ってgoogle driveか、cloud storageにアップするような感じにしよう。GCE -> CRON-> BF API + REST API -> CSV -> Google Driveみたいな感じかなあと思った。1秒足データなら、1日8MBくらいなんだけど、生データだとでっかいから、移動が大変そう。という意味では、FirestoreからbigQueryへの移動の超絶的速さにはビビった。。まあでも2GBくらいだったきもする。

報酬の計算(バックテスト)

シミュレーションしていって、報酬が発生したら、エージェントくんにあげます。まだ見ぬエージェントくんに早くあげたいです。イマイチ報酬のあげ方的なところが分かってないので、とりあえず1日通しのバックテストをするようにしてみた。しっかりコードのテストをしてる状態じゃないけど、とりあえずメモ的に掲載。何も考えずに5秒おきに買って、売ってを繰り返すと、2月13日では、-14280円値幅くらい損したようです。毎回0.2でポジってたら、-2856円位です。

const fs = require('fs');
const readline = require('readline');
const orderTimeRange = 5; //注文間隔(秒)
const orderTimeLimit = 60; //各注文の有効時間(秒)
const oneShotSize = 0.2; //1回の注文サイズ
const maxPosiSize = 0.6; //保有可能なポジションサイズ
const lossCutPriceRate = 0.05; //強制ポジション解除の場合の金額すべらす率(%)
let executions = []; //約定データ
let orderNum = 0; //全注文回数
let actions = []; //今回のアクションが全てはいった配列
let positions = []; //全ての約定が入った配列
//シミュレーション
//ポジ有効期限切れあったか?
//ポジ解除時の利益遷移(これが報酬)
const main = async () => {
await readExecutions();
console.log('executions: ' + executions.length);
orderNum = Math.ceil(executions.length / orderTimeRange);
console.log('orderNum: ' + orderNum);
getActions();
console.log('actions: ' + actions.length);
for (let i = 0; i < orderNum; i++) {
let idx = orderTimeRange * i;
let action = actions[i];
const max = parseInt(executions[idx].maxPrice);
const min = parseInt(executions[idx].minPrice);
let price = parseInt((max - min) / 2 + min);
let executed = checkExecuted(idx, price);
if (!executed) continue;
positions.push({
action: action,
price: price,
executed: executed
});
}
sortPositions();
checkPosi();
};
const checkPosi = () => {
const maxPosiCount = Math.ceil(maxPosiSize / oneShotSize);
let posiData = [];
let profit = 0;
let totalProfit = 0;
positions.forEach(posi => {
if (posiData.length <= 0) {
posiData.push(posi);
} else {
if (posi.action === posiData[0].action) {
posiData.push(posi);
} else {
let old = posiData.shift();
if (old.action === 'BUY') {
profit = posi.price - old.price;
} else {
profit = old.price - posi.price;
}
}
}
if (posiData.length > maxPosiCount) {
let totalPrice = 0;
posiData.forEach(old => {
totalPrice += old.price;
});
const oldPrice = parseInt(totalPrice / posiData.length);
if (posi.action === 'SELL') {
profit += posi.price - oldPrice;
} else {
profit += oldPrice - posi.price;
}
profit -= posi.price * lossCutPriceRate / 100;
posiData = [];
}
totalProfit += profit;
posi.profit = profit;
posi.totalProfit = totalProfit;
profit = 0;
});
// console.log(positions);
console.log(totalProfit);
console.log(totalProfit * oneShotSize);
};
const sortPositions = () => {
positions.sort((a, b) => {
if (a.executed < b.executed) return -1;
if (a.executed > b.executed) return 1;
return 0;
});
};
//約定できたかどうかを返す
// idx + 1の開始時点で注文を出す
// idx + 1の終了時点で注文1秒経過
const checkExecuted = (idx, price) => {
for (let i = 0; i < orderTimeLimit; i++) {
let execIdx = idx + 1 + i;
if (execIdx >= executions.length) break;
let exec = executions[execIdx];
const max = parseInt(exec.maxPrice);
const min = parseInt(exec.minPrice);
if (price >= min && price <= max) return execIdx;
}
return null;
};
//今回はとりあえず、買いと売りを繰り返すだけにしてみる。
const getActions = () => {
let action = 'BUY';
for (let i = 0; i < orderNum; i++) {
actions.push(action);
action = action === 'BUY' ? 'SELL' : 'BUY';
}
};
//jsonファイルを読み込む
const readExecutions = async () => {
const fileStream = fs.createReadStream('./one_sec_0213.json');
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
for await (const line of rl) {
executions.push(JSON.parse(line));
}
};
main();