From ec4a099f182629d86a7421af7d4899a655be684e Mon Sep 17 00:00:00 2001 From: 0livd Date: Sun, 20 Aug 2017 12:37:12 +0200 Subject: Protect admin endpoints against brute force attacks (#249) * Protect admin endpoints against brute force attacks Add a throttling mechanism to prevent a client brute forcing the authentication form, based on its ip address Closes #245 * Reset attempt counters if they get memory hungry --- ihatemoney/utils.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) (limited to 'ihatemoney/utils.py') diff --git a/ihatemoney/utils.py b/ihatemoney/utils.py index 4e79a37..6af0112 100644 --- a/ihatemoney/utils.py +++ b/ihatemoney/utils.py @@ -7,6 +7,7 @@ from json import dumps from flask import redirect from werkzeug.routing import HTTPException, RoutingException import six +from datetime import datetime, timedelta import csv @@ -131,3 +132,41 @@ def list_of_dicts2csv(dict_to_convert): # base64 encoding that works with both py2 and py3 and yield no warning base64_encode = base64.encodestring if six.PY2 else base64.encodebytes + + +class LoginThrottler(): + """Simple login throttler used to limit authentication attempts based on client's ip address. + When using multiple workers, remaining number of attempts can get inconsistent + but will still be limited to num_workers * max_attempts. + """ + def __init__(self, max_attempts=3, delay=1): + self._max_attempts = max_attempts + # Delay in minutes before resetting the attempts counter + self._delay = delay + self._attempts = {} + + def get_remaining_attempts(self, ip): + return self._max_attempts - self._attempts.get(ip, [datetime.now(), 0])[1] + + def increment_attempts_counter(self, ip): + # Reset all attempt counters when they get hungry for memory + if len(self._attempts) > 10000: + self.__init__() + if self._attempts.get(ip) is None: + # Store first attempt date and number of attempts since + self._attempts[ip] = [datetime.now(), 0] + self._attempts.get(ip)[1] += 1 + + def is_login_allowed(self, ip): + if self._attempts.get(ip) is None: + return True + # When the delay is expired, reset the counter + if datetime.now() - self._attempts.get(ip)[0] > timedelta(minutes=self._delay): + self.reset(ip) + return True + if self._attempts.get(ip)[1] >= self._max_attempts: + return False + return True + + def reset(self, ip): + self._attempts.pop(ip, None) -- cgit v1.1