package jobscheduler import ( "context" "time" "git.ohea.xyz/cursorius/server/config" "github.com/op/go-logging" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" ) var log = logging.MustGetLogger("cursorius-server") type RunnerRegistration struct { Secret string Id string Tags []string conn *websocket.Conn } type RunnerData struct { msgType websocket.MessageType data []byte } type Runner struct { id string tags []string conn *websocket.Conn receiveChan chan RunnerData running bool } type jobScheduler struct { runCh chan Run registerCh chan RunnerRegistration connectedRunners []Runner configuredRunners map[string]config.Runner jobs map[string]config.Job } type Run struct { Name string } type runnerJob struct { Id string URL string } func runJobScheduler(j jobScheduler) { for { select { case run := <-j.runCh: log.Infof("Got run: %v", run) if job, exists := j.jobs[run.Name]; exists { log.Debugf("Finding runner for job \"%v\"", run.Name) rJ := runnerJob{ Id: run.Name, URL: job.URL, } launched := false for i, runner := range j.connectedRunners { // don't send job to runner that is already occupied if !runner.running { // don't send job to runnenr with closed receiveChan (is defunct) // there should never be messages to read on an inactive runner, // so we aren't losing any data here select { case <-runner.receiveChan: // if the receive channel is closed, swap delete the runner as it's defunct j.connectedRunners[i] = j.connectedRunners[len(j.connectedRunners)-1] j.connectedRunners = j.connectedRunners[:len(j.connectedRunners)-1] default: err := wsjson.Write(context.Background(), runner.conn, rJ) if err != nil { log.Debugf("Could not launch run: %v", err) } else { log.Infof("Launched run for job %v on runner %v", run.Name, runner.id) launched = true j.connectedRunners[i].running = true break } } } else { log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id) } } if !launched { errorMsg := "could not find valid runner" if len(j.connectedRunners) == 0 { errorMsg = "no connected runners" } log.Errorf("Could not launch run for job \"%v\": %v", run.Name, errorMsg) } } else { log.Errorf("No configured job with name %v", run.Name) } case registration := <-j.registerCh: log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret) if configuredRunner, doesExist := j.configuredRunners[registration.Id]; doesExist { if configuredRunner.Secret == registration.Secret { log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags) runner := Runner{ id: registration.Id, tags: registration.Tags, conn: registration.conn, receiveChan: make(chan RunnerData), running: false, } j.connectedRunners = append(j.connectedRunners, runner) // start goroutine to call Read function on websocket connection // this is required to keep the connection functioning go func() { for { msgType, data, err := registration.conn.Read(context.Background()) if err != nil { // TODO: this is still racy, since a job could be emitted between the // connection returning an err and the channel closing close(runner.receiveChan) log.Errorf("Could not read from connection: %v", err) log.Noticef("Deregistering runner with id: %v", runner.id) return } else { log.Debugf("%v: %v", msgType, data) runner.receiveChan <- RunnerData{msgType: msgType, data: data} } } }() } else { log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", registration.Id, registration.Secret) registration.conn.Close(websocket.StatusNormalClosure, "registration invalid") } } else { log.Errorf("Disconnecting runner with invalid id: %v", registration.Id) registration.conn.Close(websocket.StatusNormalClosure, "registration invalid") } } } } func StartJobScheduler(jobs map[string]config.Job, configuredRunners map[string]config.Runner) (chan Run, chan RunnerRegistration, error) { scheduler := jobScheduler{ runCh: make(chan Run), registerCh: make(chan RunnerRegistration), connectedRunners: make([]Runner, 0), configuredRunners: configuredRunners, jobs: jobs, } go runJobScheduler(scheduler) return scheduler.runCh, scheduler.registerCh, nil } func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() var registration RunnerRegistration registration.conn = conn err := wsjson.Read(ctx, conn, ®istration) if err != nil { log.Errorf("Could not read data from websocket connection: %v", err) return } registerCh <- registration }