diff options
author | Jesusaves <cpntb1@ymail.com> | 2020-12-17 02:50:12 -0300 |
---|---|---|
committer | Jesusaves <cpntb1@ymail.com> | 2020-12-17 02:50:12 -0300 |
commit | e1793335a756f491593a3f36e3d6b6eee2c7a005 (patch) | |
tree | 5a39ccb2a59dc3db6d582a2763ee72649f4b2c8d /game/python-extra/ws4py | |
parent | c707470f1e3b5ddfe82ef09d9b79905d09684ebe (diff) | |
download | client-e1793335a756f491593a3f36e3d6b6eee2c7a005.tar.gz client-e1793335a756f491593a3f36e3d6b6eee2c7a005.tar.bz2 client-e1793335a756f491593a3f36e3d6b6eee2c7a005.tar.xz client-e1793335a756f491593a3f36e3d6b6eee2c7a005.zip |
Replace websocket-client with ws4py
Both are dead but well, Python 2.7 is dead, soooo
Diffstat (limited to 'game/python-extra/ws4py')
20 files changed, 3870 insertions, 0 deletions
diff --git a/game/python-extra/ws4py/__init__.py b/game/python-extra/ws4py/__init__.py new file mode 100644 index 0000000..1f618b0 --- /dev/null +++ b/game/python-extra/ws4py/__init__.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of ws4py nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +import logging +#import logging.handlers as handlers + +__author__ = "Sylvain Hellegouarch" +__version__ = "0.5.1" +__all__ = ['WS_KEY', 'WS_VERSION', 'configure_logger', 'format_addresses'] + +WS_KEY = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11" +WS_VERSION = (8, 13) + +def configure_logger(stdout=True, filepath=None, level=logging.INFO): + logger = logging.getLogger('ws4py') + logger.setLevel(level) + logfmt = logging.Formatter("[%(asctime)s] %(levelname)s %(message)s") + + ######### + # @TMW2 + #if filepath: + # h = handlers.RotatingFileHandler(filepath, maxBytes=10485760, backupCount=3) + # h.setLevel(level) + # h.setFormatter(logfmt) + # logger.addHandler(h) + # @TMW2 + ######### + + if stdout: + import sys + h = logging.StreamHandler(sys.stdout) + h.setLevel(level) + h.setFormatter(logfmt) + logger.addHandler(h) + + return logger + +def format_addresses(ws): + me = ws.local_address + peer = ws.peer_address + if isinstance(me, tuple) and isinstance(peer, tuple): + me_ip, me_port = ws.local_address + peer_ip, peer_port = ws.peer_address + return "[Local => %s:%d | Remote => %s:%d]" % (me_ip, me_port, peer_ip, peer_port) + + return "[Bound to '%s']" % me diff --git a/game/python-extra/ws4py/async_websocket.py b/game/python-extra/ws4py/async_websocket.py new file mode 100644 index 0000000..9e2a4c7 --- /dev/null +++ b/game/python-extra/ws4py/async_websocket.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +__doc__ = """ +WebSocket implementation that relies on two new Python +features: + +* asyncio to provide the high-level interface above transports +* yield from to delegate to the reading stream whenever more + bytes are required + +You can use these implementations in that context +and benefit from those features whilst using ws4py. + +Strictly speaking this module probably doesn't have to +be called async_websocket but it feels this will be its typical +usage and is probably more readable than +delegated_generator_websocket_on_top_of_asyncio.py +""" +import asyncio +import types + +from ws4py.websocket import WebSocket as _WebSocket +from ws4py.messaging import Message + +__all__ = ['WebSocket', 'EchoWebSocket'] + +class WebSocket(_WebSocket): + def __init__(self, proto): + """ + A :pep:`3156` ready websocket handler that works + well in a coroutine-aware loop such as the one provided + by the asyncio module. + + The provided `proto` instance is a + :class:`asyncio.Protocol` subclass instance that will + be used internally to read and write from the + underlying transport. + + Because the base :class:`ws4py.websocket.WebSocket` + class is still coupled a bit to the socket interface, + we have to override a little more than necessary + to play nice with the :pep:`3156` interface. Hopefully, + some day this will be cleaned out. + """ + _WebSocket.__init__(self, None) + self.started = False + self.proto = proto + + @property + def local_address(self): + """ + Local endpoint address as a tuple + """ + if not self._local_address: + self._local_address = self.proto.reader.transport.get_extra_info('sockname') + if len(self._local_address) == 4: + self._local_address = self._local_address[:2] + return self._local_address + + @property + def peer_address(self): + """ + Peer endpoint address as a tuple + """ + if not self._peer_address: + self._peer_address = self.proto.reader.transport.get_extra_info('peername') + if len(self._peer_address) == 4: + self._peer_address = self._peer_address[:2] + return self._peer_address + + def once(self): + """ + The base class directly is used in conjunction with + the :class:`ws4py.manager.WebSocketManager` which is + not actually used with the asyncio implementation + of ws4py. So let's make it clear it shan't be used. + """ + raise NotImplemented() + + def close_connection(self): + """ + Close the underlying transport + """ + @asyncio.coroutine + def closeit(): + yield from self.proto.writer.drain() + self.proto.writer.close() + asyncio.async(closeit()) + + def _write(self, data): + """ + Write to the underlying transport + """ + @asyncio.coroutine + def sendit(data): + self.proto.writer.write(data) + yield from self.proto.writer.drain() + asyncio.async(sendit(data)) + + @asyncio.coroutine + def run(self): + """ + Coroutine that runs until the websocket + exchange is terminated. It also calls the + `opened()` method to indicate the exchange + has started. + """ + self.started = True + try: + self.opened() + reader = self.proto.reader + while True: + data = yield from reader.read(self.reading_buffer_size) + if not self.process(data): + return False + finally: + self.terminate() + + return True + +class EchoWebSocket(WebSocket): + def received_message(self, message): + """ + Automatically sends back the provided ``message`` to + its originating endpoint. + """ + self.send(message.data, message.is_binary) diff --git a/game/python-extra/ws4py/client/__init__.py b/game/python-extra/ws4py/client/__init__.py new file mode 100644 index 0000000..411638f --- /dev/null +++ b/game/python-extra/ws4py/client/__init__.py @@ -0,0 +1,344 @@ +# -*- coding: utf-8 -*- +from base64 import b64encode +from hashlib import sha1 +import os +import socket +import ssl + +from ws4py import WS_KEY, WS_VERSION +from ws4py.exc import HandshakeError +from ws4py.websocket import WebSocket +from ws4py.compat import urlsplit + +__all__ = ['WebSocketBaseClient'] + +class WebSocketBaseClient(WebSocket): + def __init__(self, url, protocols=None, extensions=None, + heartbeat_freq=None, ssl_options=None, headers=None, exclude_headers=None): + """ + A websocket client that implements :rfc:`6455` and provides a simple + interface to communicate with a websocket server. + + This class works on its own but will block if not run in + its own thread. + + When an instance of this class is created, a :py:mod:`socket` + is created. If the connection is a TCP socket, + the nagle's algorithm is disabled. + + The address of the server will be extracted from the given + websocket url. + + The websocket key is randomly generated, reset the + `key` attribute if you want to provide yours. + + For instance to create a TCP client: + + .. code-block:: python + + >>> from ws4py.client import WebSocketBaseClient + >>> ws = WebSocketBaseClient('ws://localhost/ws') + + + Here is an example for a TCP client over SSL: + + .. code-block:: python + + >>> from ws4py.client import WebSocketBaseClient + >>> ws = WebSocketBaseClient('wss://localhost/ws') + + + Finally an example of a Unix-domain connection: + + .. code-block:: python + + >>> from ws4py.client import WebSocketBaseClient + >>> ws = WebSocketBaseClient('ws+unix:///tmp/my.sock') + + Note that in this case, the initial Upgrade request + will be sent to ``/``. You may need to change this + by setting the resource explicitely before connecting: + + .. code-block:: python + + >>> from ws4py.client import WebSocketBaseClient + >>> ws = WebSocketBaseClient('ws+unix:///tmp/my.sock') + >>> ws.resource = '/ws' + >>> ws.connect() + + You may provide extra headers by passing a list of tuples + which must be unicode objects. + + """ + self.url = url + self.host = None + self.scheme = None + self.port = None + self.unix_socket_path = None + self.resource = None + self.ssl_options = ssl_options or {} + self.extra_headers = headers or [] + self.exclude_headers = exclude_headers or [] + self.exclude_headers = [x.lower() for x in self.exclude_headers] + + if self.scheme == "wss": + # Prevent check_hostname requires server_hostname (ref #187) + if "cert_reqs" not in self.ssl_options: + self.ssl_options["cert_reqs"] = ssl.CERT_NONE + + self._parse_url() + + if self.unix_socket_path: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) + else: + # Let's handle IPv4 and IPv6 addresses + # Simplified from CherryPy's code + try: + family, socktype, proto, canonname, sa = socket.getaddrinfo(self.host, self.port, + socket.AF_UNSPEC, + socket.SOCK_STREAM, + 0, socket.AI_PASSIVE)[0] + except socket.gaierror: + family = socket.AF_INET + if self.host.startswith('::'): + family = socket.AF_INET6 + + socktype = socket.SOCK_STREAM + proto = 0 + canonname = "" + sa = (self.host, self.port, 0, 0) + + sock = socket.socket(family, socktype, proto) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if hasattr(socket, 'AF_INET6') and family == socket.AF_INET6 and \ + self.host.startswith('::'): + try: + sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + except (AttributeError, socket.error): + pass + + WebSocket.__init__(self, sock, protocols=protocols, + extensions=extensions, + heartbeat_freq=heartbeat_freq) + + self.stream.always_mask = True + self.stream.expect_masking = False + self.key = b64encode(os.urandom(16)) + + # Adpated from: https://github.com/liris/websocket-client/blob/master/websocket.py#L105 + def _parse_url(self): + """ + Parses a URL which must have one of the following forms: + + - ws://host[:port][path] + - wss://host[:port][path] + - ws+unix:///path/to/my.socket + + In the first two cases, the ``host`` and ``port`` + attributes will be set to the parsed values. If no port + is explicitely provided, it will be either 80 or 443 + based on the scheme. Also, the ``resource`` attribute is + set to the path segment of the URL (alongside any querystring). + + In addition, if the scheme is ``ws+unix``, the + ``unix_socket_path`` attribute is set to the path to + the Unix socket while the ``resource`` attribute is + set to ``/``. + """ + # Python 2.6.1 and below don't parse ws or wss urls properly. netloc is empty. + # See: https://github.com/Lawouach/WebSocket-for-Python/issues/59 + scheme, url = self.url.split(":", 1) + + parsed = urlsplit(url, scheme="http") + if parsed.hostname: + self.host = parsed.hostname + elif '+unix' in scheme: + self.host = 'localhost' + else: + raise ValueError("Invalid hostname from: %s", self.url) + + if parsed.port: + self.port = parsed.port + + if scheme == "ws": + if not self.port: + self.port = 80 + elif scheme == "wss": + if not self.port: + self.port = 443 + elif scheme in ('ws+unix', 'wss+unix'): + pass + else: + raise ValueError("Invalid scheme: %s" % scheme) + + if parsed.path: + resource = parsed.path + else: + resource = "/" + + if '+unix' in scheme: + self.unix_socket_path = resource + resource = '/' + + if parsed.query: + resource += "?" + parsed.query + + self.scheme = scheme + self.resource = resource + + @property + def bind_addr(self): + """ + Returns the Unix socket path if or a tuple + ``(host, port)`` depending on the initial + URL's scheme. + """ + return self.unix_socket_path or (self.host, self.port) + + def close(self, code=1000, reason=''): + """ + Initiate the closing handshake with the server. + """ + if not self.client_terminated: + self.client_terminated = True + self._write(self.stream.close(code=code, reason=reason).single(mask=True)) + + def connect(self): + """ + Connects this websocket and starts the upgrade handshake + with the remote endpoint. + """ + if self.scheme == "wss": + # default port is now 443; upgrade self.sender to send ssl + self.sock = ssl.wrap_socket(self.sock, **self.ssl_options) + self._is_secure = True + + self.sock.connect(self.bind_addr) + + self._write(self.handshake_request) + + response = b'' + doubleCLRF = b'\r\n\r\n' + while True: + bytes = self.sock.recv(128) + if not bytes: + break + response += bytes + if doubleCLRF in response: + break + + if not response: + self.close_connection() + raise HandshakeError("Invalid response") + + headers, _, body = response.partition(doubleCLRF) + response_line, _, headers = headers.partition(b'\r\n') + + try: + self.process_response_line(response_line) + self.protocols, self.extensions = self.process_handshake_header(headers) + except HandshakeError: + self.close_connection() + raise + + self.handshake_ok() + if body: + self.process(body) + + @property + def handshake_headers(self): + """ + List of headers appropriate for the upgrade + handshake. + """ + headers = [ + ('Host', '%s:%s' % (self.host, self.port)), + ('Connection', 'Upgrade'), + ('Upgrade', 'websocket'), + ('Sec-WebSocket-Key', self.key.decode('utf-8')), + ('Sec-WebSocket-Version', str(max(WS_VERSION))) + ] + + if self.protocols: + headers.append(('Sec-WebSocket-Protocol', ','.join(self.protocols))) + + if self.extra_headers: + headers.extend(self.extra_headers) + + if not any(x for x in headers if x[0].lower() == 'origin') and \ + 'origin' not in self.exclude_headers: + + scheme, url = self.url.split(":", 1) + parsed = urlsplit(url, scheme="http") + if parsed.hostname: + self.host = parsed.hostname + else: + self.host = 'localhost' + origin = scheme + '://' + self.host + if parsed.port: + origin = origin + ':' + str(parsed.port) + headers.append(('Origin', origin)) + + headers = [x for x in headers if x[0].lower() not in self.exclude_headers] + + return headers + + @property + def handshake_request(self): + """ + Prepare the request to be sent for the upgrade handshake. + """ + headers = self.handshake_headers + request = [("GET %s HTTP/1.1" % self.resource).encode('utf-8')] + for header, value in headers: + request.append(("%s: %s" % (header, value)).encode('utf-8')) + request.append(b'\r\n') + + return b'\r\n'.join(request) + + def process_response_line(self, response_line): + """ + Ensure that we received a HTTP `101` status code in + response to our request and if not raises :exc:`HandshakeError`. + """ + protocol, code, status = response_line.split(b' ', 2) + if code != b'101': + raise HandshakeError("Invalid response status: %s %s" % (code, status)) + + def process_handshake_header(self, headers): + """ + Read the upgrade handshake's response headers and + validate them against :rfc:`6455`. + """ + protocols = [] + extensions = [] + + headers = headers.strip() + + for header_line in headers.split(b'\r\n'): + header, value = header_line.split(b':', 1) + header = header.strip().lower() + value = value.strip().lower() + + if header == b'upgrade' and value != b'websocket': + raise HandshakeError("Invalid Upgrade header: %s" % value) + + elif header == b'connection' and value != b'upgrade': + raise HandshakeError("Invalid Connection header: %s" % value) + + elif header == b'sec-websocket-accept': + match = b64encode(sha1(self.key + WS_KEY).digest()) + if value != match.lower(): + raise HandshakeError("Invalid challenge response: %s" % value) + + elif header == b'sec-websocket-protocol': + protocols.extend([x.strip() for x in value.split(b',')]) + + elif header == b'sec-websocket-extensions': + extensions.extend([x.strip() for x in value.split(b',')]) + + return protocols, extensions + + def handshake_ok(self): + self.opened() diff --git a/game/python-extra/ws4py/client/geventclient.py b/game/python-extra/ws4py/client/geventclient.py new file mode 100644 index 0000000..b64a17e --- /dev/null +++ b/game/python-extra/ws4py/client/geventclient.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +import copy + +import gevent +from gevent import Greenlet +from gevent.queue import Queue + +from ws4py.client import WebSocketBaseClient + +__all__ = ['WebSocketClient'] + +class WebSocketClient(WebSocketBaseClient): + def __init__(self, url, protocols=None, extensions=None, heartbeat_freq=None, ssl_options=None, headers=None, exclude_headers=None): + """ + WebSocket client that executes the + :meth:`run() <ws4py.websocket.WebSocket.run>` into a gevent greenlet. + + .. code-block:: python + + ws = WebSocketClient('ws://localhost:9000/echo', protocols=['http-only', 'chat']) + ws.connect() + + ws.send("Hello world") + + def incoming(): + while True: + m = ws.receive() + if m is not None: + print str(m) + else: + break + + def outgoing(): + for i in range(0, 40, 5): + ws.send("*" * i) + + greenlets = [ + gevent.spawn(incoming), + gevent.spawn(outgoing), + ] + gevent.joinall(greenlets) + """ + WebSocketBaseClient.__init__(self, url, protocols, extensions, heartbeat_freq, + ssl_options=ssl_options, headers=headers, exclude_headers=exclude_headers) + self._th = Greenlet(self.run) + + self.messages = Queue() + """ + Queue that will hold received messages. + """ + + def handshake_ok(self): + """ + Called when the upgrade handshake has completed + successfully. + + Starts the client's thread. + """ + self._th.start() + + def received_message(self, message): + """ + Override the base class to store the incoming message + in the `messages` queue. + """ + self.messages.put(copy.deepcopy(message)) + + def closed(self, code, reason=None): + """ + Puts a :exc:`StopIteration` as a message into the + `messages` queue. + """ + # When the connection is closed, put a StopIteration + # on the message queue to signal there's nothing left + # to wait for + self.messages.put(StopIteration) + + def receive(self, block=True): + """ + Returns messages that were stored into the + `messages` queue and returns `None` when the + websocket is terminated or closed. + `block` is passed though the gevent queue `.get()` method, which if + True will block until an item in the queue is available. Set this to + False if you just want to check the queue, which will raise an + Empty exception you need to handle if there is no message to return. + """ + # If the websocket was terminated and there are no messages + # left in the queue, return None immediately otherwise the client + # will block forever + if self.terminated and self.messages.empty(): + return None + message = self.messages.get(block=block) + if message is StopIteration: + return None + return message diff --git a/game/python-extra/ws4py/client/threadedclient.py b/game/python-extra/ws4py/client/threadedclient.py new file mode 100644 index 0000000..b033f32 --- /dev/null +++ b/game/python-extra/ws4py/client/threadedclient.py @@ -0,0 +1,98 @@ +# -*- coding: utf-8 -*- +import threading + +from ws4py.client import WebSocketBaseClient + +__all__ = ['WebSocketClient'] + +class WebSocketClient(WebSocketBaseClient): + def __init__(self, url, protocols=None, extensions=None, heartbeat_freq=None, + ssl_options=None, headers=None, exclude_headers=None): + """ + .. code-block:: python + + from ws4py.client.threadedclient import WebSocketClient + + class EchoClient(WebSocketClient): + def opened(self): + for i in range(0, 200, 25): + self.send("*" * i) + + def closed(self, code, reason): + print(("Closed down", code, reason)) + + def received_message(self, m): + print("=> %d %s" % (len(m), str(m))) + + try: + ws = EchoClient('ws://localhost:9000/echo', protocols=['http-only', 'chat']) + ws.connect() + except KeyboardInterrupt: + ws.close() + + """ + WebSocketBaseClient.__init__(self, url, protocols, extensions, heartbeat_freq, + ssl_options, headers=headers, exclude_headers=exclude_headers) + self._th = threading.Thread(target=self.run, name='WebSocketClient') + self._th.daemon = True + + @property + def daemon(self): + """ + `True` if the client's thread is set to be a daemon thread. + """ + return self._th.daemon + + @daemon.setter + def daemon(self, flag): + """ + Set to `True` if the client's thread should be a daemon. + """ + self._th.daemon = flag + + def run_forever(self): + """ + Simply blocks the thread until the + websocket has terminated. + """ + while not self.terminated: + self._th.join(timeout=0.1) + + def handshake_ok(self): + """ + Called when the upgrade handshake has completed + successfully. + + Starts the client's thread. + """ + self._th.start() + +if __name__ == '__main__': + from ws4py.client.threadedclient import WebSocketClient + + class EchoClient(WebSocketClient): + def opened(self): + def data_provider(): + for i in range(0, 200, 25): + yield "#" * i + + self.send(data_provider()) + + for i in range(0, 200, 25): + self.send("*" * i) + + def closed(self, code, reason): + print(("Closed down", code, reason)) + + def received_message(self, m): + print("#%d" % len(m)) + if len(m) == 175: + self.close(reason='bye bye') + + try: + ws = EchoClient('ws://localhost:9000/ws', protocols=['http-only', 'chat'], + headers=[('X-Test', 'hello there')]) + ws.connect() + ws.run_forever() + except KeyboardInterrupt: + ws.close() diff --git a/game/python-extra/ws4py/client/tornadoclient.py b/game/python-extra/ws4py/client/tornadoclient.py new file mode 100644 index 0000000..22478e6 --- /dev/null +++ b/game/python-extra/ws4py/client/tornadoclient.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +import ssl + +from tornado import iostream, escape +from ws4py.client import WebSocketBaseClient +from ws4py.exc import HandshakeError + +__all__ = ['TornadoWebSocketClient'] + +class TornadoWebSocketClient(WebSocketBaseClient): + def __init__(self, url, protocols=None, extensions=None, + io_loop=None, ssl_options=None, headers=None, exclude_headers=None): + """ + .. code-block:: python + + from tornado import ioloop + + class MyClient(TornadoWebSocketClient): + def opened(self): + for i in range(0, 200, 25): + self.send("*" * i) + + def received_message(self, m): + print((m, len(str(m)))) + + def closed(self, code, reason=None): + ioloop.IOLoop.instance().stop() + + ws = MyClient('ws://localhost:9000/echo', protocols=['http-only', 'chat']) + ws.connect() + + ioloop.IOLoop.instance().start() + """ + WebSocketBaseClient.__init__(self, url, protocols, extensions, + ssl_options=ssl_options, headers=headers, exclude_headers=exclude_headers) + if self.scheme == "wss": + self.sock = ssl.wrap_socket(self.sock, do_handshake_on_connect=False, **self.ssl_options) + self._is_secure = True + self.io = iostream.SSLIOStream(self.sock, io_loop, ssl_options=self.ssl_options) + else: + self.io = iostream.IOStream(self.sock, io_loop) + self.io_loop = io_loop + + def connect(self): + """ + Connects the websocket and initiate the upgrade handshake. + """ + self.io.set_close_callback(self.__connection_refused) + self.io.connect((self.host, int(self.port)), self.__send_handshake) + + def _write(self, b): + """ + Trying to prevent a write operation + on an already closed websocket stream. + + This cannot be bullet proof but hopefully + will catch almost all use cases. + """ + if self.terminated: + raise RuntimeError("Cannot send on a terminated websocket") + + self.io.write(b) + + def __connection_refused(self, *args, **kwargs): + self.server_terminated = True + self.closed(1005, 'Connection refused') + + def __send_handshake(self): + self.io.set_close_callback(self.__connection_closed) + self.io.write(escape.utf8(self.handshake_request), + self.__handshake_sent) + + def __connection_closed(self, *args, **kwargs): + self.server_terminated = True + self.closed(1006, 'Connection closed during handshake') + + def __handshake_sent(self): + self.io.read_until(b"\r\n\r\n", self.__handshake_completed) + + def __handshake_completed(self, data): + self.io.set_close_callback(None) + try: + response_line, _, headers = data.partition(b'\r\n') + self.process_response_line(response_line) + protocols, extensions = self.process_handshake_header(headers) + except HandshakeError: + self.close_connection() + raise + + self.opened() + self.io.set_close_callback(self.__stream_closed) + self.io.read_bytes(self.reading_buffer_size, self.__fetch_more) + + def __fetch_more(self, bytes): + try: + should_continue = self.process(bytes) + except: + should_continue = False + + if should_continue: + self.io.read_bytes(self.reading_buffer_size, self.__fetch_more) + else: + self.__gracefully_terminate() + + def __gracefully_terminate(self): + self.client_terminated = self.server_terminated = True + + try: + if not self.stream.closing: + self.closed(1006) + finally: + self.close_connection() + + def __stream_closed(self, *args, **kwargs): + self.io.set_close_callback(None) + code = 1006 + reason = None + if self.stream.closing: + code, reason = self.stream.closing.code, self.stream.closing.reason + self.closed(code, reason) + self.stream._cleanup() + + def close_connection(self): + """ + Close the underlying connection + """ + self.io.close() + +if __name__ == '__main__': + from tornado import ioloop + + class MyClient(TornadoWebSocketClient): + def opened(self): + def data_provider(): + for i in range(0, 200, 25): + yield "#" * i + + self.send(data_provider()) + + for i in range(0, 200, 25): + self.send("*" * i) + + def received_message(self, m): + print("#%d" % len(m)) + if len(m) == 175: + self.close() + + def closed(self, code, reason=None): + ioloop.IOLoop.instance().stop() + print(("Closed down", code, reason)) + + ws = MyClient('ws://localhost:9000/ws', protocols=['http-only', 'chat']) + ws.connect() + + ioloop.IOLoop.instance().start() diff --git a/game/python-extra/ws4py/compat.py b/game/python-extra/ws4py/compat.py new file mode 100644 index 0000000..e986e33 --- /dev/null +++ b/game/python-extra/ws4py/compat.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +__doc__ = """ +This compatibility module is inspired by the one found +in CherryPy. It provides a common entry point for the various +functions and types that are used with ws4py but which +differ from Python 2.x to Python 3.x + +There are likely better ways for some of them so feel +free to provide patches. + +Note this has been tested against 2.7 and 3.3 only but +should hopefully work fine with other versions too. +""" +import sys + +if sys.version_info >= (3, 0): + py3k = True + from urllib.parse import urlsplit + range = range + unicode = str + basestring = (bytes, str) + _ord = ord + + def get_connection(fileobj): + return fileobj.raw._sock + + def detach_connection(fileobj): + fileobj.detach() + + def ord(c): + if isinstance(c, int): + return c + return _ord(c) +else: + py3k = False + from urlparse import urlsplit + range = xrange + unicode = unicode + basestring = basestring + ord = ord + + def get_connection(fileobj): + return fileobj._sock + + def detach_connection(fileobj): + fileobj._sock = None diff --git a/game/python-extra/ws4py/exc.py b/game/python-extra/ws4py/exc.py new file mode 100644 index 0000000..bfefea4 --- /dev/null +++ b/game/python-extra/ws4py/exc.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- + +__all__ = ['WebSocketException', 'FrameTooLargeException', 'ProtocolException', + 'UnsupportedFrameTypeException', 'TextFrameEncodingException', + 'UnsupportedFrameTypeException', 'TextFrameEncodingException', + 'StreamClosed', 'HandshakeError', 'InvalidBytesError'] + +class WebSocketException(Exception): pass + +class ProtocolException(WebSocketException): pass + +class FrameTooLargeException(WebSocketException): pass + +class UnsupportedFrameTypeException(WebSocketException): pass + +class TextFrameEncodingException(WebSocketException): pass + +class InvalidBytesError(WebSocketException): pass + +class StreamClosed(Exception): pass + +class HandshakeError(WebSocketException): + def __init__(self, msg): + self.msg = msg + + def __str__(self): + return self.msg diff --git a/game/python-extra/ws4py/framing.py b/game/python-extra/ws4py/framing.py new file mode 100644 index 0000000..5046167 --- /dev/null +++ b/game/python-extra/ws4py/framing.py @@ -0,0 +1,273 @@ +# -*- coding: utf-8 -*- +from struct import pack, unpack + +from ws4py.exc import FrameTooLargeException, ProtocolException +from ws4py.compat import py3k, ord, range + +# Frame opcodes defined in the spec. +OPCODE_CONTINUATION = 0x0 +OPCODE_TEXT = 0x1 +OPCODE_BINARY = 0x2 +OPCODE_CLOSE = 0x8 +OPCODE_PING = 0x9 +OPCODE_PONG = 0xa + +__all__ = ['Frame'] + +class Frame(object): + def __init__(self, opcode=None, body=b'', masking_key=None, fin=0, rsv1=0, rsv2=0, rsv3=0): + """ + Implements the framing protocol as defined by RFC 6455. + + .. code-block:: python + :linenos: + + >>> test_mask = 'XXXXXX' # perhaps from os.urandom(4) + >>> f = Frame(OPCODE_TEXT, 'hello world', masking_key=test_mask, fin=1) + >>> bytes = f.build() + >>> bytes.encode('hex') + '818bbe04e66ad6618a06d1249105cc6882' + >>> f = Frame() + >>> f.parser.send(bytes[0]) + 1 + >>> f.parser.send(bytes[1]) + 4 + + .. seealso:: Data Framing http://tools.ietf.org/html/rfc6455#section-5.2 + """ + if not isinstance(body, bytes): + raise TypeError("The body must be properly encoded") + + self.opcode = opcode + self.body = body + self.masking_key = masking_key + self.fin = fin + self.rsv1 = rsv1 + self.rsv2 = rsv2 + self.rsv3 = rsv3 + self.payload_length = len(body) + + self._parser = None + + @property + def parser(self): + if self._parser is None: + self._parser = self._parsing() + # Python generators must be initialized once. + next(self.parser) + return self._parser + + def _cleanup(self): + if self._parser: + self._parser.close() + self._parser = None + + def build(self): + """ + Builds a frame from the instance's attributes and returns + its bytes representation. + """ + header = b'' + + if self.fin > 0x1: + raise ValueError('FIN bit parameter must be 0 or 1') + + if 0x3 <= self.opcode <= 0x7 or 0xB <= self.opcode: + raise ValueError('Opcode cannot be a reserved opcode') + + ## +-+-+-+-+-------+ + ## |F|R|R|R| opcode| + ## |I|S|S|S| (4) | + ## |N|V|V|V| | + ## | |1|2|3| | + ## +-+-+-+-+-------+ + header = pack('!B', ((self.fin << 7) + | (self.rsv1 << 6) + | (self.rsv2 << 5) + | (self.rsv3 << 4) + | self.opcode)) + + ## +-+-------------+-------------------------------+ + ## |M| Payload len | Extended payload length | + ## |A| (7) | (16/63) | + ## |S| | (if payload len==126/127) | + ## |K| | | + ## +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + + ## | Extended payload length continued, if payload len == 127 | + ## + - - - - - - - - - - - - - - - +-------------------------------+ + if self.masking_key: mask_bit = 1 << 7 + else: mask_bit = 0 + + length = self.payload_length + if length < 126: + header += pack('!B', (mask_bit | length)) + elif length < (1 << 16): + header += pack('!B', (mask_bit | 126)) + pack('!H', length) + elif length < (1 << 63): + header += pack('!B', (mask_bit | 127)) + pack('!Q', length) + else: + raise FrameTooLargeException() + + ## + - - - - - - - - - - - - - - - +-------------------------------+ + ## | |Masking-key, if MASK set to 1 | + ## +-------------------------------+-------------------------------+ + ## | Masking-key (continued) | Payload Data | + ## +-------------------------------- - - - - - - - - - - - - - - - + + ## : Payload Data continued ... : + ## + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + ## | Payload Data continued ... | + ## +---------------------------------------------------------------+ + body = self.body + if not self.masking_key: + return bytes(header + body) + + return bytes(header + self.masking_key + self.mask(body)) + + def _parsing(self): + """ + Generator to parse bytes into a frame. Yields until + enough bytes have been read or an error is met. + """ + buf = b'' + some_bytes = b'' + + # yield until we get the first header's byte + while not some_bytes: + some_bytes = (yield 1) + + first_byte = some_bytes[0] if isinstance(some_bytes, bytearray) else ord(some_bytes[0]) + # frame-fin = %x0 ; more frames of this message follow + # / %x1 ; final frame of this message + self.fin = (first_byte >> 7) & 1 + self.rsv1 = (first_byte >> 6) & 1 + self.rsv2 = (first_byte >> 5) & 1 + self.rsv3 = (first_byte >> 4) & 1 + self.opcode = first_byte & 0xf + + # frame-rsv1 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise + # frame-rsv2 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise + # frame-rsv3 = %x0 ; 1 bit, MUST be 0 unless negotiated otherwise + if self.rsv1 or self.rsv2 or self.rsv3: + raise ProtocolException() + + # control frames between 3 and 7 as well as above 0xA are currently reserved + if 2 < self.opcode < 8 or self.opcode > 0xA: + raise ProtocolException() + + # control frames cannot be fragmented + if self.opcode > 0x7 and self.fin == 0: + raise ProtocolException() + + # do we already have enough some_bytes to continue? + some_bytes = some_bytes[1:] if some_bytes and len(some_bytes) > 1 else b'' + + # Yield until we get the second header's byte + while not some_bytes: + some_bytes = (yield 1) + + second_byte = some_bytes[0] if isinstance(some_bytes, bytearray) else ord(some_bytes[0]) + mask = (second_byte >> 7) & 1 + self.payload_length = second_byte & 0x7f + + # All control frames MUST have a payload length of 125 some_bytes or less + if self.opcode > 0x7 and self.payload_length > 125: + raise FrameTooLargeException() + + if some_bytes and len(some_bytes) > 1: + buf = some_bytes[1:] + some_bytes = buf + else: + buf = b'' + some_bytes = b'' + + if self.payload_length == 127: + # This will compute the actual application data size + if len(buf) < 8: + nxt_buf_size = 8 - len(buf) + some_bytes = (yield nxt_buf_size) + some_bytes = buf + (some_bytes or b'') + while len(some_bytes) < 8: + b = (yield 8 - len(some_bytes)) + if b is not None: + some_bytes = some_bytes + b + if len(some_bytes) > 8: + buf = some_bytes[8:] + some_bytes = some_bytes[:8] + else: + some_bytes = buf[:8] + buf = buf[8:] + extended_payload_length = some_bytes + self.payload_length = unpack( + '!Q', extended_payload_length)[0] + if self.payload_length > 0x7FFFFFFFFFFFFFFF: + raise FrameTooLargeException() + elif self.payload_length == 126: + if len(buf) < 2: + nxt_buf_size = 2 - len(buf) + some_bytes = (yield nxt_buf_size) + some_bytes = buf + (some_bytes or b'') + while len(some_bytes) < 2: + b = (yield 2 - len(some_bytes)) + if b is not None: + some_bytes = some_bytes + b + if len(some_bytes) > 2: + buf = some_bytes[2:] + some_bytes = some_bytes[:2] + else: + some_bytes = buf[:2] + buf = buf[2:] + extended_payload_length = some_bytes + self.payload_length = unpack( + '!H', extended_payload_length)[0] + + if mask: + if len(buf) < 4: + nxt_buf_size = 4 - len(buf) + some_bytes = (yield nxt_buf_size) + some_bytes = buf + (some_bytes or b'') + while not some_bytes or len(some_bytes) < 4: + b = (yield 4 - len(some_bytes)) + if b is not None: + some_bytes = some_bytes + b + if len(some_bytes) > 4: + buf = some_bytes[4:] + else: + some_bytes = buf[:4] + buf = buf[4:] + self.masking_key = some_bytes + + if len(buf) < self.payload_length: + nxt_buf_size = self.payload_length - len(buf) + some_bytes = (yield nxt_buf_size) + some_bytes = buf + (some_bytes or b'') + while len(some_bytes) < self.payload_length: + l = self.payload_length - len(some_bytes) + b = (yield l) + if b is not None: + some_bytes = some_bytes + b + else: + if self.payload_length == len(buf): + some_bytes = buf + else: + some_bytes = buf[:self.payload_length] + + self.body = some_bytes + yield + + def mask(self, data): + """ + Performs the masking or unmasking operation on data + using the simple masking algorithm: + + .. + j = i MOD 4 + transformed-octet-i = original-octet-i XOR masking-key-octet-j + + """ + masked = bytearray(data) + if py3k: key = self.masking_key + else: key = map(ord, self.masking_key) + for i in range(len(data)): + masked[i] = masked[i] ^ key[i%4] + return masked + unmask = mask diff --git a/game/python-extra/ws4py/manager.py b/game/python-extra/ws4py/manager.py new file mode 100644 index 0000000..20c215f --- /dev/null +++ b/game/python-extra/ws4py/manager.py @@ -0,0 +1,368 @@ +# -*- coding: utf-8 -*- +__doc__ = """ +The manager module provides a selected classes to +handle websocket's execution. + +Initially the rationale was to: + +- Externalize the way the CherryPy server had been setup + as its websocket management was too tightly coupled with + the plugin implementation. +- Offer a management that could be used by other + server or client implementations. +- Move away from the threaded model to the event-based + model by relying on `select` or `epoll` (when available). + + +A simple usage for handling websocket clients: + +.. code-block:: python + + from ws4py.client import WebSocketBaseClient + from ws4py.manager import WebSocketManager + + m = WebSocketManager() + + class EchoClient(WebSocketBaseClient): + def handshake_ok(self): + m.add(self) # register the client once the handshake is done + + def received_message(self, msg): + print str(msg) + + m.start() + + client = EchoClient('ws://localhost:9000/ws') + client.connect() + + m.join() # blocks forever + +Managers are not compulsory but hopefully will help your +workflow. For clients, you can still rely on threaded, gevent or +tornado based implementations of course. +""" +import logging +import select +import threading +import time + +from ws4py import format_addresses +from ws4py.compat import py3k + +logger = logging.getLogger('ws4py') + +class SelectPoller(object): + def __init__(self, timeout=0.1): + """ + A socket poller that uses the `select` + implementation to determines which + file descriptors have data available to read. + + It is available on all platforms. + """ + self._fds = [] + self.timeout = timeout + + def release(self): + """ + Cleanup resources. + """ + self._fds = [] + + def register(self, fd): + """ + Register a new file descriptor to be + part of the select polling next time around. + """ + if fd not in self._fds: + self._fds.append(fd) + + def unregister(self, fd): + """ + Unregister the given file descriptor. + """ + if fd in self._fds: + self._fds.remove(fd) + + def poll(self): + """ + Polls once and returns a list of + ready-to-be-read file descriptors. + """ + if not self._fds: + time.sleep(self.timeout) + return [] + try: + r, w, x = select.select(self._fds, [], [], self.timeout) + except IOError as e: + return [] + return r + +class EPollPoller(object): + def __init__(self, timeout=0.1): + """ + An epoll poller that uses the ``epoll`` + implementation to determines which + file descriptors have data available to read. + + Available on Unix flavors mostly. + """ + self.poller = select.epoll() + self.timeout = timeout + + def release(self): + """ + Cleanup resources. + """ + self.poller.close() + + def register(self, fd): + """ + Register a new file descriptor to be + part of the select polling next time around. + """ + try: + self.poller.register(fd, select.EPOLLIN | select.EPOLLPRI) + except IOError: + pass + + def unregister(self, fd): + """ + Unregister the given file descriptor. + """ + self.poller.unregister(fd) + + def poll(self): + """ + Polls once and yields each ready-to-be-read + file-descriptor + """ + try: + events = self.poller.poll(timeout=self.timeout) + except IOError: + events = [] + + for fd, event in events: + if event | select.EPOLLIN | select.EPOLLPRI: + yield fd + +class KQueuePoller(object): + def __init__(self, timeout=0.1): + """ + An epoll poller that uses the ``epoll`` + implementation to determines which + file descriptors have data available to read. + + Available on Unix flavors mostly. + """ + self.poller = select.epoll() + self.timeout = timeout + + def release(self): + """ + Cleanup resources. + """ + self.poller.close() + + def register(self, fd): + """ + Register a new file descriptor to be + part of the select polling next time around. + """ + try: + self.poller.register(fd, select.EPOLLIN | select.EPOLLPRI) + except IOError: + pass + + def unregister(self, fd): + """ + Unregister the given file descriptor. + """ + self.poller.unregister(fd) + + def poll(self): + """ + Polls once and yields each ready-to-be-read + file-descriptor + """ + try: + events = self.poller.poll(timeout=self.timeout) + except IOError: + events = [] + for fd, event in events: + if event | select.EPOLLIN | select.EPOLLPRI: + yield fd + +class WebSocketManager(threading.Thread): + def __init__(self, poller=None): + """ + An event-based websocket manager. By event-based, we mean + that the websockets will be called when their + sockets have data to be read from. + + The manager itself runs in its own thread as not to + be the blocking mainloop of your application. + + The poller's implementation is automatically chosen + with ``epoll`` if available else ``select`` unless you + provide your own ``poller``. + """ + threading.Thread.__init__(self) + self.name = "WebSocketManager" + self.lock = threading.Lock() + self.websockets = {} + self.running = False + + if poller: + self.poller = poller + else: + if hasattr(select, "epoll"): + self.poller = EPollPoller() + logger.info("Using epoll") + else: + self.poller = SelectPoller() + logger.info("Using select as epoll is not available") + + def __len__(self): + return len(self.websockets) + + def __iter__(self): + if py3k: + return iter(self.websockets.values()) + else: + return self.websockets.itervalues() + + def __contains__(self, ws): + fd = ws.sock.fileno() + # just in case the file descriptor was reused + # we actually check the instance (well, this might + # also have been reused...) + return self.websockets.get(fd) is ws + + def add(self, websocket): + """ + Manage a new websocket. + + First calls its :meth:`opened() <ws4py.websocket.WebSocket.opened>` + method and register its socket against the poller + for reading events. + """ + if websocket in self: + return + + logger.info("Managing websocket %s" % format_addresses(websocket)) + websocket.opened() + with self.lock: + fd = websocket.sock.fileno() + self.websockets[fd] = websocket + self.poller.register(fd) + + def remove(self, websocket): + """ + Remove the given ``websocket`` from the manager. + + This does not call its :meth:`closed() <ws4py.websocket.WebSocket.closed>` + method as it's out-of-band by your application + or from within the manager's run loop. + """ + if websocket not in self: + return + + logger.info("Removing websocket %s" % format_addresses(websocket)) + with self.lock: + fd = websocket.sock.fileno() + self.websockets.pop(fd, None) + self.poller.unregister(fd) + + def stop(self): + """ + Mark the manager as terminated and + releases its resources. + """ + self.running = False + with self.lock: + self.websockets.clear() + self.poller.release() + + def run(self): + """ + Manager's mainloop executed from within a thread. + + Constantly poll for read events and, when available, + call related websockets' `once` method to + read and process the incoming data. + + If the :meth:`once() <ws4py.websocket.WebSocket.once>` + method returns a `False` value, its :meth:`terminate() <ws4py.websocket.WebSocket.terminate>` + method is also applied to properly close + the websocket and its socket is unregistered from the poller. + + Note that websocket shouldn't take long to process + their data or they will block the remaining + websockets with data to be handled. As for what long means, + it's up to your requirements. + """ + self.running = True + while self.running: + with self.lock: + polled = self.poller.poll() + if not self.running: + break + + for fd in polled: + if not self.running: + break + + ws = self.websockets.get(fd) + if ws and not ws.terminated: + # I don't know what kind of errors might spew out of here + # but they probably shouldn't crash the entire server. + try: + x = ws.once() + # Treat the error as if once() had returned None + except Exception as e: + x = None + logger.error("Terminating websocket %s due to exception: %s in once method" % (format_addresses(ws), repr(e)) ) + if not x: + with self.lock: + self.websockets.pop(fd, None) + self.poller.unregister(fd) + + if not ws.terminated: + logger.info("Terminating websocket %s" % format_addresses(ws)) + ws.terminate() + + + def close_all(self, code=1001, message='Server is shutting down'): + """ + Execute the :meth:`close() <ws4py.websocket.WebSocket.close>` + method of each registered websockets to initiate the closing handshake. + It doesn't wait for the handshake to complete properly. + """ + with self.lock: + logger.info("Closing all websockets with [%d] '%s'" % (code, message)) + for ws in iter(self): + ws.close(code=code, reason=message) + + def broadcast(self, message, binary=False): + """ + Broadcasts the given message to all registered + websockets, at the time of the call. + + Broadcast may fail on a given registered peer + but this is silent as it's not the method's + purpose to handle websocket's failures. + """ + with self.lock: + websockets = self.websockets.copy() + if py3k: + ws_iter = iter(websockets.values()) + else: + ws_iter = websockets.itervalues() + + for ws in ws_iter: + if not ws.terminated: + try: + ws.send(message, binary) + except: + pass diff --git a/game/python-extra/ws4py/messaging.py b/game/python-extra/ws4py/messaging.py new file mode 100644 index 0000000..f9b0e77 --- /dev/null +++ b/game/python-extra/ws4py/messaging.py @@ -0,0 +1,169 @@ +# -*- coding: utf-8 -*- +import os +import struct + +from ws4py.framing import Frame, OPCODE_CONTINUATION, OPCODE_TEXT, \ + OPCODE_BINARY, OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG +from ws4py.compat import unicode, py3k + +__all__ = ['Message', 'TextMessage', 'BinaryMessage', 'CloseControlMessage', + 'PingControlMessage', 'PongControlMessage'] + +class Message(object): + def __init__(self, opcode, data=b'', encoding='utf-8'): + """ + A message is a application level entity. It's usually built + from one or many frames. The protocol defines several kind + of messages which are grouped into two sets: + + * data messages which can be text or binary typed + * control messages which provide a mechanism to perform + in-band control communication between peers + + The ``opcode`` indicates the message type and ``data`` is + the possible message payload. + + The payload is held internally as a a :func:`bytearray` as they are + faster than pure strings for append operations. + + Unicode data will be encoded using the provided ``encoding``. + """ + self.opcode = opcode + self._completed = False + self.encoding = encoding + + if isinstance(data, unicode): + if not encoding: + raise TypeError("unicode data without an encoding") + data = data.encode(encoding) + elif isinstance(data, bytearray): + data = bytes(data) + elif not isinstance(data, bytes): + raise TypeError("%s is not a supported data type" % type(data)) + + self.data = data + + def single(self, mask=False): + """ + Returns a frame bytes with the fin bit set and a random mask. + + If ``mask`` is set, automatically mask the frame + using a generated 4-byte token. + """ + mask = os.urandom(4) if mask else None + return Frame(body=self.data, opcode=self.opcode, + masking_key=mask, fin=1).build() + + def fragment(self, first=False, last=False, mask=False): + """ + Returns a :class:`ws4py.framing.Frame` bytes. + + The behavior depends on the given flags: + + * ``first``: the frame uses ``self.opcode`` else a continuation opcode + * ``last``: the frame has its ``fin`` bit set + * ``mask``: the frame is masked using a automatically generated 4-byte token + """ + fin = 1 if last is True else 0 + opcode = self.opcode if first is True else OPCODE_CONTINUATION + mask = os.urandom(4) if mask else None + return Frame(body=self.data, + opcode=opcode, masking_key=mask, + fin=fin).build() + + @property + def completed(self): + """ + Indicates the the message is complete, meaning + the frame's ``fin`` bit was set. + """ + return self._completed + + @completed.setter + def completed(self, state): + """ + Sets the state for this message. Usually + set by the stream's parser. + """ + self._completed = state + + def extend(self, data): + """ + Add more ``data`` to the message. + """ + if isinstance(data, bytes): + self.data += data + elif isinstance(data, bytearray): + self.data += bytes(data) + elif isinstance(data, unicode): + self.data += data.encode(self.encoding) + else: + raise TypeError("%s is not a supported data type" % type(data)) + + def __len__(self): + return len(self.__unicode__()) + + def __str__(self): + if py3k: + return self.data.decode(self.encoding) + return self.data + + def __unicode__(self): + return self.data.decode(self.encoding) + +class TextMessage(Message): + def __init__(self, text=None): + Message.__init__(self, OPCODE_TEXT, text) + + @property + def is_binary(self): + return False + + @property + def is_text(self): + return True + +class BinaryMessage(Message): + def __init__(self, bytes=None): + Message.__init__(self, OPCODE_BINARY, bytes, encoding=None) + + @property + def is_binary(self): + return True + + @property + def is_text(self): + return False + + def __len__(self): + return len(self.data) + +class CloseControlMessage(Message): + def __init__(self, code=1000, reason=''): + data = b"" + if code: + data += struct.pack("!H", code) + if reason is not None: + if isinstance(reason, unicode): + reason = reason.encode('utf-8') + data += reason + + Message.__init__(self, OPCODE_CLOSE, data, 'utf-8') + self.code = code + self.reason = reason + + def __str__(self): + if py3k: + return self.reason.decode('utf-8') + return self.reason + + def __unicode__(self): + return self.reason.decode(self.encoding) + +class PingControlMessage(Message): + def __init__(self, data=None): + Message.__init__(self, OPCODE_PING, data) + +class PongControlMessage(Message): + def __init__(self, data): + Message.__init__(self, OPCODE_PONG, data) diff --git a/game/python-extra/ws4py/server/__init__.py b/game/python-extra/ws4py/server/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/game/python-extra/ws4py/server/__init__.py diff --git a/game/python-extra/ws4py/server/cherrypyserver.py b/game/python-extra/ws4py/server/cherrypyserver.py new file mode 100644 index 0000000..5b93465 --- /dev/null +++ b/game/python-extra/ws4py/server/cherrypyserver.py @@ -0,0 +1,382 @@ +# -*- coding: utf-8 -*- +__doc__ = """ +WebSocket within CherryPy is a tricky bit since CherryPy is +a threaded server which would choke quickly if each thread +of the server were kept attached to a long living connection +that WebSocket expects. + +In order to work around this constraint, we take some advantage +of some internals of CherryPy as well as the introspection +Python provides. + +Basically, when the WebSocket handshake is complete, we take over +the socket and let CherryPy take back the thread that was +associated with the upgrade request. + +These operations require a bit of work at various levels of +the CherryPy framework but this module takes care of them +and from your application's perspective, this is abstracted. + +Here are the various utilities provided by this module: + + * WebSocketTool: The tool is in charge to perform the + HTTP upgrade and detach the socket from + CherryPy. It runs at various hook points of the + request's processing. Enable that tool at + any path you wish to handle as a WebSocket + handler. + + * WebSocketPlugin: The plugin tracks the instanciated web socket handlers. + It also cleans out websocket handler which connection + have been closed down. The websocket connection then + runs in its own thread that this plugin manages. + +Simple usage example: + +.. code-block:: python + :linenos: + + import cherrypy + from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool + from ws4py.websocket import EchoWebSocket + + cherrypy.config.update({'server.socket_port': 9000}) + WebSocketPlugin(cherrypy.engine).subscribe() + cherrypy.tools.websocket = WebSocketTool() + + class Root(object): + @cherrypy.expose + def index(self): + return 'some HTML with a websocket javascript connection' + + @cherrypy.expose + def ws(self): + pass + + cherrypy.quickstart(Root(), '/', config={'/ws': {'tools.websocket.on': True, + 'tools.websocket.handler_cls': EchoWebSocket}}) + + +Note that you can set the handler class on per-path basis, +meaning you could also dynamically change the class based +on other envrionmental settings (is the user authenticated for ex). +""" +import base64 +from hashlib import sha1 +import inspect +import threading + +import cherrypy +from cherrypy import Tool +from cherrypy.process import plugins +try: + from cheroot.server import HTTPConnection, HTTPRequest, KnownLengthRFile +except ImportError: + from cherrypy.wsgiserver import HTTPConnection, HTTPRequest, KnownLengthRFile + +from ws4py import WS_KEY, WS_VERSION +from ws4py.exc import HandshakeError +from ws4py.websocket import WebSocket +from ws4py.compat import py3k, get_connection, detach_connection +from ws4py.manager import WebSocketManager + +__all__ = ['WebSocketTool', 'WebSocketPlugin'] + +class WebSocketTool(Tool): + def __init__(self): + Tool.__init__(self, 'before_request_body', self.upgrade) + + def _setup(self): + conf = self._merged_args() + hooks = cherrypy.serving.request.hooks + p = conf.pop("priority", getattr(self.callable, "priority", + self._priority)) + hooks.attach(self._point, self.callable, priority=p, **conf) + hooks.attach('before_finalize', self.complete, + priority=p) + hooks.attach('on_end_resource', self.cleanup_headers, + priority=70) + hooks.attach('on_end_request', self.start_handler, + priority=70) + + def upgrade(self, protocols=None, extensions=None, version=WS_VERSION, + handler_cls=WebSocket, heartbeat_freq=None): + """ + Performs the upgrade of the connection to the WebSocket + protocol. + + The provided protocols may be a list of WebSocket + protocols supported by the instance of the tool. + + When no list is provided and no protocol is either + during the upgrade, then the protocol parameter is + not taken into account. On the other hand, + if the protocol from the handshake isn't part + of the provided list, the upgrade fails immediatly. + """ + request = cherrypy.serving.request + request.process_request_body = False + + ws_protocols = None + ws_location = None + ws_version = version + ws_key = None + ws_extensions = [] + + if request.method != 'GET': + raise HandshakeError('HTTP method must be a GET') + + for key, expected_value in [('Upgrade', 'websocket'), + ('Connection', 'upgrade')]: + actual_value = request.headers.get(key, '').lower() + if not actual_value: + raise HandshakeError('Header %s is not defined' % key) + if expected_value not in actual_value: + raise HandshakeError('Illegal value for header %s: %s' % + (key, actual_value)) + + version = request.headers.get('Sec-WebSocket-Version') + supported_versions = ', '.join([str(v) for v in ws_version]) + version_is_valid = False + if version: + try: version = int(version) + except: pass + else: version_is_valid = version in ws_version + + if not version_is_valid: + cherrypy.response.headers['Sec-WebSocket-Version'] = supported_versions + raise HandshakeError('Unhandled or missing WebSocket version') + + key = request.headers.get('Sec-WebSocket-Key') + if key: + ws_key = base64.b64decode(key.encode('utf-8')) + if len(ws_key) != 16: + raise HandshakeError("WebSocket key's length is invalid") + + protocols = protocols or [] + subprotocols = request.headers.get('Sec-WebSocket-Protocol') + if subprotocols: + ws_protocols = [] + for s in subprotocols.split(','): + s = s.strip() + if s in protocols: + ws_protocols.append(s) + + exts = extensions or [] + extensions = request.headers.get('Sec-WebSocket-Extensions') + if extensions: + for ext in extensions.split(','): + ext = ext.strip() + if ext in exts: + ws_extensions.append(ext) + + location = [] + include_port = False + if request.scheme == "https": + location.append("wss://") + include_port = request.local.port != 443 + else: + location.append("ws://") + include_port = request.local.port != 80 + location.append('localhost') + if include_port: + location.append(":%d" % request.local.port) + location.append(request.path_info) + if request.query_string != "": + location.append("?%s" % request.query_string) + ws_location = ''.join(location) + + response = cherrypy.serving.response + response.stream = True + response.status = '101 Switching Protocols' + response.headers['Content-Type'] = 'text/plain' + response.headers['Upgrade'] = 'websocket' + response.headers['Connection'] = 'Upgrade' + response.headers['Sec-WebSocket-Version'] = str(version) + response.headers['Sec-WebSocket-Accept'] = base64.b64encode(sha1(key.encode('utf-8') + WS_KEY).digest()) + if ws_protocols: + response.headers['Sec-WebSocket-Protocol'] = ', '.join(ws_protocols) + if ws_extensions: + response.headers['Sec-WebSocket-Extensions'] = ','.join(ws_extensions) + + addr = (request.remote.ip, request.remote.port) + rfile = request.rfile.rfile + if isinstance(rfile, KnownLengthRFile): + rfile = rfile.rfile + + ws_conn = get_connection(rfile) + request.ws_handler = handler_cls(ws_conn, ws_protocols, ws_extensions, + request.wsgi_environ.copy(), + heartbeat_freq=heartbeat_freq) + + def complete(self): + """ + Sets some internal flags of CherryPy so that it + doesn't close the socket down. + """ + self._set_internal_flags() + + def cleanup_headers(self): + """ + Some clients aren't that smart when it comes to + headers lookup. + """ + response = cherrypy.response + if not response.header_list: + return + + headers = response.header_list[:] + for (k, v) in headers: + if k[:7] == 'Sec-Web': + response.header_list.remove((k, v)) + response.header_list.append((k.replace('Sec-Websocket', 'Sec-WebSocket'), v)) + + def start_handler(self): + """ + Runs at the end of the request processing by calling + the opened method of the handler. + """ + request = cherrypy.request + if not hasattr(request, 'ws_handler'): + return + + addr = (request.remote.ip, request.remote.port) + ws_handler = request.ws_handler + request.ws_handler = None + delattr(request, 'ws_handler') + + # By doing this we detach the socket from + # the CherryPy stack avoiding memory leaks + detach_connection(request.rfile.rfile) + + cherrypy.engine.publish('handle-websocket', ws_handler, addr) + + def _set_internal_flags(self): + """ + CherryPy has two internal flags that we are interested in + to enable WebSocket within the server. They can't be set via + a public API and considering I'd want to make this extension + as compatible as possible whilst refraining in exposing more + than should be within CherryPy, I prefer performing a bit + of introspection to set those flags. Even by Python standards + such introspection isn't the cleanest but it works well + enough in this case. + + This also means that we do that only on WebSocket + connections rather than globally and therefore we do not + harm the rest of the HTTP server. + """ + current = inspect.currentframe() + while True: + if not current: + break + _locals = current.f_locals + if 'self' in _locals: + if isinstance(_locals['self'], HTTPRequest): + _locals['self'].close_connection = True + if isinstance(_locals['self'], HTTPConnection): + _locals['self'].linger = True + # HTTPConnection is more inner than + # HTTPRequest so we can leave once + # we're done here + return + _locals = None + current = current.f_back + +class WebSocketPlugin(plugins.SimplePlugin): + def __init__(self, bus): + plugins.SimplePlugin.__init__(self, bus) + self.manager = WebSocketManager() + + def start(self): + self.bus.log("Starting WebSocket processing") + self.bus.subscribe('stop', self.cleanup) + self.bus.subscribe('handle-websocket', self.handle) + self.bus.subscribe('websocket-broadcast', self.broadcast) + self.manager.start() + + def stop(self): + self.bus.log("Terminating WebSocket processing") + self.bus.unsubscribe('stop', self.cleanup) + self.bus.unsubscribe('handle-websocket', self.handle) + self.bus.unsubscribe('websocket-broadcast', self.broadcast) + + def handle(self, ws_handler, peer_addr): + """ + Tracks the provided handler. + + :param ws_handler: websocket handler instance + :param peer_addr: remote peer address for tracing purpose + """ + self.manager.add(ws_handler) + + def cleanup(self): + """ + Terminate all connections and clear the pool. Executed when the engine stops. + """ + self.manager.close_all() + self.manager.stop() + self.manager.join() + + def broadcast(self, message, binary=False): + """ + Broadcasts a message to all connected clients known to + the server. + + :param message: a message suitable to pass to the send() method + of the connected handler. + :param binary: whether or not the message is a binary one + """ + self.manager.broadcast(message, binary) + +if __name__ == '__main__': + import random + cherrypy.config.update({'server.socket_host': '127.0.0.1', + 'server.socket_port': 9000}) + WebSocketPlugin(cherrypy.engine).subscribe() + cherrypy.tools.websocket = WebSocketTool() + + class Root(object): + @cherrypy.expose + @cherrypy.tools.websocket(on=False) + def ws(self): + return """<html> + <head> + <script type='application/javascript' src='https://ajax.googleapis.com/ajax/libs/jquery/1.8.3/jquery.min.js'> </script> + <script type='application/javascript'> + $(document).ready(function() { + var ws = new WebSocket('ws://192.168.0.10:9000/'); + ws.onmessage = function (evt) { + $('#chat').val($('#chat').val() + evt.data + '\\n'); + }; + ws.onopen = function() { + ws.send("Hello there"); + }; + ws.onclose = function(evt) { + $('#chat').val($('#chat').val() + 'Connection closed by server: ' + evt.code + ' \"' + evt.reason + '\"\\n'); + }; + $('#chatform').submit(function() { + ws.send('%(username)s: ' + $('#message').val()); + $('#message').val(""); + return false; + }); + }); + </script> + </head> + <body> + <form action='/echo' id='chatform' method='get'> + <textarea id='chat' cols='35' rows='10'></textarea> + <br /> + <label for='message'>%(username)s: </label><input type='text' id='message' /> + <input type='submit' value='Send' /> + </form> + </body> + </html> + """ % {'username': "User%d" % random.randint(0, 100)} + + @cherrypy.expose + def index(self): + cherrypy.log("Handler created: %s" % repr(cherrypy.request.ws_handler)) + + cherrypy.quickstart(Root(), '/', config={'/': {'tools.websocket.on': True, + 'tools.websocket.handler_cls': EchoWebSocketHandler}}) diff --git a/game/python-extra/ws4py/server/geventserver.py b/game/python-extra/ws4py/server/geventserver.py new file mode 100644 index 0000000..13b5554 --- /dev/null +++ b/game/python-extra/ws4py/server/geventserver.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +__doc__ = """ +WSGI entities to support WebSocket from within gevent. + +Its usage is rather simple: + +.. code-block: python + + from gevent import monkey; monkey.patch_all() + from ws4py.websocket import EchoWebSocket + from ws4py.server.geventserver import WSGIServer + from ws4py.server.wsgiutils import WebSocketWSGIApplication + + server = WSGIServer(('localhost', 9000), WebSocketWSGIApplication(handler_cls=EchoWebSocket)) + server.serve_forever() + +""" +import logging + +import gevent +from gevent.pywsgi import WSGIHandler, WSGIServer as _WSGIServer +from gevent.pool import Pool + +from ws4py import format_addresses +from ws4py.server.wsgiutils import WebSocketWSGIApplication + + +logger = logging.getLogger('ws4py') + +__all__ = ['WebSocketWSGIHandler', 'WSGIServer', + 'GEventWebSocketPool'] + + +class WebSocketWSGIHandler(WSGIHandler): + """ + A WSGI handler that will perform the :rfc:`6455` + upgrade and handshake before calling the WSGI application. + + If the incoming request doesn't have a `'Upgrade'` header, + the handler will simply fallback to the gevent builtin's handler + and process it as per usual. + """ + + def run_application(self): + upgrade_header = self.environ.get('HTTP_UPGRADE', '').lower() + if upgrade_header: + # Build and start the HTTP response + self.environ['ws4py.socket'] = self.socket or self.environ['wsgi.input'].rfile._sock + self.result = self.application(self.environ, self.start_response) or [] + self.process_result() + del self.environ['ws4py.socket'] + self.socket = None + self.rfile.close() + + ws = self.environ.pop('ws4py.websocket', None) + if ws: + ws_greenlet = self.server.pool.track(ws) + # issue #170 + # in gevent 1.1 socket will be closed once application returns + # so let's wait for websocket handler to finish + ws_greenlet.join() + else: + gevent.pywsgi.WSGIHandler.run_application(self) + + +class GEventWebSocketPool(Pool): + """ + Simple pool of bound websockets. + Internally it uses a gevent group to track + the websockets. The server should call the ``clear`` + method to initiate the closing handshake when the + server is shutdown. + """ + + def track(self, websocket): + logger.info("Managing websocket %s" % format_addresses(websocket)) + return self.spawn(websocket.run) + + def clear(self): + logger.info("Terminating server and all connected websockets") + for greenlet in list(self): + try: + websocket = greenlet._run.im_self + if websocket: + websocket.close(1001, 'Server is shutting down') + except: + pass + finally: + self.discard(greenlet) + + +class WSGIServer(_WSGIServer): + handler_class = WebSocketWSGIHandler + + def __init__(self, *args, **kwargs): + """ + WSGI server that simply tracks websockets + and send them a proper closing handshake + when the server terminates. + + Other than that, the server is the same + as its :class:`gevent.pywsgi.WSGIServer` + base. + """ + _WSGIServer.__init__(self, *args, **kwargs) + self.pool = GEventWebSocketPool() + + def stop(self, *args, **kwargs): + self.pool.clear() + _WSGIServer.stop(self, *args, **kwargs) + + +if __name__ == '__main__': + + from ws4py import configure_logger + configure_logger() + + from ws4py.websocket import EchoWebSocket + server = WSGIServer(('127.0.0.1', 9000), + WebSocketWSGIApplication(handler_cls=EchoWebSocket)) + server.serve_forever() diff --git a/game/python-extra/ws4py/server/tulipserver.py b/game/python-extra/ws4py/server/tulipserver.py new file mode 100644 index 0000000..2786c16 --- /dev/null +++ b/game/python-extra/ws4py/server/tulipserver.py @@ -0,0 +1,224 @@ +# -*- coding: utf-8 -*- +import base64 +from hashlib import sha1 +from email.parser import BytesHeaderParser +import io + +import asyncio + +from ws4py import WS_KEY, WS_VERSION +from ws4py.exc import HandshakeError +from ws4py.websocket import WebSocket + +LF = b'\n' +CRLF = b'\r\n' +SPACE = b' ' +EMPTY = b'' + +__all__ = ['WebSocketProtocol'] + +class WebSocketProtocol(asyncio.StreamReaderProtocol): + def __init__(self, handler_cls): + asyncio.StreamReaderProtocol.__init__(self, asyncio.StreamReader(), + self._pseudo_connected) + self.ws = handler_cls(self) + + def _pseudo_connected(self, reader, writer): + pass + + def connection_made(self, transport): + """ + A peer is now connected and we receive an instance + of the underlying :class:`asyncio.Transport`. + + We :class:`asyncio.StreamReader` is created + and the transport is associated before the + initial HTTP handshake is undertaken. + """ + #self.transport = transport + #self.stream = asyncio.StreamReader() + #self.stream.set_transport(transport) + asyncio.StreamReaderProtocol.connection_made(self, transport) + # Let make it concurrent for others to tag along + f = asyncio.async(self.handle_initial_handshake()) + f.add_done_callback(self.terminated) + + @property + def writer(self): + return self._stream_writer + + @property + def reader(self): + return self._stream_reader + + def terminated(self, f): + if f.done() and not f.cancelled(): + ex = f.exception() + if ex: + response = [b'HTTP/1.0 400 Bad Request'] + response.append(b'Content-Length: 0') + response.append(b'Connection: close') + response.append(b'') + response.append(b'') + self.writer.write(CRLF.join(response)) + self.ws.close_connection() + + def close(self): + """ + Initiate the websocket closing handshake + which will eventuall lead to the underlying + transport. + """ + self.ws.close() + + def timeout(self): + self.ws.close_connection() + if self.ws.started: + self.ws.closed(1002, "Peer connection timed-out") + + def connection_lost(self, exc): + """ + The peer connection is now, the closing + handshake won't work so let's not even try. + However let's make the websocket handler + be aware of it by calling its `closed` + method. + """ + if exc is not None: + self.ws.close_connection() + if self.ws.started: + self.ws.closed(1002, "Peer connection was lost") + + @asyncio.coroutine + def handle_initial_handshake(self): + """ + Performs the HTTP handshake described in :rfc:`6455`. Note that + this implementation is really basic and it is strongly advised + against using it in production. It would probably break for + most clients. If you want a better support for HTTP, please + use a more reliable HTTP server implemented using asyncio. + """ + request_line = yield from self.next_line() + method, uri, req_protocol = request_line.strip().split(SPACE, 2) + + # GET required + if method.upper() != b'GET': + raise HandshakeError('HTTP method must be a GET') + + headers = yield from self.read_headers() + if req_protocol == b'HTTP/1.1' and 'Host' not in headers: + raise ValueError("Missing host header") + + for key, expected_value in [('Upgrade', 'websocket'), + ('Connection', 'upgrade')]: + actual_value = headers.get(key, '').lower() + if not actual_value: + raise HandshakeError('Header %s is not defined' % str(key)) + if expected_value not in actual_value: + raise HandshakeError('Illegal value for header %s: %s' % + (key, actual_value)) + + response_headers = {} + + ws_version = WS_VERSION + version = headers.get('Sec-WebSocket-Version') + supported_versions = ', '.join([str(v) for v in ws_version]) + version_is_valid = False + if version: + try: version = int(version) + except: pass + else: version_is_valid = version in ws_version + + if not version_is_valid: + response_headers['Sec-WebSocket-Version'] = supported_versions + raise HandshakeError('Unhandled or missing WebSocket version') + + key = headers.get('Sec-WebSocket-Key') + if key: + ws_key = base64.b64decode(key.encode('utf-8')) + if len(ws_key) != 16: + raise HandshakeError("WebSocket key's length is invalid") + + protocols = [] + ws_protocols = [] + subprotocols = headers.get('Sec-WebSocket-Protocol') + if subprotocols: + for s in subprotocols.split(','): + s = s.strip() + if s in protocols: + ws_protocols.append(s) + + exts = [] + ws_extensions = [] + extensions = headers.get('Sec-WebSocket-Extensions') + if extensions: + for ext in extensions.split(','): + ext = ext.strip() + if ext in exts: + ws_extensions.append(ext) + + self.ws.protocols = ws_protocols + self.ws.extensions = ws_extensions + self.ws.headers = headers + + response = [req_protocol + b' 101 Switching Protocols'] + response.append(b'Upgrade: websocket') + response.append(b'Content-Type: text/plain') + response.append(b'Content-Length: 0') + response.append(b'Connection: Upgrade') + response.append(b'Sec-WebSocket-Version:' + bytes(str(version), 'utf-8')) + response.append(b'Sec-WebSocket-Accept:' + base64.b64encode(sha1(key.encode('utf-8') + WS_KEY).digest())) + if ws_protocols: + response.append(b'Sec-WebSocket-Protocol:' + b', '.join(ws_protocols)) + if ws_extensions: + response.append(b'Sec-WebSocket-Extensions:' + b','.join(ws_extensions)) + response.append(b'') + response.append(b'') + self.writer.write(CRLF.join(response)) + yield from self.handle_websocket() + + @asyncio.coroutine + def handle_websocket(self): + """ + Starts the websocket process until the + exchange is completed and terminated. + """ + yield from self.ws.run() + + @asyncio.coroutine + def read_headers(self): + """ + Read all HTTP headers from the HTTP request + and returns a dictionary of them. + """ + headers = b'' + while True: + line = yield from self.next_line() + headers += line + if line == CRLF: + break + return BytesHeaderParser().parsebytes(headers) + + @asyncio.coroutine + def next_line(self): + """ + Reads data until \r\n is met and then return all read + bytes. + """ + line = yield from self.reader.readline() + if not line.endswith(CRLF): + raise ValueError("Missing mandatory trailing CRLF") + return line + +if __name__ == '__main__': + from ws4py.async_websocket import EchoWebSocket + + loop = asyncio.get_event_loop() + + def start_server(): + proto_factory = lambda: WebSocketProtocol(EchoWebSocket) + return loop.create_server(proto_factory, '', 9007) + + s = loop.run_until_complete(start_server()) + print('serving on', s.sockets[0].getsockname()) + loop.run_forever() diff --git a/game/python-extra/ws4py/server/wsgirefserver.py b/game/python-extra/ws4py/server/wsgirefserver.py new file mode 100644 index 0000000..d4a9d9a --- /dev/null +++ b/game/python-extra/ws4py/server/wsgirefserver.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +__doc__ = """ +Add WebSocket support to the built-in WSGI server +provided by the :py:mod:`wsgiref`. This is clearly not +meant to be a production server so please consider this +only for testing purpose. + +Mostly, this module overrides bits and pieces of +the built-in classes so that it supports the WebSocket +workflow. + +.. code-block:: python + + from wsgiref.simple_server import make_server + from ws4py.websocket import EchoWebSocket + from ws4py.server.wsgirefserver import WSGIServer, WebSocketWSGIRequestHandler + from ws4py.server.wsgiutils import WebSocketWSGIApplication + + server = make_server('', 9000, server_class=WSGIServer, + handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=EchoWebSocket)) + server.initialize_websockets_manager() + server.serve_forever() + +.. note:: + For some reason this server may fail against autobahntestsuite. +""" +import logging +import sys +import itertools +import operator +from wsgiref.handlers import SimpleHandler +from wsgiref.simple_server import WSGIRequestHandler, WSGIServer as _WSGIServer +from wsgiref import util + +util._hoppish = {}.__contains__ + +from ws4py.manager import WebSocketManager +from ws4py import format_addresses +from ws4py.server.wsgiutils import WebSocketWSGIApplication +from ws4py.compat import get_connection + +__all__ = ['WebSocketWSGIHandler', 'WebSocketWSGIRequestHandler', + 'WSGIServer'] + +logger = logging.getLogger('ws4py') + +class WebSocketWSGIHandler(SimpleHandler): + def setup_environ(self): + """ + Setup the environ dictionary and add the + `'ws4py.socket'` key. Its associated value + is the real socket underlying socket. + """ + SimpleHandler.setup_environ(self) + self.environ['ws4py.socket'] = get_connection(self.environ['wsgi.input']) + self.http_version = self.environ['SERVER_PROTOCOL'].rsplit('/')[-1] + + def finish_response(self): + """ + Completes the response and performs the following tasks: + + - Remove the `'ws4py.socket'` and `'ws4py.websocket'` + environ keys. + - Attach the returned websocket, if any, to the WSGI server + using its ``link_websocket_to_server`` method. + """ + # force execution of the result iterator until first actual content + rest = iter(self.result) + first = list(itertools.islice(rest, 1)) + self.result = itertools.chain(first, rest) + + # now it's safe to look if environ was modified + ws = None + if self.environ: + self.environ.pop('ws4py.socket', None) + ws = self.environ.pop('ws4py.websocket', None) + + try: + SimpleHandler.finish_response(self) + except: + if ws: + ws.close(1011, reason='Something broke') + raise + else: + if ws: + self.request_handler.server.link_websocket_to_server(ws) + +class WebSocketWSGIRequestHandler(WSGIRequestHandler): + WebSocketWSGIHandler = WebSocketWSGIHandler + def handle(self): + """ + Unfortunately the base class forces us + to override the whole method to actually provide our wsgi handler. + """ + self.raw_requestline = self.rfile.readline() + if not self.parse_request(): # An error code has been sent, just exit + return + + # next line is where we'd have expect a configuration key somehow + handler = self.WebSocketWSGIHandler( + self.rfile, self.wfile, self.get_stderr(), self.get_environ() + ) + handler.request_handler = self # backpointer for logging + handler.run(self.server.get_app()) + +class WSGIServer(_WSGIServer): + def initialize_websockets_manager(self): + """ + Call thos to start the underlying websockets + manager. Make sure to call it once your server + is created. + """ + self.manager = WebSocketManager() + self.manager.start() + + def shutdown_request(self, request): + """ + The base class would close our socket + if we didn't override it. + """ + pass + + def link_websocket_to_server(self, ws): + """ + Call this from your WSGI handler when a websocket + has been created. + """ + self.manager.add(ws) + + def server_close(self): + """ + Properly initiate closing handshakes on + all websockets when the WSGI server terminates. + """ + if hasattr(self, 'manager'): + self.manager.close_all() + self.manager.stop() + self.manager.join() + delattr(self, 'manager') + _WSGIServer.server_close(self) + +if __name__ == '__main__': + from ws4py import configure_logger + configure_logger() + + from wsgiref.simple_server import make_server + from ws4py.websocket import EchoWebSocket + + server = make_server('', 9000, server_class=WSGIServer, + handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=EchoWebSocket)) + server.initialize_websockets_manager() + try: + server.serve_forever() + except KeyboardInterrupt: + server.server_close() diff --git a/game/python-extra/ws4py/server/wsgiutils.py b/game/python-extra/ws4py/server/wsgiutils.py new file mode 100644 index 0000000..efd3242 --- /dev/null +++ b/game/python-extra/ws4py/server/wsgiutils.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +__doc__ = """ +This module provides a WSGI application suitable +for a WSGI server such as gevent or wsgiref for instance. + +:pep:`333` couldn't foresee a protocol such as +WebSockets but luckily the way the initial +protocol upgrade was designed means that we can +fit the handshake in a WSGI flow. + +The handshake validates the request against +some internal or user-provided values and +fails the request if the validation doesn't +complete. + +On success, the provided WebSocket subclass +is instanciated and stored into the +`'ws4py.websocket'` environ key so that +the WSGI server can handle it. + +The WSGI application returns an empty iterable +since there is little value to return some +content within the response to the handshake. + +A server wishing to support WebSocket via ws4py +should: + +- Provide the real socket object to ws4py through the + `'ws4py.socket'` environ key. We can't use `'wsgi.input'` + as it may be wrapper to the socket we wouldn't know + how to extract the socket from. +- Look for the `'ws4py.websocket'` key in the environ + when the application has returned and probably attach + it to a :class:`ws4py.manager.WebSocketManager` instance + so that the websocket runs its life. +- Remove the `'ws4py.websocket'` and `'ws4py.socket'` + environ keys once the application has returned. + No need for these keys to persist. +- Not close the underlying socket otherwise, well, + your websocket will also shutdown. + +.. warning:: + + The WSGI application sets the `'Upgrade'` header response + as specified by :rfc:`6455`. This is not tolerated by + :pep:`333` since it's a hop-by-hop header. + We expect most servers won't mind. +""" +import base64 +from hashlib import sha1 +import logging +import sys + +from ws4py.websocket import WebSocket +from ws4py.exc import HandshakeError +from ws4py.compat import unicode, py3k +from ws4py import WS_VERSION, WS_KEY, format_addresses + +logger = logging.getLogger('ws4py') + +__all__ = ['WebSocketWSGIApplication'] + +class WebSocketWSGIApplication(object): + def __init__(self, protocols=None, extensions=None, handler_cls=WebSocket): + """ + WSGI application usable to complete the upgrade handshake + by validating the requested protocols and extensions as + well as the websocket version. + + If the upgrade validates, the `handler_cls` class + is instanciated and stored inside the WSGI `environ` + under the `'ws4py.websocket'` key to make it + available to the WSGI handler. + """ + self.protocols = protocols + self.extensions = extensions + self.handler_cls = handler_cls + + def make_websocket(self, sock, protocols, extensions, environ): + """ + Initialize the `handler_cls` instance with the given + negociated sets of protocols and extensions as well as + the `environ` and `sock`. + + Stores then the instance in the `environ` dict + under the `'ws4py.websocket'` key. + """ + websocket = self.handler_cls(sock, protocols, extensions, + environ.copy()) + environ['ws4py.websocket'] = websocket + return websocket + + def __call__(self, environ, start_response): + if environ.get('REQUEST_METHOD') != 'GET': + raise HandshakeError('HTTP method must be a GET') + + for key, expected_value in [('HTTP_UPGRADE', 'websocket'), + ('HTTP_CONNECTION', 'upgrade')]: + actual_value = environ.get(key, '').lower() + if not actual_value: + raise HandshakeError('Header %s is not defined' % key) + if expected_value not in actual_value: + raise HandshakeError('Illegal value for header %s: %s' % + (key, actual_value)) + + key = environ.get('HTTP_SEC_WEBSOCKET_KEY') + if key: + ws_key = base64.b64decode(key.encode('utf-8')) + if len(ws_key) != 16: + raise HandshakeError("WebSocket key's length is invalid") + + version = environ.get('HTTP_SEC_WEBSOCKET_VERSION') + supported_versions = b', '.join([unicode(v).encode('utf-8') for v in WS_VERSION]) + version_is_valid = False + if version: + try: version = int(version) + except: pass + else: version_is_valid = version in WS_VERSION + + if not version_is_valid: + environ['websocket.version'] = unicode(version).encode('utf-8') + raise HandshakeError('Unhandled or missing WebSocket version') + + ws_protocols = [] + protocols = self.protocols or [] + subprotocols = environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL') + if subprotocols: + for s in subprotocols.split(','): + s = s.strip() + if s in protocols: + ws_protocols.append(s) + + ws_extensions = [] + exts = self.extensions or [] + extensions = environ.get('HTTP_SEC_WEBSOCKET_EXTENSIONS') + if extensions: + for ext in extensions.split(','): + ext = ext.strip() + if ext in exts: + ws_extensions.append(ext) + + accept_value = base64.b64encode(sha1(key.encode('utf-8') + WS_KEY).digest()) + if py3k: accept_value = accept_value.decode('utf-8') + upgrade_headers = [ + ('Upgrade', 'websocket'), + ('Connection', 'Upgrade'), + ('Sec-WebSocket-Version', '%s' % version), + ('Sec-WebSocket-Accept', accept_value), + ] + if ws_protocols: + upgrade_headers.append(('Sec-WebSocket-Protocol', ', '.join(ws_protocols))) + if ws_extensions: + upgrade_headers.append(('Sec-WebSocket-Extensions', ','.join(ws_extensions))) + + start_response("101 Switching Protocols", upgrade_headers) + + self.make_websocket(environ['ws4py.socket'], + ws_protocols, + ws_extensions, + environ) + + return [] diff --git a/game/python-extra/ws4py/streaming.py b/game/python-extra/ws4py/streaming.py new file mode 100644 index 0000000..4420ba1 --- /dev/null +++ b/game/python-extra/ws4py/streaming.py @@ -0,0 +1,319 @@ +# -*- coding: utf-8 -*- +import struct +from struct import unpack + +from ws4py.utf8validator import Utf8Validator +from ws4py.messaging import TextMessage, BinaryMessage, CloseControlMessage,\ + PingControlMessage, PongControlMessage +from ws4py.framing import Frame, OPCODE_CONTINUATION, OPCODE_TEXT, \ + OPCODE_BINARY, OPCODE_CLOSE, OPCODE_PING, OPCODE_PONG +from ws4py.exc import FrameTooLargeException, ProtocolException, InvalidBytesError,\ + TextFrameEncodingException, UnsupportedFrameTypeException, StreamClosed +from ws4py.compat import py3k + +VALID_CLOSING_CODES = [1000, 1001, 1002, 1003, 1007, 1008, 1009, 1010, 1011] + +class Stream(object): + def __init__(self, always_mask=False, expect_masking=True): + """ Represents a websocket stream of bytes flowing in and out. + + The stream doesn't know about the data provider itself and + doesn't even know about sockets. Instead the stream simply + yields for more bytes whenever it requires them. The stream owner + is responsible to provide the stream with those bytes until + a frame can be interpreted. + + .. code-block:: python + :linenos: + + >>> s = Stream() + >>> s.parser.send(BYTES) + >>> s.has_messages + False + >>> s.parser.send(MORE_BYTES) + >>> s.has_messages + True + >>> s.message + <TextMessage ... > + + Set ``always_mask`` to mask all frames built. + + Set ``expect_masking`` to indicate masking will be + checked on all parsed frames. + """ + + self.message = None + """ + Parsed test or binary messages. Whenever the parser + reads more bytes from a fragment message, those bytes + are appended to the most recent message. + """ + + self.pings = [] + """ + Parsed ping control messages. They are instances of + :class:`ws4py.messaging.PingControlMessage` + """ + + self.pongs = [] + """ + Parsed pong control messages. They are instances of + :class:`ws4py.messaging.PongControlMessage` + """ + + self.closing = None + """ + Parsed close control messsage. Instance of + :class:`ws4py.messaging.CloseControlMessage` + """ + + self.errors = [] + """ + Detected errors while parsing. Instances of + :class:`ws4py.messaging.CloseControlMessage` + """ + + self._parser = None + """ + Parser in charge to process bytes it is fed with. + """ + + self.always_mask = always_mask + self.expect_masking = expect_masking + + @property + def parser(self): + if self._parser is None: + self._parser = self.receiver() + # Python generators must be initialized once. + next(self.parser) + return self._parser + + def _cleanup(self): + """ + Frees the stream's resources rendering it unusable. + """ + self.message = None + if self._parser is not None: + if not self._parser.gi_running: + self._parser.close() + self._parser = None + self.errors = None + self.pings = None + self.pongs = None + self.closing = None + + def text_message(self, text): + """ + Returns a :class:`ws4py.messaging.TextMessage` instance + ready to be built. Convenience method so + that the caller doesn't need to import the + :class:`ws4py.messaging.TextMessage` class itself. + """ + return TextMessage(text=text) + + def binary_message(self, bytes): + """ + Returns a :class:`ws4py.messaging.BinaryMessage` instance + ready to be built. Convenience method so + that the caller doesn't need to import the + :class:`ws4py.messaging.BinaryMessage` class itself. + """ + return BinaryMessage(bytes) + + @property + def has_message(self): + """ + Checks if the stream has received any message + which, if fragmented, is now completed. + """ + if self.message is not None: + return self.message.completed + + return False + + def close(self, code=1000, reason=''): + """ + Returns a close control message built from + a :class:`ws4py.messaging.CloseControlMessage` instance, + using the given status ``code`` and ``reason`` message. + """ + return CloseControlMessage(code=code, reason=reason) + + def ping(self, data=''): + """ + Returns a ping control message built from + a :class:`ws4py.messaging.PingControlMessage` instance. + """ + return PingControlMessage(data).single(mask=self.always_mask) + + def pong(self, data=''): + """ + Returns a ping control message built from + a :class:`ws4py.messaging.PongControlMessage` instance. + """ + return PongControlMessage(data).single(mask=self.always_mask) + + def receiver(self): + """ + Parser that keeps trying to interpret bytes it is fed with as + incoming frames part of a message. + + Control message are single frames only while data messages, like text + and binary, may be fragmented accross frames. + + The way it works is by instanciating a :class:`wspy.framing.Frame` object, + then running its parser generator which yields how much bytes + it requires to performs its task. The stream parser yields this value + to its caller and feeds the frame parser. + + When the frame parser raises :exc:`StopIteration`, the stream parser + tries to make sense of the parsed frame. It dispatches the frame's bytes + to the most appropriate message type based on the frame's opcode. + + Overall this makes the stream parser totally agonstic to + the data provider. + """ + utf8validator = Utf8Validator() + running = True + frame = None + while running: + frame = Frame() + while 1: + try: + some_bytes = (yield next(frame.parser)) + frame.parser.send(some_bytes) + except GeneratorExit: + running = False + break + except StopIteration: + frame._cleanup() + some_bytes = frame.body + + # Let's avoid unmasking when there is no payload + if some_bytes: + if frame.masking_key and self.expect_masking: + some_bytes = frame.unmask(some_bytes) + elif not frame.masking_key and self.expect_masking: + msg = CloseControlMessage(code=1002, reason='Missing masking when expected') + self.errors.append(msg) + break + elif frame.masking_key and not self.expect_masking: + msg = CloseControlMessage(code=1002, reason='Masked when not expected') + self.errors.append(msg) + break + else: + # If we reach this stage, it's because + # the frame wasn't masked and we didn't expect + # it anyway. Therefore, on py2k, the bytes + # are actually a str object and can't be used + # in the utf8 validator as we need integers + # when we get each byte one by one. + # Our only solution here is to convert our + # string to a bytearray. + some_bytes = bytearray(some_bytes) + + if frame.opcode == OPCODE_TEXT: + if self.message and not self.message.completed: + # We got a text frame before we completed the previous one + msg = CloseControlMessage(code=1002, reason='Received a new message before completing previous') + self.errors.append(msg) + break + + m = TextMessage(some_bytes) + m.completed = (frame.fin == 1) + self.message = m + + if some_bytes: + is_valid, end_on_code_point, _, _ = utf8validator.validate(some_bytes) + + if not is_valid or (m.completed and not end_on_code_point): + self.errors.append(CloseControlMessage(code=1007, reason='Invalid UTF-8 bytes')) + break + + elif frame.opcode == OPCODE_BINARY: + if self.message and not self.message.completed: + # We got a text frame before we completed the previous one + msg = CloseControlMessage(code=1002, reason='Received a new message before completing previous') + self.errors.append(msg) + break + + m = BinaryMessage(some_bytes) + m.completed = (frame.fin == 1) + self.message = m + + elif frame.opcode == OPCODE_CONTINUATION: + m = self.message + if m is None: + self.errors.append(CloseControlMessage(code=1002, reason='Message not started yet')) + break + + m.extend(some_bytes) + m.completed = (frame.fin == 1) + if m.opcode == OPCODE_TEXT: + if some_bytes: + is_valid, end_on_code_point, _, _ = utf8validator.validate(some_bytes) + + if not is_valid or (m.completed and not end_on_code_point): + self.errors.append(CloseControlMessage(code=1007, reason='Invalid UTF-8 bytes')) + break + + elif frame.opcode == OPCODE_CLOSE: + code = 1005 + reason = "" + if frame.payload_length == 0: + self.closing = CloseControlMessage(code=1005) + elif frame.payload_length == 1: + self.closing = CloseControlMessage(code=1005, reason='Payload has invalid length') + else: + try: + # at this stage, some_bytes have been unmasked + # so actually are held in a bytearray + code = int(unpack("!H", bytes(some_bytes[0:2]))[0]) + except struct.error: + reason = 'Failed at decoding closing code' + else: + # Those codes are reserved or plainly forbidden + if code not in VALID_CLOSING_CODES and not (2999 < code < 5000): + reason = 'Invalid Closing Frame Code: %d' % code + code = 1005 + elif frame.payload_length > 1: + reason = some_bytes[2:] if frame.masking_key else frame.body[2:] + + if not py3k: reason = bytearray(reason) + is_valid, end_on_code_point, _, _ = utf8validator.validate(reason) + if not is_valid or not end_on_code_point: + self.errors.append(CloseControlMessage(code=1007, reason='Invalid UTF-8 bytes')) + break + reason = bytes(reason) + self.closing = CloseControlMessage(code=code, reason=reason) + + elif frame.opcode == OPCODE_PING: + self.pings.append(PingControlMessage(some_bytes)) + + elif frame.opcode == OPCODE_PONG: + self.pongs.append(PongControlMessage(some_bytes)) + + else: + self.errors.append(CloseControlMessage(code=1003)) + + break + + except ProtocolException: + self.errors.append(CloseControlMessage(code=1002)) + break + except FrameTooLargeException: + self.errors.append(CloseControlMessage(code=1002, reason="Frame was too large")) + break + + frame._cleanup() + frame.body = None + frame = None + + if self.message is not None and self.message.completed: + utf8validator.reset() + + utf8validator.reset() + utf8validator = None + + self._cleanup() diff --git a/game/python-extra/ws4py/utf8validator.py b/game/python-extra/ws4py/utf8validator.py new file mode 100644 index 0000000..50b19e5 --- /dev/null +++ b/game/python-extra/ws4py/utf8validator.py @@ -0,0 +1,117 @@ +# coding=utf-8 + +############################################################################### +## +## Copyright 2011 Tavendo GmbH +## +## Note: +## +## This code is a Python implementation of the algorithm +## +## "Flexible and Economical UTF-8 Decoder" +## +## by Bjoern Hoehrmann +## +## bjoern@hoehrmann.de +## http://bjoern.hoehrmann.de/utf-8/decoder/dfa/ +## +## Licensed under the Apache License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## +############################################################################### + + +class Utf8Validator(object): + """ + Incremental UTF-8 validator with constant memory consumption (minimal state). + + Implements the algorithm "Flexible and Economical UTF-8 Decoder" by + Bjoern Hoehrmann (http://bjoern.hoehrmann.de/utf-8/decoder/dfa/). + """ + + ## DFA transitions + UTF8VALIDATOR_DFA = [ + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 00..1f + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 20..3f + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 40..5f + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 60..7f + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9, # 80..9f + 7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, # a0..bf + 8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, # c0..df + 0xa,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x4,0x3,0x3, # e0..ef + 0xb,0x6,0x6,0x6,0x5,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8, # f0..ff + 0x0,0x1,0x2,0x3,0x5,0x8,0x7,0x1,0x1,0x1,0x4,0x6,0x1,0x1,0x1,0x1, # s0..s0 + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,0,1,0,1,1,1,1,1,1, # s1..s2 + 1,2,1,1,1,1,1,2,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1, # s3..s4 + 1,2,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,3,1,3,1,1,1,1,1,1, # s5..s6 + 1,3,1,1,1,1,1,3,1,3,1,1,1,1,1,1,1,3,1,1,1,1,1,1,1,1,1,1,1,1,1,1, # s7..s8 + ] + + UTF8_ACCEPT = 0 + UTF8_REJECT = 1 + + def __init__(self): + self.reset() + + def decode(self, b): + """ + Eat one UTF-8 octet, and validate on the fly. + + Returns UTF8_ACCEPT when enough octets have been consumed, in which case + self.codepoint contains the decoded Unicode code point. + + Returns UTF8_REJECT when invalid UTF-8 was encountered. + + Returns some other positive integer when more octets need to be eaten. + """ + type = Utf8Validator.UTF8VALIDATOR_DFA[b] + if self.state != Utf8Validator.UTF8_ACCEPT: + self.codepoint = (b & 0x3f) | (self.codepoint << 6) + else: + self.codepoint = (0xff >> type) & b + self.state = Utf8Validator.UTF8VALIDATOR_DFA[256 + self.state * 16 + type] + return self.state + + def reset(self): + """ + Reset validator to start new incremental UTF-8 decode/validation. + """ + self.state = Utf8Validator.UTF8_ACCEPT + self.codepoint = 0 + self.i = 0 + + def validate(self, ba): + """ + Incrementally validate a chunk of bytes provided as bytearray. + + Will return a quad (valid?, endsOnCodePoint?, currentIndex, totalIndex). + + As soon as an octet is encountered which renders the octet sequence + invalid, a quad with valid? == False is returned. currentIndex returns + the index within the currently consumed chunk, and totalIndex the + index within the total consumed sequence that was the point of bail out. + When valid? == True, currentIndex will be len(ba) and totalIndex the + total amount of consumed bytes. + """ + state = self.state + DFA = Utf8Validator.UTF8VALIDATOR_DFA + i = 0 # make sure 'i' is set if when 'ba' is empty + for i, b in enumerate(ba): + ## optimized version of decode(), since we are not interested in actual code points + state = DFA[256 + (state << 4) + DFA[b]] + if state == Utf8Validator.UTF8_REJECT: + self.i += i + self.state = state + return False, False, i, self.i + self.i += i + self.state = state + return True, state == Utf8Validator.UTF8_ACCEPT, i, self.i diff --git a/game/python-extra/ws4py/websocket.py b/game/python-extra/ws4py/websocket.py new file mode 100644 index 0000000..f7e9e3a --- /dev/null +++ b/game/python-extra/ws4py/websocket.py @@ -0,0 +1,615 @@ +# -*- coding: utf-8 -*- +import logging +import socket +import ssl +import time +import threading +import types +import errno + +try: + from OpenSSL.SSL import Error as pyOpenSSLError +except ImportError: + class pyOpenSSLError(Exception): + pass + +from ws4py import WS_KEY, WS_VERSION +from ws4py.exc import HandshakeError, StreamClosed +from ws4py.streaming import Stream +from ws4py.messaging import Message, PingControlMessage,\ + PongControlMessage +from ws4py.compat import basestring, unicode + +DEFAULT_READING_SIZE = 2 + +logger = logging.getLogger('ws4py') + +__all__ = ['WebSocket', 'EchoWebSocket', 'Heartbeat'] + +class Heartbeat(threading.Thread): + def __init__(self, websocket, frequency=2.0): + """ + Runs at a periodic interval specified by + `frequency` by sending an unsolicitated pong + message to the connected peer. + + If the message fails to be sent and a socket + error is raised, we close the websocket + socket automatically, triggering the `closed` + handler. + """ + threading.Thread.__init__(self) + self.websocket = websocket + self.frequency = frequency + + def __enter__(self): + if self.frequency: + self.start() + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.stop() + + def stop(self): + self.running = False + + def run(self): + self.running = True + while self.running: + time.sleep(self.frequency) + if self.websocket.terminated: + break + + try: + self.websocket.send(PongControlMessage(data='beep')) + except socket.error: + logger.info("Heartbeat failed") + self.websocket.server_terminated = True + self.websocket.close_connection() + break + +class WebSocket(object): + """ Represents a websocket endpoint and provides a high level interface to drive the endpoint. """ + + def __init__(self, sock, protocols=None, extensions=None, environ=None, heartbeat_freq=None): + """ The ``sock`` is an opened connection + resulting from the websocket handshake. + + If ``protocols`` is provided, it is a list of protocols + negotiated during the handshake as is ``extensions``. + + If ``environ`` is provided, it is a copy of the WSGI environ + dictionnary from the underlying WSGI server. + """ + + self.stream = Stream(always_mask=False) + """ + Underlying websocket stream that performs the websocket + parsing to high level objects. By default this stream + never masks its messages. Clients using this class should + set the ``stream.always_mask`` fields to ``True`` + and ``stream.expect_masking`` fields to ``False``. + """ + + self.protocols = protocols + """ + List of protocols supported by this endpoint. + Unused for now. + """ + + self.extensions = extensions + """ + List of extensions supported by this endpoint. + Unused for now. + """ + + self.sock = sock + """ + Underlying connection. + """ + + self._is_secure = hasattr(sock, '_ssl') or hasattr(sock, '_sslobj') + """ + Tell us if the socket is secure or not. + """ + + self.client_terminated = False + """ + Indicates if the client has been marked as terminated. + """ + + self.server_terminated = False + """ + Indicates if the server has been marked as terminated. + """ + + self.reading_buffer_size = DEFAULT_READING_SIZE + """ + Current connection reading buffer size. + """ + + self.environ = environ + """ + WSGI environ dictionary. + """ + + self.heartbeat_freq = heartbeat_freq + """ + At which interval the heartbeat will be running. + Set this to `0` or `None` to disable it entirely. + """ + "Internal buffer to get around SSL problems" + self.buf = b'' + + self._local_address = None + self._peer_address = None + + ######### + # @TMW2 + # Delays the reading if we are still busy writing + self.lock = False + # @TMW2 + ######### + + + @property + def local_address(self): + """ + Local endpoint address as a tuple + """ + if not self._local_address: + self._local_address = self.sock.getsockname() + if len(self._local_address) == 4: + self._local_address = self._local_address[:2] + return self._local_address + + @property + def peer_address(self): + """ + Peer endpoint address as a tuple + """ + if not self._peer_address: + self._peer_address = self.sock.getpeername() + if len(self._peer_address) == 4: + self._peer_address = self._peer_address[:2] + return self._peer_address + + def opened(self): + """ + Called by the server when the upgrade handshake + has succeeded. + """ + pass + + def close(self, code=1000, reason=''): + """ + Call this method to initiate the websocket connection + closing by sending a close frame to the connected peer. + The ``code`` is the status code representing the + termination's reason. + + Once this method is called, the ``server_terminated`` + attribute is set. Calling this method several times is + safe as the closing frame will be sent only the first + time. + + .. seealso:: Defined Status Codes http://tools.ietf.org/html/rfc6455#section-7.4.1 + """ + if not self.server_terminated: + self.server_terminated = True + try: + self._write(self.stream.close(code=code, reason=reason).single(mask=self.stream.always_mask)) + except Exception as ex: + logger.error("Error when terminating the connection: %s", str(ex)) + + def closed(self, code, reason=None): + """ + Called when the websocket stream and connection are finally closed. + The provided ``code`` is status set by the other point and + ``reason`` is a human readable message. + + .. seealso:: Defined Status Codes http://tools.ietf.org/html/rfc6455#section-7.4.1 + """ + pass + + @property + def terminated(self): + """ + Returns ``True`` if both the client and server have been + marked as terminated. + """ + return self.client_terminated is True and self.server_terminated is True + + @property + def connection(self): + return self.sock + + def close_connection(self): + """ + Shutdowns then closes the underlying connection. + """ + if self.sock: + try: + self.sock.shutdown(socket.SHUT_RDWR) + self.sock.close() + except: + pass + finally: + self.sock = None + + def ping(self, message): + """ + Send a ping message to the remote peer. + The given `message` must be a unicode string. + """ + self.send(PingControlMessage(message)) + + def ponged(self, pong): + """ + Pong message, as a :class:`messaging.PongControlMessage` instance, + received on the stream. + """ + pass + + def received_message(self, message): + """ + Called whenever a complete ``message``, binary or text, + is received and ready for application's processing. + + The passed message is an instance of :class:`messaging.TextMessage` + or :class:`messaging.BinaryMessage`. + + .. note:: You should override this method in your subclass. + """ + pass + + def unhandled_error(self, error): + """ + Called whenever a socket, or an OS, error is trapped + by ws4py but not managed by it. The given error is + an instance of `socket.error` or `OSError`. + + Note however that application exceptions will not go + through this handler. Instead, do make sure you + protect your code appropriately in `received_message` + or `send`. + + The default behaviour of this handler is to log + the error with a message. + """ + logger.exception("Failed to receive data") + + def _write(self, b): + """ + Trying to prevent a write operation + on an already closed websocket stream. + + This cannot be bullet proof but hopefully + will catch almost all use cases. + """ + if self.terminated or self.sock is None: + raise RuntimeError("Cannot send on a terminated websocket") + + self.sock.sendall(b) + + def send(self, payload, binary=False): + """ + Sends the given ``payload`` out. + + If ``payload`` is some bytes or a bytearray, + then it is sent as a single message not fragmented. + + If ``payload`` is a generator, each chunk is sent as part of + fragmented message. + + If ``binary`` is set, handles the payload as a binary message. + """ + ######### + # @TMW2 + # Delays the reading if we are still busy writing + while self.lock: + time.sleep(0.02) + #print("Sending data") + self.lock = True + # @TMW2 + ######### + + message_sender = self.stream.binary_message if binary else self.stream.text_message + + if isinstance(payload, basestring) or isinstance(payload, bytearray): + m = message_sender(payload).single(mask=self.stream.always_mask) + self._write(m) + + elif isinstance(payload, Message): + data = payload.single(mask=self.stream.always_mask) + self._write(data) + + elif type(payload) == types.GeneratorType: + bytes = next(payload) + first = True + for chunk in payload: + self._write(message_sender(bytes).fragment(first=first, mask=self.stream.always_mask)) + bytes = chunk + first = False + + self._write(message_sender(bytes).fragment(first=first, last=True, mask=self.stream.always_mask)) + + else: + raise ValueError("Unsupported type '%s' passed to send()" % type(payload)) + + ######### + # @TMW2 + # Delays the reading if we are still busy writing + self.lock = False + #print("Done sending, unlocked") + # @TMW2 + ######### + + def _get_from_pending(self): + """ + The SSL socket object provides the same interface + as the socket interface but behaves differently. + + When data is sent over a SSL connection + more data may be read than was requested from by + the ws4py websocket object. + + In that case, the data may have been indeed read + from the underlying real socket, but not read by the + application which will expect another trigger from the + manager's polling mechanism as if more data was still on the + wire. This will happen only when new data is + sent by the other peer which means there will be + some delay before the initial read data is handled + by the application. + + Due to this, we have to rely on a non-public method + to query the internal SSL socket buffer if it has indeed + more data pending in its buffer. + + Now, some people in the Python community + `discourage <https://bugs.python.org/issue21430>`_ + this usage of the ``pending()`` method because it's not + the right way of dealing with such use case. They advise + `this approach <https://docs.python.org/dev/library/ssl.html#notes-on-non-blocking-sockets>`_ + instead. Unfortunately, this applies only if the + application can directly control the poller which is not + the case with the WebSocket abstraction here. + + We therefore rely on this `technic <http://stackoverflow.com/questions/3187565/select-and-ssl-in-python>`_ + which seems to be valid anyway. + + This is a bit of a shame because we have to process + more data than what wanted initially. + """ + data = b"" + pending = self.sock.pending() + while pending: + data += self.sock.recv(pending) + pending = self.sock.pending() + return data + + def once(self): + """ + Performs the operation of reading from the underlying + connection in order to feed the stream of bytes. + + Because this needs to support SSL sockets, we must always + read as much as might be in the socket at any given time, + however process expects to have itself called with only a certain + number of bytes at a time. That number is found in + self.reading_buffer_size, so we read everything into our own buffer, + and then from there feed self.process. + + Then the stream indicates + whatever size must be read from the connection since + it knows the frame payload length. + + It returns `False` if an error occurred at the + socket level or during the bytes processing. Otherwise, + it returns `True`. + """ + if self.terminated: + logger.debug("WebSocket is already terminated") + return False + try: + b = b'' + if self._is_secure: + b = self._get_from_pending() + if not b and not self.buf: + b = self.sock.recv(self.reading_buffer_size) + if not b and not self.buf: + return False + self.buf += b + except (socket.error, OSError, pyOpenSSLError) as e: + if hasattr(e, "errno") and e.errno == errno.EINTR: + pass + else: + self.unhandled_error(e) + return False + else: + # process as much as we can + # the process will stop either if there is no buffer left + # or if the stream is closed + # only pass the requested number of bytes, leave the rest in the buffer + requested = self.reading_buffer_size + if not self.process(self.buf[:requested]): + return False + self.buf = self.buf[requested:] + + return True + + def terminate(self): + """ + Completes the websocket by calling the `closed` + method either using the received closing code + and reason, or when none was received, using + the special `1006` code. + + Finally close the underlying connection for + good and cleanup resources by unsetting + the `environ` and `stream` attributes. + """ + s = self.stream + + try: + if s.closing is None: + self.closed(1006, "Going away") + else: + self.closed(s.closing.code, s.closing.reason) + finally: + self.client_terminated = self.server_terminated = True + self.close_connection() + + # Cleaning up resources + s._cleanup() + self.stream = None + self.environ = None + + def process(self, bytes): + """ Takes some bytes and process them through the + internal stream's parser. If a message of any kind is + found, performs one of these actions: + + * A closing message will initiate the closing handshake + * Errors will initiate a closing handshake + * A message will be passed to the ``received_message`` method + * Pings will see pongs be sent automatically + * Pongs will be passed to the ``ponged`` method + + The process should be terminated when this method + returns ``False``. + """ + ######### + # @TMW2 + # Delays the reading if we are still busy writing + while self.lock: + time.sleep(0.02) + #print("processing, lock %d" % self.lock) + self.lock = True + #print("processing, lock %d" % self.lock) + err=1 + # @TMW2 + ######### + + s = self.stream + + if not bytes and self.reading_buffer_size > 0: + ######### + # @TMW2 + self.lock = False + # @TMW2 + ######### + return False + + ######### + # @TMW2 + while err: + try: + self.reading_buffer_size = s.parser.send(bytes) or DEFAULT_READING_SIZE + err=0 + except ValueError: + time.sleep(0.1) + #print("ValueError happened, total %d" % err) + if not err % 20: + print("ValueError happened, total %d" % err) + err+=1 + # @TMW2 + ######### + + if s.closing is not None: + logger.debug("Closing message received (%d) '%s'" % (s.closing.code, s.closing.reason)) + if not self.server_terminated: + self.close(s.closing.code, s.closing.reason) + else: + self.client_terminated = True + ######### + # @TMW2 + self.lock = False + # @TMW2 + ######### + return False + + if s.errors: + for error in s.errors: + logger.debug("Error message received (%d) '%s'" % (error.code, error.reason)) + self.close(error.code, error.reason) + s.errors = [] + ######### + # @TMW2 + self.lock = False + # @TMW2 + ######### + return False + + if s.has_message: + self.received_message(s.message) + if s.message is not None: + s.message.data = None + s.message = None + ######### + # @TMW2 + self.lock = False + # @TMW2 + ######### + return True + + if s.pings: + for ping in s.pings: + self._write(s.pong(ping.data)) + s.pings = [] + + if s.pongs: + for pong in s.pongs: + self.ponged(pong) + s.pongs = [] + + ######### + # @TMW2 + self.lock = False + # @TMW2 + ######### + return True + + def run(self): + """ + Performs the operation of reading from the underlying + connection in order to feed the stream of bytes. + + We start with a small size of two bytes to be read + from the connection so that we can quickly parse an + incoming frame header. Then the stream indicates + whatever size must be read from the connection since + it knows the frame payload length. + + Note that we perform some automatic opererations: + + * On a closing message, we respond with a closing + message and finally close the connection + * We respond to pings with pong messages. + * Whenever an error is raised by the stream parsing, + we initiate the closing of the connection with the + appropiate error code. + + This method is blocking and should likely be run + in a thread. + """ + self.sock.setblocking(True) + with Heartbeat(self, frequency=self.heartbeat_freq): + s = self.stream + + try: + self.opened() + while not self.terminated: + if not self.once(): + break + finally: + self.terminate() + +class EchoWebSocket(WebSocket): + def received_message(self, message): + """ + Automatically sends back the provided ``message`` to + its originating endpoint. + """ + self.send(message.data, message.is_binary) |