266 lines
6.8 KiB
Go
266 lines
6.8 KiB
Go
package pipeline_executor
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
"github.com/jhoonb/archivex"
|
|
|
|
"github.com/docker/docker/api/types"
|
|
"github.com/docker/docker/api/types/container"
|
|
"github.com/docker/docker/api/types/mount"
|
|
"github.com/docker/docker/client"
|
|
"github.com/docker/docker/pkg/stdcopy"
|
|
"github.com/go-git/go-git/v5"
|
|
"github.com/go-git/go-git/v5/plumbing/transport"
|
|
"github.com/go-git/go-git/v5/plumbing/transport/http"
|
|
"github.com/go-git/go-git/v5/plumbing/transport/ssh"
|
|
"github.com/op/go-logging"
|
|
|
|
"git.ohea.xyz/cursorius/server/config"
|
|
"git.ohea.xyz/cursorius/server/database"
|
|
)
|
|
|
|
var log = logging.MustGetLogger("cursorius-server")
|
|
|
|
type PipelineExecution struct {
|
|
Pipeline database.Pipeline
|
|
Ref string
|
|
Run database.Run
|
|
}
|
|
|
|
func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) {
|
|
jobFolder := filepath.Join(pipelineConf.WorkingDir, pe.Pipeline.Id.String(), pe.Run.Id.String())
|
|
cloneFolder := filepath.Join(jobFolder, "repo")
|
|
|
|
log.Debugf("Job %v configured with URL \"%v\"", pe.Pipeline.Name, pe.Pipeline.Url)
|
|
|
|
log.Debugf("Job %v configured with folder \"%v\"", pe.Pipeline.Name, jobFolder)
|
|
|
|
err := os.RemoveAll(jobFolder)
|
|
if err != nil {
|
|
log.Errorf("could not delete existing folder %v", jobFolder)
|
|
return
|
|
}
|
|
|
|
err = os.MkdirAll(cloneFolder, 0755)
|
|
if err != nil {
|
|
log.Errorf("could not create working directory for job %v: %w", pe.Pipeline.Name, err)
|
|
return
|
|
}
|
|
|
|
log.Infof("Cloning source from URL %v", pe.Pipeline.Url)
|
|
|
|
var auth transport.AuthMethod
|
|
|
|
if pe.Pipeline.CloneCredential != nil {
|
|
credential, err := db.GetCloneCredentialById(*pe.Pipeline.CloneCredential)
|
|
if err != nil {
|
|
log.Errorf("could not get credenital from db: %v", err)
|
|
return
|
|
}
|
|
|
|
switch credential.Type {
|
|
case "USER_PASS":
|
|
log.Debugf("job %v configured to use credential %v", pe.Pipeline.Name, credential.Name)
|
|
auth = transport.AuthMethod(&http.BasicAuth{
|
|
Username: credential.Username,
|
|
Password: credential.Secret,
|
|
})
|
|
case "SSH_KEY":
|
|
publicKeys, err := ssh.NewPublicKeys(credential.Username, []byte(credential.Secret), "")
|
|
if err != nil {
|
|
log.Errorf("could not parse credential %v", credential.Name)
|
|
return
|
|
}
|
|
auth = transport.AuthMethod(publicKeys)
|
|
default:
|
|
log.Errorf("unsupported credential type %v", credential.Type)
|
|
return
|
|
}
|
|
} else {
|
|
auth = nil
|
|
}
|
|
|
|
_, err = git.PlainClone(cloneFolder, false, &git.CloneOptions{
|
|
URL: pe.Pipeline.Url,
|
|
Auth: auth,
|
|
})
|
|
if err != nil {
|
|
log.Errorf("could not clone repo: %v", err)
|
|
return
|
|
}
|
|
|
|
cli, err := client.NewClientWithOpts(client.FromEnv)
|
|
if err != nil {
|
|
log.Errorf("Could not create docker client: %w", err)
|
|
return
|
|
}
|
|
log.Info("Source cloned successfully")
|
|
|
|
ctx := context.Background()
|
|
|
|
log.Info("Building container")
|
|
|
|
tarFile := filepath.Join(jobFolder, "archive.tar")
|
|
tar := new(archivex.TarFile)
|
|
err = tar.Create(tarFile)
|
|
if err != nil {
|
|
log.Errorf("could not create tarfile: %w", err)
|
|
return
|
|
}
|
|
|
|
err = tar.AddAll(cloneFolder, false)
|
|
if err != nil {
|
|
log.Errorf("could not add repo to tarfile: %w", err)
|
|
return
|
|
}
|
|
|
|
err = tar.Close()
|
|
if err != nil {
|
|
log.Errorf("could not close tarfile: %w", err)
|
|
return
|
|
}
|
|
|
|
dockerBuildContext, err := os.Open(tarFile)
|
|
defer dockerBuildContext.Close()
|
|
|
|
imageName := fmt.Sprintf("%v-%v:latest", pe.Pipeline.Id.String(), pe.Run.Id.String())
|
|
|
|
buildResponse, err := cli.ImageBuild(context.Background(), dockerBuildContext, types.ImageBuildOptions{
|
|
Tags: []string{imageName},
|
|
Dockerfile: ".cursorius/Dockerfile",
|
|
})
|
|
if err != nil {
|
|
log.Errorf("could not build container: %w", err)
|
|
return
|
|
}
|
|
|
|
err = db.UpdateRunBuildOutput(pe.Run.Id, cleanupBuildOutput(buildResponse.Body))
|
|
if err != nil {
|
|
log.Errorf("could not update build output for run: %w", err)
|
|
return
|
|
}
|
|
|
|
err = buildResponse.Body.Close()
|
|
if err != nil {
|
|
log.Errorf("Could not close build response body: %w", err)
|
|
return
|
|
}
|
|
|
|
log.Info("Image built sucessfully")
|
|
|
|
hostConfig := container.HostConfig{}
|
|
|
|
if pipelineConf.DockerNetwork != nil {
|
|
hostConfig.NetworkMode = container.NetworkMode(*pipelineConf.DockerNetwork)
|
|
}
|
|
|
|
if pipelineConf.MountConf.Type == config.Bind {
|
|
sourceDir := filepath.Join(pipelineConf.MountConf.Source, pe.Pipeline.Id.String(), pe.Run.Id.String())
|
|
|
|
hostConfig.Mounts = append(hostConfig.Mounts,
|
|
mount.Mount{
|
|
Type: mount.TypeBind,
|
|
Source: sourceDir,
|
|
Target: "/cursorius/src",
|
|
ReadOnly: false,
|
|
Consistency: mount.ConsistencyDefault,
|
|
},
|
|
)
|
|
} else if pipelineConf.MountConf.Type == config.Volume {
|
|
hostConfig.Mounts = append(hostConfig.Mounts,
|
|
mount.Mount{
|
|
Type: mount.TypeVolume,
|
|
Source: pipelineConf.MountConf.Source,
|
|
Target: "/cursorius/src",
|
|
ReadOnly: false,
|
|
Consistency: mount.ConsistencyDefault,
|
|
},
|
|
)
|
|
}
|
|
|
|
env := make([]string, 0)
|
|
|
|
// set cursorius environment variables
|
|
env = append(env, []string{
|
|
fmt.Sprintf("CURSORIUS_RUN_ID=%v", pe.Run.Id),
|
|
"CURSORIUS_SRC_DIR=/cursorius/src",
|
|
fmt.Sprintf("CURSORIUS_SERVER_URL=%v", pipelineConf.AccessURL),
|
|
}...)
|
|
|
|
// load secrets into environment
|
|
secrets, err := db.GetSecretsForPipeline(pe.Pipeline.Id)
|
|
if err != nil {
|
|
log.Errorf("Could not get secrets for pipeline", err)
|
|
return
|
|
}
|
|
|
|
for _, secret := range secrets {
|
|
// the env name is validated to be just uppercase letters, numbers, and underscores on ingestion
|
|
env = append(env, fmt.Sprintf("%v=%v", strings.ToUpper(secret.Name), secret.Secret))
|
|
}
|
|
|
|
resp, err := cli.ContainerCreate(ctx,
|
|
&container.Config{
|
|
Image: imageName,
|
|
Tty: false,
|
|
Env: env,
|
|
},
|
|
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
|
|
&hostConfig,
|
|
nil, nil, "",
|
|
)
|
|
if err != nil {
|
|
log.Errorf("could not create container: %w", err)
|
|
return
|
|
}
|
|
|
|
log.Info("Launching container")
|
|
|
|
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
|
|
log.Errorf("could not start container: %v", err)
|
|
return
|
|
}
|
|
statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
|
|
select {
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
log.Errorf("container returned error: %v", err)
|
|
return
|
|
}
|
|
case okBody := <-statusCh:
|
|
if okBody.Error != nil {
|
|
log.Errorf("Could not wait on container: %v", err)
|
|
return
|
|
} else {
|
|
log.Debugf("Container finished running with return code: %v", okBody.StatusCode)
|
|
pe.Run.Result = &okBody.StatusCode
|
|
}
|
|
}
|
|
|
|
pe.Run.InProgress = false
|
|
|
|
out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
|
|
if err != nil {
|
|
log.Errorf("could not get container logs: %w", err)
|
|
return
|
|
}
|
|
|
|
var stdOut bytes.Buffer
|
|
var stdErr bytes.Buffer
|
|
|
|
stdcopy.StdCopy(&stdOut, &stdErr, out)
|
|
|
|
pe.Run.Stdout = stdOut.Bytes()
|
|
pe.Run.Stderr = stdErr.Bytes()
|
|
|
|
db.UpdateRunResult(pe.Run)
|
|
|
|
return
|
|
}
|