@effectionx/stream-helpers v0.1.0 (thefrontside/effectionx)

Stream Helpers

A collection of type-safe stream helpers built on top of Effection for efficient and controlled stream processing.

Included Helpers

Filter

The filter helper narrows a stream by only passing through items that match a predicate.

import { filter } from "@effectionx/stream-helpers";
import { each, sleep, type Stream } from "effection";

// Example: Synchronous filtering
function* syncExample(source: Stream<number, unknown>) {
  const gt5 = filter<number>(function* (x) {
    return x > 5;
  });

  for (const value of yield* each(gt5(source))) {
    console.log(value); // Only values > 5
    yield* each.next();
  }
}

// Example: Asynchronous filtering
function* asyncExample(source: Stream<number, unknown>) {
  const evensOf = filter<number>(function* (x) {
    yield* sleep(100); // Simulate async operation
    return x % 2 === 0; // Keep only even numbers
  });

  for (const value of yield* each(evensOf(source))) {
    console.log(value); // Only even numbers
    yield* each.next();
  }
}

Map

The map helper transforms each item in a stream using a provided function. This is useful for data transformation operations where you need to process each item individually.

import { map } from "@effectionx/stream-helpers";
import { each, type Stream } from "effection";

function* example(stream: Stream<number, unknown>) {
  const double = map(function* (x: number) {
    return x * 2;
  });

  for (const value of yield* each(double(stream))) {
    console.log(value); // Each value is doubled
    yield* each.next();
  }
}

Batch

The batch helper is useful when you want to convert individual items passing through the stream into arrays of items. The batches can be created either by specifying a maximum time or a maximum size. If both are specified, the batch will be created when either condition is met.

import { batch } from "@effectionx/stream-helpers";
import { each, type Stream } from "effection";

// Example: Batch by size
function* exampleBySize(stream: Stream<number, unknown>) {
  const byThree = batch({ maxSize: 3 });

  for (const items of yield* each(byThree(stream))) {
    console.log(items); // [1, 2, 3], [4, 5, 6], ...
    yield* each.next();
  }
}

// Example: Batch by time
function* exampleByTime(stream: Stream<number, unknown>) {
  const byTime = batch({ maxTime: 1000 });

  for (const items of yield* each(byTime(stream))) {
    console.log(items); // Items received within 1 second
    yield* each.next();
  }
}

// Example: Combined batching
function* exampleCombined(stream: Stream<number, unknown>) {
  const batched = batch({
    maxSize: 5,
    maxTime: 1000,
  });

  for (const items of yield* each(batched(stream))) {
    console.log(items); // Up to 5 items within 1 second
    yield* each.next();
  }
}

Valve

Applies backpressure to the source stream to prevent overwhelming the downstream consumer. This is useful with any stream that produces items faster than the consumer can process them. It was originally designed for use with Kafka, where a producer that emits messages faster than the consumer can process them can cause the service to run out of memory. It can be used as a buffer for any infinite stream.

import { valve } from "@effectionx/stream-helpers";
import { each, type Stream } from "effection";

function* example(stream: Stream<number, unknown>) {
  const regulated = valve({
    // buffer size threshold at which the close operation will be invoked
    closeAt: 1000,
    *close() {
      // pause the source stream
    },

    // buffer size threshold at which the open operation will be invoked
    openAt: 100,
    *open() {
      // resume the source stream
    },
  })(stream);

  for (const value of yield* each(regulated)) {
    console.log(value);
    yield* each.next();
  }
}

Passthrough Tracker

The Passthrough Tracker stream helper provides a way to know when all items that passed through the stream have been handled. This is especially helpful when you want to ensure that all items were processed before completing an operation.

It's different from other stream helpers because you must first call the createTracker function, which returns an object. The actual helper is the passthrough method, which you can call and chain as you would with other helpers.

import { each, type Stream } from "effection";
import { createTracker } from "@effectionx/stream-helpers";

function* example(source: Stream<number, unknown>) {
  // create the tracker
  const tracker = yield* createTracker();

  // create the passthrough stream helper
  const track = tracker.passthrough();

  for (const value of yield* each(track(source))) {
    // mark each item as handled
    tracker.markOne(value);
    yield* each.next();
  }

  // will resolve once all items that passed through the stream have been marked
  yield* tracker;
}

Composing stream helpers

You can use a simple pipe() to compose a series of stream helpers together. In this example, we use the one from remeda.

import { batch, filter, map, valve } from "@effectionx/stream-helpers";
import { each, type Stream } from "effection";
// any standard pipe function should work
import { pipe } from "remeda";

function* example(source: Stream<number, unknown>) {
  // Compose stream helpers using pipe
  const stream = pipe(
    source,
    valve({
      *open() {
        // resume the source stream
      },
      *close() {
        // pause the source stream
      },
      openAt: 100,
      closeAt: 1000,
    }),
    filter(function* (x: number) {
      return x > 0;
    }),
    map(function* (x: number) {
      return x * 20;
    }),
    batch({ maxSize: 50 }),
  );

  for (const value of yield* each(stream)) {
    console.log(value);
    yield* each.next();
  }
}

Testing Streams

The library includes testing utilities to help you test your stream processing code. These are available from the @effectionx/stream-helpers/test-helpers export.

Faucet

The useFaucet function creates a stream that can be used to test the behavior of streams that use backpressure. It's particularly useful in tests where you need a controllable source stream.

import { useFaucet } from "@effectionx/stream-helpers/test-helpers";
import { each, run, sleep, spawn } from "effection";

await run(function* () {
  const faucet = yield* useFaucet<number>({ open: true });

  // Remember to spawn the stream subscription before sending items to the stream
  yield* spawn(function* () {
    for (let i of yield* each(faucet)) {
      console.log(i);
      yield* each.next();
    }
  });

  // Pass an array of items to send items to the stream one at a time synchronously
  yield* faucet.pour([1, 2, 3]);

  // Pass an operation to control the rate at which items are sent to the stream
  yield* faucet.pour(function* (send) {
    yield* sleep(10);
    send(5);
    yield* sleep(30);
    send(6);
    yield* sleep(10);
    send(7);
  });

  // You can close the faucet to stop items from being sent
  faucet.close();

  // And open it again when needed
  faucet.open();
});

Items sent to the faucet stream while it's closed are not buffered; in other words, they'll be dropped.
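
As an illustration of that behavior, here is a minimal sketch using only the pour, close, and open methods shown above (the received array is just a local collector for the example):

import { useFaucet } from "@effectionx/stream-helpers/test-helpers";
import { each, run, spawn } from "effection";

await run(function* () {
  const faucet = yield* useFaucet<number>({ open: true });
  const received: number[] = []; // local collector for this example

  yield* spawn(function* () {
    for (const item of yield* each(faucet)) {
      received.push(item);
      yield* each.next();
    }
  });

  yield* faucet.pour([1, 2]); // delivered while the faucet is open

  faucet.close();
  yield* faucet.pour([3, 4]); // dropped: the faucet is closed

  faucet.open();
  yield* faucet.pour([5]); // delivered again after reopening

  console.log(received); // the items poured while closed (3 and 4) never arrive
});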

API Reference

interface BatchOptions

Properties

maxTime (readonly): number

The maximum time, in milliseconds, to collect items before a batch is emitted.

maxSize (readonly): number

The maximum number of items in a batch.

function batch(options: RequireAtLeastOne<BatchOptions>): <T>(stream: Stream<T, never>) => Stream<T[], never>

Creates batches of items from the source stream. The batches can be created either by specifying a maximum time or a maximum size. If both are specified, the batch will be created when either condition is met.

Parameters

options: RequireAtLeastOne<BatchOptions>

  • The options for the batch.

Return Type

<T>(stream: Stream<T, never>) => Stream<T[], never>

A stream of arrays of items from the source stream.
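
A minimal sketch of the curried usage, in the same style as the filter example below (sourceStream stands in for any Stream<number, unknown>):

import { batch } from "@effectionx/stream-helpers";
import { run, each } from "effection";

await run(function* () {
  // sourceStream stands in for any Stream<number, unknown>
  const batched = batch({ maxSize: 10, maxTime: 500 })(sourceStream);

  for (const items of yield* each(batched)) {
    console.log(items); // up to 10 items, or whatever arrived within 500 ms
    yield* each.next();
  }
});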

interface ValveOptions

Properties

openAt: number

The buffer size threshold at which the open operation will be invoked.

closeAt: number

The buffer size threshold at which the close operation will be invoked.

Methods

open
(): Operation<void>

Called to resume the upstream once the buffer drains below openAt.

close
(): Operation<void>

Called to pause the upstream once the buffer size exceeds closeAt.

function valve(options: ValveOptions): <T>(stream: Stream<T, never>) => Stream<T, never>

This function buffers incoming items, if the upstream is producing faster than the downstream can consume, the buffer will grow. If the buffer size exceeds the closeAt threshold, the close operation will be called which is expected to pause the upstream. The buffer will drain until the buffer size is less than the openAt threshold, at which point the open operation will be called to resume the upstream.

Parameters

options: ValveOptions

  • The options for the valve, including the openAt/closeAt thresholds and the open/close operations.

Return Type

<T>(stream: Stream<T, never>) => Stream<T, never>

A stream with backpressure applied.
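
A minimal sketch of what the open and close operations typically do, assuming a hypothetical pausableSource object that exposes a stream plus pause() and resume() controls (for instance a Kafka consumer wrapper; it is not part of this library):

import { valve } from "@effectionx/stream-helpers";
import { run, each } from "effection";

await run(function* () {
  const regulated = valve({
    closeAt: 1000, // pause the source once 1000 items are buffered
    *close() {
      pausableSource.pause(); // hypothetical control
    },
    openAt: 100, // resume the source once the buffer drains to 100
    *open() {
      pausableSource.resume(); // hypothetical control
    },
  })(pausableSource.stream);

  for (const value of yield* each(regulated)) {
    console.log(value);
    yield* each.next();
  }
});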

function map<A, B>(fn: (value: A) => Operation<B>): <TClose>(stream: Stream<A, TClose>) => Stream<B, TClose>

Transforms each item in the stream using the provided function.

Type Parameters

A

B

Parameters

fn: (value: A) => Operation<B>

  • The function to transform each item

Return Type

<TClose>(stream: Stream<A, TClose>) => Stream<B, TClose>

A stream transformer that applies the function to each item

function filter<T>(predicate: (value: T) => Operation<boolean>): <TDone>(stream: Stream<T, TDone>) => Stream<T, TDone>

Filters items from the stream based on a predicate function.

Examples

Example 1
import { filter } from "@effectionx/stream-helpers";
import { run, each } from "effection";

await run(function* () {
  // sourceStream stands in for any Stream<number, unknown>
  const stream = filter(function* (x: number) {
    return x > 5;
  })(sourceStream);

  for (const value of yield* each(stream)) {
    console.log(value); // Only values > 5
    yield* each.next();
  }
});

Type Parameters

T

Parameters

predicate: (value: T) => Operation<boolean>

  • The function to test each item

Return Type

<TDone>(stream: Stream<T, TDone>) => Stream<T, TDone>

A stream transformer that only emits items that pass the predicate

interface Tracker extends Operation<void>

Methods

passthrough
(): <T>(stream: Stream<T, never>) => Stream<T, never>

Returns a stream helper that doesn't modify the items passing through the stream, but will capture a reference to the item. Call the markOne or markMany methods with the item to indicate that it has exited the stream.

markOne
(item: unknown): void

Call this method with an item that has passed through the stream to indicate that it has exited the stream.

markMany
(items: Iterable<unknown>): void

Call this method with an iterable of items that have passed through the stream to indicate that they have exited the stream.

function createTracker(): Operation<Tracker>

Creates a tracker that can be used to verify that all items that entered the stream eventually exit the stream. This is helpful when you want to ensure that all items were processed before terminating the operation that created the stream.

Return Type

Operation<Tracker>
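
A sketch of combining the tracker with the batch helper, marking a whole batch at once with markMany (source stands in for any Stream<number, unknown>):

import { batch, createTracker } from "@effectionx/stream-helpers";
import { each, type Stream } from "effection";

function* processAll(source: Stream<number, unknown>) {
  const tracker = yield* createTracker();
  const track = tracker.passthrough();
  const byTen = batch({ maxSize: 10 });

  // track items before batching, then mark each emitted batch as handled
  for (const items of yield* each(byTen(track(source)))) {
    console.log(items); // process the batch
    tracker.markMany(items); // mark every item in the batch as having exited
    yield* each.next();
  }

  // resolves once every item that entered the stream has been marked
  yield* tracker;
}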

./test-helpers

interface Faucet<T> extends Stream<T, never>

Interface of the stream returned by useFaucet.

Type Parameters

T

Methods

pour
(items: T[]): Operation<void>

Pour items to the stream synchronously.

pour
(op: (send: (item: T) => Operation<void>) => Operation<void>): Operation<void>

Pour items to the stream using an operation that can be asynchronous.

open
(): void

Open the stream to allow items to be sent to the stream.

close
(): void

Close the stream to prevent items from being sent to the stream.

interface FaucetOptions

Options for the faucet.

Properties

open (optional): boolean

Whether the faucet is open when created.

function* useFaucet<T>(options: FaucetOptions): Operation<Faucet<T>>

Creates a stream that can be used to test the behavior of streams that use backpressure. It's particularly useful in tests where you need a controllable source stream.

The returned stream has a pour method that can be used to send items to the stream. It accepts either an array of items or a generator function that is called with a function for sending items to the stream.

import { useFaucet } from "@effectionx/stream-helpers/test-helpers";
import { run, each, sleep, spawn } from "effection";

await run(function* () {
  const faucet = yield* useFaucet({ open: true });

  // Remember to spawn the stream subscription before sending items to the stream
  yield* spawn(function* () {
    for (let i of yield* each(faucet)) {
      console.log(i);
      yield* each.next();
    }
  });

  // Pass an array of items to send items to the stream one at a time synchronously
  yield* faucet.pour([1, 2, 3]);

  // Pass an operation to control the rate at which items are sent to the stream
  yield* faucet.pour(function* (send) {
    yield* sleep(10);
    send(5);
    yield* sleep(30);
    send(6);
    yield* sleep(10);
    send(7);
  });
});

Type Parameters

T

Parameters

options: FaucetOptions

  • The options for the faucet.

Return Type

Operation<Faucet<T>>

A stream of items coming from the faucet.