30 #include <boost/thread.hpp>
31 #include "wsframe.hpp"
33 #include "wshandler.hpp"
35 #ifndef HAVE_BOOST_LOCK_GUARD
49 MutexHelper(pthread_mutex_t *mutex,
bool locknow =
true) :
50 m_pMutex(mutex), m_bLocked(false)
62 pthread_mutex_unlock(m_pMutex);
70 pthread_mutex_lock(m_pMutex);
80 pthread_mutex_unlock(m_pMutex);
83 pthread_mutex_t *m_pMutex;
110 : m_rng(simple_rng())
111 , m_parser(frame::parser<simple_rng>(m_rng))
112 , m_state(session::state::OPEN)
116 #ifndef HAVE_BOOST_LOCK_GUARD
117 pthread_mutexattr_t mattr;
118 pthread_mutexattr_init(&mattr);
119 pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_RECURSIVE);
120 pthread_mutex_init(&m_lock, &mattr);
121 pthread_mutexattr_destroy(&mattr);
123 m_handler->m_endpoint =
this;
126 #ifndef HAVE_BOOST_LOCK_GUARD
127 ~
wsendpoint() { pthread_mutex_destroy(&m_lock); }
143 std::istringstream s(data);
144 while (m_state != session::state::CLOSED && s.rdbuf()->in_avail()) {
147 if (m_parser.
ready()) {
156 if (m_parser.
ready()) {
160 case tracing::wserror::PROTOCOL_VIOLATION:
161 send_close(close::status::PROTOCOL_ERROR,e.
what());
163 case tracing::wserror::PAYLOAD_VIOLATION:
164 send_close(close::status::INVALID_PAYLOAD,e.
what());
166 case tracing::wserror::INTERNAL_ENDPOINT_ERROR:
167 send_close(close::status::INTERNAL_ENDPOINT_ERROR,e.
what());
169 case tracing::wserror::SOFT_ERROR:
171 case tracing::wserror::MESSAGE_TOO_BIG:
172 send_close(close::status::MESSAGE_TOO_BIG,e.
what());
174 case tracing::wserror::OUT_OF_MESSAGES:
184 <<
"Dropping TCP due to unrecoverable exception: " << e.
code()
185 <<
" (" << e.
what() <<
")" << std::endl;
200 void send(
const std::string& payload, frame::opcode::value op) {
201 #ifdef HAVE_BOOST_LOCK_GUARD
202 boost::lock_guard<boost::recursive_mutex> lock(m_lock);
207 if (m_state != session::state::OPEN) {
208 log::err <<
"send: enpoint-state not OPEN";
211 frame::parser<simple_rng> control(m_rng);
212 control.set_opcode(op);
213 control.set_fin(
true);
214 control.set_masked(
false);
215 control.set_payload(payload);
217 std::string tmp(control.get_header_str());
218 tmp.append(control.get_payload_str());
219 m_handler->do_response(tmp);
223 void process_data() {
239 #ifdef HAVE_BOOST_LOCK_GUARD
240 boost::lock_guard<boost::recursive_mutex> lock(m_lock);
245 if (m_state == session::state::CLOSED) {
return;}
247 m_state = session::state::CLOSED;
248 m_handler->on_close();
263 void pong(
const std::vector<unsigned char> & payload) {
264 #ifdef HAVE_BOOST_LOCK_GUARD
265 boost::lock_guard<boost::recursive_mutex> lock(m_lock);
270 if (m_state != session::state::OPEN) {
return;}
274 frame::parser<simple_rng> control(m_rng);
275 control.set_opcode(frame::opcode::PONG);
276 control.set_fin(
true);
277 control.set_masked(
false);
278 control.set_payload(payload);
280 std::string tmp(control.get_header_str());
281 tmp.append(control.get_payload_str());
282 m_handler->do_response(tmp);
297 void send_close(close::status::value code,
const std::string& reason) {
298 #ifdef HAVE_BOOST_LOCK_GUARD
299 boost::lock_guard<boost::recursive_mutex> lock(m_lock);
304 if (m_state != session::state::OPEN) {
305 log::err <<
"Tried to disconnect a session that wasn't open" << std::endl;
309 if (close::status::invalid(code)) {
310 log::err <<
"Tried to close a connection with invalid close code: "
311 << code << std::endl;
313 }
else if (close::status::reserved(code)) {
314 log::err <<
"Tried to close a connection with reserved close code: "
315 << code << std::endl;
319 m_state = session::state::CLOSING;
321 frame::parser<simple_rng> control(m_rng);
322 control.set_opcode(frame::opcode::CLOSE);
323 control.set_fin(
true);
324 control.set_masked(
false);
325 if (code != close::status::NO_STATUS) {
326 const uint16_t payload = htons(code);
327 std::string pl(reinterpret_cast<const char*>(&payload), 2);
329 control.set_payload(pl);
332 std::string tmp(control.get_header_str());
333 tmp.append(control.get_payload_str());
334 m_handler->do_response(tmp);
343 void send_close_ack(close::status::value remote_close_code, std::string remote_close_reason) {
344 close::status::value local_close_code;
345 std::string local_close_reason;
347 if (remote_close_code == close::status::NO_STATUS) {
348 local_close_code = close::status::NORMAL;
349 local_close_reason =
"";
350 }
else if (remote_close_code == close::status::ABNORMAL_CLOSE) {
354 throw "shouldn't be here";
355 }
else if (close::status::invalid(remote_close_code)) {
357 local_close_code = close::status::PROTOCOL_ERROR;
358 local_close_reason =
"Status code is invalid";
359 }
else if (close::status::reserved(remote_close_code)) {
361 local_close_code = close::status::PROTOCOL_ERROR;
362 local_close_reason =
"Status code is reserved";
364 local_close_code = remote_close_code;
365 local_close_reason = remote_close_reason;
373 frame::parser<simple_rng> control(m_rng);
374 control.set_opcode(frame::opcode::CLOSE);
375 control.set_fin(
true);
376 control.set_masked(
false);
377 if (local_close_code != close::status::NO_STATUS) {
378 const uint16_t payload = htons(local_close_code);
379 std::string pl(reinterpret_cast<const char*>(&payload), 2);
380 pl.append(local_close_reason);
381 control.set_payload(pl);
384 std::string tmp(control.get_header_str());
385 tmp.append(control.get_payload_str());
386 m_handler->do_response(tmp);
390 void process_control() {
392 case frame::opcode::PING:
397 case frame::opcode::PONG:
400 case frame::opcode::CLOSE:
402 if (m_state == session::state::OPEN) {
404 log::debug <<
"sending close ack" << std::endl;
408 }
else if (m_state == session::state::CLOSING) {
410 log::debug <<
"got close ack" << std::endl;
416 tracing::wserror::PROTOCOL_VIOLATION);
423 frame::parser<simple_rng> m_parser;
424 session::state::value m_state;
425 #ifdef HAVE_BOOST_LOCK_GUARD
426 mutable boost::recursive_mutex m_lock;
428 mutable pthread_mutex_t m_lock;
430 wshandler *m_handler;
435 m_endpoint->
send(data, frame::opcode::TEXT);
440 m_endpoint->
send(data, frame::opcode::BINARY);