szu系统编程实验

本地实现单机多用户聊天系统,架构图如下:

image-20230104123824145

实现功能:

  • 众所周知的命名管道
  • 用户登录时,4次密码错误账号锁定10分钟。
  • 为每个用户建立独立日志文件,日志文件收集用户事件信息
  • 服务器采用多路复用(epoll)监听管道
  • 服务器为守护进程
  • 线程安全
  • 线程池

文件目录树:

img

IO多路复用

IO多路复用是一种同步的IO模型。 利用IO多路复用模型,可以实现一个线程监视多个文件句柄。
以上图的模型来说,就是我们不需要创造4个线程去分别监听每一个FIFO管道,只需要用一个epoll就能同时监听四个管道了。

epoll

epoll是为了处理大批量句柄而引进的一种poll,只有epoll_create,epoll_ctl,epoll_wait 3个系统调用。

int epoll_create(int size);

创建一个epoll的句柄。自从linux2.6.8之后,size参数是被忽略的。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

第一个参数是epoll_create()的返回值。

第二个参数表示动作,用三个宏来表示:

EPOLL_CTL_ADD:注册新的fd到epfd中;

EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

EPOLL_CTL_DEL:从epfd中删除一个fd;

第三个参数是需要监听的fd。

第四个参数是告诉内核需要监听什么事

events可以是以下几个宏的集合:

EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

EPOLLOUT:表示对应的文件描述符可以写;

EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

EPOLLERR:表示对应的文件描述符发生错误;

EPOLLHUP:表示对应的文件描述符被挂断;

EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
data成员是一个epoll_data联合,其定义如下:

typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

收集在epoll监控的事件中已经发送的事件。参数events是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)。maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时。

Epoll的2种工作方式-水平触发(LT)和边缘触发(ET)

ET模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,也就是说,如果要采用ET模式,需要一直read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;

而LT模式是只要有数据没有处理就会一直通知下去的.

使用模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
for( ; ; )
{
nfds = epoll_wait(epfd,events,20,500);
for(i=0;i<nfds;++i)
{
if(events[i].data.fd==listenfd) //有新的连接
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept这个连接
ev.data.fd=connfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的fd添加到epoll的监听队列中
}
else if( events[i].events&EPOLLIN ) //接收到数据,读socket
{
n = read(sockfd, line, MAXLINE)) < 0 //读
ev.data.ptr = md; //md为自定义类型,添加数据
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
}
else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
{
struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取数据
sockfd = md->fd;
send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
}
else
{
//其他的处理
}
}
}

clieninfo.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#ifndef _CLIENTINFO_H
#define _CLIENTINFO_H

#define REG_FIFO "/home/ubuntu/oslab/serverinfo/the_register_FIFO"
#define LOGIN_FIFO "/home/ubuntu/oslab/serverinfo/the_login_FIFO"
#define MSG_FIFO "/home/ubuntu/oslab/serverinfo/the_msg_FIFO"
#define LOGOUT_FIFO "/home/ubuntu/oslab/serverinfo/the_logout_FIFO"


#define SERVER_NAME "chat_server"

#define LOGIN_SUCCESS 0x400
#define LOGIN_FAILED 0x401
#define REGIST_SUCCESS 0x402
#define REGIST_FAILED 0x403
#define SEND_SUCCESS 0x405
#define SEND_FAILED 0x404
#define LOGOUT_SUCCESS 0x406
#define LOGOUT_FAILED 0x407
#define LOGIN_OVERTIMES 0x408

// typedef union {
// char receiver[0x50];
// int oneline_num;
// } receive;

typedef struct{
char sender[0x50];
char receiver[0x50];
char message[0x200];
}MESSAGE_INFO;

typedef struct{
char userfifo[0x50];
char statfifo[0x50];
char username[0x50];
char passwd[0x50];
}REG_LOGIN_INFO;

typedef struct{
char userfifo[0x50];
char statfifo[0x50];
char username[0x50];
}LOGOUT_INFO;

int readline(int fd,char *str){
int n;
do{
n=read(fd,str,1);
}while(n>0 && (*str++ != '\x00'));
return (n>0);
}
void errorinfo(char *str){
printf("\033[31m %s \033[0m\n",str);
exit(1);
}

char* itoa(int value,char str[],int radix)
{
char temp[33];
char *tp = temp;
int i;
unsigned v;
int sign;
char *sp;
if(radix > 36 || radix < 1)
return 0;
sign = (radix == 10 && value < 0); //十进制负数
if(sign)
v = -value;
else
v = (unsigned)value;
while(v || tp == temp) //转化操作
{
i = v % radix;
v = v / radix;
if(i < 10)
*tp++ = i + '0';
else
*tp++ = i + 'a' - 10;
}
if(str == 0)
str = (char*)malloc((tp - temp) + sign + 1);
sp = str;
if(sign) //是负数的话把负号先加入数组
*sp++ = '-';
while(tp > temp)
*sp++ = *--tp;
*sp = 0;

return str;
}

void dealstr(char *str){
char *p=str;
while((*p)!='\x00'){c
if(*p=='\n')
*p='\x00';
p++;
}
}

#endif

client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
#gcc client.c -o client  -pthread -w

#include <ctype.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>

#include "clientinfo.h"

int regist_fd, login_fd, msg_fd, logout_fd;
int userfifo_fd, statfifo_fd;
char username[0x50];
char userfifo[0x50];
char passwd[0x50];
char statfifo[0x50];
int online_user_num;
int is_login = 0;
// don't save the passwd

void create_fifo(char* str) {
int res;
if (access(str, F_OK) == -1) {
res = mkfifo(str, 0777);
if (res < 0)
errorinfo("[x] mkfifo failed");
}
}

void clinet_init() {
regist_fd = open(REG_FIFO, O_RDWR | O_NONBLOCK);
if (regist_fd < 0)
errorinfo("[x] open regist FIFO failed");
login_fd = open(LOGIN_FIFO, O_RDWR | O_NONBLOCK);
if (login_fd < 0)
errorinfo("[x] open login FIFO failed");
logout_fd = open(LOGOUT_FIFO, O_RDWR | O_NONBLOCK);
if (logout_fd < 0)
errorinfo("[x] open logout FIFO failed");
msg_fd = open(MSG_FIFO, O_RDWR | O_NONBLOCK);
if (msg_fd < 0)
errorinfo("[x] open msg FIFO failed");
}

void menu() {
puts("\033[32m --------------------CLIENT-------------------- \033[0m");
puts("\033[32m 1. register \033[0m");
puts("\033[32m 2. login \033[0m");
puts("\033[32m 3. exit \033[0m");
printf("\033[32m Welcome to the chatroom , pls take a choice : \033[0m");
// puts("\033[31m \033[0m");
// puts("\033[31m \033[0m");
}

void chat_menu() {
printf(
"\033[32m --------------------CLIENT-------------------- user:%s "
"\033[0m\n",
username);
puts("\033[32m 1. start chat \033[0m");
puts("\033[32m 2. logout \033[0m");
puts("\033[32m 3. get online user info \033[0m");
printf("\033[32m Welcome to the chatroom , pls take a choice : \033[0m");
}

void login() {
// char temp_passwd[0x50];
char buffer[10];
puts("\033[32m --------------------CLIENT-------------------- \033[0m");
printf("\033[32m username: \033[0m");
// fflush(stdout);
// fgets(username,0x50,stdin);
scanf("%s", username);
printf("\033[32m passwd: \033[0m");
// fgets(passwd,0x50,stdin);
scanf("%s", passwd);
REG_LOGIN_INFO login_info;
strcpy(login_info.username, username);
strcpy(login_info.passwd, passwd);
strcpy(login_info.userfifo, userfifo);
strcpy(login_info.statfifo, statfifo);
write(login_fd, &login_info, sizeof(REG_LOGIN_INFO));
while (1) {
int n = read(statfifo_fd, buffer, 10);
if (n > 0) {
int result = atoi(buffer);
if (result == LOGIN_SUCCESS) {
sprintf(userfifo, "/home/ubuntu/oslab/clientinfo/userfifo/client_%s_FIFO",
username);
sprintf(statfifo, "/home/ubuntu/oslab/clientinfo/statfifo/client_%s_stat",
username);
is_login = 1;
puts("\033[32m Login Success \033[0m");
} else if (result == LOGIN_FAILED) {
puts("\033[31m Login failed ,pls try again \033[0m");
} else if (result == LOGIN_OVERTIMES) {
puts("\033[31m Login overtimes , pls try at least 10 minutes later ... \033[0m");
} else {
printf("login error : 0x%x\n", result);
exit(1);
}
break;
}
}
}
void regist() {
char buffer[10];
int is_regist = 0;
puts("\033[32m --------------------CLIENT-------------------- \033[0m");
printf("\033[32m create username: \033[0m");
// fflush(stdout);
// fgets(username,0x50,stdin);
scanf("%s", username);

printf("\033[32m create passwd: \033[0m");
// fflush(stdout);
// fgets(passwd,0x50,stdin);
scanf("%s", passwd);

REG_LOGIN_INFO regist_info;
strcpy(regist_info.username, username);
strcpy(regist_info.passwd, passwd);


sprintf(userfifo, "/home/ubuntu/oslab/clientinfo/userfifo/client_%s_FIFO", username);
create_fifo(userfifo);
userfifo_fd = open(userfifo, O_RDWR | O_NONBLOCK);
if (userfifo_fd < 0)
errorinfo("[x] open client FIFO failed");

sprintf(statfifo, "/home/ubuntu/oslab/clientinfo/statfifo/client_%s_stat", username);
create_fifo(statfifo);
statfifo_fd = open(statfifo, O_RDWR | O_NONBLOCK);
if (statfifo_fd < 0)
errorinfo("[x] open stat FIFO failed");

strcpy(regist_info.userfifo, userfifo);
strcpy(regist_info.statfifo, statfifo);
write(regist_fd, &regist_info, sizeof(REG_LOGIN_INFO));

while (1) {
int n = read(statfifo_fd, buffer, 10);
if (n > 0) {
int result = atoi(buffer);
if (result == REGIST_SUCCESS) {
is_regist = 1;
puts("\033[32m Regist Success \033[0m");
} else if (result == REGIST_FAILED) {
close(userfifo_fd);
close(statfifo_fd);
memset(userfifo, 0, 0x50);
memset(statfifo, 0, 0x50);
puts("\033[31m Regist failed ,pls try again \033[0m");
} else {
printf("regist error : 0x%x\n", result);
exit(1);
}
memset(username, 0, 0x50);
// memset(userfifo,0,0x50);
memset(passwd, 0, 0x50);
break;
}
}
}
void chat() {
char buffer[10];
char receiver_name[0x50];
char message[0x200];
int is_send = 0;
puts("\033[32m --------------------CLIENT-------------------- \033[0m");
printf("\033[32m Hello %s , start your chat ... \033[0m", username);

printf("\033[32m receiver name: \033[0m");
scanf("%s", receiver_name);
printf("\033[32m your message: \033[0m");
scanf("%s", message);

MESSAGE_INFO msg_info;
strcpy(msg_info.sender, username);
strcpy(msg_info.message, message);
strcpy(msg_info.receiver, receiver_name);
write(msg_fd, &msg_info, sizeof(MESSAGE_INFO));

while (1) {
int n = read(statfifo_fd, buffer, 10);
if (n > 0) {
int result = atoi(buffer);
if (result == SEND_SUCCESS) {
is_send = 1;
puts("\033[32m Send Success \033[0m");
} else if (result == SEND_FAILED) {
is_send = 0;
puts("\033[31m Send failed ,pls try again \033[0m");
} else {
printf("send error : 0x%x\n", result);
exit(1);
}
break;
}
}
}

void logout() {
char buffer[10];
char receiver_name[0x50];
char message[0x200];

puts("\033[32m --------------------CLIENT-------------------- \033[0m");
puts("\033[32m logout ... \033[0m");

LOGOUT_INFO logout_info;
strcpy(logout_info.username, username);
strcpy(logout_info.userfifo, userfifo);
strcpy(logout_info.statfifo, statfifo);
write(logout_fd, &logout_info, sizeof(LOGOUT_INFO));
while (1) {
int n = read(statfifo_fd, buffer, 10);
if (n > 0) {
int result = atoi(buffer);
if (result == LOGOUT_SUCCESS) {

is_login = 0;
puts("\033[32m Logout Success \033[0m");
} else if (result == LOGOUT_FAILED) {
puts("\033[31m Logout failed ,pls try again \033[0m");
} else {
puts("\033[31m Logout error ,pls try again \033[0m");
}
break;
}
}
}

void get_onelineinfo(){
MESSAGE_INFO msg_info;
strcpy(msg_info.sender, username);
strcpy(msg_info.receiver, SERVER_NAME);
int n=write(msg_fd, &msg_info, sizeof(MESSAGE_INFO));
if(n<0){
puts("\033[31m Get online info failed ,pls try again \033[0m");
}
}


void* receive_thread() {
// char buffer[sizeof(MESSAGE_INFO)];
MESSAGE_INFO msg_info;
while (1) {
int n = read(userfifo_fd, &msg_info, sizeof(MESSAGE_INFO));
if (n > 0) {
/* normal msg */
if(strcmp(msg_info.sender,SERVER_NAME)){
printf("\033[34m\n Receiver mssage from %s : %s\n \033[0m",
msg_info.sender, msg_info.message);
}else{
/* online info */
online_user_num = atoi(msg_info.receiver);
printf("\033[34m\n Oneline user num : %d \n\033[0m",
online_user_num);
printf("\033[34m Oneline user name list : \033[0m"
);
puts(msg_info.message);
}
}
}
}

int main(int argc, char* argv[]) {
int choice, n;
clinet_init();
while (1) {
menu();
scanf("%d", &choice);
switch (choice) {
case 1:
regist();
// four times
break;
case 2:
login();
break;
case 3:
exit(0);
default:
puts("\033[31m Invalid choice ,pls try again \033[0m");
}
if (is_login) {
pthread_t ptd;
pthread_create(&ptd, NULL, receive_thread, NULL);
while (1) {
int chat_choice;
chat_menu();
scanf("%d", &chat_choice);
switch (chat_choice) {
case 1:
chat();
break;
case 2:
logout();
break;
case 3:
get_onelineinfo();
break;
default:
puts("\033[31m Invalid choice ,pls try again \033[0m");
}
if ( !is_login) {
puts("\033[32m Restart ... \033[0m");
break;
}
}
}
}
}

server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
#gcc server.c -o server -lpthread -w

#include <ctype.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <syslog.h>
#include <time.h>
#include <unistd.h>

#include "clientinfo.h"
#include "threadpool.h"

#define CLK_TCK 1000
#define MAX_USER_NUM 0x50
#define LOCK_TIME 1000 * 60 * 10
#define MAX_THREAD_NUM 4
// typedef union epoll_data {
// void *ptr;
// int fd;
// uint32_t u32;
// uint64_t u64;
// } epoll_data_t;

// struct epoll_event {
// uint32_t events; /* Epoll events */
// epoll_data_t data; /* User data variable */
// };

typedef struct online_user {
char userfifo[0x50];
char statfifo[0x50];
char username[0x50];
char passwd[0x50];
struct online_user* next;
} USERLIST;

typedef struct {
char userfifo[0x50];
char statfifo[0x50];
char username[0x50];
char passwd[0x50];
int login_times;
clock_t start_time;
clock_t end_time;
} USER_INFO;

USERLIST* ONLINE_USERLIST;
int regist_fd, login_fd, msg_fd, logout_fd;
int user_num = 0, oneline_user_num = 0; // danger
USER_INFO USERINFO_DATA[MAX_USER_NUM];
tpool_t* pool;
pthread_mutex_t usernum_mutex, onlinenum_mutex, list_mutex;

void online_user_add(USERLIST* list) {
USERLIST* p = ONLINE_USERLIST;
while ((p->next) != NULL) {
p = p->next;
}
p->next = list;
}

void oneline_user_delete(char* str) {
USERLIST* p = ONLINE_USERLIST;
while (strcmp(p->next->username, str)) {
p = p->next;
}
USERLIST* temp = p->next;
p->next = temp->next;
free(temp);
}

void get_oneline_username(char* usernamebuf) {
USERLIST* p = ONLINE_USERLIST;
char* namebuf_index = usernamebuf;
while ((p->next) != NULL) {
p = p->next;
// printf("username:%s\n", p->username);
strcpy(namebuf_index, p->username);
namebuf_index += strlen(p->username);
*(namebuf_index) = ' ';
*(++namebuf_index) = ' ';
}
}

void epoll_add_event(int epoll_fd, int fd, int event) {
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
}

void create_fifo(char* str) {
int res;
if (access(str, F_OK) == -1) {
res = mkfifo(str, 0777);
if (res < 0)
errorinfo("[x] mkfifo failed");
}
}

char* gettime() {
clock_t current_time;
time(&current_time);
return asctime(gmtime(&current_time));
}

void create_log(char* username) {
char userlog[0x50];
char log_buffer[0x50];
sprintf(userlog, "/home/ubuntu/oslab/serverinfo/user_log/%s_log", username);

int log_fd = open(userlog, O_RDWR | O_NONBLOCK | O_CREAT | O_APPEND);
if (log_fd < 0)
errorinfo("open user_log failed");
sprintf(log_buffer, " %s regist %s \n", username, gettime());
int n = write(log_fd, log_buffer, strlen(log_buffer + 1));
// if (n < 0) {
// puts("[x] write user_log failed");
// }
close(log_fd);
}

void login_log(char* username) {
char userlog[0x50];
char log_buffer[0x50];
sprintf(userlog, "/home/ubuntu/oslab/serverinfo/user_log/%s_log", username);

int log_fd = open(userlog, O_RDWR | O_NONBLOCK | O_APPEND);
if (log_fd < 0)
errorinfo("open user_log failed");
sprintf(log_buffer, "%s login %s \n", username, gettime());
int n = write(log_fd, log_buffer, strlen(log_buffer + 1));
// if (n < 0) {
// puts("[x] write user_log failed");
// }
close(log_fd);
}

void logout_log(char* username) {
char userlog[0x50];
char log_buffer[0x50];
sprintf(userlog, "/home/ubuntu/oslab/serverinfo/user_log/%s_log", username);

int log_fd = open(userlog, O_RDWR | O_NONBLOCK | O_APPEND);
if (log_fd < 0)
errorinfo("open user_log failed");
sprintf(log_buffer, "%s logout %s \n", username, gettime());
int n = write(log_fd, log_buffer, strlen(log_buffer + 1));
// if (n < 0) {
// puts("[x] write user_log failed");
// }
close(log_fd);
}

void msg_log(char* sender, char* receiver, int success) {
char sender_log[0x50];
char receiver_log[0x50];
char log_buffer[0x50];
sprintf(sender_log, "/home/ubuntu/oslab/serverinfo/user_log/%s_log",
sender);
int sender_log_fd = open(sender_log, O_RDWR | O_NONBLOCK | O_APPEND);

if (success) {
sprintf(receiver_log, "/home/ubuntu/oslab/serverinfo/user_log/%s_log",
receiver);
int receiver_log_fd =
open(receiver_log, O_RDWR | O_NONBLOCK | O_APPEND);
if (sender_log_fd < 0 || receiver_log_fd < 0)
errorinfo("open user_log failed");
sprintf(log_buffer, "%s send messaget to %s (success). %s\n", sender,
receiver, gettime());
int n, m;
n = write(receiver_log_fd, log_buffer, strlen(log_buffer + 1));
m = write(sender_log_fd, log_buffer, strlen(log_buffer + 1));
// if (n < 0 || m < 0) {
// puts("[x] write user_log failed");
// }
close(receiver_log_fd);
close(sender_log_fd);
} else {
if (sender_log_fd < 0)
errorinfo("open user_log failed");
sprintf(log_buffer, "%s send messaget to %s (failed). %s\n", sender,
receiver, gettime());
int n = write(sender_log_fd, log_buffer, strlen(log_buffer + 1));
// if (n < 0) {
// puts("[x] write user_log failed");
// }
close(sender_log_fd);
}
}

void fifo_init() {
create_fifo(REG_FIFO);
create_fifo(LOGIN_FIFO);
create_fifo(MSG_FIFO);
create_fifo(LOGOUT_FIFO);
regist_fd = open(REG_FIFO, O_RDWR | O_NONBLOCK);
if (regist_fd < 0)
errorinfo("[x] open regist FIFO failed");
login_fd = open(LOGIN_FIFO, O_RDWR | O_NONBLOCK);
if (login_fd < 0)
errorinfo("[x] open login FIFO failed");
logout_fd = open(LOGOUT_FIFO, O_RDWR | O_NONBLOCK);
if (logout_fd < 0)
errorinfo("[x] open logout FIFO failed");
msg_fd = open(MSG_FIFO, O_RDWR | O_NONBLOCK);
if (msg_fd < 0)
errorinfo("[x] open msg FIFO failed");
}

void profile_init() {
ONLINE_USERLIST = (USERLIST*)malloc(sizeof(USERLIST));
ONLINE_USERLIST->next = NULL;

pthread_mutex_init(&usernum_mutex, NULL);
pthread_mutex_init(&onlinenum_mutex, NULL);
pthread_mutex_init(&list_mutex, NULL);

pool = NULL;
if (create_tpool(&pool, MAX_THREAD_NUM) != 0) {
errorinfo("create_tpool failed!");
}
}

void* login_thread() {
// puts("[+] login thread start ...");
int res;
REG_LOGIN_INFO login_info;
int statinfo_fd;
int is_success = 0;
int is_overtimes = 0;
res = read(login_fd, &login_info, sizeof(REG_LOGIN_INFO));
if (res > 0) {
for (int i = 0; i < user_num; i++) {
if (!strcmp(login_info.username, USERINFO_DATA[i].username)) {
// find the username
if (USERINFO_DATA[i].login_times >= 4) {
is_overtimes = 1;
USERINFO_DATA[i].end_time = clock();

if (((USERINFO_DATA[i].end_time -
USERINFO_DATA[i].start_time)) > LOCK_TIME) {

is_overtimes = 0;
USERINFO_DATA[i].login_times = 0;
} else {
break;
}
}

if (!strcmp(login_info.passwd,
USERINFO_DATA[i].passwd)) { // login success
char buf[10];
itoa(LOGIN_SUCCESS, buf, 10);
statinfo_fd =
open(login_info.statfifo, O_WRONLY | O_NONBLOCK);
if (statinfo_fd < 0)
errorinfo("[x] login_thread open statfifo failed");
write(statinfo_fd, buf, 10);
close(statinfo_fd);
is_success = 1;

pthread_mutex_lock(&list_mutex);

if (!(ONLINE_USERLIST->next)) { // list head

USERLIST* user = (USERLIST*)malloc(sizeof(USERLIST));
strcpy(user->userfifo, login_info.userfifo);
strcpy(user->username, login_info.username);
strcpy(user->statfifo, login_info.statfifo);
strcpy(user->passwd, login_info.passwd);
user->next = NULL;
ONLINE_USERLIST->next = user;
} else { // add list
USERLIST* user = (USERLIST*)malloc(sizeof(USERLIST));
strcpy(user->userfifo, login_info.userfifo);
strcpy(user->username, login_info.username);
strcpy(user->statfifo, login_info.statfifo);
strcpy(user->passwd, login_info.passwd);
user->next = NULL;
online_user_add(user);
}

pthread_mutex_unlock(&list_mutex);

pthread_mutex_lock(&onlinenum_mutex);
oneline_user_num++; // mutex
pthread_mutex_unlock(&onlinenum_mutex);
login_log(login_info.username);
break;
} else {
/* passwd error */
if (!USERINFO_DATA[i].start_time) {
USERINFO_DATA[i].start_time = clock();
// printf("start:%d\n", USERINFO_DATA[i].start_time);
}
USERINFO_DATA[i].login_times++;
// printf("login times:%d\n", USERINFO_DATA[i].login_times);
}
}
}
if (!is_success) {
char buf[10];
if (is_overtimes) {
itoa(LOGIN_OVERTIMES, buf, 10);
} else {
itoa(LOGIN_FAILED, buf, 10);
}
statinfo_fd = open(login_info.statfifo, O_WRONLY | O_NONBLOCK);
if (statinfo_fd < 0)
errorinfo("[x] login_thread open statfifo failed");
write(statinfo_fd, buf, 10);
close(statinfo_fd);
}
}
}

void* regist_thread() {
// puts("[+] regist thread start ...");
int res;
REG_LOGIN_INFO regist_info;
int statinfo_fd;
int is_success = 1;

res = read(regist_fd, &regist_info, sizeof(REG_LOGIN_INFO));
if (res > 0) {
for (int i = 0; i < user_num; i++) {
if (!strcmp(regist_info.username,
USERINFO_DATA[i].username)) { // username same
is_success = 0;
break;
}
}
if (is_success) { // regist success
pthread_mutex_lock(&usernum_mutex);
int i = user_num++;
pthread_mutex_unlock(&usernum_mutex);
strcpy(USERINFO_DATA[i].username, regist_info.username);
strcpy(USERINFO_DATA[i].passwd, regist_info.passwd);
strcpy(USERINFO_DATA[i].userfifo, regist_info.userfifo);
strcpy(USERINFO_DATA[i].statfifo, regist_info.statfifo);
char buf[10];
itoa(REGIST_SUCCESS, buf, 10);
statinfo_fd = open(regist_info.statfifo, O_WRONLY | O_NONBLOCK);
if (statinfo_fd < 0)
errorinfo("[x] regist_thread open statfifo failed");

create_log(regist_info.username);

write(statinfo_fd, buf, 10);
close(statinfo_fd);
} else { // regist failed : username same
char buf[10];
itoa(REGIST_FAILED, buf, 10);
statinfo_fd = open(regist_info.statfifo, O_WRONLY | O_NONBLOCK);
if (statinfo_fd < 0)
errorinfo("[x] regist_thread open statfifo failed");
write(statinfo_fd, buf, 10);
close(statinfo_fd);
}
}
}

void* msg_thread() {
// puts("[+] msg thread start ...");
int res;
MESSAGE_INFO msg_info;
int receiver_fd;
int is_send = 1;

res = read(msg_fd, &msg_info, sizeof(MESSAGE_INFO));
if (res > 0) {
/* normal msg */
if (strcmp(msg_info.receiver, SERVER_NAME)) {
char receiver_fifo[0x50];
//@TODO sender
sprintf(receiver_fifo,
"/home/ubuntu/oslab/clientinfo/userfifo/client_%s_FIFO",
msg_info.receiver);
receiver_fd = open(receiver_fifo, O_WRONLY | O_NONBLOCK);
if (receiver_fd < 0) {
puts("[x] msg_thread open failed");
is_send = 0;
}
//@TODO log
char buf[10];
char sender_fifo[0x50];
sprintf(sender_fifo,
"/home/ubuntu/oslab/clientinfo/statfifo/client_%s_stat",
msg_info.sender);

if (is_send) {
write(receiver_fd, &msg_info, sizeof(MESSAGE_INFO));
close(receiver_fd);
msg_log(msg_info.sender, msg_info.receiver, is_send);

itoa(SEND_SUCCESS, buf, 10);
int sender_fd = open(sender_fifo, O_WRONLY | O_NONBLOCK);
if (sender_fd < 0)
errorinfo("[x] msg_thread open userfifo failed");
write(sender_fd, buf, 10);
close(sender_fd);
} else {
msg_log(msg_info.sender, msg_info.receiver, is_send);

itoa(SEND_FAILED, buf, 10);
int sender_fd = open(sender_fifo, O_WRONLY | O_NONBLOCK);
if (sender_fd < 0)
errorinfo("[x] msg_thread open userfifo failed");
write(sender_fd, buf, 10);
close(sender_fd);
}
} else {
/* return online info */

char receiver_fifo[0x50];

sprintf(receiver_fifo,
"/home/ubuntu/oslab/clientinfo/userfifo/client_%s_FIFO",
msg_info.sender);
receiver_fd = open(receiver_fifo, O_WRONLY | O_NONBLOCK);
// if (receiver_fd < 0) {
// puts("[x] msg_thread open failed");
// }
char buf[10];
get_oneline_username(msg_info.message);
itoa(oneline_user_num, buf, 10);
strcpy(msg_info.receiver, buf);
strcpy(msg_info.sender, SERVER_NAME);
write(receiver_fd, &msg_info, sizeof(MESSAGE_INFO));
close(receiver_fd);
}
}
}

void* logout_thread() {
// puts("[+] logout thread start ...");
int res;
LOGOUT_INFO logout_info;
int statinfo_fd;
int is_find = 0;
res = read(logout_fd, &logout_info, sizeof(LOGOUT_INFO));
if (res > 0) {
//@TODO list find
for (int i = 0; i < user_num; i++) {
if (!strcmp(USERINFO_DATA[i].username, logout_info.username)) {
is_find = 1;
break;
}
}
// USERLIST* p = ONLINE_USERLIST;
// while (p->next!=NULL) {
// is_find = !(strcmp(p->next->username, logout_info.username));
// p = p->next;
// if(is_find)
// break;
// }


if (is_find) {
pthread_mutex_lock(&list_mutex);
oneline_user_delete(logout_info.username);
pthread_mutex_unlock(&list_mutex);

pthread_mutex_lock(&onlinenum_mutex);
oneline_user_num--;
pthread_mutex_unlock(&onlinenum_mutex);
char buf[10];
itoa(LOGOUT_SUCCESS, buf, 10);
statinfo_fd = open(logout_info.statfifo, O_WRONLY | O_NONBLOCK);
if (statinfo_fd < 0)
errorinfo("[x] logout_thread open statfifo failed");

logout_log(logout_info.username);
write(statinfo_fd, buf, 10);
close(statinfo_fd);
} else {
char buf[10];
itoa(LOGOUT_FAILED, buf, 10);
statinfo_fd = open(logout_info.statfifo, O_WRONLY | O_NONBLOCK);
if (statinfo_fd < 0)
errorinfo("[x] logout_thread open statfifo failed");

write(statinfo_fd, buf, 10);
close(statinfo_fd);
}
}
}

void thread_init() {
pthread_t ptd_1, ptd_2, ptd_3, ptd_4;
pthread_create(&ptd_1, NULL, regist_thread, NULL);
pthread_create(&ptd_2, NULL, login_thread, NULL);
pthread_create(&ptd_3, NULL, msg_thread, NULL);
pthread_create(&ptd_4, NULL, logout_thread, NULL);
}

void monitor() {
struct epoll_event events[4];
int task_num = 0;
int epoll_fd = epoll_create(4); //
if (epoll_fd < 0) {
errorinfo("[x] epoll create failed");
}
epoll_add_event(epoll_fd, login_fd, EPOLLIN | EPOLLET);
epoll_add_event(epoll_fd, regist_fd, EPOLLIN | EPOLLET);
epoll_add_event(epoll_fd, msg_fd, EPOLLIN | EPOLLET);
epoll_add_event(epoll_fd, logout_fd, EPOLLIN | EPOLLET);
while (1) {
int num = epoll_wait(epoll_fd, events, 4, -1);
if (num < 0)
errorinfo("[x] epoll wait");
if (num == 0)
continue;
pthread_t ptd_1, ptd_2, ptd_3, ptd_4;
for (int i = 0; i < num; i++) {
struct epoll_event ev = events[i];
if (ev.data.fd == login_fd && ev.events & EPOLLIN) {
// pthread_create(&ptd_2, NULL, login_thread, NULL);
add_task_2_tpool(pool, login_thread, NULL);
epoll_add_event(epoll_fd, login_fd, EPOLLIN | EPOLLET);
// pthread_join(ptd_2, NULL);
} else if (ev.data.fd == regist_fd && ev.events & EPOLLIN) {
// pthread_create(&ptd_1, NULL, regist_thread, NULL);
add_task_2_tpool(pool, regist_thread, NULL);
epoll_add_event(epoll_fd, regist_fd, EPOLLIN | EPOLLET);
// pthread_join(ptd_1, NULL);
} else if (ev.data.fd == msg_fd && ev.events & EPOLLIN) {
// pthread_create(&ptd_3, NULL, msg_thread, NULL);
add_task_2_tpool(pool, msg_thread, NULL);
epoll_add_event(epoll_fd, msg_fd, EPOLLIN | EPOLLET);
// pthread_join(ptd_3, NULL);
} else if (ev.data.fd == logout_fd && ev.events & EPOLLIN) {
// pthread_create(&ptd_4, NULL, logout_thread, NULL);
add_task_2_tpool(pool, logout_thread, NULL);
epoll_add_event(epoll_fd, logout_fd, EPOLLIN | EPOLLET);
// pthread_join(ptd_4, NULL);
}
}
}
}

int run_server() {
fifo_init();
profile_init();
// thread_init();
monitor();
}
int main() {
signal(SIGTTOU, SIG_IGN);
signal(SIGTTIN, SIG_IGN);
signal(SIGTSTP, SIG_IGN);
signal(SIGHUP, SIG_IGN);
int pid, i;
pid = fork();
if (pid > 0) {
exit(0);
} else if (pid < 0) {
errorinfo("[x] fork error");
}
for (i = 0; i < NOFILE; close(i++)) {
/*改变工作目录,使得进程不与任何文件系统联系*/
chdir("/");
/*将文件当时创建屏蔽字设置为0*/
umask(0);
/*忽略SIGCHLD信号*/
signal(SIGCHLD, SIG_IGN);
}

syslog(LOG_USER | LOG_INFO, "my chat_server process start \n");

run_server();
}

线程池 threadpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136

#include "tpool.h"
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>


static void* work_routine(void* args)
{
tpool_t* pool = (tpool_t*)args;
tpool_work_t* work = NULL;

while(1){
pthread_mutex_lock(&pool->queue_lock);
while (!pool->tpool_head && !pool->shutdown) {
pthread_cond_wait(&pool->queue_ready, &pool->queue_lock);
}

if(pool->shutdown){
pthread_mutex_unlock(&pool->queue_lock);
pthread_exit(NULL);
}

/* tweak a work*/
work = pool->tpool_head;
pool->tpool_head = (tpool_work_t*)pool->tpool_head->next;
pthread_mutex_unlock(&pool->queue_lock);

work->work_routine(work->args);

free(work);
}
return NULL;
}

int create_tpool(tpool_t** pool,size_t max_thread_num)
{
(*pool) = (tpool_t*)malloc(sizeof(tpool_t));
if(NULL == *pool){
printf("in %s,malloc tpool_t failed!,errno = %d,explain:%s\n",__func__,errno,strerror(errno));
exit(-1);
}
(*pool)->shutdown = 0;
(*pool)->maxnum_thread = max_thread_num;
(*pool)->thread_id = (pthread_t*)malloc(sizeof(pthread_t)*max_thread_num);
if((*pool)->thread_id == NULL){
printf("in %s,init thread id failed,errno = %d,explain:%s",__func__,errno,strerror(errno));
exit(-1);
}
(*pool)->tpool_head = NULL;
if(pthread_mutex_init(&((*pool)->queue_lock),NULL) != 0){
printf("in %s,initial mutex failed,errno = %d,explain:%s",__func__,errno,strerror(errno));
exit(-1);
}

if(pthread_cond_init(&((*pool)->queue_ready),NULL) != 0){
printf("in %s,initial condition variable failed,errno = %d,explain:%s",__func__,errno,strerror(errno));
exit(-1);
}

for(int i = 0; i < max_thread_num; i++){
if(pthread_create(&((*pool)->thread_id[i]),NULL,work_routine,(void*)(*pool)) != 0){
printf("pthread_create failed!\n");
exit(-1);
}
}
return 0;
}

void destroy_tpool(tpool_t* pool)
{
tpool_work_t* tmp_work;

if(pool->shutdown){
return;
}
pool->shutdown = 1;

pthread_mutex_lock(&pool->queue_lock);
pthread_cond_broadcast(&pool->queue_ready);
pthread_mutex_unlock(&pool->queue_lock);

for(int i = 0; i < pool->maxnum_thread; i++){
pthread_join(pool->thread_id[i],NULL);
}
free(pool->thread_id);
while(pool->tpool_head){
tmp_work = pool->tpool_head;
pool->tpool_head = (tpool_work_t*)pool->tpool_head->next;
free(tmp_work);
}

pthread_mutex_destroy(&pool->queue_lock);
pthread_cond_destroy(&pool->queue_ready);
free(pool);
}

int add_task_2_tpool(tpool_t* pool,void* (*routine)(void*),void* args)
{
tpool_work_t* work,*member;

if(!routine){
printf("rontine is null!\n");
return -1;
}

work = (tpool_work_t*)malloc(sizeof(tpool_work_t));
if(!work){
printf("in %s,malloc work error!,errno = %d,explain:%s\n",__func__,errno,strerror(errno));
return -1;
}

work->work_routine = routine;
work->args = args;
work->next = NULL;

pthread_mutex_lock(&pool->queue_lock);
member = pool->tpool_head;
if(!member){
pool->tpool_head = work;
}
else{
while(member->next){
member = (tpool_work_t*)member->next;
}
member->next = work;
}

//notify the pool that new task arrived!
pthread_cond_signal(&pool->queue_ready);
pthread_mutex_unlock(&pool->queue_lock);
return 0;
}

tpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#ifndef T_POOL
#define T_POOL

#include <ctype.h>
#include <pthread.h>

typedef struct tpool_work {
void* (*work_routine)(void*); // function to be called
void* args; // arguments
struct tool_work* next;
} tpool_work_t;

typedef struct tpool {
size_t shutdown; // is tpool shutdown or not, 1 ---> yes; 0 ---> no
size_t maxnum_thread; // maximum of threads
pthread_t* thread_id; // a array of threads
tpool_work_t* tpool_head; // tpool_work queue
pthread_cond_t queue_ready; // condition varaible
pthread_mutex_t queue_lock; // queue lock
} tpool_t;



int create_tpool(tpool_t** pool, size_t max_thread_num);


void destroy_tpool(tpool_t* pool);


int add_task_2_tpool(tpool_t* pool, void* (*routine)(void*), void* args);

#endif

参考资料

https://juejin.cn/post/6963589249463500831#heading-3

https://bbs.huaweicloud.com/blogs/381801