Combine: key operators to know

Combine ships with a long list of operators, and all of them are worth knowing. Here I am going to focus on four operators that I see in pretty much every Combine chain in a big project.

Prepend

A publisher that emits a given sequence of elements before this publisher’s own elements. In other words, it allows us to publish a set of values before the upstream publisher publishes anything.

A common use case for this operator is the following scenario: we have a publisher that will perform some work and return some view state. This work may take a couple of seconds, so we want our publisher to first publish a loading state until the work is done.

import Combine
import Foundation

enum ViewState {
    case loading
    case loaded(String)
}

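func performRequest() -> AnyPublisher<ViewState, Never> {
    // Deferred delays creating the Future until a subscriber attaches,
    // so the work does not start when performRequest() is called.
    Deferred {
        Future<ViewState, Never> { promise in
            // Simulate a one-second network request.
            DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
                promise(.success(.loaded("Performed network request")))
            }
        }
    }
    .eraseToAnyPublisher()
}

// Keep the returned AnyCancellable (here in a stored property named
// cancellable); releasing it cancels the subscription.
cancellable = performRequest()
    .prepend(.loading)
    .sink {
        print($0)
    }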

The output will be:

loading
loaded("Performed network request")

We first get the loading state and, once the work is done, the result produced by the publisher.
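
Prepend is not limited to a single value. As a minimal sketch (the numbers are made up for illustration), the snippet below chains the variadic, sequence, and publisher forms of prepend. Sequence publishers emit synchronously, so the result can be inspected right after subscribing. Note that the last prepend in the chain is the first one to emit.

```swift
import Combine

var received: [Int] = []

let cancellable = [3, 4].publisher
    .prepend(1, 2)        // variadic elements
    .prepend([-1, 0])     // a sequence of elements
    .prepend(Just(-2))    // another publisher with the same Output and Failure
    .sink { received.append($0) }

print(received) // [-2, -1, 0, 1, 2, 3, 4]
```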

Merge

Merge combines several publishers with the same Output and Failure types into a single publisher. We can use it when we want to receive a new element whenever any of the upstream publishers emits one.

For example, let’s say we use publishers to transmit events from a view. We have one for viewDidLoad, another for a retry operation, and a third for a reload.

We want to perform some work when any of the three publishers emits a value, so we can use merge to combine them as a single publisher.

let viewDidLoad = PassthroughSubject<Void, Never>()
let retry = PassthroughSubject<Void, Never>()
let reload = PassthroughSubject<Void, Never>()

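// Option 1: the merge(with:) operator.
cancellable = viewDidLoad
    .merge(with: retry, reload)
    .sink { _ in
        print("performing work")
    }

// Option 2: the equivalent Publishers.MergeMany, which takes a collection
// of publishers of the same type. Use one option or the other: assigning
// to the same cancellable again releases (and so cancels) the first
// subscription.
cancellable = Publishers.MergeMany([
    viewDidLoad,
    retry,
    reload
]).sink { _ in
    print("performing work")
}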

viewDidLoad.send(())
retry.send(())
reload.send(())
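
The sink above prints "performing work" once per send, but it cannot tell which event fired. If that matters, one possible approach (a sketch, not the only way to do it) is to map each Void event to a label before merging. The label strings and the triggers array are assumptions made for this illustration.

```swift
import Combine

let viewDidLoad = PassthroughSubject<Void, Never>()
let retry = PassthroughSubject<Void, Never>()
let reload = PassthroughSubject<Void, Never>()

var triggers: [String] = []

// Tag each event with its origin so the merged stream stays distinguishable.
let cancellable = viewDidLoad.map { _ in "viewDidLoad" }
    .merge(with: retry.map { _ in "retry" },
           reload.map { _ in "reload" })
    .sink { triggers.append($0) }

viewDidLoad.send(())
retry.send(())
reload.send(())

print(triggers) // ["viewDidLoad", "retry", "reload"]
```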

CombineLatest

CombineLatest combines a set of publishers and delivers their elements together as a tuple. The combined publisher does not emit until every publisher has emitted at least one value. After that, whenever any of them emits a value, the combined publisher emits the latest known value from each publisher. If any of the combined publishers terminates with a failure, the combined publisher also fails.

One common scenario where combineLatest fits well is the following:

Let’s think of a screen that displays a list of results of any kind, where the user can also filter the results. We have one publisher that performs some work and delivers the list of results, and another that describes how to filter them.

We want to be able to reload the current results but also listen to modifications on the filters to refresh the filtered results.

Since we want an initial value for the filters, we can use a CurrentValueSubject.

let performRequest = PassthroughSubject<String, Never>()
let currentFilters = CurrentValueSubject<String, Never>("animals")

cancellable = Publishers.CombineLatest(
    performRequest,
    currentFilters)
.sink(receiveValue: { allResults, filters in
    print("Filtering \(allResults) based on \(filters)")
})

performRequest.send("[car, cat]")
performRequest.send("[car, cat, moto]")
currentFilters.send("vehicles")

The output will be the following:

Filtering [car, cat] based on animals
Filtering [car, cat, moto] based on animals
Filtering [car, cat, moto] based on vehicles

Every time a request is performed, the results are filtered based on the latest known filters. And when the filters change, the latest results are filtered again based on the new filters.
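
The example above only prints a message. As a rough sketch of what the actual filtering could look like, the snippet below uses a hypothetical Item type and category strings (neither comes from a real API) and applies the filter in a map step. It also shows that nothing is emitted until the results publisher has produced its first value.

```swift
import Combine

// Hypothetical model used only for this illustration.
struct Item {
    let name: String
    let category: String
}

let results = PassthroughSubject<[Item], Never>()
let selectedCategory = CurrentValueSubject<String, Never>("animals")

var shown: [[String]] = []

let cancellable = results
    .combineLatest(selectedCategory)
    .map { items, selected in
        // Keep only the names of the items in the selected category.
        items.filter { $0.category == selected }.map(\.name)
    }
    .sink { shown.append($0) }

selectedCategory.send("vehicles") // nothing yet: results has not emitted
results.send([
    Item(name: "car", category: "vehicles"),
    Item(name: "cat", category: "animals")
])
selectedCategory.send("animals")

print(shown) // [["car"], ["cat"]]
```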

Zip

Zip is very similar to CombineLatest. The difference is that CombineLatest publishes the latest known value from each publisher (as long as all of them have emitted at least once), so the same value can be delivered more than once, while Zip uses each value only once. Zip waits until every publisher has emitted a new value and then emits them together as a tuple; until then, it emits nothing.

Let’s say that we have two publishers, and each one of them will perform some work and produce some data that we need to build a view state. We need the response from both of them before updating the view. Also, if we perform a reload, both of them need to be executed again. This is a good fit for Zip, since the updates will always arrive in pairs:
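
To make the difference concrete, here is a small side-by-side sketch that feeds the same events into both operators. The subject names and values are made up for illustration.

```swift
import Combine

let letters = PassthroughSubject<String, Never>()
let numbers = PassthroughSubject<Int, Never>()

var latest: [String] = []
var zipped: [String] = []

let latestCancellable = letters
    .combineLatest(numbers)
    .sink { letter, number in latest.append("\(letter)\(number)") }

let zipCancellable = letters
    .zip(numbers)
    .sink { letter, number in zipped.append("\(letter)\(number)") }

letters.send("a")
letters.send("b")
numbers.send(1)
numbers.send(2)

print(latest) // ["b1", "b2"]  "a" was replaced by "b" before any number arrived
print(zipped) // ["a1", "b2"]  elements are paired in the order they were sent
```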

let playerStats = PassthroughSubject<String, Never>()
let matchStats = PassthroughSubject<Int, Never>()

cancellable = Publishers.Zip(
    playerStats,
    matchStats
).sink { (player, match) in
    print(player, "match: ", match)
}

playerStats.send("First player")
playerStats.send("Second player")
matchStats.send(1)
playerStats.send("Third player")
matchStats.send(2)

Output:

First player match:  1
Second player match:  2

As we can see, the third player is not emitted: Zip holds on to it while it waits for a third match value to pair it with.