Sierra Toolkit  Version of the Day
ParallelInputStream.cpp
1 /*------------------------------------------------------------------------*/
2 /* Copyright 2010 Sandia Corporation. */
3 /* Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive */
4 /* license for use of this work by or on behalf of the U.S. Government. */
5 /* Export of this program may require a license from the */
6 /* United States Government. */
7 /*------------------------------------------------------------------------*/
8 
9 #include <cstdio>
10 #include <stdexcept>
11 #include <stk_util/parallel/ParallelInputStream.hpp>
12 
13 /*--------------------------------------------------------------------*/
14 
15 namespace stk_classic {
16 namespace {
17 
18 #if defined( STK_HAS_MPI )
19 
20 void broadcast( ParallelMachine comm , void * buf , int n )
21 { MPI_Bcast( buf , n , MPI_BYTE , 0 , comm ); }
22 
23 #else
24 
25 void broadcast( ParallelMachine , void * , int ) {}
26 
27 #endif
28 
29 }
30 }
31 
32 /*--------------------------------------------------------------------*/
33 
34 namespace stk_classic {
35 namespace {
36 
37 //----------------------------------------------------------------------
38 
39 class ParInBuf : public std::streambuf {
40 public:
41  enum { BUFFER_LENGTH = 0x010000 /* 64k bytes */ };
42  enum { BUFFER_PUTBACK = 0x000010 /* 16 bytes */ };
43  enum { MAX_READ = BUFFER_LENGTH - BUFFER_PUTBACK };
44  ParInBuf( ParallelMachine , const char * const );
45  virtual ~ParInBuf();
46 
47 protected:
48  virtual int underflow(); // refill the input buffer
49  virtual int overflow( int c = EOF ); // Not called
50  virtual int sync(); // No-op
51  virtual std::streambuf * setbuf( char * , std::streamsize ); // No-op
52 
53 private:
54  void close();
55 
56  ParallelMachine m_comm ;
58  std::FILE * m_root_fp ;
59  char m_buffer[ BUFFER_LENGTH ];
60 };
61 
62 ParInBuf::ParInBuf( ParallelMachine comm , const char * const file_name )
63  : m_comm( comm ), m_root_fp( NULL )
64 {
65  int result = 1 ;
66 
67  if ( 0 == parallel_machine_rank( comm ) && NULL != file_name ) {
68  result = NULL != ( m_root_fp = std::fopen( file_name , "r" ) );
69  }
70 
71  broadcast( m_comm , & result , sizeof(int) );
72 
73  if ( ! result ) {
74  std::string msg;
75  msg.append("stk_classic::ParallelInputStream( " );
76  if ( 0 == parallel_machine_rank( comm ) && NULL != file_name ) {
77  msg.append( file_name );
78  }
79  else {
80  msg.append( "<NULL>" );
81  }
82  msg.append( " ) FAILED" );
83  throw std::runtime_error(msg);
84  }
85 }
86 
87 void ParInBuf::close()
88 {
89  if ( NULL != m_root_fp ) { std::fclose( m_root_fp ); m_root_fp = NULL ; }
90  setg(NULL,NULL,NULL);
91 }
92 
93 ParInBuf::~ParInBuf()
94 { close(); }
95 
96 int ParInBuf::underflow()
97 {
98  char * const buf = m_buffer + BUFFER_PUTBACK ;
99  int nread = 0 ;
100 
101  if ( gptr() == NULL || egptr() <= gptr() ) {
102  if ( NULL != m_root_fp ) { nread = std::fread(buf,1,MAX_READ,m_root_fp); }
103  broadcast( m_comm , & nread , sizeof(int) );
104  }
105 
106  if ( 0 < nread ) {
107  broadcast( m_comm , buf , nread );
108  setg( m_buffer , buf , buf + nread );
109  }
110  else {
111  close();
112  }
113 
114  return 0 < nread ? *buf : EOF ;
115 }
116 
117 namespace {
118 
119 void throw_overflow()
120 {
121  std::string msg ;
122  msg.append("stk_classic::ParallelInputStream::overflow CALL IS ERRONEOUS" );
123  throw std::runtime_error(msg);
124 }
125 
126 }
127 
128 int ParInBuf::overflow( int )
129 { throw_overflow(); return EOF ; }
130 
131 int ParInBuf::sync()
132 { return 0 ; }
133 
134 std::streambuf * ParInBuf::setbuf( char * , std::streamsize )
135 {
136  return this ;
137 }
138 
139 //----------------------------------------------------------------------
140 
141 } // namespace
142 
143 
144 ParallelInputStream::ParallelInputStream(
145  ParallelMachine comm ,
146  const char * const file_name )
147  : std::istream( new ParInBuf( comm , file_name ) )
148 {}
149 
150 ParallelInputStream::~ParallelInputStream()
151 { delete rdbuf(); }
152 
153 } // namespace stk_classic
154 
155 
unsigned parallel_machine_rank(ParallelMachine parallel_machine)
Member function parallel_machine_rank ...
Definition: Parallel.cpp:29
Sierra Toolkit.
MPI_Comm ParallelMachine
Definition: Parallel.hpp:32