source: OpenRLabs-Git/deploy/rlabs-docker/web2py-rlabs/gluon/packages/dal/pydal/contrib/mockimaplib.py

main
Last change on this file was 42bd667, checked in by David Fuertes <dfuertes@…>, 4 years ago

Historial Limpio

  • Property mode set to 100755
File size: 10.4 KB
Line 
1# -*- encoding: utf-8 -*-
2
3from imaplib import ParseFlags
4
5# mockimaplib: A very simple mock server module for imap client APIs
6#    Copyright (C) 2014  Alan Etkin <spametki@gmail.com>
7#
8#    This program is free software: you can redistribute it and/or modify
9#    it under the terms of the GNU Lesser General Public License as
10#    published by the Free Software Foundation, either version 3 of the
11#    License, or(at your option) any later version.
12#
13#    This program is distributed in the hope that it will be useful,
14#    but WITHOUT ANY WARRANTY; without even the implied warranty of
15#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16#    GNU General Public License for more details.
17#
18#    You should have received a copy of the GNU Lesser General Public
19#    License along with this program. If not, see
20#    <http://www.gnu.org/licenses/lgpl.html>
21
22"""
23mockimaplib allows you to test applications connecting to a dummy imap
24service. For more details on the api subset implemented,
25refer to the imaplib docs.
26
27The client should configure a dictionary to map imap string queries to sets
28of entries stored in a message dummy storage dictionary. The module includes
29a small set of default message records (SPAM and MESSAGES), two mailboxes
30(Draft and INBOX) and a list of query/resultset entries (RESULTS).
31
32Usage:
33
34>>> import mockimaplib
35>>> connection = mockimaplib.IMAP4_SSL(<host>)
36>>> connection.login(<user>, <password>)
37None
38>>> connection.select("INBOX")
39("OK", ... <mailbox length>)
40# fetch commands specifying single uid or message id
41# will try to get messages recorded in SPAM
42>>> connection.uid(...)
43<search query or fetch result>
44# returns a string list of matching message ids
45>>> connection.search(<query>)
46("OK", ... "1 2 ... n")
47"""
48
49MESSAGES = (
50    "MIME-Version: 1.0\r\nReceived: by 10.140.91.199 with HTTP; Mon, 27 Jan 2014 13:52:30 -0800 (PST)\r\nDate: Mon, 27 Jan 2014 19:52:30 -0200\r\nDelivered-To: nurse@example.com\r\nMessage-ID: <10101010101010010000010101010001010101001010010000001@mail.example.com>\r\nSubject: spam1\r\nFrom: Mr. Gumby <gumby@example.com>\r\nTo: The nurse <nurse@example.com>\r\nContent-Type: text/plain; charset=ISO-8859-1\r\n\r\nNurse!\r\n\r\n\r\n",
51    "MIME-Version: 1.0\r\nReceived: by 10.140.91.199 with HTTP; Mon, 27 Jan 2014 13:52:47 -0800 (PST)\r\nDate: Mon, 27 Jan 2014 19:52:47 -0200\r\nDelivered-To: nurse@example.com\r\nMessage-ID: <101010101010100100000101010100010101010010100100000010@mail.example.com>\r\nSubject: spam2\r\nFrom: Mr. Gumby <gumby@example.com>\r\nTo: The nurse <nurse@example.com>\r\nContent-Type: text/plain; charset=ISO-8859-1\r\n\r\nNurse, nurse!",
52    "MIME-Version: 1.0\r\nReceived: by 10.140.91.199 with HTTP; Mon, 27 Jan 2014 13:54:54 -0800 (PST)\r\nDate: Mon, 27 Jan 2014 19:54:54 -0200\r\nDelivered-To: nurse@example.com\r\nMessage-ID: <1010101010101001000001010101000101010100101001000000101@mail.example.com>\r\nSubject: spamalot1\r\nFrom: Mr. Gumby <gumby@example.com>\r\nTo: The nurse <nurse@example.com>\r\nContent-Type: text/plain; charset=ISO-8859-1\r\n\r\nNurse!\r\n\r\n\r\n",
53    "MIME-Version: 1.0\r\n\r\nReceived: by 10.140.91.199 with HTTP; Mon, 27 Jan 2014 13:54:54 -0800 (PST)\r\nDate: Mon, 27 Jan 2014 19:54:54 -0200\r\nDelivered-To: nurse@example.com\r\nMessage-ID: <101010101010100100000101010100010101010010100100000010101@mail.example.com>\r\nSubject: spamalot2\r\nFrom: Mr. Gumby <gumby@example.com>\r\nTo: The nurse <nurse@example.com>\r\nContent-Type: text/plain; charset=ISO-8859-1\r\n\r\nNurse! ... Nurse! ... Nurse!\r\n\r\n\r\n",
54)
55
56SPAM = {
57    "INBOX": [
58        {"uid": "483209", "headers": MESSAGES[0], "complete": MESSAGES[0], "flags": ""},
59        {"uid": "483211", "headers": MESSAGES[1], "complete": MESSAGES[1], "flags": ""},
60        {"uid": "483225", "headers": MESSAGES[2], "complete": MESSAGES[2], "flags": ""},
61    ],
62    "Draft": [
63        {"uid": "483432", "headers": MESSAGES[3], "complete": MESSAGES[3], "flags": ""},
64    ],
65}
66
67RESULTS = {
68    # <query string>: [<str uid> | <long id>, ...]
69    "INBOX": {"(ALL)": (1, 2, 3), "(1:3)": (1, 2, 3)},
70    "Draft": {"(1:1)": (1,)},
71}
72
73
74class Connection(object):
75    """Dummy connection object for the imap client.
76    By default, uses the module SPAM and RESULT
77    sets (use Connection.setup for custom values)"""
78
79    def login(self, user, password):
80        pass
81
82    def __init__(self):
83        self._readonly = False
84        self._mailbox = None
85        self.setup()
86
87    def list(self):
88        return ("OK", ['(\\HasNoChildren) "/" "%s"' % key for key in self.spam])
89
90    def select(self, tablename, readonly=False):
91        self._readonly = readonly
92        """args: mailbox, boolean
93        result[1][0] -> int last message id / mailbox lenght
94        result[0] = 'OK'
95        """
96        self._mailbox = tablename
97        return ("OK", (len(SPAM[self._mailbox]), None))
98
99    def uid(self, command, uid, arg):
100        """ args:
101              command: "search" | "fetch"
102              uid: None | uid
103              parts: "(ALL)" | "(RFC822 FLAGS)" | "(RFC822.HEADER FLAGS)"
104
105        "search", None, "(ALL)" -> ("OK", ("uid_1 uid_2 ... uid_<mailbox length>", None))
106        "search", None, "<query>" -> ("OK", ("uid_1 uid_2 ... uid_n", None))
107        "fetch", uid, parts -> ("OK", (("<id> ...", "<raw message as specified in parts>"), "<flags>")
108                                [0]     [1][0][0]     [1][0][1]                              [1][1]
109        """
110        if command == "search":
111            return self._search(arg)
112        elif command == "fetch":
113            return self._fetch(uid, arg)
114
115    def _search(self, query):
116        return (
117            "OK",
118            (" ".join([str(item["uid"]) for item in self._get_messages(query)]), None),
119        )
120
121    def _fetch(self, value, arg):
122        try:
123            message = self.spam[self._mailbox][value - 1]
124            message_id = value
125        except TypeError:
126            for x, item in enumerate(self.spam[self._mailbox]):
127                if item["uid"] == value:
128                    message = item
129                    message_id = x + 1
130                    break
131
132        parts = "headers"
133        if arg in ("(ALL)", "(RFC822 FLAGS)"):
134            parts = "complete"
135
136        return ("OK", (("%s " % message_id, message[parts]), message["flags"]))
137
138    def _get_messages(self, query):
139        if query.strip().isdigit():
140            return [
141                self.spam[self._mailbox][int(query.strip()) - 1],
142            ]
143        elif query[1:-1].strip().isdigit():
144            return [
145                self.spam[self._mailbox][int(query[1:-1].strip()) - 1],
146            ]
147        elif query[1:-1].replace("UID", "").strip().isdigit():
148            for item in self.spam[self._mailbox]:
149                if item["uid"] == query[1:-1].replace("UID", "").strip():
150                    return [
151                        item,
152                    ]
153        messages = []
154        try:
155            for m in self.results[self._mailbox][query]:
156                try:
157                    self.spam[self._mailbox][m - 1]["id"] = m
158                    messages.append(self.spam[self._mailbox][m - 1])
159                except TypeError:
160                    for x, item in enumerate(self.spam[self._mailbox]):
161                        if item["uid"] == m:
162                            item["id"] = x + 1
163                            messages.append(item)
164                            break
165                except IndexError:
166                    # message removed
167                    pass
168            return messages
169        except KeyError:
170            raise ValueError("The client issued an unexpected query: %s" % query)
171
172    def setup(self, spam={}, results={}):
173        """adds custom message and query databases or sets
174        the values to the module defaults.
175        """
176
177        self.spam = spam
178        self.results = results
179        if not spam:
180            for key in SPAM:
181                self.spam[key] = []
182                for d in SPAM[key]:
183                    self.spam[key].append(d.copy())
184        if not results:
185            for key in RESULTS:
186                self.results[key] = RESULTS[key].copy()
187
188    def search(self, first, query):
189        """ args:
190             first: None
191             query: string with mailbox query (flags, date, uid, id, ...)
192                example: '2:15723 BEFORE 27-Jan-2014 FROM "gumby"'
193        result[1][0] -> "id_1 id_2 ... id_n"
194        """
195        messages = self._get_messages(query)
196        ids = " ".join([str(item["id"]) for item in messages])
197        return ("OK", (ids, None))
198
199    def append(self, mailbox, flags, struct_time, message):
200        """
201            result, data = self.connection.append(mailbox, flags, struct_time, message)
202            if result == "OK":
203                uid = int(re.findall("\d+", str(data))[-1])
204        """
205        last = self.spam[mailbox][-1]
206        try:
207            uid = int(last["uid"]) + 1
208        except ValueError:
209            alluids = []
210            for _mailbox in self.spam.keys():
211                for item in self.spam[_mailbox]:
212                    try:
213                        alluids.append(int(item["uid"]))
214                    except:
215                        pass
216            if len(alluids) > 0:
217                uid = max(alluids) + 1
218            else:
219                uid = 1
220        flags = "FLAGS " + flags
221        item = {
222            "uid": str(uid),
223            "headers": message,
224            "complete": message,
225            "flags": flags,
226        }
227        self.spam[mailbox].append(item)
228        return ("OK", "spam spam %s spam" % uid)
229
230    def store(self, *args):
231        """
232        implements some flag commands
233        args: ("<id>", "<+|->FLAGS", "(\\Flag1 \\Flag2 ... \\Flagn)")
234        """
235        message = self.spam[self._mailbox][int(args[0] - 1)]
236        old_flags = ParseFlags(message["flags"])
237        flags = ParseFlags("FLAGS" + args[2])
238        if args[1].strip().startswith("+"):
239            message["flags"] = "FLAGS (%s)" % " ".join(set(flags + old_flags))
240        elif args[1].strip().startswith("-"):
241            message["flags"] = "FLAGS (%s)" % " ".join(
242                [flag for flag in old_flags if not flag in flags]
243            )
244
245    def expunge(self):
246        """implements removal of deleted flag messages"""
247        for x, item in enumerate(self.spam[self._mailbox]):
248            if "\\Deleted" in item["flags"]:
249                self.spam[self._mailbox].pop(x)
250
251
252class IMAP4(object):
253    """>>> connection = IMAP4() # creates the dummy imap4 client object"""
254
255    def __new__(self, *args, **kwargs):
256        # args: (server, port)
257        return Connection()
258
259
260IMAP4_SSL = IMAP4
Note: See TracBrowser for help on using the repository browser.