README.org
September 2, 2015 · View on GitHub
#+TITLE: CL-DISQUE #+AUTHOR: Cody Reichert #+EMAIL: codyreichert@gmail.com
A [[https://github.com/antirez/disque][Disque]] client for Common Lisp.
Disque is an in-memory job queue and message broker. It's distributed and fault-tolerant so it works as a middle layer among processes that want to exchange messages.
Learn more about message queues [[https://github.com/antirez/disque#what-is-a-message-queue][here]], and more about the Disque implementation [[https://github.com/antirez/disque#give-me-the-details][here]].
cl-disque provides a client for working with Disque through sending and receiving jobs and commands.
-
Usage ** Quickstart First, make sure a Disque server is running.
Load cl-disque and connect to the Disque server on the given host and port:
#+BEGIN_SRC lisp (ql:quickload 'cl-disque)
; host defaults to 127.0.0.1, port defaults to 7711 (cl-disque:connect :host
:port ) #+END_SRC You can interact with the server using commands from the =disque= package.
#+BEGIN_SRC lisp (disque:hello) ;=> (1 "node-id" ; ("node-ids" "host" "7711" "1")) #+END_SRC
Disconnect from the Disque server with:
#+BEGIN_SRC lisp (cl-disque:disconnect) #+END_SRC
Alternatively, you can wrap all interactions in the =with-connection= macro, which creates a new connection to execute the given body, and assures a disconnect afterwards:
#+BEGIN_SRC lisp (cl-disque:with-connection () (disque:addjob "queue" "job" 0) (disque:getjob "queue" 1)) ;=> (("queue" "job-hash" "job")) #+END_SRC
The Makefile offers a couple of commands for running the test-suite and loading cl-disque into an SBCL repl:
#+BEGIN_SRC sh
To run the test suite
$ make test #+END_SRC
#+BEGIN_SRC sh
To load an SBCL repl
$ make sbcl-repl #+END_SRC
** Available commands Cl-Disque supports all of the Disque client commands and their arguments. See [[https://github.com/antirez/disque][The Disque Documentation]] for more specifics on each command
*** INFO - Args: () - Response-type: :bulk
*** HELLO - Args: () - Response-type: :multi
*** QLEN - Args: (queue) - Response-type: :integer
*** QPEEK - Args: (queue count) - Response-type: :multi
*** QSCAN - Args: (&rest args &key count busyloop minlen maxlen importrate) - Response-type: :multi
*** GETJOB - Args: (queues &rest args &key nohang timeout count withcounters) - Reponse-type: :multi
/Note: queues can either be a single queue or a list of queues:/
#+BEGIN_SRC lisp
(disque:getjob "queue1")
;; or
(disque:getjob '("queue1" "queue2" "queue3")
#+END_SRC
*** ADDJOB - Args: (queue job timeout &rest args &key replicate delay retry ttl maxlen async) - Response-type: :status
*** ACKJOB - Args: (job &rest jobs) - Response-type: :integer
*** FASTACK - Args: (job &rest jobs) - Response-type: :integer
*** WORKING - Args: (job) - Response-type: :integer
*** NACK - Args: (job &rest jobs) - Response-type: :integer
*** ENQUEUE - Args: (job &rest jobs) - Reponse-type: :integer
*** DEQUEUE - Args: (job &rest jobs) - Response-type :integer
*** DELJOB - Args: (job &rest jobs) - Response-type: :integer
*** SHOW - Args: (job) - Response-type: :multi
*** JSCAN - Args: (cursor &rest args &key count blocking queue state reply) - Response-type: :multi
** Code organization The system provides two packages: =CL-DISQUE= and =DISQUE=.
Everything is available in the =CL-DISQUE= package.
The =DISQUE= package contains all of the commands for interacting with a Disque server. This is simply syntactic sugar, as all of the commands are /also/ available in the =CL-DISQUE= package with a command prefix. For Example:
#+BEGIN_SRC lisp (disque:info) ; is the same as (cl-disque:disque-info) #+END_SRC
** Installation
Git clone this repo into your =~/quicklisp/local-projects/= directory, and =(ql:quickload :cl-disque)=.
** Dependencies
- [[http://common-lisp.net/project/usocket/][usocket]]
- [[http://common-lisp.net/project/flexi-streams/][flexi-streams]]
- [[http://github.com/vseloved/rutils][rutils]]
- [[http://github.com/fukamachi/prove][prove]] (only for tests)
** Debugging and error recovery
If =echo-p= is =T=, all client-server communications will be echoed to the stream =echo-stream=, which defaults to =standard-output=.
Error handling is mimicked after [[http://common-lisp.net/project/postmodern/][Postmodern]]. In particular, whenever an error occurs that breaks the communication stream, a condition of type =disque-connection-error= is signalled offering a =:reconnect= restart. If it is selected the whole Disque command will be resent, if the reconnection attempt succeeds. Furthermore, =connect= checks if a connection to Disque is already established, and offers two restarts (=:leave= and =:replace=) if this is the case.
When the server respondes with an error reply a condition of type =disque-error-reply= is signalled.
There's also a high-level =with-persistent-connection= macro, that tries to do the right thing™ (i.e. automatically reopen the connection once, if it is broken).
** Advanced usage *** Pipelining
For better performance Disque allows to pipeline commands and delay receiving results until the end, and process them all in oine batch afterwards. To support that there's =with-pipelining= macro.
Note, that =with-pipelining= calls theoretically may nest, but the results will only be available to the highest-level pipeline, all the nested pipelines will return :PIPELINED. So a warining is signalled in this situation.
Note: Pipelining has not been tested since being ported form cl-redis.
** Credits
Cody Reichert codyreichert@gmail.com is the maintainer of =CL-DISQUE=.
=CL-DISQUE= is a ported of the [[http://github.com/vseloved/cl-redis][CL-REDIS]] client, which is developed and maintained by Vsevolod Dyomkin vseloved@gmail.com. Many thanks to him for implementing the protocol and providing most of the internals.
Alexandr Manzyuk manzyuk@googlemail.com also contributed to =CL-REDIS= client and developed the connection handling code following the implementation in [[http://common-lisp.net/project/postmodern/][Postmodern]]. It was since partially rewritten to accommodate more advanced connection handling strategies, like persistent connection.
** License
MIT (See LICENSE file for details).