【AWS】bitflyer注文受付Lambda実装

【きっかけ】

過去の記事参照
 
buffalokusojima.hatenablog.com


1. 使用するサービス

(AWS)

  • Lambda
  • IAM
  • SSM
  • SQS   

 
(外部サービス)


2.概要

HTTP通信で受けた注文をbitflyerAPIへ注文を送信するLambdaを実装します。注文までの流れとしては、クライアントから来たPOSTに対して、
中身を確認し、注文として有効であればSQSにメッセージとして載せて、別Lambdaに飛ばします。SQSをトリガーとして、メッセージを受け取ったLambdaからbitflyerAPIに対して注文を送信します。
API Gatewayとの接続はまた別でやります。

architecture
構成図

3. 実装

3-1. 注文受付Lambda作成
以下コード入力

var AWS = require('aws-sdk');
var sqs = new AWS.SQS();

exports.handler = function(event, context, callback) {
    
    // 通常注文の場合のSQSURL
    const NORMAL_QUEUE_URL = 'normal queue url';

    // 特殊注文の場合のSQSURL
    const SPECIAL_QUEUE_URL = 'special queue url';

    const body = JSON.parse(event.body);
    
    if(!body){
        console.log("body empty");
          callback(null, {
              statusCode: 400,
              body: JSON.stringify({message: "body empty"}),
              headers: {"Content-type": "application/json"}
            });
        return;
    }
    
    const COIN_PAIR = body.coin_pair;
    
    const PRICE = body.price;
    
    const SIDE = body.side;
    
    const SIZE = body.size;
    
    const TYPE = body.type;
    
    const PARAMETERS = body.parameters;

    const ORDER_METHOD = body.order_method;
    
    if(!TYPE && !PARAMETERS){
        console.log("invalid order:",body);
          callback(null, {
              statusCode: 400,
              body: JSON.stringify({message: "invalid order"}),
              headers: {"Content-type": "application/json"}
            });
        return;
    }
    
    if(COIN_PAIR != 'FX_BTC_JPY'){
        console.log("invalid coin pair");
          callback(null, {
              statusCode: 400,
              body: JSON.stringify({message: "invalid order"}),
              headers: {"Content-type": "application/json"}
            });
        return;
    }
    
    var queueBody;
    var queueUrl;
    var id=new Date().getTime().toString();
    var groupId;
    
    // 注文内容によって構成する注文を変更
    if(!PARAMETERS){
        
        if(!checkElement(body)){
            console.log("Bad body:", body);
            callback(null, {
              statusCode: 400,
              body: JSON.stringify({message: "invalid order"}),
              headers: {"Content-type": "application/json"}
            });
            return;
        }
        queueBody = {
            product_code: COIN_PAIR,
            child_order_type: TYPE,
            side: SIDE,
            price: PRICE,
            size: SIZE,
        }
        if(TYPE == 'STOP'){
            queueBody.trigger_price = queueBody.price;
            queueBody.price = 0;
        } 
        queueUrl = NORMAL_QUEUE_URL;
        
        groupId = "NORMAL_ORDER";
    }else{
        if(ORDER_METHOD != "SIMPLE" && ORDER_METHOD != "IFD"){
            console.log("bad order method:");
            callback(null, {
                statusCode: 400,
                body: JSON.stringify({message: "invalid order"}),
                headers: {"Content-type": "application/json"}
            });
            return;
        }

        for(const parameter of PARAMETERS){
            if(!checkElement(parameter)){
                console.log("Bad body:", parameter);
                callback(null, {
                  statusCode: 400,
                  body: JSON.stringify({message: "invalid order"}),
                  headers: {"Content-type": "application/json"}
                });
                return;
            }
            parameter.product_code = COIN_PAIR
            parameter.condition_type = parameter.type;
            if(parameter.condition_type == 'STOP'){
                parameter.trigger_price = parameter.price;
                parameter.price = 0;
            } 
        }
        queueBody = {
            order_method: ORDER_METHOD,
            parameters: PARAMETERS
        }
        queueUrl = SPECIAL_QUEUE_URL;
        groupId = "SPECIAL_ORDER";
    }
        

    // SQS message parameters
    var params = {
        
        MessageBody: JSON.stringify(queueBody),
        MessageGroupId: groupId,
        MessageDeduplicationId: id,
        QueueUrl: queueUrl
    };
    
    console.log(params)

    // SQS送信
    sqs.sendMessage(params, function(err, data) {
        var responseCode = 200;
        
         // response and status of HTTP endpoint
        var responseBody = {
            message: ''
        };
    
        if (err) {
            console.log('error:', "failed to send message " + err);
            responseCode = 500;
        } else {
            console.log('data:', data.MessageId);
            responseBody.message = 'Sent to ' + queueUrl;
            responseBody.messageId = data.MessageId;
        }

        callback(null, {
            statusCode: responseCode,
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(responseBody)
        });
    });
    
    
    function checkElement(element){
        if(element.type == 'MARKET' || element.type == 'LIMIT'
        || element.type == 'STOP'){
                
            if(element.side != 'BUY' && element.side != 'SELL'){
                console.log("invalid side");
                return false;
            }
            
            
            element.price = Number(element.price);
            element.size = Number(element.size);
            if(isNaN(element.price) && element.type != 'MARKET'){
                console.log("number invalid:", element.price);
                return false;
            }
            if(isNaN(element.size)){
                console.log("number invalid:", element.size);
                return false;
            }
            
            return true;
        }
        return false;
    }
}

3-2. Lambdaにssm取得ロール付与、加えてSQSFullAccessも加えます。
role_attached
ロール付与

3-3. SQS受付Lambda実装

以下コード入力

const ssm = new (require('aws-sdk/clients/ssm'))();
const request = require('request');
const crypto = require('crypto');

exports.handler = (event, context, callback) => {
    
    // SQSからメッセージ取得
    var body = JSON.parse(event.Records[0].body);
    
    if(!body){
        console.log("Queue Empty")
        callback(null, {
                statusCode: 400,
                body: JSON.stringify({message: "Queue Empty"}),
                headers: {"Content-type": "application/json"}
            });
            return;
    }
    
    
    console.log(body);
    
    if(!body.order_method || !body.parameters){
        console.log("element invalid");
        callback(null, {
            statusCode: 400,
            body: JSON.stringify({message: "element invalid"}),
            headers: {"Content-type": "application/json"}
        });
        return;
    }
    console.log(body.parameters[0].price)
    
  // メッセージの内容をbitflyerAPIへ送信します
    getParameterFromSystemManager('bitflyer-keys', callback)
    .then(function(data){
        
        
        const apikey = data.split(",")[0];
        const sercretKey = data.split(",")[1];
    
        var timestamp = Date.now().toString();
        var method = 'POST';
        var path = '/v1/me/sendparentorder';
        
        
        
        body = JSON.stringify(body);
        console.log(body)
        var text = timestamp + method + path + body;
        var sign = crypto.createHmac('sha256', sercretKey).update(text).digest('hex');
        
        var option = {
          url: 'https://api.bitflyer.com' + path,
          method: method,
          headers: {
            'ACCESS-KEY': apikey,
            'ACCESS-TIMESTAMP': timestamp,
            'ACCESS-SIGN': sign,
            'Content-Type': 'application/json'
            },
          body: body
        }
        
        return sendRequest(option, callback);
    }
    )
    .then(function(data){
        var message;
        if(data.response.statusCode == 200){
            message = data.body;
        }else{
            message = data.response;
        }
        console.log(message);
        callback(null, {
            statusCode: data.response.statusCode,
            body: JSON.stringify({message: message}),
            headers: {"Content-type": "application/json"}
        });
        return;
    });
        

    function getParameterFromSystemManager(apikey_name, callback) {
    
        return new Promise(function (resolve) {
            var apikey = process.env[apikey_name];
            
            if(!apikey || typeof apikey == undefined){
            
                // Fetches a parameter called REPO_NAME from SSM parameter store.
                // Requires a policy for SSM:GetParameter on the parameter being read.
                var params = {
                    Name: apikey_name,
                    /* required */
                    WithDecryption:true
                };
                
                ssm.getParameter(params, function(err, apikey) {
                    if (err){
                        console.error(err.stack);
                        callback(null,{
                            statusCode: 500,
                            body: JSON.stringify({message: err.toString()}),
                            headers: {"Content-type": "application/json"}
                        });
                        resolve(null);
                        return;
                    }
                    process.env[apikey_name] = apikey.Parameter.Value;
                    resolve(apikey.Parameter.Value);
                });
            }else resolve(apikey);
        });
    }
    
    function sendLine(message, callback){
        
        getParameterFromSystemManager('line-access-key', callback)
        .then(function(lineKey){
            
            var option = {
            url: 'https://notify-api.line.me/api/notify',
            headers: {
                'Content-Type': 'application/json',
                'Authorization': 'Bearer ' + lineKey
                },
            method: 'POST',
            form :{
              message: message
            }
            };
        
            return sendRequest(option,callback);
        })
        .then(function(data){
          callback(null, {
            statusCode: 200,
            body: data,
            headers: {"Content-type": "application/json"}
          });
        });
    }
    
    function sendRequest(option, callback){
        
        return new Promise(function (resolve) {
            request(option, function(error, response, body){
                    
                    if(error){
                        console.error(error);
                        callback(null,{
                            statusCode: 500,
                            body: JSON.stringify({message: error.toString()}),
                            headers: {"Content-type": "application/json"}
                        });
                        resolve(null);
                    }
                    var data = {response, body}
                    resolve(data);
            });
            });
    }
}

3-4. Lambdaにロール付与

3-2と内容は同じで、このLambdaにもロールを付与します。

3-5. SQSの作成

今回は順序と2重注文を防ぐために、FIFOキューを使用します。コンソールからSQSを検索して移動します。
【Create New Queue】を押して作成画面にいきます。

sqs_create
キュー作成

キューの拡張子は.fifoにする必要があります。
作成されたキューを選択するとURLが表示されているので、3-1のコードのURL部分の入力してください。

3-6. 終わりに

以上で、簡単なSQSを使用した注文機能は完成です。といってもこれだけじゃ使えないので、APIゲートウェイの設定もいつか書きます。