Resource
Resource models resource allocation and release. A Resource is the value that associates an acquisition step with its finalizer, and the capability of installing resources is called ResourceScope. Resource allocates and releases resources in a safe way that co-operates with Structured Concurrency and KotlinX Coroutines.
It is especially useful when multiple resources that depend on each other need to be acquired and later released in reverse order, or when you want to compose other suspend functionality, such as concurrency, parallelism or Arrow's Effect, into resource-safe programs.
You create a Resource value with the resource function, and you run a program that uses ResourceScope with resourceScope or use. When the scope terminates, all finalizers are guaranteed to run in reverse order of acquisition.
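To make the reverse-order guarantee concrete, here is a minimal sketch that uses only the Kotlin standard library. MiniScope, miniScope and demo are hypothetical names invented for this illustration. This is not how Arrow implements ResourceScope, and it ignores suspension and cancellation entirely.

```kotlin
// Hypothetical, simplified stand-in for a resource scope (not Arrow's implementation).
class MiniScope {
  // Finalizers are kept as a stack: the most recently installed one is first.
  private val finalizers = ArrayDeque<() -> Unit>()

  // Run `acquire`, then register `release` so it runs when the scope closes.
  fun <A> install(acquire: () -> A, release: (A) -> Unit): A {
    val a = acquire()
    finalizers.addFirst { release(a) }
    return a
  }

  // Run every finalizer, most recently installed first.
  fun close() {
    while (finalizers.isNotEmpty()) {
      val finalizer = finalizers.removeFirst()
      runCatching { finalizer() } // a failing finalizer must not block the others
    }
  }
}

// Run `block` and always close the scope afterwards, even if `block` throws.
fun <B> miniScope(block: MiniScope.() -> B): B {
  val scope = MiniScope()
  try {
    return scope.block()
  } finally {
    scope.close()
  }
}

fun demo(): List<String> {
  val log = mutableListOf<String>()
  runCatching {
    miniScope {
      install({ log += "acquire A"; "A" }) { log += "release $it" }
      install({ log += "acquire B"; "B" }) { log += "release $it" }
      error("boom") // the failure still triggers both finalizers
    }
  }
  return log
}

fun main() {
  println(demo()) // [acquire A, acquire B, release B, release A]
}
```

B is released before A because B may depend on A, so A has to outlive it.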
The following program is not safe because it is prone to leak dataSource and userProcessor when an exception or cancellation signal occurs whilst using the service.
class UserProcessor {
  fun start(): Unit = println("Creating UserProcessor")
  fun shutdown(): Unit = println("Shutting down UserProcessor")
}

class DataSource {
  fun connect(): Unit = println("Connecting dataSource")
  fun close(): Unit = println("Closed dataSource")
}

class Service(val db: DataSource, val userProcessor: UserProcessor) {
  suspend fun processData(): List<String> =
    throw RuntimeException("I'm going to leak resources by not closing them")
}

suspend fun main(): Unit {
  val userProcessor = UserProcessor().also { it.start() }
  val dataSource = DataSource().also { it.connect() }
  val service = Service(dataSource, userProcessor)

  service.processData()

  // Never reached when processData throws or is cancelled, so both resources leak.
  dataSource.close()
  userProcessor.shutdown()
}
If we were using Kotlin JVM, we might've relied on Closeable or AutoCloseable and rewritten our code to:
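// Assumes UserProcessor and DataSource now implement Closeable (JVM only),
// with UserProcessor#shutdown renamed to close.
suspend fun main(): Unit {
  UserProcessor().use { userProcessor ->
    userProcessor.start()
    DataSource().use { dataSource ->
      dataSource.connect()
      Service(dataSource, userProcessor).processData()
    }
  }
}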
However, while we fixed the closing of UserProcessor and DataSource, there are still issues with this code:
- It requires implementing Closeable or AutoCloseable, which is only possible on Kotlin JVM and is not available for Kotlin MPP.
- It requires implementing an interface, or wrapping external types with e.g. class CloseableOf<A>(val type: A): Closeable.
- It requires nesting the different resources in a callback tree, which is not composable.
- It enforces the close method name, so UserProcessor#shutdown had to be renamed to close.
- It cannot run suspend functions within fun close(): Unit.
- There is no exit signal: we don't know if we exited successfully, with an error, or through cancellation.
Resource solves all of these issues. It defines three steps:
1. Acquiring the resource of A.
2. Using A.
3. Releasing A with ExitCase.Completed, ExitCase.Failure or ExitCase.Cancelled.
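The three steps can be sketched as follows. This assumes a project with arrow-fx-coroutines (Arrow 1.2 or later) on the classpath. describe and runSteps are hypothetical helpers made up for this illustration, and only install, resourceScope and ExitCase come from Arrow.

```kotlin
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.resourceScope

// Hypothetical helper: turn the exit signal into a readable label.
fun describe(exitCase: ExitCase): String = when (exitCase) {
  is ExitCase.Completed -> "completed"
  is ExitCase.Cancelled -> "cancelled"
  is ExitCase.Failure -> "failed: ${exitCase.failure.message}"
}

// Hypothetical helper: runs acquire, use and release, and records what happened.
suspend fun runSteps(fail: Boolean): List<String> {
  val log = mutableListOf<String>()
  runCatching {
    resourceScope {
      // Step 1: acquire A. The release handler (step 3) is registered at the same time.
      val a = install({ log += "acquire"; "A" }) { res, exitCase ->
        log += "release $res (${describe(exitCase)})"
      }
      // Step 2: use A.
      log += "use $a"
      if (fail) error("boom")
    }
  }
  return log
}

suspend fun main() {
  println(runSteps(fail = false))
  println(runSteps(fail = true))
}
```

The release handler runs in both cases. Only the ExitCase it receives differs, which is the exit signal that Closeable cannot provide.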
We rewrite our previous example to Resource below by:
1. Defining a Resource for UserProcessor.
2. Defining a Resource for DataSource that also logs the ExitCase.
3. Composing the UserProcessor and DataSource Resources together into a Resource for Service.
import arrow.fx.coroutines.Resource
import arrow.fx.coroutines.resource
import arrow.fx.coroutines.resourceScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

val userProcessor: Resource<UserProcessor> = resource({
  UserProcessor().also { it.start() }
}) { p, _ -> p.shutdown() }

val dataSource: Resource<DataSource> = resource({
  DataSource().also { it.connect() }
}) { ds, exitCase ->
  println("Releasing $ds with exit: $exitCase")
  withContext(Dispatchers.IO) { ds.close() }
}

val service: Resource<Service> = resource {
  Service(dataSource.bind(), userProcessor.bind())
}

suspend fun main(): Unit = resourceScope {
  val data = service.bind().processData()
  println(data)
}
There is a lot going on in the snippet above, which we'll analyse in the sections below. The example should already give you some idea of the capabilities of Resource.
Resource constructors
Resource works entirely through a DSL, which allows installing a Resource through the suspend fun <A> install(acquire: suspend () -> A, release: suspend (A, ExitCase) -> Unit): A function.
acquire is used to allocate the Resource, and before returning the resource A, install also installs the release handler into the ResourceScope.
We can use suspend fun with ResourceScope as an extension function receiver to create synthetic constructors for our Resources. If you're using context receivers, you can also use context(ResourceScope) instead.
suspend fun ResourceScope.userProcessor(): UserProcessor =
  install({ UserProcessor().also { it.start() } }) { processor, _ ->
    processor.shutdown()
  }
We can of course also create lazy representations of this by wrapping install in resource and returning the suspend lambda value instead.
val userProcessor: Resource<UserProcessor> = resource {
  val x: UserProcessor = install(
    { UserProcessor().also { it.start() } },
    { processor, _ -> processor.shutdown() }
  )
  x
}
There is also a convenience operator for this pattern, but you might prefer ResourceScope::userProcessor instead, since it yields the same result.
val userProcessor2: Resource<UserProcessor> = resource({
  UserProcessor().also { it.start() }
}) { processor, _ -> processor.shutdown() }

val userProcessor3: Resource<UserProcessor> = ResourceScope::userProcessor
Scope DSL
The ResourceScope DSL allows you to install resources and interact with them in a safe way.
Arrow offers the same elegant bind DSL for Resource composition that you might be familiar with from Arrow Core, which we've already seen in our first example. More interestingly, we can also compose it with any other existing pattern from Arrow. Let's compose our UserProcessor and DataSource in parallel, so that their start and connect methods can run in parallel.
import arrow.fx.coroutines.ResourceScope
import arrow.fx.coroutines.parZip
import arrow.fx.coroutines.resourceScope

suspend fun ResourceScope.userProcessor(): UserProcessor =
  install({ UserProcessor().also { it.start() } }) { p, _ -> p.shutdown() }

suspend fun ResourceScope.dataSource(): DataSource =
  install({ DataSource().also { it.connect() } }) { ds, _ -> ds.close() }

suspend fun main(): Unit = resourceScope {
  // Both acquisitions run in parallel; both finalizers are registered in this scope.
  val service = parZip({ userProcessor() }, { dataSource() }) { userProcessor, ds ->
    Service(ds, userProcessor)
  }
  val data = service.processData()
  println(data)
}
Conclusion
Resource guarantees that release finalizers are always invoked in the correct order when an exception is raised or when the kotlinx.coroutines.Job running the program gets cancelled.
To achieve this, Resource ensures that the acquire and release steps are NonCancellable. If a cancellation signal or an exception is received during acquire, the resource is assumed not to have been acquired, so its release function is not triggered. Any composed resources that were already acquired are still guaranteed to be released as expected.
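The acquisition rule above can be illustrated with a small sketch, again assuming arrow-fx-coroutines (Arrow 1.2 or later) is available. failingAcquire is a hypothetical function written for this example. The second resource fails while it is being acquired, so only the first one has a finalizer to run.

```kotlin
import arrow.fx.coroutines.resourceScope

// Hypothetical demo: the acquire step of B throws, so B never counts as acquired.
suspend fun failingAcquire(): List<String> {
  val log = mutableListOf<String>()
  runCatching {
    resourceScope {
      // A is acquired successfully, so its finalizer is registered.
      install({ log += "acquire A"; "A" }) { a, _ -> log += "release $a" }
      // B fails during acquire, so its release handler is never registered.
      install<String>({ log += "acquire B"; error("B failed") }) { b, _ ->
        log += "release $b"
      }
    }
  }
  return log
}

suspend fun main() {
  println(failingAcquire())
}
```

The log should contain the release of A but no release of B, because B never finished acquiring.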
If you don't need a data type like Resource but want a functional alternative to try/catch/finally with automatic error composition and automatic NonCancellable acquire and release steps, use bracketCase or bracket.
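As a rough sketch of that alternative, the snippet below uses bracketCase from arrow-fx-coroutines (assumed to be Arrow 1.2 or later). firstLine and the in-memory list standing in for a file are hypothetical and exist only for illustration.

```kotlin
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.bracketCase

// Hypothetical example: "open" a fake file, read its first line, and always "close" it.
suspend fun firstLine(log: MutableList<String>): String =
  bracketCase(
    // acquire: allocate the resource
    { log += "open"; listOf("first", "second") },
    // use: work with the resource and produce the result
    { lines -> lines.first() },
    // release: always runs, and receives the exit signal
    { _, exitCase ->
      log += if (exitCase is ExitCase.Completed) "close after success" else "close after $exitCase"
    }
  )

suspend fun main() {
  val log = mutableListOf<String>()
  println(firstLine(log))
  println(log)
}
```

bracket has the same shape, except that its release step does not receive the ExitCase. Use it when the finalizer does not care how the use step ended.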