1 | # -*- coding: utf-8 -*- |
---|
2 | # pylint: disable=no-member |
---|
3 | |
---|
4 | import os |
---|
5 | import threading |
---|
6 | from ._compat import itervalues |
---|
7 | from ._globals import GLOBAL_LOCKER, THREAD_LOCAL |
---|
8 | from ._load import OrderedDict |
---|
9 | |
---|
10 | |
---|
11 | class ConnectionPool(object): |
---|
12 | POOLS = {} |
---|
13 | check_active_connection = True |
---|
14 | |
---|
15 | def __init__(self): |
---|
16 | self._first_connection = False |
---|
17 | |
---|
18 | @property |
---|
19 | def _connection_uname_(self): |
---|
20 | return "_pydal_connection_%s_%s" % (id(self), os.getpid()) |
---|
21 | |
---|
22 | @property |
---|
23 | def _cursors_uname_(self): |
---|
24 | return "_pydal_cursor_%s_%s" % (id(self), os.getpid()) |
---|
25 | |
---|
26 | @staticmethod |
---|
27 | def set_folder(folder): |
---|
28 | THREAD_LOCAL._pydal_folder_ = folder |
---|
29 | |
---|
30 | @property |
---|
31 | def connection(self): |
---|
32 | return self.get_connection() |
---|
33 | |
---|
34 | def get_connection(self, use_pool=True): |
---|
35 | """ |
---|
36 | if `self.pool_size>0` it will try pull the connection from the pool |
---|
37 | if the connection is not active (closed by db server) it will loop |
---|
38 | if not `self.pool_size` or no active connections in pool makes a new one |
---|
39 | """ |
---|
40 | # check we we have a connection for this process/thread/object id |
---|
41 | connection = getattr(THREAD_LOCAL, self._connection_uname_, None) |
---|
42 | # if so, return it |
---|
43 | if connection is not None: |
---|
44 | return connection |
---|
45 | |
---|
46 | # if not and we have a pool |
---|
47 | if use_pool and self.pool_size: |
---|
48 | try: |
---|
49 | GLOBAL_LOCKER.acquire() |
---|
50 | pool = ConnectionPool.POOLS.get(self.uri, []) |
---|
51 | ConnectionPool.POOLS[self.uri] = pool |
---|
52 | # pop from the pool until we find a valid connection |
---|
53 | while connection is None and pool: |
---|
54 | connection = pool.pop() |
---|
55 | try: |
---|
56 | self.set_connection(connection, run_hooks=False) |
---|
57 | except: |
---|
58 | connection = None |
---|
59 | finally: |
---|
60 | GLOBAL_LOCKER.release() |
---|
61 | |
---|
62 | # if still no connection, make a new one and run the hooks |
---|
63 | # note we serialize actual connectons to protect hooks |
---|
64 | if connection is None: |
---|
65 | connection = self.connector() |
---|
66 | self.set_connection(connection, run_hooks=True) |
---|
67 | |
---|
68 | return connection |
---|
69 | |
---|
70 | def set_connection(self, connection, run_hooks=False): |
---|
71 | # store the connection in the thread local object and run hooks (optional) |
---|
72 | setattr(THREAD_LOCAL, self._connection_uname_, connection) |
---|
73 | if connection: |
---|
74 | setattr(THREAD_LOCAL, self._cursors_uname_, connection.cursor()) |
---|
75 | # run hooks |
---|
76 | if run_hooks: |
---|
77 | self.after_connection_hook() |
---|
78 | # some times we want to check the connection is still good |
---|
79 | if self.check_active_connection: |
---|
80 | self.test_connection() |
---|
81 | else: |
---|
82 | setattr(THREAD_LOCAL, self._cursors_uname_, None) |
---|
83 | |
---|
84 | def reset_cursor(self): |
---|
85 | """get a new cursor for the existing connection""" |
---|
86 | setattr(THREAD_LOCAL, self._cursors_uname_, self.connection.cursor()) |
---|
87 | |
---|
88 | @property |
---|
89 | def cursor(self): |
---|
90 | """retrieve the cursor of the connection""" |
---|
91 | return getattr(THREAD_LOCAL, self._cursors_uname_) |
---|
92 | |
---|
93 | def _clean_tlocals(self): |
---|
94 | """delete cusor and connection from the thead local""" |
---|
95 | delattr(THREAD_LOCAL, self._cursors_uname_) |
---|
96 | delattr(THREAD_LOCAL, self._connection_uname_) |
---|
97 | |
---|
98 | def close(self, action="commit", really=True): |
---|
99 | """if we have an action (commit, rollback), try to execute it""" |
---|
100 | # if the connection was never established, nothing to do |
---|
101 | if getattr(THREAD_LOCAL, self._connection_uname_, None) is None: |
---|
102 | return |
---|
103 | # try commit or rollback |
---|
104 | succeeded = True |
---|
105 | if action: |
---|
106 | try: |
---|
107 | if callable(action): |
---|
108 | action(self) |
---|
109 | else: |
---|
110 | getattr(self, action)() |
---|
111 | except: |
---|
112 | #: connection had some problems, we want to drop it |
---|
113 | succeeded = False |
---|
114 | # if we have pools, we should recycle the connection (but only when |
---|
115 | # we succeded in `action`, if any and `len(pool)` is good) |
---|
116 | if self.pool_size and succeeded: |
---|
117 | try: |
---|
118 | GLOBAL_LOCKER.acquire() |
---|
119 | pool = ConnectionPool.POOLS[self.uri] |
---|
120 | if len(pool) < self.pool_size: |
---|
121 | pool.append(self.connection) |
---|
122 | really = False |
---|
123 | finally: |
---|
124 | GLOBAL_LOCKER.release() |
---|
125 | #: closing the connection when we `really` want to, in particular: |
---|
126 | # - when we had an exception running `action` |
---|
127 | # - when we don't have pools |
---|
128 | # - when we have pools but they're full |
---|
129 | if really: |
---|
130 | try: |
---|
131 | self.close_connection() |
---|
132 | except: |
---|
133 | pass |
---|
134 | #: always unset `connection` attribute |
---|
135 | self.set_connection(None) |
---|
136 | |
---|
137 | @staticmethod |
---|
138 | def close_all_instances(action): |
---|
139 | """ to close cleanly databases in a multithreaded environment """ |
---|
140 | dbs = getattr(THREAD_LOCAL, "_pydal_db_instances_", {}).items() |
---|
141 | for db_uid, db_group in dbs: |
---|
142 | for db in db_group: |
---|
143 | if hasattr(db, "_adapter"): |
---|
144 | db._adapter.close(action) |
---|
145 | getattr(THREAD_LOCAL, "_pydal_db_instances_", {}).clear() |
---|
146 | getattr(THREAD_LOCAL, "_pydal_db_instances_zombie_", {}).clear() |
---|
147 | if callable(action): |
---|
148 | action(None) |
---|
149 | |
---|
150 | def _find_work_folder(self): |
---|
151 | self.folder = getattr(THREAD_LOCAL, "_pydal_folder_", "") |
---|
152 | |
---|
153 | def after_connection_hook(self): |
---|
154 | """Hook for the after_connection parameter""" |
---|
155 | # some work must be done on first connection only |
---|
156 | if not self._first_connection: |
---|
157 | self._after_first_connection() |
---|
158 | self._first_connection = True |
---|
159 | # handle user specified hooks if present |
---|
160 | if callable(self._after_connection): |
---|
161 | self._after_connection(self) |
---|
162 | # handle global adapter hooks |
---|
163 | self.after_connection() |
---|
164 | |
---|
165 | def after_connection(self): |
---|
166 | # this it is supposed to be overloaded by adapters |
---|
167 | pass |
---|
168 | |
---|
169 | def _after_first_connection(self): |
---|
170 | """called only after first connection""" |
---|
171 | pass |
---|
172 | |
---|
173 | def reconnect(self): |
---|
174 | """legacy method - no longer needed""" |
---|
175 | self.close() |
---|
176 | self.get_connection() |
---|