Locustでシャーディング的なことをしたい

Locustは大変お手軽に負荷試験スクリプトを作れる、かつ、テストスクリプトの柔軟性があってよい。

今回は以下のような要件があった。

  • 複数のWorkerで処理をスケールアウトさせたい
  • あるIDは、同じWorkerで処理をさせたい(同じIDを別のWorkerで処理させたくない)

Out of boxなサポートはないものの、Locustの Custom Messageのサンプル がほぼこれに該当する。

  • @events.test_start.add_listener のリスナーでWorkerにIDを分配する
    • このタイミングまでには、Master, Workerともに立ち上がっているので、Master側はいくつWorkerを持っているか把握している
    • environment.runner.worker_count および environment.runner.clients を参照すればOK
  • 各Worker向けに、処理すべきIDをMessage経由で頒布すればOK
    • これを応用して、ID自体ではなくて、処理すべきIDのレンジを渡す、とかもできそう
@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    # When the test is started, evenly divides list between
    # worker nodes to ensure unique data across threads
    if not isinstance(environment.runner, WorkerRunner):
        users = []
        for i in range(environment.runner.target_user_count):
            users.append({"name": f"User{i}"})

        worker_count = environment.runner.worker_count
        chunk_size = int(len(users) / worker_count)

        for i, worker in enumerate(environment.runner.clients):
            start_index = i * chunk_size

            if i + 1 < worker_count:
                end_index = start_index + chunk_size
            else:
                end_index = len(users)

            data = users[start_index:end_index]
            environment.runner.send_message("test_users", data, worker)