source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/packages/dal/pydal/adapters/google.py

Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago: "Historial Limpio" ("Clean History")
import os
import re
from .._compat import pjoin
from .._globals import THREAD_LOCAL
from ..migrator import InDBMigrator
from ..helpers.classes import FakeDriver, SQLCustomType, SQLALL, Reference
from ..helpers.methods import use_common_filters, xorify
from ..objects import Table, Field, Expression, Query
from .base import NoSQLAdapter
from .mysql import MySQL
from .postgres import PostgrePsyco
from . import adapters, with_connection_or_raise
from .._gae import gae

if gae:
    from .._gae import ndb, rdbms, namespace_manager, classobj, NDBPolyModel
    from ..helpers.gae import NDBDecimalProperty


class GoogleMigratorMixin(object):
    migrator_cls = InDBMigrator


@adapters.register_for("google:sql")
class GoogleSQL(GoogleMigratorMixin, MySQL):
    uploads_in_blob = True
    REGEX_URI = "^(?P<instance>.*)/(?P<db>.+)$"

    def _find_work_folder(self):
        super(GoogleSQL, self)._find_work_folder()
        if os.path.isabs(self.folder) and self.folder.startswith(os.getcwd()):
            self.folder = os.path.relpath(self.folder, os.getcwd())

    def _initialize_(self):
        super(GoogleSQL, self)._initialize_()
        self.folder = self.folder or pjoin(
            "$HOME",
            THREAD_LOCAL._pydal_folder_.split(os.sep + "applications" + os.sep, 1)[1],
        )
        ruri = self.uri.split("://", 1)[1]
        m = re.match(self.REGEX_URI, ruri)
        if not m:
            raise SyntaxError("Invalid URI string in DAL")
        self.driver_args["instance"] = self.credential_decoder(m.group("instance"))
        self.dbstring = self.credential_decoder(m.group("db"))
        self.createdb = self.adapter_args.get("createdb", True)
        if not self.createdb:
            self.driver_args["database"] = self.dbstring

    def find_driver(self):
        self.driver = "google"

    def connector(self):
        return rdbms.connect(**self.driver_args)

    def after_connection(self):
        if self.createdb:
            self.execute("CREATE DATABASE IF NOT EXISTS %s" % self.dbstring)
            self.execute("USE %s" % self.dbstring)
        self.execute("SET FOREIGN_KEY_CHECKS=1;")
        self.execute("SET sql_mode='NO_BACKSLASH_ESCAPES';")

    @with_connection_or_raise
    def execute(self, *args, **kwargs):
        command = self.filter_sql_command(args[0]).decode("utf8")
        handlers = self._build_handlers_for_execution()
        for handler in handlers:
            handler.before_execute(command)
        rv = self.cursor.execute(command, *args[1:], **kwargs)
        for handler in handlers:
            handler.after_execute(command)
        return rv

    def clear_cache(self):
        ndb.get_context().clear_cache()

    def ignore_cache_for(self, entities=None):
        entities = entities or []
        ndb.get_context().set_cache_policy(lambda key: key.kind() not in entities)
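
# Usage sketch (not part of the original file; all names are hypothetical).
# REGEX_URI above splits everything before the last "/" into the instance and
# the final segment into the database, and adapter_args can skip the automatic
# CREATE DATABASE step:
#
#     db = DAL("google:sql://myproject:myinstance/mydb",
#              adapter_args=dict(createdb=False))
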

# based on this: https://cloud.google.com/appengine/docs/standard/python/cloud-sql/
@adapters.register_for("google:MySQLdb")
class GoogleMySQL(GoogleMigratorMixin, MySQL):
    uploads_in_blob = True
    drivers = ("MySQLdb",)

    def _find_work_folder(self):
        super(GoogleMySQL, self)._find_work_folder()
        if os.path.isabs(self.folder) and self.folder.startswith(os.getcwd()):
            self.folder = os.path.relpath(self.folder, os.getcwd())

    def clear_cache(self):
        ndb.get_context().clear_cache()

    def ignore_cache_for(self, entities=None):
        entities = entities or []
        ndb.get_context().set_cache_policy(lambda key: key.kind() not in entities)

    def after_connection(self):
        self.execute("SET FOREIGN_KEY_CHECKS=1;")
        self.execute("SET sql_mode='NO_BACKSLASH_ESCAPES,TRADITIONAL';")
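
# Usage sketch (an assumption, not taken from this file): GoogleMySQL reuses
# the MySQL adapter's URI parsing, and on App Engine standard Cloud SQL is
# usually reached through a unix socket under /cloudsql/. All names below are
# hypothetical; see the Cloud SQL link above for the authoritative recipe:
#
#     db = DAL("google:MySQLdb://user:password@/mydb"
#              "?unix_socket=/cloudsql/myproject:myregion:myinstance")
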

@adapters.register_for("google:psycopg2")
class GooglePostgres(GoogleMigratorMixin, PostgrePsyco):
    uploads_in_blob = True
    drivers = ("psycopg2",)

    def _find_work_folder(self):
        super(GooglePostgres, self)._find_work_folder()
        if os.path.isabs(self.folder) and self.folder.startswith(os.getcwd()):
            self.folder = os.path.relpath(self.folder, os.getcwd())

    def clear_cache(self):
        ndb.get_context().clear_cache()

    def ignore_cache_for(self, entities=None):
        entities = entities or []
        ndb.get_context().set_cache_policy(lambda key: key.kind() not in entities)


@adapters.register_for("google:datastore", "google:datastore+ndb")
class GoogleDatastore(NoSQLAdapter):
    dbengine = "google:datastore"

    REGEX_NAMESPACE = ".*://(?P<namespace>.+)"

    def _initialize_(self):
        super(GoogleDatastore, self)._initialize_()
        match = re.match(self.REGEX_NAMESPACE, self.uri)
        if match:
            namespace_manager.set_namespace(match.group("namespace"))
        self.ndb_settings = self.adapter_args.get("ndb_settings")
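
    # Usage sketch (the table name is hypothetical): adapter_args may carry a
    # per-table dict of NDB policy attributes, e.g. the standard ndb Model
    # flags _use_cache and _use_memcache; create_table() below copies them
    # onto the generated model class:
    #
    #     db = DAL("google:datastore",
    #              adapter_args=dict(ndb_settings={
    #                  "person": {"_use_cache": True, "_use_memcache": False}}))
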
    def find_driver(self):
        pass

    def connector(self):
        return FakeDriver()

    def create_table(self, table, migrate=True, fake_migrate=False, polymodel=None):
        myfields = {}
        for field in table:
            if isinstance(polymodel, Table) and field.name in polymodel.fields():
                continue
            attr = {}
            if isinstance(field.custom_qualifier, dict):
                # these are custom properties to add to the GAE field declaration
                attr = field.custom_qualifier
            field_type = field.type
            if isinstance(field_type, SQLCustomType):
                ftype = self.types[field_type.native or field_type.type](**attr)
            elif isinstance(field_type, ndb.Property):
                ftype = field_type
            elif field_type.startswith("id"):
                continue
            elif field_type.startswith("decimal"):
                precision, scale = field_type[7:].strip("()").split(",")
                precision = int(precision)
                scale = int(scale)
                dec_cls = NDBDecimalProperty
                ftype = dec_cls(precision, scale, **attr)
            elif field_type.startswith("reference"):
                if field.notnull:
                    attr = dict(required=True)
                ftype = self.types[field_type[:9]](**attr)
            elif field_type.startswith("list:reference"):
                if field.notnull:
                    attr["required"] = True
                ftype = self.types[field_type[:14]](**attr)
            elif field_type.startswith("list:"):
                ftype = self.types[field_type](**attr)
            elif field_type not in self.types or not self.types[field_type]:
                raise SyntaxError("Field: unknown field type: %s" % field_type)
            else:
                ftype = self.types[field_type](**attr)
            myfields[field.name] = ftype
        if not polymodel:
            model_cls = ndb.Model
            table._tableobj = classobj(table._tablename, (model_cls,), myfields)
            # Set NDB caching variables for this table's model class
            if self.ndb_settings and (table._tablename in self.ndb_settings):
                for k, v in self.ndb_settings[table._tablename].iteritems():
                    setattr(table._tableobj, k, v)
        elif polymodel is True:
            pm_cls = NDBPolyModel
            table._tableobj = classobj(table._tablename, (pm_cls,), myfields)
        elif isinstance(polymodel, Table):
            table._tableobj = classobj(
                table._tablename, (polymodel._tableobj,), myfields
            )
        else:
            raise SyntaxError("polymodel must be None, True, or a Table instance")
        return None
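
    # Mapping sketch (hypothetical field): a declaration such as
    #
    #     Field("price", "decimal(10,2)", notnull=True)
    #
    # is parsed above into precision=10 and scale=2 and becomes an
    # NDBDecimalProperty(10, 2), since the datastore has no native
    # decimal type.
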
    def _expand(self, expression, field_type=None, query_env={}):
        if expression is None:
            return None
        elif isinstance(expression, Field):
            if expression.type in ("text", "blob", "json"):
                raise SyntaxError("AppEngine does not index by: %s" % expression.type)
            return expression.name
        elif isinstance(expression, (Expression, Query)):
            if expression.second is not None:
                return expression.op(
                    expression.first, expression.second, query_env=query_env
                )
            elif expression.first is not None:
                return expression.op(expression.first, query_env=query_env)
            else:
                return expression.op()
        elif field_type:
            return self.represent(expression, field_type)
        elif isinstance(expression, (list, tuple)):
            return ",".join([self.represent(item, field_type) for item in expression])
        elif hasattr(expression, "_FilterNode__name"):
            # check for _FilterNode__name to avoid explicit
            # import of FilterNode
            return expression
        else:
            raise NotImplementedError

    def _add_operators_to_parsed_row(self, rid, table, row):
        row.gae_item = rid
        lid = rid.key.id()
        row.id = lid
        super(GoogleDatastore, self)._add_operators_to_parsed_row(lid, table, row)

    def represent(self, obj, field_type, tablename=None):
        if isinstance(obj, ndb.Key):
            return obj
        if field_type == "id" and tablename:
            if isinstance(obj, list):
                return [self.represent(item, field_type, tablename) for item in obj]
            elif obj is None:
                return None
            else:
                return ndb.Key(tablename, long(obj))
        if isinstance(obj, (Expression, Field)):
            raise SyntaxError("not supported on GAE")
        if isinstance(field_type, gae.Property):
            return obj
        return super(GoogleDatastore, self).represent(obj, field_type)
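
    # Behaviour sketch (hypothetical table name): "id" values are represented
    # as datastore keys, so represent(42, "id", "person") returns
    # ndb.Key("person", 42); lists are converted element-wise and existing
    # ndb.Key instances pass through unchanged.
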
    def truncate(self, table, mode=""):
        self.db(self.id_query(table)).delete()

    def select_raw(self, query, fields=None, attributes=None, count_only=False):
        db = self.db
        fields = fields or []
        attributes = attributes or {}
        args_get = attributes.get
        new_fields = []

        for item in fields:
            if isinstance(item, SQLALL):
                new_fields += item._table
            else:
                new_fields.append(item)

        fields = new_fields
        if query:
            table = self.get_table(query)
        elif fields:
            table = fields[0].table
            query = db._adapter.id_query(fields[0].table)
        else:
            raise SyntaxError("Unable to determine the table")

        if query:
            if use_common_filters(query):
                query = self.common_filter(query, [table])

        # tableobj is a GAE/NDB Model class (or subclass)
        tableobj = table._tableobj
        filters = self.expand(query)

        ## DETERMINE PROJECTION
        projection = None
        if len(table.fields) == len(fields):
            # getting all fields, not a projection query
            projection = None
        elif args_get("projection") is True:
            projection = []
            for f in fields:
                if f.type in ["text", "blob", "json"]:
                    raise SyntaxError(
                        "text and blob field types not allowed in projection queries"
                    )
                else:
                    projection.append(f)

        elif args_get("filterfields") is True:
            projection = []
            for f in fields:
                projection.append(f)

        # real projections can't include 'id';
        # it will be added to the result later
        if projection and args_get("projection") is True:
            query_projection = [f.name for f in projection if f.name != table._id.name]
        else:
            query_projection = None
        ## DONE WITH PROJECTION

        cursor = args_get("reusecursor")
        cursor = cursor if isinstance(cursor, str) else None
        qo = ndb.QueryOptions(projection=query_projection, cursor=cursor)

        if filters is None:
            items = tableobj.query(default_options=qo)
        elif getattr(filters, "filter_all", None):
            items = []
        elif (
            getattr(filters, "_FilterNode__value", None)
            and getattr(filters, "_FilterNode__name", None) == "__key__"
            and getattr(filters, "_FilterNode__opsymbol", None) == "="
        ):
            item = ndb.Key.from_old_key(getattr(filters, "_FilterNode__value")).get()
            items = [item] if item else []
        else:
            items = tableobj.query(filters, default_options=qo)

        if count_only:
            items = [len(items) if isinstance(items, list) else items.count()]
        elif not isinstance(items, list):
            if args_get("left", None):
                raise SyntaxError("Set: no left join in appengine")
            if args_get("groupby", None):
                raise SyntaxError("Set: no groupby in appengine")
            orderby = args_get("orderby", False)
            if orderby:
                if isinstance(orderby, (list, tuple)):
                    orderby = xorify(orderby)
                if isinstance(orderby, Expression):
                    orderby = self.expand(orderby)
                orders = orderby.split(", ")
                tbl = tableobj
                for order in orders:
                    order = str(order)
                    desc = order.startswith("-")
                    name = order[1 if desc else 0 :].split(".")[-1]
                    if name == "id":
                        o = -tbl._key if desc else tbl._key
                    else:
                        o = -getattr(tbl, name) if desc else getattr(tbl, name)
                    items = items.order(o)

            if args_get("limitby", None):
                (lmin, lmax) = attributes["limitby"]
                limit = lmax - lmin
                fetch_args = {"offset": lmin, "keys_only": True}

                keys, cursor, more = items.fetch_page(limit, **fetch_args)
                items = ndb.get_multi(keys)
                # the cursor is only useful if there was a limit and we
                # didn't return all results
                if args_get("reusecursor"):
                    db["_lastcursor"] = cursor
        return (items, table, projection or [f for f in table])

    def select(self, query, fields, attributes):
        """
        This is the GAE version of select. Some notes to consider:
        - 'nativeRef' is a magical fieldname used for self references
          on GAE
        - the optional attribute 'projection', when set to True, triggers
          the use of GAE projection queries. Note the rules imposed by
          GAE: each field must be indexed, projection queries cannot
          contain blob or text fields, and you cannot filter a field
          with == and also select that same field. See
          https://developers.google.com/appengine/docs/python/datastore/queries#Query_Projection
        - the optional attribute 'filterfields', when set to True, makes
          web2py parse only the explicitly listed fields into the Rows
          object, even though all fields are returned in the query. This
          can be used to reduce memory usage in cases where true
          projection queries are not usable.
        - the optional attribute 'reusecursor' allows the use of a cursor
          with queries that have the limitby attribute. Set the attribute
          to True for the first query, and set it to the value of
          db['_lastcursor'] to continue a previous query. The user must
          save the cursor value between requests, and the filters must be
          identical. It is up to the user to follow Google's limitations:
          https://developers.google.com/appengine/docs/python/datastore/queries#Query_Cursors
        """

        items, table, fields = self.select_raw(query, fields, attributes)
        rows = [
            [
                (t.name == table._id.name and item)
                or (t.name == "nativeRef" and item)
                or getattr(item, t.name)
                for t in fields
            ]
            for item in items
        ]
        colnames = [t.longname for t in fields]
        processor = attributes.get("processor", self.parse)
        return processor(rows, fields, colnames, False)
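
    # Usage sketch for the attributes documented above (hypothetical table):
    #
    #     rows = db(query).select(db.person.name, projection=True)
    #     page1 = db(query).select(limitby=(0, 20), reusecursor=True)
    #     page2 = db(query).select(limitby=(0, 20),
    #                              reusecursor=db["_lastcursor"])
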
    def count(self, query, distinct=None, limit=None):
        if distinct:
            raise RuntimeError("COUNT DISTINCT not supported")
        items, table, fields = self.select_raw(query, count_only=True)
        return items[0]

    def delete(self, table, query):
        """
        This function was changed on 2010-05-04 because, according to
        http://code.google.com/p/googleappengine/issues/detail?id=3119,
        GAE no longer supports deleting more than 1000 records.
        """
        items, table, fields = self.select_raw(query)
        # items can be one item or a query
        if not isinstance(items, list):
            # use a keys_only query to ensure that this runs as a
            # datastore small operation
            leftitems = items.fetch(1000, keys_only=True)
            counter = 0
            while len(leftitems):
                counter += len(leftitems)
                ndb.delete_multi(leftitems)
                leftitems = items.fetch(1000, keys_only=True)
        else:
            counter = len(items)
            ndb.delete_multi([item.key for item in items])
        return counter
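
    # Behaviour sketch: a bulk delete therefore runs as repeated keys_only
    # fetches of at most 1000 keys (hypothetical table):
    #
    #     deleted = db(db.person.id > 0).delete()  # returns the number removed
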
    def update(self, table, query, update_fields):
        items, table, fields = self.select_raw(query)
        counter = 0
        for item in items:
            for field, value in update_fields:
                setattr(item, field.name, self.represent(value, field.type))
            item.put()
            counter += 1
        self.db.logger.info(str(counter))
        return counter

    def insert(self, table, fields):
        dfields = dict((f.name, self.represent(v, f.type)) for f, v in fields)
        tmp = table._tableobj(**dfields)
        tmp.put()
        key = tmp.key
        rid = Reference(key.id())
        rid._table, rid._record, rid._gaekey = table, None, key
        return rid

    def bulk_insert(self, table, items):
        parsed_items = []
        for item in items:
            dfields = dict((f.name, self.represent(v, f.type)) for f, v in item)
            parsed_items.append(table._tableobj(**dfields))
        return ndb.put_multi(parsed_items)
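
    # Usage sketch (hypothetical table): insert() returns a pydal Reference
    # that also carries the raw datastore key, while bulk_insert() returns
    # the list of ndb keys from put_multi():
    #
    #     rid = db.person.insert(name="Alice")  # rid._gaekey is the ndb.Key
    #     keys = db.person.bulk_insert([dict(name="Bob"), dict(name="Carol")])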