InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Listener Class Reference

execute Listener actor More...

#include <Listener.h>

Collaboration diagram for Listener:

Public Member Functions

 Listener (Topology::partitionAddress *myIdentityArg)
 
virtual ~Listener ()
 
int startsocket (string &node, string &service)
 create listening socket More...
 

Public Attributes

class Mboxes mboxes
 
Topology::partitionAddress myIdentity
 
class Topology myTopology
 

Detailed Description

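Executes the Listener actor: the constructor opens the two listening sockets, registers them with epoll, and then loops forever accepting connections and forwarding socket events to transaction agents.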

Parameters
myIdentityArg: how to identify this actor

Definition at line 46 of file Listener.h.

Constructor & Destructor Documentation

Listener::Listener ( Topology::partitionAddress * myIdentityArg)

Definition at line 41 of file Listener.cc.

References Topology::partitionAddress::address, connectionsMutex, Topology::partitionAddress::epollfd, LISTENER_NONE, LISTENER_PG, LISTENER_RAW, listenerTypes, logfile, mboxes, myIdentity, myTopology, Topology::addressStruct::nodeid, Mboxes::nodeid, Topology::partitionAddress::nodes, NUMSOCKETS, Topology::numtransactionagents, MboxProducer::sendMsg(), Topology::partitionAddress::services, socketAffinity, startsocket(), TOPIC_SOCKET, TOPIC_SOCKETCONNECTED, Mboxes::transactionAgentPtrs, and Mboxes::update().

42  : myIdentity(*myIdentityArg)
43 {
44  delete myIdentityArg;
47 
48  int listenersockfd = startsocket(myIdentity.nodes[0], myIdentity.services[0]);
49  int pglistenersockfd = startsocket(myIdentity.nodes[1],
50  myIdentity.services[1]);
51  struct epoll_event ev;
52  ev.events = EPOLLIN | EPOLLHUP | EPOLLET;
53  ev.data.fd = listenersockfd;
54 
55  if (epoll_ctl(myIdentity.epollfd, EPOLL_CTL_ADD, listenersockfd, &ev) == -1)
56  {
57  fprintf(logfile, "%s %i epoll_ctl errno %i\n", __FILE__, __LINE__, errno);
58  }
59 
60  ev.data.fd = pglistenersockfd;
61 
62  if (epoll_ctl(myIdentity.epollfd, EPOLL_CTL_ADD, pglistenersockfd, &ev) == -1)
63  {
64  fprintf(logfile, "%s %i epoll_ctl errno %i\n", __FILE__, __LINE__, errno);
65  }
66 
67  struct sockaddr_in their_addr; // connector's address information
68 
69  socklen_t sin_size = sizeof(their_addr);
70 
71  int roundrobin = 0;
72 
73  struct epoll_event events[EPOLLEVENTS];
74 
75  socketAffinity.resize(NUMSOCKETS+1, 0);
77 
78  class MboxProducer *producer=NULL;
79  listenertype_e listenertype=LISTENER_NONE;
80 
81  while (1)
82  {
83  int eventcount = epoll_wait(myIdentity.epollfd, events, EPOLLEVENTS, -1);
84 
85  for (int n=0; n < eventcount; n++)
86  {
87  int fd = events[n].data.fd;
88  int event = events[n].events;
89 
90  if (fd==listenersockfd || fd==pglistenersockfd)
91  {
92  if (fd==listenersockfd)
93  {
94  if (event & EPOLLIN)
95  {
96  int newfd = accept(fd, (struct sockaddr *)&their_addr,
97  &sin_size);
98 
99  if (newfd == -1)
100  {
101  printf("%s %i accept errno %i\n", __FILE__, __LINE__,
102  errno);
103  break;
104  }
105  if (newfd > NUMSOCKETS)
106  {
107  fprintf(logfile, "%s %i fd %i > %i\n", __FILE__,
108  __LINE__, newfd,
109  NUMSOCKETS);
110  close(newfd);
111  continue;
112  }
113 
114  fcntl(newfd, F_SETFL, O_NONBLOCK);
115  int optval = 1;
116  setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &optval,
117  sizeof(optval));
118  ev.data.fd = newfd;
119  pthread_mutex_lock(&connectionsMutex);
120  socketAffinity[newfd]=
121  mboxes.transactionAgentPtrs[roundrobin++ %
124  pthread_mutex_unlock(&connectionsMutex);
125  epoll_ctl(myIdentity.epollfd, EPOLL_CTL_ADD, newfd, &ev);
126  }
127  }
128  else if (fd==pglistenersockfd)
129  {
130  // if (event & EPOLLIN)
131  while (1)
132  {
133  int newfd = accept(fd, (struct sockaddr *)&their_addr,
134  &sin_size);
135 
136  if (newfd == -1)
137  {
138  if (errno != EAGAIN && errno != EWOULDBLOCK)
139  {
140  printf("%s %i accept errno %i\n", __FILE__,
141  __LINE__, errno);
142  }
143  else
144  {
145  break;
146  }
147  }
148  if (newfd > NUMSOCKETS)
149  {
150  fprintf(logfile, "%s %i fd %i > %i\n", __FILE__,
151  __LINE__, newfd, NUMSOCKETS);
152  close(newfd);
153  continue;
154  }
155 
156  fcntl(newfd, F_SETFL, O_NONBLOCK);
157  int optval = 1;
158  setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &optval,
159  sizeof(optval));
160  ev.data.fd = newfd;
161  pthread_mutex_lock(&connectionsMutex);
162  socketAffinity[newfd]=
163  mboxes.transactionAgentPtrs[roundrobin++ %
165  listenerTypes[newfd]=LISTENER_PG;
166  pthread_mutex_unlock(&connectionsMutex);
167  epoll_ctl(myIdentity.epollfd, EPOLL_CTL_ADD, newfd, &ev);
168  socketAffinity[newfd]->sendMsg(*(new class MessageSocket(newfd, 0, listenerTypes[newfd], myIdentity.address.nodeid, TOPIC_SOCKETCONNECTED)));
169  }
170  }
171  }
172  else
173  { // already established socket activity
174  pthread_mutex_lock(&connectionsMutex);
175  if (socketAffinity[fd])
176  {
177  producer=socketAffinity[fd];
178  listenertype=listenerTypes[fd];
179  }
180  else
181  {
182  pthread_mutex_unlock(&connectionsMutex);
183  fprintf(logfile, "%s %i event %i on spurious sockfd %i\n", __FILE__,
184  __LINE__, event, fd);
185  continue;
186  }
187  pthread_mutex_unlock(&connectionsMutex);
188  producer->sendMsg(*(new class MessageSocket(fd, event,
189  listenertype,
191  TOPIC_SOCKET)));
192  }
193  }
194  }
195 }

Here is the call graph for this function:

Listener::~Listener ( )
virtual

Definition at line 197 of file Listener.cc.

198 {
199 }

Member Function Documentation

int Listener::startsocket ( string &  node,
string &  service 
)

create listening socket

Parameters
node: hostname or IPv4 address
service: TCP port or service name
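Returns
the listening socket file descriptor, or -1 if no candidate address could be bound (the process exits if getaddrinfo or listen fails)

TODO: handle listener failure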

Definition at line 211 of file Listener.cc.

References logfile.

Referenced by Listener().

212 {
213  struct addrinfo hints = {};
214  hints.ai_family = AF_INET;
215  hints.ai_socktype = SOCK_STREAM;
216  hints.ai_protocol = IPPROTO_TCP;
217  hints.ai_flags = AI_PASSIVE;
218  struct addrinfo *servinfo;
219 
220  char *nodeptr = NULL;
221 
222  if (node.size())
223  {
224  nodeptr = (char *)node.c_str();
225  }
226 
227  int rv = getaddrinfo((const char *)nodeptr, service.c_str(), &hints,
228  &servinfo);
229 
230  if (rv)
231  {
232  fprintf(logfile, "%s %i getaddrinfo %s %i\n", __FILE__, __LINE__,
233  gai_strerror(rv), rv);
234  exit(1);
235  }
236 
237  int sockfd = 0;
238  int yes = 1;
239  struct addrinfo *p=NULL;
240 
241  for (p = servinfo; p != NULL; p = p->ai_next)
242  {
243  sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
244 
245  if (sockfd == -1)
246  {
247  fprintf(logfile, "%s %i socket errno %i\n", __FILE__, __LINE__,
248  errno);
249  continue;
250  }
251 
252  fcntl(sockfd, F_SETFL, O_NONBLOCK);
253 
254  if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))==-1)
255  {
256  fprintf(logfile, "%s %i setsockopt errno %i\n", __FILE__, __LINE__,
257  errno);
258  continue;
259  }
260 
261  if (bind(sockfd, p->ai_addr, p->ai_addrlen)==-1)
262  {
263  close(sockfd);
264  fprintf(logfile, "%s %i bind errno %i\n", __FILE__, __LINE__, errno);
265  continue;
266  }
267 
268  break;
269  }
270 
271  freeaddrinfo(servinfo);
272 
273  if (p==NULL)
274  {
275  fprintf(logfile, "%s %i listener: failed to bind\n", __FILE__, __LINE__);
276  return -1;
277  }
278 
279  if (listen(sockfd, 1000) == -1)
280  {
281  fprintf(logfile, "%s %i listen errno %i\n", __FILE__, __LINE__, errno);
282  exit(1);
283  }
284 
285  return sockfd;
286 }

Here is the caller graph for this function:

Member Data Documentation

class Mboxes Listener::mboxes

Definition at line 63 of file Listener.h.

Referenced by Listener().

Topology::partitionAddress Listener::myIdentity

Definition at line 64 of file Listener.h.

Referenced by Listener().

class Topology Listener::myTopology

Definition at line 65 of file Listener.h.

Referenced by Listener().


The documentation for this class was generated from the following files: