diff --git a/.gitignore b/.gitignore index 9ad7717..94fd9c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.env +.idea /starter/alpine-make-rootfs/ /run /starter/shared diff --git a/Makefile b/Makefile index 5db88f0..430014f 100644 --- a/Makefile +++ b/Makefile @@ -9,8 +9,11 @@ init: mv alpine-make-rootfs starter/alpine-make-rootfs mkrootfs: starter/create_rootfs.sh -generate: +gen: buf generate compile: go build -o ./run gcc starter/starter.c -o starter/starter +dev: + @make gen + @go run main.go \ No newline at end of file diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..80b3c93 --- /dev/null +++ b/config/config.go @@ -0,0 +1,9 @@ +package config + +type Config struct { + Env string `env:"ENV" env-default:"prod"` + + RabbitDSN string `env:"RABBIT_DSN" required:"true"` + InstanceName string `env:"INSTANCE_NAME" required:"true"` + TQueueName string `env:"T_QUEUE_NAME" required:"true"` +} diff --git a/go.mod b/go.mod index 85db980..cc850d4 100644 --- a/go.mod +++ b/go.mod @@ -4,16 +4,26 @@ go 1.21.3 toolchain go1.23.0 -require github.com/containerd/cgroups v1.1.0 +require ( + git.sch9.ru/new_gate/ms-tester v0.0.0 + github.com/containerd/cgroups v1.1.0 + github.com/golang/protobuf v1.5.4 + github.com/ilyakaznacheev/cleanenv v1.5.0 + github.com/rabbitmq/amqp091-go v1.10.0 + google.golang.org/protobuf v1.34.2 +) require ( - git.sch9.ru/new_gate/ms-tester v0.0.0 // indirect + github.com/BurntSushi/toml v1.2.1 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/docker/go-units v0.4.0 // indirect github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/joho/godotenv v1.5.1 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/sys v0.22.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 // indirect ) diff --git a/go.sum b/go.sum index 6a46914..545359f 100644 --- a/go.sum +++ b/go.sum @@ -1,23 +1,41 @@ -git.sch9.ru/new_gate/ms-tester v0.0.0-20240825162014-7b0e1b21f19e h1:QTGa3uw7Lu9HZJymg/FlarlrdofwEdPuzBMXGrILkmU= -git.sch9.ru/new_gate/ms-tester v0.0.0-20240825162014-7b0e1b21f19e/go.mod h1:gqA96jkobHh1HM/bksygv2AnxM+GkCqVAdgmJyC0rE0= git.sch9.ru/new_gate/ms-tester v0.0.0 h1:ZR5YkHCO9JAY/d1qqEgwu/Dtru6iR1p3sIBqp8tetnA= git.sch9.ru/new_gate/ms-tester v0.0.0/go.mod h1:gqA96jkobHh1HM/bksygv2AnxM+GkCqVAdgmJyC0rE0= +github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak= +github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM= github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/ilyakaznacheev/cleanenv v1.5.0 h1:0VNZXggJE2OYdXE87bfSSwGxeiGt9moSR2lOrsHHvr4= +github.com/ilyakaznacheev/cleanenv v1.5.0/go.mod h1:a5aDzaJrLCQZsazHol1w8InnDcOX0OColm64SlIi6gk= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= @@ -37,8 +55,6 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34 h1:GkvMjFtXUmahfDtashnc1mnrCtuBVcwse5QV2lUk/tI= -golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -50,4 +66,13 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 h1:slmdOY3vp8a7KQbHkL+FLbvbkgMqmXojpFUO/jENuqQ= +olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3/go.mod h1:oVgVk4OWVDi43qWBEyGhXgYxt7+ED4iYNpTngSLX2Iw= diff --git a/internal/runner/delivery.go b/internal/runner/delivery.go new file mode 100644 index 0000000..75c10db --- /dev/null +++ b/internal/runner/delivery.go @@ -0,0 +1 @@ +package runner diff --git a/internal/runner/delivery/rabbitmq/pubsub.go b/internal/runner/delivery/rabbitmq/pubsub.go new file mode 100644 index 0000000..2d76641 --- /dev/null +++ b/internal/runner/delivery/rabbitmq/pubsub.go @@ -0,0 +1,140 @@ +package rabbitmq + +import ( + "fmt" + "git.sch9.ru/new_gate/ms-runner/internal/runner" + "git.sch9.ru/new_gate/ms-runner/internal/runner/usecase" + runnerv1 "git.sch9.ru/new_gate/ms-runner/pkg/go/gen/proto/runner/v1" + "github.com/golang/protobuf/proto" + amqp "github.com/rabbitmq/amqp091-go" +) + +func NewRunnerBuilderConsumer(ch *amqp.Channel, runnerUC runner.RunnerUseCase, queueName string, instanceName string) { + msgs, err := ch.Consume( + queueName, + instanceName, + false, + false, + false, + false, + nil, + ) + if err != nil { + panic(err) + } + + go func() { + for { + runnerUC.Lock() + if !runnerUC.Ready() { + continue + } + runnerUC.Unlock() + + select { + case d := <-msgs: + var msg runnerv1.Instruction + + err = proto.Unmarshal(d.Body, &msg) + if err != nil { + panic(err) + } + + switch msg.Instruction.(type) { + case *runnerv1.Instruction_Build: + err = runnerUC.Process(usecase.BuildInstruction{ + SolutionId: msg.GetBuild().GetSolutionId(), + BindingKey: msg.GetBuild().GetBindingKey(), + Language: msg.GetBuild().GetLanguage(), + Solution: msg.GetBuild().GetSolution(), + }) + if err != nil { + fmt.Println(err) + } else { + err = d.Ack(false) + if err != nil { + panic(err) + } + } + case *runnerv1.Instruction_Run: + err = runnerUC.Process(usecase.RunInstruction{ + SolutionId: msg.GetRun().GetSolutionId(), + TestId: msg.GetRun().GetTestId(), + BindingKey: msg.GetRun().GetBindingKey(), + }) + if err != nil { + fmt.Println(err) + } else { + err = d.Ack(false) + if err != nil { + panic(err) + } + } + default: + fmt.Println("unknown instruction: ", msg.Instruction) + } + } + } + }() +} + +func NewRunnerBuilderProducer(ch *amqp.Channel, runnerUC runner.RunnerUseCase) { + go func() { + for res := range runnerUC.Results() { + var ( + body []byte + err error + bindingKey string + ) + + switch result := res.(type) { + case usecase.BuildResult: + msg := &runnerv1.Result{ + Result: &runnerv1.Result_Build{ + Build: &runnerv1.BuildResult{ + SolutionId: result.SolutionId, + Status: result.Status, + }, + }, + } + body, err = proto.Marshal(msg) + if err != nil { + panic(err) + } + bindingKey = result.BindingKey + case usecase.RunResult: + msg := &runnerv1.Result{ + Result: &runnerv1.Result_Run{ + Run: &runnerv1.RunResult{ + SolutionId: result.SolutionId, + TestId: result.TestId, + Status: result.Status, + }, + }, + } + body, err = proto.Marshal(msg) + if err != nil { + panic(err) + } + bindingKey = result.BindingKey + default: + fmt.Println("unknown result: ", result) + continue + } + + fmt.Println("publishing: ", bindingKey) + err = ch.Publish( + "", + bindingKey, + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: body, + }) + if err != nil { + panic(err) + } + } + }() +} diff --git a/internal/runner/usecase.go b/internal/runner/usecase.go new file mode 100644 index 0000000..8461d17 --- /dev/null +++ b/internal/runner/usecase.go @@ -0,0 +1,9 @@ +package runner + +type RunnerUseCase interface { + Process(i interface{}) error + Results() <-chan interface{} + Ready() bool + Lock() + Unlock() +} diff --git a/internal/runner/usecase/usecase.go b/internal/runner/usecase/usecase.go new file mode 100644 index 0000000..7cac439 --- /dev/null +++ b/internal/runner/usecase/usecase.go @@ -0,0 +1,105 @@ +package usecase + +import ( + "errors" + "fmt" + "sync" + "time" +) + +type BuildInstruction struct { + SolutionId int32 + BindingKey string + Language int32 + Solution string +} + +type BuildResult struct { + SolutionId int32 + BindingKey string + Status int32 +} + +type RunInstruction struct { + SolutionId int32 + TestId int32 + BindingKey string +} + +type RunResult struct { + SolutionId int32 + TestId int32 + BindingKey string + Status int32 +} + +type RunnerUseCase struct { + freeProcesses int32 + mtx sync.RWMutex + results chan interface{} +} + +func NewRunnerUseCase(limit int32) *RunnerUseCase { + return &RunnerUseCase{ + freeProcesses: limit, + results: make(chan interface{}), + } +} + +func (uc *RunnerUseCase) Process(i interface{}) error { + uc.mtx.Lock() + defer uc.mtx.Unlock() + if uc.freeProcesses == 0 { + return errors.New("no free processes") + } + uc.freeProcesses-- + + go func(i interface{}) { + defer func() { + uc.mtx.Lock() + uc.freeProcesses++ + uc.mtx.Unlock() + }() + + switch instruction := i.(type) { + case RunInstruction: + fmt.Println("running: ", instruction.SolutionId, instruction.TestId) + time.Sleep(time.Second) + uc.results <- RunResult{ + SolutionId: instruction.SolutionId, + TestId: instruction.TestId, + BindingKey: instruction.BindingKey, + Status: 0, + } + case BuildInstruction: + fmt.Println("building:", instruction.Language, instruction.Solution) + time.Sleep(time.Second) + uc.results <- BuildResult{ + SolutionId: instruction.SolutionId, + BindingKey: instruction.BindingKey, + Status: -1, + } + default: + fmt.Println("unknown instruction ignored") + } + }(i) + + return nil +} + +func (uc *RunnerUseCase) Results() <-chan interface{} { + return uc.results +} + +// Ready is not thread safe! +func (uc *RunnerUseCase) Ready() bool { + return uc.freeProcesses > 0 +} + +func (uc *RunnerUseCase) Lock() { + uc.mtx.Lock() +} + +func (uc *RunnerUseCase) Unlock() { + uc.mtx.Unlock() +} diff --git a/main.go b/main.go index 7f7aa4b..1a49a5b 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,49 @@ package main import ( - runner "git.sch9.ru/new_gate/ms-runner/runner" + "fmt" + "git.sch9.ru/new_gate/ms-runner/config" + runnerPubSub "git.sch9.ru/new_gate/ms-runner/internal/runner/delivery/rabbitmq" + runnerUseCase "git.sch9.ru/new_gate/ms-runner/internal/runner/usecase" + "git.sch9.ru/new_gate/ms-runner/pkg/external/rabbitmq" + "github.com/ilyakaznacheev/cleanenv" + "os" + "os/signal" + "syscall" ) func main() { - _,err := runner.NewRunnerService(); - if(err != nil) { - panic(err) - } + var cfg config.Config + err := cleanenv.ReadConfig(".env", &cfg) + if err != nil { + panic(fmt.Sprintf("error reading config: %s", err.Error())) + } + + conn, err := rabbitmq.NewRabbitClient(cfg.RabbitDSN) + if err != nil { + panic(err) + } + defer conn.Close() + ch, err := conn.Channel() + if err != nil { + panic(err) + } + defer ch.Close() + err = ch.Qos( + 1, + 0, + true, + ) + if err != nil { + panic(err) + } + + runnerUC := runnerUseCase.NewRunnerUseCase(10) + runnerPubSub.NewRunnerBuilderProducer(ch, runnerUC) + runnerPubSub.NewRunnerBuilderConsumer(ch, runnerUC, cfg.TQueueName, cfg.InstanceName) + + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT) + + <-stop } diff --git a/pkg/external/rabbitmq/client.go b/pkg/external/rabbitmq/client.go new file mode 100644 index 0000000..09732fa --- /dev/null +++ b/pkg/external/rabbitmq/client.go @@ -0,0 +1,7 @@ +package rabbitmq + +import amqp "github.com/rabbitmq/amqp091-go" + +func NewRabbitClient(dsn string) (*amqp.Connection, error) { + return amqp.Dial(dsn) +} diff --git a/runner/runner.go b/runner/runner.go index 14af5a0..10e804c 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -1,3 +1,7 @@ +//go:build wtf +// just use https://github.com/opencontainers/runc please + + package runner import ( diff --git a/runner/runner_test.go b/runner/runner_test.go index 17df2c2..d0ad0fd 100644 --- a/runner/runner_test.go +++ b/runner/runner_test.go @@ -1,32 +1,34 @@ +//go:build wtf + package runner import ( - "testing" + "testing" ) type ENTestPair struct { - test string - result []int + test string + result []int } -var ENTests = []ENTestPair { - {"123", []int{123}}, - {"abc123abc", []int{123}}, - {"", []int{}}, - {"0", []int{0}}, - {"1a2a3a6", []int{1, 2, 3, 6}}, +var ENTests = []ENTestPair{ + {"123", []int{123}}, + {"abc123abc", []int{123}}, + {"", []int{}}, + {"0", []int{0}}, + {"1a2a3a6", []int{1, 2, 3, 6}}, } func TestExtractNumbers(t *testing.T) { - for _, test := range(ENTests) { - result := extractNumbers(test.test) - if(len(result) != len(test.result)) { - t.Error("for", test.test, "expected", test.result, "got", result) - } - for i := 0; i < len(result); i++ { - if(result[i] != test.result[i]) { - t.Error("for", test.test, "expected", test.result, "got", result) - } - } - } + for _, test := range ENTests { + result := extractNumbers(test.test) + if len(result) != len(test.result) { + t.Error("for", test.test, "expected", test.result, "got", result) + } + for i := 0; i < len(result); i++ { + if result[i] != test.result[i] { + t.Error("for", test.test, "expected", test.result, "got", result) + } + } + } }