From ff781673256507fd0b6c2090390f1d20ad449c12 Mon Sep 17 00:00:00 2001 From: restitux Date: Sun, 1 Jan 2023 13:43:45 -0700 Subject: [PATCH] Cleanup runnermanager code --- runnermanager/runner.go | 26 +++++ runnermanager/runnermanager.go | 198 ++++++++++++++++----------------- 2 files changed, 123 insertions(+), 101 deletions(-) create mode 100644 runnermanager/runner.go diff --git a/runnermanager/runner.go b/runnermanager/runner.go new file mode 100644 index 0000000..6147f3d --- /dev/null +++ b/runnermanager/runner.go @@ -0,0 +1,26 @@ +package runnermanager + +import "nhooyr.io/websocket" + +//var log = logging.MustGetLogger("cursorius-server") + +type RunnerData struct { + msgType websocket.MessageType + data []byte +} + +type Runner struct { + id string + tags []string + conn *websocket.Conn + receiveChan chan RunnerData + running bool +} + +func (r *Runner) Id() string { + return r.id +} + +func (r *Runner) RunCommand() (int64, string, string, error) { + return 0, "", "", nil +} diff --git a/runnermanager/runnermanager.go b/runnermanager/runnermanager.go index 168e384..5ec7168 100644 --- a/runnermanager/runnermanager.go +++ b/runnermanager/runnermanager.go @@ -21,18 +21,6 @@ type RunnerRegistration struct { conn *websocket.Conn } -type RunnerData struct { - msgType websocket.MessageType - data []byte -} -type Runner struct { - id string - tags []string - conn *websocket.Conn - receiveChan chan RunnerData - running bool -} - type runnerManager struct { getRunnerCh chan GetRunnerRequest registerCh chan RunnerRegistration @@ -55,101 +43,109 @@ type runnerJob struct { URL string } +func (r *runnerManager) processRequest(req GetRunnerRequest) { + var runnerTagsStr strings.Builder + fmt.Fprintf(&runnerTagsStr, "%v", req.Tags[0]) + for _, tag := range req.Tags[1:] { + fmt.Fprintf(&runnerTagsStr, " %v", tag) + } + log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String()) + + log.Debugf("Finding runner with tags %v", runnerTagsStr) + + for i, runner := range r.connectedRunners { + // don't allocate runner that is already occupied + if runner.running { + log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id) + continue + } + // don't allocate runner with closed receiveChan (is defunct) + // there should never be messages to read on an inactive runner, + // so we aren't losing any data here + select { + case _, ok := <-runner.receiveChan: + if ok { + // this should never happen + // TODO: should we disconnect the runner? + log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id) + continue + } + log.Noticef("Removing defunct runner \"%v\"", runner.id) + // if the receive channel is closed, swap delete the runner as it's defunct + r.connectedRunners[i] = r.connectedRunners[len(r.connectedRunners)-1] + r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1] + default: + runner.running = true + req.RespChan <- GetRunnerResponse{ + Runner: runner, + Err: nil, + } + return + } + } + errorMsg := "could not find valid runner" + if len(r.connectedRunners) == 0 { + errorMsg = "no connected runners" + } + log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg) + req.RespChan <- GetRunnerResponse{ + Runner: Runner{}, + Err: fmt.Errorf("Could not allocate runner: %v", errorMsg), + } + +} + +func (r *runnerManager) processRegistration(reg RunnerRegistration) { + log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret) + if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist { + if configuredRunner.Secret == reg.Secret { + log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags) + runner := Runner{ + id: reg.Id, + tags: reg.Tags, + conn: reg.conn, + receiveChan: make(chan RunnerData), + running: false, + } + r.connectedRunners = append(r.connectedRunners, runner) + // start goroutine to call Read function on websocket connection + // this is required to keep the connection functioning + go func() { + for { + msgType, data, err := reg.conn.Read(context.Background()) + if err != nil { + // TODO: this is still racy, since a runner could be alloctade between the + // connection returning an err and the channel closing + close(runner.receiveChan) + log.Errorf("Could not read from connection: %v", err) + log.Noticef("Deregistering runner with id: %v", runner.id) + + return + } else { + log.Debugf("%v: %v", msgType, data) + runner.receiveChan <- RunnerData{msgType: msgType, data: data} + } + } + }() + + } else { + log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, registration.Secret) + reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") + } + } else { + log.Errorf("Disconnecting runner with invalid id: %v", reg.Id) + reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") + } +} + func runRunnerManager(r runnerManager) { for { - msgCase: select { case request := <-r.getRunnerCh: - - var runnerTagsStr strings.Builder - fmt.Fprintf(&runnerTagsStr, "%v", request.Tags[0]) - for _, tag := range request.Tags[1:] { - fmt.Fprintf(&runnerTagsStr, " %v", tag) - } - log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String()) - - log.Debugf("Finding runner with tags %v", runnerTagsStr) - - for i, runner := range r.connectedRunners { - // don't allocate runner that is already occupied - if runner.running { - log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id) - continue - } - // don't allocate runner with closed receiveChan (is defunct) - // there should never be messages to read on an inactive runner, - // so we aren't losing any data here - select { - case _, ok := <-runner.receiveChan: - if ok { - // this should never happen - log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id) - continue - } - log.Noticef("Removing defunct runner \"%v\"", runner.id) - // if the receive channel is closed, swap delete the runner as it's defunct - r.connectedRunners[i] = r.connectedRunners[len(r.connectedRunners)-1] - r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1] - default: - runner.running = true - request.RespChan <- GetRunnerResponse{ - Runner: runner, - Err: nil, - } - break msgCase - } - } - errorMsg := "could not find valid runner" - if len(r.connectedRunners) == 0 { - errorMsg = "no connected runners" - } - log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg) - request.RespChan <- GetRunnerResponse{ - Runner: Runner{}, - Err: fmt.Errorf("Could not allocate runner: %v", errorMsg), - } + r.processRequest(request) case registration := <-r.registerCh: - log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret) - if configuredRunner, doesExist := r.configuredRunners[registration.Id]; doesExist { - if configuredRunner.Secret == registration.Secret { - log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags) - runner := Runner{ - id: registration.Id, - tags: registration.Tags, - conn: registration.conn, - receiveChan: make(chan RunnerData), - running: false, - } - r.connectedRunners = append(r.connectedRunners, runner) - // start goroutine to call Read function on websocket connection - // this is required to keep the connection functioning - go func() { - for { - msgType, data, err := registration.conn.Read(context.Background()) - if err != nil { - // TODO: this is still racy, since a runner could be alloctade between the - // connection returning an err and the channel closing - close(runner.receiveChan) - log.Errorf("Could not read from connection: %v", err) - log.Noticef("Deregistering runner with id: %v", runner.id) - - return - } else { - log.Debugf("%v: %v", msgType, data) - runner.receiveChan <- RunnerData{msgType: msgType, data: data} - } - } - }() - - } else { - log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", registration.Id, registration.Secret) - registration.conn.Close(websocket.StatusNormalClosure, "registration invalid") - } - } else { - log.Errorf("Disconnecting runner with invalid id: %v", registration.Id) - registration.conn.Close(websocket.StatusNormalClosure, "registration invalid") - } + r.processRegistration(registration) } } }