Common Lisp Package: PATRON

README:

       ____   ____ ______ ____   ___  ____  
!!  ! |    \ /    |      T    \ /   \|    \  !!!!  
!!!   |  o  Y  o  |      |  D  Y     Y  _  l  ! !!!  
!!!!  |   _/|     l_   __j     |  O  |  |  |   !!!!  
!!:   |  |  |  _  | |  | |    \|     |  |  |  !: :!  
:!:   |  |  |  |  | |  | |  .  l     !  |  |   : ::  
 ::   l__j  l__j__j l__j l__j\_j\___/l__j__j   :  : 

Abstract

Patron is a multi-consumer/multi-producer thread pooling library written in Common Lisp with flexibility and performance in mind. You simply create a PATRON with a job queue of fixed size, specify a fixed number of WORKER threads and start submitting your JOBs into the work queue.

While Patron is written in portable Common Lisp in mind, because of some platform specific features[1], it currently works on SBCL and CCL platforms. As a side note, Patron currently depends on bordeaux-threads library for common threading functionalities.

[1] Semaphores, missing threading features (THREAD-JOIN, WITHOUT-INTERRUPTS, etc.) in bordeaux-threads, WITH-TIMEOUT macro.

Example

Below basic example should get you to a point where you can start creating thread pools in minutes.

(defvar *stream* *standard-output*)  
 
(defvar *stream-lock* (patron:make-lock))  
 
(defun safe-format (fmt &rest args)  
  (patron:with-lock *stream-lock*  
    (apply #'format *stream* fmt args)))  
 
(defun thread-stats (patron)  
  (safe-format  
   "Keepers: ~{~A~^ ~}~%Workers: ~{~A~^ ~}~%"  
   (map 'list #'patron:thread-alive-p (patron::keepers-of patron))  
   (map 'list #'patron:thread-alive-p (patron::workers-of patron))))  
 
(let ((state (make-random-state)))  
  (defun job ()  
    (let ((duration (random 5 state)))  
      (safe-format "  ~S => Sleeping... [~A]~%" (patron:current-thread) duration)  
      (sleep duration)  
      (safe-format "  ~S => Done!~%" (patron:current-thread)))))  
 
(defun report-result (job)  
  (safe-format "RESULT: JOB: ~S~%" job))  
 
(defun patron-test ()  
  (let* ((patron  
          (make-instance  
           'patron:patron  
           :worker-capacity 3  
           :job-capacity 32  
           :worker-timeout-duration 3)))  
    (safe-format "Starting...~%")  
    (patron:start-patron patron)  
    (sleep 1.0)  
    (thread-stats patron)  
    (safe-format "Submitting jobs...~%")  
    (loop repeat 5  
          do (patron:submit-job  
              patron  
              (make-instance  
                   'patron:job  
                   :function #'job  
                   :result-report-function #'report-result)))  
    (safe-format "Submitted.~%")  
    (safe-format "Stopping...~%")  
    (patron:stop-patron patron :wait t)  
    (safe-format "Stopped.~%")  
    (thread-stats patron)))  
 
; Starting...  
; Keepers: T T  
; Workers: T T T  
; Submitting jobs...  
; Submitted.  
; Stopping...  
;   #<SB-THREAD:THREAD "Anonymous" RUNNING {10040E8D21}> => Sleeping... [2]  
;   #<SB-THREAD:THREAD "Anonymous" RUNNING {10040E8F71}> => Sleeping... [2]  
;   #<SB-THREAD:THREAD "Anonymous" RUNNING {10040EA1D1}> => Sleeping... [3]  
;   #<SB-THREAD:THREAD "Anonymous" RUNNING {10040E8D21}> => Done!  
; RESULT: JOB: #<PATRON:JOB :FUNCTION #<FUNCTION TEST::JOB> :START-TIME "2009-04-30 14:41:49" :FINISH-TIME "2009-04-30 14:41:51" {1004155561}>  
;   #<SB-THREAD:THREAD "Anonymous" RUNNING {10040E8D21}> => Sleeping... [3]  
;   #<SB-THREAD:THREAD "Anonymous" RUNNING {10040E8F71}> => Done!  
; RESULT: JOB: #<PATRON:JOB :FUNCTION #<FUNCTION TEST::JOB> :START-TIME "2009-04-30 14:41:49" :FINISH-TIME "2009-04-30 14:41:51" {1004155891}>  
;   #<SB-THREAD:THREAD "Anonymous" RUNNING {10040E8F71}> => Sleeping... [2]  
; 2009-04-30 14:41:52 - #<PATRON:JOB :FUNCTION #<FUNCTION TEST::JOB> :START-TIME "2009-04-30 14:41:49" :CONDITION #<PATRON::TIMEOUT-CONDITION :DURATION 3 :TIME "2009-04-30 14:41:52"> {1004155BC1}>  
;   #<SB-THREAD:THREAD "Anonymous" RUNNING {10040E8F71}> => Done!  
; RESULT: JOB: #<PATRON:JOB :FUNCTION #<FUNCTION TEST::JOB> :START-TIME "2009-04-30 14:41:51" :FINISH-TIME "2009-04-30 14:41:53" {1004156231}>  
; 2009-04-30 14:41:54 - #<PATRON:JOB :FUNCTION #<FUNCTION TEST::JOB> :START-TIME "2009-04-30 14:41:51" :CONDITION #<PATRON::TIMEOUT-CONDITION :DURATION 3 :TIME "2009-04-30 14:41:54"> {1004155EF1}>  
; Stopped.  
; Keepers: NIL NIL  
; Workers: NIL NIL NIL 

Documentation

Before going into the syntatical details, here is a general figure about the inner workings of Patron.

  • Queue operations take action in a blocking manner and are wrapped by WITH-TIMEOUT statements.
  • There are no busy waits, synchronized access is supplied using semaphores.
  • Using a two-lock concurrent queue algorithm, consumer and producer lockings are separated from each other for performance purposes .
  • There are two keeper threads where each keeper is ensuring its partners existence and first (master) keeper ensuring the existence of specified number of worker threads.

While Patron source code is fully documented, below you'll find the documentation excerpts from the source code for exported symbols.

[Condition] timeout-condition (error-condition)  
  [Slot] time - Time condition instance is created.  
  [Slot] duration - Elapsed duration before condition is raised. 

> Condition thrown when the duration specified in the WITH-TIMEOUT is exceeded. (TIME slot is inherited from ERROR-CONDITION.)

[Function] default-error-report (condition) 

> Default function for reporting errors.

[Class] thread ()  
  [Slot] id - Implementation dependent thread identifier.  
  [Slot] function - Function executed by current thread.  
 
[Function] make-lock ()  
 
[Macro] with-lock (lock &body body)  
 
[Function] thread-start (thread)  
 
[Function] current-thread ()  
 
[Function] thread-alive-p (thread)  
 
[Function] thread-interrupt (thread function)  
 
[Function] thread-join (thread)  
 
[Macro] without-interrupts (&body body)  
 
[Class] job ()  
  [Slot] function - Function will be called to start the execution.  
  [Slot] result-report-function - Function will be called to report the result.  
  [Slot] error-report-function - Function will be called to report an error.  
  [Slot] submit-time - Job queue entrance time.  
  [Slot] start-time - Job execution start time.  
  [Slot] finish-time - Job execution finish time.  
  [Slot] condition - Signaled condition in case of a failure.  
  [Slot] result - Job result in case of no failure.  
 
[Class] patron ()  
  [Slot] error-report-function - Will get called for management related  
         errors -- e.g when found a dead worker, keeper, etc.  
  [Slot] job-capacity - Upper limit on the job queue size.  
  [Slot] worker-capacity - Number of serving `WORKER's.  
  [Slot] worker-timeout-duration - Time limit on the work processing duration.  
  [Slot] keeper-timeout-duration - Wait period for keepers.  
 
[Function] submit-job (patron job) 

> Submit given JOB into the job queue of PATRON. Function works in a blocking manner and returns inserted JOB, or throws a TIMEOUT-CONDITION.

[Function] worker-stats (patron) 

> Returns a property list of minimum, maximum, and average statistics of N-FAILURES, FAIL-DURATION, BUSY-DURATION, and IDLE-DURATION slots among workers. Function blocks job queue while gathering statistics.

[Function] start-patron (patron) 

> After switching STATE to :ACTIVE, starts WORKERSs and KEEPERs in order.

[Function] stop-patron (patron &key wait kill) 

> After switching STATE to :INACTIVE, stops KEEPERs and WORKERs in order. For related effects of keyword arguments see documentation of STOP-KEEPERS and STOP-WORKERS functions.

FUNCTION

Public

DEFAULT-ERROR-REPORT (CONDITION)

Default function for reporting errors.

START-PATRON (PATRON)

After switching `STATE' to `:ACTIVE', starts `WORKERS's and `KEEPER's in order.

STOP-PATRON (PATRON &KEY WAIT KILL)

After switching `STATE' to `:INACTIVE', stops `KEEPER's and `WORKER's in order. For related effects of keyword arguments see documentation of `STOP-KEEPERS' and `STOP-WORKERS' functions.

SUBMIT-JOB (PATRON JOB)

Submit given `JOB' into the job queue of `PATRON'. Function works in a blocking manner and returns inserted `JOB', or throws a `TIMEOUT-CONDITION'.

WORKER-STATS (PATRON)

Returns a property list of minimum, maximum, and average statistics of `N-FAILURES', `FAIL-DURATION', `BUSY-DURATION', and `IDLE-DURATION' slots among workers. Function blocks job queue while gathering statistics.

Undocumented

CURRENT-THREAD

MAKE-LOCK

THREAD-ALIVE-P (THREAD)

THREAD-INTERRUPT (THREAD FUNCTION)

THREAD-JOIN (THREAD)

THREAD-START (THREAD)

Private

%QUEUE-POP (QUEUE &OPTIONAL DEFAULT)

Pops an item from the given `QUEUE'. Function returns `DEFAULT' in case of no available elements found.

%QUEUE-PUSH (QUEUE ITEM)

Pushes given `ITEM' into the `QUEUE'. Function returns supplied `ITEM'.

KEEPER (KEEPER PATRON)

Keeper function to ensure the existence of its parent `KEEPER' and `WORKER's. In case of a dead `KEEPER'/`WORKER' instance is found, `N-KEEPER-FAILURES'/`N-WORKER-FAILURES' slot is incremented and `ERROR-REPORT-FUNCTION' the `PATRON' is called with the inactive instance as argument. Function loops infinitely by checking if `STATE' is still `:ACTIVE' before every `KEEPER-TIMEOUT-DURATION' interval. In case of an error, `CONDITION' slot of the `KEEPER' is filled appropriately.

MAKE-KEEPER (PATRON)

Make an appropriate `KEEPER' instance with a specific wrapper function around `KEEPER' function.

QUEUE-POP (QUEUE)

Pops an item from the given `QUEUE'. Function blocks if there isn't any available item in the queue.

QUEUE-PUSH (QUEUE ITEM)

Tries to push given `ITEM' into the `QUEUE'. If queue size gets exceeded, function blocks until at least an item is consumed from the queue. Function returns supplied `ITEM'.

QUEUE-TIMED-POP (QUEUE DURATION &OPTIONAL TIMEOUT)

Works like `QUEUE-POP', but function returns `TIMEOUT' if no available elements found in given `DURATION'.

QUEUE-TIMED-PUSH (QUEUE ITEM DURATION &OPTIONAL TIMEOUT)

Works like `QUEUE-PUSH', but function returns `TIMEOUT' if no push occurs in given `DURATION'.

ROTATE-PATRON-STATE (PATRON TARGET-STATE)

Switches `STATE' slot of `PATRON' to specified `TARGET-STATE'.

START-KEEPERS (PATRON)

Starts `KEEPER's and waits for them to wake up. Function returns given `PATRON'.

START-WORKERS (PATRON)

Fills `WORKERS' and `JOBS' slots of the given `PATRON' appropriately and spawns workers. Function returns supplied `PATRON'.

STOP-KEEPERS (PATRON &KEY KILL WAIT)

Function does nothing -- assuming `STATE' is switched to `:INACTIVE', `KEEPER' function will exit in the next loop round. Function returns given `PATRON'. If `KILL' is true, function will try to terminate every keeper via throwing a `KILL-CONDITION'. `CONDITION' slot of related `KEEPER's will get set to this condition appropriately. If `WAIT' is true, function will wait (at most `KEEPER-TIMEOUT-DURATION') for `KEEPER's to exit.

STOP-WORKERS (PATRON &KEY KILL WAIT)

Stops workers by pushing `NIL' jobs to the queue as much as total # of workers. Function blocks until there is enough space in the job queue to push dummy `NIL's. Function finally returns supplied `PATRON'. If `KILL' is true, function will try to terminate every worker that is still alive and report jobs about the situation via `ERROR-SUBMIT-FUNCTION'. `CONDITION' slot of the `JOB' will set to `KILL-CONDITION'. If `WAIT' is true, function will wait (at most `WORKER-TIMEOUT-DURATION') for `WORKER's to exit.

WAIT-KEEPER (KEEPER)

Wait for `KEEPER' to exit.

WORKER (WORKER PATRON)

Worker function to execute the next available job in the queue. Function infinitely tries to pop `JOB' from the queue until it receives a `NIL' job. During every job processing iteration, function resets `LAST-START-TIME', `LAST-FINISH-TIME', `N-FAILURES', `FAIL-DURATION', `BUSY-DURATION', and `IDLE-DURATION' slots of the `WORKER' accordingly. `START-TIME' and `FINISH-TIME' slots of the `JOB' is assigned respectively before and after the execution of the `FUNCTION' slot of the `JOB'. After execution, if there doesn't occur any errors, `RESULT' slot of the `JOB' is set accordingly and `RESULT-REPORT-FUNCTION' is called with `JOB' as argument. In case of an error, `CONDITION' slot is set and `ERROR-REPORT-FUNCTION' is called.

Undocumented

%WITH-TIMEOUT (DURATION BODY)

KILL-KEEPER (KEEPER)

KILL-WORKER (WORKER)

MAKE-WORKER (PATRON)

SEMAPHORE-MAKE (&OPTIONAL (COUNT 0))

SEMAPHORE-SIGNAL (SEMAPHORE)

SEMAPHORE-TIMED-WAIT (SEMAPHORE DURATION)

SEMAPHORE-WAIT (SEMAPHORE)

TIME->STRING (TIME)

WAIT-WORKER (WORKER)

MACRO

Public

Undocumented

WITH-LOCK (LOCK &BODY BODY)

WITHOUT-INTERRUPTS (&BODY BODY)

Private

WITH-BLOCKING-QUEUE-OPERATIONS (QUEUE &BODY BODY)

Function blocks any physical push/pop operations on the `QUEUE' while execution `BODY'.

WITH-TIMEOUT (DURATION &BODY BODY)

Execute `BODY' for no more than specified `DURATION'. In case of timeout, function throws a `TIMEOUT-CONDITION'.

Undocumented

PROG1-LET ((VAR VAL) &BODY BODY)

WHEN-LET ((VAR VAL) &BODY BODY)

WITH-UNIQUE-NAMES ((&REST BINDINGS) &BODY BODY)

GENERIC-FUNCTION

Public

Undocumented

DURATION-OF (CONDITION)

TIME-OF (CONDITION)

SLOT-ACCESSOR

Public

CONDITION-OF (OBJECT)

Condition catched in case of a crash.

SETFCONDITION-OF (NEW-VALUE OBJECT)

Condition catched in case of a crash.

ERROR-REPORT-FUNCTION-OF (OBJECT)

Will get called for management related errors -- e.g when found a dead worker, keeper, etc.

FINISH-TIME-OF (OBJECT)

Exit/Crash date.

SETFFINISH-TIME-OF (NEW-VALUE OBJECT)

Exit/Crash date.

FUNCTION-OF (OBJECT)

Function executed by current thread.

SETFFUNCTION-OF (NEW-VALUE OBJECT)

Function executed by current thread.

ID-OF (OBJECT)

Implementation dependent thread identifier.

SETFID-OF (NEW-VALUE OBJECT)

Implementation dependent thread identifier.

JOB-CAPACITY-OF (OBJECT)

Upper limit on the job queue size.

KEEPER-TIMEOUT-DURATION-OF (OBJECT)

Wait period for keepers.

RESULT-OF (OBJECT)

Job result in case of no failure.

SETFRESULT-OF (NEW-VALUE OBJECT)

Job result in case of no failure.

RESULT-REPORT-FUNCTION-OF (OBJECT)

Function will be called to report the result.

START-TIME-OF (OBJECT)

Birth date.

SETFSTART-TIME-OF (NEW-VALUE OBJECT)

Birth date.

SUBMIT-TIME-OF (OBJECT)

Job queue entrance time.

SETFSUBMIT-TIME-OF (NEW-VALUE OBJECT)

Job queue entrance time.

WORKER-CAPACITY-OF (OBJECT)

Number of serving `WORKER's.

WORKER-TIMEOUT-DURATION-OF (OBJECT)

Time limit on the work processing duration.

Private

BUSY-DURATION-OF (OBJECT)

Total non-idle duration.

SETFBUSY-DURATION-OF (NEW-VALUE OBJECT)

Total non-idle duration.

FAIL-DURATION-OF (OBJECT)

Total duration spend on failed processings.

SETFFAIL-DURATION-OF (NEW-VALUE OBJECT)

Total duration spend on failed processings.

IDLE-DURATION-OF (OBJECT)

Total duration worker stayed idle.

SETFIDLE-DURATION-OF (NEW-VALUE OBJECT)

Total duration worker stayed idle.

JOBS-OF (OBJECT)

FIFO queue of `JOB's waiting to be processed.

SETFJOBS-OF (NEW-VALUE OBJECT)

FIFO queue of `JOB's waiting to be processed.

KEEPERS-OF (OBJECT)

`KEEPER' couple for `WORKER's and each other.

SETFKEEPERS-OF (NEW-VALUE OBJECT)

`KEEPER' couple for `WORKER's and each other.

LAST-FINISH-TIME-OF (OBJECT)

Last time worker finished a job.

SETFLAST-FINISH-TIME-OF (NEW-VALUE OBJECT)

Last time worker finished a job.

LAST-START-TIME-OF (OBJECT)

Last time worker started a job.

SETFLAST-START-TIME-OF (NEW-VALUE OBJECT)

Last time worker started a job.

N-FAILURES-OF (OBJECT)

# of failed processings.

SETFN-FAILURES-OF (NEW-VALUE OBJECT)

# of failed processings.

N-KEEPER-FAILURES-OF (OBJECT)

# of `KEEPER' failures found.

SETFN-KEEPER-FAILURES-OF (NEW-VALUE OBJECT)

# of `KEEPER' failures found.

N-WORKER-FAILURES-OF (OBJECT)

# of `WORKER' failures found.

SETFN-WORKER-FAILURES-OF (NEW-VALUE OBJECT)

# of `WORKER' failures found.

POP-LOCK-OF (OBJECT)

Lock serializing pop operations.

SETFPOP-LOCK-OF (NEW-VALUE OBJECT)

Lock serializing pop operations.

POP-SEMAPHORE-OF (OBJECT)

Semaphore blocking pop operations while queue is empty.

SETFPOP-SEMAPHORE-OF (NEW-VALUE OBJECT)

Semaphore blocking pop operations while queue is empty.

PUSH-LOCK-OF (OBJECT)

Lock serializing push operations.

SETFPUSH-LOCK-OF (NEW-VALUE OBJECT)

Lock serializing push operations.

PUSH-SEMAPHORE-OF (OBJECT)

Semaphore blocking push operations while queue is full.

SETFPUSH-SEMAPHORE-OF (NEW-VALUE OBJECT)

Semaphore blocking push operations while queue is full.

SIZE-OF (OBJECT)

Maximum # of items allowed in the queue.

STATE-LOCK-OF (OBJECT)

Synchronization primitive for `STATE' slot.

STATE-OF (OBJECT)

State of the patron; either `ACTIVE', or `INACTIVE'.

SETFSTATE-OF (NEW-VALUE OBJECT)

State of the patron; either `ACTIVE', or `INACTIVE'.

WORKERS-OF (OBJECT)

Vector of serving `WORKER's.

SETFWORKERS-OF (NEW-VALUE OBJECT)

Vector of serving `WORKER's.

Undocumented

HEAD-OF (OBJECT)

SETFHEAD-OF (NEW-VALUE OBJECT)

TAIL-OF (OBJECT)

SETFTAIL-OF (NEW-VALUE OBJECT)

VARIABLE

Private

*ERROR-STREAM*

Generic stream used by default error reporting functions.

CLASS

Public

Undocumented

JOB

PATRON

THREAD

Private

QUEUE

Size bounded two-lock concurrent FIFO queue.

Undocumented

KEEPER (KEEPER PATRON)

WORKER (WORKER PATRON)

CONDITION

Private

ERROR-CONDITION

Generic wrapper condition for application specific conditions.

KILL-CONDITION

Condition passed to the `CONDITION' slot of a `KEEPER'/`JOB' while killing a keeper/worker.

TIMEOUT-CONDITION

Condition thrown when the duration specified in the `WITH-TIMEOUT' is exceeded.