Compare commits

...

5 Commits

7 changed files with 84 additions and 61 deletions
+10 -3
View File
@@ -581,7 +581,7 @@ WHERE id=$1;`
Id: id, Id: id,
} }
err := db.Conn.QueryRow(context.Background(), query, id).Scan(nil, &runner.Name, &runner.Token) err := db.Conn.QueryRow(context.Background(), query, id).Scan(&runner.Name, &runner.Token)
if err != nil { if err != nil {
return runner, fmt.Errorf("Could not query database for runner with id %v: %w", id.String(), err) return runner, fmt.Errorf("Could not query database for runner with id %v: %w", id.String(), err)
} }
@@ -599,12 +599,19 @@ func (db *Database) CreateRunner(name string) (Runner, error) {
validName := regexp.MustCompile(`[A-Z0-9_]+$`) validName := regexp.MustCompile(`[A-Z0-9_]+$`)
if !validName.MatchString(name) { if !validName.MatchString(name) {
return s, fmt.Errorf("secren name must be made up of only uppercase letters, numbers, and underscores") return s, fmt.Errorf("runner name must be made up of only uppercase letters, numbers, and underscores")
} }
query := ` query := `
INSERT INTO runners (id, name, token) INSERT INTO runners (id, name, token)
VALUES (uuid_generate_v4(), $1, TODO_GENERATE_STRING) VALUES
(
uuid_generate_v4(),
$1,
(
SELECT md5(random()::text)
)
)
RETURNING id, name, token;` RETURNING id, name, token;`
var idStr string var idStr string
+1
View File
@@ -32,3 +32,4 @@ services:
networks: networks:
cursorius: cursorius:
external: true
+1 -1
View File
@@ -40,7 +40,7 @@ func main() {
return return
} }
getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners) getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners, db)
if err != nil { if err != nil {
log.Errorf("Could not start runner: %v", err) log.Errorf("Could not start runner: %v", err)
return return
+2
View File
@@ -64,11 +64,13 @@ func (s *ApiServer) GetRunner(
} }
var runnerTagsStr strings.Builder var runnerTagsStr strings.Builder
if len(req.Msg.Tags) > 0 {
fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0]) fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0])
for _, tag := range req.Msg.Tags[1:] { for _, tag := range req.Msg.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag) fmt.Fprintf(&runnerTagsStr, ", %v", tag)
} }
fmt.Fprintf(&runnerTagsStr, "]") fmt.Fprintf(&runnerTagsStr, "]")
}
response := <-respChan response := <-respChan
if response.Err != nil { if response.Err != nil {
+1 -5
View File
@@ -216,11 +216,7 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
&container.Config{ &container.Config{
Image: imageName, Image: imageName,
Tty: false, Tty: false,
Env: []string{ Env: env,
fmt.Sprintf("RUNID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CUROSRIUS_SERVER_URL=%v", pipelineConf.AccessURL),
},
}, },
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig) // TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
&hostConfig, &hostConfig,
+3 -2
View File
@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/google/uuid"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"nhooyr.io/websocket" "nhooyr.io/websocket"
@@ -17,14 +18,14 @@ type RunnerData struct {
} }
type Runner struct { type Runner struct {
id string id uuid.UUID
tags []string tags []string
conn *websocket.Conn conn *websocket.Conn
receiveChan chan []byte receiveChan chan []byte
running bool running bool
} }
func (r *Runner) Id() string { func (r *Runner) Id() uuid.UUID {
return r.id return r.id
} }
+29 -13
View File
@@ -6,11 +6,13 @@ import (
"strings" "strings"
"time" "time"
"github.com/google/uuid"
"github.com/op/go-logging" "github.com/op/go-logging"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"nhooyr.io/websocket" "nhooyr.io/websocket"
"git.ohea.xyz/cursorius/server/config" "git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2" runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
) )
@@ -30,6 +32,7 @@ type runnerManager struct {
connectedRunners []Runner connectedRunners []Runner
numConnectedRunners uint64 numConnectedRunners uint64
configuredRunners map[string]config.Runner configuredRunners map[string]config.Runner
db database.Database
} }
type GetRunnerRequest struct { type GetRunnerRequest struct {
@@ -49,11 +52,13 @@ type runnerJob struct {
func (r *runnerManager) processRequest(req GetRunnerRequest) { func (r *runnerManager) processRequest(req GetRunnerRequest) {
var runnerTagsStr strings.Builder var runnerTagsStr strings.Builder
if len(req.Tags) > 0 {
fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0]) fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0])
for _, tag := range req.Tags[1:] { for _, tag := range req.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag) fmt.Fprintf(&runnerTagsStr, ", %v", tag)
} }
fmt.Fprintf(&runnerTagsStr, "]") fmt.Fprintf(&runnerTagsStr, "]")
}
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String()) log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
log.Debugf("Finding runner with tags %v", runnerTagsStr.String()) log.Debugf("Finding runner with tags %v", runnerTagsStr.String())
@@ -126,11 +131,30 @@ runnerIter:
func (r *runnerManager) processRegistration(reg RunnerRegistration) { func (r *runnerManager) processRegistration(reg RunnerRegistration) {
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret) log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
if configuredRunner.Secret == reg.Secret { // Get runner with give id from database
runnerId, err := uuid.Parse(reg.Id)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", reg.Id, err)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
dbRunner, err := r.db.GetRunnerById(runnerId)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
if reg.Secret != dbRunner.Token {
log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags) log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
runner := Runner{ runner := Runner{
id: reg.Id, id: runnerId,
tags: reg.Tags, tags: reg.Tags,
conn: reg.conn, conn: reg.conn,
receiveChan: make(chan []byte), receiveChan: make(chan []byte),
@@ -161,15 +185,6 @@ func (r *runnerManager) processRegistration(reg RunnerRegistration) {
} }
}() }()
} else {
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, reg.Secret)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
} else {
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
} }
func runRunnerManager(r runnerManager) { func runRunnerManager(r runnerManager) {
@@ -184,12 +199,13 @@ func runRunnerManager(r runnerManager) {
} }
} }
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) { func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (chan GetRunnerRequest, chan RunnerRegistration, error) {
scheduler := runnerManager{ scheduler := runnerManager{
getRunnerCh: make(chan GetRunnerRequest), getRunnerCh: make(chan GetRunnerRequest),
registerCh: make(chan RunnerRegistration), registerCh: make(chan RunnerRegistration),
connectedRunners: make([]Runner, 0), connectedRunners: make([]Runner, 0),
configuredRunners: configuredRunners, configuredRunners: configuredRunners,
db: db,
} }
go runRunnerManager(scheduler) go runRunnerManager(scheduler)