InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
TopologyMgr Class Reference

execute Topology Manager actor More...

#include <TopologyMgr.h>

Collaboration diagram for TopologyMgr:

Public Member Functions

 TopologyMgr (Topology::partitionAddress *myIdentityArg)
 
 TopologyMgr (const TopologyMgr &orig)
 
virtual ~TopologyMgr ()
 

Private Member Functions

void updateLocalConfig (msgpack::unpacker &pac, msgpack::unpacked &result)
 local (node-specific) Topology update More...
 
void updateGlobalConfig (msgpack::unpacker &pac, msgpack::unpacked &result)
 global (cluster wide) Topology update More...
 
void broadcastConfig ()
 tell all local actors to update their versions of Topology More...
 

Private Attributes

Topology::partitionAddress myIdentity
 
class Mboxes mboxes
 
class Topology myTopology
 

Detailed Description

execute Topology Manager actor

Parameters
myIdentityArg    how to identify this

Definition at line 53 of file TopologyMgr.h.

Constructor & Destructor Documentation

TopologyMgr::TopologyMgr ( Topology::partitionAddress * myIdentityArg)

Definition at line 48 of file TopologyMgr.cc.

References cfg_s::anonymousping, cfg_s::badloginmessages, deadlockMgr(), engine(), ibGateway(), Topology::partitionAddress::instance, listener(), logfile, Topology::partitionAddress::mbox, myIdentity, Topology::newActor(), nodeTopology, obGateway(), replyToManager(), transactionAgent(), updateGlobalConfig(), updateLocalConfig(), userSchemaMgr(), zmqcontext, and zmqsocket.

48  :
49  myIdentity(*myIdentityArg)
50 {
51  delete myIdentityArg;
52 
53  if (daemon(1, 1))
54  {
55  fprintf(logfile, "%s %i daemon errno %i\n", __FILE__, __LINE__, errno);
56  exit(1);
57  }
58 
59  // start 0mq socket
60  // void *zmqcontext = zmq_ctx_new();
61  void *zmqresponder = zmq_socket(zmqcontext, ZMQ_REP);
62  int rv = zmq_bind(zmqresponder, zmqsocket.c_str());
63 
64  if (rv == -1)
65  {
66  perror("zmq_bind");
67  printf("%s %i zmq_bind errno %i\n", __FILE__, __LINE__, errno);
68  exit(1);
69  }
70 
71  int epollfd = epoll_create(1);
72 
73  while (1)
74  {
75  HECK:
76 
77  zmq_msg_t zmqrecvmsg;
78  zmq_msg_init(&zmqrecvmsg);
79  int retval = zmq_msg_recv(&zmqrecvmsg, zmqresponder, 0);
80 
81  if (retval == -1)
82  {
83  fprintf(logfile, "%s %i zmq_recv errno %i\n", __FILE__, __LINE__,
84  errno);
85  }
86 
87  msgpack::sbuffer replysbuf;
88  msgpack::packer<msgpack::sbuffer> replypk(&replysbuf);
89 
90  msgpack::sbuffer inbuf;
91  inbuf.write((char *)zmq_msg_data(&zmqrecvmsg), zmq_msg_size(&zmqrecvmsg));
92  msgpack::unpacker pac;
93  pac.reserve_buffer(inbuf.size());
94  memcpy(pac.buffer(), inbuf.data(), inbuf.size());
95  pac.buffer_consumed(inbuf.size());
96  msgpack::unpacked result;
97 
98  if (pac.next(&result)==false)
99  {
100  replypk.pack_int(CMDNOTOK);
101  replyToManager(zmqresponder, replysbuf);
102  zmq_msg_close(&zmqrecvmsg);
103  goto HECK;
104  }
105 
106  int cmd;
107  msgpack::object obj = result.get();
108  obj.convert(&cmd);
109 
110  switch (cmd)
111  {
112  case CMDSET:
113  {
114  if (pac.next(&result)==false)
115  {
116  replypk.pack_int(CMDNOTOK);
117  replyToManager(zmqresponder, replysbuf);
118  zmq_msg_close(&zmqrecvmsg);
119  goto HECK;
120  }
121 
122  int cmd2;
123  msgpack::object obj2 = result.get();
124  obj2.convert(&cmd2);
125 
126  switch (cmd2)
127  {
128  case CMDANONYMOUSPING:
129  {
130  if (pac.next(&result)==false)
131  {
132  replypk.pack_int(CMDNOTOK);
133  replyToManager(zmqresponder, replysbuf);
134  zmq_msg_close(&zmqrecvmsg);
135  goto HECK;
136  }
137 
138  int val;
139  msgpack::object obj3 = result.get();
140  obj3.convert(&val);
141 
142  // t or f
143  __sync_bool_compare_and_swap(&cfgs.anonymousping,
144  !(val==0 ? 0 : 1), val==0 ? 0 : 1);
145  replypk.pack_int(CMDOK);
146  }
147  break;
148 
149  case CMDBADLOGINMESSAGES:
150  {
151  if (pac.next(&result)==false)
152  {
153  replypk.pack_int(CMDNOTOK);
154  replyToManager(zmqresponder, replysbuf);
155  zmq_msg_close(&zmqrecvmsg);
156  goto HECK;
157  }
158 
159  int val;
160  msgpack::object obj3 = result.get();
161  obj3.convert(&val);
162  // t or f
163  __sync_bool_compare_and_swap(&cfgs.badloginmessages,
164  !(val==0 ? 0 : 1), val==0 ? 0 : 1);
165  replypk.pack_int(CMDOK);
166  }
167  break;
168 
169  default:
170  replypk.pack_int(CMDNOTOK);
171  replyToManager(zmqresponder, replysbuf);
172  zmq_msg_close(&zmqrecvmsg);
173  goto HECK;
174  }
175  }
176  break;
177 
178  case CMDGET:
179  {
180  if (pac.next(&result)==false)
181  {
182  replypk.pack_int(CMDNOTOK);
183  replyToManager(zmqresponder, replysbuf);
184  zmq_msg_close(&zmqrecvmsg);
185  goto HECK;
186  }
187 
188  int cmd2;
189  msgpack::object obj2 = result.get();
190  obj2.convert(&cmd2);
191 
192  switch (cmd2)
193  {
194  case CMDGETTOPOLOGYMGRMBOXPTR:
195  {
196  replypk.pack_int(CMDOK);
197  replypk.pack_int64((int64_t)myIdentity.mbox);
198  }
199  break;
200 
201  default:
202  replypk.pack_int(CMDNOTOK);
203  replyToManager(zmqresponder, replysbuf);
204  zmq_msg_close(&zmqrecvmsg);
205  goto HECK;
206  }
207  }
208  break;
209 
210  case CMDSTART:
211  {
212  pthread_t tid;
213 
214  if (pac.next(&result)==false)
215  {
216  replypk.pack_int(CMDNOTOK);
217  replyToManager(zmqresponder, replysbuf);
218  zmq_msg_close(&zmqrecvmsg);
219  goto HECK;
220  }
221 
222  int cmd2;
223  msgpack::object obj2 = result.get();
224  obj2.convert(&cmd2);
225 
226  if (pac.next(&result)==false)
227  {
228  replypk.pack_int(CMDNOTOK);
229  replyToManager(zmqresponder, replysbuf);
230  zmq_msg_close(&zmqrecvmsg);
231  goto HECK;
232  }
233 
234  int64_t actorid;
235  msgpack::object obj3 = result.get();
236  obj3.convert(&actorid);
237 
238  class Mbox *newmbox=NULL;
239 
240  switch (cmd2)
241  {
242  case CMDLISTENER:
243  {
244  vector<string> nodes;
245  vector<string> services;
246  string node;
247  string service;
248  msgpack::object obj4;
249 
250  if (pac.next(&result)==false)
251  {
252  replypk.pack_int(CMDNOTOK);
253  replyToManager(zmqresponder, replysbuf);
254  zmq_msg_close(&zmqrecvmsg);
255  goto HECK;
256  }
257 
258  obj4 = result.get();
259  obj4.convert(&node);
260  nodes.push_back(node);
261 
262  if (pac.next(&result)==false)
263  {
264  replypk.pack_int(CMDNOTOK);
265  replyToManager(zmqresponder, replysbuf);
266  zmq_msg_close(&zmqrecvmsg);
267  goto HECK;
268  }
269 
270  obj4 = result.get();
271  obj4.convert(&service);
272  services.push_back(service);
273 
274  if (pac.next(&result)==false)
275  {
276  replypk.pack_int(CMDNOTOK);
277  replyToManager(zmqresponder, replysbuf);
278  zmq_msg_close(&zmqrecvmsg);
279  goto HECK;
280  }
281 
282  obj4 = result.get();
283  obj4.convert(&node);
284  nodes.push_back(node);
285 
286  if (pac.next(&result)==false)
287  {
288  replypk.pack_int(CMDNOTOK);
289  replyToManager(zmqresponder, replysbuf);
290  zmq_msg_close(&zmqrecvmsg);
291  goto HECK;
292  }
293 
294  obj4 = result.get();
295  obj4.convert(&service);
296  services.push_back(service);
297 
298  newmbox = new class Mbox;
299 
300  if (pthread_create(&tid, NULL, listener,
301  nodeTopology.newActor(ACTOR_LISTENER, newmbox,
302  epollfd, string(),
303  actorid, nodes,
304  services))==-1)
305  {
306  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
307  __LINE__, errno);
308  replypk.pack_int(CMDNOTOK);
309  replyToManager(zmqresponder, replysbuf);
310  zmq_msg_close(&zmqrecvmsg);
311  goto HECK;
312  }
313  else
314  {
315  replypk.pack_int(CMDOK);
316  replypk.pack_int64((int64_t)newmbox);
317  }
318  }
319  break;
320 
321  case CMDUSERSCHEMAMGR:
322  {
323  if (pac.next(&result)==false)
324  {
325  replypk.pack_int(CMDNOTOK);
326  replyToManager(zmqresponder, replysbuf);
327  zmq_msg_close(&zmqrecvmsg);
328  goto HECK;
329  }
330 
331  string globaladminpassword;
332  msgpack::object obj4 = result.get();
333  obj4.convert(&globaladminpassword);
334  newmbox = new class Mbox;
335 
336  if (pthread_create(&tid, NULL, userSchemaMgr,
337  nodeTopology.newActor(ACTOR_USERSCHEMAMGR,
338  newmbox, epollfd,
339  globaladminpassword,
340  actorid,
341  vector<string>(),
342  vector<string>()))==-1)
343  {
344  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
345  __LINE__, errno);
346  replypk.pack_int(CMDNOTOK);
347  replyToManager(zmqresponder, replysbuf);
348  zmq_msg_close(&zmqrecvmsg);
349  goto HECK;
350  }
351  else
352  {
353  replypk.pack_int(CMDOK);
354  replypk.pack_int64((int64_t)newmbox);
355  }
356  }
357  break;
358 
359  case CMDDEADLOCKMGR:
360  newmbox = new class Mbox;
361 
362  if (pthread_create(&tid, NULL, deadlockMgr,
363  nodeTopology.newActor(ACTOR_DEADLOCKMGR,
364  newmbox, epollfd,
365  string(), actorid,
366  vector<string>(),
367  vector<string>()))==-1)
368  {
369  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
370  __LINE__, errno);
371  replypk.pack_int(CMDNOTOK);
372  replyToManager(zmqresponder, replysbuf);
373  zmq_msg_close(&zmqrecvmsg);
374  goto HECK;
375  }
376  else
377  {
378  replypk.pack_int(CMDOK);
379  replypk.pack_int64((int64_t)newmbox);
380  }
381 
382  break;
383 
384  case CMDTRANSACTIONAGENT:
385  {
386  if (pac.next(&result)==false)
387  {
388  replypk.pack_int(CMDNOTOK);
389  replyToManager(zmqresponder, replysbuf);
390  zmq_msg_close(&zmqrecvmsg);
391  goto HECK;
392  }
393 
394  int64_t instance;
395  msgpack::object obj4 = result.get();
396  obj4.convert(&instance);
397 
398  newmbox = new class Mbox;
400  nodeTopology.newActor(ACTOR_TRANSACTIONAGENT,
401  newmbox, epollfd, string(), actorid,
402  vector<string>(),
403  vector<string>());
404  paddr->instance = instance;
405 
406  if (pthread_create(&tid, NULL, transactionAgent, paddr)==-1)
407  {
408  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
409  __LINE__, errno);
410  replypk.pack_int(CMDNOTOK);
411  replyToManager(zmqresponder, replysbuf);
412  zmq_msg_close(&zmqrecvmsg);
413  goto HECK;
414  }
415  else
416  {
417  replypk.pack_int(CMDOK);
418  replypk.pack_int64((int64_t)newmbox);
419  }
420  }
421  break;
422 
423  case CMDENGINE:
424  {
425  if (pac.next(&result)==false)
426  {
427  replypk.pack_int(CMDNOTOK);
428  replyToManager(zmqresponder, replysbuf);
429  zmq_msg_close(&zmqrecvmsg);
430  goto HECK;
431  }
432 
433  int64_t instance;
434  msgpack::object obj4 = result.get();
435  obj4.convert(&instance);
436 
437  newmbox = new class Mbox;
439  nodeTopology.newActor(ACTOR_ENGINE,
440  newmbox, epollfd, string(), actorid,
441  vector<string>(),
442  vector<string>());
443  paddr->instance = instance;
444 
445  if (pthread_create(&tid, NULL, engine, paddr)==-1)
446  {
447  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
448  __LINE__, errno);
449  replypk.pack_int(CMDNOTOK);
450  replyToManager(zmqresponder, replysbuf);
451  zmq_msg_close(&zmqrecvmsg);
452  goto HECK;
453  }
454  else
455  {
456  replypk.pack_int(CMDOK);
457  replypk.pack_int64((int64_t)newmbox);
458  }
459  }
460  break;
461 
462  case CMDOBGATEWAY:
463  {
464  if (pac.next(&result)==false)
465  {
466  replypk.pack_int(CMDNOTOK);
467  replyToManager(zmqresponder, replysbuf);
468  zmq_msg_close(&zmqrecvmsg);
469  goto HECK;
470  }
471 
472  int64_t instance;
473  msgpack::object obj4 = result.get();
474  obj4.convert(&instance);
475 
476  newmbox = new class Mbox;
478  nodeTopology.newActor(ACTOR_OBGATEWAY,
479  newmbox, epollfd, string(), actorid,
480  vector<string>(), vector<string>());
481  paddr->instance = instance;
482 
483  if (pthread_create(&tid, NULL, obGateway, paddr)==-1)
484  {
485  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
486  __LINE__, errno);
487  replypk.pack_int(CMDNOTOK);
488  replyToManager(zmqresponder, replysbuf);
489  zmq_msg_close(&zmqrecvmsg);
490  goto HECK;
491  }
492  else
493  {
494  replypk.pack_int(CMDOK);
495  replypk.pack_int64((int64_t)newmbox);
496  }
497  }
498  break;
499 
500  case CMDIBGATEWAY:
501  {
502  if (pac.next(&result)==false)
503  {
504  replypk.pack_int(CMDNOTOK);
505  replyToManager(zmqresponder, replysbuf);
506  zmq_msg_close(&zmqrecvmsg);
507  goto HECK;
508  }
509 
510  int64_t instance;
511  msgpack::object obj4 = result.get();
512  obj4.convert(&instance);
513 
514  if (pac.next(&result)==false)
515  {
516  replypk.pack_int(CMDNOTOK);
517  replyToManager(zmqresponder, replysbuf);
518  zmq_msg_close(&zmqrecvmsg);
519  goto HECK;
520  }
521 
522  string hostport;
523  msgpack::object obj5 = result.get();
524  obj5.convert(&hostport);
525 
526  newmbox = new class Mbox;
528  nodeTopology.newActor(ACTOR_IBGATEWAY,
529  newmbox, epollfd, hostport, actorid,
530  vector<string>(), vector<string>());
531  paddr->instance = instance;
532 
533  if (pthread_create(&tid, NULL, ibGateway, paddr)==-1)
534  {
535  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
536  __LINE__, errno);
537  replypk.pack_int(CMDNOTOK);
538  replyToManager(zmqresponder, replysbuf);
539  zmq_msg_close(&zmqrecvmsg);
540  goto HECK;
541  }
542  else
543  {
544  replypk.pack_int(CMDOK);
545  replypk.pack_int64((int64_t)newmbox);
546  }
547  }
548  break;
549 
550  default:
551  replypk.pack_int(CMDNOTOK);
552  replyToManager(zmqresponder, replysbuf);
553  zmq_msg_close(&zmqrecvmsg);
554  goto HECK;
555  }
556  }
557  break;
558 
559  case CMDLOCALCONFIG:
560  {
561  updateLocalConfig(pac, result);
562  replypk.pack_int(CMDOK);
563  }
564  break;
565 
566  case CMDGLOBALCONFIG:
567  {
568  updateGlobalConfig(pac, result);
569  replypk.pack_int(CMDOK);
570  }
571  break;
572 
573  default:
574  replypk.pack_int(CMDNOTOK);
575  replyToManager(zmqresponder, replysbuf);
576  zmq_msg_close(&zmqrecvmsg);
577  goto HECK;
578  }
579 
580  replyToManager(zmqresponder, replysbuf);
581 
582  if (zmq_msg_close(&zmqrecvmsg)==-1)
583  {
584  perror("zmq_msg_close");
585  printf("%s %i %i\n", __FILE__, __LINE__, errno);
586  }
587  }
588 }

Here is the call graph for this function:

TopologyMgr::TopologyMgr ( const TopologyMgr & orig)

Definition at line 590 of file TopologyMgr.cc.

591 {
592 }
TopologyMgr::~TopologyMgr ( )
virtual

Definition at line 594 of file TopologyMgr.cc.

595 {
596 }

Member Function Documentation

void TopologyMgr::broadcastConfig ( )
private

tell all local actors to update their versions of Topology

Definition at line 936 of file TopologyMgr.cc.

References Topology::addressStruct::actorid, Mboxes::actoridToProducers, Topology::actorList, Message::message_s::destAddr, mboxes, Message::messageStruct, myTopology, Topology::addressStruct::nodeid, Topology::nodeid, PAYLOADMESSAGE, Message::message_s::payloadtype, Message::message_s::sourceAddr, Message::message_s::topic, TOPIC_TOPOLOGY, and Mboxes::update().

Referenced by updateGlobalConfig(), and updateLocalConfig().

937 {
939  // next, go through each and send mails
940  class Message *msg;
941 
942  for (size_t n=0; n < myTopology.actorList.size(); n++)
943  {
944  if (myTopology.actorList[n].type != ACTOR_NONE)
945  {
946  msg = new class Message;
952  msg->messageStruct.destAddr.actorid = n;
953  mboxes.actoridToProducers[n]->sendMsg(*msg);
954  }
955  }
956 }

Here is the call graph for this function:

Here is the caller graph for this function:

void TopologyMgr::updateGlobalConfig ( msgpack::unpacker &  pac,
msgpack::unpacked &  result 
)
private

global (cluster wide) Topology update

The update is transmitted by the manager process over 0mq, serialized with msgpack.

Parameters
pac       msgpack unpacker object
result    result of unpacking

Definition at line 702 of file TopologyMgr.cc.

References Topology::activereplica, Topology::addressStruct::actorid, Topology::actorList, Topology::partitionAddress::address, Topology::allActors, Topology::allActorsThisReplica, Topology::partitionAddress::argstring, broadcastConfig(), Topology::deadlockMgrMbox, Topology::deadlockMgrNode, Topology::partitionAddress::epollfd, Topology::ibGateways, Topology::partitionAddress::instance, Topology::partitionAddress::mbox, myTopology, Topology::addressStruct::nodeid, Topology::nodeid, nodeTopology, nodeTopologyMutex, Topology::numpartitions, Topology::numreplicas, Topology::partitionList, Topology::partitionListThisReplica, Topology::replicaMembers, Topology::tas, Topology::partitionAddress::type, Topology::userSchemaMgrMbox, and Topology::userSchemaMgrNode.

Referenced by TopologyMgr().

704 {
705  pac.next(&result);
706  msgpack::object obj = result.get();
707  int activereplica;
708  obj.convert(&activereplica);
709 
710  pac.next(&result);
711  obj = result.get();
712  vector<int64_t> replicaids;
713  obj.convert(&replicaids);
714 
715  pac.next(&result);
716  obj = result.get();
717  vector<int64_t> nodeids;
718  obj.convert(&nodeids);
719 
720  pac.next(&result);
721  obj = result.get();
722  vector<int64_t> actorids;
723  obj.convert(&actorids);
724 
725  pac.next(&result);
726  obj = result.get();
727  vector<int64_t> ibgatewaynodes;
728  obj.convert(&ibgatewaynodes);
729 
730  pac.next(&result);
731  obj = result.get();
732  vector<int64_t> ibgatewayinstances;
733  obj.convert(&ibgatewayinstances);
734 
735  pac.next(&result);
736  obj = result.get();
737  vector<string> ibgatewayhostports;
738  obj.convert(&ibgatewayhostports);
739 
740  pac.next(&result);
741  obj = result.get();
742  int64_t dmgrnode;
743  obj.convert(&dmgrnode);
744 
745  pac.next(&result);
746  obj = result.get();
747  int64_t dmgrmboxptr;
748  obj.convert(&dmgrmboxptr);
749 
750  pac.next(&result);
751  obj = result.get();
752  int64_t usmgrnode;
753  obj.convert(&usmgrnode);
754 
755  pac.next(&result);
756  obj = result.get();
757  int64_t usmgrmboxptr;
758  obj.convert(&usmgrmboxptr);
759 
760  pac.next(&result);
761  obj = result.get();
762  size_t numreplicas;
763  obj.convert(&numreplicas);
764 
765  vector< vector<int16_t> > replicaMembers;
766 
767  for (size_t n=0; n < numreplicas; n++)
768  {
769  pac.next(&result);
770  obj = result.get();
771  vector<int16_t> v;
772  obj.convert(&v);
773  replicaMembers.push_back(v);
774  }
775 
776  pac.next(&result);
777  obj = result.get();
778  size_t lentas;
779  obj.convert(&lentas);
780 
781  vector< vector<int16_t> > tas;
782  tas.push_back(vector<int16_t>());
783 
784  for (size_t n=1; n <= lentas; n++)
785  {
786  pac.next(&result);
787  obj = result.get();
788  vector<int16_t> v;
789  obj.convert(&v);
790  tas.push_back(v);
791  }
792 
793  map< int64_t, vector<int> > allActorsMap;
794  vector< vector<int> > allActors;
795 
796  while (pac.next(&result))
797  {
798  obj = result.get();
799  int64_t nid;
800  obj.convert(&nid);
801  pac.next(&result);
802  obj = result.get();
803  vector<int> aids;
804  obj.convert(&aids);
805  allActorsMap[nid] = aids;
806  }
807 
808  allActors.resize(allActorsMap.rbegin()->first + 1, vector<int>());
809  map< int64_t, vector<int> >::iterator it;
810 
811  for (it = allActorsMap.begin(); it != allActorsMap.end(); ++it)
812  {
813  allActors[it->first] = it->second;
814  }
815 
816  boost::unordered_map< int16_t, vector<int> > allActorsThisReplica;
817  boost::unordered_set<int64_t> nodesThisReplica;
818  int64_t myreplica = -1;
819 
820  for (size_t n=0; n < replicaMembers.size(); n++)
821  {
822  for (size_t m=0; m < replicaMembers[n].size(); m++)
823  {
824  if (replicaMembers[n][m]==myTopology.nodeid)
825  {
826  myreplica = n;
827  }
828  }
829  }
830 
831  if (myreplica >= 0)
832  {
833  for (size_t n=0; n < replicaMembers[myreplica].size(); n++)
834  {
835  nodesThisReplica.insert(replicaMembers[myreplica][n]);
836  }
837 
838  for (size_t n=0; n < allActors.size(); n++)
839  {
840  if (nodesThisReplica.count(n))
841  {
842  allActorsThisReplica[n] = allActors[n];
843  }
844  }
845  }
846 
847  size_t numpartitions=0;
848 
849  for (size_t n=0; n < replicaids.size(); n++)
850  {
851  if (replicaids[n]==0)
852  {
853  numpartitions++;
854  }
855  }
856 
857  vector< vector<Topology::partitionAddress> > pl(numreplicas);
858  /*
859  Topology::partitionAddress pa = {};
860  vector<Topology::partitionAddress> pl2(numpartitions, pa);
861  vector<Topology::partitionAddress> pl2;
862  for (size_t n=0; n < numreplicas; n++)
863  {
864  pl[n] = pl2;
865  }
866  */
867  pthread_mutex_lock(&nodeTopologyMutex);
868 
869  nodeTopology.activereplica = activereplica;
870 
871  for (size_t n=0; n < replicaids.size(); n++)
872  {
874  addr.address.nodeid = nodeids[n];
875  addr.address.actorid = actorids[n];
876  addr.epollfd = -1;
877  addr.argstring = "";
878  addr.instance = -1;
879 
880  if (nodeTopology.nodeid==addr.address.nodeid)
881  {
882  addr.mbox = nodeTopology.actorList[addr.address.actorid].mbox;
883  }
884  else
885  {
886  addr.mbox = NULL;
887  }
888 
889  addr.type = ACTOR_ENGINE;
890  pl[replicaids[n]].push_back(addr);
891  }
892 
893  vector<Topology::partitionAddress> pltr;
894 
895  if (myreplica >= 0)
896  {
897  pltr = pl[myreplica];
898  }
899 
900  nodeTopology.partitionList.swap(pl);
902 
903  map< int64_t, vector<string> > ibgws;
904 
905  for (size_t n=0; n < ibgatewaynodes.size(); n++)
906  {
907  int64_t node = ibgatewaynodes[n];
908  int64_t instance = ibgatewayinstances[n];
909  string hostport = ibgatewayhostports[n];
910 
911  if (ibgws[node].size() < (size_t)(instance+1))
912  {
913  ibgws[node].resize(instance+1, string());
914  ibgws[node][instance] = hostport;
915  }
916  }
917 
918  nodeTopology.ibGateways.swap(ibgws);
919 
920  nodeTopology.userSchemaMgrNode = usmgrnode;
921  nodeTopology.userSchemaMgrMbox = (class Mbox *)usmgrmboxptr;
922  nodeTopology.deadlockMgrNode = dmgrnode;
923  nodeTopology.deadlockMgrMbox = (class Mbox *)dmgrmboxptr;
924  // nodeTopology.numpartitions = nodeTopology.partitionListThisReplica.size();
925  nodeTopology.numpartitions = numpartitions;
926  nodeTopology.replicaMembers.swap(replicaMembers);
927  nodeTopology.tas.swap(tas);
928  nodeTopology.allActors.swap(allActors);
929  nodeTopology.allActorsThisReplica.swap(allActorsThisReplica);
930  nodeTopology.numreplicas = numreplicas;
931  pthread_mutex_unlock(&nodeTopologyMutex);
932 
933  broadcastConfig();
934 }

Here is the call graph for this function:

Here is the caller graph for this function:

void TopologyMgr::updateLocalConfig ( msgpack::unpacker &  pac,
msgpack::unpacked &  result 
)
private

local (node-specific) Topology update

The update is transmitted by the manager process over 0mq, serialized with msgpack.

Parameters
pac       msgpack unpacker object
result    result of unpacking

Definition at line 618 of file TopologyMgr.cc.

References Topology::addressStruct::actorid, Topology::actorList, Topology::partitionAddress::address, Topology::partitionAddress::argstring, broadcastConfig(), Topology::partitionAddress::epollfd, Topology::partitionAddress::instance, Topology::partitionAddress::mbox, Topology::addressStruct::nodeid, Topology::nodeid, nodeTopology, nodeTopologyMutex, Topology::numengines, Topology::numobgateways, Topology::numtransactionagents, and Topology::partitionAddress::type.

Referenced by TopologyMgr().

620 {
621  pac.next(&result);
622  msgpack::object obj = result.get();
623  vector<int64_t> t;
624  obj.convert(&t);
625 
626  vector<int64_t> i;
627  pac.next(&result);
628  obj = result.get();
629  obj.convert(&i);
630 
631  vector<int64_t> m;
632  pac.next(&result);
633  obj = result.get();
634  obj.convert(&m);
635 
636  vector<Topology::actor_s> v(t.size());
637 
638  for (size_t n=0; n < t.size(); n++)
639  {
640  Topology::actor_s a = { (actortypes_e)t[n], i[n], (class Mbox *)m[n] };
641  v[n] = a;
642  }
643 
644  pthread_mutex_lock(&nodeTopologyMutex);
645  nodeTopology.actorList.swap(v);
648 
649  Topology::partitionAddress addr = {};
651  addr.epollfd = -1;
652  addr.argstring = "";
653  addr.address.actorid = 4;
654  addr.mbox = (class Mbox *)nodeTopology.actorList[4].mbox;
655  addr.type = ACTOR_LISTENER;
656 
657  // if (nodeTopology.actorList[]
661 
662  for (size_t n=0; n < nodeTopology.actorList.size(); n++)
663  {
664  if (nodeTopology.actorList[n].type==ACTOR_TRANSACTIONAGENT)
665  {
667  addr.address.actorid = n;
668  addr.mbox = (class Mbox *)nodeTopology.actorList[n].mbox;
669  addr.type = nodeTopology.actorList[n].type;
670  addr.instance = nodeTopology.actorList[n].instance;
671  }
672  else if (nodeTopology.actorList[n].type==ACTOR_ENGINE)
673  {
675  addr.address.actorid = n;
676  addr.mbox = (class Mbox *)nodeTopology.actorList[n].mbox;
677  addr.type = nodeTopology.actorList[n].type;
678  addr.instance = nodeTopology.actorList[n].instance;
679  }
680  else if (nodeTopology.actorList[n].type==ACTOR_IBGATEWAY)
681  {
682  addr.address.actorid = n;
683  addr.mbox = (class Mbox *)nodeTopology.actorList[n].mbox;
684  addr.type = nodeTopology.actorList[n].type;
685  addr.instance = nodeTopology.actorList[n].instance;
686  }
687  else if (nodeTopology.actorList[n].type==ACTOR_OBGATEWAY)
688  {
690  addr.address.actorid = n;
691  addr.mbox = (class Mbox *)nodeTopology.actorList[n].mbox;
692  addr.type = nodeTopology.actorList[n].type;
693  addr.instance = nodeTopology.actorList[n].instance;
694  }
695  }
696 
697  pthread_mutex_unlock(&nodeTopologyMutex);
698 
699  broadcastConfig();
700 }

Here is the call graph for this function:

Here is the caller graph for this function:

Member Data Documentation

class Mboxes TopologyMgr::mboxes
private

Definition at line 85 of file TopologyMgr.h.

Referenced by broadcastConfig().

Topology::partitionAddress TopologyMgr::myIdentity
private

Definition at line 84 of file TopologyMgr.h.

Referenced by TopologyMgr().

class Topology TopologyMgr::myTopology
private

Definition at line 86 of file TopologyMgr.h.

Referenced by broadcastConfig(), and updateGlobalConfig().


The documentation for this class was generated from the following files: