34 #line 35 "TopologyMgr.cc"
49 myIdentity(*myIdentityArg)
55 fprintf(
logfile,
"%s %i daemon errno %i\n", __FILE__, __LINE__, errno);
61 void *zmqresponder = zmq_socket(
zmqcontext, ZMQ_REP);
62 int rv = zmq_bind(zmqresponder,
zmqsocket.c_str());
67 printf(
"%s %i zmq_bind errno %i\n", __FILE__, __LINE__, errno);
71 int epollfd = epoll_create(1);
78 zmq_msg_init(&zmqrecvmsg);
79 int retval = zmq_msg_recv(&zmqrecvmsg, zmqresponder, 0);
83 fprintf(
logfile,
"%s %i zmq_recv errno %i\n", __FILE__, __LINE__,
87 msgpack::sbuffer replysbuf;
88 msgpack::packer<msgpack::sbuffer> replypk(&replysbuf);
90 msgpack::sbuffer inbuf;
91 inbuf.write((
char *)zmq_msg_data(&zmqrecvmsg), zmq_msg_size(&zmqrecvmsg));
92 msgpack::unpacker pac;
93 pac.reserve_buffer(inbuf.size());
94 memcpy(pac.buffer(), inbuf.data(), inbuf.size());
95 pac.buffer_consumed(inbuf.size());
96 msgpack::unpacked result;
98 if (pac.next(&result)==
false)
100 replypk.pack_int(CMDNOTOK);
102 zmq_msg_close(&zmqrecvmsg);
107 msgpack::object obj = result.get();
114 if (pac.next(&result)==
false)
116 replypk.pack_int(CMDNOTOK);
118 zmq_msg_close(&zmqrecvmsg);
123 msgpack::object obj2 = result.get();
128 case CMDANONYMOUSPING:
130 if (pac.next(&result)==
false)
132 replypk.pack_int(CMDNOTOK);
134 zmq_msg_close(&zmqrecvmsg);
139 msgpack::object obj3 = result.get();
144 !(val==0 ? 0 : 1), val==0 ? 0 : 1);
145 replypk.pack_int(CMDOK);
149 case CMDBADLOGINMESSAGES:
151 if (pac.next(&result)==
false)
153 replypk.pack_int(CMDNOTOK);
155 zmq_msg_close(&zmqrecvmsg);
160 msgpack::object obj3 = result.get();
164 !(val==0 ? 0 : 1), val==0 ? 0 : 1);
165 replypk.pack_int(CMDOK);
170 replypk.pack_int(CMDNOTOK);
172 zmq_msg_close(&zmqrecvmsg);
180 if (pac.next(&result)==
false)
182 replypk.pack_int(CMDNOTOK);
184 zmq_msg_close(&zmqrecvmsg);
189 msgpack::object obj2 = result.get();
194 case CMDGETTOPOLOGYMGRMBOXPTR:
196 replypk.pack_int(CMDOK);
202 replypk.pack_int(CMDNOTOK);
204 zmq_msg_close(&zmqrecvmsg);
214 if (pac.next(&result)==
false)
216 replypk.pack_int(CMDNOTOK);
218 zmq_msg_close(&zmqrecvmsg);
223 msgpack::object obj2 = result.get();
226 if (pac.next(&result)==
false)
228 replypk.pack_int(CMDNOTOK);
230 zmq_msg_close(&zmqrecvmsg);
235 msgpack::object obj3 = result.get();
236 obj3.convert(&actorid);
238 class Mbox *newmbox=NULL;
244 vector<string> nodes;
245 vector<string> services;
248 msgpack::object obj4;
250 if (pac.next(&result)==
false)
252 replypk.pack_int(CMDNOTOK);
254 zmq_msg_close(&zmqrecvmsg);
260 nodes.push_back(node);
262 if (pac.next(&result)==
false)
264 replypk.pack_int(CMDNOTOK);
266 zmq_msg_close(&zmqrecvmsg);
271 obj4.convert(&service);
272 services.push_back(service);
274 if (pac.next(&result)==
false)
276 replypk.pack_int(CMDNOTOK);
278 zmq_msg_close(&zmqrecvmsg);
284 nodes.push_back(node);
286 if (pac.next(&result)==
false)
288 replypk.pack_int(CMDNOTOK);
290 zmq_msg_close(&zmqrecvmsg);
295 obj4.convert(&service);
296 services.push_back(service);
298 newmbox =
new class Mbox;
300 if (pthread_create(&tid, NULL,
listener,
306 fprintf(
logfile,
"%s %i pthread_create errno %i\n", __FILE__,
308 replypk.pack_int(CMDNOTOK);
310 zmq_msg_close(&zmqrecvmsg);
315 replypk.pack_int(CMDOK);
316 replypk.pack_int64((int64_t)newmbox);
321 case CMDUSERSCHEMAMGR:
323 if (pac.next(&result)==
false)
325 replypk.pack_int(CMDNOTOK);
327 zmq_msg_close(&zmqrecvmsg);
331 string globaladminpassword;
332 msgpack::object obj4 = result.get();
333 obj4.convert(&globaladminpassword);
334 newmbox =
new class Mbox;
342 vector<string>()))==-1)
344 fprintf(
logfile,
"%s %i pthread_create errno %i\n", __FILE__,
346 replypk.pack_int(CMDNOTOK);
348 zmq_msg_close(&zmqrecvmsg);
353 replypk.pack_int(CMDOK);
354 replypk.pack_int64((int64_t)newmbox);
360 newmbox =
new class Mbox;
367 vector<string>()))==-1)
369 fprintf(
logfile,
"%s %i pthread_create errno %i\n", __FILE__,
371 replypk.pack_int(CMDNOTOK);
373 zmq_msg_close(&zmqrecvmsg);
378 replypk.pack_int(CMDOK);
379 replypk.pack_int64((int64_t)newmbox);
384 case CMDTRANSACTIONAGENT:
386 if (pac.next(&result)==
false)
388 replypk.pack_int(CMDNOTOK);
390 zmq_msg_close(&zmqrecvmsg);
395 msgpack::object obj4 = result.get();
396 obj4.convert(&instance);
398 newmbox =
new class Mbox;
401 newmbox, epollfd,
string(), actorid,
408 fprintf(
logfile,
"%s %i pthread_create errno %i\n", __FILE__,
410 replypk.pack_int(CMDNOTOK);
412 zmq_msg_close(&zmqrecvmsg);
417 replypk.pack_int(CMDOK);
418 replypk.pack_int64((int64_t)newmbox);
425 if (pac.next(&result)==
false)
427 replypk.pack_int(CMDNOTOK);
429 zmq_msg_close(&zmqrecvmsg);
434 msgpack::object obj4 = result.get();
435 obj4.convert(&instance);
437 newmbox =
new class Mbox;
440 newmbox, epollfd,
string(), actorid,
445 if (pthread_create(&tid, NULL,
engine, paddr)==-1)
447 fprintf(
logfile,
"%s %i pthread_create errno %i\n", __FILE__,
449 replypk.pack_int(CMDNOTOK);
451 zmq_msg_close(&zmqrecvmsg);
456 replypk.pack_int(CMDOK);
457 replypk.pack_int64((int64_t)newmbox);
464 if (pac.next(&result)==
false)
466 replypk.pack_int(CMDNOTOK);
468 zmq_msg_close(&zmqrecvmsg);
473 msgpack::object obj4 = result.get();
474 obj4.convert(&instance);
476 newmbox =
new class Mbox;
479 newmbox, epollfd,
string(), actorid,
480 vector<string>(), vector<string>());
483 if (pthread_create(&tid, NULL,
obGateway, paddr)==-1)
485 fprintf(
logfile,
"%s %i pthread_create errno %i\n", __FILE__,
487 replypk.pack_int(CMDNOTOK);
489 zmq_msg_close(&zmqrecvmsg);
494 replypk.pack_int(CMDOK);
495 replypk.pack_int64((int64_t)newmbox);
502 if (pac.next(&result)==
false)
504 replypk.pack_int(CMDNOTOK);
506 zmq_msg_close(&zmqrecvmsg);
511 msgpack::object obj4 = result.get();
512 obj4.convert(&instance);
514 if (pac.next(&result)==
false)
516 replypk.pack_int(CMDNOTOK);
518 zmq_msg_close(&zmqrecvmsg);
523 msgpack::object obj5 = result.get();
524 obj5.convert(&hostport);
526 newmbox =
new class Mbox;
529 newmbox, epollfd, hostport, actorid,
530 vector<string>(), vector<string>());
533 if (pthread_create(&tid, NULL,
ibGateway, paddr)==-1)
535 fprintf(
logfile,
"%s %i pthread_create errno %i\n", __FILE__,
537 replypk.pack_int(CMDNOTOK);
539 zmq_msg_close(&zmqrecvmsg);
544 replypk.pack_int(CMDOK);
545 replypk.pack_int64((int64_t)newmbox);
551 replypk.pack_int(CMDNOTOK);
553 zmq_msg_close(&zmqrecvmsg);
562 replypk.pack_int(CMDOK);
566 case CMDGLOBALCONFIG:
569 replypk.pack_int(CMDOK);
574 replypk.pack_int(CMDNOTOK);
576 zmq_msg_close(&zmqrecvmsg);
582 if (zmq_msg_close(&zmqrecvmsg)==-1)
584 perror(
"zmq_msg_close");
585 printf(
"%s %i %i\n", __FILE__, __LINE__, errno);
612 zmq_msg_init_size(&zmqmsg, sbuf.size());
613 memcpy(zmq_msg_data(&zmqmsg), sbuf.data(), sbuf.size());
614 zmq_msg_send(&zmqmsg, zmqsocket, 0);
615 zmq_msg_close(&zmqmsg);
619 msgpack::unpacked &result)
622 msgpack::object obj = result.get();
636 vector<Topology::actor_s> v(t.size());
638 for (
size_t n=0; n < t.size(); n++)
655 addr.
type = ACTOR_LISTENER;
703 msgpack::unpacked &result)
706 msgpack::object obj = result.get();
708 obj.convert(&activereplica);
712 vector<int64_t> replicaids;
713 obj.convert(&replicaids);
717 vector<int64_t> nodeids;
718 obj.convert(&nodeids);
722 vector<int64_t> actorids;
723 obj.convert(&actorids);
727 vector<int64_t> ibgatewaynodes;
728 obj.convert(&ibgatewaynodes);
732 vector<int64_t> ibgatewayinstances;
733 obj.convert(&ibgatewayinstances);
737 vector<string> ibgatewayhostports;
738 obj.convert(&ibgatewayhostports);
743 obj.convert(&dmgrnode);
748 obj.convert(&dmgrmboxptr);
753 obj.convert(&usmgrnode);
757 int64_t usmgrmboxptr;
758 obj.convert(&usmgrmboxptr);
763 obj.convert(&numreplicas);
765 vector< vector<int16_t> > replicaMembers;
767 for (
size_t n=0; n < numreplicas; n++)
773 replicaMembers.push_back(v);
779 obj.convert(&lentas);
781 vector< vector<int16_t> > tas;
782 tas.push_back(vector<int16_t>());
784 for (
size_t n=1; n <= lentas; n++)
793 map< int64_t, vector<int> > allActorsMap;
794 vector< vector<int> > allActors;
796 while (pac.next(&result))
805 allActorsMap[nid] = aids;
808 allActors.resize(allActorsMap.rbegin()->first + 1, vector<int>());
809 map< int64_t, vector<int> >::iterator it;
811 for (it = allActorsMap.begin(); it != allActorsMap.end(); ++it)
813 allActors[it->first] = it->second;
816 boost::unordered_map< int16_t, vector<int> > allActorsThisReplica;
817 boost::unordered_set<int64_t> nodesThisReplica;
818 int64_t myreplica = -1;
820 for (
size_t n=0; n < replicaMembers.size(); n++)
822 for (
size_t m=0; m < replicaMembers[n].size(); m++)
833 for (
size_t n=0; n < replicaMembers[myreplica].size(); n++)
835 nodesThisReplica.insert(replicaMembers[myreplica][n]);
838 for (
size_t n=0; n < allActors.size(); n++)
840 if (nodesThisReplica.count(n))
842 allActorsThisReplica[n] = allActors[n];
847 size_t numpartitions=0;
849 for (
size_t n=0; n < replicaids.size(); n++)
851 if (replicaids[n]==0)
857 vector< vector<Topology::partitionAddress> > pl(numreplicas);
871 for (
size_t n=0; n < replicaids.size(); n++)
889 addr.
type = ACTOR_ENGINE;
890 pl[replicaids[n]].push_back(addr);
893 vector<Topology::partitionAddress> pltr;
897 pltr = pl[myreplica];
903 map< int64_t, vector<string> > ibgws;
905 for (
size_t n=0; n < ibgatewaynodes.size(); n++)
907 int64_t node = ibgatewaynodes[n];
908 int64_t instance = ibgatewayinstances[n];
909 string hostport = ibgatewayhostports[n];
911 if (ibgws[node].size() < (size_t)(instance+1))
913 ibgws[node].resize(instance+1,
string());
914 ibgws[node][instance] = hostport;