summaryrefslogblamecommitdiff
path: root/discordrpc/presence.py
blob: d2be8c8bccdf449ac7634c0e542e3f57c64a7bfa (plain) (tree)








































































































































































































































                                                                                         
                             

















                                                               
import json
import os
import socket
import sys
import struct
import time
from .exceptions import *


OP_HANDSHAKE = 0
OP_FRAME = 1
OP_CLOSE = 2
OP_PING = 3
OP_PONG = 4


class RPC:
    def __init__(self, client_id):
        self.client_id = client_id
        self._connect()
        self._do_handshake()
        self._output = None
        self.show_output = False


    @classmethod
    def set_id(self, app_id):
        app_id = str(app_id)
        platform = sys.platform
        if platform == 'win32':
            return DiscordWindows(app_id)
        else:
            return DiscordUnix(app_id)
        
    
    def _connect(self):
        pass


    def _do_handshake(self):
        op, data = self.send_recv({'v': 1, 'client_id': self.client_id}, op=OP_HANDSHAKE)
        if op == OP_FRAME and data['cmd'] == 'DISPATCH' and data['evt'] == 'READY':
            return
        else:
            if op == OP_CLOSE:
                self.close()
                if data['code'] == 4000:
                    raise InvalidID()
            raise RPCException(f"Unknown error : {data}")
        

    def _write(self, data:bytes):
        pass

    def _recv(self, size:int) -> bytes:
        pass

    def _recv_header(self):
        header = self._recv_exact(8)
        return struct.unpack("<II", header)
    
    def _recv_exact(self, size_remaining) -> bytes:
        buf = b""
        while size_remaining:
            chunk = self._recv(size_remaining)
            buf += chunk
            size_remaining -= len(chunk)
        return buf
    
    def close(self):
        try:
            self.send({}, op=OP_CLOSE)
        finally:
            self._close()

    
    def _close(self):
        pass

    def __enter__(self):
        return self
    
    def __exit__(self, *_):
        self.close()

    def send_recv(self, data, op=OP_FRAME):
        self.send(data, op)
        return self.recv()
    
    def send(self, data, op=OP_FRAME):
        data = json.dumps(data, separators=(',', ':')).encode('UTF-8')
        header = struct.pack("<II", op, len(data))
        self._write(header)
        self._write(data)

    def recv(self):
        op, length = self._recv_header()
        payload = self._recv_exact(length)
        data = json.loads(payload.decode('UTF-8'))
        return op, data
    
    timestamp = time.mktime(time.localtime())
    
    def set_activity(
            self,
            state:str=None,
            details:str=None,
            timestamp=None,
            small_text:str='null',
            large_text:str='null',
            small_image:str='null',
            large_image:str='null',
            buttons=None
    ):
        
        if len(large_text) <= 1:
            raise Error('"large text" must be at least above 1 characters')
        
        if len(small_text) <= 1:
            raise Error('"small text" must be at least above 1 characters')
        
        act = {
            "state": state,
            "details": details,
            "timestamps": {
                "start": timestamp
            },
            "assets": {
                "small_text": small_text,
                "large_text": large_text,
                "small_image": str(small_image),
                "large_image": str(large_image)
            },
            "buttons": buttons,
        }

        if state == None or state == '' or state == 'null':
            act.pop('state', None)
        if details == None or details == '' or details == 'null':
            act.pop('details', None)
        if small_text == None or small_text == '' or small_text == 'null':
            act['assets'].pop('small_text', None)
        if large_text == None or large_text == '' or large_text == 'null':
            act['assets'].pop('large_text', None)
        if timestamp == None:
            act.pop('timestamps', None)
        if buttons == None:
            act.pop('buttons', None)


        data = {
            'cmd': 'SET_ACTIVITY',
            'args': {
                'pid': os.getpid(),
                'activity': act
            },
            'nonce': '{:.20f}'.format(time.time())
        }

        self.send(data)

        op, length = self._recv_header()
        payload = self._recv_exact(length)
        output = json.loads(payload.decode('UTF-8'))
        if output['evt'] == 'ERROR':
            raise ActivityError
        
        else:
            if self.show_output==True:print(f"Successfully set RPC for {self.client_id}")

        return op, output

    

    def run(self):
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.close()



class DiscordWindows(RPC):
    _pipe_path = R'\\?\pipe\discord-ipc-{}'

    def _connect(self):
        for i in range(10):
            path = self._pipe_path.format(i)

            try:
                self._open = open(path, "w+b")
            except OSError as e:
                raise Error("Failed to open {!r}: {}".format(path, e))
            else:
                break

        else:
            raise DiscordNotOpened

        self.path = path

    def _write(self, data:bytes):
        self._open.write(data)
        self._open.flush()

    def _recv(self, size:int) -> bytes:
        return self._open.read(size)
    
    def _close(self):
        self._open.close()



class DiscordUnix(RPC):
    def _connect(self):
        self._socket = socket.socket(socket.AF_UNIX)
        pipe_path = self._get_pipe_path()

        for i in range(10):
            path = pipe_path.format(i)
            if not os.path.exists(path):
                continue
            try:
                self._socket.connect(path)
            except OSError as e:
                raise Error("Failed to open {!r}: {}".format(path, e))
            else:
                break
        else:
            raise DiscordNotOpened


    def _get_pipe_path(self):
        env_keys = ('XDG_RUNTIME_DIR', 'TMPDIR', 'TMP', 'TEMP')
        for env_key in env_keys:
            dir_path = os.environ.get(env_key)
            if dir_path:
                break
        else:
            dir_path = '/tmp'
        return os.path.join(dir_path, 'discord-ipc-{}')
    

    def _write(self, data:bytes):
        self._socket.sendall(data)

    def _recv(self, size:int) -> bytes:
        return self._socket.recv(size)
    
    def _close(self):
        self._socket.close()