diff options
Diffstat (limited to 'ihatemoney/utils.py')
| -rw-r--r-- | ihatemoney/utils.py | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/ihatemoney/utils.py b/ihatemoney/utils.py index 4e79a37..6af0112 100644 --- a/ihatemoney/utils.py +++ b/ihatemoney/utils.py @@ -7,6 +7,7 @@ from json import dumps from flask import redirect from werkzeug.routing import HTTPException, RoutingException import six +from datetime import datetime, timedelta import csv @@ -131,3 +132,41 @@ def list_of_dicts2csv(dict_to_convert): # base64 encoding that works with both py2 and py3 and yield no warning base64_encode = base64.encodestring if six.PY2 else base64.encodebytes + + +class LoginThrottler(): + """Simple login throttler used to limit authentication attempts based on client's ip address. + When using multiple workers, remaining number of attempts can get inconsistent + but will still be limited to num_workers * max_attempts. + """ + def __init__(self, max_attempts=3, delay=1): + self._max_attempts = max_attempts + # Delay in minutes before resetting the attempts counter + self._delay = delay + self._attempts = {} + + def get_remaining_attempts(self, ip): + return self._max_attempts - self._attempts.get(ip, [datetime.now(), 0])[1] + + def increment_attempts_counter(self, ip): + # Reset all attempt counters when they get hungry for memory + if len(self._attempts) > 10000: + self.__init__() + if self._attempts.get(ip) is None: + # Store first attempt date and number of attempts since + self._attempts[ip] = [datetime.now(), 0] + self._attempts.get(ip)[1] += 1 + + def is_login_allowed(self, ip): + if self._attempts.get(ip) is None: + return True + # When the delay is expired, reset the counter + if datetime.now() - self._attempts.get(ip)[0] > timedelta(minutes=self._delay): + self.reset(ip) + return True + if self._attempts.get(ip)[1] >= self._max_attempts: + return False + return True + + def reset(self, ip): + self._attempts.pop(ip, None) |
