Package coprs :: Package logic :: Module coprs_logic
[hide private]
[frames] | no frames]

Source Code for Module coprs.logic.coprs_logic

  1  import time 
  2  from sqlalchemy import and_ 
  3   
  4  from coprs import db 
  5  from coprs import exceptions 
  6  from coprs import helpers 
  7  from coprs import models 
  8  from coprs import signals 
  9  from coprs.logic import users_logic 
10 11 -class CoprsLogic(object):
12 13 """ 14 Used for manipulating Coprs. 15 16 All methods accept user object as a first argument, 17 as this may be needed in future. 18 """ 19 20 @classmethod
21 - def get_all(cls):
22 """ Return all coprs without those which are deleted. """ 23 query = (db.session.query(models.Copr) 24 .join(models.Copr.owner) 25 .options(db.contains_eager(models.Copr.owner)) 26 .filter(models.Copr.deleted == False)) 27 return query
28 29 @classmethod
30 - def get(cls, user, username, coprname, **kwargs):
31 with_builds = kwargs.get("with_builds", False) 32 with_mock_chroots = kwargs.get("with_mock_chroots", False) 33 34 query = (cls.get_all() 35 .filter(models.Copr.name == coprname) 36 .filter(models.User.openid_name == 37 models.User.openidize_name(username)) 38 ) 39 40 if with_builds: 41 query = (query.outerjoin(models.Copr.builds) 42 .options(db.contains_eager(models.Copr.builds)) 43 .order_by(models.Build.submitted_on.desc())) 44 45 if with_mock_chroots: 46 query = (query.outerjoin(*models.Copr.mock_chroots.attr) 47 .options(db.contains_eager(*models.Copr.mock_chroots.attr)) 48 .order_by(models.MockChroot.os_release.asc()) 49 .order_by(models.MockChroot.os_version.asc()) 50 .order_by(models.MockChroot.arch.asc())) 51 52 return query
53 54 @classmethod
55 - def get_multiple(cls, user, **kwargs):
56 user_relation = kwargs.get("user_relation", None) 57 username = kwargs.get("username", None) 58 with_mock_chroots = kwargs.get("with_mock_chroots", None) 59 with_builds = kwargs.get("with_builds", None) 60 incl_deleted = kwargs.get("incl_deleted", None) 61 ids = kwargs.get("ids", None) 62 63 query = (db.session.query(models.Copr) 64 .join(models.Copr.owner) 65 .options(db.contains_eager(models.Copr.owner)) 66 .order_by(models.Copr.id.desc())) 67 68 if not incl_deleted: 69 query = query.filter(models.Copr.deleted == False) 70 71 if isinstance(ids, list): # can be an empty list 72 query = query.filter(models.Copr.id.in_(ids)) 73 74 if user_relation == "owned": 75 query = query.filter( 76 models.User.openid_name == models.User.openidize_name(username)) 77 elif user_relation == "allowed": 78 aliased_user = db.aliased(models.User) 79 80 query = (query.join(models.CoprPermission, 81 models.Copr.copr_permissions) 82 .filter(models.CoprPermission.copr_builder == 83 helpers.PermissionEnum('approved')) 84 .join(aliased_user, models.CoprPermission.user) 85 .filter(aliased_user.openid_name == 86 models.User.openidize_name(username))) 87 88 if with_mock_chroots: 89 query = (query.outerjoin(*models.Copr.mock_chroots.attr) 90 .options(db.contains_eager(*models.Copr.mock_chroots.attr)) 91 .order_by(models.MockChroot.os_release.asc()) 92 .order_by(models.MockChroot.os_version.asc()) 93 .order_by(models.MockChroot.arch.asc())) 94 95 if with_builds: 96 query = (query.outerjoin(models.Copr.builds) 97 .options(db.contains_eager(models.Copr.builds)) 98 .order_by(models.Build.submitted_on.desc())) 99 100 return query
101 102 @classmethod
103 - def get_playground(cls):
104 return cls.get_all().filter(models.Copr.playground == True)
105 106 @classmethod
107 - def set_playground(cls, user, copr):
108 if user.admin: 109 db.session.add(copr) 110 pass 111 else: 112 raise exceptions.InsufficientRightsException( 113 "User is not a system admin")
114 115 @classmethod
116 - def get_multiple_fulltext(cls, user, search_string):
117 query = (models.Copr.query.join(models.User) 118 .filter(models.Copr.deleted == False)) 119 if "/" in search_string: 120 # searching for user/project 121 name = "%{}%".format(search_string.split("/")[0]) 122 project = "%{}%".format(search_string.split("/")[1]) 123 query = query.filter(and_(models.User.openid_name.ilike(name), 124 models.Copr.name.ilike(project))) 125 else: 126 # fulltext search 127 query = query.whooshee_search(search_string) 128 return query
129 130 @classmethod
131 - def add(cls, user, name, repos, selected_chroots, description, 132 instructions, check_for_duplicates=False):
133 copr = models.Copr(name=name, 134 repos=repos, 135 owner=user, 136 description=description, 137 instructions=instructions, 138 created_on=int(time.time())) 139 140 # form validation checks for duplicates 141 CoprsLogic.new(user, copr, 142 check_for_duplicates=check_for_duplicates) 143 CoprChrootsLogic.new_from_names(user, copr, 144 selected_chroots) 145 return copr
146 147 @classmethod
148 - def new(cls, user, copr, check_for_duplicates=True):
149 if check_for_duplicates and cls.exists_for_user(user, copr.name).all(): 150 raise exceptions.DuplicateException( 151 "Copr: '{0}' already exists".format(copr.name)) 152 signals.copr_created.send(cls, copr=copr) 153 db.session.add(copr)
154 155 @classmethod
156 - def update(cls, user, copr, check_for_duplicates=True):
157 cls.raise_if_unfinished_blocking_action( 158 user, copr, "Can't change this project name," 159 " another operation is in progress: {action}") 160 161 users_logic.UsersLogic.raise_if_cant_update_copr( 162 user, copr, "Only owners and admins may update their projects.") 163 164 existing = cls.exists_for_user(copr.owner, copr.name).first() 165 if existing: 166 if check_for_duplicates and existing.id != copr.id: 167 raise exceptions.DuplicateException( 168 "Project: '{0}' already exists".format(copr.name)) 169 170 else: # we're renaming 171 # if we fire a models.Copr.query, it will use the modified copr in session 172 # -> workaround this by just getting the name 173 old_copr_name = (db.session.query(models.Copr.name) 174 .filter(models.Copr.id == copr.id) 175 .filter(models.Copr.deleted == False) 176 .first()[0]) 177 178 action = models.Action(action_type=helpers.ActionTypeEnum("rename"), 179 object_type="copr", 180 object_id=copr.id, 181 old_value="{0}/{1}".format(copr.owner.name, 182 old_copr_name), 183 new_value="{0}/{1}".format(copr.owner.name, 184 copr.name), 185 created_on=int(time.time())) 186 db.session.add(action) 187 db.session.add(copr)
188 189 @classmethod
190 - def delete(cls, user, copr, check_for_duplicates=True):
191 cls.raise_if_cant_delete(user, copr) 192 # TODO: do we want to dump the information somewhere, so that we can 193 # search it in future? 194 cls.raise_if_unfinished_blocking_action( 195 user, copr, "Can't delete this project," 196 " another operation is in progress: {action}") 197 198 action = models.Action(action_type=helpers.ActionTypeEnum("delete"), 199 object_type="copr", 200 object_id=copr.id, 201 old_value="{0}/{1}".format(copr.owner.name, 202 copr.name), 203 new_value="", 204 created_on=int(time.time())) 205 copr.deleted = True 206 207 db.session.add(action) 208 209 return copr
210 211 @classmethod
212 - def exists_for_user(cls, user, coprname, incl_deleted=False):
213 existing = (models.Copr.query 214 .filter(models.Copr.name == coprname) 215 .filter(models.Copr.owner_id == user.id)) 216 217 if not incl_deleted: 218 existing = existing.filter(models.Copr.deleted == False) 219 220 return existing
221 222 @classmethod
223 - def unfinished_blocking_actions_for(cls, user, copr):
224 blocking_actions = [helpers.ActionTypeEnum("rename"), 225 helpers.ActionTypeEnum("delete")] 226 227 actions = (models.Action.query 228 .filter(models.Action.object_type == "copr") 229 .filter(models.Action.object_id == copr.id) 230 .filter(models.Action.result == 231 helpers.BackendResultEnum("waiting")) 232 .filter(models.Action.action_type.in_(blocking_actions))) 233 234 return actions
235 236 @classmethod
237 - def raise_if_unfinished_blocking_action(cls, user, copr, message):
238 """ 239 Raise ActionInProgressException if given copr has an unfinished 240 action. Return None otherwise. 241 """ 242 243 unfinished_actions = cls.unfinished_blocking_actions_for( 244 user, copr).all() 245 if unfinished_actions: 246 raise exceptions.ActionInProgressException( 247 message, unfinished_actions[0])
248 249 @classmethod
250 - def raise_if_cant_delete(cls, user, copr):
251 """ 252 Raise InsufficientRightsException if given copr cant be deleted 253 by given user. Return None otherwise. 254 """ 255 256 if not user.admin and user != copr.owner: 257 raise exceptions.InsufficientRightsException( 258 "Only owners may delete their projects.")
259
260 261 -class CoprPermissionsLogic(object):
262 263 @classmethod
264 - def get(cls, user, copr, searched_user):
265 query = (models.CoprPermission.query 266 .filter(models.CoprPermission.copr == copr) 267 .filter(models.CoprPermission.user == searched_user)) 268 269 return query
270 271 @classmethod
272 - def get_for_copr(cls, user, copr):
273 query = models.CoprPermission.query.filter( 274 models.CoprPermission.copr == copr) 275 276 return query
277 278 @classmethod
279 - def new(cls, user, copr_permission):
280 db.session.add(copr_permission)
281 282 @classmethod
283 - def update_permissions(cls, user, copr, copr_permission, 284 new_builder, new_admin):
285 286 users_logic.UsersLogic.raise_if_cant_update_copr( 287 user, copr, "Only owners and admins may update" 288 " their projects permissions.") 289 290 (models.CoprPermission.query 291 .filter(models.CoprPermission.copr_id == copr.id) 292 .filter(models.CoprPermission.user_id == copr_permission.user_id) 293 .update({"copr_builder": new_builder, 294 "copr_admin": new_admin}))
295 296 @classmethod
297 - def update_permissions_by_applier(cls, user, copr, copr_permission, new_builder, new_admin):
298 if copr_permission: 299 # preserve approved permissions if set 300 if (not new_builder or copr_permission.copr_builder != 301 helpers.PermissionEnum("approved")): 302 303 copr_permission.copr_builder = new_builder 304 305 if (not new_admin or copr_permission.copr_admin != 306 helpers.PermissionEnum("approved")): 307 308 copr_permission.copr_admin = new_admin 309 else: 310 perm = models.CoprPermission( 311 user=user, 312 copr=copr, 313 copr_builder=new_builder, 314 copr_admin=new_admin) 315 316 cls.new(user, perm)
317 318 @classmethod
319 - def delete(cls, user, copr_permission):
320 db.session.delete(copr_permission)
321
322 323 -class CoprChrootsLogic(object):
324 325 @classmethod
326 - def mock_chroots_from_names(cls, user, names):
327 db_chroots = models.MockChroot.query.all() 328 mock_chroots = [] 329 for ch in db_chroots: 330 if ch.name in names: 331 mock_chroots.append(ch) 332 333 return mock_chroots
334 335 @classmethod
336 - def new(cls, user, mock_chroot):
337 db.session.add(mock_chroot)
338 339 @classmethod
340 - def new_from_names(cls, user, copr, names):
344 345 @classmethod
346 - def update_buildroot_pkgs(cls, copr, chroot, buildroot_pkgs):
347 copr_chroot = copr.check_copr_chroot(chroot) 348 if copr_chroot: 349 copr_chroot.buildroot_pkgs = buildroot_pkgs 350 db.session.add(copr_chroot)
351 352 @classmethod
353 - def update_from_names(cls, user, copr, names):
354 current_chroots = copr.mock_chroots 355 new_chroots = cls.mock_chroots_from_names(user, names) 356 # add non-existing 357 for mock_chroot in new_chroots: 358 if mock_chroot not in current_chroots: 359 db.session.add( 360 models.CoprChroot(copr=copr, mock_chroot=mock_chroot)) 361 362 # delete no more present 363 to_remove = [] 364 for mock_chroot in current_chroots: 365 if mock_chroot not in new_chroots: 366 # can't delete here, it would change current_chroots and break 367 # iteration 368 to_remove.append(mock_chroot) 369 370 for mc in to_remove: 371 copr.mock_chroots.remove(mc)
372
373 374 -class MockChrootsLogic(object):
375 376 @classmethod
377 - def get(cls, user, os_release, os_version, arch, active_only=False, noarch=False):
378 if noarch and not arch: 379 return (models.MockChroot.query 380 .filter(models.MockChroot.os_release == os_release, 381 models.MockChroot.os_version == os_version)) 382 383 return (models.MockChroot.query 384 .filter(models.MockChroot.os_release == os_release, 385 models.MockChroot.os_version == os_version, 386 models.MockChroot.arch == arch))
387 388 @classmethod
389 - def get_from_name(cls, chroot_name, active_only=False, noarch=False):
390 """ 391 chroot_name should be os-version-architecture, e.g. fedora-rawhide-x86_64 392 the architecture could be optional with noarch=True 393 394 Return MockChroot object for textual representation of chroot 395 """ 396 397 name_tuple = cls.tuple_from_name(None, chroot_name, noarch=noarch) 398 return cls.get(None, name_tuple[0], name_tuple[1], name_tuple[2], 399 active_only=active_only, noarch=noarch)
400 401 @classmethod
402 - def get_multiple(cls, user, active_only=False):
403 query = models.MockChroot.query 404 if active_only: 405 query = query.filter(models.MockChroot.is_active == True) 406 return query
407 408 @classmethod
409 - def add(cls, user, name):
410 name_tuple = cls.tuple_from_name(user, name) 411 if cls.get(user, *name_tuple).first(): 412 raise exceptions.DuplicateException( 413 "Mock chroot with this name already exists.") 414 new_chroot = models.MockChroot(os_release=name_tuple[0], 415 os_version=name_tuple[1], 416 arch=name_tuple[2]) 417 cls.new(user, new_chroot) 418 return new_chroot
419 420 @classmethod
421 - def new(cls, user, mock_chroot):
422 db.session.add(mock_chroot)
423 424 @classmethod
425 - def edit_by_name(cls, user, name, is_active):
426 name_tuple = cls.tuple_from_name(user, name) 427 mock_chroot = cls.get(user, *name_tuple).first() 428 if not mock_chroot: 429 raise exceptions.NotFoundException( 430 "Mock chroot with this name doesn't exist.") 431 432 mock_chroot.is_active = is_active 433 cls.update(user, mock_chroot) 434 return mock_chroot
435 436 @classmethod
437 - def update(cls, user, mock_chroot):
438 db.session.add(mock_chroot)
439 440 @classmethod
441 - def delete_by_name(cls, user, name):
442 name_tuple = cls.tuple_from_name(user, name) 443 mock_chroot = cls.get(user, *name_tuple).first() 444 if not mock_chroot: 445 raise exceptions.NotFoundException( 446 "Mock chroot with this name doesn't exist.") 447 448 cls.delete(user, mock_chroot)
449 450 @classmethod
451 - def delete(cls, user, mock_chroot):
452 db.session.delete(mock_chroot)
453 454 @classmethod
455 - def tuple_from_name(cls, user, name, noarch=False):
456 """ 457 input should be os-version-architecture, e.g. fedora-rawhide-x86_64 458 459 the architecture could be optional with noarch=True 460 461 returns ("os", "vetsion", "arch") or ("os", "version", None) 462 """ 463 splitted_name = name.split("-") 464 if (not noarch and len(splitted_name) is not 3)\ 465 or (noarch and len(splitted_name) not in (2,3)): 466 raise exceptions.MalformedArgumentException( 467 "Chroot name is not valid") 468 469 if noarch: 470 splitted_name.append(None) 471 472 return tuple(splitted_name)
473