長らくInfluxDBを取り扱ってきたがようやくbitFlyerのRealtime APIより流れてくるデータの記録をスタートさせる準備が整った。
全ては今後投資のバックテスト等を行うためだ。
次の項目をチェックしていく。
今回は初回のため1回で全てチェックするのは難しいため継続的に検証する予定。
当面はデータの容量を見ていく予定だが、データの欠落や回線が切断されたときの挙動もなんとかして調べる方法はないだろうか(アイデアなし)
回線切断に関してはsocket.ioが自動で再接続を試みるらしいが、切断された時のログ出力など実装していないため不安である。なにかアイデアがあれば実装したい。
仮想通貨投資においてはAmazonのAWSを使っているプレイヤーが多い印象だが自宅サーバーでの記録を実験してみる。
メモリは16G。8~9年前のモデルだがInfluxDBのスペック表で見ると十分な性能だ。容量は少ないがSSDを搭載している。
ただ、普通のPCであるため電気代と発熱が気になる。長時間の使用に耐えられるのだろうか。
これまでの記事よりNode.jsを継続して使用。サーバー本体が直接socket.ioにてbitFlyerのAPIに接続し「node-influx」というInfluxDB用のクライアントパッケージで書き込んでいる。
import bF from './bitFlyer.js'
import * as db from './influx.mjs'
const socket_bF_client = bF()
const influx = db.getDatabaseNames();
// スナップショット
socket_bF_client.on("lightning_board_snapshot_FX_BTC_JPY", (message) => {
db.writeBoardData({ message, influx, type: "snapshot" })
})
// 差分
socket_bF_client.on("lightning_board_FX_BTC_JPY", (message) => {
db.writeBoardData({ message, influx, type: "update" })
})
// 約定
socket_bF_client.on("lightning_executions_FX_BTC_JPY", (message) => {
db.writeExecutions({ message, influx})
})
// ティッカー
socket_bF_client.on("lightning_ticker_FX_BTC_JPY", (message) => {
db.writeTicker({ message, influx})
})
import Influx from 'influx'
import microtime from 'microtime'
export const getDatabaseNames = function () {
const influx = new Influx.InfluxDB({
host: 'localhost',
database: 'bitFlyer_db',
schema: [
{
measurement: 'lightning_board_FX_BTC_JPY',
fields: {
price: Influx.FieldType.INTEGER,
size: Influx.FieldType.FLOAT,
},
tags: [
'type',
'bid_or_ask',
]
},
{
measurement: 'lightning_executions_FX_BTC_JPY',
fields: {
id: Influx.FieldType.INTEGER,
price: Influx.FieldType.INTEGER,
size: Influx.FieldType.FLOAT,
exec_date: Influx.FieldType.STRING,
buy_child_order_acceptance_id: Influx.FieldType.STRING,
sell_child_order_acceptance_id: Influx.FieldType.STRING,
},
tags: [
'side',
]
},
{
measurement: 'lightning_ticker_FX_BTC_JPY',
fields: {
product_code: Influx.FieldType.STRING,
timestamp: Influx.FieldType.STRING,
tick_id: Influx.FieldType.INTEGER,
best_bid: Influx.FieldType.INTEGER,
best_ask: Influx.FieldType.INTEGER,
best_bid_size: Influx.FieldType.FLOAT,
best_ask_size: Influx.FieldType.FLOAT,
total_bid_depth: Influx.FieldType.FLOAT,
total_ask_depth: Influx.FieldType.FLOAT,
ltp: Influx.FieldType.INTEGER,
volume: Influx.FieldType.FLOAT,
volume_by_product: Influx.FieldType.FLOAT,
},
tags: []
},
]
})
influx.getDatabaseNames()
.then(names => {
if (!names.includes('bitFlyer_db')) {
return influx.createDatabase('bitFlyer_db');
}
})
.catch(err => {
console.error(`Error creating Influx database! : ` + err);
})
return influx;
}
// Ticker
export const writeTicker = ({ message, influx }) => {
//Tickerはズラす必要がない
const output_data = {
measurement: 'lightning_ticker_FX_BTC_JPY',
fields: {
product_code: message.product_code,
timestamp: message.timestamp,
tick_id: message.tick_id,
best_bid: message.best_bid,
best_ask: message.best_ask,
best_bid_size: message.best_bid_size,
best_ask_size: message.best_ask_size,
total_bid_depth: message.total_bid_depth,
total_ask_depth: message.total_ask_depth,
ltp: message.ltp,
volume: message.volume,
volume_by_product: message.volume_by_product
},
tags: []
}
influx.writePoints([output_data]).then(() => {
}).catch(err => {
console.error(`Error saving Ticker data to InfluxDB! ${err.stack}`)
})
}
export const writeExecutions = ({ message, influx }) => {
//ナノ秒
const nano_date = BigInt(microtime.now() * 1000)
const output_data = message.map((data, index) => {
//ずらし
const indexed_date = Influx.toNanoDate(nano_date + BigInt(index))
return {
measurement: 'lightning_executions_FX_BTC_JPY',
fields: {
id: data.id,
price: data.price,
size: data.size,
exec_date: data.exec_date,
buy_child_order_acceptance_id: data.buy_child_order_acceptance_id,
sell_child_order_acceptance_id: data.ell_child_order_acceptance_id
},
tags: {
side: data.side
},
timestamp: indexed_date
}
})
influx.writePoints(output_data).then(() => {
}).catch(err => {
console.error(`Error saving Executions data to InfluxDB! ${err.stack}`)
})
}
export const writeBoardData = ({ message, influx, type }) => {
//ナノ秒
const nano_date = BigInt(microtime.now() * 1000)
const asks_data = message.asks.map((asks, index) => {
//ずらし
const indexed_date = Influx.toNanoDate(nano_date + BigInt(index))
return {
measurement: 'lightning_board_FX_BTC_JPY',
tags: {
type,
bid_or_ask: 'ask',
},
fields: {
price: asks.price,
size: asks.size
},
timestamp: indexed_date
}
})
const bids_data = message.bids.map((bids, index) => {
//ずらし
const indexed_date = Influx.toNanoDate(nano_date + BigInt(index))
return {
measurement: 'lightning_board_FX_BTC_JPY',
tags: {
type,
bid_or_ask: 'bid',
},
fields: {
price: bids.price,
size: bids.size
},
timestamp: indexed_date
}
})
influx.writePoints([...asks_data, ...bids_data]).then(() => {
}).catch(err => {
console.error(`Error saving Board data to InfluxDB! ${err.stack}`)
})
}
基本コード内でやっている事は過去の記事で紹介したことのみである。少しコードが冗長である気がするが時間の都合でこのままの実装だ。
この記事の執筆時で4時間と45分稼働した。115MBのデータがこの時間で増加している。
1日1ギガバイトになるかと見積もっていたが、この量なら500MB~多くて1G/日といったところだろうか。約定履歴のみで3G/monthという情報を頂いているので、板情報を差分含め記録している分それなりに覚悟せねばならない。
さすがに稼働時間が短いこともあって今の所Wi-Fi等にも問題はない。
長時間運用するにあたって障害が発生する兆しを少しでも把握するため、CPUやMEMの使用状況も合わせて記録してみたい。
これにはTelegrafというソフトウェアを追加で使用するのがベストかと思う。ハードウェアの使用状況などを記録するために利用するソフトウェアだ。
TelegrafがあればどれぐらいのSSD容量を消費するのかもグラフで確認出来るかと思う。
自宅サーバーのようなことをやるのは非常に久々でありディスクアクセスのランプが24時間チカチカしているのはやはり少し不安になる。
とりあえずはデータ分析用としてでなく記録が継続できるかのテストなので気軽にやっていきたい。