aboutsummaryrefslogtreecommitdiff
path: root/ihatemoney/utils.py
diff options
context:
space:
mode:
author0livd <github@destras.fr>2017-08-20 12:37:12 +0200
committerAlexis Metaireau <alexis@notmyidea.org>2017-08-20 12:37:12 +0200
commitec4a099f182629d86a7421af7d4899a655be684e (patch)
treef5b72f89a3fca31f5a74b393a508d0153344a0fc /ihatemoney/utils.py
parent68e411473540c136dfdb269af888ceddbd0d403b (diff)
downloadihatemoney-mirror-ec4a099f182629d86a7421af7d4899a655be684e.zip
ihatemoney-mirror-ec4a099f182629d86a7421af7d4899a655be684e.tar.gz
ihatemoney-mirror-ec4a099f182629d86a7421af7d4899a655be684e.tar.bz2
Protect admin endpoints against brute force attacks (#249)
* Protect admin endpoints against brute force attacks Add a throttling mechanism to prevent a client brute forcing the authentication form, based on its ip address Closes #245 * Reset attempt counters if they get memory hungry
Diffstat (limited to 'ihatemoney/utils.py')
-rw-r--r--ihatemoney/utils.py39
1 files changed, 39 insertions, 0 deletions
diff --git a/ihatemoney/utils.py b/ihatemoney/utils.py
index 4e79a37..6af0112 100644
--- a/ihatemoney/utils.py
+++ b/ihatemoney/utils.py
@@ -7,6 +7,7 @@ from json import dumps
from flask import redirect
from werkzeug.routing import HTTPException, RoutingException
import six
+from datetime import datetime, timedelta
import csv
@@ -131,3 +132,41 @@ def list_of_dicts2csv(dict_to_convert):
# base64 encoding that works with both py2 and py3 and yield no warning
base64_encode = base64.encodestring if six.PY2 else base64.encodebytes
+
+
+class LoginThrottler():
+ """Simple login throttler used to limit authentication attempts based on client's ip address.
+ When using multiple workers, remaining number of attempts can get inconsistent
+ but will still be limited to num_workers * max_attempts.
+ """
+ def __init__(self, max_attempts=3, delay=1):
+ self._max_attempts = max_attempts
+ # Delay in minutes before resetting the attempts counter
+ self._delay = delay
+ self._attempts = {}
+
+ def get_remaining_attempts(self, ip):
+ return self._max_attempts - self._attempts.get(ip, [datetime.now(), 0])[1]
+
+ def increment_attempts_counter(self, ip):
+ # Reset all attempt counters when they get hungry for memory
+ if len(self._attempts) > 10000:
+ self.__init__()
+ if self._attempts.get(ip) is None:
+ # Store first attempt date and number of attempts since
+ self._attempts[ip] = [datetime.now(), 0]
+ self._attempts.get(ip)[1] += 1
+
+ def is_login_allowed(self, ip):
+ if self._attempts.get(ip) is None:
+ return True
+ # When the delay is expired, reset the counter
+ if datetime.now() - self._attempts.get(ip)[0] > timedelta(minutes=self._delay):
+ self.reset(ip)
+ return True
+ if self._attempts.get(ip)[1] >= self._max_attempts:
+ return False
+ return True
+
+ def reset(self, ip):
+ self._attempts.pop(ip, None)