31 #line 32 "TransactionAgent.cc"
34 myIdentity(*myIdentityArg), nexttransactionid(0), nextapplierid(0),
35 myreplica(-1), mymember(-1)
47 typedef boost::unordered_map<std::string,
128 if ((events & EPOLLERR) || (events & EPOLLHUP))
134 if (events & EPOLLIN)
151 socketAuthInfo::iterator loggedInUsersIterator;
162 else if (
operation->compare(
"ping")==0 &&
177 domainid = loggedInUsersIterator->second.domainid;
178 userid = loggedInUsersIterator->second.userid;
180 loggedInUsersIterator->second.domainName;
190 spC(
this, NULL, (
void *)spD);
196 builtinsMap::iterator builtinsIterator;
197 builtinsIterator = builtins.find(*
operation);
199 if (builtinsIterator != builtins.end())
201 (this->*(builtinsIterator->second))(
STARTCMD);
213 if (events & EPOLLOUT)
215 struct epoll_event ev;
216 ev.events = EPOLLIN | EPOLLHUP | EPOLLET;
226 sendLaterMap::iterator waitingToSendIterator;
232 waitingToSendIterator->second;
250 fprintf(
logfile,
"\t%s %i hanging it up\n",
264 printf(
"%s %i anomaly listenertype %i\n", __FILE__, __LINE__,
283 new class Pg(this, msgrcvref.socketStruct.socket);
287 fprintf(
logfile,
"%s %i sockfd %i already mapped\n",
295 printf(
"%s %i anomaly listenertype %i\n", __FILE__, __LINE__,
412 fprintf(
logfile,
"builtincmd %i %s %i\n",
452 fprintf(
logfile,
"builtincmd unrecognized %li %s %i\n",
473 fprintf(
logfile,
"%s %i transactionid %li\n", __FILE__,
476 fprintf(
logfile,
"%s %i thismsg %p next ptr, count %p %lu, messageStruct.payloadtype %i pendingcmdid %i entrypoint %i locktype %i\n", __FILE__, __LINE__,
msgrcv,
Mbox::getPtr(msgref.
nextmsg),
Mbox::getCount(msgref.
nextmsg), msgref.
messageStruct.
payloadtype, msgref.
transactionStruct.
transaction_pendingcmdid, msgref.
transactionStruct.
transaction_tacmdentrypoint, ((
class MessageSubtransactionCmd *)
msgrcv)->subtransactionStruct.locktype);
535 printf(
"%s %i no Applier to ack status %i %li,%li,%li\n",
547 operationMap::iterator it;
552 class Operation &operationRef = *it->second;
578 fprintf(
logfile,
"anomaly %i %s %i\n",
607 memcpy(&a, inbuf,
sizeof(a));
608 int64_t msgsize = be64toh(a);
610 if (bytesread-8 != msgsize)
612 printf(
"%s %i bytesread %li msgsize %li sockfd %i\n", __FILE__, __LINE__,
613 bytesread, msgsize,
sockfd);
617 char operationlength = inbuf[8];
619 if ((
int)operationlength >= msgsize-1)
624 int64_t localargsize = msgsize - 1 - (int)operationlength;
625 operation->assign(inbuf+9, (
int)operationlength);
627 memcpy(
args, inbuf+9+(
int)operationlength, localargsize);
662 msgref.userschemaStruct.argsize =
argsize;
663 msgref.userschemaStruct.instance =
instance;
702 fprintf(
logfile,
"TransactionAgent::login cmd unrecognized %i\n", cmd);
722 new class MessageUserSchema(TOPIC_CHANGEPASSWORD);
726 msgref.userschemaStruct.argsize =
argsize;
727 msgref.userschemaStruct.instance =
instance;
729 msgref.userschemaStruct.domainid =
domainid;
730 msgref.userschemaStruct.userid =
userid;
753 fprintf(
logfile,
"TransactionAgent::changepassword cmd unrecognized %i\n", cmd);
766 new class MessageUserSchema(TOPIC_CREATEDOMAIN);
770 msgref.userschemaStruct.argsize =
argsize;
771 msgref.userschemaStruct.instance =
instance;
773 msgref.userschemaStruct.domainid =
domainid;
774 msgref.userschemaStruct.userid =
userid;
784 new class MessageUserSchema(TOPIC_CREATEUSER);
788 rv.push_back(boost::lexical_cast<string>(msgref.userschemaStruct.domainid));
803 fprintf(
logfile,
"TransactionAgent::createdomain cmd unrecognized %i\n",
817 new class MessageUserSchema(TOPIC_CREATEUSER);
821 msgref.userschemaStruct.argsize =
argsize;
822 msgref.userschemaStruct.instance =
instance;
824 msgref.userschemaStruct.domainid =
domainid;
825 msgref.userschemaStruct.userid =
userid;
834 new class MessageUserSchema(TOPIC_CREATEUSER);
838 rv.push_back(boost::lexical_cast<string>(msgref.userschemaStruct.userid));
853 fprintf(
logfile,
"TransactionAgent::createuser cmd unrecognized %i\n",
867 new class MessageUserSchema(TOPIC_DELETEUSER);
871 msgref.userschemaStruct.argsize =
argsize;
872 msgref.userschemaStruct.instance =
instance;
874 msgref.userschemaStruct.domainid =
domainid;
875 msgref.userschemaStruct.userid =
userid;
899 fprintf(
logfile,
"TransactionAgent::deleteuser cmd unrecognized %i\n",
913 new class MessageUserSchema(TOPIC_DELETEDOMAIN);
917 msgref.userschemaStruct.argsize =
argsize;
918 msgref.userschemaStruct.instance =
instance;
920 msgref.userschemaStruct.domainid =
domainid;
921 msgref.userschemaStruct.userid =
userid;
945 fprintf(
logfile,
"TransactionAgent::deletedomain cmd unrecognized %i\n",
969 fprintf(
logfile,
"topic unrecognized %i %s %i\n", cmd, __FILE__,
1006 fprintf(
logfile,
"topic unrecognized %i %s %i\n", cmd, __FILE__,
1044 fprintf(
logfile,
"topic unrecognized %i %s %i\n", cmd, __FILE__,
1067 fprintf(
logfile,
"topic unrecognized %i %s %i\n", cmd, __FILE__,
1090 fprintf(
logfile,
"topic unrecognized %i %s %i\n", cmd, __FILE__,
1113 fprintf(
logfile,
"topic unrecognized %i %s %i\n", cmd, __FILE__,
1130 new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1144 new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1154 class Schema *schemaPtr =
1156 class Table *tablePtr =
1159 new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1175 new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1185 new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1195 new class MessageUserSchema(TOPIC_SCHEMAREPLY);
1212 new class MessageUserSchema(TOPIC_SCHEMAREQUEST);
1216 msgref.userschemaStruct.builtincmd = builtin;
1217 msgref.userschemaStruct.argsize =
argsize;
1218 msgref.userschemaStruct.instance =
instance;
1219 msgref.userschemaStruct.operationid =
operationid;
1220 msgref.userschemaStruct.userid =
userid;
1221 msgref.userschemaStruct.domainid =
domainid;
1302 fprintf(
logfile,
"bad case %i %s %i\n", cmd, __FILE__, __LINE__);
1313 vector<string> resultVector;
1316 string statementname(resultVector[0]);
1317 string sqlstatement(resultVector[1]);
1318 class Larxer lx2((char *)sqlstatement.c_str(),
this,
1340 rv.push_back(statementname);
1351 msgpack::sbuffer *sbuf =
new msgpack::sbuffer;
1352 msgpack::pack(*sbuf, *v);
1358 msgpack::sbuffer *sbuf =
new msgpack::sbuffer;
1359 msgpack::pack(*sbuf, *m);
1375 fprintf(
logfile,
"TA bad message stub %s %i\n", __FILE__, __LINE__);
1407 size_t otherreplica =
myreplica==0 ? 1 : 0;
1421 vector<Topology::addressStruct> ras;
1444 vector<string> resultVector;
1455 msg.
procname.append(resultVector[1]);
1483 const char *dlsym_error;
1485 void *soPtr = dlopen(inmsg.
pathname.c_str(), RTLD_LAZY);
1489 dlsym_error = dlerror();
1496 string funcNameCreate = inmsg.
procname +
"_create";
1499 dlsym_error = dlerror();
1503 printf(
"%s %i anomaly nodeid %i instance %li error %s\n", __FILE__,
1511 string funcNameDestroy = inmsg.
procname +
"_destroy";
1514 dlsym_error = dlerror();
1535 ACTOR_TRANSACTIONAGENT)
1540 (int16_t)n}, *nmsg);
1554 fprintf(
logfile,
"anomaly: %lu %s %i\n", entrypoint, __FILE__, __LINE__);
1568 boost::unordered_map<int64_t, class MessageApply *> msgs;
1570 msgrcvref.messageStruct.sourceAddr, partitioncount);
1572 boost::unordered_map< int64_t,
1573 vector<MessageDispatch::record_s> >::iterator it;
1575 for (it = msgrcvref.records.begin(); it != msgrcvref.records.end(); it++)
1578 vector<MessageDispatch::record_s> &recordsref = it->second;
1580 if (!msgs.count(it->first))
1584 applierPtr->applierid,
1588 msgs[it->first]->
rows = recordsref;
1591 for (
size_t n=0; n < recordsref.size(); n++)
1593 class Table &tableRef = *schemaPtr->
tables[recordsref[n].tableid];
1595 switch (recordsref[n].primitive)
1599 vector<fieldValue_s>
fields;
1600 tableRef.
unmakerow(&recordsref[n].row, &fields);
1602 for (uint16_t f=0; f < tableRef.
fields.size(); f++)
1613 indexinfo.
flags = 0;
1615 indexinfo.
tableid = recordsref[n].tableid;
1616 indexinfo.
entry = {recordsref[n].rowid,
1622 msgs[indexinfo.
entry.
engineid]->indices.push_back(indexinfo);
1629 vector<fieldValue_s> newfields;
1630 tableRef.
unmakerow(&recordsref[n].row, &newfields);
1631 vector<fieldValue_s> oldfields;
1632 tableRef.
unmakerow(&recordsref[n].oldrow, &oldfields);
1634 for (
size_t f=0; f < tableRef.
fields.size(); f++)
1644 switch (tableRef.
fields[f].type)
1647 aredifferent = newfields[f].value.integer !=
1648 oldfields[f].value.integer ?
true :
false;
1652 aredifferent = newfields[f].value.uinteger !=
1653 oldfields[f].value.uinteger ?
true :
false;
1657 aredifferent = newfields[f].value.boolean !=
1658 oldfields[f].value.boolean ?
true :
false;
1662 aredifferent = newfields[f].value.floating !=
1663 oldfields[f].value.floating ?
true :
false;
1667 aredifferent = newfields[f].value.character !=
1668 oldfields[f].value.character ?
true :
false;
1672 aredifferent = newfields[f].str.compare(oldfields[f].str)
1677 aredifferent = newfields[f].str.compare(oldfields[f].str)
1682 printf(
"%s %i anomaly fieldtype %i\n", __FILE__,
1683 __LINE__, tableRef.
fields[f].type);
1687 if (aredifferent==
true)
1693 indexinfo.
flags = 0;
1694 indexinfo.
tableid = recordsref[n].tableid;
1695 indexinfo.
entry = {recordsref[n].rowid,
1701 msgs[indexinfo.
entry.
engineid]->indices.push_back(indexinfo);
1705 indexinfo.
flags = 0;
1707 indexinfo.
tableid = recordsref[n].tableid;
1708 indexinfo.
entry = {recordsref[n].rowid,
1714 msgs[indexinfo.
entry.
engineid]->indices.push_back(indexinfo);
1722 vector<fieldValue_s>
fields;
1723 tableRef.
unmakerow(&recordsref[n].oldrow, &fields);
1725 for (
size_t f=0; f < tableRef.
fields.size(); f++)
1735 indexinfo.
flags = 0;
1737 indexinfo.
tableid = recordsref[n].tableid;
1738 indexinfo.
entry = {recordsref[n].rowid,
1744 msgs[indexinfo.
entry.
engineid]->indices.push_back(indexinfo);
1750 printf(
"%s %i anomaly primitive %i\n", __FILE__, __LINE__,
1751 recordsref[n].primitive);
1756 boost::unordered_map<int64_t, class MessageApply *>::iterator it2;
1758 for (it2 = msgs.begin(); it2 != msgs.end(); it2++)
1773 printf(
"%s %i anomaly\n", __FILE__, __LINE__);