AWS ことはじめ その4 (DynamoDB - Part2)
前回につづいて、DynamoDB。
今回は、①Python(boto3
)でのテーブル作成、②Pythonで大量にデータをぶち込む、の2点。
boto3
でのテーブル作成
boto3
には、低レイヤーを担当するClient
、テーブルの操作を行う(?)Service Resource
なんかがあるらしい。
http://boto3.readthedocs.io/en/latest/reference/services/dynamodb.html#dynamodb
今回はClient
を使ってテーブルを作ってみる。テーブルを作ってすぐにデータを投入したりすると、まだ作成中のことがあるので、そういうときはwaiter
でテーブルが出来上がるまで待つ。
それ以外はaws
コマンドとほとんど一緒かな。あ、エラーは動的に生成しているらしいので、エラーハンドリングは少し癖がありそう。以下のStackOverFlowの通り、ClientError
とかのレスポンスの中身を見るとよいらしい。
amazon web services - Boto3, python and how to handle errors - Stack Overflow
def create_table(dynamo_client, table_name, rcu=5, wcu=5): try: dynamo_client.delete_table(TableName=table_name) waiter_noexists = dynamo_client.get_waiter('table_not_exists') waiter_noexists.wait(TableName=table_name) except ClientError as e: if e.response['Error']['Code'] == 'ResourceNotFoundException': print('table: {0} does not exist when trying to delete it.'.format(table_name)) else: raise e try: dynamo_client.create_table( TableName='sparse-data', KeySchema=[{ 'AttributeName': 'sid', 'KeyType': 'HASH' }, { 'AttributeName': 'unixtime', 'KeyType': 'RANGE' }], AttributeDefinitions=[{ 'AttributeName': 'sid', 'AttributeType': 'S' }, { 'AttributeName': 'unixtime', 'AttributeType': 'N' }], ProvisionedThroughput={ 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 } ) waiter_exists = dynamo_client.get_waiter('table_exists') waiter_exists.wait(TableName=table_name) except ClientError as e: if e.response['Error']['Code'] == 'ResourceInUseException': print('table: {0} already exists when trying to create it.'.format(table_name)) else: raise e
データ投入
データ投入はあまり難しいことを考えず、単にdict
をput_item
すればよい。
ただ、大量にAPI Callをしそうなときは、batch_writer
を使うとよしなに取り計らってくれるもよう。Throttlingが発生して呼び出しが失敗したときのリトライなど。
あとは、float
はダメでdecimal.Decimal
にしないといけないところで少しハマった。
table_name = 'sparse-data' pbar = tqdm.tqdm(total=5*365*24*10) sparse_table = dynamo.Table(table_name) with sparse_table.batch_writer() as batch: count = 0 for data in generate_sparse_data(start_dt, duration, 600, 120): batch.put_item(Item=data) count += 1 pbar.update(count)
次回は、↑で投入したデータを使ってどのくらいレスポンスが出るのか見てみる。
AWS ことはじめ その3 (DynamoDB Part 1)
なんでDynamoDBを使いたいのか
IoTっぽい時系列データを蓄積したい的な話。データの件数が数億~数百億のオーダーになる可能性があり、どれくらい現実的なのかを調べたいため。無料枠でできる範囲で試してみる。 いい感じに検証できる環境があればいいんだけど、数週間かかりそうでSIerのつらみを感じている。
ためたいデータ
蓄積したいデータは、ある機器が生成するもので、機器自体の数は1万〜100万くらいのオーダーと思われる。
個々のデータは、幾つか種類がある。
- たまに発生するデータ。アラート情報的なやつ。機器1台・1時間あたり、10回くらい?
- 継続して発生するデータ。メトリクス的なやつ。データ自体は1秒単位だが、1分、3分とかの単位でまとまった状態で機器が発信する。
用途
集計・分析を行うことは(今のところ)想定しておらず、機器ごとに最新データや過去の特定日時のデータを見たい、というもの。基本的には一本釣りに近い形だが、ある程度の範囲でのデータ検索が発生する可能性があり、おもにそこのレスポンスを知りたい。
データ自体は何らかの形で永年で保存しておきたいが、検索を行う期間は限定できる。直近1年間のデータのみ検索可能、とか。
DynamoDBのざっくりまとめ
DynamoDBはAWSのマネージド型Key-Value Stroe。NoSQLに分類される。
- Primary Keyを指定する以外は、スキーマレス。
- Primary Keyには、HASHキーとRANGEキー(SORTキー)を指定する。RANGEキーの方は任意。
- HASHキーで物理的な格納場所が決まり、RANGEキーでソートして格納されるイメージ。
- JSON形式で表現したデータを入れたり出したりできる
- NoSQLなのであまり複雑な検索はできない。基本的にはPrimary Keyで一本釣りするイメージ。
- Secondary Indexを作ることで、ある程度検索に使えるキーを増やすこともできる。
- Primary Keyには、HASHキーとRANGEキー(SORTキー)を指定する。RANGEキーの方は任意。
- 料金に効いてくるのは、①データ量、②スループットの2つ。事前にスループットを指定しないといけないのが肝。
- スループットは、キャパシティユニット(CU; Capacity Unit)という単位。読み取り(RCU)と書き込み(WCU)の2つで管理する。
- 1RCU = 1秒間に1KBまでの読み込みを行える。結果整合性のみでよい(=多少最新の更新が漏れてもよい)場合、倍になる。
- 1WCU = 1秒間に1KBまでを書き込みできる。
CLIでのDynamoDB作成
以下3種類を作成する。
- たまに発生するデータを格納しておくテーブル
- 1秒間隔で発生するデータを格納しておくテーブル。1つの項目(レコード)に1分、3分などのデータをまとめて持っておく
- 1秒間隔で発生するデータを格納しておくテーブル。1秒単位に項目(レコード)を持っておく
公式のドキュメントを見ながら実行。そろそろ手でコマンド打つのが辛くなってくる。 https://docs.aws.amazon.com/cli/latest/reference/dynamodb/create-table.html
❯ aws dynamodb create-table \ --table-name sparse-data \ --attribute-definitions AttributeName=sid,AttributeType=S AttributeName=unixtime,AttributeType=N \ --key-schema AttributeName=sid,KeyType=HASH AttributeName=unixtime,KeyType=RANGE \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 { "TableDescription": { "AttributeDefinitions": [ { "AttributeName": "sid", "AttributeType": "S" }, { "AttributeName": "unixtime", "AttributeType": "N" } ], "TableName": "sparse-data", "KeySchema": [ { "AttributeName": "sid", "KeyType": "HASH" }, { "AttributeName": "unixtime", "KeyType": "RANGE" } ], "TableStatus": "CREATING", "CreationDateTime": 1526805606.531, "ProvisionedThroughput": { "NumberOfDecreasesToday": 0, "ReadCapacityUnits": 5, "WriteCapacityUnits": 5 }, "TableSizeBytes": 0, "ItemCount": 0, "TableArn": "arn:aws:dynamodb:ap-northeast-1:***masked***:table/sparse-data", "TableId": "***masked***" } } ❯ aws dynamodb create-table \ --table-name continuous-data-aggr \ --attribute-definitions AttributeName=sid,AttributeType=S AttributeName=unixtime,AttributeType=N \ --key-schema AttributeName=sid,KeyType=HASH AttributeName=unixtime,KeyType=RANGE \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 { "TableDescription": { "AttributeDefinitions": [ { "AttributeName": "sid", "AttributeType": "S" }, { "AttributeName": "unixtime", "AttributeType": "N" } ], "TableName": "continuous-data-aggr", "KeySchema": [ { "AttributeName": "sid", "KeyType": "HASH" }, { "AttributeName": "unixtime", "KeyType": "RANGE" } ], "TableStatus": "CREATING", "CreationDateTime": 1526805540.236, "ProvisionedThroughput": { "NumberOfDecreasesToday": 0, "ReadCapacityUnits": 5, "WriteCapacityUnits": 5 }, "TableSizeBytes": 0, "ItemCount": 0, "TableArn": "arn:aws:dynamodb:ap-northeast-1:***masked***:table/continuous-data-aggr", "TableId": "***masked***" } } aws dynamodb create-table \ --table-name continuous-data \ --attribute-definitions AttributeName=sid,AttributeType=S AttributeName=unixtime,AttributeType=N \ --key-schema AttributeName=sid,KeyType=HASH AttributeName=unixtime,KeyType=RANGE \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 { "TableDescription": { "AttributeDefinitions": [ { "AttributeName": "sid", "AttributeType": "S" }, { "AttributeName": "unixtime", "AttributeType": "N" } ], "TableName": "continuous-data", "KeySchema": [ { "AttributeName": "sid", "KeyType": "HASH" }, { "AttributeName": "unixtime", "KeyType": "RANGE" } ], "TableStatus": "CREATING", "CreationDateTime": 1526804024.601, "ProvisionedThroughput": { "NumberOfDecreasesToday": 0, "ReadCapacityUnits": 5, "WriteCapacityUnits": 5 }, "TableSizeBytes": 0, "ItemCount": 0, "TableArn": "arn:aws:dynamodb:ap-northeast-1:***masked***:table/continuous-data", "TableId": "***masked***" } } ❯ aws dynamodb list-tables { "TableNames": [ "continuous-data", "continuous-data-aggr", "sparse-data" ] }
次回は、①データの投入、②バッチ投入、③データの読み込みの時間計測
aws ことはじめ その2 (IAM - アカウント別名)
IAMでロールを作ったはいいけど、アカウントIDを覚えられなくてログインできなかったので、アカウント別名を作る。
aws> iam create-account-alias --account-alias ***masked*** aws> iam list-account-aliases { "AccountAliases": [ "***masked***" ] }
これで、ログインのときに***masked***
を入力することで、IAMでかんたんにログインできる。
aws ことはじめ その1
まずは普通にインスタンスを立ててみる
VPCを作成
aws> ec2 create-vpc --cidr-block 192.168.0.0/16 { "Vpc": { "CidrBlock": "192.168.0.0/16", "DhcpOptionsId": "dopt-338b0854", "State": "pending", "VpcId": "vpc-***masked***", "InstanceTenancy": "default", "Ipv6CidrBlockAssociationSet": [], "CidrBlockAssociationSet": [ { "AssociationId": "vpc-cidr-assoc-00b5fa50810ec9a2f", "CidrBlock": "192.168.0.0/24", "CidrBlockState": { "State": "associated" } } ], "IsDefault": false, "Tags": [] } }
subnetを作成
aws> ec2 create-subnet --vpc-id vpc-***masked*** --cidr-block 192.168.1.0/24 { "Subnet": { "AvailabilityZone": "ap-northeast-1d", "AvailableIpAddressCount": 251, "CidrBlock": "192.168.1.0/24", "DefaultForAz": false, "MapPublicIpOnLaunch": false, "State": "pending", "SubnetId": "subnet-***masked***1", "VpcId": "vpc-***masked***", "AssignIpv6AddressOnCreation": false, "Ipv6CidrBlockAssociationSet": [] } } aws> ec2 create-subnet --vpc-id vpc-***masked*** --cidr-block 192.168.2.0/24 --availability-zone ap-northeast-1a { "Subnet": { "AvailabilityZone": "ap-northeast-1a", "AvailableIpAddressCount": 251, "CidrBlock": "192.168.2.0/24", "DefaultForAz": false, "MapPublicIpOnLaunch": false, "State": "pending", "SubnetId": "subnet-***masked***2", "VpcId": "vpc-***masked***", "AssignIpv6AddressOnCreation": false, "Ipv6CidrBlockAssociationSet": [] } }
Security groupを作成
以下のような流れで作成する
create-security-group
でセキュリティグループを作成するauthorize-security-group-ingress
でとりあえず自分ちからつながるようにする- キーペアを作成する
awa> ec2 create-security-group --group-name dev_private_ec2 --description "An environment for development" --vpc-id vpc-***masked*** { "GroupId": "sg-***masked***" } aws> ec2 authorize-security-group-ingress --group-id sg-***masked*** --protocol tcp --port 22 --cidr X.X.X.X/32 aws> ec2 create-key-pair --key-name aws_default --query "KeyMaterial" --output text
EC2を起動
以下を指定する。
❯ aws ec2 run-instances \ --image-id ami-2724cf58 \ --subnet-id ***masked*** \ --security-group-ids ***masked*** \ --count 1 \ --instance-type t2.micro \ --key-name aws_default { "Groups": [], "Instances": [ { "AmiLaunchIndex": 0, "ImageId": "ami-2724cf58", "InstanceId": "***masked***", "InstanceType": "t2.micro", "KeyName": "aws_default", "LaunchTime": "2018-05-20T09:17:33.000Z", "Monitoring": { "State": "disabled" }, "Placement": { "AvailabilityZone": "ap-northeast-1d", "GroupName": "", "Tenancy": "default" }, "PrivateDnsName": "ip-192-168-1-93.ap-northeast-1.compute.internal", "PrivateIpAddress": "192.168.1.93", "ProductCodes": [], "PublicDnsName": "", "State": { "Code": 0, "Name": "pending" }, "StateTransitionReason": "", "SubnetId": "***masked***", "VpcId": "***masked***", "Architecture": "x86_64", "BlockDeviceMappings": [], "ClientToken": "", "EbsOptimized": false, "Hypervisor": "xen", "NetworkInterfaces": [ { "Attachment": { "AttachTime": "2018-05-20T09:17:33.000Z", "AttachmentId": "***masked***", "DeleteOnTermination": true, "DeviceIndex": 0, "Status": "attaching" }, "Description": "", "Groups": [ { "GroupName": "dev_private_ec2", "GroupId": "***masked***" } ], "Ipv6Addresses": [], "MacAddress": "0e:01:03:99:93:96", "NetworkInterfaceId": "***masked***", "OwnerId": "***masked***", "PrivateIpAddress": "192.168.1.93", "PrivateIpAddresses": [ { "Primary": true, "PrivateIpAddress": "192.168.1.93" } ], "SourceDestCheck": true, "Status": "in-use", "SubnetId": "***masked***", "VpcId": "***masked***" } ], "RootDeviceName": "/dev/xvda", "RootDeviceType": "ebs", "SecurityGroups": [ { "GroupName": "dev_private_ec2", "GroupId": "***masked***" } ], "SourceDestCheck": true, "StateReason": { "Code": "pending", "Message": "pending" }, "VirtualizationType": "hvm", "CpuOptions": { "CoreCount": 1, "ThreadsPerCore": 1 } } ], "OwnerId": "***masked***", "ReservationId": "r-0ec8efda16e57dfd9" } ❯ aws ec2 describe-instances { // いっぱい出てくるので省略 }
インターネットゲートウェイとの接続
❯ aws ec2 create-internet-gateway { "InternetGateway": { "Attachments": [], "InternetGatewayId": "***masked***", "Tags": [] } } ❯ aws ec2 attach-internet-gateway \ --internet-gateway-id ***masked*** \ --vpc-id ***masked*** ❯ aws ec2 allocate-address --domain vpc { "PublicIp": "X.X.X.X", "AllocationId": "***masked***", "Domain": "vpc" } ❯ aws ec2 associate-address \ --instance-id ***masked*** \ --allocation-id ***masked*** { "AssociationId": "***masked***" }
これで、SSHでつながるぞ!と思ったら、繋がらない。
→自分で作ったSubnetのルートテーブルが、インターネットに出ていけない設定になっていたからでした。
ルートテーブルの変更
❯ aws ec2 describe-subnets --output table --query "Subnets[].[SubnetId,CidrBlock]" ------------------------------------------------ | DescribeSubnets | +---------------------------+------------------+ | subnet-aaaaaaaaa | 172.31.16.0/20 | | subnet-bbbbbbbb | 172.31.0.0/20 | | subnet-0aaaaaaaaaaaaaaaaa | 192.168.1.0/24 | | subnet-0bbbbbbbbbbbbbbb | 192.168.2.0/24 | | subnet-ccccccccc | 172.31.32.0/20 | +---------------------------+------------------+ ❯ aws ec2 describe-internet-gateways { // 省略 } ❯ aws ec2 describe-route-tables { // 省略 } ❯ aws ec2 create-internet-gateway ❯ aws ec2 attach-internet-gateway \ --internet-gateway-id ***masked*** \ --vpc-id ***masked***
今日はここまで
create-vpc
でVPC作成、create-subnet
でサブネット作成。- 作成したサブネットに対して、
authorize-security-group-ingress
で許可するインバウンド方向の通信を設定 create-key-pair
でキーペアを作成する、- EC2を作成
■
When a character's codepoint is beyond U+10000
, I should use 32-bit literal.
>>> '\u1f4a9' 'Ὂ9' >>> '\U0001f4a9' '💩'
If I try to input '💩' directly in Jupyter console
on Windows cmd
, it aborts with the error:
Traceback (most recent call last): File "C:\ProgramData\Anaconda3\Scripts\jupyter-console-script.py", line 10, in <module> sys.exit(main()) File "C:\ProgramData\Anaconda3\lib\site-packages\jupyter_core\application.py", line 267, in launch_instance return super(JupyterApp, cls).launch_instance(argv=argv, **kwargs) File "C:\ProgramData\Anaconda3\lib\site-packages\traitlets\config\application.py", line 658, in launch_instance app.start() File "C:\ProgramData\Anaconda3\lib\site-packages\jupyter_console\app.py", line 155, in start self.shell.mainloop() File "C:\ProgramData\Anaconda3\lib\site-packages\jupyter_console\ptshell.py", line 508, in mainloop self.interact() File "C:\ProgramData\Anaconda3\lib\site-packages\jupyter_console\ptshell.py", line 492, in interact code = self.prompt_for_code() File "C:\ProgramData\Anaconda3\lib\site-packages\jupyter_console\ptshell.py", line 440, in prompt_for_code reset_current_buffer=True) File "C:\ProgramData\Anaconda3\lib\site-packages\prompt_toolkit\interface.py", line 415, in run self.eventloop.run(self.input, self.create_eventloop_callbacks()) File "C:\ProgramData\Anaconda3\lib\site-packages\prompt_toolkit\eventloop\win32.py", line 80, in run for k in keys: File "C:\ProgramData\Anaconda3\lib\site-packages\prompt_toolkit\terminal\win32_input.py", line 143, in read all_keys = list(self._get_keys(read, input_records)) File "C:\ProgramData\Anaconda3\lib\site-packages\prompt_toolkit\terminal\win32_input.py", line 186, in _get_keys for key_press in self._event_to_key_presses(ev): File "C:\ProgramData\Anaconda3\lib\site-packages\prompt_toolkit\terminal\win32_input.py", line 225, in _event_to_key_presses ascii_char = u_char.encode('utf-8') UnicodeEncodeError: 'utf-8' codec can't encode character '\ud83d' in position 0: surrogates not allowed
This is because ev.uChar.UnicodeChar
returns one of a surrogate pair, \ud83d
, instead of a complete character \U0001F4A9
. It depends on Windows KEY_EVENT_RECORD
structure (https://docs.microsoft.com/en-us/windows/console/key-event-record-str).
It seems _event_to_key_resses(ev)
is called for each element of a surrogate pair. So we should check the key press is one of a surrogate pair and store an element of the pair.
def _get_keys(self, read, input_records): """ Generator that yields `KeyPress` objects from the input records. """ for i in range(read.value): ir = input_records[i] # Get the right EventType from the EVENT_RECORD. # (For some reason the Windows console application 'cmder' # [http://gooseberrycreative.com/cmder/] can return '0' for # ir.EventType. -- Just ignore that.) if ir.EventType in EventTypes: ev = getattr(ir.Event, EventTypes[ir.EventType]) # Process if this is a key event. (We also have mouse, menu and # focus events.) if type(ev) == KEY_EVENT_RECORD and ev.KeyDown: for key_press in self._event_to_key_presses(ev): yield key_press elif type(ev) == MOUSE_EVENT_RECORD: for key_press in self._handle_mouse(ev): yield key_press
test
とりあえずテストですん
#!/usr/bin/env perl use v5.16; use warnings; use Getopt::Long; my $some_option; GetOptions( "some-option|s=s" => \$some_option, ); say "Hello, world."; my %hash; my @array; for my $i (0 .. 9) { say "$i ^ 2 = ", $i ** 2; push @array, $i; $hash{$i} = $i ** 2; }