InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Engine.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
29 #include "Engine.h"
30 #line 31 "Engine.cc"
31 
// Engine constructor: member-initializer and body.
// NOTE(review): the signature line (embedded line 32) and a number of interior
// lines are absent from this listing (the embedded numbering skips them), so
// the comments below describe only what is visible.
//
// Visible behaviour: copies the identity passed by pointer into myIdentity,
// frees the argument, then loops forever pulling messages in batches and
// dispatching on msgrcv->messageStruct.topic.
33  myIdentity(*myIdentityArg)
34 {
// myIdentity already holds a copy (see initializer above), so the argument is released.
35  delete myIdentityArg;
37 
41 
42  int64_t builtincmd = 0;
// waitfor is handed to GETMSG: 100 while idle, 0 once a message has been received.
43  int waitfor = 100;
44 
// Main loop; there is no visible exit, so the constructor is not expected to return.
46  while (1)
47  {
// Handle at most MSGRECEIVEBATCHSIZE messages per pass of the outer loop.
49  for (size_t inmsg=0; inmsg < MSGRECEIVEBATCHSIZE; inmsg++)
50  {
51  GETMSG(msgrcv, myIdentity.mbox, waitfor)
52 
// Nothing received: restore the idle wait value and end this batch.
53  if (msgrcv==NULL)
54  {
55  waitfor = 100;
56  break;
57  }
58 
59  waitfor = 0;
60 
// NOTE(review): the condition guarding this block (embedded line 61) is missing.
// The block caches fields of a MessageUserSchema for use by the schema builtins.
62  {
63  class MessageUserSchema &msgrcvRef =
64  *(class MessageUserSchema *)msgrcv;
65 
66  argsize = msgrcvRef.userschemaStruct.argsize;
67  taAddr = msgrcvRef.messageStruct.sourceAddr;
70  userid = msgrcvRef.userschemaStruct.userid;
71  builtincmd = msgrcvRef.userschemaStruct.builtincmd;
72  }
73 
74  switch (msgrcv->messageStruct.topic)
75  {
// NOTE(review): the case label for this branch (embedded line 76) is missing.
// Dispatches schema builtins by builtincmd; the labels in front of
// createschema() and deleteschema() are missing as well.
77  switch (builtincmd)
78  {
80  createschema();
81  break;
82 
83  case BUILTINCREATETABLE:
84  createtable();
85  break;
86 
87  case BUILTINADDCOLUMN:
88  addcolumn();
89  break;
90 
91  case BUILTINDELETEINDEX:
92  deleteindex();
93  break;
94 
95  case BUILTINDELETETABLE:
96  deletetable();
97  break;
98 
100  deleteschema();
101  break;
102 
// Unknown builtin: logged to logfile, otherwise ignored.
103  default:
104  fprintf(logfile, "Engine bad schema builtin %li %s %i\n",
105  builtincmd, __FILE__, __LINE__);
106  }
107 
108  break;
109 
110  case TOPIC_TRANSACTION:
111  {
112  class MessageTransaction &msgrcvRef =
113  *(class MessageTransaction *)msgrcv;
114  // create SubTransaction if no subtransaction
115  if (msgrcvRef.transactionStruct.subtransactionid <= 0)
116  {
// The new object is not stored here; presumably it registers itself through
// the Engine pointer passed as the last argument -- TODO confirm.
117  class SubTransaction &subTransactionidRef =
118  *(new class SubTransaction(msgrcvRef.messageStruct.sourceAddr,
119  msgrcvRef.transactionStruct.transactionid,
120  msgrcvRef.transactionStruct.domainid,
121  this));
122  subTransactionidRef.processTransactionMessage(msgrcv);
123  }
// A positive subtransactionid that is not in SubTransactions is silently dropped.
124  else if (SubTransactions.count(msgrcvRef.transactionStruct.subtransactionid))
125  {
126  SubTransactions[msgrcvRef.transactionStruct.subtransactionid]->processTransactionMessage(msgrcv);
127  }
128  }
129  break;
130 
// NOTE(review): the case label (embedded line 131) and the contents of the
// inner block (embedded lines 136 and 138) are missing from this listing.
132  {
133  class MessageTransaction &msgrcvRef =
134  *(class MessageTransaction *)msgrcv;
135 
137  {
139  }
140  }
141  break;
142 
// NOTE(review): the body of this case (embedded lines 144-145) is missing.
143  case TOPIC_TOPOLOGY:
146  break;
147 
148  case TOPIC_APPLY:
149  {
150  apply();
151  }
152  break;
153 
// Unknown topic: reported on stdout. NOTE(review): the final argument line of
// this printf (embedded line 156) is missing from this listing.
154  default:
155  printf("%s %i Engine bad topic %i\n", __FILE__, __LINE__,
157  }
158  }
159  }
160 }
161 
// Empty function body.
// NOTE(review): its signature (embedded line 162) is missing from this
// listing; by position it is presumably the Engine destructor -- confirm.
163 {
164 }
165 
166 // launcher, regular function
167 void *engine(void *identity)
168 {
169  Engine((Topology::partitionAddress *)identity);
170  return NULL;
171 }
172 
173 /* builtins for schema, no need for args, since abort is a function of status,
174  * either figure it out in the function itself or outside in the main loop */
// Schema builtin: calls createSchema(this) and allocates a TOPIC_SCHEMAREPLY
// message.
// NOTE(review): the signature (embedded line 175) and the statement that
// follows the allocation (embedded line 179) are missing from this listing;
// presumably this is Engine::createschema() and the missing line sends `msg`
// -- confirm.
176 {
177  createSchema(this);
178  class MessageUserSchema *msg = new class MessageUserSchema(TOPIC_SCHEMAREPLY);
180 }
181 
// Schema builtin: asks the Schema registered under the message's domainid to
// create the table with the message's tableid, storing the result in `status`.
// NOTE(review): the signature (embedded line 182) and the statements after the
// createTable call (embedded lines 189-190) are missing from this listing;
// presumably this is Engine::createtable() -- confirm.
183 {
184  // should check if map is ok
185  class MessageUserSchema &msgrcvRef = *(class MessageUserSchema *)msgrcv;
186  class MessageUserSchema *msg = new class MessageUserSchema(TOPIC_SCHEMAREPLY);
// The schema pointer looked up here is dereferenced without a null/presence check.
187  status =
188  domainidsToSchemata[msgrcvRef.userschemaStruct.domainid]->createTable(msgrcvRef.userschemaStruct.tableid);
191 }
192 
// Schema builtin: adds a field to the table named by the message's tableid,
// using the message's fieldtype and fieldlen and an empty string argument.
// NOTE(review): the signature (embedded line 193), embedded line 200, and the
// tail of the addfield call plus whatever follows it (embedded lines 203-205)
// are missing from this listing; presumably this is Engine::addcolumn() --
// confirm.
194 {
195  class MessageUserSchema &msgrcvRef = *(class MessageUserSchema *)msgrcv;
196  class MessageUserSchema *msg = new class MessageUserSchema(TOPIC_SCHEMAREPLY);
197  // either succeeds or fails :-)
// NOTE(review): this uses the `domainid` variable rather than
// msgrcvRef.userschemaStruct.domainid as the createTable path does -- confirm
// that both refer to the same domain at this point.
198  class Schema &schemaRef = *domainidsToSchemata[domainid];
199  class Table &tableRef = *schemaRef.tables[msgrcvRef.userschemaStruct.tableid];
201  tableRef.addfield((fieldtype_e) msgrcvRef.userschemaStruct.fieldtype,
202  msgrcvRef.userschemaStruct.fieldlen, "",
206 }
207 
// Schema builtin body: the only visible statement allocates a
// TOPIC_SCHEMAREPLY message.
// NOTE(review): the signature (embedded line 208) and the statements after the
// allocation (embedded lines 212-213) are missing from this listing; by
// position this is presumably Engine::deleteindex() -- confirm.
209 {
210  // either succeeds or fails :-)
211  class MessageUserSchema *msg = new class MessageUserSchema(TOPIC_SCHEMAREPLY);
214 }
215 
// Schema builtin body: the only visible statement allocates a
// TOPIC_SCHEMAREPLY message.
// NOTE(review): the signature (embedded line 216) and the statements after the
// allocation (embedded lines 220-221) are missing from this listing; by
// position this is presumably Engine::deletetable() -- confirm.
217 {
218  // either succeeds or fails :-)
219  class MessageUserSchema *msg = new class MessageUserSchema(TOPIC_SCHEMAREPLY);
222 }
223 
// Schema builtin body: the only visible statement allocates a
// TOPIC_SCHEMAREPLY message.
// NOTE(review): the signature (embedded line 224) and the statements after the
// allocation (embedded lines 228-229) are missing from this listing; by
// position this is presumably Engine::deleteschema() -- confirm.
225 {
226  // either succeeds or fails :-)
227  class MessageUserSchema *msg = new class MessageUserSchema(TOPIC_SCHEMAREPLY);
230 }
231 
// Pre-increments nextsubtransactionid and returns the new value, so successive
// calls yield strictly increasing ids.
// NOTE(review): the signature (embedded line 232) is missing from this listing.
233 {
234  return ++nextsubtransactionid;
235 }
236 
// Walks myTopology.partitionListThisReplica by index and records the index in
// partitionid whenever the guarded condition holds. There is no break, so if
// several entries match, the last one wins.
// NOTE(review): the signature (embedded line 237) and the if-condition
// (embedded line 241) are missing from this listing; the function name cannot
// be determined from here.
238 {
239  for (size_t n=0; n < myTopology.partitionListThisReplica.size(); n++)
240  {
242  {
243  partitionid = n;
244  }
245  }
246 }
247 
248 // records
249 bool Engine::applyItem(int64_t subtransactionid, class Schema &schemaRef,
251 {
252  class Table &tableRef = *schemaRef.tables[record.tableid];
253 
254  switch (record.primitive)
255  {
256  case INSERT:
257  {
258  if (tableRef.rows.count(record.rowid))
259  {
260  printf("%s %i anomaly should not be an existing rowid %li\n",
261  __FILE__, __LINE__, record.rowid);
262  return false;
263  }
264 
265  rowdata_s *row = new rowdata_s();
266  row->previoussubtransactionid = subtransactionid;
267  row->row = record.row;
268  tableRef.rows[record.rowid] = row;
269  }
270  break;
271 
272  case UPDATE:
273  {
274  if (tableRef.rows.count(record.rowid))
275  {
276  printf("%s %i anomaly should be an existing rowid %li\n", __FILE__,
277  __LINE__, record.rowid);
278  return false;
279  }
280 
281  if (tableRef.rows[record.rowid]->previoussubtransactionid !=
282  subtransactionid)
283  {
284  return false;
285  }
286 
287  rowdata_s *row = new rowdata_s();
288  row->previoussubtransactionid = subtransactionid;
289  row->row = record.row;
290  tableRef.rows[record.rowid] = row;
291  }
292  break;
293 
294  case DELETE:
295  {
296  if (!tableRef.rows.count(record.rowid))
297  {
298  return false;
299  }
300 
301  if (tableRef.rows[record.rowid]->previoussubtransactionid !=
302  subtransactionid)
303  {
304  return false;
305  }
306 
307  delete tableRef.rows[record.rowid];
308  tableRef.rows.erase(record.rowid);
309  }
310  break;
311 
312  default:
313  printf("%s %i anomaly primitive %i\n", __FILE__, __LINE__,
314  record.primitive);
315  return false;
316  }
317 
318  return true;
319 }
320 
321 // indices
322 bool Engine::applyItem(int64_t subtransactionid, class Schema &schemaRef,
323  MessageApply::applyindex_s &indexinfo)
324 {
325  class Table &tableRef = *schemaRef.tables[indexinfo.tableid];
326  class Index &indexRef = tableRef.fields[indexinfo.fieldid].index;
327 
328  if (indexRef.indextype==UNIQUE || indexRef.indextype==UNORDERED ||
329  indexRef.indextype==UNIQUENOTNULL || indexRef.indextype==UNORDEREDNOTNULL)
330  {
331  // is unique
332  if (indexinfo.fieldVal.isnull==true)
333  {
334  // is unique is null
335  if (MessageApply::getisaddflag(indexinfo.flags)==true)
336  {
337  // is unique is null add
338  vector<int64_t> v(2);
339  v[0] = indexinfo.entry.rowid;
340  v[1] = indexinfo.entry.engineid;
341  indexRef.nulls.insert(v);
342  }
343  else
344  {
345  // is unique is null delete
346  vector<int64_t> v(2);
347  v[0] = indexinfo.entry.rowid;
348  v[1] = indexinfo.entry.engineid;
349 
350  if (!indexRef.nulls.count(v))
351  {
352  return false;
353  }
354 
355  indexRef.nulls.erase(v);
356  }
357  }
358  else
359  {
360  // is unique not null
361  if (MessageApply::getisaddflag(indexinfo.flags)==true)
362  {
363  // is unique not null add
364  if (indexRef.addifnotthere(indexinfo.fieldVal,
365  indexinfo.entry.rowid,
366  indexinfo.entry.engineid,
367  subtransactionid)==false)
368  {
369  if (indexRef.getprevioussubtransactionid(indexinfo.fieldVal) >
370  subtransactionid)
371  {
372  return false;
373  }
374  }
375  }
376  else
377  {
378  // is unique not null delete
379  if (indexRef.checkifthere(indexinfo.fieldVal)==true)
380  {
381  if (indexRef.checkifmatch(indexinfo.fieldVal,
382  indexinfo.entry.rowid,
383  indexinfo.entry.engineid)==true)
384  {
385  indexRef.rm(indexinfo.fieldVal);
386  }
387  else
388  {
389  if (indexRef.getprevioussubtransactionid(indexinfo.fieldVal) <
390  subtransactionid)
391  {
392  return false;
393  }
394  }
395  }
396  else
397  {
398  return false;
399  }
400  }
401  }
402  }
403  else
404  {
405  // not unique
406  if (indexinfo.fieldVal.isnull==true)
407  {
408  // not unique is null
409  if (MessageApply::getisaddflag(indexinfo.flags)==true)
410  {
411  // not unique is null add
412  vector<int64_t> v(2);
413  v[0] = indexinfo.entry.rowid;
414  v[1] = indexinfo.entry.engineid;
415  indexRef.nulls.insert(v);
416  }
417  else
418  {
419  // not unique is null delete
420  vector<int64_t> v(2);
421  v[0] = indexinfo.entry.rowid;
422  v[1] = indexinfo.entry.engineid;
423 
424  if (!indexRef.nulls.count(v))
425  {
426  return false;
427  }
428 
429  indexRef.nulls.erase(v);
430  }
431  }
432  else
433  {
434  // not unique not null
435  if (MessageApply::getisaddflag(indexinfo.flags)==true)
436  {
437  // not unique not null add
438  switch (indexRef.indexmaptype)
439  {
440  case nonuniqueint:
441  {
442  int64_t v=indexinfo.fieldVal.value.integer;
443  indexRef.nonuniqueIntIndex->insert(pair<int64_t,
445  indexinfo.entry));
446  }
447  break;
448 
449  case nonuniqueuint:
450  {
451  uint64_t v=indexinfo.fieldVal.value.uinteger;
452  indexRef.nonuniqueUintIndex->insert(pair<uint64_t,
454  indexinfo.entry));
455  }
456  break;
457 
458  case nonuniquebool:
459  {
460  bool v=indexinfo.fieldVal.value.boolean;
461  indexRef.nonuniqueBoolIndex->insert(pair<bool,
463  indexinfo.entry));
464  }
465  break;
466 
467  case nonuniquefloat:
468  {
469  long double v=indexinfo.fieldVal.value.floating;
470  indexRef.nonuniqueFloatIndex->insert(pair<long double,
472  indexinfo.entry));
473  }
474  break;
475 
476  case nonuniquechar:
477  {
478  indexRef.nonuniqueCharIndex->insert(pair<char,
480  indexinfo.entry));
481  }
482  break;
483 
484  case nonuniquecharx:
485  {
486  indexRef.nonuniqueStringIndex->insert(pair<string,
488  indexinfo.entry));
489  }
490  break;
491 
492  case nonuniquevarchar:
493  {
494  indexRef.nonuniqueStringIndex->insert(pair<string,
496  indexinfo.entry));
497  }
498  break;
499 
500  default:
501  printf("%s %i anomaly indexmaptype %i\n", __FILE__, __LINE__,
502  indexRef.indexmaptype);
503  }
504  }
505  else
506  {
507  // not unique not null delete
508  switch (indexRef.indexmaptype)
509  {
510  case nonuniqueint:
511  {
512  pair<multimap<int64_t, nonLockingIndexEntry_s>::iterator,
513  multimap<int64_t, nonLockingIndexEntry_s>::iterator>
514  iteratorRange;
515  nonuniqueIntMap::iterator it;
516 
517  iteratorRange =
518  indexRef.nonuniqueIntIndex->equal_range(indexinfo.fieldVal.value.integer);
519 
520  for (it=iteratorRange.first; it != iteratorRange.second; it++)
521  {
522  if (it->second.rowid==indexinfo.entry.rowid &&
523  it->second.engineid==indexinfo.entry.engineid)
524  {
525  indexRef.nonuniqueIntIndex->erase(it);
526  return true;
527  }
528  }
529 
530  return false; // found no entry, so background it
531  }
532  break;
533 
534  case nonuniqueuint:
535  {
536  pair<multimap<uint64_t, nonLockingIndexEntry_s>::iterator,
537  multimap<uint64_t, nonLockingIndexEntry_s>::iterator>
538  iteratorRange;
539  nonuniqueUintMap::iterator it;
540 
541  iteratorRange =
542  indexRef.nonuniqueUintIndex->equal_range(indexinfo.fieldVal.value.uinteger);
543 
544  for (it=iteratorRange.first; it != iteratorRange.second; it++)
545  {
546  if (it->second.rowid==indexinfo.entry.rowid &&
547  it->second.engineid==indexinfo.entry.engineid)
548  {
549  indexRef.nonuniqueUintIndex->erase(it);
550  return true;
551  }
552  }
553 
554  return false; // found no entry, so background it
555  }
556  break;
557 
558  case nonuniquebool:
559  {
560  pair<multimap<bool, nonLockingIndexEntry_s>::iterator,
561  multimap<bool, nonLockingIndexEntry_s>::iterator>
562  iteratorRange;
563  nonuniqueBoolMap::iterator it;
564 
565  iteratorRange =
566  indexRef.nonuniqueBoolIndex->equal_range(indexinfo.fieldVal.value.boolean);
567 
568  for (it=iteratorRange.first; it != iteratorRange.second; it++)
569  {
570  if (it->second.rowid==indexinfo.entry.rowid &&
571  it->second.engineid==indexinfo.entry.engineid)
572  {
573  indexRef.nonuniqueBoolIndex->erase(it);
574  return true;
575  }
576  }
577 
578  return false; // found no entry, so background it
579  }
580  break;
581 
582  case nonuniquefloat:
583  {
584  pair<multimap<long double, nonLockingIndexEntry_s>::iterator,
585  multimap<long double, nonLockingIndexEntry_s>::iterator>
586  iteratorRange;
587  nonuniqueFloatMap::iterator it;
588 
589  iteratorRange =
590  indexRef.nonuniqueFloatIndex->equal_range(indexinfo.fieldVal.value.floating);
591 
592  for (it=iteratorRange.first; it != iteratorRange.second; it++)
593  {
594  if (it->second.rowid==indexinfo.entry.rowid &&
595  it->second.engineid==indexinfo.entry.engineid)
596  {
597  indexRef.nonuniqueFloatIndex->erase(it);
598  return true;
599  }
600  }
601 
602  return false; // found no entry, so background it
603  }
604  break;
605 
606  case nonuniquechar:
607  {
608  pair<multimap<char, nonLockingIndexEntry_s>::iterator,
609  multimap<char, nonLockingIndexEntry_s>::iterator>
610  iteratorRange;
611  nonuniqueCharMap::iterator it;
612 
613  iteratorRange =
614  indexRef.nonuniqueCharIndex->equal_range(indexinfo.fieldVal.value.character);
615 
616  for (it=iteratorRange.first; it != iteratorRange.second; it++)
617  {
618  if (it->second.rowid==indexinfo.entry.rowid &&
619  it->second.engineid==indexinfo.entry.engineid)
620  {
621  indexRef.nonuniqueCharIndex->erase(it);
622  return true;
623  }
624  }
625 
626  return false; // found no entry, so background it
627  }
628  break;
629 
630  case nonuniquecharx:
631  {
632  pair<multimap<string, nonLockingIndexEntry_s>::iterator,
633  multimap<string, nonLockingIndexEntry_s>::iterator>
634  iteratorRange;
635  nonuniqueStringMap::iterator it;
636 
637  iteratorRange =
638  indexRef.nonuniqueStringIndex->equal_range(indexinfo.fieldVal.str);
639 
640  for (it=iteratorRange.first; it != iteratorRange.second; it++)
641  {
642  if (it->second.rowid==indexinfo.entry.rowid &&
643  it->second.engineid==indexinfo.entry.engineid)
644  {
645  indexRef.nonuniqueStringIndex->erase(it);
646  return true;
647  }
648  }
649 
650  return false; // found no entry, so background it
651  }
652  break;
653 
654  case nonuniquevarchar:
655  {
656  pair<multimap<string, nonLockingIndexEntry_s>::iterator,
657  multimap<string, nonLockingIndexEntry_s>::iterator>
658  iteratorRange;
659  nonuniqueStringMap::iterator it;
660 
661  iteratorRange =
662  indexRef.nonuniqueStringIndex->equal_range(indexinfo.fieldVal.str);
663 
664  for (it=iteratorRange.first; it != iteratorRange.second; it++)
665  {
666  if (it->second.rowid==indexinfo.entry.rowid &&
667  it->second.engineid==indexinfo.entry.engineid)
668  {
669  indexRef.nonuniqueStringIndex->erase(it);
670  return true;
671  }
672  }
673 
674  return false; // found no entry, so background it
675  }
676  break;
677 
678  default:
679  printf("%s %i anomaly indexmaptype %i\n", __FILE__, __LINE__,
680  indexRef.indexmaptype);
681  }
682  }
683  }
684  }
685 
686  return true;
687 }
688 
690 {
691  class MessageApply &inmsg = *(class MessageApply *)msgrcv;
692  class Schema &schemaRef = *domainidsToSchemata[inmsg.applyStruct.domainid];
693 
694  for (size_t n=0; n < inmsg.rows.size(); n++)
695  {
696  if (applyItem(inmsg.applyStruct.subtransactionid, schemaRef,
697  inmsg.rows[n])==false)
698  {
699  background(inmsg, inmsg.rows[n]);
700  }
701  }
702 
703  for (size_t n=0; n < inmsg.indices.size(); n++)
704  {
705  if (applyItem(inmsg.applyStruct.subtransactionid, schemaRef,
706  inmsg.indices[n])==false)
707  {
708  background(inmsg, inmsg.indices[n]);
709  }
710  }
711 
712  if (!backgrounded.count(inmsg.applyStruct.subtransactionid))
713  {
714  class MessageAckApply *ackmsg =
715  new class MessageAckApply(inmsg.applyStruct.subtransactionid,
716  inmsg.applyStruct.applierid, -1, STATUS_OK);
718  *ackmsg);
719  }
720 
721  // now, walk through backgrounded items
722  map<int64_t, background_s>::iterator itb;
723 
724  for (itb = backgrounded.begin(); itb != backgrounded.end(); itb++)
725  {
726  // itb->first: subtransactionid
727  background_s &bref = itb->second;
728  vector<MessageDispatch::record_s>::iterator itr;
729 
730  for (itr = bref.rows.begin(); itr != bref.rows.end(); itr++)
731  {
732  if (applyItem(itb->first, schemaRef, *itr)==true)
733  {
734  bref.rows.erase(itr);
735  }
736  }
737 
738  vector<MessageApply::applyindex_s>::iterator iti;
739 
740  for (iti = bref.indices.begin(); iti != bref.indices.end(); iti++)
741  {
742  if (applyItem(itb->first, schemaRef, *iti)==true)
743  {
744  bref.indices.erase(iti);
745  }
746  }
747 
748  if (!bref.rows.size() && !bref.indices.size())
749  {
750  class MessageAckApply *ackmsg =
751  new class MessageAckApply(itb->first, bref.applierid, -1,
752  STATUS_OK);
753  mboxes.toActor(myIdentity.address, bref.taAddress, *ackmsg);
754  backgrounded.erase(itb);
755  }
756  }
757 }
758 
// Queues a row item that could not be applied yet: makes sure `backgrounded`
// has an entry for the message's subtransactionid, then appends `item` to that
// entry's `rows` list. apply() calls this as background(inmsg, inmsg.rows[n]).
// NOTE(review): the two signature lines (embedded lines 759-760) and the two
// statements that finish initialising `b` (embedded lines 766-767) are missing
// from this listing; presumably they set b.taAddress and store `b` in
// `backgrounded` -- confirm.
761 {
// First item backgrounded for this subtransaction: build its bookkeeping entry.
762  if (!backgrounded.count(inmsg.applyStruct.subtransactionid))
763  {
764  background_s b;
765  b.applierid = inmsg.applyStruct.applierid;
768  }
769 
770  backgrounded[inmsg.applyStruct.subtransactionid].rows.push_back(item);
771 }
772 
// Queues an index item that could not be applied yet: makes sure
// `backgrounded` has an entry for the message's subtransactionid, then appends
// `item` to that entry's `indices` list. apply() calls this as
// background(inmsg, inmsg.indices[n]).
// NOTE(review): the two signature lines (embedded lines 773-774) and the two
// statements that finish initialising `b` (embedded lines 780-781) are missing
// from this listing; presumably they set b.taAddress and store `b` in
// `backgrounded` -- confirm.
775 {
// First item backgrounded for this subtransaction: build its bookkeeping entry.
776  if (!backgrounded.count(inmsg.applyStruct.subtransactionid))
777  {
778  background_s b;
779  b.applierid = inmsg.applyStruct.applierid;
782  }
783 
784  backgrounded[inmsg.applyStruct.subtransactionid].indices.push_back(item);
785 }
786