From 3cbe670bc1448d073112fba6fd6a71f879792421 Mon Sep 17 00:00:00 2001 From: restitux Date: Sat, 31 Dec 2022 17:22:00 -0700 Subject: [PATCH] Implement getRunner grpc endpoint This includes refactoring the jobscheduler into the runnermanager. This service manages runner connections and allocates them to pipelines. These requests are done via the pipeline grpc api --- listen/listen.go | 8 +- main.go | 8 +- pipeline_api/pipeline_api.go | 52 +++++++- poll/poll.go | 7 +- .../runnermanager.go | 124 ++++++++++-------- 5 files changed, 127 insertions(+), 72 deletions(-) rename jobscheduler/jobscheduler.go => runnermanager/runnermanager.go (53%) diff --git a/listen/listen.go b/listen/listen.go index e6401d3..6913999 100644 --- a/listen/listen.go +++ b/listen/listen.go @@ -5,7 +5,7 @@ import ( "net/http" "git.ohea.xyz/cursorius/server/config" - "git.ohea.xyz/cursorius/server/jobscheduler" + "git.ohea.xyz/cursorius/server/runnermanager" "git.ohea.xyz/cursorius/server/webhook" "github.com/op/go-logging" "golang.org/x/net/http2" @@ -15,7 +15,7 @@ import ( var log = logging.MustGetLogger("cursorius-server") -func setupHTTPServer(mux *http.ServeMux, registerCh chan jobscheduler.RunnerRegistration, +func setupHTTPServer(mux *http.ServeMux, registerCh chan runnermanager.RunnerRegistration, conf config.Config) { webhook.CreateWebhookHandler(conf, mux) @@ -25,11 +25,11 @@ func setupHTTPServer(mux *http.ServeMux, registerCh chan jobscheduler.RunnerRegi log.Errorf("Could not upgrade runner connection to websocket: %v", err) return } - go jobscheduler.RegisterRunner(conn, registerCh) + go runnermanager.RegisterRunner(conn, registerCh) }) } -func Listen(mux *http.ServeMux, address string, port int, registerCh chan jobscheduler.RunnerRegistration, conf config.Config) { +func Listen(mux *http.ServeMux, address string, port int, registerCh chan runnermanager.RunnerRegistration, conf config.Config) { setupHTTPServer(mux, registerCh, conf) diff --git a/main.go b/main.go index 
d2effe8..e1c2466 100644 --- a/main.go +++ b/main.go @@ -5,10 +5,10 @@ import ( "os" "git.ohea.xyz/cursorius/server/config" - "git.ohea.xyz/cursorius/server/jobscheduler" "git.ohea.xyz/cursorius/server/listen" "git.ohea.xyz/cursorius/server/pipeline_api" "git.ohea.xyz/cursorius/server/poll" + "git.ohea.xyz/cursorius/server/runnermanager" "github.com/op/go-logging" ) @@ -33,16 +33,16 @@ func main() { log.Errorf("Could not get configuration: %v", err) } - runCh, registerCh, err := jobscheduler.StartJobScheduler(configData.Config.Jobs, configData.Config.Runners) + getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners) if err != nil { log.Errorf("Could not start runner: %v", err) } - poll.StartPolling(configData.Config, runCh) + poll.StartPolling(configData.Config) mux := http.NewServeMux() - pipeline_api.CreateHandler(mux, runCh) + pipeline_api.CreateHandler(mux, getRunnerCh) listen.Listen(mux, configData.Config.Address, configData.Config.Port, registerCh, configData.Config) } diff --git a/pipeline_api/pipeline_api.go b/pipeline_api/pipeline_api.go index 6da0e33..2ef389f 100644 --- a/pipeline_api/pipeline_api.go +++ b/pipeline_api/pipeline_api.go @@ -2,11 +2,14 @@ package pipeline_api import ( "context" + "fmt" "net/http" + "strings" + "sync" - "git.ohea.xyz/cursorius/server/jobscheduler" apiv1 "git.ohea.xyz/cursorius/server/proto/gen/api/v1" "git.ohea.xyz/cursorius/server/proto/gen/api/v1/apiv1connect" + "git.ohea.xyz/cursorius/server/runnermanager" "github.com/bufbuild/connect-go" "github.com/op/go-logging" ) @@ -14,7 +17,15 @@ import ( var log = logging.MustGetLogger("cursorius-server") type ApiServer struct { - runCh chan jobscheduler.Run + getRunnerCh chan runnermanager.GetRunnerRequest + allocatedRunners map[int64]RunnerWrapper + currentId int64 + currentIdMutex sync.Mutex +} + +type RunnerWrapper struct { + runner runnermanager.Runner + mutex sync.Mutex } func (s *ApiServer) GetRunner( @@ -22,8 +33,35 @@ func (s *ApiServer) 
GetRunner( req *connect.Request[apiv1.GetRunnerRequest], ) (*connect.Response[apiv1.GetRunnerResponse], error) { + respChan := make(chan runnermanager.GetRunnerResponse) + s.getRunnerCh <- runnermanager.GetRunnerRequest{ + Tags: req.Msg.Tags, + RespChan: respChan, + } + + var runnerTagsStr strings.Builder + fmt.Fprintf(&runnerTagsStr, "%v", req.Msg.Tags[0]) + for _, tag := range req.Msg.Tags[1:] { + fmt.Fprintf(&runnerTagsStr, " %v", tag) + } + + response := <-respChan + if response.Err != nil { + log.Errorf("Could not get runner with tags \"%v\": %v", runnerTagsStr, response.Err) + return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner")) + } + + log.Info("Got runner with tags: %v", runnerTagsStr) + + s.currentIdMutex.Lock() + runnerId := s.currentId + s.currentId++ + s.currentIdMutex.Unlock() + + s.allocatedRunners[runnerId] = RunnerWrapper{runner: response.Runner} + res := connect.NewResponse(&apiv1.GetRunnerResponse{ - RunnerId: 0, + RunnerId: runnerId, }) res.Header().Set("GetRunner-Version", "v1") return res, nil @@ -43,8 +81,12 @@ func (s *ApiServer) RunCommand( return res, nil } -func CreateHandler(mux *http.ServeMux, runCh chan jobscheduler.Run) { - api_server := &ApiServer{runCh: runCh} +func CreateHandler(mux *http.ServeMux, getRunnerCh chan runnermanager.GetRunnerRequest) { + api_server := &ApiServer{ + getRunnerCh: getRunnerCh, + allocatedRunners: make(map[int64]RunnerWrapper), + currentId: 0, + } path, handler := apiv1connect.NewGetRunnerServiceHandler(api_server) mux.Handle(path, handler) path, handler = apiv1connect.NewRunCommandServiceHandler(api_server) diff --git a/poll/poll.go b/poll/poll.go index 73b51f0..d6e3f5f 100644 --- a/poll/poll.go +++ b/poll/poll.go @@ -6,7 +6,6 @@ import ( "github.com/op/go-logging" "git.ohea.xyz/cursorius/server/config" - "git.ohea.xyz/cursorius/server/jobscheduler" "git.ohea.xyz/cursorius/server/pipeline_executor" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing" @@ 
-25,7 +24,7 @@ type tag struct { commitHash string } -func pollJob(repoName string, jobConfig config.Job, runCh chan jobscheduler.Run, pipelineConf config.PipelineConf) { +func pollJob(repoName string, jobConfig config.Job, pipelineConf config.PipelineConf) { prevCommits := make(map[string]string) for { time.Sleep(time.Duration(jobConfig.PollInterval) * time.Second) @@ -104,12 +103,12 @@ func pollJob(repoName string, jobConfig config.Job, runCh chan jobscheduler.Run, } } -func StartPolling(conf config.Config, runCh chan jobscheduler.Run) { +func StartPolling(conf config.Config) { for jobName, job := range conf.Jobs { if job.PollInterval == 0 { continue } else { - go pollJob(jobName, job, runCh, conf.PipelineConf) + go pollJob(jobName, job, conf.PipelineConf) } } } diff --git a/jobscheduler/jobscheduler.go b/runnermanager/runnermanager.go similarity index 53% rename from jobscheduler/jobscheduler.go rename to runnermanager/runnermanager.go index 60d0831..168e384 100644 --- a/jobscheduler/jobscheduler.go +++ b/runnermanager/runnermanager.go @@ -1,7 +1,9 @@ -package jobscheduler +package runnermanager import ( "context" + "fmt" + "strings" "time" "git.ohea.xyz/cursorius/server/config" @@ -23,7 +25,6 @@ type RunnerData struct { msgType websocket.MessageType data []byte } - type Runner struct { id string tags []string @@ -32,17 +33,21 @@ type Runner struct { running bool } -type jobScheduler struct { - runCh chan Run +type runnerManager struct { + getRunnerCh chan GetRunnerRequest registerCh chan RunnerRegistration connectedRunners []Runner configuredRunners map[string]config.Runner } -type Run struct { - JobName string - JobConfig config.Job - Ref string +type GetRunnerRequest struct { + Tags []string + RespChan chan GetRunnerResponse +} + +type GetRunnerResponse struct { + Runner Runner + Err error } type runnerJob struct { @@ -50,54 +55,63 @@ type runnerJob struct { URL string } -func runJobScheduler(j jobScheduler) { +func runRunnerManager(r runnerManager) { for { + 
msgCase: select { - case run := <-j.runCh: - log.Infof("Launching run for job \"%v\" on ref \"%v\"", run.JobName, run.Ref) - log.Debugf("Finding runner for job \"%v\"", run.JobName) - rJ := runnerJob{ - Id: run.JobName, - URL: run.JobConfig.URL, + case request := <-r.getRunnerCh: + + var runnerTagsStr strings.Builder + fmt.Fprintf(&runnerTagsStr, "%v", request.Tags[0]) + for _, tag := range request.Tags[1:] { + fmt.Fprintf(&runnerTagsStr, " %v", tag) } - launched := false - for i, runner := range j.connectedRunners { - // don't send job to runner that is already occupied - if !runner.running { - // don't send job to runner with closed receiveChan (is defunct) - // there should never be messages to read on an inactive runner, - // so we aren't losing any data here - select { - case <-runner.receiveChan: - // if the receive channel is closed, swap delete the runner as it's defunct - j.connectedRunners[i] = j.connectedRunners[len(j.connectedRunners)-1] - j.connectedRunners = j.connectedRunners[:len(j.connectedRunners)-1] - default: - err := wsjson.Write(context.Background(), runner.conn, rJ) - if err != nil { - log.Errorf("Could not launch run: %v", err) - break - } else { - log.Infof("Launched run for job %v on runner %v", run.JobName, runner.id) - launched = true - j.connectedRunners[i].running = true - break - } - } - } else { + log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String()) + + log.Debugf("Finding runner with tags %v", runnerTagsStr) + + for i, runner := range r.connectedRunners { + // don't allocate runner that is already occupied + if runner.running { log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id) + continue + } + // don't allocate runner with closed receiveChan (is defunct) + // there should never be messages to read on an inactive runner, + // so we aren't losing any data here + select { + case _, ok := <-runner.receiveChan: + if ok { + // this should never happen + log.Errorf("Recieved data 
from inactive runner %v, this is a bug", runner.id) + continue + } + log.Noticef("Removing defunct runner \"%v\"", runner.id) + // if the receive channel is closed, swap delete the runner as it's defunct + r.connectedRunners[i] = r.connectedRunners[len(r.connectedRunners)-1] + r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1] + default: + runner.running = true + request.RespChan <- GetRunnerResponse{ + Runner: runner, + Err: nil, + } + break msgCase } } - if !launched { - errorMsg := "could not find valid runner" - if len(j.connectedRunners) == 0 { - errorMsg = "no connected runners" - } - log.Errorf("Could not launch run for job \"%v\": %v", run.JobName, errorMsg) + errorMsg := "could not find valid runner" + if len(r.connectedRunners) == 0 { + errorMsg = "no connected runners" } - case registration := <-j.registerCh: + log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg) + request.RespChan <- GetRunnerResponse{ + Runner: Runner{}, + Err: fmt.Errorf("Could not allocate runner: %v", errorMsg), + } + + case registration := <-r.registerCh: log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret) - if configuredRunner, doesExist := j.configuredRunners[registration.Id]; doesExist { + if configuredRunner, doesExist := r.configuredRunners[registration.Id]; doesExist { if configuredRunner.Secret == registration.Secret { log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags) runner := Runner{ @@ -107,14 +121,14 @@ func runJobScheduler(j jobScheduler) { receiveChan: make(chan RunnerData), running: false, } - j.connectedRunners = append(j.connectedRunners, runner) + r.connectedRunners = append(r.connectedRunners, runner) // start goroutine to call Read function on websocket connection // this is required to keep the connection functioning go func() { for { msgType, data, err := registration.conn.Read(context.Background()) if err != nil { - // 
TODO: this is still racy, since a job could be emitted between the + // TODO: this is still racy, since a runner could be allocated between the // connection returning an err and the channel closing close(runner.receiveChan) log.Errorf("Could not read from connection: %v", err) @@ -140,17 +154,17 @@ func runJobScheduler(j jobScheduler) { } } -func StartJobScheduler(jobs map[string]config.Job, configuredRunners map[string]config.Runner) (chan Run, chan RunnerRegistration, error) { - scheduler := jobScheduler{ - runCh: make(chan Run), +func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) { + scheduler := runnerManager{ + getRunnerCh: make(chan GetRunnerRequest), registerCh: make(chan RunnerRegistration), connectedRunners: make([]Runner, 0), configuredRunners: configuredRunners, } - go runJobScheduler(scheduler) + go runRunnerManager(scheduler) - return scheduler.runCh, scheduler.registerCh, nil + return scheduler.getRunnerCh, scheduler.registerCh, nil } func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) {