From c0e33fa52a9ac39368262ea59a3ec5f18e2fe5eb Mon Sep 17 00:00:00 2001 From: restitux Date: Sat, 25 Feb 2023 02:31:54 -0700 Subject: [PATCH] Update runner manager for new database driven runner config --- database/func.go | 2 +- main.go | 2 +- runnermanager/runner.go | 5 +- runnermanager/runnermanager.go | 98 +++++++++++++++++++--------------- 4 files changed, 61 insertions(+), 46 deletions(-) diff --git a/database/func.go b/database/func.go index 6101ea9..b17daef 100644 --- a/database/func.go +++ b/database/func.go @@ -581,7 +581,7 @@ WHERE id=$1;` Id: id, } - err := db.Conn.QueryRow(context.Background(), query, id).Scan(nil, &runner.Name, &runner.Token) + err := db.Conn.QueryRow(context.Background(), query, id).Scan(&runner.Name, &runner.Token) if err != nil { return runner, fmt.Errorf("Could not query database for runner with id %v: %w", id.String(), err) } diff --git a/main.go b/main.go index 97153ca..ff30b50 100644 --- a/main.go +++ b/main.go @@ -40,7 +40,7 @@ func main() { return } - getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners) + getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners, db) if err != nil { log.Errorf("Could not start runner: %v", err) return diff --git a/runnermanager/runner.go b/runnermanager/runner.go index 1b76f04..a9a2587 100644 --- a/runnermanager/runner.go +++ b/runnermanager/runner.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/google/uuid" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/reflect/protoreflect" "nhooyr.io/websocket" @@ -17,14 +18,14 @@ type RunnerData struct { } type Runner struct { - id string + id uuid.UUID tags []string conn *websocket.Conn receiveChan chan []byte running bool } -func (r *Runner) Id() string { +func (r *Runner) Id() uuid.UUID { return r.id } diff --git a/runnermanager/runnermanager.go b/runnermanager/runnermanager.go index ac570d0..8a06f6b 100644 --- a/runnermanager/runnermanager.go +++ b/runnermanager/runnermanager.go @@ -6,11 +6,13 @@ import ( "strings" "time" + "github.com/google/uuid" "github.com/op/go-logging" "google.golang.org/protobuf/proto" "nhooyr.io/websocket" "git.ohea.xyz/cursorius/server/config" + "git.ohea.xyz/cursorius/server/database" runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2" ) @@ -30,6 +32,7 @@ type runnerManager struct { connectedRunners []Runner numConnectedRunners uint64 configuredRunners map[string]config.Runner + db database.Database } type GetRunnerRequest struct { @@ -126,50 +129,60 @@ runnerIter: func (r *runnerManager) processRegistration(reg RunnerRegistration) { log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret) - if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist { - if configuredRunner.Secret == reg.Secret { - log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags) - runner := Runner{ - id: reg.Id, - tags: reg.Tags, - conn: reg.conn, - receiveChan: make(chan []byte), - running: false, - } - r.connectedRunners = append(r.connectedRunners, runner) - // start goroutine to call Read function on websocket connection - // this is required to keep the connection functioning - go func() { - defer log.Noticef("Deregistered runner with id: %v", runner.id) - defer close(runner.receiveChan) - for { - msgType, data, err := reg.conn.Read(context.Background()) - if err != nil { - // TODO: this is still racy, since a runner could be allocated between the - // connection returning an err and the channel closing - // This should probably be handled by sending erroring, but not 100% sure - log.Errorf("Could not read from connection: %v", err) - return - } - if msgType != websocket.MessageBinary { - close(runner.receiveChan) - log.Errorf("Got binary data from connection") - return - } - runner.receiveChan <- data - - } - }() - - } else { - log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, reg.Secret) - reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") - } - } else { - log.Errorf("Disconnecting runner with invalid id: %v", reg.Id) + // Get runner with give id from database + runnerId, err := uuid.Parse(reg.Id) + if err != nil { + log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", reg.Id, err) reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") + return } + dbRunner, err := r.db.GetRunnerById(runnerId) + if err != nil { + log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err) + reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") + return + } + + if reg.Secret != dbRunner.Token { + log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId) + reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") + return + } + + log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags) + runner := Runner{ + id: runnerId, + tags: reg.Tags, + conn: reg.conn, + receiveChan: make(chan []byte), + running: false, + } + r.connectedRunners = append(r.connectedRunners, runner) + // start goroutine to call Read function on websocket connection + // this is required to keep the connection functioning + go func() { + defer log.Noticef("Deregistered runner with id: %v", runner.id) + defer close(runner.receiveChan) + for { + msgType, data, err := reg.conn.Read(context.Background()) + if err != nil { + // TODO: this is still racy, since a runner could be allocated between the + // connection returning an err and the channel closing + // This should probably be handled by sending erroring, but not 100% sure + log.Errorf("Could not read from connection: %v", err) + return + } + if msgType != websocket.MessageBinary { + close(runner.receiveChan) + log.Errorf("Got binary data from connection") + return + } + + runner.receiveChan <- data + + } + }() } func runRunnerManager(r runnerManager) { @@ -184,12 +197,13 @@ func runRunnerManager(r runnerManager) { } } -func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) { +func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (chan GetRunnerRequest, chan RunnerRegistration, error) { scheduler := runnerManager{ getRunnerCh: make(chan GetRunnerRequest), registerCh: make(chan RunnerRegistration), connectedRunners: make([]Runner, 0), configuredRunners: configuredRunners, + db: db, } go runRunnerManager(scheduler)