Skip to content

Instantly share code, notes, and snippets.

@daviddenton
Last active August 30, 2021 15:01
Show Gist options
  • Save daviddenton/6f7c5b885b5f601313549e7affedbcc8 to your computer and use it in GitHub Desktop.
Save daviddenton/6f7c5b885b5f601313549e7affedbcc8 to your computer and use it in GitHub Desktop.
Bridging an SQS queue to an AWS Request SQS Event Lambda
/**
 * Demo entry point: creates a queue on an in-memory [FakeSQS], schedules a
 * periodic test publisher against it, and starts an [SQSToLambdaBridge] that
 * feeds the received messages to the handler returned by [LambdaFunction].
 *
 * The same scheduler (two threads) is shared by the publisher and the bridge.
 */
fun main() {
    val queueName = QueueName.of("queueName")

    // Stand up the fake SQS and register the queue before anything talks to it.
    val sqs = FakeSQS()
    sqs.client().createQueue(queueName, emptyList(), emptyMap())

    val url = Uri.of("/$queueName")
    val executor = Executors.newScheduledThreadPool(2)
    executor.sendTestMessagesToQueue(sqs, url)

    val handler = LambdaFunction()
    val bridge = SQSToLambdaBridge(url, scheduler = executor, http = sqs)
    bridge.start(handler::handleRequest)
}
/**
 * Builds the example Lambda handler: it prints the body of every record in
 * the incoming [SQSEvent] to standard output and ignores the context.
 */
fun LambdaFunction() = RequestHandler<SQSEvent, Unit> { event, _ ->
    for (record in event.records) {
        println(record.body)
    }
}
/**
 * Schedules a repeating task on this executor that sends the text
 * "hello world" to [queueUrl] through [fakeSqs]'s client.
 *
 * The first send happens after one second; each subsequent send is scheduled
 * one second after the previous one completes.
 */
fun ScheduledExecutorService.sendTestMessagesToQueue(fakeSqs: FakeSQS, queueUrl: Uri) {
    // A fresh client is obtained on every run, exactly as the task body requires.
    val publish = Runnable {
        fakeSqs.client().sendMessage(queueUrl, "hello world")
    }
    scheduleWithFixedDelay(publish, 1, 1, TimeUnit.SECONDS)
}
package org.http4k.example
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.SQSEvent
import org.http4k.aws.AwsSdkClient
import org.http4k.client.JavaHttpClient
import org.http4k.core.HttpHandler
import org.http4k.core.Uri
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
import java.lang.reflect.Proxy
import java.time.Duration
import java.util.concurrent.Executors.newSingleThreadScheduledExecutor
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit.MILLISECONDS
/**
 * Polls an SQS queue on a schedule and hands every received message to a
 * Lambda-style handler as a single-record [SQSEvent].
 *
 * @param queueUrl URL of the queue to poll; its string form is sent with every request.
 * @param checkInterval delay between the end of one poll and the start of the next.
 * @param scheduler executor that runs the polling task.
 * @param http HTTP handler the SQS client sends its requests through.
 * @param makeContext supplies the [Context] passed to the handler for each message.
 *   The default is a stub whose every method throws [NotImplementedError].
 */
class SQSToLambdaBridge(
    private val queueUrl: Uri,
    private val checkInterval: Duration = Duration.ofSeconds(1),
    private val scheduler: ScheduledExecutorService = newSingleThreadScheduledExecutor(),
    private val http: HttpHandler = JavaHttpClient(),
    private val makeContext: () -> Context = { proxy() }
) {
    /**
     * Starts polling immediately and then repeatedly, [checkInterval] after each poll finishes.
     *
     * For every received message [fn] is invoked with an event holding that one
     * message; the message is deleted from the queue only if [fn] returns normally.
     * Failures are reported on standard error and never stop the polling schedule.
     *
     * @param fn the handler to invoke, typically a `RequestHandler::handleRequest` reference.
     */
    fun start(fn: (SQSEvent, Context) -> Unit) {
        // The client is built with fixed literal credentials.
        val sqs = SqsClient.builder()
            .credentialsProvider(create(AwsBasicCredentials.create("accessKeyId", "secretKey")))
            .httpClient(AwsSdkClient(http))
            .build()
        val url = queueUrl.toString()

        scheduler.scheduleWithFixedDelay(
            {
                // An exception escaping a scheduled task suppresses all later runs,
                // so a failed poll is reported here instead of being allowed to propagate.
                try {
                    sqs.pollOnce(url, fn)
                } catch (e: Exception) {
                    report("polling $url failed", e)
                }
            }, 0, checkInterval.toMillis(), MILLISECONDS
        )
    }

    /** Receives one batch from [url] and dispatches each message to [fn] independently. */
    private fun SqsClient.pollOnce(url: String, fn: (SQSEvent, Context) -> Unit) {
        val request = ReceiveMessageRequest.builder()
            .queueUrl(url)
            .build()

        receiveMessage(request).messages().forEach { message ->
            try {
                val event = SQSEvent().apply {
                    records = listOf(
                        SQSEvent.SQSMessage().apply {
                            messageId = message.messageId()
                            receiptHandle = message.receiptHandle()
                            body = message.body()
                        }
                    )
                }
                fn(event, makeContext())
                // Reached only when the handler succeeded; a failed message stays on the queue.
                deleteMessage(
                    DeleteMessageRequest.builder()
                        .queueUrl(url)
                        .receiptHandle(message.receiptHandle())
                        .build()
                )
            } catch (e: Exception) {
                report("handling message ${message.messageId()} failed", e)
            } catch (e: NotImplementedError) {
                // Thrown by the default stub Context; it is an Error, so the clause above misses it.
                report("handling message ${message.messageId()} failed", e)
            }
        }
    }

    /** Writes a one-line failure description to standard error. */
    private fun report(what: String, cause: Throwable) {
        System.err.println("SQSToLambdaBridge: $what: $cause")
    }
}

/**
 * Creates a dynamic proxy implementing interface [T] whose every method call
 * throws [NotImplementedError].
 */
private inline fun <reified T> proxy(): T = Proxy.newProxyInstance(
    T::class.java.classLoader,
    arrayOf(T::class.java)
) { _, _, _ -> TODO("not implemented") } as T
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment