141 lines
2.9 KiB
Go
141 lines
2.9 KiB
Go
|
package rabbitmq
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"git.sch9.ru/new_gate/ms-runner/internal/runner"
|
||
|
"git.sch9.ru/new_gate/ms-runner/internal/runner/usecase"
|
||
|
runnerv1 "git.sch9.ru/new_gate/ms-runner/pkg/go/gen/proto/runner/v1"
|
||
|
"github.com/golang/protobuf/proto"
|
||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||
|
)
|
||
|
|
||
|
func NewRunnerBuilderConsumer(ch *amqp.Channel, runnerUC runner.RunnerUseCase, queueName string, instanceName string) {
|
||
|
msgs, err := ch.Consume(
|
||
|
queueName,
|
||
|
instanceName,
|
||
|
false,
|
||
|
false,
|
||
|
false,
|
||
|
false,
|
||
|
nil,
|
||
|
)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
go func() {
|
||
|
for {
|
||
|
runnerUC.Lock()
|
||
|
if !runnerUC.Ready() {
|
||
|
continue
|
||
|
}
|
||
|
runnerUC.Unlock()
|
||
|
|
||
|
select {
|
||
|
case d := <-msgs:
|
||
|
var msg runnerv1.Instruction
|
||
|
|
||
|
err = proto.Unmarshal(d.Body, &msg)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
switch msg.Instruction.(type) {
|
||
|
case *runnerv1.Instruction_Build:
|
||
|
err = runnerUC.Process(usecase.BuildInstruction{
|
||
|
SolutionId: msg.GetBuild().GetSolutionId(),
|
||
|
BindingKey: msg.GetBuild().GetBindingKey(),
|
||
|
Language: msg.GetBuild().GetLanguage(),
|
||
|
Solution: msg.GetBuild().GetSolution(),
|
||
|
})
|
||
|
if err != nil {
|
||
|
fmt.Println(err)
|
||
|
} else {
|
||
|
err = d.Ack(false)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
}
|
||
|
case *runnerv1.Instruction_Run:
|
||
|
err = runnerUC.Process(usecase.RunInstruction{
|
||
|
SolutionId: msg.GetRun().GetSolutionId(),
|
||
|
TestId: msg.GetRun().GetTestId(),
|
||
|
BindingKey: msg.GetRun().GetBindingKey(),
|
||
|
})
|
||
|
if err != nil {
|
||
|
fmt.Println(err)
|
||
|
} else {
|
||
|
err = d.Ack(false)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
}
|
||
|
default:
|
||
|
fmt.Println("unknown instruction: ", msg.Instruction)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
func NewRunnerBuilderProducer(ch *amqp.Channel, runnerUC runner.RunnerUseCase) {
|
||
|
go func() {
|
||
|
for res := range runnerUC.Results() {
|
||
|
var (
|
||
|
body []byte
|
||
|
err error
|
||
|
bindingKey string
|
||
|
)
|
||
|
|
||
|
switch result := res.(type) {
|
||
|
case usecase.BuildResult:
|
||
|
msg := &runnerv1.Result{
|
||
|
Result: &runnerv1.Result_Build{
|
||
|
Build: &runnerv1.BuildResult{
|
||
|
SolutionId: result.SolutionId,
|
||
|
Status: result.Status,
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
body, err = proto.Marshal(msg)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
bindingKey = result.BindingKey
|
||
|
case usecase.RunResult:
|
||
|
msg := &runnerv1.Result{
|
||
|
Result: &runnerv1.Result_Run{
|
||
|
Run: &runnerv1.RunResult{
|
||
|
SolutionId: result.SolutionId,
|
||
|
TestId: result.TestId,
|
||
|
Status: result.Status,
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
body, err = proto.Marshal(msg)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
bindingKey = result.BindingKey
|
||
|
default:
|
||
|
fmt.Println("unknown result: ", result)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
fmt.Println("publishing: ", bindingKey)
|
||
|
err = ch.Publish(
|
||
|
"",
|
||
|
bindingKey,
|
||
|
false,
|
||
|
false,
|
||
|
amqp.Publishing{
|
||
|
ContentType: "text/plain",
|
||
|
Body: body,
|
||
|
})
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|