/*
* call-seq:
* receive -> Spread::Message
*
* Receive a message from Spread. This method will block indefinitely
* until a message is available. To avoid blocking, clients can call
* #poll, or use Kernel#select on the +IO+ object returned by #io.
*
* The return value is an instance of a subclass of Spread::Message,
* depending on the type of message that was received.
*/
static VALUE
spconn_receive(VALUE obj)
{
    struct SpreadConnection *sp;
    struct SpreadMessage *sp_mess;
    int n;
    VALUE msgKlass;

    Data_Get_Struct(obj, struct SpreadConnection, sp);

    if ((n = SP_poll(sp->mbox)) < 0)
        raise_sp_error(n);

    if (n == 0)
    {
        /*
         * There is no data available, so we need to block. If we were
         * to call SP_receive() here, we would block the entire Ruby
         * process, which is undesirable. Therefore, just block the
         * current thread using rb_thread_select()
         */
        fd_set read_set;

        for (;;)
        {
            FD_ZERO(&read_set);
            FD_SET(sp->mbox, &read_set);
            /* XXX: add mbox to error set as well? */
            n = rb_thread_select(sp->mbox + 1, &read_set, 0, 0, 0);
            if (n < 0)
                rb_sys_fail("select failed");
            if (FD_ISSET(sp->mbox, &read_set))
                break;
        }
    }

    /*
     * We could receive any type of message off the wire, so delay
     * creating a Ruby instance for it until we know what type it is
     * (and therefore, the type of Ruby class we need to wrap it in).
     *
     * Until Data_Wrap_Struct() is called at the bottom of this
     * function, `sp_mess' is not owned by any Ruby object, so every
     * error path below must release it by hand before raising.
     * free_spmess() is the same destructor that is later handed to
     * Data_Wrap_Struct(); it already has to cope with a NULL payload
     * because zero-length messages are wrapped with `message' unset.
     */
    sp_mess = ALLOC(struct SpreadMessage);

    /*
     * Zero out the structure. This is probably wise due to padding
     * considerations, but is also necessary for at least one reason:
     * the `service_type' pointer to SP_receive() must point to either
     * zero or the constant DROP_RECV.
     */
    memset(sp_mess, 0, sizeof(struct SpreadMessage));
    sp_mess->rb_groups = Qnil;
    sp_mess->rb_delta = Qnil;

    /*
     * We don't know the size of the buffer that is required in
     * advance. So we first invoke SP_receive() with an empty (NULL)
     * buffer, and then use the error returned by Spread to figure out
     * the size of the buffer we need to allocate.
     */
    n = SP_receive(sp->mbox,
                   &sp_mess->service_type,
                   sp_mess->sender,
                   MAX_GROUPS,
                   &sp_mess->num_groups,
                   sp_mess->groups,
                   &sp_mess->msg_type,
                   &sp_mess->endian,
                   0, NULL);
    if (n < 0)
    {
        if (n != BUFFER_TOO_SHORT)
        {
            /* Release the unwrapped message before raising. */
            free_spmess(sp_mess);
            raise_sp_error(n);
        }

        sp_mess->length = -sp_mess->endian; /* message length */
        sp_mess->message = ALLOC_N(char, sp_mess->length);

        /*
         * `service_type' is an in/out argument that must be zero or
         * DROP_RECV on entry (see above). Reset it in case the failed
         * first call stored something there, so that the retry is not
         * rejected because of a stale value.
         */
        sp_mess->service_type = 0;

        n = SP_receive(sp->mbox,
                       &sp_mess->service_type,
                       sp_mess->sender,
                       MAX_GROUPS,
                       &sp_mess->num_groups,
                       sp_mess->groups,
                       &sp_mess->msg_type,
                       &sp_mess->endian,
                       sp_mess->length, sp_mess->message);
        if (n < 0)
        {
            /* Releases both the payload buffer and the struct. */
            free_spmess(sp_mess);
            raise_sp_error(n);
        }
    }

    /* Pick the Ruby class that corresponds to the received service type. */
    if (Is_regular_mess(sp_mess->service_type))
        msgKlass = rb_cSpreadDataMessage;
    else if (Is_reg_memb_mess(sp_mess->service_type))
        msgKlass = rb_cSpreadMemberMessage;
    else if (Is_transition_mess(sp_mess->service_type))
        msgKlass = rb_cSpreadTransitionMessage;
    else if (message_is_self_leave(sp_mess->service_type))
        msgKlass = rb_cSpreadSelfLeaveMessage;
    else
    {
        /*
         * Copy the value we want to report before freeing the struct
         * that holds it; rb_raise() does not return.
         */
        int unknown_type = sp_mess->service_type;

        free_spmess(sp_mess);
        rb_raise(rb_eRuntimeError, "unrecognized message type: %d",
                 unknown_type);
    }

    /* From here on the Ruby GC owns `sp_mess' via free_spmess(). */
    return Data_Wrap_Struct(msgKlass, mark_spmess, free_spmess, sp_mess);
}