31 #line 32 "IbGateway.cc"
33 #define EPOLLEVENTS 1024
36 myIdentity(*myIdentityArg), fds(NULL), nfds(0), ismultinode(false)
49 struct addrinfo hints = {};
50 hints.ai_family = AF_INET;
51 hints.ai_socktype = SOCK_STREAM;
52 hints.ai_protocol = IPPROTO_TCP;
53 hints.ai_flags = AI_PASSIVE;
54 struct addrinfo *servinfo;
56 int rv = getaddrinfo(node.c_str(), service.c_str(), &hints, &servinfo);
60 fprintf(
logfile,
"%s %i getaddrinfo %s %i\n", __FILE__, __LINE__,
61 gai_strerror(rv), rv);
67 struct addrinfo *p=NULL;
68 int so_rcvbuf=16777216;
69 socklen_t optlen=
sizeof(so_rcvbuf);
70 inbuf=
new (std::nothrow)
char[so_rcvbuf];
73 fprintf(
logfile,
"%s %i malloc inbuf failed\n", __FILE__, __LINE__);
77 for (p = servinfo; p != NULL; p = p->ai_next)
79 sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
83 fprintf(
logfile,
"%s %i socket errno %i\n", __FILE__, __LINE__,
88 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
sizeof(yes))==-1)
90 fprintf(
logfile,
"%s %i setsockopt errno %i\n", __FILE__, __LINE__,
95 if (setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf, optlen)==-1)
97 fprintf(
logfile,
"%s %i setsockopt errno %i\n", __FILE__, __LINE__,
102 if (bind(sockfd, p->ai_addr, p->ai_addrlen)==-1)
105 fprintf(
logfile,
"%s %i bind errno %i\n", __FILE__, __LINE__, errno);
112 freeaddrinfo(servinfo);
116 fprintf(
logfile,
"%s %i listener: failed to bind\n", __FILE__, __LINE__);
120 fcntl(sockfd, F_SETFL, O_NONBLOCK==-1);
122 if (listen(sockfd, 1000) == -1)
124 fprintf(
logfile,
"%s %i listen errno %i\n", __FILE__, __LINE__, errno);
128 struct sockaddr_in their_addr;
130 socklen_t sin_size =
sizeof(their_addr);
138 int eventcount = poll(
fds,
nfds, -1);
147 for (nfds_t n=0; n <
nfds; n++)
160 short event =
fds[n].revents;
163 if (
fds[n].fd==sockfd)
165 if ((event & EPOLLERR) || (event & EPOLLHUP))
169 else if (event & POLLIN)
171 int newfd = accept(sockfd, (
struct sockaddr *)&their_addr,
176 printf(
"%s %i accept errno %i\n", __FILE__, __LINE__,
180 if (setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf,
183 fprintf(
logfile,
"%s %i setsockopt errno %i\n", __FILE__,
187 fcntl(newfd, F_SETFL, O_NONBLOCK);
198 if ((event & EPOLLERR) || (event & EPOLLHUP))
202 fprintf(
logfile,
"%s %i instance %li n %lu\n", __FILE__,
206 else if (event & POLLIN)
209 int readfd=
fds[n].fd;
213 readed=read(readfd,
inbuf, so_rcvbuf);
217 if (errno==EAGAIN || errno==EWOULDBLOCK)
223 while (pos < strRef.size())
225 if (
sizeof(
size_t)>(size_t)(strRef.size()-
230 size_t packagesize=*(
size_t *)(strRef.c_str()+
232 if (packagesize > strRef.size()-pos)
239 if (pos<strRef.size())
241 string newstr(strRef, pos, string::npos);
274 while (pos < (
size_t)readed)
276 if (
sizeof(
size_t)>(
size_t)(readed-pos))
280 size_t packagesize=*(
size_t *)(
inbuf+pos);
281 if (packagesize > readed-pos)
288 if (pos<(
size_t)readed)
319 bool isdcstrbig=
false;
327 ssize_t dcsize=LZ4_decompress_safe(buf+
sizeof(bufsize), dcstr,
328 bufsize-
sizeof(bufsize), bs);
331 if (isdcstrbig==
true)
340 dcstrbig=
new (std::nothrow)
char[bs];
355 size_t pos=
sizeof(size_t);
356 while (pos<inbufsize)
358 size_t s=*(
size_t *)(inbuf+pos);
360 string *serstr=
new string(inbuf+pos, s);
367 if (isdcstrbig==
true)
375 struct pollfd *newfds =
new struct pollfd[++nfds];
377 for (nfds_t n=0; n < nfds-1; n++)
382 newfds[nfds-1] = {newfd, EPOLLIN | EPOLLERR | EPOLLHUP, 0};
399 struct pollfd *newfds =
new struct pollfd[nfds - fdremoveset.size()];
403 for (nfds_t n=0; n < nfds - fdremoveset.size(); n++)
405 if (!fdremoveset.count(
fds[n].fd))
407 newfds[m++] =
fds[n];
413 nfds -= fdremoveset.size();
415 fprintf(
logfile,
"%s %i instance %li removefds nfds %lu\n", __FILE__,