先日の記事でInfluxQL
においてLIMIT
とOFFSET
を使いクエリを分割する方法を実装した。
しかしOFFSET
は仕組み上クエリ毎にOFFSET
値までの検索を走らせる必要があるため内部で重複したSELECT
が発生している。
つまり単純に1回のクエリでデータを取得する場合と比べ、分割クエリは処理時間が長くなることが分かっている。
そこで今回は各クエリにおいて最終レコードのタイムスタンプを使い次のWHERE
句を再生成することによりオーバーヘッドを軽減する。
Node.js
上からInfluxDB
を操作するライブラリnode-influx
を使用する。
それほど複雑ではないが分かりやすく順番に解説する。
SELECT COUNT(*) ~
で一度全件のカウントを取る。この時COUNTは非常に処理が軽いためクエリ分割は必要ない。
また件数は帰ってきた配列[0].count
というメンバ変数に入る点に注意。
const ループ回数 = Math.floor(配列[0].count / 設定するLIMIT);
LIMIT
を指定したSELECT
文を発行SELECT ◯ FROM △ WHERE time > 開始日時 AND time < 終了日時 LIMIT 1000
これにより最初の1000行が抽出される。私の場合2~3秒で1クエリとなるよう10万件でLIMIT
している。
.query
の結果がdata
という配列に入るようにしている。
const lastDate = data[data.length - 1].time.toNanoISOString()
必ずタイムスタンプがtime
というオブジェクトに入っている。このオブジェクトnode-influx
特有のもの。
いくつかメソッドを持っているおりtoNanoISOString()
を利用するとナノ秒を含めた日付文字列を取得できる。
また初心者がハマりやすい点として配列の最後の要素はlength - 1
である点は注意。
WHERE
句に追記次に発行するクエリに追記する。
SELECT ◯ FROM △ WHERE
time > 開始日時 AND
time < 終了日時 AND
time > ${lastDate} -- 追記
LIMIT 1000
最初のクエリにAND
条件を一つ追記した。
注意すべき点は不等号が「大なり」になっている点だ(逆にした場合は小なり)
イコールをつけてしまうと最後と同じ行が重複して取得されるためにこう記述する。
余談:time > 開始日時
の条件は重複なので削ってしまって良い。私の場合何かしらの改修orバグ発生時にここが抜け落ちているとえげつない量の件数がSELECTされる可能性もあり、条件が重複しても正常動作するためそのままにしている。オーバーヘッド的にはそれほどではないと考えている。
あとは③に戻って繰り返すだけだ。
ここは少し難しい。前回説明していないのでこちらで説明する。
クエリの発行は非同期で行われる。つまり複数のクエリが同時進行で処理されてしまう。
そのまま分けたクエリを同時に処理することも出来なくはないが量が多すぎてタイムアウトなどで破棄されるパターンがある。
私のコードでは大量にクエリを送信することのメリットはそれほど無いのでawait
により1件づつ実行している。
無名関数によりループ部にasync
を付ける。
.query
はpromise
を返すのでasync/await
にて待つことが出来る。もっと深く勉強したければ私の次の記事も読んでみて欲しい。
Promiseとasync/awaitが正しく機能しない時に読む記事
const loopMax = Math.floor(recordCount / select_limit); //←重要!セミコロンで明示的に行を終わる!
(async () => { //asyncの無名関数
for (let loopNo = 0; loopNo <= loopMax; loopNo++) { //ループ回数を考慮
await queryMain(loopNo, loopMax) //ここにawait
}
//データのファイル出力
exportData()
})();
無名関数の文法については他のWEBサイトへ譲る。
このコードが関数であれば 関数名 = () => {}
を変更し関数名 = async() => {}
でも良い。
余談:私は無名関数の構文はこのasync
が欲しいパターンでしか使わないが他に便利なケースがあるのだろうか?新しい記事が書けるかも。
queryMain()
の中身
//前省略
return remoteDB.query(limitedQuery) //ここでreturn
.then((res) => {
//後ろ省略
query
メソッドをreturn
すればそのままpromise
になる。
この実装により実行時間は1回のクエリで纏めて取得した時間とほぼ変わらない数値がでた。かなりのオーバーヘッド削減になったと思われる。
実装してみると意外に簡単であることが分かる。
どうせなので今回のコードを入れた5秒足をつくるコード全体を公開しておく。
グローバル変数が多いので余りスマートな実装だとは思えないがよければ参考にしてほしい。
※InfluxDB
とその設定関係は独自のソースを使っているのでコピペでは動かない。各自実装すること。
import Config from '../utils/config.mjs'
import DB from './libs/classes.mjs'
import dayjs from 'dayjs'
import * as fs from 'fs'
//------------------------------------------------------------------------------------
// 5秒足作成
// v0.1.3
// 2020/05/18 クエリを分けるシステム実装(OFFSETを使わない方式に変更)
// 空になってしまった足も生成するように変更
// 2020/05/07 日付ラベルの計算式を修正
//------------------------------------------------------------------------------------
//------------------------------------------
// 設定
//------------------------------------------
const sec_time_span = 5 //秒足の秒数
const select_limit = 100000 //1回のクエリで取得するレコード数
const use_validation = true //生成された足に不整合がないか検証する
//------------------------------------------
// グローバル変数
let candle = [] //足が格納される箱
let start_time = null //一番最初の足の日時を設定する
let errors = [] //エラー報告用
const config = new Config('../config/config.yaml') //DB用設定の読み込み
const process_time_start = process.hrtime() //計測用
let lastDate = null //OFFSETを使わないページネーション用
// DBサーバ
const remoteDB = new DB({
host: config.db_host_remote,
username: config.db_username,
password: config.db_password,
}).influx
if (sec_time_span - Math.floor(sec_time_span) != 0) {
console.log(`Error: sec_time_spanは整数である必要があります。`)
process.exit(1);
} else if (60 % sec_time_span) {
console.log(`Error: 1分(60秒)はsec_time_span(${sec_time_span}秒)では割り切れません。`)
process.exit(1);
} else {
console.log(`----- Start Creating ${sec_time_span}seconds OHLCV Data -----`)
}
//日時指定 外部から処理を委託する場合はここに引き渡し処理を記述
const start_date = '2020-05-17'
const end_date = '2020-05-18'
const allQuery = `SELECT * FROM "bitFlyer_db"."autogen"."lightning_executions_FX_BTC_JPY" WHERE time > '${start_date}' AND time < '${end_date}'`
const countQuery = `SELECT COUNT(id) FROM "bitFlyer_db"."autogen"."lightning_executions_FX_BTC_JPY" WHERE time > '${start_date}' AND time < '${end_date}'`
remoteDB.query(countQuery) //件数をカウント
.then((res) => {
if (res.length === 0) { console.log("No Data Counted."); process.exit(1); }
const recordCount = res[0].count //レコード数
console.log(`Total Records: ${recordCount}`)
const loopMax = Math.floor(recordCount / select_limit);
(async () => {
for (let loopNo = 0; loopNo <= loopMax; loopNo++) {
await queryMain(loopNo, loopMax) //分割してメインクエリを発行
}
//データのファイル出力
exportData()
})();
})
.catch((err) => {
console.log(err); process.exit(1);
})
//メインのクエリ発行
const queryMain = (loopNo, loopMax) => {
let limitedQuery = null
if (lastDate === null) { limitedQuery = `${allQuery} LIMIT ${select_limit}` } // 最初のレコードだけ日付を絞らない
else { limitedQuery = `${allQuery} AND time > '${lastDate}' LIMIT ${select_limit}` } // ページネーションコード
console.log(`Execute Query: ${loopNo} / ${loopMax}`)
return remoteDB.query(limitedQuery)
.then((res) => {
if (res.length === 0) { console.log("No Data Selected."); process.exit(1); }
candleMain(res) //ローソク足生成
lastDate = res[res.length - 1].time.toNanoISOString()//最後の日付を記録して次のページネーションを行っている
}).catch((err) => {
console.log(err)
})
}
const candleMain = (data) => {
//初回のみ最初の足の日時を決定する
if (candle.length === 0) { start_time = caliculateStartTime(data) }
else if (start_time === null) {
console.log("Error: Start Time is Null"); process.exit(1)
}
for (const d of data) {
const arrayNum = caliculateArrayNum(start_time, d.exec_date)
//要素を新規
if (!candle[arrayNum]) {
candle[arrayNum] = {
date: dayjs(start_time).add(arrayNum * sec_time_span, 'second'), //日付ラベル
O_date: dayjs(d.exec_date),
O: d.price,
H: d.price,
L: d.price,
C: d.price,
C_date: dayjs(d.exec_date),
V: { BUY: 0, SELL: 0 }
}
} else {
//O より早いレコードが入ってきた場合oを更新
if (candle[arrayNum].O_date > dayjs(d.exec_date)) {
candle[arrayNum].O_date = dayjs(d.exec_date)
candle[arrayNum].O = d.price
}
//Hを生成
if (candle[arrayNum].H < d.price) {
candle[arrayNum].H = d.price
}
//Lを生成
if (candle[arrayNum].L > d.price) {
candle[arrayNum].L = d.price
}
//C より遅いレコードが入ってきた場合Cを更新
if (candle[arrayNum].C_date < dayjs(d.exec_date)) {
candle[arrayNum].C_date = dayjs(d.exec_date)
candle[arrayNum].C = d.price
}
}
// Vを生成(BUY:SELL別Volume)
// 注意:candle[arrayNum].V[d.side]としてはいけない
// v.sideは板寄せ時の約定で空白が入っている可能性がある
if (d.side === 'BUY') {
candle[arrayNum].V.BUY += d.size
} else if (d.side === 'SELL') {
candle[arrayNum].V.SELL += d.size
}
}
}
// --------------------------------------------------------------------------------------------------------------------------------
// funcs
// --------------------------------------------------------------------------------------------------------------------------------
const exportData = () => {
//疎らな配列が出来るのでforで回す
for (let i = 0; i <= candle.length - 1; i++) {
//不整合がないか検証
if (use_validation) { validation({ c: candle[i], index: i }) }
const c = candle[i]
//出力
const BUY = (' ' + (Math.round(c.V.BUY * 100) / 100)).slice(-6) //文字のパディング
const SELL = (' ' + (Math.round(c.V.SELL * 100) / 100)).slice(-6) //文字のパディング
console.log(`${dayjs(c.date).format('YYYY/MM/DD HH:mm:ss')} [O: ${c.O}][H: ${c.H}][L: ${c.L}][C: ${c.C}][BUY: ${BUY}][SELL: ${SELL}] ${i}`)
}
//レポート
console.log("--- Report ---")
for (const e of errors) {
console.log(e)
}
//ファイルに出力
console.log("Exporting JSON File.")
fs.writeFileSync(`../data/candlestick(${sec_time_span}sec).json`, JSON.stringify(candle));
console.log("JSON File Exported.")
//処理時間計測
const performance_time_end = process.hrtime(process_time_start)
console.log(`Execution time: ${performance_time_end[0]}s ${performance_time_end[1] / 1000000}ms`)
}
// 生成した足にエラーがないか検証する
const validation = ({ c, index }) => {
// 疎らなためデータが存在しなかった場合は空の箱を入れる
if (!candle[index]) {
console.log(`${index}を追加`)
candle[index] = {
date: dayjs(start_time).add(index * sec_time_span, 'second'), //日付ラベル
O_date: dayjs(start_time).add(index * sec_time_span, 'second'),
O: candle[index - 1].C,
H: candle[index - 1].C,
L: candle[index - 1].C,
C: candle[index - 1].C,
C_date: dayjs(start_time).add(index * sec_time_span, 'second'), //日付ラベルと同じものを入れる
V: { BUY: 0, SELL: 0 }
}
c = candle[index]
}
if (c.L > c.H) errors.push(`Error(index${index}): L > H`) //LがHより大きい
if (c.O_date > c.C_date) errors.push(`Error(${index}): O_date > C_date`) //Oの日付よりCの日付のほうが早い
if (index > 0) {
if (candle[index - 1]) {
if (candle[index - 1].C_date > c.O_date) { errors.push(`Error(${index}): C_date[-1] > O_date`) } //前レコードのCの日付より今のレコードのOの日付のほうが早い
const prevPlus5 = dayjs(candle[index - 1].date).unix() + sec_time_span
const current = dayjs(candle[index].date).unix()
if (prevPlus5 != current) { errors.push(`Error(${index}): Date Differs ${prevPlus5}(prev + 5) : ${current}(current)`) } //日付ラベルが不正
} else {
errors.push(`Error: index${index - 1} dosen't exists.`)
}
}
}
//開始時間の計算
const caliculateStartTime = (data) => {
const start1 = dayjs(data[0].exec_date).second() //最初の秒数
//console.log(`秒数 ${start1}`)
const start2 = Math.floor(start1 / sec_time_span) //要素番号
//console.log(`要素番号: ${start2}`)
const start3 = start2 * sec_time_span
//console.log(`スタート秒数: ${start3}`)
const start4 = dayjs(data[0].exec_date).startOf('minute').second(start3)
//console.log(`スタート時間: ${start4}`)
return start4
}
//要素番号の計算
const caliculateArrayNum = (start_time, exec_date) => {
const time = dayjs(exec_date).unix() - start_time.unix()
return Math.floor(time / 5)
}