From fe53a17160e55fa4de04bbebc4139ba68731ab06 Mon Sep 17 00:00:00 2001 From: restitux Date: Wed, 8 Mar 2023 00:13:40 -0700 Subject: [PATCH] Runners are removed from manager when allocated This removes an existing unlocked shared access to runner.running. This also sets us up for better management of the runners. --- listen/listen.go | 13 ++-- main.go | 5 +- pipeline_api/pipeline_api.go | 23 ++++--- runnermanager/runner.go | 20 ++++-- runnermanager/runnermanager.go | 108 +++++++++++++++++---------------- 5 files changed, 93 insertions(+), 76 deletions(-) diff --git a/listen/listen.go b/listen/listen.go index ebcb063..4b1ba56 100644 --- a/listen/listen.go +++ b/listen/listen.go @@ -22,13 +22,12 @@ func setupHTTPServer( mux *http.ServeMux, conf config.PipelineConf, db database.Database, - registerCh chan runnermanager.RunnerRegistration, - getRunnerCh chan runnermanager.GetRunnerRequest, + runnerManagerChans runnermanager.RunnerManagerChans, ) error { webhook.CreateWebhookHandler(db, conf, mux) - pipeline_api.CreateHandler(getRunnerCh, mux) + pipeline_api.CreateHandler(runnerManagerChans.Allocation, runnerManagerChans.Release, mux) err := admin_api.CreateHandler(db, mux) if err != nil { @@ -41,7 +40,7 @@ func setupHTTPServer( log.Errorf("Could not upgrade runner connection to websocket: %v", err) return } - go runnermanager.RegisterRunner(conn, registerCh) + go runnermanager.RegisterRunner(conn, runnerManagerChans.Registration) }) return nil } @@ -52,16 +51,14 @@ func Listen( port int, conf config.PipelineConf, db database.Database, - registerCh chan runnermanager.RunnerRegistration, - getRunnerCh chan runnermanager.GetRunnerRequest, + runnerManagerChans runnermanager.RunnerManagerChans, ) error { err := setupHTTPServer( mux, conf, db, - registerCh, - getRunnerCh, + runnerManagerChans, ) if err != nil { return fmt.Errorf("Could not setup http endpoints: %w", err) diff --git a/main.go b/main.go index ff30b50..e6298da 100644 --- a/main.go +++ b/main.go @@ -40,7 +40,7 @@ 
func main() { return } - getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners, db) + runnerManagerChans, err := runnermanager.StartRunnerManager(configData.Config.Runners, db) if err != nil { log.Errorf("Could not start runner: %v", err) return @@ -56,7 +56,6 @@ func main() { configData.Config.Port, configData.Config.PipelineConf, db, - registerCh, - getRunnerCh, + runnerManagerChans, )) } diff --git a/pipeline_api/pipeline_api.go b/pipeline_api/pipeline_api.go index 0d76276..b2bf021 100644 --- a/pipeline_api/pipeline_api.go +++ b/pipeline_api/pipeline_api.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net/http" - "strings" "sync" apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2" @@ -19,7 +18,8 @@ import ( var log = logging.MustGetLogger("cursorius-server") type ApiServer struct { - getRunnerCh chan runnermanager.GetRunnerRequest + allocationCh chan runnermanager.RunnerAllocationRequest + releaseCh chan runnermanager.RunnerReleaseRequest allocatedRunners map[uuid.UUID]*RunnerWrapper allocatedRunnersMutex sync.RWMutex } @@ -38,10 +38,14 @@ func (r *RunnerWrapper) RunCommand(cmd string, args []string) (int64, string, st return return_code, stdout, stderr, err } -func (r *RunnerWrapper) Release() { +func (r *RunnerWrapper) Release(releaseCh chan runnermanager.RunnerReleaseRequest) { r.mutex.Lock() defer r.mutex.Unlock() - r.runner.Release() + + releaseCh <- runnermanager.RunnerReleaseRequest{ + Runner: r.runner, + } + r.runner = nil } func (s *ApiServer) GetRunnerFromMap(u uuid.UUID) (*RunnerWrapper, bool) { @@ -56,8 +60,8 @@ func (s *ApiServer) GetRunner( req *connect.Request[apiv2.GetRunnerRequest], ) (*connect.Response[apiv2.GetRunnerResponse], error) { - respChan := make(chan runnermanager.GetRunnerResponse) - s.getRunnerCh <- runnermanager.GetRunnerRequest{ + respChan := make(chan runnermanager.RunnerAllocationResponse) + s.allocationCh <- runnermanager.RunnerAllocationRequest{ Tags: req.Msg.Tags, RespChan: respChan, } @@ 
-99,7 +103,7 @@ func (s *ApiServer) ReleaseRunner( s.allocatedRunnersMutex.Lock() runner := s.allocatedRunners[uuid] delete(s.allocatedRunners, uuid) - runner.Release() + runner.Release(s.releaseCh) s.allocatedRunnersMutex.Unlock() res := connect.NewResponse(&apiv2.ReleaseRunnerResponse{}) @@ -138,9 +142,10 @@ func (s *ApiServer) RunCommand( return res, nil } -func CreateHandler(getRunnerCh chan runnermanager.GetRunnerRequest, mux *http.ServeMux) { +func CreateHandler(allocationCh chan runnermanager.RunnerAllocationRequest, releaseCh chan runnermanager.RunnerReleaseRequest, mux *http.ServeMux) { api_server := &ApiServer{ - getRunnerCh: getRunnerCh, + allocationCh: allocationCh, + releaseCh: releaseCh, allocatedRunners: make(map[uuid.UUID]*RunnerWrapper), } path, handler := apiv2connect.NewGetRunnerServiceHandler(api_server) diff --git a/runnermanager/runner.go b/runnermanager/runner.go index a9a2587..80314bd 100644 --- a/runnermanager/runner.go +++ b/runnermanager/runner.go @@ -22,17 +22,27 @@ type Runner struct { tags []string conn *websocket.Conn receiveChan chan []byte - running bool +} + +func (r *Runner) HasTags(requestedTags []string) bool { +tagIter: + for _, requestedTag := range requestedTags { + for _, posessedTag := range r.tags { + // if we find the tag, move on to search for the next one + if posessedTag == requestedTag { + continue tagIter + } + } + // if we don't find the tag + return false + } + return true } func (r *Runner) Id() uuid.UUID { return r.id } -func (r *Runner) Release() { - r.running = false -} - func (r *Runner) RunCommand(cmd string, args []string) (returnCode int64, stdout string, stderr string, err error) { // Write RunCommand message to client diff --git a/runnermanager/runnermanager.go b/runnermanager/runnermanager.go index b67adba..a2092b2 100644 --- a/runnermanager/runnermanager.go +++ b/runnermanager/runnermanager.go @@ -19,53 +19,57 @@ import ( var log = logging.MustGetLogger("cursorius-server") -type RunnerRegistration 
struct { - Secret string - Id string - Tags []string - conn *websocket.Conn +type RunnerManagerChans struct { + Allocation chan RunnerAllocationRequest + Release chan RunnerReleaseRequest + Registration chan RunnerRegistrationRequest } type runnerManager struct { - getRunnerCh chan GetRunnerRequest - registerCh chan RunnerRegistration + chans RunnerManagerChans connectedRunners []Runner numConnectedRunners uint64 configuredRunners map[string]config.Runner db database.Database } -type GetRunnerRequest struct { +type RunnerAllocationRequest struct { Tags []string - RespChan chan GetRunnerResponse + RespChan chan RunnerAllocationResponse } -type GetRunnerResponse struct { +type RunnerAllocationResponse struct { Runner *Runner Err error } +type RunnerReleaseRequest struct { + Runner *Runner +} + +type RunnerRegistrationRequest struct { + Secret string + Id string + Tags []string + conn *websocket.Conn +} + type runnerJob struct { Id string URL string } -func (r *runnerManager) processRequest(req GetRunnerRequest) { +func (r *runnerManager) processRunnerAllocation(req RunnerAllocationRequest) { tagsStr := util.FormatTags(req.Tags) log.Infof("Got request for runner with tags \"%v\"", tagsStr) - log.Debugf("Finding runner with tags %v", runnerTagsStr.String()) + log.Debugf("Finding runner with tags %v", tagsStr) foundRunner := false runnersToRemove := []int{} runnerIter: for i, runner := range r.connectedRunners { - // don't allocate runner that is already occupied - if runner.running { - log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id) - continue - } // don't allocate runner with closed receiveChan (is defunct) // there should never be messages to read on an inactive runner, // so we aren't losing any data here @@ -82,25 +86,20 @@ runnerIter: default: log.Debugf("Checking runner %v for requested tags", runner.id) - tagIter: - for _, requestedTag := range req.Tags { - for _, posessedTag := range runner.tags { - if requestedTag == 
posessedTag { - continue tagIter - } - } + if !runner.HasTags(req.Tags) { continue runnerIter } - r.connectedRunners[i].running = true + runnersToRemove = append(runnersToRemove, i) foundRunner = true - req.RespChan <- GetRunnerResponse{ + req.RespChan <- RunnerAllocationResponse{ Runner: &r.connectedRunners[i], Err: nil, } } } + // remove allocated runner plus defunct runners // since we iterate, all the indexes will be in accending order for i, runnerInd := range runnersToRemove { r.connectedRunners[runnerInd-i] = r.connectedRunners[len(r.connectedRunners)-1] @@ -115,43 +114,42 @@ runnerIter: if len(r.connectedRunners) == 0 { errorMsg = "no connected runners" } - req.RespChan <- GetRunnerResponse{ + req.RespChan <- RunnerAllocationResponse{ Runner: &Runner{}, Err: fmt.Errorf("Could not allocate runner: %v", errorMsg), } } -func (r *runnerManager) processRegistration(reg RunnerRegistration) { - log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret) +func (r *runnerManager) processRunnerRegistration(req RunnerRegistrationRequest) { + log.Debugf("New runner appeared with id: %v and secret: %v", req.Id, req.Secret) // Get runner with give id from database - runnerId, err := uuid.Parse(reg.Id) + runnerId, err := uuid.Parse(req.Id) if err != nil { - log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", reg.Id, err) - reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") + log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", req.Id, err) + req.conn.Close(websocket.StatusNormalClosure, "registration invalid") return } dbRunner, err := r.db.GetRunnerById(runnerId) if err != nil { log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err) - reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") + req.conn.Close(websocket.StatusNormalClosure, "registration invalid") return } - if reg.Secret != dbRunner.Token { + if req.Secret != 
dbRunner.Token { log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId) - reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") + req.conn.Close(websocket.StatusNormalClosure, "registration invalid") return } - log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags) + log.Infof("Registering runner \"%v\" with tags %v", req.Id, req.Tags) runner := Runner{ id: runnerId, - tags: reg.Tags, - conn: reg.conn, + tags: req.Tags, + conn: req.conn, receiveChan: make(chan []byte), - running: false, } r.connectedRunners = append(r.connectedRunners, runner) // start goroutine to call Read function on websocket connection @@ -160,7 +158,7 @@ func (r *runnerManager) processRegistration(reg RunnerRegistration) { defer log.Noticef("Deregistered runner with id: %v", runner.id) defer close(runner.receiveChan) for { - msgType, data, err := reg.conn.Read(context.Background()) + msgType, data, err := req.conn.Read(context.Background()) if err != nil { // TODO: this is still racy, since a runner could be allocated between the // connection returning an err and the channel closing @@ -180,22 +178,30 @@ func (r *runnerManager) processRegistration(reg RunnerRegistration) { }() } +func (r *runnerManager) processRunnerRelease(req RunnerReleaseRequest) { + r.connectedRunners = append(r.connectedRunners, *req.Runner) +} + func runRunnerManager(r runnerManager) { for { select { - case request := <-r.getRunnerCh: - r.processRequest(request) - - case registration := <-r.registerCh: - r.processRegistration(registration) + case request := <-r.chans.Allocation: + r.processRunnerAllocation(request) + case release := <-r.chans.Release: + r.processRunnerRelease(release) + case registration := <-r.chans.Registration: + r.processRunnerRegistration(registration) } } } -func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (chan GetRunnerRequest, chan RunnerRegistration, error) { +func StartRunnerManager(configuredRunners 
map[string]config.Runner, db database.Database) (RunnerManagerChans, error) { scheduler := runnerManager{ - getRunnerCh: make(chan GetRunnerRequest), - registerCh: make(chan RunnerRegistration), + chans: RunnerManagerChans{ + Allocation: make(chan RunnerAllocationRequest), + Release: make(chan RunnerReleaseRequest), + Registration: make(chan RunnerRegistrationRequest), + }, connectedRunners: make([]Runner, 0), configuredRunners: configuredRunners, db: db, @@ -203,14 +209,14 @@ func StartRunnerManager(configuredRunners map[string]config.Runner, db database. go runRunnerManager(scheduler) - return scheduler.getRunnerCh, scheduler.registerCh, nil + return scheduler.chans, nil } -func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) { +func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistrationRequest) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - var registration RunnerRegistration + var registration RunnerRegistrationRequest registration.conn = conn typ, r, err := conn.Read(ctx)