Package qm :: Package test :: Package classes :: Module sql_result_stream
[hide private]
[frames] | no frames]

Source Code for Module qm.test.classes.sql_result_stream

  1  ######################################################################## 
  2  # 
  3  # File:   sql_result_stream.py 
  4  # Author: Nathaniel Smith <njs@codesourcery.com> 
  5  # Date:   2003-06-13 
  6  # 
  7  # Contents: 
  8  #   SQLResultStream, SQLResultSource 
  9  # 
 10  # Copyright (c) 2003 by CodeSourcery, LLC.  All rights reserved.  
 11  # 
 12  # For license terms see the file COPYING. 
 13  # 
 14  ######################################################################## 
 15   
 16  ######################################################################## 
 17  # Imports 
 18  ######################################################################## 
 19   
 20  import qm.fields 
 21  from qm.extension          import Extension 
 22  from qm.test.result_stream import ResultStream 
 23  from qm.test.result_reader import ResultReader 
 24  from qm.db                 import quote_string, Connection 
 25  from qm.test.result        import Result 
 26   
 27  ######################################################################## 
 28  # Classes 
 29  ######################################################################## 
 30   
31 -class _SQLConnected(Extension):
32 """Mixin class for classes that need a database connection.""" 33 34 arguments = [ 35 qm.fields.TextField( 36 name = "db_name", 37 title = "Database name", 38 description = "The PostgreSQL database to connect to.", 39 verbatim = "true", 40 default_value = ""), 41 qm.fields.TextField( 42 name = "db_module", 43 title = "Database module", 44 description = "The DB 2.0 module to use.", 45 verbatim = "true", 46 default_value = "pgdb"), 47 qm.fields.PythonField( 48 name = "connection"), 49 ] 50
51 - def __init__(self, arguments = None, **args):
52 53 if arguments: args.update(arguments) 54 super(_SQLConnected, self).__init__(**args) 55 56 if not self.connection: 57 self.connection = Connection(self.db_module, 58 database=self.db_name)
59 60 61
62 -class SQLResultStream(ResultStream, _SQLConnected):
63 """A 'SQLResultStream' writes results out to an SQL database. 64 65 This class currently supports PostgreSQL only.""" 66 67
68 - def __init__(self, arguments = None, **args):
69 70 super(SQLResultStream, self).__init__(arguments, **args) 71 72 run_id_cursor = self.connection.execute(""" 73 SELECT nextval('run_id_seq'); 74 """) 75 (self._run_id,) = run_id_cursor.fetchone() 76 77 self.connection.execute(""" 78 INSERT INTO runs (run_id) VALUES (%i) 79 """ % (self._run_id,))
80 81
82 - def WriteAnnotation(self, key, value):
83 84 self.connection.execute(""" 85 INSERT INTO run_annotations (run_id, key, value) 86 VALUES (%i, %s, %s) 87 """ % (self._run_id, 88 quote_string(key), 89 quote_string(value)))
90 91
92 - def WriteResult(self, result):
93 94 self.connection.execute(""" 95 INSERT INTO results (run_id, result_id, kind, outcome) 96 VALUES (%i, %s, %s, %s) 97 """ % (self._run_id, 98 quote_string(result.GetId()), 99 quote_string(result.GetKind()), 100 quote_string(result.GetOutcome()))) 101 102 for key, value in result.items(): 103 self.connection.execute(""" 104 INSERT INTO result_annotations (run_id, 105 result_id, 106 result_kind, 107 key, 108 value) 109 VALUES (%i, %s, %s, %s, %s) 110 """ % (self._run_id, 111 quote_string(result.GetId()), 112 quote_string(result.GetKind()), 113 quote_string(key), 114 quote_string(value)))
115 116
117 - def Summarize(self):
118 119 self.connection.commit()
120 121 122
123 -class _Buffer:
124 """A little buffering iterator with one-element rewind.""" 125
126 - def __init__(self, size, get_more):
127 """Create a '_Buffer'. 128 129 'size' -- the number of items to hold in the buffer at a time. 130 131 'get_more' -- a function taking a number as its sole argument; 132 should return a list of that many new items (or as 133 many items are left, whichever is less). 134 """ 135 136 self.size = size 137 self.get_more = get_more 138 self.buffer = get_more(size) 139 self.idx = 0 140 # Needed for rewinding over buffer refills: 141 self.last = None
142 143
144 - def next(self):
145 """Returns the next item, refilling the buffer if necessary.""" 146 147 idx = self.idx 148 if idx == len(self.buffer): 149 self.buffer = self.get_more(self.size) 150 self.idx = 0 151 idx = 0 152 if not self.buffer: 153 raise StopIteration 154 self.idx += 1 155 self.last = self.buffer[idx] 156 return self.buffer[idx]
157 158
159 - def rewind(self):
160 161 if self.idx == 0: 162 self.buffer.insert(0, self.last) 163 else: 164 self.idx -= 1
165 166
167 - def __iter__(self):
168 169 return self
170 171 172
173 -class SQLResultReader(ResultReader, _SQLConnected):
174 """A 'SQLResultReader' reads result in from an SQL database. 175 176 This class currently supports PostgreSQL only.""" 177 178 arguments = [ 179 qm.fields.IntegerField( 180 name = "run_id", 181 title = "Run ID", 182 ), 183 ] 184
185 - def __init__(self, arguments = None, **args):
186 187 super(SQLResultReader, self).__init__(arguments, **args) 188 189 self._batch_size = 1000 190 191 self._LoadAnnotations() 192 self._SetupResultCursors()
193 194
195 - def _LoadAnnotations(self):
196 197 cursor = self.connection.execute(""" 198 SELECT key, value FROM run_annotations 199 WHERE run_id = %i 200 """ % (self.run_id)) 201 202 self._annotations = dict(iter(cursor.fetchone, None))
203 204
205 - def GetAnnotations(self):
206 207 return self._annotations
208 209
210 - def _SetupResultCursors(self):
211 212 # Set up our two result cursors. 213 self.connection.execute(""" 214 DECLARE results_c CURSOR FOR 215 SELECT result_id, kind, outcome FROM results 216 WHERE run_id = %i 217 ORDER BY result_id, kind 218 """ % (self.run_id,)) 219 self.connection.execute(""" 220 DECLARE annote_c CURSOR FOR 221 SELECT result_id, result_kind, key, value 222 FROM result_annotations WHERE run_id = %i 223 ORDER BY result_id, result_kind 224 """ % (self.run_id,)) 225 226 def get_more_results(num): 227 return self.connection.execute(""" 228 FETCH FORWARD %i FROM results_c 229 """ % (num,)).fetchall()
230 def get_more_annotations(num): 231 return self.connection.execute(""" 232 FETCH FORWARD %i FROM annote_c 233 """ % (num,)).fetchall()
234 235 self._r_buffer = _Buffer(self._batch_size, get_more_results) 236 self._a_buffer = _Buffer(self._batch_size, get_more_annotations) 237 238
239 - def GetResult(self):
240 241 try: 242 id, kind, outcome = self._r_buffer.next() 243 except StopIteration: 244 return None 245 annotations = {} 246 for result_id, result_kind, key, value in self._a_buffer: 247 if (result_id, result_kind) != (id, kind): 248 self._a_buffer.rewind() 249 break 250 annotations[key] = value 251 return Result(kind, id, outcome, annotations)
252 253 ######################################################################## 254 # Local Variables: 255 # mode: python 256 # indent-tabs-mode: nil 257 # fill-column: 72 258 # End: 259