Compare commits

11 Commits

10 changed files with 127 additions and 74 deletions
+12 -2
View File
@@ -217,12 +217,22 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return nil, nil
},
},
"buildOutput": &graphql.Field{
Type: graphql.String,
Description: "Logs of the top level container build for the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return string(run.BuildOutput), nil
}
return nil, nil
},
},
"stdout": &graphql.Field{
Type: graphql.String,
Description: "The stdout used to validate the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return run.Stdout, nil
return string(run.Stdout), nil
}
return nil, nil
},
@@ -232,7 +242,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
Description: "The stderr used to validate the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return run.Stderr, nil
return string(run.Stderr), nil
}
return nil, nil
},
+1
View File
@@ -149,6 +149,7 @@ CREATE TABLE runs (
id UUID PRIMARY KEY,
pipeline UUID,
in_progress BOOLEAN DEFAULT NULL,
build_output TEXT DEFAULT NULL,
result BIGINT DEFAULT NULL,
stdout TEXT DEFAULT NULL,
stderr TEXT DEFAULT NULL,
+25 -5
View File
@@ -295,8 +295,20 @@ RETURNING id, pipeline, in_progress;`
return run, nil
}
func (db *Database) UpdateRunBuildOutput(runId uuid.UUID, buildResult string) error {
query := `
UPDATE runs
SET build_output=$1
WHERE id=$2;`
_, err := db.Conn.Exec(context.Background(),
query, buildResult, runId)
return err
}
func (db *Database) UpdateRunResult(r Run) error {
// TODO: the id fiend is the query is broken
query := `
UPDATE runs
SET in_progress=$1, result=$2, stdout=$3, stderr=$4
@@ -311,7 +323,7 @@ WHERE id=$5;`
func (db *Database) GetRunsForPipeline(pipelineId uuid.UUID) ([]Run, error) {
query := `
SELECT id, in_progress, result, stdout, stderr
SELECT id, in_progress, result, build_output, stdout, stderr
FROM runs
WHERE pipeline=$1;`
@@ -330,6 +342,7 @@ WHERE pipeline=$1;`
&idStr,
&run.InProgress,
&run.Result,
&run.BuildOutput,
&run.Stdout,
&run.Stderr,
); err != nil {
@@ -568,7 +581,7 @@ WHERE id=$1;`
Id: id,
}
err := db.Conn.QueryRow(context.Background(), query, id).Scan(nil, &runner.Name, &runner.Token)
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&runner.Name, &runner.Token)
if err != nil {
return runner, fmt.Errorf("Could not query database for runner with id %v: %w", id.String(), err)
}
@@ -586,12 +599,19 @@ func (db *Database) CreateRunner(name string) (Runner, error) {
validName := regexp.MustCompile(`[A-Z0-9_]+$`)
if !validName.MatchString(name) {
return s, fmt.Errorf("secren name must be made up of only uppercase letters, numbers, and underscores")
return s, fmt.Errorf("runner name must be made up of only uppercase letters, numbers, and underscores")
}
query := `
INSERT INTO runners (id, name, token)
VALUES (uuid_generate_v4(), $1, TODO_GENERATE_STRING)
VALUES
(
uuid_generate_v4(),
$1,
(
SELECT md5(random()::text)
)
)
RETURNING id, name, token;`
var idStr string
+7 -6
View File
@@ -54,12 +54,13 @@ type Webhook struct {
}
type Run struct {
Id uuid.UUID
Pipeline uuid.UUID
InProgress bool
Result *int64
Stdout []byte
Stderr []byte
Id uuid.UUID
Pipeline uuid.UUID
InProgress bool
Result *int64
BuildOutput []byte
Stdout []byte
Stderr []byte
}
type CommandExecution struct {
+1
View File
@@ -32,3 +32,4 @@ services:
networks:
cursorius:
external: true
+1 -1
View File
@@ -40,7 +40,7 @@ func main() {
return
}
getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners)
getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners, db)
if err != nil {
log.Errorf("Could not start runner: %v", err)
return
+6 -4
View File
@@ -64,11 +64,13 @@ func (s *ApiServer) GetRunner(
}
var runnerTagsStr strings.Builder
fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0])
for _, tag := range req.Msg.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
if len(req.Msg.Tags) > 0 {
fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0])
for _, tag := range req.Msg.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
}
fmt.Fprintf(&runnerTagsStr, "]")
}
fmt.Fprintf(&runnerTagsStr, "]")
response := <-respChan
if response.Err != nil {
+9 -8
View File
@@ -146,7 +146,12 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
log.Errorf("could no read build response: %w", err)
return
}
log.Debugf("build log: %v", string(response))
err = db.UpdateRunBuildOutput(pe.Run.Id, string(response))
if err != nil {
log.Errorf("could not update build output for run: %w", err)
return
}
err = buildResponse.Body.Close()
if err != nil {
@@ -154,7 +159,7 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
return
}
log.Info("Image built sucessfully sucessfully")
log.Info("Image built sucessfully")
hostConfig := container.HostConfig{}
@@ -190,7 +195,7 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
// set cursorius environment variables
env = append(env, []string{
fmt.Sprintf("RUNID=%v", pe.Run.Id),
fmt.Sprintf("CURSORIUS_RUN_ID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CURSORIUS_SERVER_URL=%v", pipelineConf.AccessURL),
}...)
@@ -211,11 +216,7 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
&container.Config{
Image: imageName,
Tty: false,
Env: []string{
fmt.Sprintf("RUNID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CUROSRIUS_SERVER_URL=%v", pipelineConf.AccessURL),
},
Env: env,
},
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
&hostConfig,
+3 -2
View File
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/google/uuid"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"nhooyr.io/websocket"
@@ -17,14 +18,14 @@ type RunnerData struct {
}
type Runner struct {
id string
id uuid.UUID
tags []string
conn *websocket.Conn
receiveChan chan []byte
running bool
}
func (r *Runner) Id() string {
func (r *Runner) Id() uuid.UUID {
return r.id
}
+62 -46
View File
@@ -6,11 +6,13 @@ import (
"strings"
"time"
"github.com/google/uuid"
"github.com/op/go-logging"
"google.golang.org/protobuf/proto"
"nhooyr.io/websocket"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
)
@@ -30,6 +32,7 @@ type runnerManager struct {
connectedRunners []Runner
numConnectedRunners uint64
configuredRunners map[string]config.Runner
db database.Database
}
type GetRunnerRequest struct {
@@ -49,11 +52,13 @@ type runnerJob struct {
func (r *runnerManager) processRequest(req GetRunnerRequest) {
var runnerTagsStr strings.Builder
fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0])
for _, tag := range req.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
if len(req.Tags) > 0 {
fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0])
for _, tag := range req.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
}
fmt.Fprintf(&runnerTagsStr, "]")
}
fmt.Fprintf(&runnerTagsStr, "]")
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
log.Debugf("Finding runner with tags %v", runnerTagsStr.String())
@@ -126,50 +131,60 @@ runnerIter:
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
if configuredRunner.Secret == reg.Secret {
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
runner := Runner{
id: reg.Id,
tags: reg.Tags,
conn: reg.conn,
receiveChan: make(chan []byte),
running: false,
}
r.connectedRunners = append(r.connectedRunners, runner)
// start goroutine to call Read function on websocket connection
// this is required to keep the connection functioning
go func() {
defer log.Noticef("Deregistered runner with id: %v", runner.id)
defer close(runner.receiveChan)
for {
msgType, data, err := reg.conn.Read(context.Background())
if err != nil {
// TODO: this is still racy, since a runner could be allocated between the
// connection returning an err and the channel closing
// This should probably be handled by sending erroring, but not 100% sure
log.Errorf("Could not read from connection: %v", err)
return
}
if msgType != websocket.MessageBinary {
close(runner.receiveChan)
log.Errorf("Got binary data from connection")
return
}
runner.receiveChan <- data
}
}()
} else {
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, reg.Secret)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
} else {
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
// Get runner with give id from database
runnerId, err := uuid.Parse(reg.Id)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", reg.Id, err)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
dbRunner, err := r.db.GetRunnerById(runnerId)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
if reg.Secret != dbRunner.Token {
log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
runner := Runner{
id: runnerId,
tags: reg.Tags,
conn: reg.conn,
receiveChan: make(chan []byte),
running: false,
}
r.connectedRunners = append(r.connectedRunners, runner)
// start goroutine to call Read function on websocket connection
// this is required to keep the connection functioning
go func() {
defer log.Noticef("Deregistered runner with id: %v", runner.id)
defer close(runner.receiveChan)
for {
msgType, data, err := reg.conn.Read(context.Background())
if err != nil {
// TODO: this is still racy, since a runner could be allocated between the
// connection returning an err and the channel closing
// This should probably be handled by sending erroring, but not 100% sure
log.Errorf("Could not read from connection: %v", err)
return
}
if msgType != websocket.MessageBinary {
close(runner.receiveChan)
log.Errorf("Got binary data from connection")
return
}
runner.receiveChan <- data
}
}()
}
func runRunnerManager(r runnerManager) {
@@ -184,12 +199,13 @@ func runRunnerManager(r runnerManager) {
}
}
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) {
func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (chan GetRunnerRequest, chan RunnerRegistration, error) {
scheduler := runnerManager{
getRunnerCh: make(chan GetRunnerRequest),
registerCh: make(chan RunnerRegistration),
connectedRunners: make([]Runner, 0),
configuredRunners: configuredRunners,
db: db,
}
go runRunnerManager(scheduler)