InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Mbox.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
29 #include "gch.h"
30 #include "Mbox.h"
31 #include <time.h>
32 #line 33 "Mbox.cc"
33 
37 Mbox::Mbox() : counter(8888)
38 {
39  firstMsg = new class Message();
41  firstMsg->nextmsg = getInt128FromPointer(NULL, 5000);
43  lastMsg = firstMsg;
45 
47  tail = head;
48  current = head;
49  mytail = head;
50 
51  pthread_mutexattr_t attr;
52  attr.__align = PTHREAD_MUTEX_ADAPTIVE_NP;
53  pthread_mutex_init(&mutexLast, &attr);
54 }
55 
57 {
58  delete firstMsg;
59 }
60 
61 // return *message, NULL if timeout or nothing
62 // timeout: -1, wait forever (well, uncertain effect)
63 // timeout: 0, do not wait
64 // timeout: >0, microseconds to wait
65 //size_t Mbox::receive(class Message *msg, int timeout)
66 class Message *Mbox::receive(int timeout)
67 {
68  __int128 mynext;
69 
70  while (1)
71  {
72  mynext = __atomic_load_n(&(getPtr(current)->nextmsg), __ATOMIC_SEQ_CST);
73 
74  if (getPtr(mynext)==NULL)
75  {
76  switch (timeout)
77  {
78  case -1:
79  break;
80 
81  case 0:
82  return NULL;
83  break;
84 
85  default:
86  struct timespec ts = {0, timeout * 1000};
87  nanosleep(&ts, NULL);
88  return NULL;
89  }
90  }
91  else
92  {
93  if (getPtr(current)==getPtr(mynext))
94  {
95  printf("%s %i WTF found it i guess count current %lu next %lu\n",
96  __FILE__, __LINE__, getCount(current), getCount(mynext));
97  }
98 
99  delete getPtr(current);
100  current = mynext;
101  return getPtr(current);
102  }
103  }
104 }
105 
106 __int128 Mbox::getInt128FromPointer(class Message *ptr, uint64_t count)
107 {
108  __int128 i128;
109  memcpy(&i128, &ptr, sizeof(ptr));
110  memcpy((uint64_t *)&i128+1, &count, sizeof(count));
111 
112  return i128;
113 }
114 
115 class Message *Mbox::getPtr(__int128 i128)
116 {
117  return (class Message *)i128;
118 }
119 
120 uint64_t Mbox::getCount(__int128 i128)
121 {
122  return *((uint64_t *)&i128+1);
123 }
124 
125 MboxProducer::MboxProducer() : mbox(NULL), obBatchMsg(NULL)
126 {
127 }
128 
129 MboxProducer::MboxProducer(class Mbox *mboxarg, int16_t nodeidarg) :
130  mbox(mboxarg), nodeid(nodeidarg), obBatchMsg(NULL)
131 {
132 }
133 
135 {
136 }
137 
138 void MboxProducer::sendMsg(class Message &msgsnd)
139 {
140  class Message *msgptr;
141  if (nodeid != msgsnd.messageStruct.destAddr.nodeid)
142  { // must be sending to obgw then, so serialize here
143  if (obBatchMsg==NULL)
144  {
145  obBatchMsg=new class MessageBatchSerialized(nodeid);
146  }
148  {msgsnd.messageStruct.destAddr.nodeid, msgsnd.sermsg()};
149  delete &msgsnd;
151  {
152  mboxes->sendObBatch();
153  }
154  return;
155  }
156  else
157  {
158  msgptr=&msgsnd;
159  }
160  class Message &msg=*msgptr;
161 
162  msg.nextmsg = Mbox::getInt128FromPointer(NULL, 5555);
163 
164  __int128 mytail;
165  __int128 mynext;
166 
167  while (1)
168  {
169  mytail = __atomic_load_n(&mbox->tail, __ATOMIC_SEQ_CST);
170  mynext = __atomic_load_n(&(Mbox::getPtr(mytail)->nextmsg),
171  __ATOMIC_SEQ_CST);
172 
173  if (mytail == __atomic_load_n(&mbox->tail, __ATOMIC_SEQ_CST))
174  {
175  if (Mbox::getPtr(mynext) == NULL)
176  {
177  if (__atomic_compare_exchange_n(&(Mbox::getPtr(mytail)->nextmsg), &mynext, Mbox::getInt128FromPointer(&msg, __atomic_add_fetch(&mbox->counter, 1, __ATOMIC_SEQ_CST)), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
178  {
179  break;
180  }
181  }
182  else
183  {
184  // CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
185  __atomic_compare_exchange_n(&mbox->tail, &mytail, Mbox::getInt128FromPointer(Mbox::getPtr(mynext), __atomic_add_fetch(&mbox->counter, 1, __ATOMIC_SEQ_CST)), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
186  }
187  }
188  }
189 
190  __atomic_compare_exchange_n(&mbox->tail, &mytail, Mbox::getInt128FromPointer(&msg, __atomic_add_fetch(&mbox->counter, 1, __ATOMIC_SEQ_CST)), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
191 }
192 
194 {
195 }
196 
197 Mboxes::Mboxes(int64_t nodeidarg) : nodeid(nodeidarg),
198  topologyMgrPtr(NULL),
199  userSchemaMgrLocation (),
200  deadlockMgrLocation (),
201  listenerPtr(NULL),
202  obGatewayPtr (NULL)
203 {
204  topologyMgr.mbox = NULL;
205  userSchemaMgr.mbox = NULL;
206  deadlockMgr.mbox = NULL;
207  listener.mbox = NULL;
208  ibGateway.mbox = NULL;
209  obGateway.mbox = NULL;
210 }
211 
213 {
214 }
215 
220 void Mboxes::update(class Topology &top)
221 {
222  update(top, 0);
223 }
224 
225 void Mboxes::update(class Topology &top, int64_t myActorid)
226 {
227  class Topology *newTop = new class Topology();
228  std::swap(*newTop, top);
229  delete newTop;
230  pthread_mutex_lock(&nodeTopologyMutex);
231  top = nodeTopology;
232  pthread_mutex_unlock(&nodeTopologyMutex);
233 
234  // new, so delete the previous logic when switched over
235  // local
236  actoridToProducers.resize(top.actorList.size(), NULL);
237  transactionAgentPtrs.resize(top.numtransactionagents, NULL);
238  enginePtrs.resize(top.numengines, NULL);
239 
240  for (size_t n=0; n < top.actorList.size(); n++)
241  {
242  if (actoridToProducers[n]==NULL)
243  {
244  if (top.actorList[n].type != ACTOR_NONE)
245  {
246  actoridToProducers[n] =
247  new class MboxProducer(top.actorList[n].mbox,top.nodeid);
248  actoridToProducers[n]->mboxes=this;
249  }
250 
251  switch (top.actorList[n].type)
252  {
253  case ACTOR_TRANSACTIONAGENT:
254  transactionAgentPtrs[top.actorList[n].instance] =
256  break;
257 
258  case ACTOR_ENGINE:
259  enginePtrs[top.actorList[n].instance] = actoridToProducers[n];
260  break;
261 
262  case ACTOR_TOPOLOGYMGR:
264  break;
265 
266  case ACTOR_LISTENER:
268  break;
269 
270  case ACTOR_USERSCHEMAMGR:
271  {
275  }
276  break;
277 
278  case ACTOR_DEADLOCKMGR:
279  {
283  }
284  break;
285 
286  case ACTOR_IBGATEWAY:
287  break;
288 
289  case ACTOR_OBGATEWAY:
290  if ((int64_t)(myActorid % top.numobgateways) ==
291  top.actorList[n].instance)
292  {
294  }
295  break;
296 
297  case ACTOR_NONE:
298  break;
299 
300  default:
301  printf("%s %i unhandled actor type: %i\n", __FILE__, __LINE__,
302  top.actorList[n].type);
303  }
304  }
305  }
306 
307  // global
308  location_s emptyLocation = {};
309  partitionToProducers.resize(top.numpartitions, emptyLocation);
310 
311  for (size_t n=0; n < top.partitionListThisReplica.size(); n++)
312  {
313  if (partitionToProducers[n].address.nodeid==0)
314  {
315  partitionToProducers[n].address =
316  top.partitionListThisReplica[n].address;
317 
318  if (top.nodeid==partitionToProducers[n].address.nodeid)
319  {
320  partitionToProducers[n].destmbox =
321  actoridToProducers[partitionToProducers[n].address.actorid];
322  }
323  else
324  {
325  partitionToProducers[n].destmbox = obGatewayPtr;
326  }
327  }
328  }
329 
330  vector< vector<int> > aa = top.allActors;
331  allActors.swap(aa);
332  boost::unordered_map< int16_t, vector<int> > aatr = top.allActorsThisReplica;
333  allActorsThisReplica.swap(aatr);
334 
335  if (top.userSchemaMgrNode && (top.userSchemaMgrNode != top.nodeid))
336  {
338  }
339 
340  if (top.deadlockMgrNode && (top.deadlockMgrNode != top.nodeid))
341  {
343  }
344 }
345 
348  const Topology::addressStruct &dest, class Message &msg)
349 {
350  msg.setEnvelope(source, dest, msg);
351 
352  if (nodeid==dest.nodeid)
353  {
354  actoridToProducers[dest.actorid]->sendMsg(msg);
355  }
356  else
357  {
358  obGatewayPtr->sendMsg(msg);
359  }
360 }
361 
363  class Message &msg)
364 {
365  toActor(source, userSchemaMgrLocation.address, msg);
366 }
367 
369  class Message &msg)
370 {
371  toActor(source, deadlockMgrLocation.address, msg);
372 }
373 
375  int64_t partitionid, class Message &msg)
376 {
377  if (partitionid < 0 || (size_t)partitionid > partitionToProducers.size()-1)
378  {
379  printf("%s %i anomaly %li %lu\n", __FILE__, __LINE__, partitionid,
380  (unsigned long)partitionToProducers.size());
381  return;
382  }
383 
384  location_s &destLocation = partitionToProducers[partitionid];
385  msg.setEnvelope(source, destLocation.address, msg);
386  destLocation.destmbox->sendMsg(msg);
387 }
388 
389 int64_t Mboxes::toAllOfType(actortypes_e type,
390  const Topology::addressStruct &source,
391  class Message &msg)
392 {
393  int64_t tally = 0;
394 
395  for (int16_t n=1; n < (int16_t)allActors.size(); n++)
396  {
397  for (int16_t m=FIRSTACTORID; m < (int16_t)allActors[n].size(); m++)
398  {
399  if (allActors[n][m] == (int)type)
400  {
401  switch (msg.messageStruct.payloadtype)
402  {
403  case PAYLOADMESSAGE:
404  {
405  class Message *nmsg = new class Message;
406  *nmsg = *((class Message *)&msg);
407  toActor(source, {n, m}, *nmsg);
408  }
409  break;
410 
411  case PAYLOADSOCKET:
412  {
413  class MessageSocket *nmsg = new class MessageSocket;
414  *nmsg = *((class MessageSocket *)&msg);
415  toActor(source, {n, m}, *nmsg);
416  }
417  break;
418 
419  case PAYLOADUSERSCHEMA:
420  {
421  class MessageUserSchema *nmsg = new class MessageUserSchema;
422  *nmsg = *((class MessageUserSchema *)&msg);
423  toActor(source, {n, m}, *nmsg);
424  }
425  break;
426 
427  case PAYLOADDEADLOCK:
428  {
429  class MessageDeadlock *nmsg = new class MessageDeadlock;
430  *nmsg = *((class MessageDeadlock *)&msg);
431  toActor(source, {n, m}, *nmsg);
432  }
433  break;
434 
436  {
437  class MessageSubtransactionCmd *nmsg =
438  new class MessageSubtransactionCmd;
439  *nmsg = *((class MessageSubtransactionCmd *)&msg);
440  toActor(source, {n, m}, *nmsg);
441  }
442  break;
443 
445  {
446  class MessageCommitRollback *nmsg =
447  new class MessageCommitRollback;
448  *nmsg = *((class MessageCommitRollback *)&msg);
449  toActor(source, {n, m}, *nmsg);
450  }
451  break;
452 
453  default:
454  printf("%s %i anomaly %i\n", __FILE__, __LINE__,
456  }
457 
458  tally++;
459  }
460  }
461  }
462 
463  return tally;
464 }
465 
466 int64_t Mboxes::toAllOfTypeThisReplica(actortypes_e type,
467  const Topology::addressStruct &source,
468  class Message &msg)
469 {
470  int64_t tally = 0;
471 
472  boost::unordered_map< int16_t, vector<int> >::iterator it;
473 
474  for (it = allActorsThisReplica.begin(); it != allActorsThisReplica.end();
475  ++it)
476  {
477  for (int16_t m=FIRSTACTORID; m <
478  (int16_t)allActorsThisReplica[it->first].size(); m++)
479  {
480  if (allActorsThisReplica[it->first][m] == (int)type)
481  {
482  printf("%s %i n m %i %i allActors %i allActorsThisReplica %i\n",
483  __FILE__, __LINE__, it->first, m, allActors[it->first][m],
484  allActorsThisReplica[it->first][m]);
485 
486  switch (msg.messageStruct.payloadtype)
487  {
488  case PAYLOADMESSAGE:
489  {
490  class Message *nmsg = new class Message;
491  *nmsg = *((class Message *)&msg);
492  toActor(source, {it->first, m}, *nmsg);
493  }
494  break;
495 
496  case PAYLOADSOCKET:
497  {
498  class MessageSocket *nmsg = new class MessageSocket;
499  *nmsg = *((class MessageSocket *)&msg);
500  toActor(source, {it->first, m}, *nmsg);
501  }
502  break;
503 
504  case PAYLOADUSERSCHEMA:
505  {
506  class MessageUserSchema *nmsg = new class MessageUserSchema;
507  *nmsg = *((class MessageUserSchema *)&msg);
508  toActor(source, {it->first, m}, *nmsg);
509  }
510  break;
511 
512  case PAYLOADDEADLOCK:
513  {
514  class MessageDeadlock *nmsg = new class MessageDeadlock;
515  *nmsg = *((class MessageDeadlock *)&msg);
516  toActor(source, {it->first, m}, *nmsg);
517  }
518  break;
519 
521  {
522  class MessageSubtransactionCmd *nmsg =
523  new class MessageSubtransactionCmd;
524  *nmsg = *((class MessageSubtransactionCmd *)&msg);
525  toActor(source, {it->first, m}, *nmsg);
526  }
527  break;
528 
530  {
531  class MessageCommitRollback *nmsg =
532  new class MessageCommitRollback;
533  *nmsg = *((class MessageCommitRollback *)&msg);
534  toActor(source, {it->first, m}, *nmsg);
535  }
536  break;
537 
538  default:
539  printf("%s %i anomaly %i\n", __FILE__, __LINE__,
541  }
542 
543  tally++;
544  }
545  }
546  }
547 
548  return tally;
549 }
550 
552 {
553  if (obGatewayPtr != NULL)
554  {
555  if (obGatewayPtr->obBatchMsg != NULL)
556  {
558  obGatewayPtr->obBatchMsg=NULL;
559  }
560  }
561 }