InfiniSQL  v0.1.2-alpha
Massive Scale Transaction Processing
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
DeadlockMgr.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Mark Travis <mtravis15432+src@gmail.com>
3  * All rights reserved. No warranty, explicit or implicit, provided.
4  *
5  * This file is part of InfiniSQL(tm).
6 
7  * InfiniSQL is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 3
9  * as published by the Free Software Foundation.
10  *
11  * InfiniSQL is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with InfiniSQL. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
28 #include "DeadlockMgr.h"
29 #line 30 "DeadlockMgr.cc"
30 
32  myIdentity(*myIdentityArg)
33 {
    // NOTE(review): the signature line of this definition is not present in
    // this listing; the member-initializer above suggests it is the
    // DeadlockMgr constructor taking `myIdentityArg` -- confirm. The embedded
    // listing numbers also skip values further down, so some case labels and
    // statement halves appear to be elided; those spots are flagged below.
    //
    // Visible behavior: copy *myIdentityArg into myIdentity, free the
    // argument, then loop forever: receive a batch of messages, update the
    // lock/wait bookkeeping maps, and call algorithm() after each batch.
34  delete myIdentityArg;
37 
38  class Message *msgrcv=NULL;
39  short waitval = 1000;
40 
41  while (1)
42  {
    // Handle at most MSGRECEIVEBATCHSIZE messages per pass.
44  for (short n=0; n < MSGRECEIVEBATCHSIZE; n ++)
45  {
46  GETMSG(msgrcv, myIdentity.mbox, waitval)
47 
    // Nothing was received: end this batch early.
    // (presumably a timeout or empty mailbox -- confirm against GETMSG)
48  if (msgrcv==NULL)
49  {
50  waitval=1000;
51  break;
52  }
53 
    // Every message is viewed as a MessageDeadlock, before the topic is checked.
54  class MessageDeadlock &msgrcvref =
55  *((class MessageDeadlock *)msgrcv);
56 
57  switch (msgrcv->messageStruct.topic)
58  {
59  case TOPIC_DEADLOCKNEW:
60  {
61  newDeadLockLists_s &listsRef = msgrcvref.nodes;
    // NOTE(review): the next two lines read like the right-hand sides of
    // assignments whose first lines (62 and 64) are missing from this listing.
63  listsRef.locked;
65  listsRef.waiting;
66 
    // Record the sender's address in returnMap, keyed by transaction id;
    // deadlock() later uses that entry as the destination of its message.
    // NOTE(review): the right-hand side of the pendingcmdid assignment
    // (listing line 70) is missing from this listing.
67  taCmd returninfo;
68  returninfo.addr = msgrcvref.messageStruct.sourceAddr;
69  returninfo.pendingcmdid =
71  returnMap[msgrcvref.deadlockStruct.transactionid] = returninfo;
72 
73  boost::unordered_set<string>::iterator it;
74 
    // Reverse index: each locked item -> set of transaction ids.
75  for (it = listsRef.locked.begin();
76  it != listsRef.locked.end(); ++it)
77  {
78  locksTransactionMap[*it].insert(msgrcvref.deadlockStruct.transactionid);
79  }
80 
    // Reverse index: each waited-on item -> set of transaction ids.
81  for (it = listsRef.waiting.begin();
82  it != listsRef.waiting.end(); ++it)
83  {
84  waitsTransactionMap[*it].insert(msgrcvref.deadlockStruct.transactionid);
85  }
86  }
87  break;
88 
    // NOTE(review): the case label for this branch (listing line 89) is not
    // visible. The branch applies a single-item change, selected by
    // deadlockStruct.deadlockchange, to the forward and reverse maps.
90  {
91  string &changedeadlockRef = msgrcvref.deadlockNode;
92 
93  switch (msgrcvref.deadlockStruct.deadlockchange)
94  {
95  case ADDLOCKEDENTRY:
97  {
    // Added unconditionally, unlike the branches below.
98  transactionLocksMap[msgrcvref.deadlockStruct.transactionid].insert(changedeadlockRef);
99  locksTransactionMap[changedeadlockRef].insert(msgrcvref.deadlockStruct.transactionid);
100  }
101  break;
102 
103  case ADDLOCKPENDINGENTRY:
    // Only applied when the transaction already has a transactionWaitsMap entry.
104  if (transactionWaitsMap.count(msgrcvref.deadlockStruct.transactionid))
105  {
106  transactionWaitsMap[msgrcvref.deadlockStruct.transactionid].insert(changedeadlockRef);
107  waitsTransactionMap[changedeadlockRef].insert(msgrcvref.deadlockStruct.transactionid);
108  }
109  break;
110 
111  case REMOVELOCKEDENTRY:
    // Only applied when the transaction already has a transactionLocksMap
    // entry. Sets that become empty are left in the maps here.
112  if (transactionLocksMap.count(msgrcvref.deadlockStruct.transactionid))
113  {
114  transactionLocksMap[msgrcvref.deadlockStruct.transactionid].erase(changedeadlockRef);
115  locksTransactionMap[changedeadlockRef].erase(msgrcvref.deadlockStruct.transactionid);
116  }
117  break;
118 
    // NOTE(review): the case label (listing line 119) and the first half of
    // the erase statement (line 122) are missing from this listing; this
    // looks like the waits-side counterpart of REMOVELOCKEDENTRY -- confirm.
120  if (transactionWaitsMap.count(msgrcvref.deadlockStruct.transactionid))
121  {
123  erase(changedeadlockRef);
124  waitsTransactionMap[changedeadlockRef].erase(msgrcvref.deadlockStruct.transactionid);
125  }
126 
127  break;
128 
    // NOTE(review): case label (listing line 129) not visible. The body moves
    // the item from the transaction's wait set to its lock set, in both the
    // forward and the reverse maps.
130  if (transactionWaitsMap.count(msgrcvref.deadlockStruct.transactionid))
131  {
132  transactionWaitsMap[msgrcvref.deadlockStruct.transactionid].erase(changedeadlockRef);
133  transactionLocksMap[msgrcvref.deadlockStruct.transactionid].insert(changedeadlockRef);
134 
135  waitsTransactionMap[changedeadlockRef].erase(msgrcvref.deadlockStruct.transactionid);
136  locksTransactionMap[changedeadlockRef].insert(msgrcvref.deadlockStruct.transactionid);
137  }
138 
139  break;
140 
    // Unrecognized change code: log it together with file and line.
141  default:
142  fprintf(logfile, "anomaly: %li %s %i\n",
143  msgrcvref.deadlockStruct.deadlockchange, __FILE__,
144  __LINE__);
145  }
146  }
147  break;
148 
    // NOTE(review): the case label(s) and any statements for this branch
    // (listing lines 149-150) are not visible.
151  break;
152 
153  case TOPIC_TOPOLOGY:
    // NOTE(review): listing line 154 is not visible.
155  break;
156 
    // Unrecognized topic: log it together with file and line.
157  default:
158  fprintf(logfile, "anomaly: %i %s %i\n",
159  msgrcv->messageStruct.topic, __FILE__, __LINE__);
160  }
161  }
162 
163  // do algorithm
164  // set the waitval to -1 so next msg_receive() will block indefinitely
165  // algorithm searches for all existing deadlocks, so there's no
166  // need to run it until the maps change
167  // also maybe swap maps with blank to shrink memory if the whole
168  // thing has been searched
    // NOTE(review): the comment above mentions -1, but the code below assigns
    // 1000 to waitval, so the described indefinite blocking is not what the
    // visible code sets up.
169  algorithm();
170  waitval = 1000;
171  }
172 }
173 
175 {
    // NOTE(review): the header for this empty body (listing line 174) is not
    // present in this listing; presumably the DeadlockMgr destructor --
    // confirm. Nothing is done here.
176 }
177 
178 // launcher
179 void *deadlockMgr(void *identity)
180 {
181  new DeadlockMgr((Topology::partitionAddress *)identity);
182  while (1)
183  {
184  sleep(10);
185  }
186  return NULL;
187 }
188 
189 void DeadlockMgr::makeLockedItem(bool isrow, int64_t rowid, int64_t tableid,
190  int64_t engineid, int64_t domainid,
191  int64_t fieldid, long double floatentry,
192  std::string *stringentry,
193  std::string *returnstring)
194 {
195  string &returnstringRef = *returnstring;
196  size_t strlength = stringentry->length();
197  returnstringRef.reserve(sizeof(isrow) + (5 * sizeof(int64_t)) +
198  sizeof(floatentry) + sizeof(strlength) + strlength);
199 
200  // this is so identical entries but with garbage in irrelevant values don't
201  // show up as different
202  if (isrow==true)
203  {
204  fieldid=-1;
205  floatentry=0;
206  stringentry->clear();
207  }
208  else
209  {
210  rowid=-1;
211  engineid=-1;
212  }
213 
214  size_t pos = 0;
215  memcpy(&returnstringRef[pos], &isrow, sizeof(isrow));
216  pos += sizeof(isrow);
217  memcpy(&returnstringRef[pos], &rowid, sizeof(rowid));
218  pos += sizeof(rowid);
219  memcpy(&returnstringRef[pos], &tableid, sizeof(tableid));
220  pos += sizeof(tableid);
221  memcpy(&returnstringRef[pos], &engineid, sizeof(engineid));
222  pos += sizeof(engineid);
223  memcpy(&returnstringRef[pos], &domainid, sizeof(domainid));
224  pos += sizeof(domainid);
225  memcpy(&returnstringRef[pos], &fieldid, sizeof(fieldid));
226  pos += sizeof(fieldid);
227  memcpy(&returnstringRef[pos], &floatentry, sizeof(floatentry));
228  pos += sizeof(floatentry);
229  memcpy(&returnstringRef[pos], &strlength, sizeof(strlength));
230  pos += sizeof(strlength);
231  memcpy(&returnstringRef[pos], stringentry->c_str(), strlength);
232 }
233 
235 {
    // NOTE(review): the header of this definition (listing line 234) is not
    // present in this listing; the call `algorithm();` elsewhere in the file
    // suggests this is DeadlockMgr::algorithm() -- confirm.
    //
    // Repeats full passes over transactionWaitsMap, calling walk() from every
    // transaction that has an entry there, until one pass completes in which
    // walk() never returns true.
    //
    // NOTE(review): walk() can call deadlock(), which calls
    // removeTransaction(), which erases entries from transactionWaitsMap while
    // `it` below is iterating that same map. If the erased entry is the one
    // `it` refers to, the following ++it uses an invalidated iterator.
    // Restarting the scan as soon as walk() returns true would avoid that.
236  bool deadlockflag = true;
237 
238  // remove all deadlocks from existing data set
239  while (deadlockflag==true)
240  {
241  deadlockflag = false;
242 
243  boost::unordered_map< int64_t,
244  boost::unordered_set<string> >::iterator it;
245 
246  for (it = transactionWaitsMap.begin();
247  it != transactionWaitsMap.end(); ++it)
248  {
249 
250  if (walk(it->first)==true) // means a deadlock was found
251  {
252  deadlockflag=true;
253  }
254 
    // The traversal bookkeeping is reset after every starting transaction.
255  skipTransactionSet.clear();
256  skipItemSet.clear();
257  transactionGraphSet.clear();
258  }
259  }
260 }
261 
// Sends a MessageDeadlock for `transactionid` to the address stored in
// returnMap for it, then removes the transaction from all bookkeeping maps.
// Does nothing when returnMap has no entry for the transaction.
// NOTE(review): listing lines 271 and 273 are not present in this listing, so
// at least one field assignment on the outgoing message is not visible here.
262 void DeadlockMgr::deadlock(int64_t transactionid)
263 {
    // No return address known for this transaction: nothing is sent or removed.
264  if (!returnMap.count(transactionid))
265  {
266  return;
267  }
268 
    // The message is heap-allocated and passed to toActor() by reference; it
    // is not deleted in this function.
    // NOTE(review): presumably ownership passes to the mailbox layer -- confirm.
269  class MessageDeadlock *msg = new class MessageDeadlock;
270  class MessageDeadlock &msgref = *msg;
272  msgref.deadlockStruct.transactionid = transactionid;
    // NOTE(review): the line below looks like the right-hand side of an
    // assignment whose first line (273) is missing from this listing.
274  returnMap[transactionid].pendingcmdid;
275 
276  mboxes.toActor(myIdentity.address, returnMap[transactionid].addr, *msg);
    // Also erases the returnMap entry that was read above.
277  removeTransaction(transactionid);
278 }
279 
280 // for walk, return true for deadlock, false for no deadlock
281 // walk through items that this transaction is waiting for
282 bool DeadlockMgr::walk(int64_t transactionid)
283 {
284  if (transactionGraphSet.count(transactionid))
285  {
286  deadlock(transactionid);
287  return true;
288  }
289 
290  if (skipTransactionSet.count(transactionid))
291  {
292  return false;
293  }
294 
295  skipTransactionSet.insert(transactionid);
296 
297  // check for items this transaction is waiting on (though this should
298  // always be positive, or the transactionid should be removed by other means)
299  if (!transactionWaitsMap.count(transactionid)) // end of path
300  {
301  return false;
302  }
303 
304  transactionGraphSet.insert(transactionid);
305 
306  boost::unordered_set<string>::iterator it;
307 
308  for (it = transactionWaitsMap[transactionid].begin();
309  it != transactionWaitsMap[transactionid].end(); ++it)
310  {
311  if (walk(it)==true) // deadlock happened at some point!
312  {
313  return true;
314  }
315  }
316 
317  return false;
318 }
319 
320 // walk through transactions that hold this item
321 bool DeadlockMgr::walk(boost::unordered_set<string>::iterator itemIt)
322 {
323  const string &itemRef = (string)(*itemIt);
324 
325  if (skipItemSet.count(itemRef))
326  {
327  return false;
328  }
329 
330  skipItemSet.insert(itemRef);
331 
332  // check for transactions locking this item
333  if (!locksTransactionMap.count(itemRef)) // positively no deadlock
334  {
335  return false;
336  }
337 
338  boost::unordered_set<int64_t>::iterator it;
339 
340  for (it = locksTransactionMap[itemRef].begin();
341  it != locksTransactionMap[itemRef].end(); ++it)
342  {
343  if (walk(*it)==true) // deadlock happened!
344  {
345  return true;
346  }
347  }
348 
349  return false;
350 }
351 
352 void DeadlockMgr::removeTransaction(int64_t transactionid)
353 {
354  // walk through all the locks that the transaction holds
355  // and those which it waits on, and remove the transactionid
356  // entry from those items
357  boost::unordered_set<string>::iterator it;
358 
359  for (it = transactionLocksMap[transactionid].begin();
360  it != transactionLocksMap[transactionid].end();
361  ++it)
362  {
363  locksTransactionMap[*it].erase(transactionid);
364 
365  if (locksTransactionMap[*it].empty()==true)
366  {
367  locksTransactionMap.erase(*it);
368  }
369  }
370 
371  for (it = transactionWaitsMap[transactionid].begin();
372  it != transactionWaitsMap[transactionid].end();
373  ++it)
374  {
375  waitsTransactionMap[*it].erase(transactionid);
376 
377  if (waitsTransactionMap[*it].empty()==true)
378  {
379  locksTransactionMap.erase(*it);
380  }
381  }
382 
383  transactionLocksMap.erase(transactionid);
384  transactionWaitsMap.erase(transactionid);
385  returnMap.erase(transactionid);
386 }