source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/packages/dal/pydal/dialects/mongo.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 21.6 KB
Line 
import re
from functools import wraps

from .._compat import PY2, basestring
from ..adapters.mongo import Mongo
from ..exceptions import NotOnNOSQLError
from ..objects import Field
from .base import NoSQLDialect
from . import dialects
8
9_aggregate_map = {
10    "SUM": "$sum",
11    "MAX": "$max",
12    "MIN": "$min",
13    "AVG": "$avg",
14}
15
16_extract_map = {
17    "dayofyear": "$dayOfYear",
18    "day": "$dayOfMonth",
19    "dayofweek": "$dayOfWeek",
20    "year": "$year",
21    "month": "$month",
22    "week": "$week",
23    "hour": "$hour",
24    "minute": "$minute",
25    "second": "$second",
26    "millisecond": "$millisecond",
27    "string": "$dateToString",
28}
29
30
def needs_aggregation_pipeline(f):
    """Decorate a dialect method whose result requires the aggregation pipeline.

    Before delegating to ``f``, the wrapper flags the first operand (and the
    second positional operand, when one is given) by calling
    ``self.adapter._parse_data(operand, "pipeline", True)``.

    The wrapped method keeps its own ``__name__``/``__doc__`` (via
    ``functools.wraps``) instead of showing up as ``wrap``.
    """

    @wraps(f)
    def wrap(self, first, *args, **kwargs):
        self.adapter._parse_data(first, "pipeline", True)
        if args:
            # only the first extra positional argument is treated as an operand
            self.adapter._parse_data(args[0], "pipeline", True)
        return f(self, first, *args, **kwargs)

    return wrap
39
40
def validate_second(f):
    """Decorate a comparison method so that a missing/None second operand fails.

    The wrapped callable is expected to be invoked as
    ``(self, first, second, ...)``.  ``second`` (and ``first``, for the error
    message) may be supplied positionally or by keyword.

    Raises:
        RuntimeError: when ``second`` is absent or ``None``.
    """

    @wraps(f)
    def wrap(*args, **kwargs):
        # args[0] is self, args[1] is first, args[2] is second
        second = args[2] if len(args) > 2 else kwargs.get("second")
        if second is None:
            first = args[1] if len(args) > 1 else kwargs.get("first")
            # wrap in a tuple so a tuple-valued operand cannot break formatting
            raise RuntimeError("Cannot compare %s with None" % (first,))
        return f(*args, **kwargs)

    return wrap
48
49
def check_fields_for_cmp(f):
    """Decorate a comparison method, telling it whether to emit pipeline syntax.

    The wrapper computes a ``pipeline`` flag and passes it to ``f`` as a
    keyword argument:

    * ``True`` when ``self.adapter._parse_data((first, second), "pipeline")``
      is already truthy;
    * ``True`` when ``first`` is not a ``Field`` or ``second`` reports
      ``has_field`` -- in that case the operands are also flagged via
      ``_parse_data(..., "pipeline", True)``;
    * ``False`` otherwise.

    The wrapped method keeps its own ``__name__``/``__doc__`` (via
    ``functools.wraps``).
    """

    @wraps(f)
    def wrap(self, first, second=None, *args, **kwargs):
        operands = (first, second)
        if self.adapter._parse_data(operands, "pipeline"):
            pipeline = True
        elif not isinstance(first, Field) or self._has_field(second):
            pipeline = True
            self.adapter._parse_data(operands, "pipeline", True)
        else:
            pipeline = False
        return f(self, first, second, *args, pipeline=pipeline, **kwargs)

    return wrap
62
63
@dialects.register_for(Mongo)
class MongoDialect(NoSQLDialect):
    """Dialect that turns pyDAL operators into MongoDB query/pipeline structures.

    Most methods return plain dicts/lists using MongoDB operator names
    (``$and``, ``$lt``, ``$regex`` ...).  Methods decorated with
    ``needs_aggregation_pipeline`` additionally flag their operands on the
    adapter so that the aggregation pipeline is used.

    ``query_env`` is forwarded untouched to ``self.expand``; the shared ``{}``
    default is never mutated in this class.
    """

    # Placeholder keys embedded in the expanded structures.  They are only
    # produced here (aggregate/count, _as, regexp); presumably the adapter
    # post-processes them -- verify against adapters/mongo.py.
    GROUP_MARK = "__#GROUP#__"
    AS_MARK = "__#AS#__"
    REGEXP_MARK1 = "__#REGEXP_1#__"
    REGEXP_MARK2 = "__#REGEXP_2#__"
    REGEX_SELECT_AS_PARSER = r"'%s': '(\S+)'" % AS_MARK

    @staticmethod
    def _has_field(expression):
        """Return ``expression.has_field``, or False when it has no such attribute."""
        return getattr(expression, "has_field", False)

    def invert(self, first, query_env={}):
        """Return the expansion of ``first`` prefixed with ``-``."""
        return "-%s" % self.expand(first, query_env=query_env)

    def _not(self, val, query_env={}):
        """Negate the expansion of ``val``.

        ``$and``/``$or`` lists are negated with De Morgan's laws, a lone
        ``$ne`` is unwrapped, a plain value becomes ``$ne`` and any other
        operator document is wrapped in ``$not``.
        """
        op = self.expand(val, query_env=query_env)
        op_k = list(op)[0]
        op_body = op[op_k]
        if isinstance(op_body, list):
            # apply De Morgan law for and/or
            # not(A and B) -> not(A) or not(B)
            # not(A or B)  -> not(A) and not(B)
            not_op = "$and" if op_k == "$or" else "$or"
            return {
                not_op: [
                    self._not(val.first, query_env),
                    self._not(val.second, query_env),
                ]
            }
        try:
            sub_ops = list(op_body.keys())
        except AttributeError:
            # plain value (implicit equality): not(x == v) -> x != v
            return {op_k: {"$ne": op_body}}
        if sub_ops == ["$ne"]:
            # not(x != v) -> x == v
            return {op_k: op_body["$ne"]}
        return {op_k: {"$not": op_body}}

    def _and(self, first, second, query_env={}):
        """Return ``{"$and": [...]}``; a boolean ``second`` short-circuits.

        ``first & True`` is just ``first``; ``first & False`` is rendered as
        the always-false ``first != first``.
        """
        # pymongo expects: .find({'$and': [{'x':'1'}, {'y':'2'}]})
        if isinstance(second, bool):
            if second:
                return self.expand(first, query_env=query_env)
            return self.ne(first, first, query_env=query_env)
        return {
            "$and": [
                self.expand(first, query_env=query_env),
                self.expand(second, query_env=query_env),
            ]
        }

    def _or(self, first, second, query_env={}):
        """Return ``{"$or": [...]}``; a boolean ``second`` short-circuits.

        ``first | False`` is just ``first``; ``first | True`` returns the
        literal ``True``.
        """
        # pymongo expects: .find({'$or': [{'name':'1'}, {'name':'2'}]})
        if isinstance(second, bool):
            if not second:
                return self.expand(first, query_env=query_env)
            return True
        return {
            "$or": [
                self.expand(first, query_env=query_env),
                self.expand(second, query_env=query_env),
            ]
        }

    def belongs(self, first, second, query_env={}):
        """Return ``{first: {"$in": [...]}}`` for the items of ``second``.

        Raises:
            RuntimeError: when ``second`` is a string, i.e. a nested select
                that was already rendered to SQL.
        """
        if isinstance(second, basestring):
            # This is broken: the only way second is a string is if it has
            # been converted to SQL, which cannot work here.  It might be made
            # to work if adapter._select did not return SQL.
            raise RuntimeError("nested queries not supported")
        items = [self.expand(item, first.type, query_env=query_env) for item in second]
        return {self.expand(first, query_env=query_env): {"$in": items}}

    def _cmp_ops_aggregation_pipeline(self, op, first, second, query_env={}):
        """Return ``{op: [first, second]}`` in aggregation-expression form.

        ``second`` is expanded with ``first.type`` when ``first`` has one,
        otherwise with ``None``.
        """
        first_type = getattr(first, "type", None)
        return {
            op: [
                self.expand(first, query_env=query_env),
                self.expand(second, first_type, query_env=query_env),
            ]
        }

    def _cmp_op(self, op, first, second, pipeline, query_env):
        """Shared body of ne/lt/lte/gt/gte.

        Pipeline form is ``{op: [first, second]}``; query form is
        ``{first: {op: second}}``.
        """
        if pipeline:
            return self._cmp_ops_aggregation_pipeline(op, first, second, query_env)
        return {
            self.expand(first, query_env=query_env): {
                op: self.expand(second, first.type, query_env=query_env)
            }
        }

    @check_fields_for_cmp
    def eq(self, first, second=None, pipeline=False, query_env={}):
        """``first == second``; query form is the implicit ``{first: second}``."""
        if pipeline:
            return self._cmp_ops_aggregation_pipeline("$eq", first, second, query_env)
        return {
            self.expand(first, query_env=query_env): self.expand(
                second, first.type, query_env=query_env
            )
        }

    @check_fields_for_cmp
    def ne(self, first, second=None, pipeline=False, query_env={}):
        """``first != second`` (``$ne``)."""
        return self._cmp_op("$ne", first, second, pipeline, query_env)

    @validate_second
    @check_fields_for_cmp
    def lt(self, first, second=None, pipeline=False, query_env={}):
        """``first < second`` (``$lt``); ``second`` must not be None."""
        return self._cmp_op("$lt", first, second, pipeline, query_env)

    @validate_second
    @check_fields_for_cmp
    def lte(self, first, second=None, pipeline=False, query_env={}):
        """``first <= second`` (``$lte``); ``second`` must not be None."""
        return self._cmp_op("$lte", first, second, pipeline, query_env)

    @validate_second
    @check_fields_for_cmp
    def gt(self, first, second=None, pipeline=False, query_env={}):
        """``first > second`` (``$gt``); ``second`` must not be None."""
        return self._cmp_op("$gt", first, second, pipeline, query_env)

    @validate_second
    @check_fields_for_cmp
    def gte(self, first, second=None, pipeline=False, query_env={}):
        """``first >= second`` (``$gte``); ``second`` must not be None."""
        return self._cmp_op("$gte", first, second, pipeline, query_env)

    def _arith_op(self, op_code, first, second, query_env):
        """Return ``{op_code: [first, second]}``, expanding ``second`` as ``first.type``."""
        return {
            op_code: [
                self.expand(first, query_env=query_env),
                self.expand(second, first.type, query_env=query_env),
            ]
        }

    @needs_aggregation_pipeline
    def add(self, first, second, query_env={}):
        """``first + second``: ``$concat`` if either operand is textual, else ``$add``."""
        op_code = "$add"
        for operand in (first, second):
            # operands without a ``type`` attribute (plain values) are numeric here
            if getattr(operand, "type", None) in ["string", "text", "password"]:
                op_code = "$concat"
                break
        return self._arith_op(op_code, first, second, query_env)

    @needs_aggregation_pipeline
    def sub(self, first, second, query_env={}):
        """``first - second`` (``$subtract``)."""
        return self._arith_op("$subtract", first, second, query_env)

    @needs_aggregation_pipeline
    def mul(self, first, second, query_env={}):
        """``first * second`` (``$multiply``)."""
        return self._arith_op("$multiply", first, second, query_env)

    @needs_aggregation_pipeline
    def div(self, first, second, query_env={}):
        """``first / second`` (``$divide``)."""
        return self._arith_op("$divide", first, second, query_env)

    @needs_aggregation_pipeline
    def mod(self, first, second, query_env={}):
        """``first % second`` (``$mod``)."""
        return self._arith_op("$mod", first, second, query_env)

    @needs_aggregation_pipeline
    def aggregate(self, first, what, query_env={}):
        """Expand the aggregate ``what`` (ABS, SUM, MAX, MIN, AVG) over ``first``.

        ABS is built from ``$cond``/``$subtract``; the others come from
        ``_aggregate_map``, are wrapped under ``GROUP_MARK`` and flag
        ``need_group`` on the adapter.

        Raises:
            NotImplementedError: for any other ``what``.
        """
        if what == "ABS":
            return {
                "$cond": [
                    {"$lt": [self.expand(first, query_env=query_env), 0]},
                    {"$subtract": [0, self.expand(first, query_env=query_env)]},
                    self.expand(first, query_env=query_env),
                ]
            }
        try:
            expanded = {_aggregate_map[what]: self.expand(first, query_env=query_env)}
        except KeyError:
            raise NotImplementedError("'%s' not implemented" % what)

        self.adapter._parse_data(first, "need_group", True)
        return {self.GROUP_MARK: expanded}

    @needs_aggregation_pipeline
    def count(self, first, distinct=None, query_env={}):
        """Expand COUNT: ``$sum: 1``, or ``$addToSet`` (+ ``$size``) when distinct."""
        self.adapter._parse_data(first, "need_group", True)
        if distinct:
            ret = {
                self.GROUP_MARK: {"$addToSet": self.expand(first, query_env=query_env)}
            }
            if self.adapter.server_version_major >= 2.6:
                # '$size' not present in server versions < 2.6
                ret = {"$size": ret}
            return ret
        return {self.GROUP_MARK: {"$sum": 1}}

    @needs_aggregation_pipeline
    def extract(self, first, what, query_env={}):
        """Expand EXTRACT(``what``) using the operator from ``_extract_map``.

        Raises:
            NotImplementedError: when ``what`` is not in ``_extract_map``.
        """
        try:
            return {_extract_map[what]: self.expand(first, query_env=query_env)}
        except KeyError:
            raise NotImplementedError("EXTRACT(%s) not implemented" % what)

    @needs_aggregation_pipeline
    def epoch(self, first, query_env={}):
        """Return ``(first - adapter.epoch) / 1000`` as a pipeline expression."""
        return {
            "$divide": [
                {
                    "$subtract": [
                        self.expand(first, query_env=query_env),
                        self.adapter.epoch,
                    ]
                },
                1000,
            ]
        }

    @needs_aggregation_pipeline
    def case(self, query, true_false, query_env={}):
        """Return ``$cond`` choosing ``true_false[0]`` or ``true_false[1]`` on ``query``."""
        return {
            "$cond": [
                self.expand(query, query_env=query_env),
                self.expand(true_false[0], query_env=query_env),
                self.expand(true_false[1], query_env=query_env),
            ]
        }

    @needs_aggregation_pipeline
    def _as(self, first, second, query_env={}):
        """Attach the alias ``second`` to the expansion of ``first`` via ``AS_MARK``."""
        # put the AS_MARK into the structure.  The 'AS' name will be parsed
        # later from the string of the field name.
        if isinstance(first, Field):
            return [{self.AS_MARK: second}, self.expand(first, query_env=query_env)]
        result = self.expand(first, query_env=query_env)
        result[self.AS_MARK] = second
        return result

    # We could implement an option that simulates a full featured SQL
    # database. But I think the option should be set explicit or
    # implemented as another library.
    def on(self, first, second, query_env={}):
        """Joins are not available on this backend: always raises NotOnNOSQLError."""
        raise NotOnNOSQLError()

    def comma(self, first, second, query_env={}):
        """Join both expansions with a comma."""
        # returns field name lists, to be separated via split(',')
        return "%s,%s" % (
            self.expand(first, query_env=query_env),
            self.expand(second, query_env=query_env),
        )

    @staticmethod
    def _like_to_regex(pattern, escape=None):
        """Translate a SQL LIKE pattern into a regular-expression body.

        ``%`` becomes ``.*`` and ``_`` becomes ``.``.  When ``escape`` is
        given, ``escape`` followed by ``%`` or ``_`` yields that character
        literally.  Every other character (including a stray ``escape``) is
        matched literally.

        The pattern is scanned once instead of post-processing the output of
        ``re.escape``: since Python 3.7 ``re.escape`` no longer escapes ``%``,
        which made the former ``"\\%"``-based replacements miss wildcards.
        """
        parts = []
        pos, end = 0, len(pattern)
        skip = len(escape) if escape else 0
        while pos < end:
            if (
                skip
                and pattern.startswith(escape, pos)
                and pattern[pos + skip : pos + skip + 1] in ("%", "_")
            ):
                # escaped wildcard -> literal character
                parts.append(re.escape(pattern[pos + skip]))
                pos += skip + 1
                continue
            char = pattern[pos]
            if char == "%":
                parts.append(".*")
            elif char == "_":
                parts.append(".")
            else:
                parts.append(re.escape(char))
            pos += 1
        return "".join(parts)

    # TODO verify full compatibility with official SQL Like operator
    def _build_like_regex(
        self,
        first,
        second,
        case_sensitive=True,
        escape=None,
        ends_with=False,
        starts_with=False,
        whole_string=True,
        like_wildcards=False,
        query_env={},
    ):
        """Build the regexp query used by like/startswith/endswith/contains.

        ``second`` is expanded as a string and escaped; with
        ``like_wildcards`` the SQL wildcards are translated (see
        ``_like_to_regex``).  Anchors are chosen in this order of precedence:
        ``starts_with`` (``^...``), ``ends_with`` (``...$``),
        ``whole_string`` (``^...$``), otherwise unanchored.

        Returns the bare expanded string when no regex is needed, else the
        result of ``self.regexp``.
        """
        base = self.expand(second, "string", query_env=query_env)
        need_regex = (
            whole_string
            or not case_sensitive
            or starts_with
            or ends_with
            or (like_wildcards and ("_" in base or "%" in base))
        )
        if not need_regex:
            return base
        if like_wildcards:
            expr = self._like_to_regex(base, escape)
        else:
            expr = re.escape(base)
        if starts_with:
            pattern = "^%s"
        elif ends_with:
            pattern = "%s$"
        elif whole_string:
            pattern = "^%s$"
        else:
            pattern = "%s"
        return self.regexp(first, pattern % expr, case_sensitive, query_env)

    def like(self, first, second, case_sensitive=True, escape=None, query_env={}):
        """SQL LIKE: whole-string match with ``%``/``_`` wildcards."""
        return self._build_like_regex(
            first,
            second,
            case_sensitive=case_sensitive,
            escape=escape,
            like_wildcards=True,
            query_env=query_env,
        )

    def ilike(self, first, second, escape=None, query_env={}):
        """Case-insensitive ``like``."""
        return self.like(
            first, second, case_sensitive=False, escape=escape, query_env=query_env
        )

    def startswith(self, first, second, query_env={}):
        """Match values of ``first`` beginning with the literal ``second``."""
        return self._build_like_regex(
            first, second, starts_with=True, query_env=query_env
        )

    def endswith(self, first, second, query_env={}):
        """Match values of ``first`` ending with the literal ``second``."""
        return self._build_like_regex(
            first, second, ends_with=True, query_env=query_env
        )

    # TODO verify full compatibility with official oracle contains operator
    def contains(self, first, second, case_sensitive=True, query_env={}):
        """Expand ``first.contains(second)``.

        * ObjectId ``second``: direct ``{first: second}`` match.
        * Field ``second``: a ``$where`` JavaScript ``indexOf`` test.
        * string/int ``second``: a regexp, anchored to the whole string only
          when ``first`` is a ``list:string`` field.

        Raises:
            NotImplementedError: for unsupported operand types.
        """
        if isinstance(second, self.adapter.ObjectId):
            ret = {self.expand(first, query_env=query_env): second}
        elif isinstance(second, Field):
            if second.type in ["string", "text"]:
                if isinstance(first, Field):
                    if first.type in ["list:string", "string", "text"]:
                        ret = {
                            "$where": "this.%s.indexOf(this.%s) > -1"
                            % (first.name, second.name)
                        }
                    else:
                        raise NotImplementedError(
                            "field.CONTAINS() not implemented for field "
                            "type of '%s'" % first.type
                        )
                else:
                    raise NotImplementedError(
                        "x.CONTAINS() not implemented for x type of '%s'" % type(first)
                    )
            elif second.type in ["integer", "bigint"]:
                # NOTE(review): unlike the string branch, ``first`` is not
                # checked to be a Field before reading ``first.name``.
                ret = {
                    "$where": "this.%s.indexOf(this.%s + '') > -1"
                    % (first.name, second.name)
                }
            else:
                raise NotImplementedError(
                    "CONTAINS(field) not implemented for field type '%s'" % second.type
                )
        elif isinstance(second, (basestring, int)):
            whole_string = isinstance(first, Field) and first.type == "list:string"
            ret = self._build_like_regex(
                first,
                second,
                case_sensitive=case_sensitive,
                whole_string=whole_string,
                query_env=query_env,
            )
            # first.type in ('string', 'text', 'json', 'upload')
            # or first.type.startswith('list:'):
        else:
            raise NotImplementedError(
                "CONTAINS() not implemented for type '%s'" % type(second)
            )
        return ret

    @needs_aggregation_pipeline
    def substring(self, field, parameters, query_env={}):
        """Return ``$substr`` over ``field`` for ``parameters = (pos0, length)``."""

        def parse_parameters(pos0, length):
            """
            The expression object can return these as string based expressions.
            We can't use that so we have to tease it apart.

            These are the possibilities:

              pos0 = '(%s - %d)' % (self.len(), abs(start) - 1)
              pos0 = start + 1

              length = self.len()
              length = '(%s - %d - %s)' % (self.len(), abs(stop) - 1, pos0)
              length = '(%s - %s)' % (stop + 1, pos0)

            Two of these five require the length of the string which is not
            supported by Mongo, so for now these cause an Exception and
            won't reach here.

            If this were to ever be supported it may require a change to
            Expression.__getitem__ so that it either returned the base
            expression to be expanded here, or converted length to a string
            to be parsed back to a call to STRLEN()
            """
            if isinstance(length, basestring):
                # SECURITY NOTE(review): eval() on a string.  The docstring
                # above says it is an arithmetic string built by the
                # expression layer; it must never carry user-supplied text.
                return (pos0 - 1, eval(length))
            # take the rest of the string
            return (pos0 - 1, -1)

        start, size = parse_parameters(*parameters)
        return {
            "$substr": [
                self.expand(field, query_env=query_env),
                start,
                size,
            ]
        }

    @needs_aggregation_pipeline
    def lower(self, first, query_env={}):
        """Lower-case ``first`` (``$toLower``)."""
        return {"$toLower": self.expand(first, query_env=query_env)}

    @needs_aggregation_pipeline
    def upper(self, first, query_env={}):
        """Upper-case ``first`` (``$toUpper``)."""
        return {"$toUpper": self.expand(first, query_env=query_env)}

    def regexp(self, first, second, case_sensitive=True, query_env={}):
        """ MongoDB provides regular expression capabilities for pattern
            matching strings in queries. MongoDB uses Perl compatible
            regular expressions (i.e. 'PCRE') version 8.36 with UTF-8 support.

            Numeric fields are matched through a ``$where`` JavaScript test on
            their string form.  When the operands are flagged for the
            pipeline, the result is split under ``REGEXP_MARK1`` /
            ``REGEXP_MARK2``.  If the expanded ``first`` cannot be used as a
            dict key, the pipeline flag is set and ``{}`` is returned.
        """
        if isinstance(first, Field) and first.type in [
            "integer",
            "bigint",
            "float",
            "double",
        ]:
            # NOTE(review): case_sensitive is not applied in this branch.
            return {
                "$where": "RegExp('%s').test(this.%s + '')"
                % (self.expand(second, "string", query_env=query_env), first.name)
            }
        expanded_first = self.expand(first, query_env=query_env)
        regex_second = {"$regex": self.expand(second, "string", query_env=query_env)}
        if not case_sensitive:
            regex_second["$options"] = "i"
        if self.adapter._parse_data((first, second), "pipeline"):
            name = str(expanded_first)
            return {
                self.REGEXP_MARK1: {name: expanded_first},
                self.REGEXP_MARK2: {name: regex_second},
            }
        try:
            return {expanded_first: regex_second}
        except TypeError:
            # if first is not hashable, then will need the pipeline
            self.adapter._parse_data((first, second), "pipeline", True)
            return {}

    def length(self, first, query_env={}):
        """
        Not implemented: always raises NotImplementedError.

        Mongo has committed $strLenBytes, $strLenCP, and $substrCP to $project
        aggregation stage in dev branch V3.3.4

        https://jira.mongodb.org/browse/SERVER-14670
        https://jira.mongodb.org/browse/SERVER-22580
        db.coll.aggregate([{
            $project: {
                byteLength: {$strLenBytes: "$string"},
                cpLength: {$strLenCP: "$string"}
                byteSubstr: {$substrBytes: ["$string", 0, 4]},
                cpSubstr: {$substrCP: ["$string", 0, 4]}
            }
        }])

        https://jira.mongodb.org/browse/SERVER-5319
        https://github.com/afchin/mongo/commit/f52105977e4d0ccb53bdddfb9c4528a3f3c40bdf
        """
        if self.adapter.server_version_major <= 3.2:
            # $strLenBytes not supported by mongo before version 3.4
            raise NotImplementedError()

        # implement here  :-)
        raise NotImplementedError()

    @needs_aggregation_pipeline
    def coalesce(self, first, second, query_env={}):
        """Return nested ``$ifNull`` over ``first`` and the sequence ``second``.

        With more than one fallback, the tail is folded recursively:
        ``coalesce(a, [b, c])`` -> ``ifNull(a, coalesce(b, [c]))``.
        """
        if len(second) > 1:
            second = [self.coalesce(second[0], second[1:], query_env=query_env)]
        return {
            "$ifNull": [
                self.expand(first, query_env=query_env),
                self.expand(second[0], query_env=query_env),
            ]
        }

    @property
    def random(self):
        """ ORDER BY RANDOM()

        Not implemented: always raises NotImplementedError.

        Mongo has released the '$sample' pipeline stage in V3.2
        https://docs.mongodb.org/manual/reference/operator/aggregation/sample/

        https://github.com/mongodb/cookbook/blob/master/content/patterns/random-attribute.txt
        http://stackoverflow.com/questions/19412/how-to-request-a-random-row-in-sql
        https://jira.mongodb.org/browse/SERVER-533
        """

        if self.adapter.server_version_major <= 3.0:
            # '$sample' not present until server version 3.2
            raise NotImplementedError()

        # implement here  :-)
        raise NotImplementedError()
Note: See TracBrowser for help on using the repository browser.