Skip to content

Instantly share code, notes, and snippets.

@patchthecode
Forked from JUSTINMKAUFMAN/SSE.swift
Created May 4, 2022 13:58
Show Gist options
  • Save patchthecode/0cff293548b3eaed99dfb3fd6dad586f to your computer and use it in GitHub Desktop.
Save patchthecode/0cff293548b3eaed99dfb3fd6dad586f to your computer and use it in GitHub Desktop.
Swift SSE EventSource Server for Vapor
import Foundation
import NIO
import Vapor
/**
SSE EventSource server implementation for Swift Vapor 3
Usage:
```
// Create a route for all incoming SSE connection
// requests and save a reference to the new stream
router.get("sse", String.parameter) { request -> SSEStream<SSEEvent> in
let stream = SSEStream<SSEEvent>(on: request)
self.streams.add(stream)
return stream
}
// Schedule an event to be published on the stream
let event = SSEEvent(type: .put, id: eventId, data: eventData)
stream.schedule(event)
```
*/
/// The kind of a server-sent event.
///
/// Backed by `String`; each case's raw value is its own name
/// (for example `SSEEventType.put.rawValue == "put"`).
enum SSEEventType: String, Content {
    case put
    case ping
    case patch
    case delete
}
/// A single server-sent event: a type plus an optional identifier and payload.
public struct SSEEvent: Content {
    /// The kind of event.
    let type: SSEEventType
    /// Optional event identifier; `nil` when none was supplied.
    let id: String?
    /// Optional event payload; `nil` when none was supplied.
    let data: String?

    /// Creates an event.
    ///
    /// - Parameters:
    ///   - type: The kind of event.
    ///   - id: Optional event identifier. Defaults to `nil`.
    ///   - data: Optional event payload. Defaults to `nil`.
    init(type: SSEEventType, id: String? = nil, data: String? = nil) {
        self.data = data
        self.id = id
        self.type = type
    }
}
/// A long-lived chunked HTTP response used to push events to a client as a
/// `text/event-stream`.
///
/// The response (headers, status and chunked body) is built once in `init(on:)`
/// and handed back unchanged by `encode(for:)`. Events are written to the open
/// body with `schedule(_:initialDelay:repeatInterval:)`; `succeed(with:)` writes
/// one final JSON chunk and ends the stream.
class SSEStream<ResponseType>: ResponseEncodable where ResponseType: Encodable {
    private let chunkedStream: HTTPChunkedStream
    private let allocator: ByteBufferAllocator
    private let response: Response
    private let request: Request

    /// Builds the streaming response for `request`.
    ///
    /// - Parameter request: The incoming request the stream is attached to.
    init(on request: Request) {
        self.request = request
        response = Response(using: request)
        chunkedStream = HTTPChunkedStream(on: request)
        allocator = ByteBufferAllocator()

        response.http.headers.add(name: .contentType, value: "text/event-stream")
        response.http.headers.add(name: .transferEncoding, value: "chunked")
        response.http.headers.add(name: .cacheControl, value: "no-cache")
        response.http.headers.add(name: .connection, value: "keep-alive")
        response.http.headers.add(name: .accessControlAllowOrigin, value: "*")
        response.http.status = .ok
        response.http.body = chunkedStream.convertToHTTPBody()
    }

    /// Returns the response prepared in `init(on:)`, already completed.
    func encode(for req: Request) throws -> EventLoopFuture<Response> {
        return req.future(response)
    }

    /// Schedules `event` to be written to the stream.
    ///
    /// - Parameters:
    ///   - event: The event to write.
    ///   - initialDelay: Delay before the first write. Defaults to zero.
    ///   - repeatInterval: When non-`nil`, the event is written repeatedly with
    ///     this delay between writes; the repetition is cancelled once the
    ///     stream is closed, a write fails, or this object has been deallocated.
    ///     When `nil`, the event is written once.
    public func schedule(_ event: SSEEvent,
                         initialDelay: TimeAmount = .seconds(0),
                         repeatInterval: TimeAmount? = nil) {
        // Captured on its own so the closures can still produce a completed
        // future after `self` is gone.
        let eventLoop = request.eventLoop

        if let repeatInterval = repeatInterval {
            eventLoop.scheduleRepeatedTask(initialDelay: initialDelay, delay: repeatInterval) { [weak self] task -> Future<Void> in
                // `weak` rather than `unowned`: nothing guarantees the stream
                // outlives a task that may fire arbitrarily far in the future.
                guard let self = self, !self.chunkedStream.isClosed else {
                    task.cancel()
                    return eventLoop.newSucceededFuture(result: ())
                }
                let write = self.eventFuture(for: event)
                write.whenFailure { _ in task.cancel() }
                return write
            }
        } else {
            eventLoop.scheduleTask(in: initialDelay) { [weak self] () -> Future<Void> in
                guard let self = self, !self.chunkedStream.isClosed else {
                    return eventLoop.newSucceededFuture(result: ())
                }
                return self.eventFuture(for: event)
            }
        }
    }

    /// JSON-encodes `content`, writes it as a single chunk and then ends the stream.
    ///
    /// - Parameter content: The value to encode and send.
    /// - Returns: A future that completes once the end marker has been written.
    /// - Throws: Any error raised by `JSONEncoder` while encoding `content`.
    @discardableResult
    func succeed(with content: ResponseType) throws -> Future<Void> {
        let data = try JSONEncoder().encode(content)
        let stream = chunkedStream
        var contentBuffer = allocator.buffer(capacity: data.count)
        contentBuffer.write(bytes: data)
        // Hop onto the stream's event loop before writing; `submit` wraps the
        // inner future, so flatten it with `then`.
        return stream.eventLoop.submit {
            stream.write(.chunk(contentBuffer))
                .then { stream.write(.end) }
        }.then { $0 }
    }
}
private extension SSEStream {
    /// Starts a repeating task that writes a single newline to the stream as a
    /// keep-alive.
    ///
    /// The task cancels itself once the stream is closed, a write fails, or the
    /// stream object has been deallocated.
    ///
    /// - Parameters:
    ///   - request: The request whose event loop runs the task.
    ///   - interval: Pause between keep-alive writes. Defaults to 15 seconds;
    ///     a zero delay would leave no pause at all between writes.
    func startPulse(for request: Request, interval: TimeAmount = .seconds(15)) {
        var keepAliveBuffer: ByteBuffer = allocator.buffer(capacity: 1)
        keepAliveBuffer.write(string: "\n")
        let pulse = keepAliveBuffer
        let eventLoop = request.eventLoop

        eventLoop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: interval) { [weak self] task -> Future<Void> in
            // `weak` rather than `unowned`: the repeating task can outlive the stream.
            guard let self = self, !self.chunkedStream.isClosed else {
                task.cancel()
                return eventLoop.newSucceededFuture(result: ())
            }
            let write = self.chunkedStream.write(.chunk(pulse))
            write.whenFailure { _ in task.cancel() }
            return write
        }
    }

    /// Serializes `event` and writes it to the stream as one chunk.
    ///
    /// - Parameter event: The event to send.
    /// - Returns: A future that completes when the chunk write completes.
    @discardableResult
    func eventFuture(for event: SSEEvent) -> Future<Void> {
        let bytes: [UInt8] = Array(payload(for: event).utf8)
        var contentBuffer = allocator.buffer(capacity: bytes.count)
        contentBuffer.write(bytes: bytes)
        let stream = chunkedStream
        // Hop onto the stream's event loop before writing; `submit` wraps the
        // inner future, so flatten it with `then`.
        return stream.eventLoop.submit {
            stream.write(.chunk(contentBuffer))
        }.then { $0 }
    }

    /// Renders `event` in the `text/event-stream` wire format.
    ///
    /// Emits an optional `id:` line, an `event:` line carrying the type's raw
    /// value, one `data:` line per line of `event.data` (a single empty
    /// `data:` line when `data` is `nil` or empty), and a terminating blank line.
    ///
    /// - Parameter event: The event to render.
    /// - Returns: The serialized event text.
    func payload(for event: SSEEvent) -> String {
        var payload = ""
        if let id = event.id { payload += "id:\(id)\n" }
        // `type` is not optional, so it is written unconditionally.
        payload += "event:\(event.type.rawValue)\n"

        // A raw line break inside a field would end that field, and an embedded
        // blank line would end the whole event, so each line of the data is
        // sent as its own `data:` field.
        let dataLines = (event.data ?? "").split(omittingEmptySubsequences: false,
                                                 whereSeparator: { $0 == "\n" || $0 == "\r\n" || $0 == "\r" })
        for line in dataLines {
            payload += "data:\(line)\n"
        }
        payload += "\n"
        return payload
    }
}
/// Keeps a list of open `SSEStream`s and publishes events to all of them.
///
/// Every access to the stream list goes through a private serial queue.
final class SSEStreamManager<T: Encodable> {
    private var streams: [SSEStream<T>] = []
    private let queue = DispatchQueue(label: "SSE-\(T.self)-Queue")

    /// Appends `stream` to the list asynchronously on the manager's queue.
    func add(_ stream: SSEStream<T>) {
        queue.async { [weak self] in
            self?.streams.append(stream)
        }
    }

    /// Removes and returns the oldest registered stream, or `nil` if there is none.
    func pop() -> SSEStream<T>? {
        // The closure runs synchronously, so `self` cannot go away while it executes.
        return queue.sync { streams.isEmpty ? nil : streams.removeFirst() }
    }

    /// Schedules `event` on every registered stream.
    ///
    /// Does nothing when `event` is not an `SSEEvent`.
    func publish(_ event: T) {
        guard let sseEvent = event as? SSEEvent else { return }
        // Read the list on the queue, like `add` and `pop` do, so this cannot
        // race with a concurrent mutation; iterate over the snapshot outside it.
        let snapshot = queue.sync { streams }
        for stream in snapshot {
            stream.schedule(sseEvent)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment