diff --git a/.gitignore b/.gitignore index 0336509811..6b9fbe5aa5 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ bin *.ipr *.iws .metadata +jcstress.* +metrics-jcstress/results/ +TODO.md \ No newline at end of file diff --git a/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/ReservoirBenchmark.java b/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/ReservoirBenchmark.java index 5d47355a12..5d2e7885ac 100644 --- a/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/ReservoirBenchmark.java +++ b/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/ReservoirBenchmark.java @@ -1,18 +1,20 @@ package com.codahale.metrics.benchmarks; import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.SlidingTimeWindowArrayReservoir; import com.codahale.metrics.SlidingTimeWindowReservoir; import com.codahale.metrics.SlidingWindowReservoir; import com.codahale.metrics.UniformReservoir; - import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; - +import org.openjdk.jmh.profile.GCProfiler; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; import java.util.concurrent.TimeUnit; @@ -22,7 +24,8 @@ public class ReservoirBenchmark { private final UniformReservoir uniform = new UniformReservoir(); private final ExponentiallyDecayingReservoir exponential = new ExponentiallyDecayingReservoir(); private final SlidingWindowReservoir sliding = new SlidingWindowReservoir(1000); - private final SlidingTimeWindowReservoir slidingTime = new SlidingTimeWindowReservoir(1, TimeUnit.SECONDS); + private final SlidingTimeWindowReservoir slidingTime = new SlidingTimeWindowReservoir(200, TimeUnit.MILLISECONDS); + private final SlidingTimeWindowArrayReservoir arrTime = new SlidingTimeWindowArrayReservoir(200, TimeUnit.MILLISECONDS); // It's intentionally not declared as final to avoid constant folding private long nextValue = 0xFBFBABBA; @@ -33,18 +36,24 @@ public Object perfUniformReservoir() { return uniform; } + @Benchmark + public Object perfSlidingTimeWindowArrayReservoir() { + arrTime.update(nextValue); + return arrTime; + } + @Benchmark public Object perfExponentiallyDecayingReservoir() { exponential.update(nextValue); return exponential; } - + @Benchmark public Object perfSlidingWindowReservoir() { sliding.update(nextValue); return sliding; } - + @Benchmark public Object perfSlidingTimeWindowReservoir() { slidingTime.update(nextValue); @@ -53,14 +62,18 @@ public Object perfSlidingTimeWindowReservoir() { public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder() - .include(".*" + ReservoirBenchmark.class.getSimpleName() + ".*") - .warmupIterations(3) - .measurementIterations(5) - .threads(4) - .forks(1) - .build(); + .include(".*" + ReservoirBenchmark.class.getSimpleName() + ".*") + .warmupIterations(10) + .measurementIterations(10) + .addProfiler(GCProfiler.class) + .measurementTime(TimeValue.seconds(3)) + .timeUnit(TimeUnit.MICROSECONDS) + .mode(Mode.AverageTime) + .threads(4) + .forks(1) + .build(); new Runner(opt).run(); } - + } diff --git a/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/SlidingTimeWindowReservoirsBenchmark.java b/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/SlidingTimeWindowReservoirsBenchmark.java new file mode 100644 index 0000000000..a21904bc4b --- /dev/null +++ b/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/SlidingTimeWindowReservoirsBenchmark.java @@ -0,0 +1,80 @@ +package com.codahale.metrics.benchmarks; + +import com.codahale.metrics.SlidingTimeWindowArrayReservoir; +import com.codahale.metrics.SlidingTimeWindowReservoir; +import com.codahale.metrics.Snapshot; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.profile.GCProfiler; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; + +import java.util.concurrent.TimeUnit; + +/** + * @author bstorozhuk + */ +@State(Scope.Benchmark) +public class SlidingTimeWindowReservoirsBenchmark { + private final SlidingTimeWindowReservoir slidingTime = new SlidingTimeWindowReservoir(200, TimeUnit.MILLISECONDS); + private final SlidingTimeWindowArrayReservoir arrTime = new SlidingTimeWindowArrayReservoir(200, TimeUnit.MILLISECONDS); + + // It's intentionally not declared as final to avoid constant folding + private long nextValue = 0xFBFBABBA; + + @Benchmark + @Group("slidingTime") + @GroupThreads(3) + public Object slidingTimeAddMeasurement() { + slidingTime.update(nextValue); + return slidingTime; + } + + @Benchmark + @Group("slidingTime") + @GroupThreads(1) + public Object slidingTimeRead() { + Snapshot snapshot = slidingTime.getSnapshot(); + return snapshot; + } + + @Benchmark + @Group("arrTime") + @GroupThreads(3) + public Object arrTimeAddMeasurement() { + arrTime.update(nextValue); + return slidingTime; + } + + @Benchmark + @Group("arrTime") + @GroupThreads(1) + public Object arrTimeRead() { + Snapshot snapshot = arrTime.getSnapshot(); + return snapshot; + } + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(".*" + SlidingTimeWindowReservoirsBenchmark.class.getSimpleName() + ".*") + .warmupIterations(10) + .measurementIterations(10) + .addProfiler(GCProfiler.class) + .measurementTime(TimeValue.seconds(3)) + .timeUnit(TimeUnit.MICROSECONDS) + .mode(Mode.AverageTime) + .forks(1) + .build(); + + new Runner(opt).run(); + } +} + diff --git a/metrics-core/src/main/java/com/codahale/metrics/ChunkedAssociativeLongArray.java b/metrics-core/src/main/java/com/codahale/metrics/ChunkedAssociativeLongArray.java new file mode 100644 index 0000000000..f7be0e4ebd --- /dev/null +++ b/metrics-core/src/main/java/com/codahale/metrics/ChunkedAssociativeLongArray.java @@ -0,0 +1,271 @@ +package com.codahale.metrics; + +import static java.util.Arrays.binarySearch; + +import java.util.ArrayDeque; +import java.util.Deque; + +class ChunkedAssociativeLongArray { + private static final long[] EMPTY = new long[0]; + private static final int DEFAULT_CHUNK_SIZE = 512; + + private final int defaultChunkSize; + private Chunk activeChunk; + + ChunkedAssociativeLongArray() { + this(DEFAULT_CHUNK_SIZE); + } + + ChunkedAssociativeLongArray(int chunkSize) { + this.defaultChunkSize = chunkSize; + this.activeChunk = new Chunk(chunkSize); + } + + synchronized boolean put(long key, long value) { + if (activeChunk.cursor != 0 && activeChunk.keys[activeChunk.cursor - 1] > key) { + return false; // key should be the same as last inserted or bigger + } + boolean isFull = activeChunk.cursor - activeChunk.startIndex == activeChunk.chunkSize; + if (isFull) { + activeChunk = new Chunk(activeChunk, this.defaultChunkSize); + } + activeChunk.append(key, value); + return true; + } + + private int traverse(Deque traversedChunksDeque) { + Chunk currentChunk = activeChunk; + int valuesSize = 0; + while (true) { + valuesSize += currentChunk.cursor - currentChunk.startIndex; + if (traversedChunksDeque != null) { + traversedChunksDeque.addLast(currentChunk); + } + if (currentChunk.tailChunk == null) { + break; + } + currentChunk = currentChunk.tailChunk; + } + return valuesSize; + } + + synchronized long[] values() { + Deque chunksDeque = new ArrayDeque(); + int valuesSize = traverse(chunksDeque); + if (valuesSize == 0) { + return EMPTY; + } + long[] values = new long[valuesSize]; + int valuesIndex = 0; + while (!chunksDeque.isEmpty()) { + Chunk copySourceChunk = chunksDeque.removeLast(); + int length = copySourceChunk.cursor - copySourceChunk.startIndex; + int itemsToCopy = Math.min(valuesSize - valuesIndex, length); + System.arraycopy(copySourceChunk.values, copySourceChunk.startIndex, values, valuesIndex, itemsToCopy); + valuesIndex += length; + } + return values; + } + + synchronized int size() { + int valueSize = traverse(null); + return valueSize; + } + + synchronized String out() { + Deque chunksDeque = new ArrayDeque(); + int valuesSize = traverse(chunksDeque); + if (valuesSize == 0) { + return "[]"; + } + + StringBuilder builder = new StringBuilder(); + while (!chunksDeque.isEmpty()) { + Chunk copySourceChunk = chunksDeque.removeLast(); + builder.append('['); + for (int i = copySourceChunk.startIndex; i < copySourceChunk.cursor; i++) { + long key = copySourceChunk.keys[i]; + long value = copySourceChunk.values[i]; + builder.append('(').append(key).append(": ").append(value).append(')').append(' '); + } + builder.append(']'); + if (!chunksDeque.isEmpty()) { + builder.append("->"); + } + } + return builder.toString(); + } + + /** + * Try to trim all beyond specified boundaries. + * All items that are less then startKey or greater/equals then endKey + * + * @param startKey + * @param endKey + */ + synchronized void trim(long startKey, long endKey) { + /* + * [3, 4, 5, 9] -> [10, 13, 14, 15] -> [21, 24, 29, 30] -> [31] :: start layout + * |5______________________________23| :: trim(5, 23) + * [5, 9] -> [10, 13, 14, 15] -> [21] :: result layout + */ + Chunk head = findChunkWhereKeyShouldBe(activeChunk, endKey); + activeChunk = head; + int newEndIndex = findFirstIndexOfGreaterEqualElements( + activeChunk.keys, activeChunk.startIndex, activeChunk.cursor, endKey + ); + activeChunk.cursor = newEndIndex; + + Chunk tail = findChunkWhereKeyShouldBe(head, startKey); + int newStartIndex = findFirstIndexOfGreaterEqualElements( + tail.keys, tail.startIndex, tail.cursor, startKey + ); + if (tail.startIndex != newStartIndex) { + tail.startIndex = newStartIndex; + tail.chunkSize = tail.cursor - tail.startIndex; + tail.tailChunk = null; + } + } + + /** + * Clear all in specified boundaries. + * Remove all items between startKey(inclusive) and endKey(exclusive) + * + * @param startKey + * @param endKey + */ + synchronized void clear(long startKey, long endKey) { + /* + * [3, 4, 5, 9] -> [10, 13, 14, 15] -> [21, 24, 29, 30] -> [31] :: start layout + * |5______________________________23| :: clear(5, 23) + * [3, 4] -> [24, 29, 30] -> [31] :: result layout + */ + Chunk tail = findChunkWhereKeyShouldBe(activeChunk, endKey); + Chunk gapStartChunk = splitChunkOnTwoSeparateChunks(tail, endKey); + if (gapStartChunk == null) { + return; + } + // now we should skip specified gap [startKey, endKey] + // and concatenate our tail with new head four after gap + Chunk afterGapHead = findChunkWhereKeyShouldBe(gapStartChunk, startKey); + if (afterGapHead == null) { + return; + } + + int newEndIndex = findFirstIndexOfGreaterEqualElements( + afterGapHead.keys, afterGapHead.startIndex, afterGapHead.cursor, startKey + ); + if (newEndIndex == afterGapHead.startIndex) { + tail.tailChunk = null; + return; + } + if (afterGapHead.cursor != newEndIndex) { + afterGapHead.cursor = newEndIndex; + afterGapHead.chunkSize = afterGapHead.cursor - afterGapHead.startIndex; + } + tail.tailChunk = afterGapHead; // concat + } + + synchronized void clear() { + activeChunk.tailChunk = null; + activeChunk.startIndex = 0; + activeChunk.chunkSize = activeChunk.keys.length; + activeChunk.cursor = 0; + } + + private Chunk splitChunkOnTwoSeparateChunks(Chunk chunk, long key) { + /* + * [1, 2, 3, 4, 5, 6, 7, 8] :: beforeSplit + * |s--------chunk-------e| + * + * splitChunkOnTwoSeparateChunks(chunk, 5) + * + * [1, 2, 3, 4, 5, 6, 7, 8] :: afterSplit + * |s--tail--e||s--head--e| + */ + int splitIndex = findFirstIndexOfGreaterEqualElements( + chunk.keys, chunk.startIndex, chunk.cursor, key + ); + if (splitIndex == chunk.startIndex || splitIndex == chunk.cursor) { + return chunk.tailChunk; + } + int newTailSize = splitIndex - chunk.startIndex; + Chunk newTail = new Chunk(chunk.keys, chunk.values, chunk.startIndex, splitIndex, newTailSize, chunk.tailChunk); + chunk.startIndex = splitIndex; + chunk.chunkSize = chunk.chunkSize - newTailSize; + chunk.tailChunk = newTail; + return newTail; + } + + private Chunk findChunkWhereKeyShouldBe(Chunk currentChunk, long key) { + while (true) { + if (isFirstElementIsEmptyOrGreaterEqualThanKey(currentChunk, key) && currentChunk.tailChunk != null) { + currentChunk = currentChunk.tailChunk; + continue; + } + break; + } + return currentChunk; + } + + private boolean isFirstElementIsEmptyOrGreaterEqualThanKey(Chunk chunk, long key) { + return chunk.cursor == chunk.startIndex + || chunk.keys[chunk.startIndex] >= key; + } + + + private int findFirstIndexOfGreaterEqualElements(long[] array, int startIndex, int endIndex, long minKey) { + if (endIndex == startIndex || array[startIndex] >= minKey) { + return startIndex; + } + int searchIndex = binarySearch(array, startIndex, endIndex, minKey); + int realIndex; + if (searchIndex < 0) { + realIndex = -(searchIndex + 1); + } else { + realIndex = searchIndex; + } + return realIndex; + } + + private static class Chunk { + + private final long[] keys; + private final long[] values; + + private int chunkSize; // can differ from keys.length after half clear() + private int startIndex = 0; + private int cursor = 0; + private Chunk tailChunk; + + private Chunk(int chunkSize) { + this.chunkSize = chunkSize; + this.tailChunk = null; + this.keys = new long[chunkSize]; + this.values = new long[chunkSize]; + } + + private Chunk(Chunk tailChunk, int size) { + this.chunkSize = size; + this.tailChunk = tailChunk; + this.keys = new long[chunkSize]; + this.values = new long[chunkSize]; + } + + private Chunk(final long[] keys, final long[] values, + final int startIndex, final int cursor, final int chunkSize, final Chunk tailChunk) { + this.keys = keys; + this.values = values; + this.startIndex = startIndex; + this.cursor = cursor; + this.chunkSize = chunkSize; + this.tailChunk = tailChunk; + } + + private void append(long key, long value) { + keys[cursor] = key; + values[cursor] = value; + cursor++; + } + } +} diff --git a/metrics-core/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoir.java b/metrics-core/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoir.java new file mode 100644 index 0000000000..cb47a44923 --- /dev/null +++ b/metrics-core/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoir.java @@ -0,0 +1,100 @@ +package com.codahale.metrics; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A {@link Reservoir} implementation backed by a sliding window that stores only the measurements made + * in the last {@code N} seconds (or other time unit). + */ +public class SlidingTimeWindowArrayReservoir implements Reservoir { + // allow for this many duplicate ticks before overwriting measurements + private static final long COLLISION_BUFFER = 256L; + // only trim on updating once every N + private static final long TRIM_THRESHOLD = 256L; + private static final long CLEAR_BUFFER = TimeUnit.HOURS.toNanos(1) * COLLISION_BUFFER; + + private final Clock clock; + private final ChunkedAssociativeLongArray measurements; + private final long window; + private final AtomicLong lastTick; + private final AtomicLong count; + private final long startTick; + + /** + * Creates a new {@link SlidingTimeWindowArrayReservoir} with the given window of time. + * + * @param window the window of time + * @param windowUnit the unit of {@code window} + */ + public SlidingTimeWindowArrayReservoir(long window, TimeUnit windowUnit) { + this(window, windowUnit, Clock.defaultClock()); + } + + /** + * Creates a new {@link SlidingTimeWindowArrayReservoir} with the given clock and window of time. + * + * @param window the window of time + * @param windowUnit the unit of {@code window} + * @param clock the {@link Clock} to use + */ + public SlidingTimeWindowArrayReservoir(long window, TimeUnit windowUnit, Clock clock) { + this.startTick = clock.getTick(); + this.clock = clock; + this.measurements = new ChunkedAssociativeLongArray(); + this.window = windowUnit.toNanos(window) * COLLISION_BUFFER; + this.lastTick = new AtomicLong((clock.getTick() - startTick) * COLLISION_BUFFER); + this.count = new AtomicLong(); + } + + @Override + public int size() { + trim(); + return measurements.size(); + } + + @Override + public void update(long value) { + long newTick; + do { + if (count.incrementAndGet() % TRIM_THRESHOLD == 0L) { + trim(); + } + long lastTick = this.lastTick.get(); + newTick = getTick(); + boolean longOverflow = newTick < lastTick; + if (longOverflow) { + measurements.clear(); + } + } while (!measurements.put(newTick, value)); + } + + @Override + public Snapshot getSnapshot() { + trim(); + return new UniformSnapshot(measurements.values()); + } + + private long getTick() { + for (; ; ) { + final long oldTick = lastTick.get(); + final long tick = (clock.getTick() - startTick) * COLLISION_BUFFER; + // ensure the tick is strictly incrementing even if there are duplicate ticks + final long newTick = tick - oldTick > 0L ? tick : oldTick + 1L; + if (lastTick.compareAndSet(oldTick, newTick)) { + return newTick; + } + } + } + + void trim() { + final long now = getTick(); + final long windowStart = now - window; + final long windowEnd = now + CLEAR_BUFFER; + if (windowStart < windowEnd) { + measurements.trim(windowStart, windowEnd); + } else { + measurements.clear(windowEnd, windowStart); + } + } +} diff --git a/metrics-core/src/test/java/com/codahale/metrics/ChunkedAssociativeLongArrayTest.java b/metrics-core/src/test/java/com/codahale/metrics/ChunkedAssociativeLongArrayTest.java new file mode 100644 index 0000000000..b83a7e1aa3 --- /dev/null +++ b/metrics-core/src/test/java/com/codahale/metrics/ChunkedAssociativeLongArrayTest.java @@ -0,0 +1,71 @@ +package com.codahale.metrics; + +import static org.assertj.core.api.BDDAssertions.then; + +import org.junit.Test; + +public class ChunkedAssociativeLongArrayTest { + + @Test + public void testClear() { + ChunkedAssociativeLongArray array = new ChunkedAssociativeLongArray(3); + array.put(-3, 3); + array.put(-2, 1); + array.put(0, 5); + array.put(3, 0); + array.put(9, 8); + array.put(15, 0); + array.put(19, 5); + array.put(21, 5); + array.put(34, -9); + array.put(109, 5); + + then(array.out()) + .isEqualTo("[(-3: 3) (-2: 1) (0: 5) ]->[(3: 0) (9: 8) (15: 0) ]->[(19: 5) (21: 5) (34: -9) ]->[(109: 5) ]"); + then(array.values()) + .isEqualTo(new long[]{3, 1, 5, 0, 8, 0, 5, 5, -9, 5}); + then(array.size()) + .isEqualTo(10); + + array.clear(-2, 20); + then(array.out()) + .isEqualTo("[(-3: 3) ]->[(21: 5) (34: -9) ]->[(109: 5) ]"); + then(array.values()) + .isEqualTo(new long[]{3, 5, -9, 5}); + then(array.size()) + .isEqualTo(4); + } + + + @Test + public void testTrim() { + ChunkedAssociativeLongArray array = new ChunkedAssociativeLongArray(3); + array.put(-3, 3); + array.put(-2, 1); + array.put(0, 5); + array.put(3, 0); + array.put(9, 8); + array.put(15, 0); + array.put(19, 5); + array.put(21, 5); + array.put(34, -9); + array.put(109, 5); + + then(array.out()) + .isEqualTo("[(-3: 3) (-2: 1) (0: 5) ]->[(3: 0) (9: 8) (15: 0) ]->[(19: 5) (21: 5) (34: -9) ]->[(109: 5) ]"); + then(array.values()) + .isEqualTo(new long[]{3, 1, 5, 0, 8, 0, 5, 5, -9, 5}); + then(array.size()) + .isEqualTo(10); + + array.trim(-2, 20); + + then(array.out()) + .isEqualTo("[(-2: 1) (0: 5) ]->[(3: 0) (9: 8) (15: 0) ]->[(19: 5) ]"); + then(array.values()) + .isEqualTo(new long[]{1, 5, 0, 8, 0, 5}); + then(array.size()) + .isEqualTo(6); + + } +} \ No newline at end of file diff --git a/metrics-core/src/test/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirTest.java b/metrics-core/src/test/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirTest.java new file mode 100644 index 0000000000..96368235c0 --- /dev/null +++ b/metrics-core/src/test/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirTest.java @@ -0,0 +1,150 @@ +package com.codahale.metrics; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +@SuppressWarnings("Duplicates") +public class SlidingTimeWindowArrayReservoirTest { + + @Test + public void storesMeasurementsWithDuplicateTicks() throws Exception { + final Clock clock = mock(Clock.class); + final SlidingTimeWindowArrayReservoir reservoir = new SlidingTimeWindowArrayReservoir(10, NANOSECONDS, clock); + + when(clock.getTick()).thenReturn(20L); + + reservoir.update(1); + reservoir.update(2); + + assertThat(reservoir.getSnapshot().getValues()) + .containsOnly(1, 2); + } + + @Test + public void boundsMeasurementsToATimeWindow() throws Exception { + final Clock clock = mock(Clock.class); + final SlidingTimeWindowArrayReservoir reservoir = new SlidingTimeWindowArrayReservoir(10, NANOSECONDS, clock); + + when(clock.getTick()).thenReturn(0L); + reservoir.update(1); + + when(clock.getTick()).thenReturn(5L); + reservoir.update(2); + + when(clock.getTick()).thenReturn(10L); + reservoir.update(3); + + when(clock.getTick()).thenReturn(15L); + reservoir.update(4); + + when(clock.getTick()).thenReturn(20L); + reservoir.update(5); + + assertThat(reservoir.getSnapshot().getValues()) + .containsOnly(4, 5); + } + + @Test + public void comparisonResultsTest() { + int cycles = 1000000; + long time = (Long.MAX_VALUE / 256) - (long) (cycles * 0.5); + ManualClock manualClock = new ManualClock(); + manualClock.addNanos(time); + int window = 300; + Random random = new Random(ThreadLocalRandom.current().nextInt()); + + SlidingTimeWindowReservoir treeReservoir = new SlidingTimeWindowReservoir(window, NANOSECONDS, manualClock); + SlidingTimeWindowArrayReservoir arrayReservoir = new SlidingTimeWindowArrayReservoir(window, NANOSECONDS, manualClock); + + for (int i = 0; i < cycles; i++) { + manualClock.addNanos(1); + treeReservoir.update(i); + arrayReservoir.update(i); + if (random.nextDouble() < 0.01) { + long[] treeValues = treeReservoir.getSnapshot().getValues(); + long[] arrValues = arrayReservoir.getSnapshot().getValues(); + assertThat(arrValues).isEqualTo(treeValues); + } + if (random.nextDouble() < 0.05) { + assertThat(arrayReservoir.size()).isEqualTo(treeReservoir.size()); + } + } + } + + @Test + public void testGetTickOverflow() { + final Random random = new Random(0); + final int window = 128; + AtomicLong counter = new AtomicLong(0L); + + // Note: 'threshold' defines the number of updates submitted to the reservoir after overflowing + for (int threshold : Arrays.asList(0, 1, 2, 127, 128, 129, 255, 256, 257)) { + + // Note: 'updatePerTick' defines the number of updates submitted to the reservoir between each tick + for (int updatesPerTick : Arrays.asList(1, 2, 127, 128, 129, 255, 256, 257)) { + //logger.info("Executing test: threshold={}, updatesPerTick={}", threshold, updatesPerTick); + + // Set the clock to overflow in (2*window+1)ns + final ManualClock clock = new ManualClock(); + clock.addNanos(Long.MAX_VALUE / 256 - 2 * window - clock.getTick()); + assertThat(clock.getTick() * 256).isGreaterThan(0); + + // Create the reservoir + final SlidingTimeWindowArrayReservoir reservoir = new SlidingTimeWindowArrayReservoir(window, NANOSECONDS, clock); + int updatesAfterThreshold = 0; + while (true) { + // Update the reservoir + for (int i = 0; i < updatesPerTick; i++) { + long l = counter.incrementAndGet(); + reservoir.update(l); + } + + // Randomly check the reservoir size + if (random.nextDouble() < 0.1) { + assertThat(reservoir.size()) + .as("Bad reservoir size with: threshold=%d, updatesPerTick=%d", threshold, updatesPerTick) + .isLessThanOrEqualTo(window * 256); + } + + // Update the clock + clock.addNanos(1); + + // If the clock has overflowed start counting updates + if ((clock.getTick() * 256) < 0) { + if (updatesAfterThreshold++ >= threshold) { + break; + } + } + } + + // Check the final reservoir size + assertThat(reservoir.size()) + .as("Bad final reservoir size with: threshold=%d, updatesPerTick=%d", threshold, updatesPerTick) + .isLessThanOrEqualTo(window * 256); + + // Advance the clock far enough to clear the reservoir. Note that here the window only loosely defines + // the reservoir window; when updatesPerTick is greater than 128 the sliding window will always be well + // ahead of the current clock time, and advances in getTick while in trim (called randomly above from + // size and every 256 updates). Until the clock "catches up" advancing the clock will have no effect on + // the reservoir, and reservoir.size() will merely move the window forward 1/256th of a ns - as such, an + // arbitrary increment of 1s here was used instead to advance the clock well beyond any updates recorded + // above. + clock.addSeconds(1); + + // The reservoir should now be empty + assertThat(reservoir.size()) + .as("Bad reservoir size after delay with: threshold=%d, updatesPerTick=%d", threshold, updatesPerTick) + .isEqualTo(0); + } + } + } +} \ No newline at end of file diff --git a/metrics-jcstress/README.md b/metrics-jcstress/README.md new file mode 100644 index 0000000000..0b9a360d3f --- /dev/null +++ b/metrics-jcstress/README.md @@ -0,0 +1,17 @@ +Concurrency test are based on [OpenJDK Java Concurrency Stress tests](https://wiki.openjdk.java.net/display/CodeTools/jcstress). + +### Command line launching + +Build tests jar with maven and run tests: +````bash +mvn clean install +java -jar target/jcstress.jar +```` + +Look at results report `results/index.html` + +### Command line options + +The whole list of command line options is available by: + + java -jar target/jcstress.jar diff --git a/metrics-jcstress/findbugs-exclude.xml b/metrics-jcstress/findbugs-exclude.xml new file mode 100644 index 0000000000..097bbe3e53 --- /dev/null +++ b/metrics-jcstress/findbugs-exclude.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/metrics-jcstress/pom.xml b/metrics-jcstress/pom.xml new file mode 100644 index 0000000000..286a63bad3 --- /dev/null +++ b/metrics-jcstress/pom.xml @@ -0,0 +1,117 @@ + + + 4.0.0 + + metrics-parent + io.dropwizard.metrics + 3.2.3-SNAPSHOT + + + metrics-jcstress + 3.2.3-SNAPSHOT + jar + + Metrics JCStress tests + + + + + 3.0 + + + + + io.dropwizard.metrics + metrics-core + ${project.version} + + + org.openjdk.jcstress + jcstress-core + ${jcstress.version} + + + + + UTF-8 + + + 0.1.1 + + + 1.8 + + + jcstress + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${javac.target} + ${javac.target} + ${javac.target} + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.2 + + + main + package + + shade + + + ${uberjar.name} + + + org.openjdk.jcstress.Main + + + META-INF/TestList + + + + + + + + + org.codehaus.mojo + findbugs-maven-plugin + + findbugs-exclude.xml + + + + + + diff --git a/metrics-jcstress/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirTrimReadTest.java b/metrics-jcstress/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirTrimReadTest.java new file mode 100644 index 0000000000..3132265da0 --- /dev/null +++ b/metrics-jcstress/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirTrimReadTest.java @@ -0,0 +1,67 @@ +package com.codahale.metrics; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Expect; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.StringResult1; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +@JCStressTest +@Outcome( + id = "\\[240, 241, 242, 243, 244, 245, 246, 247, 248, 249\\]", + expect = Expect.ACCEPTABLE, + desc = "Actor1 made read before Actor2 even started" +) +@Outcome( + id = "\\[243, 244, 245, 246, 247, 248, 249\\]", + expect = Expect.ACCEPTABLE, + desc = "Actor2 made trim before Actor1 even started" +) +@Outcome( + id = "\\[244, 245, 246, 247, 248, 249\\]", + expect = Expect.ACCEPTABLE, + desc = "Actor1 made trim, then Actor2 started trim and made startIndex change, " + + "before Actor1 concurrent read." +) +@Outcome( + id = "\\[243, 244, 245, 246, 247, 248\\]", + expect = Expect.ACCEPTABLE, + desc = "Actor1 made trim, then Actor2 started trim, but not finished startIndex change, before Actor1 concurrent read." +) +@State +public class SlidingTimeWindowArrayReservoirTrimReadTest { + private final AtomicLong ticks = new AtomicLong(0); + private final SlidingTimeWindowArrayReservoir reservoir; + + public SlidingTimeWindowArrayReservoirTrimReadTest() { + reservoir = new SlidingTimeWindowArrayReservoir(10, TimeUnit.NANOSECONDS, new Clock() { + @Override + public long getTick() { + return ticks.get(); + } + }); + + for (int i = 0; i < 250; i++) { + ticks.set(i); + reservoir.update(i); + } + } + + @Actor + public void actor1(StringResult1 r) { + Snapshot snapshot = reservoir.getSnapshot(); + String stringValues = Arrays.toString(snapshot.getValues()); + r.r1 = stringValues; + } + + @Actor + public void actor2() { + ticks.set(253); + reservoir.trim(); + } +} \ No newline at end of file diff --git a/metrics-jcstress/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirWriteReadAllocate.java b/metrics-jcstress/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirWriteReadAllocate.java new file mode 100644 index 0000000000..8b73a89a19 --- /dev/null +++ b/metrics-jcstress/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirWriteReadAllocate.java @@ -0,0 +1,45 @@ +package com.codahale.metrics; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.Expect; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.StringResult1; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +@JCStressTest +@Outcome(id = "\\[1023, 1029, 1034\\]", expect = Expect.ACCEPTABLE) +@State +public class SlidingTimeWindowArrayReservoirWriteReadAllocate { + + private final SlidingTimeWindowArrayReservoir reservoir; + + public SlidingTimeWindowArrayReservoirWriteReadAllocate() { + reservoir = new SlidingTimeWindowArrayReservoir(500, TimeUnit.SECONDS); + for (int i = 0; i < 1024; i++) { + reservoir.update(i); + } + } + + @Actor + public void actor1() { + reservoir.update(1029L); + } + + @Actor + public void actor2() { + reservoir.update(1034L); + } + + @Arbiter + public void arbiter(StringResult1 r) { + Snapshot snapshot = reservoir.getSnapshot(); + long[] values = snapshot.getValues(); + String stringValues = Arrays.toString(Arrays.copyOfRange(values, values.length - 3, values.length)); + r.r1 = stringValues; + } +} \ No newline at end of file diff --git a/metrics-jcstress/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirWriteReadTest.java b/metrics-jcstress/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirWriteReadTest.java new file mode 100644 index 0000000000..420770d73a --- /dev/null +++ b/metrics-jcstress/src/main/java/com/codahale/metrics/SlidingTimeWindowArrayReservoirWriteReadTest.java @@ -0,0 +1,45 @@ +package com.codahale.metrics; + +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Expect; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.StringResult1; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +@JCStressTest +@Outcome(id = "\\[\\]", expect = Expect.ACCEPTABLE) +@Outcome(id = "\\[31\\]", expect = Expect.ACCEPTABLE) +@Outcome(id = "\\[15\\]", expect = Expect.ACCEPTABLE) +@Outcome(id = "\\[31, 15\\]", expect = Expect.ACCEPTABLE) +@Outcome(id = "\\[15, 31\\]", expect = Expect.ACCEPTABLE) +@State +public class SlidingTimeWindowArrayReservoirWriteReadTest { + + private final SlidingTimeWindowArrayReservoir reservoir; + + public SlidingTimeWindowArrayReservoirWriteReadTest() { + reservoir = new SlidingTimeWindowArrayReservoir(1, TimeUnit.SECONDS); + } + + @Actor + public void actor1() { + reservoir.update(31L); + } + + @Actor + public void actor2() { + reservoir.update(15L); + } + + @Actor + public void actor3(StringResult1 r) { + Snapshot snapshot = reservoir.getSnapshot(); + String stringValues = Arrays.toString(snapshot.getValues()); + r.r1 = stringValues; + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 6e1288a519..f4d44cc7e6 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,8 @@ metrics-logback metrics-servlet metrics-servlets - + metrics-jcstress + UTF-8