Play with Streams
Potentially infinite effectful streams!

Stream

1
/**
2
* Stream represents a potentially infinite stream of effectful computations
3
* It is a more general structure with respect to the base effect
4
* that allows running computations in a reactive manner
5
*/
6
export interface Stream<R, E, A> {}
7
8
/**
9
* Create a Stream from a source A action.
10
*
11
* The contract is that the acquisition of the resource should produce an Effect that may be repeatedly evaluated
12
* during the scope of the Managed
13
* If there is more data in the stream, the Effect should produce some(A); otherwise it should produce none.
14
* Once it produces none, it will not be evaluated again.
15
* @param r
16
*/
17
export function fromSource<R, E, A>(
18
r: Managed<R, E, T.Effect<R, E, Option<A>>>
19
): Stream<R, E, A>
20
21
/**
22
* Create a stream from an Array
23
* @param as
24
*/
25
export function fromArray<A>(as: readonly A[]): Stream<T.NoEnv, T.NoErr, A>
26
27
/**
28
* Create a stream from an iterator
29
* @param iter
30
*/
31
export function fromIterator<A>(
32
iter: Lazy<Iterator<A>>
33
): Stream<T.NoEnv, T.NoErr, A>
34
35
/**
36
* Create a stream that emits the elements in a range
37
* @param start
38
* @param interval
39
* @param end
40
*/
41
export function fromRange(
42
start: number,
43
interval?: number,
44
end?: number
45
): Stream<T.NoEnv, T.NoErr, number>
46
47
/**
48
* Create a stream from an existing iterator
49
* @param iter
50
*/
51
export function fromIteratorUnsafe<A>(
52
iter: Iterator<A>
53
): Stream<T.NoEnv, T.NoErr, A>
54
55
/**
56
* Create a stream that emits a single element
57
* @param a
58
*/
59
export function once<A>(a: A): Stream<T.NoEnv, T.NoErr, A>
60
61
/**
62
* Create a stream that emits As as fast as possible
63
*
64
* Be cautious when using this. If your entire pipeline is full of synchronous actions you can block the main
65
* thread until the stream runs to completion (or forever) using this
66
* @param a
67
*/
68
export function repeatedly<A>(a: A): Stream<T.NoEnv, T.NoErr, A>
69
70
export function periodically(ms: number): Stream<T.NoEnv, T.NoErr, number>
71
72
/**
73
* A stream that emits no elements and immediately terminates
74
*/
75
export const empty: Stream<
76
T.NoEnv,
77
T.NoErr,
78
never
79
>
80
81
/**
82
* Create a stream that evaluates w to emit a single element
83
* @param w
84
*/
85
export function encaseEffect<R, E, A>(w: T.Effect<R, E, A>): Stream<R, E, A>
86
87
/**
88
* Create a stream that immediately fails
89
* @param e
90
*/
91
export function raised<E>(e: E): Stream<T.NoEnv, E, never>
92
93
/**
94
* Create a stream that immediately aborts
95
* @param e
96
*/
97
export function aborted(e: unknown): Stream<T.NoEnv, T.NoErr, never>
98
99
/**
100
* Create a stream that immediately emits either 0 or 1 elements
101
* @param opt
102
*/
103
export function fromOption<A>(opt: Option<A>): Stream<T.NoEnv, T.NoErr, A>
104
105
/**
106
* Zip all stream elements with their index ordinals
107
* @param stream
108
*/
109
export function zipWithIndex<R, E, A>(
110
stream: Stream<R, E, A>
111
): Stream<R, E, readonly [A, number]>
112
113
/**
114
* Create a stream that emits all the elements of stream1 followed by all the elements of stream2
115
* @param stream1
116
* @param stream2
117
*/
118
export function concatL<R, E, A, R2, E2>(
119
stream1: Stream<R, E, A>,
120
stream2: Lazy<Stream<R2, E2, A>>
121
): Stream<R & R2, E | E2, A>
122
123
/**
124
* Strict form of concatL
125
* @param stream1
126
* @param stream2
127
*/
128
export function concat<R, E, A, R2, E2>(
129
stream1: Stream<R, E, A>,
130
stream2: Stream<R2, E2, A>
131
): Stream<R & R2, E | E2, A>
132
133
/**
134
* Creates a stream that repeatedly emits the elements of a stream forever.
135
*
136
* The elements are not cached, any effects required (i.e. opening files or sockets) are repeated for each cycle
137
* @param stream
138
*/
139
export function repeat<R, E, A>(stream: Stream<R, E, A>): Stream<R, E, A>
140
141
/**
142
* Map the elements of a stream
143
* @param stream
144
* @param f
145
*/
146
export function map<R, A, B>(
147
f: FunctionN<[A], B>
148
): <E>(stream: Stream<R, E, A>) => Stream<R, E, B>
149
150
/**
151
* Map every element emitted by stream to b
152
* @param stream
153
* @param b
154
*/
155
export function as<R, E, A, B>(stream: Stream<R, E, A>, b: B): Stream<R, E, B>
156
157
/**
158
* Filter the elements of a stream by a predicate
159
* @param stream
160
* @param f
161
*/
162
export function filter<R, E, A>(
163
stream: Stream<R, E, A>,
164
f: Predicate<A>
165
): Stream<R, E, A>
166
167
/**
168
* Curried form of filter
169
* @param f
170
*/
171
export function filterWith<A>(
172
f: Predicate<A>
173
): <R, E>(stream: Stream<R, E, A>) => Stream<R, E, A>
174
175
// filter and refine
176
export function filterRefineWith<A, B extends A>(
177
f: Refinement<A, B>
178
): <R, E>(stream: Stream<R, E, A>) => Stream<R, E, B>
179
180
/**
181
* Filter the stream so that only items that are not equal to the previous item emitted are emitted
182
* @param eq
183
*/
184
export function distinctAdjacent<A>(
185
eq: Eq<A>
186
): <R, E>(stream: Stream<R, E, A>) => Stream<R, E, A>
187
188
/**
189
* Fold the elements of this stream together using an effect.
190
*
191
* The resulting stream will emit 1 element produced by the effectful fold
192
* @param stream
193
* @param f
194
* @param seed
195
*/
196
export function foldM<R, E, A, R2, E2, B>(
197
stream: Stream<R, E, A>,
198
f: FunctionN<[B, A], T.Effect<R2, E2, B>>,
199
seed: B
200
): Stream<R & R2, E | E2, B>
201
202
/**
203
* Fold the elements of a stream together purely
204
* @param stream
205
* @param f
206
* @param seed
207
*/
208
export function fold<R, E, A, B>(
209
stream: Stream<R, E, A>,
210
f: FunctionN<[B, A], B>,
211
seed: B
212
): Stream<R, E, B>
213
214
/**
215
* Scan across the elements of the stream.
216
*
217
* This is like foldM but emits every intermediate seed value in the resulting stream.
218
* @param stream
219
* @param f
220
* @param seed
221
*/
222
export function scanM<R, E, A, B, R2, E2>(
223
stream: Stream<R, E, A>,
224
f: FunctionN<[B, A], T.Effect<R2, E2, B>>,
225
seed: B
226
): Stream<R & R2, E | E2, B>
227
228
/**
229
* Purely scan a stream
230
* @param stream
231
* @param f
232
* @param seed
233
*/
234
export function scan<R, E, A, B>(
235
stream: Stream<R, E, A>,
236
f: FunctionN<[B, A], B>,
237
seed: B
238
): Stream<R, E, B>
239
240
/**
241
* Monadic chain on a stream
242
* @param stream
243
* @param f
244
*/
245
export function chain<A, R2, E2, B>(
246
f: FunctionN<[A], Stream<R2, E2, B>>
247
): <R, E>(stream: Stream<R, E, A>) => Stream<R & R2, E | E2, B>
248
249
/**
250
* Flatten a stream of streams
251
* @param stream
252
*/
253
export function flatten<R, E, R2, E2, A>(
254
stream: Stream<R, E, Stream<R2, E2, A>>
255
): Stream<R & R2, E | E2, A>
256
257
/**
258
* Map each element of the stream effectfully
259
* @param stream
260
* @param f
261
*/
262
export function mapM<R, E, A, R2, E2, B>(
263
stream: Stream<R, E, A>,
264
f: FunctionN<[A], T.Effect<R2, E2, B>>
265
): Stream<R & R2, E | E2, B>
266
267
export function mapMWith<A, R2, E2, B>(
268
f: FunctionN<[A], T.Effect<R2, E2, B>>
269
): <R, E>(stream: Stream<R, E, A>) => Stream<R & R2, E | E2, B>
270
271
/**
272
* A stream that emits no elements but never terminates.
273
*/
274
export const never: Stream<T.NoEnv, T.NoErr, never>
275
276
/**
277
* Transduce a stream via a sink.
278
*
279
* This repeatedly runs a sink to completion on the elements of the input stream and emits the result of each run.
280
* Leftovers from a previous run are fed to the next run
281
*
282
* @param stream
283
* @param sink
284
*/
285
export function transduce<R, E, A, R2, E2, S, B>(
286
stream: Stream<R, E, A>,
287
sink: Sink<R2, E2, S, A, B>
288
): Stream<R & R2, E | E2, B>
289
290
/**
291
* Drop some number of elements from a stream
292
*
293
* The effects required to produce the dropped elements still occur in the background
294
* @param stream
295
* @param n
296
*/
297
export function drop<R, E, A>(
298
stream: Stream<R, E, A>,
299
n: number
300
): Stream<R, E, A>
301
302
/**
 * Curried form of drop: takes the element count first and returns a function awaiting the stream
 * @param n the count forwarded to drop as its second argument
 */
export function dropWith(
  n: number
): <R, E, A>(stream: Stream<R, E, A>) => Stream<R, E, A> {
  return function dropFrom<R, E, A>(stream: Stream<R, E, A>): Stream<R, E, A> {
    return drop(stream, n);
  };
}
311
312
/**
313
* Take some number of elements of a stream
314
* @param stream
315
* @param n
316
*/
317
export function take<R, E, A>(
318
stream: Stream<R, E, A>,
319
n: number
320
): Stream<R, E, A>
321
322
/**
323
* Take elements of a stream while a predicate holds
324
* @param stream
325
* @param pred
326
*/
327
export function takeWhile<R, E, A>(
328
stream: Stream<R, E, A>,
329
pred: Predicate<A>
330
): Stream<R, E, A>
331
332
/**
 * Push a stream into a sink to produce the sink's result
 *
 * The sink carries its own environment R2 and error E2; the resulting effect
 * requires both environments (R & R2) and may fail with either error (E | E2).
 * @param stream the stream whose elements are pushed into the sink
 * @param sink the sink that consumes the elements and produces B
 */
export function into<R, E, A, R2, E2, S, B>(
  stream: Stream<R, E, A>,
  sink: Sink<R2, E2, S, A, B>
): T.Effect<R & R2, E | E2, B>
341
342
/**
343
* Push a stream into a sink to produce the sink's result
344
* @param stream
345
* @param managedSink
346
*/
347
export function intoManaged<R, E, A, S, B>(
348
stream: Stream<R, E, A>,
349
managedSink: Managed<R, E, Sink<R, E, S, A, B>>
350
): T.Effect<R, E, B>
351
352
/**
353
* Push a stream into a sink to produce the result and the leftover
354
* @param stream
355
* @param sink
356
*/
357
export function intoLeftover<R, E, A, S, B>(
358
stream: Stream<R, E, A>,
359
sink: Sink<R, E, S, A, B>
360
): T.Effect<R, E, readonly [B, readonly A[]]>
361
362
/**
363
* Zip two streams together, terminating when either stream is exhausted
364
* @param as
365
* @param bs
366
* @param f
367
*/
368
export function zipWith<R, E, A, R2, E2, B, C>(
369
as: Stream<R, E, A>,
370
bs: Stream<R2, E2, B>,
371
f: FunctionN<[A, B], C>
372
): Stream<R & R2, E | E2, C>
373
374
/**
375
* zipWith to form tuples
376
* @param as
377
* @param bs
378
*/
379
export function zip<R, E, A, R2, E2, B>(
380
as: Stream<R, E, A>,
381
bs: Stream<R2, E2, B>
382
): Stream<R & R2, E | E2, readonly [A, B]>
383
384
/**
385
* Feed a stream into a sink to produce a value.
386
*
387
* Emits the value and a 'remainder' stream that includes the rest of the elements of the input stream.
388
* @param stream
389
* @param sink
390
*/
391
export function peel<R, E, A, S, B>(
392
stream: Stream<R, E, A>,
393
sink: Sink<R, E, S, A, B>
394
): Stream<R, E, readonly [B, Stream<R, E, A>]>
395
396
export function peelManaged<R, E, A, S, B>(
397
stream: Stream<R, E, A>,
398
managedSink: Managed<R, E, Sink<R, E, S, A, B>>
399
): Stream<R, E, readonly [B, Stream<R, E, A>]>
400
401
/**
402
* Create a stream that switches to emitting elements of the most recent input stream.
403
* @param stream
404
*/
405
export function switchLatest<R, E, A>(
406
stream: Stream<R, E, Stream<R, E, A>>
407
): Stream<R, E, A>
408
409
/**
410
* Create a stream that switches to emitting the elements of the most recent stream produced by applying f to the
411
* element most recently emitted
412
* @param stream
413
* @param f
414
*/
415
export function chainSwitchLatest<R, E, A, R2, E2, B>(
416
stream: Stream<R, E, A>,
417
f: FunctionN<[A], Stream<R2, E2, B>>
418
): Stream<R & R2, E | E2, B>
419
420
/**
421
* Merge a stream of streams into a single stream.
422
*
423
* This stream will run up to maxActive streams concurrently to produce values into the output stream.
424
* @param stream the input stream
425
* @param maxActive the maximum number of streams to hold active at any given time
426
* this controls how much active streams are able to collectively produce in the face of a slow downstream consumer
427
*/
428
export function merge<R, E, A>(
429
stream: Stream<R, E, Stream<R, E, A>>,
430
maxActive: number
431
): Stream<R, E, A>
432
433
export function chainMerge<R, E, A, B>(
434
stream: Stream<R, E, A>,
435
f: FunctionN<[A], Stream<R, E, B>>,
436
maxActive: number
437
): Stream<R, E, B>
438
439
export function mergeAll<R, E, A>(
440
streams: Array<Stream<R, E, A>>
441
): Stream<R, E, A>
442
443
/**
444
* Drop elements of the stream while a predicate holds
445
* @param stream
446
* @param pred
447
*/
448
export function dropWhile<R, E, A>(
449
stream: Stream<R, E, A>,
450
pred: Predicate<A>
451
): Stream<R, E, A>
452
453
/**
454
* Collect all the elements emitted by a stream into an array.
455
* @param stream
456
*/
457
export function collectArray<R, E, A>(
458
stream: Stream<R, E, A>
459
): T.Effect<R, E, A[]>
460
461
/**
462
* Evaluate a stream for its effects
463
* @param stream
464
*/
465
export function drain<R, E, A>(stream: Stream<R, E, A>): T.Effect<R, E, void>
466
467
// instances: the Monad3E implementation for Stream
export const stream: Monad3E<URI> = {
  URI,
  map: map_,
  // once(a) is typed with NoEnv / NoErr, so the result is cast to the requested R and E
  of: <R, E, A>(a: A): Stream<R, E, A> => {
    const single = once(a) as any;
    return single as Stream<R, E, A>;
  },
  // apply by zipping the stream of functions with the stream of arguments
  ap: <R, R2, E, E2, A, B>(
    sfab: Stream<R, E, FunctionN<[A], B>>,
    sa: Stream<R2, E2, A>
  ) => zipWith(sfab, sa, (fn, value) => fn(value)),
  chain: chain_
} as const;
478
479
/**
 * Encase a node js object Readable into a stream
 * @param stream the Readable passed to getSourceFromObjectReadStream
 */
export function fromObjectReadStream<A>(stream: Readable) {
  const source = getSourceFromObjectReadStream<A>(stream);
  return fromSource(source);
}
483
484
/**
 * Encase a node js object ReadStream into a stream (batched)
 * @param stream the ReadStream passed to getSourceFromObjectReadStreamB
 * @param batch forwarded unchanged to getSourceFromObjectReadStreamB
 * @param every forwarded unchanged to getSourceFromObjectReadStreamB
 */
export function fromObjectReadStreamB<A>(
  stream: ReadStream,
  batch: number,
  every: number
) {
  const batchedSource = getSourceFromObjectReadStreamB<A>(stream, batch, every);
  return fromSource(batchedSource);
}
Copied!

Sink

1
/**
 * An effectful sink: consumes elements of type A while threading a state S
 * and finally extracts a B. Every operation runs in T.Effect<R, E, _>.
 */
export interface Sink<R, E, S, A, B> {
  /** Effect producing the step the sink starts from */
  readonly initial: T.Effect<R, E, SinkStep<A, S>>;
  /** Effectfully compute the following step from the current state and the next element */
  step: (state: S, next: A) => T.Effect<R, E, SinkStep<A, S>>;
  /** Effectfully turn a state into the sink's result */
  extract: (state: S) => T.Effect<R, E, B>;
}
6
7
/**
 * A sink whose operations are plain values and functions instead of effects:
 * it consumes elements of type A while threading a state S and extracts a B.
 */
export interface SinkPure<S, A, B> {
  /** The step the sink starts from */
  readonly initial: SinkStep<A, S>;
  /** Compute the following step from the current state and the next element */
  step: (state: S, next: A) => SinkStep<A, S>;
  /** Turn a state into the sink's result */
  extract: (state: S) => B;
}
12
13
/**
14
* Step a sink repeatedly.
15
* If the sink completes before consuming all of the input, then the done state will include the ops leftovers
16
* and anything left in the array
17
* @param sink
18
* @param s
19
* @param multi
20
*/
21
export function stepMany<R, E, S, A, B>(
22
sink: Sink<R, E, S, A, B>,
23
s: S,
24
multi: readonly A[]
25
): T.Effect<R, E, SinkStep<A, S>>
26
27
export function liftPureSink<S, A, B>(
28
sink: SinkPure<S, A, B>
29
): Sink<T.NoEnv, T.NoErr, S, A, B>
30
31
export function collectArraySink<R, E, A>(): Sink<R, E, A[], A, A[]>
32
33
export function drainSink<R, E, A>(): Sink<R, E, void, A, void>
34
35
/**
36
* A sink that consumes no input to produce a constant b
37
* @param b
38
*/
39
export function constSink<R, E, A, B>(b: B): Sink<R, E, void, A, B>
40
41
/**
42
* A sink that produces the head element of a stream (if any elements are emitted)
43
*/
44
export function headSink<R, E, A>(): Sink<R, E, Option<A>, A, Option<A>>
45
46
/**
47
* A sink that produces the last element of a stream (if any elements are emitted)
48
*/
49
export function lastSink<R, E, A>(): Sink<R, E, Option<A>, A, Option<A>>
50
51
/**
52
* A sink that evaluates an action for every element of a sink and produces no value
53
* @param f
54
*/
55
export function evalSink<R, E, A>(
56
f: FunctionN<[A], T.Effect<R, E, unknown>>
57
): Sink<R, E, void, A, void>
58
59
/**
60
* A sink that consumes elements for which a predicate does not hold.
61
*
62
* Returns the first element for which the predicate did hold if such an element is found.
63
* @param f
64
*/
65
export function drainWhileSink<R, E, A>(
66
f: Predicate<A>
67
): Sink<R, E, Option<A>, A, Option<A>>
68
69
/**
70
* A sink that offers elements into a concurrent queue
71
*
72
* @param queue
73
*/
74
export function queueSink<R, E, A>(
75
queue: ConcurrentQueue<A>
76
): Sink<R, E, void, A, void>
77
78
/**
79
* A sink that offers elements into a queue after wrapping them in an option.
80
*
81
* The sink will offer one final none into the queue when the stream terminates
82
* @param queue
83
*/
84
export function queueOptionSink<R, E, A>(
85
queue: ConcurrentQueue<Option<A>>
86
): Sink<R, E, void, A, void>
87
88
/**
89
* Map the output value of a sink
90
* @param sink
91
* @param f
92
*/
93
export function map<R, E, S, A, B, C>(
94
sink: Sink<R, E, S, A, B>,
95
f: FunctionN<[B], C>
96
): Sink<R, E, S, A, C>
Copied!

Usage

1
import { effect as T, stream as S } from "@matechs/effect";
2
import { pipe } from "fp-ts/lib/pipeable";
3
4
// Log the received element and emit its successor as a single-element stream
const logAndIncrement = (n: number) =>
  S.encaseEffect(
    T.sync(() => {
      console.log(`got: ${n}`);

      return n + 1;
    })
  );

// Periodic source -> log and increment each element -> keep only values greater than 5
const program = pipe(
  S.periodically(100),
  S.chain(logAndIncrement),
  S.filterWith(n => n > 5)
);

// Collect the first 10 elements that pass the filter
const firstTen = S.collectArray(S.take(program, 10));

T.runToPromise(firstTen).then(r => {
  console.log(`elements: ${r.length}`);
});

// Sum 100000 repetitions of 1 with an effectful fold
const program2 = pipe(S.repeatedly(1), s => {
  const bounded = S.take(s, 100000);

  return S.foldM(bounded, (acc, n) => T.sync(() => acc + n), 0);
});

T.runToPromise(S.collectArray(program2)).then(r => {
  console.log(`fold: ${r[0]}`);
});
29
30
// prints
31
// fold: 100000
32
// got: 0
33
// got: 1
34
// got: 2
35
// got: 3
36
// got: 4
37
// got: 5
38
// got: 6
39
// got: 7
40
// got: 8
41
// got: 9
42
// got: 10
43
// got: 11
44
// got: 12
45
// got: 13
46
// got: 14
47
// elements: 10
Copied!
Note that errors are final if not handled in place. If you want a non-final implementation, you may use a Stream<R, never, Either<E, A>>, where you can manage how errors are propagated.
An implementation of StreamEither is a work in progress and will be added to the core.
Last modified 1yr ago
Copy link