Compare commits

29 Commits

Author SHA1 Message Date
restitux a2acb99689 Fix not using correct environment variables 2023-02-25 02:57:13 -07:00
restitux 191b73fe41 Fix index out of range issue with empty tag list printing 2023-02-25 02:51:11 -07:00
restitux 3ca1481632 Make compose network external 2023-02-25 02:32:04 -07:00
restitux c0e33fa52a Update runner manager for new database driven runner config 2023-02-25 02:31:54 -07:00
restitux 63529b7174 Fix runner creation api 2023-02-25 01:36:55 -07:00
restitux 7e7c49c2e7 Exposed build output to api 2023-02-24 23:59:02 -07:00
restitux 712a7b1429 Fix graphql api not returning stdout/stderr as strings 2023-02-24 23:20:40 -07:00
restitux 3ae27bffc5 Cleanup logging 2023-02-24 23:00:31 -07:00
restitux 5373a37bee Change RUNID env var name 2023-02-24 23:00:26 -07:00
restitux 6fee5aa268 Record docker build output in database 2023-02-24 23:00:10 -07:00
restitux b475631df6 Remove outdated todo 2023-02-24 22:51:19 -07:00
restitux fbf918d627 Fix typo 2023-02-24 22:46:15 -07:00
restitux 4069e1b0e1 Update executor to run built user provided container 2023-02-24 22:37:17 -07:00
restitux 708fbca91a Tag built container with build and run name 2023-02-24 22:32:24 -07:00
restitux 77a5514578 Fix secret name validation not checking for beginning of string 2023-02-24 22:29:21 -07:00
restitux 77a8d0840a Fix inserting new repeated refs into db failing #20 2023-02-24 22:28:50 -07:00
restitux 62b4e8f17e Add runner query api 2023-02-24 22:28:07 -07:00
restitux 620c20f717 Change pipeline executor to build container in repo at .cursorius/Dockerfile 2023-02-24 22:27:07 -07:00
restitux 0979a2379e Change poll logic to query refs after sleep for easire debugging 2023-02-24 22:26:41 -07:00
restitux 85ebd856eb Add graphiql testing workflow (#15) 2023-02-18 20:45:57 -07:00
restitux 6b103d074e Add runprev launcher command 2023-02-18 19:27:17 -07:00
restitux 664fe8fd09 Remove replaced Job configuration from config file 2023-02-14 20:43:04 -07:00
restitux edafd5108a Add validation to secret names 2023-02-14 20:39:03 -07:00
restitux c0f6186eac Fix run progress field name (graphql) 2023-02-14 20:19:21 -07:00
restitux bfd05b6a8a Add secrets support (#14) 2023-02-14 20:18:41 -07:00
restitux 6d2936393b Persist repo ref hashes in db 2023-02-08 18:44:54 -07:00
restitux e4043ae3be Correct case of member name 2023-02-08 18:06:54 -07:00
restitux 7a665aa348 Add support for access repos with credentials 2023-02-07 21:34:32 -07:00
restitux d870335d25 Disable vcs info embedding because of git perms 2023-02-07 19:27:19 -07:00
19 changed files with 1282 additions and 154 deletions
+402 -7
View File
@@ -14,6 +14,137 @@ import (
var log = logging.MustGetLogger("cursorius-server") var log = logging.MustGetLogger("cursorius-server")
func createSchema(db database.Database) (graphql.Schema, error) { func createSchema(db database.Database) (graphql.Schema, error) {
runnerType := graphql.NewObject(graphql.ObjectConfig{
Name: "Runner",
Description: "A runner available for use inside of a pipeline.",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the runner.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Id, nil
}
return nil, nil
},
},
"name": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The name of the runner.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Name, nil
}
return nil, nil
},
},
"token": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The token.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Token, nil
}
return nil, nil
},
},
},
})
secretType := graphql.NewObject(graphql.ObjectConfig{
Name: "Secret",
Description: "A secret available for use inside of a pipeline.",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the secret.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if secret, ok := p.Source.(database.Secret); ok {
return secret.Id, nil
}
return nil, nil
},
},
"name": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The name of the secret.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if secret, ok := p.Source.(database.Secret); ok {
return secret.Name, nil
}
return nil, nil
},
},
"secret": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The secret.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if secret, ok := p.Source.(database.Secret); ok {
return secret.Secret, nil
}
return nil, nil
},
},
},
})
cloneCredentialType := graphql.NewObject(graphql.ObjectConfig{
Name: "CloneCredential",
Description: "A credential for authenticating with the pipeline source host.",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Id, nil
}
return nil, nil
},
},
"name": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The name of the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Name, nil
}
return nil, nil
},
},
"type": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The credential type.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Type, nil
}
return nil, nil
},
},
"username": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The username to user with the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Username, nil
}
return nil, nil
},
},
"secret": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The secret for the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Secret, nil
}
return nil, nil
},
},
},
})
webhookType := graphql.NewObject(graphql.ObjectConfig{ webhookType := graphql.NewObject(graphql.ObjectConfig{
Name: "Webhook", Name: "Webhook",
Description: "A webhook for triggering pipelines", Description: "A webhook for triggering pipelines",
@@ -65,7 +196,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return nil, nil return nil, nil
}, },
}, },
"progress": &graphql.Field{ "inProgress": &graphql.Field{
Type: graphql.Boolean, Type: graphql.Boolean,
Description: "The progress status of the run.", Description: "The progress status of the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
@@ -86,12 +217,22 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return nil, nil return nil, nil
}, },
}, },
"buildOutput": &graphql.Field{
Type: graphql.String,
Description: "Logs of the top level container build for the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return string(run.BuildOutput), nil
}
return nil, nil
},
},
"stdout": &graphql.Field{ "stdout": &graphql.Field{
Type: graphql.String, Type: graphql.String,
Description: "The stdout used to validate the run.", Description: "The stdout used to validate the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok { if run, ok := p.Source.(database.Run); ok {
return run.Stdout, nil return string(run.Stdout), nil
} }
return nil, nil return nil, nil
}, },
@@ -101,7 +242,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
Description: "The stderr used to validate the run.", Description: "The stderr used to validate the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok { if run, ok := p.Source.(database.Run); ok {
return run.Stderr, nil return string(run.Stderr), nil
} }
return nil, nil return nil, nil
}, },
@@ -153,6 +294,28 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return nil, nil return nil, nil
}, },
}, },
"cloneCredential": &graphql.Field{
Type: cloneCredentialType,
Description: "The configured credential for cloning the pipeline source.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
if pipeline.CloneCredential != nil {
return db.GetCloneCredentialById(*pipeline.CloneCredential)
}
}
return nil, nil
},
},
"secrets": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(graphql.NewNonNull(secretType))),
Description: "The list of secrets for the pipeline.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
return db.GetSecretsForPipeline(pipeline.Id)
}
return []database.Secret{}, nil
},
},
"webhooks": &graphql.Field{ "webhooks": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(graphql.NewNonNull(webhookType))), Type: graphql.NewNonNull(graphql.NewList(graphql.NewNonNull(webhookType))),
Description: "The list of webhooks for the pipeline.", Description: "The list of webhooks for the pipeline.",
@@ -202,6 +365,43 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return db.GetPipelines() return db.GetPipelines()
}, },
}, },
"CloneCredential": &graphql.Field{
Type: cloneCredentialType,
Args: graphql.FieldConfigArgument{
"id": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the requested credential.",
},
},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
id, err := uuid.Parse(p.Args["id"].(string))
if err != nil {
return nil, err
}
return db.GetCloneCredentialById(id)
},
},
"CloneCredentials": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(cloneCredentialType)),
Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetCredentials()
},
},
"Secrets": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(secretType)),
Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetSecrets()
},
},
"Runners": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(runnerType)),
Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetRunners()
},
},
}, },
}) })
@@ -221,6 +421,9 @@ func createSchema(db database.Database) (graphql.Schema, error) {
"pollInterval": &graphql.ArgumentConfig{ "pollInterval": &graphql.ArgumentConfig{
Type: graphql.Int, Type: graphql.Int,
}, },
"cloneCredentialId": &graphql.ArgumentConfig{
Type: graphql.String,
},
}, },
Resolve: func(params graphql.ResolveParams) (interface{}, error) { Resolve: func(params graphql.ResolveParams) (interface{}, error) {
var interval int var interval int
@@ -229,18 +432,28 @@ func createSchema(db database.Database) (graphql.Schema, error) {
} else { } else {
interval = 0 interval = 0
} }
var credential *uuid.UUID
if credentialVal, ok := params.Args["cloneCredentialId"]; ok {
id, err := uuid.Parse(credentialVal.(string))
if err != nil {
return nil, err
}
credential = &id
} else {
credential = nil
}
pipeline, err := db.CreatePipeline( pipeline, err := db.CreatePipeline(
params.Args["name"].(string), params.Args["name"].(string),
params.Args["url"].(string), params.Args["url"].(string),
interval, interval,
credential,
) )
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Infof("Created pipeline with id: %v, name: %v, url: %v, and poll interval: %v",
pipeline.Id, pipeline.Name, pipeline.Url, pipeline.PollInterval)
return pipeline, nil return pipeline, nil
}, },
}, },
@@ -251,7 +464,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
"type": &graphql.ArgumentConfig{ "type": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
}, },
"pipeline_id": &graphql.ArgumentConfig{ "pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
}, },
}, },
@@ -272,6 +485,188 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return webhook, nil return webhook, nil
}, },
}, },
"createCloneCredential": &graphql.Field{
Type: cloneCredentialType,
Description: "Create a new CloneCredential",
Args: graphql.FieldConfigArgument{
"name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"type": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"username": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"secret": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
credential, err := db.CreateCredential(
params.Args["name"].(string),
database.CloneCredentialType(params.Args["type"].(string)),
params.Args["username"].(string),
params.Args["secret"].(string),
)
if err != nil {
return nil, err
}
return credential, nil
},
},
"createSecret": &graphql.Field{
Type: secretType,
Description: "Create a new secret",
Args: graphql.FieldConfigArgument{
"name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"secret": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
secret, err := db.CreateSecret(
params.Args["name"].(string),
params.Args["secret"].(string),
)
if err != nil {
return nil, err
}
return secret, nil
},
},
"createRunner": &graphql.Field{
Type: runnerType,
Description: "Create a new runner",
Args: graphql.FieldConfigArgument{
"name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
runner, err := db.CreateRunner(
params.Args["name"].(string),
)
if err != nil {
return nil, err
}
return runner, nil
},
},
"setPipelineCloneCredential": &graphql.Field{
Type: pipelineType,
Description: "Set the CloneCredential used by a pipeline to clone the source repo",
Args: graphql.FieldConfigArgument{
"cloneCredentialId": &graphql.ArgumentConfig{
Type: graphql.String,
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
if cloneCredentialIdVal, ok := params.Args["cloneCredentialId"]; ok {
cloneCredentialId, err := uuid.Parse(cloneCredentialIdVal.(string))
if err != nil {
return nil, err
}
pipeline, err := db.SetPipelineCloneCredential(pipelineId, &cloneCredentialId)
if err != nil {
return nil, err
}
return pipeline, nil
} else {
pipeline, err := db.SetPipelineCloneCredential(pipelineId, nil)
if err != nil {
return nil, err
}
return pipeline, nil
}
},
},
"addSecretToPipeline": &graphql.Field{
Type: pipelineType,
Description: "Allow a secret to be accessed by a pipeline.",
Args: graphql.FieldConfigArgument{
"secretId": &graphql.ArgumentConfig{
Type: graphql.String,
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
secretId, err := uuid.Parse(params.Args["secretId"].(string))
if err != nil {
return nil, err
}
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
err = db.AssignSecretToPipeline(pipelineId, secretId)
if err != nil {
return nil, err
}
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return nil, err
}
return pipeline, nil
},
},
"removeSecretFromPipeline": &graphql.Field{
Type: pipelineType,
Description: "Remove a pipeline's access to a secret.",
Args: graphql.FieldConfigArgument{
"secretId": &graphql.ArgumentConfig{
Type: graphql.String,
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
secretId, err := uuid.Parse(params.Args["secretId"].(string))
if err != nil {
return nil, err
}
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
err = db.RemoveSecretFromPipeline(pipelineId, secretId)
if err != nil {
return nil, err
}
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return nil, err
}
return pipeline, nil
},
},
}, },
}) })
-20
View File
@@ -6,24 +6,6 @@ import (
"git.ohea.xyz/golang/config" "git.ohea.xyz/golang/config"
) )
type WebhookSender string
const (
Gitea WebhookSender = "gitea"
)
type Webhook struct {
Sender WebhookSender
Secret string
}
type Job struct {
URL string
Webhook *Webhook
Cron *string
PollInterval uint64
}
type Runner struct { type Runner struct {
Secret string Secret string
} }
@@ -60,7 +42,6 @@ type Config struct {
Port int Port int
DBConfig DBConfig DBConfig DBConfig
PipelineConf PipelineConf PipelineConf PipelineConf
Jobs map[string]Job
Runners map[string]Runner Runners map[string]Runner
} }
@@ -88,7 +69,6 @@ func GetConfig() (config.Config[Config], error) {
Source: "/opt/cursorius/working", Source: "/opt/cursorius/working",
}, },
}, },
Jobs: make(map[string]Job),
Runners: make(map[string]Runner), Runners: make(map[string]Runner),
}, },
} }
+60 -13
View File
@@ -40,10 +40,15 @@ func LaunchDB(conf config.DBConfig) (Database, error) {
db := Database{} db := Database{}
var err error var err error
log.Infof("Connecting to database with URL \"%v\"", dbURLNoPasswd)
db.Conn, err = pgxpool.New(context.Background(), dbURL)
if err != nil {
return db, fmt.Errorf("could not create database pool: %w", err)
}
// sleep until we can sucessfully acquire a connection
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
// TODO: retry logic is broken with pgxpool _, err = db.Conn.Acquire(context.Background())
log.Infof("Connecting to database with URL \"%v\" (attempt %v)", dbURLNoPasswd, i)
db.Conn, err = pgxpool.New(context.Background(), dbURL)
if err == nil { if err == nil {
break break
} }
@@ -90,11 +95,43 @@ CREATE TABLE version (
); );
CREATE TABLE clone_credentials (
id UUID PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
username TEXT NOT NULL,
secret TEXT NOT NULL
);
CREATE TABLE pipelines ( CREATE TABLE pipelines (
id UUID PRIMARY KEY, id UUID PRIMARY KEY,
name TEXT NOT NULL, name TEXT NOT NULL,
url TEXT NOT NULL, url TEXT NOT NULL,
poll_interval INTEGER poll_interval INTEGER,
clone_credential UUID DEFAULT NULL,
CONSTRAINT fk_clone_credential
FOREIGN KEY(clone_credential)
REFERENCES clone_credentials(id)
);
CREATE TABLE secrets (
id UUID PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
secret TEXT NOT NULL
);
CREATE TABLE pipeline_secret_mappings (
pipeline UUID NOT NULL,
secret UUID NOT NULL,
CONSTRAINT fk_pipeline
FOREIGN KEY(pipeline)
REFERENCES pipelines(id),
CONSTRAINT fk_secret
FOREIGN KEY(secret)
REFERENCES secrets(id)
); );
CREATE TABLE webhooks ( CREATE TABLE webhooks (
@@ -108,16 +145,11 @@ CREATE TABLE webhooks (
REFERENCES pipelines(id) REFERENCES pipelines(id)
); );
CREATE TABLE runners (
id UUID PRIMARY KEY,
name TEXT,
secret TEXT
);
CREATE TABLE runs ( CREATE TABLE runs (
id UUID PRIMARY KEY, id UUID PRIMARY KEY,
pipeline UUID, pipeline UUID,
in_progress BOOLEAN DEFAULT NULL, in_progress BOOLEAN DEFAULT NULL,
build_output TEXT DEFAULT NULL,
result BIGINT DEFAULT NULL, result BIGINT DEFAULT NULL,
stdout TEXT DEFAULT NULL, stdout TEXT DEFAULT NULL,
stderr TEXT DEFAULT NULL, stderr TEXT DEFAULT NULL,
@@ -142,6 +174,21 @@ CREATE TABLE command_executions (
REFERENCES runs(id) REFERENCES runs(id)
); );
CREATE TABLE runners (
id UUID PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
token TEXT NOT NULL
);
CREATE TABLE pipeline_refs (
name TEXT PRIMARY KEY NOT NULL,
pipeline_id UUID NOT NULL,
hash TEXT NOT NULL,
CONSTRAINT fk_pipeline_id
FOREIGN KEY(pipeline_id)
REFERENCES pipelines(id)
);
` `
_, err := conn.Exec(context.Background(), createTablesQuery) _, err := conn.Exec(context.Background(), createTablesQuery)
+418 -12
View File
@@ -3,13 +3,14 @@ package database
import ( import (
"context" "context"
"fmt" "fmt"
"regexp"
"github.com/google/uuid" "github.com/google/uuid"
) )
func (db *Database) GetPipelines() ([]Pipeline, error) { func (db *Database) GetPipelines() ([]Pipeline, error) {
query := ` query := `
SELECT id, name, url, poll_interval SELECT id, name, url, poll_interval, clone_credential
FROM pipelines;` FROM pipelines;`
pipelines := make([]Pipeline, 0) pipelines := make([]Pipeline, 0)
@@ -23,7 +24,7 @@ FROM pipelines;`
for rows.Next() { for rows.Next() {
var pipeline Pipeline var pipeline Pipeline
var idStr string var idStr string
if err := rows.Scan(&idStr, &pipeline.Name, &pipeline.Url, &pipeline.PollInterval); err != nil { if err := rows.Scan(&idStr, &pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential); err != nil {
return pipelines, err return pipelines, err
} }
@@ -55,29 +56,71 @@ WHERE id=$1;`
return pipeline, nil return pipeline, nil
} }
func (db *Database) CreatePipeline(name string, url string, pollInterval int) (Pipeline, error) { func (db *Database) CreatePipeline(name string, url string, pollInterval int, credential *uuid.UUID) (Pipeline, error) {
query := ` query := `
INSERT INTO pipelines (id, name, url, poll_interval) INSERT INTO pipelines (id, name, url, poll_interval, clone_credential)
VALUES (uuid_generate_v4(), $1, $2, $3) VALUES (uuid_generate_v4(), $1, $2, $3, $4)
RETURNING id, name, url, poll_interval;` RETURNING id, name, url, poll_interval;`
pipeline := Pipeline{} pipeline := Pipeline{}
var idStr string var idStr string
err := db.Conn.QueryRow(context.Background(), query, name, url, pollInterval).Scan(&idStr, &pipeline.Name, &pipeline.Url, &pipeline.PollInterval) err := db.Conn.QueryRow(context.Background(), query, name, url, pollInterval, credential).Scan(&idStr, &pipeline.Name, &pipeline.Url, &pipeline.PollInterval)
if err != nil { if err != nil {
return pipeline, fmt.Errorf("Could not create pipeline: %w", err) return pipeline, fmt.Errorf("Could not create pipeline: %w", err)
} }
id, err := uuid.Parse(idStr) pipeline.Id, err = uuid.Parse(idStr)
if err != nil { if err != nil {
return pipeline, fmt.Errorf("Could not parse UUID generated by DB: %w", err) return pipeline, fmt.Errorf("Could not parse UUID generated by DB: %w", err)
} }
pipeline.Id = id
return pipeline, nil return pipeline, nil
} }
func (db *Database) SetPipelineCloneCredential(pipelineId uuid.UUID, credentialId *uuid.UUID) (Pipeline, error) {
query := `
UPDATE pipelines
SET clone_credential=$1
WHERE id=$2
RETURNING name, url, poll_interval, clone_credential;`
pipeline := Pipeline{
Id: pipelineId,
}
err := db.Conn.QueryRow(context.Background(),
query, credentialId, pipelineId).Scan(
&pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential,
)
if err != nil {
return pipeline, fmt.Errorf("Could not add credential to pipeline: %w", err)
}
return pipeline, err
}
func (db *Database) RemovePipelineCredential(pipelineId uuid.UUID) (Pipeline, error) {
query := `
UPDATE pipelines
SET credential=null
WHERE id=$1
RETURNING name, url, poll_interval, clone_credential;`
pipeline := Pipeline{
Id: pipelineId,
}
err := db.Conn.QueryRow(context.Background(),
query, pipelineId).Scan(
&pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential,
)
if err != nil {
return pipeline, fmt.Errorf("Could not add credential to pipeline: %w", err)
}
return pipeline, err
}
func (db *Database) GetWebhooksForPipeline(id uuid.UUID) ([]Webhook, error) { func (db *Database) GetWebhooksForPipeline(id uuid.UUID) ([]Webhook, error) {
query := ` query := `
SELECT id, server_type, secret SELECT id, server_type, secret
@@ -151,6 +194,86 @@ RETURNING id, server_type, secret, pipeline;`
return webhook, nil return webhook, nil
} }
func (db *Database) CreateCredential(name string, credentialtype CloneCredentialType, username string, secret string) (CloneCredential, error) {
query := `
INSERT INTO clone_credentials (id, name, type, username, secret)
VALUES(uuid_generate_v4(), $1, $2, $3, $4)
RETURNING id, name, type, username, secret;`
credential := CloneCredential{}
var idStr string
err := db.Conn.QueryRow(
context.Background(),
query,
name,
string(credentialtype),
username,
secret,
).Scan(&idStr, &credential.Name, &credential.Type, &credential.Username, &credential.Secret)
if err != nil {
return credential, err
}
id, err := uuid.Parse(idStr)
if err != nil {
return credential, err
}
credential.Id = id
return credential, nil
}
func (db *Database) GetCloneCredentialById(id uuid.UUID) (CloneCredential, error) {
query := `
SELECT name, type, username, secret
FROM clone_credentials
WHERE id=$1;`
log.Debugf("requested credential with id %v", id)
credential := CloneCredential{
Id: id,
}
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&credential.Name, &credential.Type, &credential.Username, &credential.Secret)
if err != nil {
return credential, fmt.Errorf("Could not query database for credential with id %v: %w", id.String(), err)
}
return credential, nil
}
func (db *Database) GetCredentials() ([]CloneCredential, error) {
query := `
SELECT id, name, type, username, secret
FROM clone_credentials;`
credentials := make([]CloneCredential, 0)
rows, err := db.Conn.Query(context.Background(), query)
if err != nil {
return credentials, fmt.Errorf("Could not query database for credentials: %w", err)
}
defer rows.Close()
for rows.Next() {
var credential CloneCredential
var idStr string
if err := rows.Scan(&idStr, &credential.Name, &credential.Type, &credential.Username, &credential.Secret); err != nil {
return credentials, err
}
credential.Id, err = uuid.Parse(idStr)
if err != nil {
return credentials, err
}
credentials = append(credentials, credential)
}
return credentials, nil
}
func (db *Database) CreateRun(pipelineId uuid.UUID) (Run, error) { func (db *Database) CreateRun(pipelineId uuid.UUID) (Run, error) {
query := ` query := `
INSERT INTO runs (id, pipeline, in_progress) INSERT INTO runs (id, pipeline, in_progress)
@@ -172,22 +295,35 @@ RETURNING id, pipeline, in_progress;`
return run, nil return run, nil
} }
func (db *Database) UpdateRunBuildOutput(runId uuid.UUID, buildResult string) error {
query := `
UPDATE runs
SET build_output=$1
WHERE id=$2;`
_, err := db.Conn.Exec(context.Background(),
query, buildResult, runId)
return err
}
func (db *Database) UpdateRunResult(r Run) error { func (db *Database) UpdateRunResult(r Run) error {
query := ` query := `
UPDATE runs UPDATE runs
SET in_progress=$1, result=$2, stdout=$3, stderr=$4 SET in_progress=$1, result=$2, stdout=$3, stderr=$4
WHERE id=$3;` WHERE id=$5;`
// TODO: does r.Result need a pointer derefrence? // TODO: does r.Result need a pointer derefrence?
_, err := db.Conn.Exec(context.Background(), _, err := db.Conn.Exec(context.Background(),
query, r.InProgress, r.Result, r.Stdout, r.Stderr) query, r.InProgress, r.Result, r.Stdout, r.Stderr, r.Id)
return err return err
} }
func (db *Database) GetRunsForPipeline(pipelineId uuid.UUID) ([]Run, error) { func (db *Database) GetRunsForPipeline(pipelineId uuid.UUID) ([]Run, error) {
query := ` query := `
SELECT id, in_progress, result, stdout, stderr SELECT id, in_progress, result, build_output, stdout, stderr
FROM runs FROM runs
WHERE pipeline=$1;` WHERE pipeline=$1;`
@@ -206,6 +342,7 @@ WHERE pipeline=$1;`
&idStr, &idStr,
&run.InProgress, &run.InProgress,
&run.Result, &run.Result,
&run.BuildOutput,
&run.Stdout, &run.Stdout,
&run.Stderr, &run.Stderr,
); err != nil { ); err != nil {
@@ -221,3 +358,272 @@ WHERE pipeline=$1;`
return runs, nil return runs, nil
} }
func (db *Database) GetPipelineRefs(pipelineId uuid.UUID) (map[string]string, error) {
query := `
SELECT name, hash
FROM pipeline_refs
WHERE pipeline_id=$1;`
refsMap := make(map[string]string)
refs, err := db.Conn.Query(context.Background(), query, pipelineId)
if err != nil {
return refsMap, fmt.Errorf("Could not get pipeline refs for pipeline with id \"%v\": %w", pipelineId, err)
}
defer refs.Close()
for refs.Next() {
var name string
var hash string
if err := refs.Scan(
&name,
&hash,
); err != nil {
return refsMap, err
}
refsMap[name] = hash
}
return refsMap, nil
}
func (db *Database) UpdatePipelineRefs(pipelineId uuid.UUID, refsMap map[string]string) error {
query := `
INSERT INTO pipeline_refs(name, pipeline_id, hash)
VALUES($1, $2, $3)
ON CONFLICT (name)
DO
UPDATE SET hash=$3;`
for name, hash := range refsMap {
_, err := db.Conn.Exec(context.Background(), query, name, pipelineId, hash)
return err
}
return nil
}
func (db *Database) GetSecrets() ([]Secret, error) {
query := `
SELECT id, name, secret
FROM secrets;`
secrets := make([]Secret, 0)
rows, err := db.Conn.Query(context.Background(), query)
if err != nil {
return secrets, fmt.Errorf("Could not query database for secrets: %w", err)
}
defer rows.Close()
for rows.Next() {
var secret Secret
var idStr string
if err := rows.Scan(&idStr, &secret.Name, &secret.Secret); err != nil {
return secrets, err
}
secret.Id, err = uuid.Parse(idStr)
if err != nil {
return secrets, err
}
secrets = append(secrets, secret)
}
return secrets, nil
}
func (db *Database) GetSecretById(id uuid.UUID) (Secret, error) {
query := `
SELECT id, name, secret
FROM secrets
WHERE id=$1;`
secret := Secret{
Id: id,
}
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&secret.Name, &secret.Secret)
if err != nil {
return secret, fmt.Errorf("Could not query database for secret with id %v: %w", id.String(), err)
}
return secret, nil
}
func (db *Database) CreateSecret(name string, secret string) (Secret, error) {
s := Secret{}
// validate that the secret is only A-Z or underscores and less than 256 characters
if len(name) > 256 {
return s, fmt.Errorf("secret name must be 256 characters or less")
}
validName := regexp.MustCompile(`[A-Z0-9_]+$`)
if !validName.MatchString(name) {
return s, fmt.Errorf("secren name must be made up of only uppercase letters, numbers, and underscores")
}
query := `
INSERT INTO secrets (id, name, secret)
VALUES (uuid_generate_v4(), $1, $2)
RETURNING id, name, secret;`
var idStr string
err := db.Conn.QueryRow(context.Background(), query, name, secret).Scan(&idStr, &s.Name, &s.Secret)
if err != nil {
return s, fmt.Errorf("Could not create secret: %w", err)
}
s.Id, err = uuid.Parse(idStr)
if err != nil {
return s, fmt.Errorf("Could not parse UUID generated by DB: %w", err)
}
return s, nil
}
func (db *Database) AssignSecretToPipeline(pipelineId uuid.UUID, secretId uuid.UUID) error {
query := `
INSERT INTO pipeline_secret_mappings (pipeline, secret)
VALUES ($1, $2);`
_, err := db.Conn.Exec(context.Background(), query, pipelineId, secretId)
return err
}
func (db *Database) RemoveSecretFromPipeline(pipelineId uuid.UUID, secretId uuid.UUID) error {
// TODO: implement this
return fmt.Errorf("Not implemented")
}
func (db *Database) GetSecretsForPipeline(pipelineId uuid.UUID) ([]Secret, error) {
query := `
SELECT
secrets.id, secrets.name, secrets.secret
FROM
secrets INNER JOIN pipeline_secret_mappings
ON secrets.id = pipeline_secret_mappings.secret
WHERE
pipeline_secret_mappings.pipeline=$1
;`
secrets := make([]Secret, 0)
rows, err := db.Conn.Query(context.Background(), query, pipelineId)
if err != nil {
return secrets, fmt.Errorf("Could not get secrets for pipeline with id \"%v\": %w", pipelineId, err)
}
defer rows.Close()
for rows.Next() {
var secret Secret
var idStr string
if err := rows.Scan(
&idStr,
&secret.Name,
&secret.Secret,
); err != nil {
return secrets, err
}
secret.Id, err = uuid.Parse(idStr)
if err != nil {
return secrets, err
}
secrets = append(secrets, secret)
}
return secrets, nil
}
func (db *Database) GetRunners() ([]Runner, error) {
query := `
SELECT id, name, token
FROM runners;`
runners := make([]Runner, 0)
rows, err := db.Conn.Query(context.Background(), query)
if err != nil {
return runners, fmt.Errorf("Could not query database for runners: %w", err)
}
defer rows.Close()
for rows.Next() {
var runner Runner
var idStr string
if err := rows.Scan(&idStr, &runner.Name, &runner.Token); err != nil {
return runners, err
}
runner.Id, err = uuid.Parse(idStr)
if err != nil {
return runners, err
}
runners = append(runners, runner)
}
return runners, nil
}
func (db *Database) GetRunnerById(id uuid.UUID) (Runner, error) {
query := `
SELECT name, token
FROM runners
WHERE id=$1;`
runner := Runner{
Id: id,
}
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&runner.Name, &runner.Token)
if err != nil {
return runner, fmt.Errorf("Could not query database for runner with id %v: %w", id.String(), err)
}
return runner, nil
}
func (db *Database) CreateRunner(name string) (Runner, error) {
s := Runner{}
// validate that the runner name is only A-Z or underscores and less than 256 characters
if len(name) > 256 {
return s, fmt.Errorf("runner name must be 256 characters or less")
}
validName := regexp.MustCompile(`[A-Z0-9_]+$`)
if !validName.MatchString(name) {
return s, fmt.Errorf("runner name must be made up of only uppercase letters, numbers, and underscores")
}
query := `
INSERT INTO runners (id, name, token)
VALUES
(
uuid_generate_v4(),
$1,
(
SELECT md5(random()::text)
)
)
RETURNING id, name, token;`
var idStr string
err := db.Conn.QueryRow(context.Background(), query, name).Scan(&idStr, &s.Name, &s.Token)
if err != nil {
return s, fmt.Errorf("Could not create runner: %w", err)
}
s.Id, err = uuid.Parse(idStr)
if err != nil {
return s, fmt.Errorf("Could not parse UUID generated by DB: %w", err)
}
return s, nil
}
+44 -16
View File
@@ -6,11 +6,38 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
) )
type CloneCredentialType string
const (
USER_PASS CloneCredentialType = "USER_PASS"
SSH_KEY CloneCredentialType = "SSH_KEY"
)
type CloneCredential struct {
Id uuid.UUID
Name string
Type CloneCredentialType
Username string
Secret string
}
type Pipeline struct { type Pipeline struct {
Id uuid.UUID Id uuid.UUID
Name string Name string
Url string Url string
PollInterval int PollInterval int
CloneCredential *uuid.UUID
}
type Secret struct {
Id uuid.UUID
Name string
Secret string
}
type PipelineSecretMapping struct {
Pipeline uuid.UUID
Secret uuid.UUID
} }
type WebhookSender string type WebhookSender string
@@ -26,19 +53,14 @@ type Webhook struct {
Pipeline uuid.UUID Pipeline uuid.UUID
} }
type Runner struct {
Id uuid.UUID
Name string
Secret string
}
type Run struct { type Run struct {
Id uuid.UUID Id uuid.UUID
Pipeline uuid.UUID Pipeline uuid.UUID
InProgress bool InProgress bool
Result *int64 Result *int64
Stdout []byte BuildOutput []byte
Stderr []byte Stdout []byte
Stderr []byte
} }
type CommandExecution struct { type CommandExecution struct {
@@ -51,3 +73,9 @@ type CommandExecution struct {
StartTime time.Time StartTime time.Time
EndTime time.Time EndTime time.Time
} }
type Runner struct {
Id uuid.UUID
Name string
Token string
}
+3
View File
@@ -0,0 +1,3 @@
FROM nginx:latest
COPY graphiql.html /usr/share/nginx/html/index.html
COPY graphiql.conf /etc/nginx/conf.d/default.conf
+1 -1
View File
@@ -4,5 +4,5 @@
set -e set -e
cd /build/server cd /build/server
go build . go build -buildvcs=false .
./server ./server
+8
View File
@@ -22,6 +22,14 @@ services:
- POSTGRES_DB=cursorius - POSTGRES_DB=cursorius
networks: networks:
- cursorius - cursorius
graphiql:
build:
dockerfile: Dockerfile.graphiql
ports:
- "0.0.0.0:45421:80"
networks:
- cursorius
networks: networks:
cursorius: cursorius:
external: true
+52
View File
@@ -0,0 +1,52 @@
upstream backend {
server cursorius-server:45420;
}
server {
listen 80;
listen [::]:80;
server_name localhost;
#access_log /var/log/nginx/host.access.log main;
location / {
root /usr/share/nginx/html;
index index.html index.htm;
}
location /graphql {
proxy_pass http://backend/graphql;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /usr/share/nginx/html;
}
# proxy the PHP scripts to Apache listening on 127.0.0.1:80
#
#location ~ \.php$ {
# proxy_pass http://127.0.0.1;
#}
# pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
#
#location ~ \.php$ {
# root html;
# fastcgi_pass 127.0.0.1:9000;
# fastcgi_index index.php;
# fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
# include fastcgi_params;
#}
# deny access to .htaccess files, if Apache's document root
# concurs with nginx's one
#
#location ~ /\.ht {
# deny all;
#}
}
+70
View File
@@ -0,0 +1,70 @@
<!--
* Copyright (c) 2021 GraphQL Contributors
* All rights reserved.
*
* This source code is licensed under the license found in the
* LICENSE file in the root directory of this source tree.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<title>GraphiQL</title>
<style>
body {
height: 100%;
margin: 0;
width: 100%;
overflow: hidden;
}
#graphiql {
height: 100vh;
}
</style>
<!--
This GraphiQL example depends on Promise and fetch, which are available in
modern browsers, but can be "polyfilled" for older browsers.
GraphiQL itself depends on React DOM.
If you do not want to rely on a CDN, you can host these files locally or
include them directly in your favored resource bundler.
-->
<script
src="https://unpkg.com/react@17/umd/react.development.js"
integrity="sha512-Vf2xGDzpqUOEIKO+X2rgTLWPY+65++WPwCHkX2nFMu9IcstumPsf/uKKRd5prX3wOu8Q0GBylRpsDB26R6ExOg=="
crossorigin="anonymous"
></script>
<script
src="https://unpkg.com/react-dom@17/umd/react-dom.development.js"
integrity="sha512-Wr9OKCTtq1anK0hq5bY3X/AvDI5EflDSAh0mE9gma+4hl+kXdTJPKZ3TwLMBcrgUeoY0s3dq9JjhCQc7vddtFg=="
crossorigin="anonymous"
></script>
<!--
These two files can be found in the npm module, however you may wish to
copy them directly into your environment, or perhaps include them in your
favored resource bundler.
-->
<link rel="stylesheet" href="https://unpkg.com/graphiql/graphiql.min.css" />
</head>
<body>
<div id="graphiql">Loading...</div>
<script
src="https://unpkg.com/graphiql/graphiql.min.js"
type="application/javascript"
></script>
<script>
ReactDOM.render(
React.createElement(GraphiQL, {
fetcher: GraphiQL.createFetcher({
//url: 'https://swapi-graphql.netlify.app/.netlify/functions/index',
url: 'http://127.0.0.1:45421/graphql',
}),
defaultEditorToolsVisibility: true,
}),
document.getElementById('graphiql'),
);
</script>
</body>
</html>
+16
View File
@@ -58,6 +58,20 @@ function show_ps {
docker compose $compose_file_flags ps docker compose $compose_file_flags ps
} }
function runprev {
current_containers="$(cat _working/current_containers)"
if [ "$current_containers" == "default" ]
then
compose_files="$default_compose_files"
elif [ "$current_containers" == "gitea" ]
then
compose_files="$default_compose_files $gitea_compose"
fi
compose_file_flags=$(echo "$compose_files" | tr ' ' '\n' | xargs -I'{}' echo "-f {} " | tr -d '\n')
docker compose $compose_file_flags up --build -d
docker compose $compose_file_flags logs -f
}
case $1 in case $1 in
"default") "default")
@@ -83,6 +97,8 @@ case $1 in
show_logs;; show_logs;;
"ps") "ps")
show_ps;; show_ps;;
"runprev")
runprev;;
*) echo "ERROR: Unknown param \"$1\"" 2>&1 *) echo "ERROR: Unknown param \"$1\"" 2>&1
exit 255;; exit 255;;
esac esac
+1
View File
@@ -14,6 +14,7 @@ require (
github.com/graphql-go/graphql v0.8.0 github.com/graphql-go/graphql v0.8.0
github.com/graphql-go/handler v0.2.3 github.com/graphql-go/handler v0.2.3
github.com/jackc/pgx/v5 v5.2.0 github.com/jackc/pgx/v5 v5.2.0
github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
golang.org/x/net v0.2.0 golang.org/x/net v0.2.0
google.golang.org/protobuf v1.28.1 google.golang.org/protobuf v1.28.1
+2
View File
@@ -858,6 +858,8 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d h1:q7n+5taxmM+9T2Q7Ydo7YN90FkoDuR5bbzByZwkQqPo=
github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d/go.mod h1:GN1Mg/uXQ6qwXA0HypnUO3xlcQJS9/y68EsHNeuuRa4=
github.com/jhump/protoreflect v1.6.1/go.mod h1:RZQ/lnuN+zqeRVpQigTwO6o0AJUkxbnSnpuG7toUTG4= github.com/jhump/protoreflect v1.6.1/go.mod h1:RZQ/lnuN+zqeRVpQigTwO6o0AJUkxbnSnpuG7toUTG4=
github.com/jhump/protoreflect v1.8.2/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg= github.com/jhump/protoreflect v1.8.2/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
+1 -1
View File
@@ -40,7 +40,7 @@ func main() {
return return
} }
getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners) getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners, db)
if err != nil { if err != nil {
log.Errorf("Could not start runner: %v", err) log.Errorf("Could not start runner: %v", err)
return return
+6 -4
View File
@@ -64,11 +64,13 @@ func (s *ApiServer) GetRunner(
} }
var runnerTagsStr strings.Builder var runnerTagsStr strings.Builder
fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0]) if len(req.Msg.Tags) > 0 {
for _, tag := range req.Msg.Tags[1:] { fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0])
fmt.Fprintf(&runnerTagsStr, ", %v", tag) for _, tag := range req.Msg.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
}
fmt.Fprintf(&runnerTagsStr, "]")
} }
fmt.Fprintf(&runnerTagsStr, "]")
response := <-respChan response := <-respChan
if response.Err != nil { if response.Err != nil {
+115 -25
View File
@@ -4,19 +4,26 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io" "io/ioutil"
"os" "os"
"os/exec"
"path/filepath" "path/filepath"
"strings"
"github.com/jhoonb/archivex"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/mount"
"github.com/docker/docker/client" "github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy" "github.com/docker/docker/pkg/stdcopy"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing/transport"
"github.com/go-git/go-git/v5/plumbing/transport/http"
"github.com/go-git/go-git/v5/plumbing/transport/ssh"
"github.com/op/go-logging" "github.com/op/go-logging"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
) )
var log = logging.MustGetLogger("cursorius-server") var log = logging.MustGetLogger("cursorius-server")
@@ -29,6 +36,7 @@ type PipelineExecution struct {
func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) { func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) {
jobFolder := filepath.Join(pipelineConf.WorkingDir, pe.Pipeline.Id.String(), pe.Run.Id.String()) jobFolder := filepath.Join(pipelineConf.WorkingDir, pe.Pipeline.Id.String(), pe.Run.Id.String())
cloneFolder := filepath.Join(jobFolder, "repo")
log.Debugf("Job %v configured with URL \"%v\"", pe.Pipeline.Name, pe.Pipeline.Url) log.Debugf("Job %v configured with URL \"%v\"", pe.Pipeline.Name, pe.Pipeline.Url)
@@ -40,19 +48,51 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
return return
} }
err = os.MkdirAll(jobFolder, 0755) err = os.MkdirAll(cloneFolder, 0755)
if err != nil { if err != nil {
log.Errorf("could not create working directory for job %v: %w", pe.Pipeline.Name, err) log.Errorf("could not create working directory for job %v: %w", pe.Pipeline.Name, err)
return return
} }
log.Infof("Cloning source from URL %v", pe.Pipeline.Url) log.Infof("Cloning source from URL %v", pe.Pipeline.Url)
// TODO: should I use go-git here instead of shelling out to raw git?
cloneCmd := exec.Command("git", "clone", pe.Pipeline.Url, jobFolder) var auth transport.AuthMethod
output, err := cloneCmd.CombinedOutput()
if pe.Pipeline.CloneCredential != nil {
credential, err := db.GetCloneCredentialById(*pe.Pipeline.CloneCredential)
if err != nil {
log.Errorf("could not get credenital from db: %v", err)
return
}
switch credential.Type {
case "USER_PASS":
log.Debugf("job %v configured to use credential %v", pe.Pipeline.Name, credential.Name)
auth = transport.AuthMethod(&http.BasicAuth{
Username: credential.Username,
Password: credential.Secret,
})
case "SSH_KEY":
publicKeys, err := ssh.NewPublicKeys(credential.Username, []byte(credential.Secret), "")
if err != nil {
log.Errorf("could not parse credential %v", credential.Name)
return
}
auth = transport.AuthMethod(publicKeys)
default:
log.Errorf("unsupported credential type %v", credential.Type)
return
}
} else {
auth = nil
}
_, err = git.PlainClone(cloneFolder, false, &git.CloneOptions{
URL: pe.Pipeline.Url,
Auth: auth,
})
if err != nil { if err != nil {
log.Debugf("%s", output) log.Errorf("could not clone repo: %v", err)
log.Errorf("could not clone source: %w", err)
return return
} }
@@ -65,28 +105,61 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
ctx := context.Background() ctx := context.Background()
imageName := "git.ohea.xyz/cursorius/pipeline-api/cursorius-pipeline:v2" log.Info("Building container")
log.Infof("Pulling image %v", imageName) tarFile := filepath.Join(jobFolder, "archive.tar")
pullOutput, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{}) tar := new(archivex.TarFile)
err = tar.Create(tarFile)
if err != nil { if err != nil {
log.Errorf("could not pull image %v: %w", imageName, err) log.Errorf("could not create tarfile: %w", err)
return return
} }
buf, err := io.ReadAll(pullOutput) err = tar.AddAll(cloneFolder, false)
if err != nil { if err != nil {
log.Errorf("could not read from io.ReadCloser:, %w", err) log.Errorf("could not add repo to tarfile: %w", err)
return return
} }
log.Infof("%s", buf)
err = pullOutput.Close() err = tar.Close()
if err != nil { if err != nil {
log.Errorf("could not close io.ReadCloser: %w", err) log.Errorf("could not close tarfile: %w", err)
return return
} }
log.Info("Image pulled sucessfully")
dockerBuildContext, err := os.Open(tarFile)
defer dockerBuildContext.Close()
imageName := fmt.Sprintf("%v-%v:latest", pe.Pipeline.Id.String(), pe.Run.Id.String())
buildResponse, err := cli.ImageBuild(context.Background(), dockerBuildContext, types.ImageBuildOptions{
Tags: []string{imageName},
Dockerfile: ".cursorius/Dockerfile",
})
if err != nil {
log.Errorf("could not build container: %w", err)
return
}
response, err := ioutil.ReadAll(buildResponse.Body)
if err != nil {
log.Errorf("could no read build response: %w", err)
return
}
err = db.UpdateRunBuildOutput(pe.Run.Id, string(response))
if err != nil {
log.Errorf("could not update build output for run: %w", err)
return
}
err = buildResponse.Body.Close()
if err != nil {
log.Errorf("Could not close build response body: %w", err)
return
}
log.Info("Image built sucessfully")
hostConfig := container.HostConfig{} hostConfig := container.HostConfig{}
@@ -118,15 +191,32 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
) )
} }
env := make([]string, 0)
// set cursorius environment variables
env = append(env, []string{
fmt.Sprintf("CURSORIUS_RUN_ID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CURSORIUS_SERVER_URL=%v", pipelineConf.AccessURL),
}...)
// load secrets into environment
secrets, err := db.GetSecretsForPipeline(pe.Pipeline.Id)
if err != nil {
log.Errorf("Could not get secrets for pipeline", err)
return
}
for _, secret := range secrets {
// the env name is validated to be just uppercase letters, numbers, and underscores on ingestion
env = append(env, fmt.Sprintf("%v=%v", strings.ToUpper(secret.Name), secret.Secret))
}
resp, err := cli.ContainerCreate(ctx, resp, err := cli.ContainerCreate(ctx,
&container.Config{ &container.Config{
Image: imageName, Image: imageName,
Tty: false, Tty: false,
Env: []string{ Env: env,
fmt.Sprintf("RUNID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CUROSRIUS_SERVER_URL=%v", pipelineConf.AccessURL),
},
}, },
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig) // TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
&hostConfig, &hostConfig,
+18 -7
View File
@@ -27,11 +27,16 @@ type tag struct {
} }
func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) { func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) {
prevCommits := make(map[string]string)
for { for {
time.Sleep(time.Duration(pipeline.PollInterval) * time.Second) time.Sleep(time.Duration(pipeline.PollInterval) * time.Second)
log.Infof("Polling repo %v", pipeline.Name) log.Infof("Polling repo %v", pipeline.Name)
prevRefs, err := db.GetPipelineRefs(pipeline.Id)
if err != nil {
log.Errorf("Could not get pipeline refs from db: %v", err)
return
}
repo, err := git.Clone(memory.NewStorage(), nil, &git.CloneOptions{ repo, err := git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
URL: pipeline.Url, URL: pipeline.Url,
}) })
@@ -50,18 +55,18 @@ func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db da
branches.ForEach(func(branch *plumbing.Reference) error { branches.ForEach(func(branch *plumbing.Reference) error {
log.Debugf("Processing branch %v from repo %v (id: %v)", branch.Name().String(), pipeline.Name, pipeline.Id) log.Debugf("Processing branch %v from repo %v (id: %v)", branch.Name().String(), pipeline.Name, pipeline.Id)
prevRef, ok := prevCommits[branch.Name().String()] prevRef, ok := prevRefs[branch.Name().String()]
if ok { if ok {
if branch.Hash().String() != prevRef { if branch.Hash().String() != prevRef {
log.Debugf("Queuing job for branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String()) log.Debugf("Queuing job for branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
prevCommits[branch.Name().String()] = branch.Hash().String() prevRefs[branch.Name().String()] = branch.Hash().String()
refsToRunFor = append(refsToRunFor, branch.Name().String()) refsToRunFor = append(refsToRunFor, branch.Name().String())
} else { } else {
log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String(), prevRef) log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String(), prevRef)
} }
} else { } else {
log.Debugf("Queuing job for newly discovered branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String()) log.Debugf("Queuing job for newly discovered branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
prevCommits[branch.Name().String()] = branch.Hash().String() prevRefs[branch.Name().String()] = branch.Hash().String()
refsToRunFor = append(refsToRunFor, branch.Name().String()) refsToRunFor = append(refsToRunFor, branch.Name().String())
} }
return nil return nil
@@ -74,23 +79,29 @@ func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db da
} }
tags.ForEach(func(tag *plumbing.Reference) error { tags.ForEach(func(tag *plumbing.Reference) error {
log.Debugf("Processing tag %v from repo %v (id: %v)", tag.Name().String(), pipeline.Name, pipeline.Id) log.Debugf("Processing tag %v from repo %v (id: %v)", tag.Name().String(), pipeline.Name, pipeline.Id)
prevRef, ok := prevCommits[tag.Name().String()] prevRef, ok := prevRefs[tag.Name().String()]
if ok { if ok {
if tag.Hash().String() != prevRef { if tag.Hash().String() != prevRef {
log.Debugf("Queuing job for tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String()) log.Debugf("Queuing job for tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
prevCommits[tag.Name().String()] = tag.Hash().String() prevRefs[tag.Name().String()] = tag.Hash().String()
refsToRunFor = append(refsToRunFor, tag.Name().String()) refsToRunFor = append(refsToRunFor, tag.Name().String())
} else { } else {
log.Debugf("Tag %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String(), prevRef) log.Debugf("Tag %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String(), prevRef)
} }
} else { } else {
log.Debugf("Queuing job for newly discovered tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String()) log.Debugf("Queuing job for newly discovered tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
prevCommits[tag.Name().String()] = tag.Hash().String() prevRefs[tag.Name().String()] = tag.Hash().String()
refsToRunFor = append(refsToRunFor, tag.Name().String()) refsToRunFor = append(refsToRunFor, tag.Name().String())
} }
return nil return nil
}) })
err = db.UpdatePipelineRefs(pipeline.Id, prevRefs)
if err != nil {
log.Errorf("Could not update pipeline refs: %v", err)
return
}
for _, ref := range refsToRunFor { for _, ref := range refsToRunFor {
log.Debugf("Dispatching job for ref %v in repo %v (id: %v)", ref, pipeline.Name, pipeline.Id) log.Debugf("Dispatching job for ref %v in repo %v (id: %v)", ref, pipeline.Name, pipeline.Id)
+3 -2
View File
@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/google/uuid"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"nhooyr.io/websocket" "nhooyr.io/websocket"
@@ -17,14 +18,14 @@ type RunnerData struct {
} }
type Runner struct { type Runner struct {
id string id uuid.UUID
tags []string tags []string
conn *websocket.Conn conn *websocket.Conn
receiveChan chan []byte receiveChan chan []byte
running bool running bool
} }
func (r *Runner) Id() string { func (r *Runner) Id() uuid.UUID {
return r.id return r.id
} }
+62 -46
View File
@@ -6,11 +6,13 @@ import (
"strings" "strings"
"time" "time"
"github.com/google/uuid"
"github.com/op/go-logging" "github.com/op/go-logging"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"nhooyr.io/websocket" "nhooyr.io/websocket"
"git.ohea.xyz/cursorius/server/config" "git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2" runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
) )
@@ -30,6 +32,7 @@ type runnerManager struct {
connectedRunners []Runner connectedRunners []Runner
numConnectedRunners uint64 numConnectedRunners uint64
configuredRunners map[string]config.Runner configuredRunners map[string]config.Runner
db database.Database
} }
type GetRunnerRequest struct { type GetRunnerRequest struct {
@@ -49,11 +52,13 @@ type runnerJob struct {
func (r *runnerManager) processRequest(req GetRunnerRequest) { func (r *runnerManager) processRequest(req GetRunnerRequest) {
var runnerTagsStr strings.Builder var runnerTagsStr strings.Builder
fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0]) if len(req.Tags) > 0 {
for _, tag := range req.Tags[1:] { fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0])
fmt.Fprintf(&runnerTagsStr, ", %v", tag) for _, tag := range req.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
}
fmt.Fprintf(&runnerTagsStr, "]")
} }
fmt.Fprintf(&runnerTagsStr, "]")
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String()) log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
log.Debugf("Finding runner with tags %v", runnerTagsStr.String()) log.Debugf("Finding runner with tags %v", runnerTagsStr.String())
@@ -126,50 +131,60 @@ runnerIter:
func (r *runnerManager) processRegistration(reg RunnerRegistration) { func (r *runnerManager) processRegistration(reg RunnerRegistration) {
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret) log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
if configuredRunner.Secret == reg.Secret {
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
runner := Runner{
id: reg.Id,
tags: reg.Tags,
conn: reg.conn,
receiveChan: make(chan []byte),
running: false,
}
r.connectedRunners = append(r.connectedRunners, runner)
// start goroutine to call Read function on websocket connection
// this is required to keep the connection functioning
go func() {
defer log.Noticef("Deregistered runner with id: %v", runner.id)
defer close(runner.receiveChan)
for {
msgType, data, err := reg.conn.Read(context.Background())
if err != nil {
// TODO: this is still racy, since a runner could be allocated between the
// connection returning an err and the channel closing
// This should probably be handled by sending erroring, but not 100% sure
log.Errorf("Could not read from connection: %v", err)
return
}
if msgType != websocket.MessageBinary {
close(runner.receiveChan)
log.Errorf("Got binary data from connection")
return
}
runner.receiveChan <- data // Get runner with give id from database
runnerId, err := uuid.Parse(reg.Id)
} if err != nil {
}() log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", reg.Id, err)
} else {
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, reg.Secret)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
} else {
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid") reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
} }
dbRunner, err := r.db.GetRunnerById(runnerId)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
if reg.Secret != dbRunner.Token {
log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
runner := Runner{
id: runnerId,
tags: reg.Tags,
conn: reg.conn,
receiveChan: make(chan []byte),
running: false,
}
r.connectedRunners = append(r.connectedRunners, runner)
// start goroutine to call Read function on websocket connection
// this is required to keep the connection functioning
go func() {
defer log.Noticef("Deregistered runner with id: %v", runner.id)
defer close(runner.receiveChan)
for {
msgType, data, err := reg.conn.Read(context.Background())
if err != nil {
// TODO: this is still racy, since a runner could be allocated between the
// connection returning an err and the channel closing
// This should probably be handled by sending erroring, but not 100% sure
log.Errorf("Could not read from connection: %v", err)
return
}
if msgType != websocket.MessageBinary {
close(runner.receiveChan)
log.Errorf("Got binary data from connection")
return
}
runner.receiveChan <- data
}
}()
} }
func runRunnerManager(r runnerManager) { func runRunnerManager(r runnerManager) {
@@ -184,12 +199,13 @@ func runRunnerManager(r runnerManager) {
} }
} }
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) { func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (chan GetRunnerRequest, chan RunnerRegistration, error) {
scheduler := runnerManager{ scheduler := runnerManager{
getRunnerCh: make(chan GetRunnerRequest), getRunnerCh: make(chan GetRunnerRequest),
registerCh: make(chan RunnerRegistration), registerCh: make(chan RunnerRegistration),
connectedRunners: make([]Runner, 0), connectedRunners: make([]Runner, 0),
configuredRunners: configuredRunners, configuredRunners: configuredRunners,
db: db,
} }
go runRunnerManager(scheduler) go runRunnerManager(scheduler)