231 lines
6.3 KiB
Go
231 lines
6.3 KiB
Go
package runnermanager
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/op/go-logging"
|
|
"google.golang.org/protobuf/proto"
|
|
"nhooyr.io/websocket"
|
|
|
|
"git.ohea.xyz/cursorius/server/config"
|
|
|
|
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
|
|
)
|
|
|
|
// log is the package-wide logger, registered under the module name
// "cursorius-server".
var (
	log = logging.MustGetLogger("cursorius-server")
)
|
|
|
|
type RunnerRegistration struct {
|
|
Secret string
|
|
Id string
|
|
Tags []string
|
|
conn *websocket.Conn
|
|
}
|
|
|
|
type runnerManager struct {
|
|
getRunnerCh chan GetRunnerRequest
|
|
registerCh chan RunnerRegistration
|
|
connectedRunners []Runner
|
|
numConnectedRunners uint64
|
|
configuredRunners map[string]config.Runner
|
|
}
|
|
|
|
type GetRunnerRequest struct {
|
|
Tags []string
|
|
RespChan chan GetRunnerResponse
|
|
}
|
|
|
|
type GetRunnerResponse struct {
|
|
Runner *Runner
|
|
Err error
|
|
}
|
|
|
|
type runnerJob struct {
|
|
Id string
|
|
URL string
|
|
}
|
|
|
|
func (r *runnerManager) processRequest(req GetRunnerRequest) {
|
|
var runnerTagsStr strings.Builder
|
|
fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0])
|
|
for _, tag := range req.Tags[1:] {
|
|
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
|
|
}
|
|
fmt.Fprintf(&runnerTagsStr, "]")
|
|
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
|
|
|
|
log.Debugf("Finding runner with tags %v", runnerTagsStr.String())
|
|
|
|
foundRunner := false
|
|
|
|
runnersToRemove := []int{}
|
|
runnerIter:
|
|
for i, runner := range r.connectedRunners {
|
|
// don't allocate runner that is already occupied
|
|
if runner.running {
|
|
log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id)
|
|
continue
|
|
}
|
|
// don't allocate runner with closed receiveChan (is defunct)
|
|
// there should never be messages to read on an inactive runner,
|
|
// so we aren't losing any data here
|
|
select {
|
|
case _, ok := <-runner.receiveChan:
|
|
if ok {
|
|
// this should never happen
|
|
// TODO: should we disconnect the runner?
|
|
log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
|
|
continue
|
|
}
|
|
log.Noticef("Removing defunct runner \"%v\"", runner.id)
|
|
runnersToRemove = append(runnersToRemove, i)
|
|
default:
|
|
log.Debugf("Checking runner %v for requested tags", runner.id)
|
|
|
|
tagIter:
|
|
for _, requestedTag := range req.Tags {
|
|
for _, posessedTag := range runner.tags {
|
|
if requestedTag == posessedTag {
|
|
continue tagIter
|
|
}
|
|
}
|
|
continue runnerIter
|
|
}
|
|
|
|
r.connectedRunners[i].running = true
|
|
foundRunner = true
|
|
req.RespChan <- GetRunnerResponse{
|
|
Runner: &r.connectedRunners[i],
|
|
Err: nil,
|
|
}
|
|
}
|
|
|
|
}
|
|
// since we iterate, all the indexes will be in accending order
|
|
for i, runnerInd := range runnersToRemove {
|
|
r.connectedRunners[runnerInd-i] = r.connectedRunners[len(r.connectedRunners)-1]
|
|
r.connectedRunners = r.connectedRunners[0 : len(r.connectedRunners)-2]
|
|
}
|
|
|
|
if foundRunner {
|
|
return
|
|
}
|
|
|
|
errorMsg := "could not find valid runner"
|
|
if len(r.connectedRunners) == 0 {
|
|
errorMsg = "no connected runners"
|
|
}
|
|
req.RespChan <- GetRunnerResponse{
|
|
Runner: &Runner{},
|
|
Err: fmt.Errorf("Could not allocate runner: %v", errorMsg),
|
|
}
|
|
|
|
}
|
|
|
|
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
|
|
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
|
|
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
|
|
if configuredRunner.Secret == reg.Secret {
|
|
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
|
|
runner := Runner{
|
|
id: reg.Id,
|
|
tags: reg.Tags,
|
|
conn: reg.conn,
|
|
receiveChan: make(chan []byte),
|
|
running: false,
|
|
}
|
|
r.connectedRunners = append(r.connectedRunners, runner)
|
|
// start goroutine to call Read function on websocket connection
|
|
// this is required to keep the connection functioning
|
|
go func() {
|
|
defer log.Noticef("Deregistered runner with id: %v", runner.id)
|
|
defer close(runner.receiveChan)
|
|
for {
|
|
msgType, data, err := reg.conn.Read(context.Background())
|
|
if err != nil {
|
|
// TODO: this is still racy, since a runner could be allocated between the
|
|
// connection returning an err and the channel closing
|
|
// This should probably be handled by sending erroring, but not 100% sure
|
|
log.Errorf("Could not read from connection: %v", err)
|
|
return
|
|
}
|
|
if msgType != websocket.MessageBinary {
|
|
close(runner.receiveChan)
|
|
log.Errorf("Got binary data from connection")
|
|
return
|
|
}
|
|
|
|
runner.receiveChan <- data
|
|
|
|
}
|
|
}()
|
|
|
|
} else {
|
|
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, reg.Secret)
|
|
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
|
}
|
|
} else {
|
|
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
|
|
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
|
}
|
|
}
|
|
|
|
func runRunnerManager(r runnerManager) {
|
|
for {
|
|
select {
|
|
case request := <-r.getRunnerCh:
|
|
r.processRequest(request)
|
|
|
|
case registration := <-r.registerCh:
|
|
r.processRegistration(registration)
|
|
}
|
|
}
|
|
}
|
|
|
|
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) {
|
|
scheduler := runnerManager{
|
|
getRunnerCh: make(chan GetRunnerRequest),
|
|
registerCh: make(chan RunnerRegistration),
|
|
connectedRunners: make([]Runner, 0),
|
|
configuredRunners: configuredRunners,
|
|
}
|
|
|
|
go runRunnerManager(scheduler)
|
|
|
|
return scheduler.getRunnerCh, scheduler.registerCh, nil
|
|
}
|
|
|
|
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
|
defer cancel()
|
|
|
|
var registration RunnerRegistration
|
|
registration.conn = conn
|
|
|
|
typ, r, err := conn.Read(ctx)
|
|
if err != nil {
|
|
log.Errorf("Could not read from runner websocket connection: %v", err)
|
|
log.Errorf("Disconnecting...")
|
|
return
|
|
}
|
|
if typ != websocket.MessageBinary {
|
|
log.Error("Got non binary message from runner, disconnecting...")
|
|
conn.Close(websocket.StatusUnsupportedData, "Requires binary data")
|
|
return
|
|
}
|
|
registration_proto := &runner_api.Register{}
|
|
if err := proto.Unmarshal(r, registration_proto); err != nil {
|
|
log.Error("Could not parse registration message from runner, disconnection....")
|
|
conn.Close(websocket.StatusUnsupportedData, "Invalid message")
|
|
return
|
|
}
|
|
|
|
registration.Secret = registration_proto.Secret
|
|
registration.Id = registration_proto.Id
|
|
registration.Tags = registration_proto.Tags
|
|
|
|
registerCh <- registration
|
|
}
|