Skip to content

The first part of a composed function that ends with a Consumer gets invoked twice #1302

@rmarianni

Description

@rmarianni

As stated in this StackOverflow question, when an application has a function definition that composes two other functions — a reactive Function followed by an imperative Consumer —, the first function gets invoked twice, even though the Consumer is invoked only once, as expected.

The issue can be reproduced (and debugged) with this sample project.

As @artembilan has already looked into this by answering the question, I'll transcribe his answer for convenience:

So, after making this change into your code:

LOGGER.info("Will transform this message's payload to upper case: {}", message);

I see this in logs:

2025-08-28T12:18:26.201-04:00  INFO 37308 --- [function-composition-test] [flux-http-nio-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this message's payload to upper case: GenericMessage [payload=functional composition test, headers={Accept=application/json, application/*+json, host=localhost:57957, id=c2e496dc-90e1-bbca-31ea-bc890f9de5bd, Content-Length=27, uri=http://localhost:57957/test/uppercase,log, accept-encoding=gzip, user-agent=ReactorNetty/1.2.9, Content-Type=text/plain, timestamp=1756397906201}]
2025-08-28T12:18:26.223-04:00  INFO 37308 --- [function-composition-test] [flux-http-nio-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this message's payload to upper case: GenericMessage [payload=functional composition test, headers={Accept=application/json, application/*+json, host=localhost:57957, id=919af0da-5461-b6ab-ddaf-40cd2b77e80a, Content-Length=27, uri=http://localhost:57957/test/uppercase,log, accept-encoding=gzip, user-agent=ReactorNetty/1.2.9, Content-Type=text/plain, timestamp=1756397906223}]

That means that we have two different messages with the same payload.
After some debugging, I see that we have subscribe to the input Flux twice:

  1. SimpleFunctionRegistry.ConsumerWrapper:
public void accept(Flux messageFlux) {
     messageFlux.doOnNext(this.targetConsumer).subscribe();
 }
  1. FunctionWebRequestProcessingHelper:
Object result = function.apply(inputMessage);
 if (function.isConsumer()) {
     if (result instanceof Publisher) {
         Mono.from((Publisher) result).subscribe();
     }
     return "DELETE".equals(wrapper.getMethod()) ?
             Mono.empty() : Mono.just(ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers, ignoredHeaders, requestOnlyHeaders)).build());
 }

There is some disconnection between return from the reactive function call and that supplier invocation:

                result = fluxInput
                        .transform(flux -> {
                            flux =  Flux.from((Publisher) flux).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
                            ((Consumer) this.target).accept(flux);
                            return Mono.ignoreElements((Flux) flux);
                        }).then();

That's, probably, why we don't see a a double processing on the consumer side, but the flux in the function is still called twice because of those two subscriptions.

Feels like a bug somewhere in the SimpleFunctionRegistry.invokeConsumer().

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions