eneter.messaging.messagingsystems.composites.monitoredmessagingcomposit
Class MonitoredMessagingFactory

java.lang.Object
  extended by eneter.messaging.messagingsystems.composites.monitoredmessagingcomposit.MonitoredMessagingFactory
All Implemented Interfaces:
IMessagingSystemFactory

public class MonitoredMessagingFactory
extends java.lang.Object
implements IMessagingSystemFactory

Extension providing the connection monitoring. The monitored messaging regularly monitors if the connection is still available. It sends ping messages and receives ping messages in a defined frequency. If sending of the ping message fails or the ping message is not received within the specified time the connection is considered broken.
The advantage of the monitored messaging is that the disconnection can be detected very early.

When the connection is monitored, the duplex output channel periodically sends 'ping' messages to the duplex input channel and waits for responses. If the response comes within the specified timeout, the connection is open.
On the receiver side, the duplex input channel waits for the 'ping' messages and monitors if the connected duplex output channel is still alive. If the 'ping' message does not come within the specified timeout, the particular duplex output channel is disconnected.

Note Channels created by monitored messaging factory cannot communicate with channels, that were not created by monitored factory. E.g. the channel created with the monitored messaging factory with underlying TCP will not communicate with channels created directly with TCP messaging factory. The reason is, the communicating channels must understand the 'ping' communication.

The following example shows how to use monitored messaging:

 // Create TCP messaging.
 IMessagingSystemFactory anUnderlyingMessaging = new TcpMessagingSystemFactory();
 
// Create monitored messaging which internally uses TCP. IMessagingSystemFactory aMessaging = new MonitoredMessagingSystemFactory(anUnderlyingMessaging);
// Create the output channel. IDuplexOutputChannel anOutputChannel = aMessaging.createDuplexOutputChannel("tcp://127.0.0.1:8045/");
// Create message sender to send simple string messages. IDuplexStringMessagesFactory aSenderFactory = new DuplexStringMessagesFactory(); IDuplexStringMessageSender aSender = aSenderFactory.CreateDuplexStringMessageSender();
// Subscribe to detect the disconnection. aSender.connectionClosed().subscribe(myOnConnectionClosed);
// Subscribe to receive responses. aSender.responseReceived().subscribe(myOnResponseReceived);
// Attach output channel an be able to send messages and receive responses. aSender.attachDuplexOutputChannel(anOutputChannel);
...
// Send a message. aSender.SendMessage("Hello.");


Constructor Summary
MonitoredMessagingFactory(IMessagingSystemFactory underlyingMessaging)
          Constructs the factory with default settings.
MonitoredMessagingFactory(IMessagingSystemFactory underlyingMessaging, long pingFrequency, long pingReceiveTimeout)
          Constructs the factory from specified parameters.
 
Method Summary
 IDuplexInputChannel createDuplexInputChannel(java.lang.String channelId)
          Creates the input channel which can receive messages from the output channel and send response messages.
 IDuplexOutputChannel createDuplexOutputChannel(java.lang.String channelId)
          Creates the output channel which can send messages to the input channel and receive response messages.
 IDuplexOutputChannel createDuplexOutputChannel(java.lang.String channelId, java.lang.String responseReceiverId)
          Creates the output channel which can send messages to the input channel and receive response messages.
 IThreadDispatcherProvider getInputChannelThreading()
          Gets threading mode used for input channels.
 IThreadDispatcherProvider getOutputChannelThreading()
          Gets threading mode used for output channels.
 long getPingFrequency()
          Gets the ping frequency.
 long getReceiveTimeout()
          Gets the time within it the ping message must be received.
 ISerializer getSerializer()
          Gets the serializer which is used to serialize/deserialize MonitorChannelMessage.
 MonitoredMessagingFactory setInputChannelThreading(IThreadDispatcherProvider inputChannelThreading)
          Sets threading mode for input channels.
 MonitoredMessagingFactory setOutputChannelThreading(IThreadDispatcherProvider outputChannelThreading)
          Sets threading mode for output channels.
 MonitoredMessagingFactory setPingFrequency(long milliseconds)
          Sets how often the ping message shall be sent.
 MonitoredMessagingFactory setReceiveTimeout(long milliseconds)
          Sets the time within it the ping message must be received.
 MonitoredMessagingFactory setSerializer(ISerializer pingSerializer)
          Sets the serializer which shall be used to serialize MonitorChannelMessage.
 
Methods inherited from class java.lang.Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

MonitoredMessagingFactory

public MonitoredMessagingFactory(IMessagingSystemFactory underlyingMessaging)
Constructs the factory with default settings. It uses optimized custom serializer which is optimized to serialize/deserialize MonitorChannelMessage which is used for the internal communication between output and input channels. The ping message is sent once per second and it is expected the ping message is received at least once per two seconds.

Parameters:
underlyingMessaging - underlying messaging system e.g. Websocket, TCP, ...

MonitoredMessagingFactory

public MonitoredMessagingFactory(IMessagingSystemFactory underlyingMessaging,
                                 long pingFrequency,
                                 long pingReceiveTimeout)
Constructs the factory from specified parameters.

Parameters:
underlyingMessaging - underlying messaging system e.g. Websocket, TCP, ...
pingFrequency - how often the ping message is sent.
pingReceiveTimeout - the maximum time within it the ping message must be received.
Method Detail

createDuplexOutputChannel

public IDuplexOutputChannel createDuplexOutputChannel(java.lang.String channelId)
                                               throws java.lang.Exception
Creates the output channel which can send messages to the input channel and receive response messages. In addition the output channel monitors the connection availability. It sends ping messages in a specified frequency to the input channel and expects receiving ping messages within a specified time.

Specified by:
createDuplexOutputChannel in interface IMessagingSystemFactory
Parameters:
channelId - address of the input channel.
Returns:
output channel
Throws:
java.lang.Exception

createDuplexOutputChannel

public IDuplexOutputChannel createDuplexOutputChannel(java.lang.String channelId,
                                                      java.lang.String responseReceiverId)
                                               throws java.lang.Exception
Creates the output channel which can send messages to the input channel and receive response messages. In addition the output channel monitors the connection availability. It sends ping messages in a specified frequency to the input channel and expects receiving ping messages within a specified time.

Specified by:
createDuplexOutputChannel in interface IMessagingSystemFactory
Parameters:
channelId - address of the input channel.
responseReceiverId - unique identifier of the output channel. If the value is null then the identifier is genearated automatically
Returns:
duplex output channel
Throws:
java.lang.Exception

createDuplexInputChannel

public IDuplexInputChannel createDuplexInputChannel(java.lang.String channelId)
                                             throws java.lang.Exception
Creates the input channel which can receive messages from the output channel and send response messages. In addition it expects receiving ping messages from each connected client within a specified time and sends ping messages to each connected client in a specified frequency.

Specified by:
createDuplexInputChannel in interface IMessagingSystemFactory
Parameters:
channelId - address of the input channel.
Returns:
input channel
Throws:
java.lang.Exception

setPingFrequency

public MonitoredMessagingFactory setPingFrequency(long milliseconds)
Sets how often the ping message shall be sent.

Parameters:
milliseconds - time in milliseconds
Returns:
this MonitoredMessagingFactory

getPingFrequency

public long getPingFrequency()
Gets the ping frequency.

Returns:
time in milliseconds.

setReceiveTimeout

public MonitoredMessagingFactory setReceiveTimeout(long milliseconds)
Sets the time within it the ping message must be received.

Parameters:
milliseconds - time in milliseconds
Returns:
this MonitoredMessagingFactory

getReceiveTimeout

public long getReceiveTimeout()
Gets the time within it the ping message must be received.

Returns:
time in milliseconds.

setSerializer

public MonitoredMessagingFactory setSerializer(ISerializer pingSerializer)
Sets the serializer which shall be used to serialize MonitorChannelMessage.

Parameters:
pingSerializer - serializer.
Returns:
this MonitoredMessagingFactory

setInputChannelThreading

public MonitoredMessagingFactory setInputChannelThreading(IThreadDispatcherProvider inputChannelThreading)
Sets threading mode for input channels.

Parameters:
inputChannelThreading - threading model
Returns:
this TcpMessagingSystemFactory

getInputChannelThreading

public IThreadDispatcherProvider getInputChannelThreading()
Gets threading mode used for input channels.

Returns:
thread dispatcher

setOutputChannelThreading

public MonitoredMessagingFactory setOutputChannelThreading(IThreadDispatcherProvider outputChannelThreading)
Sets threading mode for output channels.

Parameters:
outputChannelThreading -
Returns:
this TcpMessagingSystemFactory

getOutputChannelThreading

public IThreadDispatcherProvider getOutputChannelThreading()
Gets threading mode used for output channels.

Returns:
thread dispatcher

getSerializer

public ISerializer getSerializer()
Gets the serializer which is used to serialize/deserialize MonitorChannelMessage.

Returns:
serializer