Update runner manager for new database driven runner config
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/reflect/protoreflect"
|
||||
"nhooyr.io/websocket"
|
||||
@@ -17,14 +18,14 @@ type RunnerData struct {
|
||||
}
|
||||
|
||||
type Runner struct {
|
||||
id string
|
||||
id uuid.UUID
|
||||
tags []string
|
||||
conn *websocket.Conn
|
||||
receiveChan chan []byte
|
||||
running bool
|
||||
}
|
||||
|
||||
func (r *Runner) Id() string {
|
||||
func (r *Runner) Id() uuid.UUID {
|
||||
return r.id
|
||||
}
|
||||
|
||||
|
||||
@@ -6,11 +6,13 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/op/go-logging"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"nhooyr.io/websocket"
|
||||
|
||||
"git.ohea.xyz/cursorius/server/config"
|
||||
"git.ohea.xyz/cursorius/server/database"
|
||||
|
||||
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
|
||||
)
|
||||
@@ -30,6 +32,7 @@ type runnerManager struct {
|
||||
connectedRunners []Runner
|
||||
numConnectedRunners uint64
|
||||
configuredRunners map[string]config.Runner
|
||||
db database.Database
|
||||
}
|
||||
|
||||
type GetRunnerRequest struct {
|
||||
@@ -126,50 +129,60 @@ runnerIter:
|
||||
|
||||
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
|
||||
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
|
||||
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
|
||||
if configuredRunner.Secret == reg.Secret {
|
||||
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
|
||||
runner := Runner{
|
||||
id: reg.Id,
|
||||
tags: reg.Tags,
|
||||
conn: reg.conn,
|
||||
receiveChan: make(chan []byte),
|
||||
running: false,
|
||||
}
|
||||
r.connectedRunners = append(r.connectedRunners, runner)
|
||||
// start goroutine to call Read function on websocket connection
|
||||
// this is required to keep the connection functioning
|
||||
go func() {
|
||||
defer log.Noticef("Deregistered runner with id: %v", runner.id)
|
||||
defer close(runner.receiveChan)
|
||||
for {
|
||||
msgType, data, err := reg.conn.Read(context.Background())
|
||||
if err != nil {
|
||||
// TODO: this is still racy, since a runner could be allocated between the
|
||||
// connection returning an err and the channel closing
|
||||
// This should probably be handled by sending erroring, but not 100% sure
|
||||
log.Errorf("Could not read from connection: %v", err)
|
||||
return
|
||||
}
|
||||
if msgType != websocket.MessageBinary {
|
||||
close(runner.receiveChan)
|
||||
log.Errorf("Got binary data from connection")
|
||||
return
|
||||
}
|
||||
|
||||
runner.receiveChan <- data
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
} else {
|
||||
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, reg.Secret)
|
||||
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
}
|
||||
} else {
|
||||
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
|
||||
// Get runner with give id from database
|
||||
runnerId, err := uuid.Parse(reg.Id)
|
||||
if err != nil {
|
||||
log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", reg.Id, err)
|
||||
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
return
|
||||
}
|
||||
dbRunner, err := r.db.GetRunnerById(runnerId)
|
||||
if err != nil {
|
||||
log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err)
|
||||
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
return
|
||||
}
|
||||
|
||||
if reg.Secret != dbRunner.Token {
|
||||
log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId)
|
||||
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
|
||||
runner := Runner{
|
||||
id: runnerId,
|
||||
tags: reg.Tags,
|
||||
conn: reg.conn,
|
||||
receiveChan: make(chan []byte),
|
||||
running: false,
|
||||
}
|
||||
r.connectedRunners = append(r.connectedRunners, runner)
|
||||
// start goroutine to call Read function on websocket connection
|
||||
// this is required to keep the connection functioning
|
||||
go func() {
|
||||
defer log.Noticef("Deregistered runner with id: %v", runner.id)
|
||||
defer close(runner.receiveChan)
|
||||
for {
|
||||
msgType, data, err := reg.conn.Read(context.Background())
|
||||
if err != nil {
|
||||
// TODO: this is still racy, since a runner could be allocated between the
|
||||
// connection returning an err and the channel closing
|
||||
// This should probably be handled by sending erroring, but not 100% sure
|
||||
log.Errorf("Could not read from connection: %v", err)
|
||||
return
|
||||
}
|
||||
if msgType != websocket.MessageBinary {
|
||||
close(runner.receiveChan)
|
||||
log.Errorf("Got binary data from connection")
|
||||
return
|
||||
}
|
||||
|
||||
runner.receiveChan <- data
|
||||
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func runRunnerManager(r runnerManager) {
|
||||
@@ -184,12 +197,13 @@ func runRunnerManager(r runnerManager) {
|
||||
}
|
||||
}
|
||||
|
||||
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) {
|
||||
func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (chan GetRunnerRequest, chan RunnerRegistration, error) {
|
||||
scheduler := runnerManager{
|
||||
getRunnerCh: make(chan GetRunnerRequest),
|
||||
registerCh: make(chan RunnerRegistration),
|
||||
connectedRunners: make([]Runner, 0),
|
||||
configuredRunners: configuredRunners,
|
||||
db: db,
|
||||
}
|
||||
|
||||
go runRunnerManager(scheduler)
|
||||
|
||||
Reference in New Issue
Block a user