Play with Streams

Potentially infinite effectful streams!

Stream

/**
 * Stream represents a potentially infinite stream of effectful computations.
 * It is a more general structure than the base effect
 * and allows running computations in a reactive manner.
 *
 * R, E and A line up with the parameters of T.Effect<R, E, A> used throughout this module
 * (presumably required environment, error type and emitted element type — confirm against T.Effect).
 */
export interface Stream<R, E, A> {}

/**
 * Create a Stream from a managed source action.
 *
 * The contract is that the acquisition of the resource should produce an effect that may be repeatedly evaluated
 * during the scope of the Managed.
 * If there is more data in the stream, the effect should produce some(A), otherwise it should produce none.
 * Once it produces none, it will not be evaluated again.
 * @param r managed resource whose acquired value is the effect described above
 */
export function fromSource<R, E, A>(
  r: Managed<R, E, T.Effect<R, E, Option<A>>>
): Stream<R, E, A>

/**
 * Create a stream from an Array
 * @param as
 */
export function fromArray<A>(as: readonly A[]): Stream<T.NoEnv, T.NoErr, A> 

/**
 * Create a stream from an iterator
 * @param iter
 */
export function fromIterator<A>(
  iter: Lazy<Iterator<A>>
): Stream<T.NoEnv, T.NoErr, A> 

/**
 * Create a stream that emits the elements in a range
 * @param start
 * @param interval
 * @param end
 */
export function fromRange(
  start: number,
  interval?: number,
  end?: number
): Stream<T.NoEnv, T.NoErr, number>

/**
 * Create a stream from an existing iterator
 * @param iter
 */
export function fromIteratorUnsafe<A>(
  iter: Iterator<A>
): Stream<T.NoEnv, T.NoErr, A>

/**
 * Create a stream that emits a single element
 * @param a
 */
export function once<A>(a: A): Stream<T.NoEnv, T.NoErr, A> 

/**
 * Create a stream that emits As as fast as possible
 *
 * Be cautious when using this. If your entire pipeline is full of synchronous actions you can block the main
 * thread until the stream runs to completion (or forever) using this
 * @param a
 */
export function repeatedly<A>(a: A): Stream<T.NoEnv, T.NoErr, A> 

/**
 * Create a stream that emits numbers periodically.
 *
 * In the usage example of this document, `periodically(100)` emits 0, 1, 2, ... in order.
 * @param ms presumably the period between two emissions, in milliseconds (TODO confirm)
 */
export function periodically(ms: number): Stream<T.NoEnv, T.NoErr, number>

/**
 * A stream that emits no elements and immediately terminates
 */
export const empty: Stream<
  T.NoEnv,
  T.NoErr,
  never
>

/**
 * Create a stream that evaluates w to emit a single element
 * @param w the effect whose result becomes the only element of the stream
 */
export function encaseEffect<R, E, A>(w: T.Effect<R, E, A>): Stream<R, E, A>

/**
 * Create a stream that immediately fails
 * @param e
 */
export function raised<E>(e: E): Stream<T.NoEnv, E, never> 

/**
 * Create a stream that immediately aborts
 * @param e
 */
export function aborted(e: unknown): Stream<T.NoEnv, T.NoErr, never> 

/**
 * Create a stream that immediately emits either 0 or 1 elements
 * @param opt
 */
export function fromOption<A>(opt: Option<A>): Stream<T.NoEnv, T.NoErr, A> 

/**
 * Zip all stream elements with their index ordinals
 * @param stream
 */
export function zipWithIndex<R, E, A>(
  stream: Stream<R, E, A>
): Stream<R, E, readonly [A, number]> 

/**
 * Create a stream that emits all the elements of stream1 followed by all the elements of stream2
 * @param stream1
 * @param stream2
 */
export function concatL<R, E, A, R2, E2>(
  stream1: Stream<R, E, A>,
  stream2: Lazy<Stream<R2, E2, A>>
): Stream<R & R2, E | E2, A> 

/**
 * Strict form of concatL
 * @param stream1
 * @param stream2
 */
export function concat<R, E, A, R2, E2>(
  stream1: Stream<R, E, A>,
  stream2: Stream<R2, E2, A>
): Stream<R & R2, E | E2, A>

/**
 * Creates a stream that repeatedly emits the elements of a stream forever.
 *
 * The elements are not cached, any effects required (i.e. opening files or sockets) are repeated for each cycle
 * @param stream
 */
export function repeat<R, E, A>(stream: Stream<R, E, A>): Stream<R, E, A> 

/**
 * Map the elements of a stream.
 *
 * Curried: takes the mapping function first and returns a function awaiting the stream.
 * @param f function applied to the elements of the stream
 */
export function map<R, A, B>(
  f: FunctionN<[A], B>
): <E>(stream: Stream<R, E, A>) => Stream<R, E, B>

/**
 * Map every element emitted by stream to b
 * @param stream
 * @param b
 */
export function as<R, E, A, B>(stream: Stream<R, E, A>, b: B): Stream<R, E, B> 

/**
 * Filter the elements of a stream by a predicate
 * @param stream
 * @param f
 */
export function filter<R, E, A>(
  stream: Stream<R, E, A>,
  f: Predicate<A>
): Stream<R, E, A>

/**
 * Curried form of filter
 * @param f
 */
export function filterWith<A>(
  f: Predicate<A>
): <R, E>(stream: Stream<R, E, A>) => Stream<R, E, A> 

/**
 * Filter and refine: curried filter driven by a Refinement, so the resulting stream
 * is typed with the narrower element type B instead of A
 * @param f refinement from A to B
 */
export function filterRefineWith<A, B extends A>(
  f: Refinement<A, B>
): <R, E>(stream: Stream<R, E, A>) => Stream<R, E, B>

/**
 * Filter the stream so that only items that are not equal to the previous item emitted are emitted
 * @param eq
 */
export function distinctAdjacent<A>(
  eq: Eq<A>
): <R, E>(stream: Stream<R, E, A>) => Stream<R, E, A> 

/**
 * Fold the elements of this stream together using an effect.
 *
 * The resulting stream will emit 1 element produced by the effectful fold
 * @param stream
 * @param f
 * @param seed
 */
export function foldM<R, E, A, R2, E2, B>(
  stream: Stream<R, E, A>,
  f: FunctionN<[B, A], T.Effect<R2, E2, B>>,
  seed: B
): Stream<R & R2, E | E2, B> 

/**
 * Fold the elements of a stream together purely
 * @param stream
 * @param f
 * @param seed
 */
export function fold<R, E, A, B>(
  stream: Stream<R, E, A>,
  f: FunctionN<[B, A], B>,
  seed: B
): Stream<R, E, B> 

/**
 * Scan across the elements of the stream.
 *
 * This is like foldM but emits every intermediate seed value in the resulting stream.
 * @param stream
 * @param f
 * @param seed
 */
export function scanM<R, E, A, B, R2, E2>(
  stream: Stream<R, E, A>,
  f: FunctionN<[B, A], T.Effect<R2, E2, B>>,
  seed: B
): Stream<R & R2, E | E2, B>

/**
 * Purely scan a stream
 * @param stream
 * @param f
 * @param seed
 */
export function scan<R, E, A, B>(
  stream: Stream<R, E, A>,
  f: FunctionN<[B, A], B>,
  seed: B
): Stream<R, E, B>

/**
 * Monadic chain on a stream.
 *
 * Curried: takes f first and returns a function awaiting the stream. The environments and errors
 * of the outer stream and of the streams produced by f are combined as R & R2 and E | E2.
 * @param f function producing a stream from an element
 */
export function chain<A, R2, E2, B>(
  f: FunctionN<[A], Stream<R2, E2, B>>
): <R, E>(stream: Stream<R, E, A>) => Stream<R & R2, E | E2, B>

/**
 * Flatten a stream of streams
 * @param stream
 */
export function flatten<R, E, R2, E2, A>(
  stream: Stream<R, E, Stream<R2, E2, A>>
): Stream<R & R2, E | E2, A> 

/**
 * Map each element of the stream effectfully
 * @param stream
 * @param f
 */
export function mapM<R, E, A, R2, E2, B>(
  stream: Stream<R, E, A>,
  f: FunctionN<[A], T.Effect<R2, E2, B>>
): Stream<R & R2, E | E2, B> 

/**
 * Curried counterpart of mapM: takes the effectful function first and returns a function awaiting the stream
 * (presumably equivalent to mapM(stream, f) — confirm against the implementation)
 * @param f
 */
export function mapMWith<A, R2, E2, B>(
  f: FunctionN<[A], T.Effect<R2, E2, B>>
): <R, E>(stream: Stream<R, E, A>) => Stream<R & R2, E | E2, B>

/**
 * A stream that emits no elements but never terminates.
 */
export const never: Stream<T.NoEnv, T.NoErr, never> 

/**
 * Transduce a stream via a sink.
 *
 * This repeatedly runs a sink to completion on the elements of the input stream and emits the result of each run.
 * Leftovers from a previous run are fed to the next run.
 *
 * @param stream
 * @param sink
 */
export function transduce<R, E, A, R2, E2, S, B>(
  stream: Stream<R, E, A>,
  sink: Sink<R2, E2, S, A, B>
): Stream<R & R2, E | E2, B>

/**
 * Drop some number of elements from a stream
 *
 * The effects required to produce the dropped elements still occur in the background
 * @param stream
 * @param n number of elements to drop
 */
export function drop<R, E, A>(
  stream: Stream<R, E, A>,
  n: number
): Stream<R, E, A>

/**
 * Curried form of drop: fixes the number of elements to skip and returns a function awaiting the stream
 * @param n number of elements to drop
 */
export function dropWith(
  n: number
): <R, E, A>(stream: Stream<R, E, A>) => Stream<R, E, A> {
  return <R, E, A>(source: Stream<R, E, A>): Stream<R, E, A> => {
    return drop(source, n);
  };
}

/**
 * Take some number of elements of a stream
 * @param stream
 * @param n
 */
export function take<R, E, A>(
  stream: Stream<R, E, A>,
  n: number
): Stream<R, E, A> 

/**
 * Take elements of a stream while a predicate holds
 * @param stream
 * @param pred
 */
export function takeWhile<R, E, A>(
  stream: Stream<R, E, A>,
  pred: Predicate<A>
): Stream<R, E, A> 

/**
 * Push a stream into a sink to produce the sink's result.
 *
 * The sink carries its own environment R2 and error E2 (as in transduce); the resulting effect
 * is typed with R & R2 and E | E2.
 * @param stream
 * @param sink
 */
export function into<R, E, A, R2, E2, S, B>(
  stream: Stream<R, E, A>,
  sink: Sink<R2, E2, S, A, B>
): T.Effect<R & R2, E | E2, B>

/**
 * Push a stream into a sink to produce the sink's result, where the sink itself is provided by a Managed
 * @param stream
 * @param managedSink managed resource yielding the sink to push into
 */
export function intoManaged<R, E, A, S, B>(
  stream: Stream<R, E, A>,
  managedSink: Managed<R, E, Sink<R, E, S, A, B>>
): T.Effect<R, E, B>

/**
 * Push a stream in a sink to produce the result and the leftover
 * @param stream
 * @param sink
 */
export function intoLeftover<R, E, A, S, B>(
  stream: Stream<R, E, A>,
  sink: Sink<R, E, S, A, B>
): T.Effect<R, E, readonly [B, readonly A[]]> 

/**
 * Zip two streams together, terminating when either stream is exhausted
 * @param as
 * @param bs
 * @param f function combining one element of each stream into an output element
 */
export function zipWith<R, E, A, R2, E2, B, C>(
  as: Stream<R, E, A>,
  bs: Stream<R2, E2, B>,
  f: FunctionN<[A, B], C>
): Stream<R & R2, E | E2, C>

/**
 * zipWith to form tuples
 * @param as
 * @param bs
 */
export function zip<R, E, A, R2, E2, B>(
  as: Stream<R, E, A>,
  bs: Stream<R2, E2, B>
): Stream<R & R2, E | E2, readonly [A, B]> 

/**
 * Feed a stream into a sink to produce a value.
 *
 * Emits the value and a 'remainder' stream that includes the rest of the elements of the input stream.
 * @param stream
 * @param sink
 */
export function peel<R, E, A, S, B>(
  stream: Stream<R, E, A>,
  sink: Sink<R, E, S, A, B>
): Stream<R, E, readonly [B, Stream<R, E, A>]> 

/**
 * Variant of peel whose sink is provided by a Managed
 * (presumably otherwise identical to peel — confirm against the implementation)
 * @param stream
 * @param managedSink managed resource yielding the sink to feed
 */
export function peelManaged<R, E, A, S, B>(
  stream: Stream<R, E, A>,
  managedSink: Managed<R, E, Sink<R, E, S, A, B>>
): Stream<R, E, readonly [B, Stream<R, E, A>]>

/**
 * Create a stream that switches to emitting elements of the most recent input stream.
 * @param stream
 */
export function switchLatest<R, E, A>(
  stream: Stream<R, E, Stream<R, E, A>>
): Stream<R, E, A> 

/**
 * Create a stream that switches to emitting the elements of the most recent stream produced by applying f to the
 * element most recently emitted
 * @param stream
 * @param f
 */
export function chainSwitchLatest<R, E, A, R2, E2, B>(
  stream: Stream<R, E, A>,
  f: FunctionN<[A], Stream<R2, E2, B>>
): Stream<R & R2, E | E2, B> 

/**
 * Merge a stream of streams into a single stream.
 *
 * This stream will run up to maxActive streams concurrently to produce values into the output stream.
 * @param stream the input stream
 * @param maxActive the maximum number of streams to hold active at any given time
 * this controls how much active streams are able to collectively produce in the face of a slow downstream consumer
 */
export function merge<R, E, A>(
  stream: Stream<R, E, Stream<R, E, A>>,
  maxActive: number
): Stream<R, E, A> 

/**
 * Presumably the merging counterpart of chain: applies f to the elements of stream and merges the
 * resulting streams as merge does — TODO confirm against the implementation
 * @param stream the input stream
 * @param f function producing a stream from an element
 * @param maxActive presumably the maximum number of inner streams active at any given time (cf. merge)
 */
export function chainMerge<R, E, A, B>(
  stream: Stream<R, E, A>,
  f: FunctionN<[A], Stream<R, E, B>>,
  maxActive: number
): Stream<R, E, B>

/**
 * Merge an array of streams into a single stream.
 *
 * NOTE(review): unlike merge, no maxActive is taken here; how many streams run concurrently
 * is not visible from this signature — confirm against the implementation.
 * @param streams the streams to merge
 */
export function mergeAll<R, E, A>(
  streams: Array<Stream<R, E, A>>
): Stream<R, E, A>

/**
 * Drop elements of the stream while a predicate holds
 * @param stream
 * @param pred
 */
export function dropWhile<R, E, A>(
  stream: Stream<R, E, A>,
  pred: Predicate<A>
): Stream<R, E, A> 

/**
 * Collect all the elements emitted by a stream into an array.
 * @param stream
 */
export function collectArray<R, E, A>(
  stream: Stream<R, E, A>
): T.Effect<R, E, A[]> 

/**
 * Evaluate a stream for its effects
 * @param stream
 */
export function drain<R, E, A>(stream: Stream<R, E, A>): T.Effect<R, E, void> 

// instances
/**
 * Monad instance for Stream, typed as Monad3E<URI>.
 *
 * - map and chain delegate to map_ and chain_, which are defined outside this excerpt.
 * - of lifts a value with once(a); the cast through any re-types the Stream<T.NoEnv, T.NoErr, A>
 *   returned by once as the Stream<R, E, A> requested by the caller.
 * - ap pairs functions and values positionally through zipWith.
 *   NOTE(review): this is a zip-style ap rather than one derived from chain — confirm it is intended.
 */
export const stream: Monad3E<URI> = {
  URI,
  map: map_,
  of: <R, E, A>(a: A): Stream<R, E, A> => (once(a) as any) as Stream<R, E, A>,
  ap: <R, R2, E, E2, A, B>(
    sfab: Stream<R, E, FunctionN<[A], B>>,
    sa: Stream<R2, E2, A>
  ) => zipWith(sfab, sa, (f, a) => f(a)),
  chain: chain_
} as const;

/**
 * Encase a Node.js object Readable into a Stream.
 *
 * The readable is first turned into a source by getSourceFromObjectReadStream and that source
 * is then handed to fromSource.
 * @param stream the readable to wrap
 */
export function fromObjectReadStream<A>(stream: Readable) {
  const source = getSourceFromObjectReadStream<A>(stream);

  return fromSource(source);
}

/**
 * Encase a Node.js object readable into a Stream (batched).
 *
 * The readable is first turned into a source by getSourceFromObjectReadStreamB and that source
 * is then handed to fromSource.
 *
 * NOTE(review): this variant takes a ReadStream while fromObjectReadStream takes a Readable —
 * confirm the narrower type is intended.
 * @param stream the readable to wrap
 * @param batch forwarded to getSourceFromObjectReadStreamB; presumably the batch size (TODO confirm)
 * @param every forwarded to getSourceFromObjectReadStreamB; presumably a time interval (TODO confirm)
 */
export function fromObjectReadStreamB<A>(
  stream: ReadStream,
  batch: number,
  every: number
) {
  const batchedSource = getSourceFromObjectReadStreamB<A>(stream, batch, every);

  return fromSource(batchedSource);
}

Sink

/**
 * An effectful sink: consumes elements of type A while threading a state S, and finally yields a B.
 *
 * R and E are passed on to every T.Effect the sink returns.
 */
export interface Sink<R, E, S, A, B> {
  /** Effect producing the sink's first step */
  readonly initial: T.Effect<R, E, SinkStep<A, S>>;
  /** Feed the next element to the sink given its current state, producing the following step */
  step: (state: S, next: A) => T.Effect<R, E, SinkStep<A, S>>;
  /** Turn a state into the sink's result (parameter named `state` for consistency with `step` and SinkPure) */
  extract: (state: S) => T.Effect<R, E, B>;
}

/**
 * Pure counterpart of Sink: same three members, but they return plain values instead of T.Effect.
 * A SinkPure can be turned into a Sink with liftPureSink.
 */
export interface SinkPure<S, A, B> {
  /** The sink's first step */
  readonly initial: SinkStep<A, S>;
  /** Feed the next element to the sink given its current state, producing the following step */
  step: (state: S, next: A) => SinkStep<A, S>;
  /** Turn a state into the sink's result */
  extract: (state: S) => B;
}

/**
 * Step a sink repeatedly.
 * If the sink completes before consuming all of the input, then the done state will include the ops leftovers
 * and anything left in the array
 * @param sink
 * @param s
 * @param multi
 */
export function stepMany<R, E, S, A, B>(
  sink: Sink<R, E, S, A, B>,
  s: S,
  multi: readonly A[]
): T.Effect<R, E, SinkStep<A, S>> 

/**
 * Lift a pure sink into an effectful Sink typed with T.NoEnv and T.NoErr
 * @param sink the pure sink to lift
 */
export function liftPureSink<S, A, B>(
  sink: SinkPure<S, A, B>
): Sink<T.NoEnv, T.NoErr, S, A, B>

/**
 * A sink whose state and result are both A[]; presumably it accumulates every element it receives
 * into an array (cf. collectArray) — confirm against the implementation
 */
export function collectArraySink<R, E, A>(): Sink<R, E, A[], A, A[]>

/**
 * A sink with void state and void result; presumably it consumes every element and discards it
 * (cf. drain) — confirm against the implementation
 */
export function drainSink<R, E, A>(): Sink<R, E, void, A, void>

/**
 * A sink that consumes no input to produce a constant b
 * @param b
 */
export function constSink<R, E, A, B>(b: B): Sink<R, E, void, A, B> 

/**
 * A sink that produces the head element of a stream (if any elements are emitted)
 */
export function headSink<R, E, A>(): Sink<R, E, Option<A>, A, Option<A>> 

/**
 * A sink that produces the last element of a stream (if any elements are emitted)
 */
export function lastSink<R, E, A>(): Sink<R, E, Option<A>, A, Option<A>> 

/**
 * A sink that evaluates an action for every element it receives and produces no value
 * @param f action to evaluate for each element; its result is typed unknown
 */
export function evalSink<R, E, A>(
  f: FunctionN<[A], T.Effect<R, E, unknown>>
): Sink<R, E, void, A, void>

/**
 * A sink that consumes elements for which a predicate does not hold.
 *
 * Returns the first element for which the predicate did hold if such an element is found.
 * @param f
 */
export function drainWhileSink<R, E, A>(
  f: Predicate<A>
): Sink<R, E, Option<A>, A, Option<A>> 

/**
 * A sink that offers elements into a concurrent queue
 *
 * @param queue
 */
export function queueSink<R, E, A>(
  queue: ConcurrentQueue<A>
): Sink<R, E, void, A, void> 

/**
 * A sink that offers elements into a queue after wrapping them in an option.
 *
 * The sink will offer one final none into the queue when the stream terminates
 * @param queue
 */
export function queueOptionSink<R, E, A>(
  queue: ConcurrentQueue<Option<A>>
): Sink<R, E, void, A, void> 

/**
 * Map the output value of a sink
 * @param sink
 * @param f
 */
export function map<R, E, S, A, B, C>(
  sink: Sink<R, E, S, A, B>,
  f: FunctionN<[B], C>
): Sink<R, E, S, A, C> 

Usage

import { effect as T, stream as S } from "@matechs/effect";
import { pipe } from "fp-ts/lib/pipeable";

// Log the tick that was received, then hand its successor downstream.
const logAndIncrement = (n: number) =>
  T.sync(() => {
    console.log(`got: ${n}`);

    return n + 1;
  });

// Ticks every 100ms, logs each tick, and keeps only the incremented values greater than 5.
const program = pipe(
  S.periodically(100),
  S.chain(n => S.encaseEffect(logAndIncrement(n))),
  S.filterWith(n => n > 5)
);

// Only the first 10 elements that pass the filter are collected.
const firstTen = S.take(program, 10);

T.runToPromise(S.collectArray(firstTen)).then(r => {
  console.log(`elements: ${r.length}`);
});

// Sums 100000 ones with an effectful fold; the resulting stream emits the single total.
const program2 = pipe(S.repeatedly(1), s => {
  const bounded = S.take(s, 100000);

  return S.foldM(bounded, (acc, n) => T.sync(() => acc + n), 0);
});

T.runToPromise(S.collectArray(program2)).then(r => {
  console.log(`fold: ${r[0]}`);
});

// prints
// fold: 100000
// got: 0
// got: 1
// got: 2
// got: 3
// got: 4
// got: 5
// got: 6
// got: 7
// got: 8
// got: 9
// got: 10
// got: 11
// got: 12
// got: 13
// got: 14
// elements: 10

Note that errors are final if they are not handled in place. If you want a non-final implementation, you may use a Stream<R, never, Either<E, A>>, where you can manage how errors are propagated.

An implementation of StreamEither is a work in progress and will be added to the core.

Last updated