Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e16eb9d

Browse files
authoredOct 26, 2020
Update experimental declarations (Kotlin#2316)
* Gracefully increase deprecation level on Channel operators instead of removing them, a warning was not strict enough * Remove hidden onCompletion from -M release * Promote StateFlow and SharedFlow to stable API * Lift out experimentality where it is applicable * CoroutineDispatcher.invoke * ReceiveChannel.consume and ReceiveChannel.consumeEach * Flow core operators: onStart, onCompletion, onEmpty * CompletableDeferred.completeWith * awaitCancellation * Add experimentality notes where applicable
1 parent 92db4e1 commit e16eb9d

File tree

23 files changed

+222
-1669
lines changed

23 files changed

+222
-1669
lines changed
 

‎benchmarks/src/jmh/kotlin/benchmarks/ChannelSinkBenchmark.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,22 @@ open class ChannelSinkBenchmark {
5050
for (i in start until (start + count))
5151
send(i)
5252
}
53+
54+
// Migrated from deprecated operators, are good only for stressing channels
55+
56+
private fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
57+
GlobalScope.produce(context, onCompletion = { cancel() }) {
58+
for (e in this@filter) {
59+
if (predicate(e)) send(e)
60+
}
61+
}
62+
63+
private suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
64+
var accumulator = initial
65+
consumeEach {
66+
accumulator = operation(accumulator, it)
67+
}
68+
return accumulator
69+
}
5370
}
71+

‎kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,7 +992,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
992992
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
993993
public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
994994
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
995-
public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
996995
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
997996
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
998997
public static final fun onEmpty (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;

‎kotlinx-coroutines-core/common/src/Builders.common.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ public suspend fun <T> withContext(
175175
*
176176
* This inline function calls [withContext].
177177
*/
178-
@ExperimentalCoroutinesApi
179178
public suspend inline operator fun <T> CoroutineDispatcher.invoke(
180179
noinline block: suspend CoroutineScope.() -> T
181180
): T = withContext(this, block)

‎kotlinx-coroutines-core/common/src/CompletableDeferred.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ public interface CompletableDeferred<T> : Deferred<T> {
5757
* This function transitions this deferred in the same ways described by [CompletableDeferred.complete] and
5858
* [CompletableDeferred.completeExceptionally].
5959
*/
60-
@ExperimentalCoroutinesApi // since 1.3.2, tentatively until 1.4.0
6160
public fun <T> CompletableDeferred<T>.completeWith(result: Result<T>): Boolean =
6261
result.fold({ complete(it) }, { completeExceptionally(it) })
6362

‎kotlinx-coroutines-core/common/src/CoroutineStart.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public enum class CoroutineStart {
5555
* Cancellability of coroutine at suspension points depends on the particular implementation details of
5656
* suspending functions as in [DEFAULT].
5757
*/
58-
@ExperimentalCoroutinesApi
58+
@ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability
5959
ATOMIC,
6060

6161
/**
@@ -71,7 +71,7 @@ public enum class CoroutineStart {
7171
*
7272
* **Note: This is an experimental api.** Execution semantics of coroutines may change in the future when this mode is used.
7373
*/
74-
@ExperimentalCoroutinesApi
74+
@ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability
7575
UNDISPATCHED;
7676

7777
/**

‎kotlinx-coroutines-core/common/src/Debug.common.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ internal expect fun assert(value: () -> Boolean)
2727
* Copy mechanism is used only on JVM, but it might be convenient to implement it in common exceptions,
2828
* so on JVM their stacktraces will be properly recovered.
2929
*/
30-
@ExperimentalCoroutinesApi
30+
@ExperimentalCoroutinesApi // Since 1.2.0, no ETA on stability
3131
public interface CopyableThrowable<T> where T : Throwable, T : CopyableThrowable<T> {
3232

3333
/**

‎kotlinx-coroutines-core/common/src/Delay.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ public interface Delay {
9595
* }
9696
* ```
9797
*/
98-
@ExperimentalCoroutinesApi
9998
public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
10099

101100
/**

‎kotlinx-coroutines-core/common/src/channels/Broadcast.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public fun <E> ReceiveChannel<E>.broadcast(
4747
val scope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }
4848
// We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
4949
// which passes all exceptions upstream to the source ReceiveChannel
50-
return scope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
50+
return scope.broadcast(capacity = capacity, start = start, onCompletion = { cancelConsumed(it) }) {
5151
for (e in this@broadcast) {
5252
send(e)
5353
}

‎kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import kotlinx.coroutines.*
1616
* * [DROP_LATEST] &mdash; drop **the latest** value that is being added to the buffer right now on buffer overflow
1717
* (so that buffer contents stay the same), do not suspend.
1818
*/
19-
@ExperimentalCoroutinesApi
2019
public enum class BufferOverflow {
2120
/**
2221
* Suspend on buffer overflow.

‎kotlinx-coroutines-core/common/src/channels/Channels.common.kt

Lines changed: 191 additions & 193 deletions
Large diffs are not rendered by default.

‎kotlinx-coroutines-core/common/src/flow/Builders.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
204204
@FlowPreview
205205
@Deprecated(
206206
message = "Use channelFlow with awaitClose { } instead of flowViaChannel and invokeOnClose { }.",
207-
level = DeprecationLevel.WARNING
208-
)
207+
level = DeprecationLevel.ERROR
208+
) // To be removed in 1.4.x
209209
@Suppress("DeprecatedCallableAddReplaceWith")
210210
public fun <T> flowViaChannel(
211211
bufferSize: Int = BUFFERED,

‎kotlinx-coroutines-core/common/src/flow/Migration.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,6 @@ public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): F
434434
message = "'scanReduce' was renamed to 'runningReduce' to be consistent with Kotlin standard library",
435435
replaceWith = ReplaceWith("runningReduce(operation)")
436436
)
437-
@ExperimentalCoroutinesApi
438437
public fun <T> Flow<T>.scanReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = runningReduce(operation)
439438

440439
@Deprecated(

‎kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ import kotlin.native.concurrent.*
108108
* might be added to this interface in the future, but is stable for use.
109109
* Use the `MutableSharedFlow(replay, ...)` constructor function to create an implementation.
110110
*/
111-
@ExperimentalCoroutinesApi
112111
public interface SharedFlow<out T> : Flow<T> {
113112
/**
114113
* A snapshot of the replay cache.
@@ -138,7 +137,6 @@ public interface SharedFlow<out T> : Flow<T> {
138137
* might be added to this interface in the future, but is stable for use.
139138
* Use the `MutableSharedFlow(...)` constructor function to create an implementation.
140139
*/
141-
@ExperimentalCoroutinesApi
142140
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
143141
/**
144142
* Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was
@@ -202,7 +200,6 @@ public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
202200
* supported only when `replay > 0` or `extraBufferCapacity > 0`).
203201
*/
204202
@Suppress("FunctionName", "UNCHECKED_CAST")
205-
@ExperimentalCoroutinesApi
206203
public fun <T> MutableSharedFlow(
207204
replay: Int = 0,
208205
extraBufferCapacity: Int = 0,

‎kotlinx-coroutines-core/common/src/flow/SharingStarted.kt

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import kotlin.time.*
1212
* A command emitted by [SharingStarted] implementations to control the sharing coroutine in
1313
* the [shareIn] and [stateIn] operators.
1414
*/
15-
@ExperimentalCoroutinesApi
1615
public enum class SharingCommand {
1716
/**
1817
* Starts sharing, launching collection of the upstream flow.
@@ -75,19 +74,16 @@ public enum class SharingCommand {
7574
* The completion of the `command` flow normally has no effect (the upstream flow keeps running if it was running).
7675
* The failure of the `command` flow cancels the sharing coroutine and the upstream flow.
7776
*/
78-
@ExperimentalCoroutinesApi
7977
public interface SharingStarted {
8078
public companion object {
8179
/**
8280
* Sharing is started immediately and never stops.
8381
*/
84-
@ExperimentalCoroutinesApi
8582
public val Eagerly: SharingStarted = StartedEagerly()
8683

8784
/**
8885
* Sharing is started when the first subscriber appears and never stops.
8986
*/
90-
@ExperimentalCoroutinesApi
9187
public val Lazily: SharingStarted = StartedLazily()
9288

9389
/**
@@ -108,7 +104,6 @@ public interface SharingStarted {
108104
* are negative.
109105
*/
110106
@Suppress("FunctionName")
111-
@ExperimentalCoroutinesApi
112107
public fun WhileSubscribed(
113108
stopTimeoutMillis: Long = 0,
114109
replayExpirationMillis: Long = Long.MAX_VALUE
@@ -143,7 +138,6 @@ public interface SharingStarted {
143138
*/
144139
@Suppress("FunctionName")
145140
@ExperimentalTime
146-
@ExperimentalCoroutinesApi
147141
public fun SharingStarted.Companion.WhileSubscribed(
148142
stopTimeout: Duration = Duration.ZERO,
149143
replayExpiration: Duration = Duration.INFINITE

‎kotlinx-coroutines-core/common/src/flow/StateFlow.kt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ import kotlin.native.concurrent.*
135135
* might be added to this interface in the future, but is stable for use.
136136
* Use the `MutableStateFlow(value)` constructor function to create an implementation.
137137
*/
138-
@ExperimentalCoroutinesApi
139138
public interface StateFlow<out T> : SharedFlow<T> {
140139
/**
141140
* The current value of this state flow.
@@ -156,7 +155,6 @@ public interface StateFlow<out T> : SharedFlow<T> {
156155
* might be added to this interface in the future, but is stable for use.
157156
* Use the `MutableStateFlow()` constructor function to create an implementation.
158157
*/
159-
@ExperimentalCoroutinesApi
160158
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
161159
/**
162160
* The current value of this state flow.
@@ -180,7 +178,6 @@ public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
180178
* Creates a [MutableStateFlow] with the given initial [value].
181179
*/
182180
@Suppress("FunctionName")
183-
@ExperimentalCoroutinesApi
184181
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
185182

186183
// ------------------------------------ Implementation ------------------------------------
@@ -380,4 +377,4 @@ internal fun <T> StateFlow<T>.fuseStateFlow(
380377
return this
381378
}
382379
return fuseSharedFlow(context, capacity, onBufferOverflow)
383-
}
380+
}

‎kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
7171
* .collect { println(it) } // prints Begin, a, b, c
7272
* ```
7373
*/
74-
@ExperimentalCoroutinesApi
7574
public fun <T> Flow<T>.onStart(
7675
action: suspend FlowCollector<T>.() -> Unit
7776
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
@@ -142,7 +141,6 @@ public fun <T> Flow<T>.onStart(
142141
* In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception.
143142
* Use [catch] if you need to suppress failure and replace it with emission of elements.
144143
*/
145-
@ExperimentalCoroutinesApi
146144
public fun <T> Flow<T>.onCompletion(
147145
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
148146
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
@@ -178,7 +176,6 @@ public fun <T> Flow<T>.onCompletion(
178176
* }.collect { println(it) } // prints 1, 2
179177
* ```
180178
*/
181-
@ExperimentalCoroutinesApi
182179
public fun <T> Flow<T>.onEmpty(
183180
action: suspend FlowCollector<T>.() -> Unit
184181
): Flow<T> = unsafeFlow {
@@ -203,12 +200,6 @@ private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?>
203200
}
204201
}
205202

206-
// It was only released in 1.3.0-M2, remove in 1.4.0
207-
/** @suppress */
208-
@Deprecated(level = DeprecationLevel.HIDDEN, message = "binary compatibility with a version w/o FlowCollector receiver")
209-
public fun <T> Flow<T>.onCompletion(action: suspend (cause: Throwable?) -> Unit): Flow<T> =
210-
onCompletion { action(it) }
211-
212203
private suspend fun <T> FlowCollector<T>.invokeSafely(
213204
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit,
214205
cause: Throwable?

‎kotlinx-coroutines-core/common/src/flow/operators/Share.kt

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ import kotlin.jvm.*
132132
* @param started the strategy that controls when sharing is started and stopped.
133133
* @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
134134
*/
135-
@ExperimentalCoroutinesApi
136135
public fun <T> Flow<T>.shareIn(
137136
scope: CoroutineScope,
138137
started: SharingStarted,
@@ -297,7 +296,6 @@ private fun <T> CoroutineScope.launchSharing(
297296
* This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy
298297
* with the `replayExpirationMillis` parameter.
299298
*/
300-
@ExperimentalCoroutinesApi
301299
public fun <T> Flow<T>.stateIn(
302300
scope: CoroutineScope,
303301
started: SharingStarted,
@@ -316,7 +314,6 @@ public fun <T> Flow<T>.stateIn(
316314
*
317315
* @param scope the coroutine scope in which sharing is started.
318316
*/
319-
@ExperimentalCoroutinesApi
320317
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T> {
321318
val config = configureSharing(1)
322319
val result = CompletableDeferred<StateFlow<T>>()
@@ -353,14 +350,12 @@ private fun <T> CoroutineScope.launchSharingDeferred(
353350
/**
354351
* Represents this mutable shared flow as a read-only shared flow.
355352
*/
356-
@ExperimentalCoroutinesApi
357353
public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> =
358354
ReadonlySharedFlow(this)
359355

360356
/**
361357
* Represents this mutable state flow as a read-only state flow.
362358
*/
363-
@ExperimentalCoroutinesApi
364359
public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> =
365360
ReadonlyStateFlow(this)
366361

@@ -391,7 +386,6 @@ private class ReadonlyStateFlow<T>(
391386
*
392387
* The receiver of the [action] is [FlowCollector], so `onSubscription` can emit additional elements.
393388
*/
394-
@ExperimentalCoroutinesApi
395389
public fun <T> SharedFlow<T>.onSubscription(action: suspend FlowCollector<T>.() -> Unit): SharedFlow<T> =
396390
SubscribedSharedFlow(this, action)
397391

‎kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt

Lines changed: 0 additions & 524 deletions
Large diffs are not rendered by default.

‎kotlinx-coroutines-core/common/test/channels/ProduceTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ class ProduceTest : TestBase() {
163163
private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
164164
val source = Channel<Int>()
165165
expect(1)
166-
val produced = produce<Int>(coroutineContext, onCompletion = source.consumes()) {
166+
val produced = produce<Int>(coroutineContext, onCompletion = { source.cancelConsumed(it) }) {
167167
expect(2)
168168
source.receive()
169169
}

‎kotlinx-coroutines-core/jvm/test/channels/ChannelsConsumeTest.kt

Lines changed: 0 additions & 908 deletions
This file was deleted.

‎kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ class ChannelsJvmTest : TestBase() {
1414
fun testBlocking() {
1515
val ch = Channel<Int>()
1616
val sum = GlobalScope.async {
17-
ch.sumBy { it }
17+
var sum = 0
18+
ch.consumeEach { sum += it }
19+
sum
1820
}
1921
repeat(10) {
2022
ch.sendBlocking(it)

‎reactive/kotlinx-coroutines-jdk9/src/Publish.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.reactivestreams.FlowAdapters
2828
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
2929
* to cancellation and error handling may change in the future.
3030
*/
31-
@ExperimentalCoroutinesApi
31+
@ExperimentalCoroutinesApi // Since 1.3.x
3232
public fun <T> flowPublish(
3333
context: CoroutineContext = EmptyCoroutineContext,
3434
@BuilderInference block: suspend ProducerScope<T>.() -> Unit

‎ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import kotlinx.coroutines.flow.*
2424
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
2525
* [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead.
2626
*/
27-
@ExperimentalCoroutinesApi
27+
@ExperimentalCoroutinesApi // Since 1.3.x
2828
public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
2929
val listener = ChangeListener<T> { _, _, newValue ->
3030
try {

0 commit comments

Comments
 (0)
Please sign in to comment.