Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 25199e9

Browse files
authoredApr 23, 2025
xds: XdsDepManager should ignore updates after shutdown
This prevents a NPE and subsequent channel panic when trying to build a config (because there are no watchers, so waitingOnResource==false) without any listener and route. ``` java.lang.NullPointerException: Cannot invoke "io.grpc.xds.XdsDependencyManager$RdsUpdateSupplier.getRdsUpdate()" because "routeSource" is null at io.grpc.xds.XdsDependencyManager.buildUpdate(XdsDependencyManager.java:295) at io.grpc.xds.XdsDependencyManager.maybePublishConfig(XdsDependencyManager.java:266) at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:899) at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:888) at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.notifyWatcher(XdsClientImpl.java:929) at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.lambda$onData$0(XdsClientImpl.java:837) at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:96) ``` I think this fully-fixes the problem today, but not tomorrow. subscribeToCluster() is racy as well, but not yet used. This was noticed when idleTimeout was firing, with some other code calling getState(true) to wake the channel back up. That may have made this panic more visible than it would be otherwise, but that has not been investigated. b/412474567
1 parent 7952afd commit 25199e9

File tree

2 files changed

+151
-2
lines changed

2 files changed

+151
-2
lines changed
 

‎xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T>
199199
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
200200
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
201201
watcherEntry.getValue());
202+
watcherEntry.getValue().cancelled = true;
202203
}
203204
}
204205

@@ -591,6 +592,9 @@ private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
591592
@Override
592593
public void onError(Status error) {
593594
checkNotNull(error, "error");
595+
if (cancelled) {
596+
return;
597+
}
594598
// Don't update configuration on error, if we've already received configuration
595599
if (!hasDataValue()) {
596600
setDataAsStatus(Status.UNAVAILABLE.withDescription(
@@ -659,6 +663,9 @@ private LdsWatcher(String resourceName) {
659663
@Override
660664
public void onChanged(XdsListenerResource.LdsUpdate update) {
661665
checkNotNull(update, "update");
666+
if (cancelled) {
667+
return;
668+
}
662669

663670
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
664671
List<VirtualHost> virtualHosts;
@@ -787,6 +794,9 @@ public RdsWatcher(String resourceName) {
787794
@Override
788795
public void onChanged(RdsUpdate update) {
789796
checkNotNull(update, "update");
797+
if (cancelled) {
798+
return;
799+
}
790800
List<VirtualHost> oldVirtualHosts = hasDataValue()
791801
? getData().getValue().virtualHosts
792802
: Collections.emptyList();
@@ -815,6 +825,9 @@ private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
815825
@Override
816826
public void onChanged(XdsClusterResource.CdsUpdate update) {
817827
checkNotNull(update, "update");
828+
if (cancelled) {
829+
return;
830+
}
818831
switch (update.clusterType()) {
819832
case EDS:
820833
setData(update);
@@ -895,6 +908,9 @@ private EdsWatcher(String resourceName, CdsWatcher parentContext) {
895908

896909
@Override
897910
public void onChanged(XdsEndpointResource.EdsUpdate update) {
911+
if (cancelled) {
912+
return;
913+
}
898914
setData(checkNotNull(update, "update"));
899915
maybePublishConfig();
900916
}

‎xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java

Lines changed: 135 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import com.google.common.collect.ImmutableMap;
4343
import com.google.common.collect.ImmutableSet;
44+
import com.google.common.util.concurrent.MoreExecutors;
4445
import com.google.protobuf.Message;
4546
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
4647
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
@@ -65,7 +66,7 @@
6566
import io.grpc.xds.XdsConfig.XdsClusterConfig;
6667
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
6768
import io.grpc.xds.client.CommonBootstrapperTestUtils;
68-
import io.grpc.xds.client.XdsClientImpl;
69+
import io.grpc.xds.client.XdsClient;
6970
import io.grpc.xds.client.XdsClientMetricReporter;
7071
import io.grpc.xds.client.XdsTransportFactory;
7172
import java.io.Closeable;
@@ -115,7 +116,7 @@ public class XdsDependencyManagerTest {
115116
});
116117

117118
private ManagedChannel channel;
118-
private XdsClientImpl xdsClient;
119+
private XdsClient xdsClient;
119120
private XdsDependencyManager xdsDependencyManager;
120121
private TestWatcher xdsConfigWatcher;
121122
private Server xdsServer;
@@ -715,6 +716,138 @@ public void testCdsError() throws IOException {
715716
assertThat(status.getDescription()).contains(XdsTestUtils.CLUSTER_NAME);
716717
}
717718

719+
@Test
720+
public void ldsUpdateAfterShutdown() {
721+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
722+
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
723+
724+
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
725+
serverName, serverName, nameResolverArgs, scheduler);
726+
727+
verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
728+
729+
@SuppressWarnings("unchecked")
730+
XdsClient.ResourceWatcher<XdsListenerResource.LdsUpdate> resourceWatcher =
731+
mock(XdsClient.ResourceWatcher.class);
732+
xdsClient.watchXdsResource(
733+
XdsListenerResource.getInstance(),
734+
serverName,
735+
resourceWatcher,
736+
MoreExecutors.directExecutor());
737+
verify(resourceWatcher, timeout(5000)).onChanged(any());
738+
739+
syncContext.execute(() -> {
740+
// Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
741+
// Runnable returns
742+
xdsDependencyManager.shutdown();
743+
744+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS", "EDS",
745+
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
746+
verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
747+
xdsClient.cancelXdsResourceWatch(
748+
XdsListenerResource.getInstance(), serverName, resourceWatcher);
749+
});
750+
}
751+
752+
@Test
753+
public void rdsUpdateAfterShutdown() {
754+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
755+
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
756+
757+
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
758+
serverName, serverName, nameResolverArgs, scheduler);
759+
760+
verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
761+
762+
@SuppressWarnings("unchecked")
763+
XdsClient.ResourceWatcher<XdsRouteConfigureResource.RdsUpdate> resourceWatcher =
764+
mock(XdsClient.ResourceWatcher.class);
765+
xdsClient.watchXdsResource(
766+
XdsRouteConfigureResource.getInstance(),
767+
"RDS",
768+
resourceWatcher,
769+
MoreExecutors.directExecutor());
770+
verify(resourceWatcher, timeout(5000)).onChanged(any());
771+
772+
syncContext.execute(() -> {
773+
// Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
774+
// Runnable returns
775+
xdsDependencyManager.shutdown();
776+
777+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS2", "EDS",
778+
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
779+
verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
780+
xdsClient.cancelXdsResourceWatch(
781+
XdsRouteConfigureResource.getInstance(), serverName, resourceWatcher);
782+
});
783+
}
784+
785+
@Test
786+
public void cdsUpdateAfterShutdown() {
787+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
788+
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
789+
790+
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
791+
serverName, serverName, nameResolverArgs, scheduler);
792+
793+
verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
794+
795+
@SuppressWarnings("unchecked")
796+
XdsClient.ResourceWatcher<XdsClusterResource.CdsUpdate> resourceWatcher =
797+
mock(XdsClient.ResourceWatcher.class);
798+
xdsClient.watchXdsResource(
799+
XdsClusterResource.getInstance(),
800+
"CDS",
801+
resourceWatcher,
802+
MoreExecutors.directExecutor());
803+
verify(resourceWatcher, timeout(5000)).onChanged(any());
804+
805+
syncContext.execute(() -> {
806+
// Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
807+
// Runnable returns
808+
xdsDependencyManager.shutdown();
809+
810+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS2",
811+
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
812+
verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
813+
xdsClient.cancelXdsResourceWatch(
814+
XdsClusterResource.getInstance(), serverName, resourceWatcher);
815+
});
816+
}
817+
818+
@Test
819+
public void edsUpdateAfterShutdown() {
820+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
821+
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
822+
823+
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
824+
serverName, serverName, nameResolverArgs, scheduler);
825+
826+
verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
827+
828+
@SuppressWarnings("unchecked")
829+
XdsClient.ResourceWatcher<XdsEndpointResource.EdsUpdate> resourceWatcher =
830+
mock(XdsClient.ResourceWatcher.class);
831+
xdsClient.watchXdsResource(
832+
XdsEndpointResource.getInstance(),
833+
"EDS",
834+
resourceWatcher,
835+
MoreExecutors.directExecutor());
836+
verify(resourceWatcher, timeout(5000)).onChanged(any());
837+
838+
syncContext.execute(() -> {
839+
// Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
840+
// Runnable returns
841+
xdsDependencyManager.shutdown();
842+
843+
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
844+
ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT);
845+
verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
846+
xdsClient.cancelXdsResourceWatch(
847+
XdsEndpointResource.getInstance(), serverName, resourceWatcher);
848+
});
849+
}
850+
718851
private Listener buildInlineClientListener(String rdsName, String clusterName) {
719852
return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName);
720853
}

0 commit comments

Comments
 (0)
Please sign in to comment.