InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
IbGateway Class Reference

Inbound Gateway Actor. More...

#include <IbGateway.h>

Collaboration diagram for IbGateway:

Public Member Functions

 IbGateway (Topology::partitionAddress *myIdentityArg)
 execute Inbound Gateway More...
 
virtual ~IbGateway ()
 

Public Attributes

Topology::partitionAddress myIdentity
 
class Mboxes mboxes
 
class Topology myTopology
 

Private Member Functions

void inbufhandler (const char *buf, size_t bufsize)
 process compressed batch of incoming messages More...
 
void addtofds (int newfd)
 add file descriptor to remote ObGateway to poll structure More...
 
void removefds ()
 remove file descriptors from poll struct More...
 

Private Attributes

boost::unordered_map< int,
std::string > 
pendingReads
 
boost::unordered_set< int > fdremoveset
 
struct pollfd * fds
 
nfds_t nfds
 
char * inbuf
 
char * dcstrsmall
 
bool ismultinode
 

Detailed Description

Inbound Gateway Actor.

Definition at line 38 of file IbGateway.h.

Constructor & Destructor Documentation

IbGateway::IbGateway ( Topology::partitionAddress *myIdentityArg)

execute Inbound Gateway

receives Message batches from other nodes, decompresses and distributes to local actors.

Parameters
myIdentityArg — how to identify this actor

TODO: handle listener failure

Definition at line 35 of file IbGateway.cc.

References Topology::partitionAddress::address, addtofds(), Topology::partitionAddress::argstring, dcstrsmall, fdremoveset, fds, inbuf, inbufhandler(), Topology::partitionAddress::instance, ismultinode, logfile, mboxes, myIdentity, myTopology, nfds, Topology::addressStruct::nodeid, Mboxes::nodeid, pendingReads, removefds(), SERIALIZEDMAXSIZE, setprio(), and Mboxes::update().

Referenced by ibGateway().

// IbGateway constructor. Binds a TCP listening socket to the "node:service"
// address found in myIdentity.argstring, then loops forever in poll():
// it accepts connections on the listener and, for every connected socket,
// reads data and hands each complete package to inbufhandler().
// The only way out of this function is the early return taken when no
// address could be bound; every other fatal setup error calls exit(1).
// NOTE(review): this listing shows only ':' for source line 35 and jumps from
// source line 39 to 42, so part of the original definition is not visible here.
35  :
36  myIdentity(*myIdentityArg), fds(NULL), nfds(0), ismultinode(false)
37 {
// the argument was copied into myIdentity by the initializer list above,
// so the heap object passed in is released here
38  delete myIdentityArg;
39 
42 
// split argstring at the first ':' into a host part and a service/port part
// NOTE(review): if there is no ':', find() returns npos and found+1 wraps to
// 0, so both node and service become the whole string
43  size_t found = myIdentity.argstring.find(':');
44  string node = myIdentity.argstring.substr(0, found);
45  string service = myIdentity.argstring.substr(found+1,
46  myIdentity.argstring.size()-
47  (found+1));
48 
// ask getaddrinfo() for passive IPv4 / TCP stream addresses to bind to
49  struct addrinfo hints = {};
50  hints.ai_family = AF_INET;
51  hints.ai_socktype = SOCK_STREAM;
52  hints.ai_protocol = IPPROTO_TCP;
53  hints.ai_flags = AI_PASSIVE;
54  struct addrinfo *servinfo;
55 
56  int rv = getaddrinfo(node.c_str(), service.c_str(), &hints, &servinfo);
57 
58  if (rv)
59  {
60  fprintf(logfile, "%s %i getaddrinfo %s %i\n", __FILE__, __LINE__,
61  gai_strerror(rv), rv);
62  exit(1);
63  }
64 
65  int sockfd = 0;
66  int yes = 1;
67  struct addrinfo *p=NULL;
// so_rcvbuf (16 MiB) is used both as the SO_RCVBUF value requested for the
// sockets and as the size of the inbuf read buffer
68  int so_rcvbuf=16777216;
69  socklen_t optlen=sizeof(so_rcvbuf);
70  inbuf=new (std::nothrow) char[so_rcvbuf];
71  if (inbuf==NULL)
72  {
73  fprintf(logfile, "%s %i malloc inbuf failed\n", __FILE__, __LINE__);
74  exit(1);
75  }
76 
// try each address returned by getaddrinfo() until one is bound successfully
77  for (p = servinfo; p != NULL; p = p->ai_next)
78  {
79  sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
80 
81  if (sockfd == -1)
82  {
83  fprintf(logfile, "%s %i socket errno %i\n", __FILE__, __LINE__,
84  errno);
85  continue;
86  }
87 
// NOTE(review): on the two setsockopt failure paths below, sockfd is not
// closed before 'continue' (the bind failure path does close it)
88  if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))==-1)
89  {
90  fprintf(logfile, "%s %i setsockopt errno %i\n", __FILE__, __LINE__,
91  errno);
92  continue;
93  }
94 
95  if (setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf, optlen)==-1)
96  {
97  fprintf(logfile, "%s %i setsockopt errno %i\n", __FILE__, __LINE__,
98  errno);
99  continue;
100  }
101 
102  if (bind(sockfd, p->ai_addr, p->ai_addrlen)==-1)
103  {
104  close(sockfd);
105  fprintf(logfile, "%s %i bind errno %i\n", __FILE__, __LINE__, errno);
106  continue;
107  }
108 
// bound successfully: leave the loop with p pointing at the address used
109  break;
110  }
111 
112  freeaddrinfo(servinfo);
113 
// p is NULL only when the loop above ran out of candidates without binding
114  if (p==NULL)
115  {
116  fprintf(logfile, "%s %i listener: failed to bind\n", __FILE__, __LINE__);
// NOTE(review): returns with no listener; dcstrsmall has not been assigned at
// this point (it is not in the initializer list and is only set further down)
117  return;
118  }
119 
// NOTE(review): 'O_NONBLOCK==-1' is a comparison, so its boolean result is
// what gets passed as the flags argument rather than O_NONBLOCK itself;
// compare the call on the accepted socket below, which passes O_NONBLOCK
120  fcntl(sockfd, F_SETFL, O_NONBLOCK==-1);
121 
122  if (listen(sockfd, 1000) == -1)
123  {
124  fprintf(logfile, "%s %i listen errno %i\n", __FILE__, __LINE__, errno);
125  exit(1);
126  }
127 
128  struct sockaddr_in their_addr; // connector's address information
129 
130  socklen_t sin_size = sizeof(their_addr);
131 
// nfds starts at 0, so the listening socket becomes the first polled entry
132  addtofds(sockfd);
133 
// buffer that inbufhandler() uses as its first decompression target
// NOTE(review): the nothrow result is not checked here, unlike inbuf above
134  dcstrsmall=new (std::nothrow) char[SERIALIZEDMAXSIZE];
135 
// event loop: never exits
136  while (1)
137  {
// wait with no timeout (-1) for activity on any registered descriptor
138  int eventcount = poll(fds, nfds, -1);
139 
140  if (eventcount < 0)
141  {
// poll() failed: simply try again
142  continue;
143  }
144 
// m counts the descriptors with events seen so far, so the scan can stop
// as soon as all eventcount of them have been visited
145  int m = 0;
146 
147  for (nfds_t n=0; n < nfds; n++)
148  {
149  if (m==eventcount)
150  {
151  break;
152  }
153 
154  if (!fds[n].revents)
155  {
156  continue;
157  }
158 
159  m++;
160  short event = fds[n].revents;
161 
162  // if it's the listening socket, then accept(), otherwise
163  if (fds[n].fd==sockfd)
164  {
// NOTE(review): EPOLL* constants are tested against poll() revents here and
// below; presumably they have the same values as POLLERR/POLLHUP on the
// target platform — confirm
165  if ((event & EPOLLERR) || (event & EPOLLHUP))
166  {
// errors on the listening socket are skipped without further action
167  continue;
168  }
169  else if (event & POLLIN)
170  {
171  int newfd = accept(sockfd, (struct sockaddr *)&their_addr,
172  &sin_size);
173 
174  if (newfd == -1)
175  {
// NOTE(review): this diagnostic goes to stdout via printf, whereas the other
// diagnostics in this function are written to logfile
176  printf("%s %i accept errno %i\n", __FILE__, __LINE__,
177  errno);
178  continue;
179  }
// NOTE(review): this sets SO_RCVBUF on the listening socket (sockfd) again,
// not on newfd; on failure newfd is neither closed nor added to the poll set
180  if (setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf,
181  optlen)==-1)
182  {
183  fprintf(logfile, "%s %i setsockopt errno %i\n", __FILE__,
184  __LINE__, errno);
185  continue;
186  }
// accepted sockets are made non-blocking so the read loop below can drain
// them until EAGAIN/EWOULDBLOCK
187  fcntl(newfd, F_SETFL, O_NONBLOCK);
188  addtofds(newfd);
// setprio() is called once, when the first connection is accepted
189  if (ismultinode==false)
190  {
191  setprio();
192  ismultinode=true;
193  }
194  continue;
195  }
196  }
197 
// error or hangup on a descriptor: close it and queue it in fdremoveset;
// removefds() drops it from the poll array after this scan
// NOTE(review): the pendingReads entry for this fd is not erased here, unlike
// the read-failure and end-of-file paths below
198  if ((event & EPOLLERR) || (event & EPOLLHUP))
199  {
200  close(fds[n].fd);
201  fdremoveset.insert(fds[n].fd);
202  fprintf(logfile, "%s %i instance %li n %lu\n", __FILE__,
203  __LINE__, myIdentity.instance, n);
204  continue;
205  }
206  else if (event & POLLIN)
207  {
208  ssize_t readed;
209  int readfd=fds[n].fd;
210 
// keep reading until read() returns 0 or -1
211  do
212  {
213  readed=read(readfd, inbuf, so_rcvbuf);
214  switch (readed)
215  {
216  case -1:
217  if (errno==EAGAIN || errno==EWOULDBLOCK)
218  {
// nothing more to read for now: process the complete packages accumulated
// in this descriptor's pendingReads buffer, if it has one
219  if (pendingReads.count(readfd))
220  {
221  string &strRef=pendingReads[readfd];
222  size_t pos=0;
223  while (pos < strRef.size())
224  {
225  if (sizeof(size_t)>(size_t)(strRef.size()-
226  pos))
227  { // can't even read size of message group
228  break;
229  }
// the leading size_t is the length of the whole package, header included:
// pos advances by packagesize from the start of the header
230  size_t packagesize=*(size_t *)(strRef.c_str()+
231  pos);
232  if (packagesize > strRef.size()-pos)
233  { // can't read next message group entirely
234  break;
235  }
236  inbufhandler(strRef.c_str()+pos, packagesize);
237  pos += packagesize;
238  }
239  if (pos<strRef.size())
240  { // background the remainder
// keep only the unprocessed tail in the pending buffer
241  string newstr(strRef, pos, string::npos);
242  strRef.swap(newstr);
243  }
244  else
245  {
// everything was consumed; the (now empty) entry stays in pendingReads
246  strRef.clear();
247  }
248  }
249  }
250  else
251  {
// any other read error: drop the connection and its buffered data
252  close(readfd);
253  fdremoveset.insert(readfd);
254  pendingReads.erase(readfd);
255  break;
256  }
257  break;
258 
// read() returned 0: close the descriptor and discard its buffered data
259  case 0:
260  close(readfd);
261  fdremoveset.insert(readfd);
262  pendingReads.erase(readfd);
263  break;
264 
265  default:
266  {
// data arrived: if this descriptor already has a pending buffer, only append
// to it (parsing then happens on the EAGAIN path above); otherwise parse
// packages straight out of inbuf
267  if (pendingReads.count(readfd))
268  {
269  pendingReads[readfd].append(inbuf, readed);
270  }
271  else
272  {
273  size_t pos=0;
274  while (pos < (size_t)readed)
275  {
276  if (sizeof(size_t)>(size_t)(readed-pos))
277  { // can't even read size of message group
278  break;
279  }
280  size_t packagesize=*(size_t *)(inbuf+pos);
281  if (packagesize > readed-pos)
282  { // can't read next message group entirely
283  break;
284  }
285  inbufhandler(inbuf+pos, packagesize);
286  pos += packagesize;
287  }
288  if (pos<(size_t)readed)
289  { // background the remainder
// an incomplete trailing package is saved and creates the pendingReads entry
290  pendingReads[readfd].assign(inbuf+pos,
291  readed-pos);
292  }
293  }
294  }
295  }
296  }
297  while (readed > 0);
298 
299  }
300  }
301 
// apply the removals queued in fdremoveset during this scan
302  removefds();
303  }
304 }

Here is the call graph for this function:

Here is the caller graph for this function:

IbGateway::~IbGateway ( )
virtual

Definition at line 306 of file IbGateway.cc.

References dcstrsmall, and inbuf.

307 {
308  delete inbuf;
309  delete dcstrsmall;
310 }

Member Function Documentation

void IbGateway::addtofds ( int  newfd)
private

add file descriptor to remote ObGateway to poll structure

Parameters
newfd — socket descriptor

Definition at line 373 of file IbGateway.cc.

References fds.

Referenced by IbGateway().

374 {
375  struct pollfd *newfds = new struct pollfd[++nfds];
376 
377  for (nfds_t n=0; n < nfds-1; n++)
378  {
379  newfds[n] = fds[n];
380  }
381 
382  newfds[nfds-1] = {newfd, EPOLLIN | EPOLLERR | EPOLLHUP, 0};
383 
384  if (fds != NULL)
385  {
386  delete fds;
387  }
388 
389  fds = newfds;
390 }

Here is the caller graph for this function:

void IbGateway::inbufhandler ( const char *  buf,
size_t  bufsize 
)
private

process compressed batch of incoming messages

Parameters
buf — incoming message buffer
bufsize — buffer size

Definition at line 313 of file IbGateway.cc.

References cfgs, cfg_s::compressgw, dcstrsmall, Message::message_s::destAddr, inbuf, mboxes, Message::messageStruct, SERIALIZEDMAXSIZE, Message::message_s::sourceAddr, and Mboxes::toActor().

Referenced by IbGateway().

314 {
315  char *inbuf;
316  size_t inbufsize;
317  char *dcstr;
318  char *dcstrbig;
319  bool isdcstrbig=false;
320 
321  if (cfgs.compressgw==true)
322  { // SERIALIZEDMAXSIZE
323  int bs=SERIALIZEDMAXSIZE;
324  dcstr=dcstrsmall;
325  while (1)
326  {
327  ssize_t dcsize=LZ4_decompress_safe(buf+sizeof(bufsize), dcstr,
328  bufsize-sizeof(bufsize), bs);
329  if (dcsize < 0)
330  {
331  if (isdcstrbig==true)
332  {
333  delete dcstrbig;
334  }
335  else
336  {
337  isdcstrbig=true;
338  }
339  bs *= 2;
340  dcstrbig=new (std::nothrow) char[bs];
341  dcstr=dcstrbig;
342  continue;
343  }
344  inbuf=dcstr;
345  inbufsize=dcsize;
346  break;
347  }
348  }
349  else
350  {
351  inbuf=(char *)buf;
352  inbufsize=bufsize;
353  }
354 
355  size_t pos=sizeof(size_t); // i already know the whole size, it's bufsize
356  while (pos<inbufsize)
357  {
358  size_t s=*(size_t *)(inbuf+pos);
359  pos += sizeof(s);
360  string *serstr=new string(inbuf+pos, s);
361  pos += s;
362  class MessageSerialized *msgsnd=new class MessageSerialized(serstr);
364  msgsnd->messageStruct.destAddr, *msgsnd);
365  }
366 
367  if (isdcstrbig==true)
368  {
369  delete[] dcstrbig;
370  }
371 }

Here is the call graph for this function:

Here is the caller graph for this function:

void IbGateway::removefds ( )
private

remove file descriptors from poll struct

Definition at line 392 of file IbGateway.cc.

References fdremoveset, fds, Topology::partitionAddress::instance, logfile, and myIdentity.

Referenced by IbGateway().

393 {
394  if (!fdremoveset.size())
395  {
396  return;
397  }
398 
399  struct pollfd *newfds = new struct pollfd[nfds - fdremoveset.size()];
400 
401  nfds_t m=0;
402 
403  for (nfds_t n=0; n < nfds - fdremoveset.size(); n++)
404  {
405  if (!fdremoveset.count(fds[n].fd))
406  {
407  newfds[m++] = fds[n];
408  }
409  }
410 
411  delete fds;
412  fds = newfds;
413  nfds -= fdremoveset.size();
414  fdremoveset.clear();
415  fprintf(logfile, "%s %i instance %li removefds nfds %lu\n", __FILE__,
416  __LINE__, myIdentity.instance, nfds);
417 }

Here is the caller graph for this function:

Member Data Documentation

char* IbGateway::dcstrsmall
private

Definition at line 81 of file IbGateway.h.

Referenced by IbGateway(), inbufhandler(), and ~IbGateway().

boost::unordered_set<int> IbGateway::fdremoveset
private

Definition at line 77 of file IbGateway.h.

Referenced by IbGateway(), and removefds().

struct pollfd* IbGateway::fds
private

Definition at line 78 of file IbGateway.h.

Referenced by addtofds(), IbGateway(), and removefds().

char* IbGateway::inbuf
private

Definition at line 80 of file IbGateway.h.

Referenced by IbGateway(), inbufhandler(), and ~IbGateway().

bool IbGateway::ismultinode
private

Definition at line 82 of file IbGateway.h.

Referenced by IbGateway().

class Mboxes IbGateway::mboxes

Definition at line 53 of file IbGateway.h.

Referenced by IbGateway(), and inbufhandler().

Topology::partitionAddress IbGateway::myIdentity

Definition at line 52 of file IbGateway.h.

Referenced by IbGateway(), and removefds().

class Topology IbGateway::myTopology

Definition at line 54 of file IbGateway.h.

Referenced by IbGateway().

nfds_t IbGateway::nfds
private

Definition at line 79 of file IbGateway.h.

Referenced by IbGateway().

boost::unordered_map<int, std::string> IbGateway::pendingReads
private

Definition at line 76 of file IbGateway.h.

Referenced by IbGateway().


The documentation for this class was generated from the following files: