InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ObGateway.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
30 #include "ObGateway.h"
31 #line 32 "ObGateway.cc"
32 
34  myIdentity(*myIdentityArg), so_sndbuf(16777216), ismultinode(false)
35 {
36  delete myIdentityArg;
37 
41 
42  optlen=sizeof(so_sndbuf);
43  int waitfor = 100;
44 
45  // pendingMsgs[remotenodeid]=serialized messages
46  char *sendstr=NULL;
47  size_t sendsize=0;
48  boost::unordered_map< int64_t, vector<string *> > pendingMsgs;
49  char *serstr=NULL;
50  serstrsmall=new (std::nothrow) char[SERIALIZEDMAXSIZE];
51  char *serstrbig=NULL;
52  bool isserstrbig=false;
53  char *cstr=NULL;
54  cstrsmall=new (std::nothrow) char[SERIALIZEDMAXSIZE];
55  char *cstrbig=NULL;
56  bool iscstrbig=false;
57 
58  while (1)
59  {
60  for (size_t inmsg=0; inmsg < 5000; inmsg++)
61  {
62  class Message *msgrcv = myIdentity.mbox->receive(waitfor);
63 
64  if (msgrcv==NULL)
65  {
66  waitfor = 100;
67  break;
68  }
69 
70  waitfor = 0;
71 
72  switch (msgrcv->messageStruct.topic)
73  {
74  case TOPIC_TOPOLOGY:
77  break;
78 
79  case TOPIC_SERIALIZED: // destined for remote host
80  pendingMsgs[msgrcv->messageStruct.destAddr.nodeid].push_back(((class MessageSerialized *)msgrcv)->data);
81  break;
82 
84  {
85  class MessageBatchSerialized &msgRef=
86  *(class MessageBatchSerialized *)msgrcv;
87  for (short n=0; n<msgRef.nmsgs; n++)
88  {
89  pendingMsgs[msgRef.msgbatch[n].nodeid].push_back(msgRef.msgbatch[n].serializedmsg);
90  }
91  }
92  break;
93 
94  default:
95  printf("%s %i anomaly %i\n", __FILE__, __LINE__,
96  msgrcv->messageStruct.topic);
97  }
98  }
99 
100  // send all pendings
101  boost::unordered_map< int64_t, vector<string *> >::iterator it;
102  for (it=pendingMsgs.begin(); it != pendingMsgs.end(); ++it)
103  {
104  vector<string *> &msgsRef = it->second;
105  size_t s=sizeof(size_t);
106  for (size_t n=0; n < msgsRef.size(); n++)
107  {
108  s += sizeof(size_t) + msgsRef[n]->size();
109  }
110  if (!s)
111  {
112  continue;
113  }
114  // format is: total size, [msgsize, msg]...
115  if (s>SERIALIZEDMAXSIZE)
116  {
117  serstrbig=new (std::nothrow) char[s];
118  serstr=serstrbig;
119  isserstrbig=true;
120  }
121  else
122  {
123  serstr=serstrsmall;
124  isserstrbig=false;
125  }
126  memcpy(serstr, &s, sizeof(s));
127  size_t pos=sizeof(s);
128  for (size_t n=0; n < msgsRef.size(); n++)
129  {
130  string &msgRef=*msgsRef[n];
131  size_t ms=msgRef.size();
132  memcpy(serstr+pos, &ms, sizeof(ms));
133  pos += sizeof(size_t);
134  memcpy(serstr+pos, msgRef.c_str(), ms);
135  pos += ms;
136  delete &msgRef;
137  }
138 
139  // compress
140  if (cfgs.compressgw==true)
141  {
142  int cbound=LZ4_compressBound(s);
143  if (cbound+sizeof(size_t) > SERIALIZEDMAXSIZE)
144  {
145  cstrbig=new (std::nothrow) char[cbound+sizeof(size_t)];
146  cstr=cstrbig;
147  iscstrbig=true;
148  }
149  else
150  {
151  cstr=cstrsmall;
152  iscstrbig=false;
153  }
154  size_t csize=LZ4_compress(serstr, cstr+sizeof(csize), s);
155  csize += sizeof(csize);
156  memcpy(cstr, &csize, sizeof(csize));
157  sendstr=cstr;
158  sendsize=csize;
159  }
160  else
161  {
162  sendstr=serstr;
163  sendsize=s;
164  }
165 
166  if (send(remoteGateways[it->first], sendstr, sendsize, 0)==-1)
167  {
168  printf("%s %i send errno %i nodeid %i instance %li it->first %li socket %i\n", __FILE__, __LINE__, errno, myTopology.nodeid, myIdentity.instance,
169  it->first, remoteGateways[it->first]);
170  }
171  pendingMsgs.erase(it->first);
172 
173  if (isserstrbig==true)
174  {
175  delete[] serstrbig;
176  }
177  if (iscstrbig==true)
178  {
179  delete[] cstrbig;
180  }
181  }
182  }
183 }
184 
186 {
187  delete serstrsmall;
188  delete cstrsmall;
189 }
190 
192 {
193  map< int64_t, vector<string> >::iterator it;
194 
195  if (!myTopology.ibGateways.empty() &&
196  (int64_t)remoteGateways.size() < myTopology.ibGateways.rbegin()->first+1)
197  {
198  remoteGateways.resize(myTopology.ibGateways.rbegin()->first+1, 0);
199  }
200 
201  for (it = myTopology.ibGateways.begin();
202  it != myTopology.ibGateways.end(); ++it)
203  {
204  vector<string> &vecstringRef = it->second;
205 
206  if (it->first==myTopology.nodeid)
207  {
208  continue;
209  }
210  if ((int64_t)myTopology.ibGateways[it->first].size() <
212  {
213  continue;
214  }
215 
216  if (remoteGateways[it->first]==0)
217  {
218  // connect to server
219  if (ismultinode==false)
220  {
221  setprio();
222  ismultinode=true;
223  }
224  int sockfd;
225  size_t found = vecstringRef[myIdentity.instance].find(':');
226  string node = vecstringRef[myIdentity.instance].substr(0, found);
227  string service = vecstringRef[myIdentity.instance].substr(found+1,
228  vecstringRef[myIdentity.instance].size()-(found+1));
229  struct addrinfo hints;
230  memset(&hints, 0, sizeof(struct addrinfo));
231  hints.ai_family = AF_INET;
232  hints.ai_socktype = SOCK_STREAM;
233  hints.ai_flags = AI_PASSIVE;
234  struct addrinfo *servinfo;
235 
236  if (getaddrinfo(node.c_str(), service.c_str(), &hints,
237  &servinfo))
238  {
239  fprintf(logfile, "%s %i getaddrinfo\n", __FILE__, __LINE__);
240  return;
241  }
242 
243  sockfd = socket(servinfo->ai_family, servinfo->ai_socktype,
244  servinfo->ai_protocol);
245  if (sockfd == -1)
246  {
247  fprintf(logfile, "%s %i socket\n", __FILE__, __LINE__);
248  return;
249  }
250 
251  if (connect(sockfd, servinfo->ai_addr, servinfo->ai_addrlen))
252  {
253  fprintf(logfile, "%s %i connect errno %i '%s:%s'\n", __FILE__,
254  __LINE__, errno, node.c_str(), service.c_str());
255  return;
256  }
257 
258  if (setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &so_sndbuf,
259  sizeof(so_sndbuf))==-1)
260  {
261  fprintf(logfile, "%s %i setsockopt errno %i\n", __FILE__,
262  __LINE__, errno);
263  continue;
264  }
265 
266  freeaddrinfo(servinfo);
267 
268  remoteGateways[it->first] = sockfd;
269  }
270  }
271 }
272 
273 // launcher, regular function
274 void *obGateway(void *identity)
275 {
276  new ObGateway((Topology::partitionAddress *)identity);
277  while (1)
278  {
279  sleep(150000);
280  }
281  return NULL;
282 }