Play with Queue
`ConcurrentQueue` represents a concurrent queue: values are added with `offer` and retrieved with `take`.

Queue

/**
 * A concurrent queue holding values of type `A`.
 *
 * Both operations are described as effects that need no environment
 * (`T.NoEnv`) and whose error type is `never`.
 */
export interface ConcurrentQueue<A> {
  /**
   * Effect that takes a value of type `A` out of the queue.
   */
  readonly take: T.Effect<T.NoEnv, never, A>;
  /**
   * Build an effect that offers a value to the queue.
   * @param a the value to offer
   * @returns an effect that yields `void`
   */
  offer(a: A): T.Effect<T.NoEnv, never, void>;
}

/**
 * Create an unbounded concurrent queue.
 *
 * Declared as an ambient signature (`declare`): only the type of the API is
 * shown here, not its implementation.
 *
 * @returns an effect that yields a new `ConcurrentQueue<A>`
 */
export declare function unboundedQueue<A>(): T.Effect<
  T.NoEnv,
  never,
  ConcurrentQueue<A>
>;

/**
 * Create a bounded queue with the given capacity that drops older offers.
 *
 * Declared as an ambient signature (`declare`): only the type of the API is
 * shown here, not its implementation.
 *
 * @param capacity the capacity of the queue
 * @returns an effect that yields a new `ConcurrentQueue<A>`
 */
export declare function slidingQueue<A>(
  capacity: number
): T.Effect<T.NoEnv, never, ConcurrentQueue<A>>;

/**
 * Create a dropping queue with the given capacity: offers made while the
 * queue is full are dropped.
 *
 * Declared as an ambient signature (`declare`): only the type of the API is
 * shown here, not its implementation.
 *
 * @param capacity the capacity of the queue
 * @returns an effect that yields a new `ConcurrentQueue<A>`
 */
export declare function droppingQueue<A>(
  capacity: number
): T.Effect<T.NoEnv, never, ConcurrentQueue<A>>;

/**
 * Create a bounded queue that blocks offers while the queue is at capacity.
 *
 * Declared as an ambient signature (`declare`): only the type of the API is
 * shown here, not its implementation.
 *
 * @param capacity the capacity of the queue
 * @returns an effect that yields a new `ConcurrentQueue<A>`
 */
export declare function boundedQueue<A>(
  capacity: number
): T.Effect<T.NoEnv, never, ConcurrentQueue<A>>;
Copied!

Usage

import { effect as T, queue as Q, ref as R } from "@matechs/effect";
import { Do } from "fp-ts-contrib/lib/Do";
import { pipe } from "fp-ts/lib/pipeable";

/**
 * Example program: a producer fiber offers the current value of a counter
 * reference to an unbounded queue with a 1000 ms delay, forever, while a
 * consumer fiber takes each value from the queue and logs it. The producer
 * is interrupted after 5 seconds and the consumer 1 second after that.
 */
const program: T.Effect<unknown, never, void> = Do(T.effect)
  .bind("queue", Q.unboundedQueue<number>()) // create a queue
  .bind("ref", R.makeRef(0)) // create a reference
  .bindL("fiberOffer", ({ queue, ref }) =>
    T.fork( // fork a new fiber that offers n every second
      T.forever(
        T.delay(
          pipe(
            // presumably yields the current n and stores n + 1 — confirm against `ref.modify`
            ref.modify(n => [n, n + 1]),
            T.chain(queue.offer)
          ),
          1000
        )
      )
    )
  )
  .bindL("fiberTake", ({ queue }) =>
    T.fork( // fork a new fiber that processes each n and logs it
      T.forever(
        pipe(
          queue.take,
          T.chain(n =>
            T.sync(() => {
              console.log(`got: ${n}`);
            })
          )
        )
      )
    )
  )
  .doL(({ fiberOffer, fiberTake }) =>
    // interrupt producer fiber after 5 seconds
    T.effect.chain(T.delay(fiberOffer.interrupt, 5000), _ =>
      // then interrupt consumer fiber 1 second later
      T.delay(fiberTake.interrupt, 1000)
    )
  )
  .return(_ => {}); // return void

T.run(program);

// prints:
// got: 0
// got: 1
// got: 2
// got: 3
Copied!
Last modified 1yr ago
Copy link
Contents
Queue
Usage