Description
Hello!
First of all, thank you so much for developing this. I really appreciate this work.
Describe the bug
All communication between a peer and the nexus router becomes blocked if the peer exposes an RPC endpoint and a caller of that RPC becomes “unhealthy” (e.g., an unstable or extremely slow connection) while the call is in progress.
The issue is in the router implementation and therefore also affects nexusd. I could reproduce it with various WAMP client implementations.
The issue happens if:
- There is a peer that exposes an RPC endpoint (CALLEE).
- There is a peer that invokes the RPC (CALLER) and waits for a response.
- For some reason (e.g., a bad internet connection or a bug), the CALLER becomes unresponsive to nexusd, and nexusd starts logging messages like:
    !!! Dropped EVENT to session {id}: blocked
- The CALLEE subsequently returns a response (YIELD) to the CALLER.
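At this point, the goroutine that handles the CALLEE's messages must deliver (queue) the YIELD result to the CALLER's outbound message queue. Because that queue is full, the goroutine blocks, retrying with backoff until the caller is gone or a deadline is reached. As a result, all communication from the CALLEE stalls because of a single unhealthy CALLER.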
Relevant code:
Lines 328 to 360 in 5cfa511
```go
done := make(chan struct{})
d.actionChan <- func() {
	again = d.syncYield(callee, msg, progress, true)
	done <- struct{}{}
}
<-done

// If blocked, retry
if again {
	retry := true
	delay := yieldRetryDelay
	start := time.Now()
	// Retry processing YIELD until caller gone or deadline reached
	for {
		if d.debug {
			d.log.Println("Retry sending RESULT after", delay)
		}
		<-time.After(delay)
		// Do not retry if the elapsed time exceeds deadline
		if time.Since(start) >= sendResultDeadline {
			retry = false
		}
		d.actionChan <- func() {
			again = d.syncYield(callee, msg, progress, retry)
			done <- struct{}{}
		}
		<-done
		if !again {
			break
		}
		delay *= 2
	}
}
```
Lines 475 to 476 in 5cfa511
```go
case *wamp.Yield:
	r.dealer.yield(sess, msg)
```
To Reproduce
- Run nexusd (tested both by building from source and with the official Docker image).
- Run server.go (the callee):
```shell
go run ./server.go
```
- Execute bad_caller.py (the caller):
```shell
python3 -m venv .venv
source .venv/bin/activate
pip install xconn
python bad_caller.py
```
(Video attachment: nexusd.mp4)
Here, I use nexus for the callee and xconn-python for the caller, but any WAMP client library should reproduce the issue.
server.go
```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/gammazero/nexus/v3/client"
	"github.com/gammazero/nexus/v3/wamp"
)

func main() {
	cfg := client.Config{
		Realm: "realm1",
	}
	c, err := client.ConnectNet(context.Background(), "ws://localhost:8080/", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	bigPayload := string(make([]byte, 1024*1000))

	procedure := func(ctx context.Context, inv *wamp.Invocation) client.InvokeResult {
		fmt.Println("[server] ** Procedure Called **")
		// Sleep long enough for the broker to start logging
		// '!!! Dropped EVENT to session {id}: blocked'.
		time.Sleep(15 * time.Second)
		// Returning a result triggers the retry logic in 'syncYield' on the callee's goroutine.
		fmt.Println("[server] ** Procedure Returns **")
		return client.InvokeResult{
			Args: wamp.List{bigPayload},
		}
	}

	err = c.Register("example.procedure", procedure, nil)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		// Periodically publish a big payload. This easily causes 'Dropped EVENT'
		// if a subscriber does not consume messages from the router.
		for i := 0; ; i++ {
			args := wamp.List{
				fmt.Sprintf("[server] message: %d", i),
				bigPayload,
			}
			fmt.Printf("[server] Publish message: %d\n", i)
			// Use a local err so this goroutine does not race on main's err.
			if err := c.Publish("example.topic", nil, args, nil); err != nil {
				log.Println("[server] publish error:", err)
			}
			time.Sleep(100 * time.Millisecond)
		}
	}()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt)
	<-sigChan
	c.Unregister("example.procedure")
}
```
bad_caller.py
```python
import asyncio
import time

from xconn import run
from xconn.async_client import connect_anonymous
from xconn.async_session import AsyncSession


async def handler(_):
    pass


async def main():
    loop = asyncio.get_event_loop()
    session: AsyncSession = await connect_anonymous("ws://localhost:8080/ws", "realm1")
    await session.subscribe("example.topic", handler)

    print("Calling procedure")
    _ = loop.create_task(session.call("example.procedure"))
    # Sleep 1.0 sec to make sure the invocation is sent.
    await asyncio.sleep(1.0)

    print("Block the event loop")
    time.sleep(600)


if __name__ == "__main__":
    run(main())
```

Ideally, the communication of a callee should not be affected by the state of a caller, especially in an asynchronous RPC context.
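One possible direction toward that expected behavior, sketched with the same kind of simplified channel model rather than nexus's actual types, is to try a non-blocking send first and, if the caller's queue is full, hand the result to a separate goroutine that waits up to a deadline. deliverAsync, handleCallee, runScenario, the 100 ms deadline and the drop callback are all hypothetical; this is not how nexus is implemented.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for a routed WAMP message.
type message struct{ kind string }

// deliverAsync tries a non-blocking send first. If the caller's queue is
// full, it waits in a separate goroutine for up to deadline and calls onDrop
// if the caller never frees up room. The calling goroutine never waits.
func deliverAsync(callerOut chan<- message, result message, deadline time.Duration, onDrop func(message)) {
	select {
	case callerOut <- result:
		return // fast path: the caller's queue had room
	default:
	}
	go func() {
		timer := time.NewTimer(deadline)
		defer timer.Stop()
		select {
		case callerOut <- result:
		case <-timer.C:
			onDrop(result)
		}
	}()
}

// handleCallee processes one callee's messages in order, but hands YIELD
// results to deliverAsync instead of waiting on the caller.
func handleCallee(in <-chan message, callerOut chan<- message, handled chan<- string, dropped chan<- message) {
	for msg := range in {
		if msg.kind == "YIELD" {
			deliverAsync(callerOut, message{kind: "RESULT"}, 100*time.Millisecond,
				func(m message) { dropped <- m })
		}
		handled <- msg.kind
	}
}

// runScenario sends a YIELD and an unrelated PUBLISH while the caller's queue
// is full, and returns what the callee handled plus what was eventually dropped.
func runScenario() ([]string, string) {
	callerOut := make(chan message, 1)
	callerOut <- message{kind: "EVENT"} // the caller stopped reading

	in := make(chan message, 2)
	handled := make(chan string, 2)
	dropped := make(chan message, 1)
	go handleCallee(in, callerOut, handled, dropped)

	in <- message{kind: "YIELD"}
	in <- message{kind: "PUBLISH"}
	kinds := []string{<-handled, <-handled} // does not wait for the caller
	return kinds, (<-dropped).kind
}

func main() {
	kinds, drop := runScenario()
	fmt.Println("handled:", kinds, "dropped:", drop)
}
```

The trade-offs of this sketch: each pending result costs a goroutine, and a delayed RESULT can reach the caller after messages that were queued later. A bounded per-caller pending buffer would be another way to cap that cost.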
Environment (please complete the following information):
- OS: Linux (confirmed)
- Nexus Router version: 5cfa511 (v3 HEAD)
- Name and version of WAMP Client library (if applicable): nexus, xconn-python
Additional context
- This blocking only occurs in the goroutine associated with the callee peer. Other peers continue communicating normally.
(Video attachment: nexusd2.mp4)
- Specifying a larger value for OutQueueSize may hide the issue, but the value would need to be very large if nexusd is used for high-frequency communication.