【AWS】Lambdaでbitcoinの価格通知

【きっかけ】

過去の記事参照
 
buffalokusojima.hatenablog.com


1. 使用するサービス

(AWS)

  • Lambda
  • IAM
  • Cloudwatch Event
  • SSM   
  • DynamoDB

 
(外部サービス)

  • Bitflyer Lightning API  (今回はHTTP APIです。いずれRealTime APIでもやろうと思います)


2.概要

Cloudwatch Eventでbitcoinの価格を確認するLambdaを定期的に実行することで実現していきます。
一つのLambdaでbitflyerAPIとの通信とLINE通知、DynamoDBからレコードの取得を行っています。
価格は取引履歴の直近価格です。前回取得した直近価格と現在の直近価格を比較して、価格通知を行っています。

architecture
構成図

3. 実装

3-1.Lambda作成
以下コード入力

const AWS = require('aws-sdk');
const ssm = new (require('aws-sdk/clients/ssm'))();
const request = require('request');

const momentTimezone = require('moment-timezone');


AWS.config.update({region: 'ap-northeast-1'});

const ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'});

exports.handler = (event, context, callback) => {
    
    // 前回取得した直近価格を保存
    var PRICE_FX_BTC_JPY = process.env['FX_BTC_JPY'];
    console.log(PRICE_FX_BTC_JPY);
    var price_alert_data;

    // DynamoDBからレコードを取得
    getDatafromDynamoDB(callback)
    .then(function(data){
        
        if(data.Items.length == 0){
          console.log("price check data not set");
          callback(null, {
              statusCode: 200,
              body: JSON.stringify({message: "price check data not set"}),
              headers: {"Content-type": "application/json"}
            });
          return;
        }
      
        price_alert_data = data.Items;
        
        const method = "GET"
        
        const path = "/v1/executions?product_code=FX_BTC_JPY" 
         
        
        var option = {
          url: 'https://api.bitflyer.jp' + path,
          method: method,
          headers: {
            'Content-Type': 'application/json'
            }
        }
        
        // bitflyerから直近取引履歴を取得
        return sendRequest(option,callback);
    })
    .then(function(data){
        
        if(data.response.statusCode != 200){
          console.error("Error:",data.response);
          callback(null, {
            statusCode: data.response.statusCode,
            body: JSON.stringify({message: data.response}),
            headers: {"Content-type": "application/json"}
          });
          return;
        }
        
        
        var price_data = JSON.parse(data.body);
        
        if(price_data.length == 0){
          console.log("price data not Found");
          callback(null, {
            statusCode: 403,
            body: JSON.stringify({message: 'No Data Found'}),
            headers: {"Content-type": "application/json"}
          });
          return;
        }
        
        console.log(price_data);
        
        if(typeof PRICE_FX_BTC_JPY == 'undefined'){
          PRICE_FX_BTC_JPY = process.env['FX_BTC_JPY'] = price_data[0].price;
          console.log("Lambda Restarted");
          callback(null,{
            statusCode: 200,
            body: JSON.stringify({message: "Lambda Restarted"}),
            headers: {"Content-type": "application/json"}
          });
        return;
        }
          
        // DynamoDBから取得したレコードを回し、直近価格に該当する場合LINE通知
        for(const value of price_alert_data){
          console.log(value);
          if(value.side.S == 'up' && 
              Number(value.price.N) > PRICE_FX_BTC_JPY && Number(value.price.N) < Number(price_data[0].price)){
                  console.log("["+value.coin_pair.S+"] " + value.price.N + "over");
                 
                  var message = '\n';
      
                  const dateTimeJst = momentTimezone(new Date()).tz('Asia/Tokyo').format('YYYY/MM/DD HH:mm:ss');
                  
                  message += "Price Alart for " + value.side.S + "\n" 
                          + "price " + value.price.N + "\n"
                          + dateTimeJst
                  process.env['FX_BTC_JPY'] = price_data[0].price;
                  console.log(message)
                  sendLine(message, callback);
                  return;
          }else if(value.side.S == 'down' && 
                    Number(value.price.N) < PRICE_FX_BTC_JPY && Number(value.price.N) > Number(price_data[0].price)){
                      console.log("["+value.coin_pair.S+"] " + value.price.N + "below");
                      
                      var message = '\n';
      
                      const dateTimeJst = momentTimezone(new Date()).tz('Asia/Tokyo').format('YYYY/MM/DD HH:mm:ss');
                      
                      message += "Price Alart for " + value.side.S + "\n" 
                              + "price " + value.price.N + "\n"
                              + dateTimeJst
                      process.env['FX_BTC_JPY'] = price_data[0].price;
                      console.log(message)
                      sendLine(message, callback);
                      return;
          }
        }

        // 該当なしの場合は通知なし
        process.env['FX_BTC_JPY'] = price_data[0].price;
        console.log("No Alert");
        callback(null,{
          statusCode: 200,
          body: JSON.stringify({message: "No Alert"}),
          headers: {"Content-type": "application/json"}
        });
        return;
    
    });
      
    function getDatafromDynamoDB(callback){
      
      return new Promise(function (resolve) {
          var params = {
        ExpressionAttributeValues: {
          ':c': {S: 'fx_btc_jpy'}
        },
        KeyConditionExpression: 'coin_pair = :c',
        ProjectionExpression: 'coin_pair, price, side',
        TableName: 'price_check_bitflyer'
      };
    
      // Call DynamoDB to add the item to the table
      ddb.query(params, function(err, data) {
          if (err) {
            console.log("Error", err);
            callback(null, {
                statusCode: 401,
                body: JSON.stringify({message: err.toString()}),
                headers: {"Content-type": "application/json"}
              });
            resolve(null);
            return;
          }
          resolve(data);
        });
      });
    }
    
    function getParameterFromSystemManager(apikey_name, callback) {
    
        return new Promise(function (resolve) {
            var apikey = process.env[apikey_name];
            
            if(!apikey || typeof apikey == undefined){
            
                // Fetches a parameter called REPO_NAME from SSM parameter store.
                // Requires a policy for SSM:GetParameter on the parameter being read.
                var params = {
                    Name: apikey_name,
                    /* required */
                    WithDecryption:true
                };
                
                ssm.getParameter(params, function(err, apikey) {
                    if (err){
                        console.error(err.stack);
                        callback(null,{
                            statusCode: 500,
                            body: JSON.stringify({message: err.toString()}),
                            headers: {"Content-type": "application/json"}
                        });
                        resolve(null);
                        return;
                    }
                    process.env[apikey_name] = apikey.Parameter.Value;
                    resolve(apikey.Parameter.Value);
                });
            }else resolve(apikey);
        });
    }
    
    function sendLine(message, callback){
        
        getParameterFromSystemManager('line-access-key', callback)
        .then(function(lineKey){
            
            var option = {
            url: 'https://notify-api.line.me/api/notify',
            headers: {
                'Content-Type': 'application/json',
                'Authorization': 'Bearer ' + lineKey
                },
            method: 'POST',
            form :{
              message: message
            }
            };
        
            return sendRequest(option,callback);
        })
        .then(function(data){
          if(data.response.statusCode != 200){
            console.error(data)
          }
          callback(null, {
            statusCode: data.response.statusCode,
            body: data.body,
            headers: {"Content-type": "application/json"}
          });
        });
    }
    
    function sendRequest(option, callback){
        
        return new Promise(function (resolve) {
            request(option, function(error, response, body){
                    
                    if(error){
                        console.error(error);
                        callback(null,{
                            statusCode: 500,
                            body: JSON.stringify({message: error.toString()}),
                            headers: {"Content-type": "application/json"}
                        });
                        resolve(null);
                    }
                    var data = {response, body}
                    resolve(data);
            });
            });
    }
};

 
3-2. Lambdaにssm取得ロール付与とssm登録

過去記事参照

3-3. DynamoDBのテーブル登録

以下のように設定

・Primary partition key ・・・ coin_pair (コインペア)
・Primary sort key ・・・ price(価格)

これら二つ合わせて一意のレコードになります。
加えて、sideというカラムを加えてupとdown判定に使います。
注意点としては同じコインペア、価格に対し、sideがupとdown両立は出来ません。(必要性が無さそうだったから)
以下参考テーブル

dynamoDB
テーブル例

3-4. LambdaにdynamoDB取得ロール付与

ssmで設定したのと同様に新たにポリシーを作成してアタッチします。(リソースのテーブル名は適宜変更してください)

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeReservedCapacity*",
                "dynamodb:List*",
                "dynamodb:DescribeTimeToLive",
                "dynamodb:DescribeLimits"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:BatchGet*",
                "dynamodb:PutItem",
                "dynamodb:DescribeTable",
                "dynamodb:Delete*",
                "dynamodb:Get*",
                "dynamodb:BatchWrite*",
                "dynamodb:Scan",
                "dynamodb:Query",
                "dynamodb:DescribeStream",
                "dynamodb:Update*"
            ],
            "Resource": "arn:aws:dynamodb:*:*:table/price_check_bitflyer"
        }
    ]
}

3-5. Cloudwatch EventのRoleに作成したラムダを設定

過去記事参照

3-6. 最後に
以上で設定した時間おきにBTCの価格を確認し、設定した価格以上または以下であればLINEが飛ぶようになっています。