InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Message.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
30 #include "Message.h"
31 #line 32 "Message.cc"
32 
34 {
35 }
36 
38 {
39 }
40 
41 size_t Message::size()
42 {
44 }
45 
46 string *Message::ser()
47 {
48  class SerializedMessage serobj(size());
49  package(serobj);
50  if (serobj.data->size() != serobj.pos)
51  {
52  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
53  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
54  }
55  return serobj.data;
56 }
57 
59 {
60  serobj.ser(messageStruct);
61 }
62 
64 {
65  serobj.des(messageStruct);
66 }
67 
69 {
70  messageStruct={};
71 }
72 
73 class Message *Message::des(string *serstr)
74 {
75  message_s tmpheader;
76  memcpy(&tmpheader, &serstr->at(0), sizeof(tmpheader));
77 
78  class Message *msg;
79  class SerializedMessage serobj(serstr);
80  switch(serobj.getpayloadtype())
81  {
82  case PAYLOADMESSAGE:
83  msg = (class Message *)new class Message;
84  msg->unpack(serobj);
85  break;
86 
87  case PAYLOADSOCKET:
88  msg = (class Message *)new class MessageSocket;
89  ((class MessageSocket *)msg)->unpack(serobj);
90  break;
91 
92  case PAYLOADUSERSCHEMA:
93  msg = (class Message *)new class MessageUserSchema;
94  ((class MessageUserSchema *)msg)->unpack(serobj);
95  break;
96 
97  case PAYLOADDEADLOCK:
98  msg = (class Message *)new class MessageDeadlock;
99  ((class MessageDeadlock *)msg)->unpack(serobj);
100  break;
101 
103  msg = (class Message *)new class MessageSubtransactionCmd;
104  ((class MessageSubtransactionCmd *)msg)->unpack(serobj);
105  break;
106 
108  msg = (class Message *)new class MessageCommitRollback;
109  ((class MessageCommitRollback *)msg)->unpack(serobj);
110  break;
111 
112  case PAYLOADDISPATCH:
113  msg = (class Message *)new class MessageDispatch;
114  ((class MessageDispatch *)msg)->unpack(serobj);
115  break;
116 
117  case PAYLOADACKDISPATCH:
118  msg = (class Message *)new class MessageAckDispatch;
119  ((class MessageAckDispatch *)msg)->unpack(serobj);
120  break;
121 
122  case PAYLOADAPPLY:
123  msg = (class Message *)new class MessageApply;
124  ((class MessageApply *)msg)->unpack(serobj);
125  break;
126 
127  case PAYLOADACKAPPLY:
128  msg = (class Message *)new class MessageAckApply;
129  ((class MessageAckApply *)msg)->unpack(serobj);
130  break;
131 
132  default:
133  printf("%s %i anomaly %i\n", __FILE__, __LINE__, tmpheader.payloadtype);
134  delete serstr;
135  return NULL;
136  }
137 
138  delete serstr;
139  return msg;
140 }
141 
143 {
144  string *serstr;
145 
146  switch (messageStruct.payloadtype)
147  {
148  case PAYLOADMESSAGE:
149  serstr=((class Message *)this)->ser();
150  break;
151 
152  case PAYLOADSOCKET:
153  serstr=((class MessageSocket *)this)->ser();
154  break;
155 
156  case PAYLOADUSERSCHEMA:
157  serstr=((class MessageUserSchema *)this)->ser();
158  break;
159 
160  case PAYLOADDEADLOCK:
161  serstr=((class MessageDeadlock *)this)->ser();
162  break;
163 
165  serstr=((class MessageSubtransactionCmd *)this)->ser();
166  break;
167 
169  serstr=((class MessageCommitRollback *)this)->ser();
170  break;
171 
172  case PAYLOADDISPATCH:
173  serstr=((class MessageDispatch *)this)->ser();
174  break;
175 
176  case PAYLOADACKDISPATCH:
177  serstr=((class MessageAckDispatch *)this)->ser();
178  break;
179 
180  case PAYLOADAPPLY:
181  serstr=((class MessageApply *)this)->ser();
182  break;
183 
184  case PAYLOADACKAPPLY:
185  serstr=((class MessageAckApply *)this)->ser();
186  break;
187 
188  default:
189  printf("%s %i anomaly %i\n", __FILE__, __LINE__,
191  serstr=NULL;
192  }
193 
194  return serstr;
195 }
196 
198  const Topology::addressStruct &dest,
199  class Message &msg)
200 {
201  msg.messageStruct.sourceAddr = source;
202  msg.messageStruct.destAddr = dest;
203 }
204 
206 {
207 }
208 
209 MessageSocket::MessageSocket(int socketarg, uint32_t eventsarg,
210  listenertype_e listenertypearg, int64_t nodeidarg,
211  topic_e topicarg) :
212  socketStruct({socketarg, eventsarg, listenertypearg})
213 {
214  messageStruct.topic=topicarg;
217 }
218 
220 {
221 }
222 
224 {
226 }
227 
229 {
230  class SerializedMessage serobj(size());
231  package(serobj);
232  if (serobj.data->size() != serobj.pos)
233  {
234  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
235  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
236  }
237  return serobj.data;
238 }
239 
241 {
242  Message::package(serobj);
243  serobj.ser(socketStruct);
244 }
245 
247 {
248  Message::unpack(serobj);
249  serobj.des(messageStruct);
250 }
251 
253 {
254  Message::clear();
255  socketStruct={};
256 }
257 
259 {
260 }
261 
263  procs()
264 {
265  messageStruct.topic = topicarg;
267 }
268 
270 {
271 }
272 
274 {
275  return (size_t) (Message::size() +
284 }
285 
287 {
288  class SerializedMessage serobj(size());
289  package(serobj);
290  if (serobj.data->size() != serobj.pos)
291  {
292  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
293  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
294  }
295  return serobj.data;
296 }
297 
299 {
300  Message::package(serobj);
301  serobj.ser(userschemaStruct);
302  serobj.ser(procs);
303  serobj.ser(argstring);
304  serobj.ser(pathname);
305  serobj.ser(procname);
306  serobj.ser(username);
307  serobj.ser(domainname);
308  serobj.ser(password);
309 }
310 
312 {
313  Message::unpack(serobj);
314  serobj.des(userschemaStruct);
315  serobj.des(procs);
316  serobj.des(argstring);
317  serobj.des(pathname);
318  serobj.des(procname);
319  serobj.des(username);
320  serobj.des(domainname);
321  serobj.des(password);
322 }
323 
325 {
326  Message::clear();
327  userschemaStruct={};
328  procs={};
329  pathname.clear();
330  procname.clear();
331  username.clear();
332  domainname.clear();
333  password.clear();
334 }
335 
337 {
339 }
340 
342 {
343 }
344 
346 {
350 }
351 
353 {
354  class SerializedMessage serobj(size());
355  package(serobj);
356  if (serobj.data->size() != serobj.pos)
357  {
358  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
359  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
360  }
361  return serobj.data;
362 }
363 
365 {
366  Message::package(serobj);
367  serobj.ser(deadlockStruct);
368  serobj.ser(deadlockNode);
369  serobj.ser(nodes);
370 }
371 
373 {
374  Message::unpack(serobj);
375  serobj.des(deadlockStruct);
376  serobj.des(deadlockNode);
377  serobj.des(nodes);
378 }
379 
381 {
382  Message::clear();
383  deadlockStruct={};
384  deadlockNode.clear();
385  nodes={};
386 }
387 
389 {
390 }
391 
393 {
394 }
395 
396 
398 {
400 }
401 
403 {
404  class SerializedMessage serobj(size());
405  package(serobj);
406  if (serobj.data->size() != serobj.pos)
407  {
408  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
409  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
410  }
411  return serobj.data;
412 }
413 
415 {
416  Message::package(serobj);
417  serobj.ser(transactionStruct);
418 }
419 
421 {
422  Message::unpack(serobj);
423  serobj.des(transactionStruct);
424 }
425 
427 {
428  Message::clear();
430 }
431 
433  subtransactionStruct (), fieldVal ()
434 {
435 }
436 
438 {
439 }
440 
442 {
443  return MessageTransaction::size() +
451 }
452 
454 {
455  class SerializedMessage serobj(size());
456  package(serobj);
457  if (serobj.data->size() != serobj.pos)
458  {
459  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
460  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
461  }
462  return serobj.data;
463 }
464 
466 {
468  serobj.ser(subtransactionStruct);
469  serobj.ser(row);
470  serobj.ser(fieldVal);
471  serobj.ser(indexHits);
472  serobj.ser(searchParameters);
473  serobj.ser(rowids);
474  serobj.ser(returnRows);
475 }
476 
478 {
480  serobj.des(subtransactionStruct);
481  serobj.des(row);
482  serobj.des(fieldVal);
483  serobj.des(indexHits);
484  serobj.des(searchParameters);
485  serobj.des(rowids);
486  serobj.des(returnRows);
487 }
488 
490 {
493  row.clear();
494  fieldVal={};
495  indexHits.clear();
496  searchParameters={};
497  rowids.clear();
498  returnRows.clear();
499 }
500 
502 {
503 }
504 
506 {
507 }
508 
510 {
511  return MessageTransaction::size() +
513 }
514 
516 {
517  class SerializedMessage serobj(size());
518  package(serobj);
519  if (serobj.data->size() != serobj.pos)
520  {
521  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
522  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
523  }
524  return serobj.data;
525 }
526 
528 {
530  serobj.ser(rofs);
531 }
532 
534 {
536  serobj.des(rofs);
537 }
538 
540 {
542  rofs.clear();
543 }
544 
546 {
549 }
550 
552 {
553 }
554 
556 {
560 }
561 
563 {
564  class SerializedMessage serobj(size());
565  package(serobj);
566  if (serobj.data->size() != serobj.pos)
567  {
568  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
569  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
570  }
571  return serobj.data;
572 }
573 
575 {
576  Message::package(serobj);
577  serobj.ser(dispatchStruct);
578  serobj.ser(pidsids);
579  serobj.ser(records);
580 }
581 
583 {
584  Message::unpack(serobj);
585  serobj.des(dispatchStruct);
586  serobj.des(pidsids);
587  serobj.des(records);
588 }
589 
591 {
592  Message::clear();
593  dispatchStruct={};
594  pidsids.clear();
595  records.clear();
596 }
597 
598 /* order is as follows, after base Message class:
599  * transactionid (int64_t)
600  * pidsids is map of partitionid->subtransactionid
601  *
602  * based on size of that vector, 3 sets of vectors:
603  * 1 with record's rowid & primitive & tableid & previoussubtransactionid as
604  * string, the others with row & oldrow, which are strings
605  * serialize the key (partitionid) of records, then the 3 vectors
606  */
607 
609 {
610 }
611 
612 MessageAckDispatch::MessageAckDispatch(int64_t transactionidarg, int statusarg) :
613  ackdispatchStruct ({transactionidarg, statusarg})
614 {
615  messageStruct.topic = TOPIC_ACKDISPATCH;
616  messageStruct.payloadtype = PAYLOADACKDISPATCH;
617 }
618 
620 {
621 }
622 
624 {
626 }
627 
629 {
630  class SerializedMessage serobj(size());
631  package(serobj);
632  if (serobj.data->size() != serobj.pos)
633  {
634  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
635  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
636  }
637  return serobj.data;
638 }
639 
641 {
642  Message::package(serobj);
643  serobj.ser(ackdispatchStruct);
644 }
645 
647 {
648  Message::unpack(serobj);
649  serobj.des(ackdispatchStruct);
650 }
651 
653 {
654  Message::clear();
656 }
657 
659 {
660 }
661 
662 MessageApply::MessageApply(int64_t subtransactionidarg, int64_t applieridarg,
663  int64_t domainidarg) :
664  applyStruct ({subtransactionidarg, applieridarg, domainidarg})
665 {
666  messageStruct.topic = TOPIC_APPLY;
667  messageStruct.payloadtype = PAYLOADAPPLY;
668 }
669 
671 {
672 }
673 
675 {
679 }
680 
682 {
683  class SerializedMessage serobj(size());
684  package(serobj);
685  if (serobj.data->size() != serobj.pos)
686  {
687  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
688  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
689  }
690  return serobj.data;
691 }
692 
694 {
695  Message::package(serobj);
696  serobj.ser(applyStruct);
697  serobj.ser(rows);
698  serobj.ser(indices);
699 }
700 
702 {
703  Message::unpack(serobj);
704  serobj.des(applyStruct);
705  serobj.des(rows);
706  serobj.des(indices);
707 }
708 
710 {
711  Message::clear();
712  applyStruct={};
713  rows.clear();
714  indices={};
715 }
716 
718 {
719  *c |= 1 << ADDFLAG;
720 }
721 
723 {
724  return c & 1 << ADDFLAG;
725 }
726 
728 {
729  *c &= ~(1 << ADDFLAG);
730 }
731 
732 
734 {
735 }
736 
737 MessageAckApply::MessageAckApply(int64_t subtransactionidarg,
738  int64_t applieridarg, int64_t partitionidarg,
739  int statusarg) :
740  ackapplyStruct ({subtransactionidarg, applieridarg, partitionidarg,
741  statusarg})
742 {
743  messageStruct.topic = TOPIC_ACKAPPLY;
744  messageStruct.payloadtype = PAYLOADACKAPPLY;
745 }
746 
748 {
749 }
750 
752 {
754 }
755 
757 {
758  class SerializedMessage serobj(size());
759  package(serobj);
760  if (serobj.data->size() != serobj.pos)
761  {
762  fprintf(logfile, "%s %i ser %i size %lu pos %lu\n", __FILE__, __LINE__,
763  serobj.getpayloadtype(), serobj.data->size(), serobj.pos);
764  }
765  return serobj.data;
766 }
767 
769 {
770  Message::package(serobj);
771  serobj.ser(ackapplyStruct);
772 }
773 
775 {
776  Message::unpack(serobj);
777  serobj.des(ackapplyStruct);
778 }
779 
781 {
782  Message::clear();
783  ackapplyStruct={};
784 }
785 
786 SerializedMessage::SerializedMessage(size_t sizearg) : size(sizearg), pos(0)
787 {
788  data=new string(size, char(0));
789 }
790 
791 SerializedMessage::SerializedMessage(string *dataarg) : pos(0), data(dataarg)
792 {
793  size=data->size();
794 }
795 
797 {
798 }
799 
801 {
802  Message::message_s tmpheader;
803  memcpy(&tmpheader, data->c_str(), sizeof(tmpheader));
804  return tmpheader.payloadtype;
805 }
806 
807 // raw
808 void SerializedMessage::ser(size_t s, void *dataptr)
809 {
810  memcpy(&data->at(pos), dataptr, s);
811  pos += s;
812 }
813 
814 void SerializedMessage::des(size_t s, void *dataptr)
815 {
816  memcpy(dataptr, &data->at(pos), s);
817  pos += s;
818 }
819 
820 //pods
821 void SerializedMessage::ser(int64_t d)
822 {
823  memcpy(&data->at(pos), &d, sizeof(d));
824  pos += sizeof(d);
825 }
826 
827 size_t SerializedMessage::sersize(int64_t d)
828 {
829  return sizeof(d);
830 }
831 
832 void SerializedMessage::des(int64_t *d)
833 {
834  memcpy(d, &data->at(pos), sizeof(*d));
835  pos += sizeof(*d);
836 }
837 
838 void SerializedMessage::ser(int32_t d)
839 {
840  memcpy(&data->at(pos), &d, sizeof(d));
841  pos += sizeof(d);
842 }
843 
844 size_t SerializedMessage::sersize(int32_t d)
845 {
846  return sizeof(d);
847 }
848 
849 void SerializedMessage::des(int32_t *d)
850 {
851  memcpy(d, &data->at(pos), sizeof(*d));
852  pos += sizeof(*d);
853 }
854 
855 void SerializedMessage::ser(int16_t d)
856 {
857  memcpy(&data->at(pos), &d, sizeof(d));
858  pos += sizeof(d);
859 }
860 
861 size_t SerializedMessage::sersize(int16_t d)
862 {
863  return sizeof(d);
864 }
865 
866 void SerializedMessage::des(int16_t *d)
867 {
868  memcpy(d, &data->at(pos), sizeof(*d));
869  pos += sizeof(*d);
870 }
871 
873 {
874  memcpy(&data->at(pos), &d, sizeof(d));
875  pos += sizeof(d);
876 }
877 
879 {
880  return sizeof(d);
881 }
882 
883 void SerializedMessage::des(int8_t *d)
884 {
885  memcpy(d, &data->at(pos), sizeof(*d));
886  pos += sizeof(*d);
887 }
888 
889 // containers
890 void SerializedMessage::ser(const string &d)
891 {
892  ser((int64_t)d.size());
893  if (d.size())
894  {
895  memcpy(&data->at(pos), d.c_str(), d.size());
896  pos += d.size();
897  }
898 }
899 
900 size_t SerializedMessage::sersize(const string &d)
901 {
902  return sizeof(int64_t)+d.size();
903 }
904 
905 void SerializedMessage::des(string &d)
906 {
907  size_t s;
908  des((int64_t *)&s);
909  if (s)
910  {
911  d.assign((const char *)&data->at(pos), s);
912  pos += s;
913  }
914 }
915 
916 void SerializedMessage::ser(vector<int64_t> &d)
917 {
918  size_t s=d.size();
919  memcpy(&data->at(pos), &s, sizeof(s));
920  pos += sizeof(s);
921  for (size_t n=0; n<s; n++)
922  {
923  ser(d[n]);
924  }
925 }
926 
927 size_t SerializedMessage::sersize(vector<int64_t> &d)
928 {
929  return sizeof(size_t) + (d.size() * sizeof(int64_t));
930 }
931 
932 void SerializedMessage::des(vector<int64_t> &d)
933 {
934  size_t s;
935  memcpy(&s, &data->at(pos), sizeof(s));
936  pos += sizeof(s);
937  d.reserve(s);
938  for (size_t n=0; n<s; n++)
939  {
940  int64_t val;
941  des(&val);
942  d.push_back(val);
943  }
944 }
945 
946 void SerializedMessage::ser(boost::unordered_map<int64_t, int64_t> &d)
947 {
948  ser((int64_t)d.size());
949  boost::unordered_map<int64_t, int64_t>::const_iterator it;
950  for (it=d.begin(); it != d.end(); ++it)
951  {
952  ser(it->first);
953  ser(it->second);
954  }
955 }
956 
957 size_t SerializedMessage::sersize(boost::unordered_map<int64_t, int64_t> &d)
958 {
959  return d.size() + (d.size() * 2 * sizeof(int64_t));
960 }
961 
962 void SerializedMessage::des(boost::unordered_map<int64_t, int64_t> &d)
963 {
964  size_t s;
965  memcpy(&s, &data->at(pos), sizeof(s));
966  pos += sizeof(s);
967  for (size_t n=0; n<s; n++)
968  {
969  int64_t val1, val2;
970  des(&val1);
971  des(&val2);
972  d[val1]=val2;
973  }
974 }
975 
976 // pod structs
978 {
979  memcpy(&data->at(pos), &d, sizeof(d));
980  pos += sizeof(d);
981 }
982 
984 {
985  return sizeof(d);
986 }
987 
989 {
990  memcpy(&d, &data->at(pos), sizeof(d));
991  pos += sizeof(d);
992 }
993 
994 
996 {
997  memcpy(&data->at(pos), &d, sizeof(d));
998  pos += sizeof(d);
999 }
1000 
1002 {
1003  return sizeof(d);
1004 }
1005 
1007 {
1008  memcpy(&d, &data->at(pos), sizeof(d));
1009  pos += sizeof(d);
1010 }
1011 
1013 {
1014  memcpy(&data->at(pos), &d, sizeof(d));
1015  pos += sizeof(d);
1016 }
1017 
1019 {
1020  return sizeof(d);
1021 }
1022 
1024 {
1025  memcpy(&d, &data->at(pos), sizeof(d));
1026  pos += sizeof(d);
1027 }
1028 
1030 {
1031  memcpy(&data->at(pos), &d, sizeof(d));
1032  pos += sizeof(d);
1033 }
1034 
1036 {
1037  return sizeof(d);
1038 }
1039 
1041 {
1042  memcpy(&d, &data->at(pos), sizeof(d));
1043  pos += sizeof(d);
1044 }
1045 
1047 {
1048  memcpy(&data->at(pos), &d, sizeof(d));
1049  pos += sizeof(d);
1050 }
1051 
1053 {
1054  return sizeof(d);
1055 }
1056 
1058 {
1059  memcpy(&d, &data->at(pos), sizeof(d));
1060  pos += sizeof(d);
1061 }
1062 
1064 {
1065  memcpy(&data->at(pos), &d, sizeof(d));
1066  pos += sizeof(d);
1067 }
1068 
1070 {
1071  return sizeof(d);
1072 }
1073 
1075 {
1076  memcpy(&d, &data->at(pos), sizeof(d));
1077  pos += sizeof(d);
1078 }
1079 
1081 {
1082  memcpy(&data->at(pos), &d, sizeof(d));
1083  pos += sizeof(d);
1084 }
1085 
1087 {
1088  return sizeof(d);
1089 }
1090 
1092 {
1093  memcpy(&d, &data->at(pos), sizeof(d));
1094  pos += sizeof(d);
1095 }
1096 
1098 {
1099  memcpy(&data->at(pos), &d, sizeof(d));
1100  pos += sizeof(d);
1101 }
1102 
1104 {
1105  return sizeof(d);
1106 }
1107 
1109 {
1110  memcpy(&d, &data->at(pos), sizeof(d));
1111  pos += sizeof(d);
1112 }
1113 
1115 {
1116  memcpy(&data->at(pos), &d, sizeof(d));
1117  pos += sizeof(d);
1118 }
1119 
1121 {
1122  return sizeof(d);
1123 }
1124 
1126 {
1127  memcpy(&d, &data->at(pos), sizeof(d));
1128  pos += sizeof(d);
1129 }
1130 
1132 {
1133  memcpy(&data->at(pos), &d, sizeof(d));
1134  pos += sizeof(d);
1135 }
1136 
1138 {
1139  return sizeof(d);
1140 }
1141 
1143 {
1144  memcpy(&d, &data->at(pos), sizeof(d));
1145  pos += sizeof(d);
1146 }
1147 
1149 {
1150  memcpy(&data->at(pos), &d, sizeof(d));
1151  pos += sizeof(d);
1152 }
1153 
1155 {
1156  return sizeof(d);
1157 }
1158 
1160 {
1161  memcpy(&d, &data->at(pos), sizeof(d));
1162  pos += sizeof(d);
1163 }
1164 
1166 {
1167  memcpy(&data->at(pos), &d, sizeof(d));
1168  pos += sizeof(d);
1169 }
1170 
1172 {
1173  return sizeof(d);
1174 }
1175 
1177 {
1178  memcpy(&d, &data->at(pos), sizeof(d));
1179  pos += sizeof(d);
1180 }
1181 
1182 // level 1
1183 void SerializedMessage::ser(boost::unordered_set<string> &d)
1184 {
1185  size_t s=d.size();
1186  ser((int64_t)s);
1187  boost::unordered_set<string>::const_iterator it;
1188  for (it=d.begin(); it != d.end(); ++it)
1189  {
1190  ser(*it);
1191  }
1192 }
1193 
1194 size_t SerializedMessage::sersize(boost::unordered_set<string> &d)
1195 {
1196  size_t retval=sizeof(size_t);
1197  boost::unordered_set<string>::const_iterator it;
1198  for (it=d.begin(); it != d.end(); ++it)
1199  {
1200  retval += sersize(*it);
1201  }
1202  return retval;
1203 }
1204 
1205 void SerializedMessage::des(boost::unordered_set<string> &d)
1206 {
1207  size_t s;
1208  des((int64_t *)&s);
1209  for (size_t n=0; n<s; n++)
1210  {
1211  string val;
1212  des(val);
1213  d.insert(val);
1214  }
1215 }
1216 
1218 {
1219  memcpy(&data->at(pos), &d.value, sizeof(d.value));
1220  pos += sizeof(d.value);
1221  ser(d.str);
1222  ser((int8_t)d.isnull);
1223 }
1224 
1226 {
1227  return sizeof(d.value)+sersize(d.str)+sersize((int8_t)d.isnull);
1228 }
1229 
1231 {
1232  memcpy(&d.value, &data->at(pos), sizeof(d.value));
1233  pos += sizeof(d.value);
1234  des(d.str);
1235  des((int8_t *)&d.isnull);
1236 }
1237 
1239 {
1240  ser(d.rowid);
1242  ser((int8_t)d.locktype);
1243  ser(d.row);
1244 }
1245 
1247 {
1249  sersize((int8_t)d.locktype)+sersize(d.row);
1250 }
1251 
1253 {
1254  des(&d.rowid);
1256  des((int8_t *)&d.locktype);
1257  des(d.row);
1258 }
1259 
1261 {
1262  ser(d.rowid);
1263  ser((int8_t)d.primitive);
1264  ser(d.tableid);
1266  ser(d.row);
1267  ser(d.oldrow);
1268 }
1269 
1271 {
1272  return sersize(d.rowid)+sersize((int8_t)d.primitive)+sersize(d.tableid)+
1274 }
1275 
1277 {
1278  des(&d.rowid);
1279  des((int8_t *)&d.primitive);
1280  des(&d.tableid);
1282  des(d.row);
1283  des(d.oldrow);
1284 }
1285 
1286 void SerializedMessage::ser(vector<nonLockingIndexEntry_s> &d)
1287 {
1288  ser((int64_t)d.size());
1289  vector<nonLockingIndexEntry_s>::iterator it;
1290  for (it=d.begin(); it != d.end(); ++it)
1291  {
1292  ser(*it);
1293  }
1294 }
1295 
1296 size_t SerializedMessage::sersize(vector<nonLockingIndexEntry_s> &d)
1297 {
1298  size_t s=d.size();
1299  return sizeof(int64_t) + (s *sizeof(nonLockingIndexEntry_s));
1300 }
1301 
1302 void SerializedMessage::des(vector<nonLockingIndexEntry_s> &d)
1303 {
1304  size_t s;
1305  des((int64_t *)&s);
1306  d.reserve(s);
1307  for (size_t n=0; n<s; n++)
1308  {
1310  des(val);
1311  d.push_back(val);
1312  }
1313 }
1314 
1315 // level 2
1317 {
1318  ser(d.locked);
1319  ser(d.waiting);
1320 }
1321 
1323 {
1324  return sersize(d.locked)+sersize(d.waiting);
1325 }
1326 
1328 {
1329  des(d.locked);
1330  des(d.waiting);
1331 }
1332 
1333 void SerializedMessage::ser(vector<fieldValue_s> &d)
1334 {
1335  ser((int64_t)d.size());
1336  vector<fieldValue_s>::iterator it;
1337  for (it=d.begin(); it != d.end(); ++it)
1338  {
1339  ser(*it);
1340  }
1341 }
1342 
1343 size_t SerializedMessage::sersize(vector<fieldValue_s> &d)
1344 {
1345  size_t retval=sizeof(int64_t);
1346  vector<fieldValue_s>::iterator it;
1347  for (it = d.begin(); it != d.end(); ++it)
1348  {
1349  retval += sersize(*it);
1350  }
1351  return retval;
1352 }
1353 
1354 void SerializedMessage::des(vector<fieldValue_s> &d)
1355 {
1356  size_t s;
1357  des((int64_t *)&s);
1358  d.reserve(s);
1359  for (size_t n=0; n<s; n++)
1360  {
1361  fieldValue_s val;
1362  des(val);
1363  d.push_back(val);
1364  }
1365 }
1366 
1367 void SerializedMessage::ser(vector<returnRow_s> &d)
1368 {
1369  ser((int64_t)d.size());
1370  vector<returnRow_s>::iterator it;
1371  for (it=d.begin(); it != d.end(); ++it)
1372  {
1373  ser(*it);
1374  }
1375 }
1376 
1377 size_t SerializedMessage::sersize(vector<returnRow_s> &d)
1378 {
1379  size_t retval=sizeof(int64_t);
1380  vector<returnRow_s>::iterator it;
1381  for (it = d.begin(); it != d.end(); ++it)
1382  {
1383  retval += sersize(*it);
1384  }
1385  return retval;
1386 }
1387 
1388 void SerializedMessage::des(vector<returnRow_s> &d)
1389 {
1390  size_t s;
1391  des((int64_t *)&s);
1392  d.reserve(s);
1393  for (size_t n=0; n<s; n++)
1394  {
1395  returnRow_s val;
1396  des(val);
1397  d.push_back(val);
1398  }
1399 }
1400 
1401 void SerializedMessage::ser(vector<MessageDispatch::record_s> &d)
1402 {
1403  ser((int64_t)d.size());
1404  vector<MessageDispatch::record_s>::iterator it;
1405  for (it=d.begin(); it != d.end(); ++it)
1406  {
1407  ser(*it);
1408  }
1409 }
1410 
1411 size_t SerializedMessage::sersize(vector<MessageDispatch::record_s> &d)
1412 {
1413  size_t retval=sizeof(int64_t);
1414  vector<MessageDispatch::record_s>::iterator it;
1415  for (it = d.begin(); it != d.end(); ++it)
1416  {
1417  retval += sersize(*it);
1418  }
1419  return retval;
1420 }
1421 
1422 void SerializedMessage::des(vector<MessageDispatch::record_s> &d)
1423 {
1424  size_t s;
1425  des((int64_t *)&s);
1426  d.reserve(s);
1427  for (size_t n=0; n<s; n++)
1428  {
1430  des(val);
1431  d.push_back(val);
1432  }
1433 }
1434 
1436 {
1437  ser((int8_t)d.isrow);
1438  ser(d.tableid);
1439  ser(d.rowid);
1440  ser(d.fieldid);
1441  ser(d.engineid);
1442  ser((int8_t)d.deleteindexentry);
1443  ser((int8_t)d.isnotaddunique);
1444  ser((int8_t)d.isreplace);
1445  ser(d.newrowid);
1446  ser(d.newengineid);
1447  ser(d.fieldVal);
1448 }
1449 
1451 {
1452  return sersize((int8_t)d.isrow)+sersize(d.tableid)+sersize(d.rowid)+
1454  sersize((int8_t)d.deleteindexentry)+
1455  sersize((int8_t)d.isnotaddunique)+
1456  sersize((int8_t)d.isreplace)+sersize(d.newrowid)+
1458 }
1459 
1461 {
1462  des((int8_t *)&d.isrow);
1463  des(&d.tableid);
1464  des(&d.rowid);
1465  des(&d.fieldid);
1466  des(&d.engineid);
1467  des((int8_t *)&d.deleteindexentry);
1468  des((int8_t *)&d.isnotaddunique);
1469  des((int8_t *)&d.isreplace);
1470  des(&d.newrowid);
1471  des(&d.newengineid);
1472  des(d.fieldVal);
1473 }
1474 
1476 {
1477  ser(d.fieldVal);
1478  ser(d.entry);
1479  data[pos++]=d.flags;
1480  ser(d.tableid);
1481  ser(d.fieldid);
1482 }
1483 
1485 {
1486  return sersize(d.fieldVal)+sersize(d.entry)+1+sersize(d.tableid)+
1487  sersize(d.fieldid);
1488 }
1489 
1491 {
1492  des(d.fieldVal);
1493  des(d.entry);
1494  d.flags=data->at(pos++);
1495  des(&d.tableid);
1496  des(&d.fieldid);
1497 }
1498 
1499 // level 3
1501 {
1502  ser((int8_t)d.op);
1503  ser(d.values);
1504  ser(d.regexString);
1505 }
1506 
1508 {
1509  return sersize((int8_t)d.op)+sersize(d.values)+sersize(d.regexString);
1510 }
1511 
1513 {
1514  des((int8_t *)&d.op);
1515  des(d.values);
1516  des(d.regexString);
1517 }
1518 
1519 void SerializedMessage::ser(vector<rowOrField_s> &d)
1520 {
1521  ser((int64_t)d.size());
1522  vector<rowOrField_s>::iterator it;
1523  for (it=d.begin(); it != d.end(); ++it)
1524  {
1525  ser(*it);
1526  }
1527 }
1528 
1529 size_t SerializedMessage::sersize(vector<rowOrField_s> &d)
1530 {
1531  size_t retval=sizeof(int64_t);
1532  vector<rowOrField_s>::iterator it;
1533  for (it = d.begin(); it != d.end(); ++it)
1534  {
1535  retval += sersize(*it);
1536  }
1537  return retval;
1538 }
1539 
1540 void SerializedMessage::des(vector<rowOrField_s> &d)
1541 {
1542  size_t s;
1543  des((int64_t *)&s);
1544  d.reserve(s);
1545  for (size_t n=0; n<s; n++)
1546  {
1547  rowOrField_s val;
1548  des(val);
1549  d.push_back(val);
1550  }
1551 }
1552 
1553 void SerializedMessage::ser(vector<MessageApply::applyindex_s> &d)
1554 {
1555  ser((int64_t)d.size());
1556  vector<MessageApply::applyindex_s>::iterator it;
1557  for (it=d.begin(); it != d.end(); ++it)
1558  {
1559  ser(*it);
1560  }
1561 }
1562 
1563 size_t SerializedMessage::sersize(vector<MessageApply::applyindex_s> &d)
1564 {
1565  size_t retval=sizeof(int64_t);
1566  vector<MessageApply::applyindex_s>::iterator it;
1567  for (it = d.begin(); it != d.end(); ++it)
1568  {
1569  retval += sersize(*it);
1570  }
1571  return retval;
1572 }
1573 
1574 void SerializedMessage::des(vector<MessageApply::applyindex_s> &d)
1575 {
1576  size_t s;
1577  des((int64_t *)&s);
1578  d.reserve(s);
1579  for (size_t n=0; n<s; n++)
1580  {
1582  des(val);
1583  d.push_back(val);
1584  }
1585 }
1586 
1587 void SerializedMessage::ser(boost::unordered_map< int64_t,
1588  vector<MessageDispatch::record_s> > &d)
1589 {
1590  ser((int64_t)d.size());
1591  boost::unordered_map< int64_t,
1592  vector<MessageDispatch::record_s> >::iterator it;
1593  for (it=d.begin(); it != d.end(); ++it)
1594  {
1595  ser(it->first);
1596  ser(it->second);
1597  }
1598 }
1599 
1600 size_t SerializedMessage::sersize(boost::unordered_map< int64_t,
1601  vector<MessageDispatch::record_s> > &d)
1602 {
1603  return d.size() + (d.size() * 2 * sizeof(int64_t));
1604 }
1605 
1606 void SerializedMessage::des(boost::unordered_map< int64_t,
1607  vector<MessageDispatch::record_s> > &d)
1608 {
1609  size_t s;
1610  memcpy(&s, &data->at(pos), sizeof(s));
1611  pos += sizeof(s);
1612  for (size_t n=0; n<s; n++)
1613  {
1614  int64_t val1;
1615  des(&val1);
1616  vector<MessageDispatch::record_s> val2;
1617  des(val2);
1618  d[val1]=val2;
1619  }
1620 }
1621 
1623 {
1624  data = dataarg;
1625  memcpy(&messageStruct, data->c_str(), sizeof(messageStruct));
1628 }
1629 
1631 {
1632 }
1633 
1635  msgbatch ()
1636 {
1637  messageStruct.destAddr.nodeid=nodeidarg;
1640 }
1641 
1643 {
1644 }