Skip to content

Instantly share code, notes, and snippets.

@developmentalmadness
Last active February 25, 2016 00:48
Show Gist options
  • Save developmentalmadness/f29226d20eb011f189b1 to your computer and use it in GitHub Desktop.
Save developmentalmadness/f29226d20eb011f189b1 to your computer and use it in GitHub Desktop.
How to continuously monitor a PostgreSQL database table for new records (with optional throttling to one query per second)
# Connection settings for the PostgreSQL database polled by Monitor.
# The block is looked up by name via Database.forConfig("pg-postgres").
# NOTE(review): DB_PG_URL, DB_PG_USER and DB_PG_PWD are substitutions that are
# not defined in this file; presumably they are supplied as environment
# variables or by another config source — confirm for your deployment.
pg-postgres = {
# JDBC URL: the value of DB_PG_URL with "/postgres" appended.
url = ${DB_PG_URL}/postgres
# Database login name.
user = ${DB_PG_USER}
# Database login password.
password = ${DB_PG_PWD}
# Fully qualified class name of the JDBC driver.
driver = org.postgresql.Driver
}
// sbt build definition for the PostgreSQL table monitor sample.
name := "postgresql-table-monitor"

version := "1.0"

scalaVersion := "2.11.7"

// One version shared by every Akka module so the artifacts stay in lock-step.
val akkaVersion = "2.4.2"

libraryDependencies ++= Seq(
  // Akka: logging bridge, actors and streams
  "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
  // Database access: Slick and the PostgreSQL JDBC driver
  "com.typesafe.slick" %% "slick" % "3.0.0",
  "org.postgresql" % "postgresql" % "9.4-1201-jdbc41",
  // Logging backend
  "ch.qos.logback" % "logback-classic" % "1.1.3",
  // JDBC connection pool
  "com.zaxxer" % "HikariCP" % "2.4.1",
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion
)
package com.dvMENTALmadness
import akka.actor.ActorSystem
import slick.driver.PostgresDriver.api._
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.ExecutionContext.Implicits.global
/**
 * One row of the monitored table.
 *
 * @param id    value of the row's `id` column
 * @param value value of the row's `value` column
 */
final case class Record(id: Int, value: String)
/**
 * Slick table definition that maps the `my_stream` table onto [[Record]].
 *
 * @param tag table tag supplied by Slick when the table is used in a query
 */
class Records(tag: Tag) extends Table[Record](tag, "my_stream") {

  /** The `id` column. */
  def id: Rep[Int] = column[Int]("id")

  /** The `value` column. */
  def value: Rep[String] = column[String]("value")

  /** Default projection: packs and unpacks `(id, value)` as a [[Record]]. */
  def * : slick.lifted.ProvenShape[Record] =
    (id, value) <> (Record.tupled, Record.unapply)
}
/**
 * Command-line entry point that continuously polls the `my_stream` table
 * and prints every record it has not seen before.
 *
 * Polling is cursor based: each query asks for rows whose `id` is greater
 * than the highest `id` fetched so far.
 * NOTE(review): this assumes `id` values only ever increase for newly
 * inserted rows — confirm against the table definition.
 */
object Monitor {

  /**
   * Runs the monitor until the stream fails or the process is stopped.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Local import keeps this block self-contained.
    import scala.util.control.NonFatal

    implicit val system = ActorSystem("Monitor")
    implicit val materializer = ActorMaterializer()

    val db = Database.forConfig("pg-postgres")
    // Maximum number of rows fetched by a single poll.
    val queryLimit = 5

    try {
      val newRecStream = Source
        .unfoldAsync(0) { n =>
          // Order by id so that `take` returns the lowest unseen ids and the
          // last row of the batch really carries the highest id fetched.
          // Without an ORDER BY the database may return any `queryLimit`
          // rows, and advancing the cursor could skip or repeat records.
          val q = TableQuery[Records]
            .filter(row => row.id > n)
            .sortBy(row => row.id)
            .take(queryLimit)

          db.run(q.result).map { recs =>
            // An empty batch keeps the cursor where it is, so the next poll
            // repeats the same query.
            val lastId = recs.lastOption.fold(n)(rec => rec.id)
            Some((lastId, recs))
          }
        }
        // At most one batch (and therefore one query) per second.
        .throttle(1, 1.second, 1, ThrottleMode.shaping)
        .flatMapConcat(recs => Source.fromIterator(() => recs.iterator))
        .runForeach(rec => println(s"${rec.id}, ${rec.value}"))

      // Await.result (unlike Await.ready) rethrows a stream failure, so the
      // catch block below actually gets to report it.
      Await.result(newRecStream, Duration.Inf)
    } catch {
      // Only non-fatal errors are reported; fatal ones (OutOfMemoryError,
      // InterruptedException, ...) propagate after the cleanup below.
      case NonFatal(ex) => println(ex)
    } finally {
      // Close the database even if stopping the actor system throws.
      try system.terminate()
      finally db.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment