Sierra Toolkit  Version of the Day
DistributedIndex.cpp
1 /*------------------------------------------------------------------------*/
2 /* Copyright 2010 Sandia Corporation. */
3 /* Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive */
4 /* license for use of this work by or on behalf of the U.S. Government. */
5 /* Export of this program may require a license from the */
6 /* United States Government. */
7 /*------------------------------------------------------------------------*/
8 
#include <stdexcept>
#include <sstream>
#include <algorithm>
#include <functional>
#include <limits>
#include <stdint.h>

#include <stk_util/parallel/ParallelComm.hpp>
#include <stk_util/parallel/DistributedIndex.hpp>

#include <stk_util/util/RadixSort.hpp>
19 
20 namespace stk_classic {
21 namespace parallel {
22 
23 //----------------------------------------------------------------------
24 
25 namespace {
26 
27 struct KeyProcLess {
28 
29  bool operator()( const DistributedIndex::KeyProc & lhs ,
30  const DistributedIndex::KeyType & rhs ) const
31  { return lhs.first < rhs ; }
32 
33 };
34 
35 void sort_unique( std::vector<DistributedIndex::KeyProc> & key_usage )
36 {
37  std::vector<DistributedIndex::KeyProc>::iterator
38  i = key_usage.begin() ,
39  j = key_usage.end() ;
40 
41  std::sort( i , j );
42 
43  i = std::unique( i , j );
44 
45  key_usage.erase( i , j );
46 }
47 
48 void sort_unique( std::vector<DistributedIndex::KeyType> & keys )
49 {
50  stk_classic::util::radix_sort_unsigned((keys.empty() ? NULL : &keys[0]), keys.size());
51 
52  std::vector<DistributedIndex::KeyType>::iterator
53  i = keys.begin() ,
54  j = keys.end() ;
55 
56  i = std::unique( i , j );
57  keys.erase( i , j );
58 }
59 
60 // reserve vector size (current size + rev_buffer remaining)
61 template < class T >
62 inline void reserve_for_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<T>& v)
63 {
64  unsigned num_remote = 0;
65  for (DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
66  CommBuffer & buf = all.recv_buffer( p );
67  num_remote += buf.remaining() / sizeof(T);
68  }
69  v.reserve(v.size() + num_remote);
70 }
71 
72 // unpack buffer into vector
73 template < class T >
74 inline void unpack_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<T>& v)
75 {
76  reserve_for_recv_buffer(all, comm_size, v);
77  for (DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
78  CommBuffer & buf = all.recv_buffer( p );
79  while ( buf.remaining() ) {
80  T kp;
81  buf.unpack( kp );
82  v.push_back( kp );
83  }
84  }
85 }
86 
87 // unpack buffer into vector, where pair.second is the processor
88 template < class T >
89 inline void unpack_with_proc_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<std::pair<T,DistributedIndex::ProcType> >& v)
90 {
91  reserve_for_recv_buffer(all, comm_size, v);
92  for ( DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
93  CommBuffer & buf = all.recv_buffer( p );
94  std::pair<T,DistributedIndex::ProcType> kp;
95  kp.second = p;
96  while ( buf.remaining() ) {
97  buf.unpack( kp.first );
98  v.push_back( kp );
99  }
100  }
101 }
102 
103 } // namespace <unnamed>
104 
105 //----------------------------------------------------------------------
106 
// Keys within a span are assigned to processes round-robin in chunks
// of 2^DISTRIBUTED_INDEX_CHUNK_BITS consecutive key values (see
// to_which_proc and generate_new_keys_local_planning).
enum { DISTRIBUTED_INDEX_CHUNK_BITS = 12 };

enum { DISTRIBUTED_INDEX_CHUNK_SIZE =
       size_t(1) << DISTRIBUTED_INDEX_CHUNK_BITS };
111 
112 DistributedIndex::ProcType
113 DistributedIndex::to_which_proc( const DistributedIndex::KeyType & key ) const
114 {
115  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
116  if ( m_key_span[i].first <= key && key <= m_key_span[i].second ) {
117  const KeyType offset = key - m_key_span[i].first ;
118  return ( offset >> DISTRIBUTED_INDEX_CHUNK_BITS ) % m_comm_size ;
119  }
120  }
121  return m_comm_size ;
122 }
123 
124 //----------------------------------------------------------------------
125 
126 DistributedIndex::~DistributedIndex() {}
127 
// Construct a distributed index over the given spans of keys.
// Rank 0's 'partition_bounds' is broadcast to all processes; spans must
// satisfy min <= max and be strictly ordered without overlap, else a
// runtime_error is thrown.  With no spans given, a single span covering
// the whole key range is used.
DistributedIndex::DistributedIndex (
  ParallelMachine comm ,
  const std::vector<KeySpan> & partition_bounds )
  : m_comm( comm ),
    m_comm_rank( parallel_machine_rank( comm ) ),
    m_comm_size( parallel_machine_size( comm ) ),
    m_span_count(0),
    m_key_span(),
    m_key_usage()
{
  // info[0] = number of spans, info[1] = error flag
  unsigned info[2] ;
  info[0] = partition_bounds.size();
  info[1] = 0 ;

  // Check each span for validity

  for ( std::vector<KeySpan>::const_iterator
        i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
    if ( i->second < i->first ||
         ( i != partition_bounds.begin() && i->first <= (i-1)->second ) ) {
      info[1] = 1 ;
    }
  }

#if defined( STK_HAS_MPI )
  // NOTE(review): this broadcast overwrites each rank's locally-computed
  // info with rank 0's values, so validity is effectively judged from
  // rank 0's input — presumably the input is parallel consistent; verify.
  if (m_comm_size > 1) {
    MPI_Bcast( info , 2 , MPI_UNSIGNED , 0 , comm );
  }

  if ( 0 < info[0] ) {
    m_key_span.resize( info[0] );
    if ( 0 == parallel_machine_rank( comm ) ) {
      m_key_span = partition_bounds ;
    }
    // Broadcast rank 0's spans as raw bytes.
    if (m_comm_size > 1) {
      MPI_Bcast( (m_key_span.empty() ? NULL : & m_key_span[0]), info[0] * sizeof(KeySpan), MPI_BYTE, 0, comm );
    }
  }
#else
  m_key_span = partition_bounds ;
#endif

  if ( info[1] ) {
    std::ostringstream msg ;
    msg << "sierra::parallel::DistributedIndex ctor( comm , " ;

    for ( std::vector<KeySpan>::const_iterator
          i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
      msg << " ( min = " << i->first << " , max = " << i->second << " )" ;
    }
    msg << " ) contains invalid span of keys" ;
    throw std::runtime_error( msg.str() );
  }

  m_span_count = info[0] ;

  // No spans given: default to one span covering the entire key range.
  if ( 0 == m_span_count ) {
    m_key_span.push_back(
      KeySpan( std::numeric_limits<KeyType>::min(),
               std::numeric_limits<KeyType>::max() ) );
    m_span_count = 1 ;
  }
}
191 
192 //----------------------------------------------------------------------
193 //----------------------------------------------------------------------
194 
195 namespace {
196 
197 bool is_sorted_and_unique( const std::vector<DistributedIndex::KeyProc> & key_usage )
198 {
199  std::vector<DistributedIndex::KeyProc>::const_iterator itr = key_usage.begin();
200  std::vector<DistributedIndex::KeyProc>::const_iterator end = key_usage.end();
201  for ( ; itr != end; ++itr ) {
202  if ( itr + 1 != end && *itr >= *(itr + 1) ) {
203  return false;
204  }
205  }
206  return true;
207 }
208 
// For each requested key, pack every (key,proc) usage entry for that key
// onto the send buffer of every process that uses the key, i.e. an
// all-to-all among the sharers of each key.  Expects both inputs sorted
// by key (callers sort_unique the request; m_key_usage maintains a
// sorted invariant).  Iterator 'i' only advances, so the pass is linear.
void query_pack_to_usage(
  const std::vector<DistributedIndex::KeyProc> & key_usage ,
  const std::vector<DistributedIndex::KeyType> & request ,
  CommAll & all )
{
  std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();
  std::vector<DistributedIndex::KeyType>::const_iterator k = request.begin();

  for ( ; k != request.end() && i != key_usage.end() ; ++k ) {

    // Skip usage entries whose key precedes the requested key.
    for ( ; i != key_usage.end() && i->first < *k ; ++i );

    // [i,j) is the run of usage entries whose key equals *k.
    std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
    for ( ; j != key_usage.end() && j->first == *k ; ++j );

    // Send every entry in [i,j) to every process appearing in [i,j).
    for ( std::vector<DistributedIndex::KeyProc>::const_iterator
          jsend = i ; jsend != j ; ++jsend ) {

      for ( std::vector<DistributedIndex::KeyProc>::const_iterator
            jinfo = i ; jinfo != j ; ++jinfo ) {

        all.send_buffer( jsend->second )
           .pack<DistributedIndex::KeyProc>( *jinfo );
      }
    }
  }
}
236 
237 void query_pack( const std::vector<DistributedIndex::KeyProc> & key_usage ,
238  const std::vector<DistributedIndex::KeyProc> & request ,
239  CommAll & all )
240 {
241  std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();
242 
243  for ( std::vector<DistributedIndex::KeyProc>::const_iterator
244  k = request.begin() ;
245  k != request.end() &&
246  i != key_usage.end() ; ++k ) {
247 
248  for ( ; i != key_usage.end() && i->first < k->first ; ++i );
249 
250  for ( std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
251  j != key_usage.end() && j->first == k->first ; ++j ) {
252  all.send_buffer( k->second ).pack<DistributedIndex::KeyProc>( *j );
253  }
254  }
255 }
256 
257 }
258 
  const std::vector<DistributedIndex::KeyProc> & request ,
  std::vector<DistributedIndex::KeyProc> & sharing_of_keys ) const
{
  // Answer each (key, asking-process) request with the sharing
  // information held by this process, collecting the replies sorted.
  sharing_of_keys.clear();

  CommAll all( m_comm );

  // Standard CommAll protocol: first pass only sizes the send buffers;
  // the second identical pass, after allocation, actually packs.
  query_pack( m_key_usage , request , all ); // Sizing

  all.allocate_buffers( m_comm_size / 4 , false );

  query_pack( m_key_usage , request , all ); // Packing

  all.communicate();

  unpack_recv_buffer(all, m_comm_size, sharing_of_keys);

  std::sort( sharing_of_keys.begin() , sharing_of_keys.end() );
}
279 
  std::vector<DistributedIndex::KeyProc> & sharing_of_local_keys ) const
{
  // Query sharing for every key this process has registered.
  query( m_key_usage , sharing_of_local_keys );
}
285 
  const std::vector<DistributedIndex::KeyType> & keys ,
  std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
{
  // Route each queried key to its owning process, gather the incoming
  // (key, asking-process) requests there, then answer them via query().
  // Throws (on all processes) if any key lies outside every span.
  std::vector<KeyProc> request ;

  {
    bool bad_key = false ;
    CommAll all( m_comm );

    // Sizing pass; an out-of-span key (to_which_proc returns
    // m_comm_size) is flagged rather than packed.
    for ( std::vector<KeyType>::const_iterator
          k = keys.begin() ; k != keys.end() ; ++k ) {
      const ProcType p = to_which_proc( *k );

      if ( p < m_comm_size ) {
        all.send_buffer( p ).pack<KeyType>( *k );
      }
      else {
        bad_key = true ;
      }
    }

    // Error condition becomes global:

    bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key );

    if ( bad_key ) {
      throw std::runtime_error("stk_classic::parallel::DistributedIndex::query given a key which is out of range");
    }

    // Packing pass (all keys are now known to be in range).
    for ( std::vector<KeyType>::const_iterator
          k = keys.begin() ; k != keys.end() ; ++k ) {
      all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
    }

    all.communicate();

    // Owning processes collect (key, asking-process) requests.
    unpack_with_proc_recv_buffer(all, m_comm_size, request);
  }

  sort_unique( request );

  query( request , sharing_keys );
}
330 
  const std::vector<DistributedIndex::KeyType> & keys ,
  std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
{
  // Route each queried key to its owning process, then push the sharing
  // information for those keys out to the keys' *users* (not the
  // askers) via query_pack_to_usage.  Throws (on all processes) if any
  // key lies outside every span.
  std::vector<KeyType> request ;

  {
    bool bad_key = false ;
    CommAll all( m_comm );

    // Sizing pass; an out-of-span key (to_which_proc returns
    // m_comm_size) is flagged rather than packed.
    for ( std::vector<KeyType>::const_iterator
          k = keys.begin() ; k != keys.end() ; ++k ) {
      const ProcType p = to_which_proc( *k );

      if ( p < m_comm_size ) {
        all.send_buffer( p ).pack<KeyType>( *k );
      }
      else {
        bad_key = true ;
      }
    }

    // Error condition becomes global:

    bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key );

    if ( bad_key ) {
      throw std::runtime_error("stk_classic::parallel::DistributedIndex::query given a key which is out of range");
    }

    // Packing pass (all keys are now known to be in range).
    for ( std::vector<KeyType>::const_iterator
          k = keys.begin() ; k != keys.end() ; ++k ) {
      all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
    }

    all.communicate();

    // Owning processes collect just the requested keys.
    unpack_recv_buffer(all, m_comm_size, request);
  }

  sort_unique( request );

  {
    CommAll all( m_comm );

    query_pack_to_usage( m_key_usage , request , all ); // Sizing

    all.allocate_buffers( m_comm_size / 4 , false );

    query_pack_to_usage( m_key_usage , request , all ); // Packing

    all.communicate();

    unpack_recv_buffer(all, m_comm_size, sharing_keys);

    std::sort( sharing_keys.begin() , sharing_keys.end() );
  }
}
389 
390 //----------------------------------------------------------------------
391 //----------------------------------------------------------------------
392 
393 namespace {
394 
395 struct RemoveKeyProc {
396 
397  bool operator()( const DistributedIndex::KeyProc & kp ) const
398  { return kp.second < 0 ; }
399 
400  static void mark( std::vector<DistributedIndex::KeyProc> & key_usage ,
401  const DistributedIndex::KeyProc & kp )
402  {
403  std::vector<DistributedIndex::KeyProc>::iterator
404  i = std::lower_bound( key_usage.begin(),
405  key_usage.end(), kp.first, KeyProcLess() );
406 
407  // Iterate over the span of KeyProcs with matching key until an exact match
408  // is found. We have to do it this way because marking a KeyProc unsorts it
409  // in the key_usage vector, so we cannot look up KeyProcs directly once marking
410  // has begun.
411  while ( i != key_usage.end() && kp.first == i->first && kp.second != i->second) { ++i ; }
412 
413  if ( i != key_usage.end() && kp == *i ) {
414  i->second = -1 ;
415  }
416  }
417 
418  static void clean( std::vector<DistributedIndex::KeyProc> & key_usage )
419  {
420  std::vector<DistributedIndex::KeyProc>::iterator end =
421  std::remove_if( key_usage.begin() , key_usage.end() , RemoveKeyProc() );
422  key_usage.erase( end , key_usage.end() );
423  }
424 };
425 
426 }
427 
  const std::vector<DistributedIndex::KeyType> & add_new_keys ,
  const std::vector<DistributedIndex::KeyType> & remove_existing_keys )
{
  // Remove this process' participation in 'remove_existing_keys' and
  // register 'add_new_keys', updating the owning processes' usage
  // tables.  Throws a globally-consistent error if any key is outside
  // every span.  Post-condition: m_key_usage is sorted and unique.
  std::vector<unsigned long> count_remove( m_comm_size , (unsigned long)0 );
  std::vector<unsigned long> count_add( m_comm_size , (unsigned long)0 );

  size_t local_bad_input = 0 ;

  // Iterate over keys being removed and keep a count of keys being removed
  // from other processes
  for ( std::vector<KeyType>::const_iterator
        i = remove_existing_keys.begin();
        i != remove_existing_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( m_comm_size <= p ) {
      // Key is not within one of the spans:
      ++local_bad_input ;
    }
    else if ( p != m_comm_rank ) {
      ++( count_remove[ p ] );
    }
  }

  // Iterate over keys being added and keep a count of keys being added
  // to other processes
  for ( std::vector<KeyType>::const_iterator
        i = add_new_keys.begin();
        i != add_new_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p == m_comm_size ) {
      // Key is not within one of the spans:
      ++local_bad_input ;
    }
    else if ( p != m_comm_rank ) {
      ++( count_add[ p ] );
    }
  }

  CommAll all( m_comm );

  // Sizing and add_new_keys bounds checking:

  // For each process, we are going to send the number of removed keys,
  // the removed keys, and the added keys. It will be assumed that any keys
  // beyond the number of removed keys will be added keys.
  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    if ( count_remove[p] || count_add[p] ) {
      CommBuffer & buf = all.send_buffer( p );
      buf.skip<unsigned long>( 1 );
      buf.skip<KeyType>( count_remove[p] );
      buf.skip<KeyType>( count_add[p] );
    }
  }

  // Allocate buffers and perform a global OR of error_flag
  const bool symmetry_flag = false ;
  const bool error_flag = 0 < local_bad_input ;

  bool global_bad_input =
    all.allocate_buffers( m_comm_size / 4, symmetry_flag , error_flag );

  if ( global_bad_input ) {
    std::ostringstream msg ;

    if ( 0 < local_bad_input ) {
      msg << "stk_classic::parallel::DistributedIndex::update_keys ERROR Given "
          << local_bad_input << " of " << add_new_keys.size()
          << " add_new_keys outside of any span" ;
    }

    throw std::runtime_error( msg.str() );
  }

  // Packing:

  // Pack the remove counts for each process
  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    if ( count_remove[p] || count_add[p] ) {
      all.send_buffer( p ).pack<unsigned long>( count_remove[p] );
    }
  }

  // Pack the removed keys for each process
  for ( std::vector<KeyType>::const_iterator
        i = remove_existing_keys.begin();
        i != remove_existing_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p != m_comm_rank ) {
      all.send_buffer( p ).pack<KeyType>( *i );
    }
  }

  // Pack the added keys for each process
  for ( std::vector<KeyType>::const_iterator
        i = add_new_keys.begin();
        i != add_new_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p != m_comm_rank ) {
      all.send_buffer( p ).pack<KeyType>( *i );
    }
  }

  // Communicate keys
  all.communicate();

  //------------------------------
  // Remove for local keys

  for ( std::vector<KeyType>::const_iterator
        i = remove_existing_keys.begin();
        i != remove_existing_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p == m_comm_rank ) {
      RemoveKeyProc::mark( m_key_usage , KeyProc( *i , p ) );
    }
  }

  // Unpack the remove key and find it.
  // Set the process to a negative value for subsequent removal.

  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    CommBuffer & buf = all.recv_buffer( p );
    if ( buf.remaining() ) {
      unsigned long remove_count = 0 ;

      KeyProc kp ;

      kp.second = p ;

      buf.unpack<unsigned long>( remove_count );

      // The first 'remove_count' keys in the buffer are removals;
      // the rest (unpacked later) are additions.
      for ( ; 0 < remove_count ; --remove_count ) {
        buf.unpack<KeyType>( kp.first );

        RemoveKeyProc::mark( m_key_usage , kp );
      }
    }
  }

  RemoveKeyProc::clean( m_key_usage );

  //------------------------------
  // Append for local keys

  // Add new_keys going to this proc to local_key_usage
  std::vector<KeyProc> local_key_usage ;
  local_key_usage.reserve(add_new_keys.size());
  for ( std::vector<KeyType>::const_iterator
        i = add_new_keys.begin();
        i != add_new_keys.end(); ++i ) {

    const ProcType p = to_which_proc( *i );
    if ( p == m_comm_rank ) {
      local_key_usage.push_back( KeyProc( *i , p ) );
    }
  }

  // Merge local_key_usage and m_key_usage into temp_key
  std::vector<KeyProc> temp_key ;
  temp_key.reserve(local_key_usage.size() + m_key_usage.size());
  std::sort( local_key_usage.begin(), local_key_usage.end() );
  std::merge( m_key_usage.begin(),
              m_key_usage.end(),
              local_key_usage.begin(),
              local_key_usage.end(),
              std::back_inserter(temp_key) );

  // Unpack and append for remote keys:
  std::vector<KeyProc> remote_key_usage ;

  unpack_with_proc_recv_buffer(all, m_comm_size, remote_key_usage);

  std::sort( remote_key_usage.begin(), remote_key_usage.end() );

  m_key_usage.clear();
  m_key_usage.reserve(temp_key.size() + remote_key_usage.size());

  // Merge temp_key and remote_key_usage into m_key_usage, so...
  // m_key_usage = local_key_usage + remote_key_usage + m_key_usage(orig)
  std::merge( temp_key.begin(),
              temp_key.end(),
              remote_key_usage.begin(),
              remote_key_usage.end(),
              std::back_inserter(m_key_usage) );

  // Unique m_key_usage
  m_key_usage.erase(std::unique( m_key_usage.begin(),
                                 m_key_usage.end()),
                    m_key_usage.end() );

  // Check invariant that m_key_usage is sorted
  if (!is_sorted_and_unique(m_key_usage)) {
    throw std::runtime_error( "Sorted&unique invariant violated!" );
  }
}
624 
625 //----------------------------------------------------------------------
626 //----------------------------------------------------------------------
627 
// Compute, for each span, an upper bound on the largest key any process
// may generate: span begin + global count of (used + newly requested)
// keys - 1.  Throws a parallel-consistent runtime_error when the
// request vector is mis-sized on any process or a span would overflow.
void DistributedIndex::generate_new_global_key_upper_bound(
  const std::vector<size_t> & requests ,
  std::vector<DistributedIndex::KeyType> & global_key_upper_bound ) const
{
  bool bad_request = m_span_count != requests.size();

  std::ostringstream error_msg ;

  error_msg
    << "sierra::parallel::DistributedIndex::generate_new_keys_global_counts( " ;

  std::vector<unsigned long>
    local_counts( m_span_count + 1 , (unsigned long) 0 ),
    global_counts( m_span_count + 1 , (unsigned long) 0 );

  // Count unique keys in each span and add requested keys for
  // final total count of keys needed.

  // Append the error check to this communication to avoid
  // an extra reduction operation.
  local_counts[ m_span_count ] = m_span_count != requests.size();

  if ( m_span_count == requests.size() ) {

    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      local_counts[i] = requests[i] ;
    }

    std::vector<KeyProc>::const_iterator j = m_key_usage.begin();

    for ( size_t i = 0 ; i < m_span_count && j != m_key_usage.end() ; ++i ) {
      const KeyType key_span_last = m_key_span[i].second ;
      size_t count = 0 ;
      // Count distinct keys (not usage entries) within this span.
      while ( j != m_key_usage.end() && j->first <= key_span_last ) {
        const KeyType key = j->first ;
        while ( j != m_key_usage.end() && key == j->first ) { ++j ; }
        ++count ;
      }
      local_counts[i] += count ;
    }
  }

#if defined( STK_HAS_MPI )
  if (m_comm_size > 1) {
    MPI_Allreduce( (local_counts.empty() ? NULL : & local_counts[0]) , (global_counts.empty() ? NULL : & global_counts[0]) ,
                   m_span_count + 1 , MPI_UNSIGNED_LONG ,
                   MPI_SUM , m_comm );
  }
  else {
    global_counts = local_counts ;
  }
#else
  global_counts = local_counts ;
#endif

  // A non-zero sum means at least one process had a mis-sized request.
  bad_request = global_counts[m_span_count] != 0 ;

  if ( bad_request ) {
    if ( m_span_count != requests.size() ) {
      error_msg << " requests.size() = " << requests.size()
                << " != " << m_span_count << " )" ;
    }
  }

  if ( ! bad_request ) {
    for ( unsigned i = 0 ; i < m_span_count ; ++i ) {
      const size_t span_available =
        ( 1 + m_key_span[i].second - m_key_span[i].first );

      const size_t span_requested = global_counts[i];

      if ( span_available < span_requested ) {
        bad_request = true ;
        error_msg << " global_sum( (existing+request)[" << i << "] ) = "
                  << span_requested
                  << " > global_sum( span_available ) = "
                  << span_available ;
      }
    }
  }

  if ( bad_request ) {
    throw std::runtime_error( error_msg.str() );
  }

  // Determine the maximum generated key

  global_key_upper_bound.resize( m_span_count );

  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    global_key_upper_bound[i] = m_key_span[i].first + global_counts[i] - 1 ;
  }
}
721 
722 //--------------------------------------------------------------------
723 //--------------------------------------------------------------------
724 
// Walk this process' round-robin chunks of each span up to the global
// upper bound and collect currently unused keys into 'contrib_keys'.
// Keys this process can use itself are moved into 'requested_keys';
// new_request[i] records the per-span deficit (positive: keys needed
// from others) or surplus (negative: keys available to donate).
void DistributedIndex::generate_new_keys_local_planning(
  const std::vector<DistributedIndex::KeyType> & key_global_upper_bound ,
  const std::vector<size_t> & requests_local ,
  std::vector<long> & new_request ,
  std::vector<KeyType> & requested_keys ,
  std::vector<KeyType> & contrib_keys ) const
{
  new_request.assign( m_span_count , long(0) );

  contrib_keys.clear();

  std::vector<KeyProc>::const_iterator j = m_key_usage.begin();

  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    // The maximum generated key from any process will
    // not exceed this value.
    const KeyType key_upper_bound = key_global_upper_bound[i] ;

    const size_t init_size = contrib_keys.size();

    // Stride between this process' chunks, and this process' offset
    // from the start of the span.
    const size_t chunk_inc = m_comm_size * DISTRIBUTED_INDEX_CHUNK_SIZE ;

    const size_t chunk_rsize = m_comm_rank * DISTRIBUTED_INDEX_CHUNK_SIZE ;

    for ( KeyType key_begin = m_key_span[i].first +
                              chunk_rsize ;
          key_begin <= key_upper_bound ; key_begin += chunk_inc ) {

      // What is the first key of the chunk
      KeyType key_iter = key_begin ;

      // What is the last key belonging to this process' chunk
      const KeyType key_last =
        std::min( key_begin + DISTRIBUTED_INDEX_CHUNK_SIZE - 1 , key_upper_bound );

      // Jump into the sorted used key vector to
      // the key which may be contributed

      j = std::lower_bound( j, m_key_usage.end(), key_iter, KeyProcLess() );
      // now know: j == m_key_usage.end() OR
      //           key_iter <= j->first

      for ( ; key_iter <= key_last ; ++key_iter ) {
        if ( j == m_key_usage.end() || key_iter < j->first ) {
          // The current attempt 'key_iter' is not used, contribute it.
          contrib_keys.push_back( key_iter );
        }
        else { // j != m_key_usage.end() && key_iter == j->first
          // The current attempt 'key_iter' is already used,
          // increment the used-iterator to its next key value.
          while ( j != m_key_usage.end() && key_iter == j->first ) {
            ++j ;
          }
        }
      }
    }

    // Determine which local keys will be contributed,
    // keeping what this process could use from the contribution.
    // This can reduce the subsequent communication load when
    // donating keys to another process.

    const size_t this_contrib = contrib_keys.size() - init_size ;

    // How many keys will this process keep:
    const size_t keep = std::min( requests_local[i] , this_contrib );

    // Take the kept keys from the contributed key vector.
    requested_keys.insert( requested_keys.end() ,
                           contrib_keys.end() - keep ,
                           contrib_keys.end() );

    contrib_keys.erase( contrib_keys.end() - keep ,
                        contrib_keys.end() );

    // New request is positive for needed keys or negative for donated keys
    new_request[i] = requests_local[i] - this_contrib ;
  }
}
804 
805 //----------------------------------------------------------------------
806 
// From the per-span request plan (positive: receive, negative: donate),
// compute how many keys this process donates to each other process per
// span.  my_donations is indexed [ proc * m_span_count + span ].
void DistributedIndex::generate_new_keys_global_planning(
  const std::vector<long> & new_request ,
  std::vector<long> & my_donations ) const
{
  my_donations.assign( m_comm_size * m_span_count , long(0) );

  // Gather the global request plan for receiving and donating keys
  // Positive values for receiving, negative values for donating.

  std::vector<long> new_request_global( m_comm_size * m_span_count );

#if defined( STK_HAS_MPI )

  if (m_comm_size > 1) { // Gather requests into per-process spans

    // There is a possible bug in MPI_Allgather, for Intel 12; use MPI_Gather instead
#if defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 1200)
    {
      // MPI doesn't do 'const' in its interface, but the send buffer is const
      void * send_buf = const_cast<void*>( (void *)( (new_request.empty() ? NULL : & new_request[0]) ));
      void * recv_buf = (new_request_global.empty() ? NULL : & new_request_global[0]) ;
      // Emulate MPI_Allgather with one MPI_Gather per root.
      for (int root = 0; root < m_comm_size; ++root)
      {
        MPI_Gather( send_buf , m_span_count , MPI_LONG ,
                    recv_buf , m_span_count , MPI_LONG , root, m_comm );
      }
    }
#else
    {
      // MPI doesn't do 'const' in its interface, but the send buffer is const
      void * send_buf = const_cast<void*>( (void *)( (new_request.empty() ? NULL : & new_request[0]) ));
      void * recv_buf = (new_request_global.empty() ? NULL : & new_request_global[0]) ;
      MPI_Allgather( send_buf , m_span_count , MPI_LONG ,
                     recv_buf , m_span_count , MPI_LONG , m_comm );
    }
#endif

  }
  else {
    new_request_global = new_request ;
  }
#else
  new_request_global = new_request ;
#endif

  // Now have the global receive & donate plan.
  //--------------------------------------------------------------------
  // Generate my donate plan from the global receive & donate plan.

  for ( unsigned i = 0 ; i < m_span_count ; ++i ) {

    if ( new_request[i] < 0 ) { // This process is donating on this span
      long my_total_donate = - new_request[i] ;

      long previous_donate = 0 ;

      // Count what previous processes have donated:
      for ( int p = 0 ; p < m_comm_rank ; ++p ) {
        const long new_request_p = new_request_global[ p * m_span_count + i ] ;
        if ( new_request_p < 0 ) {
          previous_donate -= new_request_p ;
        }
      }

      // What the donation count will be with my donation:
      long end_donate = previous_donate + my_total_donate ;

      long previous_receive = 0 ;

      // Determine my donation to other processes (one to many).
      // Receivers are filled in rank order; this process covers the
      // slice [previous_donate, end_donate) of the accumulated demand.

      for ( int p = 0 ; p < m_comm_size && 0 < my_total_donate ; ++p ) {

        const long new_request_p = new_request_global[ p * m_span_count + i ];

        if ( 0 < new_request_p ) { // Process 'p' receives keys

          // Accumulation of requests:

          previous_receive += new_request_p ;

          if ( previous_donate < previous_receive ) {
            // I am donating to process 'p'
            const long n = std::min( previous_receive , end_donate )
                           - previous_donate ;

            my_donations[ p * m_span_count + i ] = n ;
            previous_donate += n ;
            my_total_donate -= n ;
          }
        }
      }
    }
  }
}
902 
903 //--------------------------------------------------------------------
904 
  const std::vector<size_t> & requests ,
  std::vector< std::vector<KeyType> > & requested_keys )
{
  // Generate requests[i] previously-unused keys in span i for this
  // process, parallel-consistently: plan locally, plan donations
  // globally, then communicate donated keys.  Throws (on all processes)
  // if the input is invalid or a span would be exhausted.
  //--------------------------------------------------------------------
  // Develop the plan:

  std::vector<KeyType> global_key_upper_bound ;
  std::vector<long>    new_request ;
  std::vector<long>    my_donations ;
  std::vector<KeyType> contrib_keys ;
  std::vector<KeyType> new_keys ;

  // Verify input and generate global sum of
  // current key usage and requested new keys.
  // Throw a parallel consistent exception if the input is bad.

  generate_new_global_key_upper_bound( requests , global_key_upper_bound );

  // No exception thrown means all inputs are good and parallel consistent

  // Determine which local keys will be contributed,
  // keeping what this process could use from the contribution.
  // This can reduce the subsequent communication load when
  // donating keys to another process.

  generate_new_keys_local_planning( global_key_upper_bound ,
                                    requests ,
                                    new_request ,
                                    new_keys ,
                                    contrib_keys );

  // Determine where this process will be donating 'contrib_keys'
  generate_new_keys_global_planning( new_request, my_donations );

  // Due to using an upper bound as opposed to an exact maximum
  // the contrib_keys is likely to contain more keys than are needed.
  // Remove unneeded keys.

  // Backwards to erase from the end
  for ( size_t i = m_span_count ; 0 < i ; ) {
    --i ;
    size_t count = 0 ;
    for ( int p = 0 ; p < m_comm_size ; ++p ) {
      count += my_donations[ p * m_span_count + i ];
    }
    // [j_beg, j_end) is the slice of contrib_keys lying in span i.
    std::vector<KeyType>::iterator j_beg = contrib_keys.begin();
    std::vector<KeyType>::iterator j_end = contrib_keys.end();
    j_beg = std::lower_bound( j_beg , j_end , m_key_span[i].first );
    j_end = std::upper_bound( j_beg , j_end , m_key_span[i].second );
    const size_t n = std::distance( j_beg , j_end );
    if ( count < n ) {
      contrib_keys.erase( j_beg + count , j_end );
    }
  }

  // Plan is done, communicate the new keys.
  //--------------------------------------------------------------------
  // Put keys this process is keeping into the index.
  m_key_usage.reserve(m_key_usage.size() + new_keys.size());
  for ( std::vector<KeyType>::iterator i = new_keys.begin();
        i != new_keys.end() ; ++i ) {
    m_key_usage.push_back( KeyProc( *i , m_comm_rank ) );
  }

  //--------------------------------------------------------------------

  CommAll all( m_comm );

  // Sizing

  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    for ( int p = 0 ; p < m_comm_size ; ++p ) {
      const size_t n_to_p = my_donations[ p * m_span_count + i ];
      if ( 0 < n_to_p ) {
        all.send_buffer(p).skip<KeyType>( n_to_p );
      }
    }
  }

  all.allocate_buffers( m_comm_size / 4 , false );

  // Packing

  {
    size_t n = 0 ;
    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      for ( int p = 0 ; p < m_comm_size ; ++p ) {
        const size_t n_to_p = my_donations[ p * m_span_count + i ];
        if ( 0 < n_to_p ) {
          all.send_buffer(p).pack<KeyType>( & contrib_keys[n] , n_to_p );
          // Record the donated keys as used by the receiving process.
          for ( size_t k = 0 ; k < n_to_p ; ++k , ++n ) {
            m_key_usage.push_back( KeyProc( contrib_keys[n] , p ) );
          }
        }
      }
    }
  }

  std::sort( m_key_usage.begin() , m_key_usage.end() );

  all.communicate();

  // Unpacking: append keys donated to this process.
  unpack_recv_buffer( all, m_comm_size, new_keys);

  // Sort so spans can be filled sequentially below.
  stk_classic::util::radix_sort_unsigned((new_keys.empty() ? NULL : &new_keys[0]), new_keys.size());

  requested_keys.resize( m_span_count );

  {
    std::vector<KeyType>::iterator i_beg = new_keys.begin();
    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      std::vector<KeyType>::iterator i_end = i_beg + requests[i] ;
      requested_keys[i].assign( i_beg , i_end );
      i_beg = i_end ;
    }
  }
}
1024 
1025 //----------------------------------------------------------------------
1026 
} // namespace parallel
} // namespace stk_classic
1029 
1030 
void query_to_usage(const std::vector< KeyType > &keys, std::vector< KeyProc > &sharing_of_keys) const
Query which processors added the given keys. The results of the query are pushed to the processes on ...
void query(std::vector< KeyProc > &sharing_of_local_keys) const
Query with which process the local added keys are shared.
unsigned parallel_machine_rank(ParallelMachine parallel_machine)
Member function parallel_machine_rank ...
Definition: Parallel.cpp:29
unsigned parallel_machine_size(ParallelMachine parallel_machine)
Member function parallel_machine_size ...
Definition: Parallel.cpp:18
void generate_new_keys(const std::vector< size_t > &requests, std::vector< std::vector< KeyType > > &requested_keys)
Request a collection of unused keys.
Sierra Toolkit.
MPI_Comm ParallelMachine
Definition: Parallel.hpp:32
void update_keys(const std::vector< KeyType > &add_new_keys, const std::vector< KeyType > &remove_existing_keys)
Update a parallel index with new and changed keys. FIRST: Remove this process' participation in the e...
eastl::iterator_traits< InputIterator >::difference_type count(InputIterator first, InputIterator last, const T &value)