Package coprs :: Package views :: Module misc
[hide private]
[frames] | no frames]

Source Code for Module coprs.views.misc

  1  import os 
  2  import base64 
  3  import datetime 
  4  import functools 
  5  from functools import wraps, partial 
  6   
  7  from netaddr import IPAddress, IPNetwork 
  8  import re 
  9  import flask 
 10   
 11  from openid_teams.teams import TeamsRequest 
 12   
 13  from coprs import app 
 14  from coprs import db 
 15  from coprs import helpers 
 16  from coprs import models 
 17  from coprs import oid 
 18  from coprs.logic.complex_logic import ComplexLogic 
 19  from coprs.logic.users_logic import UsersLogic 
 20  from coprs.logic.coprs_logic import CoprsLogic 
def fed_openidize_name(name):
    """
    Build the Fedora OpenID identity URL that belongs to a short name.

    >>> fed_openidize_name("bob")
    'http://bob.id.fedoraproject.org/'
    """
    url_template = "http://{0}.id.fedoraproject.org/"
    return url_template.format(name)
32
def create_user_wrapper(username, email, timezone=None):
    """
    Build a new ``models.User`` carrying freshly generated API credentials.

    The API login starts with the base64 form of ``copr`` followed by
    ``##``; the remainder comes from ``helpers.generate_api_token`` and is
    sized as ``API_TOKEN_LENGTH`` minus the prefix length.  The API token
    expires ``API_TOKEN_EXPIRATION`` days from today.

    The object is only constructed here; nothing is added to the database
    session by this function.

    :param username: value stored as ``User.username``
    :param email: value stored as ``User.mail``
    :param timezone: optional value stored as ``User.timezone``
    :return: the new ``models.User`` instance
    """
    validity = datetime.timedelta(
        days=flask.current_app.config["API_TOKEN_EXPIRATION"])
    expires_on = datetime.date.today() + validity

    login_prefix = base64.b64encode(b"copr") + b"##"
    token_length = app.config["API_TOKEN_LENGTH"]

    # Generate the login part first, then the token itself.
    api_login = login_prefix.decode("utf-8") + helpers.generate_api_token(
        token_length - len(login_prefix))
    api_token = helpers.generate_api_token(app.config["API_TOKEN_LENGTH"])

    return models.User(
        username=username,
        mail=email,
        timezone=timezone,
        api_login=api_login,
        api_token=api_token,
        api_token_expiration=expires_on,
    )
48
def fed_raw_name(oidname):
    """
    Reduce a Fedora OpenID identity URL to the bare account name.

    The ``.id.fedoraproject.org/`` suffix and the URL scheme are removed.
    Both ``http://`` and ``https://`` are stripped, so an identity URL
    served over TLS does not leave the scheme inside the username.

    >>> fed_raw_name("http://bob.id.fedoraproject.org/")
    'bob'
    >>> fed_raw_name("https://bob.id.fedoraproject.org/")
    'bob'

    :param oidname: identity URL (or an already bare name)
    :return: the account name with suffix and scheme removed
    """
    name = oidname.replace(".id.fedoraproject.org/", "")
    for scheme in ("http://", "https://"):
        name = name.replace(scheme, "")
    return name
53
def krb_straighten_username(krb_remote_user):
    """
    Turn a Kerberos principal into a username usable inside Copr.

    The realm (everything from the first ``@``) is dropped and every ``/``
    is replaced by ``_``.

    :param krb_remote_user: principal, expected as ``USERNAME@REALM.TLD``
    :return: the cleaned username, or ``None`` when the result contains
        anything other than letters, digits, underscores, dashes and dots
    """
    # Input should look like 'USERNAME@REALM.TLD', strip realm.
    username = re.sub(r'@.*', '', krb_remote_user)

    # But USERNAME part can consist of USER/DOMAIN.TLD.
    # TODO: Do we need more clever thing here?
    username = re.sub('/', '_', username)

    # Based on restrictions for project name: "letters, digits, underscores,
    # dashes and dots", it is worth limiting the username here, too.
    # ``\Z`` is used instead of ``$`` because ``$`` also matches right
    # before a trailing newline, which would let "user\n" through.
    # TODO: Store this pattern on one place.
    return username if re.match(r"^[\w.-]+\Z", username) else None
67
@app.before_request
def set_empty_user():
    """Start every request with ``flask.g.user`` set to ``None``."""
    flask.g.user = None
72
@app.before_request
def lookup_current_user():
    """
    Populate ``flask.g.user`` from the login information in the session.

    An ``openid`` session entry takes precedence over ``krb5_login``.  When
    neither is present, or no matching row exists, ``flask.g.user`` stays
    ``None``.
    """
    flask.g.user = None

    session = flask.session
    if "openid" in session:
        username = fed_raw_name(session["openid"])
    elif "krb5_login" in session:
        username = session["krb5_login"]
    else:
        username = None

    if not username:
        return

    by_username = models.User.query.filter(models.User.username == username)
    flask.g.user = by_username.first()
85
@app.errorhandler(404)
def page_not_found(message):
    """Render ``404.html`` with ``message`` and answer with status 404."""
    page = flask.render_template("404.html", message=message)
    return page, 404
90
@app.errorhandler(403)
def access_restricted(message):
    """Render ``403.html`` with ``message`` and answer with status 403."""
    page = flask.render_template("403.html", message=message)
    return page, 403
95
def generic_error(message, code=500, title=None):
    """
    Render the generic ``_error.html`` page.

    :param message: handed to the template as ``message``
    :param code: HTTP status of the response; also handed to the template
        as ``error_code``
    :param title: handed to the template as ``error_title``
    :return: ``(rendered page, code)`` tuple
    """
    context = {
        "message": message,
        "error_code": code,
        "error_title": title,
    }
    return flask.render_template("_error.html", **context), code
# Pre-configured variants of generic_error registered as the application's
# handlers for HTTP 500 and HTTP 400.
server_error_handler = partial(generic_error, code=500, title="Internal Server Error")
bad_request_handler = partial(generic_error, code=400, title="Bad Request")

app.errorhandler(500)(server_error_handler)
app.errorhandler(400)(bad_request_handler)

misc = flask.Blueprint("misc", __name__)


@misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
def krb5_login(name):
    """
    Handle the Kerberos authentication.

    Note that if we are able to get here, either the user is authenticated
    correctly, or apache is mis-configured and it does not perform KRB
    authentication at all.  Note also, even if that can be considered ugly, we
    are reusing oid's get_next_url feature with kerberos login.
    """

    # Already logged in?
    if flask.g.user is not None:
        return flask.redirect(oid.get_next_url())

    krb_config = app.config['KRB5_LOGIN']

    # Name of the first KRB5_LOGIN section whose URI equals the requested one.
    key = next(
        (section for section in krb_config.keys()
         if krb_config[section]['URI'] == name),
        None,
    )
    if not key:
        # no KRB5_LOGIN.<name> configured in copr.conf
        return flask.render_template("404.html"), 404

    environ = flask.request.environ
    if app.config["DEBUG"] and 'TEST_REMOTE_USER' in os.environ:
        # For local testing (without krb5 keytab and other configuration)
        environ['REMOTE_USER'] = os.environ['TEST_REMOTE_USER']

    if 'REMOTE_USER' not in environ:
        nocred = "Kerberos authentication failed (no credentials provided)"
        return flask.render_template("403.html", message=nocred), 403

    krb_username = environ['REMOTE_USER']
    app.logger.debug("krb5 login attempt: " + krb_username)
    username = krb_straighten_username(krb_username)
    if not username:
        message = "invalid krb5 username: " + krb_username
        return flask.render_template("403.html", message=message), 403

    krb_login = (
        models.Krb5Login.query
        .filter(models.Krb5Login.config_name == key)
        .filter(models.Krb5Login.primary == username)
        .first()
    )

    if krb_login:
        user = krb_login.user
    else:
        # We need to create row in 'krb5_login' table
        user = models.User.query.filter(
            models.User.username == username).first()
        if not user:
            # Even the item in 'user' table does not exist, create _now_
            email = username + "@" + krb_config[key]['email_domain']
            user = create_user_wrapper(username, email)
            db.session.add(user)

        krb_login = models.Krb5Login(user=user, primary=username,
                                     config_name=key)
        db.session.add(krb_login)
        db.session.commit()

    # Common tail for both the known and the freshly created login.
    flask.g.user = user
    flask.session['krb5_login'] = user.name
    flask.flash(u"Welcome, {0}".format(user.name))
    return flask.redirect(oid.get_next_url())
@misc.route("/login/", methods=["GET"])
@oid.loginhandler
def login():
    """
    Entry point of the login flow.

    With ``FAS_LOGIN`` enabled, an already logged-in user is redirected to
    the "next" URL and anybody else is sent through OpenID at
    ``https://id.fedoraproject.org/``.  Without it, the request is passed
    to the Kerberos login when ``KRB5_LOGIN`` is configured; otherwise an
    error is flashed and the user is sent to the project list.
    """
    if app.config['FAS_LOGIN']:
        if flask.g.user is not None:
            return flask.redirect(oid.get_next_url())

        # a bit of magic
        return oid.try_login(
            "https://id.fedoraproject.org/",
            ask_for=["email", "timezone"],
            extensions=[TeamsRequest(["_FAS_ALL_GROUPS_"])],
        )

    if app.config['KRB5_LOGIN']:
        return krb5_login_redirect(next=oid.get_next_url())

    flask.flash("No auth method available", "error")
    return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
206
@oid.after_login
def create_or_login(resp):
    """
    Finish an OpenID login: create or refresh the user and log them in.

    Registered through ``oid.after_login``.  The identity is accepted when
    its raw name is non-empty and either ``USE_ALLOWED_USERS`` is off or
    the name is listed in ``ALLOWED_USERS``.

    For an accepted identity the matching ``models.User`` is created (or
    its mail and timezone are refreshed), the teams reported by the ``lp``
    extension are stored, and the identity URL is put into the session.
    A rejected identity is never left in the session, because the
    before-request user lookup treats ``session["openid"]`` as proof of
    login.

    :param resp: OpenID response; ``identity_url``, ``email``, ``timezone``
        and ``extensions`` are read from it
    :return: redirect response
    """
    fasusername = fed_raw_name(resp.identity_url)

    allowed = bool(fasusername) and (
        not app.config["USE_ALLOWED_USERS"]
        or fasusername in app.config["ALLOWED_USERS"]
    )
    if not allowed:
        # Drop any identity from the session so a refused user is not
        # treated as logged in on the following requests.
        flask.session.pop("openid", None)
        flask.flash("User '{0}' is not allowed".format(fasusername))
        return flask.redirect(oid.get_next_url())

    user = models.User.query.filter(
        models.User.username == fasusername).first()
    if not user:  # create if not created already
        user = create_user_wrapper(fasusername, resp.email, resp.timezone)
    else:
        user.mail = resp.email
        user.timezone = resp.timezone

    if "lp" in resp.extensions:
        team_resp = resp.extensions['lp']  # name space for the teams extension
        user.openid_groups = {"fas_groups": team_resp.teams}

    db.session.add(user)
    db.session.commit()

    # Only now, with the user accepted and stored, mark the session as
    # logged in.
    flask.session["openid"] = resp.identity_url
    flask.flash(u"Welcome, {0}".format(user.name))
    flask.g.user = user

    if flask.request.url_root == oid.get_next_url():
        # No specific target was requested; show the user's own projects.
        return flask.redirect(flask.url_for("coprs_ns.coprs_by_user",
                                            username=user.name))
    return flask.redirect(oid.get_next_url())
245
@misc.route("/logout/")
def logout():
    """Forget both kinds of login stored in the session and redirect."""
    for session_key in ("openid", "krb5_login"):
        flask.session.pop(session_key, None)

    flask.flash(u"You were signed out")
    return flask.redirect(oid.get_next_url())
253
def _parse_api_credentials(header):
    """
    Extract ``(api_login, token)`` from an ``Authorization`` header value.

    The value is expected as ``<scheme> <base64 of "login:token">``.
    ``(None, None)`` is returned for anything that cannot be decoded that
    way, instead of letting the decoding error propagate.
    """
    try:
        encoded = header.split()[1].strip()
        decoded = base64.b64decode(encoded).decode("utf-8")
        # Split on the first colon only, so a colon inside the token part
        # does not break the unpacking.
        api_login, token = decoded.split(":", 1)
    except (IndexError, TypeError, ValueError):
        # IndexError: no second field; TypeError/ValueError: invalid base64,
        # invalid UTF-8 (UnicodeDecodeError is a ValueError) or no colon.
        return None, None
    return api_login, token


def api_login_required(f):
    """
    Decorator: run the view only for requests with a valid API token.

    Credentials are read from the ``Authorization`` header.  They are valid
    when a user with the given API login exists, the token matches and it
    has not expired.  A user flagged as ``proxy`` may act as the user named
    in the ``username`` form field.  On success ``flask.g.user`` is set and
    the view is called; otherwise a JSON error with status 500 is returned.
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        token = None
        api_login = None
        if "Authorization" in flask.request.headers:
            api_login, token = _parse_api_credentials(
                flask.request.headers["Authorization"])

        token_auth = False
        if token and api_login:
            user = UsersLogic.get_by_api_login(api_login).first()
            if (user and user.api_token == token and
                    user.api_token_expiration >= datetime.date.today()):

                if user.proxy and "username" in flask.request.form:
                    # NOTE(review): this lookup may yield None for an
                    # unknown username; the view then runs with
                    # flask.g.user set to None -- confirm that is intended.
                    user = UsersLogic.get(flask.request.form["username"]).first()

                token_auth = True
                flask.g.user = user

        if not token_auth:
            url = 'https://' + app.config["PUBLIC_COPR_HOSTNAME"]
            url = helpers.fix_protocol_for_frontend(url)

            output = {
                "output": "notok",
                "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(url),
            }
            jsonout = flask.jsonify(output)
            jsonout.status_code = 500
            return jsonout
        return f(*args, **kwargs)
    return decorated_function
def krb5_login_redirect(next=None):
    """
    Redirect to the login page of the first configured Kerberos realm.

    :param next: URL handed over to the Kerberos login view as ``next``
    :return: redirect to ``misc.krb5_login``; when ``KRB5_LOGIN`` has no
        entry, an error is flashed and the redirect goes to the project
        list instead
    """
    krbc = app.config['KRB5_LOGIN']
    section_names = list(krbc)

    if section_names:
        # Pick the first one for now.
        first_section = krbc[section_names[0]]
        target = flask.url_for("misc.krb5_login",
                               name=first_section['URI'],
                               next=next)
        return flask.redirect(target)

    flask.flash("Unable to pick krb5 login page", "error")
    return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
300
def login_required(role=helpers.RoleEnum("user")):
    """
    Decorator restricting a view to logged-in users.

    Anonymous visitors are redirected to ``misc.login`` with the current
    URL as ``next``.  When ``role`` equals the ``admin`` role, users
    without the ``admin`` flag are redirected to the project list with a
    flash message.

    Usable both as ``@login_required`` and ``@login_required(role=...)``.
    """
    def view_wrapper(f):
        @functools.wraps(f)
        def decorated_function(*args, **kwargs):
            current_user = flask.g.user
            if current_user is None:
                login_url = flask.url_for("misc.login",
                                          next=flask.request.url)
                return flask.redirect(login_url)

            admin_only = role == helpers.RoleEnum("admin")
            if admin_only and not current_user.admin:
                flask.flash("You are not allowed to access admin section.")
                return flask.redirect(flask.url_for("coprs_ns.coprs_show"))

            return f(*args, **kwargs)
        return decorated_function

    # hack: if login_required is used without params, the "role" parameter
    # is in fact the decorated function, so we need to return
    # the wrapped function, not the wrapper
    # proper solution would be to use login_required() with parentheses
    # everywhere, even if they're empty - TODO
    return view_wrapper(role) if callable(role) else view_wrapper
# backend authentication
def backend_authenticated(f):
    """
    Decorator: call the view only when the request's authorization data
    carries the password configured as ``BACKEND_PASSWORD``; answer with
    status 401 otherwise.
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        auth = flask.request.authorization
        if auth and auth.password == app.config["BACKEND_PASSWORD"]:
            return f(*args, **kwargs)

        return "You have to provide the correct password\n", 401
    return decorated_function
def intranet_required(f):
    """
    Decorator: call the view only for requests coming from an address
    inside one of the ``INTRANET_IPS`` addresses/networks or from
    ``127.0.0.1``; answer with status 403 otherwise.
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        ip_addr = IPAddress(flask.request.remote_addr)

        # always accept from localhost
        accept_ranges = set(app.config.get("INTRANET_IPS", [])) | {"127.0.0.1"}

        for addr_or_net in accept_ranges:
            if ip_addr in IPNetwork(addr_or_net):
                break
        else:
            # No configured range contains the client address.
            return ("Stats can be update only from intranet hosts, "
                    "not {}, check config\n".format(flask.request.remote_addr)), 403

        return f(*args, **kwargs)
    return decorated_function
def req_with_copr(f):
    """
    Decorator replacing the owner/project name keyword arguments of a view
    with the project object itself.

    ``coprname`` plus either ``group_name`` or ``username`` are removed from
    the keyword arguments; the project found through ``ComplexLogic`` is
    passed to the view as the first positional argument.
    """
    @wraps(f)
    def wrapper(**kwargs):
        coprname = kwargs.pop("coprname")

        if "group_name" in kwargs:
            copr = ComplexLogic.get_group_copr_safe(
                kwargs.pop("group_name"), coprname, with_mock_chroots=True)
        else:
            copr = ComplexLogic.get_copr_safe(
                kwargs.pop("username"), coprname, with_mock_chroots=True)

        return f(copr, **kwargs)
    return wrapper
@misc.route("/migration-report/")
@misc.route("/migration-report/<username>")
def coprs_migration_report(username=None):
    """
    Show the migration report for a user's projects.

    Covers the non-group projects owned by the user plus the projects of
    every group returned for the user's ``user_teams``.

    :param username: user to report on; defaults to the logged-in user
    :return: the rendered report, or an error page when nobody is logged
        in and no username was given, or when the user does not exist
    """
    if not username and not flask.g.user:
        return generic_error("You are not logged in")
    elif not username:
        username = flask.g.user.name

    user = UsersLogic.get(username).first()
    if user is None:
        # An unknown name in the URL would otherwise fail on
        # ``user.user_teams`` below.
        return generic_error("User '{0}' does not exist".format(username),
                             code=404, title="Not Found")

    coprs = CoprsLogic.filter_without_group_projects(
        CoprsLogic.get_multiple_owned_by_username(username)).all()
    for group in UsersLogic.get_groups_by_fas_names_list(user.user_teams).all():
        coprs.extend(CoprsLogic.get_multiple_by_group_id(group.id).all())

    return render_migration_report(coprs, user=user)
381
@misc.route("/migration-report/g/<group_name>")
def group_coprs_migration_report(group_name=None):
    """Show the migration report for the projects of the named group."""
    group = ComplexLogic.get_group_by_name_safe(group_name)
    group_coprs = CoprsLogic.get_multiple_by_group_id(group.id)
    return render_migration_report(group_coprs, group=group)
388
def render_migration_report(coprs, user=None, group=None):
    """
    Render ``migration-report.html``.

    :param coprs: projects handed to the template as ``coprs``
    :param user: handed to the template as ``user``
    :param group: handed to the template as ``group``
    """
    context = {
        "user": user,
        "group": group,
        "coprs": coprs,
    }
    return flask.render_template("migration-report.html", **context)
395