TQueue

data class TQueue<A>

A TQueue is a transactional unbounded queue which can be written to and read from concurrently.

The implementation uses two TVars, each containing a list: one for read access and one for write access. Due to the semantics of STM, a write to the queue never invalidates or blocks a read and vice versa, which makes highly concurrent use possible.

In practice, when the read variable is empty, a read has to swap the contents of the two variables and therefore touches both. This swap is infrequent, so its cost can usually be ignored.
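
The two-list layout described above can be modelled without any STM machinery. The following is a minimal single-threaded sketch, not Arrow's actual implementation: the class `TwoListQueue` and its `swaps` counter are hypothetical names introduced only for illustration, and plain fields stand in for the two TVars.

```kotlin
// A plain, single-threaded model of the two-list layout (an illustration only).
class TwoListQueue<A> {
  private var reads: List<A> = emptyList()  // front of the queue, in read order
  private var writes: List<A> = emptyList() // back of the queue, newest item first

  // Counts how often the contents had to be moved from the write list to the read list.
  var swaps = 0
    private set

  fun write(a: A) {
    writes = listOf(a) + writes // touches only the write list
  }

  fun read(): A? {
    if (reads.isEmpty()) {
      if (writes.isEmpty()) return null // nothing to read at all
      reads = writes.reversed()         // the infrequent swap: touches both lists
      writes = emptyList()
      swaps++
    }
    val head = reads.first()
    reads = reads.drop(1)
    return head
  }
}

fun main() {
  val q = TwoListQueue<Int>()
  (1..5).forEach(q::write)
  val drained = List(5) { q.read() }
  println("Drained $drained after ${q.swaps} swap(s)")
}
```

Five writes followed by five reads need only a single swap in this model, which suggests why the swap rarely matters in practice.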

Creating a TQueue

An empty queue can be created with either STM.newTQueue or TQueue.new: use STM.newTQueue inside a transaction and TQueue.new outside of one.
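
As a rough sketch of the two creation paths, the snippet below creates one queue outside a transaction and one inside. The helper `seededQueue` is a hypothetical name introduced for illustration, and the wildcard import is used so that the snippet does not depend on whether newTQueue is declared as a member or as an extension.

```kotlin
import arrow.fx.stm.*

// Hypothetical helper: creates a queue and fills it within a single transaction.
suspend fun seededQueue(items: List<Int>): TQueue<Int> = atomically {
  val tq = newTQueue<Int>() // STM.newTQueue: we are already inside a transaction
  items.forEach { tq.write(it) }
  tq
}

suspend fun main() {
  val empty = TQueue.new<Int>() // TQueue.new: called outside of any transaction
  val seeded = seededQueue(listOf(1, 2, 3))
  println("Empty: ${atomically { empty.flush() }}")
  println("Seeded: ${atomically { seeded.flush() }}")
}
```

Creating the queue inside the transaction means other transactions cannot observe it before it has been filled.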

Writing to the TQueue

Writing to the end of the queue is done by using STM.write:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  atomically {
    tq.write(2)
    // or alternatively
    tq += 4
  }
  //sampleEnd
  println("Items in queue ${atomically { tq.flush() }}")
}

It is also possible to write to the front of the queue, but since that accesses the read variable it can lead to worse overall performance:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  atomically {
    tq.write(1)
    // 2 is placed in front of 1 and will be read first
    tq.writeFront(2)
  }
  //sampleEnd
  println("Items in queue ${atomically { tq.flush() }}")
}

Reading items from a TQueue

There are several ways to read from a TQueue. The most common one is STM.read:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.write(2)
    tq.read()
  }
  //sampleEnd
  println("Result $result")
  println("Items in queue ${atomically { tq.flush() }}")
}

Should the queue be empty, calling STM.read will cause the transaction to retry and thus wait for items to be added to the queue. This can be avoided by using STM.tryRead instead, which returns null when the queue is empty:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.tryRead()
  }
  //sampleEnd
  println("Result $result")
  println("Items in queue ${atomically { tq.flush() }}")
}
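
The retrying behaviour of STM.read on an empty queue can be used to wait for a producer. The sketch below assumes kotlinx.coroutines is on the classpath. The function name `readAfterDelayedWrite`, the value 42, and the 100 ms delay are arbitrary choices made for illustration.

```kotlin
import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay

// Hypothetical helper: starts a reader on an empty queue, then writes to the queue.
suspend fun readAfterDelayedWrite(): Int = coroutineScope {
  val tq = TQueue.new<Int>()
  // The queue is empty, so this transaction retries until a write has been committed.
  val consumer = async { atomically { tq.read() } }
  delay(100) // give the consumer time to start waiting
  atomically { tq.write(42) }
  consumer.await()
}

suspend fun main() {
  println("Received ${readAfterDelayedWrite()}")
}
```

The consumer needs no polling loop here: the read transaction only completes once an item is available.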

STM.read also removes the read item from the queue. Alternatively, STM.peek leaves the queue unchanged on a read:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.write(2)

    tq.peek()
  }
  //sampleEnd
  println("Result $result")
  println("Items in queue ${atomically { tq.flush() }}")
}

As with STM.read, STM.peek will retry should the queue be empty. The alternative STM.tryPeek is there to avoid that:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.tryPeek()
  }
  //sampleEnd
  println("Result $result")
  println("Items in queue ${atomically { tq.flush() }}")
}

It is also possible to read the entire queue in one go using STM.flush, which also removes all items from the queue:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.write(2)
    tq.write(4)

    tq.flush()
  }
  //sampleEnd
  println("Result $result")
  println("Items in queue ${atomically { tq.flush() }}")
}

Checking a queue's size

Checking if a queue is empty can be done by using either STM.isEmpty or STM.isNotEmpty:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.isEmpty()
  }
  //sampleEnd
  println("Result $result")
}

Retrieving the actual size of a queue can be done using STM.size:

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  val result = atomically {
    tq.size()
  }
  //sampleEnd
  println("Result $result")
}

All three of these methods have to access both the write and read end of a TQueue and thus can increase contention. Use them sparingly!

Removing elements from a TQueue

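It is also possible to remove elements from a TQueue using STM.removeAll. It works like a filter: elements for which the predicate returns false are removed, and all others are kept: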

import arrow.fx.stm.TQueue
import arrow.fx.stm.atomically

suspend fun main() {
  //sampleStart
  val tq = TQueue.new<Int>()
  atomically {
    tq.write(0)
    tq.removeAll { it != 0 }
  }
  //sampleEnd
  println("Items in queue ${atomically { tq.flush() }}")
}

This method also accesses both ends of the queue and thus should be used infrequently to avoid contention.

Types

object Companion