/*
 * call-seq:
 *      receive -> Spread::Message
 *
 * Receive a message from Spread. This method will block the calling
 * Ruby thread until a message is available. To avoid blocking, clients
 * can call #poll, or use Kernel#select on the +IO+ object returned by
 * #io.
 *
 * The return value is an instance of a subclass of Spread::Message,
 * depending on the type of message that was received.
 *
 * An exception is raised if Spread reports an error, if waiting on the
 * connection's descriptor fails, or if the received message has a
 * service type this binding does not recognize (RuntimeError).
 */
static VALUE
spconn_receive(VALUE obj)
{
    struct SpreadConnection *sp;
    struct SpreadMessage *sp_mess;
    int n;
    VALUE msgKlass = Qnil;

    Data_Get_Struct(obj, struct SpreadConnection, sp);

    if ((n = SP_poll(sp->mbox)) < 0)
        raise_sp_error(n);

    if (n == 0)
    {
        /*
         * There is no data available, so we need to block. If we were
         * to call SP_receive() here, we would block the entire Ruby
         * process, which is undesirable. Therefore, just block the
         * current thread using rb_thread_select()
         */
        fd_set read_set;

        for (;;)
        {
            /* select() modifies the set, so rebuild it every iteration */
            FD_ZERO(&read_set);
            FD_SET(sp->mbox, &read_set);

            /* XXX: add mbox to error set as well? */
            n = rb_thread_select(sp->mbox + 1, &read_set, 0, 0, 0);
            if (n < 0)
                rb_sys_fail("select failed");

            if (FD_ISSET(sp->mbox, &read_set))
                break;
        }
    }

    /*
     * We could receive any type of message off the wire, so delay
     * creating a Ruby instance for it until we know what type it is
     * (and therefore, the type of Ruby class we need to wrap it in).
     *
     * Until the structure is handed to Data_Wrap_Struct() at the end
     * of this function, the GC knows nothing about it. Every error
     * path below must therefore release it explicitly before raising,
     * otherwise the structure (and its payload buffer) would leak.
     */
    sp_mess = ALLOC(struct SpreadMessage);

    /*
     * Zero out the structure. This is probably wise due to padding
     * considerations, but is also necessary for at least one reason:
     * the `service_type' pointer to SP_receive() must point to either
     * zero or the constant DROP_RECV.
     */
    memset(sp_mess, 0, sizeof(struct SpreadMessage));
    sp_mess->rb_groups = Qnil;
    sp_mess->rb_delta = Qnil;

    /*
     * We don't know the size of the buffer that is required in
     * advance.  So we first invoke SP_receive() with an empty (NULL)
     * buffer, and then use the error returned by Spread to figure out
     * the size of the buffer we need to allocate.
     */
    n = SP_receive(sp->mbox,
                   &sp_mess->service_type,
                   sp_mess->sender,
                   MAX_GROUPS,
                   &sp_mess->num_groups,
                   sp_mess->groups,
                   &sp_mess->msg_type,
                   &sp_mess->endian,
                   0, NULL);

    if (n < 0)
    {
        int needed;

        if (n != BUFFER_TOO_SHORT)
        {
            /*
             * free_spmess is the finalizer registered for this
             * structure below, so it is the right way to dispose of
             * it. raise_sp_error() is relied upon not to return.
             */
            free_spmess(sp_mess);
            raise_sp_error(n);
        }

        /* On BUFFER_TOO_SHORT the required size is reported negated. */
        needed = -sp_mess->endian;
        if (needed <= 0)
        {
            free_spmess(sp_mess);
            rb_raise(rb_eRuntimeError,
                     "invalid message length reported by Spread: %d",
                     needed);
        }

        sp_mess->length = needed; /* message length */
        sp_mess->message = ALLOC_N(char, needed);

        /*
         * The failed call above may have filled in `service_type'
         * with the type of the pending message. SP_receive() requires
         * it to be zero or DROP_RECV on entry, so reset it before
         * retrying. (Harmless if Spread left it untouched.)
         */
        sp_mess->service_type = 0;

        n = SP_receive(sp->mbox,
                       &sp_mess->service_type,
                       sp_mess->sender,
                       MAX_GROUPS,
                       &sp_mess->num_groups,
                       sp_mess->groups,
                       &sp_mess->msg_type,
                       &sp_mess->endian,
                       sp_mess->length, sp_mess->message);
        if (n < 0)
        {
            free_spmess(sp_mess);
            raise_sp_error(n);
        }
    }

    /* Pick the Ruby class that corresponds to the Spread service type. */
    if (Is_regular_mess(sp_mess->service_type))
        msgKlass = rb_cSpreadDataMessage;
    else if (Is_reg_memb_mess(sp_mess->service_type))
        msgKlass = rb_cSpreadMemberMessage;
    else if (Is_transition_mess(sp_mess->service_type))
        msgKlass = rb_cSpreadTransitionMessage;
    else if (message_is_self_leave(sp_mess->service_type))
        msgKlass = rb_cSpreadSelfLeaveMessage;
    else
    {
        /* Copy the value out first: the structure is freed before raising. */
        int unknown_type = sp_mess->service_type;

        free_spmess(sp_mess);
        rb_raise(rb_eRuntimeError, "unrecognized message type: %d",
                 unknown_type);
    }

    /* From here on the GC owns sp_mess via mark_spmess/free_spmess. */
    return Data_Wrap_Struct(msgKlass, mark_spmess, free_spmess, sp_mess);
}