21_Additional_Topics
1 . Additional Topics: Learning Goals. Become exposed to certain topics so you've at least heard of them and can investigate further: Message Queues, Latches/Barriers, Promises/Futures, Thread Pools, Monitors, Active Objects, Remote Procedure Calls, CORBA. © Paul Davies, C. Antonio Sanchez. Not to be copied, used, or revised without explicit written permission from the copyright owner.
2 . Inter-Process Communication. So far we've seen: Shared memory: a block of memory mapped into multiple processes so they can access it simultaneously. Pipes: a queue of bytes that can be sent between processes. Sockets: a queue of bytes that can be sent across a network. Now we briefly introduce: Messages and Message Queues.
3 . Windows Messages. Operation of the Windows OS is largely based on Events. Every time a mouse button is pressed, an event is triggered. Every time a keyboard key is pressed, an event is triggered. Every time a count-down timer reaches zero, an event is triggered. Every time a new hardware device is plugged in, an event is triggered. With each event, a Message is broadcast by the system. Windows communicates with an application by sending these event messages to it. Applications can also send messages to each other, or to specific threads. A process or thread can check for a message, and any data associated with it. https://msdn.microsoft.com/en-us/library/windows/desktop/ms632590(v=vs.85).aspx
4 . Windows Message Queues. Messages are stored in a queue and are identified by a numeric identifier. When reading messages, we can filter for IDs in a given range. The current thread can wait for the next message within the range and remove it from the queue. Reads are sequential, similar to a FIFO, except that the applied filter can be used to ignore or postpone reading of certain messages, allowing some message-prioritization functionality. Writing to message queues never blocks or suspends (i.e. the queue dynamically grows to accommodate the message).
5 . Windows Messages. A Windows message consists of a target window or thread, a numeric message identifier, and two parameters (wParam and lParam). Key API functions:
BOOL PostMessage(HWND hWnd, UINT Msg, WPARAM wParam, LPARAM lParam): posts a message to a specific window, or broadcasts to all top-level windows in the system.
BOOL PostThreadMessage(DWORD idThread, UINT Msg, WPARAM wParam, LPARAM lParam): posts a message to the specified thread.
BOOL PeekMessage(LPMSG lpMsg, HWND hWnd, UINT wMsgFilterMin, UINT wMsgFilterMax, UINT wRemoveMsg): reads the next message with ID in a given filter range if one exists, without removing it (unless explicitly requested).
BOOL GetMessage(LPMSG lpMsg, HWND hWnd, UINT wMsgFilterMin, UINT wMsgFilterMax): waits for and reads the next message with ID in a given filter range, removing it from the queue.
BOOL WaitMessage(): waits until there is at least one message in the queue.
6 . Windows Messages. The message 1000 could ask the recipient thread to turn ON a light bulb in the living room. The message 1001 could mean turn OFF the light bulb. The message values 1000-1009 could be interpreted by an elevator as a request to GO to any of the floors 0-9. The message 4000 sent to a process/thread could mean 'terminate yourself'. The precise meaning of these numeric messages is up to the application, usually interpreted as a request or command. The sender cannot force the receiver to read the message, only hope for a response.
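For illustration, a minimal sketch of posting application-defined commands to a worker thread and reading them with GetMessage. The message IDs, the light-bulb semantics, and the worker/startup handshake are assumptions for the example, not part of the slides; it also assumes a Windows build where std::thread is available.

#include <windows.h>
#include <atomic>
#include <cstdio>
#include <thread>

// hypothetical application-defined message IDs (WM_APP is the documented base for private messages)
const UINT MSG_LIGHT_ON  = WM_APP + 0;
const UINT MSG_LIGHT_OFF = WM_APP + 1;

std::atomic<DWORD> worker_id{0};

void worker() {
    MSG msg;
    // force creation of this thread's message queue before publishing our thread ID
    PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
    worker_id = GetCurrentThreadId();
    // wait for and remove the next thread message (hWnd == NULL retrieves thread messages)
    while (GetMessage(&msg, NULL, 0, 0) > 0) {
        if (msg.message == MSG_LIGHT_ON)  { printf("light ON\n"); }
        if (msg.message == MSG_LIGHT_OFF) { printf("light OFF\n"); break; }
    }
}

int main() {
    std::thread t(worker);
    while (worker_id == 0) { Sleep(1); }               // wait until the worker's queue exists
    PostThreadMessage(worker_id, MSG_LIGHT_ON, 0, 0);  // wParam/lParam unused here
    PostThreadMessage(worker_id, MSG_LIGHT_OFF, 0, 0);
    t.join();
    return 0;
}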
7 . POSIX Message Queues. Message queues in Linux and other POSIX systems work a bit differently from their Windows counterparts. Queues are explicitly created and given a name, visible under /dev/mqueue/… on Linux. Queues can be shared, unlinked, and closed. Messages consist of a string and an integer priority. Messages are delivered highest priority first; those with identical priorities are delivered in order of arrival. A callback function (signal event handler) can be set up to handle incoming messages. http://man7.org/linux/man-pages/man7/mq_overview.7.html
8 . POSIX Message Queues. The core API (#include <mqueue.h>):
mqd_t mq_open(const char *name, int oflag): creates or connects to an existing message queue, with creation flags.
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio): sends a character-string message with the provided priority to the given queue.
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio): retrieves the next highest-priority message, along with its priority, (by default) blocking until there is a message.
int mq_notify(mqd_t mqdes, const struct sigevent *sevp): registers a signal-event handler to respond to incoming messages.
int mq_unlink(const char *name): unlinks the name from the queue.
int mq_close(mqd_t mqdes): closes the queue.
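A minimal sketch of the API in use; the queue name "/demo_queue" is made up, error handling is omitted, and on Linux the program links against -lrt.

#include <mqueue.h>
#include <fcntl.h>     // O_* flags
#include <cstdio>
#include <cstring>

int main() {
    // create (or open) the queue; 0644 = permissions, nullptr = default attributes
    mqd_t mq = mq_open("/demo_queue", O_CREAT | O_RDWR, 0644, nullptr);

    // send two messages with different priorities
    const char* low  = "low priority";
    const char* high = "high priority";
    mq_send(mq, low,  strlen(low)  + 1, 1);
    mq_send(mq, high, strlen(high) + 1, 10);

    // receive: highest priority first, so "high priority" is delivered before "low priority"
    char buf[8192];          // must be at least the queue's mq_msgsize (8192 is a common Linux default)
    unsigned int prio;
    mq_receive(mq, buf, sizeof(buf), &prio);
    printf("got '%s' (priority %u)\n", buf, prio);

    mq_close(mq);              // close our descriptor
    mq_unlink("/demo_queue");  // remove the name so the queue is destroyed
    return 0;
}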
9 . Message Queue Summary. Useful for sending information between threads/processes. Similar to pipes, except that multiple threads/processes can write to the same queue concurrently, and there is some functionality for message priorities, by either filtering (Windows) or sorting (POSIX). Messages need to be interpreted, often as commands. Beware of unbounded memory growth, since the queue grows with each message.
10 . Synchronization. So far we've seen: Mutex: basic primitive for providing mutual exclusion of a critical section. Semaphore: counting primitive limiting the number of threads accessing a resource. Condition Variables: allow waiting until certain conditions are met. Shared Mutex: allows multiple concurrent shared accesses, or exclusive access. Now we briefly introduce: Latches, Barriers, and Promises/Futures.
11 . Latches. A countdown latch allows threads/processes to wait until the latch's value reaches zero. Decrementing and waiting are often decoupled, allowing many threads to wait for a single signal. Latches are useful for synchronizing the start of several threads. A latch usually supports the following operations: decrement(): reduce the count of the latch; wait(): wait until the value of the latch reaches zero; decrement_and_wait(): reduce the count and wait until the latch reaches zero. Note the lack of increment() or reset(int): latches are usually considered a one-time-use object, initialized to a particular value.
12 . Latch Implementations. In C++20 (standardized as std::latch in <latch>; the Concurrency TS version shown here is std::experimental::latch):
#include <experimental/latch>
std::experimental::latch latch(size_t value): constructor with initial value
count_down(size_t n=1): reduces the value
count_down_and_wait(): reduces the value by one and waits until it reaches zero
is_ready(): checks if the value is zero
wait(): blocks until the value is zero
Until then, see boost::thread::latch, or create your own from a shared counter variable, a mutex to protect access, and a condition variable for waiting until the count is 0 (a sketch follows below).
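A minimal hand-rolled latch along those lines (counter + mutex + condition variable); the class name and exact interface are illustrative, not taken from the slides.

#include <condition_variable>
#include <cstddef>
#include <mutex>

// a simple one-shot countdown latch: counter + mutex + condition variable
class countdown_latch {
    std::mutex mutex_;
    std::condition_variable cv_;
    size_t count_;
public:
    explicit countdown_latch(size_t count) : count_(count) {}

    // reduce the count, waking all waiters when it reaches zero
    void count_down(size_t n = 1) {
        std::lock_guard<std::mutex> lock(mutex_);
        count_ = (n > count_) ? 0 : count_ - n;
        if (count_ == 0) { cv_.notify_all(); }
    }

    // block until the count reaches zero
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&]() { return count_ == 0; });
    }

    // reduce the count by one and wait for the rest
    void count_down_and_wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        if (count_ > 0) { --count_; }
        if (count_ == 0) { cv_.notify_all(); }
        else { cv_.wait(lock, [&]() { return count_ == 0; }); }
    }
};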
13 . Latch Example. Create and synchronize the start of multiple threads (two different ways).

// Way 1: latch initialized to 1; main() counts down once to release all threads
#include <experimental/latch>
#include <thread>

void thread_func(std::experimental::latch& latch) {
    latch.wait();
    // ... synchronized action ...
}

int main() {
    // latch with value of 1
    std::experimental::latch latch(1);
    const int nthreads = 100;
    for (int i = 0; i < nthreads; ++i) {
        std::thread thread(thread_func, std::ref(latch));
        thread.detach();
    }
    // notify all threads to start at once
    latch.count_down();
    return 0;
}

// Way 2: latch initialized to the number of threads; each thread counts down and waits
void thread_func(std::experimental::latch& latch) {
    latch.count_down_and_wait();
    // ... synchronized action ...
}

int main() {
    const int nthreads = 100;
    // latch with value of # threads
    std::experimental::latch latch(nthreads);
    for (int i = 0; i < nthreads; ++i) {
        std::thread thread(thread_func, std::ref(latch));
        thread.detach();
    }
    return 0;
}
14 . Barriers. A barrier is a synchronization primitive used to wait until a given number of threads arrive before allowing any of them to continue. When all threads have been let through, it resets to provide another synchronization point. It is also sometimes called a rendezvous. Barriers support a wait() operation that is used to wait until the specified number of threads arrive. A barrier is essentially a counter (similar to a semaphore) that waits until the count becomes zero before allowing threads to continue; the count then resets, ready for the next set of waits. Barriers are useful for coordinating a set of repeated parallel tasks.
15 . Barrier Implementations. In C++20 (standardized as std::barrier in <barrier>; the Concurrency TS version shown here is std::experimental::barrier):
#include <experimental/barrier>
std::experimental::barrier barrier(size_t num_threads): constructor with the number of threads
arrive_and_wait(): waits until all others arrive
arrive_and_drop(): removes the calling thread from the set
Until then, see boost::thread::barrier or cpen333::thread::rendezvous, or easily create your own from a shared counter variable, a mutex to protect access, and a condition variable for waiting until the count is 0 (a sketch follows below).
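A minimal hand-rolled, reusable barrier along those lines; the class name and the generation counter used for resetting are illustrative, not taken from the slides.

#include <condition_variable>
#include <cstddef>
#include <mutex>

// a simple reusable (cyclic) barrier: counter + mutex + condition variable
class cyclic_barrier {
    std::mutex mutex_;
    std::condition_variable cv_;
    size_t threshold_;   // number of threads to wait for
    size_t count_;       // threads still expected in the current cycle
    size_t generation_;  // distinguishes successive cycles so the barrier can reset
public:
    explicit cyclic_barrier(size_t nthreads)
        : threshold_(nthreads), count_(nthreads), generation_(0) {}

    // block until the required number of threads have arrived, then reset for the next cycle
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        size_t gen = generation_;
        if (--count_ == 0) {
            // last thread to arrive: start a new cycle and release everyone
            ++generation_;
            count_ = threshold_;
            cv_.notify_all();
        } else {
            cv_.wait(lock, [&]() { return gen != generation_; });
        }
    }
};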
16 . Barrier Example. In graphics, simulation, and special effects, we may have many different objects/geometries, and we compute physics, updated positions and deformations, lighting effects, etc. using multiple threads. We need to synchronize completion before rendering each frame (image: artisynth.org). We could create a new bunch of threads each update, do the work, then join the threads to synchronize, but this leads to constant creation/deletion of threads. It is better to use separate threads executing in a loop with a barrier to synchronize; a sketch follows below.
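A sketch of that loop, assuming the cyclic_barrier from the previous slide's sketch (std::experimental::barrier or std::barrier would be used the same way); the per-frame physics and rendering steps are stand-ins.

#include <thread>
#include <vector>

void simulation_worker(cyclic_barrier& barrier, int id, int nframes) {
    for (int frame = 0; frame < nframes; ++frame) {
        // ... compute physics/lighting for this worker's share of the objects ...
        barrier.wait();   // wait until every worker has finished this frame
        // ... one designated worker (e.g. id == 0) could render the frame here ...
        barrier.wait();   // wait until rendering is done before starting the next frame
    }
}

int main() {
    const int nthreads = 4;
    const int nframes = 100;
    cyclic_barrier barrier(nthreads);
    std::vector<std::thread> workers;
    for (int i = 0; i < nthreads; ++i) {
        workers.emplace_back(simulation_worker, std::ref(barrier), i, nframes);
    }
    for (auto& w : workers) { w.join(); }
    return 0;
}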
17 . Promises and Futures. A promise sets up a contract that a value will be available in the future. This allows a function to be launched asynchronously, returning a proxy for the result. We can query the status of the future, or wait for it to be ready. Basic functionality:
future = promise.get_future();
// fulfills promise
promise.set(value);
// waits for promise
value = future.get();
18 . Promises and Futures in C++11.
std::promise<ValueType>: associated with a future; manually sets the future's value or exception.
std::future<ValueType>: waits for a value to be ready; there can only be one per promise/task.
std::shared_future<ValueType>: waits for a value to be ready; there can be many, waiting in multiple threads.
Futures can also be returned by functions, with no associated promise visible, allowing functions to be executed asynchronously:
std::async(std::launch, Function&&, Args&&...): returns a future that can be used to obtain the result of the provided function call, potentially executed asynchronously, or possibly deferred.
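A minimal sketch of an explicit promise/future pair (std::async, shown on the next slide, hides the promise); the computation is a stand-in.

#include <future>
#include <iostream>
#include <thread>

int main() {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();

    // the producer thread fulfills the promise when the value is ready
    std::thread producer([&promise]() {
        int result = 6 * 7;         // ... some long computation ...
        promise.set_value(result);  // makes the future ready
    });

    // the consumer blocks in get() until set_value() has been called
    std::cout << "answer = " << future.get() << std::endl;

    producer.join();
    return 0;
}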
19 . Promises and Futures: Example. Launching an asynchronous or deferred function.

#include <future>

long find_factor(long n) {
    // ... compute prime factors of n ...
}

int main() {
    // launches immediately
    std::future<long> async_result =
        std::async(std::launch::async, find_factor, 249234224);

    // launches when the result is requested
    std::future<long> deferred_result =
        std::async(std::launch::deferred, find_factor, 9887897182);

    long x = async_result.get();
    long y = deferred_result.get();
    return 0;
}
20 . Thread Pools. We've already seen the producer-consumer pattern for distributing work between several threads. Thread pools generalize this: a common fixed-size pool of threads used for running generic tasks; a shared queue of tasks to be run by the pool; adding a task to the pool returns a future that can be used to wait on the result.
thread_pool pool(4);
future = pool.execute(func, args);
result = future.get(); // waits for result
21 . Thread Pools. Not part of the C++ standard (yet). You can find many library implementations out there (e.g. the one in Anthony Williams' C++ Concurrency in Action), or create your own: provide interfaces with fixed parameter/return types; manually wrap function calls with parameters and use a promise/future to return results asynchronously; or use template programming to auto-detect parameter/return types. Java has ThreadPoolExecutor, which runs instances of Runnable.
22 . Thread Pools. Not part of the C++ standard (yet). An interface along the lines of Anthony Williams' implementation:

class thread_pool {
public:
    // initializes pool
    thread_pool(size_t nthreads);

    // submits a task to run
    template <typename Func, typename ...Args>
    auto submit(Func&& func, Args&&... args)
        -> std::future<decltype(func(args...))>;

    // run next task in calling thread
    void run_pending_task();

    // shuts down the pool
    void shutdown();
};
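Not the Williams implementation, just a simplified sketch of how such a pool can work: a queue of std::function<void()> tasks, workers blocked on a condition variable, and a templated submit() that wraps the call in a std::packaged_task so the caller gets a future. For brevity it accepts zero-argument callables only (bind arguments with a lambda); the class name is made up.

#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class simple_thread_pool {
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool done_ = false;

    void worker_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [&]() { return done_ || !tasks_.empty(); });
                if (done_ && tasks_.empty()) { return; }
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();   // run the task outside the lock
        }
    }

public:
    explicit simple_thread_pool(size_t nthreads) {
        for (size_t i = 0; i < nthreads; ++i) {
            workers_.emplace_back(&simple_thread_pool::worker_loop, this);
        }
    }

    // wrap the call in a packaged_task so the caller gets a future for the result
    template <typename Func>
    auto submit(Func func) -> std::future<decltype(func())> {
        using result_t = decltype(func());
        auto task = std::make_shared<std::packaged_task<result_t()>>(std::move(func));
        std::future<result_t> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push([task]() { (*task)(); });
        }
        cv_.notify_one();
        return result;
    }

    // finish queued tasks, then join all workers
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) { w.join(); }
    }

    ~simple_thread_pool() {
        if (!workers_.empty() && workers_.front().joinable()) { shutdown(); }
    }
};

Usage would look like: simple_thread_pool pool(4); auto f = pool.submit([]() { return 6 * 7; }); int x = f.get();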
23 . Thread Pools: Example. Dynamic MR of a beating heart: a 256x256x64 3D volume, 12 full volumes. We need to decode 64x12 2D slice images and assemble them into a 3D+time data structure.

void load2DImage(Image4D image, int time, int slice);   // decodes one slice

void load3DImage(Image4D image, int time) {
    vector<future<void>> futures;
    for (int slice = 0; slice < 64; ++slice) {
        futures.push_back(pool.execute(load2DImage, image, time, slice));
    }
    // wait for completion
    for (auto& f : futures) { f.get(); }
}

void load4DImage(Image4D image) {
    vector<future<void>> futures;
    for (int time = 0; time < 12; ++time) {
        futures.push_back(pool.execute(load3DImage, image, time));
    }
    // wait for completion
    for (auto& f : futures) { f.get(); }
}

Issues?
24 . Thread Pools. CAUTION: if you have a set of inter-dependent tasks, beware of deadlock! Pools have a fixed number of threads running. If one task is ever stuck waiting for something, that thread remains occupied. If all threads in the pool are waiting for an event triggered by a task that is still in the queue waiting to be executed, the pool will be deadlocked! This can lead to some pretty hard-to-find race conditions that may even be machine-dependent if the pool size depends on the number of cores. Thread pools are most effective for running a set of independent tasks. If advanced synchronization is required, it is usually best to manage threads manually.
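A concrete illustration of the hazard, assuming the simple_thread_pool sketched two slides back with a single worker thread: the outer task blocks on a future whose producing task can never run, so the program hangs by design.

#include <future>
// assumes the simple_thread_pool sketch shown earlier

int main() {
    simple_thread_pool pool(1);    // a single worker thread

    auto outer = pool.submit([&pool]() {
        // the inner task is pushed to the back of the queue...
        std::future<int> inner = pool.submit([]() { return 42; });
        // ...but the only worker is stuck right here waiting for it
        return inner.get();        // DEADLOCK: never returns
    });

    outer.get();                   // never returns either
    return 0;
}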
25 . Monitors. A monitor handles synchronization internally. All functionality that may lead to race conditions is encapsulated within interface functions. Essentially, the details of the required synchronization are hidden from the programmer: it is the designer's responsibility to ensure safe use of the resource rather than the user's. The result is a thread-safe class. Monitors often provide the ability to wait for conditions to be met. Recall: the thread-safe queue implementations for the Producer-Consumer pattern.
26 . Monitors: Example.

class BankAccount {
private:
    double balance_;              // the data to be protected
    std::mutex mutex_;
    std::condition_variable cv_;

public:
    /**
     * Safely withdraw funds if there is sufficient money
     * @param amount amount to withdraw
     * @return true if successful, false otherwise
     */
    bool withdraw(double amount) {
        std::lock_guard<std::mutex> lock(mutex_);
        bool status = false;
        if (balance_ >= amount) {
            status = true;
            balance_ -= amount;
            cv_.notify_all();
        }
        return status;
    }

    /**
     * Add funds to bank account
     * @param amount amount to deposit
     */
    void deposit(double amount) {
        std::lock_guard<std::mutex> lock(mutex_);
        balance_ += amount;
        cv_.notify_all();
    }

    /**
     * Retrieve bank balance
     * @return your current savings, treat with care
     */
    double getBalance() {
        std::lock_guard<std::mutex> lock(mutex_);
        return balance_;
    }

    // allows waiting until there is money in the account
    void waitForBalance(double b) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&]() { return balance_ >= b; });
    }

    // constructor
    BankAccount() : balance_(0), mutex_(), cv_() {}
};
27 . Monitors: Example (usage).

class BankAccount {
public:
    bool withdraw(double amount);
    void deposit(double amount);
    double getBalance();
    void waitForBalance(double b);
};

void child(BankAccount& forfun) {
    // I need money to spend
    std::cout << "Waiting for money to be put into account" << std::endl;
    forfun.waitForBalance(1000);
    double balance = forfun.getBalance();
    forfun.withdraw(balance);
    std::cout << "Spending all my extra money :), $" << balance << std::endl;
}

int main() {
    BankAccount forschool;
    // child waits in the background until there is at least $1000
    std::thread bigspender(&child, std::ref(forschool));
    // every week put 300 in
    for (int i = 0; i < 52; ++i) {
        std::cout << "Giving child $300 for school" << std::endl;
        forschool.deposit(300);
    }
    bigspender.join();  // wait for child
    std::cout << "Done." << std::endl;
    return 0;
}
29 . Active Objects. Base class:

class active_object {
private:
    std::thread* thread_;          // worker thread
    std::deque<Message> queue_;    // queue
    std::mutex mutex_;             // protect queue
    std::condition_variable cv_;   // wait when idle
    const Message POISON;          // poison pill
    void run();

public:
    // create object
    active_object() : thread_(nullptr), queue_(), mutex_(), POISON() {
        thread_ = new std::thread(&active_object::run, this);
    }

    // terminate private thread
    ~active_object();
};

active_object::~active_object() {
    {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        queue_.push_back(POISON);
    }
    cv_.notify_one();
    thread_->join();
    delete thread_;
}

void active_object::run() {
    std::unique_lock<decltype(mutex_)> lock(mutex_, std::defer_lock);
    Message msg;
    do {
        lock.lock();
        cv_.wait(lock, [&]() { return !queue_.empty(); });
        msg = queue_.front();
        queue_.pop_front();
        lock.unlock();
        // do something with msg
    } while (msg != POISON);
}
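The base class above leaves out how callers enqueue messages; a self-contained sketch of the same pattern, specialized to string messages, with a hypothetical post() method as the public interface:

#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

// active object specialized to string messages: callers post(), a private thread consumes
class active_logger {
    std::deque<std::string> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    const std::string POISON = "\x04";   // sentinel telling the private thread to exit
    std::thread thread_;

    void run() {
        while (true) {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [&]() { return !queue_.empty(); });
            std::string msg = queue_.front();
            queue_.pop_front();
            lock.unlock();
            if (msg == POISON) { break; }
            std::cout << "[log] " << msg << std::endl;   // "do something with msg"
        }
    }

public:
    active_logger() : thread_(&active_logger::run, this) {}

    // enqueue a message; returns immediately, the private thread does the work
    void post(const std::string& msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(msg);
        }
        cv_.notify_one();
    }

    ~active_logger() {
        post(POISON);
        thread_.join();
    }
};

int main() {
    active_logger logger;
    logger.post("hello");
    logger.post("from the calling thread");
    return 0;   // destructor posts the poison pill and joins the private thread
}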







