#include "transport/transport.h" #include "starter/starter.h" void prepare_amqp_connection(int argc, char const *const *argv,struct connection_data *condata) { int status; if(secure_getenv("AMQP_HOSTNAME")==NULL || secure_getenv("AMQP_HOSTNAME")==NULL || secure_getenv("AMQP_EXCHANGE")==NULL || secure_getenv("AMQP_BINDINGKEY")==NULL) { fprintf(stderr,"no amqp connection parameters in environment"); exit(1); } condata->hostname = getenv("AMQP_HOSTNAME"); condata->port = atoi(getenv("AMQP_PORT")); condata->exchange = getenv("AMQP_EXCHANGE"); condata->bindingkey = getenv("AMQP_BINDINGKEY"); condata->conn = amqp_new_connection(); condata->socket = amqp_tcp_socket_new(condata->conn); if (!condata->socket) { error_log("failed to create TCP socket"); } status = amqp_socket_open(condata->socket, condata->hostname, condata->port); if (status) { error_log("failed to open TCP socket"); } amqp_assert(amqp_login(condata->conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "rmuser", "rmpassword")); amqp_channel_open(condata->conn, 1); amqp_assert(amqp_get_rpc_reply(condata->conn)); { amqp_queue_declare_ok_t *r = amqp_queue_declare( condata->conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table); amqp_assert(amqp_get_rpc_reply(condata->conn)); condata->queuename = amqp_bytes_malloc_dup(r->queue); if (condata->queuename.bytes == NULL) { fprintf(stderr, "Out of memory while copying queue name"); exit(1); } } amqp_queue_bind(condata->conn, 1, condata->queuename, amqp_cstring_bytes(condata->exchange), amqp_cstring_bytes(condata->bindingkey), amqp_empty_table); amqp_assert(amqp_get_rpc_reply(condata->conn)); amqp_basic_consume(condata->conn, 1, condata->queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); amqp_assert(amqp_get_rpc_reply(condata->conn)); } int main(int argc, char const *const *argv) { struct connection_data condata; prepare_amqp_connection(argc,argv,&condata); { for (;;) { amqp_rpc_reply_t res; amqp_envelope_t envelope; amqp_maybe_release_buffers(condata.conn); res = amqp_consume_message(condata.conn, &envelope, NULL, 0); if (AMQP_RESPONSE_NORMAL != res.reply_type) { break; } Runner__V1__Instruction *inst = runner__v1__instruction__unpack(NULL,envelope.message.body.len,envelope.message.body.bytes); if(inst==NULL) { fprintf(stderr, "error reading buffer"); exit(1); } switch(inst->instruction_case) { case RUNNER__V1__INSTRUCTION__INSTRUCTION_BUILD: puts("build request recieved"); printf("solution_id: %d\nbinding_key:%s\nlanguage:%d\nsolution:%s\n\n",inst->build->solution_id,inst->build->binding_key,inst->build->language,inst->build->solution); break; case RUNNER__V1__INSTRUCTION__INSTRUCTION_RUN: puts("run request recieved"); printf("solution_id: %d\ntest_id:%d\nbinding_key:%s\n\n",inst->run->solution_id,inst->run->test_id,inst->run->binding_key); break; case RUNNER__V1__INSTRUCTION__INSTRUCTION__NOT_SET: puts("empty request recieved"); break; default: puts("unknown request recieved"); } runner__v1__instruction__free_unpacked(inst, NULL); amqp_destroy_envelope(&envelope); } } amqp_bytes_free(condata.queuename); amqp_assert(amqp_channel_close(condata.conn, 1, AMQP_REPLY_SUCCESS)); amqp_assert(amqp_connection_close(condata.conn, AMQP_REPLY_SUCCESS)); amqp_code_assert(amqp_destroy_connection(condata.conn)); return 0; }