96 lines
3.5 KiB
C
96 lines
3.5 KiB
C
#include "transport/transport.h"
|
|
#include "starter/starter.h"
|
|
|
|
void prepare_amqp_connection(int argc, char const *const *argv,struct connection_data *condata) {
|
|
int status;
|
|
if(secure_getenv("AMQP_HOSTNAME")==NULL || secure_getenv("AMQP_HOSTNAME")==NULL || secure_getenv("AMQP_EXCHANGE")==NULL || secure_getenv("AMQP_BINDINGKEY")==NULL) {
|
|
fprintf(stderr,"no amqp connection parameters in environment");
|
|
exit(1);
|
|
}
|
|
condata->hostname = getenv("AMQP_HOSTNAME");
|
|
condata->port = atoi(getenv("AMQP_PORT"));
|
|
condata->exchange = getenv("AMQP_EXCHANGE");
|
|
condata->bindingkey = getenv("AMQP_BINDINGKEY");
|
|
|
|
condata->conn = amqp_new_connection();
|
|
|
|
condata->socket = amqp_tcp_socket_new(condata->conn);
|
|
if (!condata->socket) {
|
|
error_log("failed to create TCP socket");
|
|
}
|
|
|
|
status = amqp_socket_open(condata->socket, condata->hostname, condata->port);
|
|
if (status) {
|
|
error_log("failed to open TCP socket");
|
|
}
|
|
|
|
amqp_assert(amqp_login(condata->conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "rmuser", "rmpassword"));
|
|
amqp_channel_open(condata->conn, 1);
|
|
amqp_assert(amqp_get_rpc_reply(condata->conn));
|
|
|
|
{
|
|
amqp_queue_declare_ok_t *r = amqp_queue_declare( condata->conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
|
|
amqp_assert(amqp_get_rpc_reply(condata->conn));
|
|
condata->queuename = amqp_bytes_malloc_dup(r->queue);
|
|
if (condata->queuename.bytes == NULL) {
|
|
fprintf(stderr, "Out of memory while copying queue name");
|
|
exit(1);
|
|
}
|
|
}
|
|
|
|
amqp_queue_bind(condata->conn, 1, condata->queuename, amqp_cstring_bytes(condata->exchange), amqp_cstring_bytes(condata->bindingkey), amqp_empty_table);
|
|
amqp_assert(amqp_get_rpc_reply(condata->conn));
|
|
|
|
amqp_basic_consume(condata->conn, 1, condata->queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
|
|
amqp_assert(amqp_get_rpc_reply(condata->conn));
|
|
}
|
|
|
|
int main(int argc, char const *const *argv) {
|
|
struct connection_data condata;
|
|
prepare_amqp_connection(argc,argv,&condata);
|
|
|
|
{
|
|
for (;;) {
|
|
amqp_rpc_reply_t res;
|
|
amqp_envelope_t envelope;
|
|
|
|
amqp_maybe_release_buffers(condata.conn);
|
|
|
|
res = amqp_consume_message(condata.conn, &envelope, NULL, 0);
|
|
|
|
if (AMQP_RESPONSE_NORMAL != res.reply_type) {
|
|
break;
|
|
}
|
|
|
|
Runner__V1__Instruction *inst = runner__v1__instruction__unpack(NULL,envelope.message.body.len,envelope.message.body.bytes);
|
|
if(inst==NULL) {
|
|
fprintf(stderr, "error reading buffer");
|
|
exit(1);
|
|
}
|
|
switch(inst->instruction_case) {
|
|
case RUNNER__V1__INSTRUCTION__INSTRUCTION_BUILD:
|
|
puts("build request recieved");
|
|
printf("solution_id: %d\nbinding_key:%s\nlanguage:%d\nsolution:%s\n\n",inst->build->solution_id,inst->build->binding_key,inst->build->language,inst->build->solution);
|
|
break;
|
|
case RUNNER__V1__INSTRUCTION__INSTRUCTION_RUN:
|
|
puts("run request recieved");
|
|
printf("solution_id: %d\ntest_id:%d\nbinding_key:%s\n\n",inst->run->solution_id,inst->run->test_id,inst->run->binding_key);
|
|
break;
|
|
case RUNNER__V1__INSTRUCTION__INSTRUCTION__NOT_SET:
|
|
puts("empty request recieved");
|
|
break;
|
|
default:
|
|
puts("unknown request recieved");
|
|
}
|
|
runner__v1__instruction__free_unpacked(inst, NULL);
|
|
amqp_destroy_envelope(&envelope);
|
|
}
|
|
}
|
|
|
|
amqp_bytes_free(condata.queuename);
|
|
|
|
amqp_assert(amqp_channel_close(condata.conn, 1, AMQP_REPLY_SUCCESS));
|
|
amqp_assert(amqp_connection_close(condata.conn, AMQP_REPLY_SUCCESS));
|
|
amqp_code_assert(amqp_destroy_connection(condata.conn));
|
|
return 0;
|
|
}
|