【AWS】bitbank 新規注文
1. 使用するサービス
(AWS)
- Lambda
- IAM
- SSM
(外部サービス)
- bitbank API
2. 概要
bitbank Apiを使用して、自作の注文サイトを作成します。
基本的な仕組みはbitflyer Lightning Apiで実装したものと同じでURLを叩くのが最終的なゴールです。
buffalokusojima.hatenablog.com
3. 実装
今回はバックエンド側を実装します。フロントは特にbitflyerと違いはなく、渡すパラメータが異なるのみなので省略します。
大まかな違いとしては、ヘッダの中身とパラメータくらいです。
const ssm = new (require('aws-sdk/clients/ssm'))(); const request = require('request'); const crypto = require('crypto'); exports.handler = function(event, context, callback) { var apikey; var sercretKey; var body = JSON.parse(event.body); if(!body){ console.log("body empty"); callback(null, { statusCode: 400, body: JSON.stringify({message: "body empty"}), headers: {"Content-type": "application/json"} }); return; } const COIN_PAIR = body.coin_pair; const PRICE = body.price; const SIDE = body.side; const SIZE = body.size; const TYPE = body.type; const PARAMETERS = body.parameters; const ORDER_METHOD = body.order_method; if(!TYPE && !PARAMETERS){ console.log("invalid order:",body); callback(null, { statusCode: 400, body: JSON.stringify({message: "invalid order"}), headers: {"Content-type": "application/json"} }); return; } if(COIN_PAIR != 'xrp_jpy'){ console.log("invalid coin pair"); callback(null, { statusCode: 400, body: JSON.stringify({message: "invalid order"}), headers: {"Content-type": "application/json"} }); return; } if(!checkElement(body)){ console.log("Bad body:", body); callback(null, { statusCode: 400, body: JSON.stringify({message: "invalid order"}), headers: {"Content-type": "application/json"} }); return; } body = { pair: body.coin_pair, amount: body.size, price: body.price, side: body.side, type: body.type } getParameterFromSystemManager('bitbank-keys',callback) .then(function(data){ apikey = data.split(",")[0]; sercretKey = data.split(",")[1]; var timestamp = Date.now().toString(); var method = 'POST'; var path = '/v1/user/spot/order'; body = JSON.stringify(body); console.log(body) var text = timestamp + body; var sign = crypto.createHmac('sha256', sercretKey).update(text).digest('hex'); var option = { url: 'https://api.bitbank.cc' + path, method: method, headers: { 'ACCESS-KEY': apikey, 'ACCESS-NONCE': timestamp, 'ACCESS-SIGNATURE': sign, 'Content-Type': 'application/json' }, body: body } return sendRequest(option, callback); }).then(function(data){ data = JSON.parse(data.body) console.log(data) callback(null, { statusCode: 200, body: JSON.stringify({message: data.data}), headers: {"Content-type": "application/json"} }); return; }); function checkElement(element){ if(element.type == 'market' || element.type == 'limit' || element.type == 'stop'){ if(element.side != 'buy' && element.side != 'sell'){ console.log("invalid side"); return false; } element.price = Number(element.price); element.size = Number(element.size); if(isNaN(element.price) && element.type != 'market'){ console.log("number invalid:", element.price); return false; } if(isNaN(element.size)){ console.log("number invalid:", element.size); return false; } return true; } return false; } function getParameterFromSystemManager(apikey_name, callback) { return new Promise(function (resolve) { var apikey = process.env[apikey_name]; if(!apikey || typeof apikey == undefined){ // Fetches a parameter called REPO_NAME from SSM parameter store. // Requires a policy for SSM:GetParameter on the parameter being read. var params = { Name: apikey_name, /* required */ WithDecryption:true }; ssm.getParameter(params, function(err, apikey) { if (err){ console.error(err.stack); callback(null,{ statusCode: 500, body: JSON.stringify({message: err.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); return; } process.env[apikey_name] = apikey.Parameter.Value; resolve(apikey.Parameter.Value); }); }else resolve(apikey); }); } function sendRequest(option, callback){ return new Promise(function (resolve) { request(option, function(error, response, body){ if(error){ console.error(error); callback(null,{ statusCode: 500, body: JSON.stringify({message: error.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); } var data = {response, body} resolve(data); }); }); } }
4. おわりに
AWS開発再開しているのでまた何か発見があれば載せます。
【AWS】Cognito でLambdaアクセス認可(Cors対応)
1. 使用するサービス
(AWS)
- Lambda
- Cognito
2. 概要
ApiGatewayの認可機能の1つとしてCognitoを選択できます。その設定の仕方とcorsを適用している場合のレスポンスの留意点を説明します。
前提としてCognitoで既にユーザプールは作成済とします。
3. 実装
3-1. Authorizer作成
ApiGatewayから認可機能をつけるApiを選択し、Authroizer項目を選択します。
Cognito User Poolには事前に作成したUser Poolが選択できるはずなので、それを選んでください。
3-2. メソッドに対して認証をつける
認可機能を付与したいメソッドのリクエストを選択し、Authorizationに作成したAuthorizerを選択します。
3-3. 認可確認
以上の設定で、クライアントの通信するヘッダにAuthorizationを付与し、値に事前にCognitoへの認証で得たidTokenを入れます。
認可が成功すると、ApiGatewayで設定した通りにレスポンスが返ってきます。主にCorsの部分が気になると思います。
認可が正しくできた場合はCorsが上手く機能しますが、認可が失敗した場合は恐らく403のCorsエラーが出るかと思います。
よく見るエラーかと思います。
原因としては認可失敗の場合、Access-Control-Allow-Origin Headerが無いからである。
これにより、認可失敗の場合でも正しい形でレスポンスを遅れます。これが無いとクライアント側は何故エラーがあったのか(Unauthorized)がわかりません。
【AWS】bitflyer 自動注文(続き)
【きっかけ】
過去の記事参照
buffalokusojima.hatenablog.com
1. 使用するサービス
(AWS)
- Lambda
- IAM
- SSM
- SQS
(外部サービス)
2.概要
前回の続きになります。
buffalokusojima.hatenablog.com
今回は実際に自動注文を行う部分を説明します。最初に注文を手動で行い。注文IDをDynamoDBに格納し、約定したかを前回のLambdaで確認します。
約定確認のLambdaは前回作成した部分で、今回の説明範囲は最初の注文の部分です。主な機能は以前の記事に記載したbitflyerへの注文受付を参照してください。こちらをベースとしてます。
buffalokusojima.hatenablog.com
3. 実装
3-1. バックエンド Lambda側
const AWS = require('aws-sdk'); const ssm = new (require('aws-sdk/clients/ssm'))(); const request = require('request'); const sqs = new AWS.SQS(); const lambda = new AWS.Lambda(); const NORMAL_QUEUE_URL = '通常注文SQS'; const SPECIAL_QUEUE_URL = '特殊注文SQS'; AWS.config.update({region: 'ap-northeast-1'}); const ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'}); exports.handler = (event, context, callback) => { if(!event.body){ console.log('No form data Found'); callback(null, { statusCode: 200, body: JSON.stringify({message: 'No form data Found'}), headers: { "Content-type": "application/json" } }); return; } var body = JSON.parse(event.body); console.log(body) const COIN_PAIR = body.coin_pair; const PRICE = body.price; const SIDE = body.side; const SIZE = body.size; const TYPE = body.type; const PARAMETERS = body.parameters; const ORDER_METHOD = body.order_method; // 追加項目 この値で自動注文を注文処理またはキャンセル処理する const MODE = body.mode; // 追加項目 キャンセル処理をするに当たっての該当の注文ID const PARENT_ID = body.parent_id; if(!TYPE && !PARAMETERS && !PRICE){ console.log("invalid order:",body); callback(null, { statusCode: 400, body: JSON.stringify({message: "invalid order"}), headers: {"Content-type": "application/json"} }); return; } if(COIN_PAIR != 'FX_BTC_JPY'){ console.log("invalid coin pair"); callback(null, { statusCode: 400, body: JSON.stringify({message: "invalid order"}), headers: {"Content-type": "application/json"} }); return; } if(MODE != 'insert' && MODE != 'delete'){ console.log("invalid mode:", MODE); callback(null, { statusCode: 400, body: JSON.stringify({message: "invalid order"}), headers: {"Content-type": "application/json"} }); return; } // 自動注文送信処理 DB操作までは以前と同様です。 if(MODE == 'insert'){ var queueBody = {autoOrder: true} var queueUrl; var groupId; if(!PARAMETERS){ if(!checkElement(body)){ console.log("Bad body:", body); callback(null, { statusCode: 400, body: JSON.stringify({message: "invalid order"}), headers: {"Content-type": "application/json"} }); return; } queueBody.product_code = COIN_PAIR; queueBody.child_order_type = TYPE; queueBody.side = SIDE; queueBody.price = PRICE; queueBody.size = SIZE; if(TYPE == 'STOP'){ queueBody.trigger_price = queueBody.price; queueBody.price = 0; } queueUrl = NORMAL_QUEUE_URL; groupId = "NORMAL_ORDER"; }else{ if(ORDER_METHOD != "SIMPLE" && ORDER_METHOD != "IFD"){ console.log("bad order method:"); callback(null, { statusCode: 400, body: JSON.stringify({message: "invalid order"}), headers: {"Content-type": "application/json"} }); return; } for(const parameter of PARAMETERS){ if(!checkElement(parameter)){ console.log("Bad body:", parameter); callback(null, { statusCode: 400, body: JSON.stringify({message: "invalid order"}), headers: {"Content-type": "application/json"} }); return; } parameter.product_code = COIN_PAIR parameter.condition_type = parameter.type; if(parameter.condition_type == 'STOP'){ parameter.trigger_price = parameter.price; parameter.price = 0; } } queueBody.order_method = ORDER_METHOD; queueBody.parameters = PARAMETERS; queueUrl = SPECIAL_QUEUE_URL; groupId = "SPECIAL_ORDER"; } // SQSを送る前にDBでデータ取得し、操作が入る getDatafromDynamoDB(callback) .then(function(data){ data = data.Items; // 既に注文済の場合は注文をしません。 if(data.find(d => d.coin_pair == COIN_PAIR || d.price == PRICE)){ console.log('Data Already Exists:', data); callback(null, { statusCode: 401, body: JSON.stringify({message: 'Data Already Exists:'+ data}), headers: { "Content-type": "application/json" } }); return; } // 通常注文の場合parameterが無いのでdb挿入用のパラメータ作成 var parameter; if(queueBody.parameters) parameter = queueBody.parameters[0]; else{ parameter = { product_code: COIN_PAIR, trigger_price: PRICE, side: SIDE, size: SIZE } } parameter.trigger_price = parameter.trigger_price.toString(); parameter.size = parameter.size.toString(); parameter.id ="newOrder"; parameter.history_data = [] putDataToDynamoDB(parameter, callback); sendSQS(queueBody, queueUrl, groupId, callback); }); // 自動注文キャンセル処理 }else if(MODE == 'delete'){ if(!PARENT_ID){ console.log("invalid parent id:", PARENT_ID); callback(null, { statusCode: 400, body: JSON.stringify({message: "invalid order"}), headers: {"Content-type": "application/json"} }); return; } var params = { product_code: COIN_PAIR, price: PRICE.toString() } // DBから該当のレコードを削除 deleteDataFromDynamoDB(params, callback); var payload = { coin_pair: COIN_PAIR, parent_id: PARENT_ID }; payload = JSON.stringify({body:JSON.stringify(payload)}); // 注文キャンセルLambdaを呼び出してキャンセル callLambda(payload, callback) .then(function(data){ if(data.StatusCode != 200){ console.log("Delete Error:", data); callback(null, { statusCode: 400, body: JSON.stringify({message: "Delete failed:", data}), headers: {"Content-type": "application/json"} }); return; } var body = JSON.parse(data.Payload).body; var message = body.message; if(message == ''){ message = "Delete Success" } callback(null, { statusCode: 200, body: JSON.stringify({message: message}), headers: {"Content-type": "application/json"} }); return; }) }else{ console.log('wrong requestbody'); callback(null, { statusCode: 401, body: JSON.stringify({message: 'wrong requestbody'}), headers: { "Content-type": "application/json" } }); return; } function getDatafromDynamoDB(callback){ return new Promise(function (resolve) { var params = { TableName: 'stop_check_bitflyer' }; // Call DynamoDB to add the item to the table ddb.scan(params, function(err, data) { if (err) { console.log("Error", err); callback(null, { statusCode: 401, body: JSON.stringify({message: err.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); return; } resolve(data); }); }); } function putDataToDynamoDB(params, callback){ var params = { Item: { 'coin_pair' : {S: params.product_code}, 'price' : {N: params.trigger_price}, 'side': {S: params.side}, 'size': {N: params.size}, 'id': {S: params.id}, 'history': {L: params.history_data} }, TableName: 'stop_check_bitflyer' }; console.log(params) // Call DynamoDB to add the item to the table ddb.putItem(params, function(err, data) { if (err) { console.log("Error", err); callback(null, { statusCode: 401, body: JSON.stringify({message: err.toString()}), headers: {"Content-type": "application/json"} }); return; } console.log("Success"); params = params.Item; callback(null,{ statusCode: 200, body: JSON.stringify({message: 'PutItem Success: {coin_pair: ' + params.coin_pair.S + ' price: ' + params.price.N + ' side: ' + params.side.S +' size: ' + params.size.N + ' id: ' + params.id.S + ' history: ' + params.history.L }), headers: { "Content-type": "application/json" } }); return; }); } function deleteDataFromDynamoDB(params, callback){ var params = { TableName: 'stop_check_bitflyer', Key: { 'coin_pair' : {S: params.product_code}, 'price' : {N: params.price} } }; ddb.deleteItem(params, function(err, data){ if (err) { console.log("Error", err); callback(null, { statusCode: 400, body: JSON.stringify({message: err}), headers: { "Content-type": "application/json" } }); return; } console.log("Success"); params = params.Key; callback(null,{ statusCode: 200, body: JSON.stringify({message: 'DeleteItem Success: {coin_pair: ' + params.coin_pair.S + ' price: ' + params.price.N }), headers: { "Content-type": "application/json" } }); return; }); } function sendSQS(queueBody, queueUrl, groupId, callback){ var id=new Date().getTime().toString(); // SQS message parameters var params = { MessageBody: JSON.stringify(queueBody), MessageGroupId: groupId, MessageDeduplicationId: id, QueueUrl: queueUrl }; console.log(params) sqs.sendMessage(params, function(err, data) { var responseCode = 200; // response and status of HTTP endpoint var responseBody = { message: '' }; if (err) { console.log('error:', "failed to send message " + err); responseCode = 500; } else { console.log('data:', data.MessageId); responseBody.message = 'Sent to ' + queueUrl; responseBody.messageId = data.MessageId; } callback(null, { statusCode: responseCode, headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(responseBody) }); }); } function callLambda(payload, callback){ return new Promise(function (resolve) { var params = { FunctionName: "注文キャンセルLambda名", InvocationType: "RequestResponse", Payload: payload } console.log(params); lambda.invoke(params, function(error, res){ if(error){ console.error(error); callback(null,{ statusCode: 500, body: JSON.stringify({message: error.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); } resolve(res); }); }); } function checkElement(element){ if(element.type == 'MARKET' || element.type == 'LIMIT' || element.type == 'STOP'){ if(element.side != 'BUY' && element.side != 'SELL'){ console.log("invalid side"); return false; } element.price = Number(element.price); element.size = Number(element.size); if(isNaN(element.price) && element.type != 'MARKET'){ console.log("number invalid:", element.price); return false; } if(isNaN(element.size)){ console.log("number invalid:", element.size); return false; } return true; } return false; } };
3-2. フロントサイド
ほぼ以前紹介したものと同じです。
①見た目の部分
非常にシンプルにしています。
<body> <div class="displayOrder"> <div id="specialOrder"> <div id="stop" class="orderArea"> <div class="orderTitle">STOP</div> <br> <div class="detailArea"> <span> <label for="StopOrderNumber">Num: </label> <input type="text" id="StopOrderNumber"/> </span> <br> <span> <label for="StopOrderPrice">Price: </label> <input type="text" id="StopOrderPrice"/> </span> </div> <div class="decideOrder"> <input type="button" id="StopBuy" onClick="stopBuy()" value="BUY"/> <input type="button" id="StopSell" onClick="stopSell()" value="SELL"/> </div> </div> </div> </div>
②注文送信部分
STOP注文のみ実装してます。以前紹介したフロントサイドとほぼ同じです。
sendRequest関数に関しても以前を確認してください。
function stopBuy(){ var num = document.getElementById("StopOrderNumber").value; var price = document.getElementById("StopOrderPrice").value; if(!num || !price){ console.log("element empty"); return; } var parameters = [{ "type": "STOP", "size": num, "price": price, "side": "BUY" }]; var body ={ 'coin_pair': 'FX_BTC_JPY', 'parameters': parameters, 'order_method': 'SIMPLE', "mode": "insert", "price": price }; var data = { url: URL, method: 'POST', body: body } sendRequest(data); } function stopSell(){ var num = document.getElementById("StopOrderNumber").value; var price = document.getElementById("StopOrderPrice").value; if(!num || !price){ console.log("element empty"); return; } var parameters = [{ "type": "STOP", "size": num, "price": price, "side": "SELL" }]; var body ={ 'coin_pair': 'FX_BTC_JPY', 'parameters': parameters, 'order_method': 'SIMPLE', "mode": "insert", "price": price }; var data = { url: URL, method: 'POST', body: body } sendRequest(data); }
4. 終わりに
これで自動注文の大まかな実装が完了します。基本的に暴騰、暴落時の機会損失を軽減する為に存在します。
レンジ相場ではいたすらマージ消費を行ってしまいます。いつかここもレンジ相場の場合はなるべくスキャって
少しでも稼いで損失を減らすような仕組みを作ろうかと思います。
また、以前触れたDynamoDBの一貫性について、一貫性をある程度担保する方法もあるようですが、そもそもデータに不整合が起こるとしても
数秒くらいのトランザクション間になるそうです。
ですので、以前の記事の課題としてはDynamoDBが原因というよりかはLambdaまたは注文方法に原因がありそうです。
少しCloudWatch、LINE通知で見たところ、OCOの注文が原因がありそうです。
OCO注文は2つ同時に注文を出し、片方の注文が約定するともう片方の注文はキャンセルする注文です。
しかし、見たところ1秒程度で同時にどちらの注文の約定条件を満たしてしまうと、どちらも注文が約定してしまうようです。
例えば、以下のOCO注文があったとします。
STOP BUY 1200000
LIMIT BUY 1199000
現在の価格が1199500だとします。これが1秒など一瞬で1200000と1199000を行き来すると注文がどちらも約定します。
※改めて再現性があるような状態で注文して確認してみます。もしかしたらbitflyerのサーバの状態にもよるかもしれません。
恐らく、内部的にもAPI叩いて注文キャンセルを行うことでOCO注文を実現している可能性があります。
【AWS】bitflyer 自動注文
【きっかけ】
過去の記事参照
buffalokusojima.hatenablog.com
1. 使用するサービス
(AWS)
- Lambda
- DynamoDB
- IAM
- SSM
(外部サービス)
2.概要
実際に注文を行うLambdaの実装に関しては過去記事参照。また、ベースとなるLambdaである約定確認Lambdaに関しても過去記事参照。
途中で呼び出すLambdaとして注文一覧取得Lambdaも必要です。
buffalokusojima.hatenablog.com
buffalokusojima.hatenablog.com
buffalokusojima.hatenablog.com
仕組みとしては、約定通知を受けた際に、事前に用意したDynamoDB内で挿入した注文がなくなってあれば、約定されたと認識し、約定をした注文とは
逆のSTOP注文をします。それにより、価格がある程度、下がるまたは上がった際に損切りを自動かつ無限に行ってくれる仕組みになります。
つまりは、価格が暴騰、暴落した際に自身で決めた価格で確実に購入、売却はされる状態にあるということです。機会損失をなくすことに注力した実装になります。
3. 実装
3-1. DynamoDBの設定
stop_check_bitflyerという名でテーブルを作成しています。
主な設定としては以下になります。
・Primary partition key ・・・ coin_pair (String)
・Primary sort key ・・・ price (Number)
NOSQLなのでややこしいですが、主キーと考えていただいて結構です。基本的に同じ価格のItemは存在しない。
挿入しようとすると上書きになります。
また、現状あるデータを参考として、載せると以下のようになります。
その他カラムに関してはプログラム内で追加しているので、そちらで説明します。事前いカラムを決めなくてもいいのがDynamoDBの特徴になります。
3-2. Lambda実装
ベースは過去記事で載せた約定通知Lambdaを改造したものになります。
以下、コード。
const AWS = require('aws-sdk'); const ssm = new (require('aws-sdk/clients/ssm'))(); const lambda = new AWS.Lambda(); const request = require('request'); const crypto = require('crypto'); const momentTimezone = require('moment-timezone'); const sqs = new AWS.SQS(); const ORDER_METHOD = 'OCO'; //const CONDITION_TYPE = 'STOP'; const STOP_PRICE_BIAS = 0.003; //STOP価格のバイアス const SPECIAL_QUEUE_URL = 'QUEUEのURL'; const queueUrl = SPECIAL_QUEUE_URL; const groupId = "SPECIAL_ORDER"; AWS.config.update({region: 'ap-northeast-1'}); const ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'}); exports.handler = (event, context, callback) => { const id = process.env['id']; console.log('id: ' + id); getParameterFromSystemManager('bitflyer-keys',callback) .then(function(data){ const apikey = data.split(",")[0]; const sercretKey = data.split(",")[1]; var timestamp = Date.now().toString(); var method = 'GET'; var path = '/v1/me/getchildorders?product_code=FX_BTC_JPY&child_order_state=COMPLETED'; var afterID = '&after=' if(id){ path += afterID + id; } var text = timestamp + method + path; var sign = crypto.createHmac('sha256', sercretKey).update(text).digest('hex'); var option = { url: 'https://api.bitflyer.jp' + path, method: method, headers: { 'ACCESS-KEY': apikey, 'ACCESS-TIMESTAMP': timestamp, 'ACCESS-SIGN': sign, 'Content-Type': 'application/json' } } return sendRequest(option, callback); }).then(function(data){ if(data.response.statusCode != 200){ console.error("Error:",data.response); callback(null, { statusCode: data.response.statusCode, body: JSON.stringify({message: data.response}), headers: {"Content-type": "application/json"} }); return; } data = JSON.parse(data.body); if(data.length == 0){ console.log('No data Found'); callback(null,{ statusCode: 200, body: JSON.stringify({message: 'No data Found'}), headers: {"Content-type": "application/json"} }); return; } console.log(data) const dateTimeUtc = momentTimezone.tz(data[0].child_order_date.split(" ")[0], 'UTC'); const dateTimeJst = momentTimezone(dateTimeUtc).tz('Asia/Tokyo').format('YYYY/MM/DD HH:mm:ss'); const date = new Date(dateTimeJst); const toDay = new Date(momentTimezone(new Date()).tz('Asia/Tokyo').format('YYYY/MM/DD HH:mm:ss')); if(id == null && toDay.getTime() - date.getTime() > 180000){ process.env['id'] = data[0].id; console.log('Lambda Restarted'); callback(null,{ statusCode: 500, body: JSON.stringify({message: 'Lambda Restarted'}), headers: {"Content-type": "application/json"} }); return; } process.env['id'] = data[0].id; var message = '\n'; data.forEach(function(value){ const dateTimeUtc = momentTimezone.tz(value.child_order_date.split(" ")[0], 'UTC'); const dateTimeJst = momentTimezone(dateTimeUtc).tz('Asia/Tokyo').format('YYYY/MM/DD HH:mm:ss'); message += "[" + value.side + ": " + value.child_order_type + "]\n" + "Date " + dateTimeJst + "\n" + "price " + value.price + "\n" + "size: " + value.size + "\n" + "average price: " + value.average_price + "\n" + "executed size: " + value.executed_size + "\n" + "----------------------\n"; }); console.log(message); sendLine(message,callback); //ここから追加 //約定のデータをDynamoDB用に加工します。 var history = makeArrayforDynamoDB(data); //DynamoDBから自動注文のデータを取得して処理 getDatafromDynamoDB(callback).then(function(data){ if(data.Items.length == 0){ console.log("price check data not set"); callback(null, { statusCode: 200, body: JSON.stringify({message: "price check data not set"}), headers: {"Content-type": "application/json"} }); return; } const dbData = data.Items; //注文一覧Lambdaを呼び出す callLambda(callback).then(function(data){ var body = JSON.parse(data.Payload).body; data = JSON.parse(body).data //DynamoDBから取得したデータを元にfor文を回し、注文が約定しているか確認 for(var item of dbData){ //console.log(item.history.L) //注文一覧Lambdaから取得したデータ内にDynamoDB内にある注文IDと同じデータがあれば未約定としてスルー if(data.find(d => d.parent_order_acceptance_id == item.id.S)) continue; //STOP注文の作成 var side; var bias = Number(item.price.N) * STOP_PRICE_BIAS; if(item.side.S == 'BUY'){ side = 'SELL'; bias = -bias; }else if(item.side.S == 'SELL'){ side = 'BUY'; } //OCO注文として、最低限損失を少なくするようにする //何度も価格を行き来した際に、limit注文で損失を無くしていき、STOP注文で機会損失をカバーする var param1 = { product_code: item.coin_pair.S, condition_type: "STOP_LIMIT", side: side, size: item.size.N, price: item.price.N, trigger_price: item.price.N }; var param2 = { product_code: item.coin_pair.S, condition_type: "STOP", side: side, size: item.size.N, trigger_price: (Number(item.price.N) + bias).toString() } var parameters = [param1,param2]; console.log(parameters) var queueBody = { order_method: ORDER_METHOD, parameters: parameters, autoOrder: true } console.log(queueBody); //注文QUEUEを送信 sendSQS(queueBody, queueUrl, groupId, callback); var dbHistory = item.history.L dbHistory = dbHistory.concat(history) //console.log(dbHistory) param1.history_data = dbHistory; //DynamoDBに約定履歴や現状どのポジションで注文しているかをいれる putDataToDynamoDB(param1, callback); } }); }); }); function getParameterFromSystemManager(apikey_name, callback) { return new Promise(function (resolve) { var apikey = process.env[apikey_name]; if(!apikey || typeof apikey == undefined){ // Fetches a parameter called REPO_NAME from SSM parameter store. // Requires a policy for SSM:GetParameter on the parameter being read. var params = { Name: apikey_name, /* required */ WithDecryption:true }; ssm.getParameter(params, function(err, apikey) { if (err){ console.error(err.stack); callback(null,{ statusCode: 500, body: JSON.stringify({message: err.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); return; } process.env[apikey_name] = apikey.Parameter.Value; resolve(apikey.Parameter.Value); }); }else resolve(apikey); }); } function sendLine(message, callback){ getParameterFromSystemManager('line-access-key', callback) .then(function(lineKey){ var option = { url: 'https://notify-api.line.me/api/notify', headers: { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + lineKey }, method: 'POST', form :{ message: message } }; return sendRequest(option,callback); }) .then(function(data){ if(data.response.statusCode != 200){ console.error(data) } callback(null, { statusCode: data.response.statusCode, body: data.body, headers: {"Content-type": "application/json"} }); }); } function makeArrayforDynamoDB(array){ if(!array || array.length == 0){ return; } var newArray = [] for(var item of array){ var newItem = {} Object.keys(item).forEach(function (key) { if(typeof(item[key])=="number") newItem[key] = {N: item[key].toString()} else if(typeof(item[key]=='string')) newItem[key] = {S: item[key]} }); newArray.push({M: newItem}); } return newArray; } function getDatafromDynamoDB(callback){ return new Promise(function (resolve) { var params = { TableName: 'stop_check_bitflyer' }; // Call DynamoDB to add the item to the table ddb.scan(params, function(err, data) { if (err) { console.log("Error", err); callback(null, { statusCode: 401, body: JSON.stringify({message: err.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); return; } resolve(data); }); }); } function putDataToDynamoDB(params, callback){ console.log(params.id) if(!params.id){ params.id = ''; } var params = { Item: { 'coin_pair' : {S: params.product_code}, 'price' : {N: params.trigger_price}, 'side': {S: params.side}, 'size': {N: params.size}, 'id': {S: params.id}, 'history': {L: params.history_data} }, TableName: 'stop_check_bitflyer' }; console.log(params) // Call DynamoDB to add the item to the table ddb.putItem(params, function(err, data) { if (err) { console.log("Error", err); callback(null, { statusCode: 401, body: JSON.stringify({message: err.toString()}), headers: {"Content-type": "application/json"} }); return; } console.log("Success"); params = params.Item; callback(null,{ statusCode: 200, body: JSON.stringify({message: 'PutItem Success: {coin_pair: ' + params.coin_pair.S + ' price: ' + params.price.N + ' side: ' + params.side.S +' size: ' + params.size.N + ' id: ' + params.id.S + ' history: ' + params.history.L }), headers: { "Content-type": "application/json" } }); return; }); } function sendSQS(queueBody, queueUrl, groupId, callback){ var id=new Date().getTime().toString(); // SQS message parameters var params = { MessageBody: JSON.stringify(queueBody), MessageGroupId: groupId, MessageDeduplicationId: id, QueueUrl: queueUrl }; console.log(params) sqs.sendMessage(params, function(err, data) { var responseCode = 200; // response and status of HTTP endpoint var responseBody = { message: '' }; if (err) { console.log('error:', "failed to send message " + err); responseCode = 500; } else { console.log('data:', data.MessageId); responseBody.message = 'Sent to ' + queueUrl; responseBody.messageId = data.MessageId; } callback(null, { statusCode: responseCode, headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(responseBody) }); }); } function callLambda(callback){ return new Promise(function (resolve) { var params = { FunctionName: "注文一覧Lambdaのファンクション名", InvocationType: "RequestResponse", //Payload: payload //リクエストパラメータはないのでいらない } lambda.invoke(params, function(error, res){ if(error){ console.error(error); callback(null,{ statusCode: 500, body: JSON.stringify({message: error.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); } resolve(res); }); }); } function sendRequest(option, callback){ return new Promise(function (resolve) { request(option, function(error, response, body){ if(error){ console.error(error); callback(null,{ statusCode: 500, body: JSON.stringify({message: error.toString()}), headers: {"Content-type": "application/json"} }); resolve(null); } var data = {response, body} resolve(data); }); }); } };
4. 課題
DynamoDBの特徴として、RDBMSと違って、データの一貫性が保証されず、データの上書きと参照で相違が出る可能性があります。
というのも、価格が上下し、自動注文を繰り返していると、誤注文のような事象が稀に見られ、100%安全な注文とは言えません。
また、Lambdaの再起動時に環境変数が初期化することによる、約定データ大量取得にも原因があると思われます。
いずれ問題解決したり、LmabdaのリスタートやDynamoDBのデータ特徴に関して何かわかれば記事にしていこうかと思います。
【AWS】CloudformationでApiGatewayをデプロイ
1. 使用するサービス
(AWS)
- CloudFormation
2. 概要
前回の続きで、前回は初回のApiGatewayのデプロイを行いました。ですが、前回のままだと、ApiGatewayのリソースの更新までしか行いません。
ApiGatewayをデプロイまでを行う為にはCloudformationのApiGatewayの命名を変える必要があります。
#ApiGatewayのデプロイの設定 #ここをユニークな名前にする ApiGatewayDeployment1: Type: AWS::ApiGateway::Deployment Properties: RestApiId: Ref: RestApi Description: "apigateway deployment" #パラメータのステージ名を使用するよう設定 StageName: !Ref Stage
参考:
medium.com
【AWS】 CloudformationでAPI Gateway Lamda環境構築
1. 使用するサービス
(AWS)
- CloudFormation
2. 概要
前提として、S3に静的コンテンツがある前提です。今回の説明ではCloudformationのテンプレートでApiGatewayとLamdaのRestfulな環境をデプロイまでするとこまでを説明します。
以下、テンプレートのソースを載せます。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Outputs the status #パラメータ設定 #Resources内で使用するパラメータを事前にこちらで定義することができます。 #Deploy前にCloudDeploy内で設定した値が入ります。 Parameters: Stage: Default: dev Type: String AllowedValues: [dev, test] #リソース設定 #実際に利用するサービスの設定を書きます。今回はApiGatewayにLambdaを直接書き込んで設定する方式をとります。 #別でMethodとして切り分けることも可能です。 Resources: #RestfulなApigatewayの設定 RestApi: Type: AWS::ApiGateway::RestApi Properties: Body: info: version: '1.0' title: !Ref 'AWS::StackName' #以下にパスの設定を記述していきます。例ではAPIのURL/helloを叩いた時の挙動を記述してます。 #基本的にコピペで大丈夫かと思います。CORSにも対応してます。 #パスを追加する際に変更する箇所にコメント入れておきます。 paths: /hello: #メソッドの種類になります。ここ以下は全てPOSTで大丈夫です。統合関係はPOSTだからだそうです。。。 get: responses: '200': description: "200 response" schema: $ref: "#/definitions/Empty" headers: Access-Control-Allow-Origin: type: "string" x-amazon-apigateway-integration: httpMethod: POST type: aws #実際にデプロイするURIのarnを記述します。変更する箇所としてはLambdaのarnになります。 uri: !Sub >- arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${HelloWorldFunction.Arn}/invocations responses: default: statusCode: "200" responseParameters: method.response.header.Access-Control-Allow-Origin: "'*'" passthroughBehavior: "when_no_match" httpMethod: "POST" type: "aws" options: x-amazon-apigateway-integration: type: mock schema: $ref: "#/definitions/Empty" requestTemplates: application/json: | { "statusCode" : 200 } responses: default: statusCode: '200' responseTemplates: application/json: | {} responseParameters: method.response.header.Access-Control-Allow-Origin: '''*''' method.response.header.Access-Control-Allow-Headers: '''Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token''' method.response.header.Access-Control-Allow-Methods: '''OPTIONS,POST''' consumes: - application/json summary: CORS support responses: '200': headers: Access-Control-Allow-Origin: type: string Access-Control-Allow-Methods: type: string Access-Control-Allow-Headers: type: string description: Default response for CORS method produces: - application/json definitions: Empty: type: object title: Empty Schema swagger: '2.0' #ApigatewayからLambdaを呼び出す設定 HelloWorldFunctionApiPermissiondev: Type: AWS::Lambda::Permission Properties: Action: lambda:InvokeFunction Principal: apigateway.amazonaws.com #下記にある呼び出したいLambdaを設定 FunctionName: !Ref HelloWorldFunction #Lamdaの設定 HelloWorldFunction: Type: AWS::Serverless::Function Properties: Handler: index.handler Runtime: python3.8 CodeUri: ./HelloWorld #使用したいRoleのArnを記述。今回は下で設定しているRoleを設定 Role: !GetAtt HelloworldRole.Arn #Roleの設定 HelloWorldRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "lambda.amazonaws.com" Action: - "sts:AssumeRole" #使用したいポリシーを記述 ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole MaxSessionDuration: 3600 Path: "/" #ApiGatewayのデプロイの設定 ApiGatewayDeployment: Type: AWS::ApiGateway::Deployment Properties: RestApiId: Ref: RestApi Description: "apigateway deployment" #パラメータのステージ名を使用するよう設定 StageName: !Ref Stage #デプロイのURiを設定 Outputs: RestApiUri: Export: Name: RestApiUri Value: !Sub "https://${RestApi}.execute-api.${AWS::Region}.amazonaws.com/${Stage}/"
以上で、CloudDeployでデプロイするとApiGatewayのRestfulな環境がデプロイされます。
Lambda内のコードを変更してデプロイ時でも更新されて指定された環境にデプロイされます。
【Deep Learning】Keras-yolo3のパッケージ化
Keras-yolo3の簡単なパッケージ化の例を載せておきます。
フォルダ構成は以下のようになっていて、各種フォルダからsrc内のKerasYolo3を呼び出すイメージになります。
KerasYolo3フォルダないで__init__.pyファイルを作成し、以下のように記述します。
from src.kerasYolo3.image_detection import ImageDetector
これにより、KerasYolo3をパッケージとして認識され、import KerasYolo3をした際に、image_detectionモジュールもimportされるようになります。
以下、image_detectionモジュールです。簡単なYoLO呼び出しモジュールになります。
import colorsys import os,sys from timeit import default_timer as timer import numpy as np from keras import backend as K from keras.models import load_model from keras.layers import Input from PIL import Image, ImageFont, ImageDraw """ call yolo modules from yolo folder here """ from .yolo import YOLO class ImageDetector(): def __init__(self, model_path, classes_path): try: if not os.path.isfile(model_path): print('model path does not exist') return None if not os.path.isfile(classes_path): print('class path does not exist') return None self.yolo = YOLO(model_path=model_path, classes_path=classes_path) except Exception as e: print('Open Model Error:', e) return None # detect image function def detect_image(self, img_path): try: image = Image.open(img_path) except Exception as e: print('Open Image Error:', img_path, e) return None, None else: r_image, result = self.yolo.detect_image(image) return r_image, result def close_session(self): self.yolo.close_session()
srcと同じフォルダ内で実行ファイルを作成します。以下のようにモジュール呼び出しをしてみます。
import src.kerasYolo3 as yolo imageDetect = yolo.ImageDetector("model_data/種類モデル/type-model.h5", "class/種類クラス/class.txt")
実行すると、クラスから上手くインスタンスが作成されることがありあります。