Locustでシャーディング的なことをしたい
Locustは大変お手軽に負荷試験のスクリプトを作れる、かつ、テストスクリプトの柔軟性があってよい。
今回は以下のような要件があった。
- 複数のWorkerで処理をスケールアウトさせたい
- あるIDは、同じWorkerで処理をさせたい(同じIDを別のWorkerで処理させたくない)
Out of boxなサポートはないものの、Locustの Custom Messageのサンプル がほぼこれに該当する。
@events.test_start.add_listener
のリスナーでWorkerにIDを分配する- このタイミングまでには、Master, Workerともに立ち上がっているので、Master側はいくつWorkerを持っているか把握している
environment.runner.worker_count
およびenvironment.runner.clients
を参照すればOK
- 各Worker向けに、処理すべきIDをMessage経由で頒布すればOK
- これを応用して、ID自体ではなくて、処理すべきIDのレンジを渡す、とかもできそう
@events.test_start.add_listener def on_test_start(environment, **_kwargs): # When the test is started, evenly divides list between # worker nodes to ensure unique data across threads if not isinstance(environment.runner, WorkerRunner): users = [] for i in range(environment.runner.target_user_count): users.append({"name": f"User{i}"}) worker_count = environment.runner.worker_count chunk_size = int(len(users) / worker_count) for i, worker in enumerate(environment.runner.clients): start_index = i * chunk_size if i + 1 < worker_count: end_index = start_index + chunk_size else: end_index = len(users) data = users[start_index:end_index] environment.runner.send_message("test_users", data, worker)