SRS之StateThreads学习

SRS之StateThreads学习

HolyZion

最近在看SRS的源码。SRS是基于协程开发的,底层使用了StateThreads。所以为了充分的理解SRS源码,需要先学习一下StateThreads。这里对StateThreads的学习做了一些总结和记录。

StateThreads是什么

StateThreads是一个用户级线程库,用于多线程编程。它提供了一种轻量级的线程模型,允许开发人员以更简单的方式编写并发程序。

StateThreads有什么用

StateThreads 的主要目标是提供一种高效的用户级线程模型,以减少线程切换和上下文切换的开销。它采用协作式调度策略,即线程在主动释放执行权之前不会被抢占。这种方式可以减少线程切换的开销,但也需要开发人员在适当的时机主动释放执行权,以避免长时间的阻塞导致程序响应性下降。

StateThreads 提供了一组简单的函数和宏,用于创建和管理线程、同步和通信等操作。它支持线程的创建、销毁、休眠、唤醒等基本操作,以及互斥锁、条件变量、信号量等同步机制。开发人员可以使用这些函数和宏来编写并发程序,而不需要直接操作操作系统提供的线程和同步原语。

总的来说,StateThreads是一个高性能、高并发、高扩展性和可读性的网络服务器架构。

StateThreads怎么用

下载

git clone -b srs https://github.com/ossrs/state-threads.git

编译

make linux-debug
编译完成后,将头文件导入需要使用到StateThreads的项目。并在编译项目时链接st库即可。

使用示例

示例一

下面是用StateThreads实现的一个简单的服务,可以监听客户端的连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#include <iostream>
#include <stdio.h>
#include <arpa/inet.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <st.h>

#define LISTEN_PORT 8000

#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(-1); \
} while (0)

void *client_thread(void *arg)
{
st_netfd_t client_st_fd = (st_netfd_t)arg;
// 用于获取与 st_netfd_t 对象关联的文件描述符(File Descriptor)。它返回一个整数值,表示文件描述符的值。
// 将 st_netfd_t 对象转换为普通的文件描述符
int client_fd = st_netfd_fileno(client_st_fd);

sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
// 获取与套接字连接的对端的地址信息
int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len);
if (ret == -1)
{
printf("[WARN] Failed to get client ip: %s\n", strerror(ret));
}

char ip_buf[INET_ADDRSTRLEN];
// 内存区域清零
memset(ip_buf, 0, sizeof(ip_buf));
inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf,
sizeof(ip_buf));

while (1)
{
char buf[1024] = {0};
// 从给定的套接字中读取指定字节数的数据,并将其存储在提供的缓冲区 buf 中
ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT);
if (ret == -1)
{
printf("client st_read error\n");
break;
}
else if (ret == 0)
{
printf("client quit, ip = %s\n", ip_buf);
break;
}

printf("recv from %s, data = %s", ip_buf, buf);

ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT);
if (ret == -1)
{
printf("client st_write error\n");
}
}
}

void *listen_thread(void *arg)
// 监听
{
while (1)
{
st_netfd_t client_st_fd =
st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT);
if (client_st_fd == NULL)
{
continue;
}

printf("get a new client, fd = %d\n", st_netfd_fileno(client_st_fd));

st_thread_t client_tid =
st_thread_create(client_thread, (void *)client_st_fd, 0, 0);
if (client_tid == NULL)
{
printf("Failed to st create client thread\n");
}
}
}

int main()
{
// 用于设置 ST 库的事件系统。
int ret = st_set_eventsys(ST_EVENTSYS_ALT);
if (ret == -1)
{
printf("st_set_eventsys use linux epoll failed\n");
}
// st初始化
ret = st_init();
if (ret != 0)
{
printf("st_init failed. ret = %d\n", ret);
return -1;
}
// 创建套接字
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1)
{
ERR_EXIT("socket");
}

int reuse_socket = 1;

// 设置套接字选项
ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket,
sizeof(int));
if (ret == -1)
{
ERR_EXIT("setsockopt");
}

struct sockaddr_in server_addr; // 用于表示 IPv4 地址的结构体
server_addr.sin_family = AF_INET; // 地址族,一般为 AF_INET
server_addr.sin_port = htons(LISTEN_PORT); // 端口
server_addr.sin_addr.s_addr = INADDR_ANY; // ipv4地址结构
// 将套接字与特定的 IP 地址和端口号进行绑定
ret =
bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));
if (ret == -1)
{
ERR_EXIT("bind");
}

ret = listen(listen_fd, 128);
if (ret == -1)
{
ERR_EXIT("listen");
}
// st_netfd_open_socket() 是 State Threads (ST) 库中的一个函数,用于创建一个 st_netfd_t 类型的文件描述符对象,以便进行异步 I/O 操作。
st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd);
if (!st_listen_fd)
{
printf("st_netfd_open_socket open socket failed.\n");
return -1;
}

// 创建线程监听来一个建立连接的请求
st_thread_t listen_tid =
st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0);
if (listen_tid == NULL)
{
printf("Failed to st create listen thread\n");
}

while (1)
{
st_sleep(1);
}

return 0;
}

在这里插入图片描述

示例二

StateThreads创建多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <stdio.h>
#include <st.h>
#include <string>

void *do_calc(void *arg)
{
int sleep_ms = (int)(long int)(char *)arg * 10;

for (;;)
{
printf("in sthread #%dms\n", sleep_ms);
st_usleep(sleep_ms * 1000);
}

return NULL;
}

int main(int argc, char **argv)
{
if (argc <= 1)
{
printf("Test the concurrence of state-threads!\n"
"Usage: %s <sthread_count>\n"
"eg. %s 10000\n",
argv[0], argv[0]);
return -1;
}

if (st_init() < 0)
{
printf("error!");
return -1;
}

int i;
int count = std::stoi(argv[1]);
for (i = 1; i <= count; i++)
{
if (st_thread_create(do_calc, (void *)i, 0, 0) == NULL)
{
printf("error!");
return -1;
}
}

st_thread_exit(NULL);

return 0;
}

关于StateThreads的运行原理,可以看文章《SRS开源直播服务 - StateThreads微线程框架学习

SRS中的StateThreads

使用的源码为SRS4.0

系统架构图:
在这里插入图片描述
在SRS的源码中,StateThreads在srs_st_init()函数中完成初始化。具体的调用流程如下。
SRS的main函数在文件srs_main_server.cpp中。
srs_main_server.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
......
int main(int argc, char** argv)
{
srs_error_t err = do_main(argc, argv);
......
}

srs_error_t do_main(int argc, char** argv)
{
srs_error_t err = srs_success;

// Initialize global or thread-local variables.
if ((err = srs_thread_initialize()) != srs_success) {
return srs_error_wrap(err, "thread init");
}
......
}

srs_app_threads.cpp

1
2
3
4
5
6
7
8
9
10
11
12
......
srs_error_t srs_thread_initialize()
{
srs_error_t err = srs_success;
......
// Initialize ST, which depends on pps cids.
if ((err = srs_st_init()) != srs_success) {
return srs_error_wrap(err, "initialize st failed");
}
......
}
......

srs_service_st.cpp

1
2
3
4
5
6
7
8
9
......
srs_error_t srs_st_init()
{
......
int r0 = 0;
if((r0 = st_init()) != 0){
return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0);
}
......

在srs_service_st.cpp中调用StateThreads库的初始化函数,完成StateThreads的初始化。

  • 标题: SRS之StateThreads学习
  • 作者: HolyZion
  • 创建于 : 2023-07-12 00:00:00
  • 更新于 : 2023-11-18 00:11:37
  • 链接: https://holyzion.host/2023/07/12/SRS之StateThreads学习/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。