Last active
February 23, 2022 23:28
-
-
Save tidwall/068bab96aad2c39e7604cc7ff809204f to your computer and use it in GitHub Desktop.
Uhaha using leveldb key/value store
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"io" | |
"os" | |
"github.com/syndtr/goleveldb/leveldb" | |
"github.com/tidwall/sds" | |
"github.com/tidwall/uhaha" | |
) | |
func main() { | |
// Choose where your leveldb will exist on the server. | |
path := "data.db" | |
// Open a blank representation of the data. | |
db, err := loadData(path, nil) | |
if err != nil { | |
panic(err) | |
} | |
// Configure the uhaha server. | |
var c uhaha.Config | |
// Set the initial data to point to the blank database. | |
c.InitialData = db | |
// Assign the Snapshot and Restore functions. These are important so | |
// the Raft log does not grow unbound. | |
c.Snapshot = saveToSnapshot | |
c.Restore = func(rd io.Reader) (data interface{}, err error) { | |
return loadData(path, rd) | |
} | |
// Create your commands. | |
c.AddWriteCommand("SET", cmdSET) | |
c.AddReadCommand("GET", cmdGET) | |
// Start the server. | |
uhaha.Main(c) | |
} | |
func cmdSET(m uhaha.Machine, args []string) (interface{}, error) { | |
db := m.Data().(*leveldb.DB) | |
if len(args) != 3 { | |
return nil, uhaha.ErrWrongNumArgs | |
} | |
err := db.Put([]byte(args[1]), []byte(args[2]), nil) | |
if err != nil { | |
return nil, err | |
} | |
return true, nil | |
} | |
func cmdGET(m uhaha.Machine, args []string) (interface{}, error) { | |
db := m.Data().(*leveldb.DB) | |
if len(args) != 2 { | |
return nil, uhaha.ErrWrongNumArgs | |
} | |
val, err := db.Get([]byte(args[1]), nil) | |
if err != nil { | |
if err == leveldb.ErrNotFound { | |
return nil, nil | |
} | |
return nil, err | |
} | |
return val, nil | |
} | |
type mySnapshot struct { | |
snap *leveldb.Snapshot | |
} | |
func (s *mySnapshot) Persist(w io.Writer) error { | |
// This happens in the background so there's no blocking the Uhaha | |
// application. Incoming connections can use commands while persisting. | |
// We'll use the "tidwall/sds" library for writing the key/value pairs. | |
sw := sds.NewWriter(w) | |
// Create a new snapshot iterator. | |
iter := s.snap.NewIterator(nil, nil) | |
// Always release the iterator and snapshot when done. | |
defer func() { | |
iter.Release() | |
s.snap.Release() | |
}() | |
for iter.Next() { | |
if err := sw.WriteBytes(iter.Key()); err != nil { | |
return err | |
} | |
if err := sw.WriteBytes(iter.Value()); err != nil { | |
return err | |
} | |
} | |
return iter.Error() | |
} | |
func (s *mySnapshot) Done(string) { | |
// This function can be left blank. | |
} | |
func saveToSnapshot(data interface{}) (uhaha.Snapshot, error) { | |
// Save the current leveldb database to a Uhaha snapshot. | |
// Leveldb has a handy snapshot utility. | |
db := data.(*leveldb.DB) | |
snap, err := db.GetSnapshot() | |
if err != nil { | |
return nil, err | |
} | |
return &mySnapshot{snap: snap}, nil | |
} | |
func loadData(path string, r io.Reader) (interface{}, error) { | |
// Remove the old database. This will be okay because all data is restored | |
// from the Raft snapshot. | |
if err := os.RemoveAll(path); err != nil { | |
return nil, err | |
} | |
var ok bool | |
db, err := leveldb.OpenFile(path, nil) | |
if err != nil { | |
return nil, err | |
} | |
defer func() { | |
if !ok { | |
// There was a problem restore the database. | |
// Close the database to free resources. | |
db.Close() | |
} | |
}() | |
if r == nil { | |
// A snapshot was not provided. Use a blank database. | |
ok = true | |
return db, nil | |
} | |
// Read from the snapshot reader. | |
var b leveldb.Batch | |
sr := sds.NewReader(r) | |
for i := 0; ; i++ { | |
// Read the next key. | |
key, err := sr.ReadBytes() | |
if err != nil { | |
if err == io.EOF { | |
// No more key/value pairs. | |
break | |
} | |
return nil, err | |
} | |
// Read the next value | |
value, err := sr.ReadBytes() | |
if err != nil { | |
return nil, err | |
} | |
// Add to a batch of up to 256 key/value pairs. | |
b.Put(key, value) | |
if b.Len() == 256 { | |
// Write the batch to disk. | |
if err := db.Write(&b, nil); err != nil { | |
return nil, err | |
} | |
// Reset the batch so we can add more pairs. | |
b.Reset() | |
} | |
} | |
// Write the remaining pairs. | |
if err := db.Write(&b, nil); err != nil { | |
return nil, err | |
} | |
// The database has now been restored. | |
ok = true | |
return db, err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment