diff --git a/python/ray/serve/_private/autoscaling_state.py b/python/ray/serve/_private/autoscaling_state.py
index 1a44d8702d18..72ec883aa7ba 100644
--- a/python/ray/serve/_private/autoscaling_state.py
+++ b/python/ray/serve/_private/autoscaling_state.py
@@ -10,7 +10,6 @@
     TargetCapacityDirection,
 )
 from ray.serve._private.constants import (
-    RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
     RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S,
     SERVE_LOGGER_NAME,
 )
@@ -293,28 +292,21 @@ def get_total_num_requests(self) -> float:
 
         If there are 0 running replicas, then returns the total number
         of requests queued at handles
 
-        If the flag RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE is
-        set to 1, the returned average includes both queued and ongoing
-        requests. Otherwise, the returned average includes only ongoing
-        requests.
+        This code assumes that the metrics are emitted either on handles
+        or on replicas, but not both. It is the responsibility of the
+        writer to ensure mutual exclusivity of the metrics.
         """
 
         total_requests = 0
 
-        if (
-            RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE
-            or len(self._running_replicas) == 0
-        ):
-            for handle_metric in self._handle_requests.values():
-                total_requests += handle_metric.queued_requests
-                for id in self._running_replicas:
-                    if id in handle_metric.running_requests:
-                        total_requests += handle_metric.running_requests[id]
-        else:
+        for handle_metric in self._handle_requests.values():
+            total_requests += handle_metric.queued_requests
             for id in self._running_replicas:
-                if id in self._replica_requests:
-                    total_requests += self._replica_requests[id].running_requests
-
+                if id in handle_metric.running_requests:
+                    total_requests += handle_metric.running_requests[id]
+        for id in self._running_replicas:
+            if id in self._replica_requests:
+                total_requests += self._replica_requests[id].running_requests
         return total_requests
 
diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py
index c39af8740690..df5fa6614a01 100644
--- a/python/ray/serve/tests/test_autoscaling_policy.py
+++ b/python/ray/serve/tests/test_autoscaling_policy.py
@@ -217,7 +217,9 @@ async def __call__(self):
     wait_for_condition(check_num_requests_ge, client=client, id=dep_id, expected=45)
     print("Confirmed many queries are inflight.")
 
-    wait_for_condition(check_num_replicas_eq, name="A", target=5, app_name="app1")
+    wait_for_condition(
+        check_num_replicas_eq, name="A", target=5, app_name="app1", timeout=20
+    )
     print("Confirmed deployment scaled to 5 replicas.")
 
     # Wait for all requests to be scheduled to replicas so they'll be failed
@@ -414,7 +416,9 @@ def __call__(self):
     signal.send.remote()
 
     # As the queue is drained, we should scale back down.
-    wait_for_condition(check_num_replicas_lte, name="A", target=min_replicas)
+    wait_for_condition(
+        check_num_replicas_lte, name="A", target=min_replicas, timeout=20
+    )
 
     # Make sure start time did not change for the deployment
     assert get_deployment_start_time(client._controller, "A") == start_time
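
For context on the new behavior of get_total_num_requests: queued requests are summed per handle, handle-reported running requests are counted per running replica, and replica self-reported running requests are added on top, relying on the metrics writer to emit each metric from exactly one source. Below is a minimal, self-contained sketch of that aggregation; the HandleMetric dataclass and the flat replica_requests mapping are simplified stand-ins for Serve's internal structures, not the actual API.

from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class HandleMetric:
    # Requests queued at the handle, not yet assigned to a replica.
    queued_requests: int = 0
    # Ongoing requests per replica ID, as reported by the handle.
    running_requests: Dict[str, int] = field(default_factory=dict)


def total_num_requests(
    handle_requests: Dict[str, HandleMetric],
    replica_requests: Dict[str, int],
    running_replicas: List[str],
) -> int:
    # Mirrors the patched method: sum queued requests across handles,
    # plus running requests reported by handles, plus running requests
    # self-reported by replicas. Correctness relies on each metric being
    # emitted by exactly one source (handle or replica, never both).
    total = 0
    for handle_metric in handle_requests.values():
        total += handle_metric.queued_requests
        for replica_id in running_replicas:
            total += handle_metric.running_requests.get(replica_id, 0)
    for replica_id in running_replicas:
        total += replica_requests.get(replica_id, 0)
    return total


# One handle reports 2 queued and 3 running on replica "r1";
# replica "r2" self-reports 4 running requests: total is 9.
metrics = {"h1": HandleMetric(queued_requests=2, running_requests={"r1": 3})}
assert total_num_requests(metrics, {"r2": 4}, ["r1", "r2"]) == 9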