Files
server/jobscheduler/jobscheduler.go
T

169 lines
5.0 KiB
Go

package jobscheduler
import (
"context"
"time"
"git.ohea.xyz/cursorius/server/config"
"github.com/op/go-logging"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)
// log is the package-level logger, obtained from go-logging under the module
// name "cursorius-server".
var log = logging.MustGetLogger("cursorius-server")
// RunnerRegistration is the registration request a runner sends after
// connecting. RegisterRunner decodes it as JSON from the websocket and hands it
// to the scheduler over the registration channel.
type RunnerRegistration struct {
// Secret is compared against the secret of the configured runner with the same Id.
Secret string
// Id is the key looked up in the scheduler's configured runners.
Id string
// Tags are copied onto the Runner when the registration is accepted.
Tags []string
// conn is the websocket the registration arrived on. It is unexported, so it
// is filled in by RegisterRunner rather than decoded from the JSON payload.
conn *websocket.Conn
}
// RunnerData is a single message read from a runner's websocket connection.
type RunnerData struct {
// msgType is the websocket message type reported by the read.
msgType websocket.MessageType
// data is the raw message payload.
data []byte
}
// Runner is a registered runner as tracked by the scheduler.
type Runner struct {
// id and tags are taken from the accepted RunnerRegistration.
id string
tags []string
// conn is the websocket used to send jobs to the runner.
conn *websocket.Conn
// receiveChan carries messages read from conn by the runner's reader
// goroutine; that goroutine closes it when a read fails, which the scheduler
// treats as the runner being defunct.
receiveChan chan RunnerData
// running is set to true when a job has been sent to the runner.
// NOTE(review): nothing in this file resets it to false — confirm where a
// runner is marked idle again.
running bool
}
// jobScheduler holds the state used by the runJobScheduler loop.
type jobScheduler struct {
// runCh delivers runs that should be dispatched to a runner.
runCh chan Run
// registerCh delivers registration requests from newly connected runners.
registerCh chan RunnerRegistration
// connectedRunners lists the runners whose registration was accepted.
connectedRunners []Runner
// configuredRunners maps a runner id to its configuration; a registration is
// only accepted if its Id is present here and the secrets match.
configuredRunners map[string]config.Runner
}
// Run is a request to execute a job, sent to the scheduler over its run channel.
type Run struct {
// JobName identifies the job; it is sent to the runner as the job id.
JobName string
// JobConfig is the job's configuration; its URL is forwarded to the runner.
JobConfig config.Job
// Ref is the ref the run was requested for; in this file it is only logged.
Ref string
}
// runnerJob is the payload written as JSON to a runner's websocket to start a job.
type runnerJob struct {
// Id is set from the run's job name.
Id string
// URL is set from the job configuration's URL.
URL string
}
// runJobScheduler is the scheduler's event loop. It never returns; on each
// iteration it handles exactly one event: a Run received on j.runCh is
// dispatched to a runner, or a registration received on j.registerCh is
// validated and, if accepted, added to the connected runners.
//
// In the visible code j.connectedRunners is only touched from this goroutine,
// so no locking is used.
func runJobScheduler(j jobScheduler) {
	for {
		select {
		case run := <-j.runCh:
			j.dispatchRun(run)
		case registration := <-j.registerCh:
			j.admitRunner(registration)
		}
	}
}

// dispatchRun sends run to the first connected runner that is idle, still
// alive, and accepts the write. Defunct runners encountered on the way are
// removed from j.connectedRunners. If no runner takes the job an error is
// logged and the run is dropped.
func (j *jobScheduler) dispatchRun(run Run) {
	log.Infof("Launching run for job \"%v\" on ref \"%v\"", run.JobName, run.Ref)
	log.Debugf("Finding runner for job \"%v\"", run.JobName)

	rJ := runnerJob{
		Id:  run.JobName,
		URL: run.JobConfig.URL,
	}

	launched := false
	// Index-based loop on the live slice: runners may be swap-deleted while
	// iterating, so the length has to be re-read and the index must not
	// advance after a deletion (the swapped-in runner still needs a look).
	// The loop stops at the first successful launch so a run is sent to
	// exactly one runner.
	for i := 0; i < len(j.connectedRunners) && !launched; {
		runner := &j.connectedRunners[i]

		// don't send job to runner that is already occupied
		if runner.running {
			log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id)
			i++
			continue
		}

		// don't send job to runner with closed receiveChan (is defunct)
		if !drainIdleRunner(runner) {
			last := len(j.connectedRunners) - 1
			j.connectedRunners[i] = j.connectedRunners[last]
			// Clear the vacated slot so the removed runner's connection and
			// channel are not kept alive by the backing array.
			j.connectedRunners[last] = Runner{}
			j.connectedRunners = j.connectedRunners[:last]
			continue
		}

		if err := wsjson.Write(context.Background(), runner.conn, rJ); err != nil {
			// Try the next runner rather than giving up on the run.
			log.Errorf("Could not launch run: %v", err)
			i++
			continue
		}

		log.Infof("Launched run for job %v on runner %v", run.JobName, runner.id)
		runner.running = true
		launched = true
	}

	if !launched {
		errorMsg := "could not find valid runner"
		if len(j.connectedRunners) == 0 {
			errorMsg = "no connected runners"
		}
		log.Errorf("Could not launch run for job \"%v\": %v", run.JobName, errorMsg)
	}
}

// drainIdleRunner performs a non-blocking check of an idle runner's
// receiveChan. It returns false if the channel has been closed, meaning the
// reader goroutine hit a read error and the runner is defunct. It returns true
// if the runner is still usable; a message that happened to be pending is
// discarded, since an idle runner is not expected to send anything.
func drainIdleRunner(runner *Runner) bool {
	select {
	case _, ok := <-runner.receiveChan:
		if !ok {
			return false
		}
		log.Debugf("Discarding unexpected message from idle runner %v", runner.id)
		return true
	default:
		return true
	}
}

// admitRunner validates a registration against the configured runners. A
// registration with an unknown id or a wrong secret has its connection closed;
// an accepted one is appended to j.connectedRunners and gets a reader
// goroutine. The secret itself is deliberately never written to the log.
func (j *jobScheduler) admitRunner(registration RunnerRegistration) {
	log.Debugf("New runner appeared with id: %v", registration.Id)

	configuredRunner, doesExist := j.configuredRunners[registration.Id]
	if !doesExist {
		log.Errorf("Disconnecting runner with invalid id: %v", registration.Id)
		rejectRegistration(registration)
		return
	}
	if configuredRunner.Secret != registration.Secret {
		log.Errorf("Disconnecting runner with id: %v: invalid secret", registration.Id)
		rejectRegistration(registration)
		return
	}

	log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags)
	runner := Runner{
		id:          registration.Id,
		tags:        registration.Tags,
		conn:        registration.conn,
		receiveChan: make(chan RunnerData),
		running:     false,
	}
	j.connectedRunners = append(j.connectedRunners, runner)

	// start goroutine to call Read function on websocket connection
	// this is required to keep the connection functioning
	go func() {
		for {
			msgType, data, err := runner.conn.Read(context.Background())
			if err != nil {
				// TODO: this is still racy, since a job could be emitted between the
				// connection returning an err and the channel closing
				close(runner.receiveChan)
				log.Errorf("Could not read from connection: %v", err)
				log.Noticef("Deregistering runner with id: %v", runner.id)
				return
			}
			log.Debugf("%v: %v", msgType, data)
			runner.receiveChan <- RunnerData{msgType: msgType, data: data}
		}
	}()
}

// rejectRegistration closes the connection of a registration that failed
// validation, logging (rather than ignoring) a failure to close.
func rejectRegistration(registration RunnerRegistration) {
	if err := registration.conn.Close(websocket.StatusNormalClosure, "registration invalid"); err != nil {
		log.Debugf("Could not close connection of rejected runner %v: %v", registration.Id, err)
	}
}
// StartJobScheduler creates the scheduler's run and registration channels,
// starts the scheduler loop in its own goroutine, and returns both channels so
// callers can submit runs and runner registrations.
//
// configuredRunners is the set of runners allowed to register, keyed by runner
// id. jobs is accepted but not used by this function. The returned error is
// always nil.
func StartJobScheduler(jobs map[string]config.Job, configuredRunners map[string]config.Runner) (chan Run, chan RunnerRegistration, error) {
	runs := make(chan Run)
	registrations := make(chan RunnerRegistration)

	go runJobScheduler(jobScheduler{
		runCh:             runs,
		registerCh:        registrations,
		connectedRunners:  []Runner{},
		configuredRunners: configuredRunners,
	})

	return runs, registrations, nil
}
// RegisterRunner reads a runner's registration message from conn and forwards
// it on registerCh.
//
// The read is bounded by a 10 second timeout. If reading or decoding fails the
// error is logged and nothing is sent. Otherwise the call blocks until the send
// on registerCh completes.
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) {
	readCtx, stop := context.WithTimeout(context.Background(), 10*time.Second)
	defer stop()

	// Attach the connection up front; decoding only fills the exported fields.
	registration := RunnerRegistration{conn: conn}
	if err := wsjson.Read(readCtx, conn, &registration); err != nil {
		log.Errorf("Could not read data from websocket connection: %v", err)
		return
	}

	registerCh <- registration
}