InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
TransactionAgent.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
29 #ifndef INFINISQLTRANSACTIONAGENT_H
30 #define INFINISQLTRANSACTIONAGENT_H
31 
32 #include "gch.h"
33 #include "Operation.h"
34 #include "Table.h"
35 #include "Schema.h"
36 #include "Larxer.h"
37 
45 {
46  NOPAYLOAD = 0,
49 };
50 
56 {
57  IDLNOCMD = 0,
70 };
71 
72 // deletethis this
77 typedef struct
78 {
80  bool isrow;
81 
82  int64_t rowid;
83  int64_t tableid;
84  std::string row;
86 
87  // for index
88  int64_t fieldid;
90  // for pending locks
91  int64_t pendingcmdid;
92 } idl;
93 // delete this
94 typedef vector<idl> cmds;
95 
96 #include "Transaction.h"
97 #include "Applier.h"
98 
99 typedef boost::unordered_map<std::string, procedures_s> domainProceduresMap;
100 
105 typedef struct
106 {
107  int64_t resultCode;
108  msgpack::sbuffer *sbuf;
109  std::vector<std::string> *responseVector;
110 } responseData;
111 
116 typedef struct
117 {
118  int64_t domainid;
119  int64_t userid;
120  std::string domainName;
121 } authInfo;
122 
123 typedef boost::unordered_map<int, responseData> sendLaterMap;
124 // there is apparently a bug in boost::unordered_map that causes
125 // this thing to dump core when trying to count or erase a key under
126 // some circumstances. no time presently to figure it out
127 //typedef boost::unordered_map<int, authInfo> socketAuthInfo;
128 typedef std::map<int, authInfo> socketAuthInfo;
129 typedef boost::unordered_map<int64_t, class Operation *> operationMap;
130 
131 msgpack::sbuffer *makeSbuf(msgpack::sbuffer *);
132 msgpack::sbuffer *makeSbuf(vector<string> *);
133 msgpack::sbuffer *makeSbuf(std::map<string, string> *);
134 
141 {
142 public:
144  virtual ~TransactionAgent();
145 
150  void updateReplicas();
151 
152  // pubic for replyTa:
153  // Mbox::msgstruct msgsnd;
154  int64_t operationid;
155  int64_t domainid;
156  int64_t userid;
157  int64_t status;
158  int64_t tainstance;
159  //public for createSchema:
160  class Message *msgrcv;
163 
164  // needs to be all/mostly public for stored procedures
165 
166  // builtins
174  void ping(builtincmds_e cmd);
180  void login(builtincmds_e cmd);
186  void logout(builtincmds_e cmd);
192  void changepassword(builtincmds_e cmd);
198  void createdomain(builtincmds_e cmd);
204  void createuser(builtincmds_e cmd);
210  void deleteuser(builtincmds_e cmd);
216  void deletedomain(builtincmds_e cmd);
222  void createschema(builtincmds_e cmd);
228  void createtable(builtincmds_e cmd);
234  void addcolumn(builtincmds_e cmd);
240  void deleteindex(builtincmds_e cmd);
246  void deletetable(builtincmds_e cmd);
252  void deleteschema(builtincmds_e cmd);
258  void loadprocedure(builtincmds_e cmd);
264  void compile(builtincmds_e cmd);
271  void schemaBoilerplate(builtincmds_e cmd, int builtin);
272  // loop-back schema commands
277  void TAcreateschema();
282  void TAcreatetable();
287  void TAaddcolumn();
292  void TAdeleteindex();
297  void TAdeletetable();
302  void TAdeleteschema();
307  void TAloadprocedure();
312  void endOperation();
313  //private:
318  void endConnection();
325  int64_t readSocket();
332  int64_t getnexttransactionid();
339  int64_t getnextapplierid();
344  void badMessageHandler();
353  void newprocedure(int64_t entrypoint);
358  void newstatement();
359 
364  void handledispatch();
365 
374  template <typename T>
375  void sendResponse(bool resending, int64_t resultCode, T response)
376  {
377  msgpack::sbuffer *sbuf = makeSbuf(response);
378  int64_t totalsize = 2*sizeof(uint64_t) + sbuf->size();
379  char payload[PAYLOADSIZE];
380  uint64_t x = htobe64((uint64_t)totalsize);
381  memcpy(payload, &x, sizeof(x));
382  x = htobe64((uint64_t)resultCode);
383  memcpy(payload + sizeof(x), &x, sizeof(x));
384 
385  memcpy(payload+(2*sizeof(x)), sbuf->data(), sbuf->size());
386  ssize_t totalwritten = write(sockfd, payload, totalsize);
387 
388  if (totalwritten == totalsize) // send was successful
389  {
390  if (resending)
391  {
392  waitingToSend.erase(sockfd);
393  }
394 
395  delete sbuf;
396  return;
397  }
398 
399  if (totalwritten == -1)
400  {
401  if (errno==EAGAIN || errno==EWOULDBLOCK)
402  {
403  // we wait for a time to send the data
404  sendLaterMap::iterator sendLaterIterator;
405  sendLaterIterator = waitingToSend.find(sockfd);
406 
407  if (sendLaterIterator != waitingToSend.end()) //gratuitous
408  {
409  printf("%s %i endConnection\n", __FILE__, __LINE__);
410  endConnection();
411  return;
412  }
413 
414  if (!resending)
415  {
416  responseData resp;
417  resp.resultCode = resultCode;
418  resp.sbuf = sbuf;
419  resp.responseVector = NULL;
420  waitingToSend[sockfd] = resp;
421  }
422 
423  struct epoll_event ev;
424  ev.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
425  ev.data.fd = sockfd;
426  if (epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev))
427  {
428  printf("%s %i endConnection\n", __FILE__, __LINE__);
429  endConnection();
430  }
431  }
432  else
433  {
434  perror("oops");
435  endConnection();
436  }
437 
438  return;
439  }
440  }
441 
451  template < typename T >
452  static void usmReply(T actor, Topology::addressStruct &dest,
453  class MessageUserSchema &msg)
454  {
456  msg.userschemaStruct.operationid = actor->operationid;
457  msg.userschemaStruct.domainid = actor->domainid;
458  msg.userschemaStruct.userid = actor->userid;
459  msg.userschemaStruct.status = actor->status;
460 
461  actor->mboxes.toActor(actor->myIdentity.address, dest, msg);
462  }
463 
465  class Mboxes mboxes;
467  int64_t instance;
468  class Mbox *mymboxPtr;
469  int epollfd;
470  int sockfd;
472  std::string *operation;
474  int64_t argsize;
475  char args[PAYLOADSIZE]; // get rid of this when possible
476  std::string argstring;
479  operationMap::iterator pendingOperationsIterator;
480  int64_t operationidcounter; // never touch this!
481  std::string domainName;
482  std::vector<std::string> responseVector;
484  domainidToSchemaMap::iterator domainidsToSchemataIterator;
485  boost::unordered_map<int64_t, domainProceduresMap> domainidsToProcedures;
486  boost::unordered_map<int64_t, class Transaction *> Transactions;
487  boost::unordered_map<int64_t, class Applier *> Appliers;
488  // Pgs[socket] = *Pg
489  boost::unordered_map<int, class Pg *> Pgs;
491  int64_t nextapplierid;
493 
494  size_t myreplica;
495  size_t mymember;
497  std::vector<Topology::addressStruct> replicaAddresses;
498 
499  // statements[domainid][statementname] = compiledstatement
500  // user submits statementname
501  boost::unordered_map< int64_t,
502  boost::unordered_map<std::string, class Statement> > statements;
503 };
504 
505 class ApiInterface;
506 typedef ApiInterface *(*spclasscreate)(class TransactionAgent *,
507  class ApiInterface *, void *);
508 typedef void(*spclassdestroy)(ApiInterface *);
509 
517 template < typename T >
518 void replyTa(T servent, topic_e result, void *msg)
519 {
520  class MessageUserSchema &msgref = *(class MessageUserSchema *)msg;
521  servent->msgsnd.data = (void *)msg;
522  ((class Message *)servent->msgsnd.data)->messageStruct.topic = result;
523  ((class Message *)servent->msgsnd.data)->messageStruct.payloadtype =
525  msgref.userschemaStruct.operationid = servent->operationid;
526  msgref.userschemaStruct.domainid = servent->domainid;
527  msgref.userschemaStruct.userid = servent->userid;
528  msgref.userschemaStruct.status = servent->status;
529 }
530 
538 void *transactionAgent(void *identity);
539 
540 #endif /* INFINISQLTRANSACTIONAGENT_H */