source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/packages/dal/pydal/adapters/mssql.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 6.0 KB
Line 
1import re
2from .._compat import PY2, iteritems, integer_types, to_unicode, long
3from .._globals import IDENTITY
4from .base import SQLAdapter
5from ..utils import split_uri_args
6from . import adapters, with_connection_or_raise
7
8
class Slicer(object):
    """Mixin that trims an already-materialised row sequence in Python."""

    def rowslice(self, rows, minimum=0, maximum=None):
        """Return the part of ``rows`` from ``minimum`` up to ``maximum``.

        ``maximum=None`` means "through the end"; otherwise ``maximum`` is
        the exclusive upper bound, exactly as in ordinary slicing.
        """
        return rows[minimum:] if maximum is None else rows[minimum:maximum]
14
15
class MSSQL(SQLAdapter):
    """Base adapter for SQL Server reached through a DSN-style connect string.

    ``_initialize_`` converts the DAL URI into ``self.dsn``; ``connector``
    passes that string together with ``self.driver_args`` to the driver's
    ``connect`` function.
    """

    dbengine = "mssql"
    drivers = ("pyodbc", "pytds")

    # Any non-empty text is accepted verbatim as a ready-made DSN.
    REGEX_DSN = "^.+$"
    # user[:password]@host[:port]/db[?arg1=value1&arg2=value2...]
    # The host may also be a bracketed literal such as "[::1]".
    REGEX_URI = (
        "^(?P<user>[^:@]+)(:(?P<password>[^@]*))?"
        r"@(?P<host>[^:/]+|\[[^\]]+\])(:(?P<port>\d+))?"
        "/(?P<db>[^?]+)"
        r"(\?(?P<uriargs>.*))?$"
    )

    def __init__(
        self,
        db,
        uri,
        pool_size=0,
        folder=None,
        db_codec="UTF-8",
        credential_decoder=IDENTITY,
        driver_args=None,
        adapter_args=None,
        srid=4326,
        after_connection=None,
    ):
        """Store ``srid`` and forward everything else to the base adapter.

        ``driver_args`` and ``adapter_args`` default to ``None`` and are
        replaced by a fresh dict per instance.  Adapters in this module call
        ``self.driver_args.update(...)`` with login data, so a shared mutable
        default would leak credentials from one instance into the next.
        """
        # Kept ahead of the base initializer, as in the original ordering
        # (presumably the base class may need it while setting up -- verify).
        self.srid = srid
        super(MSSQL, self).__init__(
            db,
            uri,
            pool_size,
            folder,
            db_codec,
            credential_decoder,
            {} if driver_args is None else driver_args,
            {} if adapter_args is None else adapter_args,
            after_connection,
        )

    def _initialize_(self):
        """Build ``self.dsn`` from the part of ``self.uri`` after ``://``.

        Without an ``@`` the remainder is used unchanged as the DSN.
        Otherwise it must match ``REGEX_URI`` and is expanded into a
        ``SERVER=...;PORT=...;DATABASE=...;UID=...;PWD=...;<args>`` string.

        Raises:
            SyntaxError: if the remainder matches neither pattern.
        """
        super(MSSQL, self)._initialize_()
        ruri = self.uri.split("://", 1)[1]
        if "@" not in ruri:
            m = re.match(self.REGEX_DSN, ruri)
            if not m:
                raise SyntaxError("Invalid URI string in DAL")
            self.dsn = m.group()
            return

        m = re.match(self.REGEX_URI, ruri)
        if not m:
            raise SyntaxError("Invalid URI string in DAL: %s" % self.uri)
        user = self.credential_decoder(m.group("user"))
        password = self.credential_decoder(m.group("password"))
        if password is None:
            # The password group is optional in REGEX_URI.
            password = ""
        host = m.group("host")
        db = m.group("db")
        port = m.group("port") or "1433"
        # Parse the optional uri name-value arg pairs after the '?'
        # (in the form of arg1=value1&arg2=value2&...)
        # (drivers like FreeTDS insist on uppercase parameter keys)
        argsdict = {"DRIVER": "{SQL Server}"}
        uriargs = m.group("uriargs")
        if uriargs:
            parsed = split_uri_args(uriargs, separators="&", need_equal=True)
            for argkey, argvalue in parsed.items():
                # A user-supplied DRIVER=... replaces the default above.
                argsdict[argkey.upper()] = argvalue
        uriargs = ";".join("%s=%s" % (ak, av) for (ak, av) in iteritems(argsdict))
        self.dsn = "SERVER=%s;PORT=%s;DATABASE=%s;UID=%s;PWD=%s;%s" % (
            host,
            port,
            db,
            user,
            password,
            uriargs,
        )

    def connector(self):
        """Open a driver connection from ``self.dsn`` and ``self.driver_args``."""
        return self.driver.connect(self.dsn, **self.driver_args)

    def lastrowid(self, table):
        """Return the result of ``SELECT SCOPE_IDENTITY();`` as a ``long``.

        ``table`` is accepted for interface compatibility and is not used.
        """
        self.execute("SELECT SCOPE_IDENTITY();")
        return long(self.cursor.fetchone()[0])
99
100
@adapters.register_for("mssql")
class MSSQL1(MSSQL, Slicer):
    """``MSSQL`` combined with ``Slicer.rowslice``; registered for "mssql"."""
104
105
@adapters.register_for("mssql3")
class MSSQL3(MSSQL):
    """Plain ``MSSQL`` adapter registered for "mssql3"; adds no behaviour here."""
109
110
@adapters.register_for("mssql4")
class MSSQL4(MSSQL):
    """Plain ``MSSQL`` adapter registered for "mssql4"; adds no behaviour here."""
114
115
class MSSQLN(MSSQL):
    """``MSSQL`` variant that emits ``N'...'`` literals for textual fields."""

    def represent(self, obj, field_type):
        """Return the parent's representation, N-prefixed when it is quoted text.

        Only "string", "text" and "json" fields whose representation starts
        with a single quote are changed; everything else passes through.
        """
        literal = super(MSSQLN, self).represent(obj, field_type)
        textual = field_type in ("string", "text", "json")
        if textual and literal.startswith("'"):
            return "N" + literal
        return literal

    @with_connection_or_raise
    def execute(self, *args, **kwargs):
        """Run the parent's ``execute``; on Python 2 the first positional
        argument is converted with ``to_unicode`` first."""
        if PY2:
            command, remainder = args[0], args[1:]
            args = [to_unicode(command)] + list(remainder)
        return super(MSSQLN, self).execute(*args, **kwargs)
129
130
@adapters.register_for("mssqln", "mssql2")
class MSSQL1N(MSSQLN, Slicer):
    """``MSSQLN`` combined with ``Slicer.rowslice``; registered for "mssqln"
    and "mssql2"."""
134
135
@adapters.register_for("mssql3n")
class MSSQL3N(MSSQLN):
    """Plain ``MSSQLN`` adapter registered for "mssql3n"; adds no behaviour here."""
139
140
@adapters.register_for("mssql4n")
class MSSQL4N(MSSQLN):
    """Plain ``MSSQLN`` adapter registered for "mssql4n"; adds no behaviour here."""
144
145
@adapters.register_for("pytds")
class PyTDS(MSSQL):
    """Adapter registered for "pytds": the host becomes the DSN and the login
    details travel as keyword arguments in ``self.driver_args``."""

    def _initialize_(self):
        """Derive ``self.dsn`` (and, for full URIs, login kwargs) from the URI.

        Raises:
            SyntaxError: if the text after ``://`` matches neither
                ``REGEX_DSN`` nor ``REGEX_URI``.
        """
        # super(MSSQL, ...) resumes the lookup *after* MSSQL in the MRO, so
        # MSSQL._initialize_ (which builds an ODBC-style string) is skipped.
        super(MSSQL, self)._initialize_()
        target = self.uri.split("://", 1)[1]

        if "@" not in target:
            # No credentials section: use the text verbatim as the DSN.
            found = re.match(self.REGEX_DSN, target)
            if not found:
                raise SyntaxError("Invalid URI string in DAL")
            self.dsn = found.group()
            return

        found = re.match(self.REGEX_URI, target)
        if not found:
            raise SyntaxError("Invalid URI string in DAL: %s" % self.uri)
        self.dsn = found.group("host")
        login = {
            "user": self.credential_decoder(found.group("user")),
            "password": self.credential_decoder(found.group("password")) or "",
            "database": found.group("db"),
            "port": found.group("port") or "1433",
        }
        self.driver_args.update(login)

    def connector(self):
        """Open a driver connection from ``self.dsn`` and ``self.driver_args``."""
        connect = self.driver.connect
        return connect(self.dsn, **self.driver_args)
170
171
@adapters.register_for("vertica")
class Vertica(MSSQL1):
    """Adapter registered for "vertica", built on ``MSSQL1``."""

    def lastrowid(self, table):
        """Return the most recently generated identity value as a ``long``.

        ``table`` is accepted for interface compatibility and is not used.
        """
        # The previous body was a verbatim copy of the parent's T-SQL
        # "SELECT SCOPE_IDENTITY();". Vertica does not provide that function;
        # its documented equivalent is LAST_INSERT_ID().
        self.execute("SELECT LAST_INSERT_ID();")
        return long(self.cursor.fetchone()[0])
177
178
@adapters.register_for("sybase")
class Sybase(MSSQL1):
    """Adapter registered for "sybase", built on ``MSSQL1``.

    The DSN has the form ``sybase:host=<host>:<port>;dbname=<db>``; user and
    password are handed to the driver through ``self.driver_args``.
    """

    dbengine = "sybase"

    def _initialize_(self):
        """Derive ``self.dsn`` (and, for full URIs, login kwargs) from the URI.

        Raises:
            SyntaxError: if the text after ``://`` matches neither
                ``REGEX_DSN`` nor ``REGEX_URI``.
        """
        # super(MSSQL, ...) resumes the lookup *after* MSSQL in the MRO, so
        # MSSQL._initialize_ (which builds an ODBC-style string) is skipped.
        super(MSSQL, self)._initialize_()
        ruri = self.uri.split("://", 1)[1]
        if "@" not in ruri:
            m = re.match(self.REGEX_DSN, ruri)
            if not m:
                raise SyntaxError("Invalid URI string in DAL")
            # Must be stored on the instance: this used to be assigned to a
            # local name, so a plain-DSN URI left self.dsn unset.
            self.dsn = m.group()
            return

        m = re.match(self.REGEX_URI, ruri)
        if not m:
            raise SyntaxError("Invalid URI string in DAL: %s" % self.uri)
        user = self.credential_decoder(m.group("user"))
        password = self.credential_decoder(m.group("password"))
        if password is None:
            # The password group is optional in REGEX_URI.
            password = ""
        host = m.group("host")
        db = m.group("db")
        port = m.group("port") or "1433"
        self.dsn = "sybase:host=%s:%s;dbname=%s" % (host, port, db)
        # Credentials were already decoded above; decoding them a second time
        # would corrupt them for any decoder that is not idempotent.
        self.driver_args.update(user=user, passwd=password)
Note: See TracBrowser for help on using the repository browser.