InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Transaction.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
29 #include "Transaction.h"
30 #line 31 "Transaction.cc"
31 
32 Transaction::Transaction(class TransactionAgent *taPtrarg, int64_t domainidarg)
33  : taPtr(taPtrarg), domainid(domainidarg)
34 {
38  state = EXPANDING;
40  pendingcmdid = 0;
41  lockcount = 0;
42  lockpendingcount = 0;
43  nextpendingcmdid = 0;
44 }
45 
47 {
49 }
50 
51 int64_t Transaction::getengine(fieldtype_e fieldtype, fieldValue_s &fieldValue)
52 {
53  uint64_t hash;
54 
55  switch (fieldtype)
56  {
57  case INT:
58  hash = SpookyHash::Hash64((void *) &fieldValue.value.integer,
59  sizeof(fieldValue.value.integer), 0);
60  break;
61 
62  case UINT:
63  hash = SpookyHash::Hash64((void *) &fieldValue.value.uinteger,
64  sizeof(fieldValue.value.uinteger), 0);
65  break;
66 
67  case BOOL:
68  hash = SpookyHash::Hash64((void *) &fieldValue.value.boolean,
69  sizeof(fieldValue.value.boolean), 0);
70  break;
71 
72  case FLOAT:
73  hash = SpookyHash::Hash64((void *) &fieldValue.value.floating,
74  sizeof(fieldValue.value.floating), 0);
75  break;
76 
77  case CHAR:
78  hash = SpookyHash::Hash64((void *) &fieldValue.value.character,
79  sizeof(fieldValue.value.character), 0);
80  break;
81 
82  case CHARX:
83  {
84  trimspace(fieldValue.str);
85  hash = SpookyHash::Hash64((void *) fieldValue.str.c_str(),
86  fieldValue.str.length(), 0);
87  }
88  break;
89 
90  case VARCHAR:
91  {
92  trimspace(fieldValue.str);
93  hash = SpookyHash::Hash64((void *) fieldValue.str.c_str(),
94  fieldValue.str.length(), 0);
95  }
96  break;
97 
98  default:
99  fprintf(logfile, "anomaly %i %s %i\n", fieldtype, __FILE__, __LINE__);
100  return -1;
101  }
102 
103  return hash % nodeTopology.numpartitions;
104 }
105 
106 int64_t Transaction::getEngineid(class Table *tablePtr, int64_t fieldnum)
107 {
108  fieldtype_e fieldType = tablePtr->fields[fieldnum].type;
109  uint64_t hash;
110 
111  switch (fieldType)
112  {
113  case INT:
114  hash = SpookyHash::Hash64((void *) &fieldValues[fieldnum].value.integer,
115  sizeof(fieldValues[fieldnum].value.integer),
116  0);
117  break;
118 
119  case UINT:
120  hash = SpookyHash::Hash64((void *) &fieldValues[fieldnum].value.uinteger,
121  sizeof(fieldValues[fieldnum].value.uinteger),
122  0);
123  break;
124 
125  case BOOL:
126  hash = SpookyHash::Hash64((void *) &fieldValues[fieldnum].value.boolean,
127  sizeof(fieldValues[fieldnum].value.boolean),
128  0);
129  break;
130 
131  case FLOAT:
132  hash = SpookyHash::Hash64((void *) &fieldValues[fieldnum].value.floating,
133  sizeof(fieldValues[fieldnum].value.floating),
134  0);
135  break;
136 
137  case CHAR:
138  hash = SpookyHash::Hash64(
139  (void *) &fieldValues[fieldnum].value.character,
140  sizeof(fieldValues[fieldnum].value.character), 0);
141  break;
142 
143  case CHARX:
144  hash = SpookyHash::Hash64((void *) fieldValues[fieldnum].str.c_str(),
145  fieldValues[fieldnum].str.length(), 0);
146  break;
147 
148  case VARCHAR:
149  hash = SpookyHash::Hash64((void *) fieldValues[fieldnum].str.c_str(),
150  fieldValues[fieldnum].str.length(), 0);
151  break;
152 
153  default:
154  fprintf(logfile, "anomaly %i %s %i\n", fieldType, __FILE__, __LINE__);
155  return -1;
156  }
157 
158  return hash % nodeTopology.numpartitions;
159 }
160 
161 int64_t Transaction::getEngineid(class Table *tablePtr, int64_t fieldid,
162  fieldValue_s *val)
163 {
164  fieldtype_e fieldType = tablePtr->fields[fieldid].type;
165  uint64_t hash;
166 
167  switch (fieldType)
168  {
169  case INT:
170  hash = SpookyHash::Hash64((void *) &val->value.integer,
171  sizeof(val->value.integer), 0);
172  break;
173 
174  case UINT:
175  hash = SpookyHash::Hash64((void *) &val->value.uinteger,
176  sizeof(val->value.uinteger), 0);
177  break;
178 
179  case BOOL:
180  hash = SpookyHash::Hash64((void *) &val->value.boolean,
181  sizeof(val->value.boolean), 0);
182  break;
183 
184  case FLOAT:
185  hash = SpookyHash::Hash64((void *) &val->value.floating,
186  sizeof(val->value.floating), 0);
187  break;
188 
189  case CHAR:
190  hash = SpookyHash::Hash64((void *) &val->value.character,
191  sizeof(val->value.character), 0);
192  break;
193 
194  case CHARX:
195  hash = SpookyHash::Hash64((void *) val->str.c_str(), val->str.length(),
196  0);
197  break;
198 
199  case VARCHAR:
200  hash = SpookyHash::Hash64((void *) val->str.c_str(), val->str.length(),
201  0);
202  break;
203 
204  default:
205  fprintf(logfile, "anomaly %i %s %i\n", fieldType, __FILE__, __LINE__);
206  return -1;
207  }
208 
209  return hash % nodeTopology.numpartitions;
210 }
211 
212 // TODO forget this for now 10/22/2012
213 void Transaction::dispatch(class Message *msgrcv)
214 {
215 }
216 
217 void Transaction::dispatched(class Message *msgrcv)
218 {
219 }
220 
221 void Transaction::continueInsertRow(int64_t entrypoint)
222 {
223  class MessageSubtransactionCmd &subtransactionCmdRef =
224  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
225  switch (entrypoint)
226  {
227  case 1:
228  {
229  currentCmdState.rowid = subtransactionCmdRef.subtransactionStruct.rowid;
231  subtransactionCmdRef.subtransactionStruct.engineid;
233 
235 
236  for (size_t n=0; n<currentCmdState.indexEntries.size(); n++)
237  {
238  if (fieldValues[n].isnull==true)
239  {
240  continue; // can add as many nulls as possible if nulls allowed
241  }
242 
243  if (currentCmdState.tablePtr->fields[n].index.isunique==true)
244  {
246  currentCmdState.indexEntries[n].isaddunique = true;
247  // engine UNIQUEINDEX looks for:
248  // transaction_enginecmd = UNIQUEINDEX
249  // tableid,fieldid,rowid,engineid (of index value),
250  // fieldValue
251  class MessageSubtransactionCmd *msg =
252  new class MessageSubtransactionCmd();
254  subtransactionCmdRef.subtransactionStruct.rowid;
255 
256  msg->fieldVal = currentCmdState.indexEntries[n].fieldVal;
258  msg->subtransactionStruct.fieldid = n;
260  subtransactionCmdRef.subtransactionStruct.rowid;
262  currentCmdState.rowidsEngineids[n].engineid;
263 
265  currentCmdState.rowidsEngineids[n].engineid,
266  (void *)msg);
267  }
268  }
269 
271  {
272  return; // means need to wait for replies
273  }
274  }
275  break;
276 
277  case 2:
278  {
279  indexInfo_s idxInfo = {};
280  idxInfo.engineid = subtransactionCmdRef.subtransactionStruct.engineid;
281  idxInfo.fieldVal = subtransactionCmdRef.fieldVal;
282  idxInfo.fieldid = subtransactionCmdRef.subtransactionStruct.fieldid;
283  idxInfo.locktype = subtransactionCmdRef.subtransactionStruct.locktype;
284  idxInfo.tableid = subtransactionCmdRef.subtransactionStruct.tableid;
285  idxInfo.isaddunique = true;
286 
287  switch (idxInfo.locktype)
288  {
289  case NOLOCK: // constraint violation, abort command
291  return;
292 // break;
293 
294  case INDEXLOCK:
295  checkLock(ADDLOCKEDENTRY, false, 0,
296  subtransactionCmdRef.subtransactionStruct.tableid,
297  0, subtransactionCmdRef.subtransactionStruct.fieldid,
298  &subtransactionCmdRef.fieldVal);
299  break;
300 
301  case INDEXPENDINGLOCK:
302  checkLock(ADDLOCKPENDINGENTRY, false, 0,
303  subtransactionCmdRef.subtransactionStruct.tableid, 0,
304  subtransactionCmdRef.subtransactionStruct.fieldid,
305  &subtransactionCmdRef.fieldVal);
306  fprintf(logfile, "anomaly: %s %i\n", __FILE__, __LINE__);
307  return;
308 // break;
309 
310  case PENDINGTOINDEXLOCK:
312  subtransactionCmdRef.subtransactionStruct.tableid, 0,
313  subtransactionCmdRef.subtransactionStruct.fieldid,
314  &subtransactionCmdRef.fieldVal);
315  fprintf(logfile, "anomaly: %s %i\n", __FILE__, __LINE__);
316  break;
317 
318  case PENDINGTOINDEXNOLOCK: // unique constraint violation
320  subtransactionCmdRef.subtransactionStruct.tableid, 0,
321  subtransactionCmdRef.subtransactionStruct.fieldid,
322  &subtransactionCmdRef.fieldVal);
323  fprintf(logfile, "anomaly: %s %i\n", __FILE__, __LINE__);
324  return;
325 // break;
326 
327  default:
328  fprintf(logfile, "anomaly: %i %s %i\n",
329  subtransactionCmdRef.subtransactionStruct.locktype,
330  __FILE__, __LINE__);
331  }
332 
333  currentCmdState.indexEntries[idxInfo.fieldid] = idxInfo;
335 
337  {
338  return;
339  }
340  }
341  break;
342 
343  default:
344  fprintf(logfile, "anomaly: %lu %s %i\n", entrypoint, __FILE__, __LINE__);
345  }
346 
347  // all replies have been received
350  };
351  stagedRow_s sRow = {};
352  sRow.cmd = INSERT;
353  sRow.locktype = WRITELOCK;
354  sRow.newRow = currentCmdState.row;
357  stagedRows[uur] = sRow;
358 
359  returnNewRow = uur;
360 
362 }
363 
364 // only 1 stage this can be in, so no need to switch on entrypoint
365 void Transaction::continueDeleteRow(int64_t entrypoint)
366 {
367  class MessageSubtransactionCmd &subtransactionCmdRef =
368  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
369 
370  if (subtransactionCmdRef.subtransactionStruct.status != STATUS_OK)
371  {
373  return;
374  }
375  else
376  {
379  }
380 }
381 
382 void Transaction::continueSelectRows(int64_t entrypoint)
383 {
384  class MessageSubtransactionCmd &subtransactionCmdRef =
385  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
386 
387  switch (entrypoint)
388  {
389  case 1:
390  {
391 
392  // add rowid-engineids to vector, decrement currentCmdState.engines
393  // if it's zero, then send messages to engines to see if they're
394  // real rowids
395  size_t numhits = subtransactionCmdRef.indexHits.size();
397  reserve(currentCmdState.rowidsEngineids.size() + numhits);
398 
399  for (size_t n = 0; n < numhits; n++)
400  {
402  push_back(subtransactionCmdRef.indexHits[n]);
403  }
404 
406 
408  {
409  if (currentCmdState.rowidsEngineids.empty()==true)
410  {
411  reenter(APISTATUS_OK); // nothing returned
412  return;
413  }
414 
415  // now walk through the hits creating messages destined for
416  // each engine
417  // SELECTROWS,SUBTRANSACTIONCMDPAYLOAD
418  // tableid, rowids, locktype
419  // for 1 hit, no rigamarole
420  indexEntry_s rowidengineid;
421 
422  if (currentCmdState.rowidsEngineids.size()==1)
423  {
425 
426  class MessageSubtransactionCmd *msg =
427  new class MessageSubtransactionCmd();
430  rowidengineid = currentCmdState.rowidsEngineids[0];
431  msg->rowids.push_back(rowidengineid.rowid);
433  rowidengineid.engineid, (void *)msg);
434  }
435  else // walk through
436  {
437  boost::unordered_map< int64_t, vector<int64_t> > payloads;
439 
440  for (size_t n=0; n < currentCmdState.rowidsEngineids.size(); n++)
441  {
442  rowidengineid = currentCmdState.rowidsEngineids[n];
443  payloads[rowidengineid.engineid].push_back(rowidengineid.rowid);
444  }
445 
446  boost::unordered_map< int64_t, vector<int64_t> >::iterator it;
447 
448  for (it = payloads.begin(); it != payloads.end(); it++)
449  {
451 
452  class MessageSubtransactionCmd *msg =
453  new class MessageSubtransactionCmd();
457  rowidengineid = currentCmdState.rowidsEngineids[0];
458  msg->rowids = it->second;
460  it->first, (void *)msg);
461  }
462  }
463  }
464  }
465  break;
466 
467  case 2:
468  // here's where we receive the returned rows
469  // done receiving when all replies come in and no pendings remain
470  // replies (currentCmdState.engines is the counter)
471  // reply content is returnRows, vector of returnRow:
472  // put each in mapofRows, with appropriate locktype & row.
473  // return to user? rowid,tableid,engineid vector ?: returnselectedrows
474  // probably save space, populate return stuff before all replies received
475 
476  // actually, i think they all have to be ready to be locked before
477  // moving them
478  // into the mapofRows, otherwise rollback would be tricky for the
479  // command itself
480  {
481  // something faster can probably be done for simple equality
482  // selects, since they have 1 returned object, or optimize later
483  returnRow_s rRow = {};
484  uuRecord_s uur = { -1, currentCmdState.tableid,
485  subtransactionCmdRef.transactionStruct.engineinstance
486  };
487  stagedRow_s sRow = {};
488 
489  for (size_t n=0; n < subtransactionCmdRef.returnRows.size(); n++)
490  {
491  rRow = subtransactionCmdRef.returnRows[n];
492  uur.rowid = rRow.rowid;
493 
494  if (currentCmdState.pendingStagedRows.count(uur))
495  {
496  continue; // don't re-lock the same thing, but this is probably
497  // gratuitous. this check needs to happen when promoting
498  // to Transaction::mapofRows
499  }
500 
501  sRow.originalRow = rRow.row;
502  sRow.originalrowid = uur.rowid;
503  sRow.originalengineid =
504  subtransactionCmdRef.transactionStruct.engineinstance;
506  subtransactionCmdRef.transactionStruct.previoussubtransactionid;
507  sRow.cmd = NOCOMMAND;
508 
509  switch (rRow.locktype)
510  {
511  case NOLOCK:
512  sRow.locktype = NOLOCK;
513  break;
514 
515  case READLOCK:
516  sRow.locktype = READLOCK;
517  checkLock(ADDLOCKEDENTRY, true, rRow.rowid,
519  subtransactionCmdRef.transactionStruct.engineinstance,
520  -1, NULL);
521  break;
522 
523  case WRITELOCK:
524  sRow.locktype = WRITELOCK;
525  checkLock(ADDLOCKEDENTRY, true, rRow.rowid,
527  subtransactionCmdRef.transactionStruct.engineinstance,
528  -1, NULL);
529  break;
530 
531  case PENDINGLOCK:
532  sRow.locktype = PENDINGLOCK;
533  checkLock(ADDLOCKPENDINGENTRY, true, rRow.rowid,
535  subtransactionCmdRef.transactionStruct.engineinstance,
536  -1, NULL);
537  break;
538 
539  case PENDINGTOWRITELOCK:
540  sRow.locktype = WRITELOCK;
543  subtransactionCmdRef.transactionStruct.engineinstance,
544  -1, NULL);
545  break;
546 
547  case PENDINGTOREADLOCK:
548  sRow.locktype = READLOCK;
551  subtransactionCmdRef.transactionStruct.engineinstance,
552  -1, NULL);
553  break;
554 
555  case PENDINGTONOLOCK:
556  sRow.locktype = NOLOCK;
559  subtransactionCmdRef.transactionStruct.engineinstance,
560  -1, NULL);
561  break;
562 
563  case NOTFOUNDLOCK:
564  continue;
565 // break;
566 
567  default:
568  fprintf(logfile, "anomaly: %i %s %i\n", rRow.locktype, __FILE__,
569  __LINE__);
570  continue;
571  }
572 
574  }
575 
576  // how to know if this was engineresponse or LOCKPENDING?
577  // locktype is a clue.
578  if (rRow.locktype != PENDINGTOWRITELOCK &&
579  rRow.locktype != PENDINGTOREADLOCK &&
580  rRow.locktype != PENDINGTONOLOCK)
581  {
583  }
584 
586  {
587  // this means all messages have been received
588  // and no PENDING locks
589  // put everything in mapofRows (even NOLOCK), except
590  // for entries already there. also put rowid,tableid,engineid
591  // in return vector "returnselectedrows"
592  returnselectedrows.clear();
594  boost::unordered_map< uuRecord_s, stagedRow_s >::iterator it;
595 
596  for (it = currentCmdState.pendingStagedRows.begin();
597  it != currentCmdState.pendingStagedRows.end(); it++)
598  {
599  if (stagedRows.count(it->first))
600  {
601  continue; // don't re-lock the same thing
602  }
603 
604  stagedRows[it->first] = it->second;
605  returnselectedrows.push_back(uur);
606  }
607 
609  return;
610  }
611  }
612  break;
613 
614  default:
615  fprintf(logfile, "anomaly: %lu %s %i\n", entrypoint, __FILE__, __LINE__);
616  }
617 }
618 
619 void Transaction::continueFetchRows(int64_t entrypoint)
620 {
621  switch (entrypoint)
622  {
623  case 1:
624  {
625 
626  }
627  break;
628 
629  case 2:
630  {
631 
632  }
633  break;
634 
635  default:
636  fprintf(logfile, "anomaly: %lu %s %i\n", entrypoint, __FILE__, __LINE__);
637  }
638 }
639 
640 // TODO
641 void Transaction::continueUnlockRow(int64_t entrypoint)
642 {
643  switch (entrypoint)
644  {
645  case 1:
646  {
647 
648  }
649  break;
650 
651  case 2:
652  {
653 
654  }
655  break;
656 
657  default:
658  fprintf(logfile, "anomaly: %lu %s %i\n", entrypoint, __FILE__, __LINE__);
659  }
660 }
661 
662 // TODO
664 {
665  switch (entrypoint)
666  {
667  case 1:
668  {
669 
670  }
671  break;
672 
673  case 2:
674  {
675 
676  }
677  break;
678 
679  default:
680  fprintf(logfile, "anomaly: %lu %s %i\n", entrypoint, __FILE__, __LINE__);
681  }
682 }
683 
686  int64_t tacmdentrypoint, int64_t engineid,
687  void *data)
688 {
689  class MessageTransaction &msgref = *(class MessageTransaction *)data;
692 
694 
695  if (engineToSubTransactionids.count(engineid))
696  {
698  engineToSubTransactionids[engineid];
699  }
700  else
701  {
703  }
704 
708  msgref.transactionStruct.transaction_enginecmd = enginecmd;
710  msgref.transactionStruct.transaction_tacmdentrypoint = tacmdentrypoint;
712  *((class Message *)data));
713 }
714 
716 {
717  msgrcv = msgrcvarg;
718  class MessageTransaction &msgrcvRef =
719  *((class MessageTransaction *)msgrcv);
720 
722  {
723  printf("%s %i pendingcmdid %li msgrcvRef.transaction_pendingcmdid %i\n",
724  __FILE__, __LINE__, pendingcmdid,
727  return;
728  }
729 
731  {
734  }
735 
736  switch (pendingcmd)
737  {
738  case NOCOMMAND:
739  {
741  }
742  break;
743 
744  case INSERT:
746  break;
747 
748  case UPDATE:
750  break;
751 
752  case DELETE:
754  break;
755 
756  case REPLACE:
758  break;
759 
760  case SELECT:
762  break;
763 
764  case FETCH:
766  break;
767 
768  case UNLOCK:
770  break;
771 
772  case COMMIT:
774  break;
775 
776  case ROLLBACK:
778  break;
779 
782  break;
783 
786  break;
787 
790  break;
791 
794  break;
795 
796  case PRIMITIVE_SQLDELETE:
798  break;
799 
800  case PRIMITIVE_SQLINSERT:
802  break;
803 
804  case PRIMITIVE_SQLUPDATE:
806  break;
807 
810  break;
811 
812  default:
813  fprintf(logfile, "anomaly: %i %s %i\n", pendingcmd, __FILE__, __LINE__);
814  }
815 }
816 
817 void Transaction::select(int64_t tableid, int64_t fieldid, locktype_e locktype,
818  searchParams_s *searchParameters)
819 {
820  searchParams_s &searchParamsRef = *searchParameters;
821 
822  if (pendingcmd != NOCOMMAND)
823  {
825  return;
826  }
827 
829  pendingcmd = SELECT;
831  currentCmdState.fieldid = fieldid;
832  currentCmdState.locktype = locktype;
833 
834  // INDEXSEARCH,SUBTRANSACTIONCMDPAYLOAD
835  // tableid,fieldid,searchParameters
836  // returns indexHits
840 
841  if (searchParamsRef.op == OPERATOR_EQ) // IN should probably be optimized this way
842  // too, eventually
843  {
844  int64_t destengineid=-1;
846  // engine id is the hashed value, then send message to it
847  class MessageSubtransactionCmd *msg =
848  new class MessageSubtransactionCmd();
851  msg->searchParameters = searchParamsRef;
852  fieldtype_e fieldtype = schemaPtr->tables[tableid]->fields[fieldid].type;
853 
854  switch (fieldtype)
855  {
856  case INT:
857  destengineid = getEngineid(searchParamsRef.values[0].value.integer);
858  break;
859 
860  case UINT:
861  destengineid = getEngineid(searchParamsRef.values[0].value.uinteger);
862  break;
863 
864  case BOOL:
865  destengineid = getEngineid(searchParamsRef.values[0].value.boolean);
866  break;
867 
868  case FLOAT:
869  destengineid = getEngineid(searchParamsRef.values[0].value.floating);
870  break;
871 
872  case CHAR:
873  destengineid =
874  getEngineid(searchParamsRef.values[0].value.character);
875  break;
876 
877  case CHARX:
878  destengineid = getEngineid(&searchParamsRef.values[0].str);
879  break;
880 
881  case VARCHAR:
882  destengineid = getEngineid(&searchParamsRef.values[0].str);
883  break;
884 
885  default:
886  fprintf(logfile, "anomaly %i %s %i\n", fieldtype, __FILE__,
887  __LINE__);
888  }
889 
891  destengineid, (void *)msg);
892  }
893  else
894  {
895  // walk through all engines, send message to each, bumping engines
897 
898  for (int n=0; n < currentCmdState.engines; n++)
899  {
900  class MessageSubtransactionCmd *msg =
901  new class MessageSubtransactionCmd();
904  msg->searchParameters = searchParamsRef;
906  n, (void *)msg);
907  }
908  }
909 }
910 
912 {
914  {
915  return;
916  }
917 
918  if (currentCmdState.pendingStagedRows.empty()==true)
919  {
920  return;
921  }
922 
923  int64_t rowid, tableid, engineid;
924  rowOrField_s blankRof = {};
925  rowOrField_s rof;
926  // vector<rowOrField> *rofs;
927 
928  boost::unordered_map< uuRecord_s, stagedRow_s >::iterator it;
929 
930  for (it = currentCmdState.pendingStagedRows.begin();
931  it != currentCmdState.pendingStagedRows.end(); it++)
932  {
933  rowid = it->first.rowid;
934  tableid = it->first.tableid;
935  engineid = it->first.engineid;
936  rof = blankRof;
937 
938  rof.isrow = true;
939  rof.rowid = rowid;
940  rof.tableid = tableid;
941 
942  class MessageCommitRollback *msg = new class MessageCommitRollback();
943  // rofs = new vector<rowOrField>;
944  msg->rofs.push_back(rof);
945 
947  (void *)msg);
948  }
949 
951 }
952 
953 void Transaction::checkLock(deadlockchange_e changetype, bool isrow,
954  int64_t rowid, int64_t tableid, int64_t engineid,
955  int64_t fieldid, fieldValue_s *fieldVal)
956 {
957  switch (changetype)
958  {
959  case ADDLOCKEDENTRY:
960  lockcount++;
961  break;
962 
963  case ADDLOCKPENDINGENTRY:
965  break;
966 
967  case REMOVELOCKEDENTRY:
968  lockcount--;
969  break;
970 
973  break;
974 
977  lockcount++;
978  break;
979 
980  default:
981  fprintf(logfile, "anomaly %i %s %i\n", changetype, __FILE__, __LINE__);
982  }
983 
984  // OK, now, what? I'm either in deadlock and sending another message
985  // not deadlocked, not deadlocked
986  // deadlocked, then not
987  // not deadlocked, then deadlocked
988  if (currentCmdState.ispossibledeadlock==false &&
990  {
991  // move along
992  return;
993  }
994 
996  && lockcount)
997  {
998  // new deadlock!
1000  // send a bunch of messages, in pendingMapofRows, then the input
1001  class MessageDeadlock *msg = new class MessageDeadlock;
1002  class MessageDeadlock &msgref = *msg;
1003  newDeadLockLists_s &nodesRef = msgref.nodes;
1004 
1005  boost::unordered_map< uuRecord_s, stagedRow_s >::iterator it;
1006 
1007  for (it = currentCmdState.pendingStagedRows.begin();
1008  it != currentCmdState.pendingStagedRows.end(); it++)
1009  {
1010  stagedRow_s &sRowRef = it->second;
1011  int64_t this_rowid = it->first.rowid;
1012  int64_t this_tableid = it->first.tableid;
1013  int64_t this_engineid = it->first.engineid;
1014  string deadlockNode;
1015  // free this if there's no message to send (no contents in its sets)
1016 
1017  // row first
1018  DeadlockMgr::makeLockedItem(true, this_rowid, this_tableid,
1019  this_engineid, domainid, 0,
1020  (long double)0, (string *)NULL,
1021  &deadlockNode);
1022 
1023  if (sRowRef.locktype==WRITELOCK || sRowRef.locktype==READLOCK)
1024  {
1025  nodesRef.locked.insert(deadlockNode);
1026  }
1027  else if (sRowRef.locktype==PENDINGLOCK)
1028  {
1029  nodesRef.waiting.insert(deadlockNode);
1030  }
1031 
1032  // indices
1033  boost::unordered_map< int64_t, lockFieldValue_s >::iterator it;
1034 
1035  for (it = sRowRef.uniqueIndices.begin();
1036  it != sRowRef.uniqueIndices.end(); it++)
1037  {
1038  lockFieldValue_s &lockFieldValueRef = it->second;
1039 
1040  if (lockFieldValueRef.locktype==INDEXLOCK)
1041  {
1042  deadlockNode.clear();
1043  long double fieldinput;
1044  memcpy(&fieldinput, &lockFieldValueRef.fieldVal.value,
1045  sizeof(fieldinput));
1046  DeadlockMgr::makeLockedItem(false, 0, this_tableid,
1047  this_engineid, domainid,
1048  it->first, fieldinput,
1049  &lockFieldValueRef.fieldVal.str,
1050  &deadlockNode);
1051  nodesRef.locked.insert(deadlockNode);
1052  }
1053  else if (lockFieldValueRef.locktype==INDEXPENDINGLOCK)
1054  {
1055  deadlockNode.clear();
1056  long double fieldinput;
1057  memcpy(&fieldinput, &lockFieldValueRef.fieldVal.value,
1058  sizeof(fieldinput));
1059  DeadlockMgr::makeLockedItem(false, 0, this_tableid,
1060  this_engineid, domainid,
1061  it->first, fieldinput,
1062  &lockFieldValueRef.fieldVal.str,
1063  &deadlockNode);
1064  nodesRef.waiting.insert(deadlockNode);
1065  }
1066  }
1067  }
1068 
1069  if (nodesRef.locked.empty()==true && nodesRef.waiting.empty()==true)
1070  {
1071  // delete nodes; // nothing to deadlock, but this should be an
1072  // anomaly
1073  delete msg;
1074  fprintf(logfile, "anomaly: %s %i\n", __FILE__, __LINE__);
1075  return;
1076  }
1077 
1082 
1083  // taPtr->mboxes.deadlockMgr.send(msgsnd, true);
1085  return;
1086  }
1087 
1088  if (lockcount && lockpendingcount)
1089  {
1090  // still deadlocked, just send 1 message based on type
1091  class MessageDeadlock *msg = new class MessageDeadlock();
1092  class MessageDeadlock &msgref = *msg;
1093 
1094  // prepare string(s) for submittal
1095  if (isrow==true)
1096  {
1097  DeadlockMgr::makeLockedItem(true, rowid, tableid, engineid, domainid,
1098  fieldid, (long double)0, (string *)NULL,
1099  &msgref.deadlockNode);
1100  }
1101  else
1102  {
1103  long double fieldinput;
1104  memcpy(&fieldinput, &fieldVal->value, sizeof(fieldinput));
1105  DeadlockMgr::makeLockedItem(false, rowid, tableid, engineid,
1106  domainid, fieldid, fieldinput,
1107  &fieldVal->str, &msgref.deadlockNode);
1108  }
1109 
1110  // send message to dmgr
1112  msgref.deadlockStruct.deadlockchange = changetype;
1114 
1116 
1117  return;
1118  }
1119 
1120  if (!lockcount || !lockpendingcount)
1121  {
1122  // deadlock over, send message to dmgr to that effect
1124 
1125  // send message to dmgr
1126  class MessageDeadlock *msg = new class MessageDeadlock;
1127  class MessageDeadlock &msgref = *msg;
1130 
1131  // taPtr->mboxes.deadlockMgr.send(msgsnd, true);
1133  return;
1134  }
1135 }
1136 
1138 {
1140  {
1142  return;
1143  }
1144 
1145  if (stagedRows[currentCmdState.originaluur].locktype != WRITELOCK ||
1147  pendingcmd != NOCOMMAND)
1148  {
1150  return;
1151  }
1152 
1156  class Table &tableRef = *currentCmdState.tablePtr;
1157  // construct new row and put in currentCmdState.newRow
1159 
1160  if (tableRef.unmakerow(&stagedRows[currentCmdState.originaluur].originalRow,
1162  {
1164  return;
1165  }
1166 
1168  reserve(currentCmdState.originalFieldValues.size());
1169 
1170  // create new row from old row & updates and set it in currentCmdState.newRow
1171  for (size_t n=0; n < currentCmdState.originalFieldValues.size(); n++)
1172  {
1174  {
1175  if (currentCmdState.fieldid==(int64_t)n)
1176  {
1178  }
1179  else
1180  {
1182  push_back(currentCmdState.originalFieldValues[n]);
1183  }
1184  }
1185  else if (fieldsToUpdate.count(n))
1186  {
1188  }
1189  else
1190  {
1192  push_back(currentCmdState.originalFieldValues[n]);
1193  }
1194  }
1195 
1196  if (tableRef.makerow(&currentCmdState.newFieldValues,
1197  &currentCmdState.newRow)==false)
1198  {
1200  return;
1201  }
1202 
1204  {
1205  if (currentCmdState.fieldid==0)
1206  {
1208  replace();
1209  return;
1210  }
1211  }
1212  else if (fieldsToUpdate.count(0))
1213  {
1215  replace();
1216  return;
1217  }
1218 
1219  // otherwise, it's an update
1222  continueUpdateRow(1); // somewhere after replaces, so continueReplace forwards
1223  // there...
1224 }
1225 
1227 {
1228  // do like insert first, check null constraints then get new row
1229  for (size_t n=0; n < currentCmdState.newFieldValues.size(); n++)
1230  {
1231  if (checkNullConstraintOK(n)==false)
1232  {
1234  return;
1235  }
1236  }
1237 
1241 
1242  // just send message to engine to put the new row in place
1243  // subtransactionCmd *cmd = new subtransactionCmd();
1244  class MessageSubtransactionCmd *msg = new class MessageSubtransactionCmd();
1245 
1247  msg->row = currentCmdState.newRow;
1248 
1250  currentCmdState.newuur.engineid, (void *)msg);
1251 }
1252 
1253 void Transaction::continueUpdateRow(int64_t entrypoint)
1254 {
1255  switch (entrypoint)
1256  {
1257  case 1:
1258  {
1259  // for update only, send currentCmdState.newRow, rowid,tableid to
1260  // subtransaction subtransactionCmd *cmd = new subtransactionCmd();
1261  class MessageSubtransactionCmd *msg =
1262  new class MessageSubtransactionCmd();
1265  msg->row = currentCmdState.newRow;
1266 
1268  currentCmdState.newuur.engineid, (void *)msg);
1269  }
1270  break;
1271 
1272  case 2:
1273  {
1274  // process updaterow message (status needs to be STATUS_OK)
1275  class MessageSubtransactionCmd &subtransactionCmdRef =
1276  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
1277  int64_t status = subtransactionCmdRef.subtransactionStruct.status;
1278 
1279  if (status != STATUS_OK)
1280  {
1281  fprintf(logfile, "anomaly %li %s %i\n", status, __FILE__, __LINE__);
1283  reenter(status);
1284  }
1285  }
1286 
1287  // break; just fall through if the row was staged ok
1288  case 3: // do unique indices. point them to currentCmdState.newuur.rowid & engineid
1289  {
1291  sRow.cmd = UPDATE;
1292  sRow.newRow = currentCmdState.newRow;
1295  // sRow.originalRow should already be in the stagedRows
1296  // following 2 are probably gratuitous
1299  // sRow.uniqueIndices should be born clear
1300 
1302 
1303  for (size_t n=0; n < currentCmdState.newFieldValues.size(); n++)
1304  {
1305  if (currentCmdState.newFieldValues[n].isnull==true)
1306  {
1307  continue; // can add as many nulls as possible if nulls allowed
1308  }
1309 
1310  if (currentCmdState.tablePtr->fields[n].index.isunique==true)
1311  {
1312  if (currentCmdState.newFieldValues[n].value.floating ==
1313  currentCmdState.originalFieldValues[n].value.floating &&
1314  currentCmdState.newFieldValues[n].isnull ==
1318  {
1319  continue; // no change to field
1320  }
1321 
1323  lockFieldValue_s lockFieldVal = {};
1324  lockFieldVal.locktype = NOLOCK; // no lock yet
1325  lockFieldVal.fieldVal.isnull = false;
1326  lockFieldVal.fieldVal.str =
1328  memcpy(&lockFieldVal.fieldVal.value,
1329  &currentCmdState.newFieldValues[n].value,
1330  sizeof(lockFieldVal.fieldVal.value));
1332  n, &lockFieldVal.fieldVal);
1333  sRow.uniqueIndices[n] = lockFieldVal;
1334 
1335  class MessageSubtransactionCmd *msg =
1336  new class MessageSubtransactionCmd();
1337 
1338  msg->subtransactionStruct.isrow = false;
1339  msg->fieldVal.isnull = lockFieldVal.fieldVal.isnull;
1340  msg->fieldVal.str = lockFieldVal.fieldVal.str;
1341  memcpy(&msg->fieldVal.value, &lockFieldVal.fieldVal.value,
1342  sizeof(lockFieldVal.fieldVal.value));
1345  msg->subtransactionStruct.fieldid = n;
1350  lockFieldVal.engineid, (void *)msg);
1351  }
1352  }
1353 
1355 
1357  {
1358  continueUpdateRow(5); // no need to wait for unique index responses
1359  }
1360  }
1361  break;
1362 
1363  case 4:
1364  {
1365  // get responses from unique index set
1366  class MessageSubtransactionCmd &subtransactionCmdRef =
1367  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
1368  int64_t tableid = subtransactionCmdRef.subtransactionStruct.tableid;
1369  int64_t fieldid = subtransactionCmdRef.subtransactionStruct.fieldid;
1371  fieldVal.isnull = subtransactionCmdRef.fieldVal.isnull;
1372  fieldVal.str = subtransactionCmdRef.fieldVal.str;
1373  memcpy(&fieldVal.value, &subtransactionCmdRef.fieldVal.value,
1374  sizeof(fieldVal.value));
1375 
1376  switch (subtransactionCmdRef.subtransactionStruct.locktype)
1377  {
1378  case NOLOCK: // unique constraint violation, abort command
1379  printf("%s %i APISTATUS_UNIQUECONSTRAINT (NOLOCK)\n", __FILE__,
1380  __LINE__);
1382  return;
1383 // break;
1384 
1385  case INDEXLOCK:
1386  currentCmdState.pendingStagedRows[currentCmdState.originaluur].uniqueIndices[fieldid].locktype = INDEXLOCK;
1387  checkLock(ADDLOCKEDENTRY, false, 0, tableid, 0, fieldid, &fieldVal);
1388  break;
1389 
1390  case INDEXPENDINGLOCK:
1392  checkLock(ADDLOCKPENDINGENTRY, false, 0, tableid, 0,
1393  fieldid, &fieldVal);
1394  return;
1395 // break;
1396 
1397  case PENDINGTOINDEXLOCK:
1398  currentCmdState.pendingStagedRows[currentCmdState.originaluur].uniqueIndices[fieldid].locktype = INDEXLOCK;
1399  checkLock(TRANSITIONPENDINGTOLOCKEDENTRY, false, 0, tableid,
1400  0, fieldid, &fieldVal);
1401  break;
1402 
1403  case PENDINGTOINDEXNOLOCK: // unique constraint violation
1404  printf("%s %i APISTATUS_UNIQUECONSTRAINT (PENDINGTOINDEXNOLOCK)\n",
1405  __FILE__, __LINE__);
1407  return;
1408 // break;
1409 
1410  default:
1411  fprintf(logfile, "anomaly: %i %s %i\n",
1412  subtransactionCmdRef.subtransactionStruct.locktype,
1413  __FILE__, __LINE__);
1414  }
1415 
1416  if (--enginesWithUniqueIndices) // need to wait for more replies
1417  {
1418  return;
1419  }
1420 
1421  // otherwise, we're home free!
1422  }
1423 
1424  // break; fall through intentionally
1425  case 5: // post to stagedRows & reenter ? include row for user response? and
1426  // uur?
1427  {
1430  // no need to return anything explicit, since the new row is in
1431  // stagedRows for the original uur
1433  }
1434  break;
1435 
1436  default:
1437  fprintf(logfile, "anomaly: %lu %s %i\n", entrypoint, __FILE__, __LINE__);
1438  }
1439 }
1440 
1441 void Transaction::continueReplaceRow(int64_t entrypoint)
1442 {
1443  class MessageSubtransactionCmd &subtransactionCmdRef =
1444  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
1445 
1446  switch (entrypoint)
1447  {
1448  case 1:
1449  {
1450  // NEWROW assumed always succeeds
1452  subtransactionCmdRef.subtransactionStruct.rowid;
1453 
1454  // now delete the old row, with forwarder
1455  class MessageSubtransactionCmd *msg =
1456  new class MessageSubtransactionCmd();
1463  currentCmdState.originaluur.engineid, (void *)msg);
1464  }
1465  break;
1466 
1467  case 2:
1468  {
1469  // process deleted
1470  int64_t status = subtransactionCmdRef.subtransactionStruct.status;
1471 
1472  if (status != STATUS_OK)
1473  {
1474  // rollback inserted row, fire & forget, reenter
1475  // rollback: ROLLBACKCMD,COMMITROLLBACKPAYLOAD
1476  // vector of rowOrField
1477  rowOrField_s rof = {};
1478  class MessageCommitRollback *msg = new class MessageCommitRollback();
1479  rof.isrow = true;
1482  msg->rofs.operator [](0) = rof;
1484  currentCmdState.newuur.engineid, (void *)msg);
1485 
1487  return;
1488  }
1489 
1490  // otherwise, I'm now an update & jump to continueUpdateRow(1)
1491  pendingcmd = UPDATE;
1492  continueUpdateRow(3);
1493  }
1494  break;
1495 
1496  default:
1497  fprintf(logfile, "anomaly: %lu %s %i\n", entrypoint, __FILE__, __LINE__);
1498  }
1499 }
1500 
1501 void Transaction::abortCmd(int reentrystatus)
1502 {
1503  // this should be just abort stopping deadlock for this transaction and
1504  // revert of the original uur. then reenter.
1506 
1508  {
1510  // send message to dmgr
1511  class MessageDeadlock *msg = new class MessageDeadlock;
1512  class MessageDeadlock &msgref = *msg;
1515  // taPtr->mboxes.deadlockMgr.send(msgsnd, true);
1517  }
1518 
1519  reenter(reentrystatus);
1520 }
1521 
1523 {
1524  // take care of return message from mirroring operation, then
1525  // walk through map of rows, along with indices
1526  // printf("%s %i continueCommitTransaction(%li)\n", __FILE__, __LINE__, entrypoint);
1527 
1528  switch (entrypoint)
1529  {
1530  case 1:
1531  {
1532  // take care of return message from mirroring operation, then
1533  // walk through map of rows, along with indices
1534  if (--waitfordispatched)
1535  {
1536  return;
1537  }
1538 
1539  boost::unordered_map< uuRecord_s, stagedRow_s >::iterator it;
1540  boost::unordered_map< int64_t, class MessageCommitRollback *> msgs;
1542  rowOrField_s blankRof = {};
1543  rowOrField_s rof;
1544 
1545  for (it = stagedRows.begin(); it != stagedRows.end(); it++)
1546  {
1547  class Table &tableRef = *schemaPtr->tables[it->first.tableid];
1548  stagedRow_s &sRowRef = it->second;
1549  rof = blankRof;
1550  rof.tableid = it->first.tableid;
1551 
1552  switch (sRowRef.cmd)
1553  {
1554  case NOCOMMAND:
1555  {
1556  // subtransaction will just unlock this row
1557  rof.isrow = true;
1558  rof.rowid = it->first.rowid;
1559 
1560  addRof(it->first.engineid, rof, msgs);
1561  }
1562  break;
1563 
1564  case INSERT:
1565  {
1566  rof.isrow = true;
1567  rof.rowid = it->first.rowid;
1568 
1569  addRof(it->first.engineid, rof, msgs);
1570  // index stuff
1571  vector <fieldValue_s> fieldValues;
1572  tableRef.unmakerow(&sRowRef.newRow, &fieldValues);
1573 
1574  for (size_t n=0; n < tableRef.fields.size(); n++)
1575  {
1576  class Field &fieldRef = tableRef.fields[n];
1577  class Index &indexRef = fieldRef.index;
1578 
1579  if (indexRef.indextype==NONE)
1580  {
1581  continue;
1582  }
1583 
1584  rof = blankRof;
1585  rof.isrow = false;
1586  rof.tableid = it->first.tableid;
1587  rof.fieldid = n;
1588 
1589  if (fieldValues[n].isnull==true)
1590  {
1591  rof.isnotaddunique = true;
1592  rof.deleteindexentry = false;
1593  rof.rowid=it->first.rowid;
1594  rof.engineid=it->first.engineid;
1595  rof.fieldVal.isnull=true;
1596  addRof(n % nodeTopology.numpartitions, rof, msgs);
1597 
1598  continue;
1599  }
1600 
1601  rof.fieldVal = fieldValues[n];
1602 
1603  if (indexRef.isunique==true) // commit something already locked
1604  {
1605  rof.isnotaddunique = false;
1606  }
1607  else
1608  {
1609  rof.isnotaddunique = true;
1610  rof.deleteindexentry = false;
1611  rof.engineid = it->first.engineid;
1612  rof.rowid = it->first.rowid;
1613  }
1614 
1615  switch (fieldRef.type)
1616  {
1617  case CHARX:
1618  trimspace(rof.fieldVal.str);
1619  break;
1620 
1621  case VARCHAR:
1622  trimspace(rof.fieldVal.str);
1623 
1624  default:
1625  ;
1626  }
1627 
1628  addRof(getEngineid(&tableRef, n, &rof.fieldVal),
1629  rof, msgs);
1630  }
1631  }
1632  break;
1633 
1634  case DELETE:
1635  {
1636  rof.isrow = true;
1637  rof.rowid = it->first.rowid;
1638 
1639  addRof(it->first.engineid, rof, msgs);
1640  // index stuff
1641  vector <fieldValue_s> fieldValues;
1642  tableRef.unmakerow(&sRowRef.originalRow, &fieldValues);
1643 
1644  for (size_t n=0; n < tableRef.fields.size(); n++)
1645  {
1646  class Field &fieldRef = tableRef.fields[n];
1647  class Index &indexRef = fieldRef.index;
1648 
1649  if (indexRef.indextype==NONE)
1650  {
1651  continue;
1652  }
1653 
1654  rof = blankRof;
1655  rof.isrow = false;
1656  rof.tableid = it->first.tableid;
1657  rof.fieldid = n;
1658  rof.isnotaddunique = true;
1659  rof.deleteindexentry = true;
1660  rof.engineid = it->first.engineid;
1661  rof.rowid = it->first.rowid;
1662 
1663  if (fieldValues[n].isnull==true)
1664  {
1665  rof.fieldVal.isnull=true;
1666  addRof(n % nodeTopology.numpartitions, rof, msgs);
1667 
1668  continue;
1669  }
1670 
1671  rof.fieldVal = fieldValues[n];
1672 
1673  switch (fieldRef.type)
1674  {
1675  case CHARX:
1676  trimspace(rof.fieldVal.str);
1677  break;
1678 
1679  case VARCHAR:
1680  trimspace(rof.fieldVal.str);
1681 
1682  default:
1683  ;
1684  }
1685 
1686  addRof(getEngineid(&tableRef, n, &rof.fieldVal),
1687  rof, msgs);
1688  }
1689  }
1690  break;
1691 
1692  case UPDATE:
1693  {
1694  if (sRowRef.originalrowid==sRowRef.newrowid &&
1695  sRowRef.originalengineid==sRowRef.newengineid)
1696  {
1697  rof.isrow = true;
1698  rof.rowid = it->first.rowid;
1699  addRof(it->first.engineid, rof, msgs);
1700  // index stuff
1701  vector <fieldValue_s> originalFieldValues;
1702  tableRef.unmakerow(&sRowRef.originalRow,
1703  &originalFieldValues);
1704  vector <fieldValue_s> newFieldValues;
1705  tableRef.unmakerow(&sRowRef.newRow, &newFieldValues);
1706 
1707  for (size_t n=0; n < tableRef.fields.size(); n++)
1708  {
1709  class Field &fieldRef = tableRef.fields[n];
1710  class Index &indexRef = fieldRef.index;
1711 
1712  if (indexRef.indextype==NONE)
1713  {
1714  continue;
1715  }
1716 
1717  if ((originalFieldValues[n].value.floating !=
1718  newFieldValues[n].value.floating) ||
1719  (originalFieldValues[n].isnull !=
1720  newFieldValues[n].isnull) ||
1721  (originalFieldValues[n].str !=
1722  newFieldValues[n].str))
1723  {
1724  // add index entry
1725  rof = blankRof;
1726  rof.isrow = false;
1727  rof.tableid = it->first.tableid;
1728  rof.fieldid = n;
1729  rof.fieldVal = newFieldValues[n];
1730 
1731  if (newFieldValues[n].isnull==true)
1732  {
1733  rof.isnotaddunique = true;
1734  rof.deleteindexentry = false;
1735  rof.rowid=sRowRef.newrowid;
1736  rof.engineid=sRowRef.newengineid;
1737  rof.fieldVal.isnull=true;
1739  msgs);
1740 
1741  }
1742  else
1743  {
1744  if (indexRef.isunique==true) //commit something already locked
1745  {
1746  rof.isnotaddunique = false;
1747  }
1748  else
1749  {
1750  rof.isnotaddunique = true;
1751  rof.deleteindexentry = false;
1752  rof.engineid = sRowRef.newengineid;
1753  rof.rowid = sRowRef.newrowid;
1754  }
1755 
1756  switch (fieldRef.type)
1757  {
1758  case CHARX:
1759  trimspace(rof.fieldVal.str);
1760  break;
1761 
1762  case VARCHAR:
1763  trimspace(rof.fieldVal.str);
1764 
1765  default:
1766  ;
1767  }
1768 
1769  addRof(getEngineid(&tableRef, n, &rof.fieldVal),
1770  rof, msgs);
1771  }
1772 
1773  // delete index entry
1774  rof = blankRof;
1775  rof.isrow = false;
1776  rof.tableid = it->first.tableid;
1777  rof.fieldid = n;
1778  rof.isnotaddunique = true;
1779  rof.deleteindexentry = true;
1780  rof.engineid = sRowRef.originalengineid;
1781  rof.rowid = sRowRef.originalrowid;
1782 
1783  if (originalFieldValues[n].isnull==true)
1784  {
1785  rof.fieldVal.isnull=true;
1787  msgs);
1788 
1789  continue;
1790  }
1791 
1792  rof.fieldVal = originalFieldValues[n];
1793 
1794  switch (fieldRef.type)
1795  {
1796  case CHARX:
1797  trimspace(rof.fieldVal.str);
1798  break;
1799 
1800  case VARCHAR:
1801  trimspace(rof.fieldVal.str);
1802 
1803  default:
1804  ;
1805  }
1806 
1807  addRof(getEngineid(&tableRef, n, &rof.fieldVal), rof,
1808  msgs);
1809  }
1810  }
1811  }
1812  else // replace
1813  {
1814  rof.isrow = true;
1815  // commit new row
1816  rof.rowid = sRowRef.newrowid;
1817  addRof(sRowRef.newengineid, rof, msgs);
1818  // commit delete on original row
1819  rof.rowid = it->first.rowid;
1820 
1821 
1822  if (!currentCmdState.replaceEngineMsgs.count(it->first.engineid))
1823  {
1824  currentCmdState.replaceEngineMsgs[it->first.engineid] =
1825  new class MessageCommitRollback();
1826  }
1827 
1828  currentCmdState.replaceEngineMsgs[it->first.engineid]->rofs.push_back(rof);
1829  // index stuff
1830  vector <fieldValue_s> originalFieldValues;
1831  tableRef.unmakerow(&sRowRef.originalRow,
1832  &originalFieldValues);
1833  vector <fieldValue_s> newFieldValues;
1834  tableRef.unmakerow(&sRowRef.newRow, &newFieldValues);
1835 
1836  for (size_t n=0; n < tableRef.fields.size(); n++)
1837  {
1838  class Index &indexRef = tableRef.fields[n].index;
1839 
1840  if (indexRef.indextype==NONE)
1841  {
1842  continue;
1843  }
1844 
1845  if ((originalFieldValues[n].value.floating !=
1846  newFieldValues[n].value.floating) ||
1847  (originalFieldValues[n].isnull !=
1848  newFieldValues[n].isnull) ||
1849  (originalFieldValues[n].str != newFieldValues[n].str))
1850  {
1851  // add index entry
1852  rof = blankRof;
1853  rof.isrow = false;
1854  rof.tableid = it->first.tableid;
1855  rof.fieldid = n;
1856 
1857  if (newFieldValues[n].isnull==true)
1858  {
1859  rof.isnotaddunique = true;
1860  rof.deleteindexentry = false;
1861  rof.rowid=sRowRef.newrowid;
1862  rof.engineid=sRowRef.newengineid;
1863  rof.fieldVal.isnull=true;
1865  msgs);
1866  }
1867  else
1868  {
1869  rof.fieldVal = newFieldValues[n];
1870 
1871  if (indexRef.isunique==true) //commit something already locked
1872  {
1873  rof.isnotaddunique = false;
1874  }
1875  else
1876  {
1877  rof.isnotaddunique = true;
1878  rof.deleteindexentry = false;
1879  rof.engineid = sRowRef.newengineid;
1880  rof.rowid = sRowRef.newrowid;
1881  }
1882 
1883  class Field &fieldRef = tableRef.fields[n];
1884 
1885  switch (fieldRef.type)
1886  {
1887  case CHARX:
1888  trimspace(rof.fieldVal.str);
1889  break;
1890 
1891  case VARCHAR:
1892  trimspace(rof.fieldVal.str);
1893 
1894  default:
1895  ;
1896  }
1897 
1898  addRof(getEngineid(&tableRef, n, &rof.fieldVal),
1899  rof, msgs);
1900  }
1901 
1902  // delete index entry
1903  rof = blankRof;
1904  rof.isrow = false;
1905  rof.tableid = it->first.tableid;
1906  rof.fieldid = n;
1907  rof.isnotaddunique = true;
1908  rof.deleteindexentry = true;
1909  rof.engineid = sRowRef.originalengineid;
1910  rof.rowid = sRowRef.originalrowid;
1911 
1912  if (originalFieldValues[n].isnull==true)
1913  {
1914  rof.fieldVal.isnull=true;
1916  msgs);
1917 
1918  continue;
1919  }
1920 
1921  rof.fieldVal = originalFieldValues[n];
1922  class Field &fieldRef = tableRef.fields[n];
1923 
1924  switch (fieldRef.type)
1925  {
1926  case CHARX:
1927  trimspace(rof.fieldVal.str);
1928  break;
1929 
1930  case VARCHAR:
1931  trimspace(rof.fieldVal.str);
1932 
1933  default:
1934  ;
1935  }
1936 
1937  addRof(getEngineid(&tableRef, n, &rof.fieldVal),
1938  rof, msgs);
1939  }
1940  else // replace index value, point to new rowid,engineid
1941  {
1942  rof = blankRof;
1943  rof.isrow = false;
1944  rof.tableid = it->first.tableid;
1945  rof.fieldid = n;
1946  rof.fieldVal = originalFieldValues[n];
1947  // need a rowOrField.replaceevalue flag
1948  // and a function in Index::
1949  rof.isreplace = true;
1950  rof.isnotaddunique = true;
1951  rof.rowid = sRowRef.originalrowid;
1952  rof.engineid = sRowRef.originalengineid;
1953  rof.newrowid = sRowRef.newrowid;
1954  rof.newengineid = sRowRef.newengineid;
1955 
1956  if (rof.fieldVal.isnull==true)
1957  {
1959  msgs);
1960  }
1961  else
1962  {
1963  class Field &fieldRef = tableRef.fields[n];
1964 
1965  switch (fieldRef.type)
1966  {
1967  case CHARX:
1968  trimspace(rof.fieldVal.str);
1969  break;
1970 
1971  case VARCHAR:
1972  trimspace(rof.fieldVal.str);
1973 
1974  default:
1975  ;
1976  }
1977 
1978  addRof(getEngineid(&tableRef, n, &rof.fieldVal),
1979  rof, msgs);
1980  }
1981 
1982  }
1983  }
1984  }
1985  }
1986  break;
1987 
1988  default:
1989  fprintf(logfile, "anomaly: %i %s %i\n", sRowRef.cmd, __FILE__,
1990  __LINE__);
1991  }
1992  }
1993 
1994  // send to engines
1995  boost::unordered_map< int64_t, class MessageCommitRollback *>::iterator
1996  msgsIt;
1997  currentCmdState.engines = msgs.size();
1998 
1999  if (!currentCmdState.engines)
2000  {
2001  // nothing to do
2002  // taPtr->Transactions.erase(transactionid);
2003  // reenter(APISTATUS_OK);
2005  return;
2006  }
2007 
2008  for (msgsIt = msgs.begin(); msgsIt != msgs.end(); msgsIt++)
2009  {
2010  sendTransaction(COMMITCMD, PAYLOADCOMMITROLLBACK, 2, msgsIt->first,
2011  (void *)msgsIt->second);
2012  }
2013  }
2014  break;
2015 
2016  case 2: // take responses, just count them down. if replace deletes, do
2017  // commit2
2018  {
2019  if (!(--currentCmdState.engines))
2020  {
2021  if (currentCmdState.replaceEngineMsgs.empty()==false)
2022  {
2025  boost::unordered_map<int64_t,
2026  class MessageCommitRollback *>::iterator it;
2027 
2028  for (it = currentCmdState.replaceEngineMsgs.begin();
2029  it != currentCmdState.replaceEngineMsgs.end(); it++)
2030  {
2032  it->first, (void *)it->second);
2033  }
2034  }
2035  else
2036  {
2038  }
2039  }
2040  }
2041  break;
2042 
2043  case 3: // processing replacedelete engine responses
2044  if (--currentCmdState.engines)
2045  {
2046  return;
2047  }
2048 
2049  // break; pass through to finish commit
2050  case 4: // gotta end the subtransactions TOPIC_ENDSUBTRANSACTION
2051  {
2052  boost::unordered_map<int64_t, int64_t>::iterator it;
2053  class MessageSubtransactionCmd msg;
2056 
2057  for (it = engineToSubTransactionids.begin();
2058  it != engineToSubTransactionids.end(); it++)
2059  {
2060  if (it->first > taPtr->myTopology.numpartitions || it->first < 0)
2061  {
2062  printf("%s %i anomaly %li %i\n", __FILE__, __LINE__, it->first,
2065  return;
2066  }
2067 
2068  msg.transactionStruct.subtransactionid = it->second;
2069  class MessageSubtransactionCmd *nmsg =
2070  new class MessageSubtransactionCmd;
2071  *nmsg=msg;
2073  *nmsg);
2074  }
2075 
2077  return;
2078  }
2079  break;
2080 
2081  default:
2082  fprintf(logfile, "anomaly: %lu %s %i\n", entrypoint, __FILE__, __LINE__);
2083  }
2084 }
2085 
2087 {
2088  boost::unordered_map< uuRecord_s, stagedRow_s >::iterator it;
2089  rowOrField_s blankRof = {};
2090  rowOrField_s rof;
2091 
2092  for (it = stagedRows.begin(); it != stagedRows.end(); it++)
2093  {
2094  stagedRow_s &sRowRef = it->second;
2095  rof = blankRof;
2096  rof.isrow = true;
2097  rof.tableid = it->first.tableid;
2098 
2099  if (sRowRef.cmd==UPDATE && ((it->first.rowid != sRowRef.newrowid) ||
2100  (it->first.engineid != sRowRef.newengineid)))
2101  {
2102  // send rollback to new row if it's a replacement
2103  rof.rowid = sRowRef.newrowid;
2104 
2105  class MessageCommitRollback *msg = new class MessageCommitRollback();
2106  msg->rofs.push_back(rof);
2107 
2109  sRowRef.newengineid, (void *)msg);
2110  }
2111 
2112  rof.rowid = it->first.rowid;
2113  // rofs = new vector<rowOrField>;
2114  class MessageCommitRollback *msg = new class MessageCommitRollback();
2115  msg->rofs.push_back(rof);
2117  it->first.engineid, (void *)msg);
2118 
2119  // now for indices (tableid already set above)
2120  rof.isrow = false;
2121  rof.isnotaddunique = false;
2122  rof.isreplace = false;
2123 
2124  boost::unordered_map< int64_t, lockFieldValue_s >::iterator itIndices;
2125 
2126  for (itIndices = it->second.uniqueIndices.begin();
2127  itIndices != it->second.uniqueIndices.end(); itIndices++)
2128  {
2129  rof.fieldid = itIndices->first;
2130  rof.fieldVal = itIndices->second.fieldVal;
2131 
2132  class MessageCommitRollback *msg = new class MessageCommitRollback();
2133  msg->rofs.push_back(rof);
2135  itIndices->second.engineid, (void *)msg);
2136  }
2137  }
2138 
2139  // tell the engines to kill their subtransactions
2140  boost::unordered_map<int64_t, int64_t>::iterator itEngines;
2141  class MessageSubtransactionCmd msg;
2144 
2145  for (itEngines = engineToSubTransactionids.begin();
2146  itEngines != engineToSubTransactionids.end(); itEngines++)
2147  {
2148  msg.transactionStruct.subtransactionid = itEngines->second;
2149  class MessageSubtransactionCmd *nmsg =
2150  new class MessageSubtransactionCmd;
2151  *nmsg = msg;
2152 
2153  taPtr->mboxes.toPartition(taPtr->myIdentity.address, itEngines->first,
2154  *nmsg);
2155  }
2156 
2158  return;
2159 }
2160 
2161 // cmd is either ROLLBACKCMD or REVERTCMD to either rollback or revert
2163 {
2164  if (!stagedRows.count(uur))
2165  {
2166  return;
2167  }
2168 
2169  stagedRow_s &sRowRef = stagedRows[uur];
2170  rowOrField_s rof = {};
2171 
2172  rof.isrow = true;
2173  rof.tableid = uur.tableid;
2174 
2175  if (sRowRef.cmd==UPDATE && ((uur.rowid != sRowRef.newrowid) ||
2176  (uur.engineid != sRowRef.newengineid)))
2177  {
2178  rof.rowid = sRowRef.newrowid;
2179 
2180  class MessageCommitRollback *msg = new class MessageCommitRollback();
2181  msg->rofs.push_back(rof);
2182 
2184  sRowRef.newengineid, (void *)msg);
2185  }
2186 
2187  rof.rowid = uur.rowid;
2188  class MessageCommitRollback *msg = new class MessageCommitRollback();
2189  msg->rofs.push_back(rof);
2191  uur.engineid, (void *)msg);
2192 
2193  // indices
2194  rof.isrow = false;
2195  rof.isnotaddunique = false;
2196  rof.isreplace = false;
2197  boost::unordered_map< int64_t, lockFieldValue_s >::iterator it;
2198 
2199  for (it = sRowRef.uniqueIndices.begin(); it != sRowRef.uniqueIndices.end();
2200  it++)
2201  {
2202  rof.fieldid = it->first;
2203  rof.fieldVal = it->second.fieldVal;
2204 
2205  class MessageCommitRollback *msg = new class MessageCommitRollback();
2206  msg->rofs.push_back(rof);
2208  it->second.engineid, (void *)msg);
2209  }
2210 }
2211 
2212 void Transaction::reenter(int64_t res)
2213 {
2214  if (reentryObject != NULL)
2215  {
2216  resultCode = res;
2218  pendingcmdid = 0;
2220  }
2221  else
2222  {
2223  delete this;
2224  }
2225 }
2226 
2228 {
2229  currentCmdState.tableid = -1;
2230  currentCmdState.tablePtr = NULL;
2231  currentCmdState.indexEntries.clear();
2234  currentCmdState.rowid = 0;
2237  currentCmdState.row.clear();
2238  currentCmdState.fieldid = -1;
2239  currentCmdState.isunique = false;
2241  currentCmdState.rowPtr = NULL;
2243  currentCmdState.fieldVal.str.clear();
2244  memset(&currentCmdState.fieldVal.value, 0,
2245  sizeof(currentCmdState.fieldVal.value));
2247 }
2248 
2249 int64_t Transaction::getEngineid(int64_t input)
2250 {
2251  return SpookyHash::Hash64((void *) &input, sizeof(input), 0) %
2253 }
2254 
2255 int64_t Transaction::getEngineid(uint64_t input)
2256 {
2257  return SpookyHash::Hash64((void *) &input, sizeof(input), 0) %
2259 }
2260 
2261 int64_t Transaction::getEngineid(bool input)
2262 {
2263  return SpookyHash::Hash64((void *) &input, sizeof(input), 0) %
2265 }
2266 
2267 int64_t Transaction::getEngineid(long double input)
2268 {
2269  return SpookyHash::Hash64((void *) &input, sizeof(input), 0) %
2271 }
2272 
2273 int64_t Transaction::getEngineid(char input)
2274 {
2275  return SpookyHash::Hash64((void *) &input, sizeof(input), 0) %
2277 }
2278 
2279 int64_t Transaction::getEngineid(string *input)
2280 {
2281  trimspace(*input);
2282  return SpookyHash::Hash64((void *) input->c_str(), input->length(), 0) %
2284 }
2285 
2287 {
2288  printf("Transaction bad message stub %s %i\n", __FILE__, __LINE__); // stub
2289 }
2290 
2291 // for ApiInterface::insert()
2293 {
2294  fieldValue_s fieldVal = {};
2295  fieldVal.isnull = true;
2296  fieldValues.push_back(fieldVal);
2297 }
2298 
2300 {
2301  fieldValue_s fieldVal = {};
2302  fieldVal.value.integer = val;
2303  fieldValues.push_back(fieldVal);
2304 }
2305 
2306 void Transaction::addFieldToRow(uint64_t val)
2307 {
2308  fieldValue_s fieldVal = {};
2309  fieldVal.value.uinteger = val;
2310  fieldValues.push_back(fieldVal);
2311 }
2312 
2314 {
2315  fieldValue_s fieldVal = {};
2316  fieldVal.value.boolean = val;
2317  fieldValues.push_back(fieldVal);
2318 }
2319 
2320 void Transaction::addFieldToRow(long double val)
2321 {
2322  fieldValue_s fieldVal = {};
2323  fieldVal.value.floating = val;
2324  fieldValues.push_back(fieldVal);
2325 }
2326 
2328 {
2329  fieldValue_s fieldVal = {};
2330  fieldVal.value.character = val;
2331  fieldValues.push_back(fieldVal);
2332 }
2333 
2335 {
2336  fieldValue_s fieldVal = {};
2337  fieldVal.str = val;
2338  fieldValues.push_back(fieldVal);
2339 }
2340 
2342 {
2344  pendingcmdid = 0;
2346 }
2347 
2348 // returns true if field passes null constraint check, false if fails
2349 // constraint check
2351 {
2352  if (currentCmdState.tablePtr->fields[fieldnum].indextype == UNIQUENOTNULL ||
2353  currentCmdState.tablePtr->fields[fieldnum].indextype ==
2354  NONUNIQUENOTNULL ||
2355  currentCmdState.tablePtr->fields[fieldnum].indextype ==UNORDEREDNOTNULL)
2356  {
2357  if (fieldValues[fieldnum].isnull==true)
2358  {
2359  return false;
2360  }
2361  }
2362 
2363  return true;
2364 }
2365 
2366 void Transaction::makeFieldValue(fieldValue_s *val, bool isnull, int64_t input)
2367 {
2368  if (isnull==true)
2369  {
2370  val->isnull = true;
2371  }
2372  else
2373  {
2374  val->value.integer = input;
2375  }
2376 }
2377 
2378 void Transaction::makeFieldValue(fieldValue_s *val, bool isnull, uint64_t input)
2379 {
2380  if (isnull==true)
2381  {
2382  val->isnull = true;
2383  }
2384  else
2385  {
2386  val->value.uinteger = input;
2387  }
2388 }
2389 
2390 void Transaction::makeFieldValue(fieldValue_s *val, bool isnull, bool input)
2391 {
2392  if (isnull==true)
2393  {
2394  val->isnull = true;
2395  }
2396  else
2397  {
2398  val->value.boolean = input;
2399  }
2400 }
2401 
2402 void Transaction::makeFieldValue(fieldValue_s *val, bool isnull, long double input)
2403 {
2404  if (isnull==true)
2405  {
2406  val->isnull = true;
2407  }
2408  else
2409  {
2410  val->value.floating = input;
2411  }
2412 }
2413 
2414 void Transaction::makeFieldValue(fieldValue_s *val, bool isnull, char input)
2415 {
2416  if (isnull==true)
2417  {
2418  val->isnull = true;
2419  }
2420  else
2421  {
2422  val->value.character = input;
2423  }
2424 }
2425 
2426 void Transaction::makeFieldValue(fieldValue_s *val, bool isnull, string input)
2427 {
2428  if (isnull==true)
2429  {
2430  val->isnull = true;
2431  }
2432  else
2433  {
2434  val->str = input;
2435  }
2436 }
2437 
2439 {
2440  revertback(uur, ROLLBACKCMD);
2442 }
2443 
2445 {
2446  revertback(uur, REVERTCMD);
2447 }
2448 
2449 void Transaction::addRof(int64_t engineid, rowOrField_s &rof,
2450  boost::unordered_map< int64_t,
2451  class MessageCommitRollback *> &msgs)
2452 {
2453  if (!msgs.count(engineid))
2454  {
2455  msgs[engineid] = new class MessageCommitRollback();
2456  }
2457 
2458  msgs[engineid]->rofs.push_back(rof);
2459 }
2460 
2462 {
2463  return ++nextpendingcmdid;
2464 }
2465 
2467 {
2468  class MessageDispatch *msg = new class MessageDispatch;
2469 
2473  boost::unordered_map< uuRecord_s, stagedRow_s >::iterator it;
2474 
2475  for (it = stagedRows.begin(); it != stagedRows.end(); it++)
2476  {
2477  const uuRecord_s &uurRef = it->first;
2478  stagedRow_s &srowRef = it->second;
2480 
2481  if (srowRef.cmd==INSERT || srowRef.cmd==UPDATE || srowRef.cmd==DELETE)
2482  {
2483  r.primitive = srowRef.cmd;
2484  r.tableid = uurRef.tableid;
2486 
2487  if (srowRef.cmd==INSERT || srowRef.cmd == UPDATE)
2488  {
2489  r.row = srowRef.newRow;
2490  r.rowid = srowRef.newrowid;
2491 
2492  if (srowRef.cmd==UPDATE)
2493  {
2494  r.oldrow = srowRef.originalRow;
2495  }
2496  }
2497  else // DELETE
2498  {
2499  r.row = srowRef.originalRow;
2500  r.rowid = srowRef.originalrowid;
2501  }
2502 
2503  msg->records[uurRef.engineid].push_back(r);
2504  }
2505  }
2506 
2507  if (!msg->records.size())
2508  {
2509  delete msg;
2510  return NULL;
2511  }
2512 
2513  return msg;
2514 }
2515 
2516 void Transaction::sqlPredicate(class Statement *statement,
2517  operatortypes_e op, int64_t tableid,
2518  string &leftoperand, string &rightoperand,
2519  locktype_e locktype,
2520  vector<fieldValue_s> &inValues,
2521  void *continuationData,
2522  boost::unordered_map<uuRecord_s, returnRow_s>
2523  &results)
2524 {
2526  {
2527  0
2528  };
2529  sqlcmdstate.statement = statement;
2530  sqlcmdstate.results = &results;
2531  sqlcmdstate.locktype = locktype;
2533  sqlcmdstate.continuationData = continuationData;
2534 
2535  if (pendingcmd != NOCOMMAND)
2536  {
2538  return;
2539  }
2540 
2543 
2544  string *fieldidoperand;
2545 
2546  if (op==OPERATOR_ISNULL || op==OPERATOR_ISNOTNULL)
2547  {
2548  fieldidoperand=&rightoperand;
2549  }
2550  else
2551  {
2552  fieldidoperand=&leftoperand;
2553  }
2554 
2555  string &fieldidoperandRef=*fieldidoperand;
2556 
2557  if (fieldidoperandRef[0] != OPERAND_FIELDID)
2558  {
2559  printf("%s %i operand is not fieldid, it is %c\n", __FILE__, __LINE__,
2560  fieldidoperandRef[0]);
2561  if (fieldidoperandRef[0]==OPERAND_IDENTIFIER)
2562  {
2563  printf("%s %i identifier: %s\n", __FILE__, __LINE__,
2564  fieldidoperandRef.substr(1, string::npos).c_str());
2565  }
2566 
2567  return;
2568  }
2569 
2570  int64_t fieldid;
2571  memcpy(&fieldid, &fieldidoperandRef[1], sizeof(fieldid));
2572  class Field &fieldRef = schemaPtr->tables[tableid]->fields[fieldid];
2573 
2574  searchParams_s searchParams = {};
2575 
2576  switch (op)
2577  {
2578  case OPERATOR_BETWEEN:
2579  {
2580  searchParams.values.resize(2, fieldValue_s());
2581 
2582  switch (rightoperand[0])
2583  {
2584  case OPERAND_STRING:
2585  {
2586  size_t len;
2587  memcpy(&len, &rightoperand[1], sizeof(len));
2588 
2589  if (fieldRef.type==CHAR)
2590  {
2591  searchParams.values[0].value.character =
2592  rightoperand[1+sizeof(int64_t)];
2593  searchParams.values[1].value.character =
2594  rightoperand[1+sizeof(int64_t)+len];
2595  }
2596  else
2597  {
2598  searchParams.values[0].str.resize(len, (char)0);
2599  searchParams.values[1].str.resize(rightoperand.size()-
2600  (1+sizeof(len)+len), (char)0);
2601  memcpy(&searchParams.values[0].str[0],
2602  &rightoperand[1+sizeof(len)], len);
2603  memcpy(&searchParams.values[1].str[0],
2604  &rightoperand[1+sizeof(len)+len],
2605  searchParams.values[1].str.size());
2606  }
2607  }
2608  break;
2609 
2610  case OPERAND_INTEGER:
2611  memcpy(&searchParams.values[0].value.integer,
2612  &rightoperand[1], sizeof(int64_t));
2613  memcpy(&searchParams.values[1].value.integer,
2614  &rightoperand[1+sizeof(int64_t)], sizeof(int64_t));
2615  break;
2616 
2617  case OPERAND_FLOAT:
2618  memcpy(&searchParams.values[0].value.floating,
2619  &rightoperand[1], sizeof(long double));
2620  memcpy(&searchParams.values[1].value.floating,
2621  &rightoperand[1+sizeof(long double)], sizeof(long double));
2622  break;
2623 
2624  default:
2625  printf("%s %i operand type %c not supported on rhs of predicate.\n",
2626  __FILE__, __LINE__, rightoperand[0]);
2627  return;
2628  }
2629  }
2630  break;
2631 
2632  case OPERATOR_NOTBETWEEN:
2633  {
2634  searchParams.values.resize(2, fieldValue_s());
2635 
2636  switch (rightoperand[0])
2637  {
2638  case OPERAND_STRING:
2639  {
2640  size_t len;
2641  memcpy(&len, &rightoperand[1], sizeof(len));
2642 
2643  if (fieldRef.type==CHAR)
2644  {
2645  searchParams.values[0].value.character =
2646  rightoperand[1+sizeof(int64_t)];
2647  searchParams.values[1].value.character =
2648  rightoperand[1+sizeof(int64_t)+len];
2649  }
2650  else
2651  {
2652  searchParams.values[0].str =
2653  rightoperand.substr(1+sizeof(len), len);
2654  searchParams.values[1].str =
2655  rightoperand.substr(1+sizeof(len)+len, string::npos);
2656  }
2657  }
2658  break;
2659 
2660  case OPERAND_INTEGER:
2661  memcpy(&searchParams.values[0].value.integer,
2662  &rightoperand[1], sizeof(int64_t));
2663  memcpy(&searchParams.values[1].value.integer,
2664  &rightoperand[1+sizeof(int64_t)], sizeof(int64_t));
2665  break;
2666 
2667  case OPERAND_FLOAT:
2668  memcpy(&searchParams.values[0].value.floating,
2669  &rightoperand[1], sizeof(long double));
2670  memcpy(&searchParams.values[1].value.floating,
2671  &rightoperand[1+sizeof(long double)], sizeof(long double));
2672  break;
2673 
2674  default:
2675  printf("%s %i operand type %c not supported on rhs of predicate.\n",
2676  __FILE__, __LINE__, rightoperand[0]);
2677  return;
2678  }
2679  }
2680  break;
2681 
2682  case OPERATOR_IN:
2683  {
2684  searchParams.values = inValues;
2685  }
2686  break;
2687 
2688  case OPERATOR_NOTIN:
2689  {
2690  searchParams.values = inValues;
2691  }
2692  break;
2693 
2694  case OPERATOR_ISNULL:
2695  // unary operator, don't do anything, but don't do default either
2696  break;
2697 
2698  case OPERATOR_ISNOTNULL:
2699  // unary operator, don't do anything, but don't do default either
2700  break;
2701 
2702  default:
2703  {
2704  searchParams.values.push_back(fieldValue_s {});
2705 
2706  switch (rightoperand[0])
2707  {
2708  case OPERAND_STRING:
2709  if (fieldRef.type==CHAR)
2710  {
2711  searchParams.values[0].value.character = rightoperand[1];
2712  }
2713  else
2714  {
2715  searchParams.values[0].str =
2716  rightoperand.substr(1, string::npos);
2717  }
2718 
2719  break;
2720 
2721  case OPERAND_INTEGER:
2722  memcpy(&searchParams.values[0].value.integer, &rightoperand[1],
2723  sizeof(int64_t));
2724  break;
2725 
2726  case OPERAND_BOOLEAN:
2727  if (rightoperand[1]=='t')
2728  {
2729  searchParams.values[0].value.boolean=true;
2730  }
2731  else
2732  {
2733  searchParams.values[0].value.boolean=false;
2734  }
2735 
2736  break;
2737 
2738  case OPERAND_FLOAT:
2739  memcpy(&searchParams.values[0].value.floating, &rightoperand[1],
2740  sizeof(long double));
2741  break;
2742 
2743  default:
2744  printf("%s %i operand type %c not supported on rhs of predicate.\n",
2745  __FILE__, __LINE__, rightoperand[0]);
2746  return;
2747  }
2748  }
2749  }
2750 
2751  if (op==OPERATOR_EQ)
2752  {
2753  // send to only single engine based on hashval
2754  fieldtype_e fieldtype = schemaPtr->tables[tableid]->fields[fieldid].type;
2755  volatile int64_t destengineid = -1;
2757 
2758  switch (fieldtype)
2759  {
2760  case INT:
2761  destengineid =
2762  getEngineid(searchParams.values[0].value.integer);
2763  break;
2764 
2765  case UINT:
2766  destengineid =
2767  getEngineid(searchParams.values[0].value.uinteger);
2768  break;
2769 
2770  case BOOL:
2771  destengineid =
2772  getEngineid(searchParams.values[0].value.boolean);
2773  break;
2774 
2775  case FLOAT:
2776  destengineid =
2777  getEngineid(searchParams.values[0].value.floating);
2778  break;
2779 
2780  case CHAR:
2781  destengineid =
2782  getEngineid(searchParams.values[0].value.character);
2783  break;
2784 
2785  case CHARX:
2786  destengineid =
2787  getEngineid(&searchParams.values[0].str);
2788  break;
2789 
2790  case VARCHAR:
2791  destengineid =
2792  getEngineid(&searchParams.values[0].str);
2793  break;
2794 
2795  default:
2796  printf("%s %i anomaly %i\n", __FILE__, __LINE__, fieldtype);
2797  return;
2798  }
2799 
2800  if (fieldid==0 &&
2801  schemaPtr->tables[tableid]->fields[fieldid].index.isunique == true)
2802  {
2803  class MessageSubtransactionCmd *msg =
2804  new class MessageSubtransactionCmd();
2806  msg->subtransactionStruct.fieldid = fieldid;
2807  msg->subtransactionStruct.locktype = locktype;
2808  searchParams.op = op;
2809  msg->searchParameters = searchParams;
2811  destengineid, msg);
2812  }
2813  else
2814  {
2815  class MessageSubtransactionCmd *msg =
2816  new class MessageSubtransactionCmd();
2818  msg->subtransactionStruct.fieldid = fieldid;
2819  searchParams.op = op;
2820  msg->searchParameters = searchParams;
2822  destengineid, msg);
2823  }
2824  }
2825  else
2826  {
2827  class MessageSubtransactionCmd msg;
2829  msg.subtransactionStruct.fieldid = fieldid;
2830  searchParams.op = op;
2831  msg.searchParameters = searchParams;
2832 
2834  {
2836  class MessageSubtransactionCmd *nmsg =
2837  new class MessageSubtransactionCmd;
2838  *nmsg = msg;
2840  fieldid % nodeTopology.numpartitions, nmsg);
2841  }
2842  else
2843  {
2845 
2846  for (int n=0; n < sqlcmdstate.eventwaitcount; n++)
2847  {
2848  class MessageSubtransactionCmd *nmsg =
2849  new class MessageSubtransactionCmd;
2850  *nmsg = msg;
2852  n, nmsg);
2853  }
2854  }
2855  }
2856 }
2857 
2858 void Transaction::continueSqlPredicate(int64_t entrypoint)
2859 {
2860  class MessageSubtransactionCmd &subtransactionCmdRef =
2861  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
2862 
2863  switch (entrypoint)
2864  {
2865  case 1:
2866  {
2868  subtransactionCmdRef.indexHits.begin(),
2869  subtransactionCmdRef.indexHits.end());
2870 
2871  if (--sqlcmdstate.eventwaitcount == 0)
2872  {
2873  if (sqlcmdstate.indexHits.empty()==true)
2874  {
2875  // no rows returned
2877  pendingcmdid = 0;
2878 
2880  {
2882  }
2883  else
2884  {
2885  // SQLSELECTALL
2887  }
2888 
2889  return;
2890  }
2891 
2892  if (sqlcmdstate.indexHits.size()==1)
2893  {
2894  // send just 1 message to retrieve 1 row
2896  class MessageSubtransactionCmd *msg =
2897  new class MessageSubtransactionCmd();
2901  msg->rowids.push_back(hit.rowid);
2903  hit.engineid, msg);
2904  }
2905  else
2906  {
2907  /* map of engineids to vectors of rowids */
2908  boost::unordered_map< int64_t, vector<int64_t> > payloads;
2909 
2910  for (size_t n=0; n < sqlcmdstate.indexHits.size(); n++)
2911  {
2913  payloads[hit.engineid].push_back(hit.rowid);
2914  }
2915 
2916  sqlcmdstate.eventwaitcount = payloads.size();
2917  boost::unordered_map< int64_t, vector<int64_t> >::iterator it;
2918 
2919  for (it = payloads.begin(); it != payloads.end(); it++)
2920  {
2921  class MessageSubtransactionCmd *msg =
2922  new class MessageSubtransactionCmd();
2925  msg->rowids = it->second;
2927  it->first, (void *)msg);
2928  }
2929  }
2930  }
2931  }
2932  break;
2933 
2934  case 2:
2935  {
2936  boost::unordered_map<uuRecord_s, returnRow_s> &resultsRef =
2938 
2939  uuRecord_s uur = {-1, sqlcmdstate.tableid,
2940  subtransactionCmdRef.transactionStruct.engineinstance
2941  };
2942  bool islockchange = false;
2943 
2944  for (size_t n=0; n < subtransactionCmdRef.returnRows.size(); n++)
2945  {
2946  returnRow_s &returnrowRef = subtransactionCmdRef.returnRows[n];
2947  uur.rowid = returnrowRef.rowid;
2948 
2949  switch (returnrowRef.locktype)
2950  {
2951  case NOLOCK:
2952  break;
2953 
2954  case READLOCK:
2955  break;
2956 
2957  case WRITELOCK:
2958  break;
2959 
2960  case PENDINGLOCK:
2961  // abort if lock pending for now, but make backlog (6/26/2013)
2963  return;
2964 // break;
2965 
2966  case PENDINGTOWRITELOCK:
2967  // abort if lock pending for now, but make backlog (6/26/2013)
2969  return;
2970 // break;
2971 
2972  case PENDINGTOREADLOCK:
2973  // abort if lock pending for now, but make backlog (6/26/2013)
2975  return;
2976 // break;
2977 
2978  case PENDINGTONOLOCK:
2979  // abort if lock pending for now, but make backlog (6/26/2013)
2981  return;
2982 // break;
2983 
2984  case NOTFOUNDLOCK:
2985  continue;
2986 // break;
2987 
2988  default:
2989  fprintf(logfile, "anomaly: %i %s %i\n", returnrowRef.locktype,
2990  __FILE__, __LINE__);
2991  continue;
2992  }
2993 
2994  resultsRef[uur] = returnrowRef;
2995  }
2996 
2997  if (islockchange==false)
2998  {
3000  }
3001 
3003  {
3004  // re-enter, the statement is finished
3005  switch (pendingcmd)
3006  {
3009  pendingcmdid = 0;
3011  break;
3012 
3015  pendingcmdid = 0;
3017  break;
3018 
3021  pendingcmdid = 0;
3023  break;
3024 
3027  pendingcmdid = 0;
3029  break;
3030 
3031  default:
3032  printf("%s %i anomaly %i\n", __FILE__, __LINE__, pendingcmd);
3033  }
3034  }
3035  }
3036  break;
3037 
3038  default:
3039  printf("%s %i anomaly %li\n", __FILE__, __LINE__, entrypoint);
3040  return;
3041  }
3042 }
3043 
3044 void Transaction::sqlSelectAll(class Statement *statement, int64_t tableid,
3045  locktype_e locktype,
3046  pendingprimitive_e pendingprimitive,
3047  boost::unordered_map<uuRecord_s,
3048  returnRow_s> &results)
3049 {
3051  {
3052  0
3053  };
3054  sqlcmdstate.statement = statement;
3055  sqlcmdstate.results = &results;
3056  sqlcmdstate.locktype = locktype;
3058 
3059  if (pendingcmd != NOCOMMAND)
3060  {
3062  return;
3063  }
3064 
3066  pendingcmd = pendingprimitive;
3067 
3068  class MessageSubtransactionCmd msg;
3070  msg.subtransactionStruct.fieldid = 0;
3071  msg.subtransactionStruct.locktype = locktype;
3073 
3075 
3076  for (int64_t n=0; n < sqlcmdstate.eventwaitcount; n++)
3077  {
3078  class MessageSubtransactionCmd *nmsg =
3079  new class MessageSubtransactionCmd;
3080  *nmsg = msg;
3082  }
3083 }
3084 
3085 void Transaction::continueSqlDelete(int64_t entrypoint)
3086 {
3087  class MessageSubtransactionCmd &msgrcvRef =
3088  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
3089 
3091  {
3093  return;
3094  }
3095 
3097 
3098  if (msgrcvRef.subtransactionStruct.status != STATUS_OK)
3099  {
3101  return;
3102  }
3103 
3105  {
3107  }
3108 }
3109 
3110 void Transaction::continueSqlInsert(int64_t entrypoint)
3111 {
3112  class MessageSubtransactionCmd &msgrcvRef =
3113  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
3114 
3115  switch (entrypoint)
3116  {
3117  case 1:
3118  {
3120  {
3121  msgrcvRef.subtransactionStruct.rowid,
3124  };
3125  stagedRow_s newStagedRow = {};
3126  newStagedRow.newRow =
3128  newStagedRow.newrowid =
3130  newStagedRow.locktype = WRITELOCK;
3131  newStagedRow.cmd=INSERT;
3132 
3133  class Table &tableRef =
3135 
3136  for (size_t n=0; n < tableRef.fields.size(); n++)
3137  {
3138  // nonunique indices are handled in commit
3139  if (tableRef.fields[n].index.isunique != true ||
3141  {
3142  continue;
3143  }
3144 
3145  lockFieldValue_s lockFieldValue = {};
3146  lockFieldValue.engineid = getengine(tableRef.fields[n].type,
3148  // locktype could potentially change
3149  lockFieldValue.locktype = INDEXLOCK;
3150  lockFieldValue.fieldVal =
3152  newStagedRow.uniqueIndices[n]=lockFieldValue;
3153 
3155  class MessageSubtransactionCmd *msg =
3156  new class MessageSubtransactionCmd();
3163  msg->subtransactionStruct.fieldid = n;
3164  msg->fieldVal =
3166 
3168  lockFieldValue.engineid, msg);
3169  }
3170 
3172  newStagedRow;
3173 
3175  {
3176  // come back for responses
3177  return;
3178  }
3179  }
3180  break;
3181 
3182  case 2:
3183  {
3184  switch (msgrcvRef.subtransactionStruct.locktype)
3185  {
3186  case NOLOCK: // constraint violation, abort command
3188  return;
3189 // break;
3190 
3191  case INDEXLOCK:
3192  break;
3193 
3194  case INDEXPENDINGLOCK:
3196  fprintf(logfile, "anomaly: %s %i\n", __FILE__, __LINE__);
3197  return;
3198 // break;
3199 
3200  case PENDINGTOINDEXLOCK:
3202  fprintf(logfile, "anomaly: %s %i\n", __FILE__, __LINE__);
3203  return;
3204 // break;
3205 
3206  case PENDINGTOINDEXNOLOCK: // unique constraint violation
3208  fprintf(logfile, "anomaly: %s %i\n", __FILE__, __LINE__);
3209  return;
3210 // break;
3211 
3212  default:
3213  fprintf(logfile, "anomaly: %i %s %i\n",
3214  msgrcvRef.subtransactionStruct.locktype, __FILE__, __LINE__);
3216  return;
3217  }
3218 
3220  {
3221  return;
3222  }
3223  }
3224  break;
3225 
3226  default:
3227  printf("%s %i anomaly %li\n", __FILE__, __LINE__, entrypoint);
3228  }
3229 
3230  // all responses returned, if any, so finish statement
3232 }
3233 
3234 void Transaction::continueSqlUpdate(int64_t entrypoint)
3235 {
3236  // only 1 entrypoint
3237  class MessageSubtransactionCmd &msgrcvRef =
3238  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
3239 
3240  switch (msgrcvRef.subtransactionStruct.locktype)
3241  {
3242  case WRITELOCK: // insertrow
3243  break;
3244 
3245  case NOLOCK:
3246  printf("%s %i APISTATUS_UNIQUECONSTRAINT (NOLOCK)\n", __FILE__,
3247  __LINE__);
3249  return;
3250 // break;
3251 
3252  case INDEXLOCK:
3253  break;
3254 
3255  case INDEXPENDINGLOCK:
3257  return;
3258 // break;
3259 
3260  case PENDINGTOINDEXLOCK:
3262  return;
3263 // break;
3264 
3265  case PENDINGTOINDEXNOLOCK: // unique constraint violation
3267  return;
3268 // break;
3269 
3270  default:
3271  printf("%s %i locktype %i\n", __FILE__, __LINE__,
3272  msgrcvRef.subtransactionStruct.locktype);
3274  return;
3275  }
3276 
3278  {
3279  return;
3280  }
3281 
3284 }
3285 
3286 void Transaction::continueSqlReplace(int64_t entrypoint)
3287 {
3288  class MessageSubtransactionCmd &msgrcvRef =
3289  *(static_cast<MessageSubtransactionCmd *>(msgrcv));
3290 
3291  switch (entrypoint)
3292  {
3293  case 1:
3294  {
3295  stagedRow_s &stagedRowRef =
3297  stagedRowRef.newrowid=msgrcvRef.subtransactionStruct.rowid;
3298 
3300  {
3301  msgrcvRef.subtransactionStruct.rowid,
3304  };
3305 
3306  // now delete the old row, with forwarder
3307  class MessageSubtransactionCmd *msg =
3308  new class MessageSubtransactionCmd();
3311  msg->subtransactionStruct.rowid = stagedRowRef.originalrowid;
3312  msg->subtransactionStruct.forward_rowid = stagedRowRef.newrowid;
3315  stagedRowRef.originalengineid, msg);
3317 
3318  // indices for all fields
3319  class Table &tableRef =
3321  vector<fieldValue_s> fieldValues;
3322  tableRef.unmakerow(&stagedRowRef.newRow, &fieldValues);
3323 
3324  for (size_t n=0; n < fieldValues.size(); n++)
3325  {
3326  class Field &fieldRef = tableRef.fields[n];
3327 
3328  if (fieldRef.indextype==NONE)
3329  {
3330  continue;
3331  }
3332 
3333  if (fieldRef.index.isunique==true &&
3335  {
3336  // update, new entry, sendTransaction, stagedRows.uniqueIndices
3338 
3339  lockFieldValue_s lockFieldValue = {};
3340  lockFieldValue.engineid = getengine(fieldRef.type,
3341  fieldValues[n]);
3342  // locktype could potentially change
3343  lockFieldValue.locktype = INDEXLOCK;
3344  lockFieldValue.fieldVal = fieldValues[n];
3345  stagedRowRef.uniqueIndices[n]=lockFieldValue;
3346 
3347  class MessageSubtransactionCmd *msg =
3348  new class MessageSubtransactionCmd();
3349  msg->subtransactionStruct.isrow = false;
3350  msg->fieldVal = fieldValues[n];
3353  msg->subtransactionStruct.fieldid = n;
3354  msg->subtransactionStruct.rowid = stagedRowRef.newrowid;
3355  msg->subtransactionStruct.engineid = stagedRowRef.newengineid;
3357  lockFieldValue.engineid, msg);
3358  }
3359  }
3360  }
3361  break;
3362 
3363  case 2:
3364  {
3365  // like continueSqlUpdate(1)
3366  switch (msgrcvRef.subtransactionStruct.locktype)
3367  {
3368  case WRITELOCK: // insertrow
3369  break;
3370 
3371  case NOLOCK:
3373  return;
3374 // break;
3375 
3376  case INDEXLOCK:
3377  break;
3378 
3379  case INDEXPENDINGLOCK:
3381  return;
3382 // break;
3383 
3384  case PENDINGTOINDEXLOCK:
3386  return;
3387 // break;
3388 
3389  case PENDINGTOINDEXNOLOCK: // unique constraint violation
3391  return;
3392 // break;
3393 
3394  default:
3395  printf("%s %i locktype %i\n", __FILE__, __LINE__,
3396  msgrcvRef.subtransactionStruct.locktype);
3398  return;
3399  }
3400 
3402  {
3403  return;
3404  }
3405 
3408  }
3409  break;
3410 
3411  default:
3412  printf("%s %i anomaly %li\n", __FILE__, __LINE__, entrypoint);
3414  return;
3415  }
3416 }
3417 
3418 void Transaction::checkSqlLock(deadlockchange_e changetype, bool isrow,
3419  int64_t rowid, int64_t tableid, int64_t engineid,
3420  int64_t fieldid, fieldValue_s *fieldVal)
3421 {
3422  switch (changetype)
3423  {
3424  case ADDLOCKEDENTRY:
3425  lockcount++;
3426  break;
3427 
3428  case ADDLOCKPENDINGENTRY:
3429  lockpendingcount++;
3430  break;
3431 
3432  case REMOVELOCKEDENTRY:
3433  lockcount--;
3434  break;
3435 
3437  lockpendingcount--;
3438  break;
3439 
3441  lockpendingcount--;
3442  lockcount++;
3443  break;
3444 
3445  default:
3446  fprintf(logfile, "anomaly %i %s %i\n", changetype, __FILE__, __LINE__);
3447  }
3448 
3449  // OK, now, what? I'm either in deadlock and sending another message
3450  // not deadlocked, not deadlocked
3451  // deadlocked, then not
3452  // not deadlocked, then deadlocked
3453  if (sqlcmdstate.ispossibledeadlock==false &&
3454  (!lockcount || !lockpendingcount))
3455  {
3456  // move along
3457  return;
3458  }
3459 
3461  && lockcount)
3462  {
3463  // new deadlock!
3465  // send a bunch of messages, in pendingMapofRows, then the input
3466  class MessageDeadlock *msg = new class MessageDeadlock;
3467  class MessageDeadlock &msgref = *msg;
3468  newDeadLockLists_s &nodesRef = msgref.nodes;
3469 
3470  boost::unordered_map< uuRecord_s, stagedRow_s >::iterator it;
3471 
3472  for (it = currentCmdState.pendingStagedRows.begin();
3473  it != currentCmdState.pendingStagedRows.end(); it++)
3474  {
3475  stagedRow_s &sRowRef = it->second;
3476  int64_t this_rowid = it->first.rowid;
3477  int64_t this_tableid = it->first.tableid;
3478  int64_t this_engineid = it->first.engineid;
3479  string deadlockNode;
3480  // free this if there's no message to send (no contents in its sets)
3481 
3482  // row first
3483  DeadlockMgr::makeLockedItem(true, this_rowid, this_tableid,
3484  this_engineid, domainid, 0,
3485  (long double)0, (string *)NULL,
3486  &deadlockNode);
3487 
3488  if (sRowRef.locktype==WRITELOCK || sRowRef.locktype==READLOCK)
3489  {
3490  nodesRef.locked.insert(deadlockNode);
3491  }
3492  else if (sRowRef.locktype==PENDINGLOCK)
3493  {
3494  nodesRef.waiting.insert(deadlockNode);
3495  }
3496 
3497  // indices
3498  boost::unordered_map< int64_t, lockFieldValue_s >::iterator it;
3499 
3500  for (it = sRowRef.uniqueIndices.begin();
3501  it != sRowRef.uniqueIndices.end(); it++)
3502  {
3503  lockFieldValue_s &lockFieldValueRef = it->second;
3504 
3505  if (lockFieldValueRef.locktype==INDEXLOCK)
3506  {
3507  deadlockNode.clear();
3508  long double fieldinput;
3509  memcpy(&fieldinput, &lockFieldValueRef.fieldVal.value,
3510  sizeof(fieldinput));
3511  DeadlockMgr::makeLockedItem(false, 0, this_tableid,
3512  this_engineid, domainid,
3513  it->first, fieldinput,
3514  &lockFieldValueRef.fieldVal.str,
3515  &deadlockNode);
3516  nodesRef.locked.insert(deadlockNode);
3517  }
3518  else if (lockFieldValueRef.locktype==INDEXPENDINGLOCK)
3519  {
3520  deadlockNode.clear();
3521  long double fieldinput;
3522  memcpy(&fieldinput, &lockFieldValueRef.fieldVal.value,
3523  sizeof(fieldinput));
3524  DeadlockMgr::makeLockedItem(false, 0, this_tableid,
3525  this_engineid, domainid,
3526  it->first, fieldinput,
3527  &lockFieldValueRef.fieldVal.str,
3528  &deadlockNode);
3529  nodesRef.waiting.insert(deadlockNode);
3530  }
3531  }
3532  }
3533 
3534  if (nodesRef.locked.empty()==true && nodesRef.waiting.empty()==true)
3535  {
3536  // delete nodes; // nothing to deadlock, but this should be an anomaly
3537  delete msg;
3538  fprintf(logfile, "anomaly: %s %i\n", __FILE__, __LINE__);
3539  return;
3540  }
3541 
3546 
3547  // taPtr->mboxes.deadlockMgr.send(msgsnd, true);
3549  return;
3550  }
3551 
3552  if (lockcount && lockpendingcount)
3553  {
3554  // still deadlocked, just send 1 message based on type
3555  class MessageDeadlock *msg = new class MessageDeadlock();
3556  class MessageDeadlock &msgref = *msg;
3557 
3558  // prepare string(s) for submittal
3559  if (isrow==true)
3560  {
3561  DeadlockMgr::makeLockedItem(true, rowid, tableid, engineid, domainid,
3562  fieldid, (long double)0, (string *)NULL,
3563  &msgref.deadlockNode);
3564  }
3565  else
3566  {
3567  long double fieldinput;
3568  memcpy(&fieldinput, &fieldVal->value, sizeof(fieldinput));
3569  DeadlockMgr::makeLockedItem(false, rowid, tableid, engineid,
3570  domainid, fieldid, fieldinput,
3571  &fieldVal->str, &msgref.deadlockNode);
3572  }
3573 
3574  // send message to dmgr
3576  msgref.deadlockStruct.deadlockchange = changetype;
3578 
3580 
3581  return;
3582  }
3583 
3584  if (!lockcount || !lockpendingcount)
3585  {
3586  // deadlock over, send message to dmgr to that effect
3588 
3589  // send message to dmgr
3590  class MessageDeadlock *msg = new class MessageDeadlock;
3591  class MessageDeadlock &msgref = *msg;
3594 
3595  // taPtr->mboxes.deadlockMgr.send(msgsnd, true);
3597  return;
3598  }
3599 }
3600 
3602 {
3604  pendingcmd = COMMIT;
3605  //dispatch a message to the secondary node(s), then execute the
3606  //continueCommit
3608 
3609  switch (taPtr->myTopology.numreplicas)
3610  {
3611  case 1:
3612  waitfordispatched = 1;
3614  break;
3615 
3616  case 2:
3617  {
3618  // make message, then send it
3619  class MessageDispatch *msg = makeMessageDispatch();
3620 
3621  if (msg != NULL)
3622  {
3624  taPtr->replicaAddress, *msg);
3625  }
3626  }
3627  break;
3628 
3629  default: // more than 2 replicas
3630  {
3631  // make message then copy and send for each replica
3632  class MessageDispatch *msg = makeMessageDispatch();
3633 
3634  if (msg != NULL)
3635  {
3636  for (size_t n=1; n < taPtr->replicaAddresses.size(); n++)
3637  {
3638  class MessageDispatch *nmsg = new class MessageDispatch;
3639  *nmsg = *msg;
3641  taPtr->replicaAddresses[n], *nmsg);
3642  }
3643 
3645  taPtr->replicaAddresses[taPtr->replicaAddresses.size()-1], *msg);
3646  }
3647  }
3648  }
3649 }