【AWS】bitflyer 自動注文

【きっかけ】

過去の記事参照
 
buffalokusojima.hatenablog.com


1. 使用するサービス

(AWS)

  • Lambda
  • DynamoDB
  • IAM
  • SSM

(外部サービス)

2.概要

実際に注文を行うLambdaの実装に関しては過去記事参照。また、ベースとなるLambdaである約定確認Lambdaに関しても過去記事参照。
途中で呼び出すLambdaとして注文一覧取得Lambdaも必要です。

buffalokusojima.hatenablog.com


buffalokusojima.hatenablog.com

buffalokusojima.hatenablog.com


仕組みとしては、約定通知を受けた際に、事前に用意したDynamoDB内で挿入した注文がなくなってあれば、約定されたと認識し、約定をした注文とは
逆のSTOP注文をします。それにより、価格がある程度、下がるまたは上がった際に損切りを自動かつ無限に行ってくれる仕組みになります。
つまりは、価格が暴騰、暴落した際に自身で決めた価格で確実に購入、売却はされる状態にあるということです。機会損失をなくすことに注力した実装になります。

3. 実装

3-1. DynamoDBの設定

stop_check_bitflyerという名でテーブルを作成しています。
主な設定としては以下になります。

・Primary partition key ・・・ coin_pair (String)
・Primary sort key ・・・ price (Number)

NOSQLなのでややこしいですが、主キーと考えていただいて結構です。基本的に同じ価格のItemは存在しない。
挿入しようとすると上書きになります。

また、現状あるデータを参考として、載せると以下のようになります。

stop_check_bitflyer_table
stop_check_bitflyerの中身

その他カラムに関してはプログラム内で追加しているので、そちらで説明します。事前いカラムを決めなくてもいいのがDynamoDBの特徴になります。


3-2. Lambda実装

ベースは過去記事で載せた約定通知Lambdaを改造したものになります。
以下、コード。

const AWS = require('aws-sdk');
const ssm = new (require('aws-sdk/clients/ssm'))();
const lambda = new AWS.Lambda();

const request = require('request');
const crypto = require('crypto');
const momentTimezone = require('moment-timezone');

const sqs = new AWS.SQS();
const ORDER_METHOD = 'OCO';
//const CONDITION_TYPE = 'STOP';
const STOP_PRICE_BIAS = 0.003; //STOP価格のバイアス
const SPECIAL_QUEUE_URL = 'QUEUEのURL';
const queueUrl = SPECIAL_QUEUE_URL;
const groupId = "SPECIAL_ORDER";


AWS.config.update({region: 'ap-northeast-1'});

const ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'});

exports.handler = (event, context, callback) => {
    
    const id = process.env['id'];
    console.log('id: ' + id);
    
    getParameterFromSystemManager('bitflyer-keys',callback)
    .then(function(data){
       
        const apikey = data.split(",")[0];
        const sercretKey = data.split(",")[1];
        
        var timestamp = Date.now().toString();
        var method = 'GET';
        var path = '/v1/me/getchildorders?product_code=FX_BTC_JPY&child_order_state=COMPLETED';
        
        var afterID = '&after='
        
        if(id){
            path += afterID + id;
        }
        
        var text = timestamp + method + path;
        var sign = crypto.createHmac('sha256', sercretKey).update(text).digest('hex');
        
        var option = {
          url: 'https://api.bitflyer.jp' + path,
          method: method,
          headers: {
            'ACCESS-KEY': apikey,
            'ACCESS-TIMESTAMP': timestamp,
            'ACCESS-SIGN': sign,
            'Content-Type': 'application/json'
            }
        }
        
        return sendRequest(option, callback);
    }).then(function(data){
        
        if(data.response.statusCode != 200){
          console.error("Error:",data.response);
          callback(null, {
            statusCode: data.response.statusCode,
            body: JSON.stringify({message: data.response}),
            headers: {"Content-type": "application/json"}
          });
          return;
        }
        
        data = JSON.parse(data.body);
        
        if(data.length == 0){
            console.log('No data Found');
            callback(null,{
                statusCode: 200,
                body: JSON.stringify({message: 'No data Found'}),
                headers: {"Content-type": "application/json"}
            });
            return;
        }
        
        console.log(data)
        
        const dateTimeUtc = momentTimezone.tz(data[0].child_order_date.split(" ")[0], 'UTC');
        
        const dateTimeJst = momentTimezone(dateTimeUtc).tz('Asia/Tokyo').format('YYYY/MM/DD HH:mm:ss');
        
        const date = new Date(dateTimeJst);
        
        const toDay = new Date(momentTimezone(new Date()).tz('Asia/Tokyo').format('YYYY/MM/DD HH:mm:ss'));
        
        
        if(id == null && toDay.getTime() - date.getTime() > 180000){
            process.env['id'] = data[0].id;
            console.log('Lambda Restarted');
            callback(null,{
            statusCode: 500,
            body: JSON.stringify({message: 'Lambda Restarted'}),
            headers: {"Content-type": "application/json"}
        });
        return;
        }
        
        process.env['id'] = data[0].id;
        
        var message = '\n';
        
        data.forEach(function(value){
            
            const dateTimeUtc = momentTimezone.tz(value.child_order_date.split(" ")[0], 'UTC');
            
            const dateTimeJst = momentTimezone(dateTimeUtc).tz('Asia/Tokyo').format('YYYY/MM/DD HH:mm:ss');
            
            
            message += "[" + value.side + ": " + value.child_order_type + "]\n"
                    + "Date " + dateTimeJst + "\n"
                    + "price " + value.price + "\n"
                    + "size: " + value.size + "\n"
                    + "average price: " + value.average_price + "\n"
                    + "executed size: " + value.executed_size + "\n"
                    + "----------------------\n";
                    
        });
        
        console.log(message);
       
        sendLine(message,callback);

        //ここから追加

  //約定のデータをDynamoDB用に加工します。
        var history = makeArrayforDynamoDB(data);
        
        //DynamoDBから自動注文のデータを取得して処理
        getDatafromDynamoDB(callback).then(function(data){
            
            if(data.Items.length == 0){
              console.log("price check data not set");
              callback(null, {
                  statusCode: 200,
                  body: JSON.stringify({message: "price check data not set"}),
                  headers: {"Content-type": "application/json"}
                });
                return;
            }
            
            const dbData = data.Items;
            
    //注文一覧Lambdaを呼び出す
            callLambda(callback).then(function(data){
                var body = JSON.parse(data.Payload).body;
                data = JSON.parse(body).data
                
                //DynamoDBから取得したデータを元にfor文を回し、注文が約定しているか確認
                for(var item of dbData){
                    //console.log(item.history.L)
                    
                    //注文一覧Lambdaから取得したデータ内にDynamoDB内にある注文IDと同じデータがあれば未約定としてスルー
                    if(data.find(d => d.parent_order_acceptance_id == item.id.S)) continue;
                    
                    //STOP注文の作成
                    var side;
                    var bias = Number(item.price.N) * STOP_PRICE_BIAS;
                    if(item.side.S == 'BUY'){
                        side = 'SELL';
                        bias = -bias;
                    }else if(item.side.S == 'SELL'){
                        side = 'BUY';
                    }

                    //OCO注文として、最低限損失を少なくするようにする
      //何度も価格を行き来した際に、limit注文で損失を無くしていき、STOP注文で機会損失をカバーする
                    var param1 = {
                        product_code: item.coin_pair.S,
                        condition_type: "STOP_LIMIT",
                        side: side,
                        size: item.size.N,
                        price: item.price.N,
                        trigger_price: item.price.N
                    };
                    var param2 = {
                        product_code: item.coin_pair.S,
                        condition_type: "STOP",
                        side: side,
                        size: item.size.N,
                        trigger_price: (Number(item.price.N) + bias).toString()
                    }
                    
                    var parameters = [param1,param2];
                    
                    console.log(parameters)
                    var queueBody = {
                        order_method: ORDER_METHOD,
                        parameters: parameters,
                        autoOrder: true
                    }
                    console.log(queueBody);

                    //注文QUEUEを送信
                    sendSQS(queueBody, queueUrl, groupId, callback);
                   
                    var dbHistory = item.history.L
                    dbHistory = dbHistory.concat(history)
                    //console.log(dbHistory)
                    
                    param1.history_data = dbHistory;
                    
                    //DynamoDBに約定履歴や現状どのポジションで注文しているかをいれる
                    putDataToDynamoDB(param1, callback);
                }
            });
        });
    });
    
    function getParameterFromSystemManager(apikey_name, callback) {
    
        return new Promise(function (resolve) {
            var apikey = process.env[apikey_name];
            
            if(!apikey || typeof apikey == undefined){
            
                // Fetches a parameter called REPO_NAME from SSM parameter store.
                // Requires a policy for SSM:GetParameter on the parameter being read.
                var params = {
                    Name: apikey_name,
                    /* required */
                    WithDecryption:true
                };
                
                ssm.getParameter(params, function(err, apikey) {
                    if (err){
                        console.error(err.stack);
                        callback(null,{
                            statusCode: 500,
                            body: JSON.stringify({message: err.toString()}),
                            headers: {"Content-type": "application/json"}
                        });
                        resolve(null);
                        return;
                    }
                    process.env[apikey_name] = apikey.Parameter.Value;
                    resolve(apikey.Parameter.Value);
                });
            }else resolve(apikey);
        });
    }
    
    function sendLine(message, callback){
        
        getParameterFromSystemManager('line-access-key', callback)
        .then(function(lineKey){
            
            var option = {
            url: 'https://notify-api.line.me/api/notify',
            headers: {
                'Content-Type': 'application/json',
                'Authorization': 'Bearer ' + lineKey
                },
            method: 'POST',
            form :{
              message: message
            }
            };
        
            return sendRequest(option,callback);
        })
        .then(function(data){
          if(data.response.statusCode != 200){
            console.error(data)
          }
          callback(null, {
            statusCode: data.response.statusCode,
            body: data.body,
            headers: {"Content-type": "application/json"}
          });
        });
    }
    
    function makeArrayforDynamoDB(array){
        if(!array || array.length == 0){
            return;
        }
        var newArray = []
        for(var item of array){
            
            var newItem = {}
            Object.keys(item).forEach(function (key) {
              if(typeof(item[key])=="number") newItem[key] = {N: item[key].toString()}
              else if(typeof(item[key]=='string')) newItem[key] = {S: item[key]}
            });
            newArray.push({M: newItem});
        }
        return newArray;   
    }
    
    
    function getDatafromDynamoDB(callback){
      
      return new Promise(function (resolve) {
        var params = {
        TableName: 'stop_check_bitflyer'
      };
    
      // Call DynamoDB to add the item to the table
      ddb.scan(params, function(err, data) {
          if (err) {
            console.log("Error", err);
            callback(null, {
                statusCode: 401,
                body: JSON.stringify({message: err.toString()}),
                headers: {"Content-type": "application/json"}
              });
            resolve(null);
            return;
          }
          resolve(data);
        });
      });
    }

    function putDataToDynamoDB(params, callback){
        
        console.log(params.id)
        if(!params.id){
            params.id = '';
        }
      
        var params = {
                
            Item: {
                
                'coin_pair' : {S: params.product_code},
                'price' : {N: params.trigger_price},
                'side': {S: params.side},
                'size': {N: params.size},
                'id': {S: params.id},
                'history': {L: params.history_data}
            },
            TableName: 'stop_check_bitflyer'
          };
          
        console.log(params)
          // Call DynamoDB to add the item to the table
          ddb.putItem(params, function(err, data) {
                if (err) {
                    console.log("Error", err);
                    callback(null, {
                        statusCode: 401,
                        body: JSON.stringify({message: err.toString()}),
                        headers: {"Content-type": "application/json"}
                     });
                    
                    return;
                }
                console.log("Success");
                params = params.Item;
                
                callback(null,{
                    statusCode: 200,
                    body: JSON.stringify({message: 'PutItem Success: {coin_pair: ' + params.coin_pair.S
                                    + ' price: ' + params.price.N + ' side: ' + params.side.S +' size: ' + params.size.N
                                    + ' id: ' + params.id.S + ' history: ' + params.history.L
                                  }),
                    headers: {
                    "Content-type": "application/json"
                    }
                });
                return;
            });
    }
    
    function sendSQS(queueBody, queueUrl, groupId, callback){
        
        var id=new Date().getTime().toString();
        
        // SQS message parameters
        var params = {
            
            MessageBody: JSON.stringify(queueBody),
            MessageGroupId: groupId,
            MessageDeduplicationId: id,
            QueueUrl: queueUrl
        };
        
        
        
        console.log(params)
    
        sqs.sendMessage(params, function(err, data) {
            var responseCode = 200;
            
             // response and status of HTTP endpoint
            var responseBody = {
                message: ''
            };
        
            if (err) {
                console.log('error:', "failed to send message " + err);
                responseCode = 500;
            } else {
                console.log('data:', data.MessageId);
                responseBody.message = 'Sent to ' + queueUrl;
                responseBody.messageId = data.MessageId;
            }
    
            callback(null, {
                statusCode: responseCode,
                headers: {
                    'Content-Type': 'application/json'
                },
                body: JSON.stringify(responseBody)
            });
        });
    }
    
    function callLambda(callback){
        return new Promise(function (resolve) {
            
            var params = {
                FunctionName: "注文一覧Lambdaのファンクション名",
                InvocationType: "RequestResponse",
                //Payload: payload //リクエストパラメータはないのでいらない
            }
            
            lambda.invoke(params, function(error, res){
                if(error){
                        console.error(error);
                        callback(null,{
                            statusCode: 500,
                            body: JSON.stringify({message: error.toString()}),
                            headers: {"Content-type": "application/json"}
                        });
                        resolve(null);
                    }
                resolve(res);
            }); 
        });          
    }
    
    
    function sendRequest(option, callback){
        
        return new Promise(function (resolve) {
            request(option, function(error, response, body){
                    
                    if(error){
                        console.error(error);
                        callback(null,{
                            statusCode: 500,
                            body: JSON.stringify({message: error.toString()}),
                            headers: {"Content-type": "application/json"}
                        });
                        resolve(null);
                    }
                    var data = {response, body}
                    resolve(data);
            });
        });
    }
    
};

4. 課題

DynamoDBの特徴として、RDBMSと違って、データの一貫性が保証されず、データの上書きと参照で相違が出る可能性があります。
というのも、価格が上下し、自動注文を繰り返していると、誤注文のような事象が稀に見られ、100%安全な注文とは言えません。
また、Lambdaの再起動時に環境変数が初期化することによる、約定データ大量取得にも原因があると思われます。
いずれ問題解決したり、LmabdaのリスタートやDynamoDBのデータ特徴に関して何かわかれば記事にしていこうかと思います。