InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
TransactionAgent.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
29 #include "TransactionAgent.h"
30 #include "Pg.h"
31 #line 32 "TransactionAgent.cc"
32 
34  myIdentity(*myIdentityArg), nexttransactionid(0), nextapplierid(0),
35  myreplica(-1), mymember(-1)
36 {
37  delete myIdentityArg;
41 
42  builtincmds_e cmd = NOCMD;
43  spclasscreate spC;
44  spclassdestroy spD;
45  uint32_t events = 0;
46 
47  typedef boost::unordered_map<std::string,
49  builtinsMap;
50  builtinsMap builtins;
51  builtins["ping"] = &TransactionAgent::ping;
52  builtins["login"] = &TransactionAgent::login;
53  builtins["logout"] = &TransactionAgent::logout;
54  builtins["changepassword"] = &TransactionAgent::changepassword;
55  builtins["createdomain"] = &TransactionAgent::createdomain;
56  builtins["createuser"] = &TransactionAgent::createuser;
57  builtins["deleteuser"] = &TransactionAgent::deleteuser;
58  builtins["deletedomain"] = &TransactionAgent::deletedomain;
59  builtins["createschema"] = &TransactionAgent::createschema;
60  builtins["createtable"] = &TransactionAgent::createtable;
61  builtins["addcolumn"] = &TransactionAgent::addcolumn;
62  builtins["deleteindex"] = &TransactionAgent::deleteindex;
63  builtins["deletetable"] = &TransactionAgent::deletetable;
64  builtins["deleteschema"] = &TransactionAgent::deleteschema;
65  builtins["loadprocedure"] = &TransactionAgent::loadprocedure;
66  builtins["compile"] = &TransactionAgent::compile;
67 
68  operationid=0;
69  int waitfor = 100;
70 
71  while (1)
72  {
73  // clear data from msgrcv
74  domainid=-1;
75  userid=-1;
76  argsize=-1;
77  sockfd=-1;
78 
80  for (size_t inmsg=0; inmsg < MSGRECEIVEBATCHSIZE; inmsg++)
81  {
82  GETMSG(msgrcv, myIdentity.mbox, waitfor)
83 
84  if (msgrcv==NULL)
85  {
86  waitfor = 100;
87  break;
88  }
89 
90  waitfor = 0;
91 
93  {
94  class MessageUserSchema &msgref =
95  *((class MessageUserSchema *)msgrcv);
96 
98  {
101 
103  {
104  ;
105  }
106  else
107  {
111  }
112  }
113 
115  }
116 
117  switch (msgrcv->messageStruct.topic)
118  {
119  case TOPIC_SOCKET:
120  {
121  switch (((class MessageSocket *)msgrcv)->socketStruct.listenertype)
122  {
123  case LISTENER_RAW:
124  {
125  sockfd=((class MessageSocket *)msgrcv)->socketStruct.socket;
126  events=((class MessageSocket *)msgrcv)->socketStruct.events;
127 
128  if ((events & EPOLLERR) || (events & EPOLLHUP))
129  {
130  endConnection();
131  break;
132  }
133 
134  if (events & EPOLLIN)
135  {
136  operation = (string *)new string;
137  argsize = readSocket();
138 
139  if (argsize < 0)
140  {
141  delete operation;
142  endConnection();
143  break;
144  }
145 
146  // ok, if no user logged in, then can only login, else
147  // break connection exit
148  // if user logged in, then cannot login but everything
149  // else
150  // login shouldn't be in binFunctions map therefore
151  socketAuthInfo::iterator loggedInUsersIterator;
152  loggedInUsersIterator = loggedInUsers.find(sockfd);
153 
154  if (loggedInUsersIterator == loggedInUsers.end())
155  {
156  // this means not logged in
157  if (operation->compare("login")==0)
158  {
159  // so, login
160  login(STARTCMD);
161  }
162  else if (operation->compare("ping")==0 &&
163  __sync_add_and_fetch(&cfgs.anonymousping,
164  0))
165  {
166  ping(STARTCMD);
167  }
168  else // gtfo
169  {
170  endConnection();
171  break;
172  }
173  }
174  else
175  {
176  // get my domainid & userid
177  domainid = loggedInUsersIterator->second.domainid;
178  userid = loggedInUsersIterator->second.userid;
179  domainName =
180  loggedInUsersIterator->second.domainName;
181 
182  // first, check domain operations, when those are
183  // built
184  if (domainidsToProcedures.count(domainid))
185  {
187  {
188  spC=(spclasscreate)domainidsToProcedures[domainid][*operation].procedurecreator;
189  spD=(spclassdestroy)domainidsToProcedures[domainid][*operation].proceduredestroyer;
190  spC(this, NULL, (void *)spD);
191  continue;
192 // break;
193  }
194  }
195 
196  builtinsMap::iterator builtinsIterator;
197  builtinsIterator = builtins.find(*operation);
198 
199  if (builtinsIterator != builtins.end())
200  {
201  (this->*(builtinsIterator->second))(STARTCMD);
202  }
203  else
204  {
205  // terminate with extreme prejudice
206  endConnection();
207  }
208  }
209 
210  delete operation;
211  }
212 
213  if (events & EPOLLOUT)
214  {
215  struct epoll_event ev;
216  ev.events = EPOLLIN | EPOLLHUP | EPOLLET;
217  ev.data.fd = sockfd;
218 
219  if (epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev))
220  {
221  endConnection();
222  break;
223  }
224 
225  // write data that's waiting
226  sendLaterMap::iterator waitingToSendIterator;
227  waitingToSendIterator = waitingToSend.find(sockfd);
228 
229  if (waitingToSendIterator != waitingToSend.end())
230  {
231  responseData response =
232  waitingToSendIterator->second;
233  sendResponse(true, response.resultCode,
234  response.sbuf);
235  }
236  }
237  }
238  break;
239 
240  case LISTENER_PG:
241  {
242  class MessageSocket &msgrcvref =
243  *(class MessageSocket *)msgrcv;
244 
245  if (!Pgs.count(msgrcvref.socketStruct.socket))
246  {
247  if ((msgrcvref.socketStruct.events & EPOLLERR) ||
248  (msgrcvref.socketStruct.events & EPOLLHUP))
249  {
250  fprintf(logfile, "\t%s %i hanging it up\n",
251  __FILE__, __LINE__);
252  Pg::pgclosesocket(*this,
253  msgrcvref.socketStruct.socket);
254  break;
255  }
256  break;
257  }
258 
259  Pgs[msgrcvref.socketStruct.socket]->cont();
260  }
261  break;
262 
263  default:
264  printf("%s %i anomaly listenertype %i\n", __FILE__, __LINE__,
266  }
267  }
268  break;
269 
271  {
272  switch (((class MessageSocket *)msgrcv)->socketStruct.listenertype)
273  {
274  case LISTENER_RAW:
275  break;
276 
277  case LISTENER_PG:
278  {
279  class MessageSocket &msgrcvref =
280  *(class MessageSocket *)msgrcv;
281  if (!Pgs.count(msgrcvref.socketStruct.socket))
282  {
283  new class Pg(this, msgrcvref.socketStruct.socket);
284  }
285  else
286  {
287  fprintf(logfile, "%s %i sockfd %i already mapped\n",
288  __FILE__, __LINE__,
289  msgrcvref.socketStruct.socket);
290  }
291  }
292  break;
293 
294  default:
295  printf("%s %i anomaly listenertype %i\n", __FILE__, __LINE__,
296  ((class MessageSocket *)msgrcv)->socketStruct.listenertype);
297  }
298 
299  }
300  break;
301 
302  case TOPIC_LOGINOK:
303  // set data members based on msgrcv
304  login(OKCMD);
305  break;
306 
307  case TOPIC_LOGINFAIL:
308  // set data members base on msgrcv
309  login(NOTOKCMD);
310  break;
311 
313  // set data members based on msgrcv
315  break;
316 
318  // set data members base on msgrcv
320  break;
321 
323  // set data members based on msgrcv
325  break;
326 
328  // set data members base on msgrcv
330  break;
331 
332  case TOPIC_CREATEUSEROK:
333  // set data members based on msgrcv
334  createuser(OKCMD);
335  break;
336 
338  // set data members base on msgrcv
340  break;
341 
342  case TOPIC_DELETEUSEROK:
343  // set data members based on msgrcv
344  deleteuser(OKCMD);
345  break;
346 
348  // set data members base on msgrcv
350  break;
351 
353  // set data members based on msgrcv
355  break;
356 
358  // set data members base on msgrcv
360  break;
361 
362  /* schema */
363  case TOPIC_SCHEMAREPLY:
365 
367  {
368  fprintf(logfile, "bad operationid %li %s %i\n", operationid,
369  __FILE__, __LINE__);
370  break;
371  }
372 
374 
375  switch (operationPtr->schemaData.state)
376  {
377  case usm:
378  cmd = USMRESPONSECMD;
379  break;
380 
381  case tasengines:
382  cmd = TASENGINESRESPONSECMD;
383  }
384 
386  {
387  case BUILTINCREATESCHEMA:
388  createschema(cmd);
389  break;
390 
391  case BUILTINCREATETABLE:
392  createtable(cmd);
393  break;
394 
395  case BUILTINADDCOLUMN:
396  addcolumn(cmd);
397  break;
398 
399  case BUILTINDELETEINDEX:
400  deleteindex(cmd);
401  break;
402 
403  case BUILTINDELETETABLE:
404  deletetable(cmd);
405  break;
406 
407  case BUILTINDELETESCHEMA:
408  deleteschema(cmd);
409  break;
410 
411  default:
412  fprintf(logfile, "builtincmd %i %s %i\n",
414  __LINE__);
415  }
416 
417  break;
418 
419  case TOPIC_SCHEMAREQUEST:
420  {
421  class MessageUserSchema &msgref =
422  *(class MessageUserSchema *)msgrcv;
424 
425  switch (msgref.userschemaStruct.builtincmd)
426  {
427  case BUILTINCREATESCHEMA:
428  TAcreateschema();
429  break;
430 
431  case BUILTINCREATETABLE:
432  TAcreatetable();
433  break;
434 
435  case BUILTINADDCOLUMN:
436  TAaddcolumn();
437  break;
438 
439  case BUILTINDELETEINDEX:
440  TAdeleteindex();
441  break;
442 
443  case BUILTINDELETETABLE:
444  TAdeletetable();
445  break;
446 
447  case BUILTINDELETESCHEMA:
448  TAdeleteschema();
449  break;
450 
451  default:
452  fprintf(logfile, "builtincmd unrecognized %li %s %i\n",
453  msgref.userschemaStruct.builtincmd, __FILE__,
454  __LINE__);
455  }
456  }
457  break;
458 
459  case TOPIC_TRANSACTION:
460  {
461  class MessageTransaction &msgref =
462  *(class MessageTransaction *)msgrcv;
463 
464  // need pendingTransactions
465  if (Transactions.count(msgref.transactionStruct.transactionid))
466  {
467  Transactions[msgref.transactionStruct.transactionid]->processTransactionMessage(msgrcv);
468  }
469  else
470  {
471  // have to check for a LOCKED message cmd, to bounce back a
472  // message to roll it back
473  fprintf(logfile, "%s %i transactionid %li\n", __FILE__,
474  __LINE__,
476  fprintf(logfile, "%s %i thismsg %p next ptr, count %p %lu, messageStruct.payloadtype %i pendingcmdid %i entrypoint %i locktype %i\n", __FILE__, __LINE__, msgrcv, Mbox::getPtr(msgref.nextmsg), Mbox::getCount(msgref.nextmsg), msgref.messageStruct.payloadtype, msgref.transactionStruct.transaction_pendingcmdid, msgref.transactionStruct.transaction_tacmdentrypoint, ((class MessageSubtransactionCmd *)msgrcv)->subtransactionStruct.locktype);
478  }
479 
480  break;
481  }
482 
483  case TOPIC_DEADLOCKABORT:
484  {
485  class MessageDeadlock &msgref = *(class MessageDeadlock *)msgrcv;
486 
487  if (Transactions.count(msgref.deadlockStruct.transactionid))
488  {
489  Transactions[msgref.deadlockStruct.transactionid]->deadlockAbort(msgref);
490  }
491  }
492  break;
493 
494  case TOPIC_TOPOLOGY:
496  updateReplicas();
497  break;
498 
499  case TOPIC_ACKDISPATCH:
500  {
501  class MessageAckDispatch &msgref =
502  *(class MessageAckDispatch *)msgrcv;
503 
504  // need pendingTransactions
505  if (Transactions.count(msgref.ackdispatchStruct.transactionid))
506  {
507  // for now 4/5/13 don't think about msgref.status
508  Transactions[msgref.ackdispatchStruct.transactionid]->continueCommitTransaction(1);
509  }
510  }
511  break;
512 
513  case TOPIC_PROCEDURE1:
514  newprocedure(2);
515  break;
516 
517  case TOPIC_PROCEDURE2:
518  newprocedure(3);
519  break;
520 
521  case TOPIC_DISPATCH:
522  handledispatch();
523  break;
524 
525  case TOPIC_ACKAPPLY:
526  {
527  class MessageAckApply &msgref = *(class MessageAckApply *)msgrcv;
528 
529  if (Appliers.count(msgref.ackapplyStruct.applierid))
530  {
531  Appliers[msgref.ackapplyStruct.applierid]->ackedApply(msgref);
532  }
533  else
534  {
535  printf("%s %i no Applier to ack status %i %li,%li,%li\n",
536  __FILE__,
537  __LINE__, msgref.ackapplyStruct.status,
539  msgref.ackapplyStruct.applierid,
540  msgref.ackapplyStruct.partitionid);
541  }
542  }
543  break;
544 
545  case TOPIC_OPERATION:
546  {
547  operationMap::iterator it;
548  it = pendingOperations.find(((class MessageUserSchema *)msgrcv)->userschemaStruct.operationid);
549 
550  if (it != pendingOperations.end())
551  {
552  class Operation &operationRef = *it->second;
553  operationRef.handleOperation(*((class MessageUserSchema *)msgrcv));
554  }
555  }
556  break;
557 
558  case TOPIC_COMPILE:
559  newstatement();
560  break;
561 
562  case TOPIC_TABLENAME:
563  {
564  class MessageUserSchema &msgrcvref = *(class MessageUserSchema *)msgrcv;
565  domainidsToSchemata[msgrcvref.userschemaStruct.domainid]->tableNameToId[msgrcvref.argstring] =
566  msgrcvref.userschemaStruct.tableid;
567  }
568  break;
569 
570  case TOPIC_FIELDNAME:
571  {
572  class MessageUserSchema &msgrcvref = *(class MessageUserSchema *)msgrcv;
573  domainidsToSchemata[msgrcvref.userschemaStruct.domainid]->fieldNameToId[msgrcvref.userschemaStruct.tableid][msgrcvref.argstring] = msgrcvref.userschemaStruct.fieldid;
574  }
575  break;
576 
577  default:
578  fprintf(logfile, "anomaly %i %s %i\n",
579  msgrcv->messageStruct.topic, __FILE__, __LINE__);
580  }
581  }
582  }
583 }
584 
586 {
587 }
588 
590 {
 // Tears down the client connection identified by the member sockfd.
 // NOTE(review): the signature line is missing from this listing; presumably
 // TransactionAgent::endConnection(), judging by the call sites -- confirm.
 // Return values of epoll_ctl() and close() are not checked here.
 // Stop watching the descriptor before closing it.
591  epoll_ctl(epollfd, EPOLL_CTL_DEL, sockfd, NULL);
592  close(sockfd);
 // Drop any login state recorded for this socket.
593  loggedInUsers.erase(sockfd);
594 }
595 
597 {
 // Reads one request from sockfd and splits it into the operation name
 // (stored in *operation) and the argument bytes (copied into args).
 // Layout consumed below: 8-byte big-endian payload size, 1-byte operation
 // name length, the operation name, then the argument bytes.
 // Returns the argument byte count, or -1 / -2 / -3 on the failures below.
 // NOTE(review): the signature line is missing from this listing; presumably
 // TransactionAgent::readSocket(), given "argsize = readSocket()" -- confirm.
598  char inbuf[PAYLOADSIZE];
599  ssize_t bytesread = read(sockfd, inbuf, PAYLOADSIZE);
600 
 // Also covers read() errors (-1) and EOF (0): the size header alone is 8 bytes.
601  if (bytesread < 8)
602  {
603  return -1;
604  }
605 
606  uint64_t a;
607  memcpy(&a, inbuf, sizeof(a));
608  int64_t msgsize = be64toh(a);
609 
 // The whole request must have arrived in this single read(); a partial or
 // coalesced read is rejected rather than reassembled.
610  if (bytesread-8 != msgsize)
611  {
612  printf("%s %i bytesread %li msgsize %li sockfd %i\n", __FILE__, __LINE__,
613  bytesread, msgsize, sockfd);
614  return -2;
615  }
616 
 // NOTE(review): when msgsize is 0 (bytesread == 8) inbuf[8] was never
 // written by read(), so this reads an indeterminate byte.
 // NOTE(review): operationlength is a plain char; where char is signed, a
 // length byte >= 0x80 becomes negative, passes the check below, and is then
 // used as a count/offset in assign() and memcpy() -- confirm and consider
 // unsigned char plus an explicit msgsize >= 1 check.
617  char operationlength = inbuf[8];
618 
 // The operation name must leave at least one byte of arguments.
619  if ((int)operationlength >= msgsize-1)
620  {
621  return -3;
622  }
623 
624  int64_t localargsize = msgsize - 1 - (int)operationlength;
625  operation->assign(inbuf+9, (int)operationlength);
 // Zero the argument buffer first so stale bytes from an earlier request do
 // not trail the new arguments.
626  memset(&args, 0, PAYLOADSIZE);
627  memcpy(args, inbuf+9+(int)operationlength, localargsize);
628 
629  return localargsize;
630 }
631 
632 // launcher, regular function
// Free-function entry point taking an opaque identity pointer and returning
// NULL; the void *(void *) shape matches a thread start routine.
// NOTE(review): listing line 635 (the only statement besides the return) is
// missing here; presumably it constructs the TransactionAgent from identity
// -- confirm against the real source.
633 void *transactionAgent(void *identity)
634 {
636  return NULL;
637 }
638 
639 // builtins
641 {
 // Answers the client with STATUS_OK and an empty result vector.
 // NOTE(review): the signature line is missing from this listing; presumably
 // TransactionAgent::ping(builtincmds_e) -- confirm.
642  vector<string> rv;
643  sendResponse(false, STATUS_OK, &rv);
644 }
645 
647 {
 // Login builtin, driven by cmd:
 //   STARTCMD - forward the client's credentials to the user/schema manager
 //   OKCMD    - record the authenticated user for this socket and reply OK
 //   NOTOKCMD - optionally reply NOTOK, then drop the connection
 // NOTE(review): the signature line and a few interior lines are missing
 // from this listing; the name comes from the log message in the default case.
648  switch (cmd)
649  {
650  case STARTCMD:
651  {
 // No user/domain is known yet, hence the -1, -1.
652  operationPtr = new class Operation(OP_AUTH, this, -1, -1);
654 
 // NOTE(review): v is not used in the lines visible here.
655  vector<string> v;
658  class MessageUserSchema *msg = new class MessageUserSchema(TOPIC_LOGIN);
659  class MessageUserSchema &msgref = *msg;
661  msgref.messageStruct.payloadtype = PAYLOADUSERSCHEMA;
662  msgref.userschemaStruct.argsize = argsize;
663  msgref.userschemaStruct.instance = instance;
664  msgref.userschemaStruct.operationid = operationid;
 // Raw request arguments travel in argstring.
665  msgref.argstring.assign(args, 0, argsize);
666  mboxes.toUserSchemaMgr(this->myIdentity.address, msgref);
667  }
668  break;
669 
670  case OKCMD:
671  {
672  // let's hope I remember to populate the object's domainid & userid
673  // before calling this
674  class MessageUserSchema &msgrcvref =
675  *(class MessageUserSchema *)msgrcv;
 // Remember who is logged in on this socket; the main loop looks this up
 // by sockfd for every later request.
677  authInfo aInfo;
678  aInfo.domainid = msgrcvref.userschemaStruct.domainid;
679  aInfo.userid = msgrcvref.userschemaStruct.userid;
680  aInfo.domainName.assign(operationPtr->domainName);
681  loggedInUsers[sockfd] = aInfo;
682  vector<string> rv;
683  sendResponse(false, STATUS_OK, &rv);
684  endOperation();
685  }
686  break;
687 
688  case NOTOKCMD:
689  {
 // Adding 0 atomically is used as an atomic read of the config flag; the
 // failure reply is only sent when cfgs.badloginmessages is non-zero.
690  if (__sync_add_and_fetch(&cfgs.badloginmessages, 0))
691  {
692  vector<string> rv;
693  sendResponse(false, STATUS_NOTOK, &rv);
694  }
695 
696  endOperation();
697  endConnection();
698  }
699  break;
700 
701  default:
702  fprintf(logfile, "TransactionAgent::login cmd unrecognized %i\n", cmd);
703  }
704 }
705 
707 {
 // Replies STATUS_OK with an empty result vector, then closes the connection
 // (endConnection also erases the socket's login entry).
 // NOTE(review): the signature line is missing from this listing; presumably
 // TransactionAgent::logout(builtincmds_e) -- confirm.
708  vector<string> rv;
709  sendResponse(false, STATUS_OK, &rv);
710  endConnection();
711 }
712 
714 {
 // Change-password builtin, driven by cmd:
 //   STARTCMD - forward the request to the user/schema manager
 //   OKCMD    - reply STATUS_OK and finish the operation
 //   NOTOKCMD - reply STATUS_NOTOK and finish the operation
 // NOTE(review): the signature line and some interior lines are missing from
 // this listing; the name comes from the log message in the default case.
715  switch (cmd)
716  {
717  case STARTCMD:
718  {
719  operationPtr = new class Operation(OP_AUTH, this, userid, domainid);
721  class MessageUserSchema *msg =
722  new class MessageUserSchema(TOPIC_CHANGEPASSWORD);
723  class MessageUserSchema &msgref = *msg;
725  msgref.messageStruct.payloadtype = PAYLOADUSERSCHEMA;
726  msgref.userschemaStruct.argsize = argsize;
727  msgref.userschemaStruct.instance = instance;
728  msgref.userschemaStruct.operationid = operationid;
729  msgref.userschemaStruct.domainid = domainid;
730  msgref.userschemaStruct.userid = userid;
 // Raw request arguments travel in argstring.
731  msgref.argstring.assign(args, 0, argsize);
732  mboxes.toUserSchemaMgr(this->myIdentity.address, msgref);
733  }
734  break;
735 
736  case OKCMD:
737  {
738  vector<string> rv;
739  sendResponse(false, STATUS_OK, &rv);
740  endOperation();
741  }
742  break;
743 
744  case NOTOKCMD:
745  {
746  vector<string> rv;
747  sendResponse(false, STATUS_NOTOK, &rv);
748  endOperation();
749  }
750  break;
751 
752  default:
753  fprintf(logfile, "TransactionAgent::changepassword cmd unrecognized %i\n", cmd);
754  }
755 }
756 
758 {
 // Create-domain builtin, driven by cmd:
 //   STARTCMD - forward the request to the user/schema manager
 //   OKCMD    - reply STATUS_OK with a domainid string and finish
 //   NOTOKCMD - reply STATUS_NOTOK and finish
 // NOTE(review): the signature line and some interior lines are missing from
 // this listing; the name comes from the log message in the default case.
759  switch (cmd)
760  {
761  case STARTCMD:
762  {
763  operationPtr = new class Operation(OP_AUTH, this, userid, domainid);
765  class MessageUserSchema *msg =
766  new class MessageUserSchema(TOPIC_CREATEDOMAIN);
767  class MessageUserSchema &msgref = *msg;
769  msgref.messageStruct.payloadtype = PAYLOADUSERSCHEMA;
770  msgref.userschemaStruct.argsize = argsize;
771  msgref.userschemaStruct.instance = instance;
772  msgref.userschemaStruct.operationid = operationid;
773  msgref.userschemaStruct.domainid = domainid;
774  msgref.userschemaStruct.userid = userid;
 // Raw request arguments travel in argstring.
775  msgref.argstring.assign(args, 0, argsize);
776  // mboxes.userSchemaMgr.send(msgsnd, true);
777  mboxes.toUserSchemaMgr(this->myIdentity.address, msgref);
778  }
779  break;
780 
781  case OKCMD:
782  {
 // NOTE(review): this allocates a brand-new message (with topic
 // TOPIC_CREATEUSER) and reports ITS domainid, i.e. whatever the constructor
 // left there, not the value carried by the received message msgrcv. The
 // allocation is also neither sent nor deleted in this case. Likely intended:
 // a reference to *(class MessageUserSchema *)msgrcv -- confirm.
783  class MessageUserSchema *msg =
784  new class MessageUserSchema(TOPIC_CREATEUSER);
785  class MessageUserSchema &msgref = *msg;
786  vector<string> rv;
787  // this is created domainid:
788  rv.push_back(boost::lexical_cast<string>(msgref.userschemaStruct.domainid));
789  sendResponse(false, STATUS_OK, &rv);
790  endOperation();
791  }
792  break;
793 
794  case NOTOKCMD:
795  {
796  vector<string> rv;
797  sendResponse(false, STATUS_NOTOK, &rv);
798  endOperation();
799  }
800  break;
801 
802  default:
803  fprintf(logfile, "TransactionAgent::createdomain cmd unrecognized %i\n",
804  cmd);
805  }
806 }
807 
809 {
 // Create-user builtin, driven by cmd:
 //   STARTCMD - forward the request to the user/schema manager
 //   OKCMD    - reply STATUS_OK with a userid string and finish
 //   NOTOKCMD - reply STATUS_NOTOK and finish
 // NOTE(review): the signature line and some interior lines are missing from
 // this listing; the name comes from the log message in the default case.
810  switch (cmd)
811  {
812  case STARTCMD:
813  {
814  operationPtr = new class Operation(OP_AUTH, this, userid, domainid);
816  class MessageUserSchema *msg =
817  new class MessageUserSchema(TOPIC_CREATEUSER);
818  class MessageUserSchema &msgref = *msg;
820  msgref.messageStruct.payloadtype = PAYLOADUSERSCHEMA;
821  msgref.userschemaStruct.argsize = argsize;
822  msgref.userschemaStruct.instance = instance;
823  msgref.userschemaStruct.operationid = operationid;
824  msgref.userschemaStruct.domainid = domainid;
825  msgref.userschemaStruct.userid = userid;
 // Raw request arguments travel in argstring.
826  msgref.argstring.assign(args, 0, argsize);
827  mboxes.toUserSchemaMgr(this->myIdentity.address, msgref);
828  }
829  break;
830 
831  case OKCMD:
832  {
 // NOTE(review): this allocates a brand-new message and reports ITS userid,
 // i.e. whatever the constructor left there, not the value carried by the
 // received message msgrcv. The allocation is also neither sent nor deleted
 // in this case. Likely intended: a reference to
 // *(class MessageUserSchema *)msgrcv -- confirm.
833  class MessageUserSchema *msg =
834  new class MessageUserSchema(TOPIC_CREATEUSER);
835  class MessageUserSchema &msgref = *msg;
836  vector<string> rv;
837  // this is created userid:
838  rv.push_back(boost::lexical_cast<string>(msgref.userschemaStruct.userid));
839  sendResponse(false, STATUS_OK, &rv);
840  endOperation();
841  }
842  break;
843 
844  case NOTOKCMD:
845  {
846  vector<string> rv;
847  sendResponse(false, STATUS_NOTOK, &rv);
848  endOperation();
849  }
850  break;
851 
852  default:
853  fprintf(logfile, "TransactionAgent::createuser cmd unrecognized %i\n",
854  cmd);
855  }
856 }
857 
859 {
 // Delete-user builtin, driven by cmd:
 //   STARTCMD - forward the request to the user/schema manager
 //   OKCMD    - reply STATUS_OK and finish the operation
 //   NOTOKCMD - reply STATUS_NOTOK and finish the operation
 // NOTE(review): the signature line and some interior lines are missing from
 // this listing; the name comes from the log message in the default case.
860  switch (cmd)
861  {
862  case STARTCMD:
863  {
864  operationPtr = new class Operation(OP_AUTH, this, userid, domainid);
866  class MessageUserSchema *msg =
867  new class MessageUserSchema(TOPIC_DELETEUSER);
868  class MessageUserSchema &msgref = *msg;
870  msgref.messageStruct.payloadtype = PAYLOADUSERSCHEMA;
871  msgref.userschemaStruct.argsize = argsize;
872  msgref.userschemaStruct.instance = instance;
873  msgref.userschemaStruct.operationid = operationid;
874  msgref.userschemaStruct.domainid = domainid;
875  msgref.userschemaStruct.userid = userid;
 // Raw request arguments travel in argstring.
876  msgref.argstring.assign(args, 0, argsize);
877  // mboxes.userSchemaMgr.send(msgsnd, true);
878  mboxes.toUserSchemaMgr(this->myIdentity.address, msgref);
879  }
880  break;
881 
882  case OKCMD:
883  {
884  vector<string> rv;
885  sendResponse(false, STATUS_OK, &rv);
886  endOperation();
887  }
888  break;
889 
890  case NOTOKCMD:
891  {
892  vector<string> rv;
893  sendResponse(false, STATUS_NOTOK, &rv);
894  endOperation();
895  }
896  break;
897 
898  default:
899  fprintf(logfile, "TransactionAgent::deleteuser cmd unrecognized %i\n",
900  cmd);
901  }
902 }
903 
905 {
 // Delete-domain builtin, driven by cmd:
 //   STARTCMD - forward the request to the user/schema manager
 //   OKCMD    - reply STATUS_OK and finish the operation
 //   NOTOKCMD - reply STATUS_NOTOK and finish the operation
 // NOTE(review): the signature line and some interior lines are missing from
 // this listing; the name comes from the log message in the default case.
906  switch (cmd)
907  {
908  case STARTCMD:
909  {
910  operationPtr = new class Operation(OP_AUTH, this, userid, domainid);
912  class MessageUserSchema *msg =
913  new class MessageUserSchema(TOPIC_DELETEDOMAIN);
914  class MessageUserSchema &msgref = *msg;
916  msgref.messageStruct.payloadtype = PAYLOADUSERSCHEMA;
917  msgref.userschemaStruct.argsize = argsize;
918  msgref.userschemaStruct.instance = instance;
919  msgref.userschemaStruct.operationid = operationid;
920  msgref.userschemaStruct.domainid = domainid;
921  msgref.userschemaStruct.userid = userid;
 // Raw request arguments travel in argstring.
922  msgref.argstring.assign(args, 0, argsize);
923  // mboxes.userSchemaMgr.send(msgsnd, true);
924  mboxes.toUserSchemaMgr(this->myIdentity.address, msgref);
925  }
926  break;
927 
928  case OKCMD:
929  {
930  vector<string> rv;
931  sendResponse(false, STATUS_OK, &rv);
932  endOperation();
933  }
934  break;
935 
936  case NOTOKCMD:
937  {
938  vector<string> rv;
939  sendResponse(false, STATUS_NOTOK, &rv);
940  endOperation();
941  }
942  break;
943 
944  default:
945  fprintf(logfile, "TransactionAgent::deletedomain cmd unrecognized %i\n",
946  cmd);
947  }
948 }
949 
950 // schema builtins
952 {
953  switch (cmd)
954  {
955  case STARTCMD:
957  break;
958 
959  case USMRESPONSECMD:
961  break;
962 
964  responseVector.clear();
966  break;
967 
968  default:
969  fprintf(logfile, "topic unrecognized %i %s %i\n", cmd, __FILE__,
970  __LINE__);
971  }
972 }
973 
975 {
976  switch (cmd)
977  {
978  case STARTCMD:
980  break;
981 
982  case USMRESPONSECMD:
983  {
985 
986  class MessageUserSchema &msgrcvref = *(class MessageUserSchema *)msgrcv;
987  class MessageUserSchema msg;
992  msg.argstring = msgrcvref.argstring;
993 
994  mboxes.toAllOfType(ACTOR_TRANSACTIONAGENT, myIdentity.address, msg);
995  }
996  break;
997 
999  responseVector.clear();
1000  responseVector.push_back(boost::lexical_cast<string>
1003  break;
1004 
1005  default:
1006  fprintf(logfile, "topic unrecognized %i %s %i\n", cmd, __FILE__,
1007  __LINE__);
1008  }
1009 }
1010 
1012 {
1013  switch (cmd)
1014  {
1015  case STARTCMD:
1017  break;
1018 
1019  case USMRESPONSECMD:
1020  {
1022 
1023  class MessageUserSchema &msgrcvref = *(class MessageUserSchema *)msgrcv;
1024  class MessageUserSchema msg;
1030  msg.argstring = msgrcvref.argstring;
1031 
1032  mboxes.toAllOfType(ACTOR_TRANSACTIONAGENT, myIdentity.address, msg);
1033  }
1034  break;
1035 
1036  case TASENGINESRESPONSECMD:
1037  responseVector.clear();
1038  responseVector.push_back(boost::lexical_cast<string>
1041  break;
1042 
1043  default:
1044  fprintf(logfile, "topic unrecognized %i %s %i\n", cmd, __FILE__,
1045  __LINE__);
1046  }
1047 }
1048 
1050 {
1051  switch (cmd)
1052  {
1053  case STARTCMD:
1055  break;
1056 
1057  case USMRESPONSECMD:
1059  break;
1060 
1061  case TASENGINESRESPONSECMD:
1062  responseVector.clear();
1064  break;
1065 
1066  default:
1067  fprintf(logfile, "topic unrecognized %i %s %i\n", cmd, __FILE__,
1068  __LINE__);
1069  }
1070 }
1071 
1073 {
1074  switch (cmd)
1075  {
1076  case STARTCMD:
1078  break;
1079 
1080  case USMRESPONSECMD:
1082  break;
1083 
1084  case TASENGINESRESPONSECMD:
1085  responseVector.clear();
1087  break;
1088 
1089  default:
1090  fprintf(logfile, "topic unrecognized %i %s %i\n", cmd, __FILE__,
1091  __LINE__);
1092  }
1093 }
1094 
1096 {
1097  switch (cmd)
1098  {
1099  case STARTCMD:
1101  break;
1102 
1103  case USMRESPONSECMD:
1105  break;
1106 
1107  case TASENGINESRESPONSECMD:
1108  responseVector.clear();
1110  break;
1111 
1112  default:
1113  fprintf(logfile, "topic unrecognized %i %s %i\n", cmd, __FILE__,
1114  __LINE__);
1115  }
1116 }
1117 
1118 // not builtin
1120 {
1123 }
1124 
1125 // schema loopback functions
1127 {
1128  createSchema(this);
1129  class MessageUserSchema *msg =
1130  new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1132  domainProceduresMap domainProcedures;
1133  int64_t did = ((class MessageUserSchema *)msgrcv)->userschemaStruct.domainid;
1134  domainidsToProcedures[did] = domainProcedures;
1135 }
1136 
1138 {
1139  // either succeeds or fails :-)
1140  class MessageUserSchema &msgrcvref = *(class MessageUserSchema *)msgrcv;
1141  status =
1142  domainidsToSchemata[msgrcvref.userschemaStruct.domainid]->createTable(msgrcvref.userschemaStruct.tableid);
1143  class MessageUserSchema *msg =
1144  new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1145  class MessageUserSchema &msgref = *msg;
1146  msgref.userschemaStruct.tableid = msgrcvref.userschemaStruct.tableid;
1148  ((class Message *)msgrcv)->messageStruct.sourceAddr, *msg);
1149 }
1150 
1152 {
1153  class MessageUserSchema &msgrcvref = *(class MessageUserSchema *)msgrcv;
1154  class Schema *schemaPtr =
1156  class Table *tablePtr =
1157  schemaPtr->tables[msgrcvref.userschemaStruct.tableid];
1158  class MessageUserSchema *msg =
1159  new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1160  class MessageUserSchema &msgref = *msg;
1161  msgref.userschemaStruct.fieldid =
1162  tablePtr->addfield((fieldtype_e) msgrcvref.userschemaStruct.fieldtype,
1163  msgrcvref.userschemaStruct.fieldlen,
1164  msgrcvref.argstring,
1165  (indextype_e) msgrcvref.userschemaStruct.indextype);
1168  ((class Message *)msgrcv)->messageStruct.sourceAddr, *msg);
1169 }
1170 
1172 {
1173  // either succeeds or fails :-)
1174  class MessageUserSchema *msg =
1175  new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1179 }
1180 
1182 {
1183  // either succeeds or fails :-)
1184  class MessageUserSchema *msg =
1185  new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1189 }
1190 
1192 {
1193  // either succeeds or fails :-)
1194  class MessageUserSchema *msg =
1195  new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1199 }
1200 
1202 {
1203  switch (cmd)
1204  {
1205  case STARTCMD:
1206  {
1207  operationPtr = new class Operation(OP_SCHEMA, this, userid, domainid);
1208  operationPtr->setbuiltincmd(builtin);
1211  class MessageUserSchema *msg =
1212  new class MessageUserSchema(TOPIC_SCHEMAREQUEST);
1213  class MessageUserSchema &msgref = *msg;
1215  msgref.messageStruct.payloadtype = PAYLOADUSERSCHEMA;
1216  msgref.userschemaStruct.builtincmd = builtin;
1217  msgref.userschemaStruct.argsize = argsize;
1218  msgref.userschemaStruct.instance = instance;
1219  msgref.userschemaStruct.operationid = operationid;
1220  msgref.userschemaStruct.userid = userid;
1221  msgref.userschemaStruct.domainid = domainid;
1222  msgref.argstring.assign(args, 0, argsize);
1223  mboxes.toUserSchemaMgr(this->myIdentity.address, msgref);
1224  }
1225  break;
1226 
1227  case USMRESPONSECMD:
1228  {
1229  class MessageUserSchema &msgrcvref =
1230  *(class MessageUserSchema *)msgrcv;
1231 
1232  if (msgrcvref.userschemaStruct.status != BUILTIN_STATUS_OK) // abort
1233  {
1234  responseVector.clear();
1236  endOperation();
1237  return;
1238  }
1239 
1240  class MessageUserSchema msg(TOPIC_SCHEMAREQUEST);
1244  msg.userschemaStruct.builtincmd = builtin;
1250  if (msgrcvref.userschemaStruct.argsize)
1251  {
1253  msg.argstring.assign(args, 0, argsize);
1254  }
1255  else
1256  {
1257  msg.argstring=msgrcvref.argstring;
1258  }
1259 
1263  msgrcvref.userschemaStruct.tableindexid;
1267 
1269  ACTOR_TRANSACTIONAGENT, myIdentity.address, msg);
1271  ACTOR_ENGINE, myIdentity.address, msg);
1272 
1274  }
1275  break;
1276 
1277  case TASENGINESRESPONSECMD:
1278  {
1279  class MessageUserSchema &msgrcvref =
1280  *(class MessageUserSchema *)msgrcv;
1281 
1282  if (msgrcvref.userschemaStruct.status != BUILTIN_STATUS_OK)
1283  {
1284  responseVector.clear();
1286  endOperation();
1287  return;
1288  }
1289 
1291  {
1292  // not ready yet
1293  return;
1294  }
1295 
1297  endOperation();
1298  }
1299  break;
1300 
1301  default:
1302  fprintf(logfile, "bad case %i %s %i\n", cmd, __FILE__, __LINE__);
1303  }
1304 }
1305 
1307 {
 // Delegates to newprocedure() at entry point 1, which is the branch that
 // handles a client's loadprocedure command.
 // NOTE(review): the signature line is missing from this listing; presumably
 // TransactionAgent::loadprocedure(builtincmds_e) -- confirm.
1308  newprocedure(1);
1309 }
1310 
1312 {
 // Unpacks a (statement name, SQL text) pair from the request arguments,
 // parses the SQL once as a validity check, then broadcasts the statement to
 // every transaction agent and replies with the statement name.
 // NOTE(review): the signature line and several interior lines (the rest of
 // the Larxer constructor call, the message header setup) are missing from
 // this listing; presumably TransactionAgent::compile(builtincmds_e) -- confirm.
1313  vector<string> resultVector;
1314  msgpack2Vector(&resultVector, args, argsize);
 // NOTE(review): elements [0] and [1] are read without checking that the
 // client actually supplied two entries.
1315  // int64_t sid = atol(resultVector[0].c_str());
1316  string statementname(resultVector[0]);
1317  string sqlstatement(resultVector[1]);
1318  class Larxer lx2((char *)sqlstatement.c_str(), this,
1320 
 // A NULL statementPtr is treated as a failed parse and reported as NOTOK.
1321  if (lx2.statementPtr==NULL)
1322  {
1323  vector<string> rv;
1324  sendResponse(false, STATUS_NOTOK, &rv);
1325  return;
1326  }
1327 
 // The parsed statement is discarded here; only the SQL text is broadcast.
1328  delete lx2.statementPtr;
1329 
1330  class MessageUserSchema msg;
1334  msg.procname = statementname;
1335  msg.argstring = sqlstatement;
1336 
1337  mboxes.toAllOfType(ACTOR_TRANSACTIONAGENT, myIdentity.address, msg);
1338 
1339  vector<string> rv;
1340  rv.push_back(statementname);
1341  sendResponse(false, STATUS_OK, &rv);
1342 }
1343 
// Pass-through overload: the caller already has a buffer, so the same pointer
// is returned untouched (no copy, no allocation). It exists alongside the
// vector and map overloads so all three input kinds share one call name.
1344 msgpack::sbuffer *makeSbuf(msgpack::sbuffer *sbuf)
1345 {
1346  return sbuf;
1347 }
1348 
// Packs the string vector *v into a newly allocated msgpack buffer and
// returns it. The buffer comes from new and is not freed here, so releasing
// it falls to whoever receives the pointer. v is dereferenced unconditionally.
1349 msgpack::sbuffer *makeSbuf(vector<string> *v)
1350 {
1351  msgpack::sbuffer *sbuf = new msgpack::sbuffer;
1352  msgpack::pack(*sbuf, *v);
1353  return sbuf;
1354 }
1355 
// Packs the string-to-string map *m into a newly allocated msgpack buffer and
// returns it. The buffer comes from new and is not freed here, so releasing
// it falls to whoever receives the pointer. m is dereferenced unconditionally.
1356 msgpack::sbuffer *makeSbuf(map<string, string> *m)
1357 {
1358  msgpack::sbuffer *sbuf = new msgpack::sbuffer;
1359  msgpack::pack(*sbuf, *m);
1360  return sbuf;
1361 }
1362 
1364 {
 // Hands out the next transaction id: the counter is incremented first and
 // the new value returned.
 // NOTE(review): the signature line is missing from this listing, so the
 // function name and return type are not visible -- confirm.
1365  return ++nexttransactionid;
1366 }
1367 
1369 {
 // Hands out the next applier id: the counter is incremented first and the
 // new value returned.
 // NOTE(review): the signature line is missing from this listing, so the
 // function name and return type are not visible -- confirm.
1370  return ++nextapplierid;
1371 }
1372 
1374 {
 // Stub: only writes a marker line (with file and line) to logfile.
 // NOTE(review): the signature line is missing from this listing, so the
 // function name is not visible -- confirm.
1375  fprintf(logfile, "TA bad message stub %s %i\n", __FILE__, __LINE__);
1376 }
1377 
1379 {
 // Recomputes this agent's replica bookkeeping from myTopology: finds which
 // replica/member slot this agent occupies, then records the address(es) of
 // its counterpart(s) in the other replica(s).
 // NOTE(review): the signature line and several interior lines (the match
 // condition in the search loop and the left-hand sides of some assignments)
 // are missing from this listing; presumably TransactionAgent::updateReplicas()
 // -- confirm.
1380  // replicaMembers[replica][member] = nodeid
1381  // vector< vector<int64_t> > replicaMembers;
1382  // tas[nodeid][tainstance] = actorid
1383  // vector< vector<int64_t> > tas;
1384 
1385  // get ta instance based on my actorid, then build the replica members
1386  // if replicas > 2, or just find the other one if numreplicas==2
1387  // find my replica
 // Nothing to track when there is at most one replica.
1388  if (myTopology.numreplicas <= 1)
1389  {
1390  return;
1391  }
1392 
 // Scan every replica/member slot; a matching slot sets myreplica/mymember.
 // The loop does not stop at the first match, so the last match wins.
1393  for (size_t n=0; n < myTopology.replicaMembers.size(); n++)
1394  {
1395  for (size_t m=0; m < myTopology.replicaMembers[n].size(); m++)
1396  {
1398  {
1399  myreplica = n;
1400  mymember = m;
1401  }
1402  }
1403  }
1404 
 // NOTE(review): if no slot matched, myreplica/mymember keep their previous
 // values and are still used as indexes below -- confirm that cannot happen.
 // Two-replica case: the peer is simply the same member slot in the other
 // replica.
1405  if (myTopology.numreplicas==2)
1406  {
1407  size_t otherreplica = myreplica==0 ? 1 : 0;
1409  myTopology.replicaMembers[otherreplica][mymember];
1410  int64_t othernodeid = myTopology.replicaMembers[otherreplica][mymember];
1411 
 // A node id of 0 is treated as "no peer node".
1412  if (othernodeid)
1413  {
1415  myTopology.tas[othernodeid][myIdentity.instance];
1416  }
1417 
1418  return;
1419  }
1420 
 // More than two replicas: build the peer list in a scratch vector, skipping
 // our own replica, then swap it into replicaAddresses in one step.
1421  vector<Topology::addressStruct> ras;
1422 
1423  for (size_t n=0; n < myTopology.replicaMembers.size(); n++)
1424  {
1425  if (myreplica==n)
1426  {
1427  continue;
1428  }
1429 
1430  ras.push_back({myTopology.replicaMembers[n][mymember],
1432  });
1433  }
1434 
1435  replicaAddresses.swap(ras);
1436 }
1437 
// Handles the stages of loading a stored procedure, selected by entrypoint:
//   1 - unpacks the client's arguments, builds a MessageUserSchema carrying
//       the library pathname and the full procedure name, sends a copy to one
//       transaction agent on each node, and replies STATUS_OK to the client.
//   2 - dlopen()s the library named in the received message, resolves the
//       "<procname>_create" and "<procname>_destroy" symbols and forwards
//       them to transaction agents on this node.
//   3 - stores the received procs in domainidsToProcedures, keyed by domain
//       id and by the message's argstring.
//   any other value - logs an anomaly line to logfile.
// NOTE(review): listing lines 1448-1450, 1524-1526, 1534 and 1539 are absent
// from this extract, so some message setup and the send calls in case 2 are
// not visible here.
1438 void TransactionAgent::newprocedure(int64_t entrypoint)
1439 {
1440  switch (entrypoint)
1441  {
1442  case 1: // client sends loadprocedure command
1443  {
1444  vector<string> resultVector;
1445  msgpack2Vector(&resultVector, args, argsize);
1446 
      // NOTE(review): resultVector[0] and resultVector[1] are read below
      // without checking that the unpacked vector has two elements.
1447  class MessageUserSchema msg;
1451  msg.pathname = resultVector[0];
      // procname = storedprocprefix + domainName + "_" + resultVector[1]
1452  msg.procname = storedprocprefix;
1453  msg.procname += domainName;
1454  msg.procname += "_";
1455  msg.procname.append(resultVector[1]);
1456 
1457  // get 1 ta from each node and send a copy of that message
1458  for (size_t n=0; n < myTopology.allActors.size(); n++)
1459  {
1460  for (size_t m=0; m < myTopology.allActors[n].size(); m++)
1461  {
1462  if (myTopology.allActors[n][m]==ACTOR_TRANSACTIONAGENT)
1463  {
      // Each recipient gets its own heap-allocated copy of msg.
1464  class MessageUserSchema *nmsg = new class MessageUserSchema;
1465  *nmsg = msg;
1466  mboxes.toActor(myIdentity.address, {(int16_t)n, (int16_t)m},
1467  *nmsg);
1468 
      // Only the first transaction agent found in this inner scan is used.
1469  break;
1470  }
1471  }
1472  }
1473 
      // The reply is sent right after dispatch, with an empty result vector.
1474  vector<string> rv;
1475  sendResponse(false, STATUS_OK, &rv);
1476  }
1477  break;
1478 
1479  case 2: // TOPIC_PROCEDURE1, load procedure, then send to all ta's this node
1480  {
1481  class MessageUserSchema &inmsg = *((class MessageUserSchema *)msgrcv);
1482 
1483  const char *dlsym_error;
      // dlerror() is called before each lookup to clear any pending error.
1484  dlerror();
1485  void *soPtr = dlopen(inmsg.pathname.c_str(), RTLD_LAZY);
1486 
1487  if (!soPtr)
1488  {
1489  dlsym_error = dlerror();
1490  puts(dlsym_error);
1491  return;
1492  }
1493 
1494  dlerror();
1495 
1496  string funcNameCreate = inmsg.procname + "_create";
1497  spclasscreate call_func1create =
1498  (spclasscreate) dlsym(soPtr, funcNameCreate.c_str());
1499  dlsym_error = dlerror();
1500 
      // NOTE(review): both dlsym failure paths return without dlclose(soPtr).
1501  if (dlsym_error)
1502  {
1503  printf("%s %i anomaly nodeid %i instance %li error %s\n", __FILE__,
1504  __LINE__, myTopology.nodeid, myIdentity.instance,
1505  dlsym_error);
1506  return;
1507  }
1508 
1509  dlerror();
1510 
1511  string funcNameDestroy = inmsg.procname + "_destroy";
1512  spclassdestroy call_func1destroy =
1513  (spclassdestroy) dlsym(soPtr, funcNameDestroy.c_str());
1514  dlsym_error = dlerror();
1515 
      // NOTE(review): unlike the "_create" lookup, this failure returns
      // without printing anything.
1516  if (dlsym_error)
1517  {
1518  return;
1519  }
1520 
1521  dlerror();
1522 
      // The resolved entry points travel as void pointers; the procedure
      // name travels in argstring with its length in intdata.
1523  class MessageUserSchema msg;
1527  msg.procs.procedurecreator = (void *)call_func1create;
1528  msg.procs.proceduredestroyer = (void *)call_func1destroy;
1529  msg.userschemaStruct.intdata = inmsg.procname.length();
1530  msg.argstring.assign(inmsg.procname, 0, msg.userschemaStruct.intdata);
1531 
      // Walks the actors of this node only; the start of the condition and
      // the send call are on missing listing lines 1534 and 1539.
1532  for (size_t n=0; n < myTopology.allActors[myTopology.nodeid].size(); n++)
1533  {
1535  ACTOR_TRANSACTIONAGENT)
1536  {
1537  class MessageUserSchema *nmsg = new class MessageUserSchema;
1538  *nmsg = msg;
1540  (int16_t)n}, *nmsg);
1541  }
1542  }
1543  }
1544  break;
1545 
1546  case 3:
1547  {
      // Registers the received procs under [domainid][argstring].
1548  class MessageUserSchema &msgrcvref = *(class MessageUserSchema *)msgrcv;
1549  domainidsToProcedures[msgrcvref.userschemaStruct.domainid][msgrcvref.argstring] = msgrcvref.procs;
1550  }
1551  break;
1552 
1553  default:
      // NOTE(review): entrypoint is int64_t but is printed with %lu.
1554  fprintf(logfile, "anomaly: %lu %s %i\n", entrypoint, __FILE__, __LINE__);
1555  }
1556 }
1557 
1559 {
      // Turns a received MessageDispatch into one MessageApply per partition
      // and sends each to its partition:
      //  - every partition's records are copied into that partition's
      //    MessageApply as rows;
      //  - for every indexed field of every record, an index entry is queued
      //    on the MessageApply selected by getPartitionid() for the field
      //    value (add for INSERT, remove-old plus add-new for a changed
      //    UPDATE, remove for DELETE).
      // NOTE(review): the signature line (listing line 1558) and listing
      // lines 1565, 1698, 1711 and 1741 are absent from this extract; what is
      // done with the ack message built below is therefore not visible here.
1560  class MessageDispatch &msgrcvref = *(class MessageDispatch *)msgrcv;
1561  domainid = msgrcvref.dispatchStruct.domainid;
1562  class MessageAckDispatch *msg =
1563  new class MessageAckDispatch(msgrcvref.dispatchStruct.transactionid,
1564  STATUS_OK);
1566 
      // partitioncount is passed to the Applier as 0 and is not changed
      // anywhere in the visible code.
1567  int64_t partitioncount=0;
      // msgs maps a partition id to the MessageApply under construction.
1568  boost::unordered_map<int64_t, class MessageApply *> msgs;
1569  class Applier *applierPtr = new class Applier(this, domainid,
1570  msgrcvref.messageStruct.sourceAddr, partitioncount);
1571 
1572  boost::unordered_map< int64_t,
1573  vector<MessageDispatch::record_s> >::iterator it;
1574 
1575  for (it = msgrcvref.records.begin(); it != msgrcvref.records.end(); it++)
1576  {
1577  // it->first int64_t partitionid, it->second vector of records
1578  vector<MessageDispatch::record_s> &recordsref = it->second;
1579 
      // Create this partition's MessageApply the first time it is seen.
1580  if (!msgs.count(it->first))
1581  {
1582  msgs[it->first] =
1583  new class MessageApply(msgrcvref.pidsids[it->first],
1584  applierPtr->applierid,
1585  domainid);
1586  }
1587 
1588  msgs[it->first]->rows = recordsref;
1589  class Schema *schemaPtr = domainidsToSchemata[domainid];
1590 
1591  for (size_t n=0; n < recordsref.size(); n++)
1592  {
1593  class Table &tableRef = *schemaPtr->tables[recordsref[n].tableid];
1594 
      // NOTE(review): in every branch below, msgs[indexinfo.entry.engineid]
      // uses operator[], which default-inserts a null pointer when no
      // MessageApply exists for that id and then dereferences it -- confirm
      // every target id is already present in msgs.
1595  switch (recordsref[n].primitive)
1596  {
1597  case INSERT:
1598  {
      // Decode the new row into per-field values.
1599  vector<fieldValue_s> fields;
1600  tableRef.unmakerow(&recordsref[n].row, &fields);
1601 
1602  for (uint16_t f=0; f < tableRef.fields.size(); f++)
1603  {
      // Fields without an index need no index entry.
1604  if (tableRef.fields[f].indextype==NONE)
1605  {
1606  continue;
1607  }
1608 
1609  // hence, create new index entry
1610  MessageApply::applyindex_s indexinfo;
1611  indexinfo.fieldVal = fields[f];
1612  indexinfo.fieldid = f;
1613  indexinfo.flags = 0;
1614  MessageApply::setisaddflag(&indexinfo.flags);
1615  indexinfo.tableid = recordsref[n].tableid;
1616  indexinfo.entry = {recordsref[n].rowid,
1617  getPartitionid(fields[f],
1618  tableRef.fields[f].type,
1619  (int16_t)myTopology.numpartitions)
1620  };
1621 
1622  msgs[indexinfo.entry.engineid]->indices.push_back(indexinfo);
1623  }
1624  }
1625  break;
1626 
1627  case UPDATE:
1628  {
      // Decode both the new and the previous row so that indexed fields can
      // be compared one by one.
1629  vector<fieldValue_s> newfields;
1630  tableRef.unmakerow(&recordsref[n].row, &newfields);
1631  vector<fieldValue_s> oldfields;
1632  tableRef.unmakerow(&recordsref[n].oldrow, &oldfields);
1633 
1634  for (size_t f=0; f < tableRef.fields.size(); f++)
1635  {
1636  if (tableRef.fields[f].indextype==NONE)
1637  {
1638  continue;
1639  }
1640 
1641  // only add entries if new & old are different
1642  bool aredifferent;
1643 
      // The comparison reads the value member that matches the field's
      // declared type; string types are compared with compare().
1644  switch (tableRef.fields[f].type)
1645  {
1646  case INT:
1647  aredifferent = newfields[f].value.integer !=
1648  oldfields[f].value.integer ? true : false;
1649  break;
1650 
1651  case UINT:
1652  aredifferent = newfields[f].value.uinteger !=
1653  oldfields[f].value.uinteger ? true : false;
1654  break;
1655 
1656  case BOOL:
1657  aredifferent = newfields[f].value.boolean !=
1658  oldfields[f].value.boolean ? true : false;
1659  break;
1660 
1661  case FLOAT:
1662  aredifferent = newfields[f].value.floating !=
1663  oldfields[f].value.floating ? true : false;
1664  break;
1665 
1666  case CHAR:
1667  aredifferent = newfields[f].value.character !=
1668  oldfields[f].value.character ? true : false;
1669  break;
1670 
1671  case CHARX:
1672  aredifferent = newfields[f].str.compare(oldfields[f].str)
1673  ? true : false;
1674  break;
1675 
1676  case VARCHAR:
1677  aredifferent = newfields[f].str.compare(oldfields[f].str)
1678  ? true : false;
1679  break;
1680 
1681  default:
      // An unrecognized field type is reported and treated as unchanged.
1682  printf("%s %i anomaly fieldtype %i\n", __FILE__,
1683  __LINE__, tableRef.fields[f].type);
1684  aredifferent=false;
1685  }
1686 
1687  if (aredifferent==true)
1688  {
1689  // delete the old, add the new
      // First entry: old value, flags left at 0 (add flag not set).
1690  MessageApply::applyindex_s indexinfo;
1691  indexinfo.fieldVal = oldfields[f];
1692  indexinfo.fieldid = f;
1693  indexinfo.flags = 0;
1694  indexinfo.tableid = recordsref[n].tableid;
1695  indexinfo.entry = {recordsref[n].rowid,
1696  getPartitionid(oldfields[f],
1697  tableRef.fields[f].type,
1699  };
1700 
1701  msgs[indexinfo.entry.engineid]->indices.push_back(indexinfo);
1702 
      // Second entry: the same struct is reused for the new value, this time
      // with the add flag set.
1703  indexinfo.fieldVal = newfields[f];
1704  indexinfo.fieldid = f;
1705  indexinfo.flags = 0;
1706  MessageApply::setisaddflag(&indexinfo.flags);
1707  indexinfo.tableid = recordsref[n].tableid;
1708  indexinfo.entry = {recordsref[n].rowid,
1709  getPartitionid(newfields[f],
1710  tableRef.fields[f].type,
1712  };
1713 
1714  msgs[indexinfo.entry.engineid]->indices.push_back(indexinfo);
1715  }
1716  }
1717  }
1718  break;
1719 
1720  case DELETE:
1721  {
      // Only the previous row is decoded for a delete.
1722  vector<fieldValue_s> fields;
1723  tableRef.unmakerow(&recordsref[n].oldrow, &fields);
1724 
1725  for (size_t f=0; f < tableRef.fields.size(); f++)
1726  {
1727  if (tableRef.fields[f].indextype==NONE)
1728  {
1729  continue;
1730  }
1731 
      // NOTE(review): the comment below matches the INSERT branch, but no
      // add flag is set here (flags stays 0).
1732  // hence, create new index entry
1733  MessageApply::applyindex_s indexinfo;
1734  indexinfo.fieldVal = fields[f];
1735  indexinfo.flags = 0;
1736  indexinfo.fieldid = f;
1737  indexinfo.tableid = recordsref[n].tableid;
1738  indexinfo.entry = {recordsref[n].rowid,
1739  getPartitionid(fields[f],
1740  tableRef.fields[f].type,
1742  };
1743 
1744  msgs[indexinfo.entry.engineid]->indices.push_back(indexinfo);
1745  }
1746  }
1747  break;
1748 
1749  default:
      // Unknown primitives are reported; no index entries are produced.
1750  printf("%s %i anomaly primitive %i\n", __FILE__, __LINE__,
1751  recordsref[n].primitive);
1752  }
1753  }
1754  }
1755 
1756  boost::unordered_map<int64_t, class MessageApply *>::iterator it2;
1757 
      // Send every assembled MessageApply to the partition it is keyed by.
1758  for (it2 = msgs.begin(); it2 != msgs.end(); it2++)
1759  {
1760  mboxes.toPartition(myIdentity.address, it2->first, *it2->second);
1761  }
1762 }
1763 
1765 {
      // Builds a Larxer from the statement text carried in the received
      // MessageUserSchema and stores a copy of the resulting statement in
      // statements[domainid][procname].
      // NOTE(review): the signature line (listing line 1764) and listing
      // lines 1769 and 1776 are absent from this extract, so the remaining
      // Larxer constructor arguments are not visible here.
1766  class MessageUserSchema &msgrcvref = *(class MessageUserSchema *)msgrcv;
1767 
      // The const is cast away from argstring's buffer for the Larxer.
1768  class Larxer lx((char *)msgrcvref.argstring.c_str(), this,
1770 
      // NOTE(review): a NULL statementPtr is only reported; control then
      // falls through to the dereference of lx.statementPtr below -- confirm
      // whether an early return is intended here.
1771  if (lx.statementPtr==NULL)
1772  {
1773  printf("%s %i anomaly\n", __FILE__, __LINE__);
1774  }
1775 
      // The statement is copied by value into the map, then the object the
      // Larxer exposes is deleted.
1777  statements[msgrcvref.userschemaStruct.domainid][msgrcvref.procname] =
1778  *lx.statementPtr;
1779  delete lx.statementPtr;
1780 }