# mqueue.tcl --
#
# Simple blocking message queue for Tcl threads.
#
# Copyright (c) 2007 Neil Madden.
#
# License: http://www.cs.nott.ac.uk/~nem/license.terms (Tcl-style).
package require Tcl 8.4
package require Thread 2.6
package provide mqueue 0.2
namespace eval mqueue {
namespace export create destroy push pop
proc ::mqueue {subcommand args} {
uplevel 1 [linsert $args 0 ::mqueue::$subcommand]
}
tsv::lock ::mqueue {
if {![tsv::exists ::mqueue id]} {
tsv::set ::mqueue id 0
}
}
proc lock {mutex script} {
thread::mutex lock $mutex
set rc [catch { uplevel 1 $script } ret]
thread::mutex unlock $mutex
return -code $rc $ret
}
proc create {{size 1}} {
set id [tsv::incr ::mqueue id]
set self "::mqueue$id"
tsv::set $self mutex [thread::mutex create]
tsv::set $self read [thread::cond create]
tsv::set $self write [thread::cond create]
tsv::set $self size $size
tsv::set $self buffer [list]
return $self
}
proc destroy queue {
thread::cond destroy [tsv::get $queue read]
thread::cond destroy [tsv::get $queue write]
thread::mutex destroy [tsv::get $queue mutex]
tsv::unset $queue
}
proc push {queue data} {
lock [tsv::get $queue mutex] {
while {[tsv::llength $queue buffer] >= [tsv::get $queue size]} {
# Full already
thread::cond wait [tsv::get $queue write] \
[tsv::get $queue mutex]
}
tsv::lappend $queue buffer $data
thread::cond notify [tsv::get $queue read]
}
}
proc pop queue {
lock [tsv::get $queue mutex] {
while {[tsv::llength $queue buffer] == 0} {
# Empty
thread::cond wait [tsv::get $queue read] \
[tsv::get $queue mutex]
}
set ret [tsv::lpop $queue buffer]
thread::cond notify [tsv::get $queue write]
}
return $ret
}
}This should be basically thread-safe, except that calling destroy while the queue is in use will likely result in an error. However, shared state concurrency is hard to do right so a review would be welcome, especially from someone who actually uses the thread package regularly (I don't use it very often).As an example, here is a simple producer/consumer scenario:
package require mqueue 0.2
set t [thread::create]
thread::send $t {
package require mqueue 0.2
proc produce queue {
puts "Producer thread starting..."
while 1 {
puts "Looping"
mqueue push $queue "Message: [incr i]"
}
}
}
set q [mqueue create 5]
thread::send -async $t [list produce $q]
while 1 { puts [mqueue pop $q]; after 200 }
