【AWS】bitflyer 自動注文(続き)

【きっかけ】

過去の記事参照
 
buffalokusojima.hatenablog.com


1. 使用するサービス

(AWS)

  • Lambda
  • IAM
  • SSM
  • SQS 

 
(外部サービス)


2.概要

前回の続きになります。
buffalokusojima.hatenablog.com

今回は実際に自動注文を行う部分を説明します。最初に注文を手動で行い。注文IDをDynamoDBに格納し、約定したかを前回のLambdaで確認します。
約定確認のLambdaは前回作成した部分で、今回の説明範囲は最初の注文の部分です。主な機能は以前の記事に記載したbitflyerへの注文受付を参照してください。こちらをベースとしてます。
buffalokusojima.hatenablog.com


3. 実装

3-1. バックエンド Lambda側

const AWS = require('aws-sdk');
const ssm = new (require('aws-sdk/clients/ssm'))();
const request = require('request');

const sqs = new AWS.SQS();

const lambda = new AWS.Lambda();

const NORMAL_QUEUE_URL = '通常注文SQS';
const SPECIAL_QUEUE_URL = '特殊注文SQS';


AWS.config.update({region: 'ap-northeast-1'});

const ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'});

exports.handler = (event, context, callback) => {
    
    if(!event.body){
      console.log('No form data Found');
      callback(null, {
        statusCode: 200,
        body: JSON.stringify({message: 'No form data Found'}),
        headers: {
          "Content-type": "application/json"
        }
      });
      return;
    }
    
    var body = JSON.parse(event.body);
    
    console.log(body)
    
    const COIN_PAIR = body.coin_pair;
    
    const PRICE = body.price;
    
    const SIDE = body.side;
    
    const SIZE = body.size;
    
    const TYPE = body.type;
    
    const PARAMETERS = body.parameters;
   
    const ORDER_METHOD = body.order_method;
   
 // 追加項目 この値で自動注文を注文処理またはキャンセル処理する 
    const MODE = body.mode;
    
 // 追加項目 キャンセル処理をするに当たっての該当の注文ID
    const PARENT_ID = body.parent_id;
    
    if(!TYPE && !PARAMETERS && !PRICE){
        console.log("invalid order:",body);
          callback(null, {
              statusCode: 400,
              body: JSON.stringify({message: "invalid order"}),
              headers: {"Content-type": "application/json"}
            });
        return;
    }
    
    if(COIN_PAIR != 'FX_BTC_JPY'){
        console.log("invalid coin pair");
          callback(null, {
              statusCode: 400,
              body: JSON.stringify({message: "invalid order"}),
              headers: {"Content-type": "application/json"}
            });
        return;
    }
    
    if(MODE != 'insert' && MODE != 'delete'){
      console.log("invalid mode:", MODE);
          callback(null, {
              statusCode: 400,
              body: JSON.stringify({message: "invalid order"}),
              headers: {"Content-type": "application/json"}
            });
        return;
    }
    
    // 自動注文送信処理 DB操作までは以前と同様です。
    if(MODE == 'insert'){
        var queueBody = {autoOrder: true}
        var queueUrl;
        var groupId;
        
        if(!PARAMETERS){
            
            if(!checkElement(body)){
                console.log("Bad body:", body);
                callback(null, {
                  statusCode: 400,
                  body: JSON.stringify({message: "invalid order"}),
                  headers: {"Content-type": "application/json"}
                });
                return;
            }
            queueBody.product_code = COIN_PAIR;
            queueBody.child_order_type = TYPE;
            queueBody.side = SIDE;
            queueBody.price = PRICE;
            queueBody.size = SIZE;
            
            if(TYPE == 'STOP'){
                queueBody.trigger_price = queueBody.price;
                queueBody.price = 0;
            } 
            queueUrl = NORMAL_QUEUE_URL;
            
            groupId = "NORMAL_ORDER";
        }else{
            if(ORDER_METHOD != "SIMPLE" && ORDER_METHOD != "IFD"){
                console.log("bad order method:");
                callback(null, {
                    statusCode: 400,
                    body: JSON.stringify({message: "invalid order"}),
                    headers: {"Content-type": "application/json"}
                });
                return;
            }
    
            for(const parameter of PARAMETERS){
                if(!checkElement(parameter)){
                    console.log("Bad body:", parameter);
                    callback(null, {
                      statusCode: 400,
                      body: JSON.stringify({message: "invalid order"}),
                      headers: {"Content-type": "application/json"}
                    });
                    return;
                }
                parameter.product_code = COIN_PAIR
                parameter.condition_type = parameter.type;
                if(parameter.condition_type == 'STOP'){
                    parameter.trigger_price = parameter.price;
                    parameter.price = 0;
                }
             }
             
             queueBody.order_method = ORDER_METHOD;
             queueBody.parameters = PARAMETERS;
             queueUrl = SPECIAL_QUEUE_URL;
             groupId = "SPECIAL_ORDER";
        }
      
   // SQSを送る前にDBでデータ取得し、操作が入る 
        getDatafromDynamoDB(callback)
        .then(function(data){
            
          data = data.Items;
            
            // 既に注文済の場合は注文をしません。
            if(data.find(d => d.coin_pair == COIN_PAIR || d.price == PRICE)){
                console.log('Data Already Exists:', data);
                callback(null, {
                  statusCode: 401,
                  body: JSON.stringify({message: 'Data Already Exists:'+ data}),
                  headers: {
                    "Content-type": "application/json"
                  }
                 });
                 return;
            }
            
   // 通常注文の場合parameterが無いのでdb挿入用のパラメータ作成
            var parameter;
            if(queueBody.parameters) parameter = queueBody.parameters[0];
            else{
              parameter = {
                  product_code: COIN_PAIR,
                  trigger_price: PRICE,
                  side: SIDE,
                  size: SIZE
              }
            }
            
            parameter.trigger_price = parameter.trigger_price.toString();
            parameter.size = parameter.size.toString();
            parameter.id ="newOrder";
            parameter.history_data = []
            
            putDataToDynamoDB(parameter, callback);
            sendSQS(queueBody, queueUrl, groupId, callback);
        });
 // 自動注文キャンセル処理
    }else if(MODE == 'delete'){
        
        if(!PARENT_ID){
          console.log("invalid parent id:", PARENT_ID);
          callback(null, {
              statusCode: 400,
              body: JSON.stringify({message: "invalid order"}),
              headers: {"Content-type": "application/json"}
            });
        return;
        }
        
        var params = {
            product_code: COIN_PAIR,
            price: PRICE.toString()
        }
        
        // DBから該当のレコードを削除
        deleteDataFromDynamoDB(params, callback);
        
        var payload = {
            coin_pair: COIN_PAIR,
            parent_id: PARENT_ID
        };
        
        payload = JSON.stringify({body:JSON.stringify(payload)});
        
  // 注文キャンセルLambdaを呼び出してキャンセル
        callLambda(payload, callback)
        .then(function(data){
            if(data.StatusCode != 200){
                console.log("Delete Error:", data);
                callback(null, {
                  statusCode: 400,
                  body: JSON.stringify({message: "Delete failed:", data}),
                  headers: {"Content-type": "application/json"}
                });
                return;
            }
            var body = JSON.parse(data.Payload).body;
            var message = body.message;
            if(message == ''){
                message = "Delete Success"
            }
            callback(null, {
              statusCode: 200,
              body: JSON.stringify({message: message}),
              headers: {"Content-type": "application/json"}
            });
            return;
        })
        
    }else{
        console.log('wrong requestbody');
        callback(null, {
          statusCode: 401,
          body: JSON.stringify({message: 'wrong requestbody'}),
          headers: {
            "Content-type": "application/json"
          }
        });
        return;
    }
    
    function getDatafromDynamoDB(callback){
      
      return new Promise(function (resolve) {
        var params = {
        TableName: 'stop_check_bitflyer'
      };
    
      // Call DynamoDB to add the item to the table
      ddb.scan(params, function(err, data) {
          if (err) {
            console.log("Error", err);
            callback(null, {
                statusCode: 401,
                body: JSON.stringify({message: err.toString()}),
                headers: {"Content-type": "application/json"}
              });
            resolve(null);
            return;
          }
          resolve(data);
        });
      });
    }

    function putDataToDynamoDB(params, callback){
      
        var params = {
                
            Item: {
                
                'coin_pair' : {S: params.product_code},
                'price' : {N: params.trigger_price},
                'side': {S: params.side},
                'size': {N: params.size},
                'id': {S: params.id},
                'history': {L: params.history_data}
            },
            TableName: 'stop_check_bitflyer'
          };
        console.log(params)
          // Call DynamoDB to add the item to the table
          ddb.putItem(params, function(err, data) {
                if (err) {
                    console.log("Error", err);
                    callback(null, {
                        statusCode: 401,
                        body: JSON.stringify({message: err.toString()}),
                        headers: {"Content-type": "application/json"}
                     });
                    
                    return;
                }
                console.log("Success");
                params = params.Item;
                
                callback(null,{
                    statusCode: 200,
                    body: JSON.stringify({message: 'PutItem Success: {coin_pair: ' + params.coin_pair.S
                                    + ' price: ' + params.price.N + ' side: ' + params.side.S +' size: ' + params.size.N
                                    + ' id: ' + params.id.S + ' history: ' + params.history.L
                                  }),
                    headers: {
                    "Content-type": "application/json"
                    }
                });
                return;
            });
    }
    
    function deleteDataFromDynamoDB(params, callback){
        var params = {
        TableName: 'stop_check_bitflyer',
        Key: {
                'coin_pair' : {S: params.product_code},
                'price' : {N: params.price}
             }
      };
      ddb.deleteItem(params, function(err, data){
        if (err) {
          console.log("Error", err);
          callback(null, {
            statusCode: 400,
            body: JSON.stringify({message: err}),
            headers: {
              "Content-type": "application/json"
            }
          });
          return;
        }
        console.log("Success");
        
        params = params.Key;
        
        callback(null,{
          statusCode: 200,
          body: JSON.stringify({message: 'DeleteItem Success: {coin_pair: ' + params.coin_pair.S
                              + ' price: ' + params.price.N
                            }),
          headers: {
            "Content-type": "application/json"
          }
        });
        return;
      });
    }
    
    function sendSQS(queueBody, queueUrl, groupId, callback){
        
        var id=new Date().getTime().toString();
        
        // SQS message parameters
        var params = {
            
            MessageBody: JSON.stringify(queueBody),
            MessageGroupId: groupId,
            MessageDeduplicationId: id,
            QueueUrl: queueUrl
        };
        
        
        
        console.log(params)
    
        sqs.sendMessage(params, function(err, data) {
            var responseCode = 200;
            
             // response and status of HTTP endpoint
            var responseBody = {
                message: ''
            };
        
            if (err) {
                console.log('error:', "failed to send message " + err);
                responseCode = 500;
            } else {
                console.log('data:', data.MessageId);
                responseBody.message = 'Sent to ' + queueUrl;
                responseBody.messageId = data.MessageId;
            }
    
            callback(null, {
                statusCode: responseCode,
                headers: {
                    'Content-Type': 'application/json'
                },
                body: JSON.stringify(responseBody)
            });
        });
    }
    
    function callLambda(payload, callback){
        return new Promise(function (resolve) {
            
            var params = {
                FunctionName: "注文キャンセルLambda名",
                InvocationType: "RequestResponse",
                Payload: payload
            }
            console.log(params);
            lambda.invoke(params, function(error, res){
                if(error){
                        console.error(error);
                        callback(null,{
                            statusCode: 500,
                            body: JSON.stringify({message: error.toString()}),
                            headers: {"Content-type": "application/json"}
                        });
                        resolve(null);
                    }
                resolve(res);
            }); 
        });          
    }
    
    function checkElement(element){
        if(element.type == 'MARKET' || element.type == 'LIMIT'
        || element.type == 'STOP'){
                
            if(element.side != 'BUY' && element.side != 'SELL'){
                console.log("invalid side");
                return false;
            }
            
            
            element.price = Number(element.price);
            element.size = Number(element.size);
            if(isNaN(element.price) && element.type != 'MARKET'){
                console.log("number invalid:", element.price);
                return false;
            }
            if(isNaN(element.size)){
                console.log("number invalid:", element.size);
                return false;
            }
            
            return true;
        }
        return false;
    }
    
};

3-2. フロントサイド

ほぼ以前紹介したものと同じです。

①見た目の部分

非常にシンプルにしています。

<body>
        <div class="displayOrder">
            <div id="specialOrder">
                
                <div id="stop" class="orderArea">
    
                    <div class="orderTitle">STOP</div>
                    <br>
                    <div class="detailArea">
                        <span>
                            <label for="StopOrderNumber">Num: </label>
                            <input type="text" id="StopOrderNumber"/>
                        </span>
                        <br>
                        <span>
                            <label for="StopOrderPrice">Price: </label>
                            <input type="text" id="StopOrderPrice"/>
                        </span>
                    </div>
                    <div class="decideOrder">
                        <input type="button" id="StopBuy" onClick="stopBuy()" value="BUY"/>
                        <input type="button" id="StopSell" onClick="stopSell()" value="SELL"/>
                    </div>
                </div>
            </div>
        </div>

②注文送信部分

STOP注文のみ実装してます。以前紹介したフロントサイドとほぼ同じです。
sendRequest関数に関しても以前を確認してください。

function stopBuy(){
            var num = document.getElementById("StopOrderNumber").value;
            var price = document.getElementById("StopOrderPrice").value;
            if(!num || !price){
                console.log("element empty");
                return;
            }

            var parameters = [{
                "type": "STOP",
                "size": num,
                "price": price,
                "side": "BUY"
            }];

            var body ={
                'coin_pair': 'FX_BTC_JPY',
                'parameters': parameters,
                'order_method': 'SIMPLE',
                "mode": "insert",
                "price": price
            };

            var data = {
                url: URL,
                method: 'POST',
                body: body
            }

            sendRequest(data);
        }

        function stopSell(){
            var num = document.getElementById("StopOrderNumber").value;
            var price = document.getElementById("StopOrderPrice").value;
            if(!num || !price){
                console.log("element empty");
                return;
            }

            var parameters = [{
                "type": "STOP",
                "size": num,
                "price": price,
                "side": "SELL"
            }];

            var body ={
                'coin_pair': 'FX_BTC_JPY',
                'parameters': parameters,
                'order_method': 'SIMPLE',
                "mode": "insert",
                "price": price
            };

            var data = {
                url: URL,
                method: 'POST',
                body: body
            }

            sendRequest(data);
        }

4. 終わりに

これで自動注文の大まかな実装が完了します。基本的に暴騰、暴落時の機会損失を軽減する為に存在します。
レンジ相場ではいたすらマージ消費を行ってしまいます。いつかここもレンジ相場の場合はなるべくスキャって
少しでも稼いで損失を減らすような仕組みを作ろうかと思います。

また、以前触れたDynamoDBの一貫性について、一貫性をある程度担保する方法もあるようですが、そもそもデータに不整合が起こるとしても
数秒くらいのトランザクション間になるそうです。
ですので、以前の記事の課題としてはDynamoDBが原因というよりかはLambdaまたは注文方法に原因がありそうです。
少しCloudWatch、LINE通知で見たところ、OCOの注文が原因がありそうです。
OCO注文は2つ同時に注文を出し、片方の注文が約定するともう片方の注文はキャンセルする注文です。
しかし、見たところ1秒程度で同時にどちらの注文の約定条件を満たしてしまうと、どちらも注文が約定してしまうようです。
例えば、以下のOCO注文があったとします。

STOP BUY 1200000
LIMIT BUY 1199000

現在の価格が1199500だとします。これが1秒など一瞬で1200000と1199000を行き来すると注文がどちらも約定します。

※改めて再現性があるような状態で注文して確認してみます。もしかしたらbitflyerのサーバの状態にもよるかもしれません。
恐らく、内部的にもAPI叩いて注文キャンセルを行うことでOCO注文を実現している可能性があります。