1 | import os |
---|
2 | import re |
---|
3 | from .._compat import pjoin |
---|
4 | from .._globals import THREAD_LOCAL |
---|
5 | from ..migrator import InDBMigrator |
---|
6 | from ..helpers.classes import FakeDriver, SQLCustomType, SQLALL, Reference |
---|
7 | from ..helpers.methods import use_common_filters, xorify |
---|
8 | from ..objects import Table, Field, Expression, Query |
---|
9 | from .base import NoSQLAdapter |
---|
10 | from .mysql import MySQL |
---|
11 | from .postgres import PostgrePsyco |
---|
12 | from . import adapters, with_connection_or_raise |
---|
13 | from .._gae import gae |
---|
14 | |
---|
15 | if gae: |
---|
16 | from .._gae import ndb, rdbms, namespace_manager, classobj, NDBPolyModel |
---|
17 | from ..helpers.gae import NDBDecimalProperty |
---|
18 | |
---|
19 | |
---|
class GoogleMigratorMixin(object):
    """Mixin that selects ``InDBMigrator`` as the migrator class.

    The SQL-backed Google adapters in this module inherit from it.
    """

    # Migrator implementation picked up by adapters that use this mixin.
    migrator_cls = InDBMigrator
---|
22 | |
---|
23 | |
---|
@adapters.register_for("google:sql")
class GoogleSQL(GoogleMigratorMixin, MySQL):
    """MySQL-dialect adapter that connects through the ``rdbms`` module.

    The part of the URI after ``://`` must match ``REGEX_URI``, i.e.
    ``<instance>/<db>``.
    """

    uploads_in_blob = True
    REGEX_URI = "^(?P<instance>.*)/(?P<db>.+)$"

    def _find_work_folder(self):
        """Resolve the work folder, then make it relative to the cwd.

        Only an absolute folder that starts with the current working
        directory is rewritten; anything else is left untouched.
        """
        super(GoogleSQL, self)._find_work_folder()
        folder = self.folder
        if not os.path.isabs(folder):
            return
        cwd = os.getcwd()
        if folder.startswith(cwd):
            self.folder = os.path.relpath(folder, cwd)

    def _initialize_(self):
        """Parse the URI and fill in folder, driver args and database name.

        Raises:
            SyntaxError: when the URI does not match ``REGEX_URI``.
        """
        super(GoogleSQL, self)._initialize_()
        if not self.folder:
            # Fall back to "$HOME/<path after .../applications/>".
            marker = os.sep + "applications" + os.sep
            app_path = THREAD_LOCAL._pydal_folder_.split(marker, 1)[1]
            self.folder = pjoin("$HOME", app_path)
        location = self.uri.split("://", 1)[1]
        parsed = re.match(self.REGEX_URI, location)
        if not parsed:
            raise SyntaxError("Invalid URI string in DAL")
        decode = self.credential_decoder
        self.driver_args["instance"] = decode(parsed.group("instance"))
        self.dbstring = decode(parsed.group("db"))
        self.createdb = self.adapter_args.get("createdb", True)
        if not self.createdb:
            # The database is not created/selected after connecting, so it
            # has to be handed to the driver up front.
            self.driver_args["database"] = self.dbstring

    def find_driver(self):
        """Record the driver name; no driver module lookup is performed."""
        self.driver = "google"

    def connector(self):
        """Open a connection with ``rdbms.connect`` using ``driver_args``."""
        options = self.driver_args
        return rdbms.connect(**options)

    def after_connection(self):
        """Create/select the database when requested, then set session options."""
        statements = []
        if self.createdb:
            statements.append("CREATE DATABASE IF NOT EXISTS %s" % self.dbstring)
            statements.append("USE %s" % self.dbstring)
        statements.append("SET FOREIGN_KEY_CHECKS=1;")
        statements.append("SET sql_mode='NO_BACKSLASH_ESCAPES';")
        for statement in statements:
            self.execute(statement)

    @with_connection_or_raise
    def execute(self, *args, **kwargs):
        """Run a SQL command on the cursor, notifying execution handlers.

        The first positional argument is the command; it is passed through
        ``filter_sql_command`` and decoded as UTF-8. Remaining positional
        and keyword arguments are forwarded to ``cursor.execute``, whose
        return value is returned.
        """
        raw_sql = args[0]
        extra = args[1:]
        command = self.filter_sql_command(raw_sql).decode("utf8")
        handlers = self._build_handlers_for_execution()
        for hook in handlers:
            hook.before_execute(command)
        result = self.cursor.execute(command, *extra, **kwargs)
        for hook in handlers:
            hook.after_execute(command)
        return result

    def clear_cache(self):
        """Clear the cache of the current NDB context."""
        context = ndb.get_context()
        context.clear_cache()

    def ignore_cache_for(self, entities=None):
        """Set an NDB cache policy that is false for the given kinds.

        Args:
            entities: collection of kind names; ``None`` means none.
        """
        skipped = entities or []

        def policy(key):
            return key.kind() not in skipped

        ndb.get_context().set_cache_policy(policy)
---|
80 | |
---|
81 | |
---|
82 | # based on this: https://cloud.google.com/appengine/docs/standard/python/cloud-sql/ |
---|
@adapters.register_for("google:MySQLdb")
class GoogleMySQL(GoogleMigratorMixin, MySQL):
    """MySQL adapter variant that uses the ``MySQLdb`` driver."""

    uploads_in_blob = True
    drivers = ("MySQLdb",)

    def _find_work_folder(self):
        """Resolve the work folder, then make it relative to the cwd.

        Only an absolute folder that starts with the current working
        directory is rewritten; anything else is left untouched.
        """
        super(GoogleMySQL, self)._find_work_folder()
        folder = self.folder
        if not os.path.isabs(folder):
            return
        cwd = os.getcwd()
        if folder.startswith(cwd):
            self.folder = os.path.relpath(folder, cwd)

    def clear_cache(self):
        """Clear the cache of the current NDB context."""
        context = ndb.get_context()
        context.clear_cache()

    def ignore_cache_for(self, entities=None):
        """Set an NDB cache policy that is false for the given kinds.

        Args:
            entities: collection of kind names; ``None`` means none.
        """
        skipped = entities or []

        def policy(key):
            return key.kind() not in skipped

        ndb.get_context().set_cache_policy(policy)

    def after_connection(self):
        """Apply the session settings issued right after connecting."""
        for statement in (
            "SET FOREIGN_KEY_CHECKS=1;",
            "SET sql_mode='NO_BACKSLASH_ESCAPES,TRADITIONAL';",
        ):
            self.execute(statement)
---|
103 | |
---|
104 | |
---|
@adapters.register_for("google:psycopg2")
class GooglePostgres(GoogleMigratorMixin, PostgrePsyco):
    """PostgreSQL adapter variant that uses the ``psycopg2`` driver."""

    uploads_in_blob = True
    drivers = ("psycopg2",)

    def _find_work_folder(self):
        """Resolve the work folder, then make it relative to the cwd.

        Only an absolute folder that starts with the current working
        directory is rewritten; anything else is left untouched.
        """
        super(GooglePostgres, self)._find_work_folder()
        folder = self.folder
        if not os.path.isabs(folder):
            return
        cwd = os.getcwd()
        if folder.startswith(cwd):
            self.folder = os.path.relpath(folder, cwd)

    def clear_cache(self):
        """Clear the cache of the current NDB context."""
        context = ndb.get_context()
        context.clear_cache()

    def ignore_cache_for(self, entities=None):
        """Set an NDB cache policy that is false for the given kinds.

        Args:
            entities: collection of kind names; ``None`` means none.
        """
        skipped = entities or []

        def policy(key):
            return key.kind() not in skipped

        ndb.get_context().set_cache_policy(policy)
---|
121 | |
---|
122 | |
---|
@adapters.register_for("google:datastore", "google:datastore+ndb")
class GoogleDatastore(NoSQLAdapter):
    """NoSQL adapter that backs DAL tables with NDB model classes.

    ``create_table`` builds one NDB model class per table and stores it on
    ``table._tableobj``; queries, inserts, updates and deletes then go
    through that class and the ``ndb`` module.
    """

    dbengine = "google:datastore"

    REGEX_NAMESPACE = ".*://(?P<namespace>.+)"

    def _initialize_(self):
        """Apply the namespace found in the URI and read ``ndb_settings``."""
        super(GoogleDatastore, self)._initialize_()
        match = re.match(self.REGEX_NAMESPACE, self.uri)
        if match:
            namespace_manager.set_namespace(match.group("namespace"))
        self.ndb_settings = self.adapter_args.get("ndb_settings")

    def find_driver(self):
        """No driver lookup is performed for this adapter."""
        pass

    def connector(self):
        """Return a ``FakeDriver`` instance in place of a real connection."""
        return FakeDriver()

    def create_table(self, table, migrate=True, fake_migrate=False, polymodel=None):
        """Build the NDB model class for ``table`` and attach it to the table.

        Args:
            table: the DAL table to map; the resulting class is stored on
                ``table._tableobj``.
            migrate: not used by this implementation.
            fake_migrate: not used by this implementation.
            polymodel: ``None`` for a plain ``ndb.Model``, ``True`` for an
                ``NDBPolyModel`` root, or a ``Table`` whose model class is
                used as the base class (fields already present on that
                table are skipped).

        Returns:
            None.

        Raises:
            SyntaxError: on an unknown field type or an unsupported
                ``polymodel`` value.
        """
        myfields = {}
        for field in table:
            if isinstance(polymodel, Table) and field.name in polymodel.fields():
                continue
            attr = {}
            if isinstance(field.custom_qualifier, dict):
                # Custom properties to add to the GAE field declaration.
                # Copied so that the "required" flag added below for
                # list:reference fields is not written back into the Field.
                attr = dict(field.custom_qualifier)
            field_type = field.type
            if isinstance(field_type, SQLCustomType):
                ftype = self.types[field_type.native or field_type.type](**attr)
            elif isinstance(field_type, ndb.Property):
                ftype = field_type
            elif field_type.startswith("id"):
                # id fields get no explicit property on the model
                continue
            elif field_type.startswith("decimal"):
                # field type looks like "decimal(precision,scale)"
                precision, scale = field_type[7:].strip("()").split(",")
                ftype = NDBDecimalProperty(int(precision), int(scale), **attr)
            elif field_type.startswith("reference"):
                if field.notnull:
                    # NOTE(review): this replaces any custom qualifier
                    # instead of extending it (unlike list:reference below).
                    attr = dict(required=True)
                ftype = self.types[field_type[:9]](**attr)
            elif field_type.startswith("list:reference"):
                if field.notnull:
                    attr["required"] = True
                ftype = self.types[field_type[:14]](**attr)
            elif field_type.startswith("list:"):
                ftype = self.types[field_type](**attr)
            elif field_type not in self.types or not self.types[field_type]:
                raise SyntaxError("Field: unknown field type: %s" % field_type)
            else:
                ftype = self.types[field_type](**attr)
            myfields[field.name] = ftype

        if not polymodel:
            table._tableobj = classobj(table._tablename, (ndb.Model,), myfields)
            # Set NDB caching variables
            if self.ndb_settings and (table._tablename in self.ndb_settings):
                # NOTE(review): every entry of ndb_settings is applied, not
                # only the one keyed by this table name - confirm intent.
                # ``items()`` is used because ``iteritems()`` is Python 2 only.
                for k, v in self.ndb_settings.items():
                    setattr(table._tableobj, k, v)
        elif polymodel == True:  # noqa: E712 - equality kept so 1 also matches
            table._tableobj = classobj(table._tablename, (NDBPolyModel,), myfields)
        elif isinstance(polymodel, Table):
            table._tableobj = classobj(
                table._tablename, (polymodel._tableobj,), myfields
            )
        else:
            raise SyntaxError("polymodel must be None, True, a table or a tablename")
        return None

    def _expand(self, expression, field_type=None, query_env=None):
        """Translate a DAL expression into its datastore representation.

        Args:
            expression: ``None``, a ``Field``, an ``Expression``/``Query``,
                a plain value, a list/tuple of values, or an object that
                already carries a ``_FilterNode__name`` attribute.
            field_type: when truthy, plain values are converted with
                ``represent`` for this type.
            query_env: mapping forwarded to the expression operator; a new
                empty dict is used when omitted.

        Raises:
            SyntaxError: for fields of type text, blob or json.
            NotImplementedError: when the expression kind is not handled.
        """
        if query_env is None:
            # fresh dict per call instead of a shared mutable default
            query_env = {}
        if expression is None:
            return None
        if isinstance(expression, Field):
            if expression.type in ("text", "blob", "json"):
                raise SyntaxError("AppEngine does not index by: %s" % expression.type)
            return expression.name
        if isinstance(expression, (Expression, Query)):
            if expression.second is not None:
                return expression.op(
                    expression.first, expression.second, query_env=query_env
                )
            if expression.first is not None:
                return expression.op(expression.first, query_env=query_env)
            return expression.op()
        if field_type:
            return self.represent(expression, field_type)
        if isinstance(expression, (list, tuple)):
            return ",".join([self.represent(item, field_type) for item in expression])
        if hasattr(expression, "_FilterNode__name"):
            # check for _FilterNode__name to avoid explicit
            # import of FilterNode
            return expression
        raise NotImplementedError

    def _add_operators_to_parsed_row(self, rid, table, row):
        """Attach the entity and its numeric id to ``row``, then defer to the base.

        ``rid`` is the fetched entity: it is stored as ``row.gae_item`` and
        the id of its key becomes ``row.id``.
        """
        row.gae_item = rid
        lid = rid.key.id()
        row.id = lid
        super(GoogleDatastore, self)._add_operators_to_parsed_row(lid, table, row)

    def represent(self, obj, field_type, tablename=None):
        """Convert ``obj`` into the value stored or queried for ``field_type``.

        ``ndb.Key`` objects are returned unchanged. For ``field_type ==
        "id"`` with a ``tablename``, values become ``ndb.Key`` objects
        (lists element-wise, ``None`` stays ``None``). Values for
        ``gae.Property`` field types are returned unchanged; everything
        else goes to the base implementation.

        Raises:
            SyntaxError: when ``obj`` is an ``Expression`` or ``Field``.
        """
        if isinstance(obj, ndb.Key):
            return obj
        if field_type == "id" and tablename:
            if isinstance(obj, list):
                return [self.represent(item, field_type, tablename) for item in obj]
            if obj is None:
                return None
            # int() rather than long(): long does not exist on Python 3, and
            # on Python 2 int() already promotes to long when needed.
            return ndb.Key(tablename, int(obj))
        if isinstance(obj, (Expression, Field)):
            raise SyntaxError("not supported on GAE")
        if isinstance(field_type, gae.Property):
            return obj
        return super(GoogleDatastore, self).represent(obj, field_type)

    def truncate(self, table, mode=""):
        """Delete every record of ``table``; ``mode`` is not used."""
        self.db(self.id_query(table)).delete()

    def select_raw(self, query, fields=None, attributes=None, count_only=False):
        """Run a query and return the raw result.

        Args:
            query: DAL query; when falsy the table is taken from the first
                field and an id query on it is used.
            fields: requested fields; ``SQLALL`` entries are expanded to
                the fields of their table.
            attributes: select attributes read here: ``projection``,
                ``filterfields``, ``reusecursor``, ``left``, ``groupby``,
                ``orderby`` and ``limitby``.
            count_only: when true, ``items`` is a one-element list holding
                the number of matches.

        Returns:
            A tuple ``(items, table, fields)``. ``items`` is a list or an
            NDB query object, ``fields`` is the projection if there is one
            and otherwise all fields of ``table``.

        Raises:
            SyntaxError: when the table cannot be determined, when
                text/blob/json fields are requested in a projection, or
                when ``left``/``groupby`` are requested.
        """
        db = self.db
        attributes = attributes or {}
        args_get = attributes.get

        # expand SQLALL placeholders into the concrete fields of their table
        new_fields = []
        for item in fields or []:
            if isinstance(item, SQLALL):
                new_fields.extend(item._table)
            else:
                new_fields.append(item)
        fields = new_fields

        if query:
            table = self.get_table(query)
        elif fields:
            table = fields[0].table
            query = db._adapter.id_query(fields[0].table)
        else:
            raise SyntaxError("Unable to determine the table")

        if query and use_common_filters(query):
            query = self.common_filter(query, [table])

        # tableobj is a GAE/NDB Model class (or subclass)
        tableobj = table._tableobj
        filters = self.expand(query)

        # DETERMINE PROJECTION
        # equality (not identity) is kept so that 1 is accepted like True
        wants_projection = args_get("projection") == True  # noqa: E712
        projection = None
        if len(table.fields) == len(fields):
            # getting all fields, not a projection query
            projection = None
        elif wants_projection:
            for f in fields:
                if f.type in ("text", "blob", "json"):
                    raise SyntaxError(
                        "text and blob field types not allowed in projection queries"
                    )
            projection = list(fields)
        elif args_get("filterfields") is True:
            projection = list(fields)

        # real projections can't include 'id'.
        # it will be added to the result later
        if projection and wants_projection:
            query_projection = [f.name for f in projection if f.name != table._id.name]
        else:
            query_projection = None
        # DONE WITH PROJECTION

        cursor = args_get("reusecursor")
        cursor = cursor if isinstance(cursor, str) else None
        qo = ndb.QueryOptions(projection=query_projection, cursor=cursor)

        if filters is None:
            items = tableobj.query(default_options=qo)
        elif getattr(filters, "filter_all", None):
            items = []
        elif (
            getattr(filters, "_FilterNode__value", None)
            and getattr(filters, "_FilterNode__name", None) == "__key__"
            and getattr(filters, "_FilterNode__opsymbol", None) == "="
        ):
            # equality filter on the key: fetch that single entity directly
            item = ndb.Key.from_old_key(getattr(filters, "_FilterNode__value")).get()
            items = [item] if item else []
        else:
            items = tableobj.query(filters, default_options=qo)

        if count_only:
            items = [len(items) if isinstance(items, list) else items.count()]
        elif not isinstance(items, list):
            if args_get("left", None):
                raise SyntaxError("Set: no left join in appengine")
            if args_get("groupby", None):
                raise SyntaxError("Set: no groupby in appengine")
            orderby = args_get("orderby", False)
            if orderby:
                if isinstance(orderby, (list, tuple)):
                    orderby = xorify(orderby)
                if isinstance(orderby, Expression):
                    orderby = self.expand(orderby)
                for order in orderby.split(", "):
                    order = str(order)
                    desc = order.startswith("-")
                    # drop the "-" prefix and any "table." qualifier
                    name = (order[1:] if desc else order).split(".")[-1]
                    prop = tableobj._key if name == "id" else getattr(tableobj, name)
                    items = items.order(-prop if desc else prop)

            if args_get("limitby", None):
                (lmin, lmax) = attributes["limitby"]
                limit = lmax - lmin
                fetch_args = {"offset": lmin, "keys_only": True}

                keys, cursor, more = items.fetch_page(limit, **fetch_args)
                items = ndb.get_multi(keys)
                # cursor is only useful if there was a limit and we
                # didn't return all results
                if args_get("reusecursor"):
                    db["_lastcursor"] = cursor
        return (items, table, projection or [f for f in table])

    def select(self, query, fields, attributes):
        """
        This is the GAE version of select. Some notes to consider:
        - 'nativeRef' is a magical fieldname used for self references
          on GAE
        - optional attribute 'projection' when set to True will trigger
          use of the GAE projection queries. note that there are rules for
          what is accepted imposed by GAE: each field must be indexed,
          projection queries cannot contain blob or text fields, and you
          cannot use == and also select that same field.
          see https://developers.google.com/appengine/docs/python/datastore/queries#Query_Projection
        - optional attribute 'filterfields' when set to True web2py will
          only parse the explicitly listed fields into the Rows object,
          even though all fields are returned in the query. This can be
          used to reduce memory usage in cases where true projection
          queries are not usable.
        - optional attribute 'reusecursor' allows use of cursor with
          queries that have the limitby attribute. Set the attribute to
          True for the first query, set it to the value of
          db['_lastcursor'] to continue a previous query. The user must
          save the cursor value between requests, and the filters must be
          identical. It is up to the user to follow google's limitations:
          https://developers.google.com/appengine/docs/python/datastore/queries#Query_Cursors
        - optional attribute 'processor' replaces ``self.parse`` as the
          callable that turns the raw rows into the returned value.
        """
        items, table, fields = self.select_raw(query, fields, attributes)
        # The id column and 'nativeRef' carry the entity itself; every other
        # column is read from the entity attribute of the same name.
        rows = [
            [
                item
                if (t.name == table._id.name or t.name == "nativeRef")
                else getattr(item, t.name)
                for t in fields
            ]
            for item in items
        ]
        colnames = [t.longname for t in fields]
        processor = attributes.get("processor", self.parse)
        return processor(rows, fields, colnames, False)

    def count(self, query, distinct=None, limit=None):
        """Return the number of records matching ``query``.

        ``limit`` is not used.

        Raises:
            RuntimeError: when ``distinct`` is requested.
        """
        if distinct:
            raise RuntimeError("COUNT DISTINCT not supported")
        items, table, fields = self.select_raw(query, count_only=True)
        return items[0]

    def delete(self, table, query):
        """
        This function was changed on 2010-05-04 because according to
        http://code.google.com/p/googleappengine/issues/detail?id=3119
        GAE no longer supports deleting more than 1000 records.

        Returns the number of deleted records.
        """
        items, table, fields = self.select_raw(query)
        # items can be a list of entities or a query
        if isinstance(items, list):
            counter = len(items)
            ndb.delete_multi([item.key for item in items])
            return counter
        # use a keys_only query to ensure that this runs as a datastore
        # small operation, deleting in batches of at most 1000 keys
        counter = 0
        leftitems = items.fetch(1000, keys_only=True)
        while leftitems:
            counter += len(leftitems)
            ndb.delete_multi(leftitems)
            leftitems = items.fetch(1000, keys_only=True)
        return counter

    def update(self, table, query, update_fields):
        """Apply ``update_fields`` to every record matching ``query``.

        Args:
            table: replaced by the table resolved from ``query``.
            query: selects the records to update.
            update_fields: iterable of ``(field, value)`` pairs.

        Returns:
            The number of records written; it is also logged at info level.
        """
        items, table, fields = self.select_raw(query)
        counter = 0
        for item in items:
            for field, value in update_fields:
                setattr(item, field.name, self.represent(value, field.type))
            item.put()
            counter += 1
        self.db.logger.info(str(counter))
        return counter

    def insert(self, table, fields):
        """Store one record and return a ``Reference`` to it.

        Args:
            table: target table; its ``_tableobj`` class is instantiated.
            fields: iterable of ``(field, value)`` pairs.

        Returns:
            A ``Reference`` built from the new key's id, with ``_table``,
            ``_record`` (``None``) and ``_gaekey`` set.
        """
        dfields = {f.name: self.represent(v, f.type) for f, v in fields}
        tmp = table._tableobj(**dfields)
        tmp.put()
        key = tmp.key
        rid = Reference(key.id())
        rid._table, rid._record, rid._gaekey = table, None, key
        return rid

    def bulk_insert(self, table, items):
        """Store several records with a single ``ndb.put_multi`` call.

        Args:
            table: target table; its ``_tableobj`` class is instantiated.
            items: iterable of records, each an iterable of
                ``(field, value)`` pairs.

        Returns:
            Whatever ``ndb.put_multi`` returns for the built entities.
        """
        parsed_items = [
            table._tableobj(**{f.name: self.represent(v, f.type) for f, v in item})
            for item in items
        ]
        return ndb.put_multi(parsed_items)
---|