Files
server/runnermanager/runnermanager.go
T

207 lines
6.0 KiB
Go

package runnermanager
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"github.com/op/go-logging"
"google.golang.org/protobuf/proto"
"nhooyr.io/websocket"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/util"
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
)
var (
	// log is the package-level logger shared by every function in the
	// runner manager; messages are emitted under the "cursorius-server"
	// module name.
	log = logging.MustGetLogger("cursorius-server")
)
// processRunnerAllocation handles one allocation request on the manager
// goroutine.
//
// It walks the pool of idle runners (r.connectedRunners) once, and:
//   - drops runners whose receiveChan has been closed (their websocket read
//     goroutine has exited, so the runner is defunct);
//   - hands the FIRST runner that has all of req.Tags to the requester and
//     removes it from the pool;
//   - keeps every other runner, in its original order.
//
// Exactly one RunnerAllocationResponse is sent on req.RespChan: either the
// allocated runner (Err == nil) or an error describing why none was found.
// The returned *Runner points to a private copy, not into the pool slice, so
// later pool mutations cannot change what the requester holds.
func (r *runnerManager) processRunnerAllocation(req RunnerAllocationRequest) {
	tagsStr := util.FormatTags(req.Tags)
	log.Infof("Got request for runner with tags \"%v\"", tagsStr)
	log.Debugf("Finding runner with tags %v", tagsStr)

	var allocated *Runner

	// Filter the pool in place. Writing remaining[k] with k <= i is safe
	// because element i has already been copied into runner.
	remaining := r.connectedRunners[:0]
	for _, runner := range r.connectedRunners {
		// don't allocate runner with closed receiveChan (is defunct)
		// there should never be messages to read on an inactive runner,
		// so we aren't losing any data here
		select {
		case _, ok := <-runner.receiveChan:
			if ok {
				// this should never happen
				// TODO: should we disconnect the runner?
				log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
				remaining = append(remaining, runner)
				continue
			}
			log.Noticef("Removing defunct runner \"%v\"", runner.id)
			continue
		default:
		}

		// Only one runner is handed out per request; after that we keep
		// scanning solely to prune defunct runners.
		if allocated == nil {
			log.Debugf("Checking runner %v for requested tags", runner.id)
			if runner.HasTags(req.Tags) {
				log.Debugf("Runner %v has requested tags, allocating", runner.id)
				// Copy out of the pool; a pointer into r.connectedRunners
				// would be clobbered by the compaction below or later appends.
				found := runner
				allocated = &found
				continue
			}
		}
		remaining = append(remaining, runner)
	}

	// Zero the unused tail so removed runners (and their connections) are not
	// kept reachable through the shared backing array.
	for i := len(remaining); i < len(r.connectedRunners); i++ {
		r.connectedRunners[i] = Runner{}
	}
	r.connectedRunners = remaining

	if allocated != nil {
		req.RespChan <- RunnerAllocationResponse{
			Runner: allocated,
			Err:    nil,
		}
		return
	}

	errorMsg := "could not find valid runner"
	if len(r.connectedRunners) == 0 {
		errorMsg = "no connected runners"
	}
	req.RespChan <- RunnerAllocationResponse{
		Runner: nil,
		Err:    fmt.Errorf("Could not allocate runner: %v", errorMsg),
	}
}
// processRunnerRegistration authenticates a newly connected runner and, on
// success, adds it to the pool of idle runners.
//
// The websocket is closed with "registration invalid" and the runner is not
// added when req.Id is not a valid UUID, no runner with that id is found in
// the database, or req.Secret does not equal the stored token.
//
// An accepted runner gets a goroutine that keeps reading from its websocket
// (required to keep the connection functioning) and forwards each binary
// message on the runner's receiveChan. When that goroutine exits, it closes
// receiveChan exactly once, which processRunnerAllocation treats as "runner
// is defunct".
func (r *runnerManager) processRunnerRegistration(req RunnerRegistrationRequest) {
	// Do not log req.Secret: it is the runner's credential.
	log.Debugf("New runner appeared with id: %v", req.Id)

	// Get runner with given id from database
	runnerId, err := uuid.Parse(req.Id)
	if err != nil {
		log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", req.Id, err)
		req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}
	dbRunner, err := r.db.GetRunnerById(runnerId)
	if err != nil {
		log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err)
		req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}
	if req.Secret != dbRunner.Token {
		log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId)
		req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}

	log.Infof("Registering runner \"%v\" with tags %v", req.Id, req.Tags)
	runner := Runner{
		id:          runnerId,
		tags:        req.Tags,
		conn:        req.conn,
		receiveChan: make(chan []byte),
	}
	r.connectedRunners = append(r.connectedRunners, runner)

	// start goroutine to call Read function on websocket connection
	// this is required to keep the connection functioning
	go func() {
		defer log.Noticef("Deregistered runner with id: %v", runner.id)
		// This deferred close is the ONLY close of receiveChan; closing it
		// anywhere else in this goroutine would panic on the second close.
		defer close(runner.receiveChan)
		for {
			msgType, data, err := req.conn.Read(context.Background())
			if err != nil {
				// TODO: this is still racy, since a runner could be allocated between the
				// connection returning an err and the channel closing
				// This should probably be handled by sending erroring, but not 100% sure
				log.Errorf("Could not read from connection: %v", err)
				return
			}
			if msgType != websocket.MessageBinary {
				log.Errorf("Got non-binary data from connection")
				// We stop reading here, so close the connection rather than
				// leaving it open and unserviced.
				req.conn.Close(websocket.StatusUnsupportedData, "Requires binary data")
				return
			}
			runner.receiveChan <- data
		}
	}()
}
// processRunnerRelease returns a previously allocated runner to the pool of
// idle runners so it can be allocated again. The runner is copied into the
// pool; the caller's *Runner is not retained.
func (r *runnerManager) processRunnerRelease(req RunnerReleaseRequest) {
	if req.Runner == nil {
		// Dereferencing nil would panic on the manager goroutine and stop
		// all further allocation/registration handling.
		log.Errorf("Ignoring runner release request with nil runner")
		return
	}
	r.connectedRunners = append(r.connectedRunners, *req.Runner)
}
// runRunnerManager is the manager's event loop. It works on its own copy of
// the manager and handles allocation, release and registration requests one
// at a time, so the handlers never run concurrently with each other. It
// never returns.
func runRunnerManager(r runnerManager) {
	mgr := &r
	for {
		select {
		case allocReq := <-mgr.chans.Allocation:
			mgr.processRunnerAllocation(allocReq)
		case releaseReq := <-mgr.chans.Release:
			mgr.processRunnerRelease(releaseReq)
		case regReq := <-mgr.chans.Registration:
			mgr.processRunnerRegistration(regReq)
		}
	}
}
// StartRunnerManager creates the runner manager, starts its event loop in a
// new goroutine, and returns the (unbuffered) channels used to send it
// allocation, release and registration requests. The manager starts with no
// connected runners; db is used to authenticate registering runners. The
// returned error is currently always nil.
func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (RunnerManagerChans, error) {
	chans := RunnerManagerChans{
		Allocation:   make(chan RunnerAllocationRequest),
		Release:      make(chan RunnerReleaseRequest),
		Registration: make(chan RunnerRegistrationRequest),
	}
	mgr := runnerManager{
		chans:             chans,
		connectedRunners:  make([]Runner, 0),
		configuredRunners: configuredRunners,
		db:                db,
	}
	go runRunnerManager(mgr)
	return chans, nil
}
// RegisterRunner performs the registration handshake on a freshly accepted
// runner websocket.
//
// It waits up to 10 seconds for a single binary message that decodes as a
// protobuf runner_api.Register, then forwards the runner's id, secret and
// tags to the runner manager on registerCh. The secret is not checked here;
// the manager does that. If the read fails, the message is not binary, or
// it cannot be decoded, the connection is closed and nothing is sent.
//
// The send on registerCh has no timeout and blocks until the manager
// receives it.
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistrationRequest) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	msgType, data, err := conn.Read(ctx)
	if err != nil {
		log.Errorf("Could not read from runner websocket connection: %v", err)
		log.Errorf("Disconnecting...")
		// Actually disconnect, as logged. If the connection is already
		// closed this call just returns an error, which we ignore.
		conn.Close(websocket.StatusPolicyViolation, "registration failed")
		return
	}
	if msgType != websocket.MessageBinary {
		log.Error("Got non binary message from runner, disconnecting...")
		conn.Close(websocket.StatusUnsupportedData, "Requires binary data")
		return
	}

	registrationMsg := &runner_api.Register{}
	if err := proto.Unmarshal(data, registrationMsg); err != nil {
		log.Errorf("Could not parse registration message from runner, disconnecting...: %v", err)
		conn.Close(websocket.StatusUnsupportedData, "Invalid message")
		return
	}

	registerCh <- RunnerRegistrationRequest{
		conn:   conn,
		Secret: registrationMsg.Secret,
		Id:     registrationMsg.Id,
		Tags:   registrationMsg.Tags,
	}
}