InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
main.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
28 #include <sys/un.h>
29 #include "version.h"
30 #include "gch.h"
31 #include "Topology.h"
32 #line 33 "main.cc"
33 
// Thread start routine for the topology manager; main() hands it to
// pthread_create(). It is only declared here, not defined.
34 void *topologyMgr(void *);
35 
// Log stream. main() opens it in append mode and makes it line buffered;
// debug() and setprio() write to it.
36 FILE *logfile;
// Management endpoint. main() sets it to "tcp://" followed by the -m argument.
38 std::string zmqsocket;
// Both mutexes are initialised in main() with the same mutex attributes.
// NOTE(review): what each one guards is not visible in this file; presumably
// the node topology and the connection table respectively - confirm.
40 pthread_mutex_t nodeTopologyMutex;
41 pthread_mutex_t connectionsMutex;
// ZeroMQ context created by main() via zmq_ctx_new().
42 void *zmqcontext;
// Defaults to "InfiniSQL_"; not referenced elsewhere in this file.
43 std::string storedprocprefix = "InfiniSQL_";
// Not referenced elsewhere in this file; populated and consumed by other
// translation units.
44 std::vector<class MboxProducer *> socketAffinity;
45 std::vector<listenertype_e> listenerTypes;
46 
47 int main(int argc, char **argv)
48 {
49  zmqcontext = zmq_ctx_new();
50  zmq_ctx_set(zmqcontext, ZMQ_IO_THREADS, 2);
51  setlinebuf(stdout);
52 
53  string logfilename;
54  int c;
55 
56  while ((c = getopt(argc, argv, "l:m:n:hv")) != -1)
57  {
58  switch (c)
59  {
60  case 'm':
61  zmqsocket.assign("tcp://");
62  zmqsocket.append(optarg, strlen(optarg));
63  break;
64 
65  case 'l':
66  logfilename.assign(optarg, strlen(optarg));
67  break;
68 
69  case 'n':
70  nodeTopology.nodeid = atol(optarg);
71  break;
72 
73  case 'h':
74  printf("-m <management ip:port> -n <nodeid> -l <log path/file> -v\n");
75  exit(0);
76  break;
77 
78  case 'v':
79  printf("%s\n", version);
80  exit(0);
81 
82  default:
83  ;
84  }
85  }
86 
87  if (!logfilename.size())
88  {
89  logfilename.assign("/tmp/infinisqld.log");
90  }
91 
92  logfile = fopen(logfilename.c_str(), "a");
93 
94  if (logfile==NULL)
95  {
96  printf("%s %i cannot open logfile %s errno %i\n", __FILE__, __LINE__,
97  logfilename.c_str(), errno);
98  exit(1);
99  }
100 
101  setlinebuf(logfile);
102 
103  pthread_mutexattr_t attr;
104  attr.__align = PTHREAD_MUTEX_ADAPTIVE_NP;
105  pthread_mutex_init(&nodeTopologyMutex, &attr);
106  pthread_mutex_init(&connectionsMutex, &attr);
107  pthread_t topologyMgrThread;
109  arg->type = ACTOR_TOPOLOGYMGR;
110  arg->mbox = new class Mbox;
111  arg->address.nodeid = 1;
112  arg->address.actorid = 1;
113  arg->instance = -1;
114 
115  cfgs.compressgw=true;
116 
117  int rv=pthread_create(&topologyMgrThread, NULL, topologyMgr, arg);
118  if (rv)
119  {
120  printf("%s %i pthread_create rv %i\n", __FILE__, __LINE__, rv);
121  exit(1);
122  }
123 
124  while (1)
125  {
126  sleep(10);
127  }
128 
129  return 0;
130 }
131 
132 // global functions
133 void msgpack2Vector(vector<string> *resultvector, char *payload, int64_t length)
134 {
135  msgpack::unpacked msg;
136  msgpack::unpack(&msg, payload, length);
137  msgpack::object obj = msg.get();
138  obj.convert(resultvector);
139 }
140 
141 void debug(char *description, int line, char *file)
142 {
143  fprintf(logfile, "DEBUG %i %s %s\n", line, file, description);
144 }
145 
146 char setdeleteflag(char *c)
147 {
148  return *c |= 1 << DELETEFLAG;
149 }
150 
151 bool getdeleteflag(char c)
152 {
153  return c & 1 << DELETEFLAG;
154 }
155 
156 char cleardeleteflag(char *c)
157 {
158  return *c &= ~(1 << DELETEFLAG);
159 }
160 
161 char setinsertflag(char *c)
162 {
163  return *c |= 1 << INSERTFLAG;
164 }
165 
166 bool getinsertflag(char c)
167 {
168  return c & 1 << INSERTFLAG;
169 }
170 
171 char clearinsertflag(char *c)
172 {
173  return *c &= ~(1 << INSERTFLAG);
174 }
175 
176 bool setwritelock(char *c)
177 {
178  if (*c & 1 << LOCKEDFLAG) // already locked
179  {
180  return false;
181  }
182 
183  *c |= 1 << LOCKEDFLAG;
184  *c |= 1 << LOCKTYPEFLAG;
185  return true;
186 }
187 
188 bool setreadlock(char *c)
189 {
190  if (*c & 1 << LOCKEDFLAG) // already locked
191  {
192  return false;
193  }
194 
195  *c |= 1 << LOCKEDFLAG;
196  *c &= ~(1 << LOCKTYPEFLAG);
197  return true;
198 }
199 
200 char clearlockedflag(char *c)
201 {
202  return *c &= ~(1 << LOCKEDFLAG);
203 }
204 
// NOTE(review): the signature line that belongs directly above this brace is
// absent from this listing; restore it from the original source.
// The body inspects a flag value `c`: it yields NOLOCK when the LOCKEDFLAG bit
// is clear, otherwise WRITELOCK when the LOCKTYPEFLAG bit is set and READLOCK
// when it is clear.
206 {
207  if (!(c & 1 << LOCKEDFLAG))
208  {
209  return NOLOCK;
210  }
211 
212  if (c & 1 << LOCKTYPEFLAG)
213  {
214  return WRITELOCK;
215  }
216  else
217  {
218  return READLOCK;
219  }
220 }
221 
// NOTE(review): the signature line that belongs directly above this brace is
// absent from this listing; restore it from the original source.
// The body clears the REPLACEDELETEFLAG bit in the byte `c` points to and
// yields the updated byte.
223 {
224  return *c &= ~(1 << REPLACEDELETEFLAG);
225 }
226 
228 {
229  return c & 1 << REPLACEDELETEFLAG;
230 }
231 
232 char setreplacedeleteflag(char *c)
233 {
234  return *c |= 1 << REPLACEDELETEFLAG;
235 }
236 
237 int16_t getPartitionid(fieldValue_s &fieldVal, fieldtype_e type,
238  int16_t numpartitions)
239 {
240  switch (type)
241  {
242  case INT:
243  return SpookyHash::Hash64((void *) &fieldVal.value.integer,
244  sizeof(fieldVal.value.integer), 0) %
245  numpartitions;
246  break;
247 
248  case UINT:
249  return SpookyHash::Hash64((void *) &fieldVal.value.uinteger,
250  sizeof(fieldVal.value.uinteger), 0) %
251  numpartitions;
252  break;
253 
254  case BOOL:
255  return SpookyHash::Hash64((void *) &fieldVal.value.boolean,
256  sizeof(fieldVal.value.boolean), 0) %
257  numpartitions;
258  break;
259 
260  case FLOAT:
261  return SpookyHash::Hash64((void *) &fieldVal.value.floating,
262  sizeof(fieldVal.value.floating), 0) %
263  numpartitions;
264  break;
265 
266  case CHAR:
267  return SpookyHash::Hash64((void *) &fieldVal.value.character,
268  sizeof(fieldVal.value.character), 0) %
269  numpartitions;
270  break;
271 
272  case CHARX:
273  return SpookyHash::Hash64((void *) fieldVal.str.c_str(),
274  fieldVal.str.length(), 0) % numpartitions;
275  break;
276 
277  case VARCHAR:
278  return SpookyHash::Hash64((void *) fieldVal.str.c_str(),
279  fieldVal.str.length(), 0) % numpartitions;
280  break;
281 
282  default:
283  printf("%s %i anomaly fieldtype %i\n", __FILE__, __LINE__, type);
284  return -1;
285  }
286 }
287 
/**
 * Rewrite a SQL LIKE pattern, in place, as a regular expression.
 *
 * '_' becomes '.', '%' becomes ".*", and every other character that is a
 * regular-expression metacharacter is prefixed with a backslash so that it
 * matches itself literally. The LIKE ESCAPE clause is not supported yet, so
 * '_' and '%' are always treated as wildcards.
 *
 * @param likeStr LIKE pattern on entry, regular expression on return
 */
void like2Regex(std::string &likeStr)
{
    // Characters that carry special meaning in a regular expression.
    const std::string regexMeta("\\^$.|?*+()[]{}");

    std::string pattern;
    pattern.reserve(likeStr.size() * 2);

    // One pass over the input; the result is built separately so that the
    // characters emitted for wildcards are never re-examined.
    for (std::string::size_type i = 0; i < likeStr.size(); ++i)
    {
        const char ch = likeStr[i];

        if (ch == '_')
        {
            pattern += '.';
        }
        else if (ch == '%')
        {
            pattern += ".*";
        }
        else
        {
            if (regexMeta.find(ch) != std::string::npos)
            {
                pattern += '\\';
            }

            pattern += ch;
        }
    }

    likeStr.swap(pattern);
}
304 
305 bool compareFields(fieldtype_e type, const fieldValue_s &val1,
306  const fieldValue_s &val2)
307 {
308  if (val1.isnull==true && val2.isnull==true)
309  {
310  return true;
311  }
312  else if (val1.isnull==true || val2.isnull==true)
313  {
314  return false;
315  }
316 
317  switch (type)
318  {
319  case INT:
320  if (val1.value.integer==val2.value.integer)
321  {
322  return true;
323  }
324 
325  break;
326 
327  case UINT:
328  if (val1.value.uinteger==val2.value.uinteger)
329  {
330  return true;
331  }
332 
333  break;
334 
335  case BOOL:
336  if (val1.value.boolean==val2.value.boolean)
337  {
338  return true;
339  }
340 
341  break;
342 
343  case FLOAT:
344  if (val1.value.floating==val2.value.floating)
345  {
346  return true;
347  }
348 
349  break;
350 
351  case CHAR:
352  if (val1.value.character==val2.value.character)
353  {
354  return true;
355  }
356 
357  break;
358 
359  case CHARX:
360  if (val1.str.compare(val2.str)==0)
361  {
362  return true;
363  }
364 
365  break;
366 
367  case VARCHAR:
368  if (val1.str.compare(val2.str)==0)
369  {
370  return true;
371  }
372 
373  break;
374 
375  default:
376  printf("%s %i anomaly %i\n", __FILE__, __LINE__, type);
377  }
378 
379  return false;
380 }
381 
/**
 * Remove trailing space characters (' ') from a string, in place.
 *
 * Leading and embedded spaces are kept. A string made up only of spaces, or
 * an empty one, ends up empty.
 *
 * @param input string to trim
 */
void trimspace(std::string &input)
{
    const std::string::size_type lastKept = input.find_last_not_of(' ');

    // npos means there is no non-space character left to keep
    input.resize(lastKept == std::string::npos ? 0 : lastKept + 1);
}
395 
// Fills returnRow from stagedRow. The lock type is copied first; then the row
// id and row contents are taken from the original row (NOCOMMAND) or from the
// new row (INSERT, UPDATE). For any other command a diagnostic is printed and
// returnRow is reset to a default-constructed returnRow_s, which also discards
// the lock type copied above.
// NOTE(review): this listing skips one source line between the opening brace
// and the locktype assignment (its numbering jumps from 397 to 399). Confirm
// against the original source whether a statement is missing here.
396 void stagedRow2ReturnRow(const stagedRow_s &stagedRow, returnRow_s &returnRow)
397 {
399  returnRow.locktype=stagedRow.locktype;
400 
401  switch (stagedRow.cmd)
402  {
// nothing staged: report the row as it was
403  case NOCOMMAND:
404  returnRow.rowid=stagedRow.originalrowid;
405  returnRow.row=stagedRow.originalRow;
406  break;
407 
// INSERT and UPDATE both report the new version of the row
408  case INSERT:
409  returnRow.rowid=stagedRow.newrowid;
410  returnRow.row=stagedRow.newRow;
411  break;
412 
413  case UPDATE:
414  returnRow.rowid=stagedRow.newrowid;
415  returnRow.row=stagedRow.newRow;
416  break;
417 
418  default:
419  printf("%s %i anomaly %i\n", __FILE__, __LINE__, stagedRow.cmd);
420  returnRow=returnRow_s();
421  }
422 }
423 
424 void setprio()
425 {
426  struct sched_param params;
427  params.sched_priority=RTPRIO;
428  int rv=pthread_setschedparam(pthread_self(), SCHED_FIFO, &params);
429  if (rv != 0)
430  {
431  fprintf(logfile, "%s %i some problem setting priority %i for tid %li error %i\n", __FILE__, __LINE__, RTPRIO, pthread_self(), rv);
432  }
433 }
434