initial commit
This commit is contained in:
123
leader/leader.go
Normal file
123
leader/leader.go
Normal file
@ -0,0 +1,123 @@
|
||||
package leader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
|
||||
"novit.tech/go/etcdb"
|
||||
)
|
||||
|
||||
var leaderName string
|
||||
|
||||
func init() {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
panic("failed to get hostname: " + err.Error())
|
||||
}
|
||||
leaderName = hostname
|
||||
|
||||
flag.StringVar(&leaderName, "leader-name", leaderName, "leader name in leader election")
|
||||
}
|
||||
|
||||
type Spec struct {
|
||||
Etcd *etcdb.ClientSpec
|
||||
Election string
|
||||
OnLeader func(ctx context.Context)
|
||||
}
|
||||
|
||||
// RunProcess is design for processes that are a single leader. Binds signals to exit gracefully.
|
||||
func (spec *Spec) RunProcess() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go func() {
|
||||
c := make(chan os.Signal, 1)
|
||||
|
||||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
s := <-c
|
||||
|
||||
log.Print("got signal ", s)
|
||||
cancel()
|
||||
|
||||
<-c
|
||||
log.Print("2nd signal, forced exit")
|
||||
os.Exit(0)
|
||||
}()
|
||||
|
||||
spec.Run(ctx)
|
||||
}
|
||||
|
||||
func (spec *Spec) Run(ctx context.Context) {
|
||||
for {
|
||||
err := spec.runSession(ctx)
|
||||
|
||||
if err != nil {
|
||||
log.Print("leader election: failed: ", err)
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (spec *Spec) runSession(ctx context.Context) (err error) {
|
||||
cli := spec.Etcd.Client()
|
||||
|
||||
runCtx, runCancel := context.WithCancel(context.Background())
|
||||
defer runCancel()
|
||||
|
||||
s, err := concurrency.NewSession(
|
||||
cli,
|
||||
concurrency.WithTTL(5),
|
||||
concurrency.WithContext(runCtx),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start leader election: %v", err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
election := concurrency.NewElection(s, spec.Election)
|
||||
|
||||
log.Print("leader election: waiting to become leader")
|
||||
if err := election.Campaign(ctx, leaderName); err != nil {
|
||||
return fmt.Errorf("leader election: campaign failed: %v", err)
|
||||
}
|
||||
|
||||
log.Print("leader election: got leader status")
|
||||
|
||||
leaderCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
wg := new(sync.WaitGroup)
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
spec.OnLeader(leaderCtx)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-s.Done():
|
||||
log.Print("leader election: session done")
|
||||
case <-ctx.Done():
|
||||
log.Print("leader election: context cancelled")
|
||||
}
|
||||
|
||||
cancel()
|
||||
log.Print("leader election: waiting for routine")
|
||||
wg.Wait()
|
||||
log.Print("leader election: routine exited")
|
||||
|
||||
return
|
||||
}
|
Reference in New Issue
Block a user