Class to manage sets of io chains.
#include <llpumpio.h>
Classes | |
struct | LLLinkInfo |
Struct to associate a pipe with its buffer io indexes. | |
Public Types | |
typedef std::vector < LLIOPipe::ptr_t > | chain_t |
Typedef for having a chain of pipes. | |
enum | EControl |
Enumeration to send commands to the pump. | |
typedef std::vector< LLLinkInfo > | links_t |
Typedef for having a chain of LLLinkInfo instances. | |
Public Member Functions | |
bool | addChain (const links_t &links, LLIOPipe::buffer_ptr_t data, LLSD context, F32 timeout) |
Add a chain to this pump and process in the next cycle. | |
bool | addChain (const chain_t &chain, F32 timeout) |
Add a chain to this pump and process in the next cycle. | |
void | adjustTimeoutSeconds (F32 delta) |
Adjust the timeout of the running chain. | |
void | callback () |
Run through the callback queue and call process(). | |
void | clearLock (S32 key) |
Clears the identified lock. | |
void | control (EControl op) |
Send a command to the pump. | |
bool | copyCurrentLinkInfo (links_t &links) const |
Copy the currently running chain link info. | |
bool | prime (apr_pool_t *pool) |
Prepare this pump for usage. | |
void | pump (const S32 &poll_timeout) |
Call this method to call process on all running chains. | |
bool | respond (const links_t &links, LLIOPipe::buffer_ptr_t data, LLSD context) |
Add a chain to a special queue which will be called during the next call to callback() and then dropped from the queue. | |
bool | respond (LLIOPipe *pipe) |
Add a chain to a special queue which will be called during the next call to callback() and then dropped from the queue. | |
running_chains_t::size_type | runningChains () const |
Return number of running chains. | |
bool | setConditional (LLIOPipe *pipe, const apr_pollfd_t *poll) |
Set up file descriptors for the running chain. | |
S32 | setLock () |
Lock the current chain. | |
bool | setTimeoutSeconds (F32 timeout) |
Set or clear a timeout for the running chain. | |
bool | sleepChain (F64 seconds) |
Stop processing a chain for a while. | |
~LLPumpIO () | |
Destructor. | |
Protected Types | |
enum | EState |
State of the pump. | |
Protected Member Functions | |
bool | handleChainError (LLChainInfo &chain, LLIOPipe::EStatus error) |
Rewind through the chain to try to recover from an error. | |
void | processChain (LLChainInfo &chain) |
Process the chain passed in. | |
void | rebuildPollset () |
Given the internal state of the chains, rebuild the pollset. |
Class to manage sets of io chains.
The pump class provides a thread abstraction for IO-based communication between two threads in a way that is structured and optimized for processor time. The primary usage is to create a pump, call pump() on a thread used for IO, and call respond() on a thread that is expected to do higher-level processing. You can call almost any other method from any thread; see the notes for each method for details. For the threading abstraction to work, you need to call prime() with a valid apr pool.
A pump instance manages much of the state for the pipe, including the list of pipes in the chain, the channel for each element in the chain, the buffer, and whether any pipe has marked the stream or process as done. Pipes can also set file-descriptor-based conditional statements so that calls to process do not happen until data is ready to be read or written. Pipes control execution of calls to process by returning a status code such as STATUS_OK or STATUS_BREAK.
One way to conceptualize the way IO will work is that a pump combines the unit processing of pipes to behave like file pipes on the unix command line.
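The command-line analogy can be illustrated with a small standalone model. This is only a sketch: ToyStatus, ToyPipe, runToyChain and toyDemo are hypothetical names that do not appear in llpumpio.h, and treating BREAK as "stop for this cycle and retry the same pipe later" is an assumption about the status semantics, not something this page states.

```cpp
#include <cctype>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins; none of these names come from llpumpio.h.
enum class ToyStatus { OK, BREAK };

// A toy pipe transforms the shared buffer in place and reports a status.
using ToyPipe = std::function<ToyStatus(std::string& buffer)>;

// Run each pipe in order, like `a | b | c` on a unix command line.
// Returns the index of the pipe to resume from; chain.size() means the
// whole chain ran to the end.
std::size_t runToyChain(const std::vector<ToyPipe>& chain, std::string& buffer) {
    std::size_t head = 0;
    while (head < chain.size()) {
        // Assumption: BREAK stops this cycle without advancing the head.
        if (chain[head](buffer) == ToyStatus::BREAK) break;
        ++head;
    }
    return head;
}

// Builds a three-pipe chain (produce, upper-case, append) and runs it once.
std::string toyDemo() {
    std::vector<ToyPipe> chain = {
        [](std::string& b) { b += "hello"; return ToyStatus::OK; },
        [](std::string& b) {
            for (char& c : b) {
                c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
            }
            return ToyStatus::OK;
        },
        [](std::string& b) { b += "!"; return ToyStatus::OK; },
    };
    std::string buffer;
    runToyChain(chain, buffer);
    return buffer;
}
```

Each toy pipe sees only the shared buffer, which mirrors the idea that the pump, not the pipe, owns the chain state.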
bool LLPumpIO::addChain | ( | const links_t & | links, | |
LLIOPipe::buffer_ptr_t | data, | |||
LLSD | context, | |||
F32 | timeout | |||
) |
Add a chain to this pump and process in the next cycle.
This method provides a slightly more sophisticated method for adding a chain where the caller can specify which link elements are on what channels. This method will fail if no buffer is provided since any calls to generate new channels for the buffers will cause unpredictable interleaving of data.
links | The pipes and io indexes for the chain | |
data | Shared pointer to data buffer | |
context | Potentially undefined context meta-data for chain. | |
timeout | The number of seconds in the future to expire. Pass in 0.0f to never expire. |
bool LLPumpIO::addChain | ( | const chain_t & | chain, | |
F32 | timeout | |||
) |
Add a chain to this pump and process in the next cycle.
This method will automatically generate a buffer and assign each link in the chain as if it were the consumer to the previous.
chain | The pipes for the chain | |
timeout | The number of seconds in the future to expire. Pass in 0.0f to never expire. |
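The phrase "as if it were the consumer to the previous" can be pictured with a toy channel assignment. This is a sketch under assumptions: ToyLink and assignToyChannels are hypothetical, and the actual channel numbering used by the pump is not stated on this page.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical model: a link reads from channel `in` and writes to `out`.
struct ToyLink {
    int in;
    int out;
};

// Assign channels so that each link consumes what the previous one produced.
std::vector<ToyLink> assignToyChannels(std::size_t pipe_count) {
    std::vector<ToyLink> links;
    links.reserve(pipe_count);
    for (std::size_t i = 0; i < pipe_count; ++i) {
        const int in = static_cast<int>(i);
        links.push_back(ToyLink{in, in + 1});  // out of link i == in of link i+1
    }
    return links;
}
```

The links_t overload of addChain() exists for callers who need a layout other than this simple linear one.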
void LLPumpIO::adjustTimeoutSeconds | ( | F32 | delta | ) |
Adjust the timeout of the running chain.
This method has no effect if there is no timeout on the chain.
delta | The number of seconds to add to/remove from the timeout. |
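The timeout rules described for addChain(), setTimeoutSeconds() and adjustTimeoutSeconds() can be summarized in a small model. ToyTimeout is a hypothetical type written for illustration; storing an absolute expiry time is an assumption about the implementation.

```cpp
// Hypothetical model of a chain timeout; not part of llpumpio.h.
struct ToyTimeout {
    bool has_timeout = false;
    double expires_at = 0.0;  // absolute time in seconds, valid if has_timeout

    // Passing 0.0f clears the timeout, so the chain never expires.
    void set(double now, float timeout) {
        has_timeout = timeout > 0.0f;
        expires_at = has_timeout ? now + timeout : 0.0;
    }

    // No effect when the chain has no timeout.
    void adjust(float delta) {
        if (has_timeout) expires_at += delta;
    }

    bool expired(double now) const {
        return has_timeout && now >= expires_at;
    }
};
```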
void LLPumpIO::callback | ( | ) |
Run through the callback queue and call process().
This call will process all pending responses and call process on each. This method will then drop all processed callback requests, which may lead to deleting the referenced objects.
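The process-once-then-drop behaviour of the callback queue can be sketched as follows. ToyCallbackQueue is hypothetical and single-threaded; the real pump is described as providing thread locking once prime() has been called, which this sketch does not model.

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical model of a respond()/callback() queue; not the real class.
class ToyCallbackQueue {
public:
    // Queue a unit of work for the next callback() call.
    void respond(std::function<void()> work) {
        pending_.push_back(std::move(work));
    }

    // Run every pending entry exactly once, then drop it.
    // Returns the number of entries processed.
    std::size_t callback() {
        std::vector<std::function<void()>> batch;
        batch.swap(pending_);  // entries queued while running wait for the next call
        for (auto& work : batch) work();
        return batch.size();
    }  // batch is destroyed here, releasing anything the entries captured

private:
    std::vector<std::function<void()>> pending_;
};
```

Destroying the batch at the end of callback() is what "may lead to deleting the referenced objects" looks like in this model: any shared state held only by a queued entry goes away with it.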
void LLPumpIO::clearLock | ( | S32 | key | ) |
Clears the identified lock.
key | The lock identifier returned by setLock() |
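The pairing of setLock() and clearLock() can be sketched with a toy lock table. ToyChainLocks is hypothetical; allowing several outstanding keys at once is an assumption, while returning a nonzero key mirrors the "0 on failure" convention documented for setLock().

```cpp
#include <set>

// Hypothetical model of chain locking; not part of llpumpio.h.
class ToyChainLocks {
public:
    // Returns a nonzero key identifying the new lock.
    int setLock() {
        const int key = ++next_key_;
        held_.insert(key);
        return key;
    }

    // Releases the lock identified by key; unknown keys are ignored.
    void clearLock(int key) { held_.erase(key); }

    // The toy pump would skip process() while any lock is held.
    bool canProcess() const { return held_.empty(); }

private:
    int next_key_ = 0;
    std::set<int> held_;
};
```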
void LLPumpIO::control | ( | LLPumpIO::EControl | op | ) |
Send a command to the pump.
op | What control to send to the pump. |
bool LLPumpIO::copyCurrentLinkInfo | ( | links_t & | links | ) | const |
Copy the currently running chain link info.
*FIX: Given the structure of the pump and pipe relationship, this should probably go through a different mechanism than the pump. I think it would be best if the pipe had some kind of controller which was passed into process() rather than the pump which exposed this interface.
links | A container for the links which will be appended |
bool LLPumpIO::handleChainError | ( | LLChainInfo & | chain, | |
LLIOPipe::EStatus | error | |||
) | [protected] |
Rewind through the chain to try to recover from an error.
This method will potentially modify the internals of the chain.
chain | The LLChainInfo object to work on. |
bool LLPumpIO::prime | ( | apr_pool_t * | pool | ) |
Prepare this pump for usage.
If you fail to call this method prior to use, the pump will try to work, but will not come with any thread locking mechanisms.
pool | The apr pool to use. |
void LLPumpIO::processChain | ( | LLChainInfo & | chain | ) | [protected] |
Process the chain passed in.
This method will potentially modify the internals of the chain. On end, the chain.mHead will equal chain.mChainLinks.end().
chain | The LLChainInfo object to work on. |
void LLPumpIO::pump | ( | const S32 & | poll_timeout | ) |
Call this method to call process on all running chains.
This method iterates through the running chains. If all pipes on a chain are unconditionally ready, or if any pipe has a conditional processing condition, then process will be called on every chain which has requested processing. If that chain has a file descriptor ready, process() will be called for all pipes which have requested it.
void LLPumpIO::rebuildPollset | ( | ) | [protected] |
Given the internal state of the chains, rebuild the pollset.
bool LLPumpIO::respond | ( | const links_t & | links, | |
LLIOPipe::buffer_ptr_t | data, | |||
LLSD | context | |||
) |
Add a chain to a special queue which will be called during the next call to callback() and then dropped from the queue.
It is important to remember that you should not add a data buffer or context which may still be in another chain, since that will almost certainly lead to problems. Ensure that you are done reading and writing to those parameters, or pass newly generated or empty pointers.
links | The pipes and io indexes for the chain | |
data | Shared pointer to data buffer | |
context | Potentially undefined context meta-data for chain. |
bool LLPumpIO::respond | ( | LLIOPipe * | pipe | ) |
Add pipe to a special queue which will be called during the next call to callback() and then dropped from the queue.
This call will add a single pipe, with no buffer, context, or channel information to the callback queue. It will be called once, and then dropped.
pipe | A single io pipe which will be called |
running_chains_t::size_type LLPumpIO::runningChains | ( | ) | const [inline] |
Return number of running chains.
*NOTE: This is only used in debugging and not considered efficient or safe enough for production use.
bool LLPumpIO::setConditional | ( | LLIOPipe * | pipe, | |
const apr_pollfd_t * | poll | |||
) |
Set up file descriptors for the running chain.
There is currently a limit of one conditional per pipe.
*NOTE: The internal mechanism for building a pollset based on pipe/pollfd/chain generates an epoll error on linux (and probably behaves similarly on other platforms) because the pollset rebuilder will add each apr_pollfd_t serially. This does not matter for pipes on the same chain, since any signalled pipe will eventually invoke a call to process(), but is a problem if the same apr_pollfd_t is on different chains. Once we have more than just network i/o on the pump, this might matter.
*FIX: Given the structure of the pump and pipe relationship, this should probably go through a different mechanism than the pump. I think it would be best if the pipe had some kind of controller which was passed into process() rather than the pump which exposed this interface.
pipe | The pipe which is setting a conditional | |
poll | The entire socket and read/write condition - null to remove |
S32 LLPumpIO::setLock | ( | ) |
Lock the current chain.
This locks the currently running chain so that no more calls to process() occur until you call clearLock() with the lock identifier.
*FIX: Given the structure of the pump and pipe relationship, this should probably go through a different mechanism than the pump. I think it would be best if the pipe had some kind of controller which was passed into process() rather than the pump which exposed this interface.
Returns the lock identifier to be used in clearLock(), or 0 on failure.
bool LLPumpIO::setTimeoutSeconds | ( | F32 | timeout | ) |
Set or clear a timeout for the running chain.
timeout | The number of seconds in the future to expire. Pass in 0.0f to never expire. |
bool LLPumpIO::sleepChain | ( | F64 | seconds | ) |
Stop processing a chain for a while.
This method will not update the timeout for this chain, so it is possible to sleep the chain until it is collected by the pump during a timeout cleanup.
seconds | The number of seconds in the future to resume processing. |
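The interaction between sleeping and timeouts noted above can be modelled in a few lines. ToySleepingChain and its Action enum are hypothetical, and checking expiry before the wake time is an assumption about how a pump might order the two tests.

```cpp
// Hypothetical model of a sleeping chain with a timeout; not the real class.
struct ToySleepingChain {
    enum class Action { Process, Skip, Collect };

    double expires_at;     // absolute expiry time in seconds; 0 means never
    double wake_at = 0.0;  // absolute time at which processing resumes

    explicit ToySleepingChain(double expiry) : expires_at(expiry) {}

    // Sleeping moves the wake time only; the expiry is left untouched.
    void sleep(double now, double seconds) { wake_at = now + seconds; }

    // What a toy pump would do with this chain at time `now`.
    Action poll(double now) const {
        if (expires_at > 0.0 && now >= expires_at) return Action::Collect;
        if (now < wake_at) return Action::Skip;
        return Action::Process;
    }
};
```

A chain that expires at t=10 and sleeps until t=30 is collected at t=10 without ever waking, which is the case the description warns about.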