InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ObGateway Class Reference

execute Outbound Gateway actor More...

#include <ObGateway.h>

Collaboration diagram for ObGateway:

Public Member Functions

 ObGateway (Topology::partitionAddress *myIdentityArg)
 
virtual ~ObGateway ()
 
void updateRemoteGateways ()
 

Public Attributes

Topology::partitionAddress myIdentity
 
class Mboxes mboxes
 
class Topology myTopology
 

Private Attributes

std::vector< int > remoteGateways
 
socklen_t optlen
 
int so_sndbuf
 
char * serstrsmall
 
char * cstrsmall
 
bool ismultinode
 

Detailed Description

execute Outbound Gateway actor

Parameters
myIdentityArghow to identify this

Definition at line 40 of file ObGateway.h.

Constructor & Destructor Documentation

ObGateway::ObGateway ( Topology::partitionAddress myIdentityArg)

Definition at line 33 of file ObGateway.cc.

References Topology::partitionAddress::address, cfgs, cfg_s::compressgw, cstrsmall, Message::message_s::destAddr, Topology::partitionAddress::instance, Topology::partitionAddress::mbox, mboxes, Message::messageStruct, MessageBatchSerialized::msgbatch, myIdentity, myTopology, MessageBatchSerialized::nmsgs, Topology::addressStruct::nodeid, Topology::nodeid, Mboxes::nodeid, MessageBatchSerialized::msgbatch_s::nodeid, optlen, Mbox::receive(), remoteGateways, SERIALIZEDMAXSIZE, MessageBatchSerialized::msgbatch_s::serializedmsg, serstrsmall, Message::size(), so_sndbuf, Message::message_s::topic, TOPIC_BATCHSERIALIZED, TOPIC_SERIALIZED, TOPIC_TOPOLOGY, Mboxes::update(), and updateRemoteGateways().

33  :
34  myIdentity(*myIdentityArg), so_sndbuf(16777216), ismultinode(false)
35 {
36  delete myIdentityArg;
37 
41 
42  optlen=sizeof(so_sndbuf);
43  int waitfor = 100;
44 
45  // pendingMsgs[remotenodeid]=serialized messages
46  char *sendstr=NULL;
47  size_t sendsize=0;
48  boost::unordered_map< int64_t, vector<string *> > pendingMsgs;
49  char *serstr=NULL;
50  serstrsmall=new (std::nothrow) char[SERIALIZEDMAXSIZE];
51  char *serstrbig=NULL;
52  bool isserstrbig=false;
53  char *cstr=NULL;
54  cstrsmall=new (std::nothrow) char[SERIALIZEDMAXSIZE];
55  char *cstrbig=NULL;
56  bool iscstrbig=false;
57 
58  while (1)
59  {
60  for (size_t inmsg=0; inmsg < 5000; inmsg++)
61  {
62  class Message *msgrcv = myIdentity.mbox->receive(waitfor);
63 
64  if (msgrcv==NULL)
65  {
66  waitfor = 100;
67  break;
68  }
69 
70  waitfor = 0;
71 
72  switch (msgrcv->messageStruct.topic)
73  {
74  case TOPIC_TOPOLOGY:
77  break;
78 
79  case TOPIC_SERIALIZED: // destined for remote host
80  pendingMsgs[msgrcv->messageStruct.destAddr.nodeid].push_back(((class MessageSerialized *)msgrcv)->data);
81  break;
82 
84  {
85  class MessageBatchSerialized &msgRef=
86  *(class MessageBatchSerialized *)msgrcv;
87  for (short n=0; n<msgRef.nmsgs; n++)
88  {
89  pendingMsgs[msgRef.msgbatch[n].nodeid].push_back(msgRef.msgbatch[n].serializedmsg);
90  }
91  }
92  break;
93 
94  default:
95  printf("%s %i anomaly %i\n", __FILE__, __LINE__,
96  msgrcv->messageStruct.topic);
97  }
98  }
99 
100  // send all pendings
101  boost::unordered_map< int64_t, vector<string *> >::iterator it;
102  for (it=pendingMsgs.begin(); it != pendingMsgs.end(); ++it)
103  {
104  vector<string *> &msgsRef = it->second;
105  size_t s=sizeof(size_t);
106  for (size_t n=0; n < msgsRef.size(); n++)
107  {
108  s += sizeof(size_t) + msgsRef[n]->size();
109  }
110  if (!s)
111  {
112  continue;
113  }
114  // format is: total size, [msgsize, msg]...
115  if (s>SERIALIZEDMAXSIZE)
116  {
117  serstrbig=new (std::nothrow) char[s];
118  serstr=serstrbig;
119  isserstrbig=true;
120  }
121  else
122  {
123  serstr=serstrsmall;
124  isserstrbig=false;
125  }
126  memcpy(serstr, &s, sizeof(s));
127  size_t pos=sizeof(s);
128  for (size_t n=0; n < msgsRef.size(); n++)
129  {
130  string &msgRef=*msgsRef[n];
131  size_t ms=msgRef.size();
132  memcpy(serstr+pos, &ms, sizeof(ms));
133  pos += sizeof(size_t);
134  memcpy(serstr+pos, msgRef.c_str(), ms);
135  pos += ms;
136  delete &msgRef;
137  }
138 
139  // compress
140  if (cfgs.compressgw==true)
141  {
142  int cbound=LZ4_compressBound(s);
143  if (cbound+sizeof(size_t) > SERIALIZEDMAXSIZE)
144  {
145  cstrbig=new (std::nothrow) char[cbound+sizeof(size_t)];
146  cstr=cstrbig;
147  iscstrbig=true;
148  }
149  else
150  {
151  cstr=cstrsmall;
152  iscstrbig=false;
153  }
154  size_t csize=LZ4_compress(serstr, cstr+sizeof(csize), s);
155  csize += sizeof(csize);
156  memcpy(cstr, &csize, sizeof(csize));
157  sendstr=cstr;
158  sendsize=csize;
159  }
160  else
161  {
162  sendstr=serstr;
163  sendsize=s;
164  }
165 
166  if (send(remoteGateways[it->first], sendstr, sendsize, 0)==-1)
167  {
168  printf("%s %i send errno %i nodeid %i instance %li it->first %li socket %i\n", __FILE__, __LINE__, errno, myTopology.nodeid, myIdentity.instance,
169  it->first, remoteGateways[it->first]);
170  }
171  pendingMsgs.erase(it->first);
172 
173  if (isserstrbig==true)
174  {
175  delete[] serstrbig;
176  }
177  if (iscstrbig==true)
178  {
179  delete[] cstrbig;
180  }
181  }
182  }
183 }

Here is the call graph for this function:

ObGateway::~ObGateway ( )
virtual

Definition at line 185 of file ObGateway.cc.

References cstrsmall, and serstrsmall.

186 {
187  delete serstrsmall;
188  delete cstrsmall;
189 }

Member Function Documentation

void ObGateway::updateRemoteGateways ( )

Definition at line 191 of file ObGateway.cc.

References Topology::ibGateways, Topology::partitionAddress::instance, ismultinode, logfile, myIdentity, myTopology, Topology::nodeid, remoteGateways, setprio(), and so_sndbuf.

Referenced by ObGateway().

192 {
193  map< int64_t, vector<string> >::iterator it;
194 
195  if (!myTopology.ibGateways.empty() &&
196  (int64_t)remoteGateways.size() < myTopology.ibGateways.rbegin()->first+1)
197  {
198  remoteGateways.resize(myTopology.ibGateways.rbegin()->first+1, 0);
199  }
200 
201  for (it = myTopology.ibGateways.begin();
202  it != myTopology.ibGateways.end(); ++it)
203  {
204  vector<string> &vecstringRef = it->second;
205 
206  if (it->first==myTopology.nodeid)
207  {
208  continue;
209  }
210  if ((int64_t)myTopology.ibGateways[it->first].size() <
212  {
213  continue;
214  }
215 
216  if (remoteGateways[it->first]==0)
217  {
218  // connect to server
219  if (ismultinode==false)
220  {
221  setprio();
222  ismultinode=true;
223  }
224  int sockfd;
225  size_t found = vecstringRef[myIdentity.instance].find(':');
226  string node = vecstringRef[myIdentity.instance].substr(0, found);
227  string service = vecstringRef[myIdentity.instance].substr(found+1,
228  vecstringRef[myIdentity.instance].size()-(found+1));
229  struct addrinfo hints;
230  memset(&hints, 0, sizeof(struct addrinfo));
231  hints.ai_family = AF_INET;
232  hints.ai_socktype = SOCK_STREAM;
233  hints.ai_flags = AI_PASSIVE;
234  struct addrinfo *servinfo;
235 
236  if (getaddrinfo(node.c_str(), service.c_str(), &hints,
237  &servinfo))
238  {
239  fprintf(logfile, "%s %i getaddrinfo\n", __FILE__, __LINE__);
240  return;
241  }
242 
243  sockfd = socket(servinfo->ai_family, servinfo->ai_socktype,
244  servinfo->ai_protocol);
245  if (sockfd == -1)
246  {
247  fprintf(logfile, "%s %i socket\n", __FILE__, __LINE__);
248  return;
249  }
250 
251  if (connect(sockfd, servinfo->ai_addr, servinfo->ai_addrlen))
252  {
253  fprintf(logfile, "%s %i connect errno %i '%s:%s'\n", __FILE__,
254  __LINE__, errno, node.c_str(), service.c_str());
255  return;
256  }
257 
258  if (setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &so_sndbuf,
259  sizeof(so_sndbuf))==-1)
260  {
261  fprintf(logfile, "%s %i setsockopt errno %i\n", __FILE__,
262  __LINE__, errno);
263  continue;
264  }
265 
266  freeaddrinfo(servinfo);
267 
268  remoteGateways[it->first] = sockfd;
269  }
270  }
271 }

Here is the call graph for this function:

Here is the caller graph for this function:

Member Data Documentation

char* ObGateway::cstrsmall
private

Definition at line 57 of file ObGateway.h.

Referenced by ObGateway(), and ~ObGateway().

bool ObGateway::ismultinode
private

Definition at line 58 of file ObGateway.h.

Referenced by updateRemoteGateways().

class Mboxes ObGateway::mboxes

Definition at line 48 of file ObGateway.h.

Referenced by ObGateway().

Topology::partitionAddress ObGateway::myIdentity

Definition at line 47 of file ObGateway.h.

Referenced by ObGateway(), and updateRemoteGateways().

class Topology ObGateway::myTopology

Definition at line 49 of file ObGateway.h.

Referenced by ObGateway(), and updateRemoteGateways().

socklen_t ObGateway::optlen
private

Definition at line 54 of file ObGateway.h.

Referenced by ObGateway().

std::vector<int> ObGateway::remoteGateways
private

Definition at line 53 of file ObGateway.h.

Referenced by ObGateway(), and updateRemoteGateways().

char* ObGateway::serstrsmall
private

Definition at line 56 of file ObGateway.h.

Referenced by ObGateway(), and ~ObGateway().

int ObGateway::so_sndbuf
private

Definition at line 55 of file ObGateway.h.

Referenced by ObGateway(), and updateRemoteGateways().


The documentation for this class was generated from the following files: