InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Mbox.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
29 #ifndef INFINISQLMBOX_H
30 #define INFINISQLMBOX_H
31 
32 #include "Message.h"
33 #include "Topology.h"
34 
35 using std::vector;
36 using std::string;
37 
/**
 * @brief Consumer side of a message mailbox.
 *
 * Only the declaration lives in this header. MboxProducer is declared a
 * friend, so it has access to the private queue state below.
 */
class Mbox
{
public:
  Mbox();
  virtual ~Mbox();

  /**
   * @brief Fetch a message from this mailbox.
   *
   * @param timeout wait limit (units are not stated in this header --
   *                TODO confirm against Mbox.cc)
   * @return pointer to a message. The GETMSG macro in this header tests the
   *         result against NULL before dereferencing it, so callers should
   *         expect that NULL can be returned.
   */
  class Message *receive(int timeout);

  /**
   * @brief Build a 128-bit value from a message pointer and a count.
   *
   * NOTE(review): presumably the pointer and count are packed side by side
   * so both can be swapped atomically -- confirm in the implementation.
   *
   * @param ptr   message pointer
   * @param count counter value
   * @return the combined 128-bit value
   */
  static __int128 getInt128FromPointer(class Message *ptr, uint64_t count);

  /**
   * @brief Extract a message pointer from a 128-bit value.
   *
   * NOTE(review): presumably the inverse of getInt128FromPointer() -- confirm.
   *
   * @param i128 128-bit value
   * @return message pointer
   */
  static class Message *getPtr(__int128 i128);

  /**
   * @brief Extract a count from a 128-bit value.
   *
   * NOTE(review): presumably the inverse of getInt128FromPointer() -- confirm.
   *
   * @param i128 128-bit value
   * @return count
   */
  static uint64_t getCount(__int128 i128);

  friend class MboxProducer;

private:
  pthread_mutex_t mutexLast;

  class Message *firstMsg;
  class Message *lastMsg;
  class Message *myLastMsg; // not to be modified by producer

  // 128-bit queue positions; same representation that
  // getInt128FromPointer() returns.
  __int128 head;
  __int128 tail;
  __int128 current;
  __int128 mytail;

  uint64_t counter;
};
104 
/**
 * @brief Sending side of a mailbox: holds the destination Mbox and node id.
 *
 * Mboxes is declared a friend of this class.
 */
class MboxProducer
{
public:
  MboxProducer();

  /**
   * @brief Construct a producer for a given mailbox.
   *
   * @param mboxarg   mailbox this producer refers to
   * @param nodeidarg node id associated with this producer
   */
  MboxProducer(class Mbox *mboxarg, int16_t nodeidarg);
  virtual ~MboxProducer();

  /**
   * @brief Send a message through this producer.
   *
   * NOTE(review): presumably places msgsnd on the mailbox pointed to by
   * `mbox`; ownership of msgsnd after the call is not visible from this
   * header -- confirm in the implementation.
   *
   * @param msgsnd message to send
   */
  void sendMsg(class Message &msgsnd);

  class Mbox *mbox;
  int16_t nodeid;
  class Mboxes *mboxes;

  friend class Mboxes;
};
134 
139 class Mboxes
140 {
141 public:
146  struct location_s
147  {
150  };
151 
152  Mboxes();
158  Mboxes(int64_t nodeidarg);
159  virtual ~Mboxes();
160 
166  void update(class Topology &top);
173  void update(class Topology &top, int64_t myActorid);
181  void toActor(const Topology::addressStruct &source,
182  const Topology::addressStruct &dest, class Message &msg);
189  void toUserSchemaMgr(const Topology::addressStruct &source,
190  class Message &msg);
197  void toDeadlockMgr(const Topology::addressStruct &source,
198  class Message &msg);
206  void toPartition(const Topology::addressStruct &source, int64_t partitionid,
207  class Message &msg);
217  int64_t toAllOfType(actortypes_e type,
218  const Topology::addressStruct &source,
219  class Message &msg);
229  int64_t toAllOfTypeThisReplica(actortypes_e type,
230  const Topology::addressStruct &source,
231  class Message &msg);
235  void sendObBatch();
236 
237  int64_t nodeid;
238 
245  std::vector<class MboxProducer> transactionAgents;
246  std::vector<class MboxProducer> engines;
247 
248  // new
249  std::vector<class MboxProducer *> actoridToProducers;
250  std::vector<class MboxProducer *> transactionAgentPtrs;
251  std::vector<class MboxProducer *> enginePtrs;
257 
258  std::vector<location_s> partitionToProducers;
259  // allActors[nodeid][actorid] = actortype
260  std::vector< vector<int> > allActors;
261  boost::unordered_map< int16_t, std::vector<int> > allActorsThisReplica;
262 };
263 
// put this in each actor's class definition (except ObGw)
//
// Declares one member object per message class. The GETMSG macro in this
// header assigns into these members by name (reuseMessage,
// reuseMessageSocket, ...), so any scope that uses GETMSG must have them
// visible. The expansion already ends in ';', so no trailing semicolon is
// needed at the point of use.
#define REUSEMESSAGES \
  class Message reuseMessage; \
  class MessageSocket reuseMessageSocket; \
  class MessageUserSchema reuseMessageUserSchema; \
  class MessageDeadlock reuseMessageDeadlock; \
  class MessageSubtransactionCmd reuseMessageSubtransactionCmd; \
  class MessageCommitRollback reuseMessageCommitRollback; \
  class MessageDispatch reuseMessageDispatch; \
  class MessageAckDispatch reuseMessageAckDispatch; \
  class MessageApply reuseMessageApply; \
  class MessageAckApply reuseMessageAckApply;
275 
// do this instead of mymbox.receive (except ObGw)
//
// GETMSG(X, Y, Z)
//   X  message pointer lvalue that receives the result
//   Y  pointer to an object providing receive() (an Mbox)
//   Z  argument forwarded to receive()
//
// X is set to Y->receive(Z). If the result is non-NULL and its
// messageStruct.topic is TOPIC_SERIALIZED, the payload is unpacked into the
// matching reuseMessage* object (see REUSEMESSAGES) and X is re-pointed at
// that object; afterwards serobj.data is deleted. A size/pos mismatch after
// unpack() is reported to `logfile`; an unrecognised payload type is reported
// with printf.
//
// The macro expands to plain statements (not a single expression) and does
// not need a trailing semicolon. It relies on `logfile` and the reuseMessage*
// objects being in scope at the point of use.
//
// NOTE(review): once X has been re-pointed, the original serialized message
// object is no longer reachable through X and is not deleted here -- confirm
// it is released elsewhere.
// NOTE(review): in the default case X still points at the serialized message
// while its data has been deleted -- confirm callers do not touch it.

// Helper for GETMSG: body of one `case`. Resets REUSEOBJ to a fresh MSGCLASS,
// unpacks SER into it, logs if unpack() did not consume exactly the whole
// buffer, then points X at REUSEOBJ. size()/pos are cast to unsigned long so
// the arguments always match the %lu conversions.
#define GETMSG_UNPACK_CASE(SER, MSGCLASS, REUSEOBJ, X) \
  { \
    REUSEOBJ=MSGCLASS(); \
    REUSEOBJ.unpack(SER); \
    if (SER.data->size() != SER.pos) \
    { \
      fprintf(logfile, "unpack %i size %lu pos %lu\n", \
              SER.getpayloadtype(), \
              static_cast<unsigned long>(SER.data->size()), \
              static_cast<unsigned long>(SER.pos)); \
    } \
    X=&REUSEOBJ; \
  }

#define GETMSG(X, Y, Z) \
  X=(Y)->receive(Z); \
  if ((X) != NULL && (X)->messageStruct.topic==TOPIC_SERIALIZED) \
  { \
    SerializedMessage serobj(((class MessageSerialized *)(X))->data); \
    switch (serobj.getpayloadtype()) \
    { \
      case PAYLOADMESSAGE: \
        GETMSG_UNPACK_CASE(serobj, Message, reuseMessage, X) \
        break; \
      case PAYLOADSOCKET: \
        GETMSG_UNPACK_CASE(serobj, MessageSocket, reuseMessageSocket, X) \
        break; \
      case PAYLOADUSERSCHEMA: \
        GETMSG_UNPACK_CASE(serobj, MessageUserSchema, reuseMessageUserSchema, X) \
        break; \
      case PAYLOADDEADLOCK: \
        GETMSG_UNPACK_CASE(serobj, MessageDeadlock, reuseMessageDeadlock, X) \
        break; \
      case PAYLOADSUBTRANSACTION: \
        GETMSG_UNPACK_CASE(serobj, MessageSubtransactionCmd, reuseMessageSubtransactionCmd, X) \
        break; \
      case PAYLOADCOMMITROLLBACK: \
        GETMSG_UNPACK_CASE(serobj, MessageCommitRollback, reuseMessageCommitRollback, X) \
        break; \
      case PAYLOADDISPATCH: \
        GETMSG_UNPACK_CASE(serobj, MessageDispatch, reuseMessageDispatch, X) \
        break; \
      case PAYLOADACKDISPATCH: \
        GETMSG_UNPACK_CASE(serobj, MessageAckDispatch, reuseMessageAckDispatch, X) \
        break; \
      case PAYLOADAPPLY: \
        GETMSG_UNPACK_CASE(serobj, MessageApply, reuseMessageApply, X) \
        break; \
      case PAYLOADACKAPPLY: \
        GETMSG_UNPACK_CASE(serobj, MessageAckApply, reuseMessageAckApply, X) \
        break; \
      default: \
        printf("%s %i anomaly %i\n", __FILE__, __LINE__, serobj.getpayloadtype()); \
    } \
    delete serobj.data; \
  }
399 
400 #endif /* INFINISQLMBOX_H */