Compare commits
11 Commits
fbf918d627
...
v0.1.0
| Author | SHA1 | Date | |
|---|---|---|---|
| a2acb99689 | |||
| 191b73fe41 | |||
| 3ca1481632 | |||
| c0e33fa52a | |||
| 63529b7174 | |||
| 7e7c49c2e7 | |||
| 712a7b1429 | |||
| 3ae27bffc5 | |||
| 5373a37bee | |||
| 6fee5aa268 | |||
| b475631df6 |
+12
-2
@@ -217,12 +217,22 @@ func createSchema(db database.Database) (graphql.Schema, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"buildOutput": &graphql.Field{
|
||||||
|
Type: graphql.String,
|
||||||
|
Description: "Logs of the top level container build for the run.",
|
||||||
|
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
|
||||||
|
if run, ok := p.Source.(database.Run); ok {
|
||||||
|
return string(run.BuildOutput), nil
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
"stdout": &graphql.Field{
|
"stdout": &graphql.Field{
|
||||||
Type: graphql.String,
|
Type: graphql.String,
|
||||||
Description: "The stdout used to validate the run.",
|
Description: "The stdout used to validate the run.",
|
||||||
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
|
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
|
||||||
if run, ok := p.Source.(database.Run); ok {
|
if run, ok := p.Source.(database.Run); ok {
|
||||||
return run.Stdout, nil
|
return string(run.Stdout), nil
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
},
|
},
|
||||||
@@ -232,7 +242,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
|
|||||||
Description: "The stderr used to validate the run.",
|
Description: "The stderr used to validate the run.",
|
||||||
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
|
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
|
||||||
if run, ok := p.Source.(database.Run); ok {
|
if run, ok := p.Source.(database.Run); ok {
|
||||||
return run.Stderr, nil
|
return string(run.Stderr), nil
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -149,6 +149,7 @@ CREATE TABLE runs (
|
|||||||
id UUID PRIMARY KEY,
|
id UUID PRIMARY KEY,
|
||||||
pipeline UUID,
|
pipeline UUID,
|
||||||
in_progress BOOLEAN DEFAULT NULL,
|
in_progress BOOLEAN DEFAULT NULL,
|
||||||
|
build_output TEXT DEFAULT NULL,
|
||||||
result BIGINT DEFAULT NULL,
|
result BIGINT DEFAULT NULL,
|
||||||
stdout TEXT DEFAULT NULL,
|
stdout TEXT DEFAULT NULL,
|
||||||
stderr TEXT DEFAULT NULL,
|
stderr TEXT DEFAULT NULL,
|
||||||
|
|||||||
+25
-5
@@ -295,8 +295,20 @@ RETURNING id, pipeline, in_progress;`
|
|||||||
return run, nil
|
return run, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *Database) UpdateRunBuildOutput(runId uuid.UUID, buildResult string) error {
|
||||||
|
query := `
|
||||||
|
UPDATE runs
|
||||||
|
SET build_output=$1
|
||||||
|
WHERE id=$2;`
|
||||||
|
|
||||||
|
_, err := db.Conn.Exec(context.Background(),
|
||||||
|
query, buildResult, runId)
|
||||||
|
|
||||||
|
return err
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (db *Database) UpdateRunResult(r Run) error {
|
func (db *Database) UpdateRunResult(r Run) error {
|
||||||
// TODO: the id fiend is the query is broken
|
|
||||||
query := `
|
query := `
|
||||||
UPDATE runs
|
UPDATE runs
|
||||||
SET in_progress=$1, result=$2, stdout=$3, stderr=$4
|
SET in_progress=$1, result=$2, stdout=$3, stderr=$4
|
||||||
@@ -311,7 +323,7 @@ WHERE id=$5;`
|
|||||||
|
|
||||||
func (db *Database) GetRunsForPipeline(pipelineId uuid.UUID) ([]Run, error) {
|
func (db *Database) GetRunsForPipeline(pipelineId uuid.UUID) ([]Run, error) {
|
||||||
query := `
|
query := `
|
||||||
SELECT id, in_progress, result, stdout, stderr
|
SELECT id, in_progress, result, build_output, stdout, stderr
|
||||||
FROM runs
|
FROM runs
|
||||||
WHERE pipeline=$1;`
|
WHERE pipeline=$1;`
|
||||||
|
|
||||||
@@ -330,6 +342,7 @@ WHERE pipeline=$1;`
|
|||||||
&idStr,
|
&idStr,
|
||||||
&run.InProgress,
|
&run.InProgress,
|
||||||
&run.Result,
|
&run.Result,
|
||||||
|
&run.BuildOutput,
|
||||||
&run.Stdout,
|
&run.Stdout,
|
||||||
&run.Stderr,
|
&run.Stderr,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
@@ -568,7 +581,7 @@ WHERE id=$1;`
|
|||||||
Id: id,
|
Id: id,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := db.Conn.QueryRow(context.Background(), query, id).Scan(nil, &runner.Name, &runner.Token)
|
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&runner.Name, &runner.Token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return runner, fmt.Errorf("Could not query database for runner with id %v: %w", id.String(), err)
|
return runner, fmt.Errorf("Could not query database for runner with id %v: %w", id.String(), err)
|
||||||
}
|
}
|
||||||
@@ -586,12 +599,19 @@ func (db *Database) CreateRunner(name string) (Runner, error) {
|
|||||||
|
|
||||||
validName := regexp.MustCompile(`[A-Z0-9_]+$`)
|
validName := regexp.MustCompile(`[A-Z0-9_]+$`)
|
||||||
if !validName.MatchString(name) {
|
if !validName.MatchString(name) {
|
||||||
return s, fmt.Errorf("secren name must be made up of only uppercase letters, numbers, and underscores")
|
return s, fmt.Errorf("runner name must be made up of only uppercase letters, numbers, and underscores")
|
||||||
}
|
}
|
||||||
|
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO runners (id, name, token)
|
INSERT INTO runners (id, name, token)
|
||||||
VALUES (uuid_generate_v4(), $1, TODO_GENERATE_STRING)
|
VALUES
|
||||||
|
(
|
||||||
|
uuid_generate_v4(),
|
||||||
|
$1,
|
||||||
|
(
|
||||||
|
SELECT md5(random()::text)
|
||||||
|
)
|
||||||
|
)
|
||||||
RETURNING id, name, token;`
|
RETURNING id, name, token;`
|
||||||
|
|
||||||
var idStr string
|
var idStr string
|
||||||
|
|||||||
@@ -58,6 +58,7 @@ type Run struct {
|
|||||||
Pipeline uuid.UUID
|
Pipeline uuid.UUID
|
||||||
InProgress bool
|
InProgress bool
|
||||||
Result *int64
|
Result *int64
|
||||||
|
BuildOutput []byte
|
||||||
Stdout []byte
|
Stdout []byte
|
||||||
Stderr []byte
|
Stderr []byte
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,3 +32,4 @@ services:
|
|||||||
|
|
||||||
networks:
|
networks:
|
||||||
cursorius:
|
cursorius:
|
||||||
|
external: true
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners)
|
getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Could not start runner: %v", err)
|
log.Errorf("Could not start runner: %v", err)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -64,11 +64,13 @@ func (s *ApiServer) GetRunner(
|
|||||||
}
|
}
|
||||||
|
|
||||||
var runnerTagsStr strings.Builder
|
var runnerTagsStr strings.Builder
|
||||||
|
if len(req.Msg.Tags) > 0 {
|
||||||
fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0])
|
fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0])
|
||||||
for _, tag := range req.Msg.Tags[1:] {
|
for _, tag := range req.Msg.Tags[1:] {
|
||||||
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
|
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
|
||||||
}
|
}
|
||||||
fmt.Fprintf(&runnerTagsStr, "]")
|
fmt.Fprintf(&runnerTagsStr, "]")
|
||||||
|
}
|
||||||
|
|
||||||
response := <-respChan
|
response := <-respChan
|
||||||
if response.Err != nil {
|
if response.Err != nil {
|
||||||
|
|||||||
@@ -146,7 +146,12 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
|
|||||||
log.Errorf("could no read build response: %w", err)
|
log.Errorf("could no read build response: %w", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Debugf("build log: %v", string(response))
|
|
||||||
|
err = db.UpdateRunBuildOutput(pe.Run.Id, string(response))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("could not update build output for run: %w", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
err = buildResponse.Body.Close()
|
err = buildResponse.Body.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -154,7 +159,7 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Image built sucessfully sucessfully")
|
log.Info("Image built sucessfully")
|
||||||
|
|
||||||
hostConfig := container.HostConfig{}
|
hostConfig := container.HostConfig{}
|
||||||
|
|
||||||
@@ -190,7 +195,7 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
|
|||||||
|
|
||||||
// set cursorius environment variables
|
// set cursorius environment variables
|
||||||
env = append(env, []string{
|
env = append(env, []string{
|
||||||
fmt.Sprintf("RUNID=%v", pe.Run.Id),
|
fmt.Sprintf("CURSORIUS_RUN_ID=%v", pe.Run.Id),
|
||||||
"CURSORIUS_SRC_DIR=/cursorius/src",
|
"CURSORIUS_SRC_DIR=/cursorius/src",
|
||||||
fmt.Sprintf("CURSORIUS_SERVER_URL=%v", pipelineConf.AccessURL),
|
fmt.Sprintf("CURSORIUS_SERVER_URL=%v", pipelineConf.AccessURL),
|
||||||
}...)
|
}...)
|
||||||
@@ -211,11 +216,7 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
|
|||||||
&container.Config{
|
&container.Config{
|
||||||
Image: imageName,
|
Image: imageName,
|
||||||
Tty: false,
|
Tty: false,
|
||||||
Env: []string{
|
Env: env,
|
||||||
fmt.Sprintf("RUNID=%v", pe.Run.Id),
|
|
||||||
"CURSORIUS_SRC_DIR=/cursorius/src",
|
|
||||||
fmt.Sprintf("CUROSRIUS_SERVER_URL=%v", pipelineConf.AccessURL),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
|
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
|
||||||
&hostConfig,
|
&hostConfig,
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"google.golang.org/protobuf/reflect/protoreflect"
|
"google.golang.org/protobuf/reflect/protoreflect"
|
||||||
"nhooyr.io/websocket"
|
"nhooyr.io/websocket"
|
||||||
@@ -17,14 +18,14 @@ type RunnerData struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Runner struct {
|
type Runner struct {
|
||||||
id string
|
id uuid.UUID
|
||||||
tags []string
|
tags []string
|
||||||
conn *websocket.Conn
|
conn *websocket.Conn
|
||||||
receiveChan chan []byte
|
receiveChan chan []byte
|
||||||
running bool
|
running bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runner) Id() string {
|
func (r *Runner) Id() uuid.UUID {
|
||||||
return r.id
|
return r.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,11 +6,13 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/op/go-logging"
|
"github.com/op/go-logging"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"nhooyr.io/websocket"
|
"nhooyr.io/websocket"
|
||||||
|
|
||||||
"git.ohea.xyz/cursorius/server/config"
|
"git.ohea.xyz/cursorius/server/config"
|
||||||
|
"git.ohea.xyz/cursorius/server/database"
|
||||||
|
|
||||||
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
|
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
|
||||||
)
|
)
|
||||||
@@ -30,6 +32,7 @@ type runnerManager struct {
|
|||||||
connectedRunners []Runner
|
connectedRunners []Runner
|
||||||
numConnectedRunners uint64
|
numConnectedRunners uint64
|
||||||
configuredRunners map[string]config.Runner
|
configuredRunners map[string]config.Runner
|
||||||
|
db database.Database
|
||||||
}
|
}
|
||||||
|
|
||||||
type GetRunnerRequest struct {
|
type GetRunnerRequest struct {
|
||||||
@@ -49,11 +52,13 @@ type runnerJob struct {
|
|||||||
|
|
||||||
func (r *runnerManager) processRequest(req GetRunnerRequest) {
|
func (r *runnerManager) processRequest(req GetRunnerRequest) {
|
||||||
var runnerTagsStr strings.Builder
|
var runnerTagsStr strings.Builder
|
||||||
|
if len(req.Tags) > 0 {
|
||||||
fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0])
|
fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0])
|
||||||
for _, tag := range req.Tags[1:] {
|
for _, tag := range req.Tags[1:] {
|
||||||
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
|
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
|
||||||
}
|
}
|
||||||
fmt.Fprintf(&runnerTagsStr, "]")
|
fmt.Fprintf(&runnerTagsStr, "]")
|
||||||
|
}
|
||||||
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
|
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
|
||||||
|
|
||||||
log.Debugf("Finding runner with tags %v", runnerTagsStr.String())
|
log.Debugf("Finding runner with tags %v", runnerTagsStr.String())
|
||||||
@@ -126,11 +131,30 @@ runnerIter:
|
|||||||
|
|
||||||
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
|
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
|
||||||
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
|
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
|
||||||
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
|
|
||||||
if configuredRunner.Secret == reg.Secret {
|
// Get runner with give id from database
|
||||||
|
runnerId, err := uuid.Parse(reg.Id)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", reg.Id, err)
|
||||||
|
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dbRunner, err := r.db.GetRunnerById(runnerId)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err)
|
||||||
|
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if reg.Secret != dbRunner.Token {
|
||||||
|
log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId)
|
||||||
|
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
|
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
|
||||||
runner := Runner{
|
runner := Runner{
|
||||||
id: reg.Id,
|
id: runnerId,
|
||||||
tags: reg.Tags,
|
tags: reg.Tags,
|
||||||
conn: reg.conn,
|
conn: reg.conn,
|
||||||
receiveChan: make(chan []byte),
|
receiveChan: make(chan []byte),
|
||||||
@@ -161,15 +185,6 @@ func (r *runnerManager) processRegistration(reg RunnerRegistration) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
} else {
|
|
||||||
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, reg.Secret)
|
|
||||||
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
|
|
||||||
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func runRunnerManager(r runnerManager) {
|
func runRunnerManager(r runnerManager) {
|
||||||
@@ -184,12 +199,13 @@ func runRunnerManager(r runnerManager) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) {
|
func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (chan GetRunnerRequest, chan RunnerRegistration, error) {
|
||||||
scheduler := runnerManager{
|
scheduler := runnerManager{
|
||||||
getRunnerCh: make(chan GetRunnerRequest),
|
getRunnerCh: make(chan GetRunnerRequest),
|
||||||
registerCh: make(chan RunnerRegistration),
|
registerCh: make(chan RunnerRegistration),
|
||||||
connectedRunners: make([]Runner, 0),
|
connectedRunners: make([]Runner, 0),
|
||||||
configuredRunners: configuredRunners,
|
configuredRunners: configuredRunners,
|
||||||
|
db: db,
|
||||||
}
|
}
|
||||||
|
|
||||||
go runRunnerManager(scheduler)
|
go runRunnerManager(scheduler)
|
||||||
|
|||||||
Reference in New Issue
Block a user