Play with Streams
Potentially infinite effectful streams!

Stream

1
/**
2
* Stream represents a potentially infinite stream of effectful computations
3
* It is a more general structure in respect to the base effect
4
* that allows running computations in a rective manner
5
*/
6
export interface Stream<R, E, A> {}
7
8
/**
9
* Create a Stream from a source A action.
10
*
11
* The contract is that the acquisition of the resource should produce a Wave that may be repeatedly evaluated
12
* during the scope of the Managed
13
* If there is more data in the stream, the Wave should produce some(A) otherwise it should produce none.
14
* Once it produces none, it will not be evaluated again.
15
* @param r
16
*/
17
export function fromSource<R, E, A>(
18
r: Managed<R, E, T.Effect<R, E, Option<A>>>
19
): Stream<R, E, A>
20
21
/**
22
* Create a stream from an Array
23
* @param as
24
*/
25
export function fromArray<A>(as: readonly A[]): Stream<T.NoEnv, T.NoErr, A>
26
27
/**
28
* Create a stream from an iterator
29
* @param iter
30
*/
31
export function fromIterator<A>(
32
iter: Lazy<Iterator<A>>
33
): Stream<T.NoEnv, T.NoErr, A>
34
35
/**
36
* Create a stream that emits the elements in a range
37
* @param start
38
* @param interval
39
* @param end
40
*/
41
export function fromRange(
42
start: number,
43
interval?: number,
44
end?: number
45
): Stream<T.NoEnv, T.NoErr, number>
46
47
/**
48
* Create a stream from an existing iterator
49
* @param iter
50
*/
51
export function fromIteratorUnsafe<A>(
52
iter: Iterator<A>
53
): Stream<T.NoEnv, T.NoErr, A>
54
55
/**
56
* Create a stream that emits a single element
57
* @param a
58
*/
59
export function once<A>(a: A): Stream<T.NoEnv, T.NoErr, A>
60
61
/**
62
* Create a stream that emits As as fast as possible
63
*
64
* Be cautious when using this. If your entire pipeline is full of synchronous actions you can block the main
65
* thread until the stream runs to completion (or forever) using this
66
* @param a
67
*/
68
export function repeatedly<A>(a: A): Stream<T.NoEnv, T.NoErr, A>
69
70
export function periodically(ms: number): Stream<T.NoEnv, T.NoErr, number>
71
72
/**
73
* A stream that emits no elements an immediately terminates
74
*/
75
export const empty: Stream<
76
T.NoEnv,
77
T.NoErr,
78
never
79
>
80
81
/**
82
* Create a stream that evalutes w to emit a single element
83
* @param w
84
*/
85
export function encaseEffect<R, E, A>(w: T.Effect<R, E, A>): Stream<R, E, A>
86
87
/**
88
* Create a stream that immediately fails
89
* @param e
90
*/
91
export function raised<E>(e: E): Stream<T.NoEnv, E, never>
92
93
/**
94
* Create a stream that immediately aborts
95
* @param e
96
*/
97
export function aborted(e: unknown): Stream<T.NoEnv, T.NoErr, never>
98
99
/**
100
* Create a stream that immediately emits either 0 or 1 elements
101
* @param opt
102
*/
103
export function fromOption<A>(opt: Option<A>): Stream<T.NoEnv, T.NoErr, A>
104
105
/**
106
* Zip all stream elements with their index ordinals
107
* @param stream
108
*/
109
export function zipWithIndex<R, E, A>(
110
stream: Stream<R, E, A>
111
): Stream<R, E, readonly [A, number]>
112
113
/**
114
* Create a stream that emits all the elements of stream1 followed by all the elements of stream2
115
* @param stream1
116
* @param stream2
117
*/
118
export function concatL<R, E, A, R2, E2>(
119
stream1: Stream<R, E, A>,
120
stream2: Lazy<Stream<R2, E2, A>>
121
): Stream<R & R2, E | E2, A>
122
123
/**
124
* Strict form of concatL
125
* @param stream1
126
* @param stream2
127
*/
128
export function concat<R, E, A, R2, E2>(
129
stream1: Stream<R, E, A>,
130
stream2: Stream<R2, E2, A>
131
): Stream<R & R2, E | E2, A>
132
133
/**
134
* Creates a stream that repeatedly emits the elements of a stream forever.
135
*
136
* The elements are not cached, any effects required (i.e. opening files or sockets) are repeated for each cycle
137
* @param stream
138
*/
139
export function repeat<R, E, A>(stream: Stream<R, E, A>): Stream<R, E, A>
140
141
/**
142
* Map the elements of a stream
143
* @param stream
144
* @param f
145
*/
146
export function map<R, A, B>(
147
f: FunctionN<[A], B>
148
): <E>(stream: Stream<R, E, A>) => Stream<R, E, B>
149
150
/**
151
* Map every element emitted by stream to b
152
* @param stream
153
* @param b
154
*/
155
export function as<R, E, A, B>(stream: Stream<R, E, A>, b: B): Stream<R, E, B>
156
157
/**
158
* Filter the elements of a stream by a predicate
159
* @param stream
160
* @param f
161
*/
162
export function filter<R, E, A>(
163
stream: Stream<R, E, A>,
164
f: Predicate<A>
165
): Stream<R, E, A>
166
167
/**
168
* Curried form of filter
169
* @param f
170
*/
171
export function filterWith<A>(
172
f: Predicate<A>
173
): <R, E>(stream: Stream<R, E, A>) => Stream<R, E, A>
174
175
// filter and refine
176
export function filterRefineWith<A, B extends A>(
177
f: Refinement<A, B>
178
): <R, E>(stream: Stream<R, E, A>) => Stream<R, E, B>
179
180
/**
181
* Filter the stream so that only items that are not equal to the previous item emitted are emitted
182
* @param eq
183
*/
184
export function distinctAdjacent<A>(
185
eq: Eq<A>
186
): <R, E>(stream: Stream<R, E, A>) => Stream<R, E, A>
187
188
/**
189
* Fold the elements of this stream together using an effect.
190
*
191
* The resulting stream will emit 1 element produced by the effectful fold
192
* @param stream
193
* @param f
194
* @param seed
195
*/
196
export function foldM<R, E, A, R2, E2, B>(
197
stream: Stream<R, E, A>,
198
f: FunctionN<[B, A], T.Effect<R2, E2, B>>,
199
seed: B
200
): Stream<R & R2, E | E2, B>
201
202
/**
203
* Fold the elements of a stream together purely
204
* @param stream
205
* @param f
206
* @param seed
207
*/
208
export function fold<R, E, A, B>(
209
stream: Stream<R, E, A>,
210
f: FunctionN<[B, A], B>,
211
seed: B
212
): Stream<R, E, B>
213
214
/**
215
* Scan across the elements the stream.
216
*
217
* This is like foldM but emits every intermediate seed value in the resulting stream.
218
* @param stream
219
* @param f
220
* @param seed
221
*/
222
export function scanM<R, E, A, B, R2, E2>(
223
stream: Stream<R, E, A>,
224
f: FunctionN<[B, A], T.Effect<R2, E2, B>>,
225
seed: B
226
): Stream<R & R2, E | E2, B>
227
228
/**
229
* Purely scan a stream
230
* @param stream
231
* @param f
232
* @param seed
233
*/
234
export function scan<R, E, A, B>(
235
stream: Stream<R, E, A>,
236
f: FunctionN<[B, A], B>,
237
seed: B
238
): Stream<R, E, B>
239
240
/**
241
* Monadic chain on a stream
242
* @param stream
243
* @param f
244
*/
245
export function chain<A, R2, E2, B>(
246
f: FunctionN<[A], Stream<R2, E2, B>>
247
): <R, E>(stream: Stream<R, E, A>) => Stream<R & R2, E | E2, B>
248
249
/**
250
* Flatten a stream of streams
251
* @param stream
252
*/
253
export function flatten<R, E, R2, E2, A>(
254
stream: Stream<R, E, Stream<R2, E2, A>>
255
): Stream<R & R2, E | E2, A>
256
257
/**
258
* Map each element of the stream effectfully
259
* @param stream
260
* @param f
261
*/
262
export function mapM<R, E, A, R2, E2, B>(
263
stream: Stream<R, E, A>,
264
f: FunctionN<[A], T.Effect<R2, E2, B>>
265
): Stream<R & R2, E | E2, B>
266
267
export function mapMWith<A, R2, E2, B>(
268
f: FunctionN<[A], T.Effect<R2, E2, B>>
269
): <R, E>(stream: Stream<R, E, A>) => Stream<R & R2, E | E2, B>
270
271
/**
272
* A stream that emits no elements but never terminates.
273
*/
274
export const never: Stream<T.NoEnv, T.NoErr, never>
275
276
/**
277
* Transduce a stream via a sink.
278
*
279
* This repeatedly run a sink to completion on the elements of the input stream and emits the result of each run
280
* Leftovers from a previous run are fed to the next run
281
*
282
* @param stream
283
* @param sink
284
*/
285
export function transduce<R, E, A, R2, E2, S, B>(
286
stream: Stream<R, E, A>,
287
sink: Sink<R2, E2, S, A, B>
288
): Stream<R & R2, E | E2, B>
289
290
/**
291
* Drop some number of elements from a stream
292
*
293
* Their effects to be produced still occur in the background
294
* @param stream
295
* @param n
296
*/
297
export function drop<R, E, A>(
298
stream: Stream<R, E, A>,
299
n: number
300
): Stream<R, E, A>
301
302
/**
303
* Curried form of drop
304
* @param n
305
*/
306
export function dropWith(
307
n: number
308
): <R, E, A>(stream: Stream<R, E, A>) => Stream<R, E, A> {
309
return stream => drop(stream, n);
310
}
311
312
/**
313
* Take some number of elements of a stream
314
* @param stream
315
* @param n
316
*/
317
export function take<R, E, A>(
318
stream: Stream<R, E, A>,
319
n: number
320
): Stream<R, E, A>
321
322
/**
323
* Take elements of a stream while a predicate holds
324
* @param stream
325
* @param pred
326
*/
327
export function takeWhile<R, E, A>(
328
stream: Stream<R, E, A>,
329
pred: Predicate<A>
330
): Stream<R, E, A>
331
332
/**
333
* Push a stream into a sink to produce the sink's result
334
* @param stream
335
* @param sink
336
*/
337
export function into<R, E, A, R2, E2, S, B>(
338
stream: Stream<R, E, A>,
339
sink: Sink<R, E2, S, A, B>
340
): T.Effect<R & R2, E | E2, B>
341
342
/**
343
* Push a stream into a sink to produce the sink's result
344
* @param stream
345
* @param managedSink
346
*/
347
export function intoManaged<R, E, A, S, B>(
348
stream: Stream<R, E, A>,
349
managedSink: Managed<R, E, Sink<R, E, S, A, B>>
350
): T.Effect<R, E, B>
351
352
/**
353
* Push a stream in a sink to produce the result and the leftover
354
* @param stream
355
* @param sink
356
*/
357
export function intoLeftover<R, E, A, S, B>(
358
stream: Stream<R, E, A>,
359
sink: Sink<R, E, S, A, B>
360
): T.Effect<R, E, readonly [B, readonly A[]]>
361
362
/**
363
* Zip two streams together termitating when either stream is exhausted
364
* @param as
365
* @param bs
366
* @param f
367
*/
368
export function zipWith<R, E, A, R2, E2, B, C>(
369
as: Stream<R, E, A>,
370
bs: Stream<R2, E2, B>,
371
f: FunctionN<[A, B], C>
372
): Stream<R & R2, E | E2, C>
373
374
/**
375
* zipWith to form tuples
376
* @param as
377
* @param bs
378
*/
379
export function zip<R, E, A, R2, E2, B>(
380
as: Stream<R, E, A>,
381
bs: Stream<R2, E2, B>
382
): Stream<R & R2, E | E2, readonly [A, B]>
383
384
/**
385
* Feed a stream into a sink to produce a value.
386
*
387
* Emits the value and a 'remainder' stream that includes the rest of the elements of the input stream.
388
* @param stream
389
* @param sink
390
*/
391
export function peel<R, E, A, S, B>(
392
stream: Stream<R, E, A>,
393
sink: Sink<R, E, S, A, B>
394
): Stream<R, E, readonly [B, Stream<R, E, A>]>
395
396
export function peelManaged<R, E, A, S, B>(
397
stream: Stream<R, E, A>,
398
managedSink: Managed<R, E, Sink<R, E, S, A, B>>
399
): Stream<R, E, readonly [B, Stream<R, E, A>]>
400
401
/**
402
* Create a stream that switches to emitting elements of the most recent input stream.
403
* @param stream
404
*/
405
export function switchLatest<R, E, A>(
406
stream: Stream<R, E, Stream<R, E, A>>
407
): Stream<R, E, A>
408
409
/**
410
* Create a stream that switches to emitting the elements of the most recent stream produced by applying f to the
411
* element most recently emitted
412
* @param stream
413
* @param f
414
*/
415
export function chainSwitchLatest<R, E, A, R2, E2, B>(
416
stream: Stream<R, E, A>,
417
f: FunctionN<[A], Stream<R2, E2, B>>
418
): Stream<R & R2, E | E2, B>
419
420
/**
421
* Merge a stream of streams into a single stream.
422
*
423
* This stream will run up to maxActive streams concurrently to produce values into the output stream.
424
* @param stream the input stream
425
* @param maxActive the maximum number of streams to hold active at any given time
426
* this controls how much active streams are able to collectively produce in the face of a slow downstream consumer
427
*/
428
export function merge<R, E, A>(
429
stream: Stream<R, E, Stream<R, E, A>>,
430
maxActive: number
431
): Stream<R, E, A>
432
433
export function chainMerge<R, E, A, B>(
434
stream: Stream<R, E, A>,
435
f: FunctionN<[A], Stream<R, E, B>>,
436
maxActive: number
437
): Stream<R, E, B>
438
439
export function mergeAll<R, E, A>(
440
streams: Array<Stream<R, E, A>>
441
): Stream<R, E, A>
442
443
/**
444
* Drop elements of the stream while a predicate holds
445
* @param stream
446
* @param pred
447
*/
448
export function dropWhile<R, E, A>(
449
stream: Stream<R, E, A>,
450
pred: Predicate<A>
451
): Stream<R, E, A>
452
453
/**
454
* Collect all the elements emitted by a stream into an array.
455
* @param stream
456
*/
457
export function collectArray<R, E, A>(
458
stream: Stream<R, E, A>
459
): T.Effect<R, E, A[]>
460
461
/**
462
* Evaluate a stream for its effects
463
* @param stream
464
*/
465
export function drain<R, E, A>(stream: Stream<R, E, A>): T.Effect<R, E, void>
466
467
// instances
468
export const stream: Monad3E<URI> = {
469
URI,
470
map: map_,
471
of: <R, E, A>(a: A): Stream<R, E, A> => (once(a) as any) as Stream<R, E, A>,
472
ap: <R, R2, E, E2, A, B>(
473
sfab: Stream<R, E, FunctionN<[A], B>>,
474
sa: Stream<R2, E2, A>
475
) => zipWith(sfab, sa, (f, a) => f(a)),
476
chain: chain_
477
} as const;
478
479
// encase a node js stream object readable into a stream
480
export function fromObjectReadStream<A>(stream: Readable) {
481
return fromSource(getSourceFromObjectReadStream<A>(stream));
482
}
483
484
// encase a node js stream object readable into a stream (batched)
485
export function fromObjectReadStreamB<A>(
486
stream: ReadStream,
487
batch: number,
488
every: number
489
) {
490
return fromSource(getSourceFromObjectReadStreamB<A>(stream, batch, every));
491
}
Copied!

Sink

1
export interface Sink<R, E, S, A, B> {
2
readonly initial: T.Effect<R, E, SinkStep<A, S>>;
3
step: (state: S, next: A) => T.Effect<R, E, SinkStep<A, S>>;
4
extract: (step: S) => T.Effect<R, E, B>;
5
}
6
7
export interface SinkPure<S, A, B> {
8
readonly initial: SinkStep<A, S>;
9
step: (state: S, next: