Enable delayed messages in Zend_Queue

The default Zend_Queue DB implementation unfortunately does not let you pass a timeout value when saving a message to the queue. However, not all is lost: you can easily extend the standard Zend classes to add that functionality.

All you need is your own Db adapter and Queue class.

For the adapter you only need to override the send() function. The only changes to the original method are the extra `$timeout` parameter in the signature and the line that assigns `$msg->timeout` (2 lines affected…). You could apply the same change directly to Zend_Queue_Adapter_Db without extending it, but a subclass will be easier to live with if you ever need to update your Zend library.

```php
<?php

class TS_Queue_Adapter_Db extends Zend_Queue_Adapter_Db {

    /**
     * Send a message to the queue
     *
     * @param  string     $message Message to send to the active queue
     * @param  Zend_Queue $queue
     * @param  int|null   $timeout Timestamp to store in the message's timeout column
     * @return Zend_Queue_Message
     * @throws Zend_Queue_Exception - database error
     */
    // Changed: added the optional $timeout parameter.
    public function send($message, Zend_Queue $queue = null, $timeout = null){
        if ($this->_messageRow === null) {
            $this->_messageRow = $this->_messageTable->createRow();
        }

        if ($queue === null) {
            $queue = $this->_queue;
        }

        if (is_scalar($message)) {
            $message = (string) $message;
        }
        if (is_string($message)) {
            $message = trim($message);
        }

        if (!$this->isExists($queue->getName())) {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
        }

        $msg = clone $this->_messageRow;
        $msg->queue_id = $this->getQueueId($queue->getName());
        $msg->created = time();
        $msg->body = $message;
        $msg->md5 = md5($message);
        // Changed: store the timeout passed in by the caller.
        $msg->timeout = $timeout;

        try {
            $msg->save();
        } catch (Exception $e) {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
        }

        $options = array(
            'queue' => $queue,
            'data' => $msg->toArray(),
        );

        $classname = $queue->getMessageClass();
        if (!class_exists($classname)) {
            require_once 'Zend/Loader.php';
            Zend_Loader::loadClass($classname);
        }
        return new $classname($options);
    }
}
```

Your Queue class only needs to override the **send()** function as well, so that the timeout can be passed through to the adapter. You can of course use this class to add other functionality too.

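```php
class TS_Queue extends Zend_Queue {

    /**
     * Send a message to the queue
     *
     * @param  mixed    $message Message to send
     * @param  int|null $timeout Timestamp passed through to the adapter
     * @return Zend_Queue_Message
     * @throws Zend_Queue_Exception
     */
    public function send($message, $timeout = null){
        // Passing null as the queue makes the adapter fall back to its own queue.
        return $this->getAdapter()->send($message, null, $timeout);
    }

}
```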
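
To illustrate how a caller might use the extra argument, here is a minimal sketch. It assumes the timeout is meant as an absolute Unix timestamp (the docblock above only says "Timestamp"), and `ts_queue_delay_to_timeout()` is a hypothetical helper name, not part of Zend Framework. The call to the queue is shown only as a comment because it needs a configured queue instance.

```php
<?php

/**
 * Hypothetical helper (not part of Zend Framework): turn a delay in
 * seconds into the absolute timestamp passed as $timeout.
 * Assumption: the timeout is interpreted as a Unix timestamp.
 *
 * @param  int      $delaySeconds Seconds from now until the message is due
 * @param  int|null $now          Current time; defaults to time()
 * @return int
 * @throws InvalidArgumentException if the delay is not a non-negative integer
 */
function ts_queue_delay_to_timeout($delaySeconds, $now = null)
{
    if (!is_int($delaySeconds) || $delaySeconds < 0) {
        throw new InvalidArgumentException('Delay must be a non-negative integer');
    }
    if ($now === null) {
        $now = time();
    }
    return $now + $delaySeconds;
}

// Usage sketch, assuming $queue is an instance of the extended queue class
// backed by the extended Db adapter:
//
//     $queue->send('send-reminder-email', ts_queue_delay_to_timeout(300));
```

Keeping the arithmetic in one small function means the rest of your code can think in terms of "delay in seconds" while the adapter keeps storing a plain timestamp.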