f95a5f9ae5
The protobuf files and the generated golang code have been moved into git.ohea.xyz/cursorius/pipeline-api. The generated code is now imported from that location. The version of the API has also been bumped to v2 to avoid unsupported v1 modules in golang.
123 lines
3.4 KiB
Go
123 lines
3.4 KiB
Go
package pipeline_api
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
|
|
apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2"
|
|
"git.ohea.xyz/cursorius/pipeline-api/go/api/v2/apiv2connect"
|
|
"git.ohea.xyz/cursorius/server/runnermanager"
|
|
"github.com/bufbuild/connect-go"
|
|
"github.com/google/uuid"
|
|
"github.com/op/go-logging"
|
|
)
|
|
|
|
// log is the package-level logger, obtained from go-logging under the
// module name "cursorius-server".
var log = logging.MustGetLogger("cursorius-server")
|
|
|
|
type ApiServer struct {
|
|
getRunnerCh chan runnermanager.GetRunnerRequest
|
|
allocatedRunners map[uuid.UUID]*RunnerWrapper
|
|
allocatedRunnersMutex sync.RWMutex
|
|
}
|
|
|
|
type RunnerWrapper struct {
|
|
runner runnermanager.Runner
|
|
mutex sync.Mutex
|
|
}
|
|
|
|
func (r *RunnerWrapper) RunCommand(cmd string) (int64, string, string, error) {
|
|
r.mutex.Unlock()
|
|
defer r.mutex.Lock()
|
|
|
|
return_code, stdout, stderr, err := r.runner.RunCommand(cmd)
|
|
|
|
// TODO: run command by sending websocket packet
|
|
// TODO: get stdout and stderr response
|
|
return return_code, stdout, stderr, err
|
|
}
|
|
|
|
func (s *ApiServer) GetRunnerFromMap(uuid uuid) *RunnerWrapper {
|
|
s.allocatedRunnersMutex.RLock()
|
|
defer s.allocatedRunnersMutex.RUnlock()
|
|
return s.allocatedRunners[uuid]
|
|
}
|
|
|
|
func (s *ApiServer) GetRunner(
|
|
ctx context.Context,
|
|
req *connect.Request[apiv2.GetRunnerRequest],
|
|
) (*connect.Response[apiv2.GetRunnerResponse], error) {
|
|
|
|
respChan := make(chan runnermanager.GetRunnerResponse)
|
|
s.getRunnerCh <- runnermanager.GetRunnerRequest{
|
|
Tags: req.Msg.Tags,
|
|
RespChan: respChan,
|
|
}
|
|
|
|
var runnerTagsStr strings.Builder
|
|
fmt.Fprintf(&runnerTagsStr, "%v", req.Msg.Tags[0])
|
|
for _, tag := range req.Msg.Tags[1:] {
|
|
fmt.Fprintf(&runnerTagsStr, " %v", tag)
|
|
}
|
|
|
|
response := <-respChan
|
|
if response.Err != nil {
|
|
log.Errorf("Could not get runner with tags \"%v\": %v", runnerTagsStr, response.Err)
|
|
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner"))
|
|
}
|
|
|
|
log.Info("Got runner with tags: %v", runnerTagsStr)
|
|
|
|
runnerUuid := uuid.New()
|
|
|
|
s.allocatedRunnersMutex.Lock()
|
|
s.allocatedRunners[runnerUuid] = &RunnerWrapper{runner: response.Runner}
|
|
s.allocatedRunnersMutex.Unlock()
|
|
|
|
res := connect.NewResponse(&apiv2.GetRunnerResponse{
|
|
Id: runnerUuid.String(),
|
|
})
|
|
res.Header().Set("GetRunner-Version", "v2")
|
|
return res, nil
|
|
}
|
|
|
|
func (s *ApiServer) RunCommand(
|
|
ctx context.Context,
|
|
req *connect.Request[apiv2.RunCommandRequest],
|
|
) (*connect.Response[apiv2.RunCommandResponse], error) {
|
|
|
|
uuid, err := uuid.Parse(req.Msg.Id)
|
|
if err != nil {
|
|
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
|
|
}
|
|
|
|
runner := s.GetRunnerFromMap(uuid)
|
|
|
|
return_code, stdout, stderr, err := runner.RunCommand(req.Msg.Command)
|
|
if err != nil {
|
|
log.Errorf("Could not run command on runner \"%v\", %v", runner.runner.Id(), err)
|
|
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Could not run command"))
|
|
}
|
|
|
|
res := connect.NewResponse(&apiv2.RunCommandResponse{
|
|
ReturnCode: return_code,
|
|
Stdout: stdout,
|
|
Stderr: stderr,
|
|
})
|
|
res.Header().Set("RunCommand-Version", "v2")
|
|
return res, nil
|
|
}
|
|
|
|
func CreateHandler(mux *http.ServeMux, getRunnerCh chan runnermanager.GetRunnerRequest) {
|
|
api_server := &ApiServer{
|
|
getRunnerCh: getRunnerCh,
|
|
allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
|
|
}
|
|
path, handler := apiv2connect.NewGetRunnerServiceHandler(api_server)
|
|
mux.Handle(path, handler)
|
|
path, handler = apiv2connect.NewRunCommandServiceHandler(api_server)
|
|
mux.Handle(path, handler)
|
|
}
|