文章

实现数据库连接池-后传

1.引言

这篇文章是总结连接池所用到的技术点

2.单例模式

单例模式可以保证在整个应用程序中只有一个实例,这样可以避免多个实例对同一资源的访问冲突。在实现数据库连接池时,使用单例模式可以保证整个应用程序中只有一个连接池,这样可以更好地管理和分配数据库连接

单例模式目的是确保一个类只有一个实例,并提供一个全局访问点。在C++中,可以通过以下方式实现单例模式:

  1. 将构造函数、拷贝构造函数和赋值运算符声明为私有,以防止外部创建实例或复制实例。
  2. 在类中定义一个静态私有成员变量,用来存储唯一的实例。
  3. 提供一个公共的静态方法,用于获取唯一的实例。在这个方法中,如果实例不存在,则创建一个新的实例并返回;否则直接返回已有的实例

现在举个简单例子来说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Singleton {
private:
    static Singleton* instance;
    Singleton() {}
    Singleton(const Singleton&) = delete;
    Singleton& operator=(const Singleton&) = delete;

public:
    static Singleton* getInstance() {
        if (instance == nullptr) {
            instance = new Singleton();
        }
        return instance;
    }
};

Singleton* Singleton::instance = nullptr;

这种实现方式并不是线程安全的,如果需要在多线程环境中使用单例模式,可以使用双重检查锁定(double-checked locking)或其他线程安全的方法来保证线程安全

其中Singleton* Singleton::instance = nullptr; 这一行代码定义了一个静态成员变量 instance,并将其初始化为 nullptr。这意味着在程序开始运行时,Singleton 类的唯一实例尚未创建

静态成员变量是属于类的,而不是属于某个特定的对象。这意味着无论创建多少个 Singleton 对象,它们都共享同一个 instance 变量。在这种情况下,由于构造函数是私有的,所以无法直接创建 Singleton 对象,因此 instance 变量将始终指向唯一的实例

getInstance() 方法中,如果 instance 变量为 nullptr,则会创建一个新的 Singleton 实例并将其地址赋给 instance 变量。否则,直接返回 instance 变量指向的实例

3.多线程中单例模式的线程安全

先简单介绍双重检查锁定:是一种用于保证线程安全的方法。它的基本思想是在 getInstance() 方法中使用两次检查来避免不必要的加锁。第一次检查在加锁之前进行,如果实例已经存在,则直接返回;否则再加锁并进行第二次检查,以确保实例仍然不存在,然后再创建实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <mutex>

class Singleton {
private:
    static Singleton* instance;
    static std::mutex mtx;
    Singleton() {}
    Singleton(const Singleton&) = delete;
    Singleton& operator=(const Singleton&) = delete;

public:
    static Singleton* getInstance() {
        if (instance == nullptr) {
            std::lock_guard<std::mutex> lock(mtx);
            if (instance == nullptr) {
                instance = new Singleton();
            }
        }
        return instance;
    }
};

Singleton* Singleton::instance = nullptr;
std::mutex Singleton::mtx;

在这个示例中,我们使用了一个静态的 std::mutex 对象来保护对 instance 变量的访问。当多个线程同时调用 getInstance() 方法时,只有一个线程能够获得锁并进入临界区,其他线程将被阻塞。在临界区内,我们再次检查 instance 变量是否为 nullptr,如果是,则创建一个新的实例。

需要注意的是,这种方法在 C++11 及更高版本中才能正确工作,因为 C++11 引入了内存模型,保证了静态局部变量的初始化是线程安全的。在早期版本的 C++ 中,双重检查锁定可能会由于编译器优化而失效

有人不理解什么是临界区,以及为什么要检查两遍instance,原因是这样的

临界区是指在多线程环境中,多个线程可能同时访问同一段代码或数据的区域。如果不加以保护,这可能会导致数据竞争和不一致的结果。为了避免这种情况,我们通常使用锁来保护临界区,确保同一时间只有一个线程能够进入临界区。

在上面的示例中,临界区是指 getInstance() 方法中加锁后的代码块。在这个代码块中,我们再次检查 instance 变量是否为 nullptr,然后再创建实例。

检查两遍 instance 变量是否为 nullptr 的原因是为了避免不必要的加锁。加锁是一个耗时的操作,如果每次调用 getInstance() 方法都需要加锁,那么性能将受到影响。因此,在加锁之前,我们先进行一次检查,如果实例已经存在,则直接返回,无需加锁。

但是,如果多个线程同时调用 getInstance() 方法,并且实例尚未创建,那么它们都会通过第一次检查并进入临界区。为了避免创建多个实例,我们需要在临界区内再次检查 instance 变量是否为 nullptr。如果仍然为 nullptr,则创建一个新的实例;否则直接返回已有的实例。

这就是为什么要检查两遍 instance 变量是否为 nullptr 的原因

4.C++中的锁机制

加锁是一种用于保护临界区的方法。它的基本思想是使用一个锁来控制对临界区的访问。当一个线程需要进入临界区时,它必须先获得锁;当它离开临界区时,它必须释放锁。如果锁已经被其他线程占用,那么当前线程将被阻塞,直到锁被释放。

C++11 引入了多线程支持,包括对锁的支持。C++11 中提供了多种类型的锁,包括 std::mutexstd::recursive_mutexstd::timed_mutexstd::recursive_timed_mutex

std::mutex 是最基本的一种锁,它提供了两个方法:lock()unlock()。当一个线程需要进入临界区时,它可以调用 lock() 方法来获得锁;当它离开临界区时,它必须调用 unlock() 方法来释放锁。

下面是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;

void print() {
    std::lock_guard<std::mutex> lock(mtx);
    std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;
}

int main() {
    std::thread t1(print);
    std::thread t2(print);

    t1.join();
    t2.join();

    return 0;
}

在这个示例中,我们定义了一个全局的 std::mutex 对象 mtx 来保护对 std::cout 的访问。在 print() 函数中,我们使用了 std::lock_guard 来自动管理锁的生命周期。当 std::lock_guard 对象创建时,它会自动调用 mtx.lock() 来获得锁;当它销毁时,它会自动调用 mtx.unlock() 来释放锁。

这样,当两个线程同时调用 print() 函数时,只有一个线程能够获得锁并进入临界区,另一个线程将被阻塞。这样就避免了数据竞争和不一致的结果。

t1.join()t2.join() 这两行代码分别用于等待线程 t1t2 结束。join() 方法会阻塞当前线程,直到被调用的线程结束。

在上面的示例中,main() 函数中创建了两个线程 t1t2,它们都执行 print() 函数。当 main() 函数执行到 t1.join() 时,它会阻塞并等待线程 t1 结束。当线程 t1 结束后,main() 函数继续执行,并调用 t2.join() 来等待线程 t2 结束。当线程 t2 结束后,main() 函数继续执行,并最终退出。

使用 join() 方法可以确保在线程结束之前不会退出程序。如果不调用 join() 方法,那么主线程可能会在其他线程结束之前退出,导致未定义的行为

这段代码创建了两个线程 t1t2,它们都执行 print() 函数。由于我们使用了锁来保护对 std::cout 的访问,所以两个线程的输出不会交叉。因此,这段代码的一个可能的输出结果是:

1
2
Hello from thread 1
Hello from thread 2

或者:

1
2
Hello from thread 2
Hello from thread 1

具体的输出顺序取决于线程的调度顺序,这是不确定的。

std::thread t1(print);std::thread t2(print); 这两行代码分别创建了两个线程 t1t2。它们都执行 print() 函数,但是它们是两个独立的线程,它们之间没有直接的关系。除了执行相同的函数之外,它们没有其他区别。

如果不加锁,那么两个线程可能会同时访问 std::cout,导致输出结果交叉。例如,一个可能的输出结果是:

1
2
HeHlleollo  ffrroomom  tthhrreeaadad  11
Hello from thread 2

由于线程的调度顺序是不确定的,所以输出结果也是不确定的。在实际应用中,不加锁可能会导致数据竞争和不一致的结果,因此应该避免这种情况。

“HeHlleollo ffrroomom tthhrreeaadad 11” 是一个可能的输出结果,用来说明当两个线程同时访问 std::cout 时可能会发生什么。

在这个例子中,两个线程同时执行 print() 函数,它们都试图向 std::cout 输出字符串 “Hello from thread X”(其中 X 是线程的 ID)。由于没有加锁来保护对 std::cout 的访问,所以两个线程的输出可能会交叉。

例如,线程 1 可能输出了 “H”,然后线程 2 输出了 “H”,然后线程 1 输出了 “e”,然后线程 2 输出了 “e”,以此类推。最终的结果就是 “HeHlleollo ffrroomom tthhrreeaadad 11”。

需要注意的是,这只是一个可能的输出结果。实际的输出结果取决于线程的调度顺序和操作系统的实现细节,它是不确定的。

5.多线程

既然都讲到这里了,再简单说下C++的多线程

多线程是指在一个程序中同时运行多个线程来完成不同的任务。每个线程都有自己的指令指针、寄存器和栈,但是它们共享同一个地址空间和其他资源,如打开的文件和全局变量

C++11 引入了对多线程的支持,包括 std::thread 类和相关的同步原语,如 std::mutexstd::condition_variable。使用这些类和函数,可以在 C++ 程序中创建和管理多个线程

下面是一个简单的示例,演示如何在 C++ 中创建和使用多个线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <thread>

void print(int n) {
    for (int i = 0; i < n; ++i) {
        std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;
    }
}

int main() {
    std::thread t1(print, 3);
    std::thread t2(print, 5);

    t1.join();
    t2.join();

    return 0;
}

在这个示例中,我们定义了一个 print() 函数,它接受一个整数参数 n,并输出 n 行文本。在 main() 函数中,我们创建了两个线程 t1t2,它们分别执行 print(3)print(5)。然后我们调用 join() 方法来等待两个线程结束。

这段代码的一个可能的输出结果是:

1
2
3
4
5
6
7
8
Hello from thread 1
Hello from thread 1
Hello from thread 1
Hello from thread 2
Hello from thread 2
Hello from thread 2
Hello from thread 2
Hello from thread 2

或者

1
2
3
4
5
6
7
8
Hello from thread 2
Hello from thread 2
Hello from thread 2
Hello from thread 2
Hello from thread 2
Hello from thread 1
Hello from thread 1
Hello from thread 1

具体的输出顺序取决于线程的调度顺序,这是不确定的

6.lambda

lambda 函数是 C++11 引入的一种新特性,它允许你定义一个匿名的函数对象,可以用来作为参数传递给其他函数或算法。lambda 函数的语法非常简洁,可以让你在不定义完整函数的情况下快速实现简单的功能。

lambda 函数的语法如下:

1
[capture list] (parameters) -> return type { function body }

其中,capture list 是捕获列表,用于指定 lambda 函数可以访问哪些外部变量;parameters 是参数列表,与普通函数的参数列表相同;return type 是返回类型,可以省略,编译器会自动推断;function body 是函数体,包含 lambda 函数的实现。

下面是一个简单的示例,演示如何使用 lambda 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <iostream>
#include <vector>
#include <algorithm>

int main() {
    std::vector<int> v = {1, 2, 3, 4, 5};

    int sum = 0;
    std::for_each(v.begin(), v.end(), [&sum](int x) {
        sum += x;
    });

    std::cout << "Sum: " << sum << std::endl;

    return 0;
}

在这个示例中,我们定义了一个 std::vector 对象 v,并使用 std::for_each() 算法遍历它。std::for_each() 算法接受一个函数对象作为参数,并对容器中的每个元素调用这个函数对象。

我们使用 lambda 函数作为 std::for_each() 算法的第三个参数。这个 lambda 函数接受一个整数参数 x,并将其累加到外部变量 sum 中。由于我们需要在 lambda 函数中修改外部变量 sum 的值,所以我们在捕获列表中使用了引用捕获 [&sum]

这段代码的输出结果是:

1
Sum: 15

注意捕获列表是可选的,如果不需要访问外部变量,那么可以省略捕获列表。例如:

1
2
3
std::thread t([](){
    std::cout << "Hello from thread" << std::endl;
});

在这个示例中,我们创建了一个线程 t,它执行一个没有捕获列表的 lambda 函数。这个 lambda 函数输出一行文本,然后结束。

7.atomic_int

atomic_int 是一个原子类型,它可以在多线程环境中安全地进行读写操作,而不会出现数据竞争

可以把 atomic_int 想象成一个保险箱,多个人可以同时往里面存钱或取钱,但是每次只能有一个人操作,其他人必须等待。这样就可以保证每个人的操作都是安全的

下面是一个简单的代码例子,演示了如何使用 atomic_int 来实现多线程安全的计数器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>

std::atomic_int counter(0);

void increment(int n) {
    for (int i = 0; i < n; ++i) {
        ++counter;
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.push_back(std::thread(increment, 1000));
    }

    for (auto& th : threads) {
        th.join();
    }

    std::cout << "Counter: " << counter << '\n';

    return 0;
}

这段代码创建了一个名为 counteratomic_int 类型的变量,它的初始值为0。increment 函数接受一个整数参数 n,表示需要对计数器进行递增的次数。在函数内部,有一个循环,每次循环都会对计数器进行递增操作。

main 函数中,首先创建了一个 threads 向量,用于存储线程对象。然后,使用循环创建了10个线程,每个线程都调用 increment 函数,并传入参数 1000,表示每个线程都需要对计数器进行1000次递增操作。

接下来,使用另一个循环等待所有线程执行完毕。最后,输出计数器的值。

由于计数器是一个 atomic_int 类型,所以每次递增操作都是原子的,不会出现数据竞争。因此,在所有线程执行完毕后,计数器的值应该为10000。

有人会对下面代码进行疑惑

1
2
3
for (auto& th : threads) {
        th.join();
    }

这段代码中的 for 循环用于等待所有线程执行完毕。join 函数会阻塞当前线程,直到被调用的线程执行完毕。在这个例子中,main 函数中的 for 循环会依次调用每个线程对象的 join 函数,等待所有线程执行完毕

这样做的目的是确保所有线程都完成了对计数器的递增操作,才输出计数器的最终值

8.condition_variable

condition_variable 是一个条件变量,它用于实现线程间的同步。它通常与互斥锁一起使用,以便在等待某个条件时释放锁,并在条件满足时重新获取锁。

可以把 condition_variable 想象成一个餐厅的服务铃。当顾客需要服务时,他会按响服务铃,服务员就会过来为他服务。如果服务员正在忙,顾客就必须等待。当服务员完成工作后,他会检查服务铃是否响过,如果响过,他就会去为顾客服务。

下面是一个简单的代码例子,演示了如何使用 condition_variable 来实现线程间的同步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex m;
std::condition_variable cv;
bool ready = false;

void print_id(int id) {
    std::unique_lock<std::mutex> lk(m);
    while (!ready) {
        cv.wait(lk);
    }
    std::cout << "thread " << id << '\n';
}

void go() {
    std::unique_lock<std::mutex> lk(m);
    ready = true;
    cv.notify_all();
}

int main() {
    std::thread threads[10];
    for (int i = 0; i < 10; ++i)
        threads[i] = std::thread(print_id, i);

    std::cout << "10 threads ready to race...\n";
    go();

    for (auto& th : threads) th.join();

    return 0;
}

在这个例子中,有10个线程等待一个条件变量。当主线程调用 go 函数时,它会通知所有等待的线程继续执行

这段代码定义了一个互斥锁 m,一个条件变量 cv 和一个布尔变量 readyprint_id 函数接受一个整数参数 id,表示线程的编号。在函数内部,首先创建了一个 unique_lock 对象 lk,用于管理互斥锁。然后,使用 while 循环检查 ready 变量的值。如果 readyfalse,则调用条件变量的 wait 函数等待条件满足。当条件满足时(即 ready 变为 true),循环结束,输出线程编号。

go 函数用于通知所有等待的线程继续执行。在函数内部,首先创建了一个 unique_lock 对象 lk,用于管理互斥锁。然后,将 ready 变量的值设为 true,表示条件已经满足。最后,调用条件变量的 notify_all 函数通知所有等待的线程。

main 函数中,首先创建了一个名为 threads 的数组,用于存储线程对象。然后,使用循环创建了10个线程,每个线程都调用 print_id 函数,并传入一个整数参数作为线程编号。

接下来,输出一行提示信息,表示所有线程都已经准备好。然后,调用 go 函数通知所有等待的线程继续执行。

最后,使用一个循环等待所有线程执行完毕。

在这个例子中,所有线程都在等待一个条件变量。当主线程调用 go 函数时,它会通知所有等待的线程继续执行

这里可能有人会问:notify_all唤醒等待的线程后,是所有线程同时执行吗,还是说一个个执行?

当调用条件变量的 notify_all 函数唤醒所有等待的线程时,这些线程会从等待状态变为就绪状态。然后,操作系统调度器会根据其调度策略选择就绪状态的线程并将其调度到可用的处理器上运行。

这意味着,被唤醒的线程并不是同时开始执行,而是由操作系统调度器决定它们的执行顺序。在多处理器系统中,被唤醒的线程可能会并行执行;而在单处理器系统中,被唤醒的线程只能串行执行。

因此,被唤醒的线程的执行顺序和并行性取决于操作系统调度器和硬件环境

那会不会并行执行就不安全了?

并行执行线程本身并不会导致线程不安全。线程安全问题通常发生在多个线程访问共享数据时,如果对共享数据的访问没有进行适当的同步,就可能出现数据竞争和其他线程安全问题。

在上面给出的 condition_variable 的例子中,所有线程都在等待一个条件变量。当主线程调用 go 函数时,它会通知所有等待的线程继续执行。由于每个线程都只访问自己的局部变量,而不访问共享数据,所以这个例子中的线程是安全的,即使它们并行执行也不会出现问题。

如果多个线程需要访问共享数据,那么就需要使用同步机制(如互斥锁、原子变量等)来保证线程安全。这样,即使多个线程并行执行,也不会出现数据竞争和其他线程安全问题

每个线程都只访问自己的局部变量,这里的局部变量是什么?

局部变量是指在函数内部定义的变量。它们只能在定义它们的函数内部访问,当函数执行完毕后,它们就会被销毁。

在上面给出的 condition_variable 的例子中,print_id 函数中定义了一个名为 lk 的局部变量,它是一个 unique_lock 对象,用于管理互斥锁。这个变量只能在 print_id 函数内部访问,当函数执行完毕后,它就会被销毁。

由于每个线程都调用了一次 print_id 函数,所以每个线程都有自己的 lk 变量。这些变量彼此独立,互不影响。因此,即使多个线程并行执行,也不会出现数据竞争和其他线程安全问题

9.bind

std::bind 是一个函数模板,它可以将一个函数或函数对象与其参数绑定在一起,生成一个新的可调用对象。这个新的可调用对象可以使用固定的参数值或占位符来调用原始函数。

在你给出的代码中,std::bind(&ConnectionPool::produceConnectionTask, this) 将成员函数 produceConnectionTask 与对象 this 绑定在一起,生成一个新的可调用对象。当这个可调用对象被调用时,它会调用 this->produceConnectionTask()

下面是一个简单的例子,演示了如何使用 std::bind 来绑定成员函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <functional>

struct Foo {
    void print(int x) {
        std::cout << x << '\n';
    }
};

int main() {
    Foo foo;
    auto f = std::bind(&Foo::print, &foo, std::placeholders::_1);
    f(42); // 调用 foo.print(42)
    return 0;
}

在这个例子中,我们使用 std::bind 将成员函数 Foo::print 与对象 &foo 绑定在一起,并使用占位符 _1 表示第一个参数。然后,我们调用新生成的可调用对象 f,并传入参数 42。这相当于调用 foo.print(42)

thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));这行代码将来怎么调用呢

这行代码创建了一个名为 producestd::thread 对象。它使用 std::bind 将成员函数 ConnectionPool::produceConnectionTask 与对象 this 绑定在一起,生成一个新的可调用对象。然后,将这个可调用对象传递给 std::thread 构造函数,以在线程中调用 this->produceConnectionTask()

当这行代码执行完毕后,新创建的线程就会开始执行。它会调用 this->produceConnectionTask() 函数,并在函数执行完毕后退出。

如果需要等待线程执行完毕,可以调用 produce.join() 函数。这个函数会阻塞当前线程,直到 produce 线程执行完毕。

例如:

1
2
3
thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));
// 等待线程执行完毕
produce.join();

在上面的代码中,我们首先创建了一个名为 produce 的线程对象。然后,调用它的 join 函数等待线程执行完毕。

不过经常看到join,那为什么要join呢

在多线程编程中,通常会创建多个线程来并行执行不同的任务。这样可以提高程序的执行效率,充分利用多核处理器的计算能力。

在某些情况下,主线程需要等待其他线程执行完毕后再继续执行。例如,主线程可能需要等待其他线程完成计算后才能汇总结果。这时,可以使用 std::thread 类的 join 函数来等待线程执行完毕。

当然,也可以不使用 join 函数,而是让主线程一直等待,直到其他线程执行完毕。但是这样做会浪费主线程的计算资源,降低程序的执行效率。使用 join 函数可以让主线程在等待时释放处理器资源,让其他线程更快地执行。

此外,join 函数还可以确保线程在析构前已经执行完毕。如果在线程对象析构时,线程仍然在运行,则程序会终止并报错。因此,在销毁线程对象前调用 join 函数是一种良好的编程实践

本文由作者按照 CC BY 4.0 进行授权