Compare commits

93 Commits

Author SHA1 Message Date
restitux e1382e50ea Cleanup docker build output before saving (#23) 2023-04-07 20:08:59 -06:00
restitux 7f44e5ed41 Add nil check to runner chan (#22) 2023-04-07 19:52:44 -06:00
restitux bcc53dfbe0 Refactored runner map wrap and add to function 2023-04-07 19:52:37 -06:00
restitux bbf96498aa Add updatePipeline endpoint 2023-04-07 18:44:04 -06:00
restitux 954966db58 Start/restart poll job when created or updated
This currently contains the logic for restarting updated jobs,
but nothing exercises this logic. The logic for starting polling
for a newly created pipeline is implemented.
2023-04-07 18:31:59 -06:00
restitux ed7df18f83 Add timeout and retry interval to GetRunner api 2023-04-05 18:38:06 -06:00
restitux a8e9a68f0e Don't sleep on first scan to ease testing 2023-04-05 18:01:10 -06:00
restitux 20c664f0ed Reorganize docker configuration 2023-03-11 18:56:13 -07:00
restitux 0000ea2a13 Rewrite run-dev.sh to use golang program 2023-03-11 13:05:50 -07:00
restitux fe53a17160 Runners are removed from manager when alloacted
This removes an existing unlocked shared access to runner.running.
This also sets us up for better management of the runners.
2023-03-08 00:13:40 -07:00
restitux f190274bce Utilify runner tag printing function 2023-03-07 23:42:01 -07:00
restitux d9ba14550e Removed outdated comments 2023-03-07 19:47:57 -07:00
restitux a2acb99689 Fix not using correct environment variables 2023-02-25 02:57:13 -07:00
restitux 191b73fe41 Fix index out of range issue with empty tag list printing 2023-02-25 02:51:11 -07:00
restitux 3ca1481632 Make compose network external 2023-02-25 02:32:04 -07:00
restitux c0e33fa52a Update runner manager for new database driven runner config 2023-02-25 02:31:54 -07:00
restitux 63529b7174 Fix runner creation api 2023-02-25 01:36:55 -07:00
restitux 7e7c49c2e7 Exposed build output to api 2023-02-24 23:59:02 -07:00
restitux 712a7b1429 Fix graphql api not returning stdout/stderr as strings 2023-02-24 23:20:40 -07:00
restitux 3ae27bffc5 Cleanup logging 2023-02-24 23:00:31 -07:00
restitux 5373a37bee Change RUNID env var name 2023-02-24 23:00:26 -07:00
restitux 6fee5aa268 Record docker build output in database 2023-02-24 23:00:10 -07:00
restitux b475631df6 Remove outdated todo 2023-02-24 22:51:19 -07:00
restitux fbf918d627 Fix typo 2023-02-24 22:46:15 -07:00
restitux 4069e1b0e1 Update executor to run built user provided container 2023-02-24 22:37:17 -07:00
restitux 708fbca91a Tag built container with build and run name 2023-02-24 22:32:24 -07:00
restitux 77a5514578 Fix secret name validation not checking for beginning of string 2023-02-24 22:29:21 -07:00
restitux 77a8d0840a Fix inserting new repeated refs into db failing #20 2023-02-24 22:28:50 -07:00
restitux 62b4e8f17e Add runner query api 2023-02-24 22:28:07 -07:00
restitux 620c20f717 Change pipeline executor to build container in repo at .cursorius/Dockerfile 2023-02-24 22:27:07 -07:00
restitux 0979a2379e Change poll logic to query refs after sleep for easire debugging 2023-02-24 22:26:41 -07:00
restitux 85ebd856eb Add graphiql testing workflow (#15) 2023-02-18 20:45:57 -07:00
restitux 6b103d074e Add runprev launcher command 2023-02-18 19:27:17 -07:00
restitux 664fe8fd09 Remove replaced Job configuration from config file 2023-02-14 20:43:04 -07:00
restitux edafd5108a Add validation to secret names 2023-02-14 20:39:03 -07:00
restitux c0f6186eac Fix run progress field name (graphql) 2023-02-14 20:19:21 -07:00
restitux bfd05b6a8a Add secrets support (#14) 2023-02-14 20:18:41 -07:00
restitux 6d2936393b Persist repo ref hashes in db 2023-02-08 18:44:54 -07:00
restitux e4043ae3be Correct case of member name 2023-02-08 18:06:54 -07:00
restitux 7a665aa348 Add support for access repos with credentials 2023-02-07 21:34:32 -07:00
restitux d870335d25 Disable vcs info embedding because of git perms 2023-02-07 19:27:19 -07:00
restitux 4bda3c7a3b Add admin_api and database logging of runs #9 #10 2023-01-31 20:25:52 -07:00
restitux 724757b23c Cleanup gitea override docker logic 2023-01-31 18:48:17 -07:00
restitux 1e0526a599 Add logging to pipeline create graphql mutation 2023-01-31 18:47:44 -07:00
restitux c1d4f3cc16 Fix createWebhook endpoint location 2023-01-20 23:03:59 -07:00
restitux 4a4b6fd185 Add basic graphql api 2023-01-14 06:02:17 -07:00
restitux 30fed27126 Refactor pipeline api lister to inside listen 2023-01-14 01:35:29 -07:00
restitux b58b1ab5d9 Update go modules 2023-01-14 01:35:19 -07:00
restitux 1a9f0415d0 Added pipeline and result to runs table 2023-01-14 00:29:32 -07:00
restitux 1acd9ae025 Extended database and refactored to enable export 2023-01-14 00:19:43 -07:00
restitux cf910aed64 Remove volume from psql container 2023-01-13 23:35:49 -07:00
restitux 420725eff8 Improve database logging 2023-01-13 23:28:46 -07:00
restitux ddf9fda092 Add basic postgres db config 2023-01-13 23:25:41 -07:00
restitux 1864cd2bee Add version string to log output 2023-01-09 20:28:40 -07:00
restitux 4a3fd0b902 Clean up prod docker configuration 2023-01-09 20:28:31 -07:00
restitux 62fe1cf747 Change pipeline container to python gRPC env (#6) 2023-01-09 20:06:23 -07:00
restitux 3d7bef3a82 Implment worker api (fixes #3) 2023-01-09 00:59:19 -07:00
restitux f4798233ba Refactor docker script to support override 2023-01-08 22:51:05 -07:00
restitux 5f754e3daa Fix debug printing 2023-01-08 14:41:35 -07:00
restitux cf29841c83 Fix docker script not overwriting tmp file 2023-01-08 14:41:16 -07:00
restitux 1e831ed330 Improve pipeline api debug output 2023-01-08 14:25:44 -07:00
restitux 3f5fd3ce36 Correct uuid type names 2023-01-08 13:53:45 -07:00
restitux f95a5f9ae5 Refactor gRPC definitions into seperate repo
The protobuf files and the generated golang code has been moved into
git.ohea.xyz/cursorius/pipeline-api. The generated code is now imported
from that location. The version of the API has also been bumped to v2 to
avoid unsupported v1 modules in golang.
2023-01-02 01:59:51 -07:00
restitux fc117cef39 Fix incorrectly defined RunCommand 2023-01-02 01:56:45 -07:00
restitux 01111fe676 Fix bug in runnermanager 2023-01-02 01:40:53 -07:00
restitux 6fc9af8280 Implement skeleton of RunCommand 2023-01-01 13:44:09 -07:00
restitux ff78167325 Cleanup runnermanager code 2023-01-01 13:43:45 -07:00
restitux 711a0257fc Changed api runner id to use a UUID 2023-01-01 13:01:55 -07:00
restitux 984321c9fc Change all fmt.Errorfs to use %w 2022-12-31 17:24:29 -07:00
restitux 3cbe670bc1 Implement getRunner grpc endpoint
This includes refactoring the jobscheduler into the runnermanager. This
service manages runner connections and allocating them to pipelines.
These requests are done via the pipeline grpc api
2022-12-31 17:22:00 -07:00
restitux 663306c3be Cleanup jobschedule channel passing 2022-12-28 17:13:33 -07:00
restitux 0287213433 Refactor proto into single package and update api 2022-12-28 17:08:30 -07:00
restitux 8e4e45047d Job exection logic now runs pipeline in container
This container's network is configured based on parameters in the config
file. If configured correctly, this will allow the pipeline script to
speak to the cursorius server over the pipeline api.
2022-12-24 22:12:30 -07:00
restitux e25ac0c01e Improve run-dev.sh script 2022-12-24 22:12:02 -07:00
restitux 825f269710 Change pipeline api types 2022-12-24 22:11:44 -07:00
restitux 112bca1a55 Fix poller not appending refs to array 2022-12-24 22:09:23 -07:00
restitux cd098d794d Add prototype of pipeline api using connect
This includes refactoring of the http listener to use
a custom mux.
2022-12-24 19:45:25 -07:00
restitux 08e8104cd9 Change docker go cache dir to _working
This was previously breaking the go tooling as the working directly was
being searched by the lsp and go mod tidy. The go tools ignore
directories that begin with underscores.
2022-12-24 19:45:00 -07:00
restitux 176de76581 Refactor webhook handling logic into seperate pkg 2022-12-24 17:20:09 -07:00
restitux e16444f3f1 Add stop command to run-dev.sh 2022-12-24 17:19:50 -07:00
restitux 91f0410485 Cleanup docker development setup 2022-12-24 17:05:57 -07:00
restitux c1ad692817 Webhook now type asserts payload and reads ref 2022-12-24 15:29:00 -07:00
restitux c731e560af Change launch logic to receive config in scheduler 2022-12-24 00:10:06 -07:00
restitux b29d3a8f80 Add WIP proper webhook support (for gitea) 2022-12-23 23:54:53 -07:00
restitux 46245b73c9 Add webhook testing and Docker compose cleanup 2022-12-23 20:16:24 -07:00
restitux 9cb4ca687e Add repo polling support 2022-12-20 16:24:27 -07:00
restitux 53a3a70a06 Update ports to allow standalone runner to connect 2022-10-23 19:59:36 -06:00
restitux dffb0adfa6 Fix setting running variable on runner 2022-10-17 01:10:25 -06:00
restitux 447e269437 Remove folder jobs and add Id to runner message 2022-10-17 00:31:31 -06:00
restitux bbe9dcaf2a WIP: Replace job execution with runner delegation
Server now accepts runner connections via websockets. Jobs
will be delegated to these runners. This is currently a WIP,
as it is still hardcoded to use the first available runner.
2022-10-16 14:54:49 -06:00
restitux b44d4352ff Add docker compose dind example 2022-09-19 17:28:14 -06:00
restitux 2f4016b32a Add docker container logic to runner 2022-09-16 14:42:25 -06:00
restitux 7154ddc700 Add docker and docker compose deployment files 2022-09-16 14:41:50 -06:00
31 changed files with 5683 additions and 76 deletions
+2
View File
@@ -0,0 +1,2 @@
*.toml
_working
+4
View File
@@ -0,0 +1,4 @@
_working
server
server.toml
docker/docker-compose.override.yml
+768
View File
@@ -0,0 +1,768 @@
package admin_api
import (
"fmt"
"net/http"
"git.ohea.xyz/cursorius/server/database"
"github.com/google/uuid"
"github.com/graphql-go/graphql"
"github.com/graphql-go/handler"
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("cursorius-server")
func createSchema(db database.Database, pollChan chan uuid.UUID) (graphql.Schema, error) {
runnerType := graphql.NewObject(graphql.ObjectConfig{
Name: "Runner",
Description: "A runner available for use inside of a pipeline.",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the runner.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Id, nil
}
return nil, nil
},
},
"name": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The name of the runner.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Name, nil
}
return nil, nil
},
},
"token": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The token.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Token, nil
}
return nil, nil
},
},
},
})
secretType := graphql.NewObject(graphql.ObjectConfig{
Name: "Secret",
Description: "A secret available for use inside of a pipeline.",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the secret.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if secret, ok := p.Source.(database.Secret); ok {
return secret.Id, nil
}
return nil, nil
},
},
"name": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The name of the secret.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if secret, ok := p.Source.(database.Secret); ok {
return secret.Name, nil
}
return nil, nil
},
},
"secret": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The secret.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if secret, ok := p.Source.(database.Secret); ok {
return secret.Secret, nil
}
return nil, nil
},
},
},
})
cloneCredentialType := graphql.NewObject(graphql.ObjectConfig{
Name: "CloneCredential",
Description: "A credential for authenticating with the pipeline source host.",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Id, nil
}
return nil, nil
},
},
"name": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The name of the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Name, nil
}
return nil, nil
},
},
"type": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The credential type.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Type, nil
}
return nil, nil
},
},
"username": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The username to user with the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Username, nil
}
return nil, nil
},
},
"secret": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The secret for the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Secret, nil
}
return nil, nil
},
},
},
})
webhookType := graphql.NewObject(graphql.ObjectConfig{
Name: "Webhook",
Description: "A webhook for triggering pipelines",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the webhook.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if webhook, ok := p.Source.(database.Webhook); ok {
return webhook.Id, nil
}
return nil, nil
},
},
"serverType": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The format of the webhook.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if webhook, ok := p.Source.(database.Webhook); ok {
return webhook.ServerType, nil
}
return nil, nil
},
},
"secret": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The secret used to validate the webhook.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if webhook, ok := p.Source.(database.Webhook); ok {
return webhook.Secret, nil
}
return nil, nil
},
},
},
})
runType := graphql.NewObject(graphql.ObjectConfig{
Name: "Run",
Description: "A pipeline run",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return run.Id, nil
}
return nil, nil
},
},
"inProgress": &graphql.Field{
Type: graphql.Boolean,
Description: "The progress status of the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return run.InProgress, nil
}
return nil, nil
},
},
"result": &graphql.Field{
// TODO: handle bigint properly here
Type: graphql.Float,
Description: "The result of the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return run.Result, nil
}
return nil, nil
},
},
"buildOutput": &graphql.Field{
Type: graphql.String,
Description: "Logs of the top level container build for the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return string(run.BuildOutput), nil
}
return nil, nil
},
},
"stdout": &graphql.Field{
Type: graphql.String,
Description: "The stdout used to validate the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return string(run.Stdout), nil
}
return nil, nil
},
},
"stderr": &graphql.Field{
Type: graphql.String,
Description: "The stderr used to validate the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return string(run.Stderr), nil
}
return nil, nil
},
},
},
})
pipelineType := graphql.NewObject(graphql.ObjectConfig{
Name: "Pipeline",
Description: "A pipeline for running ci jobs",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the pipeline.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
return pipeline.Id, nil
}
return nil, nil
},
},
"name": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The name of the pipeline.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
return pipeline.Name, nil
}
return nil, nil
},
},
"url": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The url of the pipeline.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
return pipeline.Url, nil
}
return nil, nil
},
},
"pollInterval": &graphql.Field{
Type: graphql.NewNonNull(graphql.Int),
Description: "The polling interval for the pipeline.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
return pipeline.PollInterval, nil
}
return nil, nil
},
},
"cloneCredential": &graphql.Field{
Type: cloneCredentialType,
Description: "The configured credential for cloning the pipeline source.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
if pipeline.CloneCredential != nil {
return db.GetCloneCredentialById(*pipeline.CloneCredential)
}
}
return nil, nil
},
},
"secrets": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(graphql.NewNonNull(secretType))),
Description: "The list of secrets for the pipeline.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
return db.GetSecretsForPipeline(pipeline.Id)
}
return []database.Secret{}, nil
},
},
"webhooks": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(graphql.NewNonNull(webhookType))),
Description: "The list of webhooks for the pipeline.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
return db.GetWebhooksForPipeline(pipeline.Id)
}
return []database.Webhook{}, nil
},
},
"runs": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(graphql.NewNonNull(runType))),
Description: "The list of runs for the pipeline.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
return db.GetRunsForPipeline(pipeline.Id)
}
return []database.Webhook{}, nil
},
},
},
})
queryType := graphql.NewObject(graphql.ObjectConfig{
Name: "Query",
Fields: graphql.Fields{
"Pipeline": &graphql.Field{
Type: pipelineType,
Args: graphql.FieldConfigArgument{
"id": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the requested pipeline.",
},
},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
id, err := uuid.Parse(p.Args["id"].(string))
if err != nil {
return nil, err
}
return db.GetPipelineById(id)
},
},
"Pipelines": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(pipelineType)),
Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetPipelines()
},
},
"CloneCredential": &graphql.Field{
Type: cloneCredentialType,
Args: graphql.FieldConfigArgument{
"id": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the requested credential.",
},
},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
id, err := uuid.Parse(p.Args["id"].(string))
if err != nil {
return nil, err
}
return db.GetCloneCredentialById(id)
},
},
"CloneCredentials": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(cloneCredentialType)),
Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetCredentials()
},
},
"Secrets": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(secretType)),
Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetSecrets()
},
},
"Runners": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(runnerType)),
Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetRunners()
},
},
},
})
mutationType := graphql.NewObject(graphql.ObjectConfig{
Name: "Mutation",
Fields: graphql.Fields{
"createPipeline": &graphql.Field{
Type: pipelineType,
Description: "Create a new pipeline",
Args: graphql.FieldConfigArgument{
"name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"url": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"pollInterval": &graphql.ArgumentConfig{
Type: graphql.Int,
},
"cloneCredentialId": &graphql.ArgumentConfig{
Type: graphql.String,
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
var interval int
if intervalVal, ok := params.Args["pollInterval"]; ok {
interval = intervalVal.(int)
} else {
interval = 0
}
var credential *uuid.UUID
if credentialVal, ok := params.Args["cloneCredentialId"]; ok {
id, err := uuid.Parse(credentialVal.(string))
if err != nil {
return nil, err
}
credential = &id
} else {
credential = nil
}
pipeline, err := db.CreatePipeline(
params.Args["name"].(string),
params.Args["url"].(string),
interval,
credential,
)
if err != nil {
return nil, err
}
pollChan <- pipeline.Id
return pipeline, nil
},
},
"createWebhook": &graphql.Field{
Type: webhookType,
Description: "Create a new webhook",
Args: graphql.FieldConfigArgument{
"type": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
id, err := uuid.Parse(params.Args["id"].(string))
if err != nil {
return nil, err
}
webhook, err := db.CreateWebhook(
database.WebhookSender(params.Args["type"].(string)),
id,
)
if err != nil {
return nil, err
}
return webhook, nil
},
},
"createCloneCredential": &graphql.Field{
Type: cloneCredentialType,
Description: "Create a new CloneCredential",
Args: graphql.FieldConfigArgument{
"name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"type": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"username": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"secret": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
credential, err := db.CreateCredential(
params.Args["name"].(string),
database.CloneCredentialType(params.Args["type"].(string)),
params.Args["username"].(string),
params.Args["secret"].(string),
)
if err != nil {
return nil, err
}
return credential, nil
},
},
"createSecret": &graphql.Field{
Type: secretType,
Description: "Create a new secret",
Args: graphql.FieldConfigArgument{
"name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"secret": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
secret, err := db.CreateSecret(
params.Args["name"].(string),
params.Args["secret"].(string),
)
if err != nil {
return nil, err
}
return secret, nil
},
},
"createRunner": &graphql.Field{
Type: runnerType,
Description: "Create a new runner",
Args: graphql.FieldConfigArgument{
"name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
runner, err := db.CreateRunner(
params.Args["name"].(string),
)
if err != nil {
return nil, err
}
return runner, nil
},
},
"updatePipeline": &graphql.Field{
Type: pipelineType,
Description: "Create a new pipeline",
Args: graphql.FieldConfigArgument{
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"name": &graphql.ArgumentConfig{
Type: graphql.String,
},
"url": &graphql.ArgumentConfig{
Type: graphql.String,
},
"pollInterval": &graphql.ArgumentConfig{
Type: graphql.Int,
},
"cloneCredentialId": &graphql.ArgumentConfig{
Type: graphql.String,
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
var name *string
var url *string
var interval *int
if nameVal, ok := params.Args["name"]; ok {
nameVal := nameVal.(string)
name = &nameVal
} else {
name = nil
}
if urlVal, ok := params.Args["url"]; ok {
urlVal := urlVal.(string)
url = &urlVal
} else {
url = nil
}
if intervalVal, ok := params.Args["pollInterval"]; ok {
intervalVal := intervalVal.(int)
interval = &intervalVal
} else {
interval = nil
}
pipeline, err := db.UpdatePipeline(
pipelineId,
name,
url,
interval,
)
if err != nil {
return nil, err
}
pollChan <- pipeline.Id
return pipeline, nil
},
},
"setPipelineCloneCredential": &graphql.Field{
Type: pipelineType,
Description: "Set the CloneCredential used by a pipeline to clone the source repo",
Args: graphql.FieldConfigArgument{
"cloneCredentialId": &graphql.ArgumentConfig{
Type: graphql.String,
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
if cloneCredentialIdVal, ok := params.Args["cloneCredentialId"]; ok {
cloneCredentialId, err := uuid.Parse(cloneCredentialIdVal.(string))
if err != nil {
return nil, err
}
pipeline, err := db.SetPipelineCloneCredential(pipelineId, &cloneCredentialId)
if err != nil {
return nil, err
}
return pipeline, nil
} else {
pipeline, err := db.SetPipelineCloneCredential(pipelineId, nil)
if err != nil {
return nil, err
}
return pipeline, nil
}
},
},
"addSecretToPipeline": &graphql.Field{
Type: pipelineType,
Description: "Allow a secret to be accessed by a pipeline.",
Args: graphql.FieldConfigArgument{
"secretId": &graphql.ArgumentConfig{
Type: graphql.String,
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
secretId, err := uuid.Parse(params.Args["secretId"].(string))
if err != nil {
return nil, err
}
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
err = db.AssignSecretToPipeline(pipelineId, secretId)
if err != nil {
return nil, err
}
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return nil, err
}
return pipeline, nil
},
},
"removeSecretFromPipeline": &graphql.Field{
Type: pipelineType,
Description: "Remove a pipeline's access to a secret.",
Args: graphql.FieldConfigArgument{
"secretId": &graphql.ArgumentConfig{
Type: graphql.String,
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
secretId, err := uuid.Parse(params.Args["secretId"].(string))
if err != nil {
return nil, err
}
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
err = db.RemoveSecretFromPipeline(pipelineId, secretId)
if err != nil {
return nil, err
}
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return nil, err
}
return pipeline, nil
},
},
},
})
schema, err := graphql.NewSchema(graphql.SchemaConfig{
Query: queryType,
Mutation: mutationType,
})
if err != nil {
return schema, fmt.Errorf("Could not create schema: %w", err)
}
return schema, nil
}
func CreateHandler(db database.Database, pollChan chan uuid.UUID, mux *http.ServeMux) error {
schema, err := createSchema(db, pollChan)
if err != nil {
return err
}
h := handler.New(&handler.Config{
Schema: &schema,
Pretty: true,
GraphiQL: true,
})
mux.Handle("/graphql", h)
return nil
}
+49 -9
View File
@@ -2,40 +2,80 @@ package config
import (
"fmt"
"git.ohea.xyz/golang/config"
)
type Webhook struct {
type Runner struct {
Secret string
}
type Job struct {
URL *string
Folder *string
Webhook Webhook
Cron *string
type DBConfig struct {
Address string
Port int
Username string
Password string
Name string
}
type MountType string
const (
Bind MountType = "bind"
Volume = "volume"
)
type MountConf struct {
Type MountType
Source string
}
type PipelineConf struct {
AccessURL string // URL that the pipeline runner can access the server at
DockerNetwork *string // Name of the docker network that should be assigned to the pipeline script runner
WorkingDir string
MountConf MountConf // This script describes how to mount WorkingDir into the pipeline executor container
}
type Config struct {
Address string
Port int
Jobs map[string]Job
DBConfig DBConfig
PipelineConf PipelineConf
Runners map[string]Runner
}
func GetConfig() (config.Config[Config], error) {
defaultNetworkName := "cursorius"
configData := config.Config[Config]{
Name: "cursorius",
Filename: "server",
Config: Config{
Address: "127.0.0.1",
Port: 45420,
Jobs: make(map[string]Job),
DBConfig: DBConfig{
Address: "DB_ADDRESS",
Port: 5432,
Username: "USERNAME",
Password: "PASSWORD",
Name: "cursorius",
},
PipelineConf: PipelineConf{
AccessURL: "cursorius-server:45420",
DockerNetwork: &defaultNetworkName,
WorkingDir: "/opt/cursorius/working",
MountConf: MountConf{
Type: Bind,
Source: "/opt/cursorius/working",
},
},
Runners: make(map[string]Runner),
},
}
_, err := configData.Get()
if err != nil {
return configData, fmt.Errorf("Could not read config file: %v", err)
return configData, fmt.Errorf("Could not read config file: %w", err)
}
return configData, nil
+200
View File
@@ -0,0 +1,200 @@
package database
import (
"time"
"git.ohea.xyz/cursorius/server/config"
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("cursorius-server")
type Database struct {
Conn *pgxpool.Pool
}
func LaunchDB(conf config.DBConfig) (Database, error) {
dbURL := fmt.Sprintf(
"postgres://%v:%v@%v:%v/%v",
conf.Username,
conf.Password,
conf.Address,
conf.Port,
conf.Name,
)
dbURLNoPasswd := fmt.Sprintf(
"postgres://%v:********@%v:%v/%v",
conf.Username,
conf.Address,
conf.Port,
conf.Name,
)
db := Database{}
var err error
log.Infof("Connecting to database with URL \"%v\"", dbURLNoPasswd)
db.Conn, err = pgxpool.New(context.Background(), dbURL)
if err != nil {
return db, fmt.Errorf("could not create database pool: %w", err)
}
// sleep until we can sucessfully acquire a connection
for i := 0; i < 10; i++ {
_, err = db.Conn.Acquire(context.Background())
if err == nil {
break
}
time.Sleep(2 * time.Second)
}
if err != nil {
return db, fmt.Errorf("Could not open database: %w", err)
}
log.Infof("Database connected sucessfully!")
versionTableExistsQuery := `
SELECT EXISTS (
SELECT FROM pg_tables
WHERE tablename = 'version'
);`
var versionTableExists bool
err = db.Conn.QueryRow(context.Background(), versionTableExistsQuery).Scan(&versionTableExists)
if err != nil {
return db, fmt.Errorf("Could not check if database was initalized: %w", err)
}
if versionTableExists {
// TODO: migrations
} else {
log.Info("New database found, initializing....")
err = initDB(db.Conn)
if err != nil {
return db, fmt.Errorf("Could not initalize database: %w", err)
}
}
return db, nil
}
func initDB(conn *pgxpool.Pool) error {
createTablesQuery := `
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE version (
version INT NOT NULL
);
CREATE TABLE clone_credentials (
id UUID PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
username TEXT NOT NULL,
secret TEXT NOT NULL
);
CREATE TABLE pipelines (
id UUID PRIMARY KEY,
name TEXT NOT NULL,
url TEXT NOT NULL,
poll_interval INTEGER,
clone_credential UUID DEFAULT NULL,
CONSTRAINT fk_clone_credential
FOREIGN KEY(clone_credential)
REFERENCES clone_credentials(id)
);
CREATE TABLE secrets (
id UUID PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
secret TEXT NOT NULL
);
CREATE TABLE pipeline_secret_mappings (
pipeline UUID NOT NULL,
secret UUID NOT NULL,
CONSTRAINT fk_pipeline
FOREIGN KEY(pipeline)
REFERENCES pipelines(id),
CONSTRAINT fk_secret
FOREIGN KEY(secret)
REFERENCES secrets(id)
);
CREATE TABLE webhooks (
id UUID PRIMARY KEY,
server_type TEXT,
secret TEXT,
pipeline UUID,
CONSTRAINT fk_pipeline
FOREIGN KEY(pipeline)
REFERENCES pipelines(id)
);
CREATE TABLE runs (
id UUID PRIMARY KEY,
pipeline UUID,
in_progress BOOLEAN DEFAULT NULL,
build_output TEXT DEFAULT NULL,
result BIGINT DEFAULT NULL,
stdout TEXT DEFAULT NULL,
stderr TEXT DEFAULT NULL,
CONSTRAINT fk_pipeline
FOREIGN KEY(pipeline)
REFERENCES pipelines(id)
);
CREATE TABLE command_executions (
id UUID PRIMARY KEY,
run_id UUID,
command TEXT,
return_code INT,
stdout TEXT,
stderr TEXT,
start_time TIMESTAMP,
end_time TIMESTAMP,
CONSTRAINT fk_run_id
FOREIGN KEY(run_id)
REFERENCES runs(id)
);
CREATE TABLE runners (
id UUID PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
token TEXT NOT NULL
);
CREATE TABLE pipeline_refs (
name TEXT PRIMARY KEY NOT NULL,
pipeline_id UUID NOT NULL,
hash TEXT NOT NULL,
CONSTRAINT fk_pipeline_id
FOREIGN KEY(pipeline_id)
REFERENCES pipelines(id)
);
`
_, err := conn.Exec(context.Background(), createTablesQuery)
if err != nil {
return err
}
return nil
}
+672
View File
@@ -0,0 +1,672 @@
package database
import (
"context"
"fmt"
"regexp"
"github.com/google/uuid"
)
func (db *Database) GetPipelines() ([]Pipeline, error) {
query := `
SELECT id, name, url, poll_interval, clone_credential
FROM pipelines;`
pipelines := make([]Pipeline, 0)
rows, err := db.Conn.Query(context.Background(), query)
if err != nil {
return pipelines, fmt.Errorf("Could not query database for pipelines: %w", err)
}
defer rows.Close()
for rows.Next() {
var pipeline Pipeline
var idStr string
if err := rows.Scan(&idStr, &pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential); err != nil {
return pipelines, err
}
pipeline.Id, err = uuid.Parse(idStr)
if err != nil {
return pipelines, err
}
pipelines = append(pipelines, pipeline)
}
return pipelines, nil
}
func (db *Database) GetPipelineById(id uuid.UUID) (Pipeline, error) {
query := `
SELECT name, url, poll_interval
FROM pipelines
WHERE id=$1;`
pipeline := Pipeline{
Id: id,
}
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&pipeline.Name, &pipeline.Url, &pipeline.PollInterval)
if err != nil {
return pipeline, fmt.Errorf("Could not query database for pipeline with id %v: %w", id.String(), err)
}
return pipeline, nil
}
func (db *Database) CreatePipeline(name string, url string, pollInterval int, credential *uuid.UUID) (Pipeline, error) {
query := `
INSERT INTO pipelines (id, name, url, poll_interval, clone_credential)
VALUES (uuid_generate_v4(), $1, $2, $3, $4)
RETURNING id, name, url, poll_interval;`
pipeline := Pipeline{}
var idStr string
err := db.Conn.QueryRow(context.Background(), query, name, url, pollInterval, credential).Scan(&idStr, &pipeline.Name, &pipeline.Url, &pipeline.PollInterval)
if err != nil {
return pipeline, fmt.Errorf("Could not create pipeline: %w", err)
}
pipeline.Id, err = uuid.Parse(idStr)
if err != nil {
return pipeline, fmt.Errorf("Could not parse UUID generated by DB: %w", err)
}
return pipeline, nil
}
func (db *Database) UpdatePipeline(pipelineId uuid.UUID, name *string, url *string, pollInterval *int) (Pipeline, error) {
query := `
UPDATE pipelines
SET name=$1, url=$2, poll_interval=$3
WHERE id=$4
RETURNING name, url, poll_interval, clone_credential;`
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return pipeline, err
}
var nameNew string
var urlNew string
var pollIntervalNew int
if name != nil {
nameNew = *name
} else {
nameNew = pipeline.Name
}
if url != nil {
urlNew = *url
} else {
urlNew = pipeline.Url
}
if pollInterval != nil {
pollIntervalNew = *pollInterval
} else {
pollIntervalNew = pipeline.PollInterval
}
err = db.Conn.QueryRow(context.Background(),
query, nameNew, urlNew, pollIntervalNew, pipelineId).Scan(
&pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential,
)
if err != nil {
return pipeline, fmt.Errorf("Could not add credential to pipeline: %w", err)
}
return pipeline, err
}
func (db *Database) SetPipelineCloneCredential(pipelineId uuid.UUID, credentialId *uuid.UUID) (Pipeline, error) {
query := `
UPDATE pipelines
SET clone_credential=$1
WHERE id=$2
RETURNING name, url, poll_interval, clone_credential;`
pipeline := Pipeline{
Id: pipelineId,
}
err := db.Conn.QueryRow(context.Background(),
query, credentialId, pipelineId).Scan(
&pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential,
)
if err != nil {
return pipeline, fmt.Errorf("Could not add credential to pipeline: %w", err)
}
return pipeline, err
}
func (db *Database) RemovePipelineCredential(pipelineId uuid.UUID) (Pipeline, error) {
query := `
UPDATE pipelines
SET credential=null
WHERE id=$1
RETURNING name, url, poll_interval, clone_credential;`
pipeline := Pipeline{
Id: pipelineId,
}
err := db.Conn.QueryRow(context.Background(),
query, pipelineId).Scan(
&pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential,
)
if err != nil {
return pipeline, fmt.Errorf("Could not add credential to pipeline: %w", err)
}
return pipeline, err
}
func (db *Database) GetWebhooksForPipeline(id uuid.UUID) ([]Webhook, error) {
query := `
SELECT id, server_type, secret
FROM webhooks
WHERE pipeline=$1;`
webhooks := make([]Webhook, 0)
rows, err := db.Conn.Query(context.Background(), query, id)
if err != nil {
return webhooks, fmt.Errorf("Could not get webhooks for pipeline with id \"%v\": %w", id, err)
}
defer rows.Close()
for rows.Next() {
var webhook Webhook
var idStr string
if err := rows.Scan(&idStr, &webhook.ServerType, &webhook.Secret); err != nil {
return webhooks, err
}
webhook.Id, err = uuid.Parse(idStr)
if err != nil {
return webhooks, err
}
webhooks = append(webhooks, webhook)
}
return webhooks, nil
}
func (db *Database) GetWebhookById(id uuid.UUID) (Webhook, error) {
query := `
SELECT server_type, secret, pipeline
FROM webhooks
WHERE id=$1;`
webhook := Webhook{
Id: id,
}
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&webhook.ServerType, &webhook.Secret, &webhook.Pipeline)
if err != nil {
return webhook, fmt.Errorf("Could not query database for webhook with id %v: %w", id.String(), err)
}
return webhook, nil
}
func (db *Database) CreateWebhook(serverType WebhookSender, pipelineId uuid.UUID) (Webhook, error) {
query := `
INSERT INTO webhooks (id, server_type, secret, pipeline)
VALUES (uuid_generate_v4(), $1, (select substr(md5(random()::text), 0, 50)), $2)
RETURNING id, server_type, secret, pipeline;`
webhook := Webhook{}
var idStr string
err := db.Conn.QueryRow(context.Background(), query, string(serverType), pipelineId).Scan(&idStr, &webhook.ServerType, &webhook.Secret, &webhook.Pipeline)
if err != nil {
return webhook, err
}
id, err := uuid.Parse(idStr)
if err != nil {
return webhook, err
}
webhook.Id = id
return webhook, nil
}
func (db *Database) CreateCredential(name string, credentialtype CloneCredentialType, username string, secret string) (CloneCredential, error) {
query := `
INSERT INTO clone_credentials (id, name, type, username, secret)
VALUES(uuid_generate_v4(), $1, $2, $3, $4)
RETURNING id, name, type, username, secret;`
credential := CloneCredential{}
var idStr string
err := db.Conn.QueryRow(
context.Background(),
query,
name,
string(credentialtype),
username,
secret,
).Scan(&idStr, &credential.Name, &credential.Type, &credential.Username, &credential.Secret)
if err != nil {
return credential, err
}
id, err := uuid.Parse(idStr)
if err != nil {
return credential, err
}
credential.Id = id
return credential, nil
}
func (db *Database) GetCloneCredentialById(id uuid.UUID) (CloneCredential, error) {
query := `
SELECT name, type, username, secret
FROM clone_credentials
WHERE id=$1;`
log.Debugf("requested credential with id %v", id)
credential := CloneCredential{
Id: id,
}
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&credential.Name, &credential.Type, &credential.Username, &credential.Secret)
if err != nil {
return credential, fmt.Errorf("Could not query database for credential with id %v: %w", id.String(), err)
}
return credential, nil
}
func (db *Database) GetCredentials() ([]CloneCredential, error) {
query := `
SELECT id, name, type, username, secret
FROM clone_credentials;`
credentials := make([]CloneCredential, 0)
rows, err := db.Conn.Query(context.Background(), query)
if err != nil {
return credentials, fmt.Errorf("Could not query database for credentials: %w", err)
}
defer rows.Close()
for rows.Next() {
var credential CloneCredential
var idStr string
if err := rows.Scan(&idStr, &credential.Name, &credential.Type, &credential.Username, &credential.Secret); err != nil {
return credentials, err
}
credential.Id, err = uuid.Parse(idStr)
if err != nil {
return credentials, err
}
credentials = append(credentials, credential)
}
return credentials, nil
}
func (db *Database) CreateRun(pipelineId uuid.UUID) (Run, error) {
query := `
INSERT INTO runs (id, pipeline, in_progress)
VALUES(uuid_generate_v4(), $1, true)
RETURNING id, pipeline, in_progress;`
run := Run{}
var idStr string
err := db.Conn.QueryRow(context.Background(), query, pipelineId).Scan(&idStr, &run.Pipeline, &run.InProgress)
if err != nil {
return run, err
}
run.Id, err = uuid.Parse(idStr)
if err != nil {
return run, err
}
return run, nil
}
func (db *Database) UpdateRunBuildOutput(runId uuid.UUID, buildResult string) error {
query := `
UPDATE runs
SET build_output=$1
WHERE id=$2;`
_, err := db.Conn.Exec(context.Background(),
query, buildResult, runId)
return err
}
func (db *Database) UpdateRunResult(r Run) error {
query := `
UPDATE runs
SET in_progress=$1, result=$2, stdout=$3, stderr=$4
WHERE id=$5;`
// TODO: does r.Result need a pointer derefrence?
_, err := db.Conn.Exec(context.Background(),
query, r.InProgress, r.Result, r.Stdout, r.Stderr, r.Id)
return err
}
func (db *Database) GetRunsForPipeline(pipelineId uuid.UUID) ([]Run, error) {
query := `
SELECT id, in_progress, result, build_output, stdout, stderr
FROM runs
WHERE pipeline=$1;`
runs := make([]Run, 0)
rows, err := db.Conn.Query(context.Background(), query, pipelineId)
if err != nil {
return runs, fmt.Errorf("Could not get runs for pipeline with id \"%v\": %w", pipelineId, err)
}
defer rows.Close()
for rows.Next() {
var run Run
var idStr string
if err := rows.Scan(
&idStr,
&run.InProgress,
&run.Result,
&run.BuildOutput,
&run.Stdout,
&run.Stderr,
); err != nil {
return runs, err
}
run.Id, err = uuid.Parse(idStr)
if err != nil {
return runs, err
}
runs = append(runs, run)
}
return runs, nil
}
func (db *Database) GetPipelineRefs(pipelineId uuid.UUID) (map[string]string, error) {
query := `
SELECT name, hash
FROM pipeline_refs
WHERE pipeline_id=$1;`
refsMap := make(map[string]string)
refs, err := db.Conn.Query(context.Background(), query, pipelineId)
if err != nil {
return refsMap, fmt.Errorf("Could not get pipeline refs for pipeline with id \"%v\": %w", pipelineId, err)
}
defer refs.Close()
for refs.Next() {
var name string
var hash string
if err := refs.Scan(
&name,
&hash,
); err != nil {
return refsMap, err
}
refsMap[name] = hash
}
return refsMap, nil
}
func (db *Database) UpdatePipelineRefs(pipelineId uuid.UUID, refsMap map[string]string) error {
query := `
INSERT INTO pipeline_refs(name, pipeline_id, hash)
VALUES($1, $2, $3)
ON CONFLICT (name)
DO
UPDATE SET hash=$3;`
for name, hash := range refsMap {
_, err := db.Conn.Exec(context.Background(), query, name, pipelineId, hash)
return err
}
return nil
}
func (db *Database) GetSecrets() ([]Secret, error) {
query := `
SELECT id, name, secret
FROM secrets;`
secrets := make([]Secret, 0)
rows, err := db.Conn.Query(context.Background(), query)
if err != nil {
return secrets, fmt.Errorf("Could not query database for secrets: %w", err)
}
defer rows.Close()
for rows.Next() {
var secret Secret
var idStr string
if err := rows.Scan(&idStr, &secret.Name, &secret.Secret); err != nil {
return secrets, err
}
secret.Id, err = uuid.Parse(idStr)
if err != nil {
return secrets, err
}
secrets = append(secrets, secret)
}
return secrets, nil
}
func (db *Database) GetSecretById(id uuid.UUID) (Secret, error) {
query := `
SELECT id, name, secret
FROM secrets
WHERE id=$1;`
secret := Secret{
Id: id,
}
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&secret.Name, &secret.Secret)
if err != nil {
return secret, fmt.Errorf("Could not query database for secret with id %v: %w", id.String(), err)
}
return secret, nil
}
func (db *Database) CreateSecret(name string, secret string) (Secret, error) {
s := Secret{}
// validate that the secret is only A-Z or underscores and less than 256 characters
if len(name) > 256 {
return s, fmt.Errorf("secret name must be 256 characters or less")
}
validName := regexp.MustCompile(`[A-Z0-9_]+$`)
if !validName.MatchString(name) {
return s, fmt.Errorf("secren name must be made up of only uppercase letters, numbers, and underscores")
}
query := `
INSERT INTO secrets (id, name, secret)
VALUES (uuid_generate_v4(), $1, $2)
RETURNING id, name, secret;`
var idStr string
err := db.Conn.QueryRow(context.Background(), query, name, secret).Scan(&idStr, &s.Name, &s.Secret)
if err != nil {
return s, fmt.Errorf("Could not create secret: %w", err)
}
s.Id, err = uuid.Parse(idStr)
if err != nil {
return s, fmt.Errorf("Could not parse UUID generated by DB: %w", err)
}
return s, nil
}
func (db *Database) AssignSecretToPipeline(pipelineId uuid.UUID, secretId uuid.UUID) error {
query := `
INSERT INTO pipeline_secret_mappings (pipeline, secret)
VALUES ($1, $2);`
_, err := db.Conn.Exec(context.Background(), query, pipelineId, secretId)
return err
}
func (db *Database) RemoveSecretFromPipeline(pipelineId uuid.UUID, secretId uuid.UUID) error {
// TODO: implement this
return fmt.Errorf("Not implemented")
}
func (db *Database) GetSecretsForPipeline(pipelineId uuid.UUID) ([]Secret, error) {
query := `
SELECT
secrets.id, secrets.name, secrets.secret
FROM
secrets INNER JOIN pipeline_secret_mappings
ON secrets.id = pipeline_secret_mappings.secret
WHERE
pipeline_secret_mappings.pipeline=$1
;`
secrets := make([]Secret, 0)
rows, err := db.Conn.Query(context.Background(), query, pipelineId)
if err != nil {
return secrets, fmt.Errorf("Could not get secrets for pipeline with id \"%v\": %w", pipelineId, err)
}
defer rows.Close()
for rows.Next() {
var secret Secret
var idStr string
if err := rows.Scan(
&idStr,
&secret.Name,
&secret.Secret,
); err != nil {
return secrets, err
}
secret.Id, err = uuid.Parse(idStr)
if err != nil {
return secrets, err
}
secrets = append(secrets, secret)
}
return secrets, nil
}
func (db *Database) GetRunners() ([]Runner, error) {
query := `
SELECT id, name, token
FROM runners;`
runners := make([]Runner, 0)
rows, err := db.Conn.Query(context.Background(), query)
if err != nil {
return runners, fmt.Errorf("Could not query database for runners: %w", err)
}
defer rows.Close()
for rows.Next() {
var runner Runner
var idStr string
if err := rows.Scan(&idStr, &runner.Name, &runner.Token); err != nil {
return runners, err
}
runner.Id, err = uuid.Parse(idStr)
if err != nil {
return runners, err
}
runners = append(runners, runner)
}
return runners, nil
}
func (db *Database) GetRunnerById(id uuid.UUID) (Runner, error) {
query := `
SELECT name, token
FROM runners
WHERE id=$1;`
runner := Runner{
Id: id,
}
err := db.Conn.QueryRow(context.Background(), query, id).Scan(&runner.Name, &runner.Token)
if err != nil {
return runner, fmt.Errorf("Could not query database for runner with id %v: %w", id.String(), err)
}
return runner, nil
}
func (db *Database) CreateRunner(name string) (Runner, error) {
s := Runner{}
// validate that the runner name is only A-Z or underscores and less than 256 characters
if len(name) > 256 {
return s, fmt.Errorf("runner name must be 256 characters or less")
}
validName := regexp.MustCompile(`[A-Z0-9_]+$`)
if !validName.MatchString(name) {
return s, fmt.Errorf("runner name must be made up of only uppercase letters, numbers, and underscores")
}
query := `
INSERT INTO runners (id, name, token)
VALUES
(
uuid_generate_v4(),
$1,
(
SELECT md5(random()::text)
)
)
RETURNING id, name, token;`
var idStr string
err := db.Conn.QueryRow(context.Background(), query, name).Scan(&idStr, &s.Name, &s.Token)
if err != nil {
return s, fmt.Errorf("Could not create runner: %w", err)
}
s.Id, err = uuid.Parse(idStr)
if err != nil {
return s, fmt.Errorf("Could not parse UUID generated by DB: %w", err)
}
return s, nil
}
+81
View File
@@ -0,0 +1,81 @@
package database
import (
"time"
"github.com/google/uuid"
)
type CloneCredentialType string
const (
USER_PASS CloneCredentialType = "USER_PASS"
SSH_KEY CloneCredentialType = "SSH_KEY"
)
type CloneCredential struct {
Id uuid.UUID
Name string
Type CloneCredentialType
Username string
Secret string
}
type Pipeline struct {
Id uuid.UUID
Name string
Url string
PollInterval int
CloneCredential *uuid.UUID
}
type Secret struct {
Id uuid.UUID
Name string
Secret string
}
type PipelineSecretMapping struct {
Pipeline uuid.UUID
Secret uuid.UUID
}
type WebhookSender string
const (
Gitea WebhookSender = "gitea"
)
type Webhook struct {
Id uuid.UUID
ServerType WebhookSender
Secret string
Pipeline uuid.UUID
}
type Run struct {
Id uuid.UUID
Pipeline uuid.UUID
InProgress bool
Result *int64
BuildOutput []byte
Stdout []byte
Stderr []byte
}
type CommandExecution struct {
Id uuid.UUID
RunId uuid.UUID
Command []string
ReturnCode int
Stdout string
Stderr string
StartTime time.Time
EndTime time.Time
}
type Runner struct {
Id uuid.UUID
Name string
Token string
}
+10
View File
@@ -0,0 +1,10 @@
FROM golang:1.19-alpine as builder
MAINTAINER restitux <restitux@ohea.xyz>
COPY . /server
WORKDIR /server
RUN go build .
FROM alpine:latest
COPY --from=builder /server/server /server
ENTRYPOINT ["/server"]
+7
View File
@@ -0,0 +1,7 @@
FROM golang:1.19-bullseye
MAINTAINER restitux <restitux@ohea.xyz>
RUN apt-get update && apt-get install -y \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
ENTRYPOINT ["/build/server/docker/cursorius/build-and-run.sh"]
+8
View File
@@ -0,0 +1,8 @@
#!/bin/bash
set -e
cd /build/server
go build -buildvcs=false .
./server
+50
View File
@@ -0,0 +1,50 @@
version: "3.3"
services:
cursorius-server:
build:
context: ".."
dockerfile: "docker/cursorius/Dockerfile.dev"
ports:
- "0.0.0.0:45420:45420"
networks:
- cursorius
volumes:
- "..:/build/server"
- "./server.toml:/root/.config/cursorius/server.toml"
- "/var/run/docker.sock:/var/run/docker.sock"
- "../_working/go:/go"
- "../_working/jobs:/cursorius/jobs"
cursorius-db:
image: postgres:14
environment:
- POSTGRES_USER=cursorius
- POSTGRES_PASSWORD=cursorius
- POSTGRES_DB=cursorius
volumes:
- "../_working/postgres:/var/lib/postgresql/data"
networks:
- cursorius
graphiql:
build:
context: "graphiql"
dockerfile: "Dockerfile.graphiql"
ports:
- "0.0.0.0:45421:80"
networks:
- cursorius
gitea:
image: gitea/gitea:latest
profiles: ["gitea"]
environment:
- GITEA__webhook__ALLOWED_HOST_LIST=cursorius-server, external
ports:
- "127.0.0.1:2222:22"
- "127.0.0.1:3000:3000"
networks:
- cursorius
volumes:
- "../_working/gitea:/data"
networks:
cursorius:
external: true
+3
View File
@@ -0,0 +1,3 @@
FROM nginx:latest
COPY graphiql.html /usr/share/nginx/html/index.html
COPY graphiql.conf /etc/nginx/conf.d/default.conf
+52
View File
@@ -0,0 +1,52 @@
upstream backend {
server cursorius-server:45420;
}
server {
listen 80;
listen [::]:80;
server_name localhost;
#access_log /var/log/nginx/host.access.log main;
location / {
root /usr/share/nginx/html;
index index.html index.htm;
}
location /graphql {
proxy_pass http://backend/graphql;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /usr/share/nginx/html;
}
# proxy the PHP scripts to Apache listening on 127.0.0.1:80
#
#location ~ \.php$ {
# proxy_pass http://127.0.0.1;
#}
# pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
#
#location ~ \.php$ {
# root html;
# fastcgi_pass 127.0.0.1:9000;
# fastcgi_index index.php;
# fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
# include fastcgi_params;
#}
# deny access to .htaccess files, if Apache's document root
# concurs with nginx's one
#
#location ~ /\.ht {
# deny all;
#}
}
+70
View File
@@ -0,0 +1,70 @@
<!--
* Copyright (c) 2021 GraphQL Contributors
* All rights reserved.
*
* This source code is licensed under the license found in the
* LICENSE file in the root directory of this source tree.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<title>GraphiQL</title>
<style>
body {
height: 100%;
margin: 0;
width: 100%;
overflow: hidden;
}
#graphiql {
height: 100vh;
}
</style>
<!--
This GraphiQL example depends on Promise and fetch, which are available in
modern browsers, but can be "polyfilled" for older browsers.
GraphiQL itself depends on React DOM.
If you do not want to rely on a CDN, you can host these files locally or
include them directly in your favored resource bundler.
-->
<script
src="https://unpkg.com/react@17/umd/react.development.js"
integrity="sha512-Vf2xGDzpqUOEIKO+X2rgTLWPY+65++WPwCHkX2nFMu9IcstumPsf/uKKRd5prX3wOu8Q0GBylRpsDB26R6ExOg=="
crossorigin="anonymous"
></script>
<script
src="https://unpkg.com/react-dom@17/umd/react-dom.development.js"
integrity="sha512-Wr9OKCTtq1anK0hq5bY3X/AvDI5EflDSAh0mE9gma+4hl+kXdTJPKZ3TwLMBcrgUeoY0s3dq9JjhCQc7vddtFg=="
crossorigin="anonymous"
></script>
<!--
These two files can be found in the npm module, however you may wish to
copy them directly into your environment, or perhaps include them in your
favored resource bundler.
-->
<link rel="stylesheet" href="https://unpkg.com/graphiql/graphiql.min.css" />
</head>
<body>
<div id="graphiql">Loading...</div>
<script
src="https://unpkg.com/graphiql/graphiql.min.js"
type="application/javascript"
></script>
<script>
ReactDOM.render(
React.createElement(GraphiQL, {
fetcher: GraphiQL.createFetcher({
//url: 'https://swapi-graphql.netlify.app/.netlify/functions/index',
url: 'http://127.0.0.1:45421/graphql',
}),
defaultEditorToolsVisibility: true,
}),
document.getElementById('graphiql'),
);
</script>
</body>
</html>
+4
View File
@@ -0,0 +1,4 @@
#!/bin/bash
docker build . -f docker/Dockerfile -t git.ohea.xyz/cursorius/server:latest
docker push git.ohea.xyz/cursorius/server:latest
+5
View File
@@ -0,0 +1,5 @@
#!/bin/bash
set -e
go run docker/run.go "$@"
+93
View File
@@ -0,0 +1,93 @@
package main
import (
"fmt"
"os"
"os/exec"
)
func panicError(errorString string, params ...any) {
fmt.Fprintf(os.Stderr, fmt.Sprintf("ERROR: %v\n", errorString), params...)
os.Exit(1)
}
func run(name string, arg ...string) {
cmd := exec.Command(name, arg...)
if err := cmd.Run(); err != nil {
panicError("could not run command %v: %v", name, err)
}
}
func runAttach(name string, arg ...string) {
cmd := exec.Command(name, arg...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
panicError("could not run command %v: %v", name, err)
}
}
func runCompose(args []string) {
runAttach("docker", append([]string{"compose"}, args...)...)
}
func createDirs() {
run("mkdir", "-p", "_working/go")
run("mkdir", "-p", "_working/jobs")
}
func currentContainers() string {
bytes, err := os.ReadFile("_working/current_containers")
if err != nil {
panicError("could not read current containers: %v", err)
}
return string(bytes)
}
func composeFlags() []string {
containers := currentContainers()
flags := []string{"-f", "docker/docker-compose.yml"}
switch containers {
case "gitea":
flags = append(flags, "--profile", "gitea")
}
return flags
}
func runContainers(containers string) {
err := os.WriteFile("_working/current_containers", []byte(containers), 0633)
if err != nil {
panicError("could not write current_containers file: %v", err)
}
runCompose(append(composeFlags(), "up", "--build", "-d"))
runCompose(append(composeFlags(), "logs", "-f"))
}
func main() {
if len(os.Args) < 2 {
panicError("not enough arguments passed")
}
createDirs()
switch os.Args[1] {
case "default", "gitea":
runContainers(os.Args[1])
case "runprev":
runContainers(currentContainers())
case "stop":
runCompose(append(composeFlags(), "down"))
case "dbshell":
runCompose(append(composeFlags(), "exec", "cursorius-db", "psql", "--user=cursorius"))
case "logs":
runCompose(append(composeFlags(), "logs", "-f"))
case "ps":
runCompose(append(composeFlags(), "ps"))
case "help":
fmt.Println("commands: default, gitea, runprev, stop, dbshell, logs, ps, help")
default:
panicError("Unknown subcommand: %v", os.Args[1])
}
}
+61 -1
View File
@@ -3,8 +3,68 @@ module git.ohea.xyz/cursorius/server
go 1.19
require (
git.ohea.xyz/cursorius/pipeline-api/go/api/v2 v2.0.0-20230405234139-34d8875b72f4
git.ohea.xyz/cursorius/runner-api/go/api/v2 v2.0.0-20230109074922-e20285fe6cf2
git.ohea.xyz/golang/config v0.0.0-20220915224621-b9debd233173
github.com/bufbuild/connect-go v1.4.1
github.com/docker/docker v20.10.22+incompatible
github.com/go-git/go-git/v5 v5.4.3-0.20220529141257-bc1f419cebcf
github.com/go-playground/webhooks/v6 v6.0.1
github.com/google/uuid v1.3.0
github.com/graphql-go/graphql v0.8.0
github.com/graphql-go/handler v0.2.3
github.com/jackc/pgx/v5 v5.2.0
github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
golang.org/x/net v0.2.0
google.golang.org/protobuf v1.30.0
nhooyr.io/websocket v1.8.7
)
require github.com/pelletier/go-toml/v2 v2.0.5 // indirect
require (
code.gitea.io/gitea v1.17.4 // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20220930113650-c6815a8c17ad // indirect
github.com/acomagu/bufpipe v1.0.3 // indirect
github.com/cloudflare/circl v1.2.0 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/go-git/gcfg v1.5.0 // indirect
github.com/go-git/go-billy/v5 v5.3.1 // indirect
github.com/gobwas/ws v1.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/puddle/v2 v2.1.2 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/moby/term v0.0.0-20221205130635-1aeaba878587 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
github.com/xanzy/ssh-agent v0.3.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/crypto v0.2.1-0.20221112162523-6fad3dfc1891 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/tools v0.1.12 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.4.0 // indirect
)
replace github.com/go-playground/webhooks/v6 => git.ohea.xyz/cursorius/webhooks/v6 v6.0.2-0.20221224221147-a2bdbf1756ed
+2285 -1
View File
File diff suppressed because it is too large Load Diff
+60 -14
View File
@@ -2,31 +2,77 @@ package listen
import (
"fmt"
"git.ohea.xyz/cursorius/server/runner"
"github.com/op/go-logging"
"net/http"
"git.ohea.xyz/cursorius/server/admin_api"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/pipeline_api"
"git.ohea.xyz/cursorius/server/runnermanager"
"git.ohea.xyz/cursorius/server/webhook"
"github.com/google/uuid"
"github.com/op/go-logging"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"nhooyr.io/websocket"
)
var log = logging.MustGetLogger("cursorius-server")
func setupHTTPServer(runnerCh chan runner.Run) {
http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "POST":
log.Info("Got webhook")
runnerCh <- runner.Run{Name: "test-local"}
default:
log.Errorf("Got request with method \"%v\", ignoring...", r.Method)
func setupHTTPServer(
mux *http.ServeMux,
conf config.PipelineConf,
db database.Database,
runnerManagerChans runnermanager.RunnerManagerChans,
pollChan chan uuid.UUID,
) error {
webhook.CreateWebhookHandler(db, conf, mux)
pipeline_api.CreateHandler(runnerManagerChans.Allocation, runnerManagerChans.Release, mux)
err := admin_api.CreateHandler(db, pollChan, mux)
if err != nil {
return fmt.Errorf("Could not create admin api handler: %w", err)
}
mux.HandleFunc("/runner", func(w http.ResponseWriter, r *http.Request) {
conn, err := websocket.Accept(w, r, nil)
if err != nil {
log.Errorf("Could not upgrade runner connection to websocket: %v", err)
return
}
go runnermanager.RegisterRunner(conn, runnerManagerChans.Registration)
})
return nil
}
func Listen(address string, port int, runnerCh chan runner.Run) {
func Listen(
mux *http.ServeMux,
address string,
port int,
conf config.PipelineConf,
db database.Database,
runnerManagerChans runnermanager.RunnerManagerChans,
pollChan chan uuid.UUID,
) error {
setupHTTPServer(runnerCh)
err := setupHTTPServer(
mux,
conf,
db,
runnerManagerChans,
pollChan,
)
if err != nil {
return fmt.Errorf("Could not setup http endpoints: %w", err)
}
connect_string := fmt.Sprintf("%v:%v", address, port)
log.Noticef("Launching HTTP server on %v\n", connect_string)
log.Fatal(http.ListenAndServe(connect_string, nil))
return http.ListenAndServe(
connect_string,
h2c.NewHandler(mux, &http2.Server{}),
)
}
+31 -7
View File
@@ -1,11 +1,15 @@
package main
import (
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/listen"
"git.ohea.xyz/cursorius/server/runner"
"github.com/op/go-logging"
"net/http"
"os"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/listen"
"git.ohea.xyz/cursorius/server/poll"
"git.ohea.xyz/cursorius/server/runnermanager"
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("cursorius-server")
@@ -22,17 +26,37 @@ func main() {
logging.SetBackend(backendLeveled)
log.Info("Starting cursorius-server")
log.Info("Starting cursorius-server v0.1.0")
configData, err := config.GetConfig()
if err != nil {
log.Errorf("Could not get configuration: %v", err)
return
}
ch, err := runner.StartRunner(configData.Config.Jobs)
db, err := database.LaunchDB(configData.Config.DBConfig)
if err != nil {
log.Errorf("Could not launch db: %v", err)
return
}
runnerManagerChans, err := runnermanager.StartRunnerManager(configData.Config.Runners, db)
if err != nil {
log.Errorf("Could not start runner: %v", err)
return
}
listen.Listen(configData.Config.Address, configData.Config.Port, ch)
pollChan := poll.StartPolling(configData.Config.PipelineConf, db)
mux := http.NewServeMux()
log.Fatal(listen.Listen(
mux,
configData.Config.Address,
configData.Config.Port,
configData.Config.PipelineConf,
db,
runnerManagerChans,
pollChan,
))
}
+197
View File
@@ -0,0 +1,197 @@
package pipeline_api
import (
"context"
"fmt"
"net/http"
"sync"
"time"
apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2"
"git.ohea.xyz/cursorius/pipeline-api/go/api/v2/apiv2connect"
"git.ohea.xyz/cursorius/server/runnermanager"
"git.ohea.xyz/cursorius/server/util"
"github.com/bufbuild/connect-go"
"github.com/google/uuid"
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("cursorius-server")
type ApiServer struct {
allocationCh chan runnermanager.RunnerAllocationRequest
releaseCh chan runnermanager.RunnerReleaseRequest
allocatedRunners map[uuid.UUID]*RunnerWrapper
allocatedRunnersMutex sync.RWMutex
}
type RunnerWrapper struct {
runner *runnermanager.Runner
mutex sync.Mutex
}
func (r *RunnerWrapper) RunCommand(cmd string, args []string) (int64, string, string, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
return_code, stdout, stderr, err := r.runner.RunCommand(cmd, args)
return return_code, stdout, stderr, err
}
func (r *RunnerWrapper) Release(releaseCh chan runnermanager.RunnerReleaseRequest) {
r.mutex.Lock()
defer r.mutex.Unlock()
releaseCh <- runnermanager.RunnerReleaseRequest{
Runner: r.runner,
}
r.runner = nil
}
func (s *ApiServer) GetRunnerFromMap(u uuid.UUID) (*RunnerWrapper, bool) {
s.allocatedRunnersMutex.RLock()
defer s.allocatedRunnersMutex.RUnlock()
runner, ok := s.allocatedRunners[u]
return runner, ok
}
func (s *ApiServer) AddRunnerToMap(u uuid.UUID, runner *runnermanager.Runner) {
s.allocatedRunnersMutex.Lock()
defer s.allocatedRunnersMutex.Unlock()
s.allocatedRunners[u] = &RunnerWrapper{runner: runner}
}
func (s *ApiServer) GetRunner(
ctx context.Context,
req *connect.Request[apiv2.GetRunnerRequest],
) (*connect.Response[apiv2.GetRunnerResponse], error) {
var response runnermanager.RunnerAllocationResponse
var timeoutCtx *context.Context
var retryInterval int64 = 0
respChan := make(chan runnermanager.RunnerAllocationResponse)
tagsStr := util.FormatTags(req.Msg.Tags)
if req.Msg.Options != nil {
if req.Msg.Options.Timeout != 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(req.Msg.Options.Timeout)*time.Second)
timeoutCtx = &ctx
defer cancel()
}
retryInterval = req.Msg.Options.RetryInterval
}
for {
s.allocationCh <- runnermanager.RunnerAllocationRequest{
Tags: req.Msg.Tags,
RespChan: respChan,
}
response = <-respChan
if response.Err == nil {
break
}
log.Infof("Could not get runner with tags \"%v\": %v", tagsStr, response.Err)
// If no timeout is specified, skip after one attempt
if timeoutCtx == nil {
break
}
// If timeout is expired, stop trying to allocate runner
if (*timeoutCtx).Err() != nil {
break
}
log.Infof("Sleeping for %v seconds before retry...", retryInterval)
time.Sleep(time.Duration(retryInterval) * time.Second)
}
if response.Runner == nil {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner"))
}
log.Infof("Got runner with tags: %v", tagsStr)
runnerUuid := uuid.New()
s.AddRunnerToMap(runnerUuid, response.Runner)
res := connect.NewResponse(&apiv2.GetRunnerResponse{
Id: runnerUuid.String(),
})
res.Header().Set("GetRunner-Version", "v2")
return res, nil
}
func (s *ApiServer) ReleaseRunner(
ctx context.Context,
req *connect.Request[apiv2.ReleaseRunnerRequest],
) (*connect.Response[apiv2.ReleaseRunnerResponse], error) {
uuid, err := uuid.Parse(req.Msg.Id)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
}
log.Infof("Releasing runner with ID \"%v\"", uuid)
s.allocatedRunnersMutex.Lock()
runner := s.allocatedRunners[uuid]
delete(s.allocatedRunners, uuid)
runner.Release(s.releaseCh)
s.allocatedRunnersMutex.Unlock()
res := connect.NewResponse(&apiv2.ReleaseRunnerResponse{})
res.Header().Set("ReleaseRunner-Version", "v2")
return res, nil
}
func (s *ApiServer) RunCommand(
ctx context.Context,
req *connect.Request[apiv2.RunCommandRequest],
) (*connect.Response[apiv2.RunCommandResponse], error) {
uuid, err := uuid.Parse(req.Msg.Id)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
}
runner, ok := s.GetRunnerFromMap(uuid)
if !ok {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
}
returnCode, stdout, stderr, err := runner.RunCommand(req.Msg.Command, req.Msg.Args)
if err != nil {
log.Errorf("Could not run command on runner \"%v\", %v", runner.runner.Id(), err)
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Could not run command"))
}
res := connect.NewResponse(&apiv2.RunCommandResponse{
ReturnCode: returnCode,
Stdout: stdout,
Stderr: stderr,
})
res.Header().Set("RunCommand-Version", "v2")
return res, nil
}
func CreateHandler(allocationCh chan runnermanager.RunnerAllocationRequest, releaseCh chan runnermanager.RunnerReleaseRequest, mux *http.ServeMux) {
api_server := &ApiServer{
allocationCh: allocationCh,
releaseCh: releaseCh,
allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
}
path, handler := apiv2connect.NewGetRunnerServiceHandler(api_server)
mux.Handle(path, handler)
path, handler = apiv2connect.NewReleaseRunnerServiceHandler(api_server)
mux.Handle(path, handler)
path, handler = apiv2connect.NewRunCommandServiceHandler(api_server)
mux.Handle(path, handler)
}
+265
View File
@@ -0,0 +1,265 @@
package pipeline_executor
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"strings"
"github.com/jhoonb/archivex"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing/transport"
"github.com/go-git/go-git/v5/plumbing/transport/http"
"github.com/go-git/go-git/v5/plumbing/transport/ssh"
"github.com/op/go-logging"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
)
var log = logging.MustGetLogger("cursorius-server")
type PipelineExecution struct {
Pipeline database.Pipeline
Ref string
Run database.Run
}
func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) {
jobFolder := filepath.Join(pipelineConf.WorkingDir, pe.Pipeline.Id.String(), pe.Run.Id.String())
cloneFolder := filepath.Join(jobFolder, "repo")
log.Debugf("Job %v configured with URL \"%v\"", pe.Pipeline.Name, pe.Pipeline.Url)
log.Debugf("Job %v configured with folder \"%v\"", pe.Pipeline.Name, jobFolder)
err := os.RemoveAll(jobFolder)
if err != nil {
log.Errorf("could not delete existing folder %v", jobFolder)
return
}
err = os.MkdirAll(cloneFolder, 0755)
if err != nil {
log.Errorf("could not create working directory for job %v: %w", pe.Pipeline.Name, err)
return
}
log.Infof("Cloning source from URL %v", pe.Pipeline.Url)
var auth transport.AuthMethod
if pe.Pipeline.CloneCredential != nil {
credential, err := db.GetCloneCredentialById(*pe.Pipeline.CloneCredential)
if err != nil {
log.Errorf("could not get credenital from db: %v", err)
return
}
switch credential.Type {
case "USER_PASS":
log.Debugf("job %v configured to use credential %v", pe.Pipeline.Name, credential.Name)
auth = transport.AuthMethod(&http.BasicAuth{
Username: credential.Username,
Password: credential.Secret,
})
case "SSH_KEY":
publicKeys, err := ssh.NewPublicKeys(credential.Username, []byte(credential.Secret), "")
if err != nil {
log.Errorf("could not parse credential %v", credential.Name)
return
}
auth = transport.AuthMethod(publicKeys)
default:
log.Errorf("unsupported credential type %v", credential.Type)
return
}
} else {
auth = nil
}
_, err = git.PlainClone(cloneFolder, false, &git.CloneOptions{
URL: pe.Pipeline.Url,
Auth: auth,
})
if err != nil {
log.Errorf("could not clone repo: %v", err)
return
}
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
log.Errorf("Could not create docker client: %w", err)
return
}
log.Info("Source cloned successfully")
ctx := context.Background()
log.Info("Building container")
tarFile := filepath.Join(jobFolder, "archive.tar")
tar := new(archivex.TarFile)
err = tar.Create(tarFile)
if err != nil {
log.Errorf("could not create tarfile: %w", err)
return
}
err = tar.AddAll(cloneFolder, false)
if err != nil {
log.Errorf("could not add repo to tarfile: %w", err)
return
}
err = tar.Close()
if err != nil {
log.Errorf("could not close tarfile: %w", err)
return
}
dockerBuildContext, err := os.Open(tarFile)
defer dockerBuildContext.Close()
imageName := fmt.Sprintf("%v-%v:latest", pe.Pipeline.Id.String(), pe.Run.Id.String())
buildResponse, err := cli.ImageBuild(context.Background(), dockerBuildContext, types.ImageBuildOptions{
Tags: []string{imageName},
Dockerfile: ".cursorius/Dockerfile",
})
if err != nil {
log.Errorf("could not build container: %w", err)
return
}
err = db.UpdateRunBuildOutput(pe.Run.Id, cleanupBuildOutput(buildResponse.Body))
if err != nil {
log.Errorf("could not update build output for run: %w", err)
return
}
err = buildResponse.Body.Close()
if err != nil {
log.Errorf("Could not close build response body: %w", err)
return
}
log.Info("Image built sucessfully")
hostConfig := container.HostConfig{}
if pipelineConf.DockerNetwork != nil {
hostConfig.NetworkMode = container.NetworkMode(*pipelineConf.DockerNetwork)
}
if pipelineConf.MountConf.Type == config.Bind {
sourceDir := filepath.Join(pipelineConf.MountConf.Source, pe.Pipeline.Id.String(), pe.Run.Id.String())
hostConfig.Mounts = append(hostConfig.Mounts,
mount.Mount{
Type: mount.TypeBind,
Source: sourceDir,
Target: "/cursorius/src",
ReadOnly: false,
Consistency: mount.ConsistencyDefault,
},
)
} else if pipelineConf.MountConf.Type == config.Volume {
hostConfig.Mounts = append(hostConfig.Mounts,
mount.Mount{
Type: mount.TypeVolume,
Source: pipelineConf.MountConf.Source,
Target: "/cursorius/src",
ReadOnly: false,
Consistency: mount.ConsistencyDefault,
},
)
}
env := make([]string, 0)
// set cursorius environment variables
env = append(env, []string{
fmt.Sprintf("CURSORIUS_RUN_ID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CURSORIUS_SERVER_URL=%v", pipelineConf.AccessURL),
}...)
// load secrets into environment
secrets, err := db.GetSecretsForPipeline(pe.Pipeline.Id)
if err != nil {
log.Errorf("Could not get secrets for pipeline", err)
return
}
for _, secret := range secrets {
// the env name is validated to be just uppercase letters, numbers, and underscores on ingestion
env = append(env, fmt.Sprintf("%v=%v", strings.ToUpper(secret.Name), secret.Secret))
}
resp, err := cli.ContainerCreate(ctx,
&container.Config{
Image: imageName,
Tty: false,
Env: env,
},
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
&hostConfig,
nil, nil, "",
)
if err != nil {
log.Errorf("could not create container: %w", err)
return
}
log.Info("Launching container")
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
log.Errorf("could not start container: %v", err)
return
}
statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
log.Errorf("container returned error: %v", err)
return
}
case okBody := <-statusCh:
if okBody.Error != nil {
log.Errorf("Could not wait on container: %v", err)
return
} else {
log.Debugf("Container finished running with return code: %v", okBody.StatusCode)
pe.Run.Result = &okBody.StatusCode
}
}
pe.Run.InProgress = false
out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
if err != nil {
log.Errorf("could not get container logs: %w", err)
return
}
var stdOut bytes.Buffer
var stdErr bytes.Buffer
stdcopy.StdCopy(&stdOut, &stdErr, out)
pe.Run.Stdout = stdOut.Bytes()
pe.Run.Stderr = stdErr.Bytes()
db.UpdateRunResult(pe.Run)
return
}
+25
View File
@@ -0,0 +1,25 @@
package pipeline_executor
import (
"bufio"
"encoding/json"
"io"
)
func cleanupBuildOutput(input io.ReadCloser) string {
output := ""
scanner := bufio.NewScanner(input)
for scanner.Scan() {
var log map[string]any
json.Unmarshal(scanner.Bytes(), &log)
if logVar, ok := log["stream"]; ok {
if log, ok := logVar.(string); ok {
output += log
}
}
}
return output
}
+194
View File
@@ -0,0 +1,194 @@
package poll
import (
"context"
"time"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/pipeline_executor"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/storage/memory"
"github.com/google/uuid"
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("cursorius-server")
type branch struct {
ref string
commitHash string
}
type tag struct {
ref string
commitHash string
}
func pollJob(ctx context.Context, pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) {
firstScan := true
for {
// Don't sleep on first scan to ease testing
// TODO: this should be replaced with a script that mocks a webhook
if !firstScan {
ctx, cancel := context.WithTimeout(ctx, time.Duration(pipeline.PollInterval)*time.Second)
select {
case <-ctx.Done():
switch ctx.Err() {
case context.Canceled:
log.Infof("Polling for pipeline %v canceled, stopping", pipeline.Name)
cancel()
return
}
}
cancel()
log.Infof("Polling repo %v", pipeline.Name)
} else {
firstScan = false
}
prevRefs, err := db.GetPipelineRefs(pipeline.Id)
if err != nil {
log.Errorf("Could not get pipeline refs from db: %v", err)
return
}
repo, err := git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
URL: pipeline.Url,
})
if err != nil {
log.Errorf("Could not clone repo %v from url %v: %v", pipeline.Name, pipeline.Url, err)
continue
}
refsToRunFor := []string{}
// get branches
branches, err := repo.Branches()
if err != nil {
log.Errorf("Could not enumerate branches in repo %v: %v", pipeline.Name, err)
continue
}
branches.ForEach(func(branch *plumbing.Reference) error {
log.Debugf("Processing branch %v from repo %v (id: %v)", branch.Name().String(), pipeline.Name, pipeline.Id)
prevRef, ok := prevRefs[branch.Name().String()]
if ok {
if branch.Hash().String() != prevRef {
log.Debugf("Queuing job for branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
prevRefs[branch.Name().String()] = branch.Hash().String()
refsToRunFor = append(refsToRunFor, branch.Name().String())
} else {
log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String(), prevRef)
}
} else {
log.Debugf("Queuing job for newly discovered branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
prevRefs[branch.Name().String()] = branch.Hash().String()
refsToRunFor = append(refsToRunFor, branch.Name().String())
}
return nil
})
tags, err := repo.Tags()
if err != nil {
log.Errorf("Could not enumerate tags in repo %v: %v", pipeline.Name, err)
continue
}
tags.ForEach(func(tag *plumbing.Reference) error {
log.Debugf("Processing tag %v from repo %v (id: %v)", tag.Name().String(), pipeline.Name, pipeline.Id)
prevRef, ok := prevRefs[tag.Name().String()]
if ok {
if tag.Hash().String() != prevRef {
log.Debugf("Queuing job for tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
prevRefs[tag.Name().String()] = tag.Hash().String()
refsToRunFor = append(refsToRunFor, tag.Name().String())
} else {
log.Debugf("Tag %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String(), prevRef)
}
} else {
log.Debugf("Queuing job for newly discovered tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
prevRefs[tag.Name().String()] = tag.Hash().String()
refsToRunFor = append(refsToRunFor, tag.Name().String())
}
return nil
})
err = db.UpdatePipelineRefs(pipeline.Id, prevRefs)
if err != nil {
log.Errorf("Could not update pipeline refs: %v", err)
return
}
for _, ref := range refsToRunFor {
log.Debugf("Dispatching job for ref %v in repo %v (id: %v)", ref, pipeline.Name, pipeline.Id)
run, err := db.CreateRun(pipeline.Id)
if err != nil {
log.Errorf("Could not create run for pipeline with id \"%v\": ", pipeline.Id, err)
continue
}
pe := pipeline_executor.PipelineExecution{
Pipeline: pipeline,
Ref: ref,
Run: run,
}
go pipeline_executor.ExecutePipeline(pe, db, pipelineConf)
}
}
}
func launchPollJobs(conf config.PipelineConf, db database.Database, pollChan chan uuid.UUID) {
pipelines, err := db.GetPipelines()
if err != nil {
log.Errorf("Could not get pipelines from database: %w", err)
return
}
pipelineCancelations := make(map[uuid.UUID]context.CancelFunc)
for _, pipeline := range pipelines {
if pipeline.PollInterval == 0 {
continue
} else {
ctx, cancel := context.WithCancel(context.Background())
pipelineCancelations[pipeline.Id] = cancel
log.Infof("Starting polling for pipeline %v with id %v", pipeline.Name, pipeline.Id)
go pollJob(ctx, pipeline, conf, db)
}
}
for {
jobUUID := <-pollChan
pipeline, err := db.GetPipelineById(jobUUID)
if err != nil {
log.Errorf("Could not get pipeline with id \"%v\" from database: %v", err)
continue
}
// Cancel existing polling job if it exists
if cancelFunc, ok := pipelineCancelations[pipeline.Id]; ok {
cancelFunc()
}
// Start new polling job
log.Infof("Starting polling for pipeline %v with id %v", pipeline.Name, pipeline.Id)
ctx, cancel := context.WithCancel(context.Background())
pipelineCancelations[pipeline.Id] = cancel
go pollJob(ctx, pipeline, conf, db)
}
}
func StartPolling(conf config.PipelineConf, db database.Database) chan uuid.UUID {
pollChan := make(chan uuid.UUID)
go launchPollJobs(conf, db, pollChan)
return pollChan
}
-42
View File
@@ -1,42 +0,0 @@
package runner
import (
"git.ohea.xyz/cursorius/server/config"
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("cursorius-server")
type Run struct {
Name string
}
func runJob(job config.Job) {
if job.Folder != nil {
log.Debugf("Job configured with folder \"%v\"", *job.Folder)
} else if job.URL != nil {
log.Debugf("Job configured with URL \"%v\"", *job.URL)
}
}
func runnerListen(ch chan Run, jobs map[string]config.Job) {
for {
run := <-ch
log.Debugf("Got Run: %v", run)
job, exists := jobs[run.Name]
if exists {
log.Infof("Launching run for job %v", run.Name)
go runJob(job)
} else {
log.Errorf("No configured job with name %v", run.Name)
}
}
}
func StartRunner(jobs map[string]config.Job) (chan Run, error) {
ch := make(chan Run)
go runnerListen(ch, jobs)
return ch, nil
}
+111
View File
@@ -0,0 +1,111 @@
package runnermanager
import (
"context"
"fmt"
"github.com/google/uuid"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"nhooyr.io/websocket"
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
)
type RunnerData struct {
msgType websocket.MessageType
data []byte
}
type Runner struct {
id uuid.UUID
tags []string
conn *websocket.Conn
receiveChan chan []byte
}
func (r *Runner) HasTags(requestedTags []string) bool {
tagIter:
for _, requestedTag := range requestedTags {
for _, posessedTag := range r.tags {
// if we find the tag, move on to search for the next one
if posessedTag == requestedTag {
continue tagIter
}
}
// if we don't find the tag
return false
}
return true
}
func (r *Runner) Id() uuid.UUID {
return r.id
}
func (r *Runner) RunCommand(cmd string, args []string) (returnCode int64, stdout string, stderr string, err error) {
if r.conn == nil {
return 0, "", "", fmt.Errorf("runner with id %v has nil conn, THIS IS A BUG", r.id)
}
// Write RunCommand message to client
serverToRunnerMsg := &runner_api.ServerToRunnerMsg{
Msg: &runner_api.ServerToRunnerMsg_RunCommandMsg{
RunCommandMsg: &runner_api.RunCommand{
Command: cmd,
Args: args,
},
},
}
err = r.sendProtoStruct(serverToRunnerMsg)
if err != nil {
err = fmt.Errorf("Could not send command to client: %w", err)
return
}
for {
// Read RunCommandFinalResponse message from client
data, ok := <-r.receiveChan
if !ok {
err = fmt.Errorf("Channel is closed on runner")
return
}
runnerToServerMsg := &runner_api.RunnerToServerMsg{}
if err = proto.Unmarshal(data, runnerToServerMsg); err != nil {
err = fmt.Errorf("Could not parse RunCommand response: %w", err)
r.conn.Close(websocket.StatusUnsupportedData, "Invalid message")
return
}
switch x := runnerToServerMsg.Msg.(type) {
case *runner_api.RunnerToServerMsg_RunCommandPartialResponseMsg:
stdout += x.RunCommandPartialResponseMsg.Stdout
stderr += x.RunCommandPartialResponseMsg.Stderr
case *runner_api.RunnerToServerMsg_RunCommandFinalResponseMsg:
stdout += x.RunCommandFinalResponseMsg.PartialResponse.Stdout
stderr += x.RunCommandFinalResponseMsg.PartialResponse.Stderr
returnCode = x.RunCommandFinalResponseMsg.ReturnCode
return
}
}
}
func (r *Runner) sendProtoStruct(p protoreflect.ProtoMessage) error {
protoOut, err := proto.Marshal(p)
if err != nil {
return fmt.Errorf("Could not marshal proto: %w", err)
}
ctx := context.Background()
log.Debugf("r.conn: %p", r.conn)
if err := r.conn.Write(ctx, websocket.MessageBinary, protoOut); err != nil {
return fmt.Errorf("Could not send proto to websocket: %w", err)
}
return nil
}
+206
View File
@@ -0,0 +1,206 @@
package runnermanager
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"github.com/op/go-logging"
"google.golang.org/protobuf/proto"
"nhooyr.io/websocket"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/util"
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
)
var log = logging.MustGetLogger("cursorius-server")
func (r *runnerManager) processRunnerAllocation(req RunnerAllocationRequest) {
tagsStr := util.FormatTags(req.Tags)
log.Infof("Got request for runner with tags \"%v\"", tagsStr)
log.Debugf("Finding runner with tags %v", tagsStr)
foundRunner := false
runnersToRemove := []int{}
runnerIter:
for i, runner := range r.connectedRunners {
// don't allocate runner with closed receiveChan (is defunct)
// there should never be messages to read on an inactive runner,
// so we aren't losing any data here
select {
case _, ok := <-runner.receiveChan:
if ok {
// this should never happen
// TODO: should we disconnect the runner?
log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
continue
}
log.Noticef("Removing defunct runner \"%v\"", runner.id)
runnersToRemove = append(runnersToRemove, i)
default:
log.Debugf("Checking runner %v for requested tags", runner.id)
if !runner.HasTags(req.Tags) {
continue runnerIter
}
runnersToRemove = append(runnersToRemove, i)
foundRunner = true
log.Debugf("Runner %v has requested tags, allocating", runner.id)
req.RespChan <- RunnerAllocationResponse{
Runner: &r.connectedRunners[i],
Err: nil,
}
}
}
// remove allocated runner plus defunct runners
// since we iterate, all the indexes will be in accending order
for i, runnerInd := range runnersToRemove {
r.connectedRunners[runnerInd-i] = r.connectedRunners[len(r.connectedRunners)-1]
r.connectedRunners = r.connectedRunners[0 : len(r.connectedRunners)-1]
}
if foundRunner {
return
}
errorMsg := "could not find valid runner"
if len(r.connectedRunners) == 0 {
errorMsg = "no connected runners"
}
req.RespChan <- RunnerAllocationResponse{
Runner: nil,
Err: fmt.Errorf("Could not allocate runner: %v", errorMsg),
}
}
func (r *runnerManager) processRunnerRegistration(req RunnerRegistrationRequest) {
log.Debugf("New runner appeared with id: %v and secret: %v", req.Id, req.Secret)
// Get runner with give id from database
runnerId, err := uuid.Parse(req.Id)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", req.Id, err)
req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
dbRunner, err := r.db.GetRunnerById(runnerId)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err)
req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
if req.Secret != dbRunner.Token {
log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId)
req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
log.Infof("Registering runner \"%v\" with tags %v", req.Id, req.Tags)
runner := Runner{
id: runnerId,
tags: req.Tags,
conn: req.conn,
receiveChan: make(chan []byte),
}
r.connectedRunners = append(r.connectedRunners, runner)
// start goroutine to call Read function on websocket connection
// this is required to keep the connection functioning
go func() {
defer log.Noticef("Deregistered runner with id: %v", runner.id)
defer close(runner.receiveChan)
for {
msgType, data, err := req.conn.Read(context.Background())
if err != nil {
// TODO: this is still racy, since a runner could be allocated between the
// connection returning an err and the channel closing
// This should probably be handled by sending erroring, but not 100% sure
log.Errorf("Could not read from connection: %v", err)
return
}
if msgType != websocket.MessageBinary {
close(runner.receiveChan)
log.Errorf("Got binary data from connection")
return
}
runner.receiveChan <- data
}
}()
}
func (r *runnerManager) processRunnerRelease(req RunnerReleaseRequest) {
r.connectedRunners = append(r.connectedRunners, *req.Runner)
}
func runRunnerManager(r runnerManager) {
for {
select {
case request := <-r.chans.Allocation:
r.processRunnerAllocation(request)
case release := <-r.chans.Release:
r.processRunnerRelease(release)
case registration := <-r.chans.Registration:
r.processRunnerRegistration(registration)
}
}
}
func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (RunnerManagerChans, error) {
scheduler := runnerManager{
chans: RunnerManagerChans{
Allocation: make(chan RunnerAllocationRequest),
Release: make(chan RunnerReleaseRequest),
Registration: make(chan RunnerRegistrationRequest),
},
connectedRunners: make([]Runner, 0),
configuredRunners: configuredRunners,
db: db,
}
go runRunnerManager(scheduler)
return scheduler.chans, nil
}
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistrationRequest) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
var registration RunnerRegistrationRequest
registration.conn = conn
typ, r, err := conn.Read(ctx)
if err != nil {
log.Errorf("Could not read from runner websocket connection: %v", err)
log.Errorf("Disconnecting...")
return
}
if typ != websocket.MessageBinary {
log.Error("Got non binary message from runner, disconnecting...")
conn.Close(websocket.StatusUnsupportedData, "Requires binary data")
return
}
registration_proto := &runner_api.Register{}
if err := proto.Unmarshal(r, registration_proto); err != nil {
log.Error("Could not parse registration message from runner, disconnection....")
conn.Close(websocket.StatusUnsupportedData, "Invalid message")
return
}
registration.Secret = registration_proto.Secret
registration.Id = registration_proto.Id
registration.Tags = registration_proto.Tags
registerCh <- registration
}
+49
View File
@@ -0,0 +1,49 @@
package runnermanager
import (
"nhooyr.io/websocket"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
)
type RunnerManagerChans struct {
Allocation chan RunnerAllocationRequest
Release chan RunnerReleaseRequest
Registration chan RunnerRegistrationRequest
}
type runnerManager struct {
chans RunnerManagerChans
connectedRunners []Runner
numConnectedRunners uint64
configuredRunners map[string]config.Runner
db database.Database
}
type RunnerAllocationRequest struct {
Tags []string
RespChan chan RunnerAllocationResponse
CancelChan chan string
}
type RunnerAllocationResponse struct {
Runner *Runner
Err error
}
type RunnerReleaseRequest struct {
Runner *Runner
}
type RunnerRegistrationRequest struct {
Secret string
Id string
Tags []string
conn *websocket.Conn
}
type runnerJob struct {
Id string
URL string
}
+18
View File
@@ -0,0 +1,18 @@
package util
import (
"fmt"
"strings"
)
func FormatTags(tags []string) string {
var tagsStr strings.Builder
if len(tags) > 0 {
fmt.Fprintf(&tagsStr, "[%v", tags[0])
for _, tag := range tags[1:] {
fmt.Fprintf(&tagsStr, ", %v", tag)
}
fmt.Fprintf(&tagsStr, "]")
}
return tagsStr.String()
}
+101
View File
@@ -0,0 +1,101 @@
package webhook
import (
"net/http"
"strings"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/pipeline_executor"
"github.com/go-playground/webhooks/v6/gitea"
"github.com/google/uuid"
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("cursorius-server")
func webhookHandler(w http.ResponseWriter, r *http.Request, db database.Database, conf config.PipelineConf) {
switch r.Method {
case "POST":
splitUrl := strings.Split(r.URL.Path, "/")
if len(splitUrl) != 4 {
log.Errorf("Webhook recieved with invalid url \"%v\", ignoring...", r.URL)
return
}
// get URL path after /webhook/
// TODO: verify that this handles all valid URL formats
pipelineUUIDStr := splitUrl[2]
webhookUUIDStr := splitUrl[3]
pipelineUUID, err := uuid.Parse(pipelineUUIDStr)
if err != nil {
log.Errorf("Could not parse pipeline UUID: %v", err)
return
}
webhookUUID, err := uuid.Parse(webhookUUIDStr)
if err != nil {
log.Errorf("Could not parse webhook UUID: %v", err)
return
}
pipeline, err := db.GetPipelineById(pipelineUUID)
if err != nil {
log.Errorf("Could not get webhooks for pipeline with UUID \"%v\": %v", pipelineUUID, err)
return
}
webhooks, err := db.GetWebhooksForPipeline(pipelineUUID)
if err != nil {
log.Errorf("Could not get webhooks for pipeline with UUID \"%v\": %v", webhookUUID, err)
return
}
if len(webhooks) < 1 {
log.Errorf("No webhooks configured for pipeline with UUID \"%v\"", webhookUUID)
return
}
for _, webhook := range webhooks {
if webhook.Id == webhookUUID {
switch webhook.ServerType {
case database.Gitea:
hook, err := gitea.New(gitea.Options.Secret(webhook.Secret))
if err != nil {
log.Errorf("Could not create Gitea webhook handler: %v", err)
return
}
payload, err := hook.Parse(r, gitea.PushEvent)
if err != nil {
if err == gitea.ErrEventNotFound {
log.Warning("Got webhook \"%v\" for unexpected event type, ignoring...", webhookUUID)
break
}
}
switch payload.(type) {
case gitea.PushPayload:
pushPayload := payload.(gitea.PushPayload)
pe := pipeline_executor.PipelineExecution{
Pipeline: pipeline,
Ref: pushPayload.Ref,
}
go pipeline_executor.ExecutePipeline(pe, db, conf)
}
}
}
}
log.Errorf("No webhook found with ID \"%v\"", webhookUUID)
default:
log.Errorf("Got request with method \"%v\", ignoring...", r.Method)
}
}
func CreateWebhookHandler(db database.Database, conf config.PipelineConf, mux *http.ServeMux) {
mux.HandleFunc("/webhook/", func(w http.ResponseWriter, r *http.Request) {
webhookHandler(w, r, db, conf)
})
}