无栈协程

Posted on March 17, 2013

在开始之前,先来看一段代码 

void pop3::operator() ( const boost::system::error_code& ec, std::size_t length )
{
	using namespace boost::asio;

	ip::tcp::endpoint endpoint;
	std::string		status;
	std::string		maillength;
	std::istream	inbuffer ( m_streambuf.get() );
	std::string		msg;

	reenter ( this ) {
restart:
		m_socket.reset( new ip::tcp::socket(io_service) );

		do {
#ifndef DEBUG
			// 延时 60s
			_yield ::boost::delayedcallsec( io_service, 60, boost::bind(*this, ec, 0) );
#endif

			// dns 解析并连接.
 			_yield boost::async_avconnect(
 				boost::proxychain(io_service).add_proxy()(boost::proxy_tcp(*m_socket, ip::tcp::resolver::query(m_mailserver, "110"))),
 				*this);

			// 失败了延时 10s
			if ( ec )
				_yield ::boost::delayedcallsec ( io_service, 10, boost::bind(*this, ec, 0) );
		} while ( ec ); // 尝试到连接成功为止!

		// 好了,连接上了.
		m_streambuf.reset ( new streambuf );
		// "+OK QQMail POP3 Server v1.0 Service Ready(QQMail v2.0)"
		_yield	async_read_until ( *m_socket, *m_streambuf, "\n", *this );
		inbuffer >> status;

		if ( status != "+OK" ) {
			// 失败,重试.
			goto restart;
		}

		// 发送用户名.
		_yield m_socket->async_write_some ( buffer ( std::string ( "user " ) + m_mailaddr + "\n" ), *this );
		if(ec) goto restart;
		// 接受返回状态.
		m_streambuf.reset ( new streambuf );
		_yield	async_read_until ( *m_socket, *m_streambuf, "\n", *this );
		inbuffer >> status;

		// 解析是不是 OK.
		if ( status != "+OK" ) {
			// 失败,重试.
			goto restart;
		}

		// 发送密码.
		_yield m_socket->async_write_some ( buffer ( std::string ( "pass " ) + m_passwd + "\n" ), *this );
		// 接受返回状态.
		m_streambuf.reset ( new streambuf );
		_yield	async_read_until ( *m_socket, *m_streambuf, "\n", *this );
		inbuffer >> status;

		// 解析是不是 OK.
		if ( status != "+OK" ) {
			// 失败,重试.
			goto restart;
		}

		// 完成登录. 开始接收邮件.

		// 发送 list 命令.
		_yield m_socket->async_write_some ( buffer ( std::string ( "list\n" ) ), *this );
		// 接受返回的邮件.
		m_streambuf.reset ( new streambuf );
		_yield	async_read_until ( *m_socket, *m_streambuf, "\n", *this );
		inbuffer >> status;

		// 解析是不是 OK.
		if ( status != "+OK" ) {
			// 失败,重试.
			goto restart;
		}

		// 开始进入循环处理邮件.
		maillist.clear();
		_yield	m_socket->async_read_some ( m_streambuf->prepare ( 8192 ), *this );
		m_streambuf->commit ( length );

		while ( status != "." ) {
			maillength.clear();
			status.clear();
			inbuffer >> status;
			inbuffer >> maillength;

			// 把邮件的编号push到容器里.
			if ( maillength.length() )
				maillist.push_back ( status );

			if ( inbuffer.eof() && status != "." )
				_yield	m_socket->async_read_some ( m_streambuf->prepare ( 8192 ), *this );
		}

		// 获取邮件.
		while ( !maillist.empty() ) {
			// 发送 retr #number 命令.
			msg = boost::str ( boost::format ( "retr %s\r\n" ) %  maillist[0] );
			_yield m_socket->async_write_some ( buffer ( msg ), *this );
			// 获得 +OK
			m_streambuf.reset ( new streambuf );
			_yield	async_read_until ( *m_socket, *m_streambuf, "\n", *this );
			inbuffer >> status;

			// 解析是不是 OK.
			if ( status != "+OK" ) {
				// 失败,重试.
				goto restart;
			}

			// 获取邮件内容,邮件一单行的 . 结束.
			_yield	async_read_until ( *m_socket, *m_streambuf, "\r\n.\r\n", *this );
			// 然后将邮件内容给处理.
			process_mail ( inbuffer );
			// 删除邮件啦.
			msg = boost::str ( boost::format ( "dele %s\r\n" ) %  maillist[0] );
			_yield m_socket->async_write_some ( buffer ( msg ), *this );

			maillist.erase ( maillist.begin() );
			// 获得 +OK
			m_streambuf.reset ( new streambuf );
			_yield	async_read_until ( *m_socket, *m_streambuf, "\n", *this );
			inbuffer >> status;

			// 解析是不是 OK.
			if ( status != "+OK" ) {
				// 失败,但是并不是啥大问题.
				std::cout << "deleting mail failed" << std::endl;
				// but 如果是连接出问题那还是要重启的.
				if(ec) goto restart;
			}
		}

		// 处理完毕.
		_yield async_write ( *m_socket, buffer ( "quit\n" ), *this );
		_yield ::boost::delayedcallsec ( io_service, 1, boost::bind ( *this, ec, 0 ) );
		if(m_socket->is_open())
			m_socket->shutdown ( ip::tcp::socket::shutdown_both );
		_yield ::boost::delayedcallsec ( io_service, 1, boost::bind ( *this, ec, 0 ) );
		m_socket.reset();
		std::cout << "邮件处理完毕" << std::endl;
		_yield ::boost::delayedcallsec ( io_service, 30, boost::bind ( *this, ec, 0 ) );
		goto restart;
	}
}

这个代码,乍一看就是同步代码嘛!而事实上它是异步的

在这个代码里,使用了 _yield 前缀再配合 async_* 异步函数,使用异步实现了同步的pop3登录算法。

这个神奇的代码,神奇之处就是 reenter(this) 和 _yield。这2个地方就是实现的全部的关键。

我在群课程里有简单的提到过协程,有兴趣的可以到 avplayer社区讲座:协程 围观。

Comments