Last active
April 20, 2024 22:39
-
-
Save giuliohome/7ab1e57bd510b72f14faf0d95488a8de to your computer and use it in GitHub Desktop.
Connections count while concurrent goroutines run inside a DB conn pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System.Threading.Tasks | |
open Npgsql.FSharp | |
open Npgsql | |
let connectionString : string = | |
Sql.host "myhome" | |
|> Sql.database "test_db" | |
|> Sql.username "test_user" | |
|> Sql.password "test123" | |
|> Sql.port 5432 | |
|> Sql.formatConnectionString | |
// construct the connection | |
let singleton = new NpgsqlConnection(connectionString) | |
singleton.Open() | |
let checkConnectionPool (singleton: NpgsqlConnection) : Task<int list> = | |
singleton | |
|> Sql.existingConnection | |
|> Sql.query "select count(*) as conn_num from pg_stat_activity where usename='test_user';" | |
|> Sql.executeAsync (fun read -> | |
read.int "conn_num") | |
type Distributor = { Id: int; Name: string; } | |
let getDistributors (singleton: NpgsqlConnection) (myid: int): Async<Distributor list> = async { | |
let! res = | |
singleton | |
|> Sql.existingConnection | |
|> Sql.query "SELECT * FROM distributors WHERE did = @id" | |
|> Sql.parameters [ "@id", Sql.int myid ] | |
|> Sql.executeAsync (fun read -> | |
{ | |
Id = read.int "did" | |
Name = read.text "name" | |
}) | |
|> Async.AwaitTask | |
let! connNum = checkConnectionPool singleton |> Async.AwaitTask | |
connNum | |
|> List.head | |
|> printfn "connections: %d" | |
return res | |
} | |
async { | |
printfn "start" | |
let tasks = | |
[|1..30|] | |
|> Array.map(getDistributors singleton) | |
printfn "tasks ready" | |
let tasks = | |
tasks | |
|> Async.Sequential | |
printfn "tasks sequentiated" | |
let! tasks = tasks | |
tasks | |
|> Array.iter( | |
List.iter (fun d -> | |
printfn "name : %s id %d" d.Name d.Id) | |
) | |
printfn "end" | |
} | |
|> Async.RunSynchronously | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"database/sql" | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
_ "github.com/lib/pq" | |
) | |
const ( | |
numGoroutines = 100 | |
connectionURL = "postgres://test_user:[email protected]:5432/test_db?sslmode=disable" | |
) | |
func main() { | |
db, err := sql.Open("postgres", connectionURL) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer db.Close() | |
// Set maximum open connections and idle connections | |
db.SetMaxOpenConns(10) // Maximum open connections | |
db.SetMaxIdleConns(5) // Maximum idle connections | |
// Wait group to wait for all goroutines to finish | |
var wg sync.WaitGroup | |
// Channel to signal completion of ticker goroutine | |
tickerDone := make(chan struct{}) | |
// Channel to signal completion of goroutines | |
done := make(chan struct{}) | |
// Start ticker to periodically fetch connection count | |
ticker := time.NewTicker(100 * time.Millisecond) | |
defer ticker.Stop() | |
go func() { | |
defer close(tickerDone) | |
for { | |
select { | |
case <-ticker.C: | |
var connectionCount int | |
err := db.QueryRow("SELECT COUNT(*) FROM pg_stat_activity WHERE usename = 'test_user'").Scan(&connectionCount) | |
if err != nil { | |
log.Printf("Error fetching connection count: %s\n", err) | |
return | |
} | |
fmt.Printf("Connection count: %d\n", connectionCount) | |
case <-done: | |
return | |
} | |
} | |
}() | |
// Fetch initial connection count | |
var initialConnectionCount int | |
err = db.QueryRow("SELECT COUNT(*) FROM pg_stat_activity WHERE usename = 'test_user'").Scan(&initialConnectionCount) | |
if err != nil { | |
log.Printf("Error fetching initial connection count: %s\n", err) | |
return | |
} | |
fmt.Printf("Initial connection count: %d\n", initialConnectionCount) | |
// Start goroutines to execute transactions | |
for i := 0; i < numGoroutines; i++ { | |
wg.Add(1) | |
go func(id int) { | |
defer wg.Done() | |
err := executeTransaction(db, id) | |
if err != nil { | |
log.Printf("Error executing transaction in goroutine %d: %s\n", id, err) | |
} | |
}(i) | |
} | |
// Wait for all goroutines to finish | |
wg.Wait() | |
fmt.Println("All goroutines finished execution") | |
// Signal ticker to stop | |
close(done) | |
fmt.Println("Ticker stopped") | |
// Wait for ticker goroutine to finish | |
<-tickerDone | |
// Fetch final connection count | |
var finalConnectionCount int | |
err = db.QueryRow("SELECT COUNT(*) FROM pg_stat_activity WHERE usename = 'test_user'").Scan(&finalConnectionCount) | |
if err != nil { | |
log.Printf("Error fetching final connection count: %s\n", err) | |
return | |
} | |
fmt.Printf("Final connection count: %d\n", finalConnectionCount) | |
} | |
func executeTransaction(db *sql.DB, id int) error { | |
// Start transaction | |
tx, err := db.Begin() | |
if err != nil { | |
return err | |
} | |
defer tx.Rollback() | |
// Sleep to simulate work | |
time.Sleep(time.Second) | |
fmt.Printf("Goroutine %d: Transaction started\n", id) | |
// Query active connections | |
var connectionCount int | |
err = tx.QueryRow("SELECT COUNT(*) AS connection_count FROM pg_stat_activity WHERE usename = 'test_user'").Scan(&connectionCount) | |
if err != nil { | |
return err | |
} | |
fmt.Printf("Goroutine %d: Connection count: %d\n", id, connectionCount) | |
// Commit transaction | |
err = tx.Commit() | |
if err != nil { | |
return err | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment