198 lines
5.4 KiB
Go
198 lines
5.4 KiB
Go
package pipeline_api
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2"
|
|
"git.ohea.xyz/cursorius/pipeline-api/go/api/v2/apiv2connect"
|
|
"git.ohea.xyz/cursorius/server/runnermanager"
|
|
"git.ohea.xyz/cursorius/server/util"
|
|
"github.com/bufbuild/connect-go"
|
|
"github.com/google/uuid"
|
|
"github.com/op/go-logging"
|
|
)
|
|
|
|
// log is the package-level logger, obtained from go-logging under the name
// "cursorius-server".
var log = logging.MustGetLogger("cursorius-server")
|
|
|
|
type ApiServer struct {
|
|
allocationCh chan runnermanager.RunnerAllocationRequest
|
|
releaseCh chan runnermanager.RunnerReleaseRequest
|
|
allocatedRunners map[uuid.UUID]*RunnerWrapper
|
|
allocatedRunnersMutex sync.RWMutex
|
|
}
|
|
|
|
type RunnerWrapper struct {
|
|
runner *runnermanager.Runner
|
|
mutex sync.Mutex
|
|
}
|
|
|
|
func (r *RunnerWrapper) RunCommand(cmd string, args []string) (int64, string, string, error) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
return_code, stdout, stderr, err := r.runner.RunCommand(cmd, args)
|
|
|
|
return return_code, stdout, stderr, err
|
|
}
|
|
|
|
func (r *RunnerWrapper) Release(releaseCh chan runnermanager.RunnerReleaseRequest) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
releaseCh <- runnermanager.RunnerReleaseRequest{
|
|
Runner: r.runner,
|
|
}
|
|
r.runner = nil
|
|
}
|
|
|
|
func (s *ApiServer) GetRunnerFromMap(u uuid.UUID) (*RunnerWrapper, bool) {
|
|
s.allocatedRunnersMutex.RLock()
|
|
defer s.allocatedRunnersMutex.RUnlock()
|
|
runner, ok := s.allocatedRunners[u]
|
|
return runner, ok
|
|
}
|
|
|
|
func (s *ApiServer) AddRunnerToMap(u uuid.UUID, runner *runnermanager.Runner) {
|
|
s.allocatedRunnersMutex.Lock()
|
|
defer s.allocatedRunnersMutex.Unlock()
|
|
s.allocatedRunners[u] = &RunnerWrapper{runner: runner}
|
|
}
|
|
|
|
func (s *ApiServer) GetRunner(
|
|
ctx context.Context,
|
|
req *connect.Request[apiv2.GetRunnerRequest],
|
|
) (*connect.Response[apiv2.GetRunnerResponse], error) {
|
|
|
|
var response runnermanager.RunnerAllocationResponse
|
|
var timeoutCtx *context.Context
|
|
var retryInterval int64 = 0
|
|
|
|
respChan := make(chan runnermanager.RunnerAllocationResponse)
|
|
|
|
tagsStr := util.FormatTags(req.Msg.Tags)
|
|
|
|
if req.Msg.Options != nil {
|
|
if req.Msg.Options.Timeout != 0 {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(req.Msg.Options.Timeout)*time.Second)
|
|
timeoutCtx = &ctx
|
|
defer cancel()
|
|
}
|
|
|
|
retryInterval = req.Msg.Options.RetryInterval
|
|
}
|
|
|
|
for {
|
|
s.allocationCh <- runnermanager.RunnerAllocationRequest{
|
|
Tags: req.Msg.Tags,
|
|
RespChan: respChan,
|
|
}
|
|
|
|
response = <-respChan
|
|
if response.Err == nil {
|
|
break
|
|
}
|
|
|
|
log.Infof("Could not get runner with tags \"%v\": %v", tagsStr, response.Err)
|
|
|
|
// If no timeout is specified, skip after one attempt
|
|
if timeoutCtx == nil {
|
|
break
|
|
}
|
|
|
|
// If timeout is expired, stop trying to allocate runner
|
|
if (*timeoutCtx).Err() != nil {
|
|
break
|
|
}
|
|
|
|
log.Infof("Sleeping for %v seconds before retry...", retryInterval)
|
|
time.Sleep(time.Duration(retryInterval) * time.Second)
|
|
}
|
|
|
|
if response.Runner == nil {
|
|
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner"))
|
|
}
|
|
|
|
log.Infof("Got runner with tags: %v", tagsStr)
|
|
|
|
runnerUuid := uuid.New()
|
|
|
|
s.AddRunnerToMap(runnerUuid, response.Runner)
|
|
|
|
res := connect.NewResponse(&apiv2.GetRunnerResponse{
|
|
Id: runnerUuid.String(),
|
|
})
|
|
res.Header().Set("GetRunner-Version", "v2")
|
|
return res, nil
|
|
}
|
|
|
|
func (s *ApiServer) ReleaseRunner(
|
|
ctx context.Context,
|
|
req *connect.Request[apiv2.ReleaseRunnerRequest],
|
|
) (*connect.Response[apiv2.ReleaseRunnerResponse], error) {
|
|
uuid, err := uuid.Parse(req.Msg.Id)
|
|
if err != nil {
|
|
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
|
|
}
|
|
|
|
log.Infof("Releasing runner with ID \"%v\"", uuid)
|
|
|
|
s.allocatedRunnersMutex.Lock()
|
|
runner := s.allocatedRunners[uuid]
|
|
delete(s.allocatedRunners, uuid)
|
|
runner.Release(s.releaseCh)
|
|
s.allocatedRunnersMutex.Unlock()
|
|
|
|
res := connect.NewResponse(&apiv2.ReleaseRunnerResponse{})
|
|
res.Header().Set("ReleaseRunner-Version", "v2")
|
|
return res, nil
|
|
|
|
}
|
|
|
|
func (s *ApiServer) RunCommand(
|
|
ctx context.Context,
|
|
req *connect.Request[apiv2.RunCommandRequest],
|
|
) (*connect.Response[apiv2.RunCommandResponse], error) {
|
|
|
|
uuid, err := uuid.Parse(req.Msg.Id)
|
|
if err != nil {
|
|
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
|
|
}
|
|
|
|
runner, ok := s.GetRunnerFromMap(uuid)
|
|
if !ok {
|
|
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
|
|
}
|
|
|
|
returnCode, stdout, stderr, err := runner.RunCommand(req.Msg.Command, req.Msg.Args)
|
|
if err != nil {
|
|
log.Errorf("Could not run command on runner \"%v\", %v", runner.runner.Id(), err)
|
|
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Could not run command"))
|
|
}
|
|
|
|
res := connect.NewResponse(&apiv2.RunCommandResponse{
|
|
ReturnCode: returnCode,
|
|
Stdout: stdout,
|
|
Stderr: stderr,
|
|
})
|
|
res.Header().Set("RunCommand-Version", "v2")
|
|
return res, nil
|
|
}
|
|
|
|
func CreateHandler(allocationCh chan runnermanager.RunnerAllocationRequest, releaseCh chan runnermanager.RunnerReleaseRequest, mux *http.ServeMux) {
|
|
api_server := &ApiServer{
|
|
allocationCh: allocationCh,
|
|
releaseCh: releaseCh,
|
|
allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
|
|
}
|
|
path, handler := apiv2connect.NewGetRunnerServiceHandler(api_server)
|
|
mux.Handle(path, handler)
|
|
path, handler = apiv2connect.NewReleaseRunnerServiceHandler(api_server)
|
|
mux.Handle(path, handler)
|
|
path, handler = apiv2connect.NewRunCommandServiceHandler(api_server)
|
|
mux.Handle(path, handler)
|
|
}
|