Cleanup runnermanager code
This commit is contained in:
@@ -0,0 +1,26 @@
|
||||
package runnermanager
|
||||
|
||||
import "nhooyr.io/websocket"
|
||||
|
||||
//var log = logging.MustGetLogger("cursorius-server")
|
||||
|
||||
type RunnerData struct {
|
||||
msgType websocket.MessageType
|
||||
data []byte
|
||||
}
|
||||
|
||||
type Runner struct {
|
||||
id string
|
||||
tags []string
|
||||
conn *websocket.Conn
|
||||
receiveChan chan RunnerData
|
||||
running bool
|
||||
}
|
||||
|
||||
func (r *Runner) Id() string {
|
||||
return r.id
|
||||
}
|
||||
|
||||
func (r *Runner) RunCommand() (int64, string, string, error) {
|
||||
return 0, "", "", nil
|
||||
}
|
||||
+97
-101
@@ -21,18 +21,6 @@ type RunnerRegistration struct {
|
||||
conn *websocket.Conn
|
||||
}
|
||||
|
||||
type RunnerData struct {
|
||||
msgType websocket.MessageType
|
||||
data []byte
|
||||
}
|
||||
type Runner struct {
|
||||
id string
|
||||
tags []string
|
||||
conn *websocket.Conn
|
||||
receiveChan chan RunnerData
|
||||
running bool
|
||||
}
|
||||
|
||||
type runnerManager struct {
|
||||
getRunnerCh chan GetRunnerRequest
|
||||
registerCh chan RunnerRegistration
|
||||
@@ -55,101 +43,109 @@ type runnerJob struct {
|
||||
URL string
|
||||
}
|
||||
|
||||
func (r *runnerManager) processRequest(req GetRunnerRequest) {
|
||||
var runnerTagsStr strings.Builder
|
||||
fmt.Fprintf(&runnerTagsStr, "%v", req.Tags[0])
|
||||
for _, tag := range req.Tags[1:] {
|
||||
fmt.Fprintf(&runnerTagsStr, " %v", tag)
|
||||
}
|
||||
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
|
||||
|
||||
log.Debugf("Finding runner with tags %v", runnerTagsStr)
|
||||
|
||||
for i, runner := range r.connectedRunners {
|
||||
// don't allocate runner that is already occupied
|
||||
if runner.running {
|
||||
log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id)
|
||||
continue
|
||||
}
|
||||
// don't allocate runner with closed receiveChan (is defunct)
|
||||
// there should never be messages to read on an inactive runner,
|
||||
// so we aren't losing any data here
|
||||
select {
|
||||
case _, ok := <-runner.receiveChan:
|
||||
if ok {
|
||||
// this should never happen
|
||||
// TODO: should we disconnect the runner?
|
||||
log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
|
||||
continue
|
||||
}
|
||||
log.Noticef("Removing defunct runner \"%v\"", runner.id)
|
||||
// if the receive channel is closed, swap delete the runner as it's defunct
|
||||
r.connectedRunners[i] = r.connectedRunners[len(r.connectedRunners)-1]
|
||||
r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1]
|
||||
default:
|
||||
runner.running = true
|
||||
req.RespChan <- GetRunnerResponse{
|
||||
Runner: runner,
|
||||
Err: nil,
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
errorMsg := "could not find valid runner"
|
||||
if len(r.connectedRunners) == 0 {
|
||||
errorMsg = "no connected runners"
|
||||
}
|
||||
log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg)
|
||||
req.RespChan <- GetRunnerResponse{
|
||||
Runner: Runner{},
|
||||
Err: fmt.Errorf("Could not allocate runner: %v", errorMsg),
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
|
||||
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
|
||||
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
|
||||
if configuredRunner.Secret == reg.Secret {
|
||||
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
|
||||
runner := Runner{
|
||||
id: reg.Id,
|
||||
tags: reg.Tags,
|
||||
conn: reg.conn,
|
||||
receiveChan: make(chan RunnerData),
|
||||
running: false,
|
||||
}
|
||||
r.connectedRunners = append(r.connectedRunners, runner)
|
||||
// start goroutine to call Read function on websocket connection
|
||||
// this is required to keep the connection functioning
|
||||
go func() {
|
||||
for {
|
||||
msgType, data, err := reg.conn.Read(context.Background())
|
||||
if err != nil {
|
||||
// TODO: this is still racy, since a runner could be alloctade between the
|
||||
// connection returning an err and the channel closing
|
||||
close(runner.receiveChan)
|
||||
log.Errorf("Could not read from connection: %v", err)
|
||||
log.Noticef("Deregistering runner with id: %v", runner.id)
|
||||
|
||||
return
|
||||
} else {
|
||||
log.Debugf("%v: %v", msgType, data)
|
||||
runner.receiveChan <- RunnerData{msgType: msgType, data: data}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
} else {
|
||||
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, registration.Secret)
|
||||
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
}
|
||||
} else {
|
||||
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
|
||||
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
}
|
||||
}
|
||||
|
||||
func runRunnerManager(r runnerManager) {
|
||||
for {
|
||||
msgCase:
|
||||
select {
|
||||
case request := <-r.getRunnerCh:
|
||||
|
||||
var runnerTagsStr strings.Builder
|
||||
fmt.Fprintf(&runnerTagsStr, "%v", request.Tags[0])
|
||||
for _, tag := range request.Tags[1:] {
|
||||
fmt.Fprintf(&runnerTagsStr, " %v", tag)
|
||||
}
|
||||
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
|
||||
|
||||
log.Debugf("Finding runner with tags %v", runnerTagsStr)
|
||||
|
||||
for i, runner := range r.connectedRunners {
|
||||
// don't allocate runner that is already occupied
|
||||
if runner.running {
|
||||
log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id)
|
||||
continue
|
||||
}
|
||||
// don't allocate runner with closed receiveChan (is defunct)
|
||||
// there should never be messages to read on an inactive runner,
|
||||
// so we aren't losing any data here
|
||||
select {
|
||||
case _, ok := <-runner.receiveChan:
|
||||
if ok {
|
||||
// this should never happen
|
||||
log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
|
||||
continue
|
||||
}
|
||||
log.Noticef("Removing defunct runner \"%v\"", runner.id)
|
||||
// if the receive channel is closed, swap delete the runner as it's defunct
|
||||
r.connectedRunners[i] = r.connectedRunners[len(r.connectedRunners)-1]
|
||||
r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1]
|
||||
default:
|
||||
runner.running = true
|
||||
request.RespChan <- GetRunnerResponse{
|
||||
Runner: runner,
|
||||
Err: nil,
|
||||
}
|
||||
break msgCase
|
||||
}
|
||||
}
|
||||
errorMsg := "could not find valid runner"
|
||||
if len(r.connectedRunners) == 0 {
|
||||
errorMsg = "no connected runners"
|
||||
}
|
||||
log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg)
|
||||
request.RespChan <- GetRunnerResponse{
|
||||
Runner: Runner{},
|
||||
Err: fmt.Errorf("Could not allocate runner: %v", errorMsg),
|
||||
}
|
||||
r.processRequest(request)
|
||||
|
||||
case registration := <-r.registerCh:
|
||||
log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret)
|
||||
if configuredRunner, doesExist := r.configuredRunners[registration.Id]; doesExist {
|
||||
if configuredRunner.Secret == registration.Secret {
|
||||
log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags)
|
||||
runner := Runner{
|
||||
id: registration.Id,
|
||||
tags: registration.Tags,
|
||||
conn: registration.conn,
|
||||
receiveChan: make(chan RunnerData),
|
||||
running: false,
|
||||
}
|
||||
r.connectedRunners = append(r.connectedRunners, runner)
|
||||
// start goroutine to call Read function on websocket connection
|
||||
// this is required to keep the connection functioning
|
||||
go func() {
|
||||
for {
|
||||
msgType, data, err := registration.conn.Read(context.Background())
|
||||
if err != nil {
|
||||
// TODO: this is still racy, since a runner could be alloctade between the
|
||||
// connection returning an err and the channel closing
|
||||
close(runner.receiveChan)
|
||||
log.Errorf("Could not read from connection: %v", err)
|
||||
log.Noticef("Deregistering runner with id: %v", runner.id)
|
||||
|
||||
return
|
||||
} else {
|
||||
log.Debugf("%v: %v", msgType, data)
|
||||
runner.receiveChan <- RunnerData{msgType: msgType, data: data}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
} else {
|
||||
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", registration.Id, registration.Secret)
|
||||
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
}
|
||||
} else {
|
||||
log.Errorf("Disconnecting runner with invalid id: %v", registration.Id)
|
||||
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
}
|
||||
r.processRegistration(registration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user