package runnermanager

import (
	"context"
	"crypto/subtle"
	"fmt"
	"time"

	"github.com/google/uuid"
	"github.com/op/go-logging"
	"google.golang.org/protobuf/proto"
	"nhooyr.io/websocket"

	"git.ohea.xyz/cursorius/server/config"
	"git.ohea.xyz/cursorius/server/database"
	"git.ohea.xyz/cursorius/server/util"

	runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
)

var log = logging.MustGetLogger("cursorius-server")

// processRunnerAllocation hands at most one idle runner whose tags satisfy
// req.Tags to the requester and removes it from the idle pool.
//
// While scanning, runners whose receiveChan has been closed (their read
// goroutine has exited) are dropped from the pool. Exactly one response is
// sent on req.RespChan: either the allocated runner or an error.
func (r *runnerManager) processRunnerAllocation(req RunnerAllocationRequest) {
	tagsStr := util.FormatTags(req.Tags)
	log.Infof("Got request for runner with tags \"%v\"", tagsStr)
	log.Debugf("Finding runner with tags %v", tagsStr)

	var allocated *Runner
	remaining := make([]Runner, 0, len(r.connectedRunners))

	for _, runner := range r.connectedRunners {
		// Non-blocking probe of receiveChan. An idle runner should never have
		// data pending, so a successful receive means either the channel was
		// closed (runner is defunct) or something unexpected was sent.
		select {
		case _, ok := <-runner.receiveChan:
			if !ok {
				log.Noticef("Removing defunct runner \"%v\"", runner.id)
				continue
			}
			// this should never happen
			// TODO: should we disconnect the runner?
			log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
			remaining = append(remaining, runner)
			continue
		default:
		}

		// Only one runner is handed out per request; once one has been
		// chosen, the rest of the loop only prunes defunct runners.
		if allocated != nil {
			remaining = append(remaining, runner)
			continue
		}

		log.Debugf("Checking runner %v for requested tags", runner.id)
		if !runner.HasTags(req.Tags) {
			remaining = append(remaining, runner)
			continue
		}

		log.Debugf("Runner %v has requested tags, allocating", runner.id)
		// Copy the runner so the pointer given to the requester does not alias
		// the pool slice, which is replaced below and appended to on
		// registration and release.
		selected := runner
		allocated = &selected
	}

	r.connectedRunners = remaining

	if allocated != nil {
		req.RespChan <- RunnerAllocationResponse{
			Runner: allocated,
			Err:    nil,
		}
		return
	}

	errorMsg := "could not find valid runner"
	if len(r.connectedRunners) == 0 {
		errorMsg = "no connected runners"
	}
	req.RespChan <- RunnerAllocationResponse{
		Runner: nil,
		Err:    fmt.Errorf("Could not allocate runner: %v", errorMsg),
	}
}

// processRunnerRegistration validates a registration request and, if its id
// parses as a UUID, exists in the database, and its secret matches the stored
// token, adds the runner to the idle pool. Invalid registrations are logged and
// their connection is closed.
//
// On success it starts a goroutine that keeps reading from the runner's
// websocket, forwarding binary messages to the runner's receiveChan. When
// reading stops (read error or non-binary message), receiveChan is closed,
// which processRunnerAllocation treats as a defunct runner.
func (r *runnerManager) processRunnerRegistration(req RunnerRegistrationRequest) {
	// The secret is deliberately not logged.
	log.Debugf("New runner appeared with id: %v", req.Id)

	// Get runner with given id from database
	runnerID, err := uuid.Parse(req.Id)
	if err != nil {
		log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", req.Id, err)
		req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}

	dbRunner, err := r.db.GetRunnerById(runnerID)
	if err != nil {
		log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerID, err)
		req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}

	// Constant-time comparison so response timing does not reveal how much of
	// the secret matched.
	if subtle.ConstantTimeCompare([]byte(req.Secret), []byte(dbRunner.Token)) != 1 {
		log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerID)
		req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
		return
	}

	log.Infof("Registering runner \"%v\" with tags %v", req.Id, req.Tags)

	runner := Runner{
		id:          runnerID,
		tags:        req.Tags,
		conn:        req.conn,
		receiveChan: make(chan []byte),
	}
	r.connectedRunners = append(r.connectedRunners, runner)

	// start goroutine to call Read function on websocket connection
	// this is required to keep the connection functioning
	go func() {
		defer log.Noticef("Deregistered runner with id: %v", runner.id)
		// receiveChan is closed exactly once, here, when this goroutine exits.
		defer close(runner.receiveChan)

		for {
			msgType, data, err := req.conn.Read(context.Background())
			if err != nil {
				// TODO: this is still racy, since a runner could be allocated between the
				// connection returning an err and the channel closing
				// This should probably be handled by sending erroring, but not 100% sure
				log.Errorf("Could not read from connection: %v", err)
				return
			}
			if msgType != websocket.MessageBinary {
				log.Errorf("Got non-binary data from connection")
				req.conn.Close(websocket.StatusUnsupportedData, "Requires binary data")
				return
			}
			runner.receiveChan <- data
		}
	}()
}

// processRunnerRelease returns a previously allocated runner to the idle pool.
// A nil runner is logged and ignored rather than crashing the manager loop.
func (r *runnerManager) processRunnerRelease(req RunnerReleaseRequest) {
	if req.Runner == nil {
		log.Errorf("Got release request with nil runner, ignoring")
		return
	}
	r.connectedRunners = append(r.connectedRunners, *req.Runner)
}

// runRunnerManager is the manager's event loop. It handles one allocation,
// release or registration request at a time, so r.connectedRunners is only
// touched from this goroutine. It never returns.
func runRunnerManager(r runnerManager) {
	for {
		select {
		case request := <-r.chans.Allocation:
			r.processRunnerAllocation(request)
		case release := <-r.chans.Release:
			r.processRunnerRelease(release)
		case registration := <-r.chans.Registration:
			r.processRunnerRegistration(registration)
		}
	}
}

// StartRunnerManager creates a runner manager with an empty runner pool,
// starts its event loop in a new goroutine, and returns the (unbuffered)
// channels used to send it requests. The returned error is currently always nil.
func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (RunnerManagerChans, error) {
	scheduler := runnerManager{
		chans: RunnerManagerChans{
			Allocation:   make(chan RunnerAllocationRequest),
			Release:      make(chan RunnerReleaseRequest),
			Registration: make(chan RunnerRegistrationRequest),
		},
		connectedRunners:  make([]Runner, 0),
		configuredRunners: configuredRunners,
		db:                db,
	}

	go runRunnerManager(scheduler)

	return scheduler.chans, nil
}

// RegisterRunner reads a runner's registration message from conn and forwards
// it on registerCh. The first message must arrive within 10 seconds and be a
// binary, protobuf-encoded runner_api.Register; otherwise the connection is
// closed and nothing is sent.
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistrationRequest) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	typ, payload, err := conn.Read(ctx)
	if err != nil {
		log.Errorf("Could not read from runner websocket connection: %v", err)
		log.Errorf("Disconnecting...")
		// Best effort: the connection may already be closed by the failed read.
		conn.Close(websocket.StatusPolicyViolation, "registration not received")
		return
	}

	if typ != websocket.MessageBinary {
		log.Error("Got non binary message from runner, disconnecting...")
		conn.Close(websocket.StatusUnsupportedData, "Requires binary data")
		return
	}

	registrationProto := &runner_api.Register{}
	if err := proto.Unmarshal(payload, registrationProto); err != nil {
		log.Error("Could not parse registration message from runner, disconnection....")
		conn.Close(websocket.StatusUnsupportedData, "Invalid message")
		return
	}

	registerCh <- RunnerRegistrationRequest{
		Secret: registrationProto.Secret,
		Id:     registrationProto.Id,
		Tags:   registrationProto.Tags,
		conn:   conn,
	}
}