173 lines
5.1 KiB
Go
173 lines
5.1 KiB
Go
package jobscheduler
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"git.ohea.xyz/cursorius/server/config"
|
|
"github.com/op/go-logging"
|
|
"nhooyr.io/websocket"
|
|
"nhooyr.io/websocket/wsjson"
|
|
)
|
|
|
|
// log is the package-level logger, obtained from go-logging under the
// name "cursorius-server".
var log = logging.MustGetLogger("cursorius-server")
|
|
|
|
type RunnerRegistration struct {
|
|
Secret string
|
|
Id string
|
|
Tags []string
|
|
conn *websocket.Conn
|
|
}
|
|
|
|
type RunnerData struct {
|
|
msgType websocket.MessageType
|
|
data []byte
|
|
}
|
|
|
|
type Runner struct {
|
|
id string
|
|
tags []string
|
|
conn *websocket.Conn
|
|
receiveChan chan RunnerData
|
|
running bool
|
|
}
|
|
|
|
type jobScheduler struct {
|
|
runCh chan Run
|
|
registerCh chan RunnerRegistration
|
|
connectedRunners []Runner
|
|
configuredRunners map[string]config.Runner
|
|
jobs map[string]config.Job
|
|
}
|
|
|
|
// Run is a request to execute a configured job.
type Run struct {
	// Name selects the job; the scheduler looks it up in its jobs map.
	Name string
	// Ref is carried along with the request but is not read by the scheduler
	// in this file (presumably a VCS ref - confirm against callers).
	Ref string
}
|
|
|
|
// runnerJob is the payload the scheduler writes as JSON to a runner in order
// to launch a job. The field names are part of the wire format.
type runnerJob struct {
	// Id is filled with the name of the job being run.
	Id string
	// URL is filled with the URL of the configured job.
	URL string
}
|
|
|
|
func runJobScheduler(j jobScheduler) {
|
|
for {
|
|
select {
|
|
case run := <-j.runCh:
|
|
log.Infof("Got run: %v", run)
|
|
if job, exists := j.jobs[run.Name]; exists {
|
|
log.Debugf("Finding runner for job \"%v\"", run.Name)
|
|
rJ := runnerJob{
|
|
Id: run.Name,
|
|
URL: job.URL,
|
|
}
|
|
launched := false
|
|
for i, runner := range j.connectedRunners {
|
|
// don't send job to runner that is already occupied
|
|
if !runner.running {
|
|
// don't send job to runner with closed receiveChan (is defunct)
|
|
// there should never be messages to read on an inactive runner,
|
|
// so we aren't losing any data here
|
|
select {
|
|
case <-runner.receiveChan:
|
|
// if the receive channel is closed, swap delete the runner as it's defunct
|
|
j.connectedRunners[i] = j.connectedRunners[len(j.connectedRunners)-1]
|
|
j.connectedRunners = j.connectedRunners[:len(j.connectedRunners)-1]
|
|
default:
|
|
err := wsjson.Write(context.Background(), runner.conn, rJ)
|
|
if err != nil {
|
|
log.Debugf("Could not launch run: %v", err)
|
|
} else {
|
|
log.Infof("Launched run for job %v on runner %v", run.Name, runner.id)
|
|
launched = true
|
|
j.connectedRunners[i].running = true
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id)
|
|
}
|
|
}
|
|
if !launched {
|
|
errorMsg := "could not find valid runner"
|
|
if len(j.connectedRunners) == 0 {
|
|
errorMsg = "no connected runners"
|
|
}
|
|
log.Errorf("Could not launch run for job \"%v\": %v", run.Name, errorMsg)
|
|
}
|
|
} else {
|
|
log.Errorf("No configured job with name %v", run.Name)
|
|
}
|
|
case registration := <-j.registerCh:
|
|
log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret)
|
|
if configuredRunner, doesExist := j.configuredRunners[registration.Id]; doesExist {
|
|
if configuredRunner.Secret == registration.Secret {
|
|
log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags)
|
|
runner := Runner{
|
|
id: registration.Id,
|
|
tags: registration.Tags,
|
|
conn: registration.conn,
|
|
receiveChan: make(chan RunnerData),
|
|
running: false,
|
|
}
|
|
j.connectedRunners = append(j.connectedRunners, runner)
|
|
// start goroutine to call Read function on websocket connection
|
|
// this is required to keep the connection functioning
|
|
go func() {
|
|
for {
|
|
msgType, data, err := registration.conn.Read(context.Background())
|
|
if err != nil {
|
|
// TODO: this is still racy, since a job could be emitted between the
|
|
// connection returning an err and the channel closing
|
|
close(runner.receiveChan)
|
|
log.Errorf("Could not read from connection: %v", err)
|
|
log.Noticef("Deregistering runner with id: %v", runner.id)
|
|
|
|
return
|
|
} else {
|
|
log.Debugf("%v: %v", msgType, data)
|
|
runner.receiveChan <- RunnerData{msgType: msgType, data: data}
|
|
}
|
|
}
|
|
}()
|
|
|
|
} else {
|
|
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", registration.Id, registration.Secret)
|
|
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
|
}
|
|
} else {
|
|
log.Errorf("Disconnecting runner with invalid id: %v", registration.Id)
|
|
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func StartJobScheduler(jobs map[string]config.Job, configuredRunners map[string]config.Runner) (chan Run, chan RunnerRegistration, error) {
|
|
scheduler := jobScheduler{
|
|
runCh: make(chan Run),
|
|
registerCh: make(chan RunnerRegistration),
|
|
connectedRunners: make([]Runner, 0),
|
|
configuredRunners: configuredRunners,
|
|
jobs: jobs,
|
|
}
|
|
|
|
go runJobScheduler(scheduler)
|
|
|
|
return scheduler.runCh, scheduler.registerCh, nil
|
|
}
|
|
|
|
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
|
defer cancel()
|
|
|
|
var registration RunnerRegistration
|
|
registration.conn = conn
|
|
err := wsjson.Read(ctx, conn, ®istration)
|
|
if err != nil {
|
|
log.Errorf("Could not read data from websocket connection: %v", err)
|
|
return
|
|
}
|
|
registerCh <- registration
|
|
}
|