AWS ことはじめ その4 (DynamoDB - Part2)
前回につづいて、DynamoDB。
今回は、①Python(boto3
)でのテーブル作成、②Pythonで大量にデータをぶち込む、の2点。
boto3
でのテーブル作成
boto3
には、低レイヤーを担当するClient
、テーブルの操作を行う(?)Service Resource
なんかがあるらしい。
http://boto3.readthedocs.io/en/latest/reference/services/dynamodb.html#dynamodb
今回はClient
を使ってテーブルを作ってみる。テーブルを作ってすぐにデータを投入したりすると、まだ作成中のことがあるので、そういうときはwaiter
でテーブルが出来上がるまで待つ。
それ以外はaws
コマンドとほとんど一緒かな。あ、エラーは動的に生成しているらしいので、エラーハンドリングは少し癖がありそう。以下のStackOverFlowの通り、ClientError
とかのレスポンスの中身を見るとよいらしい。
amazon web services - Boto3, python and how to handle errors - Stack Overflow
def create_table(dynamo_client, table_name, rcu=5, wcu=5): try: dynamo_client.delete_table(TableName=table_name) waiter_noexists = dynamo_client.get_waiter('table_not_exists') waiter_noexists.wait(TableName=table_name) except ClientError as e: if e.response['Error']['Code'] == 'ResourceNotFoundException': print('table: {0} does not exist when trying to delete it.'.format(table_name)) else: raise e try: dynamo_client.create_table( TableName='sparse-data', KeySchema=[{ 'AttributeName': 'sid', 'KeyType': 'HASH' }, { 'AttributeName': 'unixtime', 'KeyType': 'RANGE' }], AttributeDefinitions=[{ 'AttributeName': 'sid', 'AttributeType': 'S' }, { 'AttributeName': 'unixtime', 'AttributeType': 'N' }], ProvisionedThroughput={ 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 } ) waiter_exists = dynamo_client.get_waiter('table_exists') waiter_exists.wait(TableName=table_name) except ClientError as e: if e.response['Error']['Code'] == 'ResourceInUseException': print('table: {0} already exists when trying to create it.'.format(table_name)) else: raise e
データ投入
データ投入はあまり難しいことを考えず、単にdict
をput_item
すればよい。
ただ、大量にAPI Callをしそうなときは、batch_writer
を使うとよしなに取り計らってくれるもよう。Throttlingが発生して呼び出しが失敗したときのリトライなど。
あとは、float
はダメでdecimal.Decimal
にしないといけないところで少しハマった。
table_name = 'sparse-data' pbar = tqdm.tqdm(total=5*365*24*10) sparse_table = dynamo.Table(table_name) with sparse_table.batch_writer() as batch: count = 0 for data in generate_sparse_data(start_dt, duration, 600, 120): batch.put_item(Item=data) count += 1 pbar.update(count)
次回は、↑で投入したデータを使ってどのくらいレスポンスが出るのか見てみる。