package runnermanager

import (
	"context"
	"crypto/subtle"
	"fmt"
	"strings"
	"time"

	"github.com/op/go-logging"
	"google.golang.org/protobuf/proto"
	"nhooyr.io/websocket"

	"git.ohea.xyz/cursorius/server/config"
	runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
)

var log = logging.MustGetLogger("cursorius-server")

// RunnerRegistration is a registration message read from a runner's websocket
// connection by RegisterRunner and forwarded to the runner manager, which
// checks Id/Secret against the configured runners.
type RunnerRegistration struct {
	Secret string
	Id     string
	Tags   []string
	conn   *websocket.Conn
}

// runnerManager holds the manager's state. Within this file it is only used by
// the goroutine started in StartRunnerManager, which handles one request or
// registration at a time from getRunnerCh / registerCh.
//
// connectedRunners stores pointers so that a *Runner handed out in a
// GetRunnerResponse stays valid when the slice is later appended to or
// compacted.
type runnerManager struct {
	getRunnerCh         chan GetRunnerRequest
	registerCh          chan RunnerRegistration
	connectedRunners    []*Runner
	numConnectedRunners uint64
	configuredRunners   map[string]config.Runner
}

// GetRunnerRequest asks the runner manager for an idle runner that has every
// tag in Tags. Exactly one GetRunnerResponse is sent on RespChan.
type GetRunnerRequest struct {
	Tags     []string
	RespChan chan GetRunnerResponse
}

// GetRunnerResponse is the answer to a GetRunnerRequest. When Err is non-nil,
// Runner points to an empty placeholder Runner.
type GetRunnerResponse struct {
	Runner *Runner
	Err    error
}

type runnerJob struct {
	Id  string
	URL string
}

// processRequest tries to allocate a runner for req and sends exactly one
// response on req.RespChan: either the allocated runner (now marked running)
// or an error explaining why none could be allocated.
func (r *runnerManager) processRequest(req GetRunnerRequest) {
	// strings.Join handles an empty tag list; indexing req.Tags[0] would panic.
	runnerTagsStr := "[" + strings.Join(req.Tags, ", ") + "]"
	log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr)
	log.Debugf("Finding runner with tags %v", runnerTagsStr)

	if runner := r.allocateRunner(req.Tags); runner != nil {
		req.RespChan <- GetRunnerResponse{
			Runner: runner,
			Err:    nil,
		}
		return
	}

	errorMsg := "could not find valid runner"
	if len(r.connectedRunners) == 0 {
		errorMsg = "no connected runners"
	}
	req.RespChan <- GetRunnerResponse{
		Runner: &Runner{},
		Err:    fmt.Errorf("Could not allocate runner: %v", errorMsg),
	}
}

// allocateRunner marks the first idle runner that has every tag in tags as
// running and returns it, or returns nil if there is none. Idle runners whose
// receiveChan has been closed (their read goroutine has exited) are removed
// from connectedRunners along the way.
func (r *runnerManager) allocateRunner(tags []string) *Runner {
	var allocated *Runner
	defunct := make(map[*Runner]bool)

scan:
	for _, runner := range r.connectedRunners {
		// don't allocate runner that is already occupied
		// NOTE(review): running is also presumably reset by whoever holds the
		// *Runner, in another goroutine, without synchronisation -- confirm.
		if runner.running {
			log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id)
			continue
		}

		// don't allocate runner with closed receiveChan (is defunct)
		// there should never be messages to read on an inactive runner,
		// so we aren't losing any data here
		select {
		case _, ok := <-runner.receiveChan:
			if ok {
				// this should never happen
				// TODO: should we disconnect the runner?
				log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
				continue scan
			}
			log.Noticef("Removing defunct runner \"%v\"", runner.id)
			defunct[runner] = true
			continue scan
		default:
		}

		log.Debugf("Checking runner %v for requested tags", runner.id)
		if !hasAllTags(runner.tags, tags) {
			continue
		}

		// Allocate only one runner per request: the requester reads a single
		// response from RespChan.
		runner.running = true
		allocated = runner
		break scan
	}

	r.removeRunners(defunct)
	return allocated
}

// removeRunners deletes every runner present in drop from connectedRunners,
// keeping the remaining runners in their original order.
func (r *runnerManager) removeRunners(drop map[*Runner]bool) {
	if len(drop) == 0 {
		return
	}
	kept := r.connectedRunners[:0]
	for _, runner := range r.connectedRunners {
		if !drop[runner] {
			kept = append(kept, runner)
		}
	}
	// Nil out the unused tail so dropped runners are not kept reachable
	// through the backing array.
	for i := len(kept); i < len(r.connectedRunners); i++ {
		r.connectedRunners[i] = nil
	}
	r.connectedRunners = kept
}

// hasAllTags reports whether every tag in requested appears in possessed.
// An empty requested list matches any runner.
func hasAllTags(possessed, requested []string) bool {
	for _, want := range requested {
		found := false
		for _, have := range possessed {
			if have == want {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// processRegistration validates reg against the configured runners. A valid
// runner is added to connectedRunners and a goroutine is started to read from
// its connection; an invalid one has its connection closed.
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
	// The secret is deliberately not logged.
	log.Debugf("New runner appeared with id: %v", reg.Id)

	configuredRunner, doesExist := r.configuredRunners[reg.Id]
	if !doesExist {
		log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
		reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}

	// Constant-time comparison so response timing does not reveal how much of
	// the secret matched.
	if subtle.ConstantTimeCompare([]byte(configuredRunner.Secret), []byte(reg.Secret)) != 1 {
		log.Errorf("Disconnecting runner with id: %v and invalid secret", reg.Id)
		reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}

	log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
	runner := &Runner{
		id:          reg.Id,
		tags:        reg.Tags,
		conn:        reg.conn,
		receiveChan: make(chan []byte),
		running:     false,
	}
	r.connectedRunners = append(r.connectedRunners, runner)

	// start goroutine to call Read function on websocket connection
	// this is required to keep the connection functioning
	go readRunnerMessages(runner)
}

// readRunnerMessages reads binary messages from runner.conn and forwards them
// on runner.receiveChan until a read fails or a non-binary message arrives.
// receiveChan is closed exactly once, on return, which is how allocateRunner
// detects that the runner is defunct.
func readRunnerMessages(runner *Runner) {
	defer log.Noticef("Deregistered runner with id: %v", runner.id)
	defer close(runner.receiveChan)

	for {
		msgType, data, err := runner.conn.Read(context.Background())
		if err != nil {
			// TODO: this is still racy, since a runner could be allocated between the
			// connection returning an err and the channel closing
			// This should probably be handled by sending erroring, but not 100% sure
			log.Errorf("Could not read from connection: %v", err)
			return
		}
		if msgType != websocket.MessageBinary {
			// receiveChan is closed by the deferred call; closing it here too
			// would panic.
			log.Errorf("Got non-binary data from connection")
			runner.conn.Close(websocket.StatusUnsupportedData, "Requires binary data")
			return
		}
		runner.receiveChan <- data
	}
}

// runRunnerManager is the manager's event loop: it handles runner requests
// and registrations one at a time, forever.
func runRunnerManager(r runnerManager) {
	for {
		select {
		case request := <-r.getRunnerCh:
			r.processRequest(request)
		case registration := <-r.registerCh:
			r.processRegistration(registration)
		}
	}
}

// StartRunnerManager starts the runner manager goroutine for the given set of
// configured runners (keyed by runner id) and returns the channels used to
// request runners and to submit registrations. The returned error is
// currently always nil.
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) {
	scheduler := runnerManager{
		getRunnerCh:       make(chan GetRunnerRequest),
		registerCh:        make(chan RunnerRegistration),
		connectedRunners:  make([]*Runner, 0),
		configuredRunners: configuredRunners,
	}

	go runRunnerManager(scheduler)

	return scheduler.getRunnerCh, scheduler.registerCh, nil
}

// RegisterRunner reads a single protobuf-encoded runner_api.Register message
// from conn (waiting at most 10 seconds) and forwards it to the runner manager
// via registerCh. On a read error, a non-binary message, or an unparsable
// message, the connection is closed and nothing is sent.
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	typ, msg, err := conn.Read(ctx)
	if err != nil {
		log.Errorf("Could not read from runner websocket connection: %v", err)
		log.Errorf("Disconnecting...")
		// The connection may already be closed by the failed read; any error
		// from Close is irrelevant here.
		conn.Close(websocket.StatusPolicyViolation, "could not read registration")
		return
	}

	if typ != websocket.MessageBinary {
		log.Error("Got non binary message from runner, disconnecting...")
		conn.Close(websocket.StatusUnsupportedData, "Requires binary data")
		return
	}

	registrationProto := &runner_api.Register{}
	if err := proto.Unmarshal(msg, registrationProto); err != nil {
		log.Error("Could not parse registration message from runner, disconnection....")
		conn.Close(websocket.StatusUnsupportedData, "Invalid message")
		return
	}

	registerCh <- RunnerRegistration{
		Secret: registrationProto.Secret,
		Id:     registrationProto.Id,
		Tags:   registrationProto.Tags,
		conn:   conn,
	}
}