Published on: January 24, 2023
In iOS 13, we gained the ability to easily send and receive data using web sockets through URLSession. With async/await, we gained the ability to fetch data from servers using the await keyword, and we can iterate over asynchronous sequences using async for loops.
We can even read data from a URL one line at a time by calling the lines property on URL:
let url = URL(string: "https://donnywals.com")!
for try await line in url.lines {
    // use line
}
While this is really cool and allows us to build apps that ingest data in real time if the server supports streaming bodies, we cannot use the lines property to set up a web socket connection, listen for incoming messages, and potentially send messages over the same connection too.
In this post, you’ll learn everything you need to know about building your own mechanism to conveniently iterate over messages from a web socket asynchronously. We will leverage some existing functionality from URLSessionWebSocketTask and AsyncThrowingStream to build our own AsyncSequence that conveniently wraps our URLSessionWebSocketTask.
Note that the resulting code has only had relatively limited testing, so I cannot guarantee that the provided solution will be 100% correct for everything you throw at it. If you find any issues with the final code, feel free to contact me. Bonus points if you’re able to provide some ideas for a potential fix.
Using a web socket without async / await
Before we get started, let’s quickly review how to use a web socket without async/await. The code details are outlined in this post. Be sure to read it if you want to learn more about using web sockets in your apps.
let url = URL(string: "ws://127.0.0.1:8080")!
let socketConnection = URLSession.shared.webSocketTask(with: url)
socketConnection.resume()

func setReceiveHandler() {
    socketConnection.receive { result in
        // Re-arm the handler so we keep listening after this message
        defer { self.setReceiveHandler() }
        do {
            let message = try result.get()
            switch message {
            case let .string(string):
                print(string)
            case let .data(data):
                print(data)
            @unknown default:
                print("unknown message received")
            }
        } catch {
            // handle the error
            print(error)
        }
    }
}

setReceiveHandler()
Notice how, to receive messages from the socket, I must call receive with a completion handler. This method only allows me to receive a single incoming message, so I must re-set my handler after receiving a message to automatically begin listening for the next message.
This is a great example of a situation where an async for loop such as for try await message in socketConnection would make a lot of sense. Unfortunately, this is not possible out of the box. However, URLSessionWebSocketTask provides some form of support for async / await, so we’re not entirely out of luck.
A basic implementation of web sockets with async / await
While URLSessionWebSocketTask doesn’t expose an AsyncSequence that emits incoming messages out of the box, it does come with an async version of the receive method you saw earlier.
This allows us to rewrite the example above as an async method as follows:
func setReceiveHandler() async {
    do {
        let message = try await socketConnection.receive()
        switch message {
        case let .string(string):
            print(string)
        case let .data(data):
            print(data)
        @unknown default:
            print("unknown message received")
        }
    } catch {
        print(error)
    }

    // Always recurse, even after an error
    await setReceiveHandler()
}
This code works just fine, except we don’t really have a way to stop the recursion here. The code you saw earlier actually has the exact same issue; there’s no condition to stop listening for web socket messages, even if the web socket connection has already been closed.
We could improve our code by only recursing if:
- We didn’t encounter any errors
- The socket connection is still active
This would look a bit as follows:
func setReceiveHandler() async {
    // Stop as soon as the socket reports a real close code
    guard socketConnection.closeCode == .invalid else {
        return
    }

    do {
        let message = try await socketConnection.receive()
        switch message {
        case let .string(string):
            print(string)
        case let .data(data):
            print(data)
        @unknown default:
            print("unknown message received")
        }

        // Only recurse when no error was thrown
        await setReceiveHandler()
    } catch {
        print(error)
    }
}
An open web socket’s close code is always set to invalid to signal that the connection has not (yet) been closed. We can leverage this to check that our connection is still active before waiting for the next message to be received.
This is much better already because we now handle closed sockets and failures much more gracefully, but we could improve the readability of this code a tiny bit by leveraging a while loop instead of recursively calling the setReceiveHandler function:
func setReceiveHandler() async {
    var isActive = true

    while isActive && socketConnection.closeCode == .invalid {
        do {
            let message = try await socketConnection.receive()
            switch message {
            case let .string(string):
                print(string)
            case let .data(data):
                print(data)
            @unknown default:
                print("unknown message received")
            }
        } catch {
            print(error)
            // Leave the loop after the first failure
            isActive = false
        }
    }
}
To me, this version of the code is slightly easier to read, but that might not be the case for you. It’s functionally equivalent, so you can choose to use whichever option suits you best.
While this code works, I’m not quite happy with where we’ve landed right now. There’s a lot of logic in this function, and I would prefer to separate handling the incoming values from the calls to socketConnection.receive() somehow. Ideally, I should be able to write the following:
do {
    for try await message in socketConnection {
        switch message {
        case let .string(string):
            print(string)
        case let .data(data):
            print(data)
        @unknown default:
            print("unknown message received")
        }
    }
} catch {
    // handle error
}
This is much, much nicer from a call-site perspective, and it would allow us to put the ugly bits elsewhere.
To do this, we can leverage the power of AsyncStream, which allows us to build a custom async sequence of values.
Using AsyncStream to emit web socket messages
Given our end goal, there are a few ways for us to get where we want to be. The easiest way would be to write a computed property in an extension on URLSessionWebSocketTask that encapsulates the while loop you saw earlier. This implementation would look as follows:
// Declared public so the public extension below can expose it
public typealias WebSocketStream = AsyncThrowingStream<URLSessionWebSocketTask.Message, Error>

public extension URLSessionWebSocketTask {
    var stream: WebSocketStream {
        return WebSocketStream { continuation in
            Task {
                var isAlive = true

                while isAlive && closeCode == .invalid {
                    do {
                        let value = try await receive()
                        continuation.yield(value)
                    } catch {
                        continuation.finish(throwing: error)
                        isAlive = false
                    }
                }
            }
        }
    }
}
To make the code a little bit easier to read, I’ve defined a typealias for my AsyncThrowingStream so we don’t have to look at the same long type signature all over the place.
The code above creates an instance of AsyncThrowingStream that asynchronously awaits new values from the web socket as long as the web socket is considered active and hasn’t been closed. To emit incoming messages and potential errors, the continuation’s yield and finish methods are used. These methods will either emit a new value (yield) or end the stream of values with an error (finish).
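The roles of yield and finish can be tried out without a server. What follows is a minimal sketch, not part of the original implementation: DemoError, makeDemoStream, and collect are hypothetical names used only for illustration. The stream yields two values and then finishes with an error, so the consuming loop receives both values before landing in its catch block.

```swift
// Hypothetical error type used only for this illustration.
struct DemoError: Error {}

// Builds a stream that yields two values and then fails, mirroring how the
// web socket stream yields messages and finishes with an error.
func makeDemoStream() -> AsyncThrowingStream<String, Error> {
    AsyncThrowingStream { continuation in
        continuation.yield("first")
        continuation.yield("second")
        continuation.finish(throwing: DemoError())
    }
}

// Collects everything the stream emits and reports whether it ended with an error.
func collect(_ stream: AsyncThrowingStream<String, Error>) async -> (values: [String], failed: Bool) {
    var values: [String] = []
    do {
        for try await value in stream {
            values.append(value)
        }
        return (values, false)
    } catch {
        return (values, true)
    }
}
```

The stream extension on URLSessionWebSocketTask follows the same pattern, with receive() supplying the values instead of hard-coded strings.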
The stream extension works great in many situations, but there is one issue. If we decide to close the web socket connection from the app’s side by calling cancel(with:reason:) on our socketConnection, our WebSocketStream does not end. Instead, it will be stuck waiting for messages, and the call site will be stuck too.
Task {
    try await Task.sleep(for: .seconds(5))
    // cancel(with:reason:) is neither async nor throwing
    socketConnection.cancel(with: .goingAway, reason: nil)
}

Task {
    do {
        for try await message in socketConnection.stream {
            // handle incoming messages
        }
    } catch {
        // handle error
    }

    print("this would never be printed")
}
If everything worked as expected, our web socket connection would close after five seconds. At that point, our for loop should end and our print statement should execute, since the asynchronous stream is no longer active. Unfortunately, this is not the case, so we need to find a better way to model our stream.
URLSessionWebSocketTask does not provide a way for us to detect cancellation. So, I’ve found that it is best to use an object that wraps the URLSessionWebSocketTask, and to cancel the task through that object. This allows us to both end the async stream we’re providing to callers and close the web socket connection with one method call.
Here’s what that object looks like:
class SocketStream: AsyncSequence {
    typealias AsyncIterator = WebSocketStream.Iterator
    typealias Element = URLSessionWebSocketTask.Message

    private var continuation: WebSocketStream.Continuation?
    private let task: URLSessionWebSocketTask

    private lazy var stream: WebSocketStream = {
        return WebSocketStream { continuation in
            // Keep the continuation so cancel() and deinit can end the stream
            self.continuation = continuation

            Task {
                var isAlive = true

                while isAlive && task.closeCode == .invalid {
                    do {
                        let value = try await task.receive()
                        continuation.yield(value)
                    } catch {
                        continuation.finish(throwing: error)
                        isAlive = false
                    }
                }
            }
        }
    }()

    init(task: URLSessionWebSocketTask) {
        self.task = task
        task.resume()
    }

    deinit {
        continuation?.finish()
    }

    func makeAsyncIterator() -> AsyncIterator {
        return stream.makeAsyncIterator()
    }

    func cancel() async throws {
        // Close the socket and end the stream in one call
        task.cancel(with: .goingAway, reason: nil)
        continuation?.finish()
    }
}
There’s a bunch of code here, but it’s not too bad. The first few lines are all about setting up some type aliases and properties for convenience. The lazy var stream is essentially the exact same code that you’ve already seen in the URLSessionWebSocketTask extension from before.
When our SocketStream’s deinit is called, we make sure that we end our stream. There’s also a cancel method that closes the socket connection as well as the stream. Because SocketStream conforms to AsyncSequence, we must provide an iterator object that’s used when we try to iterate over our SocketStream. We simply ask our internal stream object to make an iterator and use that as our return value.
Using the code above looks as follows:
let url = URL(string: "ws://127.0.0.1:8080")!
let socketConnection = URLSession.shared.webSocketTask(with: url)
let stream = SocketStream(task: socketConnection)

Task {
    do {
        for try await message in stream {
            // handle incoming messages
        }
    } catch {
        // handle error
    }

    print("this will be printed once the stream ends")
}
To cancel our stream after five seconds just like before, you can run the following task in parallel with our iterating task:
Task {
    try await Task.sleep(for: .seconds(5))
    try await stream.cancel()
}

Task {
    // iterate...
}
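The two mechanics at play here, forwarding makeAsyncIterator() to an internal stream and ending that stream from a cancel method, can be tried in isolation without a server. The sketch below is an illustration under assumptions rather than the author’s code: NumberStream and its send method are hypothetical, and integers stand in for web socket messages.

```swift
// Hypothetical wrapper, not tied to web sockets: it exposes an internal
// AsyncThrowingStream as an AsyncSequence and lets the owner end it.
final class NumberStream: AsyncSequence {
    typealias Element = Int
    typealias AsyncIterator = AsyncThrowingStream<Int, Error>.Iterator

    private let stream: AsyncThrowingStream<Int, Error>
    private let continuation: AsyncThrowingStream<Int, Error>.Continuation

    init() {
        // Capture the continuation so methods on this object can drive the stream.
        var captured: AsyncThrowingStream<Int, Error>.Continuation!
        stream = AsyncThrowingStream { continuation in
            captured = continuation
        }
        continuation = captured
    }

    // Pushes a value to whoever is iterating over this sequence.
    func send(_ value: Int) {
        continuation.yield(value)
    }

    // Ending the stream here makes any `for try await` loop over it return.
    func cancel() {
        continuation.finish()
    }

    // Iteration is delegated to the wrapped stream.
    func makeAsyncIterator() -> AsyncIterator {
        stream.makeAsyncIterator()
    }
}
```

SocketStream applies the same idea, with the web socket task supplying the values and cancel() also closing the connection.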
While this is pretty cool, we do have a bit of an issue here because of the following bit of code:
private lazy var stream: WebSocketStream = {
    return WebSocketStream { continuation in
        self.continuation = continuation

        Task {
            var isAlive = true

            while isAlive && task.closeCode == .invalid {
                do {
                    let value = try await task.receive()
                    continuation.yield(value)
                } catch {
                    continuation.finish(throwing: error)
                    isAlive = false
                }
            }
        }
    }
}()
The task that we run our while loop in won’t end unless we end our stream from within our catch block. If we manually close the web socket connection using the cancel method we wrote earlier, the call to receive() will never receive an error or a value, which means that it will be stuck forever.
The most reliable way to fix this is to go back to the callback-based version of receive to drive your async stream:
private lazy var stream: WebSocketStream = {
    return WebSocketStream { continuation in
        self.continuation = continuation
        waitForNextValue()
    }
}()

private func waitForNextValue() {
    // End the stream once the socket has been closed
    guard task.closeCode == .invalid else {
        continuation?.finish()
        return
    }

    task.receive(completionHandler: { [weak self] result in
        guard let continuation = self?.continuation else {
            return
        }

        do {
            let message = try result.get()
            continuation.yield(message)
            // Re-arm the handler for the next message
            self?.waitForNextValue()
        } catch {
            continuation.finish(throwing: error)
        }
    })
}
With this approach we don’t have any lingering tasks, and our call site is as clean and concise as ever; we’ve only changed some of our internal logic.
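The pattern of re-arming a callback to feed a stream can also be exercised without a network connection. The following is a rough sketch under assumptions: FakeSocket is a hypothetical stand-in for URLSessionWebSocketTask that hands out queued results synchronously, and messages(from:) is an illustrative helper rather than part of the code above.

```swift
// Hypothetical callback-based source that hands out one queued result per call,
// standing in for URLSessionWebSocketTask.receive(completionHandler:).
final class FakeSocket {
    private var queued: [Result<String, Error>]

    init(_ queued: [Result<String, Error>]) {
        self.queued = queued
    }

    // Treated as "open" while there is still something left to deliver.
    var isOpen: Bool { !queued.isEmpty }

    func receive(completionHandler: (Result<String, Error>) -> Void) {
        completionHandler(queued.removeFirst())
    }
}

// Drives an AsyncThrowingStream from the callback API by re-arming the
// callback after every successful message.
func messages(from socket: FakeSocket) -> AsyncThrowingStream<String, Error> {
    AsyncThrowingStream { continuation in
        func waitForNextValue() {
            // Finish the stream once the source reports that it is closed.
            guard socket.isOpen else {
                continuation.finish()
                return
            }

            socket.receive { result in
                do {
                    continuation.yield(try result.get())
                    waitForNextValue()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
        }

        waitForNextValue()
    }
}
```

Because every step is either a yield or a finish, there is no separate task left waiting once the source closes, which is the property the callback-driven SocketStream relies on.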
In Summary
Swift Concurrency provides many useful features for writing better code, and Apple quickly adopted async / await for existing APIs. However, some APIs that would be useful are missing, such as iterating over web socket messages.
In this post, you learned how to use async streams to create an async sequence that emits web socket messages. You first saw a fully async / await version that was neat, but had memory and task lifecycle issues. Then, you saw a version that combines a callback-based approach with the async stream.
The result is an easy way to iterate over incoming web socket messages with async / await. If you have any questions, comments, or improvements for this post, please don’t hesitate to reach out to me on Twitter.