【AWS】Lambdaでbitcoinの価格通知
【きっかけ】
過去の記事参照
buffalokusojima.hatenablog.com
1. 使用するサービス
(AWS)
- Lambda
- IAM
- Cloudwatch Event
- SSM
- DynamoDB
(外部サービス)
- LINE API
2.概要
Cloudwatch Eventでbitcoinの価格を確認するLambdaを定期的に実行することで実現していきます。
一つのLambdaでbitflyerAPIとの通信とLINE通知、DynamoDBからレコードの取得を行っています。
価格は取引履歴の直近価格です。前回取得した直近価格と現在の直近価格を比較して、価格通知を行っています。
3. 実装
3-1.Lambda作成
以下コード入力
const AWS = require('aws-sdk'); const ssm = new (require('aws-sdk/clients/ssm'))(); const request = require('request'); const momentTimezone = require('moment-timezone'); AWS.config.update({region: 'ap-northeast-1'}); const ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'}); exports.handler = (event, context, callback) => { // 前回取得した直近価格を保存 var PRICE_FX_BTC_JPY = process.env['FX_BTC_JPY']; console.log(PRICE_FX_BTC_JPY); var price_alert_data; // DynamoDBからレコードを取得 getDatafromDynamoDB(callback) .then(function(data){ if(data.Items.length == 0){ console.log("price check data not set"); callback(null, { statusCode: 200, body: JSON.stringify({message: "price check data not set"}), headers: {"Content-type": "application/json"} }); return; } price_alert_data = data.Items; const method = "GET" const path = "/v1/executions?product_code=FX_BTC_JPY" var option = { url: 'https://api.bitflyer.jp' + path, method: method, headers: { 'Content-Type': 'application/json' } } // bitflyerから直近取引履歴を取得 return sendRequest(option,callback); }) .then(function(data){ if(data.response.statusCode != 200){ console.error("Error:",data.response); callback(null, { statusCode: data.response.statusCode, body: JSON.stringify({message: data.response}), headers: {"Content-type": "application/json"} }); return; } var price_data = JSON.parse(data.body); if(price_data.length == 0){ console.log("price data not Found"); callback(null, { statusCode: 403, body: JSON.stringify({message: 'No Data Found'}), headers: {"Content-type": "application/json"} }); return; } console.log(price_data); if(typeof PRICE_FX_BTC_JPY == 'undefined'){ PRICE_FX_BTC_JPY = process.env['FX_BTC_JPY'] = price_data[0].price; console.log("Lambda Restarted"); callback(null,{ statusCode: 200, body: JSON.stringify({message: "Lambda Restarted"}), headers: {"Content-type": "application/json"} }); return; } // DynamoDBから取得したレコードを回し、直近価格に該当する場合LINE通知 for(const value of price_alert_data){ console.log(value); if(value.side.S == 'up' && Number(value.price.N) > PRICE_FX_BTC_JPY && Number(value.price.N) < Number(price_data[0].price)){ console.log("["+value.coin_pair.S+"] " + value.price.N + "over"); var message = '\n'; const dateTimeJst = momentTimezone(new Date()).tz('Asia/Tokyo').format('YYYY/MM/DD HH:mm:ss'); message += "Price Alart for " + value.side.S + "\n" + "price " + value.price.N + "\n" + dateTimeJst process.env['FX_BTC_JPY'] = price_data[0].price; console.log(message) sendLine(message, callback); return; }else if(value.side.S == 'down' && Number(value.price.N) < PRICE_FX_BTC_JPY && Number(value.price.N) > Number(price_data[0].price)){ console.log("["+value.coin_pair.S+"] " + value.price.N + "below"); var message = '\n'; const dateTimeJst = momentTimezone(new Date()).tz('Asia/Tokyo').format('YYYY/MM/DD HH:mm:ss'); message += "Price Alart for " + value.side.S + "\n" + "price " + value.price.N + "\n" + dateTimeJst process.env['FX_BTC_JPY'] = price_data[0].price; console.log(message) sendLine(message, callback); return; } } // 該当なしの場合は通知なし process.env['FX_BTC_JPY'] = price_data[0].price; console.log("No Alert"); callback(null,{ statusCode: 200, body: JSON.stringify({message: "No Alert"}), headers: {"Content-type": "application/json"} }); return; }); function getDatafromDynamoDB(callback){ return new Promise(function (resolve) { var params = { ExpressionAttributeValues: { ':c': {S: 'fx_btc_jpy'} }, KeyConditionExpression: 'coin_pair = :c', ProjectionExpression: 'coin_pair, price, side', TableName: 'price_check_bitflyer' }; // Call DynamoDB to add the item to the table ddb.query(params, function(err, data) { if (err) { console.log("Error", err); callback(null, { statusCode: 401, body: JSON.stringify({message: err.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); return; } resolve(data); }); }); } function getParameterFromSystemManager(apikey_name, callback) { return new Promise(function (resolve) { var apikey = process.env[apikey_name]; if(!apikey || typeof apikey == undefined){ // Fetches a parameter called REPO_NAME from SSM parameter store. // Requires a policy for SSM:GetParameter on the parameter being read. var params = { Name: apikey_name, /* required */ WithDecryption:true }; ssm.getParameter(params, function(err, apikey) { if (err){ console.error(err.stack); callback(null,{ statusCode: 500, body: JSON.stringify({message: err.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); return; } process.env[apikey_name] = apikey.Parameter.Value; resolve(apikey.Parameter.Value); }); }else resolve(apikey); }); } function sendLine(message, callback){ getParameterFromSystemManager('line-access-key', callback) .then(function(lineKey){ var option = { url: 'https://notify-api.line.me/api/notify', headers: { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + lineKey }, method: 'POST', form :{ message: message } }; return sendRequest(option,callback); }) .then(function(data){ if(data.response.statusCode != 200){ console.error(data) } callback(null, { statusCode: data.response.statusCode, body: data.body, headers: {"Content-type": "application/json"} }); }); } function sendRequest(option, callback){ return new Promise(function (resolve) { request(option, function(error, response, body){ if(error){ console.error(error); callback(null,{ statusCode: 500, body: JSON.stringify({message: error.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); } var data = {response, body} resolve(data); }); }); } };
3-2. Lambdaにssm取得ロール付与とssm登録
過去記事参照
3-3. DynamoDBのテーブル登録
以下のように設定
・Primary partition key ・・・ coin_pair (コインペア)
・Primary sort key ・・・ price(価格)
これら二つ合わせて一意のレコードになります。
加えて、sideというカラムを加えてupとdown判定に使います。
注意点としては同じコインペア、価格に対し、sideがupとdown両立は出来ません。(必要性が無さそうだったから)
以下参考テーブル
3-4. LambdaにdynamoDB取得ロール付与
ssmで設定したのと同様に新たにポリシーを作成してアタッチします。(リソースのテーブル名は適宜変更してください)
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "dynamodb:DescribeReservedCapacity*", "dynamodb:List*", "dynamodb:DescribeTimeToLive", "dynamodb:DescribeLimits" ], "Resource": "*" }, { "Sid": "VisualEditor1", "Effect": "Allow", "Action": [ "dynamodb:CreateTable", "dynamodb:BatchGet*", "dynamodb:PutItem", "dynamodb:DescribeTable", "dynamodb:Delete*", "dynamodb:Get*", "dynamodb:BatchWrite*", "dynamodb:Scan", "dynamodb:Query", "dynamodb:DescribeStream", "dynamodb:Update*" ], "Resource": "arn:aws:dynamodb:*:*:table/price_check_bitflyer" } ] }
3-5. Cloudwatch EventのRoleに作成したラムダを設定
過去記事参照
3-6. 最後に
以上で設定した時間おきにBTCの価格を確認し、設定した価格以上または以下であればLINEが飛ぶようになっています。