diff --git a/listen/listen.go b/listen/listen.go index e6401d3..6913999 100644 --- a/listen/listen.go +++ b/listen/listen.go @@ -5,7 +5,7 @@ import ( "net/http" "git.ohea.xyz/cursorius/server/config" - "git.ohea.xyz/cursorius/server/jobscheduler" + "git.ohea.xyz/cursorius/server/runnermanager" "git.ohea.xyz/cursorius/server/webhook" "github.com/op/go-logging" "golang.org/x/net/http2" @@ -15,7 +15,7 @@ import ( var log = logging.MustGetLogger("cursorius-server") -func setupHTTPServer(mux *http.ServeMux, registerCh chan jobscheduler.RunnerRegistration, +func setupHTTPServer(mux *http.ServeMux, registerCh chan runnermanager.RunnerRegistration, conf config.Config) { webhook.CreateWebhookHandler(conf, mux) @@ -25,11 +25,11 @@ func setupHTTPServer(mux *http.ServeMux, registerCh chan jobscheduler.RunnerRegi log.Errorf("Could not upgrade runner connection to websocket: %v", err) return } - go jobscheduler.RegisterRunner(conn, registerCh) + go runnermanager.RegisterRunner(conn, registerCh) }) } -func Listen(mux *http.ServeMux, address string, port int, registerCh chan jobscheduler.RunnerRegistration, conf config.Config) { +func Listen(mux *http.ServeMux, address string, port int, registerCh chan runnermanager.RunnerRegistration, conf config.Config) { setupHTTPServer(mux, registerCh, conf) diff --git a/main.go b/main.go index d2effe8..e1c2466 100644 --- a/main.go +++ b/main.go @@ -5,10 +5,10 @@ import ( "os" "git.ohea.xyz/cursorius/server/config" - "git.ohea.xyz/cursorius/server/jobscheduler" "git.ohea.xyz/cursorius/server/listen" "git.ohea.xyz/cursorius/server/pipeline_api" "git.ohea.xyz/cursorius/server/poll" + "git.ohea.xyz/cursorius/server/runnermanager" "github.com/op/go-logging" ) @@ -33,16 +33,16 @@ func main() { log.Errorf("Could not get configuration: %v", err) } - runCh, registerCh, err := jobscheduler.StartJobScheduler(configData.Config.Jobs, configData.Config.Runners) + getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners) if 
err != nil { log.Errorf("Could not start runner: %v", err) } - poll.StartPolling(configData.Config, runCh) + poll.StartPolling(configData.Config) mux := http.NewServeMux() - pipeline_api.CreateHandler(mux, runCh) + pipeline_api.CreateHandler(mux, getRunnerCh) listen.Listen(mux, configData.Config.Address, configData.Config.Port, registerCh, configData.Config) } diff --git a/pipeline_api/pipeline_api.go b/pipeline_api/pipeline_api.go index 6da0e33..2ef389f 100644 --- a/pipeline_api/pipeline_api.go +++ b/pipeline_api/pipeline_api.go @@ -2,11 +2,14 @@ package pipeline_api import ( "context" + "fmt" "net/http" + "strings" + "sync" - "git.ohea.xyz/cursorius/server/jobscheduler" apiv1 "git.ohea.xyz/cursorius/server/proto/gen/api/v1" "git.ohea.xyz/cursorius/server/proto/gen/api/v1/apiv1connect" + "git.ohea.xyz/cursorius/server/runnermanager" "github.com/bufbuild/connect-go" "github.com/op/go-logging" ) @@ -14,7 +17,15 @@ import ( var log = logging.MustGetLogger("cursorius-server") type ApiServer struct { - runCh chan jobscheduler.Run + getRunnerCh chan runnermanager.GetRunnerRequest + allocatedRunners map[int64]RunnerWrapper + currentId int64 + currentIdMutex sync.Mutex +} + +type RunnerWrapper struct { + runner runnermanager.Runner + mutex sync.Mutex } func (s *ApiServer) GetRunner( @@ -22,8 +33,35 @@ func (s *ApiServer) GetRunner( req *connect.Request[apiv1.GetRunnerRequest], ) (*connect.Response[apiv1.GetRunnerResponse], error) { + respChan := make(chan runnermanager.GetRunnerResponse) + s.getRunnerCh <- runnermanager.GetRunnerRequest{ + Tags: req.Msg.Tags, + RespChan: respChan, + } + + var runnerTagsStr strings.Builder + fmt.Fprintf(&runnerTagsStr, "%v", req.Msg.Tags[0]) + for _, tag := range req.Msg.Tags[1:] { + fmt.Fprintf(&runnerTagsStr, " %v", tag) + } + + response := <-respChan + if response.Err != nil { + log.Errorf("Could not get runner with tags \"%v\": %v", runnerTagsStr, response.Err) + return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not 
get runner")) + } + + log.Infof("Got runner with tags: %v", runnerTagsStr.String()) + + s.currentIdMutex.Lock() + runnerId := s.currentId + s.currentId++ + s.allocatedRunners[runnerId] = RunnerWrapper{runner: response.Runner} + s.currentIdMutex.Unlock() + + res := connect.NewResponse(&apiv1.GetRunnerResponse{ - RunnerId: 0, + RunnerId: runnerId, }) res.Header().Set("GetRunner-Version", "v1") return res, nil @@ -43,8 +81,12 @@ func (s *ApiServer) RunCommand( return res, nil } -func CreateHandler(mux *http.ServeMux, runCh chan jobscheduler.Run) { - api_server := &ApiServer{runCh: runCh} +func CreateHandler(mux *http.ServeMux, getRunnerCh chan runnermanager.GetRunnerRequest) { + api_server := &ApiServer{ + getRunnerCh: getRunnerCh, + allocatedRunners: make(map[int64]RunnerWrapper), + currentId: 0, + } path, handler := apiv1connect.NewGetRunnerServiceHandler(api_server) mux.Handle(path, handler) path, handler = apiv1connect.NewRunCommandServiceHandler(api_server) diff --git a/poll/poll.go b/poll/poll.go index 73b51f0..d6e3f5f 100644 --- a/poll/poll.go +++ b/poll/poll.go @@ -6,7 +6,6 @@ import ( "github.com/op/go-logging" "git.ohea.xyz/cursorius/server/config" - "git.ohea.xyz/cursorius/server/jobscheduler" "git.ohea.xyz/cursorius/server/pipeline_executor" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing" @@ -25,7 +24,7 @@ type tag struct { commitHash string } -func pollJob(repoName string, jobConfig config.Job, runCh chan jobscheduler.Run, pipelineConf config.PipelineConf) { +func pollJob(repoName string, jobConfig config.Job, pipelineConf config.PipelineConf) { prevCommits := make(map[string]string) for { time.Sleep(time.Duration(jobConfig.PollInterval) * time.Second) @@ -104,12 +103,12 @@ func pollJob(repoName string, jobConfig config.Job, runCh chan jobscheduler.Run, } } -func StartPolling(conf config.Config, runCh chan jobscheduler.Run) { +func StartPolling(conf config.Config) { for jobName, job := range conf.Jobs { if job.PollInterval == 0 { continue } 
else { - go pollJob(jobName, job, runCh, conf.PipelineConf) + go pollJob(jobName, job, conf.PipelineConf) } } } diff --git a/jobscheduler/jobscheduler.go b/runnermanager/runnermanager.go similarity index 53% rename from jobscheduler/jobscheduler.go rename to runnermanager/runnermanager.go index 60d0831..168e384 100644 --- a/jobscheduler/jobscheduler.go +++ b/runnermanager/runnermanager.go @@ -1,7 +1,9 @@ -package jobscheduler +package runnermanager import ( "context" + "fmt" + "strings" "time" "git.ohea.xyz/cursorius/server/config" @@ -23,7 +25,6 @@ type RunnerData struct { msgType websocket.MessageType data []byte } - type Runner struct { id string tags []string @@ -32,17 +33,21 @@ type Runner struct { running bool } -type jobScheduler struct { - runCh chan Run +type runnerManager struct { + getRunnerCh chan GetRunnerRequest registerCh chan RunnerRegistration connectedRunners []Runner configuredRunners map[string]config.Runner } -type Run struct { - JobName string - JobConfig config.Job - Ref string +type GetRunnerRequest struct { + Tags []string + RespChan chan GetRunnerResponse +} + +type GetRunnerResponse struct { + Runner Runner + Err error } type runnerJob struct { @@ -50,54 +55,63 @@ type runnerJob struct { URL string } -func runJobScheduler(j jobScheduler) { +func runRunnerManager(r runnerManager) { for { + msgCase: select { - case run := <-j.runCh: - log.Infof("Launching run for job \"%v\" on ref \"%v\"", run.JobName, run.Ref) - log.Debugf("Finding runner for job \"%v\"", run.JobName) - rJ := runnerJob{ - Id: run.JobName, - URL: run.JobConfig.URL, + case request := <-r.getRunnerCh: + + var runnerTagsStr strings.Builder + fmt.Fprintf(&runnerTagsStr, "%v", request.Tags[0]) + for _, tag := range request.Tags[1:] { + fmt.Fprintf(&runnerTagsStr, " %v", tag) } - launched := false - for i, runner := range j.connectedRunners { - // don't send job to runner that is already occupied - if !runner.running { - // don't send job to runner with closed receiveChan (is 
defunct) - // there should never be messages to read on an inactive runner, - // so we aren't losing any data here - select { - case <-runner.receiveChan: - // if the receive channel is closed, swap delete the runner as it's defunct - j.connectedRunners[i] = j.connectedRunners[len(j.connectedRunners)-1] - j.connectedRunners = j.connectedRunners[:len(j.connectedRunners)-1] - default: - err := wsjson.Write(context.Background(), runner.conn, rJ) - if err != nil { - log.Errorf("Could not launch run: %v", err) - break - } else { - log.Infof("Launched run for job %v on runner %v", run.JobName, runner.id) - launched = true - j.connectedRunners[i].running = true - break - } - } - } else { + log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String()) + + log.Debugf("Finding runner with tags %v", runnerTagsStr.String()) + + for i, runner := range r.connectedRunners { + // don't allocate runner that is already occupied + if runner.running { log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id) + continue + } + // don't allocate runner with closed receiveChan (is defunct) + // there should never be messages to read on an inactive runner, + // so we aren't losing any data here + select { + case _, ok := <-runner.receiveChan: + if ok { + // this should never happen + log.Errorf("Received data from inactive runner %v, this is a bug", runner.id) + continue + } + log.Noticef("Removing defunct runner \"%v\"", runner.id) + // if the receive channel is closed, swap delete the runner as it's defunct + r.connectedRunners[i] = r.connectedRunners[len(r.connectedRunners)-1] + r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1] + default: + r.connectedRunners[i].running = true + request.RespChan <- GetRunnerResponse{ + Runner: r.connectedRunners[i], + Err: nil, + } + break msgCase } } - if !launched { - errorMsg := "could not find valid runner" - if len(j.connectedRunners) == 0 { - errorMsg = "no connected runners" - } - log.Errorf("Could not launch run for job 
\"%v\": %v", run.JobName, errorMsg) + errorMsg := "could not find valid runner" + if len(r.connectedRunners) == 0 { + errorMsg = "no connected runners" } - case registration := <-j.registerCh: + log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg) + request.RespChan <- GetRunnerResponse{ + Runner: Runner{}, + Err: fmt.Errorf("could not allocate runner: %v", errorMsg), + } + + case registration := <-r.registerCh: log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret) - if configuredRunner, doesExist := j.configuredRunners[registration.Id]; doesExist { + if configuredRunner, doesExist := r.configuredRunners[registration.Id]; doesExist { if configuredRunner.Secret == registration.Secret { log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags) runner := Runner{ @@ -107,14 +121,14 @@ func runJobScheduler(j jobScheduler) { receiveChan: make(chan RunnerData), running: false, } - j.connectedRunners = append(j.connectedRunners, runner) + r.connectedRunners = append(r.connectedRunners, runner) // start goroutine to call Read function on websocket connection // this is required to keep the connection functioning go func() { for { msgType, data, err := registration.conn.Read(context.Background()) if err != nil { - // TODO: this is still racy, since a job could be emitted between the + // TODO: this is still racy, since a runner could be allocated between the // connection returning an err and the channel closing close(runner.receiveChan) log.Errorf("Could not read from connection: %v", err) @@ -140,17 +154,17 @@ func runJobScheduler(j jobScheduler) { } } -func StartJobScheduler(jobs map[string]config.Job, configuredRunners map[string]config.Runner) (chan Run, chan RunnerRegistration, error) { - scheduler := jobScheduler{ - runCh: make(chan Run), +func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan 
RunnerRegistration, error) { + manager := runnerManager{ + getRunnerCh: make(chan GetRunnerRequest), registerCh: make(chan RunnerRegistration), connectedRunners: make([]Runner, 0), configuredRunners: configuredRunners, } - go runJobScheduler(scheduler) + go runRunnerManager(manager) - return scheduler.runCh, scheduler.registerCh, nil + return manager.getRunnerCh, manager.registerCh, nil } func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) {