Streamingsocket will not send short messages


#1

I am running my communication task as a separate thread. I am having trouble with sending short messages with the Streaminsocket class, they are only sent after my communication task shuts down giving me around 15 secons delay … I can see that the TCP_NODELAY flag is set by calling getsockopt just before the send. I also tried setting the buffers to 0, assuring that the messages should be sent with no delay according to the windows documentation. If I patch in a shutdown right after sending, the message is being sent with no delay. But then my channel is shut down, still having lots more to do. The situation is the same on both Mac and Windows. Hope for a solution soon, lots of hours trying to solve this one…


#2

and… I am running a client - server setup. Communication is initiated from the client and the connection procedure goes well.


#3

Another symptom. When I break with the debugger in juce_Socket .cpp(functions called from my sever task), this blocks the internet connection on my computer. Explorer will not open any pages. When debugging stops, it’s back to normal.


#4

I guess the windows TCP stack has a shared lock, and if a process is holding it, you’re stuck. Not much we can do to change that!

No idea about the delay. Does the write() method block, or does it return before the bytes get sent?


#5

Seems I’ve been a bit stupid :oops: The message is really being sent. But I was trying to use variable length messages. A shorter message than the receive buffer length, will not be registered as a message being received. So it never did appear in the other end.

Simple solution, using fixed message and buffer length (being the same), always having enough room for the longest messages. Then adding a header with the real message length for picking out the variable length message. The extra overhead does not matter in this case.

Since the message and buffer length is set equal, there will be one recieve call for each message. Now life is easy again :smiley:


#6

I’m doing something similar, this seems to work with Juce r597 (pre-major Mac Cocoa port). Basically create two apps with this as the MainComponent on the same machine. The first one to launch should listen and the second one should connect to the first one. If one app is quit the other should then enter listen mode (so relaunching the other should reconnect). Moving the slider on one, should update the other.

[code]class MainComponent : public Component,
public SliderListener,
public Thread
{
private:
StreamingSocket* socket;
StreamingSocket* socketServer;
Slider* dataSlider;

CriticalSection lock;

public:

MainComponent ()
:	Thread(T("Socket")),
	socket(0),
	socketServer(0)
{		
	addAndMakeVisible(dataSlider = new Slider(T("Data")));
	dataSlider->addListener(this);
	startThread();
}

~MainComponent ()
{		
	signalThreadShouldExit();
			
	if(socket) socket->close();		
	
	{
		ScopedLock sl(lock);
		if(socketServer) socketServer->close();
	}
	
	stopThread(4000);
		
	if(socketServer) deleteAndZero(socketServer);
	if(socket)		 deleteAndZero(socket);
	
	deleteAllChildren();
}

void resized()
{
	dataSlider->setBounds(10, 40, 250, 20);
}

void sliderValueChanged(Slider* changedSlider)
{
	if(changedSlider == dataSlider)
	{
		if(socket == 0 || !socket->isConnected()) return;
		
		String msg = String(dataSlider->getValue(), 4) + T("\r\n");
		
		MemoryBlock data(BUFFERSIZE, true);
		data.copyFrom(msg.toUTF8(), 0, msg.length());
		
		int written = socket->write(data.getData(), data.getSize());
		
		Logger::outputDebugPrintf(T("bytes written = %d"), written);
	}
}

void run()
{
	while(!threadShouldExit())
	{
		Thread::sleep(2);
					
		if(socket == 0)
		{	
			ScopedLock sl(lock);
							
			socket = new StreamingSocket();
			bool success = socket->connect(T("localhost"), 12345);
			
			if(success)
				Logger::outputDebugString(T("connect succeeded"));
			else
			{
				deleteAndZero(socket);
				socketServer = new StreamingSocket();
				socketServer->createListener(12345);
				Logger::outputDebugString(T("listening..."));
				
				{
					ScopedUnlock sul(lock);
					socket = socketServer->waitForNextConnection();
				}
				
				deleteAndZero(socketServer);
				
				if(socket != 0)
				{
					Logger::outputDebugPrintf(T("connection received, %p"), socket);
					sliderValueChanged(dataSlider);
				}
				else
					Logger::outputDebugString(T("disconnecting"));
			}
			
		}
		else if(socket != 0 && socket->isConnected())
		{
			const ScopedLock sl(lock);
			
			if(socket->waitUntilReady(true, 0) == 1)
			{
				MemoryBlock data(BUFFERSIZE, true);
									
				int bytesRead = socket->read(data.getData(), data.getSize());
				
				if(bytesRead == -1)
				{
					Logger::outputDebugString(T("disconnected"));
					if(socket) deleteAndZero(socket);
				}
				else
				{
					Logger::outputDebugPrintf(T("bytes read = %d"), bytesRead);
					
					String msg = data.toString();
					
					Logger::outputDebugString(String("data read = ") + msg);
					
					const MessageManagerLock mmlock(Thread::getCurrentThread());
					if(mmlock.lockWasGained())
						dataSlider->setValue(msg.getFloatValue(), false);
				}
			}
		}
		else
		{
			Logger::outputDebugString(T("shouldn't happen!!"));
			deleteAndZero(socket);
		}
		
	}
}

};[/code]

But seems to get stuck at the MessageManagerLock on the Juce tip (r636 Mac). If I use a Timer, it’s fine:

[code]class MainComponent : public Component,
public SliderListener,
public Thread,
public Timer
{
private:
StreamingSocket* socket;
StreamingSocket* socketServer;
Slider* dataSlider;
double dataSliderValue;

CriticalSection lock;

public:

MainComponent ()
:	Thread(T("Socket")),
	socket(0),
	socketServer(0)
{		
	addAndMakeVisible(dataSlider = new Slider(T("Data")));
	dataSlider->addListener(this);
	startTimer(5);
	startThread();
}

~MainComponent ()
{		
	signalThreadShouldExit();
			
	if(socket) socket->close();		
	
	{
		ScopedLock sl(lock);
		if(socketServer) socketServer->close();
	}
	
	stopThread(4000);
		
	if(socketServer) deleteAndZero(socketServer);
	if(socket)		 deleteAndZero(socket);
	
	deleteAllChildren();
}

void resized()
{
	dataSlider->setBounds(10, 40, 250, 20);
}

void sliderValueChanged(Slider* changedSlider)
{
	if(changedSlider == dataSlider)
	{
		if(socket == 0 || !socket->isConnected()) return;
		
		String msg = String(dataSlider->getValue(), 4) + T("\r\n");
		
		MemoryBlock data(BUFFERSIZE, true);
		data.copyFrom(msg.toUTF8(), 0, msg.length());
		
		int written = socket->write(data.getData(), data.getSize());
		
		Logger::outputDebugPrintf(T("bytes written = %d"), written);
		
		dataSliderValue = dataSlider->getValue();
	}
}

void run()
{
	while(!threadShouldExit())
	{
		Thread::sleep(2);
					
		if(socket == 0)
		{	
			ScopedLock sl(lock);
							
			socket = new StreamingSocket();
			bool success = socket->connect(T("localhost"), 12345);
			
			if(success)
				Logger::outputDebugString(T("connect succeeded"));
			else
			{
				deleteAndZero(socket);
				socketServer = new StreamingSocket();
				socketServer->createListener(12345);
				Logger::outputDebugString(T("listening..."));
				
				{
					ScopedUnlock sul(lock);
					socket = socketServer->waitForNextConnection();
				}
				
				deleteAndZero(socketServer);
				
				if(socket != 0)
				{
					Logger::outputDebugPrintf(T("connection received, %p"), socket);
					sliderValueChanged(dataSlider);
				}
				else
					Logger::outputDebugString(T("disconnecting"));
			}
			
		}
		else if(socket != 0 && socket->isConnected())
		{
			const ScopedLock sl(lock);
			
			if(socket->waitUntilReady(true, 0) == 1)
			{
				MemoryBlock data(BUFFERSIZE, true);
									
				int bytesRead = socket->read(data.getData(), data.getSize());
				
				if(bytesRead == -1)
				{
					Logger::outputDebugString(T("disconnected"));
					if(socket) deleteAndZero(socket);
				}
				else
				{
					Logger::outputDebugPrintf(T("bytes read = %d"), bytesRead);
					String msg = data.toString();			
					Logger::outputDebugString(String("data read = ") + msg);
					dataSliderValue = msg.getFloatValue();
				}
			}
		}
		else
		{
			Logger::outputDebugString(T("shouldn't happen!!"));
			deleteAndZero(socket);
		}
		
	}
}

void timerCallback()
{
	ScopedLock sl(lock);
	if(dataSlider->getValue() != dataSliderValue) {
		dataSlider->setValue(dataSliderValue, false);
	}
}

};[/code]

I’ve had problems with MessageManagerLock before, am I doing something wrong? Doesn’t seem to be deadlocking, the thread still runs. (BTW I know there’s a rounding error on the sliders…)