Compare commits

51 Commits

Author SHA1 Message Date
restitux 66eba61bbe Remove leftover debug print (#26) 2023-05-04 20:40:01 -06:00
restitux dc09c0f4f5 Add shell script to publish container 2023-05-04 20:35:39 -06:00
restitux dcaeaeb6d6 Fix pipeline executor logging to use Infof 2023-05-04 20:27:33 -06:00
restitux 32a86ace9f Update ON CONFLICT with pipeline_refs primary key 2023-05-04 20:26:43 -06:00
restitux 89be2c4816 Correct primary key on pipeline_refs table 2023-05-04 20:15:37 -06:00
restitux 63c0f83c16 Add debug print of prevRefs to polling logic 2023-05-04 19:21:22 -06:00
restitux 8188bd391b Add debug print to new ref value codepath in polling logic 2023-05-04 19:16:17 -06:00
restitux 8ac90700bf Add debug logging to pipeline_executor 2023-04-23 04:59:46 -06:00
restitux 1882d14bee Change version 2023-04-09 21:41:22 -06:00
restitux 4f7b315f54 Complete support for cron pipeline triggering 2023-04-08 16:28:00 -06:00
restitux fe9e1cac15 Update cron type with ref pattern 2023-04-08 15:18:29 -06:00
restitux a9481fa9bc Add scaffolding for cron trigger support 2023-04-08 14:42:23 -06:00
restitux e1382e50ea Cleanup docker build output before saving (#23) 2023-04-07 20:08:59 -06:00
restitux 7f44e5ed41 Add nil check to runner chan (#22) 2023-04-07 19:52:44 -06:00
restitux bcc53dfbe0 Refactored runner map wrap and add to function 2023-04-07 19:52:37 -06:00
restitux bbf96498aa Add updatePipeline endpoint 2023-04-07 18:44:04 -06:00
restitux 954966db58 Start/restart poll job when created or updated
This currently contains the logic for restarting updated jobs,
but nothing exercises this logic. The logic for starting polling
for a newly created pipeline is implemented.
2023-04-07 18:31:59 -06:00
restitux ed7df18f83 Add timeout and retry interval to GetRunner api 2023-04-05 18:38:06 -06:00
restitux a8e9a68f0e Don't sleep on first scan to ease testing 2023-04-05 18:01:10 -06:00
restitux 20c664f0ed Reorganize docker configuration 2023-03-11 18:56:13 -07:00
restitux 0000ea2a13 Rewrite run-dev.sh to use golang program 2023-03-11 13:05:50 -07:00
restitux fe53a17160 Runners are removed from manager when alloacted
This removes an existing unlocked shared access to runner.running.
This also sets us up for better management of the runners.
2023-03-08 00:13:40 -07:00
restitux f190274bce Utilify runner tag printing function 2023-03-07 23:42:01 -07:00
restitux d9ba14550e Removed outdated comments 2023-03-07 19:47:57 -07:00
restitux a2acb99689 Fix not using correct environment variables 2023-02-25 02:57:13 -07:00
restitux 191b73fe41 Fix index out of range issue with empty tag list printing 2023-02-25 02:51:11 -07:00
restitux 3ca1481632 Make compose network external 2023-02-25 02:32:04 -07:00
restitux c0e33fa52a Update runner manager for new database driven runner config 2023-02-25 02:31:54 -07:00
restitux 63529b7174 Fix runner creation api 2023-02-25 01:36:55 -07:00
restitux 7e7c49c2e7 Exposed build output to api 2023-02-24 23:59:02 -07:00
restitux 712a7b1429 Fix graphql api not returning stdout/stderr as strings 2023-02-24 23:20:40 -07:00
restitux 3ae27bffc5 Cleanup logging 2023-02-24 23:00:31 -07:00
restitux 5373a37bee Change RUNID env var name 2023-02-24 23:00:26 -07:00
restitux 6fee5aa268 Record docker build output in database 2023-02-24 23:00:10 -07:00
restitux b475631df6 Remove outdated todo 2023-02-24 22:51:19 -07:00
restitux fbf918d627 Fix typo 2023-02-24 22:46:15 -07:00
restitux 4069e1b0e1 Update executor to run built user provided container 2023-02-24 22:37:17 -07:00
restitux 708fbca91a Tag built container with build and run name 2023-02-24 22:32:24 -07:00
restitux 77a5514578 Fix secret name validation not checking for beginning of string 2023-02-24 22:29:21 -07:00
restitux 77a8d0840a Fix inserting new repeated refs into db failing #20 2023-02-24 22:28:50 -07:00
restitux 62b4e8f17e Add runner query api 2023-02-24 22:28:07 -07:00
restitux 620c20f717 Change pipeline executor to build container in repo at .cursorius/Dockerfile 2023-02-24 22:27:07 -07:00
restitux 0979a2379e Change poll logic to query refs after sleep for easire debugging 2023-02-24 22:26:41 -07:00
restitux 85ebd856eb Add graphiql testing workflow (#15) 2023-02-18 20:45:57 -07:00
restitux 6b103d074e Add runprev launcher command 2023-02-18 19:27:17 -07:00
restitux 664fe8fd09 Remove replaced Job configuration from config file 2023-02-14 20:43:04 -07:00
restitux edafd5108a Add validation to secret names 2023-02-14 20:39:03 -07:00
restitux c0f6186eac Fix run progress field name (graphql) 2023-02-14 20:19:21 -07:00
restitux bfd05b6a8a Add secrets support (#14) 2023-02-14 20:18:41 -07:00
restitux 6d2936393b Persist repo ref hashes in db 2023-02-08 18:44:54 -07:00
restitux e4043ae3be Correct case of member name 2023-02-08 18:06:54 -07:00
29 changed files with 1835 additions and 444 deletions
+450 -36
View File
@@ -13,16 +13,127 @@ import (
var log = logging.MustGetLogger("cursorius-server") var log = logging.MustGetLogger("cursorius-server")
func createSchema(db database.Database) (graphql.Schema, error) { func createSchema(db database.Database, pollChan chan uuid.UUID, cronChan chan uuid.UUID) (graphql.Schema, error) {
credentialType := graphql.NewObject(graphql.ObjectConfig{ runnerType := graphql.NewObject(graphql.ObjectConfig{
Name: "Credential", Name: "Runner",
Description: "A runner available for use inside of a pipeline.",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the runner.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Id, nil
}
return nil, nil
},
},
"name": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The name of the runner.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Name, nil
}
return nil, nil
},
},
"token": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The token.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if runner, ok := p.Source.(database.Runner); ok {
return runner.Token, nil
}
return nil, nil
},
},
},
})
secretType := graphql.NewObject(graphql.ObjectConfig{
Name: "Secret",
Description: "A secret available for use inside of a pipeline.",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the secret.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if secret, ok := p.Source.(database.Secret); ok {
return secret.Id, nil
}
return nil, nil
},
},
"name": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The name of the secret.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if secret, ok := p.Source.(database.Secret); ok {
return secret.Name, nil
}
return nil, nil
},
},
"secret": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The secret.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if secret, ok := p.Source.(database.Secret); ok {
return secret.Secret, nil
}
return nil, nil
},
},
},
})
cronType := graphql.NewObject(graphql.ObjectConfig{
Name: "Cron",
Description: "A cron available for trigger pipeline runs.",
Fields: graphql.Fields{
"id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The id of the cron.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if cron, ok := p.Source.(database.Cron); ok {
return cron.Id, nil
}
return nil, nil
},
},
"cron": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "The cron.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if cron, ok := p.Source.(database.Cron); ok {
return cron.Cron, nil
}
return nil, nil
},
},
"pattern": &graphql.Field{
Type: graphql.NewNonNull(graphql.String),
Description: "A pattern for determining what refs to run the cron on.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if cron, ok := p.Source.(database.Cron); ok {
return cron.Pattern, nil
}
return nil, nil
},
},
},
})
cloneCredentialType := graphql.NewObject(graphql.ObjectConfig{
Name: "CloneCredential",
Description: "A credential for authenticating with the pipeline source host.", Description: "A credential for authenticating with the pipeline source host.",
Fields: graphql.Fields{ Fields: graphql.Fields{
"id": &graphql.Field{ "id": &graphql.Field{
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
Description: "The id of the credential.", Description: "The id of the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.Credential); ok { if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Id, nil return credential.Id, nil
} }
return nil, nil return nil, nil
@@ -32,7 +143,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
Description: "The name of the credential.", Description: "The name of the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.Credential); ok { if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Name, nil return credential.Name, nil
} }
return nil, nil return nil, nil
@@ -42,7 +153,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
Description: "The credential type.", Description: "The credential type.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.Credential); ok { if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Type, nil return credential.Type, nil
} }
return nil, nil return nil, nil
@@ -52,7 +163,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
Description: "The username to user with the credential.", Description: "The username to user with the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.Credential); ok { if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Username, nil return credential.Username, nil
} }
return nil, nil return nil, nil
@@ -62,7 +173,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
Description: "The secret for the credential.", Description: "The secret for the credential.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if credential, ok := p.Source.(database.Credential); ok { if credential, ok := p.Source.(database.CloneCredential); ok {
return credential.Secret, nil return credential.Secret, nil
} }
return nil, nil return nil, nil
@@ -122,7 +233,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return nil, nil return nil, nil
}, },
}, },
"progress": &graphql.Field{ "inProgress": &graphql.Field{
Type: graphql.Boolean, Type: graphql.Boolean,
Description: "The progress status of the run.", Description: "The progress status of the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
@@ -143,12 +254,22 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return nil, nil return nil, nil
}, },
}, },
"buildOutput": &graphql.Field{
Type: graphql.String,
Description: "Logs of the top level container build for the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok {
return string(run.BuildOutput), nil
}
return nil, nil
},
},
"stdout": &graphql.Field{ "stdout": &graphql.Field{
Type: graphql.String, Type: graphql.String,
Description: "The stdout used to validate the run.", Description: "The stdout used to validate the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok { if run, ok := p.Source.(database.Run); ok {
return run.Stdout, nil return string(run.Stdout), nil
} }
return nil, nil return nil, nil
}, },
@@ -158,7 +279,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
Description: "The stderr used to validate the run.", Description: "The stderr used to validate the run.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if run, ok := p.Source.(database.Run); ok { if run, ok := p.Source.(database.Run); ok {
return run.Stderr, nil return string(run.Stderr), nil
} }
return nil, nil return nil, nil
}, },
@@ -210,16 +331,38 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return nil, nil return nil, nil
}, },
}, },
"credentialId": &graphql.Field{ "cloneCredential": &graphql.Field{
Type: graphql.String, Type: cloneCredentialType,
Description: "The configured credential for cloning the pipeline source.", Description: "The configured credential for cloning the pipeline source.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok { if pipeline, ok := p.Source.(database.Pipeline); ok {
return pipeline.Credential, nil if pipeline.CloneCredential != nil {
return db.GetCloneCredentialById(*pipeline.CloneCredential)
}
} }
return nil, nil return nil, nil
}, },
}, },
"secrets": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(graphql.NewNonNull(secretType))),
Description: "The list of secrets for the pipeline.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
return db.GetSecretsForPipeline(pipeline.Id)
}
return []database.Secret{}, nil
},
},
"crons": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(graphql.NewNonNull(cronType))),
Description: "The list of crons for the pipeline.",
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
if pipeline, ok := p.Source.(database.Pipeline); ok {
return db.GetCronsForPipeline(pipeline.Id)
}
return []database.Cron{}, nil
},
},
"webhooks": &graphql.Field{ "webhooks": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(graphql.NewNonNull(webhookType))), Type: graphql.NewNonNull(graphql.NewList(graphql.NewNonNull(webhookType))),
Description: "The list of webhooks for the pipeline.", Description: "The list of webhooks for the pipeline.",
@@ -269,8 +412,8 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return db.GetPipelines() return db.GetPipelines()
}, },
}, },
"Credential": &graphql.Field{ "CloneCredential": &graphql.Field{
Type: credentialType, Type: cloneCredentialType,
Args: graphql.FieldConfigArgument{ Args: graphql.FieldConfigArgument{
"id": &graphql.ArgumentConfig{ "id": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
@@ -282,16 +425,30 @@ func createSchema(db database.Database) (graphql.Schema, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return db.GetCredentialById(id) return db.GetCloneCredentialById(id)
}, },
}, },
"Credentials": &graphql.Field{ "CloneCredentials": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(credentialType)), Type: graphql.NewNonNull(graphql.NewList(cloneCredentialType)),
Args: graphql.FieldConfigArgument{}, Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) { Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetCredentials() return db.GetCredentials()
}, },
}, },
"Secrets": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(secretType)),
Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetSecrets()
},
},
"Runners": &graphql.Field{
Type: graphql.NewNonNull(graphql.NewList(runnerType)),
Args: graphql.FieldConfigArgument{},
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
return db.GetRunners()
},
},
}, },
}) })
@@ -311,7 +468,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
"pollInterval": &graphql.ArgumentConfig{ "pollInterval": &graphql.ArgumentConfig{
Type: graphql.Int, Type: graphql.Int,
}, },
"credentialId": &graphql.ArgumentConfig{ "cloneCredentialId": &graphql.ArgumentConfig{
Type: graphql.String, Type: graphql.String,
}, },
}, },
@@ -324,7 +481,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
} }
var credential *uuid.UUID var credential *uuid.UUID
if credentialVal, ok := params.Args["credentialId"]; ok { if credentialVal, ok := params.Args["cloneCredentialId"]; ok {
id, err := uuid.Parse(credentialVal.(string)) id, err := uuid.Parse(credentialVal.(string))
if err != nil { if err != nil {
return nil, err return nil, err
@@ -344,6 +501,8 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return nil, err return nil, err
} }
pollChan <- pipeline.Id
return pipeline, nil return pipeline, nil
}, },
}, },
@@ -354,7 +513,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
"type": &graphql.ArgumentConfig{ "type": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
}, },
"pipeline_id": &graphql.ArgumentConfig{ "pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
}, },
}, },
@@ -375,9 +534,9 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return webhook, nil return webhook, nil
}, },
}, },
"createCredential": &graphql.Field{ "createCloneCredential": &graphql.Field{
Type: credentialType, Type: cloneCredentialType,
Description: "Create a new credential", Description: "Create a new CloneCredential",
Args: graphql.FieldConfigArgument{ Args: graphql.FieldConfigArgument{
"name": &graphql.ArgumentConfig{ "name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String), Type: graphql.NewNonNull(graphql.String),
@@ -396,7 +555,7 @@ func createSchema(db database.Database) (graphql.Schema, error) {
credential, err := db.CreateCredential( credential, err := db.CreateCredential(
params.Args["name"].(string), params.Args["name"].(string),
database.CredentialType(params.Args["type"].(string)), database.CloneCredentialType(params.Args["type"].(string)),
params.Args["username"].(string), params.Args["username"].(string),
params.Args["secret"].(string), params.Args["secret"].(string),
) )
@@ -406,11 +565,119 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return credential, nil return credential, nil
}, },
}, },
"setPipelineCredential": &graphql.Field{ "createSecret": &graphql.Field{
Type: pipelineType, Type: secretType,
Description: "Add an credential to a pipeline", Description: "Create a new secret",
Args: graphql.FieldConfigArgument{ Args: graphql.FieldConfigArgument{
"credentialId": &graphql.ArgumentConfig{ "name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"secret": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
secret, err := db.CreateSecret(
params.Args["name"].(string),
params.Args["secret"].(string),
)
if err != nil {
return nil, err
}
return secret, nil
},
},
"createRunner": &graphql.Field{
Type: runnerType,
Description: "Create a new runner",
Args: graphql.FieldConfigArgument{
"name": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
runner, err := db.CreateRunner(
params.Args["name"].(string),
)
if err != nil {
return nil, err
}
return runner, nil
},
},
"updatePipeline": &graphql.Field{
Type: pipelineType,
Description: "Create a new pipeline",
Args: graphql.FieldConfigArgument{
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"name": &graphql.ArgumentConfig{
Type: graphql.String,
},
"url": &graphql.ArgumentConfig{
Type: graphql.String,
},
"pollInterval": &graphql.ArgumentConfig{
Type: graphql.Int,
},
"cloneCredentialId": &graphql.ArgumentConfig{
Type: graphql.String,
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
var name *string
var url *string
var interval *int
if nameVal, ok := params.Args["name"]; ok {
nameVal := nameVal.(string)
name = &nameVal
} else {
name = nil
}
if urlVal, ok := params.Args["url"]; ok {
urlVal := urlVal.(string)
url = &urlVal
} else {
url = nil
}
if intervalVal, ok := params.Args["pollInterval"]; ok {
intervalVal := intervalVal.(int)
interval = &intervalVal
} else {
interval = nil
}
pipeline, err := db.UpdatePipeline(
pipelineId,
name,
url,
interval,
)
if err != nil {
return nil, err
}
pollChan <- pipeline.Id
return pipeline, nil
},
},
"setPipelineCloneCredential": &graphql.Field{
Type: pipelineType,
Description: "Set the CloneCredential used by a pipeline to clone the source repo",
Args: graphql.FieldConfigArgument{
"cloneCredentialId": &graphql.ArgumentConfig{
Type: graphql.String, Type: graphql.String,
}, },
"pipelineId": &graphql.ArgumentConfig{ "pipelineId": &graphql.ArgumentConfig{
@@ -424,19 +691,19 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return nil, err return nil, err
} }
if credentialIdVal, ok := params.Args["credentialId"]; ok { if cloneCredentialIdVal, ok := params.Args["cloneCredentialId"]; ok {
credentialId, err := uuid.Parse(credentialIdVal.(string)) cloneCredentialId, err := uuid.Parse(cloneCredentialIdVal.(string))
if err != nil { if err != nil {
return nil, err return nil, err
} }
pipeline, err := db.SetPipelineCredential(pipelineId, &credentialId) pipeline, err := db.SetPipelineCloneCredential(pipelineId, &cloneCredentialId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return pipeline, nil return pipeline, nil
} else { } else {
pipeline, err := db.SetPipelineCredential(pipelineId, nil) pipeline, err := db.SetPipelineCloneCredential(pipelineId, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -445,6 +712,153 @@ func createSchema(db database.Database) (graphql.Schema, error) {
}, },
}, },
"addSecretToPipeline": &graphql.Field{
Type: pipelineType,
Description: "Allow a secret to be accessed by a pipeline.",
Args: graphql.FieldConfigArgument{
"secretId": &graphql.ArgumentConfig{
Type: graphql.String,
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
secretId, err := uuid.Parse(params.Args["secretId"].(string))
if err != nil {
return nil, err
}
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
err = db.AssignSecretToPipeline(pipelineId, secretId)
if err != nil {
return nil, err
}
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return nil, err
}
return pipeline, nil
},
},
"removeSecretFromPipeline": &graphql.Field{
Type: pipelineType,
Description: "Remove a pipeline's access to a secret.",
Args: graphql.FieldConfigArgument{
"secretId": &graphql.ArgumentConfig{
Type: graphql.String,
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
secretId, err := uuid.Parse(params.Args["secretId"].(string))
if err != nil {
return nil, err
}
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
err = db.RemoveSecretFromPipeline(pipelineId, secretId)
if err != nil {
return nil, err
}
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return nil, err
}
return pipeline, nil
},
},
"addCronToPipeline": &graphql.Field{
Type: pipelineType,
Description: "Add a cron string to trigger the pipeline",
Args: graphql.FieldConfigArgument{
"cron": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"pattern": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
cron := params.Args["cron"].(string)
pattern := params.Args["pattern"].(string)
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
cronObj, err := db.AddCronForPipeline(pipelineId, cron, pattern)
if err != nil {
return nil, err
}
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return nil, err
}
cronChan <- cronObj.Id
return pipeline, nil
},
},
"removeCronFromPipeline": &graphql.Field{
Type: pipelineType,
Description: "Remove a cron trigger from a pipeline.",
Args: graphql.FieldConfigArgument{
"cronId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
"pipelineId": &graphql.ArgumentConfig{
Type: graphql.NewNonNull(graphql.String),
},
},
Resolve: func(params graphql.ResolveParams) (interface{}, error) {
cronId, err := uuid.Parse(params.Args["cronId"].(string))
if err != nil {
return nil, err
}
pipelineId, err := uuid.Parse(params.Args["pipelineId"].(string))
if err != nil {
return nil, err
}
err = db.RemoveCronForPipeline(cronId)
if err != nil {
return nil, err
}
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return nil, err
}
cronChan <- cronId
return pipeline, nil
},
},
}, },
}) })
@@ -459,9 +873,9 @@ func createSchema(db database.Database) (graphql.Schema, error) {
return schema, nil return schema, nil
} }
func CreateHandler(db database.Database, mux *http.ServeMux) error { func CreateHandler(db database.Database, pollChan chan uuid.UUID, cronChan chan uuid.UUID, mux *http.ServeMux) error {
schema, err := createSchema(db) schema, err := createSchema(db, pollChan, cronChan)
if err != nil { if err != nil {
return err return err
} }
-20
View File
@@ -6,24 +6,6 @@ import (
"git.ohea.xyz/golang/config" "git.ohea.xyz/golang/config"
) )
type WebhookSender string
const (
Gitea WebhookSender = "gitea"
)
type Webhook struct {
Sender WebhookSender
Secret string
}
type Job struct {
URL string
Webhook *Webhook
Cron *string
PollInterval uint64
}
type Runner struct { type Runner struct {
Secret string Secret string
} }
@@ -60,7 +42,6 @@ type Config struct {
Port int Port int
DBConfig DBConfig DBConfig DBConfig
PipelineConf PipelineConf PipelineConf PipelineConf
Jobs map[string]Job
Runners map[string]Runner Runners map[string]Runner
} }
@@ -88,7 +69,6 @@ func GetConfig() (config.Config[Config], error) {
Source: "/opt/cursorius/working", Source: "/opt/cursorius/working",
}, },
}, },
Jobs: make(map[string]Job),
Runners: make(map[string]Runner), Runners: make(map[string]Runner),
}, },
} }
+121
View File
@@ -0,0 +1,121 @@
package cron
import (
"fmt"
"regexp"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/pipeline_executor"
"github.com/google/uuid"
"github.com/op/go-logging"
"github.com/robfig/cron/v3"
)
var log = logging.MustGetLogger("cursorius-server")
func runPipeline(db database.Database, conf config.PipelineConf, pipelineId uuid.UUID, cron database.Cron) error {
run, err := db.CreateRun(pipelineId)
if err != nil {
return fmt.Errorf("Could not create run for pipeline with id %v: %w", pipelineId, err)
}
refs, err := db.GetPipelineRefs(pipelineId)
if err != nil {
return fmt.Errorf("Could not get refs for pipeline with id %v: %w", pipelineId, err)
}
useRef, err := regexp.Compile(cron.Pattern)
if err != nil {
return fmt.Errorf("Could not compile regex for cron %v for pipeline %v: %w", cron.Id, pipelineId, err)
}
for ref := range refs {
if !useRef.MatchString(ref) {
log.Debugf("Skipping ref %v for pipeline %v as regex %v in cron %v doesn't match", ref, pipelineId, cron.Pattern, cron.Id)
continue
}
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return fmt.Errorf("could not get pipeline with id %v from db: %w", pipelineId, err)
}
pe := pipeline_executor.PipelineExecution{
Pipeline: pipeline,
Ref: ref,
Run: run,
}
go pipeline_executor.ExecutePipeline(pe, db, conf)
}
return nil
}
func launchCrons(db database.Database, conf config.PipelineConf, updateChan chan uuid.UUID) {
pipelines, err := db.GetPipelines()
if err != nil {
log.Errorf("Could not get pipelines from database: %w", err)
return
}
cronManager := cron.New()
cronEntries := make(map[uuid.UUID]cron.EntryID)
for _, pipeline := range pipelines {
crons, err := db.GetCronsForPipeline(pipeline.Id)
if err != nil {
log.Errorf("Could not get crons for pipeline with id \"%v\": %w", pipeline.Id, err)
return
}
log.Infof("Starting crons for pipeline %v with id %v", pipeline.Name, pipeline.Id)
for _, cron := range crons {
cronEntries[cron.Id], err = cronManager.AddFunc(cron.Cron, func() {
log.Infof("Triggering cron with value \"%v\" for pipeline with id \"%v\"", cron.Cron, cron.PipelineId)
err := runPipeline(db, conf, pipeline.Id, cron)
if err != nil {
log.Errorf("Could not run pipeline with id \"%v\": %w", pipeline.Id, err)
}
})
if err != nil {
log.Errorf("Could not configure cron for pipeline with id \"%v\": %w", pipeline.Id, err)
}
}
}
cronManager.Start()
for {
cronUUID := <-updateChan
if entryId, ok := cronEntries[cronUUID]; ok {
log.Infof("Canceling cron %v", cronUUID)
cronManager.Remove(entryId)
}
cron, err := db.GetCronById(cronUUID)
// if cron no longer exists, don't try to restart it
// TODO: this squashes other DB errors
if err != nil {
continue
}
log.Infof("Starting cron %v with value %v for pipeline %v", cron.Id, cron.Cron, cron.PipelineId)
cronEntries[cron.Id], err = cronManager.AddFunc(cron.Cron, func() {
err := runPipeline(db, conf, cron.PipelineId, cron)
if err != nil {
log.Errorf("Could not setup run pipeline with id \"%v\": %w", cron.PipelineId, err)
}
})
}
}
func StartCrons(conf config.PipelineConf, db database.Database) chan uuid.UUID {
cronChan := make(chan uuid.UUID)
go launchCrons(db, conf, cronChan)
return cronChan
}
+50 -7
View File
@@ -95,7 +95,7 @@ CREATE TABLE version (
); );
CREATE TABLE credentials ( CREATE TABLE clone_credentials (
id UUID PRIMARY KEY, id UUID PRIMARY KEY,
name TEXT NOT NULL, name TEXT NOT NULL,
type TEXT NOT NULL, type TEXT NOT NULL,
@@ -108,11 +108,30 @@ CREATE TABLE pipelines (
name TEXT NOT NULL, name TEXT NOT NULL,
url TEXT NOT NULL, url TEXT NOT NULL,
poll_interval INTEGER, poll_interval INTEGER,
credential UUID DEFAULT NULL, clone_credential UUID DEFAULT NULL,
CONSTRAINT fk_credential CONSTRAINT fk_clone_credential
FOREIGN KEY(credential) FOREIGN KEY(clone_credential)
REFERENCES credentials(id) REFERENCES clone_credentials(id)
);
CREATE TABLE secrets (
id UUID PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
secret TEXT NOT NULL
);
CREATE TABLE pipeline_secret_mappings (
pipeline UUID NOT NULL,
secret UUID NOT NULL,
CONSTRAINT fk_pipeline
FOREIGN KEY(pipeline)
REFERENCES pipelines(id),
CONSTRAINT fk_secret
FOREIGN KEY(secret)
REFERENCES secrets(id)
); );
CREATE TABLE webhooks ( CREATE TABLE webhooks (
@@ -130,6 +149,7 @@ CREATE TABLE runs (
id UUID PRIMARY KEY, id UUID PRIMARY KEY,
pipeline UUID, pipeline UUID,
in_progress BOOLEAN DEFAULT NULL, in_progress BOOLEAN DEFAULT NULL,
build_output TEXT DEFAULT NULL,
result BIGINT DEFAULT NULL, result BIGINT DEFAULT NULL,
stdout TEXT DEFAULT NULL, stdout TEXT DEFAULT NULL,
stderr TEXT DEFAULT NULL, stderr TEXT DEFAULT NULL,
@@ -156,8 +176,31 @@ CREATE TABLE command_executions (
CREATE TABLE runners ( CREATE TABLE runners (
id UUID PRIMARY KEY, id UUID PRIMARY KEY,
name TEXT, name TEXT NOT NULL UNIQUE,
secret TEXT token TEXT NOT NULL
);
CREATE TABLE pipeline_refs (
name TEXT NOT NULL,
pipeline_id UUID NOT NULL,
hash TEXT NOT NULL,
PRIMARY KEY(name, pipeline_id),
CONSTRAINT fk_pipeline_id
FOREIGN KEY(pipeline_id)
REFERENCES pipelines(id)
);
CREATE TABLE crons (
id UUID PRIMARY KEY,
pipeline_id UUID NOT NULL,
cron TEXT NOT NULL,
pattern TEXT NOT NULL,
CONSTRAINT fk_pipeline_id
FOREIGN KEY(pipeline_id)
REFERENCES pipelines(id)
); );
` `
+437 -23
View File
@@ -3,13 +3,14 @@ package database
import ( import (
"context" "context"
"fmt" "fmt"
"regexp"
"github.com/google/uuid" "github.com/google/uuid"
) )
func (db *Database) GetPipelines() ([]Pipeline, error) { func (db *Database) GetPipelines() ([]Pipeline, error) {
query := ` query := `
SELECT id, name, url, poll_interval, credential SELECT id, name, url, poll_interval, clone_credential
FROM pipelines;` FROM pipelines;`
pipelines := make([]Pipeline, 0) pipelines := make([]Pipeline, 0)
@@ -23,7 +24,7 @@ FROM pipelines;`
for rows.Next() { for rows.Next() {
var pipeline Pipeline var pipeline Pipeline
var idStr string var idStr string
if err := rows.Scan(&idStr, &pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.Credential); err != nil { if err := rows.Scan(&idStr, &pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential); err != nil {
return pipelines, err return pipelines, err
} }
@@ -57,7 +58,7 @@ WHERE id=$1;`
func (db *Database) CreatePipeline(name string, url string, pollInterval int, credential *uuid.UUID) (Pipeline, error) { func (db *Database) CreatePipeline(name string, url string, pollInterval int, credential *uuid.UUID) (Pipeline, error) {
query := ` query := `
INSERT INTO pipelines (id, name, url, poll_interval, credential) INSERT INTO pipelines (id, name, url, poll_interval, clone_credential)
VALUES (uuid_generate_v4(), $1, $2, $3, $4) VALUES (uuid_generate_v4(), $1, $2, $3, $4)
RETURNING id, name, url, poll_interval;` RETURNING id, name, url, poll_interval;`
@@ -76,12 +77,55 @@ RETURNING id, name, url, poll_interval;`
return pipeline, nil return pipeline, nil
} }
func (db *Database) SetPipelineCredential(pipelineId uuid.UUID, credentialId *uuid.UUID) (Pipeline, error) { func (db *Database) UpdatePipeline(pipelineId uuid.UUID, name *string, url *string, pollInterval *int) (Pipeline, error) {
query := ` query := `
UPDATE pipelines UPDATE pipelines
SET credential=$1 SET name=$1, url=$2, poll_interval=$3
WHERE id=$4
RETURNING name, url, poll_interval, clone_credential;`
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return pipeline, err
}
var nameNew string
var urlNew string
var pollIntervalNew int
if name != nil {
nameNew = *name
} else {
nameNew = pipeline.Name
}
if url != nil {
urlNew = *url
} else {
urlNew = pipeline.Url
}
if pollInterval != nil {
pollIntervalNew = *pollInterval
} else {
pollIntervalNew = pipeline.PollInterval
}
err = db.Conn.QueryRow(context.Background(),
query, nameNew, urlNew, pollIntervalNew, pipelineId).Scan(
&pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential,
)
if err != nil {
return pipeline, fmt.Errorf("Could not add credential to pipeline: %w", err)
}
return pipeline, err
}
func (db *Database) SetPipelineCloneCredential(pipelineId uuid.UUID, credentialId *uuid.UUID) (Pipeline, error) {
query := `
UPDATE pipelines
SET clone_credential=$1
WHERE id=$2 WHERE id=$2
RETURNING name, url, poll_interval, credential;` RETURNING name, url, poll_interval, clone_credential;`
pipeline := Pipeline{ pipeline := Pipeline{
Id: pipelineId, Id: pipelineId,
@@ -89,7 +133,7 @@ RETURNING name, url, poll_interval, credential;`
err := db.Conn.QueryRow(context.Background(), err := db.Conn.QueryRow(context.Background(),
query, credentialId, pipelineId).Scan( query, credentialId, pipelineId).Scan(
&pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.Credential, &pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential,
) )
if err != nil { if err != nil {
return pipeline, fmt.Errorf("Could not add credential to pipeline: %w", err) return pipeline, fmt.Errorf("Could not add credential to pipeline: %w", err)
@@ -103,7 +147,7 @@ func (db *Database) RemovePipelineCredential(pipelineId uuid.UUID) (Pipeline, er
UPDATE pipelines UPDATE pipelines
SET credential=null SET credential=null
WHERE id=$1 WHERE id=$1
RETURNING name, url, poll_interval, credential;` RETURNING name, url, poll_interval, clone_credential;`
pipeline := Pipeline{ pipeline := Pipeline{
Id: pipelineId, Id: pipelineId,
@@ -111,7 +155,7 @@ RETURNING name, url, poll_interval, credential;`
err := db.Conn.QueryRow(context.Background(), err := db.Conn.QueryRow(context.Background(),
query, pipelineId).Scan( query, pipelineId).Scan(
&pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.Credential, &pipeline.Name, &pipeline.Url, &pipeline.PollInterval, &pipeline.CloneCredential,
) )
if err != nil { if err != nil {
return pipeline, fmt.Errorf("Could not add credential to pipeline: %w", err) return pipeline, fmt.Errorf("Could not add credential to pipeline: %w", err)
@@ -193,13 +237,13 @@ RETURNING id, server_type, secret, pipeline;`
return webhook, nil return webhook, nil
} }
func (db *Database) CreateCredential(name string, credentialtype CredentialType, username string, secret string) (Credential, error) { func (db *Database) CreateCredential(name string, credentialtype CloneCredentialType, username string, secret string) (CloneCredential, error) {
query := ` query := `
INSERT INTO credentials (id, name, type, username, secret) INSERT INTO clone_credentials (id, name, type, username, secret)
VALUES(uuid_generate_v4(), $1, $2, $3, $4) VALUES(uuid_generate_v4(), $1, $2, $3, $4)
RETURNING id, name, type, username, secret;` RETURNING id, name, type, username, secret;`
credential := Credential{} credential := CloneCredential{}
var idStr string var idStr string
err := db.Conn.QueryRow( err := db.Conn.QueryRow(
context.Background(), context.Background(),
@@ -223,15 +267,15 @@ RETURNING id, name, type, username, secret;`
return credential, nil return credential, nil
} }
func (db *Database) GetCredentialById(id uuid.UUID) (Credential, error) { func (db *Database) GetCloneCredentialById(id uuid.UUID) (CloneCredential, error) {
query := ` query := `
SELECT name, type, username, secret SELECT name, type, username, secret
FROM credentials FROM clone_credentials
WHERE id=$1;` WHERE id=$1;`
log.Debugf("requested credential with id %v", id) log.Debugf("requested credential with id %v", id)
credential := Credential{ credential := CloneCredential{
Id: id, Id: id,
} }
@@ -243,12 +287,12 @@ WHERE id=$1;`
return credential, nil return credential, nil
} }
func (db *Database) GetCredentials() ([]Credential, error) { func (db *Database) GetCredentials() ([]CloneCredential, error) {
query := ` query := `
SELECT id, name, type, username, secret SELECT id, name, type, username, secret
FROM credentials;` FROM clone_credentials;`
credentials := make([]Credential, 0) credentials := make([]CloneCredential, 0)
rows, err := db.Conn.Query(context.Background(), query) rows, err := db.Conn.Query(context.Background(), query)
if err != nil { if err != nil {
@@ -257,7 +301,7 @@ FROM credentials;`
defer rows.Close() defer rows.Close()
for rows.Next() { for rows.Next() {
var credential Credential var credential CloneCredential
var idStr string var idStr string
if err := rows.Scan(&idStr, &credential.Name, &credential.Type, &credential.Username, &credential.Secret); err != nil { if err := rows.Scan(&idStr, &credential.Name, &credential.Type, &credential.Username, &credential.Secret); err != nil {
return credentials, err return credentials, err
@@ -294,23 +338,35 @@ RETURNING id, pipeline, in_progress;`
return run, nil return run, nil
} }
// UpdateRunBuildOutput stores the container build output for the run with
// the given id.
func (db *Database) UpdateRunBuildOutput(runId uuid.UUID, buildResult string) error {
	const query = `
UPDATE runs
SET build_output=$1
WHERE id=$2;`

	_, err := db.Conn.Exec(context.Background(), query, buildResult, runId)
	return err
}
func (db *Database) UpdateRunResult(r Run) error { func (db *Database) UpdateRunResult(r Run) error {
// TODO: the id fiend is the query is broken
query := ` query := `
UPDATE runs UPDATE runs
SET in_progress=$1, result=$2, stdout=$3, stderr=$4 SET in_progress=$1, result=$2, stdout=$3, stderr=$4
WHERE id=$3;` WHERE id=$5;`
// TODO: does r.Result need a pointer derefrence? // TODO: does r.Result need a pointer derefrence?
_, err := db.Conn.Exec(context.Background(), _, err := db.Conn.Exec(context.Background(),
query, r.InProgress, r.Result, r.Stdout, r.Stderr) query, r.InProgress, r.Result, r.Stdout, r.Stderr, r.Id)
return err return err
} }
func (db *Database) GetRunsForPipeline(pipelineId uuid.UUID) ([]Run, error) { func (db *Database) GetRunsForPipeline(pipelineId uuid.UUID) ([]Run, error) {
query := ` query := `
SELECT id, in_progress, result, stdout, stderr SELECT id, in_progress, result, build_output, stdout, stderr
FROM runs FROM runs
WHERE pipeline=$1;` WHERE pipeline=$1;`
@@ -329,6 +385,7 @@ WHERE pipeline=$1;`
&idStr, &idStr,
&run.InProgress, &run.InProgress,
&run.Result, &run.Result,
&run.BuildOutput,
&run.Stdout, &run.Stdout,
&run.Stderr, &run.Stderr,
); err != nil { ); err != nil {
@@ -344,3 +401,360 @@ WHERE pipeline=$1;`
return runs, nil return runs, nil
} }
// GetPipelineRefs returns the last-seen git refs for a pipeline as a map
// from ref name to commit hash.
func (db *Database) GetPipelineRefs(pipelineId uuid.UUID) (map[string]string, error) {
	const query = `
SELECT name, hash
FROM pipeline_refs
WHERE pipeline_id=$1;`

	out := make(map[string]string)

	rows, err := db.Conn.Query(context.Background(), query, pipelineId)
	if err != nil {
		return out, fmt.Errorf("Could not get pipeline refs for pipeline with id \"%v\": %w", pipelineId, err)
	}
	defer rows.Close()

	for rows.Next() {
		var name, hash string
		if err := rows.Scan(&name, &hash); err != nil {
			return out, err
		}
		out[name] = hash
	}

	return out, nil
}
// UpdatePipelineRefs upserts the given ref-name -> commit-hash map for a
// pipeline: existing refs are updated in place, new refs are inserted.
// It returns the first database error encountered, if any.
func (db *Database) UpdatePipelineRefs(pipelineId uuid.UUID, refsMap map[string]string) error {
	query := `
INSERT INTO pipeline_refs(name, pipeline_id, hash)
VALUES($1, $2, $3)
ON CONFLICT (name, pipeline_id)
DO
UPDATE SET hash=$3;`

	for name, hash := range refsMap {
		// Only return on failure. The previous version returned err
		// unconditionally after the first iteration, silently dropping every
		// remaining ref in the map.
		if _, err := db.Conn.Exec(context.Background(), query, name, pipelineId, hash); err != nil {
			return err
		}
	}
	return nil
}
// GetSecrets returns every secret stored in the database.
func (db *Database) GetSecrets() ([]Secret, error) {
	const query = `
SELECT id, name, secret
FROM secrets;`

	out := make([]Secret, 0)

	rows, err := db.Conn.Query(context.Background(), query)
	if err != nil {
		return out, fmt.Errorf("Could not query database for secrets: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			s     Secret
			idStr string
		)
		if err := rows.Scan(&idStr, &s.Name, &s.Secret); err != nil {
			return out, err
		}
		if s.Id, err = uuid.Parse(idStr); err != nil {
			return out, err
		}
		out = append(out, s)
	}

	return out, nil
}
// GetSecretById returns the secret with the given id.
func (db *Database) GetSecretById(id uuid.UUID) (Secret, error) {
	// Only select the columns that are actually scanned. The previous query
	// selected id as well, making the column count (3) disagree with the two
	// Scan targets, which fails at runtime on every call.
	query := `
SELECT name, secret
FROM secrets
WHERE id=$1;`

	secret := Secret{
		Id: id,
	}

	err := db.Conn.QueryRow(context.Background(), query, id).Scan(&secret.Name, &secret.Secret)
	if err != nil {
		return secret, fmt.Errorf("Could not query database for secret with id %v: %w", id.String(), err)
	}

	return secret, nil
}
// CreateSecret validates the secret name, inserts a new secret row, and
// returns the created secret with its database-generated id.
//
// A valid name consists solely of uppercase letters, digits, and underscores,
// and is at most 256 characters long.
func (db *Database) CreateSecret(name string, secret string) (Secret, error) {
	s := Secret{}

	if len(name) > 256 {
		return s, fmt.Errorf("secret name must be 256 characters or less")
	}
	// Anchor the pattern at both ends: the previous pattern `[A-Z0-9_]+$` was
	// only anchored at the end, so any name with a valid suffix (e.g.
	// "lowercase_SUFFIX") slipped through validation.
	validName := regexp.MustCompile(`^[A-Z0-9_]+$`)
	if !validName.MatchString(name) {
		// Also fixes the "secren" typo in the user-facing error message.
		return s, fmt.Errorf("secret name must be made up of only uppercase letters, numbers, and underscores")
	}

	query := `
INSERT INTO secrets (id, name, secret)
VALUES (uuid_generate_v4(), $1, $2)
RETURNING id, name, secret;`

	var idStr string
	err := db.Conn.QueryRow(context.Background(), query, name, secret).Scan(&idStr, &s.Name, &s.Secret)
	if err != nil {
		return s, fmt.Errorf("Could not create secret: %w", err)
	}

	s.Id, err = uuid.Parse(idStr)
	if err != nil {
		return s, fmt.Errorf("Could not parse UUID generated by DB: %w", err)
	}

	return s, nil
}
// AssignSecretToPipeline makes the given secret available to the given
// pipeline by inserting a row into the mapping table.
func (db *Database) AssignSecretToPipeline(pipelineId uuid.UUID, secretId uuid.UUID) error {
	const query = `
INSERT INTO pipeline_secret_mappings (pipeline, secret)
VALUES ($1, $2);`

	_, err := db.Conn.Exec(context.Background(), query, pipelineId, secretId)
	return err
}
// RemoveSecretFromPipeline is intended to delete the mapping row that makes
// a secret available to a pipeline.
//
// NOTE(review): this is a stub — it performs no database work and always
// returns a "Not implemented" error.
func (db *Database) RemoveSecretFromPipeline(pipelineId uuid.UUID, secretId uuid.UUID) error {
	// TODO: implement this
	return fmt.Errorf("Not implemented")
}
// GetSecretsForPipeline returns every secret mapped to the given pipeline
// through the pipeline_secret_mappings join table.
func (db *Database) GetSecretsForPipeline(pipelineId uuid.UUID) ([]Secret, error) {
	const query = `
SELECT
	secrets.id, secrets.name, secrets.secret
FROM
	secrets INNER JOIN pipeline_secret_mappings
	ON secrets.id = pipeline_secret_mappings.secret
WHERE
	pipeline_secret_mappings.pipeline=$1
;`

	secrets := make([]Secret, 0)

	rows, err := db.Conn.Query(context.Background(), query, pipelineId)
	if err != nil {
		return secrets, fmt.Errorf("Could not get secrets for pipeline with id \"%v\": %w", pipelineId, err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			s     Secret
			idStr string
		)
		if err := rows.Scan(&idStr, &s.Name, &s.Secret); err != nil {
			return secrets, err
		}
		if s.Id, err = uuid.Parse(idStr); err != nil {
			return secrets, err
		}
		secrets = append(secrets, s)
	}

	return secrets, nil
}
// GetRunners returns every registered runner.
func (db *Database) GetRunners() ([]Runner, error) {
	const query = `
SELECT id, name, token
FROM runners;`

	out := make([]Runner, 0)

	rows, err := db.Conn.Query(context.Background(), query)
	if err != nil {
		return out, fmt.Errorf("Could not query database for runners: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			r     Runner
			idStr string
		)
		if err := rows.Scan(&idStr, &r.Name, &r.Token); err != nil {
			return out, err
		}
		if r.Id, err = uuid.Parse(idStr); err != nil {
			return out, err
		}
		out = append(out, r)
	}

	return out, nil
}
// GetRunnerById returns the runner with the given id.
func (db *Database) GetRunnerById(id uuid.UUID) (Runner, error) {
	const query = `
SELECT name, token
FROM runners
WHERE id=$1;`

	runner := Runner{Id: id}

	row := db.Conn.QueryRow(context.Background(), query, id)
	if err := row.Scan(&runner.Name, &runner.Token); err != nil {
		return runner, fmt.Errorf("Could not query database for runner with id %v: %w", id.String(), err)
	}

	return runner, nil
}
// CreateRunner validates the runner name, inserts a new runner row with a
// database-generated id and random token, and returns the created runner.
//
// A valid name consists solely of uppercase letters, digits, and underscores,
// and is at most 256 characters long.
func (db *Database) CreateRunner(name string) (Runner, error) {
	s := Runner{}

	if len(name) > 256 {
		return s, fmt.Errorf("runner name must be 256 characters or less")
	}
	// Anchor the pattern at both ends: the previous pattern `[A-Z0-9_]+$` was
	// only anchored at the end, so any name with a valid suffix passed
	// validation.
	validName := regexp.MustCompile(`^[A-Z0-9_]+$`)
	if !validName.MatchString(name) {
		return s, fmt.Errorf("runner name must be made up of only uppercase letters, numbers, and underscores")
	}

	// The token is generated server-side as md5(random()::text).
	// NOTE(review): this is not a cryptographically strong token source —
	// consider gen_random_uuid()/pgcrypto if the token guards anything
	// sensitive.
	query := `
INSERT INTO runners (id, name, token)
VALUES
(
	uuid_generate_v4(),
	$1,
	(
		SELECT md5(random()::text)
	)
)
RETURNING id, name, token;`

	var idStr string
	err := db.Conn.QueryRow(context.Background(), query, name).Scan(&idStr, &s.Name, &s.Token)
	if err != nil {
		return s, fmt.Errorf("Could not create runner: %w", err)
	}

	s.Id, err = uuid.Parse(idStr)
	if err != nil {
		return s, fmt.Errorf("Could not parse UUID generated by DB: %w", err)
	}

	return s, nil
}
// GetCronsForPipeline returns all cron triggers configured for a pipeline.
// Note that only id, cron, and pattern are populated on the returned entries;
// PipelineId is left at its zero value.
func (db *Database) GetCronsForPipeline(pipelineId uuid.UUID) ([]Cron, error) {
	const query = `
SELECT id, cron, pattern
FROM crons
WHERE pipeline_id=$1;`

	var crons []Cron

	rows, err := db.Conn.Query(context.Background(), query, pipelineId)
	if err != nil {
		return crons, fmt.Errorf("Could not get crons for pipeline with id \"%v\": %w", pipelineId, err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			c     Cron
			idStr string
		)
		if err := rows.Scan(&idStr, &c.Cron, &c.Pattern); err != nil {
			return crons, err
		}
		if c.Id, err = uuid.Parse(idStr); err != nil {
			return crons, err
		}
		crons = append(crons, c)
	}

	return crons, nil
}
// GetCronById returns the cron trigger with the given id.
func (db *Database) GetCronById(id uuid.UUID) (Cron, error) {
	const query = `
SELECT cron, pipeline_id, pattern
FROM crons
WHERE id=$1;`

	cron := Cron{Id: id}

	var pipelineIdStr string
	row := db.Conn.QueryRow(context.Background(), query, id)
	if err := row.Scan(&cron.Cron, &pipelineIdStr, &cron.Pattern); err != nil {
		return cron, fmt.Errorf("Could not query database for cron with id %v: %w", id.String(), err)
	}

	pid, err := uuid.Parse(pipelineIdStr)
	cron.PipelineId = pid
	return cron, err
}
// AddCronForPipeline inserts a new cron trigger for a pipeline and returns
// it with its database-generated id.
func (db *Database) AddCronForPipeline(pipelineId uuid.UUID, cronStr string, pattern string) (Cron, error) {
	const query = `
INSERT INTO crons (id, pipeline_id, cron, pattern)
VALUES (uuid_generate_v4(), $1, $2, $3)
RETURNING id;`

	cron := Cron{
		PipelineId: pipelineId,
		Cron:       cronStr,
		Pattern:    pattern,
	}

	var idStr string
	if err := db.Conn.QueryRow(context.Background(), query, pipelineId, cronStr, pattern).Scan(&idStr); err != nil {
		return cron, fmt.Errorf("Could not create cron: %w", err)
	}

	id, err := uuid.Parse(idStr)
	cron.Id = id
	return cron, err
}
// RemoveCronForPipeline deletes the cron trigger with the given id.
func (db *Database) RemoveCronForPipeline(cronId uuid.UUID) error {
	const query = `
DELETE FROM crons
WHERE id=$1;`

	_, err := db.Conn.Exec(context.Background(), query, cronId)
	return err
}
+26 -7
View File
@@ -6,17 +6,17 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
) )
type CredentialType string type CloneCredentialType string
const ( const (
USER_PASS CredentialType = "USER_PASS" USER_PASS CloneCredentialType = "USER_PASS"
SSH_KEY CredentialType = "SSH_KEY" SSH_KEY CloneCredentialType = "SSH_KEY"
) )
type Credential struct { type CloneCredential struct {
Id uuid.UUID Id uuid.UUID
Name string Name string
Type CredentialType Type CloneCredentialType
Username string Username string
Secret string Secret string
} }
@@ -26,7 +26,18 @@ type Pipeline struct {
Name string Name string
Url string Url string
PollInterval int PollInterval int
Credential *uuid.UUID CloneCredential *uuid.UUID
}
type Secret struct {
Id uuid.UUID
Name string
Secret string
}
type PipelineSecretMapping struct {
Pipeline uuid.UUID
Secret uuid.UUID
} }
type WebhookSender string type WebhookSender string
@@ -47,6 +58,7 @@ type Run struct {
Pipeline uuid.UUID Pipeline uuid.UUID
InProgress bool InProgress bool
Result *int64 Result *int64
BuildOutput []byte
Stdout []byte Stdout []byte
Stderr []byte Stderr []byte
} }
@@ -65,5 +77,12 @@ type CommandExecution struct {
type Runner struct { type Runner struct {
Id uuid.UUID Id uuid.UUID
Name string Name string
Secret string Token string
}
type Cron struct {
Id uuid.UUID
PipelineId uuid.UUID
Cron string
Pattern string
} }
@@ -4,4 +4,4 @@ MAINTAINER restitux <restitux@ohea.xyz>
RUN apt-get update && apt-get install -y \ RUN apt-get update && apt-get install -y \
ca-certificates \ ca-certificates \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
ENTRYPOINT ["/build/server/docker/build-and-run.sh"] ENTRYPOINT ["/build/server/docker/cursorius/build-and-run.sh"]
+26 -3
View File
@@ -2,15 +2,15 @@ version: "3.3"
services: services:
cursorius-server: cursorius-server:
build: build:
context: .. context: ".."
dockerfile: docker/Dockerfile.dev dockerfile: "docker/cursorius/Dockerfile.dev"
ports: ports:
- "0.0.0.0:45420:45420" - "0.0.0.0:45420:45420"
networks: networks:
- cursorius - cursorius
volumes: volumes:
- "..:/build/server" - "..:/build/server"
- "../server.toml:/root/.config/cursorius/server.toml" - "./server.toml:/root/.config/cursorius/server.toml"
- "/var/run/docker.sock:/var/run/docker.sock" - "/var/run/docker.sock:/var/run/docker.sock"
- "../_working/go:/go" - "../_working/go:/go"
- "../_working/jobs:/cursorius/jobs" - "../_working/jobs:/cursorius/jobs"
@@ -20,8 +20,31 @@ services:
- POSTGRES_USER=cursorius - POSTGRES_USER=cursorius
- POSTGRES_PASSWORD=cursorius - POSTGRES_PASSWORD=cursorius
- POSTGRES_DB=cursorius - POSTGRES_DB=cursorius
volumes:
- "../_working/postgres:/var/lib/postgresql/data"
networks: networks:
- cursorius - cursorius
graphiql:
build:
context: "graphiql"
dockerfile: "Dockerfile.graphiql"
ports:
- "0.0.0.0:45421:80"
networks:
- cursorius
gitea:
image: gitea/gitea:latest
profiles: ["gitea"]
environment:
- GITEA__webhook__ALLOWED_HOST_LIST=cursorius-server, external
ports:
- "127.0.0.1:2222:22"
- "127.0.0.1:3000:3000"
networks:
- cursorius
volumes:
- "../_working/gitea:/data"
networks: networks:
cursorius: cursorius:
external: true
-24
View File
@@ -1,24 +0,0 @@
version: "3.3"
services:
cursorius-server:
networks:
- gitea
gitea:
image: gitea/gitea:latest
environment:
- GITEA__webhook__ALLOWED_HOST_LIST=cursorius-server, external
ports:
- "127.0.0.1:2222:22"
- "127.0.0.1:3000:3000"
networks:
- gitea
volumes:
- gitea-data:/data
volumes:
gitea-data:
networks:
gitea:
external: false
+3
View File
@@ -0,0 +1,3 @@
FROM nginx:latest
COPY graphiql.html /usr/share/nginx/html/index.html
COPY graphiql.conf /etc/nginx/conf.d/default.conf
+52
View File
@@ -0,0 +1,52 @@
upstream backend {
server cursorius-server:45420;
}
server {
listen 80;
listen [::]:80;
server_name localhost;
#access_log /var/log/nginx/host.access.log main;
location / {
root /usr/share/nginx/html;
index index.html index.htm;
}
location /graphql {
proxy_pass http://backend/graphql;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /usr/share/nginx/html;
}
# proxy the PHP scripts to Apache listening on 127.0.0.1:80
#
#location ~ \.php$ {
# proxy_pass http://127.0.0.1;
#}
# pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
#
#location ~ \.php$ {
# root html;
# fastcgi_pass 127.0.0.1:9000;
# fastcgi_index index.php;
# fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
# include fastcgi_params;
#}
# deny access to .htaccess files, if Apache's document root
# concurs with nginx's one
#
#location ~ /\.ht {
# deny all;
#}
}
+70
View File
@@ -0,0 +1,70 @@
<!--
* Copyright (c) 2021 GraphQL Contributors
* All rights reserved.
*
* This source code is licensed under the license found in the
* LICENSE file in the root directory of this source tree.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<title>GraphiQL</title>
<style>
body {
height: 100%;
margin: 0;
width: 100%;
overflow: hidden;
}
#graphiql {
height: 100vh;
}
</style>
<!--
This GraphiQL example depends on Promise and fetch, which are available in
modern browsers, but can be "polyfilled" for older browsers.
GraphiQL itself depends on React DOM.
If you do not want to rely on a CDN, you can host these files locally or
include them directly in your favored resource bundler.
-->
<script
src="https://unpkg.com/react@17/umd/react.development.js"
integrity="sha512-Vf2xGDzpqUOEIKO+X2rgTLWPY+65++WPwCHkX2nFMu9IcstumPsf/uKKRd5prX3wOu8Q0GBylRpsDB26R6ExOg=="
crossorigin="anonymous"
></script>
<script
src="https://unpkg.com/react-dom@17/umd/react-dom.development.js"
integrity="sha512-Wr9OKCTtq1anK0hq5bY3X/AvDI5EflDSAh0mE9gma+4hl+kXdTJPKZ3TwLMBcrgUeoY0s3dq9JjhCQc7vddtFg=="
crossorigin="anonymous"
></script>
<!--
These two files can be found in the npm module, however you may wish to
copy them directly into your environment, or perhaps include them in your
favored resource bundler.
-->
<link rel="stylesheet" href="https://unpkg.com/graphiql/graphiql.min.css" />
</head>
<body>
<div id="graphiql">Loading...</div>
<script
src="https://unpkg.com/graphiql/graphiql.min.js"
type="application/javascript"
></script>
<script>
ReactDOM.render(
React.createElement(GraphiQL, {
fetcher: GraphiQL.createFetcher({
//url: 'https://swapi-graphql.netlify.app/.netlify/functions/index',
url: 'http://127.0.0.1:45421/graphql',
}),
defaultEditorToolsVisibility: true,
}),
document.getElementById('graphiql'),
);
</script>
</body>
</html>
+8 -2
View File
@@ -1,4 +1,10 @@
#!/bin/bash #!/bin/bash
docker build . -f docker/Dockerfile -t git.ohea.xyz/cursorius/server:latest if [[ -z "${1}" ]]; then
docker push git.ohea.xyz/cursorius/server:latest echo "You must provide a docker tag to push to."
else
echo "Building container git.ohea.xyz/cursorius/server:$1"
docker build . -f docker/cursorius/Dockerfile -t "git.ohea.xyz/cursorius/server:$1"
echo "Pushing container git.ohea.xyz/cursorius/server:$1"
docker push "git.ohea.xyz/cursorius/server:$1"
fi
+1 -85
View File
@@ -2,88 +2,4 @@
set -e set -e
mkdir -p _working/go go run docker/run.go "$@"
mkdir -p _working/jobs
base_default_compose_files="docker/docker-compose.yml"
default_compose_files="$base_default_compose_files"
override_compose="docker/docker-compose.override.yml"
gitea_compose="docker/gitea-override.yml"
if [ -f "$override_compose" ]
then
default_compose_files+=" $override_compose"
else
default_compose_files="docker/docker-compose.yml"
fi
function stop_containers {
current_containers="$(cat _working/current_containers)"
if [ "$current_containers" == "default" ]
then
compose_files="$default_compose_files"
elif [ "$current_containers" == "gitea" ]
then
compose_files="$default_compose_files $gitea_compose"
fi
compose_file_flags=$(echo "$compose_files" | tr ' ' '\n' | xargs -I'{}' echo "-f {} " | tr -d '\n')
docker compose $compose_file_flags down
}
function show_logs {
current_containers="$(cat _working/current_containers)"
if [ "$current_containers" == "default" ]
then
compose_files="$default_compose_files"
elif [ "$current_containers" == "gitea" ]
then
compose_files="$default_compose_files $gitea_compose"
fi
compose_file_flags=$(echo "$compose_files" | tr ' ' '\n' | xargs -I'{}' echo "-f {} " | tr -d '\n')
docker compose $compose_file_flags logs -f
}
function show_ps {
current_containers="$(cat _working/current_containers)"
if [ "$current_containers" == "default" ]
then
compose_files="$default_compose_files"
elif [ "$current_containers" == "gitea" ]
then
compose_files="$default_compose_files $gitea_compose"
fi
compose_file_flags=$(echo "$compose_files" | tr ' ' '\n' | xargs -I'{}' echo "-f {} " | tr -d '\n')
docker compose $compose_file_flags ps
}
case $1 in
"default")
echo "default" > _working/current_containers
compose_files="$default_compose_files"
compose_file_flags=$(echo "$compose_files" | tr ' ' '\n' | xargs -I'{}' echo "-f {} " | tr -d '\n')
docker compose $compose_file_flags up --build -d
docker compose $compose_file_flags logs -f;;
"gitea")
echo "gitea" > _working/current_containers
stop_containers
compose_files="$default_compose_files $gitea_compose"
compose_file_flags=$(echo "$compose_files" | tr ' ' '\n' | xargs -I'{}' echo "-f {} " | tr -d '\n')
docker compose $compose_file_flags up --build -d
docker compose $compose_file_flags logs -f;;
"dbshell")
compose_files="$default_compose_files $gitea_compose"
compose_file_flags=$(echo "$compose_files" | tr ' ' '\n' | xargs -I'{}' echo "-f {} " | tr -d '\n')
docker compose $compose_file_flags exec cursorius-db psql --user=cursorius;;
"stop")
stop_containers;;
"logs")
show_logs;;
"ps")
show_ps;;
*) echo "ERROR: Unknown param \"$1\"" 2>&1
exit 255;;
esac
+93
View File
@@ -0,0 +1,93 @@
package main
import (
"fmt"
"os"
"os/exec"
)
// panicError prints a formatted error message to stderr and terminates the
// process with exit code 1. (Despite the name, it exits rather than panics.)
func panicError(errorString string, params ...any) {
	fmt.Fprintf(os.Stderr, "ERROR: "+errorString+"\n", params...)
	os.Exit(1)
}
// run executes name with the given arguments, discarding its output, and
// aborts the whole program if the command fails to run.
func run(name string, arg ...string) {
	if err := exec.Command(name, arg...).Run(); err != nil {
		panicError("could not run command %v: %v", name, err)
	}
}
// runAttach executes name with the given arguments, with stdin/stdout/stderr
// attached to the current process, and aborts the program if it fails.
func runAttach(name string, arg ...string) {
	cmd := exec.Command(name, arg...)
	cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
	if err := cmd.Run(); err != nil {
		panicError("could not run command %v: %v", name, err)
	}
}
// runCompose invokes `docker compose` with the given arguments, attached to
// the terminal.
func runCompose(args []string) {
	composeArgs := append([]string{"compose"}, args...)
	runAttach("docker", composeArgs...)
}
func createDirs() {
run("mkdir", "-p", "_working/go")
run("mkdir", "-p", "_working/jobs")
}
// currentContainers reports which container set was most recently started,
// by reading the _working/current_containers state file. It aborts the
// program if the file cannot be read (e.g. nothing has been started yet).
func currentContainers() string {
	contents, err := os.ReadFile("_working/current_containers")
	if err != nil {
		panicError("could not read current containers: %v", err)
	}
	return string(contents)
}
// composeFlags builds the docker compose file/profile flags for the
// currently-selected container set.
func composeFlags() []string {
	flags := []string{"-f", "docker/docker-compose.yml"}
	if currentContainers() == "gitea" {
		flags = append(flags, "--profile", "gitea")
	}
	return flags
}
// runContainers records the selected container set in the state file, then
// brings the compose stack up (rebuilding images) and follows its logs.
func runContainers(containers string) {
	// 0644 (rw-r--r--): the previous mode 0633 was almost certainly a typo —
	// it granted group/other write+execute but *not* read on a plain state
	// file, which currentContainers() later needs to read.
	err := os.WriteFile("_working/current_containers", []byte(containers), 0o644)
	if err != nil {
		panicError("could not write current_containers file: %v", err)
	}
	runCompose(append(composeFlags(), "up", "--build", "-d"))
	runCompose(append(composeFlags(), "logs", "-f"))
}
// main dispatches the dev-environment subcommand given on the command line.
func main() {
	if len(os.Args) < 2 {
		panicError("not enough arguments passed")
	}

	createDirs()

	cmd := os.Args[1]
	switch cmd {
	case "default", "gitea":
		runContainers(cmd)
	case "runprev":
		runContainers(currentContainers())
	case "stop":
		runCompose(append(composeFlags(), "down"))
	case "dbshell":
		runCompose(append(composeFlags(), "exec", "cursorius-db", "psql", "--user=cursorius"))
	case "logs":
		runCompose(append(composeFlags(), "logs", "-f"))
	case "ps":
		runCompose(append(composeFlags(), "ps"))
	case "help":
		fmt.Println("commands: default, gitea, runprev, stop, dbshell, logs, ps, help")
	default:
		panicError("Unknown subcommand: %v", cmd)
	}
}
+4 -2
View File
@@ -3,7 +3,7 @@ module git.ohea.xyz/cursorius/server
go 1.19 go 1.19
require ( require (
git.ohea.xyz/cursorius/pipeline-api/go/api/v2 v2.0.0-20230109075652-ead0aeff2eb9 git.ohea.xyz/cursorius/pipeline-api/go/api/v2 v2.0.0-20230405234139-34d8875b72f4
git.ohea.xyz/cursorius/runner-api/go/api/v2 v2.0.0-20230109074922-e20285fe6cf2 git.ohea.xyz/cursorius/runner-api/go/api/v2 v2.0.0-20230109074922-e20285fe6cf2
git.ohea.xyz/golang/config v0.0.0-20220915224621-b9debd233173 git.ohea.xyz/golang/config v0.0.0-20220915224621-b9debd233173
github.com/bufbuild/connect-go v1.4.1 github.com/bufbuild/connect-go v1.4.1
@@ -14,9 +14,11 @@ require (
github.com/graphql-go/graphql v0.8.0 github.com/graphql-go/graphql v0.8.0
github.com/graphql-go/handler v0.2.3 github.com/graphql-go/handler v0.2.3
github.com/jackc/pgx/v5 v5.2.0 github.com/jackc/pgx/v5 v5.2.0
github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/robfig/cron/v3 v3.0.1
golang.org/x/net v0.2.0 golang.org/x/net v0.2.0
google.golang.org/protobuf v1.28.1 google.golang.org/protobuf v1.30.0
nhooyr.io/websocket v1.8.7 nhooyr.io/websocket v1.8.7
) )
+8 -3
View File
@@ -78,8 +78,8 @@ contrib.go.opencensus.io/exporter/stackdriver v0.13.5/go.mod h1:aXENhDJ1Y4lIg4EU
contrib.go.opencensus.io/integrations/ocsql v0.1.4/go.mod h1:8DsSdjz3F+APR+0z0WkU1aRorQCFfRxvqjUUPMbF3fE= contrib.go.opencensus.io/integrations/ocsql v0.1.4/go.mod h1:8DsSdjz3F+APR+0z0WkU1aRorQCFfRxvqjUUPMbF3fE=
contrib.go.opencensus.io/resource v0.1.1/go.mod h1:F361eGI91LCmW1I/Saf+rX0+OFcigGlFvXwEGEnkRLA= contrib.go.opencensus.io/resource v0.1.1/go.mod h1:F361eGI91LCmW1I/Saf+rX0+OFcigGlFvXwEGEnkRLA=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
git.ohea.xyz/cursorius/pipeline-api/go/api/v2 v2.0.0-20230109075652-ead0aeff2eb9 h1:8p7Kw3B7dbi2zdgG+Me9ETRWrJzoNVjcase4YqXfGbs= git.ohea.xyz/cursorius/pipeline-api/go/api/v2 v2.0.0-20230405234139-34d8875b72f4 h1:kKQQEg1nmWnqiNOqtUHteEuacyfy0NdxyDj6HPjbA2c=
git.ohea.xyz/cursorius/pipeline-api/go/api/v2 v2.0.0-20230109075652-ead0aeff2eb9/go.mod h1:D7GGcFIH421mo6KuRaXXXmlXPwWeEsemTZG/BOZA/4o= git.ohea.xyz/cursorius/pipeline-api/go/api/v2 v2.0.0-20230405234139-34d8875b72f4/go.mod h1:D7GGcFIH421mo6KuRaXXXmlXPwWeEsemTZG/BOZA/4o=
git.ohea.xyz/cursorius/runner-api/go/api/v2 v2.0.0-20230109074922-e20285fe6cf2 h1:G1XQEqhj1LZPQbH7avzvT7QL9Wfbb4CXMm0nLL39eDc= git.ohea.xyz/cursorius/runner-api/go/api/v2 v2.0.0-20230109074922-e20285fe6cf2 h1:G1XQEqhj1LZPQbH7avzvT7QL9Wfbb4CXMm0nLL39eDc=
git.ohea.xyz/cursorius/runner-api/go/api/v2 v2.0.0-20230109074922-e20285fe6cf2/go.mod h1:F9y5Ck4Wchsaj5amSX2eDRUlQ/iYP1VNLFduvjNwmLc= git.ohea.xyz/cursorius/runner-api/go/api/v2 v2.0.0-20230109074922-e20285fe6cf2/go.mod h1:F9y5Ck4Wchsaj5amSX2eDRUlQ/iYP1VNLFduvjNwmLc=
git.ohea.xyz/cursorius/webhooks/v6 v6.0.2-0.20221224221147-a2bdbf1756ed h1:gsK15m4Npow74+R6OfZKwwAg1sl7QWQCRXOeE2QLUco= git.ohea.xyz/cursorius/webhooks/v6 v6.0.2-0.20221224221147-a2bdbf1756ed h1:gsK15m4Npow74+R6OfZKwwAg1sl7QWQCRXOeE2QLUco=
@@ -858,6 +858,8 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d h1:q7n+5taxmM+9T2Q7Ydo7YN90FkoDuR5bbzByZwkQqPo=
github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d/go.mod h1:GN1Mg/uXQ6qwXA0HypnUO3xlcQJS9/y68EsHNeuuRa4=
github.com/jhump/protoreflect v1.6.1/go.mod h1:RZQ/lnuN+zqeRVpQigTwO6o0AJUkxbnSnpuG7toUTG4= github.com/jhump/protoreflect v1.6.1/go.mod h1:RZQ/lnuN+zqeRVpQigTwO6o0AJUkxbnSnpuG7toUTG4=
github.com/jhump/protoreflect v1.8.2/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg= github.com/jhump/protoreflect v1.8.2/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
@@ -1208,6 +1210,8 @@ github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s= github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@@ -2127,8 +2131,9 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk= gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+14 -9
View File
@@ -10,6 +10,8 @@ import (
"git.ohea.xyz/cursorius/server/pipeline_api" "git.ohea.xyz/cursorius/server/pipeline_api"
"git.ohea.xyz/cursorius/server/runnermanager" "git.ohea.xyz/cursorius/server/runnermanager"
"git.ohea.xyz/cursorius/server/webhook" "git.ohea.xyz/cursorius/server/webhook"
"github.com/google/uuid"
"github.com/op/go-logging" "github.com/op/go-logging"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"golang.org/x/net/http2/h2c" "golang.org/x/net/http2/h2c"
@@ -22,15 +24,16 @@ func setupHTTPServer(
mux *http.ServeMux, mux *http.ServeMux,
conf config.PipelineConf, conf config.PipelineConf,
db database.Database, db database.Database,
registerCh chan runnermanager.RunnerRegistration, runnerManagerChans runnermanager.RunnerManagerChans,
getRunnerCh chan runnermanager.GetRunnerRequest, pollChan chan uuid.UUID,
cronChan chan uuid.UUID,
) error { ) error {
webhook.CreateWebhookHandler(db, conf, mux) webhook.CreateWebhookHandler(db, conf, mux)
pipeline_api.CreateHandler(getRunnerCh, mux) pipeline_api.CreateHandler(runnerManagerChans.Allocation, runnerManagerChans.Release, mux)
err := admin_api.CreateHandler(db, mux) err := admin_api.CreateHandler(db, pollChan, cronChan, mux)
if err != nil { if err != nil {
return fmt.Errorf("Could not create admin api handler: %w", err) return fmt.Errorf("Could not create admin api handler: %w", err)
} }
@@ -41,7 +44,7 @@ func setupHTTPServer(
log.Errorf("Could not upgrade runner connection to websocket: %v", err) log.Errorf("Could not upgrade runner connection to websocket: %v", err)
return return
} }
go runnermanager.RegisterRunner(conn, registerCh) go runnermanager.RegisterRunner(conn, runnerManagerChans.Registration)
}) })
return nil return nil
} }
@@ -52,16 +55,18 @@ func Listen(
port int, port int,
conf config.PipelineConf, conf config.PipelineConf,
db database.Database, db database.Database,
registerCh chan runnermanager.RunnerRegistration, runnerManagerChans runnermanager.RunnerManagerChans,
getRunnerCh chan runnermanager.GetRunnerRequest, pollChan chan uuid.UUID,
cronChan chan uuid.UUID,
) error { ) error {
err := setupHTTPServer( err := setupHTTPServer(
mux, mux,
conf, conf,
db, db,
registerCh, runnerManagerChans,
getRunnerCh, pollChan,
cronChan,
) )
if err != nil { if err != nil {
return fmt.Errorf("Could not setup http endpoints: %w", err) return fmt.Errorf("Could not setup http endpoints: %w", err)
+9 -5
View File
@@ -5,6 +5,7 @@ import (
"os" "os"
"git.ohea.xyz/cursorius/server/config" "git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/cron"
"git.ohea.xyz/cursorius/server/database" "git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/listen" "git.ohea.xyz/cursorius/server/listen"
"git.ohea.xyz/cursorius/server/poll" "git.ohea.xyz/cursorius/server/poll"
@@ -26,7 +27,7 @@ func main() {
logging.SetBackend(backendLeveled) logging.SetBackend(backendLeveled)
log.Info("Starting cursorius-server v0.1.0") log.Info("Starting cursorius-server v0.3.0")
configData, err := config.GetConfig() configData, err := config.GetConfig()
if err != nil { if err != nil {
@@ -40,13 +41,15 @@ func main() {
return return
} }
getRunnerCh, registerCh, err := runnermanager.StartRunnerManager(configData.Config.Runners) runnerManagerChans, err := runnermanager.StartRunnerManager(configData.Config.Runners, db)
if err != nil { if err != nil {
log.Errorf("Could not start runner: %v", err) log.Errorf("Could not start runner: %v", err)
return return
} }
_ = poll.StartPolling(configData.Config.PipelineConf, db) pollChan := poll.StartPolling(configData.Config.PipelineConf, db)
cronChan := cron.StartCrons(configData.Config.PipelineConf, db)
mux := http.NewServeMux() mux := http.NewServeMux()
@@ -56,7 +59,8 @@ func main() {
configData.Config.Port, configData.Config.Port,
configData.Config.PipelineConf, configData.Config.PipelineConf,
db, db,
registerCh, runnerManagerChans,
getRunnerCh, pollChan,
cronChan,
)) ))
} }
+62 -23
View File
@@ -4,12 +4,13 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"strings"
"sync" "sync"
"time"
apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2" apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2"
"git.ohea.xyz/cursorius/pipeline-api/go/api/v2/apiv2connect" "git.ohea.xyz/cursorius/pipeline-api/go/api/v2/apiv2connect"
"git.ohea.xyz/cursorius/server/runnermanager" "git.ohea.xyz/cursorius/server/runnermanager"
"git.ohea.xyz/cursorius/server/util"
"github.com/bufbuild/connect-go" "github.com/bufbuild/connect-go"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/op/go-logging" "github.com/op/go-logging"
@@ -18,7 +19,8 @@ import (
var log = logging.MustGetLogger("cursorius-server") var log = logging.MustGetLogger("cursorius-server")
type ApiServer struct { type ApiServer struct {
getRunnerCh chan runnermanager.GetRunnerRequest allocationCh chan runnermanager.RunnerAllocationRequest
releaseCh chan runnermanager.RunnerReleaseRequest
allocatedRunners map[uuid.UUID]*RunnerWrapper allocatedRunners map[uuid.UUID]*RunnerWrapper
allocatedRunnersMutex sync.RWMutex allocatedRunnersMutex sync.RWMutex
} }
@@ -34,15 +36,17 @@ func (r *RunnerWrapper) RunCommand(cmd string, args []string) (int64, string, st
return_code, stdout, stderr, err := r.runner.RunCommand(cmd, args) return_code, stdout, stderr, err := r.runner.RunCommand(cmd, args)
// TODO: run command by sending websocket packet
// TODO: get stdout and stderr response
return return_code, stdout, stderr, err return return_code, stdout, stderr, err
} }
func (r *RunnerWrapper) Release() { func (r *RunnerWrapper) Release(releaseCh chan runnermanager.RunnerReleaseRequest) {
r.mutex.Lock() r.mutex.Lock()
defer r.mutex.Unlock() defer r.mutex.Unlock()
r.runner.Release()
releaseCh <- runnermanager.RunnerReleaseRequest{
Runner: r.runner,
}
r.runner = nil
} }
func (s *ApiServer) GetRunnerFromMap(u uuid.UUID) (*RunnerWrapper, bool) { func (s *ApiServer) GetRunnerFromMap(u uuid.UUID) (*RunnerWrapper, bool) {
@@ -52,37 +56,71 @@ func (s *ApiServer) GetRunnerFromMap(u uuid.UUID) (*RunnerWrapper, bool) {
return runner, ok return runner, ok
} }
func (s *ApiServer) AddRunnerToMap(u uuid.UUID, runner *runnermanager.Runner) {
s.allocatedRunnersMutex.Lock()
defer s.allocatedRunnersMutex.Unlock()
s.allocatedRunners[u] = &RunnerWrapper{runner: runner}
}
func (s *ApiServer) GetRunner( func (s *ApiServer) GetRunner(
ctx context.Context, ctx context.Context,
req *connect.Request[apiv2.GetRunnerRequest], req *connect.Request[apiv2.GetRunnerRequest],
) (*connect.Response[apiv2.GetRunnerResponse], error) { ) (*connect.Response[apiv2.GetRunnerResponse], error) {
respChan := make(chan runnermanager.GetRunnerResponse) var response runnermanager.RunnerAllocationResponse
s.getRunnerCh <- runnermanager.GetRunnerRequest{ var timeoutCtx *context.Context
var retryInterval int64 = 0
respChan := make(chan runnermanager.RunnerAllocationResponse)
tagsStr := util.FormatTags(req.Msg.Tags)
if req.Msg.Options != nil {
if req.Msg.Options.Timeout != 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(req.Msg.Options.Timeout)*time.Second)
timeoutCtx = &ctx
defer cancel()
}
retryInterval = req.Msg.Options.RetryInterval
}
for {
s.allocationCh <- runnermanager.RunnerAllocationRequest{
Tags: req.Msg.Tags, Tags: req.Msg.Tags,
RespChan: respChan, RespChan: respChan,
} }
var runnerTagsStr strings.Builder response = <-respChan
fmt.Fprintf(&runnerTagsStr, "[%v", req.Msg.Tags[0]) if response.Err == nil {
for _, tag := range req.Msg.Tags[1:] { break
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
} }
fmt.Fprintf(&runnerTagsStr, "]")
response := <-respChan log.Infof("Could not get runner with tags \"%v\": %v", tagsStr, response.Err)
if response.Err != nil {
log.Errorf("Could not get runner with tags \"%v\": %v", runnerTagsStr.String(), response.Err) // If no timeout is specified, skip after one attempt
if timeoutCtx == nil {
break
}
// If timeout is expired, stop trying to allocate runner
if (*timeoutCtx).Err() != nil {
break
}
log.Infof("Sleeping for %v seconds before retry...", retryInterval)
time.Sleep(time.Duration(retryInterval) * time.Second)
}
if response.Runner == nil {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner")) return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner"))
} }
log.Infof("Got runner with tags: %v", runnerTagsStr.String()) log.Infof("Got runner with tags: %v", tagsStr)
runnerUuid := uuid.New() runnerUuid := uuid.New()
s.allocatedRunnersMutex.Lock() s.AddRunnerToMap(runnerUuid, response.Runner)
s.allocatedRunners[runnerUuid] = &RunnerWrapper{runner: response.Runner}
s.allocatedRunnersMutex.Unlock()
res := connect.NewResponse(&apiv2.GetRunnerResponse{ res := connect.NewResponse(&apiv2.GetRunnerResponse{
Id: runnerUuid.String(), Id: runnerUuid.String(),
@@ -105,7 +143,7 @@ func (s *ApiServer) ReleaseRunner(
s.allocatedRunnersMutex.Lock() s.allocatedRunnersMutex.Lock()
runner := s.allocatedRunners[uuid] runner := s.allocatedRunners[uuid]
delete(s.allocatedRunners, uuid) delete(s.allocatedRunners, uuid)
runner.Release() runner.Release(s.releaseCh)
s.allocatedRunnersMutex.Unlock() s.allocatedRunnersMutex.Unlock()
res := connect.NewResponse(&apiv2.ReleaseRunnerResponse{}) res := connect.NewResponse(&apiv2.ReleaseRunnerResponse{})
@@ -144,9 +182,10 @@ func (s *ApiServer) RunCommand(
return res, nil return res, nil
} }
func CreateHandler(getRunnerCh chan runnermanager.GetRunnerRequest, mux *http.ServeMux) { func CreateHandler(allocationCh chan runnermanager.RunnerAllocationRequest, releaseCh chan runnermanager.RunnerReleaseRequest, mux *http.ServeMux) {
api_server := &ApiServer{ api_server := &ApiServer{
getRunnerCh: getRunnerCh, allocationCh: allocationCh,
releaseCh: releaseCh,
allocatedRunners: make(map[uuid.UUID]*RunnerWrapper), allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
} }
path, handler := apiv2connect.NewGetRunnerServiceHandler(api_server) path, handler := apiv2connect.NewGetRunnerServiceHandler(api_server)
+110 -40
View File
@@ -4,9 +4,11 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"github.com/jhoonb/archivex"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
@@ -32,38 +34,40 @@ type PipelineExecution struct {
} }
func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) { func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) {
idStr := pe.Pipeline.Id.String()
jobFolder := filepath.Join(pipelineConf.WorkingDir, pe.Pipeline.Id.String(), pe.Run.Id.String()) jobFolder := filepath.Join(pipelineConf.WorkingDir, pe.Pipeline.Id.String(), pe.Run.Id.String())
cloneFolder := filepath.Join(jobFolder, "repo")
log.Debugf("Job %v configured with URL \"%v\"", pe.Pipeline.Name, pe.Pipeline.Url) log.Debugf("%v: URL: %v", idStr, pe.Pipeline.Url)
log.Debugf("%v: Folder: %v", idStr, jobFolder)
log.Debugf("Job %v configured with folder \"%v\"", pe.Pipeline.Name, jobFolder)
err := os.RemoveAll(jobFolder) err := os.RemoveAll(jobFolder)
if err != nil { if err != nil {
log.Errorf("could not delete existing folder %v", jobFolder) log.Errorf("%v: could not delete existing folder %v", idStr, jobFolder)
return return
} }
err = os.MkdirAll(jobFolder, 0755) err = os.MkdirAll(cloneFolder, 0755)
if err != nil { if err != nil {
log.Errorf("could not create working directory for job %v: %w", pe.Pipeline.Name, err) log.Errorf("%v: could not create working directory: %w", idStr, pe.Pipeline.Name, err)
return return
} }
log.Infof("Cloning source from URL %v", pe.Pipeline.Url) log.Infof("%v: cloning source from URL %v", idStr, pe.Pipeline.Url)
var auth transport.AuthMethod var auth transport.AuthMethod
if pe.Pipeline.Credential != nil { if pe.Pipeline.CloneCredential != nil {
credential, err := db.GetCredentialById(*pe.Pipeline.Credential) credential, err := db.GetCloneCredentialById(*pe.Pipeline.CloneCredential)
if err != nil { if err != nil {
log.Errorf("could not get credenital from db: %v", err) log.Errorf("%v: could not get credenital from db: %v", idStr, err)
return return
} }
switch credential.Type { switch credential.Type {
case "USER_PASS": case "USER_PASS":
log.Debugf("job %v configured to use credential %v", pe.Pipeline.Name, credential.Name) log.Debugf("%v: credential %v configured", idStr, credential.Name)
auth = transport.AuthMethod(&http.BasicAuth{ auth = transport.AuthMethod(&http.BasicAuth{
Username: credential.Username, Username: credential.Username,
Password: credential.Secret, Password: credential.Secret,
@@ -71,58 +75,97 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
case "SSH_KEY": case "SSH_KEY":
publicKeys, err := ssh.NewPublicKeys(credential.Username, []byte(credential.Secret), "") publicKeys, err := ssh.NewPublicKeys(credential.Username, []byte(credential.Secret), "")
if err != nil { if err != nil {
log.Errorf("could not parse credential %v", credential.Name) log.Errorf("%v: could not parse credential %v: %v", idStr, credential.Name, err)
return return
} }
auth = transport.AuthMethod(publicKeys) auth = transport.AuthMethod(publicKeys)
default: default:
log.Errorf("unsupported credential type %v", credential.Type) log.Errorf("%v: unsupported credential type %v", idStr, credential.Type)
return return
} }
} else { } else {
auth = nil auth = nil
} }
_, err = git.PlainClone(jobFolder, false, &git.CloneOptions{ _, err = git.PlainClone(cloneFolder, false, &git.CloneOptions{
URL: pe.Pipeline.Url, URL: pe.Pipeline.Url,
Auth: auth, Auth: auth,
}) })
if err != nil { if err != nil {
log.Errorf("could not clone repo: %v", err) log.Errorf("%v: could not clone repo: %v", idStr, err)
return return
} }
cli, err := client.NewClientWithOpts(client.FromEnv) cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil { if err != nil {
log.Errorf("Could not create docker client: %w", err) log.Errorf("%v: could not create docker client: %w", idStr, err)
return return
} }
log.Info("Source cloned successfully") log.Infof("%v: source cloned successfully", idStr)
ctx := context.Background() ctx := context.Background()
imageName := "git.ohea.xyz/cursorius/pipeline-api/cursorius-pipeline:v2" log.Debugf("%v: tarring up job source", idStr)
log.Infof("Pulling image %v", imageName) log.Debugf("%v: creating tarfile", idStr)
pullOutput, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{}) tarFile := filepath.Join(jobFolder, "archive.tar")
tar := new(archivex.TarFile)
err = tar.Create(tarFile)
if err != nil { if err != nil {
log.Errorf("could not pull image %v: %w", imageName, err) log.Errorf("%v: could not create tarfile: %w", idStr, err)
return return
} }
buf, err := io.ReadAll(pullOutput) log.Debugf("%v: adding files to tarfile", idStr)
err = tar.AddAll(cloneFolder, false)
if err != nil { if err != nil {
log.Errorf("could not read from io.ReadCloser:, %w", err) log.Errorf("%v: could not add repo to tarfile: %w", idStr, err)
return return
} }
log.Infof("%s", buf)
err = pullOutput.Close() log.Debugf("%v: saving tarfile tarfile", idStr)
err = tar.Close()
if err != nil { if err != nil {
log.Errorf("could not close io.ReadCloser: %w", err) log.Errorf("%v: could not close tarfile: %w", idStr, err)
return return
} }
log.Info("Image pulled sucessfully") log.Debugf("%v: job source tarred", idStr)
log.Infof("%v: building container", idStr)
dockerBuildContext, err := os.Open(tarFile)
defer dockerBuildContext.Close()
imageName := fmt.Sprintf("%v-%v:latest", pe.Pipeline.Id.String(), pe.Run.Id.String())
buildResponse, err := cli.ImageBuild(context.Background(), dockerBuildContext, types.ImageBuildOptions{
Tags: []string{imageName},
Dockerfile: ".cursorius/Dockerfile",
})
if err != nil {
log.Errorf("%v: could not build container: %w", idStr, err)
return
}
log.Debugf("%v: reading build output from docker daemon", idStr)
err = db.UpdateRunBuildOutput(pe.Run.Id, cleanupBuildOutput(buildResponse.Body))
if err != nil {
log.Errorf("%v: could not update build output for run: %w", idStr, err)
return
}
log.Debugf("%v: build output read from docker daemon", idStr)
err = buildResponse.Body.Close()
if err != nil {
log.Errorf("%v: could not close build response body: %w", idStr, err)
return
}
log.Debugf("%v: build response closed", idStr)
log.Infof("%v: image built sucessfully", idStr)
hostConfig := container.HostConfig{} hostConfig := container.HostConfig{}
@@ -154,56 +197,83 @@ func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf co
) )
} }
env := make([]string, 0)
// set cursorius environment variables
env = append(env, []string{
fmt.Sprintf("CURSORIUS_RUN_ID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CURSORIUS_SERVER_URL=%v", pipelineConf.AccessURL),
}...)
// load secrets into environment
secrets, err := db.GetSecretsForPipeline(pe.Pipeline.Id)
if err != nil {
log.Errorf("%v: could not get secrets for pipeline", idStr, err)
return
}
for _, secret := range secrets {
// the env name is validated to be just uppercase letters, numbers, and underscores on ingestion
env = append(env, fmt.Sprintf("%v=%v", strings.ToUpper(secret.Name), secret.Secret))
}
log.Debugf("%v: creating container", idStr)
resp, err := cli.ContainerCreate(ctx, resp, err := cli.ContainerCreate(ctx,
&container.Config{ &container.Config{
Image: imageName, Image: imageName,
Tty: false, Tty: false,
Env: []string{ Env: env,
fmt.Sprintf("RUNID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CUROSRIUS_SERVER_URL=%v", pipelineConf.AccessURL),
},
}, },
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig) // TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
&hostConfig, &hostConfig,
nil, nil, "", nil, nil, "",
) )
if err != nil { if err != nil {
log.Errorf("could not create container: %w", err) log.Errorf("%v: could not create container: %w", idStr, err)
return return
} }
log.Info("Launching container") log.Info("%v: starting container", idStr)
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
log.Errorf("could not start container: %v", err) log.Errorf("%v: could not start container: %v", idStr, err)
return return
} }
log.Debugf("%v: container started", idStr)
log.Debugf("%v: waiting on container", idStr)
statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
select { select {
case err := <-errCh: case err := <-errCh:
if err != nil { if err != nil {
log.Errorf("container returned error: %v", err) log.Errorf("%v: container returned error: %v", idStr, err)
return return
} }
case okBody := <-statusCh: case okBody := <-statusCh:
if okBody.Error != nil { if okBody.Error != nil {
log.Errorf("Could not wait on container: %v", err) log.Errorf("%v: could not wait on container: %v", idStr, err)
return return
} else { } else {
log.Debugf("Container finished running with return code: %v", okBody.StatusCode) log.Debugf("%v: container finished running with return code: %v", idStr, okBody.StatusCode)
pe.Run.Result = &okBody.StatusCode pe.Run.Result = &okBody.StatusCode
} }
} }
pe.Run.InProgress = false pe.Run.InProgress = false
log.Debugf("%v: getting container logs", idStr)
out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true}) out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
if err != nil { if err != nil {
log.Errorf("could not get container logs: %w", err) log.Errorf("%v: could not get container logs: %w", idStr, err)
return return
} }
log.Debugf("%v: gotcontainer logs", idStr)
var stdOut bytes.Buffer var stdOut bytes.Buffer
var stdErr bytes.Buffer var stdErr bytes.Buffer
+25
View File
@@ -0,0 +1,25 @@
package pipeline_executor
import (
"bufio"
"encoding/json"
"io"
)
func cleanupBuildOutput(input io.ReadCloser) string {
output := ""
scanner := bufio.NewScanner(input)
for scanner.Scan() {
var log map[string]any
json.Unmarshal(scanner.Bytes(), &log)
if logVar, ok := log["stream"]; ok {
if log, ok := logVar.(string); ok {
output += log
}
}
}
return output
}
+68 -15
View File
@@ -1,17 +1,18 @@
package poll package poll
import ( import (
"context"
"time" "time"
"github.com/google/uuid"
"github.com/op/go-logging"
"git.ohea.xyz/cursorius/server/config" "git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database" "git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/pipeline_executor" "git.ohea.xyz/cursorius/server/pipeline_executor"
"github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/storage/memory" "github.com/go-git/go-git/v5/storage/memory"
"github.com/google/uuid"
"github.com/op/go-logging"
) )
var log = logging.MustGetLogger("cursorius-server") var log = logging.MustGetLogger("cursorius-server")
@@ -26,11 +27,41 @@ type tag struct {
commitHash string commitHash string
} }
func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) { func pollJob(ctx context.Context, pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) {
prevCommits := make(map[string]string) firstScan := true
for { for {
time.Sleep(time.Duration(pipeline.PollInterval) * time.Second) // Don't sleep on first scan to ease testing
// TODO: this should be replaced with a script that mocks a webhook
if !firstScan {
ctx, cancel := context.WithTimeout(ctx, time.Duration(pipeline.PollInterval)*time.Second)
select {
case <-ctx.Done():
switch ctx.Err() {
case context.Canceled:
log.Infof("Polling for pipeline %v canceled, stopping", pipeline.Name)
cancel()
return
}
}
cancel()
log.Infof("Polling repo %v", pipeline.Name) log.Infof("Polling repo %v", pipeline.Name)
} else {
firstScan = false
}
prevRefs, err := db.GetPipelineRefs(pipeline.Id)
if err != nil {
log.Errorf("Could not get pipeline refs from db: %v", err)
return
}
log.Debugf("Got pipeline hashs for repo %v (id: %v)", pipeline.Name, pipeline.Id)
for refName, hash := range prevRefs {
log.Debugf("%v: %v: %v", pipeline.Id, refName, hash)
}
repo, err := git.Clone(memory.NewStorage(), nil, &git.CloneOptions{ repo, err := git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
URL: pipeline.Url, URL: pipeline.Url,
@@ -50,18 +81,19 @@ func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db da
branches.ForEach(func(branch *plumbing.Reference) error { branches.ForEach(func(branch *plumbing.Reference) error {
log.Debugf("Processing branch %v from repo %v (id: %v)", branch.Name().String(), pipeline.Name, pipeline.Id) log.Debugf("Processing branch %v from repo %v (id: %v)", branch.Name().String(), pipeline.Name, pipeline.Id)
prevRef, ok := prevCommits[branch.Name().String()] prevRef, ok := prevRefs[branch.Name().String()]
if ok { if ok {
if branch.Hash().String() != prevRef { if branch.Hash().String() != prevRef {
log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which does not match the previously seen hash of %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String(), prevRef)
log.Debugf("Queuing job for branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String()) log.Debugf("Queuing job for branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
prevCommits[branch.Name().String()] = branch.Hash().String() prevRefs[branch.Name().String()] = branch.Hash().String()
refsToRunFor = append(refsToRunFor, branch.Name().String()) refsToRunFor = append(refsToRunFor, branch.Name().String())
} else { } else {
log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String(), prevRef) log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String(), prevRef)
} }
} else { } else {
log.Debugf("Queuing job for newly discovered branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String()) log.Debugf("Queuing job for newly discovered branch %v in repo %v (id: %v) with hash %v", branch.Name().String(), pipeline.Name, pipeline.Id, branch.Hash().String())
prevCommits[branch.Name().String()] = branch.Hash().String() prevRefs[branch.Name().String()] = branch.Hash().String()
refsToRunFor = append(refsToRunFor, branch.Name().String()) refsToRunFor = append(refsToRunFor, branch.Name().String())
} }
return nil return nil
@@ -74,23 +106,29 @@ func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db da
} }
tags.ForEach(func(tag *plumbing.Reference) error { tags.ForEach(func(tag *plumbing.Reference) error {
log.Debugf("Processing tag %v from repo %v (id: %v)", tag.Name().String(), pipeline.Name, pipeline.Id) log.Debugf("Processing tag %v from repo %v (id: %v)", tag.Name().String(), pipeline.Name, pipeline.Id)
prevRef, ok := prevCommits[tag.Name().String()] prevRef, ok := prevRefs[tag.Name().String()]
if ok { if ok {
if tag.Hash().String() != prevRef { if tag.Hash().String() != prevRef {
log.Debugf("Queuing job for tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String()) log.Debugf("Queuing job for tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
prevCommits[tag.Name().String()] = tag.Hash().String() prevRefs[tag.Name().String()] = tag.Hash().String()
refsToRunFor = append(refsToRunFor, tag.Name().String()) refsToRunFor = append(refsToRunFor, tag.Name().String())
} else { } else {
log.Debugf("Tag %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String(), prevRef) log.Debugf("Tag %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String(), prevRef)
} }
} else { } else {
log.Debugf("Queuing job for newly discovered tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String()) log.Debugf("Queuing job for newly discovered tag %v in repo %v (id: %v) with hash %v", tag.Name().String(), pipeline.Name, pipeline.Id, tag.Hash().String())
prevCommits[tag.Name().String()] = tag.Hash().String() prevRefs[tag.Name().String()] = tag.Hash().String()
refsToRunFor = append(refsToRunFor, tag.Name().String()) refsToRunFor = append(refsToRunFor, tag.Name().String())
} }
return nil return nil
}) })
err = db.UpdatePipelineRefs(pipeline.Id, prevRefs)
if err != nil {
log.Errorf("Could not update pipeline refs: %v", err)
return
}
for _, ref := range refsToRunFor { for _, ref := range refsToRunFor {
log.Debugf("Dispatching job for ref %v in repo %v (id: %v)", ref, pipeline.Name, pipeline.Id) log.Debugf("Dispatching job for ref %v in repo %v (id: %v)", ref, pipeline.Name, pipeline.Id)
@@ -118,11 +156,17 @@ func launchPollJobs(conf config.PipelineConf, db database.Database, pollChan cha
return return
} }
pipelineCancelations := make(map[uuid.UUID]context.CancelFunc)
for _, pipeline := range pipelines { for _, pipeline := range pipelines {
if pipeline.PollInterval == 0 { if pipeline.PollInterval == 0 {
continue continue
} else { } else {
go pollJob(pipeline, conf, db) ctx, cancel := context.WithCancel(context.Background())
pipelineCancelations[pipeline.Id] = cancel
log.Infof("Starting polling for pipeline %v with id %v", pipeline.Name, pipeline.Id)
go pollJob(ctx, pipeline, conf, db)
} }
} }
@@ -133,8 +177,17 @@ func launchPollJobs(conf config.PipelineConf, db database.Database, pollChan cha
log.Errorf("Could not get pipeline with id \"%v\" from database: %v", err) log.Errorf("Could not get pipeline with id \"%v\" from database: %v", err)
continue continue
} }
// TODO: stop existing polling process for given uuid
go pollJob(pipeline, conf, db) // Cancel existing polling job if it exists
if cancelFunc, ok := pipelineCancelations[pipeline.Id]; ok {
cancelFunc()
}
// Start new polling job
log.Infof("Starting polling for pipeline %v with id %v", pipeline.Name, pipeline.Id)
ctx, cancel := context.WithCancel(context.Background())
pipelineCancelations[pipeline.Id] = cancel
go pollJob(ctx, pipeline, conf, db)
} }
} }
+22 -7
View File
@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/google/uuid"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"nhooyr.io/websocket" "nhooyr.io/websocket"
@@ -17,23 +18,37 @@ type RunnerData struct {
} }
type Runner struct { type Runner struct {
id string id uuid.UUID
tags []string tags []string
conn *websocket.Conn conn *websocket.Conn
receiveChan chan []byte receiveChan chan []byte
running bool
} }
func (r *Runner) Id() string { func (r *Runner) HasTags(requestedTags []string) bool {
tagIter:
for _, requestedTag := range requestedTags {
for _, posessedTag := range r.tags {
// if we find the tag, move on to search for the next one
if posessedTag == requestedTag {
continue tagIter
}
}
// if we don't find the tag
return false
}
return true
}
func (r *Runner) Id() uuid.UUID {
return r.id return r.id
} }
// Release clears the runner's running flag so the manager can hand it
// out for another job.
func (r *Runner) Release() {
	r.running = false
}
func (r *Runner) RunCommand(cmd string, args []string) (returnCode int64, stdout string, stderr string, err error) { func (r *Runner) RunCommand(cmd string, args []string) (returnCode int64, stdout string, stderr string, err error) {
if r.conn == nil {
return 0, "", "", fmt.Errorf("runner with id %v has nil conn, THIS IS A BUG", r.id)
}
// Write RunCommand message to client // Write RunCommand message to client
serverToRunnerMsg := &runner_api.ServerToRunnerMsg{ serverToRunnerMsg := &runner_api.ServerToRunnerMsg{
Msg: &runner_api.ServerToRunnerMsg_RunCommandMsg{ Msg: &runner_api.ServerToRunnerMsg_RunCommandMsg{
+62 -86
View File
@@ -3,71 +3,33 @@ package runnermanager
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/google/uuid"
"github.com/op/go-logging" "github.com/op/go-logging"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"nhooyr.io/websocket" "nhooyr.io/websocket"
"git.ohea.xyz/cursorius/server/config" "git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/util"
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2" runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
) )
var log = logging.MustGetLogger("cursorius-server") var log = logging.MustGetLogger("cursorius-server")
type RunnerRegistration struct { func (r *runnerManager) processRunnerAllocation(req RunnerAllocationRequest) {
Secret string tagsStr := util.FormatTags(req.Tags)
Id string log.Infof("Got request for runner with tags \"%v\"", tagsStr)
Tags []string
conn *websocket.Conn
}
type runnerManager struct { log.Debugf("Finding runner with tags %v", tagsStr)
getRunnerCh chan GetRunnerRequest
registerCh chan RunnerRegistration
connectedRunners []Runner
numConnectedRunners uint64
configuredRunners map[string]config.Runner
}
type GetRunnerRequest struct {
Tags []string
RespChan chan GetRunnerResponse
}
type GetRunnerResponse struct {
Runner *Runner
Err error
}
type runnerJob struct {
Id string
URL string
}
func (r *runnerManager) processRequest(req GetRunnerRequest) {
var runnerTagsStr strings.Builder
fmt.Fprintf(&runnerTagsStr, "[%v", req.Tags[0])
for _, tag := range req.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, ", %v", tag)
}
fmt.Fprintf(&runnerTagsStr, "]")
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
log.Debugf("Finding runner with tags %v", runnerTagsStr.String())
foundRunner := false foundRunner := false
runnersToRemove := []int{} runnersToRemove := []int{}
runnerIter: runnerIter:
for i, runner := range r.connectedRunners { for i, runner := range r.connectedRunners {
// don't allocate runner that is already occupied
if runner.running {
log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id)
continue
}
// don't allocate runner with closed receiveChan (is defunct) // don't allocate runner with closed receiveChan (is defunct)
// there should never be messages to read on an inactive runner, // there should never be messages to read on an inactive runner,
// so we aren't losing any data here // so we aren't losing any data here
@@ -84,29 +46,25 @@ runnerIter:
default: default:
log.Debugf("Checking runner %v for requested tags", runner.id) log.Debugf("Checking runner %v for requested tags", runner.id)
tagIter: if !runner.HasTags(req.Tags) {
for _, requestedTag := range req.Tags {
for _, posessedTag := range runner.tags {
if requestedTag == posessedTag {
continue tagIter
}
}
continue runnerIter continue runnerIter
} }
r.connectedRunners[i].running = true runnersToRemove = append(runnersToRemove, i)
foundRunner = true foundRunner = true
req.RespChan <- GetRunnerResponse{ log.Debugf("Runner %v has requested tags, allocating", runner.id)
req.RespChan <- RunnerAllocationResponse{
Runner: &r.connectedRunners[i], Runner: &r.connectedRunners[i],
Err: nil, Err: nil,
} }
} }
} }
// remove allocated runner plus defunct runners
// since we iterate, all the indexes will be in ascending order // since we iterate, all the indexes will be in ascending order
for i, runnerInd := range runnersToRemove { for i, runnerInd := range runnersToRemove {
r.connectedRunners[runnerInd-i] = r.connectedRunners[len(r.connectedRunners)-1] r.connectedRunners[runnerInd-i] = r.connectedRunners[len(r.connectedRunners)-1]
r.connectedRunners = r.connectedRunners[0 : len(r.connectedRunners)-2] r.connectedRunners = r.connectedRunners[0 : len(r.connectedRunners)-1]
} }
if foundRunner { if foundRunner {
@@ -117,24 +75,42 @@ runnerIter:
if len(r.connectedRunners) == 0 { if len(r.connectedRunners) == 0 {
errorMsg = "no connected runners" errorMsg = "no connected runners"
} }
req.RespChan <- GetRunnerResponse{ req.RespChan <- RunnerAllocationResponse{
Runner: &Runner{}, Runner: nil,
Err: fmt.Errorf("Could not allocate runner: %v", errorMsg), Err: fmt.Errorf("Could not allocate runner: %v", errorMsg),
} }
} }
func (r *runnerManager) processRegistration(reg RunnerRegistration) { func (r *runnerManager) processRunnerRegistration(req RunnerRegistrationRequest) {
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret) log.Debugf("New runner appeared with id: %v and secret: %v", req.Id, req.Secret)
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
if configuredRunner.Secret == reg.Secret { // Get runner with give id from database
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags) runnerId, err := uuid.Parse(req.Id)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not parse as UUID: %v", req.Id, err)
req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
dbRunner, err := r.db.GetRunnerById(runnerId)
if err != nil {
log.Errorf("Disconnecting runner with id: %v, could not find runner in DB: %v", runnerId, err)
req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
if req.Secret != dbRunner.Token {
log.Errorf("Disconnecting runner with id: %v, invalid secret", runnerId)
req.conn.Close(websocket.StatusNormalClosure, "registration invalid")
return
}
log.Infof("Registering runner \"%v\" with tags %v", req.Id, req.Tags)
runner := Runner{ runner := Runner{
id: reg.Id, id: runnerId,
tags: reg.Tags, tags: req.Tags,
conn: reg.conn, conn: req.conn,
receiveChan: make(chan []byte), receiveChan: make(chan []byte),
running: false,
} }
r.connectedRunners = append(r.connectedRunners, runner) r.connectedRunners = append(r.connectedRunners, runner)
// start goroutine to call Read function on websocket connection // start goroutine to call Read function on websocket connection
@@ -143,7 +119,7 @@ func (r *runnerManager) processRegistration(reg RunnerRegistration) {
defer log.Noticef("Deregistered runner with id: %v", runner.id) defer log.Noticef("Deregistered runner with id: %v", runner.id)
defer close(runner.receiveChan) defer close(runner.receiveChan)
for { for {
msgType, data, err := reg.conn.Read(context.Background()) msgType, data, err := req.conn.Read(context.Background())
if err != nil { if err != nil {
// TODO: this is still racy, since a runner could be allocated between the // TODO: this is still racy, since a runner could be allocated between the
// connection returning an err and the channel closing // connection returning an err and the channel closing
@@ -161,47 +137,47 @@ func (r *runnerManager) processRegistration(reg RunnerRegistration) {
} }
}() }()
}
} else { func (r *runnerManager) processRunnerRelease(req RunnerReleaseRequest) {
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, reg.Secret) r.connectedRunners = append(r.connectedRunners, *req.Runner)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
} else {
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
} }
func runRunnerManager(r runnerManager) { func runRunnerManager(r runnerManager) {
for { for {
select { select {
case request := <-r.getRunnerCh: case request := <-r.chans.Allocation:
r.processRequest(request) r.processRunnerAllocation(request)
case release := <-r.chans.Release:
case registration := <-r.registerCh: r.processRunnerRelease(release)
r.processRegistration(registration) case registration := <-r.chans.Registration:
r.processRunnerRegistration(registration)
} }
} }
} }
func StartRunnerManager(configuredRunners map[string]config.Runner) (chan GetRunnerRequest, chan RunnerRegistration, error) { func StartRunnerManager(configuredRunners map[string]config.Runner, db database.Database) (RunnerManagerChans, error) {
scheduler := runnerManager{ scheduler := runnerManager{
getRunnerCh: make(chan GetRunnerRequest), chans: RunnerManagerChans{
registerCh: make(chan RunnerRegistration), Allocation: make(chan RunnerAllocationRequest),
Release: make(chan RunnerReleaseRequest),
Registration: make(chan RunnerRegistrationRequest),
},
connectedRunners: make([]Runner, 0), connectedRunners: make([]Runner, 0),
configuredRunners: configuredRunners, configuredRunners: configuredRunners,
db: db,
} }
go runRunnerManager(scheduler) go runRunnerManager(scheduler)
return scheduler.getRunnerCh, scheduler.registerCh, nil return scheduler.chans, nil
} }
func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistration) { func RegisterRunner(conn *websocket.Conn, registerCh chan RunnerRegistrationRequest) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel() defer cancel()
var registration RunnerRegistration var registration RunnerRegistrationRequest
registration.conn = conn registration.conn = conn
typ, r, err := conn.Read(ctx) typ, r, err := conn.Read(ctx)
+49
View File
@@ -0,0 +1,49 @@
package runnermanager
import (
"nhooyr.io/websocket"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
)
// RunnerManagerChans groups the channels a caller uses to send requests
// to the runner manager goroutine.
type RunnerManagerChans struct {
	Allocation   chan RunnerAllocationRequest
	Release      chan RunnerReleaseRequest
	Registration chan RunnerRegistrationRequest
}
// runnerManager holds the internal state owned by the runner manager
// goroutine; it is only touched from runRunnerManager's loop.
type runnerManager struct {
	chans RunnerManagerChans
	// connectedRunners are runners available for allocation; allocated
	// runners are removed and re-added on release.
	connectedRunners    []Runner
	numConnectedRunners uint64
	configuredRunners   map[string]config.Runner
	db                  database.Database
}
// RunnerAllocationRequest asks the manager for a connected runner that
// has all of the given tags; the result is delivered on RespChan.
type RunnerAllocationRequest struct {
	Tags     []string
	RespChan chan RunnerAllocationResponse
	// CancelChan is not exercised in the visible code; presumably it lets
	// a waiting caller abandon the request — TODO confirm.
	CancelChan chan string
}
// RunnerAllocationResponse carries either an allocated runner (Err nil)
// or the error explaining why none could be provided (Runner nil).
type RunnerAllocationResponse struct {
	Runner *Runner
	Err    error
}
// RunnerReleaseRequest returns a previously allocated runner to the
// manager's pool.
type RunnerReleaseRequest struct {
	Runner *Runner
}
// RunnerRegistrationRequest is sent when a runner connects and
// identifies itself; Id and Secret are validated against the database
// before the runner is accepted.
type RunnerRegistrationRequest struct {
	Secret string
	Id     string
	Tags   []string
	conn   *websocket.Conn
}
// runnerJob pairs a job id with a URL. It is not referenced anywhere in
// the visible code — NOTE(review): possibly dead; confirm before removing.
type runnerJob struct {
	Id  string
	URL string
}
+18
View File
@@ -0,0 +1,18 @@
package util
import (
"fmt"
"strings"
)
// FormatTags renders a tag list as "[a, b, c]" for logging.
// An empty or nil slice yields the empty string (not "[]"), matching the
// previous hand-rolled implementation.
func FormatTags(tags []string) string {
	if len(tags) == 0 {
		return ""
	}
	// strings.Join replaces the manual strings.Builder loop; Sprintf is
	// kept so the file's fmt import stays in use.
	return fmt.Sprintf("[%v]", strings.Join(tags, ", "))
}