[C++] The C++ Programming Language: 5. A Tour of C++: Concurrency and Utilities (2)
5.3 Concurrency
동시성이란 몇몇 작업들을 동시에 실행하는 것입니다. 이는 단일 계산을 위한 여러 프로세서들을 사용함으로써 처리량을 늘리거나, 다른 프로그램이 응답을 기다릴 때 다른 부분을 처리할 수 있도록 허용함으로써 반응성을 증대시키기 위해 널리 사용됩니다. 모든 현대 프로그래밍 언어는 이를 지원합니다. 이 C++ 표준 라이브러리는 간편하며 20년 이상 사용되온 type-safe 변형이며 대부분의 최신 하드웨어가 보편적으로 지원하는 것입니다. 이 표준 라이브러리 지원은 세련된 하이 레벨 동시성 모델을 직접적으로 제공하는 것보다는 시스템 레벨의 동시성을 지원하는 것에 초점이 맞춰져 있습니다. 이들은 표준 라이브러리 기능을 사용해 빌드된 라이브러리로 제공될 수 있습니다.
표준 라이브러리는 단일 주소 공간에서 많은 스레드들의 동시 실행을 직접적으로 지원합니다. 이를 가능하게 하기 위해, C++은 적합한 메모리 모델과 원자적인 연산 집합을 제공합니다. 그러나 대부분의 사용자들은 표준 라이브러리와 그 최상위에서 구현된 라이브러리들로만 동시성을 볼 수 있을 것입니다. 이 섹션은 주요한 표준 라이브러리 동시성 지원 기능의 예제들을 살펴볼 것입니다: thread, mutex, lock() 연산, packaged_task, future. 이 기능들은 운영 체제가 제공하는 것에 따라 직접적으로 구축되어 있고, 이들 간의 성능 차이는 없습니다.
5.3.1 Tasks and threads
우리는 잠재적으로 다른 계산 작업들과 동시에 실행될 계산들을 태스크라고 합니다. 스레드는 프로그램 안의 태스크를 시스템 레벨에서 표현한 것입니다. 다른 태스크와 동시에 실행될 수 있는 태스크는 std::thread(<thread>에 있습니다)를 생성함으로써 시작할 수 있습니다. 태스크는 생성자의 인자가 됩니다. 태스크는 함수 혹은 함수 객체입니다:
1 2 3 4 5 6 7 8 9 10 11 | void f(); // function struct F { // function object void operator()(); // F’s call operator (§3.4.3) }; void user() { thread t1 {f}; // f() executes in separate thread thread t2 {F()}; // F()() executes in separate thread t1.join(); // wait for t1 t2.join(); // wait for t2 } | cs |
join()은 스레드의 작업이 완료되기 전 까지는 user()를 종료하지 않도록 합니다. "join"의 의미는 곧 "스레드의 종료를 기다림"입니다.
프로그램의 스레드는 단일 주소 공간을 공유합니다. 곧, 스레드는 일반적으로 데이터를 직접적으로 공유하지 않는 프로세스들과 다릅니다. 스레드가 주소 공간을 공유하기 시작함으로써, 공유 객체를 통해 통신할 수 있습니다. 이러한 통신은 일반적으로 lock 혹은 데이터 경쟁(races)을 방지하기 위한 다른 메커니즘들에 의해서 제어됩니다.
동시 작업을 프로그래밍하는 것은 굉장히 교묘(tricky)합니다. 태스크의 구현인 f(함수)와 F(함수 객체)를 살펴보죠.
1 2 3 4 5 | void f() { cout << "Hello "; } struct F { void operator()() { cout << "Parallel World!\n"; } }; | cs |
이는 나쁜 오류의 예시입니다: f와 F() 각각은 객체 cout을 아무 동기화 없이 사용하고 있습니다. 출력의 결과는 예상치 못한, 그리고 프로그램의 실행마다 달라지는 결과를 얻을 것입니다. 두 태스크에서 각각의 연산 실행 순서는 정의되어 있지 않기 때문입니다. 따라서 다음과 같이 이상한 결과를 낼 것이죠:
PaHerallllel o World! |
동시성 프로그램의 태스크를 정의할 때 우리의 초점은 그들이 간단하고 명확한 방식으로 의사소통을 하는 것 외에는 태스크를 완전히 분리하는 것입니다. 가장 간단하게 생각해볼 수 있는 방법은 호출자와 동시에 실행되는 작업처럼 생각하는 것입니다. 이를 하려면 어쨌거나 우리는 인자를 전달하고, 결과를 반환받고, 서로 간에 공유되는 데이터가 없도록 해야 합니다.
5.3.2 Passing Arguments
일반적으로, 태스크는 작업을 위해 데이터가 필요합니다. 우리는 인자를 이용해 데이터(혹은 포인터 및 데이터의 참조)를 손쉽게 전달할 수 있습니다. 다음을 참고하세요:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | void f(vector<double>& v); // function do something with v struct F { // function object: do something with v vector<double>& v; F(vector<double>& vv) :v{vv} { } void operator()(); // application operator ; §3.4.3 }; int main() { vector<double> some_vec {1,2,3,4,5,6,7,8,9}; vector<double> vec2 {10,11,12,13,14}; thread t1 {f,some_vec}; // f(some_vec) executes in a separate thread thread t2 {F{vec2}}; // F(vec2)() executes in a separate thread t1.join(); t2.join(); } | cs |
명백히, F{vec2}는 F 안에 벡터 인자의 참조를 저장합니다. F는 해당 배열을 사용하고 다른 태스크가 F가 실행되는 동안 vec2에 접근하지 않았으면 할 것입니다. 대신 vec2를 값으로 전달한다면 이러한 위험을 제거할 수 있죠.
{f,some_vec}으로 초기화하는 것은 스레드의 가변 템플릿 생성자를 사용하는 것입니다. 임의의 수의 인자들을 전달할 수 있죠. 컴파일러는 첫번째 인자가 다음의 인자들을 이용해 호출될 수 있는지를 확인하고, 필요한 함수 객체들을 빌드해 스레드에 전달합니다. 곧, F::operator()()와 f()는 같은 알고리즘을 수행하며, 두 태스크의 처리 역시 거의 동일합니다. 두 경우, 스레드의 실행을 위해 함수 객체가 생성될 것입니다.
5.3.3 Returning Results
5.3.2절의 예제에서, 우리는 non-const 참조로 인자를 전달했습니다. 저는 태스크가 참조의 값을 변경하기를 바랄 때에만 그렇게 합니다. 이는 다소 교활하지만 결과를 반환할 때 꽤나 흔한 일입니다. 덜 모호한 기술은 const 참조로 입력 데이터를 전달하고, 결과를 저장할 분리된 변수 또한 전달하는 것입니다:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | void f(const vector<double>& v, double∗ res);// take input from v; place result in *res class F { public: F(const vector<double>& vv, double∗ p) :v{vv}, res{p} { } void operator()(); // place result in *res private: const vector<double>& v; // source of input double∗ res; // target for output }; int main() { vector<double> some_vec; vector<double> vec2; // ... double res1; double res2; thread t1 {f,some_vec,&res1}; // f(some_vec,&res1) executes in a separate thread thread t2 {F{vec2,&res2}}; // F{vec2,&res2}() executes in a separate thread t1.join(); t2.join(); cout << res1 << ' ' << res2 << '\n'; } | cs |
이는 별로 우아한 방식은 아닙니다. 5.3.5.1절에서 좀 더 우아한 방식을 다뤄보도록 하죠.
5.3.4. Sharing Data
종종 태스크는 데이터 공유가 필요합니다. 이 경우, 접근은 동기화되어 한 순간에는 단 하나의 태스크만이 접근할 수 있도록 해야합니다. 숙련된 프로그래머들은 단순화를 통해 이를 바라보지만(가령 불변 데이터를 사용할 수 있습니다), 한 번에 하나의 태스크만이 주어진 객체 집합에 접근할 수 있도록 하는 방법도 고려합니다.
가장 근본적인 해결책의 요소는 mutex입니다. mutual exclusion object죠. 스레드는 mutex를 lock() 연산으로 점유합니다.
1 2 3 4 5 6 7 8 | mutex m; // controlling mutex int sh; // shared data void f() { unique_lock<mutex> lck {m}; // acquire mutex sh += 7; // manipulate shared data } // release mutex implicitl | cs |
unique_lock의 생성자는 mutex를 점유(m.lock()을 호출)합니다. 만약 다른 스레드가 이미 mutex를 점유하고 있다면, 스레드는 다른 스레드가 접근을 완료할 때까지 기다립니다(blocks). 스레드가 공유 데이터에 접근을 완료할 때마다, unique_lock은 mutex를 release(m.unlock()을 호출)합니다. 상호 배제와 잠금 기능은 <mutex>에서 볼 수 있습니다.
공유 데이터와 mutex 사이의 소통은 관습적입니다: 프로그래머는 단순히 어떤 mutex가 어떤 데이터와 대응하는지를 알아야 합니다. 명백하게 이는 에러가 나기 쉽죠. 우리는 다양한 언어의 수단을 통해 이러한 소통을 명확하게 하려고 노력합니다:
1 2 3 4 5 | class Record { public: mutex rm; // ... }; | cs |
Record 변수인 rec에 대해 rec.rm이 해당 데이터의 접근을 관리하는 mutex 변수임을 생각해내는 것은 어렵지 않습니다. 물론 더 명확한 이름이나 주석 등이 독자들에게 도움이 될 수 있습니다.
어떠한 행위를 취하기 위해 몇몇 자원에 동시에 접근해야 하는 일은 꽤나 흔합니다. 이는 곧 교착 상태(deadlock)를 발생시킵니다. 예를 들어, thread1이 mutex1을 취하고, 곧이어 mutex2를 취하려고 한다고 생각해봅시다. 반면, thread2는 mutex2를 취하고, 곧이어 mutex1을 취하려고 합니다. 이들 태스크는 영원히 끝나지 않을 것입니다. 표준 라이브러리는 여러 lock을 동시에 걸 수 있는 메커니즘을 제공합니다.
1 2 3 4 5 6 7 8 9 10 | void f() { // ... unique_lock<mutex> lck1 {m1,defer_lock}; // defer_lock: don’t yet try to acquire the mutex unique_lock<mutex> lck2 {m2,defer_lock}; unique_lock<mutex> lck3 {m3,defer_lock}; // ... lock(lck1,lck2,lck3); // acquire all three locks // ... manipulate shared data ... } // implicitly release all mutexes | cs |
이 lock()은 모든 mutex를 얻은 후에야 수행될 것입니다. 물론 mutex를 점유한 동안에는 차단되지 않죠. 각각 unique_lock의 소멸자들은 스레드의 스코프를 벗어날 때 자동으로 반환되도록 합니다.
공유 데이터를 통해 소통하는 것은 꽤나 저레벨입니다. 특히, 프로그래머는 다양한 태스크들을 통해 어떤 것이 되었고 어떤 것이 되지 않았는지 알아내는 방법을 고안해야 합니다. 이 점에 있어서 공유 데이터를 사용하는 것은 호출 및 반환을 통한 것보다 못하다고 볼 수 있습니다. 반면, 몇몇 사람들은 인자를 복사하고 반환하는 것보다 데이터를 공유하는 것이 훨씬 효율적이라고 확신하기도 합니다. 만약 아주 많은 데이터를 다루게 된다면, 그리고 locking과 unlocking의 비용이 상대적으로 비싸다면 단순히 데이터를 공유하는 것이 더 효율적일지도 모릅니다. 하지만, 현대의 기계들은 데이터를 복사하는데 굉장히 늑숙합니다. 특히 vector 요소와 같은 간단한 데이터들에 대해서는 더욱 그렇죠. 따라서 다른 어떤 충분한 고려 없이 "효율성"만을 위해서 공유 데이터를 통한 통신을 선택하는 일은 없길 바랍니다.
5.3.4.1 Waiting for Events
종종 스레드는 외부의 이벤트를 기다려야 할 때가 있습니다. 가령 다른 스레드가 특정 작업을 완료하기를 기다리는 것이죠. 가장 간단한 "event"는 단순히 시간이 지나가기를 기다리는 것입니다:
1 2 3 4 5 6 | using namespace std::chrono; // see §35.2 auto t0 = high_resolution_clock::now(); this_thread::sleep_for(milliseconds{20}); auto t1 = high_resolution_clock::now(); cout << duration_cast<nanoseconds>(t1−t0).count() << " nanoseconds passed\n"; | cs |
스레드를 실행할 필요가 없음에 주의하세요. 기본적으로 this_trad는 유일한 스레드를 참조합니다.
시계의 단위를 나노초 단위로 조정하기 위해 duration_cast를 사용했습니다. 5.4.1절과 35.2절에서 시간과 관련해 더욱 복잡한 것들을 다룹니다. 시간 기능은 <chrono>에서 찾을 수 있습니다.
외부의 이벤트를 이용해 통신하는 기본적인 지원은 <condition_variable>의 condition_variable로 제공됩니다. condition_variable은 하나의 스레드가 다른 스레드를 기다리도록 만들어주는 메커니즘입니다. 특히, 이는 다른 스레드에 의해 충족되는 특정한 조건(보통, 이벤트)을 기다리는 것이죠.
두 스레드가 queue를 이용해 메시지를 전달하며 통신하는 고전적인 예제를 봐보죠. queue에서 경쟁 조건을 피하기 위해 생성자와 소비자를 이용한 메커니즘을 선언했습니다.
1 2 3 4 5 6 7 | class Message { // object to be communicated // ... }; queue<Message> mqueue; // the queue of messages condition_variable mcond; // the var iable communicating events mutex mmutex; // the locking mechanism | cs |
queue, condition_variable, mutex는 표준 라이브러리에서 제공됩니다.
consumer()는 Message를 읽고 처리합니다:
1 2 3 4 5 6 7 8 9 10 11 12 | void consumer() { while(true) { unique_lock<mutex> lck{mmutex}; // acquire mmutex while (mcond.wait(lck)) /* do nothing */; // release lck and wait; // re-acquire lck upon wakeup auto m = mqueue.front(); // get the message mqueue.pop(); lck.unlock(); // release lck // ... process m ... } } | cs |
저는 명시적으로 queue와 condition_variable의 연산을 unique_lock과 mutex로 보호했습니다. condition_variable을 기다리는 동안 잠시 해당 lock을 해제했다가, 조건이 갖춰지면 다시 lock을 수행하죠.
이에 대응하는 producer()는 다음과 같습니다:
1 2 3 4 5 6 7 8 9 10 | void producer() { while(true) { Message m; // ... fill the message ... unique_lock<mutex> lck {mmutex}; // protect operations mqueue.push(m); mcond.notify_one(); // notify } // release lock (at end of scope) } | cs |
condition_variable은 다양한 형태의 우아하고 효율적인 공유를 가능케 합니다. 하지만 약간은 교묘하죠.
5.3.5 Communicating Tasks
표준 라이브러리는 몇몇 기능을 제공합니다. 프로그래머들이 직접적으로 저레벨의 스레드와 락을 사용하는 것 대신, 개념적인 단계의 태스크로 연산하도록 도와주는 것이죠:
- futre와 promise는 분리된 스레드들의 태스크로부터 값을 반환받게 해줍니다.
- packaged_task는 결과를 반환하는 메커니즘을 연결해 태스크를 실행할 수 있도록 도와줍니다.
- async()는 함수를 호출하는 것과 매우 비슷한 방식의 태스크 실행 방식입니다.
이러한 기능들은 <future>에 존재합니다.
5.3.5.1 future and promise
future와 promise에서 중요한 점은 명시적으로 lock을 하지 않아도 두 태스크 간 값을 전달할 수 있게 만들어준다는 것입니다. "시스템"은 효율적으로 교환을 구현하죠. 기본적인 아이디어는 간단합니다: 태스크가 다른 태스크에게 값을 전달하고 싶을 때, 그는 promise에 값을 집어넣습니다. 어찌 됐든, 구현은 이에 해당하는 future로 그 값이 나타나도록 하게 하죠. future는 읽을 수 잇습니다. 이를 그림으로 나타내면 다음과 같죠:
만약 우리가 future<X> 변수 fx를 지녔다면, 우리는 X 자료형의 값을 get()할 수 있습니다.
1 | X v = fx.g et(); // if necessary, wait for the value to get computed | cs |
만약 값이 아직 future에 존재하지 않는다면, 해당 스레드는 future의 값이 채워질 때까지 기다리게 됩니다. 만약 값이 계산될 수 없다면, get()은 예외를 던질 수도 있습니다. 시스템으로부터, 혹은 해당 값을 집어 넣는 태스크로부터 던져지는 것이죠.
promise의 주요한 목적은 future의 get()과 대응하는 단순한 "put" 연산을 지원하는 것입니다. 이는 set_value()와 set_exception()이죠. futre와 promise의 이름의 기원은 역사적입니다. 저를 탓하지 마세요.
만약 promise를 가지고 있고, X 자료형의 결과를 future에게 전달하고 싶다면, 여러분들은 둘 중 하나를 수행하면 됩니다: 값을 전달하던가, 예외를 전달하세요:
1 2 3 4 5 6 7 8 9 10 11 12 13 | void f(promise<X>& px) // a task: place the result in px { // ... try { X res; // ... compute a value for res ... px.set_value(res); } catch (...) { // oops: couldn’t compute res // pass the exception to the future’s thread: px.set_exception(current_exception()); } } | cs |
future로부터 전달된 예외를 처리하기 위해, get()의 호출자는 이를 처리할 로직을 어딘가 준비해야 합니다:
1 2 3 4 5 6 7 8 9 10 11 | void g(future<X>& fx) // a task: get the result from fx { // ... try { X v = fx.g et(); // if necessary, wait for the value to get computed // ... use v ... } catch (...) { // oops: someone couldn’t compute v // ... handle error ... } } | cs |
5.3.5.2 packaged_task
결과를 생성해야 하는 스레드로부터 어떻게 필요한 future와 promise들을 얻을 수 있을까요? packaged_task 자료형은 future와 promise를 연결한 태스크들을 간단하게 제공합니다. packaged_task는 태스크로부터 promise에 반환값이나 예외를 넣는 wrapper 코드를 제공합니다. 만약 get_future를 호출하도록 하면, packaged_task는 그 promise에 해당하는 future를 건내줄 것입니다. 예를 들어, 우리는 vector<double>의 각 요소를 두 개의 태스크가 반절씩 분담해 더하는 것을 생각해볼 수 있습니다. 표준 라이브러리의 accumulate()를 활용하는 것이죠.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | double accum(double∗ beg, double ∗ end, double init) // compute the sum of [beg:end) starting with the initial value init { return accumulate(beg,end,init); } double comp2(vector<double>& v) { using Task_type = double(double∗,double∗,double); // type of task packaged_task<Task_type> pt0 {accum}; // package the task (i.e., accum) packaged_task<Task_type> pt1 {accum}; future<double> f0 {pt0.get_future()}; // get hold of pt0’s future future<double> f1 {pt1.get_future()}; // get hold of pt1’s future double∗ first = &v[0]; thread t1 {move(pt0),first,first+v.size()/2,0}; // star t a thread for pt0 thread t2 {move(pt1),first+v.size()/2,first+v.size(),0}; // star t a thread for pt1 // ... return f0.get()+f1.get(); // get the results } | cs |
packaged_task 템플릿은 템플릿 인자로서 태스크의 자료형, 생성자 인자로서 태스크를 받습니다. move() 연산은 packaged_task가 복사될 수 없기 때문에 필요합니다.
이 코드엔 명시적 lock을 하는 코드가 없음을 주목하세요: 우리는 태스크가 수행되었는지에 좀 더 집중할 수 있게 되었습니다. 그들이 통신하는 방식을 관리하는 메커니즘은 우리의 손을 떠났죠. 두 개의 태스크는 분리된 스레드에서 잠재적으로 평행하게 실행될 것입니다.
5.3.5.3 async()
이 장에서 제가 추구하는 방식은 가장 단순하지만 가장 강력한 방식이라고 생각합니다: 태스크를 다른 태스크들과 동시에 수행되는 함수로서 생각하는 것이죠. C++ 표준 라이브러리에서만 제공되는 모델이지만, 아주 넓은 범위에서 활용될 수 있습니다. 좀 더 미묘하고 까다로운 모델을 활용할 수 있습니다. 예를 들어 공유 메모리에 의존하는 프로그래밍 스타일이죠.
잠재적으로 비동기적이게 태스크를 실행하는 것입니다. 바로 async()죠:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | double comp4(vector<double>& v) // spawn many tasks if v is large enough { if (v.size()<10000) return accum(v.begin(),v.end(),0.0); auto v0 = &v[0]; auto sz = v.size(); auto f0 = async(accum,v0,v0+sz/4,0.0); // first quarter auto f1 = async(accum,v0+sz/4,v0+sz/2,0.0); // second quarter auto f2 = async(accum,v0+sz/2,v0+sz∗3/4,0.0); // third quarter auto f3 = async(accum,v0+sz∗3/4,v0+sz,0.0); // fourth quarter return f0.get()+f1.get()+f2.get()+f3.get(); // collect and combine the results } | cs |
기본적으로, async()는 결과를 얻는 부분으로부터 함수의 호출부를 분리합니다. 그리고 그 둘을 태스크로 실행하죠. async()를 사용하면, 여러분들은 스레드와 lock에 대해 생각하지 않아도 됩니다. 대신, 이러한 태스크들이 비동기적으로 계산될 수 있음을 명심하기만 하면 되죠. 이는 명확한 한계가 있습니다: 자원 공유가 필요한 태스크를 async()로 해결하려 하지 마세요! async()를 사용 시, 얼마나 많은 스레드들이 이에 관여할지 알 수 없습니다. 호출 시 가용한 시스템의 자원을 이용해 결정되기 때문이죠. 예를 들어, async()는 가용한 코어(프로세서)를 확인해 얼마나 많은 스레드들을 활용할지 결정합니다.
async()는 단순히 성능 향상을 위한 병렬 계산만을 제공하는 메커니즘은 아님을 기억하세요. 예를 들어, 다른 무언가로 활성화된 "main program"으로부터 벗어나 사용자에게 정보를 얻어내는 태스크를 생성하는 데에도 사용할 수 있습니다.
댓글
댓글 쓰기