In Part 1 of this blog we created the core code for retrieving market prices from Coinbase. In Part 2, we are going to add the ability to store the data into RDS MySQL.
Setting Up RDS
Create an RDS database using MySQL on a micro instance in the same region you created your Lambda function. (I am not going to cover that process here, but there are many excellent tutorials available on the internet).
Connect to the database using your favorite client – I prefer to use the MySQL Workbench tool.
Create a schema (database) called “aither_crypto” and create a user called “crypto” with permissions on the DB. Here is a script to do this.
drop user 'crypto'@'localhost';
create database aither_crypto;
create user 'crypto'@'%' identified by '<SET YOUR PASSWORD HERE>';
grant select, insert, update, delete, create, drop, reload, process, references, index, alter,
show databases, create temporary tables, lock tables, execute, replication slave, replication client,
create view, show view, create routine, alter routine, create user, event, trigger on *.* to `crypto`@`%` with grant option;
flush privileges;
select * from mysql.user;
Create a simple table called price_history in which you can store price data every time your Lambda function runs.
You can use the following SQL:
use aither_crypto;
drop table if exists price_history;
create table price_history
(
exchange varchar(20),
symbol varchar(10),
created timestamp default current_timestamp,
price decimal(10,2),
primary key (exchange, symbol, created)
);
All done. Now let’s get connected from Lambda.
External Libraries in Lambda Functions
One of the challenges working directly online with the Cloud9 Lambda editor is that it becomes difficult to utilize external libraries that are not included with Lambda’s Python runtime.
I really wish that Amazon would come up with a solution to this problem as it seems like a pretty straightforward thing to do and would make coding in the Cloud9 browser much easier (which I really like).
Fortunately, there is an open source project that HAS solved this problem. The KLayers project maintains many public libraries with ARNs you can use to import into your function.
The libraries are here: https://github.com/keithrozario/Klayers
The alternative to using KLayers is to create your function locally, compress everything into a .zip file, and then use the AWS CLI to deploy to Lambda. This works just fine, but adds a layer of complexity to development.
Unfortunately, for most professional level development and deployment, you will ultimately need to learn how to develop and deploy using the CLI. But I like the simplicity of the Cloud9 editor and will stick with that here.
Adding An External Library to a Lambda Function
For our function, we are going to use the KLayers PyMySQL layer, which is the same as the PyMySQL library you would normally install with pip.
Scroll to the bottom of the getPrices function.
Here is the ARN to cut/paste:
arn:aws:lambda:us-east-2:770693421928:layer:Klayers-python38-PyMySQL:4
IMPORTANT: Make sure you use the same region in the ARN that your function is in. The above link uses Ohio (us-east-2). Change the section in red to the region you are working in if it is not Ohio. Klayers is available in all regions.
Click Add and then you will see the layer attached to your function.
Using the PyMySQL Layer
Time to modify our getPrices function (which we created in Part 1) to add in the database connection.
Here is the new Python code with changes highlighted:
import json
import boto3
from urllib.request import Request, urlopen
from datetime import datetime
import pymysql
#NOTE: This is a helper function that will parse and return JSON
#from an event message body. I have found this addresses various
#scenarios where messages have slightly different formats between
#production SQS and using the Test functionality.
def parse_message_body_json(message):
message_body=message["body"]
try:
#Adjust our message body so that it parses correctly
message_body=str(message_body)
message_body=message_body.replace("'", "\"")
message_body=message_body.strip('"')
if '[' not in message_body:
if ']' not in message_body:
message_body="[%s]" % message_body.strip('"')
message_body=json.loads(str(message_body))
except Exception as error:
print(error)
print("Unable to parse JSON.")
return message_body[0]
#The main handler for this Lambda function.
def lambda_handler(event, context):
symbols = []
json_return=[]
#Setup our database connection - change the host, username, and
#password to match what you created.
db_host="<Your DB hostname here>"
db_username="crypto" #OR whatever DB username you created
db_password="<Your DB password here>"
conn = pymysql.connect(host=db_host, user=db_username, password=db_password, \
charset='utf8', db='aither_crypto', cursorclass=pymysql.cursors.DictCursor)
cur = conn.cursor()
#Verify we have event records and then parse each one.
if "Records" in event:
for message in event['Records']:
print("Processing message...; message=%s" % message)
message_body_json=parse_message_body_json(message)
print(message_body_json)
symbols.append(message_body_json["Symbol"])
print(symbols)
#Format date to send in the TIMESTAMP field
now=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#For each symbol (coin_pair) in our event message, retrieve the price.
for coin_pair in symbols:
url_spot="https://api.coinbase.com/v2/prices/%s/spot" % (coin_pair)
try:
#Get SPOT price
req = Request(url_spot)
req.add_header('Accept', 'application/json')
req.add_header('User-Agent', 'Mozilla/5.0')
with urlopen(req, timeout = 15) as response:
raw_json = response.read()
spot_details=json.loads(raw_json)
#Store the results in RDS
sql = """insert into price_history (exchange, symbol, created, price)
values ('COINBASE', '%s', '%s', %s)""" % (coin_pair, now, spot_details['data']['amount'])
print(sql)
cur.execute(sql)
conn.commit()
#Store the result into JSON. We will return this for now, but use it later.
arr_single_coin=[]
dict_single_coin={"Symbol": coin_pair, "Prices": {"SPOT":spot_details['data']['amount'], "TIMESTAMP":now}}
arr_single_coin.append(dict_single_coin)
json_return.append(arr_single_coin)
except Exception as error:
print("ERROR: Failed to retrieve prices for %s" % coin_pair)
print(error)
#Return result
return {
'statusCode': 200,
'body': json_return
}
Deploy and run Test using our BTC-USD test message (that we created in Part 1).
Once you see that the function has run and returned a result, check the database. You should see a price data point for BTC-USD.
select * from price_history;
Now, every time you run the getPrices function, the result will be stored in RDS.
Automatically Running the getPrices Function
The last thing we are going to do in Part 2 is add an EventBridge trigger to our Lambda function so that we are pulling price data every minute. We will use this incoming stream of data to trigger our trades.
In the getPrices Lambda function, click on the “Add trigger” button.
Then create a new EventBridge rule as shown below. The schedule expression we will use is “rate(1 minute)”.
Once you have added the trigger, you will see it in the function.
You will now see price data being stored every minute in your price_history table as the Lambda function is continuously run by EventBridge.
In Part 3, we will build the trade evaluation engine to check incoming price data against trade rules that we set. Happy coding!