InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Listener.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
36 #include "Listener.h"
37 #line 38 "Listener.cc"
38 
39 #define EPOLLEVENTS 1024
40 
42  : myIdentity(*myIdentityArg)
43 {
44  delete myIdentityArg;
47 
48  int listenersockfd = startsocket(myIdentity.nodes[0], myIdentity.services[0]);
49  int pglistenersockfd = startsocket(myIdentity.nodes[1],
50  myIdentity.services[1]);
51  struct epoll_event ev;
52  ev.events = EPOLLIN | EPOLLHUP | EPOLLET;
53  ev.data.fd = listenersockfd;
54 
55  if (epoll_ctl(myIdentity.epollfd, EPOLL_CTL_ADD, listenersockfd, &ev) == -1)
56  {
57  fprintf(logfile, "%s %i epoll_ctl errno %i\n", __FILE__, __LINE__, errno);
58  }
59 
60  ev.data.fd = pglistenersockfd;
61 
62  if (epoll_ctl(myIdentity.epollfd, EPOLL_CTL_ADD, pglistenersockfd, &ev) == -1)
63  {
64  fprintf(logfile, "%s %i epoll_ctl errno %i\n", __FILE__, __LINE__, errno);
65  }
66 
67  struct sockaddr_in their_addr; // connector's address information
68 
69  socklen_t sin_size = sizeof(their_addr);
70 
71  int roundrobin = 0;
72 
73  struct epoll_event events[EPOLLEVENTS];
74 
75  socketAffinity.resize(NUMSOCKETS+1, 0);
77 
78  class MboxProducer *producer=NULL;
79  listenertype_e listenertype=LISTENER_NONE;
80 
81  while (1)
82  {
83  int eventcount = epoll_wait(myIdentity.epollfd, events, EPOLLEVENTS, -1);
84 
85  for (int n=0; n < eventcount; n++)
86  {
87  int fd = events[n].data.fd;
88  int event = events[n].events;
89 
90  if (fd==listenersockfd || fd==pglistenersockfd)
91  {
92  if (fd==listenersockfd)
93  {
94  if (event & EPOLLIN)
95  {
96  int newfd = accept(fd, (struct sockaddr *)&their_addr,
97  &sin_size);
98 
99  if (newfd == -1)
100  {
101  printf("%s %i accept errno %i\n", __FILE__, __LINE__,
102  errno);
103  break;
104  }
105  if (newfd > NUMSOCKETS)
106  {
107  fprintf(logfile, "%s %i fd %i > %i\n", __FILE__,
108  __LINE__, newfd,
109  NUMSOCKETS);
110  close(newfd);
111  continue;
112  }
113 
114  fcntl(newfd, F_SETFL, O_NONBLOCK);
115  int optval = 1;
116  setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &optval,
117  sizeof(optval));
118  ev.data.fd = newfd;
119  pthread_mutex_lock(&connectionsMutex);
120  socketAffinity[newfd]=
121  mboxes.transactionAgentPtrs[roundrobin++ %
124  pthread_mutex_unlock(&connectionsMutex);
125  epoll_ctl(myIdentity.epollfd, EPOLL_CTL_ADD, newfd, &ev);
126  }
127  }
128  else if (fd==pglistenersockfd)
129  {
130  // if (event & EPOLLIN)
131  while (1)
132  {
133  int newfd = accept(fd, (struct sockaddr *)&their_addr,
134  &sin_size);
135 
136  if (newfd == -1)
137  {
138  if (errno != EAGAIN && errno != EWOULDBLOCK)
139  {
140  printf("%s %i accept errno %i\n", __FILE__,
141  __LINE__, errno);
142  }
143  else
144  {
145  break;
146  }
147  }
148  if (newfd > NUMSOCKETS)
149  {
150  fprintf(logfile, "%s %i fd %i > %i\n", __FILE__,
151  __LINE__, newfd, NUMSOCKETS);
152  close(newfd);
153  continue;
154  }
155 
156  fcntl(newfd, F_SETFL, O_NONBLOCK);
157  int optval = 1;
158  setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &optval,
159  sizeof(optval));
160  ev.data.fd = newfd;
161  pthread_mutex_lock(&connectionsMutex);
162  socketAffinity[newfd]=
163  mboxes.transactionAgentPtrs[roundrobin++ %
165  listenerTypes[newfd]=LISTENER_PG;
166  pthread_mutex_unlock(&connectionsMutex);
167  epoll_ctl(myIdentity.epollfd, EPOLL_CTL_ADD, newfd, &ev);
168  socketAffinity[newfd]->sendMsg(*(new class MessageSocket(newfd, 0, listenerTypes[newfd], myIdentity.address.nodeid, TOPIC_SOCKETCONNECTED)));
169  }
170  }
171  }
172  else
173  { // already established socket activity
174  pthread_mutex_lock(&connectionsMutex);
175  if (socketAffinity[fd])
176  {
177  producer=socketAffinity[fd];
178  listenertype=listenerTypes[fd];
179  }
180  else
181  {
182  pthread_mutex_unlock(&connectionsMutex);
183  fprintf(logfile, "%s %i event %i on spurious sockfd %i\n", __FILE__,
184  __LINE__, event, fd);
185  continue;
186  }
187  pthread_mutex_unlock(&connectionsMutex);
188  producer->sendMsg(*(new class MessageSocket(fd, event,
189  listenertype,
191  TOPIC_SOCKET)));
192  }
193  }
194  }
195 }
196 
198 {
199 }
200 
201 void *listener(void *identity)
202 {
203  new Listener((Topology::partitionAddress *)identity);
204  while (1)
205  {
206  sleep(250000);
207  }
208  return NULL;
209 }
210 
211 int Listener::startsocket(string &node, string &service)
212 {
213  struct addrinfo hints = {};
214  hints.ai_family = AF_INET;
215  hints.ai_socktype = SOCK_STREAM;
216  hints.ai_protocol = IPPROTO_TCP;
217  hints.ai_flags = AI_PASSIVE;
218  struct addrinfo *servinfo;
219 
220  char *nodeptr = NULL;
221 
222  if (node.size())
223  {
224  nodeptr = (char *)node.c_str();
225  }
226 
227  int rv = getaddrinfo((const char *)nodeptr, service.c_str(), &hints,
228  &servinfo);
229 
230  if (rv)
231  {
232  fprintf(logfile, "%s %i getaddrinfo %s %i\n", __FILE__, __LINE__,
233  gai_strerror(rv), rv);
234  exit(1);
235  }
236 
237  int sockfd = 0;
238  int yes = 1;
239  struct addrinfo *p=NULL;
240 
241  for (p = servinfo; p != NULL; p = p->ai_next)
242  {
243  sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
244 
245  if (sockfd == -1)
246  {
247  fprintf(logfile, "%s %i socket errno %i\n", __FILE__, __LINE__,
248  errno);
249  continue;
250  }
251 
252  fcntl(sockfd, F_SETFL, O_NONBLOCK);
253 
254  if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))==-1)
255  {
256  fprintf(logfile, "%s %i setsockopt errno %i\n", __FILE__, __LINE__,
257  errno);
258  continue;
259  }
260 
261  if (bind(sockfd, p->ai_addr, p->ai_addrlen)==-1)
262  {
263  close(sockfd);
264  fprintf(logfile, "%s %i bind errno %i\n", __FILE__, __LINE__, errno);
265  continue;
266  }
267 
268  break;
269  }
270 
271  freeaddrinfo(servinfo);
272 
273  if (p==NULL)
274  {
275  fprintf(logfile, "%s %i listener: failed to bind\n", __FILE__, __LINE__);
276  return -1;
277  }
278 
279  if (listen(sockfd, 1000) == -1)
280  {
281  fprintf(logfile, "%s %i listen errno %i\n", __FILE__, __LINE__, errno);
282  exit(1);
283  }
284 
285  return sockfd;
286 }