Package coprs :: Package logic :: Module coprs_logic
[hide private]
[frames] | no frames]

Source Code for Module coprs.logic.coprs_logic

  1  import os 
  2  import time 
  3  import datetime 
  4   
  5  from sqlalchemy import and_ 
  6  from sqlalchemy.sql import func 
  7  from sqlalchemy import asc 
  8  from sqlalchemy.event import listen 
  9  from sqlalchemy.orm.attributes import NEVER_SET 
 10  from sqlalchemy.orm.exc import NoResultFound 
 11  from sqlalchemy.orm.attributes import get_history 
 12   
 13  from copr_common.enums import ActionTypeEnum, BackendResultEnum 
 14  from coprs import db 
 15  from coprs import exceptions 
 16  from coprs import helpers 
 17  from coprs import models 
 18  from coprs.exceptions import MalformedArgumentException, BadRequest 
 19  from coprs.logic import users_logic 
 20  from coprs.whoosheers import CoprWhoosheer 
 21  from coprs.helpers import fix_protocol_for_backend 
 22   
 23  from coprs.logic.actions_logic import ActionsLogic 
 24  from coprs.logic.users_logic import UsersLogic 
25 26 27 -class CoprsLogic(object):
28 """ 29 Used for manipulating Coprs. 30 31 All methods accept user object as a first argument, 32 as this may be needed in future. 33 """ 34 35 @classmethod
36 - def get_all(cls):
37 """ Return all coprs without those which are deleted. """ 38 query = (db.session.query(models.Copr) 39 .join(models.Copr.user) 40 .options(db.contains_eager(models.Copr.user)) 41 .filter(models.Copr.deleted == False)) 42 return query
43 44 @classmethod
45 - def get_by_id(cls, copr_id):
46 return cls.get_all().filter(models.Copr.id == copr_id)
47 48 @classmethod
49 - def attach_build(cls, query):
50 query = (query.outerjoin(models.Copr.builds) 51 .options(db.contains_eager(models.Copr.builds)) 52 .order_by(models.Build.submitted_on.desc())) 53 return query
54 55 @classmethod
56 - def attach_mock_chroots(cls, query):
57 query = (query.outerjoin(*models.Copr.mock_chroots.attr) 58 .options(db.contains_eager(*models.Copr.mock_chroots.attr)) 59 .order_by(models.MockChroot.os_release.asc()) 60 .order_by(models.MockChroot.os_version.asc()) 61 .order_by(models.MockChroot.arch.asc())) 62 return query
63 64 @classmethod
65 - def get_multiple_by_username(cls, username, **kwargs):
66 with_builds = kwargs.get("with_builds", False) 67 with_mock_chroots = kwargs.get("with_mock_chroots", False) 68 69 query = ( 70 cls.get_all() 71 .filter(models.User.username == username) 72 ) 73 74 if with_builds: 75 query = cls.attach_build(query) 76 77 if with_mock_chroots: 78 query = cls.attach_mock_chroots(query) 79 80 return query
81 82 @classmethod
83 - def get_multiple_by_group_id(cls, group_id, **kwargs):
84 with_builds = kwargs.get("with_builds", False) 85 with_mock_chroots = kwargs.get("with_mock_chroots", False) 86 87 query = ( 88 cls.get_all() 89 .filter(models.Copr.group_id == group_id) 90 ) 91 92 if with_builds: 93 query = cls.attach_build(query) 94 95 if with_mock_chroots: 96 query = cls.attach_mock_chroots(query) 97 98 return query
99 100 @classmethod
101 - def get(cls, username, coprname, **kwargs):
102 query = cls.get_multiple_by_username(username, **kwargs) 103 query = query.filter(models.Copr.name == coprname) 104 return query
105 106 @classmethod
107 - def get_by_group_id(cls, group_id, coprname, **kwargs):
108 query = cls.get_multiple_by_group_id(group_id, **kwargs) 109 query = query.filter(models.Copr.name == coprname) 110 return query
111 112 @classmethod
113 - def get_multiple(cls, include_deleted=False, include_unlisted_on_hp=True):
114 query = ( 115 db.session.query(models.Copr) 116 .join(models.Copr.user) 117 .outerjoin(models.Group) 118 .options(db.contains_eager(models.Copr.user)) 119 ) 120 121 if not include_deleted: 122 query = query.filter(models.Copr.deleted.is_(False)) 123 124 if not include_unlisted_on_hp: 125 query = query.filter(models.Copr.unlisted_on_hp.is_(False)) 126 127 return query
128 129 @classmethod
130 - def set_query_order(cls, query, desc=False):
131 if desc: 132 query = query.order_by(models.Copr.id.desc()) 133 else: 134 query = query.order_by(models.Copr.id.asc()) 135 return query
136 137 # user_relation="owned", username=username, with_mock_chroots=False 138 @classmethod
139 - def get_multiple_owned_by_username(cls, username):
140 query = cls.get_multiple() 141 return query.filter(models.User.username == username)
142 143 @classmethod
144 - def filter_by_name(cls, query, name):
145 return query.filter(models.Copr.name == name)
146 147 @classmethod
148 - def filter_by_user_name(cls, query, username):
149 # should be already joined with the User table 150 return query.filter(models.User.username == username)
151 152 @classmethod
153 - def filter_by_group_name(cls, query, group_name):
154 # should be already joined with the Group table 155 return query.filter(models.Group.name == group_name)
156 157 @classmethod
158 - def filter_without_group_projects(cls, query):
159 return query.filter(models.Copr.group_id.is_(None))
160 161 @classmethod
162 - def join_builds(cls, query):
163 return (query.outerjoin(models.Copr.builds) 164 .options(db.contains_eager(models.Copr.builds)) 165 .order_by(models.Build.submitted_on.desc()))
166 167 @classmethod
168 - def join_mock_chroots(cls, query):
169 return (query.outerjoin(*models.Copr.mock_chroots.attr) 170 .options(db.contains_eager(*models.Copr.mock_chroots.attr)) 171 .order_by(models.MockChroot.os_release.asc()) 172 .order_by(models.MockChroot.os_version.asc()) 173 .order_by(models.MockChroot.arch.asc()))
174 175 @classmethod
176 - def get_playground(cls):
177 return cls.get_all().filter(models.Copr.playground == True)
178 179 @classmethod
180 - def set_playground(cls, user, copr):
181 if user.admin: 182 db.session.add(copr) 183 pass 184 else: 185 raise exceptions.InsufficientRightsException( 186 "User is not a system admin")
187 188 @classmethod
189 - def get_multiple_fulltext(cls, search_string):
190 query = (models.Copr.query.join(models.User) 191 .filter(models.Copr.deleted == False)) 192 if "/" in search_string: # copr search by its full name 193 if search_string[0] == '@': # searching for @group/project 194 group_name = "%{}%".format(search_string.split("/")[0][1:]) 195 project = "%{}%".format(search_string.split("/")[1]) 196 query = query.filter(and_(models.Group.name.ilike(group_name), 197 models.Copr.name.ilike(project), 198 models.Group.id == models.Copr.group_id)) 199 query = query.order_by(asc(func.length(models.Group.name)+func.length(models.Copr.name))) 200 else: # searching for user/project 201 user_name = "%{}%".format(search_string.split("/")[0]) 202 project = "%{}%".format(search_string.split("/")[1]) 203 query = query.filter(and_(models.User.username.ilike(user_name), 204 models.Copr.name.ilike(project), 205 models.User.id == models.Copr.user_id)) 206 query = query.order_by(asc(func.length(models.User.username)+func.length(models.Copr.name))) 207 else: # fulltext search 208 query = query.whooshee_search(search_string, whoosheer=CoprWhoosheer, order_by_relevance=100) 209 return query
210 211 @classmethod
212 - def add(cls, user, name, selected_chroots, repos=None, description=None, 213 instructions=None, check_for_duplicates=False, group=None, persistent=False, 214 auto_prune=True, use_bootstrap_container=False, follow_fedora_branching=False, **kwargs):
215 216 if not user.admin and persistent: 217 raise exceptions.NonAdminCannotCreatePersistentProject() 218 219 if not user.admin and not auto_prune: 220 raise exceptions.NonAdminCannotDisableAutoPrunning() 221 222 # form validation checks for duplicates 223 cls.new(user, name, group, check_for_duplicates=check_for_duplicates) 224 225 copr = models.Copr(name=name, 226 repos=repos or u"", 227 user=user, 228 description=description or u"", 229 instructions=instructions or u"", 230 created_on=int(time.time()), 231 persistent=persistent, 232 auto_prune=auto_prune, 233 use_bootstrap_container=use_bootstrap_container, 234 follow_fedora_branching=follow_fedora_branching, 235 **kwargs) 236 237 238 if group is not None: 239 UsersLogic.raise_if_not_in_group(user, group) 240 copr.group = group 241 242 copr_dir = models.CoprDir( 243 main=True, 244 name=name, 245 copr=copr) 246 247 db.session.add(copr_dir) 248 db.session.add(copr) 249 250 CoprChrootsLogic.new_from_names( 251 copr, selected_chroots) 252 253 db.session.flush() 254 ActionsLogic.send_create_gpg_key(copr) 255 256 return copr
257 258 @classmethod
259 - def new(cls, user, copr_name, group=None, check_for_duplicates=True):
260 if check_for_duplicates: 261 if group is None and cls.exists_for_user(user, copr_name).all(): 262 raise exceptions.DuplicateException( 263 "Copr: '{0}/{1}' already exists".format(user.name, copr_name)) 264 elif group: 265 if cls.exists_for_group(group, copr_name).all(): 266 db.session.rollback() 267 raise exceptions.DuplicateException( 268 "Copr: '@{0}/{1}' already exists".format(group.name, copr_name))
269 270 @classmethod
271 - def update(cls, user, copr):
272 # we should call get_history before other requests, otherwise 273 # the changes would be forgotten 274 if get_history(copr, "name").has_changes(): 275 raise MalformedArgumentException("Change name of the project is forbidden") 276 277 users_logic.UsersLogic.raise_if_cant_update_copr( 278 user, copr, "Only owners and admins may update their projects.") 279 280 if not user.admin and not copr.auto_prune: 281 raise exceptions.NonAdminCannotDisableAutoPrunning() 282 283 db.session.add(copr)
284 285 @classmethod
286 - def delete_unsafe(cls, user, copr):
287 """ 288 Deletes copr without termination of ongoing builds. 289 """ 290 cls.raise_if_cant_delete(user, copr) 291 # TODO: do we want to dump the information somewhere, so that we can 292 # search it in future? 293 cls.raise_if_unfinished_blocking_action( 294 copr, "Can't delete this project," 295 " another operation is in progress: {action}") 296 297 ActionsLogic.send_delete_copr(copr) 298 CoprDirsLogic.delete_all_by_copr(copr) 299 300 copr.deleted = True 301 return copr
302 303 @classmethod
304 - def exists_for_user(cls, user, coprname, incl_deleted=False):
305 existing = (models.Copr.query 306 .filter(models.Copr.name == coprname) 307 .filter(models.Copr.user_id == user.id)) 308 309 if not incl_deleted: 310 existing = existing.filter(models.Copr.deleted == False) 311 312 return cls.filter_without_group_projects(existing)
313 314 @classmethod
315 - def exists_for_group(cls, group, coprname, incl_deleted=False):
316 existing = (models.Copr.query 317 .filter(models.Copr.name == coprname) 318 .filter(models.Copr.group_id == group.id)) 319 320 if not incl_deleted: 321 existing = existing.filter(models.Copr.deleted == False) 322 323 return existing
324 325 @classmethod
326 - def unfinished_blocking_actions_for(cls, copr):
327 blocking_actions = [ActionTypeEnum("delete")] 328 329 actions = (models.Action.query 330 .filter(models.Action.object_type == "copr") 331 .filter(models.Action.object_id == copr.id) 332 .filter(models.Action.result == 333 BackendResultEnum("waiting")) 334 .filter(models.Action.action_type.in_(blocking_actions))) 335 336 return actions
337 338 @classmethod
339 - def get_yum_repos(cls, copr, empty=False):
340 repos = {} 341 release_tmpl = "{chroot.os_release}-{chroot.os_version}-{chroot.arch}" 342 build = models.Build.query.filter(models.Build.copr_id == copr.id).first() 343 if build or empty: 344 for chroot in copr.active_chroots: 345 release = release_tmpl.format(chroot=chroot) 346 repos[release] = fix_protocol_for_backend( 347 os.path.join(copr.repo_url, release + '/')) 348 return repos
349 350 @classmethod
351 - def raise_if_unfinished_blocking_action(cls, copr, message):
352 """ 353 Raise ActionInProgressException if given copr has an unfinished 354 action. Return None otherwise. 355 """ 356 357 unfinished_actions = cls.unfinished_blocking_actions_for(copr).all() 358 if unfinished_actions: 359 raise exceptions.ActionInProgressException( 360 message, unfinished_actions[0])
361 362 @classmethod
363 - def raise_if_cant_delete(cls, user, copr):
364 """ 365 Raise InsufficientRightsException if given copr cant be deleted 366 by given user. Return None otherwise. 367 """ 368 369 if not user.admin and user != copr.user: 370 raise exceptions.InsufficientRightsException( 371 "Only owners may delete their projects.")
372 373 @classmethod
374 - def delete_expired_projects(cls):
375 query = ( 376 models.Copr.query 377 .filter(models.Copr.delete_after.isnot(None)) 378 .filter(models.Copr.delete_after < datetime.datetime.now()) 379 .filter(models.Copr.deleted.isnot(True)) 380 ) 381 for copr in query.all(): 382 print("deleting project '{}'".format(copr.full_name)) 383 CoprsLogic.delete_unsafe(copr.user, copr)
384
385 386 387 -class CoprPermissionsLogic(object):
388 @classmethod
389 - def get(cls, copr, searched_user):
390 query = (models.CoprPermission.query 391 .filter(models.CoprPermission.copr == copr) 392 .filter(models.CoprPermission.user == searched_user)) 393 394 return query
395 396 @classmethod
397 - def get_for_copr(cls, copr):
398 query = models.CoprPermission.query.filter( 399 models.CoprPermission.copr == copr) 400 401 return query
402 403 @classmethod
404 - def get_admins_for_copr(cls, copr):
405 permissions = cls.get_for_copr(copr) 406 return [copr.user] + [p.user for p in permissions if p.copr_admin == helpers.PermissionEnum("approved")]
407 408 @classmethod
409 - def new(cls, copr_permission):
410 db.session.add(copr_permission)
411 412 @classmethod
413 - def update_permissions(cls, user, copr, copr_permission, 414 new_builder, new_admin):
415 416 users_logic.UsersLogic.raise_if_cant_update_copr( 417 user, copr, "Only owners and admins may update" 418 " their projects permissions.") 419 420 (models.CoprPermission.query 421 .filter(models.CoprPermission.copr_id == copr.id) 422 .filter(models.CoprPermission.user_id == copr_permission.user_id) 423 .update({"copr_builder": new_builder, 424 "copr_admin": new_admin}))
425 426 @classmethod
427 - def update_permissions_by_applier(cls, user, copr, copr_permission, new_builder, new_admin):
428 if copr_permission: 429 # preserve approved permissions if set 430 if (not new_builder or 431 copr_permission.copr_builder != helpers.PermissionEnum("approved")): 432 433 copr_permission.copr_builder = new_builder 434 435 if (not new_admin or 436 copr_permission.copr_admin != helpers.PermissionEnum("approved")): 437 438 copr_permission.copr_admin = new_admin 439 else: 440 perm = models.CoprPermission( 441 user=user, 442 copr=copr, 443 copr_builder=new_builder, 444 copr_admin=new_admin) 445 446 cls.new(perm)
447 448 @classmethod
449 - def delete(cls, copr_permission):
450 db.session.delete(copr_permission)
451 452 @classmethod
453 - def validate_permission(cls, user, copr, permission, state):
454 allowed = ['admin', 'builder'] 455 if permission not in allowed: 456 raise BadRequest( 457 "invalid permission '{0}', allowed {1}".format(permission, 458 '|'.join(allowed))) 459 460 allowed = helpers.PermissionEnum.vals.keys() 461 if state not in allowed: 462 raise BadRequest( 463 "invalid '{0}' permission state '{1}', " 464 "use {2}".format(permission, state, '|'.join(allowed))) 465 466 if user.id == copr.user_id: 467 raise BadRequest("user '{0}' is owner of the '{1}' " 468 "project".format(user.name, copr.full_name))
469 470 @classmethod
471 - def set_permissions(cls, request_user, copr, user, permission, state):
472 users_logic.UsersLogic.raise_if_cant_update_copr( 473 request_user, copr, 474 "only owners and admins may update their projects permissions.") 475 476 cls.validate_permission(user, copr, permission, state) 477 478 perm_o = models.CoprPermission(user_id=user.id, copr_id=copr.id) 479 perm_o = db.session.merge(perm_o) 480 old_state = perm_o.get_permission(permission) 481 482 new_state = helpers.PermissionEnum(state) 483 perm_o.set_permission(permission, new_state) 484 db.session.merge(perm_o) 485 486 return (old_state, new_state) if old_state != new_state else None
487 488 @classmethod
489 - def request_permission(cls, copr, user, permission, req_bool):
490 approved = helpers.PermissionEnum('approved') 491 state = None 492 if req_bool is True: 493 state = 'request' 494 elif req_bool is False: 495 state = 'nothing' 496 else: 497 raise BadRequest("invalid '{0}' permission request '{1}', " 498 "expected True or False".format(permission, 499 req_bool)) 500 501 cls.validate_permission(user, copr, permission, state) 502 perm_o = models.CoprPermission(user_id=user.id, copr_id=copr.id) 503 perm_o = db.session.merge(perm_o) 504 old_state = perm_o.get_permission(permission) 505 if old_state == approved and state == 'request': 506 raise BadRequest("You already are '{0}' in '{1}'".format( 507 permission, copr.full_name)) 508 509 new_state = helpers.PermissionEnum(state) 510 perm_o.set_permission(permission, new_state) 511 512 if old_state != new_state: 513 return (old_state, new_state) 514 return None
515
516 517 -class CoprDirsLogic(object):
518 @classmethod
519 - def get_or_create(cls, copr, dirname, main=False):
520 copr_dir = cls.get_by_copr(copr, dirname).first() 521 522 if copr_dir: 523 return copr_dir 524 525 copr_dir = models.CoprDir( 526 name=dirname, copr=copr, main=main) 527 528 db.session.add(copr_dir) 529 return copr_dir
530 531 @classmethod
532 - def get_by_copr(cls, copr, dirname):
533 return (db.session.query(models.CoprDir) 534 .join(models.Copr) 535 .filter(models.Copr.id==copr.id) 536 .filter(models.CoprDir.name==dirname))
537 538 @classmethod
539 - def get_by_ownername(cls, ownername, dirname):
540 return (db.session.query(models.CoprDir) 541 .filter(models.CoprDir.name==dirname) 542 .filter(models.CoprDir.ownername==ownername))
543 544 @classmethod
545 - def delete(cls, copr_dir):
546 db.session.delete(copr_dir)
547 548 @classmethod
549 - def delete_all_by_copr(cls, copr):
550 for copr_dir in copr.dirs: 551 db.session.delete(copr_dir)
552
553 554 -def on_auto_createrepo_change(target_copr, value_acr, old_value_acr, initiator):
555 """ Emit createrepo action when auto_createrepo re-enabled""" 556 if old_value_acr == NEVER_SET: 557 # created new copr, not interesting 558 return 559 if not old_value_acr and value_acr: 560 # re-enabled 561 ActionsLogic.send_createrepo(target_copr)
562 563 564 listen(models.Copr.auto_createrepo, 'set', on_auto_createrepo_change, 565 active_history=True, retval=False)
566 567 568 -class BranchesLogic(object):
569 @classmethod
570 - def get_or_create(cls, name, session=db.session):
571 item = session.query(models.DistGitBranch).filter_by(name=name).first() 572 if item: 573 return item 574 575 branch = models.DistGitBranch() 576 branch.name = name 577 session.add(branch) 578 return branch
579
580 581 -class CoprChrootsLogic(object):
582 @classmethod
583 - def get_multiple(cls, include_deleted=False):
584 query = models.CoprChroot.query.join(models.Copr) 585 if not include_deleted: 586 query = query.filter(models.Copr.deleted.is_(False)) 587 return query
588 589 @classmethod
590 - def mock_chroots_from_names(cls, names):
591 592 db_chroots = models.MockChroot.query.all() 593 mock_chroots = [] 594 for ch in db_chroots: 595 if ch.name in names: 596 mock_chroots.append(ch) 597 598 return mock_chroots
599 600 @classmethod
601 - def get_by_name(cls, copr, chroot_name, active_only=True):
602 mc = MockChrootsLogic.get_from_name(chroot_name, active_only=active_only).one() 603 query = ( 604 models.CoprChroot.query.join(models.MockChroot) 605 .filter(models.CoprChroot.copr_id == copr.id) 606 .filter(models.MockChroot.id == mc.id) 607 ) 608 return query
609 610 @classmethod
611 - def get_by_name_safe(cls, copr, chroot_name):
612 """ 613 :rtype: models.CoprChroot 614 """ 615 try: 616 return cls.get_by_name(copr, chroot_name).one() 617 except NoResultFound: 618 return None
619 620 @classmethod
621 - def new(cls, mock_chroot):
622 db.session.add(mock_chroot)
623 624 @classmethod
625 - def new_from_names(cls, copr, names):
629 630 @classmethod
631 - def create_chroot(cls, user, copr, mock_chroot, buildroot_pkgs=None, repos=None, comps=None, comps_name=None, 632 module_md=None, module_md_name=None, with_opts="", without_opts="", 633 delete_after=None, delete_notify=None):
634 """ 635 :type user: models.User 636 :type mock_chroot: models.MockChroot 637 """ 638 if buildroot_pkgs is None: 639 buildroot_pkgs = "" 640 if repos is None: 641 repos = "" 642 UsersLogic.raise_if_cant_update_copr( 643 user, copr, 644 "Only owners and admins may update their projects.") 645 646 chroot = models.CoprChroot(copr=copr, mock_chroot=mock_chroot) 647 cls._update_chroot(buildroot_pkgs, repos, comps, comps_name, module_md, module_md_name, chroot, 648 with_opts, without_opts, delete_after, delete_notify) 649 return chroot
650 651 @classmethod
652 - def update_chroot(cls, user, copr_chroot, buildroot_pkgs=None, repos=None, comps=None, comps_name=None, 653 module_md=None, module_md_name=None, with_opts="", without_opts="", 654 delete_after=None, delete_notify=None):
655 """ 656 :type user: models.User 657 :type copr_chroot: models.CoprChroot 658 """ 659 UsersLogic.raise_if_cant_update_copr( 660 user, copr_chroot.copr, 661 "Only owners and admins may update their projects.") 662 663 cls._update_chroot(buildroot_pkgs, repos, comps, comps_name, module_md, module_md_name, 664 copr_chroot, with_opts, without_opts, delete_after, delete_notify) 665 return copr_chroot
666 667 @classmethod
668 - def _update_chroot(cls, buildroot_pkgs, repos, comps, comps_name, module_md, module_md_name, 669 copr_chroot, with_opts, without_opts, delete_after, delete_notify):
670 if buildroot_pkgs is not None: 671 copr_chroot.buildroot_pkgs = buildroot_pkgs 672 673 if repos is not None: 674 copr_chroot.repos = repos.replace("\n", " ") 675 676 if with_opts is not None: 677 copr_chroot.with_opts = with_opts 678 679 if without_opts is not None: 680 copr_chroot.without_opts = without_opts 681 682 if comps_name is not None: 683 copr_chroot.update_comps(comps) 684 copr_chroot.comps_name = comps_name 685 ActionsLogic.send_update_comps(copr_chroot) 686 687 if module_md_name is not None: 688 copr_chroot.update_module_md(module_md) 689 copr_chroot.module_md_name = module_md_name 690 ActionsLogic.send_update_module_md(copr_chroot) 691 692 if delete_after is not None: 693 copr_chroot.delete_after = delete_after 694 695 if delete_notify is not None: 696 copr_chroot.delete_notify = delete_notify 697 698 db.session.add(copr_chroot)
699 700 @classmethod
701 - def update_from_names(cls, user, copr, names):
702 UsersLogic.raise_if_cant_update_copr( 703 user, copr, 704 "Only owners and admins may update their projects.") 705 current_chroots = copr.mock_chroots 706 new_chroots = cls.mock_chroots_from_names(names) 707 # add non-existing 708 for mock_chroot in new_chroots: 709 if mock_chroot not in current_chroots: 710 db.session.add( 711 models.CoprChroot(copr=copr, mock_chroot=mock_chroot)) 712 713 # delete no more present 714 to_remove = [] 715 for mock_chroot in current_chroots: 716 if mock_chroot not in new_chroots: 717 # can't delete here, it would change current_chroots and break 718 # iteration 719 to_remove.append(mock_chroot) 720 721 for mc in to_remove: 722 copr.mock_chroots.remove(mc)
723 724 @classmethod
725 - def remove_comps(cls, user, copr_chroot):
726 UsersLogic.raise_if_cant_update_copr( 727 user, copr_chroot.copr, 728 "Only owners and admins may update their projects.") 729 730 copr_chroot.comps_name = None 731 copr_chroot.comps_zlib = None 732 ActionsLogic.send_update_comps(copr_chroot) 733 db.session.add(copr_chroot)
734 735 @classmethod
736 - def remove_module_md(cls, user, copr_chroot):
737 UsersLogic.raise_if_cant_update_copr( 738 user, copr_chroot.copr, 739 "Only owners and admins may update their projects.") 740 741 copr_chroot.module_md_name = None 742 copr_chroot.module_md_zlib = None 743 ActionsLogic.send_update_module_md(copr_chroot) 744 db.session.add(copr_chroot)
745 746 @classmethod
747 - def remove_copr_chroot(cls, user, copr_chroot):
748 """ 749 :param models.CoprChroot chroot: 750 """ 751 UsersLogic.raise_if_cant_update_copr( 752 user, copr_chroot.copr, 753 "Only owners and admins may update their projects.") 754 755 db.session.delete(copr_chroot)
756 757 @classmethod
758 - def filter_outdated(cls, query):
759 return query.filter(models.CoprChroot.delete_after >= datetime.datetime.now())
760 761 @classmethod
762 - def filter_outdated_to_be_deleted(cls, query):
763 return query.filter(models.CoprChroot.delete_after < datetime.datetime.now())
764
765 766 -class MockChrootsLogic(object):
767 @classmethod
768 - def get(cls, os_release, os_version, arch, active_only=False, noarch=False):
778 779 @classmethod
780 - def get_from_name(cls, chroot_name, active_only=False, noarch=False):
781 """ 782 chroot_name should be os-version-architecture, e.g. fedora-rawhide-x86_64 783 the architecture could be optional with noarch=True 784 785 Return MockChroot object for textual representation of chroot 786 """ 787 788 name_tuple = cls.tuple_from_name(chroot_name, noarch=noarch) 789 return cls.get(name_tuple[0], name_tuple[1], name_tuple[2], 790 active_only=active_only, noarch=noarch)
791 792 @classmethod
793 - def get_multiple(cls, active_only=False):
794 query = models.MockChroot.query 795 if active_only: 796 query = query.filter(models.MockChroot.is_active == True) 797 return query
798 799 @classmethod
800 - def add(cls, name):
801 name_tuple = cls.tuple_from_name(name) 802 if cls.get(*name_tuple).first(): 803 raise exceptions.DuplicateException( 804 "Mock chroot with this name already exists.") 805 new_chroot = models.MockChroot(os_release=name_tuple[0], 806 os_version=name_tuple[1], 807 arch=name_tuple[2]) 808 cls.new(new_chroot) 809 return new_chroot
810 811 @classmethod
812 - def active_names(cls):
813 return [ch.name for ch in cls.get_multiple(active_only=True).all()]
814 815 @classmethod
816 - def new(cls, mock_chroot):
817 db.session.add(mock_chroot)
818 819 @classmethod
820 - def edit_by_name(cls, name, is_active):
821 name_tuple = cls.tuple_from_name(name) 822 mock_chroot = cls.get(*name_tuple).first() 823 if not mock_chroot: 824 raise exceptions.NotFoundException( 825 "Mock chroot with this name doesn't exist.") 826 827 mock_chroot.is_active = is_active 828 cls.update(mock_chroot) 829 return mock_chroot
830 831 @classmethod
832 - def update(cls, mock_chroot):
833 db.session.add(mock_chroot)
834 835 @classmethod
836 - def delete_by_name(cls, name):
837 name_tuple = cls.tuple_from_name(name) 838 mock_chroot = cls.get(*name_tuple).first() 839 if not mock_chroot: 840 raise exceptions.NotFoundException( 841 "Mock chroot with this name doesn't exist.") 842 843 cls.delete(mock_chroot)
844 845 @classmethod
846 - def delete(cls, mock_chroot):
847 db.session.delete(mock_chroot)
848 849 @classmethod
850 - def tuple_from_name(cls, name, noarch=False):
851 """ 852 input should be os-version-architecture, e.g. fedora-rawhide-x86_64 853 854 the architecture could be optional with noarch=True 855 856 returns ("os", "version", "arch") or ("os", "version", None) 857 """ 858 split_name = name.rsplit("-", 1) if noarch else name.rsplit("-", 2) 859 860 valid = False 861 if noarch and len(split_name) in [2, 3]: 862 valid = True 863 if not noarch and len(split_name) == 3: 864 valid = True 865 866 if not valid: 867 raise MalformedArgumentException( 868 "Chroot name is not valid") 869 870 if noarch and len(split_name) == 2: 871 split_name.append(None) 872 873 return tuple(split_name)
874 875 @classmethod
876 - def prunerepo_finished(cls, chroots_pruned):
877 for chroot_name in chroots_pruned: 878 chroot = cls.get_from_name(chroot_name).one() 879 if not chroot.is_active: 880 chroot.final_prunerepo_done = True 881 882 db.session.commit() 883 return True
884 885 @classmethod
887 query = models.MockChroot.query 888 chroots = {} 889 for chroot in query: 890 if chroot.is_active or not chroot.final_prunerepo_done: 891 chroots[chroot.name] = chroot.is_active 892 893 return chroots
894