使用 reactphp/socket 进行 websocket

2020-03-28  本文已影响0人  小山丘321
<?php

require 'vendor/autoload.php';

function handleHeader($line)
{
    $headers = array();
    $lines   = preg_split("/\r\n/", $line);
    foreach ($lines as $line) {
        $line = chop($line);
        if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) {
            $headers[$matches[1]] = $matches[2];
        }
    }

    return $headers;
}
/**
 * 握手处理
 * @param $newClient socket
 * @return int  接收到的信息
 */
function handshaking($line)
{
    $headers   = handleHeader($line);
    $secKey    = $headers['Sec-WebSocket-Key'];
    $secAccept = base64_encode(pack('H*', sha1($secKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
    return "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
        "Upgrade: websocket\r\n" .
        "Connection: Upgrade\r\n" .
        "WebSocket-Origin: 127.0.0.1\r\n" .
        "WebSocket-Location: ws://127.0.0.1:9000/websocket/websocket\r\n" .
        "Sec-WebSocket-Accept:$secAccept\r\n\r\n";
}

/**
 * 解析接收数据
 * @param $buffer
 * @return null|string
 */
function decode($buffer)
{
    $len = $masks = $data = $decoded = null;
    $len = ord($buffer[1]) & 127;
    if ($len === 126) {
        $masks = substr($buffer, 4, 4);
        $data  = substr($buffer, 8);
    } else if ($len === 127) {
        $masks = substr($buffer, 10, 4);
        $data  = substr($buffer, 14);
    } else {
        $masks = substr($buffer, 2, 4);
        $data  = substr($buffer, 6);
    }
    for ($index = 0; $index < strlen($data); $index++) {
        $decoded .= $data[$index] ^ $masks[$index % 4];
    }
    return $decoded;
}

/**
 *打包消息
 **/
function encode($buffer)
{
    $first_byte = "\x81";
    $len        = strlen($buffer);
    if ($len <= 125) {
        $encode_buffer = $first_byte . chr($len) . $buffer;
    } else {
        if ($len <= 65535) {
            $encode_buffer = $first_byte . chr(126) . pack("n", $len) . $buffer;
        } else {
            $encode_buffer = $first_byte . chr(127) . pack("xxxxN", $len) . $buffer;
        }
    }
    return $encode_buffer;
}

use React\Socket\ConnectionInterface;

class ConnectionsPool
{
    /** @var SplObjectStorage  */
    private $connections;

    public $userId = 1000;

    public function __construct()
    {
        $this->connections = new SplObjectStorage();
    }

    public function add(ConnectionInterface $connection)
    {
        // $connection->write("Enter your name: ");
        $this->initEvents($connection);
        $this->setConnectionData($connection, []);
    }

    /**
     * @param ConnectionInterface $connection
     */
    private function initEvents(ConnectionInterface $connection)
    {
        // On receiving the data we loop through other connections
        // from the pool and write this data to them
        $connection->on('data', function ($data) use ($connection) {
            $connectionData = $this->getConnectionData($connection);

            // It is the first data received, so we consider it as
            // a user's name.
            if (empty($connectionData)) {
                $this->addNewMember($data, $this->userId, $connection);
                return;
            }

            // $name = $connectionData['name'];
            $data = encode(decode($data));
            $this->sendAll($data, $connection);
            // $this->sendAll("$name: $data", $connection);
        });

        // When connection closes detach it from the pool
        $connection->on('close', function () use ($connection) {
            $data = $this->getConnectionData($connection);
            $name = $data['name'] ?? '';

            $this->connections->offsetUnset($connection);
            $this->sendAll("User $name leaves the chat\n", $connection);
        });
    }

    private function addNewMember($data, $name, $connection)
    {
        $name = str_replace(["\n", "\r"], "", $name);
        $this->setConnectionData($connection, ['name' => $name]);
        $data = handshaking($data);
        $connection->write($data);
    }

    private function setConnectionData(ConnectionInterface $connection, $data)
    {
        $this->connections->offsetSet($connection, $data);
    }

    private function getConnectionData(ConnectionInterface $connection)
    {
        return $this->connections->offsetGet($connection);
    }

    /**
     * Send data to all connections from the pool except
     * the specified one.
     *
     * @param mixed $data
     * @param ConnectionInterface $except
     */
    private function sendAll($data, ConnectionInterface $except)
    {
        foreach ($this->connections as $conn) {
            if ($conn != $except) {
                $conn->write($data);
            }

        }
    }
}

$loop   = React\EventLoop\Factory::create();
$socket = new React\Socket\Server('127.0.0.1:9000', $loop);
$pool   = new ConnectionsPool();

$socket->on('connection', function (ConnectionInterface $connection) use ($pool) {
    $pool->add($connection);
});

echo "Listening on {$socket->getAddress()}\n";

$loop->run();
上一篇下一篇

猜你喜欢

热点阅读