AWS ことはじめ その4 (DynamoDB - Part2)

前回につづいて、DynamoDB。

choge.hatenadiary.com

今回は、①Python(boto3)でのテーブル作成、②Pythonで大量にデータをぶち込む、の2点。

boto3でのテーブル作成

boto3には、低レイヤーを担当するClient、テーブルの操作を行う(?)Service Resourceなんかがあるらしい。

http://boto3.readthedocs.io/en/latest/reference/services/dynamodb.html#dynamodb

今回はClientを使ってテーブルを作ってみる。テーブルを作ってすぐにデータを投入したりすると、まだ作成中のことがあるので、そういうときはwaiterでテーブルが出来上がるまで待つ。

それ以外はawsコマンドとほとんど一緒かな。あ、エラーは動的に生成しているらしいので、エラーハンドリングは少し癖がありそう。以下のStackOverFlowの通り、ClientErrorとかのレスポンスの中身を見るとよいらしい。

amazon web services - Boto3, python and how to handle errors - Stack Overflow

def create_table(dynamo_client, table_name, rcu=5, wcu=5):
    try:
        dynamo_client.delete_table(TableName=table_name)
        waiter_noexists = dynamo_client.get_waiter('table_not_exists')
        waiter_noexists.wait(TableName=table_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            print('table: {0} does not exist when trying to delete it.'.format(table_name))
        else:
            raise e
    try:
        dynamo_client.create_table(
                TableName='sparse-data',
                KeySchema=[{
                    'AttributeName': 'sid',
                    'KeyType': 'HASH'
                },
                {
                    'AttributeName': 'unixtime',
                    'KeyType': 'RANGE'
                }],
                AttributeDefinitions=[{
                    'AttributeName': 'sid',
                    'AttributeType': 'S'
                },
                {
                    'AttributeName': 'unixtime',
                    'AttributeType': 'N'
                }],
                ProvisionedThroughput={
                    'ReadCapacityUnits': 5,
                    'WriteCapacityUnits': 5
                }
        )
        waiter_exists = dynamo_client.get_waiter('table_exists')
        waiter_exists.wait(TableName=table_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceInUseException':
            print('table: {0} already exists when trying to create it.'.format(table_name))
        else:
raise e

データ投入

データ投入はあまり難しいことを考えず、単にdictput_itemすればよい。

ただ、大量にAPI Callをしそうなときは、batch_writerを使うとよしなに取り計らってくれるもよう。Throttlingが発生して呼び出しが失敗したときのリトライなど。

あとは、floatはダメでdecimal.Decimalにしないといけないところで少しハマった。

    table_name = 'sparse-data'
    pbar = tqdm.tqdm(total=5*365*24*10)
    sparse_table = dynamo.Table(table_name)
    with sparse_table.batch_writer() as batch:
        count = 0
        for data in generate_sparse_data(start_dt, duration, 600, 120):
            batch.put_item(Item=data)
            count += 1
pbar.update(count)

次回は、↑で投入したデータを使ってどのくらいレスポンスが出るのか見てみる。

AWS ことはじめ その3 (DynamoDB Part 1)

なんでDynamoDBを使いたいのか

IoTっぽい時系列データを蓄積したい的な話。データの件数が数億~数百億のオーダーになる可能性があり、どれくらい現実的なのかを調べたいため。無料枠でできる範囲で試してみる。 いい感じに検証できる環境があればいいんだけど、数週間かかりそうでSIerのつらみを感じている。

ためたいデータ

蓄積したいデータは、ある機器が生成するもので、機器自体の数は1万〜100万くらいのオーダーと思われる。

個々のデータは、幾つか種類がある。

  • たまに発生するデータ。アラート情報的なやつ。機器1台・1時間あたり、10回くらい?
  • 継続して発生するデータ。メトリクス的なやつ。データ自体は1秒単位だが、1分、3分とかの単位でまとまった状態で機器が発信する。

用途

集計・分析を行うことは(今のところ)想定しておらず、機器ごとに最新データや過去の特定日時のデータを見たい、というもの。基本的には一本釣りに近い形だが、ある程度の範囲でのデータ検索が発生する可能性があり、おもにそこのレスポンスを知りたい。

データ自体は何らかの形で永年で保存しておきたいが、検索を行う期間は限定できる。直近1年間のデータのみ検索可能、とか。

DynamoDBのざっくりまとめ

DynamoDBはAWSのマネージド型Key-Value Stroe。NoSQLに分類される。

  • Primary Keyを指定する以外は、スキーマレス。
    • Primary Keyには、HASHキーとRANGEキー(SORTキー)を指定する。RANGEキーの方は任意。
      • HASHキーで物理的な格納場所が決まり、RANGEキーでソートして格納されるイメージ。
    • JSON形式で表現したデータを入れたり出したりできる
    • NoSQLなのであまり複雑な検索はできない。基本的にはPrimary Keyで一本釣りするイメージ。
      • Secondary Indexを作ることで、ある程度検索に使えるキーを増やすこともできる。
  • 料金に効いてくるのは、①データ量、②スループットの2つ。事前にスループットを指定しないといけないのが肝。
    • スループットは、キャパシティユニット(CU; Capacity Unit)という単位。読み取り(RCU)と書き込み(WCU)の2つで管理する。
    • 1RCU = 1秒間に1KBまでの読み込みを行える。結果整合性のみでよい(=多少最新の更新が漏れてもよい)場合、倍になる。
    • 1WCU = 1秒間に1KBまでを書き込みできる。

CLIでのDynamoDB作成

以下3種類を作成する。

  1. たまに発生するデータを格納しておくテーブル
  2. 1秒間隔で発生するデータを格納しておくテーブル。1つの項目(レコード)に1分、3分などのデータをまとめて持っておく
  3. 1秒間隔で発生するデータを格納しておくテーブル。1秒単位に項目(レコード)を持っておく

公式のドキュメントを見ながら実行。そろそろ手でコマンド打つのが辛くなってくる。 https://docs.aws.amazon.com/cli/latest/reference/dynamodb/create-table.html

❯ aws dynamodb create-table \
    --table-name sparse-data \
    --attribute-definitions AttributeName=sid,AttributeType=S AttributeName=unixtime,AttributeType=N \
    --key-schema AttributeName=sid,KeyType=HASH AttributeName=unixtime,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
{
    "TableDescription": {
        "AttributeDefinitions": [
            {
                "AttributeName": "sid",
                "AttributeType": "S"
            },
            {
                "AttributeName": "unixtime",
                "AttributeType": "N"
            }
        ],
        "TableName": "sparse-data",
        "KeySchema": [
            {
                "AttributeName": "sid",
                "KeyType": "HASH"
            },
            {
                "AttributeName": "unixtime",
                "KeyType": "RANGE"
            }
        ],
        "TableStatus": "CREATING",
        "CreationDateTime": 1526805606.531,
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "arn:aws:dynamodb:ap-northeast-1:***masked***:table/sparse-data",
        "TableId": "***masked***"
    }
}

❯ aws dynamodb create-table \
    --table-name continuous-data-aggr \
    --attribute-definitions AttributeName=sid,AttributeType=S AttributeName=unixtime,AttributeType=N \
    --key-schema AttributeName=sid,KeyType=HASH AttributeName=unixtime,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
{
    "TableDescription": {
        "AttributeDefinitions": [
            {
                "AttributeName": "sid",
                "AttributeType": "S"
            },
            {
                "AttributeName": "unixtime",
                "AttributeType": "N"
            }
        ],
        "TableName": "continuous-data-aggr",
        "KeySchema": [
            {
                "AttributeName": "sid",
                "KeyType": "HASH"
            },
            {
                "AttributeName": "unixtime",
                "KeyType": "RANGE"
            }
        ],
        "TableStatus": "CREATING",
        "CreationDateTime": 1526805540.236,
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "arn:aws:dynamodb:ap-northeast-1:***masked***:table/continuous-data-aggr",
        "TableId": "***masked***"
    }
}


aws dynamodb create-table \
    --table-name continuous-data \
    --attribute-definitions AttributeName=sid,AttributeType=S AttributeName=unixtime,AttributeType=N \
    --key-schema AttributeName=sid,KeyType=HASH AttributeName=unixtime,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
{
    "TableDescription": {
        "AttributeDefinitions": [
            {
                "AttributeName": "sid",
                "AttributeType": "S"
            },
            {
                "AttributeName": "unixtime",
                "AttributeType": "N"
            }
        ],
        "TableName": "continuous-data",
        "KeySchema": [
            {
                "AttributeName": "sid",
                "KeyType": "HASH"
            },
            {
                "AttributeName": "unixtime",
                "KeyType": "RANGE"
            }
        ],
        "TableStatus": "CREATING",
        "CreationDateTime": 1526804024.601,
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "arn:aws:dynamodb:ap-northeast-1:***masked***:table/continuous-data",
        "TableId": "***masked***"
    }
}

❯ aws dynamodb list-tables
{
    "TableNames": [
        "continuous-data",
        "continuous-data-aggr",
        "sparse-data"
    ]
}

次回は、①データの投入、②バッチ投入、③データの読み込みの時間計測

aws ことはじめ その2 (IAM - アカウント別名)

IAMでロールを作ったはいいけど、アカウントIDを覚えられなくてログインできなかったので、アカウント別名を作る。

aws> iam create-account-alias --account-alias ***masked***
aws> iam list-account-aliases
{
    "AccountAliases": [
        "***masked***"
    ]
}

これで、ログインのときに***masked***を入力することで、IAMでかんたんにログインできる。

aws ことはじめ その1

  • ようやくがっつりAWSをいじることになりそうなので、無料利用枠で使い始めてみる。
  • やるならCLIっすよ、と後輩に言われたのでCLIオンリーでやってみる。

まずは普通にインスタンスを立ててみる

VPCを作成

aws> ec2 create-vpc --cidr-block 192.168.0.0/16
{
    "Vpc": {
        "CidrBlock": "192.168.0.0/16",
        "DhcpOptionsId": "dopt-338b0854",
        "State": "pending",
        "VpcId": "vpc-***masked***",
        "InstanceTenancy": "default",
        "Ipv6CidrBlockAssociationSet": [],
        "CidrBlockAssociationSet": [
            {
                "AssociationId": "vpc-cidr-assoc-00b5fa50810ec9a2f",
                "CidrBlock": "192.168.0.0/24",
                "CidrBlockState": {
                    "State": "associated"
                }
            }
        ],
        "IsDefault": false,
        "Tags": []
    }
}

subnetを作成

aws> ec2 create-subnet --vpc-id vpc-***masked*** --cidr-block 192.168.1.0/24
{
    "Subnet": {
        "AvailabilityZone": "ap-northeast-1d",
        "AvailableIpAddressCount": 251,
        "CidrBlock": "192.168.1.0/24",
        "DefaultForAz": false,
        "MapPublicIpOnLaunch": false,
        "State": "pending",
        "SubnetId": "subnet-***masked***1",
        "VpcId": "vpc-***masked***",
        "AssignIpv6AddressOnCreation": false,
        "Ipv6CidrBlockAssociationSet": []
    }
}

aws> ec2 create-subnet --vpc-id vpc-***masked*** --cidr-block 192.168.2.0/24 --availability-zone ap-northeast-1a
{
    "Subnet": {
        "AvailabilityZone": "ap-northeast-1a",
        "AvailableIpAddressCount": 251,
        "CidrBlock": "192.168.2.0/24",
        "DefaultForAz": false,
        "MapPublicIpOnLaunch": false,
        "State": "pending",
        "SubnetId": "subnet-***masked***2",
        "VpcId": "vpc-***masked***",
        "AssignIpv6AddressOnCreation": false,
        "Ipv6CidrBlockAssociationSet": []
    }
}

Security groupを作成

以下のような流れで作成する

  1. create-security-groupでセキュリティグループを作成する
  2. authorize-security-group-ingressでとりあえず自分ちからつながるようにする
  3. キーペアを作成する
awa> ec2 create-security-group --group-name dev_private_ec2 --description "An environment for development" --vpc-id vpc-***masked***
{
    "GroupId": "sg-***masked***"
}

aws> ec2 authorize-security-group-ingress --group-id sg-***masked*** --protocol tcp --port 22 --cidr X.X.X.X/32
aws> ec2 create-key-pair --key-name aws_default --query "KeyMaterial" --output text

EC2を起動

以下を指定する。

❯ aws ec2 run-instances \
    --image-id ami-2724cf58 \
    --subnet-id ***masked*** \
    --security-group-ids ***masked*** \
    --count 1 \
    --instance-type t2.micro \
    --key-name aws_default
{
    "Groups": [],
    "Instances": [
        {
            "AmiLaunchIndex": 0,
            "ImageId": "ami-2724cf58",
            "InstanceId": "***masked***",
            "InstanceType": "t2.micro",
            "KeyName": "aws_default",
            "LaunchTime": "2018-05-20T09:17:33.000Z",
            "Monitoring": {
                "State": "disabled"
            },
            "Placement": {
                "AvailabilityZone": "ap-northeast-1d",
                "GroupName": "",
                "Tenancy": "default"
            },
            "PrivateDnsName": "ip-192-168-1-93.ap-northeast-1.compute.internal",
            "PrivateIpAddress": "192.168.1.93",
            "ProductCodes": [],
            "PublicDnsName": "",
            "State": {
                "Code": 0,
                "Name": "pending"
            },
            "StateTransitionReason": "",
            "SubnetId": "***masked***",
            "VpcId": "***masked***",
            "Architecture": "x86_64",
            "BlockDeviceMappings": [],
            "ClientToken": "",
            "EbsOptimized": false,
            "Hypervisor": "xen",
            "NetworkInterfaces": [
                {
                    "Attachment": {
                        "AttachTime": "2018-05-20T09:17:33.000Z",
                        "AttachmentId": "***masked***",
                        "DeleteOnTermination": true,
                        "DeviceIndex": 0,
                        "Status": "attaching"
                    },
                    "Description": "",
                    "Groups": [
                        {
                            "GroupName": "dev_private_ec2",
                            "GroupId": "***masked***"
                        }
                    ],
                    "Ipv6Addresses": [],
                    "MacAddress": "0e:01:03:99:93:96",
                    "NetworkInterfaceId": "***masked***",
                    "OwnerId": "***masked***",
                    "PrivateIpAddress": "192.168.1.93",
                    "PrivateIpAddresses": [
                        {
                            "Primary": true,
                            "PrivateIpAddress": "192.168.1.93"
                        }
                    ],
                    "SourceDestCheck": true,
                    "Status": "in-use",
                    "SubnetId": "***masked***",
                    "VpcId": "***masked***"
                }
            ],
            "RootDeviceName": "/dev/xvda",
            "RootDeviceType": "ebs",
            "SecurityGroups": [
                {
                    "GroupName": "dev_private_ec2",
                    "GroupId": "***masked***"
                }
            ],
            "SourceDestCheck": true,
            "StateReason": {
                "Code": "pending",
                "Message": "pending"
            },
            "VirtualizationType": "hvm",
            "CpuOptions": {
                "CoreCount": 1,
                "ThreadsPerCore": 1
            }
        }
    ],
    "OwnerId": "***masked***",
    "ReservationId": "r-0ec8efda16e57dfd9"
}

❯ aws ec2 describe-instances
{
  // いっぱい出てくるので省略
}

インターネットゲートウェイとの接続

❯ aws ec2 create-internet-gateway
{
    "InternetGateway": {
        "Attachments": [],
        "InternetGatewayId": "***masked***",
        "Tags": []
    }
}

❯ aws ec2 attach-internet-gateway \
    --internet-gateway-id ***masked*** \
    --vpc-id ***masked***

❯ aws ec2 allocate-address --domain vpc
{
    "PublicIp": "X.X.X.X",
    "AllocationId": "***masked***",
    "Domain": "vpc"
}

❯ aws ec2 associate-address \
    --instance-id ***masked*** \
    --allocation-id ***masked***
{
    "AssociationId": "***masked***"
}

これで、SSHでつながるぞ!と思ったら、繋がらない。

→自分で作ったSubnetのルートテーブルが、インターネットに出ていけない設定になっていたからでした。

ルートテーブルの変更

❯ aws ec2 describe-subnets --output table --query "Subnets[].[SubnetId,CidrBlock]"
------------------------------------------------
|                DescribeSubnets               |
+---------------------------+------------------+
|  subnet-aaaaaaaaa          |  172.31.16.0/20  |
|  subnet-bbbbbbbb          |  172.31.0.0/20   |
|  subnet-0aaaaaaaaaaaaaaaaa |  192.168.1.0/24  |
|  subnet-0bbbbbbbbbbbbbbb |  192.168.2.0/24  |
|  subnet-ccccccccc          |  172.31.32.0/20  |
+---------------------------+------------------+

❯ aws ec2 describe-internet-gateways
{
  // 省略
}

❯ aws ec2 describe-route-tables
{
  // 省略
}

❯ aws ec2 create-internet-gateway

❯ aws ec2 attach-internet-gateway \
    --internet-gateway-id ***masked*** \
    --vpc-id ***masked***

今日はここまで

  • create-vpcVPC作成、 create-subnetでサブネット作成。
  • 作成したサブネットに対して、authorize-security-group-ingressで許可するインバウンド方向の通信を設定
  • create-key-pairでキーペアを作成する、
  • EC2を作成

When a character's codepoint is beyond U+10000, I should use 32-bit literal.

>>> '\u1f4a9'
'Ὂ9'
>>> '\U0001f4a9'
'💩'

If I try to input '💩' directly in Jupyter console on Windows cmd, it aborts with the error:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\Scripts\jupyter-console-script.py", line 10, in <module>
    sys.exit(main())
  File "C:\ProgramData\Anaconda3\lib\site-packages\jupyter_core\application.py", line 267, in launch_instance
    return super(JupyterApp, cls).launch_instance(argv=argv, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\traitlets\config\application.py", line 658, in launch_instance
    app.start()
  File "C:\ProgramData\Anaconda3\lib\site-packages\jupyter_console\app.py", line 155, in start
    self.shell.mainloop()
  File "C:\ProgramData\Anaconda3\lib\site-packages\jupyter_console\ptshell.py", line 508, in mainloop
    self.interact()
  File "C:\ProgramData\Anaconda3\lib\site-packages\jupyter_console\ptshell.py", line 492, in interact
    code = self.prompt_for_code()
  File "C:\ProgramData\Anaconda3\lib\site-packages\jupyter_console\ptshell.py", line 440, in prompt_for_code
    reset_current_buffer=True)
  File "C:\ProgramData\Anaconda3\lib\site-packages\prompt_toolkit\interface.py", line 415, in run
    self.eventloop.run(self.input, self.create_eventloop_callbacks())
  File "C:\ProgramData\Anaconda3\lib\site-packages\prompt_toolkit\eventloop\win32.py", line 80, in run
    for k in keys:
  File "C:\ProgramData\Anaconda3\lib\site-packages\prompt_toolkit\terminal\win32_input.py", line 143, in read
    all_keys = list(self._get_keys(read, input_records))
  File "C:\ProgramData\Anaconda3\lib\site-packages\prompt_toolkit\terminal\win32_input.py", line 186, in _get_keys
    for key_press in self._event_to_key_presses(ev):
  File "C:\ProgramData\Anaconda3\lib\site-packages\prompt_toolkit\terminal\win32_input.py", line 225, in _event_to_key_presses
    ascii_char = u_char.encode('utf-8')
UnicodeEncodeError: 'utf-8' codec can't encode character '\ud83d' in position 0: surrogates not allowed

This is because ev.uChar.UnicodeChar returns one of a surrogate pair, \ud83d, instead of a complete character \U0001F4A9. It depends on Windows KEY_EVENT_RECORD structure (https://docs.microsoft.com/en-us/windows/console/key-event-record-str).

It seems _event_to_key_resses(ev) is called for each element of a surrogate pair. So we should check the key press is one of a surrogate pair and store an element of the pair.

    def _get_keys(self, read, input_records):
        """
        Generator that yields `KeyPress` objects from the input records.
        """
        for i in range(read.value):
            ir = input_records[i]

            # Get the right EventType from the EVENT_RECORD.
            # (For some reason the Windows console application 'cmder'
            # [http://gooseberrycreative.com/cmder/] can return '0' for
            # ir.EventType. -- Just ignore that.)
            if ir.EventType in EventTypes:
                ev = getattr(ir.Event, EventTypes[ir.EventType])

                # Process if this is a key event. (We also have mouse, menu and
                # focus events.)
                if type(ev) == KEY_EVENT_RECORD and ev.KeyDown:
                    for key_press in self._event_to_key_presses(ev):
                        yield key_press

                elif type(ev) == MOUSE_EVENT_RECORD:
                    for key_press in self._handle_mouse(ev):
                        yield key_press

test

とりあえずテストですん

#!/usr/bin/env perl
use v5.16;
use warnings;

use Getopt::Long;

my $some_option;
GetOptions(
    "some-option|s=s" => \$some_option,
);

say "Hello, world.";

my %hash;
my @array;
for my $i (0 .. 9) {
    say "$i ^ 2 = ", $i ** 2;
    push @array, $i;
    $hash{$i} = $i ** 2;
}