Last active
July 30, 2023 15:26
-
-
Save Shikugawa/11fb6b1f8588e69d9b46ac2dfdf83b9d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"fmt" | |
"math/rand" | |
"net" | |
"net/http" | |
"net/rpc" | |
"strings" | |
"sync" | |
"time" | |
log "github.com/sirupsen/logrus" | |
) | |
// RaftLogEntry is one opaque entry of the replicated log.
type RaftLogEntry []byte

// RaftLog is an in-memory, append-only list of log entries with two cursors:
// one for entries already handed out for replication (ToCommit) and one for
// entries already handed out for application to storage (ToApply).
// All methods take mu, so a RaftLog may be shared between goroutines.
type RaftLog struct {
	commitedHeadIdx int            // index of the first entry not yet returned by ToCommit
	appliedHeadIdx  int            // index of the first entry not yet returned by ToApply
	logEntries      []RaftLogEntry // TODO: max length
	mu              sync.Mutex     // guards every field above
}

// NewRaftLog returns an empty log with both cursors at zero.
func NewRaftLog() *RaftLog {
	return &RaftLog{}
}

// NewLogEntries appends entries to the end of the log.
func (r *RaftLog) NewLogEntries(entries []RaftLogEntry) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.logEntries = append(r.logEntries, entries...)
}

// ToCommit returns every entry appended since the previous call and advances
// the commit cursor to the end of the log. It returns an empty slice when
// there is nothing new.
func (r *RaftLog) ToCommit() []RaftLogEntry {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.commitedHeadIdx >= len(r.logEntries) {
		return []RaftLogEntry{}
	}
	entries := r.logEntries[r.commitedHeadIdx:]
	r.commitedHeadIdx = len(r.logEntries)
	return entries
}

// ToApply returns the next n not-yet-applied entries and advances the apply
// cursor past them.
//
// n is clamped to the number of entries still available, so asking for more
// than remains (or for a non-positive count) returns a shorter or empty slice
// instead of panicking. The lock is taken like in the other methods because
// entries can be appended from another goroutine while this runs.
func (r *RaftLog) ToApply(n int) []RaftLogEntry {
	r.mu.Lock()
	defer r.mu.Unlock()

	if remaining := len(r.logEntries) - r.appliedHeadIdx; n > remaining {
		n = remaining
	}
	if n <= 0 {
		return []RaftLogEntry{}
	}
	entries := r.logEntries[r.appliedHeadIdx : r.appliedHeadIdx+n]
	r.appliedHeadIdx += n
	return entries
}
type RaftState struct { | |
term int | |
state Role | |
leader string | |
deadline time.Time | |
} | |
func NewRaftState() *RaftState { | |
return &RaftState{ | |
term: 0, | |
state: Follower, | |
} | |
} | |
func (r *RaftState) UpdateDeadline() { | |
timeout := time.Duration((150 + rand.Intn(150+1)) * int(time.Millisecond)) | |
r.deadline = time.Now().Add(timeout) | |
} | |
func (r *RaftState) ToFollower() { | |
log.Info("became to Follower") | |
r.state = Follower | |
r.UpdateDeadline() | |
} | |
func (r *RaftState) ToCandidate() { | |
log.Info("became to Candidate") | |
r.state = Candidate | |
r.UpdateDeadline() | |
} | |
func (r *RaftState) ToLeader() { | |
log.Info("became to Leader") | |
r.state = Leader | |
} | |
func (r *RaftState) IsLeader(hostname string) bool { | |
return hostname == r.leader | |
} | |
func (r *RaftState) SetLeader(leader string) { | |
r.leader = leader | |
} | |
type InflightVoteRequest struct { | |
members []string | |
req *VoteRequestRequest | |
resp chan *VoteRequestResponse | |
deadline time.Time | |
} | |
func NewInflightVoteRequest(members []string, req *VoteRequestRequest, | |
timeout time.Duration) *InflightVoteRequest { | |
return &InflightVoteRequest{ | |
members: members, | |
req: req, | |
resp: make(chan *VoteRequestResponse, len(members)), | |
deadline: time.Now().Add(timeout), | |
} | |
} | |
func (i *InflightVoteRequest) Timeouted() bool { | |
// TODO: support timeout | |
return false | |
} | |
func (i *InflightVoteRequest) Finished() bool { | |
return i.Timeouted() || len(i.resp) >= len(i.members) | |
} | |
func (i *InflightVoteRequest) Elected() bool { | |
if !i.Finished() || i.Timeouted() { | |
return false | |
} | |
count := 0 | |
for j := 0; j < len(i.members); j++ { | |
res := <-i.resp | |
if res.Vote { | |
count += 1 | |
} | |
} | |
return count > (len(i.members)+1)/2 | |
} | |
func (i *InflightVoteRequest) Start() { | |
for j := range i.members { | |
go func(k int) { | |
res, err := NewRaftClient(i.members[k]).VoteRequest(i.req) | |
if err != nil { | |
log.Error(err) | |
return | |
} | |
i.resp <- res | |
}(j) | |
} | |
} | |
type InflightAppendEntiriesRequest struct { | |
resp chan *AppendEntiriesResponse | |
members []string | |
req *AppendEntiriesRequest | |
deadline time.Time | |
} | |
func NewInflightAppendEntiriesRequest(hostname string, members []string, | |
req *AppendEntiriesRequest, timeout time.Duration) *InflightAppendEntiriesRequest { | |
return &InflightAppendEntiriesRequest{ | |
resp: make(chan *AppendEntiriesResponse, len(members)), | |
members: members, | |
req: req, | |
deadline: time.Now().Add(timeout), | |
} | |
} | |
func (i *InflightAppendEntiriesRequest) Timeouted() bool { | |
// TODO: support timeout | |
return false | |
} | |
func (i *InflightAppendEntiriesRequest) Finished() bool { | |
return i.Timeouted() || len(i.resp) >= len(i.members) | |
} | |
func (i *InflightAppendEntiriesRequest) ShouldApply() bool { | |
if !i.Finished() || i.Timeouted() { | |
return false | |
} | |
count := 0 | |
for j := 0; j < len(i.members); j++ { | |
res := <-i.resp | |
if res.Ack { | |
count += 1 | |
} | |
} | |
return count > (len(i.members)+1)/2 | |
} | |
func (i *InflightAppendEntiriesRequest) Start() { | |
for j := range i.members { | |
go func(k int) { | |
res, err := NewRaftClient(i.members[k]).AppendEntiriesRequest(i.req) | |
if err != nil { | |
log.Error(err) | |
return | |
} | |
i.resp <- res | |
}(j) | |
} | |
} | |
type RaftAgent struct { | |
state *RaftState | |
log *RaftLog | |
storage *InMemoryStorage | |
hostname string | |
members []string | |
inflightVoteRequest *InflightVoteRequest | |
inflightAppendEntiriesRequest *InflightAppendEntiriesRequest | |
} | |
func NewRaftAgent(state *RaftState, log *RaftLog, storage *InMemoryStorage, | |
hostname string, members []string) *RaftAgent { | |
return &RaftAgent{ | |
state: state, | |
log: log, | |
storage: storage, | |
hostname: hostname, | |
members: members, | |
} | |
} | |
func (r *RaftAgent) doFollowerTask() { | |
if time.Now().After(r.state.deadline) { | |
r.state.term += 1 | |
r.state.ToCandidate() | |
} | |
} | |
func (r *RaftAgent) doCandidateTask() { | |
if r.inflightVoteRequest != nil { | |
if !r.inflightVoteRequest.Finished() { | |
return | |
} | |
if r.inflightVoteRequest.Elected() { | |
r.state.ToLeader() | |
r.state.SetLeader(r.hostname) | |
} else { | |
r.state.ToFollower() | |
} | |
r.inflightVoteRequest = nil | |
return | |
} | |
req := VoteRequestRequest{ | |
Term: r.state.term, | |
} | |
r.inflightVoteRequest = NewInflightVoteRequest(r.members, &req, 300*time.Millisecond) | |
r.inflightVoteRequest.Start() | |
} | |
func (r *RaftAgent) doLeaderTask() { | |
if r.inflightAppendEntiriesRequest != nil { | |
if !r.inflightAppendEntiriesRequest.Finished() { | |
return | |
} | |
if r.inflightAppendEntiriesRequest.ShouldApply() { | |
entries := r.log.ToApply(len(r.inflightAppendEntiriesRequest.req.Entries)) | |
data := [][]byte{} | |
for i := range entries { | |
data = append(data, entries[i]) | |
} | |
r.storage.Apply(data) | |
if len(data) != 0 { | |
log.Info("leader state", r.storage.data) | |
} | |
} | |
r.inflightAppendEntiriesRequest = nil | |
return | |
} | |
commitEntries := r.log.ToCommit() | |
req := AppendEntiriesRequest{ | |
RequestFrom: r.hostname, | |
Entries: commitEntries, | |
} | |
r.inflightAppendEntiriesRequest = NewInflightAppendEntiriesRequest( | |
r.hostname, r.members, &req, 300*time.Millisecond) | |
r.inflightAppendEntiriesRequest.Start() | |
} | |
func (r *RaftAgent) Start() { | |
r.state.UpdateDeadline() | |
interval := time.NewTicker(10 * time.Millisecond) | |
for { | |
select { | |
case <-interval.C: | |
if r.state.state == Follower { | |
r.doFollowerTask() | |
} else if r.state.state == Leader { | |
r.doLeaderTask() | |
} else if r.state.state == Candidate { | |
r.doCandidateTask() | |
} | |
} | |
} | |
} | |
type VoteRequestRequest struct { | |
Term int | |
} | |
type VoteRequestResponse struct { | |
Vote bool | |
} | |
type AppendEntiriesRequest struct { | |
RequestFrom string | |
Entries []RaftLogEntry | |
} | |
type AppendEntiriesResponse struct { | |
Ack bool | |
} | |
type RaftService struct { | |
state *RaftState | |
log *RaftLog | |
storage *InMemoryStorage | |
mu sync.Mutex | |
} | |
func NewRaftService(state *RaftState, log *RaftLog, storage *InMemoryStorage) *RaftService { | |
return &RaftService{ | |
state: state, | |
log: log, | |
storage: storage, | |
} | |
} | |
func (r *RaftService) VoteRequest(req *VoteRequestRequest, res *VoteRequestResponse) error { | |
// 2台以上のCandidateが選出され、それぞれが同時にVoteRequestを送った場合、ここが同時に実行される可能性がある。 | |
// しかし、それはつまり2台以上のCandidateに投票してしまうことを意味する。 | |
// 2台以上のCandidateに投票してしまうFollowerが複数いた場合、Leaderが複数選択されてしまう可能性がある。 | |
// そこで同時実行を防ぐためにロックを取っている。 | |
r.mu.Lock() | |
defer r.mu.Unlock() | |
if r.state.state != Follower || req.Term <= r.state.term { | |
res.Vote = false | |
return nil | |
} | |
r.state.UpdateDeadline() | |
r.state.term = req.Term | |
res.Vote = true | |
return nil | |
} | |
func (r *RaftService) AppendEntiries(req *AppendEntiriesRequest, res *AppendEntiriesResponse) error { | |
if r.state.state == Leader { | |
// リーダーがAppendEntiriesを受け取った時、リーダーが複数いる可能性があるので、再選挙が必要 | |
r.state.ToCandidate() | |
} else if r.state.state == Candidate { | |
r.state.ToCandidate() | |
} else { // Follower | |
entries := r.log.ToApply(len(r.log.logEntries) - r.log.appliedHeadIdx) | |
data := [][]byte{} | |
for i := range entries { | |
data = append(data, entries[i]) | |
} | |
r.storage.Apply(data) | |
if len(data) != 0 { | |
log.Info("follower state", r.storage.data) | |
} | |
r.log.NewLogEntries(req.Entries) | |
r.log.ToCommit() | |
res.Ack = true | |
r.state.SetLeader(req.RequestFrom) | |
r.state.UpdateDeadline() | |
} | |
return nil | |
} | |
type RaftClient struct { | |
target string | |
} | |
func NewRaftClient(target string) *RaftClient { | |
return &RaftClient{ | |
target: target, | |
} | |
} | |
func (r *RaftClient) VoteRequest(req *VoteRequestRequest) (*VoteRequestResponse, error) { | |
client, err := rpc.DialHTTP("tcp", r.target) | |
if err != nil { | |
return nil, err | |
} | |
defer client.Close() | |
var voteReqRes VoteRequestResponse | |
err = client.Call("RaftService.VoteRequest", req, &voteReqRes) | |
if err != nil { | |
return nil, err | |
} | |
return &voteReqRes, nil | |
} | |
func (r *RaftClient) AppendEntiriesRequest(req *AppendEntiriesRequest) (*AppendEntiriesResponse, error) { | |
client, err := rpc.DialHTTP("tcp", r.target) | |
if err != nil { | |
return nil, err | |
} | |
defer client.Close() | |
var res AppendEntiriesResponse | |
err = client.Call("RaftService.AppendEntiries", req, &res) | |
if err != nil { | |
return nil, err | |
} | |
return &res, nil | |
} | |
// Role names the part a node currently plays in the cluster.
type Role string

// The three roles a node can be in.
const (
	Follower  = Role("Follower")
	Candidate = Role("Candidate")
	Leader    = Role("Leader")
)
func startServer(state *RaftState, log *RaftLog, storage *InMemoryStorage) error { | |
rpc.Register(NewRaftService(state, log, storage)) | |
rpc.HandleHTTP() | |
l, err := net.Listen("tcp", ":1234") | |
if err != nil { | |
return err | |
} | |
defer l.Close() | |
err = http.Serve(l, nil) | |
if err != nil { | |
return err | |
} | |
return nil | |
} | |
// InMemoryStorage keeps every applied item in memory, in application order.
type InMemoryStorage struct {
	data [][]byte
}

// NewInMemoryStore returns an empty storage.
func NewInMemoryStore() *InMemoryStorage {
	return new(InMemoryStorage)
}

// Apply appends data to the stored items. The byte slices are stored as
// given, not copied.
func (i *InMemoryStorage) Apply(data [][]byte) {
	if len(data) == 0 {
		return
	}
	i.data = append(i.data, data...)
}
type HTTPService struct { | |
hostname string | |
state *RaftState | |
log *RaftLog | |
} | |
func (s *HTTPService) helloHandler(w http.ResponseWriter, r *http.Request) { | |
if !s.state.IsLeader(s.hostname) { | |
// TODO: redirect | |
} else { | |
val := []byte{'a'} | |
val2 := []byte{'b'} | |
s.log.NewLogEntries([]RaftLogEntry{val, val2}) | |
} | |
} | |
// init seeds the global math/rand source from the wall clock so that the
// randomized deadlines differ between runs.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
func main() { | |
var hostname string | |
var members string | |
// Define flags and their usage descriptions | |
flag.StringVar(&hostname, "hostname", "", "hostname") | |
flag.StringVar(&members, "members", "", "member list") | |
// Parse the command-line flags | |
flag.Parse() | |
state := NewRaftState() | |
log := NewRaftLog() | |
storage := NewInMemoryStore() | |
go startServer(state, log, storage) | |
go func() { | |
service := &HTTPService{ | |
state: state, | |
log: log, | |
hostname: hostname, | |
} | |
// Define the route and handler for the server | |
http.HandleFunc("/", service.helloHandler) | |
// Start the HTTP server | |
port := 8080 | |
fmt.Printf("Starting server on port %d...\n", port) | |
err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil) | |
if err != nil { | |
fmt.Println("Error starting server:", err) | |
} | |
}() | |
time.Sleep(3 * time.Second) | |
membersList := strings.Split(members, ",") | |
agent := NewRaftAgent(state, log, storage, hostname, membersList) | |
agent.Start() | |
// TTODO: graceful | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment