#include <FXIPC.h>
While you could use FX::FXIPCMsg manually, chances are you'll use this base class, which provides almost everything you need to implement a TnFOX IPC channel. Although it was written with a monitor thread in mind that dispatches incoming messages asynchronously, you can in fact leave the thread unstarted and call doReception(false) manually. Note that if you do this, you must send messages without waiting and retrieve their acks after the doReception() call; see also ackedMsgs().
The two most interesting functions for average users will be sendMsg() and getMsgAck(). sendMsg() endianises all the requisite data and sets all header items of FXIPCMsg for you before writing to the transport device. If unreliable() is set, a slight amount of overhead is incurred to calculate the adler32 checksum of the message contents. If compression() is set, a great amount of overhead is incurred by compressing the message data using an internal FX::QGZipDevice - it makes no sense to use this except when the transport is very slow indeed (e.g. a modem dialup). Note that compression can be enabled at one end and not the other, and likewise one end can be unreliable and the other not.
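To give a feel for the cost of the checksum mentioned above, here is a minimal sketch of the standard Adler-32 algorithm as defined in RFC 1950. It is not taken from the TnFOX sources; the function name adler32 and the std::string overload are illustrative assumptions, and the library's internal routine may be organised differently.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Standard Adler-32 (RFC 1950): two running sums, each taken modulo 65521.
// Illustrative only; not the TnFOX implementation.
std::uint32_t adler32(const unsigned char *data, std::size_t len)
{
    const std::uint32_t MOD = 65521;   // largest prime below 2^16
    std::uint32_t a = 1, b = 0;
    for (std::size_t i = 0; i < len; ++i)
    {
        a = (a + data[i]) % MOD;       // sum of all bytes, plus one
        b = (b + a) % MOD;             // sum of the successive values of a
    }
    return (b << 16) | a;
}

// Hypothetical convenience overload for string payloads.
std::uint32_t adler32(const std::string &s)
{
    return adler32(reinterpret_cast<const unsigned char *>(s.data()), s.size());
}
```

The work is one addition pair and two modulo operations per byte, which is consistent with the "slight amount of overhead" described for unreliable().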
In order to be most efficient, FXIPCChannel in general extends its allocation on demand but never reduces it. It trades off a small amount of execution speed for unlocking itself when sending data so that another thread can receive and dispatch incoming data during sends, thus improving performance. As of v0.80 of TnFOX, FXIPCChannel self-optimises endian conversion so that when like-endian architectures are conversing, conversion is optimised away entirely, even when a message tunneller is of a different endian to the source and destination. You can still force one kind of endian or another, which is especially useful for dumping message exchanges into a format you can read later.
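The idea of skipping conversion between like-endian peers can be sketched as follows. This is a generic illustration, not FXIPCChannel's code: ByteOrder, hostByteOrder(), byteSwap32() and fromWire() are hypothetical names introduced here.

```cpp
#include <cstdint>
#include <cstring>

// Hypothetical names throughout; this only illustrates the principle.
enum class ByteOrder { Little, Big };

// Detect the byte order of the machine this code runs on.
ByteOrder hostByteOrder()
{
    const std::uint16_t probe = 1;
    unsigned char first;
    std::memcpy(&first, &probe, 1);    // inspect the lowest-addressed byte
    return first == 1 ? ByteOrder::Little : ByteOrder::Big;
}

// Reverse the four bytes of a 32 bit value.
std::uint32_t byteSwap32(std::uint32_t v)
{
    return ((v & 0x000000FFu) << 24) | ((v & 0x0000FF00u) << 8) |
           ((v & 0x00FF0000u) >> 8)  | ((v & 0xFF000000u) >> 24);
}

// Convert a value that arrived in wireOrder into host order.
// When both orders match, the value passes through untouched.
std::uint32_t fromWire(std::uint32_t v, ByteOrder wireOrder)
{
    return wireOrder == hostByteOrder() ? v : byteSwap32(v);
}
```

Forcing AlwaysLittleEndian or AlwaysBigEndian corresponds to fixing wireOrder regardless of what the peers are, at the price of swapping on hosts of the other byte order.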
Because of exception safety requirements, thread termination is disabled during most of doReception() - all except the first read from the device which is the one which normally does most of the waiting. After the initial header, the reading of the rest of the message cannot be interrupted. Sending also disables thread termination during the actual device write but it is enabled during getMsgAck() which again is where most of the waiting would be done. requestTermination() is not available publicly in order to encourage you to use requestClose() whenever possible - however it can be accessed by a method in your subclass.
You must subclass FXIPCChannel, implementing msgReceived() which is called to handle non-acknowledgement messages (acknowledgement messages are deendianised directly into the ack passed to getMsgAck()). msgReceived() is called in the context of the monitor thread and with the FXIPCChannel unlocked. The form of msgReceived() is obvious:
HandledCode msgReceived(FXIPCMsg *rawmsg)
{
   switch(rawmsg->msgType())
   {
   case MyMsg::id::code:
      {
         MyMsg *i=(MyMsg *) rawmsg;
         // Do processing ...
         if(i->wantsAck())
         {
            MyMsgAck ia(i->msgId());
            // Maybe set members in ia?
            sendMsg(ia);
         }
         return Handled;
      }
   ...
   }
   return NotHandled;
}
You can instead return HandledAsync after dispatching the heavy processing to a FX::QThreadPool, passing it the message pointer, which you must now delete yourself. Ensure you don't throw an exception before returning HandledAsync in this case, as FXIPCChannel will then delete the message on you.
If you throw a FX::FXException or subclass during msgReceived(), it is automatically translated into a FX::FXIPCMsg_ErrorOccurred and sent if and only if the message you were handling has an acknowledgement. If you handle the message asynchronously, you must do this yourself if you don't use invokeMsgHandler(). The error is then thrown on the opposite end of the channel in the thread which called getMsgAck(). An exception to this operation is throwing a FX::FXConnectionLostException, which causes doReception() to return false, i.e. closes the channel.
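The ownership rule for HandledAsync can be modelled with a small self-contained mock. Nothing below is TnFOX code: Msg stands in for FXIPCMsg, pendingJobs for a FX::QThreadPool, and dispatch() for whatever the channel does around msgReceived(). It is assumed here that the channel deletes the message for every return code except HandledAsync.

```cpp
#include <functional>
#include <vector>

enum HandledCode { NotHandled, Handled, HandledAsync };

// Hypothetical stand-in for FXIPCMsg which counts live instances.
struct Msg
{
    int type;
    static int liveCount;
    explicit Msg(int t) : type(t) { ++liveCount; }
    ~Msg() { --liveCount; }
};
int Msg::liveCount = 0;

// Hypothetical stand-in for a thread pool: jobs are queued and run later.
std::vector<std::function<void()>> pendingJobs;

HandledCode msgReceived(Msg *m)
{
    if (m->type == 1)
        return Handled;                 // processed inline
    if (m->type == 2)
    {
        // Hand the pointer to the job, which now owns and deletes it.
        pendingJobs.push_back([m] { /* heavy processing */ delete m; });
        return HandledAsync;
    }
    return NotHandled;
}

// Assumed channel-side behaviour: delete unless the handler took ownership.
HandledCode dispatch(Msg *m)
{
    HandledCode code = msgReceived(m);
    if (code != HandledAsync)
        delete m;
    return code;
}

// Run and discard everything that was queued.
void runPendingJobs()
{
    for (auto &job : pendingJobs)
        job();
    pendingJobs.clear();
}
```

In this model an exception thrown between queueing the job and returning HandledAsync would leave both the job and the dispatcher believing they own the message, which is the hazard the text warns about.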
Note that if you are newer code and send a message which the other end of the IPC channel does not recognise, sendMsg() or getMsgAck() will throw a FX::FXException with the code FXIPCCHANNEL_UNHANDLED, provided that message has an ack. You should trap this situation with a FXERRH_TRY() etc.
From v0.80 of TnFOX some provision is also made for FXIPCChannel subclasses to tunnel messages from one remote end to another remote end and vice versa, even messages whose internal format is totally opaque (they must be a FX::FXIPCMsg subclass however). Each FXIPCMsg has an optional 32 bit routing number which is up to you to set so that it identifies the true destination of the message. It is assumed in this explanation that each end of a FXIPCChannel has its own independent set of routing numbers and thus four routing numbers would be required to operate a tunnel (this needn't be true - it's up to how you implement routing).
At the start of your msgReceived() you will need to see if the message's routing number is known (usually via a FX::QIntDict lookup). The lookup will yield the destination FXIPCChannel subclass instance and destination routing, plus a mapping table of message ids within the destination FXIPCChannel back to message ids within the source FXIPCChannel. You then restamp the header of the message with the correct info for the destination channel and send it using restampMsgAndSend() and the FX::FXIPCMsg::originalData() method (this being more efficient than reserialising the message but also dependent on both channels using the same endianness). If the message has an ack, before restamping you should allocate a new message id using makeUniqueMsgId() in the destination channel and add a mapping of that id back to the id used by the message you received from the source. You then return HandledAsync.
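The forward half of the tunnelling procedure can be sketched with standard containers. This is a simplified model under stated assumptions: Header, Channel, Route and tunnel() are hypothetical, std::unordered_map replaces FX::QIntDict, and Channel::send() merely records the header where a real subclass would call restampMsgAndSend().

```cpp
#include <cstdint>
#include <unordered_map>

// Hypothetical, reduced view of a message header.
struct Header { std::uint32_t msgId; std::uint32_t routing; bool hasAck; };

// Hypothetical stand-in for the destination FXIPCChannel subclass.
struct Channel
{
    std::uint32_t nextId = 1;
    Header lastSent{};
    std::uint32_t makeUniqueMsgId() { return nextId++; }
    void send(const Header &h) { lastSent = h; }   // records instead of writing
};

// What a routing lookup yields: destination, its routing number, and the
// table mapping destination message ids back to source message ids.
struct Route
{
    Channel *dest;
    std::uint32_t destRouting;
    std::unordered_map<std::uint32_t, std::uint32_t> destIdToSourceId;
};

// Returns true if the message was tunnelled, false if its routing is unknown.
bool tunnel(std::unordered_map<std::uint32_t, Route> &routes, Header h)
{
    auto it = routes.find(h.routing);
    if (it == routes.end())
        return false;
    Route &r = it->second;
    if (h.hasAck)
    {
        // Allocate an id in the destination and remember where it came from.
        std::uint32_t newId = r.dest->makeUniqueMsgId();
        r.destIdToSourceId[newId] = h.msgId;
        h.msgId = newId;
    }
    h.routing = r.destRouting;          // restamp for the destination channel
    r.dest->send(h);
    return true;
}
```

A handler built on this model would return HandledAsync when tunnel() succeeds and fall through to its normal switch otherwise.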
If you can be absolutely sure that you will never post to yourself and no more than the above is required, it is safe to post directly from within the msgReceived() handler so long as the destination channel isn't choked. If however you will be performing some work (eg; validating the message or substantially altering it) you will get better performance and avoid deadlocks by outsourcing the tunnelling to a thread pool using invokeMsgHandler().
Handling the routing of acks back through the tunnel can be tricky. Your subclass should implement lonelyMsgAckReceived() and from its message id see if any messages were tunnelled through using that message id. From this you can derive the original message id and routing within the original source channel and thus restamp and send it as previously.
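The reverse lookup for acks might look like the following sketch. AckHeader, Origin, PendingAcks and restampAck() are hypothetical names; a real lonelyMsgAckReceived() would operate on the FXIPCMsg header and raw buffer it is given and then send the restamped ack into the source channel.

```cpp
#include <cstdint>
#include <unordered_map>

// Hypothetical, reduced view of an ack header.
struct AckHeader { std::uint32_t msgId; std::uint32_t routing; };

// Where a tunnelled message originally came from.
struct Origin { std::uint32_t sourceMsgId; std::uint32_t sourceRouting; };

// Keyed by the message id that was allocated in the destination channel.
using PendingAcks = std::unordered_map<std::uint32_t, Origin>;

// Restamps the ack with its original id and routing.
// Returns false if no message was tunnelled under that id.
bool restampAck(PendingAcks &pending, AckHeader &ack)
{
    auto it = pending.find(ack.msgId);
    if (it == pending.end())
        return false;
    ack.msgId = it->second.sourceMsgId;
    ack.routing = it->second.sourceRouting;
    pending.erase(it);                  // each ack is expected only once
    return true;
}
```

Erasing the entry after use keeps the table from growing without bound; whether a given protocol can deliver more than one ack per id is an assumption you would need to check for your own messages.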
Remember that FXIPCChannel never uses FXIPCMsg::routing() so you can use that for anything you like. And if you want to tunnel messages which have no registration within the carrying FXIPCChannel subclass, you should implement unknownMsgReceived().
Definition at line 598 of file FXIPC.h.
Public Types | |
typedef Generic::Functor< Generic::TL::create< HandledCode, FXIPCMsg * >::value > | MsgFilterSpec |
typedef void(*) | endianiseSpec (FXIPCMsg *, FXStream &) |
enum | EndianConversionKinds { AlwaysLittleEndian, AlwaysBigEndian, AutoEndian } |
enum | HandledCode { NotHandled, Handled, HandledAsync } |
Public Member Functions | |
FXIPCChannel (FXIPCMsgRegistry &registry, QIODeviceS *dev, bool peerUntrusted=false, QThreadPool *threadPool=0, const char *threadname="IPC channel monitor") | |
~FXIPCChannel () | |
FXIPCMsgRegistry & | registry () const |
void | setRegistry (FXIPCMsgRegistry &registry) |
QIODeviceS * | device () const |
void | setDevice (QIODeviceS *dev) |
QThreadPool * | threadPool () const |
void | setThreadPool (QThreadPool *threadPool) |
bool | unreliable () const |
void | setUnreliable (bool v) |
bool | compression () const |
void | setCompression (bool v) |
bool | active () const |
void | reset () |
EndianConversionKinds | endianConversion () const |
void | setEndianConversion (EndianConversionKinds kind) |
bool | errorTranslation () const |
void | setErrorTranslation (bool v) |
bool | peerUntrusted () const |
void | setPeerUntrusted (bool v) |
FXuint | maxMsgSize () const |
void | setMaxMsgSize (FXuint newsize) |
FXuint | garbageMessageCount () const |
void | setGarbageMessageCount (FXuint newsize) |
void | setPrintStatistics (bool v) |
void | requestClose () |
bool | doReception (FXuint waitfor=FXINFINITE) |
QPtrVector< FXIPCMsgHolder > | ackedMsgs () const |
void | installPreMsgReceivedFilter (MsgFilterSpec filter) |
bool | removePreMsgReceivedFilter (MsgFilterSpec filter) |
bool | sendMsgI (FXIPCMsg *FXRESTRICT msgack, FXIPCMsg *FXRESTRICT msg, endianiseSpec endianise, FXuint waitfor) |
template<class msgacktype, class msgtype> | |
bool | sendMsg (msgacktype *FXRESTRICT msgack, msgtype *FXRESTRICT msg, FXuint waitfor=FXINFINITE) |
template<class msgacktype, class msgtype> | |
bool | sendMsg (msgacktype &msgack, msgtype &msg, FXuint waitfor=FXINFINITE) |
template<class msgtype> | |
bool | sendMsg (msgtype *msg) |
template<class msgtype> | |
bool | sendMsg (msgtype &msg) |
bool | getMsgAck (FXIPCMsg *FXRESTRICT msgack, FXIPCMsg *FXRESTRICT msg, FXuint waitfor=FXINFINITE) |
bool | getMsgAck (FXIPCMsg &msgack, FXIPCMsg &msg, FXuint waitfor=FXINFINITE) |
FXuint | makeUniqueMsgId () |
bool | restampMsgAndSend (FXuchar *rawmsg, FXIPCMsg *msgheader) |
QMUTEX_INLINEP bool | isLocked () const |
QMUTEX_INLINEP FXbool | locked () const |
QMUTEX_INLINEP FXuint | spinCount () const |
QMUTEX_INLINEP void | setSpinCount (FXuint c) |
QMUTEX_INLINEP void | lock () |
QMUTEX_INLINEP void | unlock () |
QMUTEX_INLINEP bool | tryLock () |
QMUTEX_INLINEP FXbool | trylock () |
Static Public Member Functions | |
static QMUTEX_INLINEP bool | setMutexDebugYield (bool v) |
Protected Types | |
typedef Generic::Functor< Generic::TL::create< void, QThread * >::value > | CreationUpcallSpec |
enum | ThreadScheduler { Auto, InProcess, InKernel } |
Protected Member Functions | |
void | doAsyncHandled (FXIPCMsg *msg, HandledCode handled) |
void | doAsyncHandled (FXIPCMsg *msg, FXException &e) |
virtual HandledCode | msgReceived (FXIPCMsg *msg)=0 |
virtual HandledCode | unknownMsgReceived (FXIPCMsg *msgheader, FXuchar *buffer) |
virtual HandledCode | lonelyMsgAckReceived (FXIPCMsg *msgheader, FXuchar *buffer) |
void | forceClose () |
template<typename fntype, typename msgtype> | |
HandledCode | invokeMsgHandler (fntype fnptr, msgtype *msg) |
virtual void | run () |
virtual void * | cleanup () |
const char * | name () const throw () |
FXuval | stackSize () const |
void | setStackSize (FXuval newsize) |
ThreadScheduler | threadLocation () const |
void | setThreadLocation (ThreadScheduler threadloc) |
bool | wait (FXuint time=FXINFINITE) |
void | start (bool waitTillStarted=false) |
bool | finished () const throw () |
bool | running () const throw () |
bool | inCleanup () const throw () |
bool | isValid () const throw () |
bool | setAutoDelete (bool doso) throw () |
void | requestTermination () |
FXulong | myId () const |
QThread * | creator () const |
signed char | priority () const |
void | setPriority (signed char pri) |
FXulong | processorAffinity () const |
void | setProcessorAffinity (FXulong mask=(FXulong)-1, bool recursive=false) |
virtual void | selfDestruct () |
void * | result () const throw () |
void | disableTermination () |
bool | checkForTerminate () |
void | enableTermination () |
Generic::BoundFunctorV * | addCleanupCall (FXAutoPtr< Generic::BoundFunctorV > handler, bool inThread=false) |
bool | removeCleanupCall (Generic::BoundFunctorV *handler) |
Static Protected Member Functions | |
static FXulong | id () throw () |
static QThread * | current () |
static QThread * | primaryThread () throw () |
static void | sleep (FXuint secs) |
static void | msleep (FXuint millisecs) |
static void | yield () |
static void | exit (void *retcode) |
static void | addCreationUpcall (CreationUpcallSpec upcallv, bool inThread=false) |
static bool | removeCreationUpcall (CreationUpcallSpec upcallv) |
static FXDLLLOCAL void * | int_cancelWaiterHandle () |