00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifndef MESSAGE_H
00029 #define MESSAGE_H
00030
00031 #include <pthread.h>
00032
00033 #include <libplayercore/player.h>
00034
00035
00036
00037
00038
00039 class MessageQueue;
00040
00055 class Message
00056 {
00057 public:
00059 Message(const struct player_msghdr & Header,
00060 const void* data,
00061 unsigned int data_size,
00062 MessageQueue* _queue = NULL);
00064 Message(const Message & rhs);
00065
00067 ~Message();
00068
00074 static bool MatchMessage(player_msghdr_t* hdr,
00075 int type,
00076 int subtype,
00077 player_devaddr_t addr)
00078 {
00079 return(((type < 0) || (hdr->type == (uint8_t)type)) &&
00080 (hdr->subtype == (uint8_t)subtype) &&
00081 (hdr->addr.host == addr.host) &&
00082 (hdr->addr.robot == addr.robot) &&
00083 (hdr->addr.interf == addr.interf) &&
00084 (hdr->addr.index == addr.index));
00085 }
00086
00088 void* GetData() {return (void*)Data;};
00090 player_msghdr_t * GetHeader() {return reinterpret_cast<player_msghdr_t *> (Data);};
00092 void* GetPayload() {return (void*)(&Data[sizeof(player_msghdr_t)]);};
00094 size_t GetPayloadSize() {return Size - sizeof(player_msghdr_t);};
00096 unsigned int GetSize() {return Size;};
00098 bool Compare(Message &other);
00100 void DecRef();
00102 void SetReady () { ready = true; }
00104 bool Ready (void) const { return ready; }
00105
00107 MessageQueue* Queue;
00108
00110 unsigned int * RefCount;
00111
00112 private:
00114 uint8_t * Data;
00116 unsigned int Size;
00118 pthread_mutex_t * Lock;
00120 bool ready;
00121 };
00122
00126 class MessageQueueElement
00127 {
00128 public:
00130 MessageQueueElement();
00132 ~MessageQueueElement();
00133
00135 Message* msg;
00136 private:
00138 MessageQueueElement * prev;
00140 MessageQueueElement * next;
00141
00142 friend class MessageQueue;
00143 };
00144
00151 class MessageReplaceRule
00152 {
00153 private:
00154
00155
00156 int host, robot, interf, index;
00157
00158 int type, subtype;
00159 public:
00160 MessageReplaceRule(int _host, int _robot, int _interf, int _index,
00161 int _type, int _subtype, bool _replace) :
00162 host(_host), robot(_robot), interf(_interf), index(_index),
00163 type(_type), subtype(_subtype), replace(_replace), next(NULL) {}
00164
00165 bool Match(player_msghdr_t* hdr)
00166 {
00167 return(((this->host < 0) ||
00168 ((uint32_t)this->host == hdr->addr.host)) &&
00169 ((this->robot < 0) ||
00170 ((uint32_t)this->robot == hdr->addr.robot)) &&
00171 ((this->interf < 0) ||
00172 ((uint16_t)this->interf == hdr->addr.interf)) &&
00173 ((this->index < 0) ||
00174 ((uint16_t)this->index == hdr->addr.index)) &&
00175 ((this->type < 0) ||
00176 ((uint8_t)this->type == hdr->type)) &&
00177 ((this->subtype < 0) ||
00178 ((uint8_t)this->subtype == hdr->subtype)));
00179 }
00180
00181 bool Equivalent (int _host, int _robot, int _interf, int _index, int _type, int _subtype)
00182 {
00183 return (host == _host && robot == _robot && _interf && index == _index &&
00184 type == _type && subtype == _subtype);
00185 }
00186
00187
00188
00189 bool replace;
00190
00191 MessageReplaceRule* next;
00192 };
00193
00245 class MessageQueue
00246 {
00247 public:
00249 MessageQueue(bool _Replace, size_t _Maxlen);
00251 ~MessageQueue();
00253 bool Empty() { return(this->head == NULL); }
00256 MessageQueueElement * Push(Message& msg);
00260 Message* Pop();
00265 Message* PopReady (void);
00269 void SetReplace(bool _Replace) { this->Replace = _Replace; };
00276 void AddReplaceRule(int _host, int _robot, int _interf, int _index,
00277 int _type, int _subtype, bool _replace);
00283 void AddReplaceRule(const player_devaddr_t &device,
00284 int _type, int _subtype, bool _replace);
00287 bool CheckReplace(player_msghdr_t* hdr);
00290 void Wait(void);
00293 void DataAvailable(void);
00295 bool Filter(Message& msg);
00297 void ClearFilter(void);
00299 void SetFilter(int host, int robot, int interf, int index,
00300 int type, int subtype);
00303 void SetPull (bool _pull) { this->pull = _pull; }
00305 void MarkAllReady (void);
00306 private:
00308 void Lock() {pthread_mutex_lock(&lock);};
00310 void Unlock() {pthread_mutex_unlock(&lock);};
00313 void Remove(MessageQueueElement* el);
00315 MessageQueueElement* head;
00317 MessageQueueElement* tail;
00319 pthread_mutex_t lock;
00321 size_t Maxlen;
00323 MessageReplaceRule* replaceRules;
00326 bool Replace;
00328 size_t Length;
00332 pthread_cond_t cond;
00334 pthread_mutex_t condMutex;
00336 bool filter_on;
00337 int filter_host, filter_robot, filter_interf,
00338 filter_index, filter_type, filter_subtype;
00341 bool pull;
00342 };
00343
00344 #endif