InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
IbGateway.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
30 #include "IbGateway.h"
31 #line 32 "IbGateway.cc"
32 
33 #define EPOLLEVENTS 1024
34 
36  myIdentity(*myIdentityArg), fds(NULL), nfds(0), ismultinode(false)
37 {
38  delete myIdentityArg;
39 
42 
43  size_t found = myIdentity.argstring.find(':');
44  string node = myIdentity.argstring.substr(0, found);
45  string service = myIdentity.argstring.substr(found+1,
46  myIdentity.argstring.size()-
47  (found+1));
48 
49  struct addrinfo hints = {};
50  hints.ai_family = AF_INET;
51  hints.ai_socktype = SOCK_STREAM;
52  hints.ai_protocol = IPPROTO_TCP;
53  hints.ai_flags = AI_PASSIVE;
54  struct addrinfo *servinfo;
55 
56  int rv = getaddrinfo(node.c_str(), service.c_str(), &hints, &servinfo);
57 
58  if (rv)
59  {
60  fprintf(logfile, "%s %i getaddrinfo %s %i\n", __FILE__, __LINE__,
61  gai_strerror(rv), rv);
62  exit(1);
63  }
64 
65  int sockfd = 0;
66  int yes = 1;
67  struct addrinfo *p=NULL;
68  int so_rcvbuf=16777216;
69  socklen_t optlen=sizeof(so_rcvbuf);
70  inbuf=new (std::nothrow) char[so_rcvbuf];
71  if (inbuf==NULL)
72  {
73  fprintf(logfile, "%s %i malloc inbuf failed\n", __FILE__, __LINE__);
74  exit(1);
75  }
76 
77  for (p = servinfo; p != NULL; p = p->ai_next)
78  {
79  sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
80 
81  if (sockfd == -1)
82  {
83  fprintf(logfile, "%s %i socket errno %i\n", __FILE__, __LINE__,
84  errno);
85  continue;
86  }
87 
88  if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))==-1)
89  {
90  fprintf(logfile, "%s %i setsockopt errno %i\n", __FILE__, __LINE__,
91  errno);
92  continue;
93  }
94 
95  if (setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf, optlen)==-1)
96  {
97  fprintf(logfile, "%s %i setsockopt errno %i\n", __FILE__, __LINE__,
98  errno);
99  continue;
100  }
101 
102  if (bind(sockfd, p->ai_addr, p->ai_addrlen)==-1)
103  {
104  close(sockfd);
105  fprintf(logfile, "%s %i bind errno %i\n", __FILE__, __LINE__, errno);
106  continue;
107  }
108 
109  break;
110  }
111 
112  freeaddrinfo(servinfo);
113 
114  if (p==NULL)
115  {
116  fprintf(logfile, "%s %i listener: failed to bind\n", __FILE__, __LINE__);
117  return;
118  }
119 
120  fcntl(sockfd, F_SETFL, O_NONBLOCK==-1);
121 
122  if (listen(sockfd, 1000) == -1)
123  {
124  fprintf(logfile, "%s %i listen errno %i\n", __FILE__, __LINE__, errno);
125  exit(1);
126  }
127 
128  struct sockaddr_in their_addr; // connector's address information
129 
130  socklen_t sin_size = sizeof(their_addr);
131 
132  addtofds(sockfd);
133 
134  dcstrsmall=new (std::nothrow) char[SERIALIZEDMAXSIZE];
135 
136  while (1)
137  {
138  int eventcount = poll(fds, nfds, -1);
139 
140  if (eventcount < 0)
141  {
142  continue;
143  }
144 
145  int m = 0;
146 
147  for (nfds_t n=0; n < nfds; n++)
148  {
149  if (m==eventcount)
150  {
151  break;
152  }
153 
154  if (!fds[n].revents)
155  {
156  continue;
157  }
158 
159  m++;
160  short event = fds[n].revents;
161 
162  // if it's the listening socket, then accept(), otherwise
163  if (fds[n].fd==sockfd)
164  {
165  if ((event & EPOLLERR) || (event & EPOLLHUP))
166  {
167  continue;
168  }
169  else if (event & POLLIN)
170  {
171  int newfd = accept(sockfd, (struct sockaddr *)&their_addr,
172  &sin_size);
173 
174  if (newfd == -1)
175  {
176  printf("%s %i accept errno %i\n", __FILE__, __LINE__,
177  errno);
178  continue;
179  }
180  if (setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf,
181  optlen)==-1)
182  {
183  fprintf(logfile, "%s %i setsockopt errno %i\n", __FILE__,
184  __LINE__, errno);
185  continue;
186  }
187  fcntl(newfd, F_SETFL, O_NONBLOCK);
188  addtofds(newfd);
189  if (ismultinode==false)
190  {
191  setprio();
192  ismultinode=true;
193  }
194  continue;
195  }
196  }
197 
198  if ((event & EPOLLERR) || (event & EPOLLHUP))
199  {
200  close(fds[n].fd);
201  fdremoveset.insert(fds[n].fd);
202  fprintf(logfile, "%s %i instance %li n %lu\n", __FILE__,
203  __LINE__, myIdentity.instance, n);
204  continue;
205  }
206  else if (event & POLLIN)
207  {
208  ssize_t readed;
209  int readfd=fds[n].fd;
210 
211  do
212  {
213  readed=read(readfd, inbuf, so_rcvbuf);
214  switch (readed)
215  {
216  case -1:
217  if (errno==EAGAIN || errno==EWOULDBLOCK)
218  {
219  if (pendingReads.count(readfd))
220  {
221  string &strRef=pendingReads[readfd];
222  size_t pos=0;
223  while (pos < strRef.size())
224  {
225  if (sizeof(size_t)>(size_t)(strRef.size()-
226  pos))
227  { // can't even read size of message group
228  break;
229  }
230  size_t packagesize=*(size_t *)(strRef.c_str()+
231  pos);
232  if (packagesize > strRef.size()-pos)
233  { // can't read next message group entirely
234  break;
235  }
236  inbufhandler(strRef.c_str()+pos, packagesize);
237  pos += packagesize;
238  }
239  if (pos<strRef.size())
240  { // background the remainder
241  string newstr(strRef, pos, string::npos);
242  strRef.swap(newstr);
243  }
244  else
245  {
246  strRef.clear();
247  }
248  }
249  }
250  else
251  {
252  close(readfd);
253  fdremoveset.insert(readfd);
254  pendingReads.erase(readfd);
255  break;
256  }
257  break;
258 
259  case 0:
260  close(readfd);
261  fdremoveset.insert(readfd);
262  pendingReads.erase(readfd);
263  break;
264 
265  default:
266  {
267  if (pendingReads.count(readfd))
268  {
269  pendingReads[readfd].append(inbuf, readed);
270  }
271  else
272  {
273  size_t pos=0;
274  while (pos < (size_t)readed)
275  {
276  if (sizeof(size_t)>(size_t)(readed-pos))
277  { // can't even read size of message group
278  break;
279  }
280  size_t packagesize=*(size_t *)(inbuf+pos);
281  if (packagesize > readed-pos)
282  { // can't read next message group entirely
283  break;
284  }
285  inbufhandler(inbuf+pos, packagesize);
286  pos += packagesize;
287  }
288  if (pos<(size_t)readed)
289  { // background the remainder
290  pendingReads[readfd].assign(inbuf+pos,
291  readed-pos);
292  }
293  }
294  }
295  }
296  }
297  while (readed > 0);
298 
299  }
300  }
301 
302  removefds();
303  }
304 }
305 
307 {
308  delete inbuf;
309  delete dcstrsmall;
310 }
311 
312 // have read everything before processing
313 void IbGateway::inbufhandler(const char *buf, size_t bufsize)
314 {
315  char *inbuf;
316  size_t inbufsize;
317  char *dcstr;
318  char *dcstrbig;
319  bool isdcstrbig=false;
320 
321  if (cfgs.compressgw==true)
322  { // SERIALIZEDMAXSIZE
323  int bs=SERIALIZEDMAXSIZE;
324  dcstr=dcstrsmall;
325  while (1)
326  {
327  ssize_t dcsize=LZ4_decompress_safe(buf+sizeof(bufsize), dcstr,
328  bufsize-sizeof(bufsize), bs);
329  if (dcsize < 0)
330  {
331  if (isdcstrbig==true)
332  {
333  delete dcstrbig;
334  }
335  else
336  {
337  isdcstrbig=true;
338  }
339  bs *= 2;
340  dcstrbig=new (std::nothrow) char[bs];
341  dcstr=dcstrbig;
342  continue;
343  }
344  inbuf=dcstr;
345  inbufsize=dcsize;
346  break;
347  }
348  }
349  else
350  {
351  inbuf=(char *)buf;
352  inbufsize=bufsize;
353  }
354 
355  size_t pos=sizeof(size_t); // i already know the whole size, it's bufsize
356  while (pos<inbufsize)
357  {
358  size_t s=*(size_t *)(inbuf+pos);
359  pos += sizeof(s);
360  string *serstr=new string(inbuf+pos, s);
361  pos += s;
362  class MessageSerialized *msgsnd=new class MessageSerialized(serstr);
364  msgsnd->messageStruct.destAddr, *msgsnd);
365  }
366 
367  if (isdcstrbig==true)
368  {
369  delete[] dcstrbig;
370  }
371 }
372 
373 void IbGateway::addtofds(int newfd)
374 {
375  struct pollfd *newfds = new struct pollfd[++nfds];
376 
377  for (nfds_t n=0; n < nfds-1; n++)
378  {
379  newfds[n] = fds[n];
380  }
381 
382  newfds[nfds-1] = {newfd, EPOLLIN | EPOLLERR | EPOLLHUP, 0};
383 
384  if (fds != NULL)
385  {
386  delete fds;
387  }
388 
389  fds = newfds;
390 }
391 
393 {
394  if (!fdremoveset.size())
395  {
396  return;
397  }
398 
399  struct pollfd *newfds = new struct pollfd[nfds - fdremoveset.size()];
400 
401  nfds_t m=0;
402 
403  for (nfds_t n=0; n < nfds - fdremoveset.size(); n++)
404  {
405  if (!fdremoveset.count(fds[n].fd))
406  {
407  newfds[m++] = fds[n];
408  }
409  }
410 
411  delete fds;
412  fds = newfds;
413  nfds -= fdremoveset.size();
414  fdremoveset.clear();
415  fprintf(logfile, "%s %i instance %li removefds nfds %lu\n", __FILE__,
416  __LINE__, myIdentity.instance, nfds);
417 }
418 
419 // launcher, regular function
420 void *ibGateway(void *identity)
421 {
422  new IbGateway((Topology::partitionAddress *)identity);
423  while (1)
424  {
425  sleep(500000);
426  }
427  return NULL;
428 }