- kill - terminate all deadlocked filaments.
- vwait - enter the Tcl event loop and wait on a variable; this allows filaments to be restarted by external messages sent to one or more filaments.
- return - keeps filaments in the wait state, allowing an external message to be sent and later re-run.
############################################################################
#
# filament
#
# A very light weight non-preemptive thread/coroutine/generator package
#
# Similar to the real Tcl 'thread' package, each filament gets its own
# interpreter, and communicates with other filaments by sending messages.
# Unlike 'thread', messages are just strings rather than commands, and
# explicit 'init', 'run', and 'recv' states have separate code bodies.
# Filaments must be in a 'wait' state in order to receive messages.
#
# Tom Poindexter, January 2006
#
package provide filament 0.1

# Package-wide state shared by every ::filament proc.
namespace eval ::filament {
    # counter used to build names for filaments created with "#auto"
    variable nextCnt 0

    # names of filaments still to be serviced in the current scheduler pass
    variable msgQueue {}

    # status(name) - scheduling state of each live filament
    variable status
    array set status {}

    # messages(name) - list of undelivered {sender message} pairs
    variable messages
    array set messages {}
}
############################################################################
#
# ::filament::new
#
# Create a new filament. The filament will begin in the 'run' state, unless
# another state is set during initialization. Each filament creates a
# slave interp by the same name.
#
# name - name of the new filament, or "#auto" for a generated name
# args - a list of "type-list code" pairs,
#
# vars var-list a list of variables used by the filament, any other
# variables used in code are not retained.
#
# init code code to be executed on filament creation.
# also define proc's for your run & recv code here.
# the last command can be 'filament_wait' or
# 'filament_sleep' if a beginning state other than 'run'
# is required.
#
# run code code to be executed when a filament is run. defaults to
# 'filament_wait' if no run code is defined.
#
# recv code code to be executed when a message is received, the
# variable MESSAGE is set with a two element list of the
# sending filament, and the message. defaults to
# 'filament_wait' if no recv code is defined.
#
#
# A type-list can specify any of "init", "run", or "recv". E.g., if the
# code to execute for 'run' and 'recv' are the same, then type-list should
# be a list of {run recv}.
#
# Any uncaught error generated by init, run, or recv will cause the
# filament to be terminated. The global variable 'FILAMENT' is set to the
# name of the filament.
#
proc ::filament::new {name args} {
    # Validate the "type-list code" pairs, create the slave interp, install
    # the RUN dispatcher and the filament_* aliases, then evaluate the init
    # code.  Returns the (possibly generated) filament name.
    variable nextCnt
    variable status
    variable messages

    if {$name eq "#auto"} {
        set name filament[incr nextCnt]
    }
    if {[llength $args] % 2} {
        error "'args' must contain list of 'type code' pairs"
    }
    if {[info exists status($name)]} {
        error "a filament with name \"$name\" already exists"
    }

    # code(section) holds the text for each section, preloaded with defaults
    array set code {vars "" init "" run filament_wait recv filament_wait}
    set ok 0
    foreach {type codestr} $args {
        foreach section {vars init run recv} {
            if {[lsearch -exact $type $section] < 0} {
                continue
            }
            set code($section) $codestr
            # at least one of run/recv must be supplied by the caller
            if {$section eq "run" || $section eq "recv"} {
                set ok 1
            }
        }
    }
    if {!$ok} {
        error "neither 'run' or 'recv' code was defined"
    }

    # strip "# ..." comment lines from the variable list before parsing it
    set vars $code(vars)
    regsub -all "#\[^\n\]*\n" $vars { } vars
    if {[catch {llength $vars}]} {
        error "vars not a valid list of variable names"
    }
    lappend vars FILAMENT MESSAGE
    set globalvars [concat [linsert $vars 0 global]]

    interp create $name
    interp eval $name [list set FILAMENT $name]
    interp eval $name [list set MESSAGE {}]
    set status($name) run
    set messages($name) ""

    # RUN declares the retained variables global, then dispatches on TYPE
    set runproc "proc RUN TYPE \{\n \
        $globalvars \n\
        switch \$TYPE [list run $code(run) recv $code(recv) ]\}"
    interp eval $name $runproc

    # each filament_* command in the slave calls back into this package
    # with the filament's own name as first argument
    foreach cmd {send wait kill sleep names messages uplevel0} {
        interp alias $name filament_$cmd {} ::filament::$cmd $name
    }

    # FILAMENT_OK is the sentinel raised by filament_wait/sleep/kill; any
    # other init error tears the half-built filament down again
    if {[catch {interp eval $name $code(init)} err] && $err ne "FILAMENT_OK"} {
        catch {interp delete $name}
        catch {unset status($name)}
        catch {unset messages($name)}
        error "::filament::new: error during $name init:\n$err"
    }
    return $name
}
############################################################################
#
# ::filament::send
#
# Implements the 'filament_send' command
#
# Filaments can send messages to other filaments. filament_send does not
# cause the calling filament to yield. The target filament(s) are
# added to the msgQueue so that they are serviced before other running
# filaments. ::filament::send can also be used by the top level code
# to prime initial messages or to re-prime filaments that become
# deadlocked. The filament receiving the message has its 'recv' code
# invoked, the global variable MESSAGE contains a list of two elements,
# the id of the sending filament, and the message.
#
# filament_send message ?id?
#
# message - the message to send
#
# id - the name of a filament to send. if id is a null string,
# or not specified, then the message is sent to all other
# filaments, excluding self
proc ::filament::send {this message {id ""}} {
    # Queue 'message' from 'this' for filament 'id', or for every filament
    # other than 'this' when 'id' is empty.  Each recipient is also appended
    # to msgQueue.
    variable status
    variable messages
    variable msgQueue

    set envelope [list $this $message]

    # directed send: exactly one recipient, which must exist
    if {$id ne ""} {
        if {![info exists status($id)]} {
            error "filament $id does not exist"
        }
        lappend messages($id) $envelope
        return [lappend msgQueue $id]
    }

    # broadcast: everyone except the sender
    foreach name [array names status] {
        if {$name eq $this} {
            continue
        }
        lappend messages($name) $envelope
        lappend msgQueue $name
    }
}
############################################################################
#
# ::filament::wait
#
# Implements the 'filament_wait' command
#
# filament_wait
#
# The calling filament waits until a message is received. Upon receiving a
# message, the 'recv' code is executed. filament_wait will cause the
# calling filament to yield. The global variable MESSAGE receives a list of
# the sending filament id and the message.
proc ::filament::wait {this} {
    # Record the caller as waiting, then unwind out of its code by raising
    # the FILAMENT_OK sentinel error.
    set ::filament::status($this) wait
    return -code error FILAMENT_OK
}
############################################################################
#
# ::filament::kill
#
# Implements the 'filament_kill' command
#
# Cause a filament to terminate. If a filament is killing itself, the
# calling filament will immediately be terminated. If killing another
# filament, the calling filament does not yield.
#
# filament_kill ?id? ?when?
#
# id - the id to terminate. if not specified, then the calling filament
# is terminated.
#
# when - "waitkill" or "kill". "waitkill" terminates the filament after
# any pending messages have been delivered (default).
# "kill" terminates the filament immediately.
proc ::filament::kill {this {id ""} {when waitkill}} {
    # Mark filament 'id' (default: the caller, 'this') for termination.
    #
    # when - "waitkill": terminate after pending messages are delivered
    #        "kill":     terminate immediately
    #
    # Raises an error if 'id' does not name an existing filament or if
    # 'when' is not one of the two values above; the status is left
    # untouched in either case.  When a filament kills itself the
    # FILAMENT_OK sentinel error is raised to unwind out of its code.
    variable status
    if {![string length $id]} {
        set id $this
    }
    # checked unconditionally so that an empty 'this' with an empty 'id'
    # cannot create a bogus status() entry
    if {![info exists status($id)]} {
        error "filament $id does not exist"
    }
    switch -exact -- $when {
        kill -
        waitkill {
            set status($id) $when
        }
        default {
            error "\"when\" argument \"$when\", must be either \"waitkill\" or \"kill\""
        }
    }
    if {$this eq $id} {
        error FILAMENT_OK
    }
}
############################################################################
#
# ::filament::sleep
#
# Implements the 'filament_sleep' command.
#
# filament_sleep
#
# The calling filament will yield, and be later rescheduled to run.
proc ::filament::sleep {this} {
    # Record the caller as sleeping, then unwind out of its code by raising
    # the FILAMENT_OK sentinel error.
    set ::filament::status($this) sleep
    return -code error FILAMENT_OK
}
############################################################################
#
# ::filament::messages
#
# Implements the 'filament_messages' command.
#
# filament_messages ?id?
#
# Returns the number of messages waiting to be delivered. An error is raised
# if the filament does not exist.
# If id is not specified, then the number of messages for the calling
# filament is returned.
proc ::filament::messages {this {id ""}} {
    # Return how many messages are queued for filament 'id'; an empty 'id'
    # means the calling filament.  Raises an error for an unknown filament.
    variable messages
    if {$id eq ""} {
        set id $this
    }
    if {![info exists messages($id)]} {
        error "filament id \"$id\" does not exists"
    }
    return [llength $messages($id)]
}
############################################################################
#
# ::filament::names
#
# Implements the 'filament_names' command.
#
# filament_names
#
# Returns a list of the names of all filaments and their status. status
# is one of: "run", "wait", "waitkill", "kill", "sleep"
#
proc ::filament::names {this} {
    # Return a flat "name status name status ..." list for all filaments;
    # the caller's own name ('this') is not used.
    return [array get ::filament::status]
}
############################################################################
#
# ::filament::uplevel0
#
# Implements the 'filament_uplevel0' command.
#
# filament_uplevel0 args
#
# Runs a command as uplevel #0, any value is returned. This can be used
# to set/read global variables, run top level procs, access shared files, etc.
#
proc ::filament::uplevel0 {this args} {
    # Evaluate the remaining arguments as a command at global level (#0)
    # of the interp this proc lives in and hand back whatever it returns.
    # 'this' (the calling filament's name) is not used.
    set result [uplevel #0 $args]
    return $result
}
############################################################################
#
# ::filament::doKill
#
# Kill helper - destroy the filament interpreter, cleanup status & messages
#
proc ::filament::doKill {name} {
    # Best-effort teardown: each step is caught independently so a missing
    # interp or array entry never stops the remaining cleanup.
    catch {interp delete $name}
    catch {unset ::filament::status($name)}
    return [catch {unset ::filament::messages($name)}]
}
############################################################################
#
# ::filament::doRun
#
# Run helper, type is either 'run' or 'recv'. If an interp throws the
# error "FILAMENT_OK", then we'll assume that it is one of our own.
# Any other error thrown by the interp will immediately terminate the
# filament and invoke 'bgerror' to report the condition.
proc ::filament::doRun {name type} {
    # Invoke "RUN $type" inside the filament's interp.
    #
    # Returns the code's result when it finishes without error, and an
    # empty string otherwise.  The FILAMENT_OK sentinel only means that the
    # filament yielded; any other error kills the filament and is reported
    # through bgerror.
    if {![catch {interp eval $name RUN $type} result]} {
        return $result
    }
    if {$result ne "FILAMENT_OK"} {
        catch {doKill $name}
        catch {bgerror "\"::filament::run $name $type\", error:\n$result"}
    }
    return ""
}
############################################################################
#
# ::filament::run
#
# Begin execution of filaments.
#
# ::filament::run ?deadlock_resolution? ?waitvar?
#
# Execution will continue until:
#
# a) no filaments remain, all have been terminated
#
# b) all filaments are in a 'wait' state and no messages queued (deadlocked)
#
# optional arguments:
#
# deadlock_resolution -
# "kill" - cause all filaments to be terminated (default)
# "vwait" - enter Tcl event loop by executing "vwait $waitvar"
# this allows ::filament::send to be used to send
# a message to a filament, and run again
# "return" - return leaving filaments interps active. it is
# possible to send messages to filaments, and rerun
# ::filament::run to restart.
#
# waitvar - the name of the variable for deadlock_resolution method "vwait"
#
# ::filament::run returns a list of filaments that are deadlocked, if any
#
proc ::filament::run {{deadlock kill} {waitvar ""}} {
    # Scheduler main loop.
    #
    # deadlock - what to do when every remaining filament is waiting and no
    #            message is queued: "kill" (default), "vwait" or "return"
    # waitvar  - variable name handed to vwait; required for "vwait"
    #
    # Returns the names of the filaments that still exist on exit.
    variable status
    variable messages
    variable msgQueue

    # -exact is required: lsearch glob-matches by default with $deadlock as
    # the pattern, so a value such as "*" would pass validation and then
    # match no branch of the deadlock switch below.
    if {[lsearch -exact {kill vwait return} $deadlock] == -1} {
        error "\"deadlock\" argument must be \"kill\", \"vwait\", or \"return\""
    }
    if {$deadlock eq "vwait" && ![string length $waitvar]} {
        error "vwait specified, but \"waitvar\" is null"
    }

    while {1} {
        set ran 0

        # start each pass with every filament; ::filament::send appends
        # message targets to this queue while the pass is in progress
        set msgQueue [array names status]
        while {[llength $msgQueue]} {
            set name [lindex $msgQueue 0]
            set msgQueue [lrange $msgQueue 1 end]

            # the filament may have been killed since it was queued
            if {![info exists status($name)]} {
                continue
            }
            set state $status($name)

            switch -exact -- $state {
                run {
                    doRun $name run
                    set ran 1
                }
                waitkill -
                wait {
                    # take the pending messages and clear the mailbox, so
                    # anything sent during delivery is seen as new
                    set allmessages ""
                    catch {set allmessages $messages($name)}
                    set messages($name) ""
                    foreach msg $allmessages {
                        if {[info exists status($name)]} {
                            # default next state to 'run'. the actual 'recv'
                            # code in the filament may call filament_wait,
                            # filament_sleep, or filament_kill to change
                            # to a different state upon exit.
                            set status($name) run
                            interp eval $name [list set MESSAGE $msg]
                            doRun $name recv
                            set ran 1
                        }
                    }
                    if {[info exists status($name)]} {
                        if {$state eq "waitkill" && [llength $messages($name)] == 0} {
                            # everything delivered, finish the pending kill
                            doKill $name
                        } else {
                            # messages that arrived during delivery must not
                            # cancel a pending waitkill: delivery reset the
                            # state to 'run' above, so restore it unless an
                            # immediate kill was requested in the meantime
                            if {$state eq "waitkill" && $status($name) ne "kill"} {
                                set status($name) waitkill
                            }
                            catch {interp eval $name [list set MESSAGE {}]}
                        }
                    }
                }
                kill {
                    doKill $name
                }
                sleep {
                    # a sleeping filament becomes runnable on the next pass
                    set status($name) run
                    set ran 1
                }
            }
        }

        # if nothing was run and filaments exist, then we must be deadlocked
        set len [array size status]
        if {!$ran && $len} {
            switch -exact -- $deadlock {
                kill {
                    foreach name [array names status] {
                        doKill $name
                    }
                    break
                }
                vwait {
                    # let the event loop run until waitvar is written, then
                    # make another scheduling pass
                    vwait $waitvar
                }
                return {
                    break
                }
            }
        } elseif {!$len} {
            # all filaments have ended
            break
        }
    }

    # names of the filaments still alive (empty when all have ended)
    return [array names status]
}
Here are some simple examples to demonstrate Filament.
####################################################################
# an anonymous filament produces consecutive integers and sends each
# result to one of two different consumers. after 25 integers have been
# produced it terminates; at random, the consumer filaments are either
# killed without their pending messages being delivered, killed after
# their pending messages are delivered, or left alone.
#
package require filament

# producer: counts upward, sending odd numbers to consumer_odd and even
# numbers to consumer_even; after 25 numbers it picks one of three shutdown
# strategies at random and then terminates itself.
filament::new #auto vars {i} init {
    set i 0
} run {
    incr i
    puts -nonewline "$FILAMENT running: "
    if {$i > 25} {
        set luck [expr {rand()}]
        if {$luck > .66} {
            puts "killing consumers, undelivered messages are lost!"
            # an empty id means "self" for filament_kill, so every target
            # has to be named; "kill" discards undelivered messages
            catch {filament_kill consumer_odd kill}
            catch {filament_kill consumer_even kill}
            catch {filament_kill busy_bee kill}
        } elseif {$luck > .33} {
            puts "killing consumers, pending messages to be delivered"
            filament_kill consumer_odd
            filament_kill consumer_even
            # busy_bee may already have terminated itself with an error
            catch {filament_kill busy_bee}
        } else {
            puts "ignoring consumers"
            catch {filament_kill busy_bee}
        }
        puts "$FILAMENT killing self"
        filament_kill
    }
    if {$i % 2} {
        puts "produced $i, sending to consumer_odd"
        filament_send $i consumer_odd
    } else {
        puts "produced $i, sending to consumer_even"
        filament_send $i consumer_even
        catch {filament_send "hey! be quiet!" busy_bee}
    }
}
# consumer_odd - in running state, immediately wait for a message
# to be delivered.
filament::new consumer_odd recv {
    # MESSAGE is {sender message}
    set from [lindex $MESSAGE 0]
    set msg [lindex $MESSAGE 1]
    puts "consumer_odd got message $msg from $from"
} run {
    # nothing to do on its own: yield until a message arrives
    puts " consumer_odd is waiting"
    filament_wait
}
# consumer_even - in running state, do some trivial work, but on every
# fifth invocation, wait for a message
filament::new consumer_even vars {i} init {
    set i 0
} recv {
    # MESSAGE is {sender message}
    set from [lindex $MESSAGE 0]
    set msg [lindex $MESSAGE 1]
    puts "consumer_even got message $msg from $from"
} run {
    puts -nonewline " consumer_even is running: "
    incr i
    if {$i % 5 != 0} {
        # ordinary invocation: yield, to be rescheduled later
        puts "going to sleep"
        filament_sleep
    }
    # every fifth invocation: block until a message is delivered
    puts "now waiting for messages"
    filament_wait
}
# busy_bee - a filament that runs and discards any messages
# randomly fail to test the error catching code
filament::new busy_bee {run recv} {
    # the same body serves as both the 'run' and the 'recv' code
    if {.05 > rand()} {
        error "this will raise an error, causing busy_bee to terminate"
    }
    puts -nonewline " buzzzzzzzz"
    set pending [filament_messages]
    if {$pending} {
        # enter the wait state so the queued messages get delivered
        puts ": has $pending messages waiting"
        filament_wait
    }
    puts ""
}
# report background errors
proc bgerror {message} {
    # ::filament::doRun reports filaments that die with an error through
    # this hook; print the report on standard output
    puts stdout "bgerror got: $message"
}
# start filaments
# run until everything has ended or only waiting filaments are left
puts "starting trivial filaments\n\n"
set final [filament::run return]
puts "\n\nfilaments left waiting in deadlock: $final"
if {[llength $final]} {
    puts ""
    puts "sending an external message to all filaments"
    # note that the first argument (sending filament id) can be anything
    filament::send BIG_DADDY external_message
    # second run: deliver the message, then kill whatever deadlocks again
    filament::run kill
}
puts "trivial done"
Implement the Language Shootout [1] cheap-concurrency benchmark.
#####################################################################
# shootout cheap-concurrency
# http://shootout.alioth.debian.org/debian/benchmark.php?test=message&lang=all
package require filament
# set argv 3000
set N [lindex $argv 0]
set Nthreads 500

# t1 .. t499: each filament adds one to the number it receives and passes
# the result on to its successor
for {set i 1} {$i < $Nthreads} {incr i} {
    set next [expr {$i + 1}]
    filament::new t$i vars {next} init "set next t$next; filament_wait" recv {
        set x [lindex $MESSAGE 1]
        filament_send [expr {$x + 1}] $next
        filament_wait
    }
}

# t500, the end of the chain: stores the incremented number in the global
# variable 'sum' of the main interp
filament::new t$i init {filament_wait} recv {
    set x [lindex $MESSAGE 1]
    filament_uplevel0 set sum [expr {$x + 1}]
    filament_wait
}

# Push the running total through the whole chain N times, then print it.
proc cheap-concurrency {N} {
    global sum
    set sum 0
    for {set i 0} {$i < $N} {incr i} {
        filament::send "" $sum t1
        filament::run return
    }
    puts $sum
}
cheap-concurrency $N
A simple demo using vwait to restart deadlocked filaments:
#####################################################################
# using vwait to inject messages
package require filament

# getmsg starts out waiting; every delivered message is printed and leaves
# it runnable, after which the run code either waits again or, once it has
# run more than five times, terminates the filament
filament::new getmsg vars {i} init {set i 0; filament_wait} recv {
    puts "filament getmsg got $MESSAGE"
} run {
    if {[incr i] > 5} {
        puts "getmsg: $i done"
        filament_kill
    } else {
        puts "getmsg: $i waiting"
        filament_wait
    }
}

set ::wait_is_over 0
# timer script: broadcast a wakeup message, release the vwait inside
# filament::run, then re-arm itself to fire again one second later
set after_code {
    filament::send mcp wakeup!
    set ::wait_is_over 1
    after 1000 [set ::after_code]
}
after 0 $after_code
filament::run vwait ::wait_is_over
See also:


