source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/packages/dal/pydal/adapters/sqlite.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 4.0 KB
Line 
1import locale
2import platform
3import re
4import sys
5from datetime import datetime
6from time import mktime
7from .._compat import PY2, pjoin
8from .base import SQLAdapter
9from . import adapters
10
11
@adapters.register_for("sqlite", "sqlite:memory")
class SQLite(SQLAdapter):
    """Adapter for SQLite, registered for ``sqlite`` and ``sqlite:memory`` URIs.

    On top of the base adapter it resolves the database file path, registers
    two Python-backed SQL functions (``web2py_extract`` and ``REGEXP``) on
    every new connection, and cascades deletes to referencing tables.
    """

    dbengine = "sqlite"
    drivers = ("sqlite2", "sqlite3")

    def _initialize_(self):
        """Compute ``self.dbpath`` and fill in default driver arguments.

        The path is ``":memory:"`` when the URI scheme contains ``:memory``;
        otherwise it is the part of the URI after ``://``, joined onto
        ``self.folder`` unless it already starts with ``/``.
        """
        # NOTE(review): presumably disables connection pooling for SQLite;
        # confirm against the SQLAdapter base class.
        self.pool_size = 0
        super(SQLite, self)._initialize_()
        if ":memory" in self.uri.split("://", 1)[0]:
            self.dbpath = ":memory:"
        else:
            self.dbpath = self.uri.split("://", 1)[1]
            if self.dbpath[0] != "/":
                if PY2:
                    # Only the Python 2 branch needs the filesystem encoding,
                    # so it is looked up here rather than unconditionally.
                    path_encoding = (
                        sys.getfilesystemencoding()
                        or locale.getdefaultlocale()[1]
                        or "utf8"
                    )
                    self.dbpath = pjoin(
                        self.folder.decode(path_encoding).encode("utf8"), self.dbpath
                    )
                else:
                    self.dbpath = pjoin(self.folder, self.dbpath)
        # Caller-supplied driver arguments always win over these defaults.
        if "check_same_thread" not in self.driver_args:
            self.driver_args["check_same_thread"] = False
        if "detect_types" not in self.driver_args:
            self.driver_args["detect_types"] = self.driver.PARSE_DECLTYPES

    def _driver_from_uri(self):
        """Return ``None``: no driver name is derived from the URI."""
        return None

    def connector(self):
        """Open and return a new driver connection to ``self.dbpath``."""
        return self.driver.Connection(self.dbpath, **self.driver_args)

    @staticmethod
    def web2py_extract(lookup, s):
        """Extract one component from a ``YYYY-MM-DD HH:MM:SS`` string.

        Registered as the two-argument SQL function ``web2py_extract``.

        Args:
            lookup: one of ``"year"``, ``"month"``, ``"day"``, ``"hour"``,
                ``"minute"``, ``"second"`` or ``"epoch"``.
            s: the date/time string to read from.

        Returns:
            The component as an ``int``; for ``"epoch"`` the float returned
            by ``time.mktime`` (which interprets the value as local time);
            or ``None`` when the value cannot be extracted.
        """
        # Character offsets of each component inside the timestamp string.
        table = {
            "year": (0, 4),
            "month": (5, 7),
            "day": (8, 10),
            "hour": (11, 13),
            "minute": (14, 16),
            "second": (17, 19),
        }
        try:
            if lookup != "epoch":
                (i, j) = table[lookup]
                return int(s[i:j])
            else:
                return mktime(datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
        except Exception:
            # Best effort: an unknown component, a NULL or a malformed value
            # yields NULL instead of aborting the query. ``Exception`` (not a
            # bare ``except``) keeps KeyboardInterrupt/SystemExit propagating.
            return None

    @staticmethod
    def web2py_regexp(expression, item):
        """Tell whether the regular expression ``expression`` matches ``item``.

        Registered as the two-argument SQL function ``REGEXP``. A ``None``
        item never matches; otherwise ``re.search`` semantics apply.
        """
        if item is None:
            return False
        return re.compile(expression).search(item) is not None

    def _register_extract(self):
        """Register ``web2py_extract`` as a SQL function on the connection."""
        self.connection.create_function("web2py_extract", 2, self.web2py_extract)

    def _register_regexp(self):
        """Register ``web2py_regexp`` as the SQL function ``REGEXP``."""
        self.connection.create_function("REGEXP", 2, self.web2py_regexp)

    def after_connection(self):
        """Register the custom SQL functions and turn on foreign keys.

        Foreign-key enforcement is enabled unless the ``foreign_keys``
        adapter argument is set to a false value.
        """
        self._register_extract()
        self._register_regexp()
        if self.adapter_args.get("foreign_keys", True):
            self.execute("PRAGMA foreign_keys=ON;")

    def select(self, query, fields, attributes):
        """Run a select, opening an immediate transaction for ``for_update``.

        ``BEGIN IMMEDIATE TRANSACTION`` is issued first when ``for_update``
        is requested and no ``cache`` attribute is present.
        """
        if attributes.get("for_update", False) and "cache" not in attributes:
            self.execute("BEGIN IMMEDIATE TRANSACTION;")
        return super(SQLite, self).select(query, fields, attributes)

    def delete(self, table, query):
        """Delete the rows matching ``query`` and cascade to referencing rows.

        The ids of the matching rows are collected before the delete. If any
        row was deleted, rows in fields that reference ``table`` with
        ``ondelete == "CASCADE"`` and point at those ids are deleted too.

        Returns:
            The counter returned by the base adapter's ``delete``.
        """
        db = self.db
        # Ids must be read before the delete; afterwards the rows are gone.
        deleted = [x[table._id.name] for x in db(query).select(table._id)]
        counter = super(SQLite, self).delete(table, query)
        if counter:
            for field in table._referenced_by:
                if (
                    field.type == "reference " + table._dalname
                    and field.ondelete == "CASCADE"
                ):
                    db(field.belongs(deleted)).delete()
        return counter
99
100
@adapters.register_for("spatialite", "spatialite:memory")
class Spatialite(SQLite):
    """SQLite adapter that loads the SpatiaLite extension on each connection."""

    dbengine = "spatialite"

    # File name of the SpatiaLite shared library, keyed by the value
    # returned by platform.system().
    SPATIALLIBS = {
        "Windows": "mod_spatialite.dll",
        "Linux": "libspatialite.so",
        "Darwin": "libspatialite.dylib",
    }

    def after_connection(self):
        """Load the SpatiaLite library, then run the SQLite connection setup.

        Raises:
            KeyError: if ``platform.system()`` has no entry in ``SPATIALLIBS``.
        """
        self.connection.enable_load_extension(True)
        libspatialite = self.SPATIALLIBS[platform.system()]
        # The library name is passed as a standard single-quoted SQL string.
        # Double-quoted strings are only accepted by SQLite as a legacy quirk
        # and fail on builds compiled with SQLITE_DQS=0.
        self.execute("SELECT load_extension('%s');" % libspatialite)
        super(Spatialite, self).after_connection()
116
117
@adapters.register_for("jdbc:sqlite", "jdbc:sqlite:memory")
class JDBCSQLite(SQLite):
    """SQLite adapter variant that connects through the ``zxJDBC_sqlite`` driver."""

    drivers = ("zxJDBC_sqlite",)

    def connector(self):
        """Open a JDBC connection to ``self.dbpath`` and wrap it with the driver."""
        jdbc_url = "jdbc:sqlite:" + self.dbpath
        raw_connection = self.driver.getConnection(jdbc_url)
        return self.driver.connect(raw_connection, **self.driver_args)

    def after_connection(self):
        """Register only ``web2py_extract`` on the new connection.

        Unlike the parent class, neither ``REGEXP`` registration nor the
        foreign-keys pragma is performed here.
        """
        self._register_extract()
Note: See TracBrowser for help on using the repository browser.