1 | #!/usr/bin/env python |
---|
2 | |
---|
3 | """client module for memcached (memory cache daemon) |
---|
4 | |
---|
5 | Overview |
---|
6 | ======== |
---|
7 | |
---|
8 | See U{the MemCached homepage<http://www.danga.com/memcached>} for more |
---|
9 | about memcached. |
---|
10 | |
---|
11 | Usage summary |
---|
12 | ============= |
---|
13 | |
---|
14 | This should give you a feel for how this module operates:: |
---|
15 | |
---|
16 | import memcache |
---|
17 | mc = memcache.Client(['127.0.0.1:11211'], debug=0) |
---|
18 | |
---|
19 | mc.set("some_key", "Some value") |
---|
20 | value = mc.get("some_key") |
---|
21 | |
---|
22 | mc.set("another_key", 3) |
---|
23 | mc.delete("another_key") |
---|
24 | |
---|
25 | mc.set("key", "1") # note that the key used for incr/decr must be |
---|
26 | # a string. |
---|
27 | mc.incr("key") |
---|
28 | mc.decr("key") |
---|
29 | |
---|
30 | The standard way to use memcache with a database is like this: |
---|
31 | |
---|
32 | key = derive_key(obj) |
---|
33 | obj = mc.get(key) |
---|
34 | if not obj: |
---|
35 | obj = backend_api.get(...) |
---|
36 | mc.set(key, obj) |
---|
37 | |
---|
38 | # we now have obj, and future passes through this code |
---|
39 | # will use the object from the cache. |
---|
40 | |
---|
41 | Detailed Documentation |
---|
42 | ====================== |
---|
43 | |
---|
44 | More detailed documentation is available in the L{Client} class. |
---|
45 | |
---|
46 | """ |
---|
47 | |
---|
48 | from __future__ import print_function |
---|
49 | |
---|
50 | import binascii |
---|
51 | import os |
---|
52 | import pickle |
---|
53 | import re |
---|
54 | import socket |
---|
55 | import sys |
---|
56 | import threading |
---|
57 | import time |
---|
58 | import zlib |
---|
59 | |
---|
60 | import six |
---|
61 | |
---|
62 | |
---|
def cmemcache_hash(key):
    """Hash *key* the way the old cmemcache library did.

    The key is ASCII-encoded, CRC32-checksummed (masked to an unsigned
    32-bit value), shifted down 16 bits and masked to the low 15 bits.
    A result of 0 is mapped to 1 so the hash is always a positive
    bucket index.
    """
    crc = binascii.crc32(key.encode('ascii')) & 0xffffffff
    bucket_hash = (crc >> 16) & 0x7fff
    return bucket_hash or 1


serverHashFunction = cmemcache_hash
---|
68 | |
---|
69 | |
---|
def useOldServerHashFunction():
    """Use the old python-memcache server hash function.

    Rebinds the module-global ``serverHashFunction`` to plain
    ``binascii.crc32``, changing how keys map to servers for all
    subsequently hashed keys.  Note this affects every Client in the
    process, not just new ones.
    """
    global serverHashFunction
    serverHashFunction = binascii.crc32
---|
74 | |
---|
try:
    from zlib import compress, decompress
    _supports_compress = True
except ImportError:
    _supports_compress = False
    # quickly define a decompress just in case we recv compressed data.

    # NOTE(review): ``import zlib`` appears unconditionally near the top of
    # the module, so if zlib were missing the module would already have
    # failed to import and this branch looks unreachable — TODO confirm.
    def decompress(val):
        # Raised when the server hands back data flagged as compressed but
        # this build cannot inflate it.
        raise _Error(
            "Received compressed data but I don't support "
            "compression (import error)")
---|
from io import BytesIO

# Detect Python 2 vs 3: the ``unicode`` builtin only exists on Python 2.
try:
    unicode
except NameError:
    _has_unicode = False
else:
    _has_unicode = True

# ``basestring`` is the common string base class on Python 2; on Python 3
# plain ``str`` stands in for it.
try:
    _str_cls = basestring
except NameError:
    _str_cls = str

# Printable characters acceptable in a memcache key (no spaces or control
# characters); presumably consulted by the client-side key validation
# (check_key) — definition not in this chunk, verify there.
valid_key_chars_re = re.compile('[\x21-\x7e\x80-\xff]+$')
---|
101 | |
---|
102 | |
---|
# Original author: Evan Martin of Danga Interactive
__author__ = "Sean Reifschneider <jafo-memcached@tummy.com>"
__version__ = "1.53"
__copyright__ = "Copyright (C) 2003 Danga Interactive"
# http://en.wikipedia.org/wiki/Python_Software_Foundation_License
__license__ = "Python Software Foundation License"

# Longest key the client will send (per-client override via the
# server_max_key_length constructor argument).
SERVER_MAX_KEY_LENGTH = 250
# Storing values larger than 1MB requires recompiling memcached.  If
# you do, this value can be changed by doing
# "memcache.SERVER_MAX_VALUE_LENGTH = N" after importing this module.
SERVER_MAX_VALUE_LENGTH = 1024 * 1024
---|
115 | |
---|
116 | |
---|
class _Error(Exception):
    """Module-internal error (e.g. compressed data received but zlib
    support is unavailable)."""
    pass
---|
119 | |
---|
120 | |
---|
class _ConnectionDeadError(Exception):
    """Internal marker exception for a server connection found to be
    dead (raised/handled outside this chunk — see _Host)."""
    pass
---|
123 | |
---|
124 | |
---|
# Defaults consumed by Client.__init__; both can be overridden per client
# via the dead_retry and socket_timeout constructor arguments.
_DEAD_RETRY = 30  # number of seconds before retrying a dead server.
_SOCKET_TIMEOUT = 3  # number of seconds before sockets timeout.
---|
127 | |
---|
128 | |
---|
129 | class Client(threading.local): |
---|
130 | """Object representing a pool of memcache servers. |
---|
131 | |
---|
132 | See L{memcache} for an overview. |
---|
133 | |
---|
134 | In all cases where a key is used, the key can be either: |
---|
135 | 1. A simple hashable type (string, integer, etc.). |
---|
136 | 2. A tuple of C{(hashvalue, key)}. This is useful if you want |
---|
137 | to avoid making this module calculate a hash value. You may |
---|
138 | prefer, for example, to keep all of a given user's objects on |
---|
139 | the same memcache server, so you could use the user's unique |
---|
140 | id as the hash value. |
---|
141 | |
---|
142 | |
---|
143 | @group Setup: __init__, set_servers, forget_dead_hosts, |
---|
144 | disconnect_all, debuglog |
---|
145 | @group Insertion: set, add, replace, set_multi |
---|
146 | @group Retrieval: get, get_multi |
---|
147 | @group Integers: incr, decr |
---|
148 | @group Removal: delete, delete_multi |
---|
149 | @sort: __init__, set_servers, forget_dead_hosts, disconnect_all, |
---|
150 | debuglog,\ set, set_multi, add, replace, get, get_multi, |
---|
151 | incr, decr, delete, delete_multi |
---|
152 | """ |
---|
    # Storage flag bits sent alongside each value; the names indicate the
    # encoding applied before storing (pickled / int / long / zlib) so the
    # value can be decoded on retrieval.
    _FLAG_PICKLE = 1 << 0
    _FLAG_INTEGER = 1 << 1
    _FLAG_LONG = 1 << 2
    _FLAG_COMPRESSED = 1 << 3

    _SERVER_RETRIES = 10  # how many times to try finding a free server.
---|
159 | |
---|
160 | # exceptions for Client |
---|
    # exceptions for Client
    class MemcachedKeyError(Exception):
        """Base class for key-validation failures raised by the client."""
        pass

    class MemcachedKeyLengthError(MemcachedKeyError):
        """Key is longer than the allowed maximum key length."""
        pass

    class MemcachedKeyCharacterError(MemcachedKeyError):
        """Key contains characters memcached does not accept."""
        pass

    class MemcachedKeyNoneError(MemcachedKeyError):
        """Key is None."""
        pass

    class MemcachedKeyTypeError(MemcachedKeyError):
        """Key is not of an acceptable (string-like) type."""
        pass

    class MemcachedStringEncodingError(Exception):
        """A value/key could not be encoded as required by the protocol."""
        pass
---|
178 | |
---|
    def __init__(self, servers, debug=0, pickleProtocol=0,
                 pickler=pickle.Pickler, unpickler=pickle.Unpickler,
                 pload=None, pid=None,
                 server_max_key_length=None, server_max_value_length=None,
                 dead_retry=_DEAD_RETRY, socket_timeout=_SOCKET_TIMEOUT,
                 cache_cas=False, flush_on_reconnect=0, check_keys=True):
        """Create a new Client object with the given list of servers.

        @param servers: C{servers} is passed to L{set_servers}.
        @param debug: whether to display error messages when a server
        can't be contacted.
        @param pickleProtocol: number to mandate protocol used by
        (c)Pickle.
        @param pickler: optional override of default Pickler to allow
        subclassing.
        @param unpickler: optional override of default Unpickler to
        allow subclassing.
        @param pload: optional persistent_load function to call on
        pickle loading.  Useful for cPickle since subclassing isn't
        allowed.
        @param pid: optional persistent_id function to call on pickle
        storing.  Useful for cPickle since subclassing isn't allowed.
        @param dead_retry: number of seconds before retrying a
        blacklisted server. Default to 30 s.
        @param socket_timeout: timeout in seconds for all calls to a
        server. Defaults to 3 seconds.
        @param cache_cas: (default False) If true, cas operations will
        be cached.  WARNING: This cache is not expired internally, if
        you have a long-running process you will need to expire it
        manually via client.reset_cas(), or the cache can grow
        unlimited.
        @param server_max_key_length: (default SERVER_MAX_KEY_LENGTH)
        Data that is larger than this will not be sent to the server.
        @param server_max_value_length: (default
        SERVER_MAX_VALUE_LENGTH) Data that is larger than this will
        not be sent to the server.
        @param flush_on_reconnect: optional flag which prevents a
        scenario that can cause stale data to be read: If there's more
        than one memcached server and the connection to one is
        interrupted, keys that mapped to that server will get
        reassigned to another.  If the first server comes back, those
        keys will map to it again.  If it still has its data, get()s
        can read stale data that was overwritten on another
        server.  This flag is off by default for backwards
        compatibility.
        @param check_keys: (default True) If True, the key is checked
        to ensure it is the correct length and composed of the right
        characters.
        """
        # Client subclasses threading.local, so attributes assigned here
        # are per-thread state.
        super(Client, self).__init__()
        self.debug = debug
        # dead_retry/socket_timeout/flush_on_reconnect must be set before
        # set_servers(), which forwards them to each _Host.
        self.dead_retry = dead_retry
        self.socket_timeout = socket_timeout
        self.flush_on_reconnect = flush_on_reconnect
        self.set_servers(servers)
        self.stats = {}
        self.cache_cas = cache_cas
        self.reset_cas()
        self.do_check_key = check_keys

        # Allow users to modify pickling/unpickling behavior
        self.pickleProtocol = pickleProtocol
        self.pickler = pickler
        self.unpickler = unpickler
        self.persistent_load = pload
        self.persistent_id = pid
        self.server_max_key_length = server_max_key_length
        if self.server_max_key_length is None:
            self.server_max_key_length = SERVER_MAX_KEY_LENGTH
        self.server_max_value_length = server_max_value_length
        if self.server_max_value_length is None:
            self.server_max_value_length = SERVER_MAX_VALUE_LENGTH

        # figure out the pickler style
        # Probe whether the configured pickler accepts ``protocol`` as a
        # keyword argument (pickle.Pickler does; some cPickle builds only
        # take it positionally).
        file = BytesIO()
        try:
            pickler = self.pickler(file, protocol=self.pickleProtocol)
            self.picklerIsKeyword = True
        except TypeError:
            self.picklerIsKeyword = False
---|
259 | |
---|
260 | def reset_cas(self): |
---|
261 | """Reset the cas cache. |
---|
262 | |
---|
263 | This is only used if the Client() object was created with |
---|
264 | "cache_cas=True". If used, this cache does not expire |
---|
265 | internally, so it can grow unbounded if you do not clear it |
---|
266 | yourself. |
---|
267 | """ |
---|
268 | self.cas_ids = {} |
---|
269 | |
---|
270 | def set_servers(self, servers): |
---|
271 | """Set the pool of servers used by this client. |
---|
272 | |
---|
273 | @param servers: an array of servers. |
---|
274 | Servers can be passed in two forms: |
---|
275 | 1. Strings of the form C{"host:port"}, which implies a |
---|
276 | default weight of 1. |
---|
277 | 2. Tuples of the form C{("host:port", weight)}, where |
---|
278 | C{weight} is an integer weight value. |
---|
279 | |
---|
280 | """ |
---|
281 | self.servers = [_Host(s, self.debug, dead_retry=self.dead_retry, |
---|
282 | socket_timeout=self.socket_timeout, |
---|
283 | flush_on_reconnect=self.flush_on_reconnect) |
---|
284 | for s in servers] |
---|
285 | self._init_buckets() |
---|
286 | |
---|
    def get_stats(self, stat_args=None):
        """Get statistics from each of the servers.

        @param stat_args: Additional arguments to pass to the memcache
            "stats" command.

        @return: A list of tuples ( server_identifier,
            stats_dictionary ).  The dictionary contains a number of
            name/value pairs specifying the name of the status field
            and the string value associated with it.  The values are
            not converted from strings.
        """
        data = []
        for s in self.servers:
            # Unreachable servers are silently skipped from the result.
            if not s.connect():
                continue
            # Build an identifier string matching the connection's address
            # family (IPv4, IPv6, or unix domain socket).
            if s.family == socket.AF_INET:
                name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
            elif s.family == socket.AF_INET6:
                name = '[%s]:%s (%s)' % (s.ip, s.port, s.weight)
            else:
                name = 'unix:%s (%s)' % (s.address, s.weight)
            if not stat_args:
                s.send_cmd('stats')
            else:
                s.send_cmd('stats ' + stat_args)
            serverData = {}
            data.append((name, serverData))
            readline = s.readline
            # The server answers with "STAT <name> <value>" lines and a
            # final "END"; split each line into at most three fields.
            while 1:
                line = readline()
                if not line or line.strip() == 'END':
                    break
                stats = line.split(' ', 2)
                serverData[stats[1]] = stats[2]

        return(data)
---|
324 | |
---|
    def get_slabs(self):
        """Fetch per-slab item statistics ("stats items") from each server.

        @return: list of C{(server_identifier, data)} tuples, where
            C{data} maps slab number -> {stat name -> value string}.
            Unreachable servers are skipped.
        """
        data = []
        for s in self.servers:
            if not s.connect():
                continue
            # Same identifier format as get_stats().
            if s.family == socket.AF_INET:
                name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
            elif s.family == socket.AF_INET6:
                name = '[%s]:%s (%s)' % (s.ip, s.port, s.weight)
            else:
                name = 'unix:%s (%s)' % (s.address, s.weight)
            serverData = {}
            data.append((name, serverData))
            s.send_cmd('stats items')
            readline = s.readline
            while 1:
                line = readline()
                if not line or line.strip() == 'END':
                    break
                item = line.split(' ', 2)
                # 0 = STAT, 1 = ITEM, 2 = Value
                slab = item[1].split(':', 2)
                # 0 = items, 1 = Slab #, 2 = Name
                if slab[1] not in serverData:
                    serverData[slab[1]] = {}
                serverData[slab[1]][slab[2]] = item[2]
        return data
---|
352 | |
---|
353 | def flush_all(self): |
---|
354 | """Expire all data in memcache servers that are reachable.""" |
---|
355 | for s in self.servers: |
---|
356 | if not s.connect(): |
---|
357 | continue |
---|
358 | s.flush() |
---|
359 | |
---|
360 | def debuglog(self, str): |
---|
361 | if self.debug: |
---|
362 | sys.stderr.write("MemCached: %s\n" % str) |
---|
363 | |
---|
364 | def _statlog(self, func): |
---|
365 | if func not in self.stats: |
---|
366 | self.stats[func] = 1 |
---|
367 | else: |
---|
368 | self.stats[func] += 1 |
---|
369 | |
---|
370 | def forget_dead_hosts(self): |
---|
371 | """Reset every host in the pool to an "alive" state.""" |
---|
372 | for s in self.servers: |
---|
373 | s.deaduntil = 0 |
---|
374 | |
---|
375 | def _init_buckets(self): |
---|
376 | self.buckets = [] |
---|
377 | for server in self.servers: |
---|
378 | for i in range(server.weight): |
---|
379 | self.buckets.append(server) |
---|
380 | |
---|
    def _get_server(self, key):
        """Map *key* onto a live server.

        *key* is either a plain key (hashed with serverHashFunction) or a
        C{(hashvalue, key)} tuple supplying the server hash directly.

        @return: C{(server, key)} on success, C{(None, None)} if there are
            no buckets or no reachable server was found.
        """
        if isinstance(key, tuple):
            serverhash, key = key
        else:
            serverhash = serverHashFunction(key)

        if not self.buckets:
            return None, None

        # If the chosen bucket's server is down, re-derive the hash and try
        # again, up to _SERVER_RETRIES times.
        for i in range(Client._SERVER_RETRIES):
            server = self.buckets[serverhash % len(self.buckets)]
            if server.connect():
                # print("(using server %s)" % server,)
                return server, key
            serverhash = serverHashFunction(str(serverhash) + str(i))
        return None, None
---|
397 | |
---|
398 | def disconnect_all(self): |
---|
399 | for s in self.servers: |
---|
400 | s.close_socket() |
---|
401 | |
---|
    def delete_multi(self, keys, time=0, key_prefix=''):
        """Delete multiple keys in the memcache doing just one query.

        >>> notset_keys = mc.set_multi({'a1' : 'val1', 'a2' : 'val2'})
        >>> mc.get_multi(['a1', 'a2']) == {'a1' : 'val1','a2' : 'val2'}
        1
        >>> mc.delete_multi(['key1', 'key2'])
        1
        >>> mc.get_multi(['key1', 'key2']) == {}
        1

        This method is recommended over iterated regular L{delete}s as
        it reduces total latency, since your app doesn't have to wait
        for each round-trip of L{delete} before sending the next one.

        @param keys: An iterable of keys to clear
        @param time: number of seconds any subsequent set / update
        commands should fail. Defaults to 0 for no delay.
        @param key_prefix: Optional string to prepend to each key when
            sending to memcache.  See docs for L{get_multi} and
            L{set_multi}.
        @return: 1 if no failure in communication with any memcacheds.
        @rtype: int
        """

        self._statlog('delete_multi')

        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
            keys, key_prefix)

        # send out all requests on each server before reading anything
        dead_servers = []

        rc = 1
        for server in six.iterkeys(server_keys):
            bigcmd = []
            write = bigcmd.append
            if time is not None:
                for key in server_keys[server]:  # These are mangled keys
                    write("delete %s %d\r\n" % (key, time))
            else:
                for key in server_keys[server]:  # These are mangled keys
                    write("delete %s\r\n" % key)
            try:
                server.send_cmds(''.join(bigcmd))
            except socket.error as msg:
                rc = 0
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                dead_servers.append(server)

        # if any servers died on the way, don't expect them to respond.
        for server in dead_servers:
            del server_keys[server]

        # Now read one reply per key from each surviving server.
        # NOTE(review): a NOT_FOUND reply will not match "DELETED"; how
        # that is reported depends on _Host.expect (not visible here).
        for server, keys in six.iteritems(server_keys):
            try:
                for key in keys:
                    server.expect("DELETED")
            except socket.error as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                rc = 0
        return rc
---|
468 | |
---|
    def delete(self, key, time=0):
        '''Deletes a key from the memcache.

        @return: Nonzero on success.
        @param time: number of seconds any subsequent set / update commands
        should fail.  Defaults to 0 for no delay.
        @rtype: int
        '''
        return self._deletetouch(['DELETED', 'NOT_FOUND'], "delete", key, time)
---|
478 | |
---|
479 | def touch(self, key, time=0): |
---|
480 | '''Updates the expiration time of a key in memcache. |
---|
481 | |
---|
482 | @return: Nonzero on success. |
---|
483 | @param time: Tells memcached the time which this value should |
---|
484 | expire, either as a delta number of seconds, or an absolute |
---|
485 | unix time-since-the-epoch value. See the memcached protocol |
---|
486 | docs section "Storage Commands" for more info on <exptime>. We |
---|
487 | default to 0 == cache forever. |
---|
488 | @rtype: int |
---|
489 | ''' |
---|
490 | return self._deletetouch(['TOUCHED'], "touch", key, time) |
---|
491 | |
---|
    def _deletetouch(self, expected, cmd, key, time=0):
        """Shared implementation of L{delete} and L{touch}.

        Sends "<cmd> <key> [<time>]" to the server owning *key*.

        @param expected: reply lines that count as success.
        @return: 1 on success, 0 on failure or when no server is reachable.
        """
        if self.do_check_key:
            self.check_key(key)
        server, key = self._get_server(key)
        if not server:
            return 0
        self._statlog(cmd)
        # Omit the time argument entirely when it is None or 0.
        if time is not None and time != 0:
            cmd = "%s %s %d" % (cmd, key, time)
        else:
            cmd = "%s %s" % (cmd, key)

        try:
            server.send_cmd(cmd)
            line = server.readline()
            if line and line.strip() in expected:
                return 1
            # Unexpected reply: log it (when debug is on) and fall through
            # to the failure return.
            self.debuglog('%s expected %s, got: %r'
                          % (cmd, ' or '.join(expected), line))
        except socket.error as msg:
            if isinstance(msg, tuple):
                msg = msg[1]
            server.mark_dead(msg)
        return 0
---|
516 | |
---|
517 | def incr(self, key, delta=1): |
---|
518 | """Increment value for C{key} by C{delta} |
---|
519 | |
---|
520 | Sends a command to the server to atomically increment the |
---|
521 | value for C{key} by C{delta}, or by 1 if C{delta} is |
---|
522 | unspecified. Returns None if C{key} doesn't exist on server, |
---|
523 | otherwise it returns the new value after incrementing. |
---|
524 | |
---|
525 | Note that the value for C{key} must already exist in the |
---|
526 | memcache, and it must be the string representation of an |
---|
527 | integer. |
---|
528 | |
---|
529 | >>> mc.set("counter", "20") # returns 1, indicating success |
---|
530 | 1 |
---|
531 | >>> mc.incr("counter") |
---|
532 | 21 |
---|
533 | >>> mc.incr("counter") |
---|
534 | 22 |
---|
535 | |
---|
536 | Overflow on server is not checked. Be aware of values |
---|
537 | approaching 2**32. See L{decr}. |
---|
538 | |
---|
539 | @param delta: Integer amount to increment by (should be zero |
---|
540 | or greater). |
---|
541 | |
---|
542 | @return: New value after incrementing. |
---|
543 | @rtype: int |
---|
544 | """ |
---|
545 | return self._incrdecr("incr", key, delta) |
---|
546 | |
---|
547 | def decr(self, key, delta=1): |
---|
548 | """Decrement value for C{key} by C{delta} |
---|
549 | |
---|
550 | Like L{incr}, but decrements. Unlike L{incr}, underflow is |
---|
551 | checked and new values are capped at 0. If server value is 1, |
---|
552 | a decrement of 2 returns 0, not -1. |
---|
553 | |
---|
554 | @param delta: Integer amount to decrement by (should be zero |
---|
555 | or greater). |
---|
556 | |
---|
557 | @return: New value after decrementing or None on error. |
---|
558 | @rtype: int |
---|
559 | """ |
---|
560 | return self._incrdecr("decr", key, delta) |
---|
561 | |
---|
562 | def _incrdecr(self, cmd, key, delta): |
---|
563 | if self.do_check_key: |
---|
564 | self.check_key(key) |
---|
565 | server, key = self._get_server(key) |
---|
566 | if not server: |
---|
567 | return None |
---|
568 | self._statlog(cmd) |
---|
569 | cmd = "%s %s %d" % (cmd, key, delta) |
---|
570 | try: |
---|
571 | server.send_cmd(cmd) |
---|
572 | line = server.readline() |
---|
573 | if line is None or line.strip() == 'NOT_FOUND': |
---|
574 | return None |
---|
575 | return int(line) |
---|
576 | except socket.error as msg: |
---|
577 | if isinstance(msg, tuple): |
---|
578 | msg = msg[1] |
---|
579 | server.mark_dead(msg) |
---|
580 | return None |
---|
581 | |
---|
582 | def add(self, key, val, time=0, min_compress_len=0): |
---|
583 | '''Add new key with value. |
---|
584 | |
---|
585 | Like L{set}, but only stores in memcache if the key doesn't |
---|
586 | already exist. |
---|
587 | |
---|
588 | @return: Nonzero on success. |
---|
589 | @rtype: int |
---|
590 | ''' |
---|
591 | return self._set("add", key, val, time, min_compress_len) |
---|
592 | |
---|
593 | def append(self, key, val, time=0, min_compress_len=0): |
---|
594 | '''Append the value to the end of the existing key's value. |
---|
595 | |
---|
596 | Only stores in memcache if key already exists. |
---|
597 | Also see L{prepend}. |
---|
598 | |
---|
599 | @return: Nonzero on success. |
---|
600 | @rtype: int |
---|
601 | ''' |
---|
602 | return self._set("append", key, val, time, min_compress_len) |
---|
603 | |
---|
604 | def prepend(self, key, val, time=0, min_compress_len=0): |
---|
605 | '''Prepend the value to the beginning of the existing key's value. |
---|
606 | |
---|
607 | Only stores in memcache if key already exists. |
---|
608 | Also see L{append}. |
---|
609 | |
---|
610 | @return: Nonzero on success. |
---|
611 | @rtype: int |
---|
612 | ''' |
---|
613 | return self._set("prepend", key, val, time, min_compress_len) |
---|
614 | |
---|
615 | def replace(self, key, val, time=0, min_compress_len=0): |
---|
616 | '''Replace existing key with value. |
---|
617 | |
---|
618 | Like L{set}, but only stores in memcache if the key already exists. |
---|
619 | The opposite of L{add}. |
---|
620 | |
---|
621 | @return: Nonzero on success. |
---|
622 | @rtype: int |
---|
623 | ''' |
---|
624 | return self._set("replace", key, val, time, min_compress_len) |
---|
625 | |
---|
    def set(self, key, val, time=0, min_compress_len=0):
        '''Unconditionally sets a key to a given value in the memcache.

        The C{key} can optionally be a tuple, with the first element
        being the server hash value and the second being the key.  If
        you want to avoid making this module calculate a hash value.
        You may prefer, for example, to keep all of a given user's
        objects on the same memcache server, so you could use the
        user's unique id as the hash value.

        @return: Nonzero on success.
        @rtype: int

        @param time: Tells memcached the time which this value should
        expire, either as a delta number of seconds, or an absolute
        unix time-since-the-epoch value. See the memcached protocol
        docs section "Storage Commands" for more info on <exptime>. We
        default to 0 == cache forever.

        @param min_compress_len: The threshold length to kick in
        auto-compression of the value using the zlib.compress()
        routine. If the value being cached is a string, then the
        length of the string is measured, else if the value is an
        object, then the length of the pickle result is measured. If
        the resulting attempt at compression yields a larger string
        than the input, then it is discarded. For backwards
        compatibility, this parameter defaults to 0, indicating don't
        ever try to compress.

        '''
        return self._set("set", key, val, time, min_compress_len)
---|
657 | |
---|
    def cas(self, key, val, time=0, min_compress_len=0):
        '''Check and set (CAS)

        Sets a key to a given value in the memcache if it hasn't been
        altered since last fetched. (See L{gets}).

        The C{key} can optionally be a tuple, with the first element
        being the server hash value and the second being the key.  If
        you want to avoid making this module calculate a hash value.
        You may prefer, for example, to keep all of a given user's
        objects on the same memcache server, so you could use the
        user's unique id as the hash value.

        @return: Nonzero on success.
        @rtype: int

        @param time: Tells memcached the time which this value should
        expire, either as a delta number of seconds, or an absolute
        unix time-since-the-epoch value. See the memcached protocol
        docs section "Storage Commands" for more info on <exptime>. We
        default to 0 == cache forever.

        @param min_compress_len: The threshold length to kick in
        auto-compression of the value using the zlib.compress()
        routine. If the value being cached is a string, then the
        length of the string is measured, else if the value is an
        object, then the length of the pickle result is measured. If
        the resulting attempt at compression yields a larger string
        than the input, then it is discarded. For backwards
        compatibility, this parameter defaults to 0, indicating don't
        ever try to compress.
        '''
        return self._set("cas", key, val, time, min_compress_len)
---|
691 | |
---|
    def _map_and_prefix_keys(self, key_iterable, key_prefix):
        """Compute the mapping of server (_Host instance) -> list of keys to
        stuff onto that server, as well as the mapping of prefixed key
        -> original key.

        Keys whose server cannot be reached are silently dropped from
        both returned mappings.
        """
        # Check it just once ...
        key_extra_len = len(key_prefix)
        if key_prefix and self.do_check_key:
            self.check_key(key_prefix)

        # server (_Host) -> list of unprefixed server keys in mapping
        server_keys = {}

        prefixed_to_orig_key = {}
        # build up a list for each server of all the keys we want.
        for orig_key in key_iterable:
            if isinstance(orig_key, tuple):
                # Tuple of hashvalue, key ala _get_server(). Caller is
                # essentially telling us what server to stuff this on.
                # Ensure call to _get_server gets a Tuple as well.
                str_orig_key = str(orig_key[1])

                # Gotta pre-mangle key before hashing to a
                # server. Returns the mangled key.
                server, key = self._get_server(
                    (orig_key[0], key_prefix + str_orig_key))
            else:
                # set_multi supports int / long keys.
                str_orig_key = str(orig_key)
                server, key = self._get_server(key_prefix + str_orig_key)

            # Now check to make sure key length is proper ...
            # (the prefix length counts toward the server's key limit)
            if self.do_check_key:
                self.check_key(str_orig_key, key_extra_len=key_extra_len)

            if not server:
                continue

            if server not in server_keys:
                server_keys[server] = []
            server_keys[server].append(key)
            prefixed_to_orig_key[key] = orig_key

        return (server_keys, prefixed_to_orig_key)
---|
736 | |
---|
737 | def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0): |
---|
738 | '''Sets multiple keys in the memcache doing just one query. |
---|
739 | |
---|
740 | >>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'}) |
---|
741 | >>> mc.get_multi(['key1', 'key2']) == {'key1' : 'val1', |
---|
742 | ... 'key2' : 'val2'} |
---|
743 | 1 |
---|
744 | |
---|
745 | |
---|
746 | This method is recommended over regular L{set} as it lowers |
---|
747 | the number of total packets flying around your network, |
---|
748 | reducing total latency, since your app doesn't have to wait |
---|
749 | for each round-trip of L{set} before sending the next one. |
---|
750 | |
---|
751 | @param mapping: A dict of key/value pairs to set. |
---|
752 | |
---|
753 | @param time: Tells memcached the time which this value should |
---|
754 | expire, either as a delta number of seconds, or an |
---|
755 | absolute unix time-since-the-epoch value. See the |
---|
756 | memcached protocol docs section "Storage Commands" for |
---|
757 | more info on <exptime>. We default to 0 == cache forever. |
---|
758 | |
---|
759 | @param key_prefix: Optional string to prepend to each key when |
---|
760 | sending to memcache. Allows you to efficiently stuff these |
---|
761 | keys into a pseudo-namespace in memcache: |
---|
762 | |
---|
763 | >>> notset_keys = mc.set_multi( |
---|
764 | ... {'key1' : 'val1', 'key2' : 'val2'}, |
---|
765 | ... key_prefix='subspace_') |
---|
766 | >>> len(notset_keys) == 0 |
---|
767 | True |
---|
768 | >>> mc.get_multi(['subspace_key1', |
---|
769 | ... 'subspace_key2']) == {'subspace_key1': 'val1', |
---|
770 | ... 'subspace_key2' : 'val2'} |
---|
771 | True |
---|
772 | |
---|
773 | Causes key 'subspace_key1' and 'subspace_key2' to be |
---|
774 | set. Useful in conjunction with a higher-level layer which |
---|
775 | applies namespaces to data in memcache. In this case, the |
---|
776 | return result would be the list of notset original keys, |
---|
777 | prefix not applied. |
---|
778 | |
---|
779 | @param min_compress_len: The threshold length to kick in |
---|
780 | auto-compression of the value using the zlib.compress() |
---|
781 | routine. If the value being cached is a string, then the |
---|
782 | length of the string is measured, else if the value is an |
---|
783 | object, then the length of the pickle result is |
---|
784 | measured. If the resulting attempt at compression yeilds a |
---|
785 | larger string than the input, then it is discarded. For |
---|
786 | backwards compatability, this parameter defaults to 0, |
---|
787 | indicating don't ever try to compress. |
---|
788 | |
---|
789 | @return: List of keys which failed to be stored [ memcache out |
---|
790 | of memory, etc. ]. |
---|
791 | |
---|
792 | @rtype: list |
---|
793 | ''' |
---|
794 | self._statlog('set_multi') |
---|
795 | |
---|
796 | server_keys, prefixed_to_orig_key = self._map_and_prefix_keys( |
---|
797 | six.iterkeys(mapping), key_prefix) |
---|
798 | |
---|
799 | # send out all requests on each server before reading anything |
---|
800 | dead_servers = [] |
---|
801 | notstored = [] # original keys. |
---|
802 | |
---|
803 | for server in six.iterkeys(server_keys): |
---|
804 | bigcmd = [] |
---|
805 | write = bigcmd.append |
---|
806 | try: |
---|
807 | for key in server_keys[server]: # These are mangled keys |
---|
808 | store_info = self._val_to_store_info( |
---|
809 | mapping[prefixed_to_orig_key[key]], |
---|
810 | min_compress_len) |
---|
811 | if store_info: |
---|
812 | msg = "set %s %d %d %d\r\n%s\r\n" |
---|
813 | write(msg % (key, |
---|
814 | store_info[0], |
---|
815 | time, |
---|
816 | store_info[1], |
---|
817 | store_info[2])) |
---|
818 | else: |
---|
819 | notstored.append(prefixed_to_orig_key[key]) |
---|
820 | server.send_cmds(''.join(bigcmd)) |
---|
821 | except socket.error as msg: |
---|
822 | if isinstance(msg, tuple): |
---|
823 | msg = msg[1] |
---|
824 | server.mark_dead(msg) |
---|
825 | dead_servers.append(server) |
---|
826 | |
---|
827 | # if any servers died on the way, don't expect them to respond. |
---|
828 | for server in dead_servers: |
---|
829 | del server_keys[server] |
---|
830 | |
---|
831 | # short-circuit if there are no servers, just return all keys |
---|
832 | if not server_keys: |
---|
833 | return(mapping.keys()) |
---|
834 | |
---|
835 | for server, keys in six.iteritems(server_keys): |
---|
836 | try: |
---|
837 | for key in keys: |
---|
838 | if server.readline() == 'STORED': |
---|
839 | continue |
---|
840 | else: |
---|
841 | # un-mangle. |
---|
842 | notstored.append(prefixed_to_orig_key[key]) |
---|
843 | except (_Error, socket.error) as msg: |
---|
844 | if isinstance(msg, tuple): |
---|
845 | msg = msg[1] |
---|
846 | server.mark_dead(msg) |
---|
847 | return notstored |
---|
848 | |
---|
849 | def _val_to_store_info(self, val, min_compress_len): |
---|
850 | """Transform val to a storable representation. |
---|
851 | |
---|
852 | Returns a tuple of the flags, the length of the new value, and |
---|
853 | the new value itself. |
---|
854 | """ |
---|
855 | flags = 0 |
---|
856 | if isinstance(val, str): |
---|
857 | pass |
---|
858 | elif isinstance(val, int): |
---|
859 | flags |= Client._FLAG_INTEGER |
---|
860 | val = "%d" % val |
---|
861 | # force no attempt to compress this silly string. |
---|
862 | min_compress_len = 0 |
---|
863 | elif isinstance(val, long): |
---|
864 | flags |= Client._FLAG_LONG |
---|
865 | val = "%d" % val |
---|
866 | # force no attempt to compress this silly string. |
---|
867 | min_compress_len = 0 |
---|
868 | else: |
---|
869 | flags |= Client._FLAG_PICKLE |
---|
870 | file = BytesIO() |
---|
871 | if self.picklerIsKeyword: |
---|
872 | pickler = self.pickler(file, protocol=self.pickleProtocol) |
---|
873 | else: |
---|
874 | pickler = self.pickler(file, self.pickleProtocol) |
---|
875 | if self.persistent_id: |
---|
876 | pickler.persistent_id = self.persistent_id |
---|
877 | pickler.dump(val) |
---|
878 | val = file.getvalue() |
---|
879 | |
---|
880 | lv = len(val) |
---|
881 | # We should try to compress if min_compress_len > 0 and we |
---|
882 | # could import zlib and this string is longer than our min |
---|
883 | # threshold. |
---|
884 | if min_compress_len and lv > min_compress_len: |
---|
885 | comp_val = zlib.compress(val) |
---|
886 | # Only retain the result if the compression result is smaller |
---|
887 | # than the original. |
---|
888 | if len(comp_val) < lv: |
---|
889 | flags |= Client._FLAG_COMPRESSED |
---|
890 | val = comp_val |
---|
891 | |
---|
892 | # silently do not store if value length exceeds maximum |
---|
893 | if (self.server_max_value_length != 0 and |
---|
894 | len(val) > self.server_max_value_length): |
---|
895 | return(0) |
---|
896 | |
---|
897 | return (flags, len(val), val) |
---|
898 | |
---|
899 | def _set(self, cmd, key, val, time, min_compress_len=0): |
---|
900 | if self.do_check_key: |
---|
901 | self.check_key(key) |
---|
902 | server, key = self._get_server(key) |
---|
903 | if not server: |
---|
904 | return 0 |
---|
905 | |
---|
906 | def _unsafe_set(): |
---|
907 | self._statlog(cmd) |
---|
908 | |
---|
909 | store_info = self._val_to_store_info(val, min_compress_len) |
---|
910 | if not store_info: |
---|
911 | return(0) |
---|
912 | |
---|
913 | if cmd == 'cas': |
---|
914 | if key not in self.cas_ids: |
---|
915 | return self._set('set', key, val, time, min_compress_len) |
---|
916 | fullcmd = "%s %s %d %d %d %d\r\n%s" % ( |
---|
917 | cmd, key, store_info[0], time, store_info[1], |
---|
918 | self.cas_ids[key], store_info[2]) |
---|
919 | else: |
---|
920 | fullcmd = "%s %s %d %d %d\r\n%s" % ( |
---|
921 | cmd, key, store_info[0], |
---|
922 | time, store_info[1], store_info[2] |
---|
923 | ) |
---|
924 | |
---|
925 | try: |
---|
926 | server.send_cmd(fullcmd) |
---|
927 | return(server.expect("STORED", raise_exception=True) |
---|
928 | == "STORED") |
---|
929 | except socket.error as msg: |
---|
930 | if isinstance(msg, tuple): |
---|
931 | msg = msg[1] |
---|
932 | server.mark_dead(msg) |
---|
933 | return 0 |
---|
934 | |
---|
935 | try: |
---|
936 | return _unsafe_set() |
---|
937 | except _ConnectionDeadError: |
---|
938 | # retry once |
---|
939 | try: |
---|
940 | if server._get_socket(): |
---|
941 | return _unsafe_set() |
---|
942 | except (_ConnectionDeadError, socket.error) as msg: |
---|
943 | server.mark_dead(msg) |
---|
944 | return 0 |
---|
945 | |
---|
    def _get(self, cmd, key):
        """Fetch a single key via 'get' or 'gets'; return the value or None.

        For 'gets', the returned cas id is remembered in self.cas_ids
        (when cache_cas is enabled) so a later 'cas' store can use it.
        Network/protocol errors mark the owning server dead and yield
        None; a dead connection is retried exactly once.
        """
        if self.do_check_key:
            self.check_key(key)
        server, key = self._get_server(key)
        if not server:
            return None

        def _unsafe_get():
            self._statlog(cmd)

            try:
                server.send_cmd("%s %s" % (cmd, key))
                rkey = flags = rlen = cas_id = None

                if cmd == 'gets':
                    rkey, flags, rlen, cas_id, = self._expect_cas_value(
                        server, raise_exception=True
                    )
                    if rkey and self.cache_cas:
                        self.cas_ids[rkey] = cas_id
                else:
                    rkey, flags, rlen, = self._expectvalue(
                        server, raise_exception=True
                    )

                if not rkey:
                    return None
                try:
                    value = self._recv_value(server, flags, rlen)
                finally:
                    # Always consume the trailing END line so the
                    # connection stays in sync even when value parsing
                    # raises.
                    server.expect("END", raise_exception=True)
            except (_Error, socket.error) as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                return None

            return value

        try:
            return _unsafe_get()
        except _ConnectionDeadError:
            # retry once
            try:
                if server.connect():
                    return _unsafe_get()
                return None
            except (_ConnectionDeadError, socket.error) as msg:
                server.mark_dead(msg)
                return None
---|
996 | |
---|
997 | def get(self, key): |
---|
998 | '''Retrieves a key from the memcache. |
---|
999 | |
---|
1000 | @return: The value or None. |
---|
1001 | ''' |
---|
1002 | return self._get('get', key) |
---|
1003 | |
---|
1004 | def gets(self, key): |
---|
1005 | '''Retrieves a key from the memcache. Used in conjunction with 'cas'. |
---|
1006 | |
---|
1007 | @return: The value or None. |
---|
1008 | ''' |
---|
1009 | return self._get('gets', key) |
---|
1010 | |
---|
    def get_multi(self, keys, key_prefix=''):
        '''Retrieves multiple keys from the memcache doing just one query.

        >>> success = mc.set("foo", "bar")
        >>> success = mc.set("baz", 42)
        >>> mc.get_multi(["foo", "baz", "foobar"]) == {
        ...     "foo": "bar", "baz": 42
        ... }
        1
        >>> mc.set_multi({'k1' : 1, 'k2' : 2}, key_prefix='pfx_') == []
        1

        This looks up keys 'pfx_k1', 'pfx_k2', ... . Returned dict
        will just have unprefixed keys 'k1', 'k2'.

        >>> mc.get_multi(['k1', 'k2', 'nonexist'],
        ...              key_prefix='pfx_') == {'k1' : 1, 'k2' : 2}
        1

        get_multi [ and L{set_multi} ] can take str()-ables like ints /
        longs as keys too. Such as your db pri key fields. They're
        rotored through str() before being passed off to memcache,
        with or without the use of a key_prefix. In this mode, the
        key_prefix could be a table name, and the key itself a db
        primary key number.

        >>> mc.set_multi({42: 'douglass adams',
        ...               46: 'and 2 just ahead of me'},
        ...              key_prefix='numkeys_') == []
        1
        >>> mc.get_multi([46, 42], key_prefix='numkeys_') == {
        ...     42: 'douglass adams',
        ...     46: 'and 2 just ahead of me'
        ... }
        1

        This method is recommended over regular L{get} as it lowers
        the number of total packets flying around your network,
        reducing total latency, since your app doesn't have to wait
        for each round-trip of L{get} before sending the next one.

        See also L{set_multi}.

        @param keys: An array of keys.

        @param key_prefix: A string to prefix each key when we
            communicate with memcache. Facilitates pseudo-namespaces
            within memcache. Returned dictionary keys will not have this
            prefix.

        @return: A dictionary of key/value pairs that were
            available. If key_prefix was provided, the keys in the returned
            dictionary will not have it present.
        '''

        self._statlog('get_multi')

        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
            keys, key_prefix)

        # send out all requests on each server before reading anything
        dead_servers = []
        for server in six.iterkeys(server_keys):
            try:
                server.send_cmd("get %s" % " ".join(server_keys[server]))
            except socket.error as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                dead_servers.append(server)

        # if any servers died on the way, don't expect them to respond.
        for server in dead_servers:
            del server_keys[server]

        retvals = {}
        for server in six.iterkeys(server_keys):
            try:
                # Each server streams VALUE lines terminated by END.
                line = server.readline()
                while line and line != 'END':
                    rkey, flags, rlen = self._expectvalue(server, line)
                    # Bo Yang reports that this can sometimes be None
                    if rkey is not None:
                        val = self._recv_value(server, flags, rlen)
                        # un-prefix returned key.
                        retvals[prefixed_to_orig_key[rkey]] = val
                    line = server.readline()
            except (_Error, socket.error) as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                # Partial results from other servers are still returned.
                server.mark_dead(msg)
        return retvals
---|
1103 | |
---|
1104 | def _expect_cas_value(self, server, line=None, raise_exception=False): |
---|
1105 | if not line: |
---|
1106 | line = server.readline(raise_exception) |
---|
1107 | |
---|
1108 | if line and line[:5] == 'VALUE': |
---|
1109 | resp, rkey, flags, len, cas_id = line.split() |
---|
1110 | return (rkey, int(flags), int(len), int(cas_id)) |
---|
1111 | else: |
---|
1112 | return (None, None, None, None) |
---|
1113 | |
---|
1114 | def _expectvalue(self, server, line=None, raise_exception=False): |
---|
1115 | if not line: |
---|
1116 | line = server.readline(raise_exception) |
---|
1117 | |
---|
1118 | if line and line[:5] == 'VALUE': |
---|
1119 | resp, rkey, flags, len = line.split() |
---|
1120 | flags = int(flags) |
---|
1121 | rlen = int(len) |
---|
1122 | return (rkey, flags, rlen) |
---|
1123 | else: |
---|
1124 | return (None, None, None) |
---|
1125 | |
---|
1126 | def _recv_value(self, server, flags, rlen): |
---|
1127 | rlen += 2 # include \r\n |
---|
1128 | buf = server.recv(rlen) |
---|
1129 | if len(buf) != rlen: |
---|
1130 | raise _Error("received %d bytes when expecting %d" |
---|
1131 | % (len(buf), rlen)) |
---|
1132 | |
---|
1133 | if len(buf) == rlen: |
---|
1134 | buf = buf[:-2] # strip \r\n |
---|
1135 | |
---|
1136 | if flags & Client._FLAG_COMPRESSED: |
---|
1137 | buf = zlib.decompress(buf) |
---|
1138 | |
---|
1139 | if flags == 0 or flags == Client._FLAG_COMPRESSED: |
---|
1140 | # Either a bare string or a compressed string now decompressed... |
---|
1141 | val = buf |
---|
1142 | elif flags & Client._FLAG_INTEGER: |
---|
1143 | val = int(buf) |
---|
1144 | elif flags & Client._FLAG_LONG: |
---|
1145 | val = long(buf) |
---|
1146 | elif flags & Client._FLAG_PICKLE: |
---|
1147 | try: |
---|
1148 | file = BytesIO(buf) |
---|
1149 | unpickler = self.unpickler(file) |
---|
1150 | if self.persistent_load: |
---|
1151 | unpickler.persistent_load = self.persistent_load |
---|
1152 | val = unpickler.load() |
---|
1153 | except Exception as e: |
---|
1154 | self.debuglog('Pickle error: %s\n' % e) |
---|
1155 | return None |
---|
1156 | else: |
---|
1157 | self.debuglog("unknown flags on get: %x\n" % flags) |
---|
1158 | raise ValueError('Unknown flags on get: %x' % flags) |
---|
1159 | |
---|
1160 | return val |
---|
1161 | |
---|
    def check_key(self, key, key_extra_len=0):
        """Checks sanity of key.

        Fails if:

        Key length is > SERVER_MAX_KEY_LENGTH (Raises
            MemcachedKeyLengthError).
        Contains control characters (Raises MemcachedKeyCharacterError).
        Is a unicode string under Python 2 (Raises
            MemcachedStringEncodingError).
        Is not a string (Raises MemcachedKeyTypeError).
        Is falsy -- None or an empty string (Raises MemcachedKeyNoneError).

        @param key_extra_len: extra length to budget for, e.g. a prefix
            that will be prepended to the key before sending.
        """
        if isinstance(key, tuple):
            # (hash-value, key) tuples: only the key part is validated.
            key = key[1]
        if not key:
            # NOTE(review): this also rejects the empty string '', not
            # just None, despite the error message.
            raise Client.MemcachedKeyNoneError("Key is None")

        # Make sure we're not a specific unicode type, if we're old enough that
        # it's a separate type.
        if _has_unicode is True and isinstance(key, unicode):
            raise Client.MemcachedStringEncodingError(
                "Keys must be str()'s, not unicode. Convert your unicode "
                "strings using mystring.encode(charset)!")
        if not isinstance(key, str):
            raise Client.MemcachedKeyTypeError("Key must be str()'s")

        if isinstance(key, _str_cls):
            if (self.server_max_key_length != 0 and
                    len(key) + key_extra_len > self.server_max_key_length):
                raise Client.MemcachedKeyLengthError(
                    "Key length is > %s" % self.server_max_key_length
                )
            if not valid_key_chars_re.match(key):
                raise Client.MemcachedKeyCharacterError(
                    "Control characters not allowed")
---|
1197 | |
---|
1198 | |
---|
class _Host(object):
    """A single memcached server endpoint with its socket and read buffer.

    Handles connection-string parsing (inet, inet6, unix), dead-host
    backoff (dead_retry seconds after mark_dead), and buffered line /
    fixed-length reads over the socket.
    """

    def __init__(self, host, debug=0, dead_retry=_DEAD_RETRY,
                 socket_timeout=_SOCKET_TIMEOUT, flush_on_reconnect=0):
        self.dead_retry = dead_retry
        self.socket_timeout = socket_timeout
        self.debug = debug
        self.flush_on_reconnect = flush_on_reconnect
        # A (host, weight) tuple lets callers bias the consistent-hash
        # distribution toward this server.
        if isinstance(host, tuple):
            host, self.weight = host
        else:
            self.weight = 1

        # parse the connection string; tried in order:
        # unix:<path>, inet6:[<host>](:<port>), inet:<host>(:<port>),
        # bare <host>(:<port>)
        m = re.match(r'^(?P<proto>unix):(?P<path>.*)$', host)
        if not m:
            m = re.match(r'^(?P<proto>inet6):'
                         r'\[(?P<host>[^\[\]]+)\](:(?P<port>[0-9]+))?$', host)
        if not m:
            m = re.match(r'^(?P<proto>inet):'
                         r'(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
        if not m:
            m = re.match(r'^(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
        if not m:
            raise ValueError('Unable to parse connection string: "%s"' % host)

        hostData = m.groupdict()
        if hostData.get('proto') == 'unix':
            self.family = socket.AF_UNIX
            self.address = hostData['path']
        elif hostData.get('proto') == 'inet6':
            self.family = socket.AF_INET6
            self.ip = hostData['host']
            self.port = int(hostData.get('port') or 11211)
            self.address = (self.ip, self.port)
        else:
            self.family = socket.AF_INET
            self.ip = hostData['host']
            self.port = int(hostData.get('port') or 11211)
            self.address = (self.ip, self.port)

        self.deaduntil = 0  # timestamp until which this host is skipped
        self.socket = None
        self.flush_on_next_connect = 0

        self.buffer = ''  # unread bytes already received from the socket

    def debuglog(self, str):
        """Write a debug message to stderr when debugging is enabled."""
        if self.debug:
            sys.stderr.write("MemCached: %s\n" % str)

    def _check_dead(self):
        """Return 1 while the dead-retry backoff is still in effect."""
        if self.deaduntil and self.deaduntil > time.time():
            return 1
        self.deaduntil = 0
        return 0

    def connect(self):
        """Ensure a live socket exists; return 1 on success, 0 otherwise."""
        if self._get_socket():
            return 1
        return 0

    def mark_dead(self, reason):
        """Record this host as dead for dead_retry seconds and disconnect."""
        self.debuglog("MemCache: %s: %s. Marking dead." % (self, reason))
        self.deaduntil = time.time() + self.dead_retry
        if self.flush_on_reconnect:
            self.flush_on_next_connect = 1
        self.close_socket()

    def _get_socket(self):
        """Return the connected socket, creating it on demand.

        Returns None while the host is in dead-retry backoff or when
        connecting fails (which also marks the host dead).
        """
        if self._check_dead():
            return None
        if self.socket:
            return self.socket
        s = socket.socket(self.family, socket.SOCK_STREAM)
        if hasattr(s, 'settimeout'):
            s.settimeout(self.socket_timeout)
        try:
            s.connect(self.address)
        except socket.timeout as msg:
            self.mark_dead("connect: %s" % msg)
            return None
        except socket.error as msg:
            if isinstance(msg, tuple):
                msg = msg[1]
            self.mark_dead("connect: %s" % msg)
            return None
        self.socket = s
        self.buffer = ''
        # Optionally flush_all on reconnect so stale data from before a
        # network partition is not served.
        if self.flush_on_next_connect:
            self.flush()
            self.flush_on_next_connect = 0
        return s

    def close_socket(self):
        """Close and forget the socket, if one is open."""
        if self.socket:
            self.socket.close()
            self.socket = None

    def send_cmd(self, cmd):
        """Send one command line; the protocol terminator is appended."""
        self.socket.sendall(cmd + '\r\n')

    def send_cmds(self, cmds):
        """cmds already has trailing \r\n's applied."""
        self.socket.sendall(cmds)

    def readline(self, raise_exception=False):
        """Read a line and return it.

        If "raise_exception" is set, raise _ConnectionDeadError if the
        read fails, otherwise return an empty string.
        """
        buf = self.buffer
        if self.socket:
            recv = self.socket.recv
        else:
            # No socket: behave like an immediately-closed connection.
            recv = lambda bufsize: ''

        while True:
            index = buf.find('\r\n')
            if index >= 0:
                break
            data = recv(4096)
            if not data:
                # connection close, let's kill it and raise
                self.mark_dead('connection closed in readline()')
                if raise_exception:
                    raise _ConnectionDeadError()
                else:
                    return ''

            buf += data
        # Keep whatever followed the line for the next read.
        self.buffer = buf[index + 2:]
        return buf[:index]

    def expect(self, text, raise_exception=False):
        """Read a line, log when it differs from *text*, and return it."""
        line = self.readline(raise_exception)
        if line != text:
            self.debuglog("while expecting '%s', got unexpected response '%s'"
                          % (text, line))
        return line

    def recv(self, rlen):
        """Read exactly rlen bytes (buffered); raise _Error on short read."""
        self_socket_recv = self.socket.recv
        buf = self.buffer
        while len(buf) < rlen:
            foo = self_socket_recv(max(rlen - len(buf), 4096))
            buf += foo
            if not foo:
                raise _Error('Read %d bytes, expecting %d, '
                             'read returned 0 length bytes' % (len(buf), rlen))
        self.buffer = buf[rlen:]
        return buf[:rlen]

    def flush(self):
        """Issue flush_all, discarding every value stored on this host."""
        self.send_cmd('flush_all')
        self.expect('OK')

    def __str__(self):
        d = ''
        if self.deaduntil:
            d = " (dead until %d)" % self.deaduntil

        if self.family == socket.AF_INET:
            return "inet:%s:%d%s" % (self.address[0], self.address[1], d)
        elif self.family == socket.AF_INET6:
            return "inet6:[%s]:%d%s" % (self.address[0], self.address[1], d)
        else:
            return "unix:%s%s" % (self.address, d)
---|
1368 | |
---|
1369 | |
---|
def _doctest():
    """Run this module's doctests against a live local memcached."""
    import doctest
    import memcache
    client = Client(["127.0.0.1:11211"], debug=1)
    return doctest.testmod(memcache, globs={"mc": client})
---|
1377 | |
---|
if __name__ == "__main__":
    # Self-test harness: requires a live memcached on 127.0.0.1:11211.
    # Pass --do-unix to also exercise a unix-domain-socket server at
    # ./memcached.socket.
    failures = 0
    print("Testing docstrings...")
    _doctest()
    print("Running tests:")
    print()
    serverList = [["127.0.0.1:11211"]]
    if '--do-unix' in sys.argv:
        serverList.append([os.path.join(os.getcwd(), 'memcached.socket')])

    for servers in serverList:
        mc = Client(servers, debug=1)

        def to_s(val):
            # Render a value with its type for readable test output.
            if not isinstance(val, _str_cls):
                return "%s (%s)" % (val, type(val))
            return "%s" % val

        def test_setget(key, val):
            # Round-trip one key/value; returns 1 on success, 0 on failure.
            global failures
            print("Testing set/get {'%s': %s} ..."
                  % (to_s(key), to_s(val)), end=" ")
            mc.set(key, val)
            newval = mc.get(key)
            if newval == val:
                print("OK")
                return 1
            else:
                print("FAIL")
                failures += 1
                return 0

        class FooStruct(object):
            # Minimal picklable object to exercise the pickle round-trip.

            def __init__(self):
                self.bar = "baz"

            def __str__(self):
                return "A FooStruct"

            def __eq__(self, other):
                if isinstance(other, FooStruct):
                    return self.bar == other.bar
                return 0

        test_setget("a_string", "some random string")
        test_setget("an_integer", 42)
        # NOTE(review): 'long' and 'unicode' below are Python 2 names;
        # presumably shimmed earlier in the module -- confirm before
        # running this harness under Python 3.
        if test_setget("long", long(1 << 30)):
            print("Testing delete ...", end=" ")
            if mc.delete("long"):
                print("OK")
            else:
                print("FAIL")
                failures += 1
            print("Checking results of delete ...", end=" ")
            if mc.get("long") is None:
                print("OK")
            else:
                print("FAIL")
                failures += 1
        print("Testing get_multi ...",)
        print(mc.get_multi(["a_string", "an_integer"]))

        # removed from the protocol
        # if test_setget("timed_delete", 'foo'):
        #     print "Testing timed delete ...",
        #     if mc.delete("timed_delete", 1):
        #         print("OK")
        #     else:
        #         print("FAIL")
        #         failures += 1
        #     print "Checking results of timed delete ..."
        #     if mc.get("timed_delete") is None:
        #         print("OK")
        #     else:
        #         print("FAIL")
        #         failures += 1

        print("Testing get(unknown value) ...", end=" ")
        print(to_s(mc.get("unknown_value")))

        f = FooStruct()
        test_setget("foostruct", f)

        print("Testing incr ...", end=" ")
        x = mc.incr("an_integer", 1)
        if x == 43:
            print("OK")
        else:
            print("FAIL")
            failures += 1

        print("Testing decr ...", end=" ")
        x = mc.decr("an_integer", 1)
        if x == 42:
            print("OK")
        else:
            print("FAIL")
            failures += 1
        sys.stdout.flush()

        # sanity tests: key validation should reject bad keys client-side
        print("Testing sending spaces...", end=" ")
        sys.stdout.flush()
        try:
            x = mc.set("this has spaces", 1)
        except Client.MemcachedKeyCharacterError as msg:
            print("OK")
        else:
            print("FAIL")
            failures += 1

        print("Testing sending control characters...", end=" ")
        try:
            x = mc.set("this\x10has\x11control characters\x02", 1)
        except Client.MemcachedKeyCharacterError as msg:
            print("OK")
        else:
            print("FAIL")
            failures += 1

        print("Testing using insanely long key...", end=" ")
        try:
            # Exactly at the limit must be accepted ...
            x = mc.set('a'*SERVER_MAX_KEY_LENGTH, 1)
        except Client.MemcachedKeyLengthError as msg:
            print("FAIL")
            failures += 1
        else:
            print("OK")
        try:
            # ... one byte past it must be rejected.
            x = mc.set('a'*SERVER_MAX_KEY_LENGTH + 'a', 1)
        except Client.MemcachedKeyLengthError as msg:
            print("OK")
        else:
            print("FAIL")
            failures += 1

        print("Testing sending a unicode-string key...", end=" ")
        try:
            x = mc.set(unicode('keyhere'), 1)
        except Client.MemcachedStringEncodingError as msg:
            print("OK", end=" ")
        else:
            print("FAIL", end=" ")
            failures += 1
        try:
            x = mc.set((unicode('a')*SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1)
        except Client.MemcachedKeyError:
            print("FAIL", end=" ")
            failures += 1
        else:
            print("OK", end=" ")
        # A multi-byte character repeated: the encoded length exceeds the
        # key limit even though the character count does not.
        s = pickle.loads('V\\u4f1a\np0\n.')
        try:
            x = mc.set((s * SERVER_MAX_KEY_LENGTH).encode('utf-8'), 1)
        except Client.MemcachedKeyLengthError:
            print("OK")
        else:
            print("FAIL")
            failures += 1

        print("Testing using a value larger than the memcached value limit...")
        print('NOTE: "MemCached: while expecting[...]" is normal...')
        x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH)
        if mc.get('keyhere') is None:
            print("OK", end=" ")
        else:
            print("FAIL", end=" ")
            failures += 1
        x = mc.set('keyhere', 'a'*SERVER_MAX_VALUE_LENGTH + 'aaa')
        if mc.get('keyhere') is None:
            print("OK")
        else:
            print("FAIL")
            failures += 1

        print("Testing set_multi() with no memcacheds running", end=" ")
        mc.disconnect_all()
        errors = mc.set_multi({'keyhere': 'a', 'keythere': 'b'})
        if errors != []:
            print("FAIL")
            failures += 1
        else:
            print("OK")

        print("Testing delete_multi() with no memcacheds running", end=" ")
        mc.disconnect_all()
        ret = mc.delete_multi({'keyhere': 'a', 'keythere': 'b'})
        if ret != 1:
            print("FAIL")
            failures += 1
        else:
            print("OK")

    if failures > 0:
        print('*** THERE WERE FAILED TESTS')
        sys.exit(1)
    sys.exit(0)
---|
1576 | |
---|
1577 | |
---|
1578 | # vim: ts=4 sw=4 et : |
---|