Async Sequence

Another great feature new modern concurrency is offering us is async sequences. Async sequence is similar to the regular sequence types we have in Swift but adds asynchronous code. We will loop over a collection of elements, as with regular Swift sequences, but all the elements are not ready yet when defining a loop. We will have a long-living loop that will await the next element. Every time a new element is generated, the body of the loop is called with the element. The loop will listen for new elements until nil is received, and then the loop finishes.

There are multiple ways to implement async sequences, so let’s take a look at all of them.

System Async Sequence API

Swift already provides us with some async sequences API that we can use. One of the most common use cases is listening for notifications. As usual, we will explore using some basic examples.

It is very common to introduce some operation after an app transitions from the background to the foreground, and this is one of the cases when we have to listen for notification center updates. Something very common is refreshing screen information when the app is open after being in the background.

In this example, we have a feed of posts, and every time the app transitions from the background to the foreground, we will update the list of posts. Let’s take a look.

We have an observable model used by a view with an async loadFeed method. This method will update a boolean to indicate when it is loading, a date to track when it was updated, and the list of posts. We add a delay to simulate that it is making a network request.

Swift
final class FeedViewModel: ObservableObject {
    @Published var feed: [Feed] = []
    @Published var refreshTime: Date = Date()
    @Published var isLoading = false
    
    @MainActor
    func loadFeed() async {
        isLoading = true
        refreshTime = Date()
        
        // perform network request
        try? await Task.sleep(for: .seconds(3))
        
        feed = [
            .init(title: "First post"),
            .init(title: "Another post")
        ]
        
        isLoading = false
    }
}

struct Feed: Identifiable {
    let title: String
    let id = UUID()
}

The view is very simple and will display a progress view when it is loading. When it’s not loading, it shows the date when the feed was last updated and the list of posts. Let’s focus on the .task modifier. First, we call loadFeed to update the list of posts when the view appears.

Swift
struct ContentView: View {
    @StateObject var viewModel = FeedViewModel()
    
    var body: some View {
        VStack {
            if viewModel.isLoading {
                ProgressView()
            } else {
                Text(viewModel.refreshTime.description)
                List(viewModel.feed) { post in
                    VStack(alignment: .leading) {
                        Text(post.title)
                    }
                }
            }
            
        }
        .task {
            await viewModel.loadFeed()
        }
    }
}

Iterating an async sequence

Now, if we want to listen to background/foreground transitions to trigger a new loadFeed request, we can use an async sequence API for listening to notifications, in this case, didBecomeActiveNotification. This will create an infinite loop, which is triggered with a notification every time the app transitions to the foreground. We need to await the element because this will be asynchronous, and when we receive the notification, we update the feed.

Swift
.task {
    await viewModel.loadFeed()
    
    for await notification in NotificationCenter.default.notifications(named: UIApplication.didBecomeActiveNotification) {
        await viewModel.loadFeed()
    }
}

We can use this API to listen for any kind of notifications. There are a few more APIs for different purposes, but we can also implement our own async sequences.

Creating an Async Sequence

To create our own async sequences, all we have to do is implement the AsyncSequence protocol. It is a very simple protocol that requires providing an iterator. Let’s explore a different example now.

In this example, we are going to display a list of messages, similar to a chat conversation. In a chat application, we want to be listening to new messages, and when a message is received, we want to update the UI with the message. This is usually done using Web Sockets or long-living requests. We are going to simulate this using an async sequence that will produce a new message every two seconds.

We have a message service that implements the AsyncSequence protocol, defines the element type the sequence will return, and an Iterator. The iterator needs to implement AsyncIteratorProtocol and provide a next method. This method is an asynchronous method and should return the next element of the sequence when it is available or return nil when the sequence is finished. For our example, we return a new message every two seconds.

Swift
struct MessagesService: AsyncSequence {
    typealias AsyncIterator = MessagesServiceIterator
    typealias Element = Message
    
    func makeAsyncIterator() -> MessagesServiceIterator {
        MessagesServiceIterator()
    }
}

struct MessagesServiceIterator: AsyncIteratorProtocol {
    typealias Element = Message
    
    mutating func next() async throws -> Message? {
        try await Task.sleep(for: .seconds(2))
        
        return Message(text: "Some message")
    }
}

The rest of the code is very simple: a view that displays a list of messages and an observable model, which is iterating over the async sequence, awaiting the next message to update the UI.

Swift
final class ChatViewModel:  ObservableObject {
    @Published var messages = [Message]()
    
    @MainActor
    func listenToMessages() async throws {
        for try await newMessage in MessagesService() {
            messages.append(newMessage)
        }
    }
}

struct Message: Identifiable {
    let id = UUID()
    let text: String
}

struct ContentView: View {
    @StateObject var viewModel = ChatViewModel()
    
    var body: some View {
        VStack {
            Text("Messages")
            
            ForEach(viewModel.messages) {
                Text($0.text)
            }
            
            Spacer()
        }
        .task {
            do {
                try await viewModel.listenToMessages()
            } catch {
                // Handle error
            }
        }
    }
}

Using iterator

As we can see, we iterate over the async sequence using a for await loop, as in the previous example. However, we can also access the iterator ourselves if we want to implement some custom logic. For example, if we want to implement some custom logic for the first message, we can create the iterator and call next to get the first value. Later, we can keep iterating using a while loop.

Swift
var iterator = MessagesService().makeAsyncIterator()
        
let first = try await iterator.next()
        
while try await iterator.next() != nil {
    ...            
}

Async Stream

All of this is very nice, but is there a way to implement async sequences in a simpler way? There actually is, it’s called Async Stream, and it allows us to create async sequences using a closure approach instead of creating new types. We are going to show the same example as before but using async streams.

The code is all the same as before but without creating any new type. All we have to do is create an AsyncStream, which will receive a closure. This closure should return the next element in the sequence. In our case, it will generate a new message every two seconds.

Swift
func listenToMessages() async throws {
    let stream = AsyncStream<Message> {
        try? await Task.sleep(for: .seconds(2))
        return Message(text: "Some message")
    }
    
    for try await newMessage in stream {
        messages.append(newMessage)
    }
}

This is the simpler version of creating an AsyncStream. There is an alternative in which the closure is receiving a continuation. This is useful in cases where we want to yield elements outside of the closure. For example, when working with some APIs like delegates. If we were receiving a new message as a delegate call, how can we add this message to the sequence?

We keep a reference to the continuation, and then when we receive a new message on onNewMessage, we can add the element to the sequence using yield.

Swift
protocol MessagesGeneratorDelegate: AnyObject {
    func onNewMessage(_ message: Message)
}

final class ChatViewModel:  ObservableObject, MessagesGeneratorDelegate {
    @Published var messages = [Message]()
    var continuation: AsyncStream<Message>.Continuation?
    
    @MainActor
    func listenToMessages() async throws {
        var generator = MessagesGenerator()
        generator.delegate = self
        
        let stream = AsyncStream<Message> { [weak self] continuation in
            self?.continuation = continuation
        }
        
        for try await newMessage in stream {
            messages.append(newMessage)
        }
    }
    
    func onNewMessage(_ message: Message) {
        continuation?.yield(message)
    }
}

This can be very useful when dealing with legacy code or some frameworks like CoreLocation, which are based on delegate calls.

Operators

It is very common when dealing with sequences to use operators to perform filtering, mapping, sorting, and combining calls. So, it is very useful to use these operators when working with async sequences. By default, we have a list of operators that we can use directly, such as map, filter, prefix… but we can also implement our own extensions.

For example, forEach is a very useful operator when we want to perform some operation over a sequence.

In this example, we use some predefined operators and implement our own forEach operator, so we can combine operators over the elements of the sequence.

Swift
final class ChatViewModel:  ObservableObject {
    @Published var messages = [Message]()
    
    @MainActor
    func listenToMessages() async throws {
        var messageNumber = 0
        let stream = AsyncStream<Int> {
            try? await Task.sleep(for: .seconds(2))
            messageNumber += 1
            return messageNumber
        }
        
        //stream.map(transform: (Message) async -> Transformed)
        //stream.filter(isIncluded: (Message) async -> Bool)
        //stream.contains(where: (Message) async throws -> Bool>)
        //stream.prefix(count: Int>)
        
        try await stream
            .prefix(2)
            .map { Message(text: "Message: \($0)") }
            .forEach {
                messages.append($0)
            }
    }
}

extension AsyncSequence {
    func forEach(_ body: (Element) async throws -> Void) async throws {
        for try await element in self {
            try await body(element)
        }
    }
}

Apple provides a Swift package with a few more useful operators like Merge, CombineLatest, and much more. You can take a look at it here: https://github.com/apple/swift-async-algorithms

Conclusion

In this article, we explored all the different options when working with async sequences and provided some examples for each case. Async Stream is my favorite type because it is very simple to create our own sequences. However, we don’t forget about the rest of the options, which can be very useful depending on the case.