Dash Core Source Documentation (0.16.0.1)

Find detailed information regarding the Dash Core source code.

bls_worker.cpp
Go to the documentation of this file.
1 // Copyright (c) 2018-2019 The Dash Core developers
2 // Distributed under the MIT/X11 software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <bls/bls_worker.h>
6 #include <hash.h>
7 #include <serialize.h>
8 
9 #include <util.h>
10 
11 template <typename T>
12 bool VerifyVectorHelper(const std::vector<T>& vec, size_t start, size_t count)
13 {
14  if (start == 0 && count == 0) {
15  count = vec.size();
16  }
17  std::set<uint256> set;
18  for (size_t i = start; i < start + count; i++) {
19  if (!vec[i].IsValid())
20  return false;
21  // check duplicates
22  if (!set.emplace(vec[i].GetHash()).second) {
23  return false;
24  }
25  }
26  return true;
27 }
28 
// Builds a (doneCallback, future) pair backed by one shared promise.
// Calling the returned callback with a value fulfils the returned future with that value.
template <typename T>
std::pair<std::function<void(const T&)>, std::future<T> > BuildFutureDoneCallback()
{
    auto promise = std::make_shared<std::promise<T> >();
    std::future<T> future = promise->get_future();
    // the callback keeps the promise alive for as long as the callback itself exists
    std::function<void(const T&)> callback = [promise](const T& value) {
        promise->set_value(value);
    };
    return std::make_pair(std::move(callback), std::move(future));
}
// Same as BuildFutureDoneCallback, but the callback takes its argument by value
// (std::function<void(T)>) instead of by const reference.
// Calling the returned callback with a value fulfils the returned future with that value.
template <typename T>
std::pair<std::function<void(T)>, std::future<T> > BuildFutureDoneCallback2()
{
    auto p = std::make_shared<std::promise<T> >();
    // declared with the same signature as the return type so that make_pair does not have
    // to wrap one std::function inside another
    std::function<void(T)> f = [p](T v) {
        p->set_value(v);
    };
    return std::make_pair(std::move(f), p->get_future());
}
48 
49 
51 
// NOTE(review): the embedded numbering jumps from 51 to 53, so the signature line of this
// definition is absent from this listing (presumably the CBLSWorker constructor - confirm).
// Only its empty body is visible.
53 {
54 }
55 
// NOTE(review): the signature line (embedded line 56) is absent from this listing
// (presumably the CBLSWorker destructor - confirm).
// The visible body only calls Stop().
57 {
58  Stop();
59 }
60 
62 {
63  int workerCount = std::thread::hardware_concurrency() / 2;
64  workerCount = std::max(std::min(1, workerCount), 4);
65  workerPool.resize(workerCount);
66  RenameThreadPool(workerPool, "dash-bls-worker");
67 }
68 
// NOTE(review): the signature line (embedded line 69) and embedded line 71 are absent from
// this listing. The visible part of the body stops the worker pool via workerPool.stop(true).
70 {
72  workerPool.stop(true);
73 }
74 
75 bool CBLSWorker::GenerateContributions(int quorumThreshold, const BLSIdVector& ids, BLSVerificationVectorPtr& vvecRet, BLSSecretKeyVector& skShares)
76 {
77  BLSSecretKeyVectorPtr svec = std::make_shared<BLSSecretKeyVector>((size_t)quorumThreshold);
78  vvecRet = std::make_shared<BLSVerificationVector>((size_t)quorumThreshold);
79  skShares.resize(ids.size());
80 
81  for (int i = 0; i < quorumThreshold; i++) {
82  (*svec)[i].MakeNewKey();
83  }
84  std::list<std::future<bool> > futures;
85  size_t batchSize = 8;
86 
87  for (size_t i = 0; i < quorumThreshold; i += batchSize) {
88  size_t start = i;
89  size_t count = std::min(batchSize, quorumThreshold - start);
90  auto f = [&, start, count](int threadId) {
91  for (size_t j = start; j < start + count; j++) {
92  (*vvecRet)[j] = (*svec)[j].GetPublicKey();
93  }
94  return true;
95  };
96  futures.emplace_back(workerPool.push(f));
97  }
98 
99  for (size_t i = 0; i < ids.size(); i += batchSize) {
100  size_t start = i;
101  size_t count = std::min(batchSize, ids.size() - start);
102  auto f = [&, start, count](int threadId) {
103  for (size_t j = start; j < start + count; j++) {
104  if (!skShares[j].SecretKeyShare(*svec, ids[j])) {
105  return false;
106  }
107  }
108  return true;
109  };
110  futures.emplace_back(workerPool.push(f));
111  }
112  bool success = true;
113  for (auto& f : futures) {
114  if (!f.get()) {
115  success = false;
116  }
117  }
118  return success;
119 }
120 
121 // aggregates a single vector of BLS objects in parallel
122 // the input vector is split into batches and each batch is aggregated in parallel
123 // when enough batches are finished to form a new batch, the new batch is queued for further parallel aggregation
124 // when no more batches can be created from finished batch results, the final aggregate is created and the doneCallback
125 // called.
126 // The Aggregator object needs to be created on the heap and it will delete itself after calling the doneCallback
127 // The input vector is not copied into the Aggregator but instead a vector of pointers to the original entries from the
128 // input vector is stored. This means that the input vector must stay alive for the whole lifetime of the Aggregator
// NOTE(review): this listing skips embedded lines 137, 149, 182, 194, 206 and 295 of the struct, so some
// members and statements (e.g. the workerPool and doneCallback members that the constructor initializes)
// are not visible here.
129 template <typename T>
130 struct Aggregator {
131  typedef T ElementType;
132 
// number of entries that are aggregated together as one batch
133  size_t batchSize{16};
// pointers to the caller-owned input entries (filled by the constructor)
134  std::shared_ptr<std::vector<const T*> > inputVec;
135 
// when false, Start() does its work on the calling thread instead of the worker pool
136  bool parallel;
138 
// guards the "collect a new batch from aggQueue" section in PushAggQueue
139  std::mutex m;
140  // items in the queue are all intermediate aggregation results of finished batches.
141  // The intermediate results must be deleted by us again (which we do in SyncAggregateAndPushAggQueue)
142  boost::lockfree::queue<T*> aggQueue;
// count of items currently in aggQueue, maintained alongside the queue
143  std::atomic<size_t> aggQueueSize{0};
144 
145  // keeps track of currently queued/in-progress batches. If it reaches 0, we are done
146  std::atomic<size_t> waitCount{0};
147 
148  typedef std::function<void(const T& agg)> DoneCallback;
150 
// Stores pointers to the entries _inputVec[start .. start + count) in inputVec; the entries
// themselves are not copied.
151  // TP can either be a pointer or a reference
152  template <typename TP>
153  Aggregator(const std::vector<TP>& _inputVec,
154  size_t start, size_t count,
155  bool _parallel,
156  ctpl::thread_pool& _workerPool,
157  DoneCallback _doneCallback) :
158  workerPool(_workerPool),
159  parallel(_parallel),
160  aggQueue(0),
161  doneCallback(std::move(_doneCallback))
162  {
163  inputVec = std::make_shared<std::vector<const T*> >(count);
164  for (size_t i = 0; i < count; i++) {
165  (*inputVec)[i] = pointer(_inputVec[start + i]);
166  }
167  }
168 
// overload pair that lets the constructor accept both vectors of objects and vectors of pointers
169  const T* pointer(const T& v) { return &v; }
170  const T* pointer(const T* v) { return v; }
171 
172  // Starts aggregation.
173  // If parallel=true, then this will return fast, otherwise this will block until aggregation is done
174  void Start()
175  {
// number of batches needed to cover all inputs (rounded up)
176  size_t batchCount = (inputVec->size() + batchSize - 1) / batchSize;
177 
178  if (!parallel) {
179  if (inputVec->size() == 1) {
// a single input is its own aggregate
180  doneCallback(*(*inputVec)[0]);
181  } else {
// NOTE(review): embedded line 182 (the body of this else branch) is absent from this listing
183  }
184  delete this;
185  return;
186  }
187 
188  if (batchCount == 1) {
189  // just a single batch of work, take a shortcut.
190  PushWork([this](int threadId) {
191  if (inputVec->size() == 1) {
192  doneCallback(*(*inputVec)[0]);
193  } else {
// NOTE(review): embedded line 194 (the body of this else branch) is absent from this listing
195  }
196  delete this;
197  });
198  return;
199  }
200 
201  // increment wait counter as otherwise the first finished async aggregation might signal that we're done
202  IncWait();
203  for (size_t i = 0; i < batchCount; i++) {
204  size_t start = i * batchSize;
205  size_t count = std::min(batchSize, inputVec->size() - start);
// NOTE(review): embedded line 206 (the statement that uses start/count) is absent from this listing
207  }
208  // this will decrement the wait counter and in most cases NOT finish, as async work is still in progress
209  CheckDone();
210  }
211 
// Registers one more pending unit of work.
212  void IncWait()
213  {
214  ++waitCount;
215  }
216 
// Marks one pending unit of work as finished and calls Finish() when none are left.
// Finish() deletes this object, so nothing may touch members after CheckDone() returns.
217  void CheckDone()
218  {
219  if (--waitCount == 0) {
220  Finish();
221  }
222  }
223 
// Produces the final result from whatever is left in aggQueue, hands it to doneCallback
// and then deletes this object.
224  void Finish()
225  {
226  // All async work is done, but we might have items in the aggQueue which are the results of the async
227  // work. This is the case when these did not add up to a new batch. In this case, we have to aggregate
228  // the items into the final result
229 
230  std::vector<T*> rem(aggQueueSize);
231  for (size_t i = 0; i < rem.size(); i++) {
232  T* p = nullptr;
233  bool s = aggQueue.pop(p);
234  assert(s);
235  rem[i] = p;
236  }
237 
238  T r;
239  if (rem.size() == 1) {
240  // just one intermediate result, which is actually the final result
241  r = *rem[0];
242  } else {
243  // multiple intermediate results left which did not add up to a new batch. aggregate them now
244  r = SyncAggregate(rem, 0, rem.size());
245  }
246 
247  // all items which are left in the queue are intermediate results, so we must delete them
248  for (size_t i = 0; i < rem.size(); i++) {
249  delete rem[i];
250  }
251  doneCallback(r);
252 
253  delete this;
254  }
255 
// Queues SyncAggregateAndPushAggQueue(vec, start, count, del) on the worker pool.
// The wait counter is incremented before the work is pushed.
256  void AsyncAggregateAndPushAggQueue(std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
257  {
258  IncWait();
259  PushWork(std::bind(&Aggregator::SyncAggregateAndPushAggQueue, this, vec, start, count, del));
260  }
261 
// Aggregates vec[start .. start + count), pushes the result onto aggQueue and, when del is true,
// deletes the aggregated entries. Ends with CheckDone(), which may delete this object.
262  void SyncAggregateAndPushAggQueue(std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
263  {
264  // aggregate vec and push the intermediate result onto the work queue
265  PushAggQueue(SyncAggregate(*vec, start, count));
266  if (del) {
267  for (size_t i = 0; i < count; i++) {
268  delete (*vec)[start + i];
269  }
270  }
271  CheckDone();
272  }
273 
// Stores a heap-allocated copy of v in aggQueue. Once at least batchSize intermediate results
// are queued, batchSize of them are popped under the mutex and scheduled as a new batch.
274  void PushAggQueue(const T& v)
275  {
276  aggQueue.push(new T(v));
277 
278  if (++aggQueueSize >= batchSize) {
279  // we've collected enough intermediate results to form a new batch.
280  std::shared_ptr<std::vector<const T*> > newBatch;
281  {
282  std::unique_lock<std::mutex> l(m);
// re-check under the lock: the unlocked check above can be passed by several threads at once
283  if (aggQueueSize < batchSize) {
284  // some other worker thread grabbed this batch
285  return;
286  }
287  newBatch = std::make_shared<std::vector<const T*> >(batchSize);
288  // collect items for new batch
289  for (size_t i = 0; i < batchSize; i++) {
290  T* p = nullptr;
291  bool s = aggQueue.pop(p);
292  assert(s);
293  (*newBatch)[i] = p;
294  }
// NOTE(review): embedded line 295 is absent from this listing; presumably it adjusts aggQueueSize
// for the items popped above - confirm against the real source
296  }
297 
298  // push new batch to work queue. del=true this time as these items are intermediate results and need to be deleted
299  // after aggregation is done
300  AsyncAggregateAndPushAggQueue(newBatch, 0, newBatch->size(), true);
301  }
302  }
303 
// Returns the aggregate of *vec[start] .. *vec[start + count - 1], built by copying the first entry
// and calling AggregateInsecure() with each following one. vec[start] is read unconditionally,
// so count must be at least 1.
304  template <typename TP>
305  T SyncAggregate(const std::vector<TP>& vec, size_t start, size_t count)
306  {
307  T result = *vec[start];
308  for (size_t j = 1; j < count; j++) {
309  result.AggregateInsecure(*vec[start + j]);
310  }
311  return result;
312  }
313 
// Hands f to the worker pool.
314  template <typename Callable>
315  void PushWork(Callable&& f)
316  {
317  workerPool.push(f);
318  }
319 };
320 
321 // Aggregates multiple input vectors into a single output vector
322 // Inputs are in the following form:
323 // [
324 // [a1, b1, c1, d1],
325 // [a2, b2, c2, d2],
326 // [a3, b3, c3, d3],
327 // [a4, b4, c4, d4],
328 // ]
329 // The result is in the following form:
330 // [ a1+a2+a3+a4, b1+b2+b3+b4, c1+c2+c3+c4, d1+d2+d3+d4]
331 // Same rules for the input vectors apply to the VectorAggregator as for the Aggregator (they must stay alive)
332 template <typename T>
335  typedef std::vector<T> VectorType;
336  typedef std::shared_ptr<VectorType> VectorPtrType;
337  typedef std::vector<VectorPtrType> VectorVectorType;
338  typedef std::function<void(const VectorPtrType& agg)> DoneCallback;
340 
342  size_t start;
343  size_t count;
344  bool parallel;
346 
347  std::atomic<size_t> doneCount;
348 
350  size_t vecSize;
351 
// NOTE(review): the first line of this constructor (embedded line 352) is absent from this listing.
// The visible part stores the arguments and allocates the result vector.
353  size_t _start, size_t _count,
354  bool _parallel, ctpl::thread_pool& _workerPool,
355  DoneCallback _doneCallback) :
356  vecs(_vecs),
357  parallel(_parallel),
358  start(_start),
359  count(_count),
360  workerPool(_workerPool),
361  doneCallback(std::move(_doneCallback))
362  {
363  assert(!vecs.empty());
// the length of the first input vector defines the length of the result
// NOTE(review): this reads vecs[0] rather than vecs[start] - callers presumably guarantee equal lengths, confirm
364  vecSize = vecs[0]->size();
365  result = std::make_shared<VectorType>(vecSize);
366  doneCount = 0;
367  }
368 
// Creates one Aggregator per element position i in [0, vecSize). Each one aggregates element i
// of the input vectors vecs[start .. start + count) and reports to CheckDone(agg, i).
// All aggregators are created first and only then started.
369  void Start()
370  {
371  std::vector<AggregatorType*> aggregators;
372  for (size_t i = 0; i < vecSize; i++) {
// pointers to element i of every selected input vector
373  std::vector<const T*> tmp(count);
374  for (size_t j = 0; j < count; j++) {
375  tmp[j] = &(*vecs[start + j])[i];
376  }
377 
378  auto aggregator = new AggregatorType(std::move(tmp), 0, count, parallel, workerPool, std::bind(&VectorAggregator::CheckDone, this, std::placeholders::_1, i));
379  // we can't directly start the aggregator here as it might be so fast that it deletes "this" while we are still in this loop
380  aggregators.emplace_back(aggregator);
381  }
// from here on only the local "aggregators" vector is used, never members of this object
382  for (auto agg : aggregators) {
383  agg->Start();
384  }
385  }
386 
// Completion handler of the per-position Aggregator: stores agg at position idx of the result.
// Once all vecSize positions have reported, this object deletes itself.
387  void CheckDone(const T& agg, size_t idx)
388  {
389  (*result)[idx] = agg;
390  if (++doneCount == vecSize) {
// NOTE(review): embedded line 391 is absent from this listing; presumably it hands the result to
// doneCallback before the object is deleted - confirm against the real source
392  delete this;
393  }
394  }
395 };
396 
397 // See comment of AsyncVerifyContributionShares for a description on what this does
398 // Same rules as in Aggregator apply for the inputs
// Per-batch bookkeeping of the verifier.
// NOTE(review): embedded lines 404-405 are absent from this listing; the methods of the enclosing
// struct use batchState.vvec and batchState.skShare, which are presumably declared there.
400  struct BatchState {
// index of the first input that belongs to this batch, and the number of inputs in it
401  size_t start;
402  size_t count;
403 
406 
407  // starts with 0 and is incremented if either vvec or skShare aggregation finishes. If it reaches 2, we know
408  // that aggregation for this batch is fully done. We can then start verification.
409  std::unique_ptr<std::atomic<int> > aggDone;
410 
411  // we can't directly update a vector<bool> in parallel
412  // as vector<bool> is not thread safe (uses bitsets internally)
413  // so we must use vector<char> temporarily and concatenate/convert
414  // each batch result into a final vector<bool>
415  std::vector<char> verifyResults;
416  };
417 
419  const std::vector<BLSVerificationVectorPtr>& vvecs;
421  size_t batchSize;
422  bool parallel;
424 
426 
427  size_t batchCount;
428  size_t verifyCount;
429 
430  std::vector<BatchState> batchStates;
431  std::atomic<size_t> verifyDoneCount{0};
432  std::function<void(const std::vector<bool>&)> doneCallback;
433 
// Stores the verification parameters; no work is started here (see Start()).
// vvecs is kept as a reference member, so the caller's vector has to stay alive until
// doneCallback has been called.
434  ContributionVerifier(const CBLSId& _forId, const std::vector<BLSVerificationVectorPtr>& _vvecs,
435  const BLSSecretKeyVector& _skShares, size_t _batchSize,
436  bool _parallel, bool _aggregated, ctpl::thread_pool& _workerPool,
437  std::function<void(const std::vector<bool>&)> _doneCallback) :
438  forId(_forId),
439  vvecs(_vvecs),
440  skShares(_skShares),
441  batchSize(_batchSize),
442  parallel(_parallel),
443  aggregated(_aggregated),
444  workerPool(_workerPool),
445  doneCallback(std::move(_doneCallback))
446  {
447  }
448 
// Splits the inputs into batches, initializes one BatchState per batch and kicks off the work.
// With aggregated == false all inputs form one single batch.
// NOTE(review): when vvecs is empty and aggregated is true, batchCount is 0 and no work is started
// here, so nothing in this method leads to Finish() - callers presumably never pass empty input, confirm.
449  void Start()
450  {
451  if (!aggregated) {
452  // treat all inputs as one large batch
453  batchSize = vvecs.size();
454  batchCount = 1;
455  } else {
// number of batches, rounded up
456  batchCount = (vvecs.size() + batchSize - 1) / batchSize;
457  }
458  verifyCount = vvecs.size();
459 
460  batchStates.resize(batchCount);
461  for (size_t i = 0; i < batchCount; i++) {
462  auto& batchState = batchStates[i];
463 
464  batchState.aggDone.reset(new std::atomic<int>(0));
465  batchState.start = i * batchSize;
// the last batch may be shorter than batchSize
466  batchState.count = std::min(batchSize, vvecs.size() - batchState.start);
467  batchState.verifyResults.assign(batchState.count, 0);
468  }
469 
470  if (aggregated) {
471  size_t batchCount2 = batchCount; // 'this' might get deleted while we're still looping
472  for (size_t i = 0; i < batchCount2; i++) {
473  AsyncAggregate(i);
474  }
475  } else {
476  // treat all inputs as a single batch and verify one-by-one
// NOTE(review): embedded line 477 (the call that starts the one-by-one verification) is absent from this listing
478  }
479  }
480 
// Converts the per-batch vector<char> results into one vector<bool> (one entry per input),
// passes it to doneCallback and then deletes this object.
481  void Finish()
482  {
483  size_t batchIdx = 0;
484  std::vector<bool> result(vvecs.size());
// one iteration per batch; i itself is only used to count the batches
485  for (size_t i = 0; i < vvecs.size(); i += batchSize) {
486  auto& batchState = batchStates[batchIdx++];
487  for (size_t j = 0; j < batchState.count; j++) {
488  result[batchState.start + j] = batchState.verifyResults[j] != 0;
489  }
490  }
491  doneCallback(result);
492  delete this;
493  }
494 
// Starts the two aggregations of one batch: the verification vectors (reported to HandleAggVvecDone)
// and the secret key shares (reported to HandleAggSkShareDone).
495  void AsyncAggregate(size_t batchIdx)
496  {
497  auto& batchState = batchStates[batchIdx];
498 
499  // aggregate vvecs and skShares of batch in parallel
500  auto vvecAgg = new VectorAggregator<CBLSPublicKey>(vvecs, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggVvecDone, this, batchIdx, std::placeholders::_1));
501  auto skShareAgg = new Aggregator<CBLSSecretKey>(skShares, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggSkShareDone, this, batchIdx, std::placeholders::_1));
502 
// both objects are only allocated here, not deleted; the Aggregator/VectorAggregator code deletes itself
503  vvecAgg->Start();
504  skShareAgg->Start();
505  }
506 
// Receives the aggregated verification vector of a batch. Whichever of the two aggregation
// handlers brings aggDone to 2 continues with HandleAggDone().
507  void HandleAggVvecDone(size_t batchIdx, const BLSVerificationVectorPtr& vvec)
508  {
509  auto& batchState = batchStates[batchIdx];
510  batchState.vvec = vvec;
511  if (++(*batchState.aggDone) == 2) {
512  HandleAggDone(batchIdx);
513  }
514  }
// Receives the aggregated secret key share of a batch. Whichever of the two aggregation
// handlers brings aggDone to 2 continues with HandleAggDone().
515  void HandleAggSkShareDone(size_t batchIdx, const CBLSSecretKey& skShare)
516  {
517  auto& batchState = batchStates[batchIdx];
518  batchState.skShare = skShare;
519  if (++(*batchState.aggDone) == 2) {
520  HandleAggDone(batchIdx);
521  }
522  }
523 
// Adds count to the number of finished verifications and calls Finish() once every input
// has been accounted for. batchIdx is not used in the body.
// Finish() deletes this object, so callers must not touch members after this returns.
524  void HandleVerifyDone(size_t batchIdx, size_t count)
525  {
// the atomic add returns the updated total, so exactly one caller sees c == verifyCount
526  size_t c = verifyDoneCount += count;
527  if (c == verifyCount) {
528  Finish();
529  }
530  }
531 
// Called once both aggregations of a batch are done. If the aggregation results are unusable the
// whole batch is marked as failed, otherwise the aggregated verification is queued.
532  void HandleAggDone(size_t batchIdx)
533  {
534  auto& batchState = batchStates[batchIdx];
535 
536  if (batchState.vvec == nullptr || batchState.vvec->empty() || !batchState.skShare.IsValid()) {
537  // something went wrong while aggregating and there is nothing we can do now except mark the whole batch as failed
538  // this can only happen if inputs were invalid in some way
539  batchState.verifyResults.assign(batchState.count, 0);
540  HandleVerifyDone(batchIdx, batchState.count);
541  return;
542  }
543 
544  AsyncAggregatedVerifyBatch(batchIdx);
545  }
546 
// Verifies the aggregated verification vector of a batch against its aggregated secret key share.
// On success every entry of the batch is marked valid; on failure the batch is re-checked
// entry by entry via AsyncVerifyBatchOneByOne().
547  void AsyncAggregatedVerifyBatch(size_t batchIdx)
548  {
549  auto f = [this, batchIdx](int threadId) {
550  auto& batchState = batchStates[batchIdx];
551  bool result = Verify(batchState.vvec, batchState.skShare);
552  if (result) {
553  // whole batch is valid
554  batchState.verifyResults.assign(batchState.count, 1);
555  HandleVerifyDone(batchIdx, batchState.count);
556  } else {
557  // at least one entry in the batch is invalid, revert to per-contribution verification (but parallelized)
558  AsyncVerifyBatchOneByOne(batchIdx);
559  }
560  };
561  PushOrDoWork(std::move(f));
562  }
563 
// Verifies every entry of a batch individually, one job per entry, and records each outcome
// in the batch's verifyResults.
564  void AsyncVerifyBatchOneByOne(size_t batchIdx)
565  {
// count is copied up front: the last job may trigger Finish(), which deletes this object,
// while this loop is still running
566  size_t count = batchStates[batchIdx].count;
567  batchStates[batchIdx].verifyResults.assign(count, 0);
568  for (size_t i = 0; i < count; i++) {
569  auto f = [this, i, batchIdx](int threadId) {
570  auto& batchState = batchStates[batchIdx];
571  batchState.verifyResults[i] = Verify(vvecs[batchState.start + i], skShares[batchState.start + i]);
572  HandleVerifyDone(batchIdx, 1);
573  };
574  PushOrDoWork(std::move(f));
575  }
576  }
577 
// Returns true when the public key share derived from vvec for forId equals the public key of
// skShare. Returns false when the public key share cannot be derived.
// vvec is dereferenced without a null check.
578  bool Verify(const BLSVerificationVectorPtr& vvec, const CBLSSecretKey& skShare)
579  {
580  CBLSPublicKey pk1;
581  if (!pk1.PublicKeyShare(*vvec, forId)) {
582  return false;
583  }
584 
585  CBLSPublicKey pk2 = skShare.GetPublicKey();
586  return pk1 == pk2;
587  }
588 
// Runs f on the worker pool when parallel is set, otherwise calls it directly on the
// current thread with 0 as the thread id argument.
589  template <typename Callable>
590  void PushOrDoWork(Callable&& f)
591  {
592  if (parallel) {
593  workerPool.push(std::move(f));
594  } else {
595  f(0);
596  }
597  }
598 };
599 
600 void CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector<BLSVerificationVectorPtr>& vvecs,
601  size_t start, size_t count, bool parallel,
602  std::function<void(const BLSVerificationVectorPtr&)> doneCallback)
603 {
604  if (start == 0 && count == 0) {
605  count = vvecs.size();
606  }
607  if (vvecs.empty() || count == 0 || start > vvecs.size() || start + count > vvecs.size()) {
608  doneCallback(nullptr);
609  return;
610  }
611  if (!VerifyVerificationVectors(vvecs, start, count)) {
612  doneCallback(nullptr);
613  return;
614  }
615 
616  auto agg = new VectorAggregator<CBLSPublicKey>(vvecs, start, count, parallel, workerPool, std::move(doneCallback));
617  agg->Start();
618 }
619 
620 std::future<BLSVerificationVectorPtr> CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector<BLSVerificationVectorPtr>& vvecs,
621  size_t start, size_t count, bool parallel)
622 {
623  auto p = BuildFutureDoneCallback<BLSVerificationVectorPtr>();
624  AsyncBuildQuorumVerificationVector(vvecs, start, count, parallel, std::move(p.first));
625  return std::move(p.second);
626 }
627 
628 BLSVerificationVectorPtr CBLSWorker::BuildQuorumVerificationVector(const std::vector<BLSVerificationVectorPtr>& vvecs,
629  size_t start, size_t count, bool parallel)
630 {
631  return AsyncBuildQuorumVerificationVector(vvecs, start, count, parallel).get();
632 }
633 
// NOTE(review): the first line of this function's signature (embedded line 635) is absent from this listing.
// Aggregates vec[start .. start + count) with an Aggregator<T> and reports the result to doneCallback.
// start == 0 and count == 0 selects the whole vector. doneCallback receives a default-constructed T
// when the selection is empty or out of range or when VerifyVectorHelper() rejects it.
// NOTE(review): "start + count" in the range check below can wrap around for very large arguments.
634 template <typename T>
636  const std::vector<T>& vec, size_t start, size_t count, bool parallel,
637  std::function<void(const T&)> doneCallback)
638 {
639  if (start == 0 && count == 0) {
640  count = vec.size();
641  }
642  if (vec.empty() || count == 0 || start > vec.size() || start + count > vec.size()) {
643  doneCallback(T());
644  return;
645  }
646  if (!VerifyVectorHelper(vec, start, count)) {
647  doneCallback(T());
648  return;
649  }
650 
// only allocated here, not deleted
651  auto agg = new Aggregator<T>(vec, start, count, parallel, workerPool, std::move(doneCallback));
652  agg->Start();
653 }
654 
// NOTE(review): the first signature line (embedded line 655) is absent from this listing; presumably the
// callback-based AsyncAggregateSecretKeys overload - confirm. The body forwards to AsyncAggregateHelper.
656  size_t start, size_t count, bool parallel,
657  std::function<void(const CBLSSecretKey&)> doneCallback)
658 {
659  AsyncAggregateHelper(workerPool, secKeys, start, count, parallel, doneCallback);
660 }
661 
662 std::future<CBLSSecretKey> CBLSWorker::AsyncAggregateSecretKeys(const BLSSecretKeyVector& secKeys,
663  size_t start, size_t count, bool parallel)
664 {
665  auto p = BuildFutureDoneCallback<CBLSSecretKey>();
666  AsyncAggregateSecretKeys(secKeys, start, count, parallel, std::move(p.first));
667  return std::move(p.second);
668 }
669 
// NOTE(review): the first signature line (embedded line 670) is absent from this listing.
// Blocking variant: waits for the future returned by AsyncAggregateSecretKeys().
671  size_t start, size_t count, bool parallel)
672 {
673  return AsyncAggregateSecretKeys(secKeys, start, count, parallel).get();
674 }
675 
// NOTE(review): the first signature line (embedded line 676) is absent from this listing; presumably the
// callback-based AsyncAggregatePublicKeys overload - confirm. The body forwards to AsyncAggregateHelper.
677  size_t start, size_t count, bool parallel,
678  std::function<void(const CBLSPublicKey&)> doneCallback)
679 {
680  AsyncAggregateHelper(workerPool, pubKeys, start, count, parallel, doneCallback);
681 }
682 
683 std::future<CBLSPublicKey> CBLSWorker::AsyncAggregatePublicKeys(const BLSPublicKeyVector& pubKeys,
684  size_t start, size_t count, bool parallel)
685 {
686  auto p = BuildFutureDoneCallback<CBLSPublicKey>();
687  AsyncAggregatePublicKeys(pubKeys, start, count, parallel, std::move(p.first));
688  return std::move(p.second);
689 }
690 
// NOTE(review): the first signature line (embedded line 691) is absent from this listing.
// Blocking variant: waits for the future returned by AsyncAggregatePublicKeys().
692  size_t start, size_t count, bool parallel)
693 {
694  return AsyncAggregatePublicKeys(pubKeys, start, count, parallel).get();
695 }
696 
// NOTE(review): the first signature line (embedded line 697) is absent from this listing; presumably the
// callback-based AsyncAggregateSigs overload - confirm. The body forwards to AsyncAggregateHelper.
698  size_t start, size_t count, bool parallel,
699  std::function<void(const CBLSSignature&)> doneCallback)
700 {
701  AsyncAggregateHelper(workerPool, sigs, start, count, parallel, doneCallback);
702 }
703 
704 std::future<CBLSSignature> CBLSWorker::AsyncAggregateSigs(const BLSSignatureVector& sigs,
705  size_t start, size_t count, bool parallel)
706 {
707  auto p = BuildFutureDoneCallback<CBLSSignature>();
708  AsyncAggregateSigs(sigs, start, count, parallel, std::move(p.first));
709  return std::move(p.second);
710 }
711 
// NOTE(review): the first signature line (embedded line 712) is absent from this listing.
// Blocking variant: waits for the future returned by AsyncAggregateSigs().
713  size_t start, size_t count, bool parallel)
714 {
715  return AsyncAggregateSigs(sigs, start, count, parallel).get();
716 }
717 
718 
// NOTE(review): the signature line (embedded line 719) is absent from this listing.
// Derives a public key share from *vvec for id and returns it. The return value of
// PublicKeyShare() is not checked and vvec is dereferenced without a null check.
720 {
721  CBLSPublicKey pkShare;
722  pkShare.PublicKeyShare(*vvec, id);
723  return pkShare;
724 }
725 
726 void CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::vector<BLSVerificationVectorPtr>& vvecs, const BLSSecretKeyVector& skShares,
727  bool parallel, bool aggregated, std::function<void(const std::vector<bool>&)> doneCallback)
728 {
729  if (!forId.IsValid() || !VerifyVerificationVectors(vvecs)) {
730  std::vector<bool> result;
731  result.assign(vvecs.size(), false);
732  doneCallback(result);
733  return;
734  }
735 
736  auto verifier = new ContributionVerifier(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback));
737  verifier->Start();
738 }
739 
740 std::future<std::vector<bool> > CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::vector<BLSVerificationVectorPtr>& vvecs, const BLSSecretKeyVector& skShares,
741  bool parallel, bool aggregated)
742 {
743  auto p = BuildFutureDoneCallback<std::vector<bool> >();
744  AsyncVerifyContributionShares(forId, vvecs, skShares, parallel, aggregated, std::move(p.first));
745  return std::move(p.second);
746 }
747 
748 std::vector<bool> CBLSWorker::VerifyContributionShares(const CBLSId& forId, const std::vector<BLSVerificationVectorPtr>& vvecs, const BLSSecretKeyVector& skShares,
749  bool parallel, bool aggregated)
750 {
751  return AsyncVerifyContributionShares(forId, vvecs, skShares, parallel, aggregated).get();
752 }
753 
754 std::future<bool> CBLSWorker::AsyncVerifyContributionShare(const CBLSId& forId,
755  const BLSVerificationVectorPtr& vvec,
756  const CBLSSecretKey& skContribution)
757 {
758  if (!forId.IsValid() || !VerifyVerificationVector(*vvec)) {
759  auto p = BuildFutureDoneCallback<bool>();
760  p.first(false);
761  return std::move(p.second);
762  }
763 
764  auto f = [this, &forId, &vvec, &skContribution](int threadId) {
765  CBLSPublicKey pk1;
766  if (!pk1.PublicKeyShare(*vvec, forId)) {
767  return false;
768  }
769 
770  CBLSPublicKey pk2 = skContribution.GetPublicKey();
771  return pk1 == pk2;
772  };
773  return workerPool.push(f);
774 }
775 
// NOTE(review): the first signature line (embedded line 776) is absent from this listing.
// Returns true when the public key share derived from *vvec for forId equals the public key of
// skContribution, false when the share cannot be derived or the keys differ.
// vvec is dereferenced without a null check.
777  const CBLSSecretKey& skContribution)
778 {
779  CBLSPublicKey pk1;
780  if (!pk1.PublicKeyShare(*vvec, forId)) {
781  return false;
782  }
783 
784  CBLSPublicKey pk2 = skContribution.GetPublicKey();
785  return pk1 == pk2;
786 }
787 
788 bool CBLSWorker::VerifyVerificationVector(const BLSVerificationVector& vvec, size_t start, size_t count)
789 {
790  return VerifyVectorHelper(vvec, start, count);
791 }
792 
793 bool CBLSWorker::VerifyVerificationVectors(const std::vector<BLSVerificationVectorPtr>& vvecs,
794  size_t start, size_t count)
795 {
796  if (start == 0 && count == 0) {
797  count = vvecs.size();
798  }
799 
800  std::set<uint256> set;
801  for (size_t i = 0; i < count; i++) {
802  auto& vvec = vvecs[start + i];
803  if (vvec == nullptr) {
804  return false;
805  }
806  if (vvec->size() != vvecs[start]->size()) {
807  return false;
808  }
809  for (size_t j = 0; j < vvec->size(); j++) {
810  if (!(*vvec)[j].IsValid()) {
811  return false;
812  }
813  // check duplicates
814  if (!set.emplace((*vvec)[j].GetHash()).second) {
815  return false;
816  }
817  }
818  }
819 
820  return true;
821 }
822 
823 bool CBLSWorker::VerifySecretKeyVector(const BLSSecretKeyVector& secKeys, size_t start, size_t count)
824 {
825  return VerifyVectorHelper(secKeys, start, count);
826 }
827 
828 bool CBLSWorker::VerifySignatureVector(const BLSSignatureVector& sigs, size_t start, size_t count)
829 {
830  return VerifyVectorHelper(sigs, start, count);
831 }
832 
833 void CBLSWorker::AsyncSign(const CBLSSecretKey& secKey, const uint256& msgHash, CBLSWorker::SignDoneCallback doneCallback)
834 {
835  workerPool.push([secKey, msgHash, doneCallback](int threadId) {
836  doneCallback(secKey.Sign(msgHash));
837  });
838 }
839 
840 std::future<CBLSSignature> CBLSWorker::AsyncSign(const CBLSSecretKey& secKey, const uint256& msgHash)
841 {
842  auto p = BuildFutureDoneCallback<CBLSSignature>();
843  AsyncSign(secKey, msgHash, std::move(p.first));
844  return std::move(p.second);
845 }
846 
// Queues a signature verification job. doneCallback(false) is called right away when sig or
// pubKey is invalid; otherwise the job is appended to sigVerifyQueue under sigVerifyMutex.
// NOTE(review): embedded lines 868, 872 and 873 are absent from this listing, so the statements
// that push the current batch are not visible here.
847 void CBLSWorker::AsyncVerifySig(const CBLSSignature& sig, const CBLSPublicKey& pubKey, const uint256& msgHash,
848  CBLSWorker::SigVerifyDoneCallback doneCallback, CancelCond cancelCond)
849 {
850  if (!sig.IsValid() || !pubKey.IsValid()) {
851  doneCallback(false);
852  return;
853  }
854 
855  std::unique_lock<std::mutex> l(sigVerifyMutex);
856 
// look for an already queued job with the same message hash
857  bool foundDuplicate = false;
858  for (auto& s : sigVerifyQueue) {
859  if (s.msgHash == msgHash) {
860  foundDuplicate = true;
861  break;
862  }
863  }
864 
865  if (foundDuplicate) {
866  // batched/aggregated verification does not allow duplicate hashes, so we push what we currently have and start
867  // with a fresh batch
869  }
870 
871  sigVerifyQueue.emplace_back(std::move(doneCallback), std::move(cancelCond), sig, pubKey, msgHash);
// NOTE(review): the closing brace below belongs to a block whose opening line is absent from this listing
874  }
875 }
876 
877 std::future<bool> CBLSWorker::AsyncVerifySig(const CBLSSignature& sig, const CBLSPublicKey& pubKey, const uint256& msgHash, CancelCond cancelCond)
878 {
879  auto p = BuildFutureDoneCallback2<bool>();
880  AsyncVerifySig(sig, pubKey, msgHash, std::move(p.first), cancelCond);
881  return std::move(p.second);
882 }
883 
// NOTE(review): the signature line (embedded line 884) is absent from this listing.
// Returns whether sigVerifyBatchesInProgress is non-zero, read under sigVerifyMutex.
885 {
886  std::unique_lock<std::mutex> l(sigVerifyMutex);
887  return sigVerifyBatchesInProgress != 0;
888 }
889 
// Moves the currently queued signature verification jobs into a batch and hands that batch
// to the worker pool.
// NOTE(review): the signature line (embedded line 891) and embedded lines 902, 904, 949, 951, 956
// and 958 are absent from this listing, so part of the bookkeeping done here is not visible.
890 // sigVerifyMutex must be held while calling
892 {
// the job that verifies one batch on a worker thread
893  auto f = [this](int threadId, std::shared_ptr<std::vector<SigVerifyJob> > _jobs) {
894  auto& jobs = *_jobs;
895  if (jobs.size() == 1) {
// a single job needs no aggregation; it is skipped when its cancel condition reports true
896  auto& job = jobs[0];
897  if (!job.cancelCond()) {
898  bool valid = job.sig.VerifyInsecure(job.pubKey, job.msgHash);
899  job.doneCallback(valid);
900  }
901  std::unique_lock<std::mutex> l(sigVerifyMutex);
903  if (!sigVerifyQueue.empty()) {
905  }
906  return;
907  }
908 
// collect all jobs that were not cancelled and aggregate their signatures
909  CBLSSignature aggSig;
// indexes[k] is the position in jobs of the k-th collected (non-cancelled) job
910  std::vector<size_t> indexes;
911  std::vector<CBLSPublicKey> pubKeys;
912  std::vector<uint256> msgHashes;
913  indexes.reserve(jobs.size());
914  pubKeys.reserve(jobs.size());
915  msgHashes.reserve(jobs.size());
916  for (size_t i = 0; i < jobs.size(); i++) {
917  auto& job = jobs[i];
918  if (job.cancelCond()) {
919  continue;
920  }
921  if (pubKeys.empty()) {
922  aggSig = job.sig;
923  } else {
924  aggSig.AggregateInsecure(job.sig);
925  }
926  indexes.emplace_back(i);
927  pubKeys.emplace_back(job.pubKey);
928  msgHashes.emplace_back(job.msgHash);
929  }
930 
931  if (!pubKeys.empty()) {
// one aggregated verification covers all collected jobs at once
932  bool allValid = aggSig.VerifyInsecureAggregated(pubKeys, msgHashes);
933  if (allValid) {
934  for (size_t i = 0; i < pubKeys.size(); i++) {
935  jobs[indexes[i]].doneCallback(true);
936  }
937  } else {
938  // one or more sigs were not valid, revert to per-sig verification
939  // TODO this could be improved if we would cache pairing results in some way as the previous aggregated verification already calculated all the pairings for the hashes
940  for (size_t i = 0; i < pubKeys.size(); i++) {
941  auto& job = jobs[indexes[i]];
942  bool valid = job.sig.VerifyInsecure(job.pubKey, job.msgHash);
943  job.doneCallback(valid);
944  }
945  }
946  }
947 
948  std::unique_lock<std::mutex> l(sigVerifyMutex);
950  if (!sigVerifyQueue.empty()) {
952  }
953  };
954 
// the queued jobs are moved out of sigVerifyQueue into the batch
955  auto batch = std::make_shared<std::vector<SigVerifyJob> >(std::move(sigVerifyQueue));
957 
959  workerPool.push(f, batch);
960 }
bool VerifyContributionShare(const CBLSId &forId, const BLSVerificationVectorPtr &vvec, const CBLSSecretKey &skContribution)
Definition: bls_worker.cpp:776
int sigVerifyBatchesInProgress
Definition: bls_worker.h:49
T SyncAggregate(const std::vector< TP > &vec, size_t start, size_t count)
Definition: bls_worker.cpp:305
std::pair< std::function< void(T)>, std::future< T > > BuildFutureDoneCallback2()
Definition: bls_worker.cpp:40
std::vector< CBLSPublicKey > BLSPublicKeyVector
Definition: bls.h:466
void AsyncVerifyBatchOneByOne(size_t batchIdx)
Definition: bls_worker.cpp:564
const BLSSecretKeyVector & skShares
Definition: bls_worker.cpp:420
BLSVerificationVectorPtr vvec
Definition: bls_worker.cpp:404
void HandleAggSkShareDone(size_t batchIdx, const CBLSSecretKey &skShare)
Definition: bls_worker.cpp:515
void AsyncBuildQuorumVerificationVector(const std::vector< BLSVerificationVectorPtr > &vvecs, size_t start, size_t count, bool parallel, std::function< void(const BLSVerificationVectorPtr &)> doneCallback)
Definition: bls_worker.cpp:600
bool VerifySecretKeyVector(const BLSSecretKeyVector &secKeys, size_t start=0, size_t count=0)
Definition: bls_worker.cpp:823
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
Definition: ctpl.h:152
VectorAggregator(const VectorVectorType &_vecs, size_t _start, size_t _count, bool _parallel, ctpl::thread_pool &_workerPool, DoneCallback _doneCallback)
Definition: bls_worker.cpp:352
std::shared_ptr< std::vector< const T * > > inputVec
Definition: bls_worker.cpp:134
ctpl::thread_pool & workerPool
Definition: bls_worker.cpp:345
void AsyncVerifyContributionShares(const CBLSId &forId, const std::vector< BLSVerificationVectorPtr > &vvecs, const BLSSecretKeyVector &skShares, bool parallel, bool aggregated, std::function< void(const std::vector< bool > &)> doneCallback)
Definition: bls_worker.cpp:726
std::function< void(bool)> SigVerifyDoneCallback
Definition: bls_worker.h:25
static const int SIG_VERIFY_BATCH_SIZE
Definition: bls_worker.h:31
std::function< void(const T &agg)> DoneCallback
Definition: bls_worker.cpp:148
CBLSPublicKey AggregatePublicKeys(const BLSPublicKeyVector &pubKeys, size_t start=0, size_t count=0, bool parallel=true)
Definition: bls_worker.cpp:691
std::vector< CBLSPublicKey > BLSVerificationVector
Definition: bls.h:465
std::mutex sigVerifyMutex
Definition: bls_worker.h:48
Definition: box.hpp:161
BLSVerificationVectorPtr BuildQuorumVerificationVector(const std::vector< BLSVerificationVectorPtr > &vvecs, size_t start=0, size_t count=0, bool parallel=true)
Definition: bls_worker.cpp:628
std::vector< CBLSSecretKey > BLSSecretKeyVector
Definition: bls.h:467
void resize(int nThreads)
Definition: ctpl.h:70
void IncWait()
Definition: bls_worker.cpp:212
void AsyncAggregateHelper(ctpl::thread_pool &workerPool, const std::vector< T > &vec, size_t start, size_t count, bool parallel, std::function< void(const T &)> doneCallback)
Definition: bls_worker.cpp:635
bool VerifyVerificationVector(const BLSVerificationVector &vvec, size_t start=0, size_t count=0)
Definition: bls_worker.cpp:788
void clear_queue()
Definition: ctpl.h:99
void CheckDone(const T &agg, size_t idx)
Definition: bls_worker.cpp:387
void HandleAggDone(size_t batchIdx)
Definition: bls_worker.cpp:532
bool GenerateContributions(int threshold, const BLSIdVector &ids, BLSVerificationVectorPtr &vvecRet, BLSSecretKeyVector &skShares)
Definition: bls_worker.cpp:75
void AsyncAggregateAndPushAggQueue(std::shared_ptr< std::vector< const T *> > &vec, size_t start, size_t count, bool del)
Definition: bls_worker.cpp:256
void PushAggQueue(const T &v)
Definition: bls_worker.cpp:274
bool VerifyInsecureAggregated(const std::vector< CBLSPublicKey > &pubKeys, const std::vector< uint256 > &hashes) const
Definition: bls.cpp:348
std::vector< CBLSSignature > BLSSignatureVector
Definition: bls.h:468
ctpl::thread_pool workerPool
Definition: bls_worker.h:29
std::function< void(const VectorPtrType &agg)> DoneCallback
Definition: bls_worker.cpp:338
Definition: bls.h:218
std::mutex m
Definition: bls_worker.cpp:139
std::shared_ptr< BLSVerificationVector > BLSVerificationVectorPtr
Definition: bls.h:471
bool VerifyVerificationVectors(const std::vector< BLSVerificationVectorPtr > &vvecs, size_t start=0, size_t count=0)
Definition: bls_worker.cpp:793
std::atomic< size_t > waitCount
Definition: bls_worker.cpp:146
const T * pointer(const T &v)
Definition: bls_worker.cpp:169
std::vector< CBLSId > BLSIdVector
Definition: bls.h:464
std::size_t size_t
Definition: bits.hpp:21
Aggregator< T > AggregatorType
Definition: bls_worker.cpp:334
std::shared_ptr< VectorType > VectorPtrType
Definition: bls_worker.cpp:336
void AsyncAggregatePublicKeys(const BLSPublicKeyVector &pubKeys, size_t start, size_t count, bool parallel, std::function< void(const CBLSPublicKey &)> doneCallback)
Definition: bls_worker.cpp:676
void Start()
Definition: bls_worker.cpp:174
bool VerifySignatureVector(const BLSSignatureVector &sigs, size_t start=0, size_t count=0)
Definition: bls_worker.cpp:828
void AsyncAggregatedVerifyBatch(size_t batchIdx)
Definition: bls_worker.cpp:547
std::atomic< size_t > verifyDoneCount
Definition: bls_worker.cpp:431
void HandleAggVvecDone(size_t batchIdx, const BLSVerificationVectorPtr &vvec)
Definition: bls_worker.cpp:507
CBLSPublicKey BuildPubKeyShare(const BLSVerificationVectorPtr &vvec, const CBLSId &id)
Definition: bls_worker.cpp:719
const VectorVectorType & vecs
Definition: bls_worker.cpp:341
std::pair< std::function< void(const T &)>, std::future< T > > BuildFutureDoneCallback()
Definition: bls_worker.cpp:31
std::function< void(const CBLSSignature &)> SignDoneCallback
Definition: bls_worker.h:24
void HandleVerifyDone(size_t batchIdx, size_t count)
Definition: bls_worker.cpp:524
void Stop()
Definition: bls_worker.cpp:69
std::vector< bool > VerifyContributionShares(const CBLSId &forId, const std::vector< BLSVerificationVectorPtr > &vvecs, const BLSSecretKeyVector &skShares, bool parallel=true, bool aggregated=true)
Definition: bls_worker.cpp:748
CBLSSecretKey AggregateSecretKeys(const BLSSecretKeyVector &secKeys, size_t start=0, size_t count=0, bool parallel=true)
Definition: bls_worker.cpp:670
std::shared_ptr< BLSSecretKeyVector > BLSSecretKeyVectorPtr
Definition: bls.h:473
std::unique_ptr< std::atomic< int > > aggDone
Definition: bls_worker.cpp:409
ctpl::thread_pool & workerPool
Definition: bls_worker.cpp:137
const std::vector< BLSVerificationVectorPtr > & vvecs
Definition: bls_worker.cpp:419
std::function< bool()> CancelCond
Definition: bls_worker.h:26
std::vector< VectorPtrType > VectorVectorType
Definition: bls_worker.cpp:337
void PushOrDoWork(Callable &&f)
Definition: bls_worker.cpp:590
void PushSigVerifyBatch()
Definition: bls_worker.cpp:891
DoneCallback doneCallback
Definition: bls_worker.cpp:149
bool Verify(const BLSVerificationVectorPtr &vvec, const CBLSSecretKey &skShare)
Definition: bls_worker.cpp:578
void Start()
Definition: bls_worker.cpp:61
void CheckDone()
Definition: bls_worker.cpp:217
256-bit opaque blob.
Definition: uint256.h:123
bool IsAsyncVerifyInProgress()
Definition: bls_worker.cpp:884
void AsyncVerifySig(const CBLSSignature &sig, const CBLSPublicKey &pubKey, const uint256 &msgHash, SigVerifyDoneCallback doneCallback, CancelCond cancelCond=[] { return false;})
Definition: bls_worker.cpp:847
bool PublicKeyShare(const std::vector< CBLSPublicKey > &mpk, const CBLSId &id)
Definition: bls.cpp:218
ctpl::thread_pool & workerPool
Definition: bls_worker.cpp:425
void stop(bool isWait=false)
Definition: ctpl.h:121
void SyncAggregateAndPushAggQueue(std::shared_ptr< std::vector< const T *> > &vec, size_t start, size_t count, bool del)
Definition: bls_worker.cpp:262
void AggregateInsecure(const CBLSSignature &o)
Definition: bls.cpp:277
boost::lockfree::queue< T * > aggQueue
Definition: bls_worker.cpp:142
std::function< void(const std::vector< bool > &)> doneCallback
Definition: bls_worker.cpp:432
std::atomic< size_t > aggQueueSize
Definition: bls_worker.cpp:143
void AsyncAggregateSecretKeys(const BLSSecretKeyVector &secKeys, size_t start, size_t count, bool parallel, std::function< void(const CBLSSecretKey &)> doneCallback)
Definition: bls_worker.cpp:655
CBLSPublicKey GetPublicKey() const
Definition: bls.cpp:147
void AsyncSign(const CBLSSecretKey &secKey, const uint256 &msgHash, SignDoneCallback doneCallback)
Definition: bls_worker.cpp:833
std::atomic< size_t > doneCount
Definition: bls_worker.cpp:347
static int count
Definition: tests.c:45
void RenameThreadPool(ctpl::thread_pool &tp, const char *baseName)
Definition: util.cpp:1276
void AsyncAggregate(size_t batchIdx)
Definition: bls_worker.cpp:495
std::future< bool > AsyncVerifyContributionShare(const CBLSId &forId, const BLSVerificationVectorPtr &vvec, const CBLSSecretKey &skContribution)
Definition: bls_worker.cpp:754
ContributionVerifier(const CBLSId &_forId, const std::vector< BLSVerificationVectorPtr > &_vvecs, const BLSSecretKeyVector &_skShares, size_t _batchSize, bool _parallel, bool _aggregated, ctpl::thread_pool &_workerPool, std::function< void(const std::vector< bool > &)> _doneCallback)
Definition: bls_worker.cpp:434
CBLSSignature AggregateSigs(const BLSSignatureVector &sigs, size_t start=0, size_t count=0, bool parallel=true)
Definition: bls_worker.cpp:712
DoneCallback doneCallback
Definition: bls_worker.cpp:339
std::vector< T > VectorType
Definition: bls_worker.cpp:335
std::vector< SigVerifyJob > sigVerifyQueue
Definition: bls_worker.h:50
Aggregator(const std::vector< TP > &_inputVec, size_t start, size_t count, bool _parallel, ctpl::thread_pool &_workerPool, DoneCallback _doneCallback)
Definition: bls_worker.cpp:153
std::vector< char > verifyResults
Definition: bls_worker.cpp:415
void AsyncAggregateSigs(const BLSSignatureVector &sigs, size_t start, size_t count, bool parallel, std::function< void(const CBLSSignature &)> doneCallback)
Definition: bls_worker.cpp:697
std::vector< BatchState > batchStates
Definition: bls_worker.cpp:430
const T * pointer(const T *v)
Definition: bls_worker.cpp:170
VectorPtrType result
Definition: bls_worker.cpp:349
void Finish()
Definition: bls_worker.cpp:224
bool IsValid() const
Definition: bls.h:94
size_t batchSize
Definition: bls_worker.cpp:133
void PushWork(Callable &&f)
Definition: bls_worker.cpp:315
CBLSSignature Sign(const uint256 &hash) const
Definition: bls.cpp:160
bool VerifyVectorHelper(const std::vector< T > &vec, size_t start, size_t count)
Definition: bls_worker.cpp:12
Released under the MIT license