source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/contrib/redis_cache.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 10.6 KB
Line 
1"""
2Developed by niphlod@gmail.com
3Released under web2py license because includes gluon/cache.py source code
4"""
5
6try:
7    import cPickle as pickle
8except:
9    import pickle
10import time
11import re
12import logging
13from threading import Lock
14import random
15from gluon import current
16from gluon.cache import CacheAbstract
17from gluon.contrib.redis_utils import acquire_lock, release_lock
18from gluon.contrib.redis_utils import register_release_lock, RConnectionError
19
# Module-level logger used by the retry helpers to report reconnection waits.
logger = logging.getLogger("web2py.cache.redis")

# Lock held by RedisCache() while it looks up or creates the per-application
# RedisClient instance.
locker = Lock()
23
24
def RedisCache(redis_conn=None, debug=False, with_lock=False, fail_gracefully=False, db=None, application=None):
    """
    Return the RedisClient for an application, creating it on first use.

    Usage example: put in models::

    First of all install Redis
    Ubuntu :
    sudo apt-get install redis-server
    sudo pip install redis

    Then

        from gluon.contrib.redis_utils import RConn
        rconn = RConn()
        from gluon.contrib.redis_cache import RedisCache
        cache.redis = RedisCache(redis_conn=rconn, debug=True, with_lock=True)

    Args:
        redis_conn: a redis-like connection object
        debug: if True adds to stats() the total_hits and misses
        with_lock: sets the default locking mode for creating new keys.
            By default it is False (usually when you choose Redis you do it
            for performance reasons).
            When True, only one thread/process can set a value concurrently
        fail_gracefully: if redis is unavailable, returns the value computing
            it instead of raising an exception
        db: accepted for call compatibility; not used by this function
        application: if provided, it is used to construct the instance_name,
            allowing to share cache between different applications if needed

    One client is kept per application name (as an attribute of this
    function). Once a client exists for an application, later calls return
    it unchanged and their other arguments are ignored.

    It can be used pretty much the same as cache.ram()
    When you use cache.redis directly you can use :

        redis_key_and_var_name = cache.redis('redis_key_and_var_name', lambda or function,
                                             time_expire=300, with_lock=True)

    to enforce locking. The with_lock parameter overrides the one set in the
    cache.redis instance creation

    cache.redis.stats()
        returns a dictionary with statistics of Redis server
        with one additional key ('w2p_keys') showing all keys currently set
        from web2py with their TTL

    A little wording on how keys are stored (and why the cache_it() function
    and the clear() one look a little bit convoluted): there are a lot of
    libraries that just store values and then use the KEYS command to delete
    them. Until recent releases of this module, that technique was used here
    too. When specific keys must be deleted in a database with zillions of
    keys in it (other web2py apps, other applications in need of a Redis
    stack) the KEYS command is slow (it needs to scan every key in the
    database).
    So, we use Redis 'sets' to store keys in "buckets"...
    - every key created gets "indexed" in a bucket
    - all buckets are indexed in a fixed key that never expires
    - all keys generated within the same minute go in the same bucket
    - every bucket is then set to expire when every key within it is expired
    When we need to clear() cached keys:
    - we tell Redis to SUNION all buckets
       - gives us just the keys that are not expired yet
    - buckets that are expired are removed from the fixed set
    - we scan the keys and then delete them
    """

    # The lock keeps two threads from both creating a client for the same
    # application at the same time.
    with locker:
        app_name = current.request.application if application is None else application
        slot = 'redis_instance_' + app_name
        try:
            # Fast path: a client was already built for this application.
            return getattr(RedisCache, slot)
        except AttributeError:
            pass
        client = RedisClient(redis_conn=redis_conn, debug=debug,
                             with_lock=with_lock,
                             fail_gracefully=fail_gracefully,
                             application=app_name)
        setattr(RedisCache, slot, client)
        return client
99
100
class RedisClient(object):
    """
    Cache client that stores pickled values in Redis.

    Values live under ``w2p:cache:<application>:<key>``. Every stored key is
    also recorded in a per-minute "bucket" set, and every bucket is recorded
    in the fixed set ``w2p:<application>:___cache_set``, so that clear() can
    enumerate this application's keys without the KEYS command.
    """

    # In-process statistics storage shared by all instances, keyed by
    # application name.
    meta_storage = {}
    # Maximum number of reconnection attempts before giving up.
    MAX_RETRIES = 5
    # Consecutive failed attempts so far (reset after any success).
    RETRIES = 0

    def __init__(self, redis_conn=None, debug=False,
                 with_lock=False, fail_gracefully=False, application=None):
        """
        Args:
            redis_conn: a redis-like connection object
            debug: if True, hit/miss counters are kept in Redis and
                reported by stats()
            with_lock: default locking mode used by __call__
            fail_gracefully: if True, connection errors make __call__
                return ``f()`` directly instead of retrying/raising
            application: application name used to build key names;
                defaults to ``current.request.application``
        """
        self.request = current.request
        self.debug = debug
        self.with_lock = with_lock
        self.fail_gracefully = fail_gracefully
        self.application = application or current.request.application
        self.prefix = "w2p:cache:%s:" % self.application
        if self.request:
            app = self.application
        else:
            app = ''

        if app not in self.meta_storage:
            self.storage = self.meta_storage[app] = {
                CacheAbstract.cache_stats_name: {
                    'hit_total': 0,
                    'misses': 0,
                }}
        else:
            self.storage = self.meta_storage[app]

        # Fixed set that indexes every bucket created by cache_it().
        self.cache_set_key = 'w2p:%s:___cache_set' % self.application

        self.r_server = redis_conn
        # NOTE(review): presumably consumed by release_lock(), which is
        # handed this instance in _fetch_or_compute() - confirm in
        # gluon.contrib.redis_utils.
        self._release_script = register_release_lock(self.r_server)

    def initialize(self):
        """No-op."""
        pass

    @staticmethod
    def _to_text(name):
        """Decode a key returned by Redis as bytes; pass text through as is."""
        # On Python 2 ``bytes is str`` so nothing is decoded there.
        if isinstance(name, bytes) and not isinstance(name, str):
            return name.decode('utf-8', 'replace')
        return name

    def __call__(self, key, f, time_expire=300, with_lock=None):
        """
        Return the cached value for ``key``, computing it with ``f`` on miss.

        Args:
            key: cache key (spaces are replaced by underscores)
            f: zero-argument callable producing the value; ``None`` deletes
                the key and returns ``None``
            time_expire: maximum accepted age in seconds; ``None`` means
                one day
            with_lock: override the instance's default locking mode

        Raises:
            RConnectionError: Redis stayed unreachable after MAX_RETRIES
                attempts and fail_gracefully is False
        """
        if with_lock is None:
            with_lock = self.with_lock
        if time_expire is None:
            time_expire = 24 * 60 * 60
        newKey = self.__keyFormat__(key)
        value = None
        try:
            if f is None:
                # delete and never look back
                self.r_server.delete(newKey)
            else:
                value = self._fetch_or_compute(newKey, f, time_expire,
                                               with_lock)
        except RConnectionError:
            return self.retry_call(key, f, time_expire, with_lock)
        # A successful round trip ends the current failure streak, so the
        # next outage gets the full retry budget again.
        self.RETRIES = 0
        return value

    def _fetch_or_compute(self, newKey, f, time_expire, with_lock):
        """Return the stored value for ``newKey`` or compute and store it."""
        obj = self.r_server.get(newKey)
        if obj:
            # A remaining TTL above time_expire makes this caller treat
            # the stored value as a miss and recompute it.
            ttl = self.r_server.ttl(newKey) or 0
            if ttl > time_expire:
                obj = None
        if obj:
            if self.debug:
                self.r_server.incr('web2py_cache_statistics:hit_total')
            # NOTE: unpickling trusts the Redis contents; anyone able to
            # write to this Redis database can make this execute code.
            return pickle.loads(obj)
        if not with_lock:
            # without distributed locking
            return self.cache_it(newKey, f, time_expire)
        # naive distributed locking
        lock_key = '%s:__lock' % newKey
        randomvalue = time.time()
        al = acquire_lock(self.r_server, lock_key, randomvalue)
        try:
            # someone may have computed it while we waited for the lock
            obj = self.r_server.get(newKey)
            if obj is None:
                return self.cache_it(newKey, f, time_expire)
            return pickle.loads(obj)
        finally:
            release_lock(self, lock_key, al)

    def cache_it(self, key, f, time_expire):
        """
        Compute ``f()``, store it under ``key`` and index the key in its
        expiry bucket. Returns the computed value.
        """
        if self.debug:
            self.r_server.incr('web2py_cache_statistics:misses')
        cache_set_key = self.cache_set_key
        # Buckets are named after the minute in which their keys expire,
        # with a two minute safety margin.
        expire_at = int(time.time() + time_expire) + 120
        bucket_key = "%s:%s" % (cache_set_key, expire_at // 60)
        value = f()
        value_ = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
        if time_expire == 0:
            # a zero expiry is stored with the minimum of one second
            time_expire = 1
        # Everything is sent in a single pipeline; the value is written
        # exactly once.
        p = self.r_server.pipeline()
        # add bucket to the fixed set
        p.sadd(cache_set_key, bucket_key)
        # sets the key
        p.setex(key, time_expire, value_)
        # add the key to the bucket
        p.sadd(bucket_key, key)
        # expire the bucket at the end of its minute
        p.expireat(bucket_key, ((expire_at // 60) + 1) * 60)
        p.execute()
        return value

    def retry_call(self, key, f, time_expire, with_lock):
        """
        Handle a connection error raised inside __call__.

        With fail_gracefully the value is computed without touching Redis
        (``None`` for a delete request). Otherwise sleep with a linearly
        growing delay and retry, raising RConnectionError once MAX_RETRIES
        attempts have failed.
        """
        self.RETRIES += 1
        if self.fail_gracefully:
            self.RETRIES = 0
            return None if f is None else f()
        if self.RETRIES <= self.MAX_RETRIES:
            logger.error("sleeping %s seconds before reconnecting" % (2 * self.RETRIES))
            time.sleep(2 * self.RETRIES)
            return self.__call__(key, f, time_expire, with_lock)
        self.RETRIES = 0
        raise RConnectionError('Redis instance is unavailable')

    def increment(self, key, value=1):
        """Increment the integer stored at ``key`` by ``value`` via INCR."""
        try:
            newKey = self.__keyFormat__(key)
            result = self.r_server.incr(newKey, value)
        except RConnectionError:
            return self.retry_increment(key, value)
        # success: end the current failure streak
        self.RETRIES = 0
        return result

    def retry_increment(self, key, value):
        """Retry increment() after a connection error, up to MAX_RETRIES."""
        self.RETRIES += 1
        if self.RETRIES <= self.MAX_RETRIES:
            logger.error("sleeping some seconds before reconnecting")
            time.sleep(2 * self.RETRIES)
            return self.increment(key, value)
        else:
            self.RETRIES = 0
            raise RConnectionError('Redis instance is unavailable')

    def clear(self, regex=None):
        """
        Delete this application's cached keys whose name (without the
        ``w2p:cache:<application>:`` prefix) matches ``regex`` from its
        start. With ``regex=None`` every indexed key is deleted.
        """
        r = None if regex is None else re.compile(regex)
        # get all buckets
        buckets = self.r_server.smembers(self.cache_set_key)
        # get all keys in buckets
        if buckets:
            keys = self.r_server.sunion(buckets)
        else:
            return
        prefix = self.prefix
        pipe = self.r_server.pipeline()
        for a in keys:
            # Keys may come back as bytes; decode before matching so the
            # regex sees the plain key name.
            name = str(self._to_text(a)).replace(prefix, '', 1)
            if r is None or r.match(name):
                pipe.delete(a)
        if random.randrange(0, 100) < 10:
            # do this just once in a while (10% chance)
            self.clear_buckets(buckets)
        pipe.execute()

    def clear_buckets(self, buckets):
        """Remove from the fixed set the buckets that no longer exist."""
        p = self.r_server.pipeline()
        for b in buckets:
            if not self.r_server.exists(b):
                p.srem(self.cache_set_key, b)
        p.execute()

    def delete(self, key):
        """Delete ``key`` and return the result of the Redis DELETE call."""
        newKey = self.__keyFormat__(key)
        return self.r_server.delete(newKey)

    def stats(self):
        """
        Return the Redis server INFO dictionary extended with:

        - ``w2p_stats`` (only when debug is on): the hit/miss counters
        - ``w2p_keys``: ``"<key>_expire_in_sec" -> ttl`` for this
          application's cached keys and its bucket bookkeeping sets
        """
        stats_collector = self.r_server.info()
        if self.debug:
            stats_collector['w2p_stats'] = dict(
                hit_total=self.r_server.get(
                    'web2py_cache_statistics:hit_total'),
                misses=self.r_server.get('web2py_cache_statistics:misses')
            )
        w2p_keys = stats_collector['w2p_keys'] = dict()

        # Cached values live under self.prefix ("w2p:cache:<app>:"), the
        # bucket sets under "w2p:<app>:"; scan both.
        patterns = (self.prefix + '*', "w2p:%s:*" % self.application)
        for pattern in patterns:
            for a in self.r_server.keys(pattern):
                w2p_keys["%s_expire_in_sec" % self._to_text(a)] = self.r_server.ttl(a)
        return stats_collector

    def __keyFormat__(self, key):
        """Return the full Redis key: prefix + key with spaces as '_'."""
        return '%s%s' % (self.prefix, key.replace(' ', '_'))
Note: See TracBrowser for help on using the repository browser.