Some minor upgrade to juce_Socket


#1

Hi Jules,

Would you accept some additions to JUCE's Socket class if they were implemented like this:

class JUCE_API  Socket
{
public:
    //==============================================================================
    /** Creates an uninitialised stream socket.

        To connect it, use the connect() method, after which you can read() or write()
        to it.

        To wait for other sockets to connect to this one, the createListener() method
        enters "listener" mode, and can be used to spawn new sockets for each connection
        that comes along.

        This is a typical TCP socket.
    */
    Socket();

    /** Creates an uninitialised datagram socket.

        The localPort is the port on which to bind the socket before committing any transmission.
        If set to 0, the localPort is assigned by the operating system.

        To use it, you can call the connect() method. This method will not perform any 
        connection at all, but instead save the destination you've provided.
        After this, you can read() or write() to it, the same destination will be used.
        
        To wait for other sockets to connect to this one, you can directly 
        waitForNewConnection().

        This is a typical UDP socket.
    */
    Socket(int localPort);

    /** Destructor. */
    ~Socket();

    //==============================================================================
    // Tests if the socket is ready
    // Returns: 1 == yes, 0 == no, -1 == error
    int isReady (int timeoutMsecs = 0);

    /** Test whether a socket is ready for reading or writing.

        If the socket is ready for any communication, you can write & read to/from it, 
        without being blocked.

        Returns true if the socket is ready for the mode you've specified, false otherwise.
        @warning Don't poll with this method, prefer using waitForStateChange instead.
        @see read, write, waitForStateChange
    */
    bool isReadyForCommunication(bool writing);

    /** Waits for a state change on this socket.

        Suspends the calling thread until the socket becomes ready for the kind(s) of
        access being monitored, or until the time-out expires.

        monitorReadingState and monitorWritingState select whether readiness for reading
        and/or writing is watched. timeOutMillisecs is the longest time to wait; a
        negative value waits without a time limit.

        Returns 1 if the socket is ready for one of the monitored modes, 0 if the
        time-out expired first, or -1 on error (including when the socket isn't connected).
        @see read, write, isReadyForCommunication
    */
    int waitForStateChange(bool monitorReadingState, bool monitorWritingState, int timeOutMillisecs = 300);

    /** Optimise the socket transfers when streaming large amount of data.

        Use this method if you're receiving large amount of data in small time on datagram based sockets,
        or you want to trash the remnant packets while closing a communication.

        For typical usage you shouldn't need this method. 
        However, for streaming & low-latency communication, it's required.

        For datagram sockets, this method will create a larger buffer in kernel space, so you won't miss
        any packet anymore.
        For stream sockets, this method will stop lingering, so you can effectively close a socket faster.
        The other side must have a kind of "watchdog" mode, or the other side will never know the socket 
        communication has been broken. This is only useful on very slow links, where round trip time is 
        very high.

        Returns true on success
    */
    bool optimizeSocketForStreaming();
  
    //==============================================================================
    /** Reads bytes from the socket (blocking).

        Returns the number of bytes read, or -1 if there was an error.
        @warning This method will block unless you have checked the socket is ready for reading before.
    */
    int read (void* destBuffer, const int maxBytesToRead);

    /** Writes bytes to the socket from a buffer.

        This may block on error conditions.

        Returns the number of bytes written, or -1 if there was an error.
        @warning This method will block unless you have checked the socket is ready for writing before.
    */
    int write (const void* sourceBuffer, int numBytesToWrite);

    //==============================================================================
    /** Tries to connect the socket to hostname:port.

        If timeOutMillisecs is 0, the call blocks until the operating system
        rejects the connection (which can take a very long time).

        Returns true if it succeeds.

        @see isConnected
    */
    bool connect (const String& hostname,
                  int portNumber,
                  int timeOutMillisecs = 3000);

    /** Closes the connection. */
    void close();

    //==============================================================================
    /** True if the socket is currently connected. */
    bool isConnected() const throw()                            { return connected; }

    /** Returns the name of the currently connected host. */
    const String& getHostName() const throw()                   { return hostName; }

    /** Returns the port number that's currently open. */
    int getPort() const throw()                                 { return portNumber; }

    /** Bind the socket on the specified port. 

        This must be done before any connection 
        Returns true on success, false may indicate that another socket is already bound on that port
    */
    bool bindOnPort(const int port) throw();

    /** True if the socket is connected to this machine rather than over the network. */
    bool isLocal() const throw();

    //==============================================================================
    /** Puts this socket into "listener" mode.

        When in this mode, your thread can call waitForNextConnection() repeatedly,
        which will spawn new sockets for each new connection, so that these can
        be handled in parallel by other threads.

        This returns true if it manages to open the socket successfully.

        @see waitForNextConnection
    */
    bool createListener (int portNumber);

    /** When in "listener" mode, this waits for a connection and spawns it as a new
        socket.

        The object that gets returned will be owned by the caller.

        This method can only be called after using createListener().

        @see createListener
    */
    Socket* waitForNextConnection();

[...]

#2

And the matching C++ code:

/*
  ==============================================================================

   This file is part of the JUCE library - "Jules' Utility Class Extensions"
   Copyright 2004-7 by Raw Material Software ltd.

  ------------------------------------------------------------------------------

   JUCE can be redistributed and/or modified under the terms of the
   GNU General Public License, as published by the Free Software Foundation;
   either version 2 of the License, or (at your option) any later version.

   JUCE is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with JUCE; if not, visit www.gnu.org/licenses or write to the
   Free Software Foundation, Inc., 59 Temple Place, Suite 330,
   Boston, MA 02111-1307 USA

  ------------------------------------------------------------------------------

   If you'd like to release a closed-source product which uses JUCE, commercial
   licenses are also available: visit www.rawmaterialsoftware.com/juce for
   more information.

  ==============================================================================
*/

#ifdef _WIN32
  #include "../../../../build/win32/platform_specific_code/win32_headers.h"
  #include <winsock2.h>

  #ifdef _MSC_VER
    #pragma warning (disable : 4127 4389 4018)
  #endif

#else
  #ifndef LINUX
    #include <Carbon/Carbon.h>
  #endif

  #include <sys/types.h>
  #include <netdb.h>
  #include <sys/socket.h>
  #include <arpa/inet.h>
  #include <sys/errno.h>
  #include <netinet/tcp.h>
  #include <netinet/in.h>
  #include <fcntl.h>
  #include <unistd.h>
#endif

#include "../../basics/juce_StandardHeader.h"

BEGIN_JUCE_NAMESPACE

#include "juce_Socket.h"
#include "../../threads/juce_ScopedLock.h"
#include "../../threads/juce_Thread.h"


#if JUCE_WIN32
 // Serialises access to numActiveSockets from the Socket constructors and destructor.
 static CriticalSection socketInitLock;
 // Count of live Socket objects. Socket() and Socket(int) call WSAStartup() when it
 // leaves zero, and the destructor calls WSACleanup() when it drops back to zero.
 static int numActiveSockets = 0;
#endif

//==============================================================================
/** Builds an unconnected, non-datagram socket with no descriptor yet.

    On Windows the first live Socket also starts Winsock (version 1.1 is requested).
*/
Socket::Socket()
    : portNumber (0),
      handle (-1),
      connected (false),
      isListener (false),
      datagram (false),
      serverAddress (0)
{
#if JUCE_WIN32
    socketInitLock.enter();

    const bool isFirstSocket = (numActiveSockets == 0);
    ++numActiveSockets;

    if (isFirstSocket)
    {
        WSADATA winsockInfo;
        WSAStartup (MAKEWORD (1, 1), &winsockInfo);
    }

    socketInitLock.exit();
#endif
}

/** Builds a datagram socket bound to a local port.

    The descriptor has to exist before it can be bound, so it is created here; binding
    while the handle was still -1 could never succeed.

    On success the socket is flagged as connected: every I/O method of this class is
    gated on that flag, and it lets a later connect() or close() release the descriptor.
    If the socket can't be created or bound, the object is left unconnected with no handle.

    @param localPort    the local port to bind to (0 lets the operating system choose)
*/
Socket::Socket (int localPort)
    : portNumber (0),
      handle (-1),
      connected (false),
      isListener (false),
      datagram (true),
      serverAddress (0)
{
#if JUCE_WIN32
    {
        const ScopedLock sl (socketInitLock);

        if (numActiveSockets++ == 0)
        {
            WSADATA wsaData;
            WORD wVersionRequested = MAKEWORD (1, 1);
            WSAStartup (wVersionRequested, &wsaData);
        }
    }
#endif

    handle = (int) socket (AF_INET, SOCK_DGRAM, 0);

    if (handle < 0)
        return;

    if (bindOnPort (localPort))
        connected = true;
    else
        close();    // releases the descriptor and resets the handle to -1
}

/** Wraps an already-open stream descriptor (used for sockets returned by accept()).

    The object starts out flagged as connected, and the usual socket options are
    applied to the descriptor straight away.
*/
Socket::Socket (const String& hostName_, const int portNumber_, const int handle_)
    : hostName (hostName_),
      portNumber (portNumber_),
      handle (handle_),
      connected (true),
      isListener (false),
      datagram (false),
      serverAddress (0)
{
#if JUCE_WIN32
    {
        // Winsock is already running here, so only the live-socket count changes.
        const ScopedLock countLock (socketInitLock);
        ++numActiveSockets;
    }
#endif

    resetSocketOptions();
}

/** Builds a datagram socket object describing a remote peer.

    handle_ may be negative, meaning "no descriptor yet" (monitorIncomingData() does
    this). In that case the object is not flagged as connected, and neither the socket
    options nor the local bind are attempted, since both need a real descriptor.

    @param hostName_    the peer's address
    @param portNumber_  the peer's port
    @param localPort_   local port to bind to, ignored unless greater than zero
    @param handle_      an open datagram descriptor, or a negative value for none
*/
Socket::Socket (const String& hostName_, const int portNumber_, const int localPort_, const int handle_)
    : hostName (hostName_),
      portNumber (portNumber_),
      handle (handle_),
      connected (handle_ >= 0),
      isListener (false),
      datagram (true),
      serverAddress (0)
{
#if JUCE_WIN32
    socketInitLock.enter();
    ++numActiveSockets;
    socketInitLock.exit();
#endif

    // 0 is a legal descriptor value, so only negative handles count as "none".
    if (handle_ >= 0)
    {
        resetSocketOptions();

        if (localPort_ > 0)
            bindOnPort (localPort_);
    }
}

/** Closes the socket; on Windows the last live Socket also shuts Winsock down. */
Socket::~Socket()
{
    close();

#if JUCE_WIN32
    socketInitLock.enter();

    --numActiveSockets;

    if (numActiveSockets == 0)
        WSACleanup();

    socketInitLock.exit();
#endif
}

//==============================================================================
/** Applies the standard options to the current descriptor.

    Requests 64 KB receive and send buffers and enables TCP_NODELAY, in that order,
    stopping at the first option that can't be set.

    Returns true only if all three options were accepted.
*/
bool Socket::resetSocketOptions()
{
    const int receiveBufferBytes = 65536;
    const int sendBufferBytes = 65536;
    const int noDelayEnabled = 1;

    if (setsockopt (handle, SOL_SOCKET, SO_RCVBUF, (const char*) &receiveBufferBytes, sizeof (int)) != 0)
        return false;

    if (setsockopt (handle, SOL_SOCKET, SO_SNDBUF, (const char*) &sendBufferBytes, sizeof (int)) != 0)
        return false;

    return setsockopt (handle, IPPROTO_TCP, TCP_NODELAY, (const char*) &noDelayEnabled, sizeof (int)) == 0;
}

//==============================================================================
/** Reads from the socket until maxBytesToRead bytes have arrived or reading stops.

    Returns -1 straight away for a listener or an unconnected socket. Otherwise it
    returns the number of bytes stored in destBuffer, or -1 if nothing at all could be
    read before a failure, an end-of-stream, or the socket being closed.
*/
int Socket::read (void* destBuffer, const int maxBytesToRead)
{
    if (isListener || ! connected)
        return -1;

    char* const out = (char*) destBuffer;
    int total = 0;

    for (;;)
    {
        const int remaining = maxBytesToRead - total;

        if (remaining <= 0)
            return total;

        int chunk;

#if JUCE_WIN32
        chunk = recv (handle, out + total, remaining, 0);
#else
        // a read interrupted by a signal is simply retried while still connected
        do
        {
            chunk = ::read (handle, out + total, remaining);
        }
        while (chunk < 0 && errno == EINTR && connected);
#endif

        if (chunk <= 0 || ! connected)
            return (total == 0) ? -1 : total;

        total += chunk;
    }
}

/** Sends numBytesToWrite bytes from sourceBuffer.

    Datagram sockets send to the destination stored by connect(); stream sockets
    write to the connected peer.

    Returns whatever the underlying send call reports (bytes written, or a negative
    value on failure), or -1 for a listener or an unconnected socket.
*/
int Socket::write (const void* sourceBuffer, int numBytesToWrite)
{
    const bool usable = connected && ! isListener;

    if (! usable)
        return -1;

    const char* const bytes = (const char*) sourceBuffer;

    if (datagram)
    {
        jassert(serverAddress); // Must call connect first to set the server address
        const struct sockaddr* const destination = (const struct sockaddr*) serverAddress;
        return sendto (handle, bytes, numBytesToWrite, 0, destination, sizeof (struct sockaddr_in));
    }

#if JUCE_WIN32
    return send (handle, bytes, numBytesToWrite, 0);
#else
    // a write interrupted by a signal is retried; anything else is returned as-is
    for (;;)
    {
        const int written = ::write (handle, sourceBuffer, numBytesToWrite);

        if (written >= 0 || errno != EINTR)
            return written;
    }
#endif
}

//==============================================================================
/** Checks whether the socket can be read, waiting up to timeoutMsecs.

    Returns the result of waitForStateChange(): 1 == ready, 0 == not ready, -1 == error.
*/
int Socket::isReady (int timeoutMsecs)
{
    const bool watchReads = true;
    const bool watchWrites = false;

    return waitForStateChange (watchReads, watchWrites, timeoutMsecs);
}

/** Polls, without waiting, whether the socket is ready for writing (writing == true)
    or for reading (writing == false).
*/
bool Socket::isReadyForCommunication (bool writing)
{
    const int state = writing ? waitForStateChange (false, true, 0)
                              : waitForStateChange (true, false, 0);

    return state == 1;
}

/** Waits with select() until the socket is ready for the monitored kind(s) of access.

    A negative timeoutMsecs waits without a time limit.

    Returns 1 if the socket is ready for reading and/or writing (whichever was asked
    for), 0 if it is not, and -1 if the socket isn't connected or select() failed.
*/
int Socket::waitForStateChange (bool monitorReadingState, bool monitorWritingState, int timeoutMsecs)
{
    if (! connected)
        return -1;

    struct timeval limit;
    struct timeval* limitPtr = 0;

    if (timeoutMsecs >= 0)
    {
        limit.tv_sec = timeoutMsecs / 1000;
        limit.tv_usec = (timeoutMsecs % 1000) * 1000;
        limitPtr = &limit;
    }

    fd_set readable, writable;
    FD_ZERO (&readable);
    FD_ZERO (&writable);
    FD_SET (handle, &readable);
    FD_SET (handle, &writable);

    // only the sets the caller asked about are handed to select()
    fd_set* const readSet = monitorReadingState ? &readable : 0;
    fd_set* const writeSet = monitorWritingState ? &writable : 0;

    int selectResult;

#if JUCE_WIN32
    selectResult = select (handle + 1, readSet, writeSet, 0, limitPtr);
#else
    // a select() interrupted by a signal is retried
    do
    {
        selectResult = select (handle + 1, readSet, writeSet, 0, limitPtr);
    }
    while (selectResult < 0 && errno == EINTR);
#endif

    if (selectResult < 0)
        return -1;

    const bool readReady = monitorReadingState && FD_ISSET (handle, &readable);
    const bool writeReady = monitorWritingState && FD_ISSET (handle, &writable);

    return (readReady || writeReady) ? 1 : 0;
}


//==============================================================================
/** Switches the socket's descriptor between blocking and non-blocking mode.

    @param block    true for blocking I/O, false for non-blocking I/O
    @returns        true if the mode was changed successfully
*/
bool Socket::setSocketBlockingState (bool block)
{
#if JUCE_WIN32
    // FIONBIO takes a u_long: non-zero enables non-blocking mode
    u_long nonBlocking = block ? 0 : 1;

    return ioctlsocket (handle, FIONBIO, &nonBlocking) == 0;
#else
    // read the current flags from this socket's own descriptor, then flip O_NONBLOCK
    int socketFlags = fcntl (handle, F_GETFL, 0);

    if (socketFlags == -1)
        return false;

    if (block)
        socketFlags &= ~O_NONBLOCK;
    else
        socketFlags |= O_NONBLOCK;

    return fcntl (handle, F_SETFL, socketFlags) != -1;
#endif
}


bool Socket::connect (const String& newHostName,
                      int newPortNumber,
                      int timeOutMillisecs)
{
    if (isListener)
    {
        jassertfalse    // a listener socket can't connect to another one!
        return false;
    }
    if (isListener && datagram)
    {
        jassertfalse    // a listener socket can't be a datagram socket!
        return false;
    }

    if (connected)
        close();

    hostName = newHostName;
    portNumber = newPortNumber;
    isListener = false;

    struct hostent* hostEnt = gethostbyname (hostName);

    if (! hostEnt)
        return false;

    struct in_addr targetAddress;
    memcpy (&targetAddress.s_addr,
            *(hostEnt->h_addr_list),
            sizeof (targetAddress.s_addr));

    struct sockaddr_in servTmpAddr;
    zerostruct (servTmpAddr);
    servTmpAddr.sin_family = PF_INET;
    servTmpAddr.sin_addr = targetAddress;
    servTmpAddr.sin_port = htons ((uint16) portNumber);

    handle = (int) socket (AF_INET, datagram ? SOCK_STREAM : SOCK_DGRAM, 0);

    if (handle < 0)
        return false;
    if (datagram)
    {
        serverAddress = (void*)new struct sockaddr_in;
        *((struct sockaddr_in*)serverAddress) = servTmpAddr;
        return true;
    }
    else 
        setSocketBlockingState(false);



    int result = 0;
    if (result = ::connect (handle, (struct sockaddr*) &servTmpAddr, sizeof (struct sockaddr_in)) < 0)
    {   // Errors here
#if JUCE_WIN32
        if (result == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)
#else
        if (result == EINPROGRESS)    
#endif
        {   // Will we connect in the time provided ?
            if (waitForStateChange(false, true, timeOutMillisecs) != 1)
            {   // Well, no...
                close();
                return false;
            } 
        }
    } 
    
    // Restore socket blocking state 
    setSocketBlockingState(true);
    resetSocketOptions();
    return true;
}

void Socket::close()
{
#if JUCE_WIN32
    closesocket (handle);
#else
    if (connected)
    {
        connected = false;

        if (isListener)
        {
            // need to do this to interrupt the accept() function..
            Socket temp;
            temp.connect ("localhost", portNumber, 1000);
        }
    }

    ::close (handle);
#endif

    hostName = String::empty;
    portNumber = 0;
    handle = -1;
    connected = false;
    isListener = false;
    delete ((struct sockaddr_in*)serverAddress);
    serverAddress = 0;
}

//==============================================================================
bool Socket::createListener (int newPortNumber)
{
    if (connected)
        close();
    if (datagram)
    {
        jassertfalse    // You can't create a listener on datagram based sockets
        return false;
    }

    hostName = "listener";
    portNumber = newPortNumber;
    isListener = true;

    struct sockaddr_in servTmpAddr;
    zerostruct (servTmpAddr);
    servTmpAddr.sin_family = PF_INET;
    servTmpAddr.sin_addr.s_addr = htonl (INADDR_ANY);
    servTmpAddr.sin_port = htons ((uint16) portNumber);

    handle = (int) socket (AF_INET, SOCK_STREAM, 0);

    if (handle < 0)
        return false;

    const int reuse = 1;
    setsockopt (handle, SOL_SOCKET, SO_REUSEADDR, (const char*) &reuse, sizeof (reuse));

    if (bind (handle, (struct sockaddr*) &servTmpAddr, sizeof (struct sockaddr_in)) < 0
         || listen (handle, SOMAXCONN) < 0)
    {
        close();
        return false;
    }

    connected = true;
    return true;
}

/** When in "listener" mode, blocks until a client connects and returns it as a new
    Socket owned by the caller.

    Returns 0 for a datagram socket, for a socket that isn't a connected listener, if
    accept() fails, or if this listener was closed while waiting. In the last case
    the descriptor that accept() handed back is closed here rather than leaked.
*/
Socket* Socket::waitForNextConnection()
{
    if (datagram)
    {
        jassertfalse // Datagram based socket should never call this method
        return 0;
    }

    jassert (isListener || ! connected); // to call this method as stream based socket, you first have to use
                                         // createListener() to prepare this socket as a listener.

    if (connected && isListener)
    {
        struct sockaddr address;

#if defined (JUCE_LINUX) || (defined (JUCE_MAC) && ! MACOS_10_2_OR_EARLIER)
        socklen_t len = sizeof (sockaddr);
#else
        int len = sizeof (sockaddr);
#endif
        const int newSocket = (int) accept (handle, &address, &len);

        if (newSocket >= 0)
        {
            if (connected)
                return new Socket (inet_ntoa (((struct sockaddr_in*) &address)->sin_addr),
                                   portNumber, newSocket);

            // close() clears 'connected' and then wakes accept() with a throw-away
            // connection: that connection's descriptor must not be left open.
#if JUCE_WIN32
            closesocket (newSocket);
#else
            ::close (newSocket);
#endif
        }
    }

    return 0;
}

/** Blocks until a datagram arrives, then returns a new Socket describing its sender.

    The datagram is only peeked at (MSG_PEEK), so it stays queued for a later read().
    Until it has been read, another call to this method will report the same sender
    again.

    The returned object is owned by the caller; it carries the sender's address and
    port but no descriptor of its own. Returns 0 if this isn't a datagram socket or
    if waiting on the socket fails.
*/
Socket* Socket::monitorIncomingData()
{
    if (! datagram)
    {
        jassertfalse    // You can only call this function with datagram based sockets
        return 0;
    }

    while (waitForStateChange (true, false, -1) == 1)
    {
        struct sockaddr address;

        // the length is an in/out argument, so it must be reset for every attempt
#if defined (JUCE_LINUX) || (defined (JUCE_MAC) && ! MACOS_10_2_OR_EARLIER)
        socklen_t len = sizeof (sockaddr);
#else
        int len = sizeof (sockaddr);
#endif
        char buf[1];
        int received = (int) recvfrom (handle, buf, sizeof (buf), MSG_PEEK, &address, &len);

#if JUCE_WIN32
        // Winsock reports a datagram bigger than the buffer as an error
        if (received < 0 && WSAGetLastError() == WSAEMSGSIZE)
            received = 0;
#endif

        // zero is a valid result: datagrams may be empty
        if (received >= 0)
            return new Socket (inet_ntoa (((struct sockaddr_in*) &address)->sin_addr),
                               ntohs (((struct sockaddr_in*) &address)->sin_port), -1, -1);
    }

    return 0;
}

//==============================================================================
/** Reports whether the stored host name is exactly the IPv4 loopback address "127.0.0.1". */
bool Socket::isLocal() const throw()
{
    const bool isLoopbackAddress = (hostName == T("127.0.0.1"));
    return isLoopbackAddress;
}

/** Binds the socket's descriptor to a local port on all interfaces.

    @param port     the local port to bind to (0 lets the operating system choose)
    @returns        true on success; false if there is no descriptor yet (a negative
                    handle) or if bind() fails, e.g. because the port is already in use
*/
bool Socket::bindOnPort (const int port) throw()
{
    // "no descriptor" is stored as -1; 0 is a perfectly valid descriptor
    if (handle < 0)
        return false;

    struct sockaddr_in servTmpAddr;
    zerostruct (servTmpAddr);
    servTmpAddr.sin_family = AF_INET;
    servTmpAddr.sin_addr.s_addr = htonl (INADDR_ANY);
    servTmpAddr.sin_port = htons ((uint16) port);

    return bind (handle, (struct sockaddr*) &servTmpAddr, sizeof (struct sockaddr_in)) == 0;
}

END_JUCE_NAMESPACE

#3