InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Pg.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
30 #include "Pg.h"
31 #include "pgoids.h"
32 #include "TransactionAgent.h"
33 #line 34 "Pg.cc"
34 
35 /* implemented based on http://www.postgresql.org/docs/9.2/static/protocol.html */
36 
37 Pg::Pg(class TransactionAgent *taPtrarg, int sockfdarg) :
38  state(STATE_BEGIN),
39  sockfd(sockfdarg), pgcmdtype('\0'), size(0), outcmd('\0'), userid(-1),
40  schemaPtr(NULL), session_isautocommit(true), isintransactionblock(false)
41 {
42  domainid=-1;
43  taPtr = taPtrarg;
44  statementPtr=NULL;
45  transactionPtr=NULL;
46 
47  taPtr->Pgs[sockfd]=this;
48 }
49 
51 {
52 }
53 
54 // processing logic for Pg, state machine
55 void Pg::cont()
56 {
57  if (state==STATE_EXITING)
58  {
59  return;
60  }
61 
62  class MessageSocket &msgrcvref = *((class MessageSocket *)taPtr->msgrcv);
63 
64  string newdata;
65 
66  if ((msgrcvref.socketStruct.events & EPOLLERR) ||
67  (msgrcvref.socketStruct.events & EPOLLHUP))
68  {
70  return;
71  }
72 
73  if (msgrcvref.socketStruct.events & EPOLLIN)
74  {
75  // read stuff from the socket
76  if (readsocket(newdata)==false)
77  {
79  return;
80  }
81 
82  if (!newdata.size())
83  {
84  // must've been spurious event from epoll
85  return;
86  }
87  }
88  else if (msgrcvref.socketStruct.events & EPOLLOUT)
89  {
90  if (rewritesocket()==-1)
91  {
93  }
94 
95  return;
96  }
97  else
98  {
99  printf("%s %i anomaly events %u\n", __FILE__, __LINE__,
100  msgrcvref.socketStruct.events);
101  return;
102  }
103 
104  short retval = initcmd(newdata);
105 
106  switch (retval)
107  {
108  case -1: // bogus input
109  closesocket(*taPtr);
110  return;
111 // break;
112 
113  case 0: // command not completely received
114  return;
115 // break;
116 
117  case 1: // command completely received
118  pos = 0;
119  outbuf.clear();
120  break;
121 
122  default:
123  printf("%s %i anomaly retval %i\n", __FILE__, __LINE__, retval);
124  }
125 
126  // now, type & size are set.
127 
128  switch (state)
129  {
130  case STATE_BEGIN:
131  {
132  if (size==8) // ssl request
133  {
134  int32_t sslreq;
135 
136  if (get(&sslreq)==false)
137  {
138  closesocket(*taPtr);
139  return;
140  }
141 
142  if (sslreq != 80877103)
143  {
144  closesocket(*taPtr);
145  return;
146  }
147 
148  outbuf.assign("N");
149 
150  if (writesocket()==-1)
151  {
152  closesocket(*taPtr);
153  }
154 
155  size = 0;
156  return;
157  }
158  else // normal start packet
159  {
160  int32_t protvers;
161 
162  if (get(&protvers)==false)
163  {
164  closesocket(*taPtr);
165  return;
166  }
167 
168  if (protvers != 196608)
169  {
170  closesocket(*taPtr);
171  return;
172  }
173 
174  while (inbuf[pos] != '\0')
175  {
176  string paramstr;
177 
178  if (get(paramstr)==false)
179  {
180  closesocket(*taPtr);
181  return;
182  }
183 
184  string valstr;
185 
186  if (get(valstr)==false)
187  {
188  closesocket(*taPtr);
189  return;
190  }
191 
192  startupArgs[paramstr] = valstr;
193  }
194 
195  if (!startupArgs.count("user"))
196  {
197  closesocket(*taPtr);
198  return;
199  }
200 
201  if (!startupArgs.count("database"))
202  {
203  startupArgs["database"] = startupArgs["user"];
204  }
205 
206  outcmd = 'R';
207  put((int32_t)3);
208  replymsg();
209 
210  if (writesocket()==-1)
211  {
212  closesocket(*taPtr);
213  return;
214  }
215 
216  state = STATE_AUTH;
217  size = 0;
218  inbuf.clear();
219  }
220  }
221  break;
222 
223  case STATE_AUTH:
224  {
225  if (pgcmdtype != 'p')
226  {
227  closesocket(*taPtr);
228  return;
229  }
230 
231  string password;
232 
233  if (get(password)==false)
234  {
235  closesocket(*taPtr);
236  return;
237  }
238 
239  // send credentials to USM
240  class Operation &operationRef = *(new class Operation(OP_PGLOGIN, taPtr,
241  -1, -1));
242  operationRef.sockfd = sockfd;
243 
244  class MessageUserSchema *msg =
245  new class MessageUserSchema(TOPIC_OPERATION);
246  msg->userschemaStruct.caller = 1;
247  msg->userschemaStruct.callerstate = 1;
248  msg->userschemaStruct.operationid = operationRef.operationid;
250 
251  msg->username = startupArgs["user"];
252  msg->domainname = startupArgs["database"];
253  msg->password = password;
254  taPtr->mboxes.toUserSchemaMgr(taPtr->myIdentity.address, *msg);
255  }
256  break;
257 
258  case STATE_ESTABLISHED:
259  {
260  switch (pgcmdtype)
261  {
262  case 'Q':
263  {
264  string query;
265  get(query);
266  inbuf.clear();
267  size=0;
268  executeStatement(query);
269  }
270  break;
271 
272  case 'X':
273  closesocket(*taPtr);
274  break;
275 
276  default:
277  printf("%s %i don't know how to handle cmdtype %c\n", __FILE__,
278  __LINE__, pgcmdtype);
279  }
280  }
281  break;
282 
283  case STATE_ABORTED:
284  {
285  switch (pgcmdtype)
286  {
287  case 'Q':
288  {
289  string query;
290  get(query);
291  inbuf.clear();
292  size=0;
293 
294  class Larxer lx((char *)query.c_str(), taPtr, schemaPtr);
295 
296  if (lx.statementPtr==NULL)
297  {
298  putErrorResponse("ERROR", "42601", "syntax error, aborted");
299  return;
300  }
301 
303 
304  if (statementPtr->queries[0].type==CMD_COMMIT ||
306  {
307  putCommandComplete("ROLLBACK");
309  isintransactionblock=false;
310 
311  if (writesocket()==-1)
312  {
313  closesocket(*taPtr);
314  return;
315  }
316  }
317  else
318  {
319  putErrorResponse("ERROR", "25P02", "current transaction is aborted, commands ignored until end of transaction block");
320  }
321 
322  delete statementPtr;
323  }
324  break;
325 
326  case 'X':
327  closesocket(*taPtr);
328  break;
329 
330  default:
331  printf("%s %i don't know how to handle cmdtype %c\n", __FILE__,
332  __LINE__, pgcmdtype);
333  }
334  }
335  break;
336 
337  default:
338  printf("%s %i anomaly state %i\n", __FILE__, __LINE__, state);
339  }
340 }
341 
342 /* false: error or EOF, true: something read (or spurious poll event) */
343 bool Pg::readsocket(string &buf)
344 {
345  char in[8192];
346 
347  while (1)
348  {
349  ssize_t readed = recv(sockfd, in, 8192, 0);
350 
351  if (readed > 0)
352  {
353  buf.append(in, readed);
354  }
355  else if (readed == -1)
356  {
357  if (errno==EAGAIN || errno==EWOULDBLOCK)
358  {
359  return true;
360  }
361 
362  return false;
363  }
364  else
365  {
366  // EOF
367  return false;
368  }
369  }
370 }
371 
373 {
375  pgclosesocket(taRef, sockfd);
376  sockfd=-1;
377 
378  if (transactionPtr != NULL)
379  {
385  return;
386  }
387 
388  if (statementPtr==NULL)
389  {
390  delete this;
391  }
392 }
393 
394 void Pg::pgclosesocket(class TransactionAgent &taRef, int socketfd)
395 {
396 // taRef.Pgs.erase(socketfd);
397  // NEW WAY
398  epoll_ctl(taRef.myIdentity.epollfd, EPOLL_CTL_DEL, socketfd, NULL);
399  if (socketfd <= NUMSOCKETS)
400  {
401  pthread_mutex_lock(&connectionsMutex);
402  socketAffinity[socketfd]=0;
403  listenerTypes[socketfd]=LISTENER_NONE;
404  pthread_mutex_unlock(&connectionsMutex);
405  }
406  else
407  {
408  fprintf(logfile, "%s %i fd %i > %i\n", __FILE__, __LINE__, socketfd,
409  NUMSOCKETS);
410  }
411  taRef.Pgs.erase(socketfd);
412  close(socketfd);
413 }
414 
415 /* -1: error, 0: not enough data, 1: enough data */
416 short Pg::initcmd(string &newdata)
417 {
418  if (!size) // i don't have initial data yet
419  {
420  if (inbuf.size())
421  {
422  inbuf.append(newdata);
423  newdata.swap(inbuf);
424  inbuf.clear();
425  }
426 
427  if (newdata.size() < (state==STATE_BEGIN ? sizeof(size) :
428  sizeof(size)+1))
429  {
430  // wait for more data
431  inbuf.append(newdata);
432  return 0;
433  }
434 
435  // got enough data initially, now set the header vars
436  if (state==STATE_BEGIN)
437  {
438  memcpy(&size, &newdata[0], sizeof(size));
439  size=be32toh(size);
440  inbuf = newdata.substr(sizeof(size), string::npos);
441  }
442  else
443  {
444  pgcmdtype = newdata[0];
445  memcpy(&size, &newdata[1], sizeof(size));
446  size=be32toh(size);
447  inbuf = newdata.substr(sizeof(size)+1, string::npos);
448  }
449  }
450  else
451  {
452  inbuf.append(newdata);
453  }
454 
455  // already have initial data, see if the whole request received
456  if (inbuf.size() + sizeof(size) == size)
457  {
458  // got whole thing
459  return 1;
460  }
461 
462  if (inbuf.size() + sizeof(size) < size)
463  {
464  // wait for more
465  fprintf(logfile, "%s %i sockfd %i pgcmdtype %c inbuf.size() %lu sizeof(size) %lu size %u state %i '%s'\n", __FILE__, __LINE__, sockfd, pgcmdtype,
466  inbuf.size(), sizeof(size), size, state, inbuf.c_str());
467  return 0;
468  }
469 
470  // bogus data
471  return -1;
472 }
473 
474 bool Pg::get(int16_t *val)
475 {
476  if (pos + sizeof(int16_t) > inbuf.size())
477  {
478  // not enough room in inbuf to get requested data
479  return false;
480  }
481 
482  memcpy(val, &inbuf[pos], sizeof(int16_t));
483  *val = be16toh(*val);
484  pos += sizeof(int16_t);
485 
486  return true;
487 }
488 
489 bool Pg::get(int32_t *val)
490 {
491  if (pos + sizeof(int32_t) > inbuf.size())
492  {
493  // not enough room in inbuf to get requested data
494  return false;
495  }
496 
497  memcpy(val, &inbuf[pos], sizeof(int32_t));
498  *val = be32toh(*val);
499  pos += sizeof(int32_t);
500 
501  return true;
502 }
503 
504 bool Pg::get(int64_t *val)
505 {
506  if (pos + sizeof(int64_t) > inbuf.size())
507  {
508  // not enough room in inbuf to get requested data
509  return false;
510  }
511 
512  memcpy(val, &inbuf[pos], sizeof(int64_t));
513  *val = be64toh(*val);
514  pos += sizeof(int64_t);
515 
516  return true;
517 }
518 
519 bool Pg::get(vector<int16_t> &val, size_t nelem)
520 {
521  if (pos + sizeof(int16_t)*nelem > inbuf.size())
522  {
523  // not enough room in inbuf to get requested data
524  return false;
525  }
526 
527  for (size_t n=0; n < nelem; n++)
528  {
529  int16_t elem;
530  memcpy(&elem, &inbuf[pos], sizeof(int16_t));
531  elem = be16toh(elem);
532  val.push_back(elem);
533  pos += sizeof(int16_t);
534  }
535 
536  return true;
537 }
538 
539 bool Pg::get(vector<int32_t> &val, size_t nelem)
540 {
541  if (pos + sizeof(int32_t)*nelem > inbuf.size())
542  {
543  // not enough room in inbuf to get requested data
544  return false;
545  }
546 
547  for (size_t n=0; n < nelem; n++)
548  {
549  int32_t elem;
550  memcpy(&elem, &inbuf[pos], sizeof(int32_t));
551  elem = be32toh(elem);
552  val.push_back(elem);
553  pos += sizeof(int32_t);
554  }
555 
556  return true;
557 }
558 
559 bool Pg::get(vector<int64_t> &val, size_t nelem)
560 {
561  if (pos + sizeof(int64_t)*nelem > inbuf.size())
562  {
563  // not enough room in inbuf to get requested data
564  return false;
565  }
566 
567  for (size_t n=0; n < nelem; n++)
568  {
569  int64_t elem;
570  memcpy(&elem, &inbuf[pos], sizeof(int64_t));
571  elem = be64toh(elem);
572  val.push_back(elem);
573  pos += sizeof(int64_t);
574  }
575 
576  return true;
577 }
578 
579 // get Byte1
580 bool Pg::get(char *val)
581 {
582  if (pos + sizeof(char) > inbuf.size())
583  {
584  // not enough room in inbuf to get requested data
585  return false;
586  }
587 
588  *val = inbuf[pos];
589  pos += sizeof(char);
590 
591  return true;
592 }
593 
594 // get ByteN where N>1
595 bool Pg::get(string &val, size_t nelem)
596 {
597  if (pos + sizeof(char)*nelem > inbuf.size())
598  {
599  // not enough room in inbuf to get requested data
600  return false;
601  }
602 
603  val = inbuf.substr(pos, nelem);
604  pos += sizeof(char)*nelem;
605 
606  return true;
607 }
608 
609 // get string
610 bool Pg::get(string &val)
611 {
612  size_t endpos = inbuf.find('\0', pos);
613 
614  if (endpos == string::npos)
615  {
616  return false;
617  }
618 
619  val = inbuf.substr(pos, endpos-pos);
620  pos = endpos+1;
621 
622  return 1;
623 }
624 
625 void Pg::put(int16_t val)
626 {
627  size_t curpos = outmsg.size();
628  outmsg.resize(curpos + sizeof(int16_t));
629  val = htobe16(val);
630  memcpy(&outmsg[curpos], &val, sizeof(int16_t));
631 }
632 
633 void Pg::put(int32_t val)
634 {
635  size_t curpos = outmsg.size();
636  outmsg.resize(curpos + sizeof(int32_t));
637  val = htobe32(val);
638  memcpy(&outmsg[curpos], &val, sizeof(int32_t));
639 }
640 
641 void Pg::put(int64_t val)
642 {
643  size_t curpos = outmsg.size();
644  outmsg.resize(curpos + sizeof(int64_t));
645  val = htobe64(val);
646  memcpy(&outmsg[curpos], &val, sizeof(int64_t));
647 }
648 
649 void Pg::put(vector<int16_t> &val)
650 {
651  size_t curpos = outmsg.size();
652  size_t nelem = val.size();
653  outmsg.resize(curpos + sizeof(int16_t)*nelem);
654 
655  for (size_t n=0; n < nelem; n++)
656  {
657  int16_t elem = htobe16(val[n]);
658  memcpy(&outmsg[curpos], &elem, sizeof(int16_t));
659  curpos += sizeof(int16_t);
660  }
661 }
662 
663 void Pg::put(vector<int32_t> &val)
664 {
665  size_t curpos = outmsg.size();
666  size_t nelem = val.size();
667  outmsg.resize(curpos + sizeof(int32_t)*nelem);
668 
669  for (size_t n=0; n < nelem; n++)
670  {
671  int32_t elem = htobe32(val[n]);
672  memcpy(&outmsg[curpos], &elem, sizeof(int32_t));
673  curpos += sizeof(int32_t);
674  }
675 }
676 
677 void Pg::put(vector<int64_t> &val)
678 {
679  size_t curpos = outmsg.size();
680  size_t nelem = val.size();
681  outmsg.resize(curpos + sizeof(int64_t)*nelem);
682 
683  for (size_t n=0; n < nelem; n++)
684  {
685  int64_t elem = htobe64(val[n]);
686  memcpy(&outmsg[curpos], &elem, sizeof(int64_t));
687  curpos += sizeof(int64_t);
688  }
689 }
690 
691 // Byte1
692 void Pg::put(char val)
693 {
694  size_t curpos = outmsg.size();
695  outmsg.resize(curpos + sizeof(char));
696  outmsg[curpos] = val;
697 }
698 
699 // ByteN where N>1
700 void Pg::put(char *val, size_t nelem)
701 {
702  outmsg.append(val, nelem);
703 }
704 
705 void Pg::put(string &val)
706 {
707  outmsg.append(val);
708  outmsg.append(1, '\0');
709 }
710 
711 void Pg::put(char *val)
712 {
713  outmsg.append(val);
714  outmsg.append(1, '\0');
715 }
716 
717 /* -1: socket error, 0: send complete, 1: data remaining & backgrounded */
719 {
720  uint32_t osize = htobe32((uint32_t)outmsg.size() + sizeof(osize));
721  string prependstr(sizeof(osize) + sizeof(outcmd), '\0');
722  prependstr[0] = outcmd;
723  memcpy(&prependstr[1], &osize, sizeof(osize));
724  outmsg.insert(0, prependstr);
725  outbuf.append(outmsg);
726 
727  outmsg.clear();
728 }
729 
730 // put ReadyForQuery at the end
732 {
734  {
735  outcmd='Z';
736 
737  if (transactionPtr==NULL)
738  {
739  put('I');
740  }
741  else
742  {
743  put('T');
744  }
745 
746  replymsg();
747  }
748  else if (state==STATE_ABORTED)
749  {
750  outcmd='Z';
751  put('E');
752  replymsg();
753  }
754 
755  size_t curpos = 0;
756  size_t outbuflen = outbuf.size();
757 
758  while (curpos < outbuflen)
759  {
760  ssize_t sent = send(sockfd, &outbuf[curpos],
761  (outbuflen-curpos) >= 8192 ? 8192 :
762  outbuflen-curpos, 0);
763 
764  if (sent == -1)
765  {
766  if (errno==EAGAIN || errno==EWOULDBLOCK)
767  {
768  string backgroundstr = outbuf.substr(curpos, string::npos);
769  outbuf.swap(backgroundstr);
770  struct epoll_event epevent;
771  epevent.events = EPOLLOUT | EPOLLHUP | EPOLLET;
772  epevent.data.fd = sockfd;
773  epoll_ctl(taPtr->myIdentity.epollfd, EPOLL_CTL_MOD, sockfd,
774  &epevent);
775  return 1;
776  }
777 
778  // some real error, so return & close socket
779  return -1;
780  }
781 
782  curpos += sent;
783  }
784 
785  outbuf.clear();
786  return 0;
787 }
788 
790 {
791  switch (writesocket())
792  {
793  case -1:
794  return -1;
795 // break;
796 
797  case 0:
798  {
799  struct epoll_event epevent;
800  epevent.events = EPOLLIN | EPOLLHUP | EPOLLET;
801  epevent.data.fd=sockfd;
802  epoll_ctl(taPtr->myIdentity.epollfd, EPOLL_CTL_MOD, sockfd, &epevent);
803  return 0;
804  }
805  break;
806 
807  case 1:
808  return 1;
809 // break;
810 
811  default:
812  printf("%s %i anomaly WTF\n", __FILE__, __LINE__);
813  return -1;
814  }
815 }
816 
817 void Pg::continueLogin(int cmdstate, class MessageUserSchema &msgrcvref)
818 {
819  if (msgrcvref.userschemaStruct.status==STATUS_OK)
820  {
821  userid = msgrcvref.userschemaStruct.userid;
822  domainid = msgrcvref.userschemaStruct.domainid;
824  procedureprefix.append(msgrcvref.domainname);
825  procedureprefix.append("_");
827 
829  }
830  else // STATUS_NOTOK
831  {
832  putErrorResponse("FATAL", "28P01", "authenticationfailure");
833  }
834 }
835 
836 void Pg::executeStatement(string &stmtstr)
837 {
838  class Larxer lx((char *)stmtstr.c_str(), taPtr, schemaPtr);
839 
840  if (lx.statementPtr==NULL)
841  {
842  putErrorResponse("ERROR", "42601", "syntax error");
843  return;
844  }
845 
847 
848  if (statementPtr->resolveTableFields()==false)
849  {
851  putErrorResponse("ERROR", "42704", "table or column does not exist");
852  delete statementPtr;
853  return;
854  }
855 
857  {
858  statementPtr->queries[0].storedProcedure.insert(0, procedureprefix);
859  }
860 
861  results = results_s();
862 
863  if (transactionPtr==NULL)
864  {
865  command_autocommit=true;
866  }
867  else
868  {
869  command_autocommit=false;
870  }
871 
873  transactionPtr, vector<string>());
874 }
875 
876 void Pg::continuePgFunc(int64_t entrypoint, void *statePtr)
877 {
878  /* based on the statement type and transaction state, a variety of things
879  * if session_isautocommit==true, and is SELECT, INSERT, UPDATE, DELETE,
880  * then output
881  * If autocommit==false, and SELECT, INSERT, UPDATE, DELETE, then prepare
882  * output but don't output
883  * if COMMIT (END), then commit open transaction and output results already
884  * prepared
885  * if ROLLBACK, then rollback open transaction and output results
886  * CommandComplete at the end of everything returned
887  * If set, then set whatever
888  */
890 
891  if (state==STATE_EXITING)
892  {
893  if (transactionPtr==NULL)
894  {
895  fprintf(logfile, "%s %i deleting this %p\n", __FILE__, __LINE__,
896  this);
897  delete this;
898  return;
899  }
900  else
901  {
903  return;
904  }
905  }
906 
907  switch (results.cmdtype)
908  {
909  case CMD_SELECT:
910  {
912  {
914  putDataRows();
915 
916  std::stringstream tag;
917  tag << "SELECT " << results.selectResults.size();
918  putCommandComplete((char *)tag.str().c_str());
919  }
920  else
921  {
924  return;
925  }
926 
927  if (isintransactionblock==false && (session_isautocommit==true ||
928  command_autocommit==true))
929  {
931  }
932  else
933  {
934  if (writesocket()==-1)
935  {
936  closesocket(*taPtr);
937  }
938  }
939  }
940  break;
941 
942  case CMD_INSERT:
944  {
945  putCommandComplete("INSERT 0 1");
946  }
947  else
948  {
951  return;
952  }
953 
954  if (isintransactionblock==false && (session_isautocommit==true ||
955  command_autocommit==true))
956  {
958  }
959  else
960  {
961  if (writesocket()==-1)
962  {
963  closesocket(*taPtr);
964  }
965  }
966 
967  break;
968 
969  case CMD_UPDATE:
970  {
972  {
973  std::stringstream tag;
974  tag << "UPDATE " << results.statementResults.size();
975  putCommandComplete((char *)tag.str().c_str());
976  }
977  else
978  {
981  return;
982  }
983 
984  if (isintransactionblock==false && (session_isautocommit==true ||
985  command_autocommit==true))
986  {
988  }
989  else
990  {
991  if (writesocket()==-1)
992  {
993  closesocket(*taPtr);
994  }
995  }
996  }
997  break;
998 
999  case CMD_DELETE:
1000  {
1002  {
1003  std::stringstream tag;
1004  tag << "DELETE " << results.statementResults.size();
1005  putCommandComplete((char *)tag.str().c_str());
1006  }
1007  else
1008  {
1011  return;
1012  }
1013 
1014  if (isintransactionblock==false && (session_isautocommit==true ||
1015  command_autocommit==true))
1016  {
1018  }
1019  else
1020  {
1021  if (writesocket()==-1)
1022  {
1023  closesocket(*taPtr);
1024  }
1025  }
1026  }
1027  break;
1028 
1029  case CMD_BEGIN:
1030  if (transactionPtr==NULL)
1031  {
1032  transactionPtr = new class Transaction(taPtr, schemaPtr->domainid);
1033  isintransactionblock=true;
1034  putCommandComplete("BEGIN");
1035 
1036  if (writesocket()==-1)
1037  {
1038  closesocket(*taPtr);
1039  }
1040  }
1041  else
1042  {
1043  putNoticeResponse("WARNING", "25001", "there is already a transaction in progress");
1044  putCommandComplete("BEGIN");
1045 
1046  if (writesocket()==-1)
1047  {
1048  closesocket(*taPtr);
1049  }
1050  }
1051 
1052  break;
1053 
1054  case CMD_COMMIT:
1055  if (transactionPtr != NULL)
1056  {
1057  isintransactionblock=false;
1059  }
1060  else
1061  {
1062  putNoticeResponse("WARNING", "25P01", "there is no transaction in progress");
1063  putCommandComplete("COMMIT");
1064 
1065  if (writesocket()==-1)
1066  {
1067  closesocket(*taPtr);
1068  }
1069  }
1070 
1071  break;
1072 
1073  case CMD_ROLLBACK:
1074  if (transactionPtr != NULL)
1075  {
1076  isintransactionblock=false;
1078  }
1079  else
1080  {
1081  putNoticeResponse("WARNING", "25P01", "there is no transaction in progress");
1082  putCommandComplete("ROLLBACK");
1083 
1084  if (writesocket()==-1)
1085  {
1086  closesocket(*taPtr);
1087  }
1088  }
1089 
1090  break;
1091 
1092  case CMD_SET:
1093  break;
1094 
1095  case CMD_STOREDPROCEDURE:
1096  {
1098  {
1100  putDataRows();
1101 
1102  std::stringstream tag;
1103  tag << "SELECT " << results.selectResults.size();
1104  putCommandComplete((char *)tag.str().c_str());
1105  }
1106  else
1107  {
1110  return;
1111  }
1112 
1113  if (isintransactionblock==false && (session_isautocommit==true ||
1114  command_autocommit==true))
1115  {
1117  }
1118  else
1119  {
1120  if (writesocket()==-1)
1121  {
1122  closesocket(*taPtr);
1123  }
1124  }
1125  }
1126  break;
1127 
1128  default:
1129  printf("%s %i anomaly cmdtype %i statementStatus %li\n", __FILE__,
1130  __LINE__, results.cmdtype, results.statementStatus);
1131  }
1132 }
1133 
1134 void Pg::putCommandComplete(char *tag)
1135 {
1136  outcmd='C';
1137  put(tag);
1138  replymsg();
1139 }
1140 
1141 void Pg::putErrorResponse(char *severity, char *code, char *message)
1142 {
1143  outcmd='E';
1144  put('S');
1145  put(severity);
1146  put('C');
1147  put(code);
1148  put('M');
1149  put(message);
1150  put(char(0));
1151 
1152  replymsg();
1153 
1154  if (writesocket()==-1)
1155  {
1156  closesocket(*taPtr);
1157  }
1158 }
1159 
1160 void Pg::putNoticeResponse(char *severity, char *code, char *message)
1161 {
1162  outcmd='N';
1163  put('S');
1164  put(severity);
1165  put('C');
1166  put(code);
1167  put('M');
1168  put(message);
1169  put(char(0));
1170 
1171  replymsg();
1172 }
1173 
1175 {
1176  if (transactionPtr==NULL)
1177  {
1178  continuePgCommitimplicit(1, NULL);
1179  return;
1180  }
1181 
1182  transactionPtr->reentryObject = this;
1185  transactionPtr->reentryState = NULL;
1186 
1188 }
1189 
1191 {
1192  if (transactionPtr==NULL)
1193  {
1194  continuePgCommitexplicit(1, NULL);
1195  return;
1196  }
1197 
1198  transactionPtr->reentryObject = this;
1201  transactionPtr->reentryState = NULL;
1202 
1204 }
1205 
1207 {
1208  if (isintransactionblock==true)
1209  {
1211  }
1212 
1213  if (transactionPtr==NULL)
1214  {
1215  continuePgRollbackimplicit(1, NULL);
1216  return;
1217  }
1218 
1219  transactionPtr->reentryObject = this;
1222  transactionPtr->reentryState = NULL;
1223 
1225 }
1226 
1228 {
1229  if (transactionPtr==NULL)
1230  {
1231  continuePgRollbackexplicit(1, NULL);
1232  return;
1233  }
1234 
1235  transactionPtr->reentryObject = this;
1238  transactionPtr->reentryState = NULL;
1239 
1241 }
1242 
1244 {
1245  if (transactionPtr != NULL)
1246  {
1247  return false;
1248  }
1249 
1250  transactionPtr = new class Transaction(taPtr, domainid);
1251  return true;
1252 }
1253 
1255 {
1256  outcmd='T';
1257  int16_t numfields = (int16_t)results.selectFields.size();
1258  put(numfields);
1259 
1260  for (int16_t n=0; n < numfields; n++)
1261  {
1262  put(results.selectFields[n].name);
1263  put((int32_t)0);
1264  put((int16_t)0);
1265 
1266  switch (results.selectFields[n].type)
1267  {
1268  case INT:
1269  put((int32_t)INT8OID);
1270  put((int16_t)8);
1271  break;
1272 
1273  case UINT:
1274  put((int32_t)INT8OID);
1275  put((int16_t)8);
1276  break;
1277 
1278  case BOOL:
1279  put((int32_t)BOOLOID);
1280  put((int16_t)1);
1281  break;
1282 
1283  case FLOAT:
1284  put((int32_t)FLOAT8OID);
1285  put((int16_t)8);
1286  break;
1287 
1288  case CHAR:
1289  put((int32_t)CHAROID);
1290  put((int16_t)1);
1291  break;
1292 
1293  case CHARX:
1294  put((int32_t)BPCHAROID);
1295  put((int16_t)-2);
1296  break;
1297 
1298  case VARCHAR:
1299  put((int32_t)VARCHAROID);
1300  put((int16_t)-2);
1301  break;
1302 
1303  default:
1304  printf("%s %i anomaly %i\n", __FILE__, __LINE__,
1305  results.selectFields[n].type);
1306  return;
1307  }
1308 
1309  put((int32_t)0);
1310  put((int16_t)0); // text format code
1311  }
1312 
1313  replymsg();
1314 }
1315 
1317 {
1318  boost::unordered_map< uuRecord_s, vector<fieldValue_s> >::const_iterator it;
1319 
1320  for (it = results.selectResults.begin(); it != results.selectResults.end();
1321  it++)
1322  {
1323  outcmd='D';
1324  int16_t numfields = (int16_t)results.selectFields.size();
1325  put(numfields);
1326 
1327  const vector<fieldValue_s> &fieldValues = it->second;
1328 
1329  for (int16_t n=0; n < numfields; n++)
1330  {
1331  if (fieldValues[n].isnull==true)
1332  {
1333  put((int32_t)-1);
1334  continue;
1335  }
1336 
1337  switch (results.selectFields[n].type)
1338  {
1339  case INT:
1340  {
1341  char val[21]; // length of smallest int64_t + \0
1342  int32_t len=sprintf(val, "%li", fieldValues[n].value.integer);
1343  put(len);
1344  put(val, len);
1345  }
1346  break;
1347 
1348  case UINT:
1349  {
1350  char val[21]; // length of largest uint64_t + \0
1351  int32_t len=sprintf(val, "%lu", fieldValues[n].value.uinteger);
1352  put(len);
1353  put(val, len);
1354  }
1355  break;
1356 
1357  case BOOL:
1358  put((int32_t)1);
1359 
1360  if (fieldValues[n].value.boolean==true)
1361  {
1362  put('t');
1363  }
1364  else
1365  {
1366  put('f');
1367  }
1368 
1369  break;
1370 
1371  case FLOAT:
1372  {
1373  std::stringstream val;
1374  val << (double)fieldValues[n].value.floating;
1375 
1376  if ((double)fieldValues[n].value.floating /
1377  (int64_t)fieldValues[n].value.floating == 1)
1378  {
1379  val << ".0";
1380  }
1381 
1382  int32_t len=val.str().size();
1383  put((int32_t)len);
1384  put((char *)val.str().c_str(), len);
1385  }
1386  break;
1387 
1388  case CHAR:
1389  put((int32_t)1);
1390  put(fieldValues[n].value.character);
1391  break;
1392 
1393  case CHARX:
1394  {
1395  int32_t len = fieldValues[n].str.size();
1396  put((int32_t)len);
1397  put((char *)fieldValues[n].str.c_str(), len);
1398  }
1399  break;
1400 
1401  case VARCHAR:
1402  {
1403  int32_t len = fieldValues[n].str.size();
1404  put((int32_t)len);
1405  put((char *)fieldValues[n].str.c_str(), len);
1406  }
1407  break;
1408 
1409  default:
1410  printf("%s %i anomaly %i\n", __FILE__, __LINE__,
1411  results.selectFields[n].type);
1412  return;
1413  }
1414  }
1415 
1416  replymsg();
1417  }
1418 }
1419 
1420 void Pg::continuePgCommitimplicit(int64_t entrypoint, void *statePtr)
1421 {
1422  if (transactionPtr != NULL)
1423  {
1424  delete transactionPtr;
1425  transactionPtr=NULL;
1426  }
1427 
1428  if (state==STATE_EXITING)
1429  {
1430  if (statementPtr==NULL)
1431  {
1432  fprintf(logfile, "%s %i deleting this %p\n", __FILE__, __LINE__,
1433  this);
1434  delete this;
1435  }
1436 
1437  return;
1438  }
1439 
1440  if (writesocket()==-1)
1441  {
1442  closesocket(*taPtr);
1443  }
1444 }
1445 
1446 void Pg::continuePgCommitexplicit(int64_t entrypoint, void *statePtr)
1447 {
1448  putCommandComplete("COMMIT");
1449 
1450  if (transactionPtr != NULL)
1451  {
1452  delete transactionPtr;
1453  transactionPtr=NULL;
1454  }
1455 
1456  if (state==STATE_EXITING)
1457  {
1458  if (statementPtr==NULL)
1459  {
1460  fprintf(logfile, "%s %i deleting this %p\n", __FILE__, __LINE__,
1461  this);
1462  delete this;
1463  }
1464 
1465  return;
1466  }
1467 
1468  if (writesocket()==-1)
1469  {
1470  closesocket(*taPtr);
1471  }
1472 
1473  outbuf.clear();
1474 }
1475 
1476 void Pg::continuePgRollbackimplicit(int64_t entrypoint, void *statePtr)
1477 {
1478  outbuf.clear();
1479 
1480  if (transactionPtr != NULL)
1481  {
1482  delete transactionPtr;
1483  transactionPtr=NULL;
1484  }
1485 
1486  if (state==STATE_EXITING)
1487  {
1488  if (statementPtr==NULL)
1489  {
1490  fprintf(logfile, "%s %i deleting this %p\n", __FILE__, __LINE__,
1491  this);
1492  delete this;
1493  }
1494 
1495  return;
1496  }
1497 }
1498 
1499 void Pg::continuePgRollbackexplicit(int64_t entrypoint, void *statePtr)
1500 {
1501  if (transactionPtr != NULL)
1502  {
1503  delete transactionPtr;
1504  transactionPtr=NULL;
1505  }
1506 
1507  if (state==STATE_EXITING)
1508  {
1509  if (statementPtr==NULL)
1510  {
1511  fprintf(logfile, "%s %i deleting this %p\n", __FILE__, __LINE__,
1512  this);
1513  delete this;
1514  }
1515 
1516  return;
1517  }
1518 
1519  putCommandComplete("ROLLBACK");
1520 
1521  if (writesocket()==-1)
1522  {
1523  closesocket(*taPtr);
1524  }
1525 
1526  outbuf.clear();
1527 }
1528 
1529 /* pg 9.2.4 response to perl client ParameterStatus list:
1530  * application_name
1531  client_encoding UTF8
1532  DateStyle ISO, MDY
1533  integer_datetimes on
1534  IntervalStyle postgres
1535  is_superuser off
1536  server_encoding UTF8
1537  server_version 9.2.4
1538  session_authorization mtravis
1539  standard_conforming_strings on
1540  TimeZone US/Pacific
1541 */
1543 {
1544  // AuthenticationOk
1545  outcmd = 'R';
1546  put((int32_t)0);
1547  replymsg();
1548 
1549  putParameterStatus("application_name", "");
1550  putParameterStatus("client_encoding", "LATIN1");
1551  putParameterStatus("DateStyle", "ISO, MDY");
1552  putParameterStatus("integer_datetimes", "on");
1553  putParameterStatus("IntervalStyle", "postgres");
1554  putParameterStatus("is_superuser", "off");
1555  putParameterStatus("server_encoding", "LATIN1");
1556  putParameterStatus("server_version", "9.2.4");
1557  putParameterStatus("session_authorization",
1558  (char *)startupArgs["username"].c_str());
1559  putParameterStatus("standard_conforming_strings", "on");
1560  tzset();
1561  // long TimeZone name from tzname[2] having problems compiling
1562  putParameterStatus("TimeZone", tzname[1]);
1563  // putParameterStatus("TimeZone", "US/Pacific");
1564 
1565  // forget key data for now, but put something here
1566  outcmd = 'K';
1567  put(int32_t(1));
1568  put(int32_t(2));
1569  replymsg();
1570 
1571  // ReadyForQuery
1572  outcmd = 'Z';
1573  put((char)'I');
1574  replymsg();
1575 
1576  if (writesocket()==-1)
1577  {
1578  closesocket(*taPtr);
1579  return;
1580  }
1581 
1583  size = 0;
1584  inbuf.clear();
1585 }
1586 
1587 void Pg::putParameterStatus(char *name, char *value)
1588 {
1589  outcmd='S';
1590  put(name);
1591  put(value);
1592  replymsg();
1593 }
1594 
1595 void Pg::errorStatus(int64_t status)
1596 {
1597  switch (status)
1598  {
1599  case APISTATUS_NOTOK:
1600  putErrorResponse("ERROR", "42000",
1601  "generic InfiniSQL error");
1602  break;
1603 
1605  putErrorResponse("ERROR", "23000", "integrity_constraint_violation");
1606  break;
1607 
1608  case APISTATUS_LOCK:
1609  putErrorResponse("ERROR", "55P03", "lock not available");
1610  break;
1611 
1612  default:
1613  printf("%s %i anomaly %li\n", __FILE__, __LINE__, status);
1614  putErrorResponse("ERROR", "42000",
1615  "syntax_error_or_access_rule_violation");
1616  }
1617 }