summaryrefslogtreecommitdiff
path: root/game/python-extra/ws4py/manager.py
blob: 20c215f981372cb7d8378d2c3112b81d69acb5f5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# -*- coding: utf-8 -*-
__doc__ = """
The manager module provides a selected classes to
handle websocket's execution.

Initially the rationale was to:

- Externalize the way the CherryPy server had been setup
  as its websocket management was too tightly coupled with
  the plugin implementation.
- Offer a management that could be used by other
  server or client implementations.
- Move away from the threaded model to the event-based
  model by relying on `select` or `epoll` (when available).


A simple usage for handling websocket clients:

.. code-block:: python

    from ws4py.client import WebSocketBaseClient
    from ws4py.manager import WebSocketManager

    m = WebSocketManager()

    class EchoClient(WebSocketBaseClient):
        def handshake_ok(self):
            m.add(self)  # register the client once the handshake is done

        def received_message(self, msg):
            print str(msg)

    m.start()

    client = EchoClient('ws://localhost:9000/ws')
    client.connect()

    m.join()  # blocks forever

Managers are not compulsory but hopefully will help your
workflow. For clients, you can still rely on threaded, gevent or
tornado based implementations of course.
"""
import logging
import select
import threading
import time

from ws4py import format_addresses
from ws4py.compat import py3k

logger = logging.getLogger('ws4py')

class SelectPoller(object):
    def __init__(self, timeout=0.1):
        """
        A socket poller that uses the `select`
        implementation to determines which
        file descriptors have data available to read.

        It is available on all platforms.
        """
        self._fds = []
        self.timeout = timeout

    def release(self):
        """
        Cleanup resources.
        """
        self._fds = []

    def register(self, fd):
        """
        Register a new file descriptor to be
        part of the select polling next time around.
        """
        if fd not in self._fds:
            self._fds.append(fd)

    def unregister(self, fd):
        """
        Unregister the given file descriptor.
        """
        if fd in self._fds:
            self._fds.remove(fd)

    def poll(self):
        """
        Polls once and returns a list of
        ready-to-be-read file descriptors.
        """
        if not self._fds:
            time.sleep(self.timeout)
            return []
        try:
            r, w, x = select.select(self._fds, [], [], self.timeout)
        except IOError as e:
            return []
        return r

class EPollPoller(object):
    def __init__(self, timeout=0.1):
        """
        An epoll poller that uses the ``epoll``
        implementation to determines which
        file descriptors have data available to read.

        Available on Unix flavors mostly.
        """
        self.poller = select.epoll()
        self.timeout = timeout

    def release(self):
        """
        Cleanup resources.
        """
        self.poller.close()

    def register(self, fd):
        """
        Register a new file descriptor to be
        part of the select polling next time around.
        """
        try:
            self.poller.register(fd, select.EPOLLIN | select.EPOLLPRI)
        except IOError:
            pass

    def unregister(self, fd):
        """
        Unregister the given file descriptor.
        """
        self.poller.unregister(fd)

    def poll(self):
        """
        Polls once and yields each ready-to-be-read
        file-descriptor
        """
        try:
            events = self.poller.poll(timeout=self.timeout)
        except IOError:
            events = []

        for fd, event in events:
            if event | select.EPOLLIN | select.EPOLLPRI:
                yield fd

class KQueuePoller(object):
    def __init__(self, timeout=0.1):
        """
        An epoll poller that uses the ``epoll``
        implementation to determines which
        file descriptors have data available to read.

        Available on Unix flavors mostly.
        """
        self.poller = select.epoll()
        self.timeout = timeout

    def release(self):
        """
        Cleanup resources.
        """
        self.poller.close()

    def register(self, fd):
        """
        Register a new file descriptor to be
        part of the select polling next time around.
        """
        try:
            self.poller.register(fd, select.EPOLLIN | select.EPOLLPRI)
        except IOError:
            pass

    def unregister(self, fd):
        """
        Unregister the given file descriptor.
        """
        self.poller.unregister(fd)

    def poll(self):
        """
        Polls once and yields each ready-to-be-read
        file-descriptor
        """
        try:
            events = self.poller.poll(timeout=self.timeout)
        except IOError:
            events = []
        for fd, event in events:
            if event | select.EPOLLIN | select.EPOLLPRI:
                yield fd

class WebSocketManager(threading.Thread):
    def __init__(self, poller=None):
        """
        An event-based websocket manager. By event-based, we mean
        that the websockets will be called when their
        sockets have data to be read from.

        The manager itself runs in its own thread as not to
        be the blocking mainloop of your application.

        The poller's implementation is automatically chosen
        with ``epoll`` if available else ``select`` unless you
        provide your own ``poller``.
        """
        threading.Thread.__init__(self)
        self.name = "WebSocketManager"
        self.lock = threading.Lock()
        self.websockets = {}
        self.running = False

        if poller:
            self.poller = poller
        else:
            if hasattr(select, "epoll"):
                self.poller = EPollPoller()
                logger.info("Using epoll")
            else:
                self.poller = SelectPoller()
                logger.info("Using select as epoll is not available")

    def __len__(self):
        return len(self.websockets)

    def __iter__(self):
        if py3k:
            return iter(self.websockets.values())
        else:
            return self.websockets.itervalues()

    def __contains__(self, ws):
        fd = ws.sock.fileno()
        # just in case the file descriptor was reused
        # we actually check the instance (well, this might
        # also have been reused...)
        return self.websockets.get(fd) is ws

    def add(self, websocket):
        """
        Manage a new websocket.

        First calls its :meth:`opened() <ws4py.websocket.WebSocket.opened>`
        method and register its socket against the poller
        for reading events.
        """
        if websocket in self:
            return

        logger.info("Managing websocket %s" % format_addresses(websocket))
        websocket.opened()
        with self.lock:
            fd = websocket.sock.fileno()
            self.websockets[fd] = websocket
            self.poller.register(fd)

    def remove(self, websocket):
        """
        Remove the given ``websocket`` from the manager.

        This does not call its :meth:`closed() <ws4py.websocket.WebSocket.closed>`
        method as it's out-of-band by your application
        or from within the manager's run loop.
        """
        if websocket not in self:
            return

        logger.info("Removing websocket %s" % format_addresses(websocket))
        with self.lock:
            fd = websocket.sock.fileno()
            self.websockets.pop(fd, None)
            self.poller.unregister(fd)

    def stop(self):
        """
        Mark the manager as terminated and
        releases its resources.
        """
        self.running = False
        with self.lock:
            self.websockets.clear()
            self.poller.release()

    def run(self):
        """
        Manager's mainloop executed from within a thread.

        Constantly poll for read events and, when available,
        call related websockets' `once` method to
        read and process the incoming data.

        If the :meth:`once() <ws4py.websocket.WebSocket.once>`
        method returns a `False` value, its :meth:`terminate() <ws4py.websocket.WebSocket.terminate>`
        method is also applied to properly close
        the websocket and its socket is unregistered from the poller.

        Note that websocket shouldn't take long to process
        their data or they will block the remaining
        websockets with data to be handled. As for what long means,
        it's up to your requirements.
        """
        self.running = True
        while self.running:
            with self.lock:
                polled = self.poller.poll()
            if not self.running:
                break

            for fd in polled:
                if not self.running:
                    break

                ws = self.websockets.get(fd)
                if ws and not ws.terminated:
                    # I don't know what kind of errors might spew out of here
                    # but they probably shouldn't crash the entire server.
                    try:
                        x = ws.once()
                    # Treat the error as if once() had returned None
                    except Exception as e:
                        x = None
                        logger.error("Terminating websocket %s due to exception: %s in once method" % (format_addresses(ws), repr(e)) )
                    if not x:
                        with self.lock:
                            self.websockets.pop(fd, None)
                            self.poller.unregister(fd)

                        if not ws.terminated:
                            logger.info("Terminating websocket %s" % format_addresses(ws))
                            ws.terminate()


    def close_all(self, code=1001, message='Server is shutting down'):
        """
        Execute the :meth:`close() <ws4py.websocket.WebSocket.close>`
        method of each registered websockets to initiate the closing handshake.
        It doesn't wait for the handshake to complete properly.
        """
        with self.lock:
            logger.info("Closing all websockets with [%d] '%s'" % (code, message))
            for ws in iter(self):
                ws.close(code=code, reason=message)

    def broadcast(self, message, binary=False):
        """
        Broadcasts the given message to all registered
        websockets, at the time of the call.

        Broadcast may fail on a given registered peer
        but this is silent as it's not the method's
        purpose to handle websocket's failures.
        """
        with self.lock:
            websockets = self.websockets.copy()
            if py3k:
                ws_iter = iter(websockets.values())
            else:
                ws_iter = websockets.itervalues()

        for ws in ws_iter:
            if not ws.terminated:
                try:
                    ws.send(message, binary)
                except:
                    pass