00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef FXIPC_H
00023 #define FXIPC_H
00024
00025 #include "FXException.h"
00026 #include "QThread.h"
00027 #include "FXStream.h"
00028
00029 namespace FX {
00030
//! Forward declaration: this header only ever refers to QIODeviceS through pointers
class QIODeviceS;
00036
00184 struct FXIPCMsg
00185 {
00186 friend class FXIPCChannel;
00187 friend class FXIPCChannelIndirector;
00188 enum Flags
00189 {
00190 FlagsWantAck=1,
00191 FlagsGZipped=2,
00192 FlagsHasRouting=4,
00193 FlagsIsBigEndian=8
00194 };
00195 private:
00196
00197 FXuint len;
00198 FXuint crc;
00199 FXuint type;
00200 FXuint myid;
00201 FXuchar mymsgrev;
00202 FXuchar myflags;
00203 FXuint myrouting;
00204 private:
00205 FXuchar *myoriginaldata;
00206 protected:
00207 FXIPCMsg(FXuint _type, FXuchar _msgrev=0) : len(0), crc(0), type(_type), myid(0), mymsgrev(_msgrev), myflags(FOX_BIGENDIAN*FlagsIsBigEndian), myrouting(0), myoriginaldata(0) { }
00208 FXIPCMsg(FXuint _type, FXuint _id, FXuchar _msgrev=0) : len(0), crc(0), type(_type), myid(_id), mymsgrev(_msgrev), myflags(FOX_BIGENDIAN*FlagsIsBigEndian), myrouting(0), myoriginaldata(0) { }
00209 FXIPCMsg(const FXIPCMsg &o, FXuint _type=0) : len(o.len), crc(o.crc), type(_type!=0 ? _type : o.type), myid(o.myid), mymsgrev(o.mymsgrev), myflags(o.myflags), myrouting(o.myrouting), myoriginaldata(o.myoriginaldata) { }
00210 #ifdef FOXPYTHONDLL
00211 public:
00212 virtual ~FXIPCMsg() { }
00213 #else
00214 ~FXIPCMsg() { }
00215 #endif
00216 public:
00218 bool operator==(const FXIPCMsg &o) const throw() { return len==o.len && crc==o.crc && type==o.type && myid==o.myid && mymsgrev==o.mymsgrev && myflags==o.myflags && myrouting==o.myrouting; }
00220 bool operator!=(const FXIPCMsg &o) const throw() { return !(*this==o); }
00222 static const FXuint minHeaderLength=18;
00224 static const FXuint maxHeaderLength=22;
00226 int headerLength() const throw() { return minHeaderLength+((myflags & FlagsHasRouting) ? sizeof(FXuint) : 0); }
00228 FXuint length() const throw() { return len; }
00230 FXuint msgType() const throw() { return type; }
00232 bool hasAck() const throw() { return (type & 1)==0; }
00234 FXuint msgId() const throw() { return myid; }
00236 void setMsgId(FXuint id) throw() { myid=id; }
00238 bool wantsAck() const throw() { return (myflags & FlagsWantAck)!=0; }
00240 bool gzipped() const throw() { return (myflags & FlagsGZipped)!=0; }
00242 void setGZipped(bool v) throw() { if(v) myflags|=FlagsGZipped; else myflags&=~FlagsGZipped; }
00244 bool hasRouting() const throw() { return (myflags & FlagsHasRouting)!=0; }
00246 FXuint routing() const throw() { return myrouting; }
00248 void setRouting(FXuint no) throw() { myrouting=no; myflags|=FlagsHasRouting; }
00250 bool inBigEndian() const throw() { return (myflags & FlagsIsBigEndian)!=0; }
00256 FXuchar *originalData() const throw() { return myoriginaldata; }
00258 void write(FXStream &s) const { s << len << crc << type << myid << mymsgrev << myflags; if(myflags & FlagsHasRouting) s << myrouting; }
00260 void read(FXStream &s) { s >> len >> crc >> type >> myid >> mymsgrev >> myflags; if(myflags & FlagsHasRouting) s >> myrouting; }
00261 private:
00262
00263 FXDLLLOCAL FXIPCMsg();
00264 };
00266 struct FXIPCMsgHolder : public FXIPCMsg
00267 {
00268 };
00269 #ifdef __GNUC__
00270
00271 namespace Generic {
00272 template<> struct convertible<int, FXIPCMsg> { enum { value=false }; };
00273 }
00274 #endif
00275
00280 struct FXIPCMsgRegistryPrivate;
00281 class FXAPI FXIPCMsgRegistry
00282 {
00283 public:
00284 template<class codealloc, class msgtype> friend struct FXIPCMsgRegister;
00285 typedef void (*deendianiseSpec)(FXIPCMsg *, FXStream &);
00286 typedef FXIPCMsg *(*makeMsgSpec)();
00287 typedef void (*delMsgSpec)(FXIPCMsg *);
00288 private:
00289 FXuint magic;
00290 FXIPCMsgRegistryPrivate *p;
00291 void int_register(FXuint code, deendianiseSpec deendianise, makeMsgSpec makeMsg, delMsgSpec delMsg, Generic::typeInfoBase &ti);
00292 void int_deregister(FXuint code);
00293 FXIPCMsgRegistry(const FXIPCMsgRegistry &);
00294 FXIPCMsgRegistry &operator=(const FXIPCMsgRegistry &);
00295 public:
00296 FXIPCMsgRegistry();
00297 ~FXIPCMsgRegistry();
00299 bool isValid() const { return magic==*(FXuint *)"PCMR"; }
00301 bool lookup(FXuint code) const;
00303 bool lookup(deendianiseSpec &deendianise, makeMsgSpec &makeMsg, delMsgSpec &delMsg, FXuint code) const;
00305 const FXString &decodeType(FXuint code) const;
00306 };
00307
/*! \struct FXIPCMsgChunkCodeAlloc
\brief Compile-time allocator of message type codes

Each instantiation spans a pair of codes starting at \em basecode.
\li \c hasAck is the \em _hasAck argument.
\li \c code is \em basecode itself when \em _hasAck is true, otherwise \em basecode+1.
\li \c nextcode is \em basecode+2, for use as the \em basecode of the following message.
*/
template<unsigned int basecode, bool _hasAck> struct FXIPCMsgChunkCodeAlloc
{
    enum { hasAck=_hasAck, code=basecode+(_hasAck ? 0 : 1), nextcode=basecode+2 };
};
00316 #ifdef _MSC_VER
00317 #pragma warning(push)
00318 #pragma warning(disable : 4355) // this used in base member init
00319 #endif
00320 template<typename msgtypelist> class FXIPCMsgChunk
00321 {
00322 FXIPCMsgChunk(const FXIPCMsgChunk &);
00323 FXIPCMsgChunk &operator=(const FXIPCMsgChunk &);
00324 protected:
00325 FXIPCMsgRegistry *myregistry;
00326 typedef msgtypelist MsgTypeList;
00327 Generic::TL::instantiateH<msgtypelist> registrants;
00328 public:
00329 FXIPCMsgChunk(FXIPCMsgRegistry *registry) : myregistry(registry), registrants(myregistry) { }
00331 FXIPCMsgRegistry ®istry() const throw() { return *myregistry; }
00332 enum { BaseCode=Generic::TL::at<msgtypelist, 0>::value::MsgType::id::code };
00334 FXuint baseCode() const throw() { return BaseCode; }
00335 enum { EndCode=Generic::TL::at<msgtypelist, Generic::TL::length<msgtypelist>::value-1>::value::MsgType::id::code };
00337 FXuint endCode() const throw() { return EndCode; }
00338 };
00339 #ifdef _MSC_VER
00340 #pragma warning(pop)
00341 #endif
00342
00347 template<class codealloc, class msgtype> struct FXIPCMsgRegister
00348 {
00349 FXIPCMsgRegistry ®istry;
00350 Generic::typeInfo<msgtype> typeinfo;
00351 typedef msgtype MsgType;
00352 static void endianise(FXIPCMsg *msg, FXStream &s) { static_cast<msgtype *>(msg)->endianise(s); }
00353 static void deendianise(FXIPCMsg *msg, FXStream &s) { static_cast<msgtype *>(msg)->deendianise(s); }
00354 static FXIPCMsg *makeMsg() { return new msgtype; }
00355 static void delMsg(FXIPCMsg *ptr) { delete static_cast<msgtype *>(ptr); }
00356 FXIPCMsgRegister(FXIPCMsgRegistry *_registry) : registry(*_registry)
00357 {
00358 registry.int_register(codealloc::code, &deendianise, &makeMsg, &delMsg, typeinfo);
00359 }
00360 ~FXIPCMsgRegister()
00361 {
00362 registry.int_deregister(codealloc::code);
00363 }
00364 private:
00365 FXIPCMsgRegister(const FXIPCMsgRegister &);
00366 FXIPCMsgRegister &operator=(const FXIPCMsgRegister &);
00367 };
00368
00369 typedef FXIPCMsgChunkCodeAlloc<0, true> FXIPCMsgChunkStandardBegin;
00377 struct FXIPCMsg_Disconnect : public FXIPCMsg
00378 {
00379 typedef FXIPCMsgChunkCodeAlloc<FXIPCMsgChunkStandardBegin::code, false> id;
00380 typedef FXIPCMsgRegister<id, FXIPCMsg_Disconnect> regtype;
00381 FXIPCMsg_Disconnect() : FXIPCMsg(id::code) { }
00382 void endianise(FXStream &ds) const { }
00383 void deendianise(FXStream &ds) { }
00384 };
00393 struct FXIPCMsg_Unhandled : public FXIPCMsg
00394 {
00395 typedef FXIPCMsgChunkCodeAlloc<FXIPCMsg_Disconnect::id::nextcode, false> id;
00396 typedef FXIPCMsgRegister<id, FXIPCMsg_Unhandled> regtype;
00397 FXuchar cause;
00398 FXIPCMsg_Unhandled(FXuint _id=0, FXuchar _cause=0) : FXIPCMsg(id::code, _id), cause(_cause) { }
00399 void endianise(FXStream &ds) const { ds << cause; }
00400 void deendianise(FXStream &ds) { ds >> cause; }
00401 };
00409 struct FXIPCMsg_ErrorOccurred : public FXIPCMsg
00410 {
00411 typedef FXIPCMsgChunkCodeAlloc<FXIPCMsg_Unhandled::id::nextcode, false> id;
00412 typedef FXIPCMsgRegister<id, FXIPCMsg_ErrorOccurred> regtype;
00413 FXString message;
00414 FXuint code;
00415 FXuint flags;
00416 FXIPCMsg_ErrorOccurred() : FXIPCMsg(id::code), code(0), flags(0) { }
00417 FXIPCMsg_ErrorOccurred(FXuint _id, FXException &e) : FXIPCMsg(id::code, _id)
00418 {
00419 #ifdef DEBUG
00420 message=e.report();
00421 #else
00422 message=e.message();
00423 #endif
00424 code=e.code();
00425 flags=e.flags();
00426 }
00427 void endianise(FXStream &ds) const { ds << message << code << flags; }
00428 void deendianise(FXStream &ds) { ds >> message >> code >> flags; }
00429 };
00430 typedef FXIPCMsgChunk<Generic::TL::create<
00431 FXIPCMsg_Disconnect::regtype,
00432 FXIPCMsg_Unhandled::regtype,
00433 FXIPCMsg_ErrorOccurred::regtype
00434 >::value> FXIPCMsgChunkStandard;
00435
00436
00596 struct FXIPCChannelPrivate;
00597 template<class type> class QPtrVector;
00598 class FXAPI FXIPCChannel : public QMutex,
00599 #if !defined(BOOST_PYTHON_SOURCE) && !defined(FX_RUNNING_PYSTE)
00600
00601
00602 protected QThread
00603 {
00604 #else
00605 public QThread
00606 {
00607 #endif
00608 FXIPCChannelPrivate *p;
00609 FXIPCChannel(const FXIPCChannel &);
00610 FXIPCChannel &operator=(const FXIPCChannel &);
00611 public:
00613 FXIPCChannel(FXIPCMsgRegistry ®istry, QIODeviceS *dev, bool peerUntrusted=false, QThreadPool *threadPool=0, const char *threadname="IPC channel monitor");
00614 ~FXIPCChannel();
00616 FXIPCMsgRegistry ®istry() const;
00618 void setRegistry(FXIPCMsgRegistry ®istry);
00620 QIODeviceS *device() const;
00622 void setDevice(QIODeviceS *dev);
00624 QThreadPool *threadPool() const;
00626 void setThreadPool(QThreadPool *threadPool);
00628 bool unreliable() const;
00630 void setUnreliable(bool v);
00632 bool compression() const;
00634 void setCompression(bool v);
00636 bool active() const;
00638 void reset();
00640 enum EndianConversionKinds
00641 {
00642 AlwaysLittleEndian,
00643 AlwaysBigEndian,
00644 AutoEndian
00645 };
00647 EndianConversionKinds endianConversion() const;
00649 void setEndianConversion(EndianConversionKinds kind);
00651 bool errorTranslation() const;
00653 void setErrorTranslation(bool v);
00655 bool peerUntrusted() const;
00657 void setPeerUntrusted(bool v);
00659 FXuint maxMsgSize() const;
00661 void setMaxMsgSize(FXuint newsize);
00663 FXuint garbageMessageCount() const;
00665 void setGarbageMessageCount(FXuint newsize);
00667 void setPrintStatistics(bool v);
00669 void requestClose();
00670
00671
00672 using QThread::name;
00673 using QThread::wait;
00674 using QThread::start;
00675 using QThread::finished;
00676 using QThread::running;
00677 using QThread::inCleanup;
00678 using QThread::isValid;
00679 using QThread::setAutoDelete;
00680 using QThread::creator;
00681 using QThread::priority;
00682 using QThread::setPriority;
00683 using QThread::processorAffinity;
00684 using QThread::setProcessorAffinity;
00685 using QThread::addCleanupCall;
00686 using QThread::removeCleanupCall;
00687
00689 enum HandledCode
00690 {
00691 NotHandled=0,
00692 Handled=1,
00693 HandledAsync=2
00694 };
00697 bool doReception(FXuint waitfor=FXINFINITE);
00701 QPtrVector<FXIPCMsgHolder> ackedMsgs() const;
00703 typedef Generic::Functor<Generic::TL::create<HandledCode, FXIPCMsg *>::value> MsgFilterSpec;
00707 void installPreMsgReceivedFilter(MsgFilterSpec filter);
00709 bool removePreMsgReceivedFilter(MsgFilterSpec filter);
00710
00711 typedef void (*endianiseSpec)(FXIPCMsg *, FXStream &);
00713 bool sendMsgI(FXIPCMsg *FXRESTRICT msgack, FXIPCMsg *FXRESTRICT msg, endianiseSpec endianise, FXuint waitfor);
00729 template<class msgacktype, class msgtype> bool sendMsg(msgacktype *FXRESTRICT msgack, msgtype *FXRESTRICT msg, FXuint waitfor=FXINFINITE)
00730 {
00731 FXSTATIC_ASSERT(!msgtype::id::hasAck || msgtype::id::code+1==msgacktype::id::code, AckMsg_Not_Ack_Of_Msg);
00732 return sendMsgI(msgack, msg, &msgtype::regtype::endianise, waitfor);
00733 }
00735 template<class msgacktype, class msgtype> bool sendMsg(msgacktype &msgack, msgtype &msg, FXuint waitfor=FXINFINITE)
00736 {
00737 FXSTATIC_ASSERT(!msgtype::id::hasAck || msgtype::id::code+1==msgacktype::id::code, AckMsg_Not_Ack_Of_Msg);
00738 return sendMsgI(&msgack, &msg, &msgtype::regtype::endianise, waitfor);
00739 }
00741 template<class msgtype> bool sendMsg(msgtype *msg)
00742 {
00743 FXSTATIC_ASSERT(!msgtype::id::hasAck, Cannot_Send_Msgs_With_Ack);
00744 return sendMsgI(0, msg, &msgtype::regtype::endianise, 0);
00745 }
00747 template<class msgtype> bool sendMsg(msgtype &msg)
00748 {
00749 FXSTATIC_ASSERT(!msgtype::id::hasAck, Cannot_Send_Msgs_With_Ack);
00750 return sendMsgI(0, &msg, &msgtype::regtype::endianise, 0);
00751 }
00757 bool getMsgAck(FXIPCMsg *FXRESTRICT msgack, FXIPCMsg *FXRESTRICT msg, FXuint waitfor=FXINFINITE);
00759 bool getMsgAck(FXIPCMsg &msgack, FXIPCMsg &msg, FXuint waitfor=FXINFINITE)
00760 {
00761 return getMsgAck(&msgack, &msg, waitfor);
00762 }
00765 FXuint makeUniqueMsgId();
00770 bool restampMsgAndSend(FXuchar *rawmsg, FXIPCMsg *msgheader);
00771 protected:
00774 void doAsyncHandled(FXIPCMsg *msg, HandledCode handled);
00777 void doAsyncHandled(FXIPCMsg *msg, FXException &e);
00782 virtual HandledCode msgReceived(FXIPCMsg *msg)=0;
00789 virtual HandledCode unknownMsgReceived(FXIPCMsg *msgheader, FXuchar *buffer);
00797 virtual HandledCode lonelyMsgAckReceived(FXIPCMsg *msgheader, FXuchar *buffer);
00800 void forceClose();
00805 template<typename fntype, typename msgtype> HandledCode invokeMsgHandler(fntype fnptr, msgtype *msg)
00806 {
00807 typedef typename Generic::FnInfo<fntype>::objectType objType;
00808 FXSTATIC_ASSERT((Generic::convertible<FXIPCChannel, objType>::value), Must_Be_Member_Function_Of_FXIPCChannel_subclass);
00809 typedef typename Generic::TL::create<void, fntype, FXAutoPtr<msgtype>, Generic::BoundFunctorV *>::value functtype;
00810 FXAutoPtr< Generic::BoundFunctor<functtype> > callv;
00811
00812 void (FXIPCChannel::*mythunk)(fntype, FXAutoPtr<msgtype>, Generic::BoundFunctorV *)=&FXIPCChannel::int_handlerIndirect<objType, fntype, msgtype>;
00813 FXERRHM(callv=new Generic::BoundFunctor<functtype>(Generic::Functor<functtype>(this, mythunk)));
00814 Generic::TL::instance<0>(callv->parameters()).value=fnptr;
00815 Generic::TL::instance<1>(callv->parameters()).value=msg;
00816 Generic::TL::instance<2>(callv->parameters()).value=PtrPtr(callv);
00817 int_addMsgHandler(PtrPtr(callv));
00818 PtrRelease(callv);
00819 return HandledAsync;
00820 }
00822 virtual void run() { while(active()) doReception(); }
00824 virtual void *cleanup();
00825 private:
00826 FXDLLLOCAL HandledCode invokeMsgReceived(MsgFilterSpec *what, FXIPCMsg *msg, FXIPCMsg &tmsg);
00827 inline FXDLLLOCAL void removeAck(void *ae);
00828 inline FXDLLLOCAL FXuint int_makeUniqueMsgId();
00829 Generic::BoundFunctorV *int_addMsgHandler(FXAutoPtr<Generic::BoundFunctorV> v);
00830 bool int_removeMsgHandler(Generic::BoundFunctorV *v);
00831 template<typename myrealtype, typename fntype, typename msgtype> void int_handlerIndirect(fntype fnptr, FXAutoPtr<msgtype> msg, Generic::BoundFunctorV *v)
00832 {
00833 int_removeMsgHandler(v);
00834 FXERRH_TRY
00835 {
00836 doAsyncHandled(PtrPtr(msg), (static_cast<myrealtype &>(*this).*fnptr)(PtrPtr(msg)));
00837 }
00838 FXERRH_CATCH(FXException &e)
00839 {
00840 if(msg->hasAck())
00841 doAsyncHandled(PtrPtr(msg), e);
00842
00843 }
00844 FXERRH_ENDTRY
00845 }
00846 };
00847
00848
00859 class FXIPCChannelIndirector
00860 {
00861 FXIPCChannel *mychannel;
00862 FXuint myMsgChunk, myrouting;
00863 bool (FXIPCChannel::*sendMsgI)(FXIPCMsg *FXRESTRICT msgack, FXIPCMsg *FXRESTRICT msg, FXIPCChannel::endianiseSpec endianise, FXuint waitfor);
00864 bool (FXIPCChannel::*getMsgAckI)(FXIPCMsg *FXRESTRICT msgack, FXIPCMsg *FXRESTRICT msg, FXuint waitfor);
00865
00866
00867
00868
00869 inline void configMsg(FXIPCMsg *msg) const throw()
00870 {
00871 msg->type+=myMsgChunk;
00872 if(myrouting) msg->setRouting(myrouting);
00873 }
00874 protected:
00875 FXIPCChannelIndirector() : mychannel(0), myMsgChunk(0), myrouting(0), sendMsgI(0), getMsgAckI(0) { }
00877 FXIPCChannel *channel() const throw() { return mychannel; }
00879 FXuint msgChunk() const throw() { return myMsgChunk; }
00881 FXuint msgRouting() const throw() { return myrouting; }
00884 template<class msgchunk, class channel> void setIPCChannel(channel *_channel, FXuint addToMsgType=msgchunk::BaseCode, FXuint _routing=0)
00885 {
00886 mychannel=_channel;
00887 myMsgChunk=addToMsgType;
00888 myrouting=_routing;
00889 sendMsgI=&channel::sendMsgI;
00890 getMsgAckI=&channel::getMsgAck;
00891 }
00894 template<class msgacktype, class msgtype> bool sendMsg(msgacktype *FXRESTRICT msgack, msgtype *FXRESTRICT msg, FXuint waitfor=FXINFINITE)
00895 {
00896 FXSTATIC_ASSERT(!msgtype::id::hasAck || msgtype::id::code+1==msgacktype::id::code, AckMsg_Not_Ack_Of_Msg);
00897 configMsg(msg);
00898
00899 return ((*mychannel).*sendMsgI)(msgack, msg, &msgtype::regtype::endianise, waitfor);
00900 }
00902 template<class msgacktype, class msgtype> bool sendMsg(msgacktype &msgack, msgtype &msg, FXuint waitfor=FXINFINITE)
00903 {
00904 FXSTATIC_ASSERT(!msgtype::id::hasAck || msgtype::id::code+1==msgacktype::id::code, AckMsg_Not_Ack_Of_Msg);
00905 configMsg(&msg);
00906
00907 return ((*mychannel).*sendMsgI)(&msgack, &msg, &msgtype::regtype::endianise, waitfor);
00908 }
00910 template<class msgtype> bool sendMsg(msgtype *msg)
00911 {
00912 FXSTATIC_ASSERT(!msgtype::id::hasAck, Cannot_Send_Msgs_With_Ack);
00913 configMsg(msg);
00914
00915 return ((*mychannel).*sendMsgI)(0, msg, &msgtype::regtype::endianise, 0);
00916 }
00918 template<class msgtype> bool sendMsg(msgtype &msg)
00919 {
00920 FXSTATIC_ASSERT(!msgtype::id::hasAck, Cannot_Send_Msgs_With_Ack);
00921 configMsg(&msg);
00922
00923 return ((*mychannel).*sendMsgI)(0, &msg, &msgtype::regtype::endianise, 0);
00924 }
00926 bool getMsgAck(FXIPCMsg *FXRESTRICT msgack, FXIPCMsg *FXRESTRICT msg, FXuint waitfor=FXINFINITE)
00927 {
00928 return ((*mychannel).*getMsgAckI)(msgack, msg, waitfor);
00929 }
00931 bool getMsgAck(FXIPCMsg &msgack, FXIPCMsg &msg, FXuint waitfor=FXINFINITE)
00932 {
00933 return ((*mychannel).*getMsgAckI)(&msgack, &msg, waitfor);
00934 }
00935 };
00936
00937
00938 }
00939
00940 #endif