Skip to content

Commit 47ae7b3

Browse files
authored
chore: virtual threads support (aws#1120)
1 parent 976703e commit 47ae7b3

File tree

12 files changed

+127
-93
lines changed

12 files changed

+127
-93
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
### :magic_wand: Added
77
- Logic and a connection property to enable driver failover when network exceptions occur in the connect pipeline (PR #1099)[https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1099]
8+
- A new reworked and re-architected failover plugin (PR #1089)[https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1089]
9+
- Virtual Threading support (PR #1120)[https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1120]
810

911
## [2.3.9] - 2024-08-09
1012

@@ -308,7 +310,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
308310
- Lock initialization of `AuroraHostListProvider` ([PR #347](https://github.com/awslabs/aws-advanced-jdbc-wrapper/pull/347)).
309311
- Optimized thread locks and expiring cache for the Enhanced Monitoring Plugin ([PR #365](https://github.com/awslabs/aws-advanced-jdbc-wrapper/pull/365)).
310312
- Updated Hibernate sample code to reflect changes in the wrapper source code ([PR #368](https://github.com/awslabs/aws-advanced-jdbc-wrapper/pull/368)).
311-
- Updated KnownLimitations.md to reflect that Amazon RDS Blue/Green Deployments are not supported. See [Amazon RDS Blue/Green Deployments](./docs/KnownLimitations.md#amazon-rds-blue-green-deployments).
313+
- Updated KnownLimitations.md to reflect that Amazon RDS Blue/Green Deployments are not supported. See [Amazon RDS Blue/Green Deployments](./docs/README.md#amazon-rds-bluegreen-deployments).
312314

313315
## [1.0.1] - 2023-01-30
314316
### :magic_wand: Added

README.md

Lines changed: 34 additions & 37 deletions
Large diffs are not rendered by default.

wrapper/src/main/java/software/amazon/jdbc/RoundRobinHostSelector.java

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Properties;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.locks.ReentrantLock;
2627
import java.util.regex.Matcher;
2728
import java.util.regex.Pattern;
2829
import java.util.stream.Collectors;
@@ -48,6 +49,8 @@ public class RoundRobinHostSelector implements HostSelector {
4849
"((?<host>[^:/?#]*):(?<weight>[0-9]*))");
4950
protected static final CacheMap<String, RoundRobinClusterInfo> roundRobinCache = new CacheMap<>();
5051

52+
protected static final ReentrantLock lock = new ReentrantLock();
53+
5154
static {
5255
PropertyDefinition.registerPluginProperties(RoundRobinHostSelector.class);
5356
}
@@ -69,56 +72,63 @@ public static void setRoundRobinHostWeightPairsProperty(final @NonNull Propertie
6972
}
7073

7174
@Override
72-
public synchronized HostSpec getHost(
75+
public HostSpec getHost(
7376
final @NonNull List<HostSpec> hosts,
7477
final @NonNull HostRole role,
7578
final @Nullable Properties props) throws SQLException {
76-
final List<HostSpec> eligibleHosts = hosts.stream()
77-
.filter(hostSpec ->
78-
role.equals(hostSpec.getRole()) && hostSpec.getAvailability().equals(HostAvailability.AVAILABLE))
79-
.sorted(Comparator.comparing(HostSpec::getHost))
80-
.collect(Collectors.toList());
8179

82-
if (eligibleHosts.isEmpty()) {
83-
throw new SQLException(Messages.get("HostSelector.noHostsMatchingRole", new Object[]{role}));
84-
}
80+
lock.lock();
81+
try {
82+
final List<HostSpec> eligibleHosts = hosts.stream()
83+
.filter(hostSpec ->
84+
role.equals(hostSpec.getRole()) && hostSpec.getAvailability().equals(HostAvailability.AVAILABLE))
85+
.sorted(Comparator.comparing(HostSpec::getHost))
86+
.collect(Collectors.toList());
87+
88+
if (eligibleHosts.isEmpty()) {
89+
throw new SQLException(Messages.get("HostSelector.noHostsMatchingRole", new Object[]{role}));
90+
}
8591

86-
// Create new cache entries for provided hosts if necessary. All hosts point to the same cluster info.
87-
createCacheEntryForHosts(eligibleHosts, props);
88-
final String currentClusterInfoKey = eligibleHosts.get(0).getHost();
89-
final RoundRobinClusterInfo clusterInfo = roundRobinCache.get(currentClusterInfoKey);
92+
// Create new cache entries for provided hosts if necessary. All hosts point to the same cluster info.
93+
createCacheEntryForHosts(eligibleHosts, props);
94+
final String currentClusterInfoKey = eligibleHosts.get(0).getHost();
95+
final RoundRobinClusterInfo clusterInfo = roundRobinCache.get(currentClusterInfoKey);
9096

91-
final HostSpec lastHost = clusterInfo.lastHost;
92-
int lastHostIndex = -1;
97+
final HostSpec lastHost = clusterInfo.lastHost;
98+
int lastHostIndex = -1;
9399

94-
// Check if lastHost is in list of eligible hosts. Update lastHostIndex.
95-
if (lastHost != null) {
96-
for (int i = 0; i < eligibleHosts.size(); i++) {
97-
if (eligibleHosts.get(i).getHost().equals(lastHost.getHost())) {
98-
lastHostIndex = i;
100+
// Check if lastHost is in list of eligible hosts. Update lastHostIndex.
101+
if (lastHost != null) {
102+
for (int i = 0; i < eligibleHosts.size(); i++) {
103+
if (eligibleHosts.get(i).getHost().equals(lastHost.getHost())) {
104+
lastHostIndex = i;
105+
}
99106
}
100107
}
101-
}
102108

103-
final int targetHostIndex;
104-
// If the host is weighted and the lastHost is in the eligibleHosts list.
105-
if (clusterInfo.weightCounter > 0 && lastHostIndex != -1) {
106-
targetHostIndex = lastHostIndex;
107-
} else {
108-
if (lastHostIndex != -1 && lastHostIndex != eligibleHosts.size() - 1) {
109-
targetHostIndex = lastHostIndex + 1;
109+
final int targetHostIndex;
110+
// If the host is weighted and the lastHost is in the eligibleHosts list.
111+
if (clusterInfo.weightCounter > 0 && lastHostIndex != -1) {
112+
targetHostIndex = lastHostIndex;
110113
} else {
111-
targetHostIndex = 0;
114+
if (lastHostIndex != -1 && lastHostIndex != eligibleHosts.size() - 1) {
115+
targetHostIndex = lastHostIndex + 1;
116+
} else {
117+
targetHostIndex = 0;
118+
}
119+
120+
final Integer weight = clusterInfo.clusterWeightsMap.get(eligibleHosts.get(targetHostIndex).getHostId());
121+
clusterInfo.weightCounter = weight == null ? clusterInfo.defaultWeight : weight;
112122
}
113123

114-
final Integer weight = clusterInfo.clusterWeightsMap.get(eligibleHosts.get(targetHostIndex).getHostId());
115-
clusterInfo.weightCounter = weight == null ? clusterInfo.defaultWeight : weight;
116-
}
124+
clusterInfo.weightCounter--;
125+
clusterInfo.lastHost = eligibleHosts.get(targetHostIndex);
117126

118-
clusterInfo.weightCounter--;
119-
clusterInfo.lastHost = eligibleHosts.get(targetHostIndex);
127+
return eligibleHosts.get(targetHostIndex);
120128

121-
return eligibleHosts.get(targetHostIndex);
129+
} finally {
130+
lock.unlock();
131+
}
122132
}
123133

124134
private void createCacheEntryForHosts(

wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitoringConnectionPlugin.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import software.amazon.jdbc.util.RdsUrlType;
4141
import software.amazon.jdbc.util.RdsUtils;
4242
import software.amazon.jdbc.util.SubscribedMethodHelper;
43-
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
4443

4544
/**
4645
* Monitor the server while the connection is executing methods for more sophisticated failure
@@ -181,7 +180,8 @@ public <T, E extends Exception> T execute(
181180

182181
} finally {
183182
if (monitorContext != null) {
184-
synchronized (monitorContext) {
183+
monitorContext.getLock().lock();
184+
try {
185185
this.monitorService.stopMonitoring(monitorContext);
186186

187187
if (monitorContext.isNodeUnhealthy()) {
@@ -206,6 +206,8 @@ public <T, E extends Exception> T execute(
206206
new Object[] {this.pluginService.getCurrentHostSpec().asAlias()})));
207207
}
208208
}
209+
} finally {
210+
monitorContext.getLock().unlock();
209211
}
210212

211213
LOGGER.finest(

wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorConnectionContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.Executor;
2222
import java.util.concurrent.Executors;
2323
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.locks.ReentrantLock;
2425
import java.util.logging.Logger;
2526
import software.amazon.jdbc.util.Messages;
2627
import software.amazon.jdbc.util.telemetry.TelemetryCounter;
@@ -49,6 +50,8 @@ public class MonitorConnectionContext {
4950
private long invalidNodeStartTimeNano; // Only accessed by monitor thread
5051
private long failureCount; // Only accessed by monitor thread
5152

53+
private final ReentrantLock lock = new ReentrantLock();
54+
5255
/**
5356
* Constructor.
5457
*
@@ -238,4 +241,8 @@ void setConnectionValid(
238241
() -> Messages.get("MonitorConnectionContext.hostAlive",
239242
new Object[] {hostName}));
240243
}
244+
245+
public ReentrantLock getLock() {
246+
return this.lock;
247+
}
241248
}

wrapper/src/main/java/software/amazon/jdbc/plugin/efm/MonitorImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,8 @@ public void run() {
192192

193193
while ((monitorContext = this.activeContexts.poll()) != null) {
194194

195-
synchronized (monitorContext) {
195+
monitorContext.getLock().lock();
196+
try {
196197
// If context is already invalid, just skip it
197198
if (!monitorContext.isActiveContext()) {
198199
continue;
@@ -223,6 +224,8 @@ public void run() {
223224
delayMillis = monitorContext.getFailureDetectionIntervalMillis();
224225
}
225226
}
227+
} finally {
228+
monitorContext.getLock().unlock();
226229
}
227230
}
228231

wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ protected <E extends Exception> void dealWithIllegalStateException(
558558
* @param failedHost The host with network errors.
559559
* @throws SQLException if an error occurs
560560
*/
561-
protected synchronized void failover(final HostSpec failedHost) throws SQLException {
561+
protected void failover(final HostSpec failedHost) throws SQLException {
562562
this.pluginService.setAvailability(failedHost.asAliases(), HostAvailability.NOT_AVAILABLE);
563563

564564
if (this.failoverMode == FailoverMode.STRICT_WRITER) {
@@ -720,7 +720,7 @@ protected void invalidateCurrentConnection() {
720720
}
721721
}
722722

723-
protected synchronized void pickNewConnection() throws SQLException {
723+
protected void pickNewConnection() throws SQLException {
724724
if (this.isClosed && this.closedExplicitly) {
725725
LOGGER.fine(() -> Messages.get("Failover.transactionResolutionUnknownError"));
726726
return;

wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.atomic.AtomicBoolean;
3333
import java.util.concurrent.atomic.AtomicReference;
34+
import java.util.concurrent.locks.ReentrantLock;
3435
import java.util.logging.Level;
3536
import java.util.logging.Logger;
3637
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -56,7 +57,7 @@ public class LimitlessRouterMonitor implements AutoCloseable, Runnable {
5657

5758
private static final String MONITORING_PROPERTY_PREFIX = "limitless-router-monitor-";
5859
private final int intervalMs;
59-
private @NonNull HostSpec hostSpec;
60+
private final @NonNull HostSpec hostSpec;
6061
private final AtomicBoolean stopped = new AtomicBoolean(false);
6162
private final AtomicReference<List<HostSpec>> limitlessRouters = new AtomicReference<>(
6263
Collections.unmodifiableList(new ArrayList<>()));
@@ -73,6 +74,8 @@ public class LimitlessRouterMonitor implements AutoCloseable, Runnable {
7374
return monitoringThread;
7475
});
7576

77+
private final ReentrantLock lock = new ReentrantLock();
78+
7679
public LimitlessRouterMonitor(
7780
final @NonNull PluginService pluginService,
7881
final @NonNull HostSpec hostSpec,
@@ -164,17 +167,24 @@ public void run() {
164167
}
165168
}
166169

167-
public synchronized List<HostSpec> forceGetLimitlessRouters() throws SQLException {
170+
public List<HostSpec> forceGetLimitlessRouters() throws SQLException {
168171
LOGGER.finest(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRouters"));
169-
this.openConnection();
170-
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
171-
throw new SQLException(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRoutersFailed"));
172+
173+
lock.lock();
174+
try {
175+
this.openConnection();
176+
if (this.monitoringConn == null || this.monitoringConn.isClosed()) {
177+
throw new SQLException(Messages.get("LimitlessRouterMonitor.forceGetLimitlessRoutersFailed"));
178+
}
179+
List<HostSpec> newLimitlessRouters = queryForLimitlessRouters(this.monitoringConn);
180+
this.limitlessRouters.set(Collections.unmodifiableList(newLimitlessRouters));
181+
RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(this.props, newLimitlessRouters);
182+
LOGGER.finest(Utils.logTopology(limitlessRouters.get(), "[limitlessRouterMonitor]"));
183+
return newLimitlessRouters;
184+
185+
} finally {
186+
lock.unlock();
172187
}
173-
List<HostSpec> newLimitlessRouters = queryForLimitlessRouters(this.monitoringConn);
174-
this.limitlessRouters.set(Collections.unmodifiableList(newLimitlessRouters));
175-
RoundRobinHostSelector.setRoundRobinHostWeightPairsProperty(this.props, newLimitlessRouters);
176-
LOGGER.finest(Utils.logTopology(limitlessRouters.get(), "[limitlessRouterMonitor]"));
177-
return newLimitlessRouters;
178188
}
179189

180190
private void openConnection() throws SQLException {

wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public List<HostSpec> forceGetLimitlessRouters(final String clusterId, final Pro
9898
}
9999

100100
@Override
101-
public synchronized void startMonitoring(final @NonNull PluginService pluginService,
101+
public void startMonitoring(final @NonNull PluginService pluginService,
102102
final @NonNull HostSpec hostSpec,
103103
final @NonNull Properties props,
104104
final int intervalMs) {

wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ private void logAndThrowException(
377377
throw new ReadWriteSplittingSQLException(logMessage, sqlState.getState(), cause);
378378
}
379379

380-
private synchronized void switchToWriterConnection(
380+
private void switchToWriterConnection(
381381
final List<HostSpec> hosts)
382382
throws SQLException {
383383
final Connection currentConnection = this.pluginService.getCurrentConnection();
@@ -418,7 +418,7 @@ private void switchCurrentConnectionTo(
418418
newConnectionHost.getUrl()}));
419419
}
420420

421-
private synchronized void switchToReaderConnection(final List<HostSpec> hosts)
421+
private void switchToReaderConnection(final List<HostSpec> hosts)
422422
throws SQLException {
423423
final Connection currentConnection = this.pluginService.getCurrentConnection();
424424
final HostSpec currentHost = this.pluginService.getCurrentHostSpec();

0 commit comments

Comments
 (0)