fe53a17160
This removes an existing unlocked shared access to runner.running. This also sets us up for better management of the runners.
158 lines
4.5 KiB
Go
158 lines
4.5 KiB
Go
package pipeline_api
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
|
|
apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2"
|
|
"git.ohea.xyz/cursorius/pipeline-api/go/api/v2/apiv2connect"
|
|
"git.ohea.xyz/cursorius/server/runnermanager"
|
|
"git.ohea.xyz/cursorius/server/util"
|
|
"github.com/bufbuild/connect-go"
|
|
"github.com/google/uuid"
|
|
"github.com/op/go-logging"
|
|
)
|
|
|
|
// log is the package-level logger, registered under the "cursorius-server" module name.
var log = logging.MustGetLogger("cursorius-server")
|
|
|
|
type ApiServer struct {
|
|
allocationCh chan runnermanager.RunnerAllocationRequest
|
|
releaseCh chan runnermanager.RunnerReleaseRequest
|
|
allocatedRunners map[uuid.UUID]*RunnerWrapper
|
|
allocatedRunnersMutex sync.RWMutex
|
|
}
|
|
|
|
type RunnerWrapper struct {
|
|
runner *runnermanager.Runner
|
|
mutex sync.Mutex
|
|
}
|
|
|
|
func (r *RunnerWrapper) RunCommand(cmd string, args []string) (int64, string, string, error) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
return_code, stdout, stderr, err := r.runner.RunCommand(cmd, args)
|
|
|
|
return return_code, stdout, stderr, err
|
|
}
|
|
|
|
func (r *RunnerWrapper) Release(releaseCh chan runnermanager.RunnerReleaseRequest) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
releaseCh <- runnermanager.RunnerReleaseRequest{
|
|
Runner: r.runner,
|
|
}
|
|
r.runner = nil
|
|
}
|
|
|
|
func (s *ApiServer) GetRunnerFromMap(u uuid.UUID) (*RunnerWrapper, bool) {
|
|
s.allocatedRunnersMutex.RLock()
|
|
defer s.allocatedRunnersMutex.RUnlock()
|
|
runner, ok := s.allocatedRunners[u]
|
|
return runner, ok
|
|
}
|
|
|
|
func (s *ApiServer) GetRunner(
|
|
ctx context.Context,
|
|
req *connect.Request[apiv2.GetRunnerRequest],
|
|
) (*connect.Response[apiv2.GetRunnerResponse], error) {
|
|
|
|
respChan := make(chan runnermanager.RunnerAllocationResponse)
|
|
s.allocationCh <- runnermanager.RunnerAllocationRequest{
|
|
Tags: req.Msg.Tags,
|
|
RespChan: respChan,
|
|
}
|
|
|
|
tagsStr := util.FormatTags(req.Msg.Tags)
|
|
|
|
response := <-respChan
|
|
if response.Err != nil {
|
|
log.Errorf("Could not get runner with tags \"%v\": %v", tagsStr, response.Err)
|
|
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner"))
|
|
}
|
|
|
|
log.Infof("Got runner with tags: %v", tagsStr)
|
|
|
|
runnerUuid := uuid.New()
|
|
|
|
s.allocatedRunnersMutex.Lock()
|
|
s.allocatedRunners[runnerUuid] = &RunnerWrapper{runner: response.Runner}
|
|
s.allocatedRunnersMutex.Unlock()
|
|
|
|
res := connect.NewResponse(&apiv2.GetRunnerResponse{
|
|
Id: runnerUuid.String(),
|
|
})
|
|
res.Header().Set("GetRunner-Version", "v2")
|
|
return res, nil
|
|
}
|
|
|
|
func (s *ApiServer) ReleaseRunner(
|
|
ctx context.Context,
|
|
req *connect.Request[apiv2.ReleaseRunnerRequest],
|
|
) (*connect.Response[apiv2.ReleaseRunnerResponse], error) {
|
|
uuid, err := uuid.Parse(req.Msg.Id)
|
|
if err != nil {
|
|
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
|
|
}
|
|
|
|
log.Infof("Releasing runner with ID \"%v\"", uuid)
|
|
|
|
s.allocatedRunnersMutex.Lock()
|
|
runner := s.allocatedRunners[uuid]
|
|
delete(s.allocatedRunners, uuid)
|
|
runner.Release(s.releaseCh)
|
|
s.allocatedRunnersMutex.Unlock()
|
|
|
|
res := connect.NewResponse(&apiv2.ReleaseRunnerResponse{})
|
|
res.Header().Set("ReleaseRunner-Version", "v2")
|
|
return res, nil
|
|
|
|
}
|
|
|
|
func (s *ApiServer) RunCommand(
|
|
ctx context.Context,
|
|
req *connect.Request[apiv2.RunCommandRequest],
|
|
) (*connect.Response[apiv2.RunCommandResponse], error) {
|
|
|
|
uuid, err := uuid.Parse(req.Msg.Id)
|
|
if err != nil {
|
|
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
|
|
}
|
|
|
|
runner, ok := s.GetRunnerFromMap(uuid)
|
|
if !ok {
|
|
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
|
|
}
|
|
|
|
returnCode, stdout, stderr, err := runner.RunCommand(req.Msg.Command, req.Msg.Args)
|
|
if err != nil {
|
|
log.Errorf("Could not run command on runner \"%v\", %v", runner.runner.Id(), err)
|
|
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Could not run command"))
|
|
}
|
|
|
|
res := connect.NewResponse(&apiv2.RunCommandResponse{
|
|
ReturnCode: returnCode,
|
|
Stdout: stdout,
|
|
Stderr: stderr,
|
|
})
|
|
res.Header().Set("RunCommand-Version", "v2")
|
|
return res, nil
|
|
}
|
|
|
|
func CreateHandler(allocationCh chan runnermanager.RunnerAllocationRequest, releaseCh chan runnermanager.RunnerReleaseRequest, mux *http.ServeMux) {
|
|
api_server := &ApiServer{
|
|
allocationCh: allocationCh,
|
|
releaseCh: releaseCh,
|
|
allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
|
|
}
|
|
path, handler := apiv2connect.NewGetRunnerServiceHandler(api_server)
|
|
mux.Handle(path, handler)
|
|
path, handler = apiv2connect.NewReleaseRunnerServiceHandler(api_server)
|
|
mux.Handle(path, handler)
|
|
path, handler = apiv2connect.NewRunCommandServiceHandler(api_server)
|
|
mux.Handle(path, handler)
|
|
}
|