Package SimPy :: Module SimulationStep
[hide private]
[frames] | no frames]

Source Code for Module SimPy.SimulationStep

   1  #!/usr/bin/env python 
   2  from SimPy.Lister import * 
   3  import heapq as hq 
   4  import types 
   5  import sys 
   6  import new 
   7  import random 
   8  import inspect 
   9   
  10  # $Revision: 1.1.1.31 $ $Date: 2008/03/03 13:52:23 $ kgm 
  11  """SimulationStep 1.9.1 Supports stepping through SimPy simulation event-by-event. 
  12  Based on generators (Python 2.3 and later) 
  13   
  14  LICENSE: 
  15  Copyright (C) 2002,2005,2006,2007  Klaus G. Muller, Tony Vignaux 
  16  mailto: kgmuller@xs4all.nl and Tony.Vignaux@vuw.ac.nz 
  17   
  18      This library is free software; you can redistribute it and/or 
  19      modify it under the terms of the GNU Lesser General Public 
  20      License as published by the Free Software Foundation; either 
  21      version 2.1 of the License, or (at your option) any later version. 
  22   
  23      This library is distributed in the hope that it will be useful, 
  24      but WITHOUT ANY WARRANTY; without even the implied warranty of 
  25      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  26      Lesser General Public License for more details. 
  27   
  28      You should have received a copy of the GNU Lesser General Public 
  29      License along with this library; if not, write to the Free Software 
  30      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
  31  END OF LICENSE 
  32   
  33  **Change history:** 
  34   
  35      Started out as SiPy 0.9 
  36       
  37      5/9/2002: SiPy 0.9.1 
  38       
  39          - Addition of '_cancel' method in class Process and supporting '_unpost' method in  
  40            class __Evlist. 
  41           
  42          - Removal of redundant 'Action' method in class Process. 
  43           
  44      12/9/2002: 
  45       
  46          - Addition of resource class 
  47           
  48          - Addition of "_request" and "_release" coroutine calls 
  49           
  50      15/9/2002: moved into SimPy package 
  51       
  52      16/9/2002: 
  53          - Resource attributes fully implemented (resources can now have more 
  54            than 1 shareable resource units) 
  55           
  56      17/9/2002: 
  57       
  58          - corrected removal from waitQ (Vignaux) 
  59           
  60      17/9/2002: 
  61       
  62          - added test for queue discipline in "test_demo()". Must be FIFO 
  63           
  64      26/9/02: Version 0.2.0 
  65       
  66          - cleaned up code; more consistent naming 
  67           
  68          - prefixed all Simulation-private variable names with "_". 
  69           
  70          - prefixed all class-private variable names with "__". 
  71           
  72          - made normal exit quiet (but return message from scheduler() 
  73           
  74      28/9/02: 
  75       
  76          - included stopSimulation() 
  77           
  78      15/10/02: Simulation version 0.3 
  79       
  80          - Version printout now only if __TESTING 
  81           
  82          - "_stop" initialized to True by module load, and set to False in  
  83        initialize() 
  84           
  85          - Introduced 'simulate(until=0)' instead of 'scheduler(till=0)'.  
  86        Left 'scheduler()' in for backward compatibility, but marked 
  87        as deprecated. 
  88           
  89          - Added attribute "name" to class Process; default=="a_process" 
  90           
  91          - Changed Resource constructor to  
  92        'def __init__(self,capacity=1,name="a_resource",unitName="units"'. 
  93           
  94      13/11/02: Simulation version 0.6 
  95       
  96          - Major changes to class Resource: 
  97           
  98              - Added two queue types for resources, FIFO (default) and PriorityQ 
  99               
 100              - Changed constructor to allow selection of queue type. 
 101               
 102              - Introduced preemption of resources (to be used with PriorityQ 
 103                queue type) 
 104               
 105              - Changed constructor of class Resource to allow selection of preemption 
 106               
 107              - Changes to class Process to support preemption of service 
 108               
 109              - Cleaned up 'simulate' by replacing series of if-statements by dispatch table. 
 110   
 111      19/11/02: Simulation version 0.6.1 
 112          - Changed priority schemes so that higher values of Process  
 113            attribute "priority" represent higher priority. 
 114   
 115      20/11/02: Simulation version 0.7 
 116          - Major change of priority approach: 
 117   
 118              - Priority set by "yield request,self,res,priority" 
 119   
 120              - Priority of a Process instance associated with a specific  
 121                resource 
 122   
 123      25/11/02: Simulation version 0.7.1 
 124   
 125          - Code cleanup and optimization 
 126   
 127          - Made process attributes remainService and preempted private  
 128           (_remainService and _preempted) 
 129   
 130      11/12/2002: First process interrupt implementation 
 131   
 132          - Addition of user methods 'interrupt' and 'resume' 
 133   
 134          - Significant code cleanup to maintain process state 
 135   
 136      20/12/2002: Changes to "interrupt"; addition of boolean methods to show 
 137                       process states 
 138   
 139      16/3/2003: Changed hold (allowing posting events past _endtime) 
 140       
 141      18/3/2003: Changed _nextev to prevent _t going past _endtime 
 142   
 143      23/3/2003: Introduced new interrupt construct; deleted 'resume' method 
 144       
 145      25/3/2003: Expanded interrupt construct: 
 146       
 147          - Made 'interrupt' a method  of Process 
 148   
 149          - Added 'interruptCause' as an attribute of an interrupted process 
 150   
 151          - Changed definition of 'active' to  
 152           'self._nextTime <> None and not self._inInterrupt' 
 153   
 154          - Cleaned up test_interrupt function 
 155   
 156      30/3/2003: Modification of 'simulate': 
 157   
 158          - error message if 'initialize' not called (fatal) 
 159   
 160          - error message if no process scheduled (warning) 
 161   
 162          - Ensured that upon exit from 'simulate', now() == _endtime is  
 163            always valid 
 164   
 165      2/04/2003: 
 166   
 167          - Modification of 'simulate': leave _endtime alone (undid change 
 168            of 30 Mar 03) 
 169   
 170          - faster '_unpost' 
 171   
 172      3/04/2003: Made 'priority' private ('_priority') 
 173   
 174      4/04/2003: Catch activation of non-generator error 
 175   
 176      5/04/2003: Added 'interruptReset()' function to Process. 
 177   
 178      7/04/2003: Changed '_unpost' to ensure that process has 
 179                     _nextTime == None (is passive) afterwards. 
 180   
 181      8/04/2003: Changed _hold to allow for 'yield hold,self'  
 182                     (equiv to 'yield hold,self,0') 
 183   
 184      10/04/2003: Changed 'cancel' syntax to 'Process().cancel(victim)' 
 185   
 186      12/5/2003: Changed eventlist handling from dictionary to bisect 
 187       
 188      9/6/2003: - Changed eventlist handling from pure dictionary to bisect- 
 189                  sorted "timestamps" list of keys, resulting in greatly  
 190                  improved performance for models with large 
 191                  numbers of event notices with differing event times. 
 192                  ========================================================= 
 193                  This great change was suggested by Prof. Simon Frost.  
 194                  Thank you, Simon! This version 1.3 is dedicated to you! 
 195                  ========================================================= 
 196                - Added import of Lister which supports well-structured  
 197                  printing of all attributes of Process and Resource instances. 
 198   
 199      Oct 2003: Added monitored Resource instances (Monitors for activeQ and waitQ) 
 200   
 201      13 Dec 2003: Merged in Monitor and Histogram 
 202   
 203      27 Feb 2004: Repaired bug in activeQ monitor of class Resource. Now actMon 
 204                   correctly records departures from activeQ. 
 205                    
 206      19 May 2004: Added erroneously omitted Histogram class. 
 207   
 208      5 Sep 2004: Added SimEvents synchronization constructs 
 209       
 210      17 Sep 2004: Added waituntil synchronization construct 
 211       
 212      01 Dec 2004: SimPy version 1.5 
 213                   Changes in this module: Repaired SimEvents bug re proc.eventsFired 
 214                    
 215      12 Jan 2005: SimPy version 1.5.1 
 216                   Changes in this module: Monitor objects now have a default name 
 217                                           'a_Monitor' 
 218                                            
 219      29 Mar 2005: Start SimPy 1.6: compound "yield request" statements 
 220       
 221      05 Jun 2005: Fixed bug in _request method -- waitMon did not work properly in 
 222                   preemption case 
 223                    
 224      09 Jun 2005: Added test in 'activate' to see whether 'initialize()' was called first. 
 225       
 226      23 Aug 2005: - Added Tally data collection class 
 227                   - Adjusted Resource to work with Tally 
 228                   - Redid function allEventNotices() (returns prettyprinted string with event 
 229                     times and names of process instances 
 230                   - Added function allEventTimes (returns event times of all scheduled events) 
 231                    
 232      16 Mar 2006: - Added Store and Level classes 
 233                   - Added 'yield get' and 'yield put' 
 234                    
 235      10 May 2006: - Repaired bug in Store._get method 
 236                   - Repaired Level to allow initialBuffered have float value 
 237                   - Added type test for Level get parameter 'nrToGet' 
 238                    
 239      06 Jun 2006: - To improve pretty-printed output of 'Level' objects, changed attribute 
 240                     _nrBuffered to nrBuffered (synonym for amount property) 
 241                   - To improve pretty-printed output of 'Store' objects, added attribute 
 242                     buffered (which refers to _theBuffer) 
 243                      
 244      25 Aug 2006: - Start of version 1.8 
 245                   - made 'version' public 
 246                   - corrected condQ initialization bug 
 247                    
 248      30 Sep 2006: - Introduced checks to ensure capacity of a Buffer > 0 
 249                   - Removed from __future__ import (so Python 2.3 or later needed) 
 250                   
 251      15 Oct 2006: - Added code to register all Monitors and all Tallies in variables 
 252                     'allMonitors' and 'allTallies' 
 253                   - Added function 'startCollection' to activate Monitors and Tallies at a 
 254                     specified time (e.g. after warmup period) 
 255                   - Moved all test/demo programs to after 'if __name__=="__main__":'. 
 256                   
 257      17 Oct 2006: - Added compound 'put' and 'get' statements for Level and Store. 
 258       
 259      18 Oct 2006: - Repaired bug: self.eventsFired now gets set after an event fires 
 260                     in a compound yield get/put with a waitevent clause (reneging case). 
 261                      
 262      21 Oct 2006: - Introduced Store 'yield get' with a filter function. 
 263                   
 264      22 Oct 2006: - Repaired bug in prettyprinting of Store objects (the buffer  
 265                     content==._theBuffer was not shown) by changing ._theBuffer  
 266                     to .theBuffer. 
 267                   
 268      04 Dec 2006: - Added printHistogram method to Tally and Monitor (generates 
 269                     table-form histogram) 
 270                       
 271      07 Dec 2006: - Changed the __str__ method of Histogram to print a table  
 272                     (like printHistogram). 
 273       
 274      18 Dec 2006: - Added trace printing of Buffers' "unitName" for yield get and put. 
 275       
 276      09 Jun 2007: - Cleaned out all uses of "object" to prevent name clash. 
 277       
 278      18 Nov 2007: - Start of 1.9 development 
 279                   - Added 'start' method (alternative to activate) to Process 
 280                    
 281      22 Nov 2007: - Major change to event list handling to speed up larger models: 
 282                      * Drop dictionary 
 283                      * Replace bisect by heapq 
 284                      * Mark cancelled event notices in unpost and skip them in 
 285                        nextev (great idea of Tony Vignaux)) 
 286                         
 287      4 Dec 2007: - Added twVariance calculation for both Monitor and Tally (gav) 
 288       
 289      5 Dec 2007: - Changed name back to timeVariance (gav) 
 290       
 291      1 Mar 2008: - Start of 1.9.1 bugfix release 
 292                  - Delete circular reference in Process instances when event  
 293                    notice has been processed (caused much circular garbage) 
 294                  - Added capability for multiple preempts of a process 
 295       
 296  """ 
 297   
 298  __TESTING=False 
 299  version=__version__="1.9.1 $Revision: 1.1.1.31 $ $Date: 2008/03/03 13:52:23 $" 
 300  if __TESTING:  
 301      print "SimPy.SimulationStep %s" %__version__, 
 302      if __debug__: 
 303          print "__debug__ on" 
 304      else: 
 305          print 
 306   
 307  # yield keywords 
 308  hold=1 
 309  passivate=2 
 310  request=3 
 311  release=4 
 312  waitevent=5 
 313  queueevent=6 
 314  waituntil=7 
 315  get=8 
 316  put=9 
 317   
 318  _endtime=0 
 319  _t=0 
 320  _e=None 
 321  _stop=True 
 322  _step=False 
 323  _wustep=False #controls per event stepping for waituntil construct; not user API 
 324  try: 
 325    True, False 
 326  except NameError: 
 327    True, False = (1 == 1), (0 == 1) 
 328  condQ=[] 
 329  allMonitors=[] 
 330  allTallies=[] 
 331   
332 -def initialize():
333 global _e,_t,_stop,condQ,allMonitors,allTallies 334 _e=__Evlist() 335 _t=0 336 _stop=False 337 condQ=[] 338 allMonitors=[] 339 allTallies=[]
340
341 -def now():
342 return _t
343
344 -def stopSimulation():
345 """Application function to stop simulation run""" 346 global _stop 347 _stop=True
348
349 -def _startWUStepping():
350 """Application function to start stepping through simulation for waituntil construct.""" 351 global _wustep 352 _wustep=True
353
354 -def _stopWUStepping():
355 """Application function to stop stepping through simulation.""" 356 global _wustep 357 _wustep=False
358
359 -def startStepping():
360 """Application function to start stepping through simulation.""" 361 global _step 362 _step=True
363
364 -def stopStepping():
365 """Application function to stop stepping through simulation.""" 366 global _step 367 _step=False
368
369 -class Simerror(Exception):
370 - def __init__(self,value):
371 self.value=value
372
373 - def __str__(self):
374 return `self.value`
375
376 -class FatalSimerror(Simerror):
377 - def __init__(self,value):
378 Simerror.__init__(self,value) 379 self.value=value
380
381 -class Process(Lister):
382 """Superclass of classes which may use generator functions"""
383 - def __init__(self,name="a_process"):
384 #the reference to this Process instances single process (==generator) 385 self._nextpoint=None 386 self.name=name 387 self._nextTime=None #next activation time 388 self._remainService=0 389 self._preempted=0 390 self._priority={} 391 self._getpriority={} 392 self._putpriority={} 393 self._terminated= False 394 self._inInterrupt= False 395 self.eventsFired=[] #which events process waited/queued for occurred
396
397 - def active(self):
398 return self._nextTime <> None and not self._inInterrupt
399
400 - def passive(self):
401 return self._nextTime is None and not self._terminated
402
403 - def terminated(self):
404 return self._terminated
405
406 - def interrupted(self):
407 return self._inInterrupt and not self._terminated
408
409 - def queuing(self,resource):
410 return self in resource.waitQ
411
412 - def cancel(self,victim):
413 """Application function to cancel all event notices for this Process 414 instance;(should be all event notices for the _generator_).""" 415 _e._unpost(whom=victim)
416
417 - def start(self,pem=None,at="undefined",delay="undefined",prior=False):
418 """Activates PEM of this Process. 419 p.start(p.pemname([args])[,{at= t |delay=period}][,prior=False]) or 420 p.start([p.ACTIONS()][,{at= t |delay=period}][,prior=False]) (ACTIONS 421 parameter optional) 422 """ 423 if pem is None: 424 try: 425 pem=self.ACTIONS() 426 except AttributeError: 427 raise FatalSimerror\ 428 ("Fatal SimPy error: no generator function to activate") 429 else: 430 pass 431 if _e is None: 432 raise FatalSimerror\ 433 ("Fatal SimPy error: simulation is not initialized"\ 434 "(call initialize() first)") 435 if not (type(pem) == types.GeneratorType): 436 raise FatalSimerror("Fatal SimPy error: activating function which"+ 437 " is not a generator (contains no 'yield')") 438 if not self._terminated and not self._nextTime: 439 #store generator reference in object; needed for reactivation 440 self._nextpoint=pem 441 if at=="undefined": 442 at=_t 443 if delay=="undefined": 444 zeit=max(_t,at) 445 else: 446 zeit=max(_t,_t+delay) 447 _e._post(what=self,at=zeit,prior=prior)
448
449 - def _hold(self,a):
450 if len(a[0]) == 3: 451 delay=abs(a[0][2]) 452 else: 453 delay=0 454 who=a[1] 455 self.interruptLeft=delay 456 self._inInterrupt=False 457 self.interruptCause=None 458 _e._post(what=who,at=_t+delay)
459
460 - def _passivate(self,a):
461 a[0][1]._nextTime=None
462
463 - def interrupt(self,victim):
464 """Application function to interrupt active processes""" 465 # can't interrupt terminated/passive/interrupted process 466 if victim.active(): 467 victim.interruptCause=self # self causes interrupt 468 left=victim._nextTime-_t 469 victim.interruptLeft=left # time left in current 'hold' 470 victim._inInterrupt=True 471 reactivate(victim) 472 return left 473 else: #victim not active -- can't interrupt 474 return None
475
476 - def interruptReset(self):
477 """ 478 Application function for an interrupt victim to get out of 479 'interrupted' state. 480 """ 481 self._inInterrupt= False
482
483 - def acquired(self,res):
484 """Multi-functional test for reneging for 'request' and 'get': 485 (1)If res of type Resource: 486 Tests whether resource res was acquired when proces reactivated. 487 If yes, the parallel wakeup process is killed. 488 If not, process is removed from res.waitQ (reneging). 489 (2)If res of type Store: 490 Tests whether item(s) gotten from Store res. 491 If yes, the parallel wakeup process is killed. 492 If no, process is removed from res.getQ 493 (3)If res of type Level: 494 Tests whether units gotten from Level res. 495 If yes, the parallel wakeup process is killed. 496 If no, process is removed from res.getQ. 497 """ 498 if isinstance(res,Resource): 499 test=self in res.activeQ 500 if test: 501 self.cancel(self._holder) 502 else: 503 res.waitQ.remove(self) 504 if res.monitored: 505 res.waitMon.observe(len(res.waitQ),t=now()) 506 return test 507 elif isinstance(res,Store): 508 test=len(self.got) 509 if test: 510 self.cancel(self._holder) 511 else: 512 res.getQ.remove(self) 513 if res.monitored: 514 res.getQMon.observe(len(res.getQ),t=now()) 515 return test 516 elif isinstance(res,Level): 517 test=not (self.got is None) 518 if test: 519 self.cancel(self._holder) 520 else: 521 res.getQ.remove(self) 522 if res.monitored: 523 res.getQMon.observe(len(res.getQ),t=now()) 524 return test
525
526 - def stored(self,buffer):
527 """Test for reneging for 'yield put . . .' compound statement (Level and 528 Store. Returns True if not reneged. 529 If self not in buffer.putQ, kill wakeup process, else take self out of 530 buffer.putQ (reneged)""" 531 test=self in buffer.putQ 532 if test: #reneged 533 buffer.putQ.remove(self) 534 if buffer.monitored: 535 buffer.putQMon.observe(len(buffer.putQ),t=now()) 536 else: 537 self.cancel(self._holder) 538 return not test
539
540 -def allEventNotices():
541 """Returns string with eventlist as; 542 t1: processname,processname2 543 t2: processname4,processname5, . . . 544 . . . . 545 """ 546 ret="" 547 tempList=[] 548 tempList[:]=_e.timestamps 549 tempList.sort() 550 # return only event notices which are not cancelled 551 tempList=[[x[0],x[2].name] for x in tempList if not x[3]] 552 tprev=-1 553 for t in tempList: 554 # if new time, new line 555 if t[0]==tprev: 556 # continue line 557 ret+=",%s"%t[1] 558 else: 559 # new time 560 if tprev==-1: 561 ret="%s: %s"%(t[0],t[1]) 562 else: 563 ret+="\n%s: %s"%(t[0],t[1]) 564 tprev=t[0] 565 return ret+"\n"
566
567 -def allEventTimes():
568 """Returns list of all times for which events are scheduled. 569 """ 570 r=[] 571 r[:]=_e.timestamps 572 r.sort() 573 # return only event times of not cancelled event notices 574 r1=[x[0] for x in r if not r[3]] 575 tprev=-1 576 ret=[] 577 for t in r1: 578 if t==tprev: 579 #skip time, already in list 580 pass 581 else: 582 ret.append(t) 583 tprev=t 584 return ret
585
586 -class __Evlist(object):
587 """Defines event list and operations on it"""
588 - def __init__(self):
589 # always sorted list of events (sorted by time, priority) 590 # make heapq 591 self.timestamps = [] 592 self.sortpr=0
593
594 - def _post(self, what, at, prior=False):
595 """Post an event notice for process what for time at""" 596 # event notices are Process instances 597 if at < _t: 598 raise Simerror("Attempt to schedule event in the past") 599 what._nextTime = at 600 self.sortpr-=1 601 if prior: 602 # before all other event notices at this time 603 # heappush with highest priority value so far (negative of 604 # monotonely decreasing number) 605 # store event notice in process instance 606 what._rec=[at,self.sortpr,what,False] 607 # make event list refer to it 608 hq.heappush(self.timestamps,what._rec) 609 else: 610 # heappush with lowest priority 611 # store event notice in process instance 612 what._rec=[at,-self.sortpr,what,False] 613 # make event list refer to it 614 hq.heappush(self.timestamps,what._rec)
615
616 - def _unpost(self, whom):
617 """ 618 Mark event notice for whom as cancelled if whom is a suspended process 619 """ 620 if whom._nextTime is not None: # check if whom was actually active 621 whom._rec[3]=True ## Mark as cancelled 622 whom._nextTime=None
623
624 - def _nextev(self):
625 """Retrieve next event from event list""" 626 global _t, _stop 627 noActiveNotice=True 628 ## Find next event notice which is not marked cancelled 629 while noActiveNotice: 630 if self.timestamps: 631 ## ignore priority value 632 (_tnotice, p,nextEvent,cancelled) = hq.heappop(self.timestamps) 633 noActiveNotice=cancelled 634 else: 635 raise Simerror("No more events at time %s" % _t) 636 nextEvent._rec=None 637 _t=_tnotice 638 if _t > _endtime: 639 _t = _endtime 640 _stop = True 641 return (None,) 642 try: 643 resultTuple = nextEvent._nextpoint.next() 644 except StopIteration: 645 nextEvent._nextpoint = None 646 nextEvent._terminated = True 647 nextEvent._nextTime = None 648 resultTuple = None 649 return (resultTuple, nextEvent)
650
651 - def _isEmpty(self):
652 return not self.timestamps
653
654 - def _allEventNotices(self):
655 """Returns string with eventlist as 656 t1: [procname,procname2] 657 t2: [procname4,procname5, . . . ] 658 . . . . 659 """ 660 ret="" 661 for t in self.timestamps: 662 ret+="%s:%s\n"%(t[1]._nextTime, t[1].name) 663 return ret[:-1]
664
665 - def _allEventTimes(self):
666 """Returns list of all times for which events are scheduled. 667 """ 668 return self.timestamps
669
670 -def activate(obj,process,at="undefined",delay="undefined",prior=False):
671 """Application function to activate passive process.""" 672 if _e is None: 673 raise FatalSimerror\ 674 ("Fatal error: simulation is not initialized (call initialize() first)") 675 if not (type(process) == types.GeneratorType): 676 raise FatalSimerror("Activating function which"+ 677 " is not a generator (contains no 'yield')") 678 if not obj._terminated and not obj._nextTime: 679 #store generator reference in object; needed for reactivation 680 obj._nextpoint=process 681 if at=="undefined": 682 at=_t 683 if delay=="undefined": 684 zeit=max(_t,at) 685 else: 686 zeit=max(_t,_t+delay) 687 _e._post(obj,at=zeit,prior=prior)
688
689 -def reactivate(obj,at="undefined",delay="undefined",prior=False):
690 """Application function to reactivate a process which is active, 691 suspended or passive.""" 692 # Object may be active, suspended or passive 693 if not obj._terminated: 694 a=Process("SimPysystem") 695 a.cancel(obj) 696 # object now passive 697 if at=="undefined": 698 at=_t 699 if delay=="undefined": 700 zeit=max(_t,at) 701 else: 702 zeit=max(_t,_t+delay) 703 _e._post(obj,at=zeit,prior=prior)
704
705 -class Histogram(list):
706 """ A histogram gathering and sampling class""" 707
708 - def __init__(self,name = '',low=0.0,high=100.0,nbins=10):
709 list.__init__(self) 710 self.name = name 711 self.low = float(low) 712 self.high = float(high) 713 self.nbins = nbins 714 self.binsize=(self.high-self.low)/nbins 715 self._nrObs=0 716 self._sum=0 717 self[:] =[[low+(i-1)*self.binsize,0] for i in range(self.nbins+2)]
718
719 - def addIn(self,y):
720 """ add a value into the correct bin""" 721 self._nrObs+=1 722 self._sum+=y 723 b = int((y-self.low+self.binsize)/self.binsize) 724 if b < 0: b = 0 725 if b > self.nbins+1: b = self.nbins+1 726 assert 0 <= b <=self.nbins+1,'Histogram.addIn: b out of range: %s'%b 727 self[b][1]+=1
728
729 - def __str__(self):
730 histo=self 731 ylab="value" 732 nrObs=self._nrObs 733 width=len(str(nrObs)) 734 res=[] 735 res.append("<Histogram %s:"%self.name) 736 res.append("\nNumber of observations: %s"%nrObs) 737 if nrObs: 738 su=self._sum 739 cum=histo[0][1] 740 fmt="%s" 741 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 742 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 743 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 744 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 745 l1width=len(("%s <= "%fmt)%histo[1][0]) 746 res.append(line1\ 747 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 748 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 749 ) 750 for i in range(1,len(histo)-1): 751 cum+=histo[i][1] 752 res.append(line\ 753 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 754 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 755 ) 756 cum+=histo[-1][1] 757 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 758 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 759 lnwidth=len(("<%s"%fmt)%histo[1][0]) 760 res.append(linen\ 761 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 762 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 763 ) 764 res.append("\n>") 765 return " ".join(res)
766
767 -def startCollection(when=0.0,monitors=None,tallies=None):
768 """Starts data collection of all designated Monitor and Tally objects 769 (default=all) at time 'when'. 770 """ 771 class Starter(Process): 772 def collect(self,monitors,tallies): 773 for m in monitors: 774 print m.name 775 m.reset() 776 for t in tallies: 777 t.reset() 778 yield hold,self
779 if monitors is None: 780 monitors=allMonitors 781 if tallies is None: 782 tallies=allTallies 783 s=Starter() 784 activate(s,s.collect(monitors=monitors,tallies=tallies),at=when) 785
786 -class Monitor(list):
787 """ Monitored variables 788 789 A Class for monitored variables, that is, variables that allow one 790 to gather simple statistics. A Monitor is a subclass of list and 791 list operations can be performed on it. An object is established 792 using m= Monitor(name = '..'). It can be given a 793 unique name for use in debugging and in tracing and ylab and tlab 794 strings for labelling graphs. 795 """
796 - def __init__(self,name='a_Monitor',ylab='y',tlab='t'):
797 list.__init__(self) 798 self.startTime = 0.0 799 self.name = name 800 self.ylab = ylab 801 self.tlab = tlab 802 allMonitors.append(self)
803
804 - def setHistogram(self,name = '',low=0.0,high=100.0,nbins=10):
805 """Sets histogram parameters. 806 Must be called before call to getHistogram""" 807 if name=='': 808 histname=self.name 809 else: 810 histname=name 811 self.histo=Histogram(name=histname,low=low,high=high,nbins=nbins)
812
813 - def observe(self,y,t=None):
814 """record y and t""" 815 if t is None: t = now() 816 self.append([t,y])
817
818 - def tally(self,y):
819 """ deprecated: tally for backward compatibility""" 820 self.observe(y,0)
821
822 - def accum(self,y,t=None):
823 """ deprecated: accum for backward compatibility""" 824 self.observe(y,t)
825
826 - def reset(self,t=None):
827 """reset the sums and counts for the monitored variable """ 828 self[:]=[] 829 if t is None: t = now() 830 self.startTime = t
831
832 - def tseries(self):
833 """ the series of measured times""" 834 return list(zip(*self)[0])
835
836 - def yseries(self):
837 """ the series of measured values""" 838 return list(zip(*self)[1])
839
840 - def count(self):
841 """ deprecated: the number of observations made """ 842 return self.__len__()
843
844 - def total(self):
845 """ the sum of the y""" 846 if self.__len__()==0: return 0 847 else: 848 sum = 0.0 849 for i in range(self.__len__()): 850 sum += self[i][1] 851 return sum # replace by sum() later
852
853 - def mean(self):
854 """ the simple average of the monitored variable""" 855 try: return 1.0*self.total()/self.__len__() 856 except: print 'SimPy: No observations for mean'
857
858 - def var(self):
859 """ the sample variance of the monitored variable """ 860 n = len(self) 861 tot = self.total() 862 ssq=0.0 863 ##yy = self.yseries() 864 for i in range(self.__len__()): 865 ssq += self[i][1]**2 # replace by sum() eventually 866 try: return (ssq - float(tot*tot)/n)/n 867 except: print 'SimPy: No observations for sample variance'
868
869 - def timeAverage(self,t=None):
870 """ the time-weighted average of the monitored variable. 871 872 If t is used it is assumed to be the current time, 873 otherwise t = now() 874 """ 875 N = self.__len__() 876 if N == 0: 877 print 'SimPy: No observations for timeAverage' 878 return None 879 880 if t is None: t = now() 881 sum = 0.0 882 tlast = self.startTime 883 #print 'DEBUG: timave ',t,tlast 884 ylast = 0.0 885 for i in range(N): 886 ti,yi = self[i] 887 sum += ylast*(ti-tlast) 888 tlast = ti 889 ylast = yi 890 sum += ylast*(t-tlast) 891 T = t - self.startTime 892 if T == 0: 893 print 'SimPy: No elapsed time for timeAverage' 894 return None 895 #print 'DEBUG: timave ',sum,t,T 896 return sum/float(T)
897
898 - def timeVariance(self,t=None):
899 """ the time-weighted Variance of the monitored variable. 900 901 If t is used it is assumed to be the current time, 902 otherwise t = now() 903 """ 904 N = self.__len__() 905 if N == 0: 906 print 'SimPy: No observations for timeVariance' 907 return None 908 if t is None: t = now() 909 sm = 0.0 910 ssq = 0.0 911 tlast = self.startTime 912 # print 'DEBUG: 1 twVar ',t,tlast 913 ylast = 0.0 914 for i in range(N): 915 ti,yi = self[i] 916 sm += ylast*(ti-tlast) 917 ssq += ylast*ylast*(ti-tlast) 918 tlast = ti 919 ylast = yi 920 sm += ylast*(t-tlast) 921 ssq += ylast*ylast*(t-tlast) 922 T = t - self.startTime 923 if T == 0: 924 print 'SimPy: No elapsed time for timeVariance' 925 return None 926 mn = sm/float(T) 927 # print 'DEBUG: 2 twVar ',ssq,t,T 928 return ssq/float(T) - mn*mn
929 930
931 - def histogram(self,low=0.0,high=100.0,nbins=10):
932 """ A histogram of the monitored y data values. 933 """ 934 h = Histogram(name=self.name,low=low,high=high,nbins=nbins) 935 ys = self.yseries() 936 for y in ys: h.addIn(y) 937 return h
938
939 - def getHistogram(self):
940 """Returns a histogram based on the parameters provided in 941 preceding call to setHistogram. 942 """ 943 ys = self.yseries() 944 h=self.histo 945 for y in ys: h.addIn(y) 946 return h
947
948 - def printHistogram(self,fmt="%s"):
949 """Returns formatted frequency distribution table string from Monitor. 950 Precondition: setHistogram must have been called. 951 fmt==format of bin range values 952 """ 953 try: 954 histo=self.getHistogram() 955 except: 956 raise FatalSimerror("histogramTable: call setHistogram first"\ 957 " for Monitor %s"%self.name) 958 ylab=self.ylab 959 nrObs=self.count() 960 width=len(str(nrObs)) 961 res=[] 962 res.append("\nHistogram for %s:"%histo.name) 963 res.append("\nNumber of observations: %s"%nrObs) 964 su=sum(self.yseries()) 965 cum=histo[0][1] 966 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 967 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 968 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 969 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 970 l1width=len(("%s <= "%fmt)%histo[1][0]) 971 res.append(line1\ 972 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 973 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 974 ) 975 for i in range(1,len(histo)-1): 976 cum+=histo[i][1] 977 res.append(line\ 978 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 979 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 980 ) 981 cum+=histo[-1][1] 982 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 983 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 984 lnwidth=len(("<%s"%fmt)%histo[1][0]) 985 res.append(linen\ 986 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 987 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 988 ) 989 return " ".join(res)
990
991 -class Tally:
992 - def __init__(self, name="a_Tally", ylab="y",tlab="t"):
993 self.name = name 994 self.ylab = ylab 995 self.tlab = tlab 996 self.reset() 997 self.startTime = 0.0 998 self.histo = None 999 self.sum = 0.0 1000 self._sum_of_squares = 0 1001 self._integral = 0.0 # time-weighted sum 1002 self._integral2 = 0.0 # time-weighted sum of squares 1003 allTallies.append(self)
1004
1005 - def setHistogram(self,name = '',low=0.0,high=100.0,nbins=10):
1006 """Sets histogram parameters. 1007 Must be called to prior to observations initiate data collection 1008 for histogram. 1009 """ 1010 if name=='': 1011 hname=self.name 1012 else: 1013 hname=name 1014 self.histo=Histogram(name=hname,low=low,high=high,nbins=nbins)
1015
1016 - def observe(self, y, t=None):
1017 if t is None: 1018 t = now() 1019 self._integral += (t - self._last_timestamp) * self._last_observation 1020 yy = self._last_observation* self._last_observation 1021 self._integral2 += (t - self._last_timestamp) * yy 1022 self._last_timestamp = t 1023 self._last_observation = y 1024 self._total += y 1025 self._count += 1 1026 self._sum += y 1027 self._sum_of_squares += y * y 1028 if self.histo: 1029 self.histo.addIn(y)
1030
1031 - def reset(self, t=None):
1032 if t is None: 1033 t = now() 1034 self.startTime = t 1035 self._last_timestamp = t 1036 self._last_observation = 0.0 1037 self._count = 0 1038 self._total = 0.0 1039 self._integral = 0.0 1040 self._integral2 = 0.0 1041 self._sum = 0.0 1042 self._sum_of_squares = 0.0
1043
1044 - def count(self):
1045 return self._count
1046
1047 - def total(self):
1048 return self._total
1049
1050 - def mean(self):
1051 return 1.0 * self._total / self._count
1052
1053 - def timeAverage(self,t=None):
1054 if t is None: 1055 t=now() 1056 integ=self._integral+(t - self._last_timestamp) * self._last_observation 1057 if (t > self.startTime): 1058 return 1.0 * integ/(t - self.startTime) 1059 else: 1060 print 'SimPy: No elapsed time for timeAverage' 1061 return None
1062
1063 - def var(self):
1064 return 1.0 * (self._sum_of_squares - (1.0 * (self._sum * self._sum)\ 1065 / self._count)) / (self._count)
1066
1067 - def timeVariance(self,t=None):
1068 """ the time-weighted Variance of the Tallied variable. 1069 1070 If t is used it is assumed to be the current time, 1071 otherwise t = now() 1072 """ 1073 if t is None: 1074 t=now() 1075 twAve = self.timeAverage(t) 1076 #print 'Tally timeVariance DEBUG: twave:', twAve 1077 last = self._last_observation 1078 twinteg2=self._integral2+(t - self._last_timestamp) * last * last 1079 #print 'Tally timeVariance DEBUG:tinteg2:', twinteg2 1080 if (t > self.startTime): 1081 return 1.0 * twinteg2/(t - self.startTime) - twAve*twAve 1082 else: 1083 print 'SimPy: No elapsed time for timeVariance' 1084 return None
1085 1086 1087
1088 - def __len__(self):
1089 return self._count
1090
1091 - def __eq__(self, l):
1092 return len(l) == self._count
1093
1094 - def getHistogram(self):
1095 return self.histo
1096
1097 - def printHistogram(self,fmt="%s"):
1098 """Returns formatted frequency distribution table string from Tally. 1099 Precondition: setHistogram must have been called. 1100 fmt==format of bin range values 1101 """ 1102 try: 1103 histo=self.getHistogram() 1104 except: 1105 raise FatalSimerror("histogramTable: call setHistogram first"\ 1106 " for Tally %s"%self.name) 1107 ylab=self.ylab 1108 nrObs=self.count() 1109 width=len(str(nrObs)) 1110 res=[] 1111 res.append("\nHistogram for %s:"%histo.name) 1112 res.append("\nNumber of observations: %s"%nrObs) 1113 su=self.total() 1114 cum=histo[0][1] 1115 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 1116 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 1117 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 1118 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 1119 l1width=len(("%s <= "%fmt)%histo[1][0]) 1120 res.append(line1\ 1121 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 1122 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 1123 ) 1124 for i in range(1,len(histo)-1): 1125 cum+=histo[i][1] 1126 res.append(line\ 1127 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 1128 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 1129 ) 1130 cum+=histo[-1][1] 1131 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 1132 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 1133 lnwidth=len(("<%s"%fmt)%histo[1][0]) 1134 res.append(linen\ 1135 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 1136 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 1137 ) 1138 return " ".join(res)
1139
1140 -class Queue(list):
1141 - def __init__(self,res,moni):
1142 if not moni is None: #moni==[]: 1143 self.monit=True # True if a type of Monitor/Tally attached 1144 else: 1145 self.monit=False 1146 self.moni=moni # The Monitor/Tally 1147 self.resource=res # the resource/buffer this queue belongs to
1148
1149 - def enter(self,obj):
1150 pass
1151
1152 - def leave(self):
1153 pass
1154
1155 - def takeout(self,obj):
1156 self.remove(obj) 1157 if self.monit: 1158 self.moni.observe(len(self),t=now())
1159
1160 -class FIFO(Queue):
1161 - def __init__(self,res,moni):
1162 Queue.__init__(self,res,moni)
1163
1164 - def enter(self,obj):
1165 self.append(obj) 1166 if self.monit: 1167 self.moni.observe(len(self),t=now())
1168
1169 - def enterGet(self,obj):
1170 self.enter(obj)
1171
1172 - def enterPut(self,obj):
1173 self.enter(obj)
1174
1175 - def leave(self):
1176 a= self.pop(0) 1177 if self.monit: 1178 self.moni.observe(len(self),t=now()) 1179 return a
1180
1181 -class PriorityQ(FIFO):
1182 """Queue is always ordered according to priority. 1183 Higher value of priority attribute == higher priority. 1184 """
1185 - def __init__(self,res,moni):
1186 FIFO.__init__(self,res,moni)
1187
1188 - def enter(self,obj):
1189 """Handles request queue for Resource""" 1190 if len(self): 1191 ix=self.resource 1192 if self[-1]._priority[ix] >= obj._priority[ix]: 1193 self.append(obj) 1194 else: 1195 z=0 1196 while self[z]._priority[ix] >= obj._priority[ix]: 1197 z += 1 1198 self.insert(z,obj) 1199 else: 1200 self.append(obj) 1201 if self.monit: 1202 self.moni.observe(len(self),t=now())
1203
1204 - def enterGet(self,obj):
1205 """Handles getQ in Buffer""" 1206 if len(self): 1207 ix=self.resource 1208 #print "priority:",[x._priority[ix] for x in self] 1209 if self[-1]._getpriority[ix] >= obj._getpriority[ix]: 1210 self.append(obj) 1211 else: 1212 z=0 1213 while self[z]._getpriority[ix] >= obj._getpriority[ix]: 1214 z += 1 1215 self.insert(z,obj) 1216 else: 1217 self.append(obj) 1218 if self.monit: 1219 self.moni.observe(len(self),t=now())
1220
1221 - def enterPut(self,obj):
1222 """Handles putQ in Buffer""" 1223 if len(self): 1224 ix=self.resource 1225 #print "priority:",[x._priority[ix] for x in self] 1226 if self[-1]._putpriority[ix] >= obj._putpriority[ix]: 1227 self.append(obj) 1228 else: 1229 z=0 1230 while self[z]._putpriority[ix] >= obj._putpriority[ix]: 1231 z += 1 1232 self.insert(z,obj) 1233 else: 1234 self.append(obj) 1235 if self.monit: 1236 self.moni.observe(len(self),t=now())
1237
1238 -class Resource(Lister):
1239 """Models shared, limited capacity resources with queuing; 1240 FIFO is default queuing discipline. 1241 """ 1242
1243 - def __init__(self,capacity=1,name="a_resource",unitName="units", 1244 qType=FIFO,preemptable=0,monitored=False,monitorType=Monitor):
1245 """ 1246 monitorType={Monitor(default)|Tally} 1247 """ 1248 self.name=name # resource name 1249 self.capacity=capacity # resource units in this resource 1250 self.unitName=unitName # type name of resource units 1251 self.n=capacity # uncommitted resource units 1252 self.monitored=monitored 1253 1254 if self.monitored: # Monitor waitQ, activeQ 1255 self.actMon=monitorType(name="Active Queue Monitor %s"%self.name, 1256 ylab="nr in queue",tlab="time") 1257 monact=self.actMon 1258 self.waitMon=monitorType(name="Wait Queue Monitor %s"%self.name, 1259 ylab="nr in queue",tlab="time") 1260 monwait=self.waitMon 1261 else: 1262 monwait=None 1263 monact=None 1264 self.waitQ=qType(self,monwait) 1265 self.preemptable=preemptable 1266 self.activeQ=qType(self,monact) 1267 self.priority_default=0
1268
1269 - def _request(self,arg):
1270 """Process request event for this resource""" 1271 obj=arg[1] 1272 if len(arg[0]) == 4: # yield request,self,resource,priority 1273 obj._priority[self]=arg[0][3] 1274 else: # yield request,self,resource 1275 obj._priority[self]=self.priority_default 1276 if self.preemptable and self.n == 0: # No free resource 1277 # test for preemption condition 1278 preempt=obj._priority[self] > self.activeQ[-1]._priority[self] 1279 # If yes: 1280 if preempt: 1281 z=self.activeQ[-1] 1282 # Keep track of preempt level 1283 z._preempted+=1 1284 # suspend lowest priority process being served 1285 # record remaining service time at first preempt only 1286 if z._preempted==1: 1287 z._remainService = z._nextTime - _t 1288 # cancel only at first preempt 1289 Process().cancel(z) 1290 # remove from activeQ 1291 self.activeQ.remove(z) 1292 # put into front of waitQ 1293 self.waitQ.insert(0,z) 1294 # if self is monitored, update waitQ monitor 1295 if self.monitored: 1296 self.waitMon.observe(len(self.waitQ),now()) 1297 # passivate re-queued process 1298 z._nextTime=None 1299 # assign resource unit to preemptor 1300 self.activeQ.enter(obj) 1301 # post event notice for preempting process 1302 _e._post(obj,at=_t,prior=1) 1303 else: 1304 self.waitQ.enter(obj) 1305 # passivate queuing process 1306 obj._nextTime=None 1307 else: # treat non-preemption case 1308 if self.n == 0: 1309 self.waitQ.enter(obj) 1310 # passivate queuing process 1311 obj._nextTime=None 1312 else: 1313 self.n -= 1 1314 self.activeQ.enter(obj) 1315 _e._post(obj,at=_t,prior=1)
1316
1317 - def _release(self,arg):
1318 """Process release request for this resource""" 1319 self.n += 1 1320 self.activeQ.remove(arg[1]) 1321 if self.monitored: 1322 self.actMon.observe(len(self.activeQ),t=now()) 1323 #reactivate first waiting requestor if any; assign Resource to it 1324 if self.waitQ: 1325 obj=self.waitQ.leave() 1326 self.n -= 1 #assign 1 resource unit to object 1327 self.activeQ.enter(obj) 1328 # if resource preemptable: 1329 if self.preemptable: 1330 # if object had been preempted: 1331 if obj._preempted: 1332 # keep track of preempt level 1333 obj._preempted-=1 1334 # reactivate object delay= remaining service time 1335 # but only, if all other preempts are over 1336 if obj._preempted==0: 1337 reactivate(obj,delay=obj._remainService,prior=1) 1338 # else reactivate right away 1339 else: 1340 reactivate(obj,delay=0,prior=1) 1341 # else: 1342 else: 1343 reactivate(obj,delay=0,prior=1) 1344 _e._post(arg[1],at=_t,prior=1)
1345
1346 -class Buffer(Lister):
1347 """Abstract class for buffers 1348 Blocks a process when a put would cause buffer overflow or a get would cause 1349 buffer underflow. 1350 Default queuing discipline for blocked processes is FIFO.""" 1351 1352 priorityDefault=0
1353 - def __init__(self,name=None,capacity="unbounded",unitName="units", 1354 putQType=FIFO,getQType=FIFO, 1355 monitored=False,monitorType=Monitor,initialBuffered=None):
1356 if capacity=="unbounded": capacity=sys.maxint 1357 self.capacity=capacity 1358 self.name=name 1359 self.putQType=putQType 1360 self.getQType=getQType 1361 self.monitored=monitored 1362 self.initialBuffered=initialBuffered 1363 self.unitName=unitName 1364 if self.monitored: 1365 ## monitor for Producer processes' queue 1366 self.putQMon=monitorType(name="Producer Queue Monitor %s"%self.name, 1367 ylab="nr in queue",tlab="time") 1368 ## monitor for Consumer processes' queue 1369 self.getQMon=monitorType(name="Consumer Queue Monitor %s"%self.name, 1370 ylab="nr in queue",tlab="time") 1371 ## monitor for nr items in buffer 1372 self.bufferMon=monitorType(name="Buffer Monitor %s"%self.name, 1373 ylab="nr in buffer",tlab="time") 1374 else: 1375 self.putQMon=None 1376 self.getQMon=None 1377 self.bufferMon=None 1378 self.putQ=self.putQType(res=self,moni=self.putQMon) 1379 self.getQ=self.getQType(res=self,moni=self.getQMon) 1380 if self.monitored: 1381 self.putQMon.observe(y=len(self.putQ),t=now()) 1382 self.getQMon.observe(y=len(self.getQ),t=now()) 1383 self._putpriority={} 1384 self._getpriority={} 1385 1386 def _put(self): 1387 pass
1388 def _get(self): 1389 pass
1390
1391 -class Level(Buffer):
1392 """Models buffers for processes putting/getting un-distinguishable items. 1393 """
1394 - def getamount(self):
1395 return self.nrBuffered
1396
1397 - def gettheBuffer(self):
1398 return self.nrBuffered
1399 1400 theBuffer=property(gettheBuffer) 1401
1402 - def __init__(self,**pars):
1403 Buffer.__init__(self,**pars) 1404 if self.name is None: 1405 self.name="a_level" ## default name 1406 1407 if (type(self.capacity)!=type(1.0) and\ 1408 type(self.capacity)!=type(1)) or\ 1409 self.capacity<0: 1410 raise FatalSimerror\ 1411 ("Level: capacity parameter not a positive number: %s"\ 1412 %self.initialBuffered) 1413 1414 if type(self.initialBuffered)==type(1.0) or\ 1415 type(self.initialBuffered)==type(1): 1416 if self.initialBuffered>self.capacity: 1417 raise FatalSimerror("initialBuffered exceeds capacity") 1418 if self.initialBuffered>=0: 1419 self.nrBuffered=self.initialBuffered ## nr items initially in buffer 1420 ## buffer is just a counter (int type) 1421 else: 1422 raise FatalSimerror\ 1423 ("initialBuffered param of Level negative: %s"\ 1424 %self.initialBuffered) 1425 elif self.initialBuffered is None: 1426 self.initialBuffered=0 1427 self.nrBuffered=0 1428 else: 1429 raise FatalSimerror\ 1430 ("Level: wrong type of initialBuffered (parameter=%s)"\ 1431 %self.initialBuffered) 1432 if self.monitored: 1433 self.bufferMon.observe(y=self.amount,t=now())
1434 amount=property(getamount) 1435
1436 - def _put(self,arg):
1437 """Handles put requests for Level instances""" 1438 obj=arg[1] 1439 if len(arg[0]) == 5: # yield put,self,buff,whattoput,priority 1440 obj._putpriority[self]=arg[0][4] 1441 whatToPut=arg[0][3] 1442 elif len(arg[0]) == 4: # yield get,self,buff,whattoput 1443 obj._putpriority[self]=Buffer.priorityDefault #default 1444 whatToPut=arg[0][3] 1445 else: # yield get,self,buff 1446 obj._putpriority[self]=Buffer.priorityDefault #default 1447 whatToPut=1 1448 if type(whatToPut)!=type(1) and type(whatToPut)!=type(1.0): 1449 raise FatalSimerror("Level: put parameter not a number") 1450 if not whatToPut>=0.0: 1451 raise FatalSimerror("Level: put parameter not positive number") 1452 whatToPutNr=whatToPut 1453 if whatToPutNr+self.amount>self.capacity: 1454 obj._nextTime=None #passivate put requestor 1455 obj._whatToPut=whatToPutNr 1456 self.putQ.enterPut(obj) #and queue, with size of put 1457 else: 1458 self.nrBuffered+=whatToPutNr 1459 if self.monitored: 1460 self.bufferMon.observe(y=self.amount,t=now()) 1461 # service any getters waiting 1462 # service in queue-order; do not serve second in queue before first 1463 # has been served 1464 while len(self.getQ) and self.amount>0: 1465 proc=self.getQ[0] 1466 if proc._nrToGet<=self.amount: 1467 proc.got=proc._nrToGet 1468 self.nrBuffered-=proc.got 1469 if self.monitored: 1470 self.bufferMon.observe(y=self.amount,t=now()) 1471 self.getQ.takeout(proc) # get requestor's record out of queue 1472 _e._post(proc,at=_t) # continue a blocked get requestor 1473 else: 1474 break 1475 _e._post(obj,at=_t,prior=1) # continue the put requestor
1476
1477 - def _get(self,arg):
1478 """Handles get requests for Level instances""" 1479 obj=arg[1] 1480 obj.got=None 1481 if len(arg[0]) == 5: # yield get,self,buff,whattoget,priority 1482 obj._getpriority[self]=arg[0][4] 1483 nrToGet=arg[0][3] 1484 elif len(arg[0]) == 4: # yield get,self,buff,whattoget 1485 obj._getpriority[self]=Buffer.priorityDefault #default 1486 nrToGet=arg[0][3] 1487 else: # yield get,self,buff 1488 obj._getpriority[self]=Buffer.priorityDefault 1489 nrToGet=1 1490 if type(nrToGet)!=type(1.0) and type(nrToGet)!=type(1): 1491 raise FatalSimerror\ 1492 ("Level: get parameter not a number: %s"%nrToGet) 1493 if nrToGet<0: 1494 raise FatalSimerror\ 1495 ("Level: get parameter not positive number: %s"%nrToGet) 1496 if self.amount < nrToGet: 1497 obj._nrToGet=nrToGet 1498 self.getQ.enterGet(obj) 1499 # passivate queuing process 1500 obj._nextTime=None 1501 else: 1502 obj.got=nrToGet 1503 self.nrBuffered-=nrToGet 1504 if self.monitored: 1505 self.bufferMon.observe(y=self.amount,t=now()) 1506 _e._post(obj,at=_t,prior=1) 1507 # reactivate any put requestors for which space is now available 1508 # service in queue-order; do not serve second in queue before first 1509 # has been served 1510 while len(self.putQ): #test for queued producers 1511 proc=self.putQ[0] 1512 if proc._whatToPut+self.amount<=self.capacity: 1513 self.nrBuffered+=proc._whatToPut 1514 if self.monitored: 1515 self.bufferMon.observe(y=self.amount,t=now()) 1516 self.putQ.takeout(proc)#requestor's record out of queue 1517 _e._post(proc,at=_t) # continue a blocked put requestor 1518 else: 1519 break
1520
1521 -class Store(Buffer):
1522 """Models buffers for processes coupled by putting/getting distinguishable 1523 items. 1524 Blocks a process when a put would cause buffer overflow or a get would cause 1525 buffer underflow. 1526 Default queuing discipline for blocked processes is priority FIFO. 1527 """
1528 - def getnrBuffered(self):
1529 return len(self.theBuffer)
1530 nrBuffered=property(getnrBuffered) 1531
1532 - def getbuffered(self):
1533 return self.theBuffer
1534 buffered=property(getbuffered) 1535
1536 - def __init__(self,**pars):
1537 Buffer.__init__(self,**pars) 1538 self.theBuffer=[] 1539 if self.name is None: 1540 self.name="a_store" ## default name 1541 if type(self.capacity)!=type(1) or self.capacity<=0: 1542 raise FatalSimerror\ 1543 ("Store: capacity parameter not a positive integer > 0: %s"\ 1544 %self.initialBuffered) 1545 if type(self.initialBuffered)==type([]): 1546 if len(self.initialBuffered)>self.capacity: 1547 raise FatalSimerror("initialBuffered exceeds capacity") 1548 else: 1549 self.theBuffer[:]=self.initialBuffered##buffer==list of objects 1550 elif self.initialBuffered is None: 1551 self.theBuffer=[] 1552 else: 1553 raise FatalSimerror\ 1554 ("Store: initialBuffered not a list") 1555 if self.monitored: 1556 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1557 self._sort=None
1558 1559 1560
1561 - def addSort(self,sortFunc):
1562 """Adds buffer sorting to this instance of Store. It maintains 1563 theBuffer sorted by the sortAttr attribute of the objects in the 1564 buffer. 1565 The user-provided 'sortFunc' must look like this: 1566 1567 def mySort(self,par): 1568 tmplist=[(x.sortAttr,x) for x in par] 1569 tmplist.sort() 1570 return [x for (key,x) in tmplist] 1571 1572 """ 1573 1574 self._sort=new.instancemethod(sortFunc,self,self.__class__) 1575 self.theBuffer=self._sort(self.theBuffer)
1576
1577 - def _put(self,arg):
1578 """Handles put requests for Store instances""" 1579 obj=arg[1] 1580 if len(arg[0]) == 5: # yield put,self,buff,whattoput,priority 1581 obj._putpriority[self]=arg[0][4] 1582 whatToPut=arg[0][3] 1583 elif len(arg[0]) == 4: # yield put,self,buff,whattoput 1584 obj._putpriority[self]=Buffer.priorityDefault #default 1585 whatToPut=arg[0][3] 1586 else: # error, whattoput missing 1587 raise FatalSimerror("Item to put missing in yield put stmt") 1588 if type(whatToPut)!=type([]): 1589 raise FatalSimerror("put parameter is not a list") 1590 whatToPutNr=len(whatToPut) 1591 if whatToPutNr+self.nrBuffered>self.capacity: 1592 obj._nextTime=None #passivate put requestor 1593 obj._whatToPut=whatToPut 1594 self.putQ.enterPut(obj) #and queue, with items to put 1595 else: 1596 self.theBuffer.extend(whatToPut) 1597 if not(self._sort is None): 1598 self.theBuffer=self._sort(self.theBuffer) 1599 if self.monitored: 1600 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1601 1602 # service any waiting getters 1603 # service in queue order: do not serve second in queue before first 1604 # has been served 1605 while self.nrBuffered>0 and len(self.getQ): 1606 proc=self.getQ[0] 1607 if inspect.isfunction(proc._nrToGet): 1608 movCand=proc._nrToGet(self.theBuffer) #predicate parameter 1609 if movCand: 1610 proc.got=movCand[:] 1611 for i in movCand: 1612 self.theBuffer.remove(i) 1613 self.getQ.takeout(proc) 1614 if self.monitored: 1615 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1616 _e._post(what=proc,at=_t) # continue a blocked get requestor 1617 else: 1618 break 1619 else: #numerical parameter 1620 if proc._nrToGet<=self.nrBuffered: 1621 nrToGet=proc._nrToGet 1622 proc.got=[] 1623 proc.got[:]=self.theBuffer[0:nrToGet] 1624 self.theBuffer[:]=self.theBuffer[nrToGet:] 1625 if self.monitored: 1626 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1627 # take this get requestor's record out of queue: 1628 self.getQ.takeout(proc) 1629 _e._post(what=proc,at=_t) # continue a blocked get requestor 1630 else: 1631 break 1632 1633 _e._post(what=obj,at=_t,prior=1) # continue the put requestor
1634
1635 - def _get(self,arg):
1636 """Handles get requests""" 1637 filtfunc=None 1638 obj=arg[1] 1639 obj.got=[] # the list of items retrieved by 'get' 1640 if len(arg[0]) == 5: # yield get,self,buff,whattoget,priority 1641 obj._getpriority[self]=arg[0][4] 1642 if inspect.isfunction(arg[0][3]): 1643 filtfunc=arg[0][3] 1644 else: 1645 nrToGet=arg[0][3] 1646 elif len(arg[0]) == 4: # yield get,self,buff,whattoget 1647 obj._getpriority[self]=Buffer.priorityDefault #default 1648 if inspect.isfunction(arg[0][3]): 1649 filtfunc=arg[0][3] 1650 else: 1651 nrToGet=arg[0][3] 1652 else: # yield get,self,buff 1653 obj._getpriority[self]=Buffer.priorityDefault 1654 nrToGet=1 1655 if not filtfunc: #number specifies nr items to get 1656 if nrToGet<0: 1657 raise FatalSimerror\ 1658 ("Store: get parameter not positive number: %s"%nrToGet) 1659 if self.nrBuffered < nrToGet: 1660 obj._nrToGet=nrToGet 1661 self.getQ.enterGet(obj) 1662 # passivate/block queuing 'get' process 1663 obj._nextTime=None 1664 else: 1665 for i in range(nrToGet): 1666 obj.got.append(self.theBuffer.pop(0)) # move items from 1667 # buffer to requesting process 1668 if self.monitored: 1669 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1670 _e._post(obj,at=_t,prior=1) 1671 # reactivate any put requestors for which space is now available 1672 # serve in queue order: do not serve second in queue before first 1673 # has been served 1674 while len(self.putQ): 1675 proc=self.putQ[0] 1676 if len(proc._whatToPut)+self.nrBuffered<=self.capacity: 1677 for i in proc._whatToPut: 1678 self.theBuffer.append(i) #move items to buffer 1679 if not(self._sort is None): 1680 self.theBuffer=self._sort(self.theBuffer) 1681 if self.monitored: 1682 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1683 self.putQ.takeout(proc) # dequeue requestor's record 1684 _e._post(proc,at=_t) # continue a blocked put requestor 1685 else: 1686 break 1687 else: # items to get determined by filtfunc 1688 movCand=filtfunc(self.theBuffer) 1689 if movCand: # get succeded 1690 _e._post(obj,at=_t,prior=1) 1691 obj.got=movCand[:] 1692 for item in movCand: 1693 self.theBuffer.remove(item) 1694 if self.monitored: 1695 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1696 # reactivate any put requestors for which space is now available 1697 # serve in queue order: do not serve second in queue before first 1698 # has been served 1699 while len(self.putQ): 1700 proc=self.putQ[0] 1701 if len(proc._whatToPut)+self.nrBuffered<=self.capacity: 1702 for i in proc._whatToPut: 1703 self.theBuffer.append(i) #move items to buffer 1704 if not(self._sort is None): 1705 self.theBuffer=self._sort(self.theBuffer) 1706 if self.monitored: 1707 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1708 self.putQ.takeout(proc) # dequeue requestor's record 1709 _e._post(proc,at=_t) # continue a blocked put requestor 1710 else: 1711 break 1712 else: # get did not succeed, block 1713 obj._nrToGet=filtfunc 1714 self.getQ.enterGet(obj) 1715 # passivate/block queuing 'get' process 1716 obj._nextTime=None
1717
1718 -class SimEvent(Lister):
1719 """Supports one-shot signalling between processes. All processes waiting for an event to occur 1720 get activated when its occurrence is signalled. From the processes queuing for an event, only 1721 the first gets activated. 1722 """
1723 - def __init__(self,name="a_SimEvent"):
1724 self.name=name 1725 self.waits=[] 1726 self.queues=[] 1727 self.occurred=False 1728 self.signalparam=None
1729
1730 - def signal(self,param=None):
1731 """Produces a signal to self; 1732 Fires this event (makes it occur). 1733 Reactivates ALL processes waiting for this event. (Cleanup waits lists 1734 of other events if wait was for an event-group (OR).) 1735 Reactivates the first process for which event(s) it is queuing for 1736 have fired. (Cleanup queues of other events if wait was for an event-group (OR).) 1737 """ 1738 self.signalparam=param 1739 if not self.waits and not self.queues: 1740 self.occurred=True 1741 else: 1742 #reactivate all waiting processes 1743 for p in self.waits: 1744 p[0].eventsFired.append(self) 1745 reactivate(p[0],prior=True) 1746 #delete waits entries for this process in other events 1747 for ev in p[1]: 1748 if ev!=self: 1749 if ev.occurred: 1750 p[0].eventsFired.append(ev) 1751 for iev in ev.waits: 1752 if iev[0]==p[0]: 1753 ev.waits.remove(iev) 1754 break 1755 self.waits=[] 1756 if self.queues: 1757 proc=self.queues.pop(0)[0] 1758 proc.eventsFired.append(self) 1759 reactivate(proc)
1760
1761 - def _wait(self,par):
1762 """Consumes a signal if it has occurred, otherwise process 'proc' 1763 waits for this event. 1764 """ 1765 proc=par[0][1] #the process issuing the yield waitevent command 1766 proc.eventsFired=[] 1767 if not self.occurred: 1768 self.waits.append([proc,[self]]) 1769 proc._nextTime=None #passivate calling process 1770 else: 1771 proc.eventsFired.append(self) 1772 self.occurred=False 1773 _e._post(proc,at=_t,prior=1)
1774
1775 - def _waitOR(self,par):
1776 """Handles waiting for an OR of events in a tuple/list. 1777 """ 1778 proc=par[0][1] 1779 evlist=par[0][2] 1780 proc.eventsFired=[] 1781 anyoccur=False 1782 for ev in evlist: 1783 if ev.occurred: 1784 anyoccur=True 1785 proc.eventsFired.append(ev) 1786 ev.occurred=False 1787 if anyoccur: #at least one event has fired; continue process 1788 _e._post(proc,at=_t,prior=1) 1789 1790 else: #no event in list has fired, enter process in all 'waits' lists 1791 proc.eventsFired=[] 1792 proc._nextTime=None #passivate calling process 1793 for ev in evlist: 1794 ev.waits.append([proc,evlist])
1795
1796 - def _queue(self,par):
1797 """Consumes a signal if it has occurred, otherwise process 'proc' 1798 queues for this event. 1799 """ 1800 proc=par[0][1] #the process issuing the yield queueevent command 1801 proc.eventsFired=[] 1802 if not self.occurred: 1803 self.queues.append([proc,[self]]) 1804 proc._nextTime=None #passivate calling process 1805 else: 1806 proc.eventsFired.append(self) 1807 self.occurred=False 1808 _e._post(proc,at=_t,prior=1)
1809
1810 - def _queueOR(self,par):
1811 """Handles queueing for an OR of events in a tuple/list. 1812 """ 1813 proc=par[0][1] 1814 evlist=par[0][2] 1815 proc.eventsFired=[] 1816 anyoccur=False 1817 for ev in evlist: 1818 if ev.occurred: 1819 anyoccur=True 1820 proc.eventsFired.append(ev) 1821 ev.occurred=False 1822 if anyoccur: #at least one event has fired; continue process 1823 _e._post(proc,at=_t,prior=1) 1824 1825 else: #no event in list has fired, enter process in all 'waits' lists 1826 proc.eventsFired=[] 1827 proc._nextTime=None #passivate calling process 1828 for ev in evlist: 1829 ev.queues.append([proc,evlist])
1830 1831 ## begin waituntil functionality
1832 -def _test():
1833 """ 1834 Gets called by simulate after every event, as long as there are processes 1835 waiting in condQ for a condition to be satisfied. 1836 Tests the conditions for all waiting processes. Where condition satisfied, 1837 reactivates that process immediately and removes it from queue. 1838 """ 1839 global condQ 1840 rList=[] 1841 for el in condQ: 1842 if el.cond(): 1843 rList.append(el) 1844 reactivate(el) 1845 for i in rList: 1846 condQ.remove(i) 1847 1848 if not condQ: 1849 _stopWUStepping()
1850
1851 -def _waitUntilFunc(proc,cond):
1852 global condQ 1853 """ 1854 Puts a process 'proc' waiting for a condition into a waiting queue. 1855 'cond' is a predicate function which returns True if the condition is 1856 satisfied. 1857 """ 1858 if not cond(): 1859 condQ.append(proc) 1860 proc.cond=cond 1861 _startWUStepping() #signal 'simulate' that a process is waiting 1862 # passivate calling process 1863 proc._nextTime=None 1864 else: 1865 #schedule continuation of calling process 1866 _e._post(proc,at=_t,prior=1)
1867 1868 1869 ##end waituntil functionality 1870
1871 -def scheduler(till=0):
1872 """Schedules Processes/semi-coroutines until time 'till'. 1873 Deprecated since version 0.5. 1874 """ 1875 simulate(until=till)
1876
1877 -def holdfunc(a):
1878 a[0][1]._hold(a)
1879
1880 -def requestfunc(a):
1881 """Handles 'yield request,self,res' and 'yield (request,self,res),(<code>,self,par)'. 1882 <code> can be 'hold' or 'waitevent'. 1883 """ 1884 if type(a[0][0])==tuple: 1885 ## Compound yield request statement 1886 ## first tuple in ((request,self,res),(xx,self,yy)) 1887 b=a[0][0] 1888 ## b[2]==res (the resource requested) 1889 ##process the first part of the compound yield statement 1890 ##a[1] is the Process instance 1891 b[2]._request(arg=(b,a[1])) 1892 ##deal with add-on condition to command 1893 ##Trigger processes for reneging 1894 class _Holder(Process): 1895 """Provides timeout process""" 1896 def trigger(self,delay): 1897 yield hold,self,delay 1898 if not proc in b[2].activeQ: 1899 reactivate(proc)
1900 1901 class _EventWait(Process): 1902 """Provides event waiting process""" 1903 def trigger(self,event): 1904 yield waitevent,self,event 1905 if not proc in b[2].activeQ: 1906 a[1].eventsFired=self.eventsFired 1907 reactivate(proc) 1908 1909 #activate it 1910 proc=a[0][0][1] # the process to be woken up 1911 actCode=a[0][1][0] 1912 if actCode==hold: 1913 proc._holder=_Holder(name="RENEGE-hold for %s"%proc.name) 1914 ## the timeout delay 1915 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1916 elif actCode==waituntil: 1917 raise FatalSimerror("Illegal code for reneging: waituntil") 1918 elif actCode==waitevent: 1919 proc._holder=_EventWait(name="RENEGE-waitevent for %s"%proc.name) 1920 ## the event 1921 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1922 elif actCode==queueevent: 1923 raise FatalSimerror("Illegal code for reneging: queueevent") 1924 else: 1925 raise FatalSimerror("Illegal code for reneging %s"%actCode) 1926 else: 1927 ## Simple yield request command 1928 a[0][2]._request(a) 1929
1930 -def releasefunc(a):
1931 a[0][2]._release(a)
1932
1933 -def passivatefunc(a):
1934 a[0][1]._passivate(a)
1935
1936 -def waitevfunc(a):
1937 #if waiting for one event only (not a tuple or list) 1938 evtpar=a[0][2] 1939 if isinstance(evtpar,SimEvent): 1940 a[0][2]._wait(a) 1941 # else, if waiting for an OR of events (list/tuple): 1942 else: #it should be a list/tuple of events 1943 # call _waitOR for first event 1944 evtpar[0]._waitOR(a)
1945
1946 -def queueevfunc(a):
1947 #if queueing for one event only (not a tuple or list) 1948 evtpar=a[0][2] 1949 if isinstance(evtpar,SimEvent): 1950 a[0][2]._queue(a) 1951 #else, if queueing for an OR of events (list/tuple): 1952 else: #it should be a list/tuple of events 1953 # call _queueOR for first event 1954 evtpar[0]._queueOR(a)
1955
1956 -def waituntilfunc(par):
1957 _waitUntilFunc(par[0][1],par[0][2])
1958
1959 -def getfunc(a):
1960 """Handles 'yield get,self,buffer,what,priority' and 1961 'yield (get,self,buffer,what,priority),(<code>,self,par)'. 1962 <code> can be 'hold' or 'waitevent'. 1963 """ 1964 if type(a[0][0])==tuple: 1965 ## Compound yield request statement 1966 ## first tuple in ((request,self,res),(xx,self,yy)) 1967 b=a[0][0] 1968 ## b[2]==res (the resource requested) 1969 ##process the first part of the compound yield statement 1970 ##a[1] is the Process instance 1971 b[2]._get(arg=(b,a[1])) 1972 ##deal with add-on condition to command 1973 ##Trigger processes for reneging 1974 class _Holder(Process): 1975 """Provides timeout process""" 1976 def trigger(self,delay): 1977 yield hold,self,delay 1978 #if not proc in b[2].activeQ: 1979 if proc in b[2].getQ: 1980 reactivate(proc)
1981 1982 class _EventWait(Process): 1983 """Provides event waiting process""" 1984 def trigger(self,event): 1985 yield waitevent,self,event 1986 if proc in b[2].getQ: 1987 a[1].eventsFired=self.eventsFired 1988 reactivate(proc) 1989 1990 #activate it 1991 proc=a[0][0][1] # the process to be woken up 1992 actCode=a[0][1][0] 1993 if actCode==hold: 1994 proc._holder=_Holder("RENEGE-hold for %s"%proc.name) 1995 ## the timeout delay 1996 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1997 elif actCode==waituntil: 1998 raise FatalSimerror("Illegal code for reneging: waituntil") 1999 elif actCode==waitevent: 2000 proc._holder=_EventWait(proc.name) 2001 ## the event 2002 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 2003 elif actCode==queueevent: 2004 raise FatalSimerror("Illegal code for reneging: queueevent") 2005 else: 2006 raise FatalSimerror("Illegal code for reneging %s"%actCode) 2007 else: 2008 ## Simple yield request command 2009 a[0][2]._get(a) 2010 2011
2012 -def putfunc(a):
2013 """Handles 'yield put' (simple and compound hold/waitevent) 2014 """ 2015 if type(a[0][0])==tuple: 2016 ## Compound yield request statement 2017 ## first tuple in ((request,self,res),(xx,self,yy)) 2018 b=a[0][0] 2019 ## b[2]==res (the resource requested) 2020 ##process the first part of the compound yield statement 2021 ##a[1] is the Process instance 2022 b[2]._put(arg=(b,a[1])) 2023 ##deal with add-on condition to command 2024 ##Trigger processes for reneging 2025 class _Holder(Process): 2026 """Provides timeout process""" 2027 def trigger(self,delay): 2028 yield hold,self,delay 2029 #if not proc in b[2].activeQ: 2030 if proc in b[2].putQ: 2031 reactivate(proc)
2032 2033 class _EventWait(Process): 2034 """Provides event waiting process""" 2035 def trigger(self,event): 2036 yield waitevent,self,event 2037 if proc in b[2].putQ: 2038 a[1].eventsFired=self.eventsFired 2039 reactivate(proc) 2040 2041 #activate it 2042 proc=a[0][0][1] # the process to be woken up 2043 actCode=a[0][1][0] 2044 if actCode==hold: 2045 proc._holder=_Holder("RENEGE-hold for %s"%proc.name) 2046 ## the timeout delay 2047 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 2048 elif actCode==waituntil: 2049 raise FatalSimerror("Illegal code for reneging: waituntil") 2050 elif actCode==waitevent: 2051 proc._holder=_EventWait("RENEGE-waitevent for %s"%proc.name) 2052 ## the event 2053 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 2054 elif actCode==queueevent: 2055 raise FatalSimerror("Illegal code for reneging: queueevent") 2056 else: 2057 raise FatalSimerror("Illegal code for reneging %s"%actCode) 2058 else: 2059 ## Simple yield request command 2060 a[0][2]._put(a) 2061
2062 -def simulate(callback=lambda :None,until=0):
2063 """Schedules Processes/semi-coroutines until time 'until'""" 2064 2065 """Gets called once. Afterwards, co-routines (generators) return by 2066 'yield' with a cargo: 2067 yield hold, self, <delay>: schedules the "self" process for activation 2068 after <delay> time units.If <,delay> missing, 2069 same as "yield hold,self,0" 2070 2071 yield passivate,self : makes the "self" process wait to be re-activated 2072 2073 yield request,self,<Resource>[,<priority>]: request 1 unit from <Resource> 2074 with <priority> pos integer (default=0) 2075 2076 yield release,self,<Resource> : release 1 unit to <Resource> 2077 2078 yield waitevent,self,<SimEvent>|[<Evt1>,<Evt2>,<Evt3), . . . ]: 2079 wait for one or more of several events 2080 2081 2082 yield queueevent,self,<SimEvent>|[<Evt1>,<Evt2>,<Evt3), . . . ]: 2083 queue for one or more of several events 2084 2085 yield waituntil,self,cond : wait for arbitrary condition 2086 2087 yield get,self,<buffer>[,<WhatToGet>[,<priority>]] 2088 get <WhatToGet> items from buffer (default=1); 2089 <WhatToGet> can be a pos integer or a filter function 2090 (Store only) 2091 2092 yield put,self,<buffer>[,<WhatToPut>[,priority]] 2093 put <WhatToPut> items into buffer (default=1); 2094 <WhatToPut> can be a pos integer (Level) or a list of objects 2095 (Store) 2096 2097 EXTENSIONS: 2098 Request with timeout reneging: 2099 yield (request,self,<Resource>),(hold,self,<patience>) : 2100 requests 1 unit from <Resource>. If unit not acquired in time period 2101 <patience>, self leaves waitQ (reneges). 2102 2103 Request with event-based reneging: 2104 yield (request,self,<Resource>),(waitevent,self,<eventlist>): 2105 requests 1 unit from <Resource>. If one of the events in <eventlist> occurs before unit 2106 acquired, self leaves waitQ (reneges). 2107 2108 Get with timeout reneging (for Store and Level): 2109 yield (get,self,<buffer>,nrToGet etc.),(hold,self,<patience>) 2110 requests <nrToGet> items/units from <buffer>. If not acquired <nrToGet> in time period 2111 <patience>, self leaves <buffer>.getQ (reneges). 2112 2113 Get with event-based reneging (for Store and Level): 2114 yield (get,self,<buffer>,nrToGet etc.),(waitevent,self,<eventlist>) 2115 requests <nrToGet> items/units from <buffer>. If not acquired <nrToGet> before one of 2116 the events in <eventlist> occurs, self leaves <buffer>.getQ (reneges). 2117 2118 2119 2120 Event notices get posted in event-list by scheduler after "yield" or by 2121 "activate"/"reactivate" functions. 2122 2123 Nov 9, 2003: Added capability to step through simulation event by event if 2124 step==True. 'callback' gets called after every event. It can 2125 cancel stepping or end run. API and semantics backwards 2126 compatible with previous versions of simulate(). 2127 2128 """ 2129 global _endtime,_e,_stop,_t,_step,_wustep 2130 paused=False 2131 _stop=False 2132 2133 if _e is None: 2134 raise FatalSimerror("Simulation not initialized") 2135 if _e._isEmpty(): 2136 message="SimPy: No activities scheduled" 2137 return message 2138 2139 _endtime=until 2140 message="SimPy: Normal exit" 2141 dispatch={hold:holdfunc,request:requestfunc,release:releasefunc, 2142 passivate:passivatefunc,waitevent:waitevfunc,queueevent:queueevfunc, 2143 waituntil:waituntilfunc,get:getfunc,put:putfunc} 2144 commandcodes=dispatch.keys() 2145 commandwords={hold:"hold",request:"request",release:"release",passivate:"passivate", 2146 waitevent:"waitevent",queueevent:"queueevent",waituntil:"waituntil", 2147 get:"get",put:"put"} 2148 nextev=_e._nextev ## just a timesaver 2149 while not _stop and _t<=_endtime: 2150 try: 2151 a=nextev() 2152 if not a[0] is None: 2153 ## 'a' is tuple "(<yield command>, <action>)" 2154 if type(a[0][0])==tuple: 2155 ##allowing for yield (request,self,res),(waituntil,self,cond) 2156 command=a[0][0][0] 2157 else: 2158 command = a[0][0] 2159 if __debug__: 2160 if not command in commandcodes: 2161 raise FatalSimerror("Illegal command: yield %s"%command) 2162 dispatch[command](a) 2163 except FatalSimerror,error: 2164 print "SimPy: "+error.value 2165 sys.exit(1) 2166 except Simerror,error: 2167 message="SimPy: "+error.value 2168 _stop = True 2169 if _step: 2170 callback() 2171 if _wustep: 2172 _test() 2173 _stopWUStepping() 2174 stopStepping() 2175 _e=None 2176 return message
2177
2178 -def simulateStep(callback=lambda :None,until=0):
2179 """Schedules Processes/semi-coroutines until next event""" 2180 2181 """Can be called repeatedly. 2182 Behaves like "simulate", but does execute only one event per call. 2183 2184 2185 2186 """ 2187 global _endtime,_e,_stop,_t,_step 2188 2189 status="resumable" 2190 2191 if _e == None: 2192 raise Simerror("Fatal SimPy error: Simulation not initialized") 2193 if _e._isEmpty(): 2194 message="SimPy: No activities scheduled" 2195 status="notResumable" 2196 return message,status 2197 2198 _endtime=until 2199 message="SimPy: Normal exit" 2200 dispatch={hold:holdfunc,request:requestfunc,release:releasefunc, 2201 passivate:passivatefunc,waitevent:waitevfunc,queueevent:queueevfunc, 2202 waituntil:waituntilfunc,get:getfunc,put:putfunc} 2203 commandcodes=dispatch.keys() 2204 commandwords={hold:"hold",request:"request",release:"release",passivate:"passivate", 2205 waitevent:"waitevent",queueevent:"queueevent",waituntil:"waituntil", 2206 get:"get",put:"put"} 2207 if not _stop and _t<=_endtime: 2208 try: 2209 a=_e._nextev() 2210 if not a[0]==None: 2211 ## 'a' is tuple "(<yield command>, <action>)" 2212 if type(a[0][0])==tuple: 2213 ##allowing for yield (request,self,res),(waituntil,self,cond) 2214 command=a[0][0][0] 2215 else: 2216 command = a[0][0] 2217 if __debug__: 2218 if not command in commandcodes: 2219 raise FatalSimerror("Fatal error: illegal command: yield %s"%command) 2220 dispatch[command](a) 2221 except FatalSimerror,error: 2222 print "SimPy: "+error.value 2223 sys.exit(1) 2224 except Simerror, error: 2225 message="SimPy: "+ error.value 2226 _stop = True 2227 status="notResumable" 2228 if _step: 2229 callback() 2230 return message,status
2231 2232 2233 ################### end of Simulation module 2234 2235 if __name__ == "__main__": 2236 print "SimPy.SimulationStep %s" %__version__ 2237 ################### start of test/demo programs 2238
2239 - def askCancel():
2240 a=raw_input("[Time=%s] End run (e), Continue stepping (s), Run to end (r)"%now()) 2241 if a=="e": 2242 stopSimulation() 2243 elif a=="s": 2244 return 2245 else: 2246 stopStepping()
2247
2248 - def test_demo():
2249 class Aa(Process): 2250 sequIn=[] 2251 sequOut=[] 2252 def __init__(self,holdtime,name): 2253 Process.__init__(self,name) 2254 self.holdtime=holdtime
2255 2256 def life(self,priority): 2257 for i in range(1): 2258 Aa.sequIn.append(self.name) 2259 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2260 len(rrr.activeQ) 2261 print "waitQ: ",[(k.name,k._priority[rrr]) for k in rrr.waitQ] 2262 print "activeQ: ",[(k.name,k._priority[rrr]) \ 2263 for k in rrr.activeQ] 2264 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2265 "Inconsistent resource unit numbers" 2266 print now(),self.name,"requests 1 ", rrr.unitName 2267 yield request,self,rrr,priority 2268 print now(),self.name,"has 1 ",rrr.unitName 2269 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2270 len(rrr.activeQ) 2271 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2272 len(rrr.activeQ) 2273 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2274 "Inconsistent resource unit numbers" 2275 yield hold,self,self.holdtime 2276 print now(),self.name,"gives up 1",rrr.unitName 2277 yield release,self,rrr 2278 Aa.sequOut.append(self.name) 2279 print now(),self.name,"has released 1 ",rrr.unitName 2280 print "waitQ: ",[(k.name,k._priority[rrr]) for k in rrr.waitQ] 2281 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2282 len(rrr.activeQ) 2283 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2284 "Inconsistent resource unit numbers" 2285 2286 class Destroyer(Process): 2287 def __init__(self): 2288 Process.__init__(self) 2289 2290 def destroy(self,whichProcesses): 2291 for i in whichProcesses: 2292 Process().cancel(i) 2293 yield hold,self,0 2294 2295 class Observer(Process): 2296 def __init__(self): 2297 Process.__init__(self) 2298 2299 def observe(self,step,processes,res): 2300 while now()<11: 2301 for i in processes: 2302 print " %s %s: act:%s, pass:%s, term: %s,interr:%s, qu:%s"\ 2303 %(now(),i.name,i.active(),i.passive(),i.terminated()\ 2304 ,i.interrupted(),i.queuing(res)) 2305 print 2306 yield hold,self,step 2307 2308 class UserControl(Process): 2309 def letUserInteract(self,when): 2310 yield hold,self,when 2311 startStepping() 2312 2313 print "****First case == priority queue, resource service not preemptable" 2314 initialize() 2315 rrr=Resource(5,name="Parking",unitName="space(s)", qType=PriorityQ, 2316 preemptable=0) 2317 procs=[] 2318 for i in range(10): 2319 z=Aa(holdtime=i,name="Car "+str(i)) 2320 procs.append(z) 2321 activate(z,z.life(priority=i)) 2322 o=Observer() 2323 activate(o,o.observe(1,procs,rrr)) 2324 startStepping() 2325 a=simulate(until=10000,callback=askCancel) 2326 print "Input sequence: ",Aa.sequIn 2327 print "Output sequence: ",Aa.sequOut 2328 2329 print "\n****Second case == priority queue, resource service preemptable" 2330 initialize() 2331 rrr=Resource(5,name="Parking",unitName="space(s)", qType=PriorityQ, 2332 preemptable=1) 2333 procs=[] 2334 for i in range(10): 2335 z=Aa(holdtime=i,name="Car "+str(i)) 2336 procs.append(z) 2337 activate(z,z.life(priority=i)) 2338 o=Observer() 2339 activate(o,o.observe(1,procs,rrr)) 2340 u=UserControl() 2341 activate(u,u.letUserInteract(4)) 2342 Aa.sequIn=[] 2343 Aa.sequOut=[] 2344 a=simulate(askCancel,until=10000) 2345 print a 2346 print "Input sequence: ",Aa.sequIn 2347 print "Output sequence: ",Aa.sequOut 2348
2349 - def test_interrupt():
2350 class Bus(Process): 2351 def __init__(self,name): 2352 Process.__init__(self,name)
2353 2354 def operate(self,repairduration=0): 2355 print now(),">> %s starts" %(self.name) 2356 tripleft = 1000 2357 while tripleft > 0: 2358 yield hold,self,tripleft 2359 if self.interrupted(): 2360 print "interrupted by %s" %self.interruptCause.name 2361 print "%s: %s breaks down " %(now(),self.name) 2362 tripleft=self.interruptLeft 2363 self.interruptReset() 2364 print "tripleft ",tripleft 2365 reactivate(br,delay=repairduration) # breakdowns only during operation 2366 yield hold,self,repairduration 2367 print now()," repaired" 2368 else: 2369 break # no breakdown, ergo bus arrived 2370 print now(),"<< %s done" %(self.name) 2371 2372 class Breakdown(Process): 2373 def __init__(self,myBus): 2374 Process.__init__(self,name="Breakdown "+myBus.name) 2375 self.bus=myBus 2376 2377 def breakBus(self,interval): 2378 2379 while True: 2380 yield hold,self,interval 2381 if self.bus.terminated(): break 2382 self.interrupt(self.bus) 2383 2384 print"\n\n+++test_interrupt" 2385 initialize() 2386 b=Bus("Bus 1") 2387 activate(b,b.operate(repairduration=20)) 2388 br=Breakdown(b) 2389 activate(br,br.breakBus(200)) 2390 startStepping() 2391 simulate(until=4000) 2392 2393
2394 - def testSimEvents():
2395 class Waiter(Process): 2396 def waiting(self,theSignal): 2397 while True: 2398 yield waitevent,self,theSignal 2399 print "%s: process '%s' continued after waiting for %s"%(now(),self.name,theSignal.name) 2400 yield queueevent,self,theSignal 2401 print "%s: process '%s' continued after queueing for %s"%(now(),self.name,theSignal.name)
2402 2403 class ORWaiter(Process): 2404 def waiting(self,signals): 2405 while True: 2406 yield waitevent,self,signals 2407 print now(),"one of %s signals occurred"%[x.name for x in signals] 2408 print "\t%s (fired/param)"%[(x.name,x.signalparam) for x in self.eventsFired] 2409 yield hold,self,1 2410 2411 class Caller(Process): 2412 def calling(self): 2413 while True: 2414 signal1.signal("wake up!") 2415 print "%s: signal 1 has occurred"%now() 2416 yield hold,self,10 2417 signal2.signal("and again") 2418 signal2.signal("sig 2 again") 2419 print "%s: signal1, signal2 have occurred"%now() 2420 yield hold,self,10 2421 print"\n+++testSimEvents output" 2422 initialize() 2423 signal1=SimEvent("signal 1") 2424 signal2=SimEvent("signal 2") 2425 signal1.signal("startup1") 2426 signal2.signal("startup2") 2427 w1=Waiter("waiting for signal 1") 2428 activate(w1,w1.waiting(signal1)) 2429 w2=Waiter("waiting for signal 2") 2430 activate(w2,w2.waiting(signal2)) 2431 w3=Waiter("also waiting for signal 2") 2432 activate(w3,w3.waiting(signal2)) 2433 w4=ORWaiter("waiting for either signal 1 or signal 2") 2434 activate(w4,w4.waiting([signal1,signal2]),prior=True) 2435 c=Caller("Caller") 2436 activate(c,c.calling()) 2437 print simulate(until=100) 2438
2439 - def testwaituntil():
2440 """ 2441 Demo of waitUntil capability. 2442 2443 Scenario: 2444 Three workers require sets of tools to do their jobs. Tools are shared, scarce 2445 resources for which they compete. 2446 """ 2447 2448 2449 class Worker(Process): 2450 def __init__(self,name,heNeeds=[]): 2451 Process.__init__(self,name) 2452 self.heNeeds=heNeeds
2453 def work(self): 2454 2455 def workerNeeds(): 2456 for item in self.heNeeds: 2457 if item.n==0: 2458 return False 2459 return True 2460 2461 while now()<8*60: 2462 yield waituntil,self,workerNeeds 2463 for item in self.heNeeds: 2464 yield request,self,item 2465 print "%s %s has %s and starts job" %(now(),self.name, 2466 [x.name for x in self.heNeeds]) 2467 yield hold,self,random.uniform(10,30) 2468 for item in self.heNeeds: 2469 yield release,self,item 2470 yield hold,self,2 #rest 2471 2472 print "\n+++ nwaituntil demo output" 2473 initialize() 2474 brush=Resource(capacity=1,name="brush") 2475 ladder=Resource(capacity=2,name="ladder") 2476 hammer=Resource(capacity=1,name="hammer") 2477 saw=Resource(capacity=1,name="saw") 2478 painter=Worker("painter",[brush,ladder]) 2479 activate(painter,painter.work()) 2480 roofer=Worker("roofer",[hammer,ladder,ladder]) 2481 activate(roofer,roofer.work()) 2482 treeguy=Worker("treeguy",[saw,ladder]) 2483 activate(treeguy,treeguy.work()) 2484 for who in (painter,roofer,treeguy): 2485 print "%s needs %s for his job" %(who.name,[x.name for x in who.heNeeds]) 2486 print 2487 print simulate(until=9*60) 2488 2489 2490 2491 2492 test_demo() 2493 test_interrupt() 2494 testSimEvents() 2495 testwaituntil() 2496