python/ray/serve/_private/autoscaling_state.py (28 changes: 10 additions & 18 deletions)
@@ -10,7 +10,6 @@
     TargetCapacityDirection,
 )
 from ray.serve._private.constants import (
-    RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
     RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S,
     SERVE_LOGGER_NAME,
 )
@@ -293,28 +292,21 @@ def get_total_num_requests(self) -> float:
         If there are 0 running replicas, then returns the total number
         of requests queued at handles.
 
-        If the flag RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE is
-        set to 1, the returned average includes both queued and ongoing
-        requests. Otherwise, the returned average includes only ongoing
-        requests.
+        This code assumes that the metrics are emitted either on handles
+        or on replicas, but not both. It is the responsibility of the
+        writer to ensure mutual exclusivity of the metrics.
         """
 
         total_requests = 0
 
-        if (
-            RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE
-            or len(self._running_replicas) == 0
-        ):
-            for handle_metric in self._handle_requests.values():
-                total_requests += handle_metric.queued_requests
-                for id in self._running_replicas:
-                    if id in handle_metric.running_requests:
-                        total_requests += handle_metric.running_requests[id]
-        else:
-            for handle_metric in self._handle_requests.values():
-                total_requests += handle_metric.queued_requests
-            for id in self._running_replicas:
-                if id in self._replica_requests:
-                    total_requests += self._replica_requests[id].running_requests
-
+        for handle_metric in self._handle_requests.values():
+            total_requests += handle_metric.queued_requests
+            for id in self._running_replicas:
+                if id in handle_metric.running_requests:
+                    total_requests += handle_metric.running_requests[id]
+        for id in self._running_replicas:
+            if id in self._replica_requests:
+                total_requests += self._replica_requests[id].running_requests
         return total_requests
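
Read on its own, the consolidated logic is a single pass: sum queued requests across all handles, add handle-reported running requests for live replicas, then add replica-reported running requests. The sketch below is a minimal standalone distillation of that aggregation; HandleMetric and ReplicaMetric are hypothetical stand-ins for Serve's internal metric records, not the actual classes.

# Illustrative sketch of the consolidated aggregation above.
# HandleMetric/ReplicaMetric are hypothetical stand-ins for
# Serve's internal metric structures.
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class HandleMetric:
    queued_requests: float = 0
    # Ongoing requests per replica ID, as reported by this handle.
    running_requests: Dict[str, float] = field(default_factory=dict)


@dataclass
class ReplicaMetric:
    # Ongoing requests as reported by the replica itself.
    running_requests: float = 0


def get_total_num_requests(
    handle_requests: Dict[str, HandleMetric],
    replica_requests: Dict[str, ReplicaMetric],
    running_replicas: List[str],
) -> float:
    # Metrics are assumed to be emitted either on handles or on replicas,
    # never both, so the two sums never double-count a request.
    total_requests = 0.0
    for handle_metric in handle_requests.values():
        total_requests += handle_metric.queued_requests
        for replica_id in running_replicas:
            total_requests += handle_metric.running_requests.get(replica_id, 0)
    for replica_id in running_replicas:
        if replica_id in replica_requests:
            total_requests += replica_requests[replica_id].running_requests
    return total_requests


if __name__ == "__main__":
    handles = {"h1": HandleMetric(queued_requests=3, running_requests={"r1": 2})}
    replicas = {"r2": ReplicaMetric(running_requests=4)}
    print(get_total_num_requests(handles, replicas, ["r1", "r2"]))  # 9.0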


python/ray/serve/tests/test_autoscaling_policy.py (8 changes: 6 additions & 2 deletions)
@@ -217,7 +217,9 @@ async def __call__(self):
     wait_for_condition(check_num_requests_ge, client=client, id=dep_id, expected=45)
     print("Confirmed many queries are inflight.")
 
-    wait_for_condition(check_num_replicas_eq, name="A", target=5, app_name="app1")
+    wait_for_condition(
+        check_num_replicas_eq, name="A", target=5, app_name="app1", timeout=20
+    )
     print("Confirmed deployment scaled to 5 replicas.")
 
     # Wait for all requests to be scheduled to replicas so they'll be failed
@@ -414,7 +416,9 @@ def __call__(self):
     signal.send.remote()
 
     # As the queue is drained, we should scale back down.
-    wait_for_condition(check_num_replicas_lte, name="A", target=min_replicas)
+    wait_for_condition(
+        check_num_replicas_lte, name="A", target=min_replicas, timeout=20
+    )
 
     # Make sure start time did not change for the deployment
     assert get_deployment_start_time(client._controller, "A") == start_time
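
Both test fixes pass an explicit timeout=20 so the polling helper has more headroom before it gives up. As a rough illustration of how such a helper behaves (a simplified sketch, not Ray's actual wait_for_condition implementation; the 10-second default here is assumed for illustration):

# Simplified polling loop in the spirit of wait_for_condition;
# not Ray's actual implementation.
import time


def wait_for_condition(condition, *, timeout=10, retry_interval_s=0.1, **kwargs):
    """Poll condition(**kwargs) until it returns True or timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition(**kwargs):
            return
        time.sleep(retry_interval_s)
    raise TimeoutError(f"Condition not met within {timeout} seconds.")

Widening the window to 20 seconds gives the autoscaler's control loop more time to converge on slow CI machines without changing what the tests assert.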