Compare commits

19 Commits

Author SHA1 Message Date
restitux a2acb99689 Fix not using correct environment variables 2023-02-25 02:57:13 -07:00
restitux 191b73fe41 Fix index out of range issue with empty tag list printing 2023-02-25 02:51:11 -07:00
restitux 3ca1481632 Make compose network external 2023-02-25 02:32:04 -07:00
restitux c0e33fa52a Update runner manager for new database driven runner config 2023-02-25 02:31:54 -07:00
restitux 63529b7174 Fix runner creation api 2023-02-25 01:36:55 -07:00
restitux 7e7c49c2e7 Exposed build output to api 2023-02-24 23:59:02 -07:00
restitux 712a7b1429 Fix graphql api not returning stdout/stderr as strings 2023-02-24 23:20:40 -07:00
restitux 3ae27bffc5 Cleanup logging 2023-02-24 23:00:31 -07:00
restitux 5373a37bee Change RUNID env var name 2023-02-24 23:00:26 -07:00
restitux 6fee5aa268 Record docker build output in database 2023-02-24 23:00:10 -07:00
restitux b475631df6 Remove outdated todo 2023-02-24 22:51:19 -07:00
restitux fbf918d627 Fix typo 2023-02-24 22:46:15 -07:00
restitux 4069e1b0e1 Update executor to run built user provided container 2023-02-24 22:37:17 -07:00
restitux 708fbca91a Tag built container with build and run name 2023-02-24 22:32:24 -07:00
restitux 77a5514578 Fix secret name validation not checking for beginning of string 2023-02-24 22:29:21 -07:00
restitux 77a8d0840a Fix inserting new repeated refs into db failing #20 2023-02-24 22:28:50 -07:00
restitux 62b4e8f17e Add runner query api 2023-02-24 22:28:07 -07:00
restitux 620c20f717 Change pipeline executor to build container in repo at .cursorius/Dockerfile 2023-02-24 22:27:07 -07:00
restitux 0979a2379e Change poll logic to query refs after sleep for easire debugging 2023-02-24 22:26:41 -07:00
13 changed files with 334 additions and 108 deletions
+75 -2
View File
@@ -14,6 +14,43 @@ import (
var log = logging.MustGetLogger("cursorius-server") var log = logging.MustGetLogger("cursorius-server")
func createSchema(db database.Database) (graphql.Schema, error) { func createSchema(db database.Database) (graphql.Schema, error) {
runnerType := graphql.NewObject(graphql.ObjectConfig{
Name: "Runner",
Description: "A runner available for use inside of a pipeline.",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the runner.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Id, nil
}
return nil, nil
},
},
"name": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The name of the runner.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Name, nil
}
return nil, nil
},
},
"token": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The token.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Token, nil
}
return nil, nil
},
},
},
})
secretType := graphql.NewObject(graphql.ObjectConfig{ secretType := graphql.NewObject(graphql.ObjectConfig{
Name: "Secret", Name: "Secret",
Description: "A secret available for use inside of a pipeline.", Description: "A secret available for use inside of a pipeline.",
@@ -180,12 +217,22 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return nil, nil return nil, nil
}, },
}, },
"buildOutput": &graphql.Field{
Type: graphql.String,
Description: "Logs of the top level container build for the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return string(run.BuildOutput), nil
}
return nil, nil
},
},
"stdout": &graphql.Field{ "stdout": &graphql.Field{
Type: graphql.String, Type: graphql.String,
Description: "The stdout used to validate the run.", Description: "The stdout used to validate the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok { if run, ok := p.Source.(database.Run); ok {
return run.Stdout, nil return string(run.Stdout), nil
} }
return nil, nil return nil, nil
}, },
@@ -195,7 +242,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
Description: "The stderr used to validate the run.", Description: "The stderr used to validate the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok { if run, ok := p.Source.(database.Run); ok {
return run.Stderr, nil return string(run.Stderr), nil
} }
return nil, nil return nil, nil
}, },
@@ -348,6 +395,13 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return db.GetSecrets() return db.GetSecrets()
}, },
}, },
"Runners": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(runnerType)),
Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetRunners()
},
},
}, },
}) })
@@ -485,6 +539,25 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return secret, nil return secret, nil
}, },
}, },
"createRunner": &graphql.Field{
Type: runnerType,
Description: "Create a new runner",
Args: graphql.FieldConfigArgument{
"name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
runner, err := db.CreateRunner(
params.Args["name"].(string),
)
if err != nil {
return nil, err
}
return runner, nil
},
},
"setPipelineCloneCredential": &graphql.Field{ "setPipelineCloneCredential": &graphql.Field{
Type: pipelineType, Type: pipelineType,
Description: "Set the CloneCredential used by a pipeline to clone the source repo", Description: "Set the CloneCredential used by a pipeline to clone the source repo",
+3 -2
View File
@@ -149,6 +149,7 @@ CREATE TABLE runs (
id UUID PRIMARY KEY, id UUID PRIMARY KEY,
pipeline UUID, pipeline UUID,
in_progress BOOLEAN DEFAULT NULL, in_progress BOOLEAN DEFAULT NULL,
build_output TEXT DEFAULT NULL,
result BIGINT DEFAULT NULL, result BIGINT DEFAULT NULL,
stdout TEXT DEFAULT NULL, stdout TEXT DEFAULT NULL,
stderr TEXT DEFAULT NULL, stderr TEXT DEFAULT NULL,
@@ -175,8 +176,8 @@ CREATE TABLE command_executions (
CREATE TABLE runners ( CREATE TABLE runners (
id UUID PRIMARY KEY, id UUID PRIMARY KEY,
name TEXT, name TEXT NOT NULL UNIQUE,
secret TEXT token TEXT NOT NULL
); );
CREATE TABLE pipeline_refs ( CREATE TABLE pipeline_refs (
+115 -19
View File
@@ -6,7 +6,6 @@ import (
"regexp" "regexp"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/jackc/pgx/v5"
) )
func (db *Database) GetPipelines() ([]Pipeline, error) { func (db *Database) GetPipelines() ([]Pipeline, error) {
@@ -296,23 +295,35 @@ RETURNING id, pipeline, in_progress;`
return run, nil return run, nil
} }
func (db *Database) UpdateRunBuildOutput(runId uuid.UUID, buildResult string) error {
query := `
UPDATE runs
SET build_output=$1
WHERE id=$2;`
_, err := db.Conn.Exec(context.Background(),
query, buildResult, runId)
return err
}
func (db *Database) UpdateRunResult(r Run) error { func (db *Database) UpdateRunResult(r Run) error {
// TODO: the id fiend is the query is broken
query := ` query := `
UPDATE runs UPDATE runs
SET in_progress=$1, result=$2, stdout=$3, stderr=$4 SET in_progress=$1, result=$2, stdout=$3, stderr=$4
WHERE id=$3;` WHERE id=$5;`
// TODO: does r.Result need a pointer derefrence? // TODO: does r.Result need a pointer derefrence?
_, err := db.Conn.Exec(context.Background(), _, err := db.Conn.Exec(context.Background(),
query, r.InProgress, r.Result, r.Stdout, r.Stderr) query, r.InProgress, r.Result, r.Stdout, r.Stderr, r.Id)
return err return err
} }
func (db *Database) GetRunsForPipeline(pipelineId uuid.UUID) ([]Run, error) { func (db *Database) GetRunsForPipeline(pipelineId uuid.UUID) ([]Run, error) {
query := ` query := `
SELECT id, in_progress, result, stdout, stderr SELECT id, in_progress, result, build_output, stdout, stderr
FROM runs FROM runs
WHERE pipeline=$1;` WHERE pipeline=$1;`
@@ -331,6 +342,7 @@ WHERE pipeline=$1;`
&idStr, &idStr,
&run.InProgress, &run.InProgress,
&run.Result, &run.Result,
&run.BuildOutput,
&run.Stdout, &run.Stdout,
&run.Stderr, &run.Stderr,
); err != nil { ); err != nil {
@@ -379,21 +391,18 @@ WHERE pipeline_id=$1;`
func (db *Database) UpdatePipelineRefs(pipelineId uuid.UUID, refsMap map[string]string) error { func (db *Database) UpdatePipelineRefs(pipelineId uuid.UUID, refsMap map[string]string) error {
refsSlice := make([][]interface{}, 0) query := `
for name, ref := range refsMap { INSERT INTO pipeline_refs(name, pipeline_id, hash)
refsSlice = append(refsSlice, []interface{}{name, pipelineId, ref}) VALUES($1, $2, $3)
} ON CONFLICT (name)
DO
UPDATE SET hash=$3;`
copyCount, err := db.Conn.CopyFrom( for name, hash := range refsMap {
context.Background(), _, err := db.Conn.Exec(context.Background(), query, name, pipelineId, hash)
pgx.Identifier{"pipeline_refs"},
[]string{"name", "pipeline_id", "hash"}, return err
pgx.CopyFromRows(refsSlice),
)
if err != nil {
return fmt.Errorf("could not insert updated pipeline refs: %w", err)
} }
log.Debugf("copyCount: %v", copyCount)
return nil return nil
} }
@@ -453,7 +462,7 @@ func (db *Database) CreateSecret(name string, secret string) (Secret, error) {
return s, fmt.Errorf("secret name must be 256 characters or less") return s, fmt.Errorf("secret name must be 256 characters or less")
} }
validName := regexp.MustCompile(`^[A-Z0-9_]+$`) validName := regexp.MustCompile(`[A-Z0-9_]+$`)
if !validName.MatchString(name) { if !validName.MatchString(name) {
return s, fmt.Errorf("secren name must be made up of only uppercase letters, numbers, and underscores") return s, fmt.Errorf("secren name must be made up of only uppercase letters, numbers, and underscores")
} }
@@ -531,3 +540,90 @@ WHERE
return secrets, nil return secrets, nil
} }
func (db *Database) GetRunners() ([]Runner, error) {
query := `
SELECT id, name, token
FROM runners;`
runners := make([]Runner, 0)
rows, err := db.Conn.Query(context.Background(), query)
if err != nil {
return runners, fmt.Errorf("Could not query database for runners: %w", err)
}
defer rows.Close()
for rows.Next() {
var runner Runner
var idStr string
if err := rows.Scan(&idStr, &runner.Name, &runner.Token); err != nil {
return runners, err
}
runner.Id, err = uuid.Parse(idStr)
if err != nil {
return runners, err
}
runners = append(runners, runner)
}
return runners, nil
}
func (db *Database) GetRunnerById(id uuid.UUID) (Runner, error) {
query := `
SELECT name, token
FROM runners
WHERE id=$1;`
runner := Runner{
Id: id,
}
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&runner.Name, &runner.Token)
if err != nil {
return runner, fmt.Errorf("Could not query database for runner with id %v: %w", id.String(), err)
}
return runner, nil
}
func (db *Database) CreateRunner(name string) (Runner, error) {
s := Runner{}
// validate that the runner name is only A-Z or underscores and less than 256 characters
if len(name) > 256 {
return s, fmt.Errorf("runner name must be 256 characters or less")
}
validName := regexp.MustCompile(`[A-Z0-9_]+$`)
if !validName.MatchString(name) {
return s, fmt.Errorf("runner name must be made up of only uppercase letters, numbers, and underscores")
}
query := `
INSERT INTO runners (id, name, token)
VALUES
(
uuid_generate_v4(),
$1,
(
SELECT md5(random()::text)
)
)
RETURNING id, name, token;`
var idStr string
err := db.Conn.QueryRow(context.Background(), query, name).Scan(&idStr, &s.Name, &s.Token)
if err != nil {
return s, fmt.Errorf("Could not create runner: %w", err)
}
s.Id, err = uuid.Parse(idStr)
if err != nil {
return s, fmt.Errorf("Could not parse UUID generated by DB: %w", err)
}
return s, nil
}
+10 -9
View File
@@ -54,12 +54,13 @@ type Webhook struct {
} }
type Run struct { type Run struct {
Id uuid.UUID Id uuid.UUID
Pipeline uuid.UUID Pipeline uuid.UUID
InProgress bool InProgress bool
Result *int64 Result *int64
Stdout []byte BuildOutput []byte
Stderr []byte Stdout []byte
Stderr []byte
} }
type CommandExecution struct { type CommandExecution struct {
@@ -74,7 +75,7 @@ type CommandExecution struct {
} }
type Runner struct { type Runner struct {
Id uuid.UUID Id uuid.UUID
Name string Name string
Secret string Token string
} }
+1
View File
@@ -32,3 +32,4 @@ services:
networks: networks:
cursorius: cursorius:
external: true
+1
View File
@@ -14,6 +14,7 @@ require (
github.com/graphql-go/graphql v0.8.0 github.com/graphql-go/graphql v0.8.0
github.com/graphql-go/handler v0.2.3 github.com/graphql-go/handler v0.2.3
github.com/jackc/pgx/v5 v5.2.0 github.com/jackc/pgx/v5 v5.2.0
github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
golang.org/x/net v0.2.0 golang.org/x/net v0.2.0
google.golang.org/protobuf v1.28.1 google.golang.org/protobuf v1.28.1
+2
View File
@@ -858,6 +858,8 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d h1:q7n+5taxmM+9T2Q7Ydo7YN90FkoDuR5bbzByZwkQqPo=
github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d/go.mod h1:GN1Mg/uXQ6qwXA0HypnUO3xlcQJS9/y68EsHNeuuRa4=
github.com/jhump/protoreflect v1.6.1/go.mod h1:RZQ/lnuN+zqeRVpQigTwO6o0AJUkxbnSnpuG7toUTG4= github.com/jhump/protoreflect v1.6.1/go.mod h1:RZQ/lnuN+zqeRVpQigTwO6o0AJUkxbnSnpuG7toUTG4=
github.com/jhump/protoreflect v1.8.2/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg= github.com/jhump/protoreflect v1.8.2/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
+1 -1
View File
@@ -40,7 +40,7 @@ func main() {
return return
} }
getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners) getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners, db)
if err != nil { if err != nil {
log.Errorf("Could not start runner: %v", err) log.Errorf("Could not start runner: %v", err)
return return
+6 -4
View File
@@ -64,11 +64,13 @@ func (s *ApiServer) GetRunner(
} }
var runnerTagsStr strings.Builder var runnerTagsStr strings.Builder
fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0]) if len(req.Msg.Tags) > 0 {
for _, tag := range req.Msg.Tags[1:] { fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0])
fmt.Fprintf(&runnerTagsStr, ", %v", tag) for _, tag := range req.Msg.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
}
fmt.Fprintf(&runnerTagsStr, "]")
} }
fmt.Fprintf(&runnerTagsStr, "]")
response := <-respChan response := <-respChan
if response.Err != nil { if response.Err != nil {
+52 -20
View File
@@ -4,11 +4,13 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/jhoonb/archivex"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/mount"
@@ -34,6 +36,7 @@ type PipelineExecution struct {
func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) { func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) {
jobFolder := filepath.Join(pipelineConf.WorkingDir, pe.Pipeline.Id.String(), pe.Run.Id.String()) jobFolder := filepath.Join(pipelineConf.WorkingDir, pe.Pipeline.Id.String(), pe.Run.Id.String())
cloneFolder := filepath.Join(jobFolder, "repo")
log.Debugf("Job %v configured with URL \"%v\"", pe.Pipeline.Name, pe.Pipeline.Url) log.Debugf("Job %v configured with URL \"%v\"", pe.Pipeline.Name, pe.Pipeline.Url)
@@ -45,7 +48,7 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
return return
} }
err = os.MkdirAll(jobFolder, 0755) err = os.MkdirAll(cloneFolder, 0755)
if err != nil { if err != nil {
log.Errorf("could not create working directory for job %v: %w", pe.Pipeline.Name, err) log.Errorf("could not create working directory for job %v: %w", pe.Pipeline.Name, err)
return return
@@ -84,7 +87,7 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
auth = nil auth = nil
} }
_, err = git.PlainClone(jobFolder, false, &git.CloneOptions{ _, err = git.PlainClone(cloneFolder, false, &git.CloneOptions{
URL: pe.Pipeline.Url, URL: pe.Pipeline.Url,
Auth: auth, Auth: auth,
}) })
@@ -102,28 +105,61 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
ctx := context.Background() ctx := context.Background()
imageName := "git.ohea.xyz/cursorius/pipeline-api/cursorius-pipeline:v2" log.Info("Building container")
log.Infof("Pulling image %v", imageName) tarFile := filepath.Join(jobFolder, "archive.tar")
pullOutput, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{}) tar := new(archivex.TarFile)
err = tar.Create(tarFile)
if err != nil { if err != nil {
log.Errorf("could not pull image %v: %w", imageName, err) log.Errorf("could not create tarfile: %w", err)
return return
} }
buf, err := io.ReadAll(pullOutput) err = tar.AddAll(cloneFolder, false)
if err != nil { if err != nil {
log.Errorf("could not read from io.ReadCloser:, %w", err) log.Errorf("could not add repo to tarfile: %w", err)
return return
} }
log.Infof("%s", buf)
err = pullOutput.Close() err = tar.Close()
if err != nil { if err != nil {
log.Errorf("could not close io.ReadCloser: %w", err) log.Errorf("could not close tarfile: %w", err)
return return
} }
log.Info("Image pulled sucessfully")
dockerBuildContext, err := os.Open(tarFile)
defer dockerBuildContext.Close()
imageName := fmt.Sprintf("%v-%v:latest", pe.Pipeline.Id.String(), pe.Run.Id.String())
buildResponse, err := cli.ImageBuild(context.Background(), dockerBuildContext, types.ImageBuildOptions{
Tags: []string{imageName},
Dockerfile: ".cursorius/Dockerfile",
})
if err != nil {
log.Errorf("could not build container: %w", err)
return
}
response, err := ioutil.ReadAll(buildResponse.Body)
if err != nil {
log.Errorf("could no read build response: %w", err)
return
}
err = db.UpdateRunBuildOutput(pe.Run.Id, string(response))
if err != nil {
log.Errorf("could not update build output for run: %w", err)
return
}
err = buildResponse.Body.Close()
if err != nil {
log.Errorf("Could not close build response body: %w", err)
return
}
log.Info("Image built sucessfully")
hostConfig := container.HostConfig{} hostConfig := container.HostConfig{}
@@ -159,9 +195,9 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
// set cursorius environment variables // set cursorius environment variables
env = append(env, []string{ env = append(env, []string{
fmt.Sprintf("RUNID=%v", pe.Run.Id), fmt.Sprintf("CURSORIUS_RUN_ID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src", "CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CUROSRIUS_SERVER_URL=%v", pipelineConf.AccessURL), fmt.Sprintf("CURSORIUS_SERVER_URL=%v", pipelineConf.AccessURL),
}...) }...)
// load secrets into environment // load secrets into environment
@@ -180,11 +216,7 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
&container.Config{ &container.Config{
Image: imageName, Image: imageName,
Tty: false, Tty: false,
Env: []string{ Env: env,
fmt.Sprintf("RUNID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CUROSRIUS_SERVER_URL=%v", pipelineConf.AccessURL),
},
}, },
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig) // TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
&hostConfig, &hostConfig,
+3 -3
View File
@@ -28,15 +28,15 @@ type tag struct {
func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) { func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) {
for { for {
time.Sleep(time.Duration(pipeline.PollInterval) * time.Second)
log.Infof("Polling repo %v", pipeline.Name)
prevRefs, err := db.GetPipelineRefs(pipeline.Id) prevRefs, err := db.GetPipelineRefs(pipeline.Id)
if err != nil { if err != nil {
log.Errorf("Could not get pipeline refs from db: %v", err) log.Errorf("Could not get pipeline refs from db: %v", err)
return return
} }
time.Sleep(time.Duration(pipeline.PollInterval) * time.Second)
log.Infof("Polling repo %v", pipeline.Name)
repo, err := git.Clone(memory.NewStorage(), nil, &git.CloneOptions{ repo, err := git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
URL: pipeline.Url, URL: pipeline.Url,
}) })
+3 -2
View File
@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/google/uuid"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"nhooyr.io/websocket" "nhooyr.io/websocket"
@@ -17,14 +18,14 @@ type RunnerData struct {
} }
type Runner struct { type Runner struct {
id string id uuid.UUID
tags []string tags []string
conn *websocket.Conn conn *websocket.Conn
receiveChan chan []byte receiveChan chan []byte
running bool running bool
} }
func (r *Runner) Id() string { func (r *Runner) Id() uuid.UUID {
return r.id return r.id
} }
+62 -46
View File
@@ -6,11 +6,13 @@ import (
"strings" "strings"
"time" "time"
"github.com/google/uuid"
"github.com/op/go-logging" "github.com/op/go-logging"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"nhooyr.io/websocket" "nhooyr.io/websocket"
"git.ohea.xyz/cursorius/server/config" "git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2" runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
) )
@@ -30,6 +32,7 @@ type runnerManager struct {
connectedRunners []Runner connectedRunners []Runner
numConnectedRunners uint64 numConnectedRunners uint64
configuredRunners map[string]config.Runner configuredRunners map[string]config.Runner
db database.Database
} }
type GetRunnerRequest struct { type GetRunnerRequest struct {
@@ -49,11 +52,13 @@ type runnerJob struct {
func (r *runnerManager) processRequest(req GetRunnerRequest) { func (r *runnerManager) processRequest(req GetRunnerRequest) {
var runnerTagsStr strings.Builder var runnerTagsStr strings.Builder
fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0]) if len(req.Tags) > 0 {
for _, tag := range req.Tags[1:] { fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0])
fmt.Fprintf(&runnerTagsStr, ", %v", tag) for _, tag := range req.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
}
fmt.Fprintf(&runnerTagsStr, "]")
} }
fmt.Fprintf(&runnerTagsStr, "]")
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String()) log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
log.Debugf("Finding runner with tags %v", runnerTagsStr.String()) log.Debugf("Finding runner with tags %v", runnerTagsStr.String())
@@ -126,50 +131,60 @@ runnerIter:
func (r *runnerManager) processRegistration(reg RunnerRegistration) { func (r *runnerManager) processRegistration(reg RunnerRegistration) {
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret) log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
if configuredRunner.Secret == reg.Secret {
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
runner := Runner{
id: reg.Id,
tags: reg.Tags,
conn: reg.conn,
receiveChan: make(chan []byte),
running: false,
}
r.connectedRunners = append(r.connectedRunners, runner)
// start goroutine to call Read function on websocket connection
// this is required to keep the connection functioning
go func() {
defer log.Noticef("Deregistered runner with id: %v", runner.id)
defer close(runner.receiveChan)
for {
msgType, data, err := reg.conn.Read(context.Background())
if err != nil {
// TODO: this is still racy, since a runner could be allocated between the
// connection returning an err and the channel closing
// This should probably be handled by sending erroring, but not 100% sure
log.Errorf("Could not read from connection: %v", err)
return
}
if msgType != websocket.MessageBinary {
close(runner.receiveChan)
log.Errorf("Got binary data from connection")
return
}
runner.receiveChan <- data // Get runner with give id from database
runnerId, err := uuid.Parse(reg.Id)
} if err != nil {
}() log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", reg.Id, err)
} else {
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, reg.Secret)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
} else {
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
} }
dbRunner, err := r.db.GetRunnerById(runnerId)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
if reg.Secret != dbRunner.Token {
log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
runner := Runner{
id: runnerId,
tags: reg.Tags,
conn: reg.conn,
receiveChan: make(chan []byte),
running: false,
}
r.connectedRunners = append(r.connectedRunners, runner)
// start goroutine to call Read function on websocket connection
// this is required to keep the connection functioning
go func() {
defer log.Noticef("Deregistered runner with id: %v", runner.id)
defer close(runner.receiveChan)
for {
msgType, data, err := reg.conn.Read(context.Background())
if err != nil {
// TODO: this is still racy, since a runner could be allocated between the
// connection returning an err and the channel closing
// This should probably be handled by sending erroring, but not 100% sure
log.Errorf("Could not read from connection: %v", err)
return
}
if msgType != websocket.MessageBinary {
close(runner.receiveChan)
log.Errorf("Got binary data from connection")
return
}
runner.receiveChan <- data
}
}()
} }
func runRunnerManager(r runnerManager) { func runRunnerManager(r runnerManager) {
@@ -184,12 +199,13 @@ func runRunnerManager(r runnerManager) {
} }
} }
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) { func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (chan GetRunnerRequest, chan RunnerRegistration, error) {
scheduler := runnerManager{ scheduler := runnerManager{
getRunnerCh: make(chan GetRunnerRequest), getRunnerCh: make(chan GetRunnerRequest),
registerCh: make(chan RunnerRegistration), registerCh: make(chan RunnerRegistration),
connectedRunners: make([]Runner, 0), connectedRunners: make([]Runner, 0),
configuredRunners: configuredRunners, configuredRunners: configuredRunners,
db: db,
} }
go runRunnerManager(scheduler) go runRunnerManager(scheduler)