InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
TopologyMgr.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
33 #include "TopologyMgr.h"
34 #line 35 "TopologyMgr.cc"
35 
36 extern cfg_s cfgs;
37 
38 void *userSchemaMgr(void *);
39 void *listener(void *);
40 void *transactionAgent(void *);
41 void *engine(void *);
42 void *deadlockMgr(void *);
43 void *ibGateway(void *);
44 void *obGateway(void *);
45 
46 void replyToManager(void *, msgpack::sbuffer &);
47 
49  myIdentity(*myIdentityArg)
50 {
51  delete myIdentityArg;
52 
53  if (daemon(1, 1))
54  {
55  fprintf(logfile, "%s %i daemon errno %i\n", __FILE__, __LINE__, errno);
56  exit(1);
57  }
58 
59  // start 0mq socket
60  // void *zmqcontext = zmq_ctx_new();
61  void *zmqresponder = zmq_socket(zmqcontext, ZMQ_REP);
62  int rv = zmq_bind(zmqresponder, zmqsocket.c_str());
63 
64  if (rv == -1)
65  {
66  perror("zmq_bind");
67  printf("%s %i zmq_bind errno %i\n", __FILE__, __LINE__, errno);
68  exit(1);
69  }
70 
71  int epollfd = epoll_create(1);
72 
73  while (1)
74  {
75  HECK:
76 
77  zmq_msg_t zmqrecvmsg;
78  zmq_msg_init(&zmqrecvmsg);
79  int retval = zmq_msg_recv(&zmqrecvmsg, zmqresponder, 0);
80 
81  if (retval == -1)
82  {
83  fprintf(logfile, "%s %i zmq_recv errno %i\n", __FILE__, __LINE__,
84  errno);
85  }
86 
87  msgpack::sbuffer replysbuf;
88  msgpack::packer<msgpack::sbuffer> replypk(&replysbuf);
89 
90  msgpack::sbuffer inbuf;
91  inbuf.write((char *)zmq_msg_data(&zmqrecvmsg), zmq_msg_size(&zmqrecvmsg));
92  msgpack::unpacker pac;
93  pac.reserve_buffer(inbuf.size());
94  memcpy(pac.buffer(), inbuf.data(), inbuf.size());
95  pac.buffer_consumed(inbuf.size());
96  msgpack::unpacked result;
97 
98  if (pac.next(&result)==false)
99  {
100  replypk.pack_int(CMDNOTOK);
101  replyToManager(zmqresponder, replysbuf);
102  zmq_msg_close(&zmqrecvmsg);
103  goto HECK;
104  }
105 
106  int cmd;
107  msgpack::object obj = result.get();
108  obj.convert(&cmd);
109 
110  switch (cmd)
111  {
112  case CMDSET:
113  {
114  if (pac.next(&result)==false)
115  {
116  replypk.pack_int(CMDNOTOK);
117  replyToManager(zmqresponder, replysbuf);
118  zmq_msg_close(&zmqrecvmsg);
119  goto HECK;
120  }
121 
122  int cmd2;
123  msgpack::object obj2 = result.get();
124  obj2.convert(&cmd2);
125 
126  switch (cmd2)
127  {
128  case CMDANONYMOUSPING:
129  {
130  if (pac.next(&result)==false)
131  {
132  replypk.pack_int(CMDNOTOK);
133  replyToManager(zmqresponder, replysbuf);
134  zmq_msg_close(&zmqrecvmsg);
135  goto HECK;
136  }
137 
138  int val;
139  msgpack::object obj3 = result.get();
140  obj3.convert(&val);
141 
142  // t or f
143  __sync_bool_compare_and_swap(&cfgs.anonymousping,
144  !(val==0 ? 0 : 1), val==0 ? 0 : 1);
145  replypk.pack_int(CMDOK);
146  }
147  break;
148 
149  case CMDBADLOGINMESSAGES:
150  {
151  if (pac.next(&result)==false)
152  {
153  replypk.pack_int(CMDNOTOK);
154  replyToManager(zmqresponder, replysbuf);
155  zmq_msg_close(&zmqrecvmsg);
156  goto HECK;
157  }
158 
159  int val;
160  msgpack::object obj3 = result.get();
161  obj3.convert(&val);
162  // t or f
163  __sync_bool_compare_and_swap(&cfgs.badloginmessages,
164  !(val==0 ? 0 : 1), val==0 ? 0 : 1);
165  replypk.pack_int(CMDOK);
166  }
167  break;
168 
169  default:
170  replypk.pack_int(CMDNOTOK);
171  replyToManager(zmqresponder, replysbuf);
172  zmq_msg_close(&zmqrecvmsg);
173  goto HECK;
174  }
175  }
176  break;
177 
178  case CMDGET:
179  {
180  if (pac.next(&result)==false)
181  {
182  replypk.pack_int(CMDNOTOK);
183  replyToManager(zmqresponder, replysbuf);
184  zmq_msg_close(&zmqrecvmsg);
185  goto HECK;
186  }
187 
188  int cmd2;
189  msgpack::object obj2 = result.get();
190  obj2.convert(&cmd2);
191 
192  switch (cmd2)
193  {
194  case CMDGETTOPOLOGYMGRMBOXPTR:
195  {
196  replypk.pack_int(CMDOK);
197  replypk.pack_int64((int64_t)myIdentity.mbox);
198  }
199  break;
200 
201  default:
202  replypk.pack_int(CMDNOTOK);
203  replyToManager(zmqresponder, replysbuf);
204  zmq_msg_close(&zmqrecvmsg);
205  goto HECK;
206  }
207  }
208  break;
209 
210  case CMDSTART:
211  {
212  pthread_t tid;
213 
214  if (pac.next(&result)==false)
215  {
216  replypk.pack_int(CMDNOTOK);
217  replyToManager(zmqresponder, replysbuf);
218  zmq_msg_close(&zmqrecvmsg);
219  goto HECK;
220  }
221 
222  int cmd2;
223  msgpack::object obj2 = result.get();
224  obj2.convert(&cmd2);
225 
226  if (pac.next(&result)==false)
227  {
228  replypk.pack_int(CMDNOTOK);
229  replyToManager(zmqresponder, replysbuf);
230  zmq_msg_close(&zmqrecvmsg);
231  goto HECK;
232  }
233 
234  int64_t actorid;
235  msgpack::object obj3 = result.get();
236  obj3.convert(&actorid);
237 
238  class Mbox *newmbox=NULL;
239 
240  switch (cmd2)
241  {
242  case CMDLISTENER:
243  {
244  vector<string> nodes;
245  vector<string> services;
246  string node;
247  string service;
248  msgpack::object obj4;
249 
250  if (pac.next(&result)==false)
251  {
252  replypk.pack_int(CMDNOTOK);
253  replyToManager(zmqresponder, replysbuf);
254  zmq_msg_close(&zmqrecvmsg);
255  goto HECK;
256  }
257 
258  obj4 = result.get();
259  obj4.convert(&node);
260  nodes.push_back(node);
261 
262  if (pac.next(&result)==false)
263  {
264  replypk.pack_int(CMDNOTOK);
265  replyToManager(zmqresponder, replysbuf);
266  zmq_msg_close(&zmqrecvmsg);
267  goto HECK;
268  }
269 
270  obj4 = result.get();
271  obj4.convert(&service);
272  services.push_back(service);
273 
274  if (pac.next(&result)==false)
275  {
276  replypk.pack_int(CMDNOTOK);
277  replyToManager(zmqresponder, replysbuf);
278  zmq_msg_close(&zmqrecvmsg);
279  goto HECK;
280  }
281 
282  obj4 = result.get();
283  obj4.convert(&node);
284  nodes.push_back(node);
285 
286  if (pac.next(&result)==false)
287  {
288  replypk.pack_int(CMDNOTOK);
289  replyToManager(zmqresponder, replysbuf);
290  zmq_msg_close(&zmqrecvmsg);
291  goto HECK;
292  }
293 
294  obj4 = result.get();
295  obj4.convert(&service);
296  services.push_back(service);
297 
298  newmbox = new class Mbox;
299 
300  if (pthread_create(&tid, NULL, listener,
301  nodeTopology.newActor(ACTOR_LISTENER, newmbox,
302  epollfd, string(),
303  actorid, nodes,
304  services))==-1)
305  {
306  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
307  __LINE__, errno);
308  replypk.pack_int(CMDNOTOK);
309  replyToManager(zmqresponder, replysbuf);
310  zmq_msg_close(&zmqrecvmsg);
311  goto HECK;
312  }
313  else
314  {
315  replypk.pack_int(CMDOK);
316  replypk.pack_int64((int64_t)newmbox);
317  }
318  }
319  break;
320 
321  case CMDUSERSCHEMAMGR:
322  {
323  if (pac.next(&result)==false)
324  {
325  replypk.pack_int(CMDNOTOK);
326  replyToManager(zmqresponder, replysbuf);
327  zmq_msg_close(&zmqrecvmsg);
328  goto HECK;
329  }
330 
331  string globaladminpassword;
332  msgpack::object obj4 = result.get();
333  obj4.convert(&globaladminpassword);
334  newmbox = new class Mbox;
335 
336  if (pthread_create(&tid, NULL, userSchemaMgr,
337  nodeTopology.newActor(ACTOR_USERSCHEMAMGR,
338  newmbox, epollfd,
339  globaladminpassword,
340  actorid,
341  vector<string>(),
342  vector<string>()))==-1)
343  {
344  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
345  __LINE__, errno);
346  replypk.pack_int(CMDNOTOK);
347  replyToManager(zmqresponder, replysbuf);
348  zmq_msg_close(&zmqrecvmsg);
349  goto HECK;
350  }
351  else
352  {
353  replypk.pack_int(CMDOK);
354  replypk.pack_int64((int64_t)newmbox);
355  }
356  }
357  break;
358 
359  case CMDDEADLOCKMGR:
360  newmbox = new class Mbox;
361 
362  if (pthread_create(&tid, NULL, deadlockMgr,
363  nodeTopology.newActor(ACTOR_DEADLOCKMGR,
364  newmbox, epollfd,
365  string(), actorid,
366  vector<string>(),
367  vector<string>()))==-1)
368  {
369  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
370  __LINE__, errno);
371  replypk.pack_int(CMDNOTOK);
372  replyToManager(zmqresponder, replysbuf);
373  zmq_msg_close(&zmqrecvmsg);
374  goto HECK;
375  }
376  else
377  {
378  replypk.pack_int(CMDOK);
379  replypk.pack_int64((int64_t)newmbox);
380  }
381 
382  break;
383 
384  case CMDTRANSACTIONAGENT:
385  {
386  if (pac.next(&result)==false)
387  {
388  replypk.pack_int(CMDNOTOK);
389  replyToManager(zmqresponder, replysbuf);
390  zmq_msg_close(&zmqrecvmsg);
391  goto HECK;
392  }
393 
394  int64_t instance;
395  msgpack::object obj4 = result.get();
396  obj4.convert(&instance);
397 
398  newmbox = new class Mbox;
400  nodeTopology.newActor(ACTOR_TRANSACTIONAGENT,
401  newmbox, epollfd, string(), actorid,
402  vector<string>(),
403  vector<string>());
404  paddr->instance = instance;
405 
406  if (pthread_create(&tid, NULL, transactionAgent, paddr)==-1)
407  {
408  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
409  __LINE__, errno);
410  replypk.pack_int(CMDNOTOK);
411  replyToManager(zmqresponder, replysbuf);
412  zmq_msg_close(&zmqrecvmsg);
413  goto HECK;
414  }
415  else
416  {
417  replypk.pack_int(CMDOK);
418  replypk.pack_int64((int64_t)newmbox);
419  }
420  }
421  break;
422 
423  case CMDENGINE:
424  {
425  if (pac.next(&result)==false)
426  {
427  replypk.pack_int(CMDNOTOK);
428  replyToManager(zmqresponder, replysbuf);
429  zmq_msg_close(&zmqrecvmsg);
430  goto HECK;
431  }
432 
433  int64_t instance;
434  msgpack::object obj4 = result.get();
435  obj4.convert(&instance);
436 
437  newmbox = new class Mbox;
439  nodeTopology.newActor(ACTOR_ENGINE,
440  newmbox, epollfd, string(), actorid,
441  vector<string>(),
442  vector<string>());
443  paddr->instance = instance;
444 
445  if (pthread_create(&tid, NULL, engine, paddr)==-1)
446  {
447  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
448  __LINE__, errno);
449  replypk.pack_int(CMDNOTOK);
450  replyToManager(zmqresponder, replysbuf);
451  zmq_msg_close(&zmqrecvmsg);
452  goto HECK;
453  }
454  else
455  {
456  replypk.pack_int(CMDOK);
457  replypk.pack_int64((int64_t)newmbox);
458  }
459  }
460  break;
461 
462  case CMDOBGATEWAY:
463  {
464  if (pac.next(&result)==false)
465  {
466  replypk.pack_int(CMDNOTOK);
467  replyToManager(zmqresponder, replysbuf);
468  zmq_msg_close(&zmqrecvmsg);
469  goto HECK;
470  }
471 
472  int64_t instance;
473  msgpack::object obj4 = result.get();
474  obj4.convert(&instance);
475 
476  newmbox = new class Mbox;
478  nodeTopology.newActor(ACTOR_OBGATEWAY,
479  newmbox, epollfd, string(), actorid,
480  vector<string>(), vector<string>());
481  paddr->instance = instance;
482 
483  if (pthread_create(&tid, NULL, obGateway, paddr)==-1)
484  {
485  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
486  __LINE__, errno);
487  replypk.pack_int(CMDNOTOK);
488  replyToManager(zmqresponder, replysbuf);
489  zmq_msg_close(&zmqrecvmsg);
490  goto HECK;
491  }
492  else
493  {
494  replypk.pack_int(CMDOK);
495  replypk.pack_int64((int64_t)newmbox);
496  }
497  }
498  break;
499 
500  case CMDIBGATEWAY:
501  {
502  if (pac.next(&result)==false)
503  {
504  replypk.pack_int(CMDNOTOK);
505  replyToManager(zmqresponder, replysbuf);
506  zmq_msg_close(&zmqrecvmsg);
507  goto HECK;
508  }
509 
510  int64_t instance;
511  msgpack::object obj4 = result.get();
512  obj4.convert(&instance);
513 
514  if (pac.next(&result)==false)
515  {
516  replypk.pack_int(CMDNOTOK);
517  replyToManager(zmqresponder, replysbuf);
518  zmq_msg_close(&zmqrecvmsg);
519  goto HECK;
520  }
521 
522  string hostport;
523  msgpack::object obj5 = result.get();
524  obj5.convert(&hostport);
525 
526  newmbox = new class Mbox;
528  nodeTopology.newActor(ACTOR_IBGATEWAY,
529  newmbox, epollfd, hostport, actorid,
530  vector<string>(), vector<string>());
531  paddr->instance = instance;
532 
533  if (pthread_create(&tid, NULL, ibGateway, paddr)==-1)
534  {
535  fprintf(logfile, "%s %i pthread_create errno %i\n", __FILE__,
536  __LINE__, errno);
537  replypk.pack_int(CMDNOTOK);
538  replyToManager(zmqresponder, replysbuf);
539  zmq_msg_close(&zmqrecvmsg);
540  goto HECK;
541  }
542  else
543  {
544  replypk.pack_int(CMDOK);
545  replypk.pack_int64((int64_t)newmbox);
546  }
547  }
548  break;
549 
550  default:
551  replypk.pack_int(CMDNOTOK);
552  replyToManager(zmqresponder, replysbuf);
553  zmq_msg_close(&zmqrecvmsg);
554  goto HECK;
555  }
556  }
557  break;
558 
559  case CMDLOCALCONFIG:
560  {
561  updateLocalConfig(pac, result);
562  replypk.pack_int(CMDOK);
563  }
564  break;
565 
566  case CMDGLOBALCONFIG:
567  {
568  updateGlobalConfig(pac, result);
569  replypk.pack_int(CMDOK);
570  }
571  break;
572 
573  default:
574  replypk.pack_int(CMDNOTOK);
575  replyToManager(zmqresponder, replysbuf);
576  zmq_msg_close(&zmqrecvmsg);
577  goto HECK;
578  }
579 
580  replyToManager(zmqresponder, replysbuf);
581 
582  if (zmq_msg_close(&zmqrecvmsg)==-1)
583  {
584  perror("zmq_msg_close");
585  printf("%s %i %i\n", __FILE__, __LINE__, errno);
586  }
587  }
588 }
589 
591 {
592 }
593 
595 {
596 }
597 
599 void *topologyMgr(void *identity)
600 {
601  new TopologyMgr((Topology::partitionAddress *)identity);
602  while (1)
603  {
604  sleep(1000000);
605  }
606  return NULL;
607 }
608 
609 void replyToManager(void *zmqsocket, msgpack::sbuffer &sbuf)
610 {
611  zmq_msg_t zmqmsg;
612  zmq_msg_init_size(&zmqmsg, sbuf.size());
613  memcpy(zmq_msg_data(&zmqmsg), sbuf.data(), sbuf.size());
614  zmq_msg_send(&zmqmsg, zmqsocket, 0);
615  zmq_msg_close(&zmqmsg);
616 }
617 
/*
 * Rebuild nodeTopology.actorList from the next three msgpack objects in pac,
 * then call broadcastConfig().
 *
 * The three objects are each converted to vector<int64_t>, in this order:
 *   t - each entry is cast to actortypes_e
 *   i - second initializer of Topology::actor_s (presumably the instance
 *       number; confirm against the actor_s declaration)
 *   m - each entry is cast to class Mbox *
 * Entry n of the three vectors together describes the actor stored at
 * index n of the new list.
 *
 * pac    - unpacker positioned at the first of the three vectors
 * result - scratch object, overwritten by every pac.next() call
 *
 * NOTE(review): none of the pac.next() return values is checked, and i[n] /
 * m[n] are indexed up to t.size() without confirming that the three vectors
 * have the same length.
 * NOTE(review): the embedded listing numbers skip lines inside this function
 * (646-647, 650, 658-660, 666, 674, 689), so statements may be missing from
 * this view.
 */
618 void TopologyMgr::updateLocalConfig(msgpack::unpacker &pac,
619  msgpack::unpacked &result)
620 {
  // first vector: actor types
621  pac.next(&result);
622  msgpack::object obj = result.get();
623  vector<int64_t> t;
624  obj.convert(&t);
625 
  // second vector: per-actor int64 value used as the second actor_s field
626  vector<int64_t> i;
627  pac.next(&result);
628  obj = result.get();
629  obj.convert(&i);
630 
  // third vector: mailbox pointers transported as int64
631  vector<int64_t> m;
632  pac.next(&result);
633  obj = result.get();
634  obj.convert(&m);
635 
  // build the replacement list outside the lock
636  vector<Topology::actor_s> v(t.size());
637 
638  for (size_t n=0; n < t.size(); n++)
639  {
640  Topology::actor_s a = { (actortypes_e)t[n], i[n], (class Mbox *)m[n] };
641  v[n] = a;
642  }
643 
  // publish the new list while holding nodeTopologyMutex
644  pthread_mutex_lock(&nodeTopologyMutex);
645  nodeTopology.actorList.swap(v);
648 
  // NOTE(review): index 4 is hard-coded and not bounds-checked against
  // actorList.size(); in the lines visible here addr is written but never read.
649  Topology::partitionAddress addr = {};
651  addr.epollfd = -1;
652  addr.argstring = "";
653  addr.address.actorid = 4;
654  addr.mbox = (class Mbox *)nodeTopology.actorList[4].mbox;
655  addr.type = ACTOR_LISTENER;
656 
657  // if (nodeTopology.actorList[]
661 
  // For every transaction agent, engine, inbound gateway and outbound gateway
  // entry, copy its index, mailbox, type and instance into addr.
662  for (size_t n=0; n < nodeTopology.actorList.size(); n++)
663  {
664  if (nodeTopology.actorList[n].type==ACTOR_TRANSACTIONAGENT)
665  {
667  addr.address.actorid = n;
668  addr.mbox = (class Mbox *)nodeTopology.actorList[n].mbox;
669  addr.type = nodeTopology.actorList[n].type;
670  addr.instance = nodeTopology.actorList[n].instance;
671  }
672  else if (nodeTopology.actorList[n].type==ACTOR_ENGINE)
673  {
675  addr.address.actorid = n;
676  addr.mbox = (class Mbox *)nodeTopology.actorList[n].mbox;
677  addr.type = nodeTopology.actorList[n].type;
678  addr.instance = nodeTopology.actorList[n].instance;
679  }
680  else if (nodeTopology.actorList[n].type==ACTOR_IBGATEWAY)
681  {
682  addr.address.actorid = n;
683  addr.mbox = (class Mbox *)nodeTopology.actorList[n].mbox;
684  addr.type = nodeTopology.actorList[n].type;
685  addr.instance = nodeTopology.actorList[n].instance;
686  }
687  else if (nodeTopology.actorList[n].type==ACTOR_OBGATEWAY)
688  {
690  addr.address.actorid = n;
691  addr.mbox = (class Mbox *)nodeTopology.actorList[n].mbox;
692  addr.type = nodeTopology.actorList[n].type;
693  addr.instance = nodeTopology.actorList[n].instance;
694  }
695  }
696 
697  pthread_mutex_unlock(&nodeTopologyMutex);
698 
  // called after the mutex has been released
699  broadcastConfig();
700 }
701 
/*
 * Read a cluster-wide configuration from pac, store it in nodeTopology under
 * nodeTopologyMutex, then call broadcastConfig().
 *
 * Objects are consumed from pac in this order:
 *   activereplica                          int
 *   replicaids, nodeids, actorids          vector<int64_t>, indexed in parallel
 *   ibgatewaynodes, ibgatewayinstances     vector<int64_t>, indexed in parallel
 *   ibgatewayhostports                     vector<string>, same indexing
 *   dmgrnode, dmgrmboxptr                  int64_t
 *   usmgrnode, usmgrmboxptr                int64_t
 *   numreplicas                            size_t
 *   numreplicas x vector<int16_t>          one member list per replica
 *   lentas                                 size_t
 *   lentas x vector<int16_t>               stored in tas starting at index 1
 *   (nid, vector<int>) pairs               repeated until pac is exhausted
 *
 * pac    - unpacker positioned at activereplica
 * result - scratch object, overwritten by every pac.next() call
 *
 * NOTE(review): apart from the final while loop condition, no pac.next()
 * return value is checked, and none of the parallel vectors is checked for
 * matching length before being indexed.
 * NOTE(review): the embedded listing numbers skip lines 873 and 901 inside
 * this function; the declaration of addr is not visible in this view.
 */
702 void TopologyMgr::updateGlobalConfig(msgpack::unpacker &pac,
703  msgpack::unpacked &result)
704 {
705  pac.next(&result);
706  msgpack::object obj = result.get();
707  int activereplica;
708  obj.convert(&activereplica);
709 
710  pac.next(&result);
711  obj = result.get();
712  vector<int64_t> replicaids;
713  obj.convert(&replicaids);
714 
715  pac.next(&result);
716  obj = result.get();
717  vector<int64_t> nodeids;
718  obj.convert(&nodeids);
719 
720  pac.next(&result);
721  obj = result.get();
722  vector<int64_t> actorids;
723  obj.convert(&actorids);
724 
725  pac.next(&result);
726  obj = result.get();
727  vector<int64_t> ibgatewaynodes;
728  obj.convert(&ibgatewaynodes);
729 
730  pac.next(&result);
731  obj = result.get();
732  vector<int64_t> ibgatewayinstances;
733  obj.convert(&ibgatewayinstances);
734 
735  pac.next(&result);
736  obj = result.get();
737  vector<string> ibgatewayhostports;
738  obj.convert(&ibgatewayhostports);
739 
740  pac.next(&result);
741  obj = result.get();
742  int64_t dmgrnode;
743  obj.convert(&dmgrnode);
744 
  // mailbox pointers travel as int64 and are cast back to Mbox * below
745  pac.next(&result);
746  obj = result.get();
747  int64_t dmgrmboxptr;
748  obj.convert(&dmgrmboxptr);
749 
750  pac.next(&result);
751  obj = result.get();
752  int64_t usmgrnode;
753  obj.convert(&usmgrnode);
754 
755  pac.next(&result);
756  obj = result.get();
757  int64_t usmgrmboxptr;
758  obj.convert(&usmgrmboxptr);
759 
760  pac.next(&result);
761  obj = result.get();
762  size_t numreplicas;
763  obj.convert(&numreplicas);
764 
  // one vector of node ids per replica, in replica order
765  vector< vector<int16_t> > replicaMembers;
766 
767  for (size_t n=0; n < numreplicas; n++)
768  {
769  pac.next(&result);
770  obj = result.get();
771  vector<int16_t> v;
772  obj.convert(&v);
773  replicaMembers.push_back(v);
774  }
775 
776  pac.next(&result);
777  obj = result.get();
778  size_t lentas;
779  obj.convert(&lentas);
780 
  // tas[0] is an empty placeholder; received vectors occupy indexes 1..lentas
781  vector< vector<int16_t> > tas;
782  tas.push_back(vector<int16_t>());
783 
784  for (size_t n=1; n <= lentas; n++)
785  {
786  pac.next(&result);
787  obj = result.get();
788  vector<int16_t> v;
789  obj.convert(&v);
790  tas.push_back(v);
791  }
792 
793  map< int64_t, vector<int> > allActorsMap;
794  vector< vector<int> > allActors;
795 
  // remaining objects come in (nid, vector<int>) pairs until pac runs dry
796  while (pac.next(&result))
797  {
798  obj = result.get();
799  int64_t nid;
800  obj.convert(&nid);
801  pac.next(&result);
802  obj = result.get();
803  vector<int> aids;
804  obj.convert(&aids);
805  allActorsMap[nid] = aids;
806  }
807 
  // Flatten the map into a vector indexed by nid, sized to largest nid + 1.
  // NOTE(review): rbegin() is dereferenced without checking that the map is
  // non-empty, and a negative nid would index allActors out of range.
808  allActors.resize(allActorsMap.rbegin()->first + 1, vector<int>());
809  map< int64_t, vector<int> >::iterator it;
810 
811  for (it = allActorsMap.begin(); it != allActorsMap.end(); ++it)
812  {
813  allActors[it->first] = it->second;
814  }
815 
816  boost::unordered_map< int16_t, vector<int> > allActorsThisReplica;
817  boost::unordered_set<int64_t> nodesThisReplica;
818  int64_t myreplica = -1;
819 
  // Find the replica whose member list contains myTopology.nodeid; there is
  // no break, so the last matching replica wins. Stays -1 when none matches.
820  for (size_t n=0; n < replicaMembers.size(); n++)
821  {
822  for (size_t m=0; m < replicaMembers[n].size(); m++)
823  {
824  if (replicaMembers[n][m]==myTopology.nodeid)
825  {
826  myreplica = n;
827  }
828  }
829  }
830 
  // restrict the actor table to nodes that belong to this node's replica
831  if (myreplica >= 0)
832  {
833  for (size_t n=0; n < replicaMembers[myreplica].size(); n++)
834  {
835  nodesThisReplica.insert(replicaMembers[myreplica][n]);
836  }
837 
838  for (size_t n=0; n < allActors.size(); n++)
839  {
840  if (nodesThisReplica.count(n))
841  {
842  allActorsThisReplica[n] = allActors[n];
843  }
844  }
845  }
846 
  // numpartitions is the number of entries whose replica id is 0
847  size_t numpartitions=0;
848 
849  for (size_t n=0; n < replicaids.size(); n++)
850  {
851  if (replicaids[n]==0)
852  {
853  numpartitions++;
854  }
855  }
856 
  // pl[r] collects the partition addresses that belong to replica r
857  vector< vector<Topology::partitionAddress> > pl(numreplicas);
858  /*
859  Topology::partitionAddress pa = {};
860  vector<Topology::partitionAddress> pl2(numpartitions, pa);
861  vector<Topology::partitionAddress> pl2;
862  for (size_t n=0; n < numreplicas; n++)
863  {
864  pl[n] = pl2;
865  }
866  */
867  pthread_mutex_lock(&nodeTopologyMutex);
868 
869  nodeTopology.activereplica = activereplica;
870 
  // One ACTOR_ENGINE address per (replicaid, nodeid, actorid) triple. The
  // mailbox is taken from the local actor list when the node id equals
  // nodeTopology.nodeid, otherwise it is NULL.
  // NOTE(review): replicaids[n] is not range-checked against numreplicas and
  // actorid is not range-checked against actorList.size().
  // NOTE(review): the replica search above compares against myTopology.nodeid
  // while this loop compares against nodeTopology.nodeid - confirm both hold
  // the same value.
871  for (size_t n=0; n < replicaids.size(); n++)
872  {
874  addr.address.nodeid = nodeids[n];
875  addr.address.actorid = actorids[n];
876  addr.epollfd = -1;
877  addr.argstring = "";
878  addr.instance = -1;
879 
880  if (nodeTopology.nodeid==addr.address.nodeid)
881  {
882  addr.mbox = nodeTopology.actorList[addr.address.actorid].mbox;
883  }
884  else
885  {
886  addr.mbox = NULL;
887  }
888 
889  addr.type = ACTOR_ENGINE;
890  pl[replicaids[n]].push_back(addr);
891  }
892 
  // copy of this replica's partitions; not read again in the visible lines
893  vector<Topology::partitionAddress> pltr;
894 
895  if (myreplica >= 0)
896  {
897  pltr = pl[myreplica];
898  }
899 
900  nodeTopology.partitionList.swap(pl);
902 
  // node id -> host:port strings indexed by gateway instance
  // NOTE(review): the hostport is stored only when the vector has to grow, so
  // an instance that arrives after a higher instance of the same node keeps
  // the empty string; a negative instance is not rejected either.
903  map< int64_t, vector<string> > ibgws;
904 
905  for (size_t n=0; n < ibgatewaynodes.size(); n++)
906  {
907  int64_t node = ibgatewaynodes[n];
908  int64_t instance = ibgatewayinstances[n];
909  string hostport = ibgatewayhostports[n];
910 
911  if (ibgws[node].size() < (size_t)(instance+1))
912  {
913  ibgws[node].resize(instance+1, string());
914  ibgws[node][instance] = hostport;
915  }
916  }
917 
918  nodeTopology.ibGateways.swap(ibgws);
919 
  // publish the remaining values, still under the mutex
920  nodeTopology.userSchemaMgrNode = usmgrnode;
921  nodeTopology.userSchemaMgrMbox = (class Mbox *)usmgrmboxptr;
922  nodeTopology.deadlockMgrNode = dmgrnode;
923  nodeTopology.deadlockMgrMbox = (class Mbox *)dmgrmboxptr;
924  // nodeTopology.numpartitions = nodeTopology.partitionListThisReplica.size();
925  nodeTopology.numpartitions = numpartitions;
926  nodeTopology.replicaMembers.swap(replicaMembers);
927  nodeTopology.tas.swap(tas);
928  nodeTopology.allActors.swap(allActors);
929  nodeTopology.allActorsThisReplica.swap(allActorsThisReplica);
930  nodeTopology.numreplicas = numreplicas;
931  pthread_mutex_unlock(&nodeTopologyMutex);
932 
  // called after the mutex has been released
933  broadcastConfig();
934 }
935 
937 {
939  // next, go through each and send mails
940  class Message *msg;
941 
942  for (size_t n=0; n < myTopology.actorList.size(); n++)
943  {
944  if (myTopology.actorList[n].type != ACTOR_NONE)
945  {
946  msg = new class Message;
952  msg->messageStruct.destAddr.actorid = n;
953  mboxes.actoridToProducers[n]->sendMsg(*msg);
954  }
955  }
956 }