Streams and Subscriptions

For every tool in the async/await toolbox, there is an equivalent in Effection. We were already introduced to the most important of these in the introduction.

Async             Effection
Promise           Operation
await             yield*
async function    function*

Likewise, there are also equivalents for the Async iteration protocols. They are:

Async            Effection
AsyncIterator    Subscription
AsyncIterable    Stream
for await        for yield*

If you're familiar with how to use these constructs, then you will be right at home working with sequences of data in Effection. In particular, streams are useful for modeling things like UI events and network connections, and in this guide we'll see how to create, transform, and consume them.

Subscription

The Effection equivalent of AsyncIterator is the Subscription. Like its async counterpart, a subscription acts like a queue that provides a sequence of values via a next() method. The only difference is that instead of returning a Promise that fulfills to an iterator result, it returns an Operation that yields an iterator result.

function* forEach(subscription) {
  let next = yield* subscription.next();
  while (!next.done) {
    console.log(`value: ${next.value}`);
    next = yield* subscription.next();
  }
  console.log(`done: ${next.value}`);
}

Subscriptions, like async iterators, are stateful objects, and calling the next() method on them alters their internal state. Crucially, taking the next value from the subscription not only retrieves that value, but also removes it, so that we don't iterate the same value twice.
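For comparison, here is a sketch of the same traversal written against a plain async iterator, with await standing in for yield*. The names countTo, drain, and consumeTwice are invented for this illustration and are not part of Effection. The sketch also shows the statefulness described above: a value taken with next() is gone for every later consumer of that iterator.

```javascript
// Hypothetical async generator that produces the numbers 1..n.
async function* countTo(n) {
  for (let i = 1; i <= n; i++) {
    yield i;
  }
}

// The async/await counterpart of forEach(): `await` replaces `yield*`.
async function drain(iterator) {
  const values = [];
  let next = await iterator.next();
  while (!next.done) {
    values.push(next.value);
    next = await iterator.next();
  }
  return values;
}

async function consumeTwice() {
  const iterator = countTo(3);

  // Taking a value advances the iterator, so 1 is consumed here...
  const first = await iterator.next();

  // ...and draining the same iterator afterwards only sees what is left.
  const rest = await drain(iterator);

  return { first: first.value, rest };
}

consumeTwice().then((result) => console.log(result));
// logs { first: 1, rest: [ 2, 3 ] }
```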

Just as with async iterators, subscriptions are low-level constructs that you don't interact with all that often, but it is good to know that they are there. Most of the time, instead of working with subscriptions directly, you'll work with streams.

Stream

Streams are the Effection equivalent of async iterables and, like them, are stateless objects that do not do anything by themselves. Whereas a Subscription represents hot, active state, a Stream contains a recipe for creating a subscription.
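The same split exists in plain JavaScript, which may help to picture it. In the sketch below, the countdown object and the compareIterators function are hypothetical and use no Effection APIs. The async iterable holds no state of its own; each call to Symbol.asyncIterator produces a fresh iterator with independent state, much as each subscription to a stream is independent.

```javascript
// A stateless "recipe": nothing happens until an iterator is requested.
const countdown = {
  [Symbol.asyncIterator]() {
    let remaining = 3; // the state lives in the iterator, not in the iterable
    return {
      async next() {
        return remaining > 0
          ? { done: false, value: remaining-- }
          : { done: true, value: undefined };
      },
    };
  },
};

async function compareIterators() {
  // Two iterators created from the same iterable do not share progress.
  const a = countdown[Symbol.asyncIterator]();
  const b = countdown[Symbol.asyncIterator]();

  const fromA = [(await a.next()).value, (await a.next()).value];
  const fromB = [(await b.next()).value];

  return { fromA, fromB };
}

compareIterators().then((result) => console.log(result));
// logs { fromA: [ 3, 2 ], fromB: [ 3 ] }
```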

A Stream can have any number of subscriptions, each of which receives the same set of values. Effection ships with Channel, which is a very handy way to create streams and by proxy, subscriptions. Let's use this to show the difference between them:

import { main, createChannel } from 'effection';

await main(function*() {
  let channel = createChannel();

  // the channel has no subscribers yet!
  yield* channel.send('too early');

  let subscription1 = yield* channel;
  let subscription2 = yield* channel;

  yield* channel.send('hello');
  yield* channel.send('world');

  console.log(yield* subscription1.next());
  //=> { done: false, value: "hello" }
  console.log(yield* subscription1.next());
  //=> { done: false, value: "world" }
  console.log(yield* subscription2.next());
  //=> { done: false, value: "hello" }
  console.log(yield* subscription2.next());
  //=> { done: false, value: "world" }
});

As you can see, the channel can have multiple subscribers and sending a message to the channel adds it to each active subscription. If the channel does not have any active subscriptions, then sending a message to it does nothing.
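One way to picture this behavior is a channel that keeps a separate queue for each subscriber. The sketch below is a toy model in plain JavaScript and not Effection's implementation: ToyChannel, subscribe(), and the synchronous next() are all invented for illustration, whereas the real next() is an operation.

```javascript
// A toy model, NOT Effection's implementation: one queue per subscriber.
class ToyChannel {
  constructor() {
    this.queues = new Set();
  }

  // Subscribing creates a fresh, stateful queue (the "subscription").
  subscribe() {
    const queue = [];
    this.queues.add(queue);
    return {
      // Taking a value removes it from this subscriber's queue only.
      next: () => queue.shift(),
    };
  }

  // Sending copies the message into every active queue.
  // With no subscribers, the message goes nowhere.
  send(message) {
    for (const queue of this.queues) {
      queue.push(message);
    }
  }
}

const toyChannel = new ToyChannel();

// No subscribers yet, so this message is dropped.
toyChannel.send("too early");

const toySubscription1 = toyChannel.subscribe();
const toySubscription2 = toyChannel.subscribe();

toyChannel.send("hello");
toyChannel.send("world");

const received1 = [toySubscription1.next(), toySubscription1.next()];
const received2 = [toySubscription2.next(), toySubscription2.next()];

console.log(received1); // logs [ 'hello', 'world' ]
console.log(received2); // logs [ 'hello', 'world' ]
```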

for yield* each

The entire point of having a stream is to consume values from it, and we have already seen how we can use yield* to subscribe to a Stream and turn a Stream into a Subscription. But there is an easier way!

When working with an async iterator, you would not normally traverse it by manually invoking the next() method repeatedly. Instead, you would use the for await...of syntax, which lets you focus on the work to be done with each item instead of the mechanics of iteration. It is the same with Effection streams.

The simplest way to consume a stream in Effection is with the for yield* each loop. It is almost identical to the for await loop from async/await. The biggest difference is that you must yield to the each.next operation as the last step of your loop.

import { main, createChannel, each, spawn, sleep } from 'effection';

await main(function*() {
  let channel = createChannel();

  yield* spawn(function*() {
    yield* sleep(1000);
    yield* channel.send('hello');
    yield* sleep(1000);
    yield* channel.send('world');
  });

  for (let value of yield* each(channel)) {
    console.log('got value:', value);
    yield* each.next();
  }
});

Why do we need to use spawn() here? We know that sending values to a stream does nothing unless someone is subscribed to the stream, so we cannot send any values before we begin our loop, but we also cannot send any values after we begin our loop because the loop will never complete until the stream closes (more about that later). So we need to run both the loop and the sending of values concurrently, and as we've already learned, when we need to do multiple things concurrently, that's when we use spawn().

We could also flip this example around like this:

import { main, createChannel, each, spawn, sleep } from 'effection';

await main(function*() {
  let channel = createChannel();

  yield* spawn(function*() {
    for (let value of yield* each(channel)) {
      console.log('got value:', value);
      yield* each.next();
    }
  });

  yield* sleep(1000);
  yield* channel.send('hello');
  yield* sleep(1000);
  yield* channel.send('world');
});

Why Subscriptions?

Unlike listening for one-off events, a subscription guarantees that we never miss a message. Messages that arrive while the consumer is busy with other work are held in the subscription until the consumer asks for them, as this example shows:

import { main, createChannel, spawn, sleep } from 'effection';

await main(function*() {
  let channel = createChannel();

  let subscription = yield* channel;

  yield* spawn(function*() {
    yield* sleep(1000);
    yield* channel.send('hello');
    yield* channel.send('world');
  });

  let { value: firstValue } = yield* subscription.next();
  console.log(firstValue); // logs 'hello'

  yield* sleep(1000);

  let { value: secondValue } = yield* subscription.next();
  console.log(secondValue); // logs 'world'
});

Closing streams

All good things must come to an end, including a stream of data. For this, Effection streams and subscriptions optionally support passing a final piece of data when there are no more values to be received. If present, this data usually represents a summary of the stream and says something about its final state. For example, when a websocket is closed, it emits a close event that contains a numeric code and a description of why the socket was closed.

The value of this last piece of data is the value of the iterator result when the done attribute is true. Unsurprisingly, this mirrors the behavior of an async iterator exactly.

import { main, sleep, spawn, createChannel } from "effection";

await main(function*() {
  let channel = createChannel();

  let subscription = yield* channel;

  yield* spawn(function*() {
    yield* channel.send('hello');
    yield* sleep(1000);
    yield* channel.send('world');
    yield* channel.close({ words: 2 });
  });

  let next = yield* subscription.next();

  while (!next.done) {
    console.log(next.value);
    next = yield* subscription.next();
  }

  console.log(`${next.value.words} words total`);
});

//=> hello
//=> world
//=> 2 words total

Just like with an async iterator, if you want to access the very last iterator result, you cannot use a for...of loop. You must use a while loop. This is because the body of the loop only executes for iterator results where the done property is false. It does not execute for the last iterator result, where done is true.
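The async/await side of this comparison can be sketched in plain JavaScript. The words generator and the two collect functions below are hypothetical and use no Effection APIs; they only illustrate that for await discards the final value while a manual loop keeps it.

```javascript
// Hypothetical async generator: yields two words, then returns a summary.
async function* words() {
  yield "hello";
  yield "world";
  return { words: 2 };
}

// `for await` only runs its body for results where `done` is false,
// so the returned summary is never visible inside or after the loop.
async function collectWithForAwait() {
  const seen = [];
  for await (const word of words()) {
    seen.push(word);
  }
  return { seen };
}

// A manual loop keeps the last iterator result, including its value.
async function collectWithWhile() {
  const iterator = words();
  const seen = [];
  let next = await iterator.next();
  while (!next.done) {
    seen.push(next.value);
    next = await iterator.next();
  }
  return { seen, summary: next.value };
}

collectWithWhile().then(({ seen, summary }) => {
  console.log(seen.join(" "), `(${summary.words} words total)`);
  // logs: hello world (2 words total)
});
```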

Creating your own streams

Most of the examples thus far have shown how to consume streams that originate from within an operation, and so have used the preferred mechanism for that, which is the Channel. To send a message, we run the channel's send() operation. But because send() is an operation, we must already be inside an operation in order to call it. This poses a problem for events that originate outside of an operation, such as those delivered to an event listener. In this case, we need to be able to call a plain JavaScript function, which is not an operation, and still notify any subscribers to a stream. For these cases there is the Signal interface, which can be created with the createSignal() function.

In the following example, we use a signal as an event callback to log every click on a button:

import { createSignal, each } from "effection";

export function* logClicks(button) {
  let clicks = createSignal();
  try {
    button.addEventListener("click", clicks.send);

    for (let click of yield* each(clicks)) {
      console.dir({ click });
      yield* each.next();
    }
  } finally {
    button.removeEventListener("click", clicks.send);
  }
}

First, we create the signal, then attach its send() function to the button as a click listener. We then loop over every click event received in this way and log its contents to the console. Notice that, like a well-behaved operation, we make sure to remove the listener from the button so that we leave it in the same state we found it.

This is a good demonstration of the concept, but it isn't as useful as it could be because any time we want to do something like the above to consume a stream of clicks, we'd first need to create a signal, attach the signal's callback, and then iterate over its content. For example, if we wanted to make an operation that disabled a button by preventing the default action every time, we would have to regurgitate the same boilerplate code as in our click logger:

import { createSignal, each } from "effection";

export function* cancelClicks(button) {
  let clicks = createSignal();
  try {
    button.addEventListener("click", clicks.send);

    for (let click of yield* each(clicks)) {
      click.preventDefault();
      yield* each.next();
    }
  } finally {
    button.removeEventListener("click", clicks.send);
  }
}

In other words, the problem with this method is that you have to define the stream and consume it in the same place. What we'd like to do instead is define the stream in one place and consume it in another. That way, we could specify the setup and teardown logic once, but use it many times. If we had a way to do that, say a clicksOn() function that could take any HTML element and return a stream of clicks on that element, then we could write something like the following:

import { each, spawn } from "effection";

function* logAndCancel(button) {
  let clicks = clicksOn(button);

  yield* spawn(function*() {
    for (let click of yield* each(clicks)) {
      console.log(click);
      yield* each.next();
    }
  });

  yield* spawn(function*() {
    for (let click of yield* each(clicks)) {
      click.preventDefault();
      yield* each.next();
    }
  });
}

It turns out that resources are just what we need to make this happen. If you recall, a Stream is just an Operation that returns a Subscription. So the simplest way to implement such an operation is as a resource that provides a subscription.

💡Most of the time, you won't need to implement your own streams, but when you do, a resource is the easiest way to do it.

Armed with resources, we can now implement our hypothetical clicksOn utility.

import { createSignal, resource } from "effection";

export function clicksOn(button) {
  return resource(function*(provide) {
    let clicks = createSignal();
    try {
      button.addEventListener("click", clicks.send);

      let subscription = yield* clicks;
      yield* provide(subscription);

    } finally {
      button.removeEventListener("click", clicks.send);
    }
  });
}

We have now encapsulated the setup and teardown of our listener in a single place, but the actual consuming of the stream of clicks can happen anywhere.

In fact, Effection provides a handy way to create a stream of events out of any EventTarget using the built-in on() function, and this is precisely the technique that it uses. You can read more about using event targets in the next section.
