// Package runnermanager tracks runners that connect over websockets and
// hands idle ones out to callers that request them.
package runnermanager

import (
	"context"
	"crypto/subtle"
	"fmt"
	"strings"
	"time"

	"git.ohea.xyz/cursorius/server/config"
	"github.com/op/go-logging"
	"nhooyr.io/websocket"
	"nhooyr.io/websocket/wsjson"
)

var log = logging.MustGetLogger("cursorius-server")

// RunnerRegistration is the JSON message a runner sends right after opening
// its websocket connection. conn is attached by RegisterRunner and is not
// part of the JSON payload (it is unexported).
type RunnerRegistration struct {
	Secret string
	Id     string
	Tags   []string
	conn   *websocket.Conn
}

// runnerManager is the scheduler state. It is only touched by the goroutine
// running runRunnerManager; other goroutines talk to it via its channels.
type runnerManager struct {
	getRunnerCh       chan GetRunnerRequest
	registerCh        chan RunnerRegistration
	connectedRunners  []Runner
	configuredRunners map[string]config.Runner
}

// GetRunnerRequest asks the manager for a runner. The manager replies exactly
// once on RespChan.
type GetRunnerRequest struct {
	Tags     []string
	RespChan chan GetRunnerResponse
}

// GetRunnerResponse is the manager's reply to a GetRunnerRequest. Runner is
// the zero value when Err is non-nil.
type GetRunnerResponse struct {
	Runner Runner
	Err    error
}

type runnerJob struct {
	Id  string
	URL string
}

// formatTags renders tags as "[a, b, c]" for log messages; an empty or nil
// slice renders as "[]".
func formatTags(tags []string) string {
	return "[" + strings.Join(tags, ", ") + "]"
}

// processRequest tries to hand an idle runner to req and always replies
// exactly once on req.RespChan. Runners whose receiveChan has been closed
// (their connection's reader goroutine exited) are removed from
// connectedRunners as they are encountered.
//
// NOTE(review): req.Tags is only logged; it is not used to filter runners.
// NOTE(review): `running = true` below is set on a local copy, so the entry in
// r.connectedRunners is never marked busy. No visible code path could clear
// it again, so this is left as-is; confirm the intended allocate/release
// design before changing it.
func (r *runnerManager) processRequest(req GetRunnerRequest) {
	tagsStr := formatTags(req.Tags)
	log.Infof("Got request for runner with tags \"%v\"", tagsStr)
	log.Debugf("Finding runner with tags %v", tagsStr)

	// Index manually instead of ranging: swap-deleting a defunct runner moves
	// the last element into slot i, which must be examined next, and a range
	// loop would keep iterating to the original length and index past the
	// shrunk slice.
	for i := 0; i < len(r.connectedRunners); {
		runner := r.connectedRunners[i]

		// don't allocate runner that is already occupied
		if runner.running {
			log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id)
			i++
			continue
		}

		// Don't allocate a runner whose receiveChan is closed (it is defunct).
		// An idle runner should never have pending messages, so receiving
		// here does not lose meaningful data.
		select {
		case _, ok := <-runner.receiveChan:
			if ok {
				// this should never happen
				// TODO: should we disconnect the runner?
				log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
				i++
				continue
			}
			log.Noticef("Removing defunct runner \"%v\"", runner.id)
			// Swap-delete; i is not advanced so the moved element is checked
			// next. Clear the vacated tail slot so it doesn't pin the runner.
			last := len(r.connectedRunners) - 1
			r.connectedRunners[i] = r.connectedRunners[last]
			r.connectedRunners[last] = Runner{}
			r.connectedRunners = r.connectedRunners[:last]
		default:
			runner.running = true
			req.RespChan <- GetRunnerResponse{
				Runner: runner,
				Err:    nil,
			}
			return
		}
	}

	errorMsg := "could not find valid runner"
	if len(r.connectedRunners) == 0 {
		errorMsg = "no connected runners"
	}
	log.Errorf("Could not allocate runner with tags \"%v\": %v", tagsStr, errorMsg)
	req.RespChan <- GetRunnerResponse{
		Runner: Runner{},
		Err:    fmt.Errorf("Could not allocate runner: %v", errorMsg),
	}
}

// processRegistration validates reg against the configured runners. A runner
// with a known id and matching secret is appended to connectedRunners, and a
// goroutine is started that keeps reading from its connection. Any other
// registration has its connection closed. Secrets are never logged.
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
	log.Debugf("New runner appeared with id: %v", reg.Id)

	configuredRunner, doesExist := r.configuredRunners[reg.Id]
	if !doesExist {
		log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
		reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}

	// Constant-time comparison so response timing does not reveal how much of
	// a guessed secret matched.
	if subtle.ConstantTimeCompare([]byte(configuredRunner.Secret), []byte(reg.Secret)) != 1 {
		log.Errorf("Disconnecting runner with id: %v and invalid secret", reg.Id)
		reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}

	log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
	runner := Runner{
		id:          reg.Id,
		tags:        reg.Tags,
		conn:        reg.conn,
		receiveChan: make(chan RunnerData),
		running:     false,
	}
	r.connectedRunners = append(r.connectedRunners, runner)

	// Read from the websocket connection continuously; this is required to
	// keep the connection functioning. On the first read error the receive
	// channel is closed, which processRequest treats as "runner is defunct".
	go func() {
		for {
			msgType, data, err := reg.conn.Read(context.Background())
			if err != nil {
				// TODO: this is still racy, since a runner could be allocated
				// between the connection returning an err and the channel closing
				close(runner.receiveChan)
				log.Errorf("Could not read from connection: %v", err)
				log.Noticef("Deregistering runner with id: %v", runner.id)
				return
			}
			log.Debugf("%v: %v", msgType, data)
			runner.receiveChan <- RunnerData{msgType: msgType, data: data}
		}
	}()
}

// runRunnerManager serves requests and registrations forever, one at a time,
// so r's state is only ever touched by this goroutine.
func runRunnerManager(r runnerManager) {
	for {
		select {
		case request := <-r.getRunnerCh:
			r.processRequest(request)
		case registration := <-r.registerCh:
			r.processRegistration(registration)
		}
	}
}

// StartRunnerManager starts the manager goroutine for the given configured
// runners (keyed by runner id). It returns the channel for runner requests and
// the channel for registrations. The returned error is currently always nil.
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) {
	scheduler := runnerManager{
		getRunnerCh:       make(chan GetRunnerRequest),
		registerCh:        make(chan RunnerRegistration),
		connectedRunners:  make([]Runner, 0),
		configuredRunners: configuredRunners,
	}

	go runRunnerManager(scheduler)

	return scheduler.getRunnerCh, scheduler.registerCh, nil
}

// RegisterRunner reads one JSON RunnerRegistration from conn, waiting at most
// 10 seconds. On success it forwards the registration, with conn attached,
// on registerCh. If the registration cannot be read, conn is closed, because
// no other component has taken ownership of it yet.
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	var registration RunnerRegistration
	registration.conn = conn
	if err := wsjson.Read(ctx, conn, &registration); err != nil {
		log.Errorf("Could not read data from websocket connection: %v", err)
		conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}

	registerCh <- registration
}