INFRA

Bitflyer FXの約定データを取得して1秒足データを作ってFirestoreに保存する

データ取得

下記のようなnode.jsコードを作って取り込みました。非常に適当に作っております。firestoreに入れています。firestoreにbatch()というのを使って一括で保存しているのですが、3分くらい経つとこれの途中でとまります。なので、2分したら強制的にプログラムを止めて、foreverで死活監視して自動再起動することでエンドレス取り込みをしております。でもなぜか750万位取り込んだ時点でプログラムが動いていませんでした。でも十分とれたからいいです。

const bf = require('./api/bitflyer');
const {db} = require('./api/firebase');
const finTime = 2; //終了させる時間(分)
const interval = 330; //api呼び出し間隔(ミリ秒)
let loop = true; //繰り返しフラグ
let callNum = 0; //api呼び出し回数
let oldestId = 816700061; //取得した最も古いID
let emptyCount = 0; //約定データが空だった回数
let emptyLimit = 100; //約定データが何回空だったらシステムを停止させるか
let executions = null;
let data = null;
let startTime = 0;
const main = async () => {
startTime = new Date().getTime();
console.log('start bf-exec');
const snap = await db.collection('oldestId').doc('1').get();
if (snap.data()) oldestId = snap.data().oldestId + 501;
await getLoop();
};
const getLoop = async () => {
if ((new Date().getTime() - startTime) > finTime * 60 * 1000) {
console.log('time is over');
process.exit(0);
} else {
console.log('loop');
if (!loop) return;
executions = await getExecutions();
await save();
setTimeout(() => getLoop(), interval);
}
};
const getExecutions = async () => {
console.log('getExecutions');
callNum += 1;
let before = null;
if (oldestId && oldestId > 0) before = oldestId;
console.log(oldestId);
return await bf.getExecutions(500, 'FX_BTC_JPY', before);
};
const save = async () => {
console.log('save');
if (!(executions && executions.data && executions.data.length > 0)) {
console.log('データがありません');
emptyCount += 1;
if (emptyCount >= emptyLimit) loop = false;
return;
}
data = executions.data;
oldestId = data[data.length - 1].id;
console.log('start batch')
let batch = db.batch();
let ref = null;
data.forEach(exec => {
ref = db.collection('raw_data').doc(exec.id.toString());
batch.set(ref, exec);
});
await batch.commit();
console.log('fin batch')
await db.collection('oldestId').doc('1').set({oldestId: oldestId});
};
main();

データ整形

1秒足データを作って保存します。node.jsで下記の様なものを作って保存しました。

const {db} = require('./api/firebase');
const moment = require('moment');
const getNum = 8000; //1回に取得するデータ数
let oldestId = null;
let loop = true;
let batch = null;
const main = async () => {
//oldestID取得(最も古いIDがFirestoreに登録済みか)
const snap = await db.collection('oldestId').doc('1').get();
if (snap.data()) {
//登録済みならその値をセット
oldestId = snap.data().oldestId;
} else {
//未登録ならraw_dataの最新IDをセット
const rawDataSnap = await db.collection('raw_data')
.orderBy('id', 'desc')
.limit(1)
.get();
oldestId = rawDataSnap.docs[0].data().id + 1;
}
return await createData();
};
//1秒足データ生成処理のループ
const createData = async () => {
try {
while (loop) {
//orderIdより古いデータをgetNum個取得する
const rawDataSnap = await db.collection('raw_data')
.where('id', '<=', oldestId)
.orderBy('id', 'desc')
.limit(getNum)
.get();
if (!rawDataSnap || !rawDataSnap.docs) {
console.log('empty');
return loop = false;
}
//最初と最後のデータは取得漏れ可能性を考慮して、下記でデータ取得・登録を実施
batch = db.batch();
await getSameSecData(rawDataSnap.docs[0].data().exec_date);
let firstStartTime = null; //取得データの最初のstartTime
let targetTime = null; //現在集計対象のstartTime
let sameSecData = []; //現在集計対象の同一秒データの配列
//取得した全データをチェックして、秒速足を作成していく
rawDataSnap.docs.forEach(async data => {
data = data.data();
let start = _startTime(data.exec_date);
if (!firstStartTime) firstStartTime = start;
//最初のデータは上記で登録済みのため排除
if (firstStartTime !== start) {
if (!targetTime) targetTime = start;
if (targetTime !== start) {
console.log('---- start ---')
console.log(sameSecData);
console.log('---- end ---')
const oneSecData = createOneSecData(sameSecData);
sameSecData = [];
sameSecData.unshift(data);
targetTime = start;
await save(oneSecData);
} else {
sameSecData.unshift(data);
}
}
});
const docLength = rawDataSnap.docs.length;
await getSameSecData(rawDataSnap.docs[docLength - 1].data().exec_date);
await batch.commit();
console.log('saved!');
batch = null;
oldestId = rawDataSnap.docs[docLength - 1].data().id;
await db.collection('oldestId').doc('1').set({oldestId: oldestId});
}
} catch (e) {
return console.log(e.code, e.message);
}
};
//exec_dateと同じ秒のデータを全部取得する
const getSameSecData = async (exec_date) => {
const start = _startTime(exec_date);
const end = _endTime(exec_date);
const timeSnap = await db.collection('raw_data')
.where('exec_date', '>=', start)
.where('exec_date', '<', end)
.orderBy('exec_date', 'asc')
.get();
let sameSecData = [];
timeSnap.docs.forEach(data => {
sameSecData.push(data.data());
});
const oneSecData = createOneSecData(sameSecData);
return await save(oneSecData);
};
const createOneSecData = data => {
let info = initInfo();
info.time = _startTime(data[0].exec_date);
info.startPrice = data[0].price;
info.endPrice = data[data.length - 1].price;
data.forEach(d => {
info.count += 1;
if (d.side === 'BUY') {
info.buySize += d.size;
} else {
info.sellSize += d.size;
}
if (info.maxPrice < d.price) info.maxPrice = d.price;
if (info.minPrice > d.price) info.minPrice = d.price;
});
return info;
};
const save = async (data) => {
console.log(data);
const timeId = createTimeId(data.time);
let ref = db.collection('oneSecData').doc(timeId);
batch.set(ref, data);
};
const createTimeId = (exec_date) => {
return moment(exec_date).format('YYYY-MM-DD-HH-mm-ss');
};
const _startTime = (exec_date) => {
return moment(exec_date).set({'millisecond': 0}).format().replace(/\+09:00/, '');
};
const _endTime = (exec_date) => {
return moment(exec_date).add(1, 's').set({'millisecond': 0}).format().replace(/\+09:00/, '');
};
const initInfo = () => ({
time: null,
startPrice: 0,
endPrice: 0,
maxPrice: 0,
minPrice: 100000000,
buySize: 0,
sellSize: 0,
count: 0,
});
main();

感想

ここまでやって思ったのは、Firestoreに入れるのはよくないなあとおもった。バックアップとか過不足なくデータを入れたいのでとりあえずFirestoreにいれようと思ったんだけど、金額的に厳しそうだから、普通にCSVとか作ってgoogle driveか、cloud storageにアップするような感じにしよう。ちなみに、750万件の生データを保存するのに1800円くらいかかって、1秒足データのためのデータ読み込みと保存でまた1800円くらいかかった。