Skip to content

Instantly share code, notes, and snippets.

@jameshartig
Last active May 4, 2018 17:02
Show Gist options
  • Save jameshartig/1976bf549c91998f9f37435663eb66cf to your computer and use it in GitHub Desktop.
Save jameshartig/1976bf549c91998f9f37435663eb66cf to your computer and use it in GitHub Desktop.
package main
import (
"flag"
"io"
"log"
"net"
"runtime"
"sync/atomic"
"time"
"github.com/mediocregopher/radix.v2/pool"
"github.com/mediocregopher/radix.v2/redis"
)
// Process-wide counters shared between the goroutines in this file.
// Writers update them with atomic.AddInt64.
var (
	// created counts calls to the pool's dial function.
	created int64

	// numConns is the number of connections currently being served by handle.
	numConns int64

	// lifetimeConns counts every connection accepted by the listener.
	lifetimeConns int64

	// totalDuration accumulates the time spent in successful PINGs, in whole
	// milliseconds per ping.
	totalDuration int64

	// totalPings counts PINGs that completed without an error.
	totalPings int64
)
// main runs a self-contained stress test of the radix.v2 connection pool.
//
// It opens a TCP listener on an ephemeral loopback port whose connections are
// served by handle, builds a pool that dials that listener, and starts
// -parallel goroutines that each send PING through the pool on a 100µs tick.
// Once per second it logs the number of open server-side connections, the
// number of dials since the previous report, the pool's available count, the
// number of successful pings and their mean duration.
//
// Flags:
//
//	-parallel  number of pinging goroutines (default 100)
//	-size      pool size (default 50)
//	-limited   additionally apply pool.CreateLimit to the pool
//
// main never returns; it blocks forever once everything is started.
func main() {
	ng := flag.Int("parallel", 100, "number of pinging goroutines")
	ps := flag.Int("size", 50, "pool size")
	limited := flag.Bool("limited", false, "should we use limited pool")
	flag.Parse()

	// Listen for incoming connections.
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer l.Close()

	// One accept loop per CPU; every accepted connection gets its own
	// goroutine running handle.
	for i := 0; i < runtime.NumCPU(); i++ {
		go func() {
			for {
				// Listen for an incoming connection.
				conn, err := l.Accept()
				if err != nil {
					log.Println("error accepting", err)
					continue
				}
				atomic.AddInt64(&lifetimeConns, 1)
				go handle(conn)
			}
		}()
	}

	// Wrap redis.Dial so every dial attempt made by the pool is counted.
	df := func(network, addr string) (*redis.Client, error) {
		atomic.AddInt64(&created, 1)
		return redis.Dial(network, addr)
	}
	opts := []pool.Opt{
		pool.OnFullBuffer((*ps)*10, time.Second),
	}
	if *limited {
		opts = append(opts, pool.CreateLimit(*ps, 100*time.Millisecond))
	}
	p, err := pool.NewCustom("tcp", l.Addr().String(), *ps, df, opts...)
	if err != nil {
		panic(err)
	}

	// Reporter: logs per-second deltas of the shared counters.
	go func() {
		var lastCreated, lastDuration, lastPings int64
		for range time.Tick(time.Second) {
			// The counters are written with atomic.AddInt64 from other
			// goroutines, so snapshot them with atomic loads instead of
			// plain reads to avoid a data race.
			curCreated := atomic.LoadInt64(&created)
			curPings := atomic.LoadInt64(&totalPings)
			curDuration := atomic.LoadInt64(&totalDuration)

			log.Printf("num connections %d\n", atomic.LoadInt64(&numConns))
			log.Printf("num new connections %d\n", curCreated-lastCreated)
			log.Printf("num available %d\n", p.Avail())

			np := curPings - lastPings
			log.Printf("num pings %d\n", np)
			// A window with no successful ping has no mean; dividing by
			// zero here would panic and take the whole tool down.
			if np > 0 {
				log.Printf("mean ping duration %dms\n", (curDuration-lastDuration)/np)
			}

			lastCreated = curCreated
			lastPings = curPings
			lastDuration = curDuration
		}
	}()

	// Pingers: each goroutine attempts one PING per tick and records the
	// latency of successful ones in whole milliseconds.
	for i := 0; i < *ng; i++ {
		go func() {
			for range time.Tick(100 * time.Microsecond) {
				n := time.Now()
				if err := p.Cmd("PING").Err; err != nil {
					log.Printf("error from PING: %v\n", err)
					continue
				}
				atomic.AddInt64(&totalPings, 1)
				atomic.AddInt64(&totalDuration, time.Since(n).Nanoseconds()/int64(time.Millisecond))
			}
		}()
	}

	// Block forever; the goroutines above do all the work.
	select {}
}
// handle serves a single accepted connection as a minimal stand-in for a
// Redis server.
//
// The connection is counted in numConns for as long as handle is running and
// is closed when handle returns. Every successful Read, regardless of the
// bytes received, is answered with "+PONG\r\n". handle returns on the first
// read or write error; a read error of io.EOF is treated as a normal
// disconnect and is not logged, any other error is logged.
func handle(conn net.Conn) {
	atomic.AddInt64(&numConns, 1)
	defer atomic.AddInt64(&numConns, -1)
	defer conn.Close()

	// The request bytes are never inspected, so a single scratch buffer and a
	// single reply slice are allocated once and reused for every round trip
	// instead of being reallocated on each iteration.
	buf := make([]byte, 16)
	pong := []byte("+PONG\r\n")

	for {
		if _, err := conn.Read(buf); err != nil {
			if err != io.EOF {
				log.Printf("error reading from %s: %v\n", conn.RemoteAddr().String(), err.Error())
			}
			return
		}
		if _, err := conn.Write(pong); err != nil {
			log.Printf("error writing to %s: %v\n", conn.RemoteAddr().String(), err.Error())
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment