Package qm :: Package test :: Package classes :: Module thread_target
[hide private]
[frames | no frames]

Source Code for Module qm.test.classes.thread_target

  1  ######################################################################## 
  2  # 
  3  # File:   thread_target.py 
  4  # Author: Mark Mitchell 
  5  # Date:   10/30/2001 
  6  # 
  7  # Contents: 
  8  #   QMTest ThreadTarget class. 
  9  # 
 10  # Copyright (c) 2001, 2002, 2003 by CodeSourcery, LLC.  All rights reserved.  
 11  # 
 12  # For license terms see the file COPYING. 
 13  # 
 14  ######################################################################## 
 15   
 16  ######################################################################## 
 17  # imports 
 18  ######################################################################## 
 19   
 20  from   qm.temporary_directory import TemporaryDirectory 
 21  from   qm.test.base import * 
 22  import qm.test.cmdline 
 23  from   qm.test.command_thread import * 
 24  from   qm.test.target import * 
 25  import Queue 
 26  from   threading import * 
 27   
 28  ######################################################################## 
 29  # classes 
 30  ######################################################################## 
 31   
class LocalThread(CommandThread):
    """A 'LocalThread' is a 'CommandThread' that runs commands locally.

    Each 'LocalThread' creates a 'TemporaryDirectory' of its own when
    it is constructed."""

    def __init__(self, target):
        """Construct a 'LocalThread'.

        'target' -- The target this thread works for.  It is handed
        to 'CommandThread.__init__' unchanged."""

        CommandThread.__init__(self, target)
        # Give this thread its own scratch directory.
        self.__temporary_directory = TemporaryDirectory()


    def _RunTest(self, descriptor, context):
        """Run the test given by 'descriptor'.

        'descriptor' -- The name of the test to be run.

        'context' -- The 'Context' in which to run the test.

        The work is delegated to the '_RunTest' method of the target
        returned by 'GetTarget'."""

        target = self.GetTarget()
        target._RunTest(descriptor, context)


    def GetTemporaryDirectory(self):
        """Return the path to the temporary directory for this thread.

        returns -- The value of 'GetPath' for the temporary directory
        created when this thread was constructed."""

        directory = self.__temporary_directory
        return directory.GetPath()
58 59 60
class ThreadTarget(Target):
    """A target implementation that runs tests in local threads.

    Each thread executes one test or resource at a time."""

    arguments = [
        qm.fields.IntegerField(
            name="threads",
            title="Number of Threads",
            description="""The number of threads to devote to running tests.

            A positive integer that indicates the number of threads to
            use when running tests. Larger numbers will allow more
            tests to be run at once. You can experiment with this
            value to find the number that results in the fastest
            execution.""",
            default_value=1),
        ]

    def __init__(self, database, properties):
        """Construct a 'ThreadTarget'.

        'database' -- The 'Database' containing the tests that will be
        run.

        'properties' -- A dictionary mapping strings (property names)
        to strings (property values)."""

        # Initialize the base class.
        Target.__init__(self, database, properties)

        # No threads exist until 'Start' is called.  Creating the
        # (empty) lists here means that 'IsIdle' and 'Stop' do not
        # fail with an 'AttributeError' if they are called first.
        self.__threads = []
        self.__ready_threads = []
        # Create a lock to guard accesses to __ready_threads.
        self.__ready_threads_lock = Lock()
        # Create a condition variable to guard accesses to the
        # available resources table.
        self.__resources_condition = Condition()


    def IsIdle(self):
        """Return true if the target is idle.

        returns -- True (1) if the target is idle, false (0)
        otherwise.  If the target is idle, additional tasks may be
        assigned to it."""

        # Acquire the lock.  (Otherwise, a thread that terminates
        # right as we are checking for idleness may alter
        # __ready_threads.)
        self.__ready_threads_lock.acquire()
        try:
            # This target is idle if there are any ready threads.
            if self.__ready_threads:
                idle = 1
            else:
                idle = 0
        finally:
            # Release the lock, even if an exception occurs.
            self.__ready_threads_lock.release()

        return idle


    def Start(self, response_queue, engine=None):
        """Start the target.

        'response_queue' -- The 'Queue' in which the results of test
        executions are placed.

        'engine' -- The 'ExecutionEngine' that is starting the target,
        or 'None' if this target is being started without an
        'ExecutionEngine'."""

        Target.Start(self, response_queue, engine)

        # Build the threads.
        self.__threads = []
        for i in xrange(0, self.threads):
            # Create the new thread.
            thread = LocalThread(self)
            # Start the thread.
            thread.start()
            # Remember the thread.
            self.__threads.append(thread)

        # Initially, all threads are ready.  A copy is made so that
        # popping ready threads does not alter the list of all threads.
        self.__ready_threads = self.__threads[:]


    def Stop(self):
        """Stop the target.

        postconditions -- The target may no longer be used."""

        Target.Stop(self)

        # Send each thread a "quit" command.
        for thread in self.__threads:
            thread.Stop()
        # Now wait for each thread to finish.
        for thread in self.__threads:
            thread.join()


    def RunTest(self, descriptor, context):
        """Run the test given by 'descriptor'.

        'descriptor' -- The 'TestDescriptor' for the test.

        'context' -- The 'Context' in which to run the test.

        The test is handed to one of the ready threads; this method
        only dispatches it.

        Derived classes may override this method."""

        self._Trace("About to dispatch test " + descriptor.GetId())

        self.__ready_threads_lock.acquire()
        try:
            # The execution engine should never be trying to run a test
            # when the target is not already idle.
            assert self.__ready_threads
            # Pick an idle thread to run the test.
            thread = self.__ready_threads.pop(0)
        finally:
            # Release the lock even if the assertion fails; otherwise
            # every thread calling '_NoteIdleThread' would block
            # forever.
            self.__ready_threads_lock.release()

        thread.RunTest(descriptor, context)

        self._Trace("Finished dispatching test " + descriptor.GetId())


    def _RunTest(self, descriptor, context):
        """Run the test given by 'descriptor'.

        'descriptor' -- The 'TestDescriptor' for the test.

        'context' -- The 'Context' in which to run the test.

        This method will be called from the thread that has been
        assigned the test."""

        Target.RunTest(self, descriptor, context)


    def _RecordResult(self, result):
        """Record the 'result'.

        'result' -- A 'Result' of a test or resource execution."""

        # If this is a test result, then this thread has finished all
        # of its work.
        if result.GetKind() == Result.TEST:
            self._NoteIdleThread()
        # Pass the result back to the execution engine.
        Target._RecordResult(self, result)


    def _BeginResourceSetUp(self, resource_name):
        """Begin setting up the indicated resource.

        'resource_name' -- A string naming a resource.

        returns -- If the resource has already been set up, returns a
        tuple '(outcome, map)'.  The 'outcome' indicates the outcome
        that resulted when the resource was set up; the 'map' is a map
        from strings to strings indicating properties added by this
        resource.  Otherwise, returns 'None', but marks the resource
        as in the process of being set up; it is the caller's
        responsibility to finish setting it up by calling
        '_FinishResourceSetUp'."""

        # Acquire the lock.
        self.__resources_condition.acquire()
        try:
            # Loop until either we are assigned to set up the resource
            # or until some other thread has finished setting it up.
            while 1:
                rop = Target._BeginResourceSetUp(self, resource_name)
                # If this is the first thread to call
                # _BeginResourceSetUp for this resource, this thread
                # will set up the resource.
                if not rop:
                    return rop
                # If this resource has already been set up, we do not
                # need to do anything more.
                # NOTE(review): 'rop[1]' is used as the "already set
                # up" marker; confirm against the tuple actually
                # returned by 'Target._BeginResourceSetUp'.
                if rop[1]:
                    return rop
                # Otherwise, some other thread is in the process of
                # setting up this resource so we just wait for it to
                # finish its job.
                self.__resources_condition.wait()
        finally:
            # Release the lock.
            self.__resources_condition.release()


    def _FinishResourceSetUp(self, resource, result, properties):
        """Finish setting up a resource and wake any waiting threads.

        'resource', 'result', 'properties' -- Passed unchanged to
        'Target._FinishResourceSetUp'.

        returns -- Whatever 'Target._FinishResourceSetUp' returns."""

        # Acquire the lock.
        self.__resources_condition.acquire()
        try:
            # Record the fact that the resource is set up.
            rop = Target._FinishResourceSetUp(self, resource, result,
                                              properties)
            # Tell all the other threads that the resource has been set
            # up.
            self.__resources_condition.notifyAll()
        finally:
            # Release the lock even if recording the result failed, so
            # that threads in '_BeginResourceSetUp' are not blocked
            # forever.
            self.__resources_condition.release()

        return rop


    def _NoteIdleThread(self):
        """Note that the current thread is idle.

        This method is called by the thread when it has completed a
        task."""

        # Acquire the lock.  (Otherwise, IsIdle might be called right
        # as we are accessing __ready_threads.)
        self.__ready_threads_lock.acquire()

        # Now that we have acquired the lock make sure that we release
        # it, even if an exception occurs.
        try:
            thread = currentThread()
            assert thread not in self.__ready_threads
            self.__ready_threads.append(thread)
        finally:
            # Release the lock.
            self.__ready_threads_lock.release()


    def _Trace(self, message):
        """Write a trace 'message'.

        'message' -- A string to be output as a trace message."""

        if __debug__:
            tracer = qm.test.cmdline.get_qmtest().GetTracer()
            tracer.Write(message, "thread_target")


    def _GetTemporaryDirectory(self):
        """Return the temporary directory of the calling thread.

        returns -- The result of 'GetTemporaryDirectory' on the
        current thread.  This presumably requires the caller to be
        running in a 'LocalThread'."""

        return currentThread().GetTemporaryDirectory()
302