Runners are removed from manager when alloacted

This removes an existing unlocked shared access to runner.running.
This also sets us up for better management of the runners.
This commit is contained in:
restitux 2023-03-08 00:13:40 -07:00
parent f190274bce
commit fe53a17160
5 changed files with 93 additions and 76 deletions

View File

@ -22,13 +22,12 @@ func setupHTTPServer(
mux *http.ServeMux,
conf config.PipelineConf,
db database.Database,
registerCh chan runnermanager.RunnerRegistration,
getRunnerCh chan runnermanager.GetRunnerRequest,
runnerManagerChans runnermanager.RunnerManagerChans,
) error {
webhook.CreateWebhookHandler(db, conf, mux)
pipeline_api.CreateHandler(getRunnerCh, mux)
pipeline_api.CreateHandler(runnerManagerChans.Allocation, runnerManagerChans.Release, mux)
err := admin_api.CreateHandler(db, mux)
if err != nil {
@ -41,7 +40,7 @@ func setupHTTPServer(
log.Errorf("Could not upgrade runner connection to websocket: %v", err)
return
}
go runnermanager.RegisterRunner(conn, registerCh)
go runnermanager.RegisterRunner(conn, runnerManagerChans.Registration)
})
return nil
}
@ -52,16 +51,14 @@ func Listen(
port int,
conf config.PipelineConf,
db database.Database,
registerCh chan runnermanager.RunnerRegistration,
getRunnerCh chan runnermanager.GetRunnerRequest,
runnerManagerChans runnermanager.RunnerManagerChans,
) error {
err := setupHTTPServer(
mux,
conf,
db,
registerCh,
getRunnerCh,
runnerManagerChans,
)
if err != nil {
return fmt.Errorf("Could not setup http endpoints: %w", err)

View File

@ -40,7 +40,7 @@ func main() {
return
}
getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners, db)
runnerManagerChans, err := runnermanager.StartRunnerManager(configData.Config.Runners, db)
if err != nil {
log.Errorf("Could not start runner: %v", err)
return
@ -56,7 +56,6 @@ func main() {
configData.Config.Port,
configData.Config.PipelineConf,
db,
registerCh,
getRunnerCh,
runnerManagerChans,
))
}

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net/http"
"strings"
"sync"
apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2"
@ -19,7 +18,8 @@ import (
var log = logging.MustGetLogger("cursorius-server")
type ApiServer struct {
getRunnerCh chan runnermanager.GetRunnerRequest
allocationCh chan runnermanager.RunnerAllocationRequest
releaseCh chan runnermanager.RunnerReleaseRequest
allocatedRunners map[uuid.UUID]*RunnerWrapper
allocatedRunnersMutex sync.RWMutex
}
@ -38,10 +38,14 @@ func (r *RunnerWrapper) RunCommand(cmd string, args []string) (int64, string, st
return return_code, stdout, stderr, err
}
func (r *RunnerWrapper) Release() {
func (r *RunnerWrapper) Release(releaseCh chan runnermanager.RunnerReleaseRequest) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.runner.Release()
releaseCh <- runnermanager.RunnerReleaseRequest{
Runner: r.runner,
}
r.runner = nil
}
func (s *ApiServer) GetRunnerFromMap(u uuid.UUID) (*RunnerWrapper, bool) {
@ -56,8 +60,8 @@ func (s *ApiServer) GetRunner(
req *connect.Request[apiv2.GetRunnerRequest],
) (*connect.Response[apiv2.GetRunnerResponse], error) {
respChan := make(chan runnermanager.GetRunnerResponse)
s.getRunnerCh <- runnermanager.GetRunnerRequest{
respChan := make(chan runnermanager.RunnerAllocationResponse)
s.allocationCh <- runnermanager.RunnerAllocationRequest{
Tags: req.Msg.Tags,
RespChan: respChan,
}
@ -99,7 +103,7 @@ func (s *ApiServer) ReleaseRunner(
s.allocatedRunnersMutex.Lock()
runner := s.allocatedRunners[uuid]
delete(s.allocatedRunners, uuid)
runner.Release()
runner.Release(s.releaseCh)
s.allocatedRunnersMutex.Unlock()
res := connect.NewResponse(&apiv2.ReleaseRunnerResponse{})
@ -138,9 +142,10 @@ func (s *ApiServer) RunCommand(
return res, nil
}
func CreateHandler(getRunnerCh chan runnermanager.GetRunnerRequest, mux *http.ServeMux) {
func CreateHandler(allocationCh chan runnermanager.RunnerAllocationRequest, releaseCh chan runnermanager.RunnerReleaseRequest, mux *http.ServeMux) {
api_server := &ApiServer{
getRunnerCh: getRunnerCh,
allocationCh: allocationCh,
releaseCh: releaseCh,
allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
}
path, handler := apiv2connect.NewGetRunnerServiceHandler(api_server)

View File

@ -22,17 +22,27 @@ type Runner struct {
tags []string
conn *websocket.Conn
receiveChan chan []byte
running bool
}
func (r *Runner) HasTags(requestedTags []string) bool {
tagIter:
for _, requestedTag := range requestedTags {
for _, posessedTag := range r.tags {
// if we find the tag, move on to search for the next one
if posessedTag == requestedTag {
continue tagIter
}
}
// if we don't find the tag
return false
}
return true
}
func (r *Runner) Id() uuid.UUID {
return r.id
}
func (r *Runner) Release() {
r.running = false
}
func (r *Runner) RunCommand(cmd string, args []string) (returnCode int64, stdout string, stderr string, err error) {
// Write RunCommand message to client

View File

@ -19,53 +19,57 @@ import (
var log = logging.MustGetLogger("cursorius-server")
type RunnerRegistration struct {
Secret string
Id string
Tags []string
conn *websocket.Conn
type RunnerManagerChans struct {
Allocation chan RunnerAllocationRequest
Release chan RunnerReleaseRequest
Registration chan RunnerRegistrationRequest
}
type runnerManager struct {
getRunnerCh chan GetRunnerRequest
registerCh chan RunnerRegistration
chans RunnerManagerChans
connectedRunners []Runner
numConnectedRunners uint64
configuredRunners map[string]config.Runner
db database.Database
}
type GetRunnerRequest struct {
type RunnerAllocationRequest struct {
Tags []string
RespChan chan GetRunnerResponse
RespChan chan RunnerAllocationResponse
}
type GetRunnerResponse struct {
type RunnerAllocationResponse struct {
Runner *Runner
Err error
}
type RunnerReleaseRequest struct {
Runner *Runner
}
type RunnerRegistrationRequest struct {
Secret string
Id string
Tags []string
conn *websocket.Conn
}
type runnerJob struct {
Id string
URL string
}
func (r *runnerManager) processRequest(req GetRunnerRequest) {
func (r *runnerManager) processRunnerAllocation(req RunnerAllocationRequest) {
tagsStr := util.FormatTags(req.Tags)
log.Infof("Got request for runner with tags \"%v\"", tagsStr)
log.Debugf("Finding runner with tags %v", runnerTagsStr.String())
log.Debugf("Finding runner with tags %v", tagsStr)
foundRunner := false
runnersToRemove := []int{}
runnerIter:
for i, runner := range r.connectedRunners {
// don't allocate runner that is already occupied
if runner.running {
log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id)
continue
}
// don't allocate runner with closed receiveChan (is defunct)
// there should never be messages to read on an inactive runner,
// so we aren't losing any data here
@ -82,25 +86,20 @@ runnerIter:
default:
log.Debugf("Checking runner %v for requested tags", runner.id)
tagIter:
for _, requestedTag := range req.Tags {
for _, posessedTag := range runner.tags {
if requestedTag == posessedTag {
continue tagIter
}
}
if !runner.HasTags(req.Tags) {
continue runnerIter
}
r.connectedRunners[i].running = true
runnersToRemove = append(runnersToRemove, i)
foundRunner = true
req.RespChan <- GetRunnerResponse{
req.RespChan <- RunnerAllocationResponse{
Runner: &r.connectedRunners[i],
Err: nil,
}
}
}
// remove allocated runner plus defunct runners
// since we iterate, all the indexes will be in accending order
for i, runnerInd := range runnersToRemove {
r.connectedRunners[runnerInd-i] = r.connectedRunners[len(r.connectedRunners)-1]
@ -115,43 +114,42 @@ runnerIter:
if len(r.connectedRunners) == 0 {
errorMsg = "no connected runners"
}
req.RespChan <- GetRunnerResponse{
req.RespChan <- RunnerAllocationResponse{
Runner: &Runner{},
Err: fmt.Errorf("Could not allocate runner: %v", errorMsg),
}
}
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
func (r *runnerManager) processRunnerRegistration(req RunnerRegistrationRequest) {
log.Debugf("New runner appeared with id: %v and secret: %v", req.Id, req.Secret)
// Get runner with give id from database
runnerId, err := uuid.Parse(reg.Id)
runnerId, err := uuid.Parse(req.Id)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", reg.Id, err)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", req.Id, err)
req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
dbRunner, err := r.db.GetRunnerById(runnerId)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
if reg.Secret != dbRunner.Token {
if req.Secret != dbRunner.Token {
log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
log.Infof("Registering runner \"%v\" with tags %v", req.Id, req.Tags)
runner := Runner{
id: runnerId,
tags: reg.Tags,
conn: reg.conn,
tags: req.Tags,
conn: req.conn,
receiveChan: make(chan []byte),
running: false,
}
r.connectedRunners = append(r.connectedRunners, runner)
// start goroutine to call Read function on websocket connection
@ -160,7 +158,7 @@ func (r *runnerManager) processRegistration(reg RunnerRegistration) {
defer log.Noticef("Deregistered runner with id: %v", runner.id)
defer close(runner.receiveChan)
for {
msgType, data, err := reg.conn.Read(context.Background())
msgType, data, err := req.conn.Read(context.Background())
if err != nil {
// TODO: this is still racy, since a runner could be allocated between the
// connection returning an err and the channel closing
@ -180,22 +178,30 @@ func (r *runnerManager) processRegistration(reg RunnerRegistration) {
}()
}
func (r *runnerManager) processRunnerRelease(req RunnerReleaseRequest) {
r.connectedRunners = append(r.connectedRunners, *req.Runner)
}
func runRunnerManager(r runnerManager) {
for {
select {
case request := <-r.getRunnerCh:
r.processRequest(request)
case registration := <-r.registerCh:
r.processRegistration(registration)
case request := <-r.chans.Allocation:
r.processRunnerAllocation(request)
case release := <-r.chans.Release:
r.processRunnerRelease(release)
case registration := <-r.chans.Registration:
r.processRunnerRegistration(registration)
}
}
}
func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (chan GetRunnerRequest, chan RunnerRegistration, error) {
func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (RunnerManagerChans, error) {
scheduler := runnerManager{
getRunnerCh: make(chan GetRunnerRequest),
registerCh: make(chan RunnerRegistration),
chans: RunnerManagerChans{
Allocation: make(chan RunnerAllocationRequest),
Release: make(chan RunnerReleaseRequest),
Registration: make(chan RunnerRegistrationRequest),
},
connectedRunners: make([]Runner, 0),
configuredRunners: configuredRunners,
db: db,
@ -203,14 +209,14 @@ func StartRunnerManager(configuredRunners map[string]config.Runner, db database.
go runRunnerManager(scheduler)
return scheduler.getRunnerCh, scheduler.registerCh, nil
return scheduler.chans, nil
}
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) {
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistrationRequest) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
var registration RunnerRegistration
var registration RunnerRegistrationRequest
registration.conn = conn
typ, r, err := conn.Read(ctx)