Files
server/runnermanager/runnermanager.go
T
restitux 3cbe670bc1 Implement getRunner grpc endpoint
This includes refactoring the jobscheduler into the runnermanager. This
service manages runner connections and allocating them to pipelines.
These requests are done via the pipeline grpc api
2022-12-31 17:22:00 -07:00

183 lines
5.3 KiB
Go

package runnermanager
import (
"context"
"fmt"
"strings"
"time"
"git.ohea.xyz/cursorius/server/config"
"github.com/op/go-logging"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)
var log = logging.MustGetLogger("cursorius-server")
type RunnerRegistration struct {
Secret string
Id string
Tags []string
conn *websocket.Conn
}
type RunnerData struct {
msgType websocket.MessageType
data []byte
}
type Runner struct {
id string
tags []string
conn *websocket.Conn
receiveChan chan RunnerData
running bool
}
type runnerManager struct {
getRunnerCh chan GetRunnerRequest
registerCh chan RunnerRegistration
connectedRunners []Runner
configuredRunners map[string]config.Runner
}
type GetRunnerRequest struct {
Tags []string
RespChan chan GetRunnerResponse
}
type GetRunnerResponse struct {
Runner Runner
Err error
}
type runnerJob struct {
Id string
URL string
}
func runRunnerManager(r runnerManager) {
for {
msgCase:
select {
case request := <-r.getRunnerCh:
var runnerTagsStr strings.Builder
fmt.Fprintf(&runnerTagsStr, "%v", request.Tags[0])
for _, tag := range request.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, " %v", tag)
}
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
log.Debugf("Finding runner with tags %v", runnerTagsStr)
for i, runner := range r.connectedRunners {
// don't allocate runner that is already occupied
if runner.running {
log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id)
continue
}
// don't allocate runner with closed receiveChan (is defunct)
// there should never be messages to read on an inactive runner,
// so we aren't losing any data here
select {
case _, ok := <-runner.receiveChan:
if ok {
// this should never happen
log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
continue
}
log.Noticef("Removing defunct runner \"%v\"", runner.id)
// if the receive channel is closed, swap delete the runner as it's defunct
r.connectedRunners[i] = r.connectedRunners[len(r.connectedRunners)-1]
r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1]
default:
runner.running = true
request.RespChan <- GetRunnerResponse{
Runner: runner,
Err: nil,
}
break msgCase
}
}
errorMsg := "could not find valid runner"
if len(r.connectedRunners) == 0 {
errorMsg = "no connected runners"
}
log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg)
request.RespChan <- GetRunnerResponse{
Runner: Runner{},
Err: fmt.Errorf("Could not allocate runner: %v", errorMsg),
}
case registration := <-r.registerCh:
log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret)
if configuredRunner, doesExist := r.configuredRunners[registration.Id]; doesExist {
if configuredRunner.Secret == registration.Secret {
log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags)
runner := Runner{
id: registration.Id,
tags: registration.Tags,
conn: registration.conn,
receiveChan: make(chan RunnerData),
running: false,
}
r.connectedRunners = append(r.connectedRunners, runner)
// start goroutine to call Read function on websocket connection
// this is required to keep the connection functioning
go func() {
for {
msgType, data, err := registration.conn.Read(context.Background())
if err != nil {
// TODO: this is still racy, since a runner could be alloctade between the
// connection returning an err and the channel closing
close(runner.receiveChan)
log.Errorf("Could not read from connection: %v", err)
log.Noticef("Deregistering runner with id: %v", runner.id)
return
} else {
log.Debugf("%v: %v", msgType, data)
runner.receiveChan <- RunnerData{msgType: msgType, data: data}
}
}
}()
} else {
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", registration.Id, registration.Secret)
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
} else {
log.Errorf("Disconnecting runner with invalid id: %v", registration.Id)
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
}
}
}
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) {
scheduler := runnerManager{
getRunnerCh: make(chan GetRunnerRequest),
registerCh: make(chan RunnerRegistration),
connectedRunners: make([]Runner, 0),
configuredRunners: configuredRunners,
}
go runRunnerManager(scheduler)
return scheduler.getRunnerCh, scheduler.registerCh, nil
}
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
var registration RunnerRegistration
registration.conn = conn
err := wsjson.Read(ctx, conn, &registration)
if err != nil {
log.Errorf("Could not read data from websocket connection: %v", err)
return
}
registerCh <- registration
}