This repository has been archived by the owner on Feb 12, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy path9-4-epolloneshot.c
169 lines (146 loc) · 4.24 KB
/
9-4-epolloneshot.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024
struct fds
{
int epollfd;
int sockfd;
};
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
/*将fd上的EPOLLIN和EPOLLET事件注册到epollfd指示的epoll内核事件表中,参数oneshot指定是否注册fd上的EPOLLONESHOT事件
*/
void addfd(int epollfd, int fd, bool oneshot)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
if (oneshot)
{
event.events |= EPOLLONESHOT;
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
/*重置fd上的事件,这样操作后,尽管fd上的EPOLLONESHOT事件被注册,但是操作系统仍然会触发fd上的EPOLLIN事件,且之触发一次
*/
void reset_oneshot(int epollfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}
/*工作线程*/
void* worker(void *arg)
{
int sockfd = ((fds *)arg)->sockfd;
int epollfd = ((fds *)arg)->epollfd;
fprintf(stdout, "start new thread to receive data on fd: %d\n", sockfd);
char buf[BUFFER_SIZE];
memset(buf, '\0', BUFFER_SIZE);
while(1)
{
int ret = recv(sockfd, buf, BUFFER_SIZE, 0);
if (ret == 0)
{
close(sockfd);
fprintf(stderr, "foreiner closed the connection\n");
break;
}
else if (ret < 0)
{
if (errno == EAGAIN)
{
reset_oneshot(epollfd, sockfd);
fprintf(stdout, "read later\n");
break;
}
}
else
{
fprintf(stdout, "get content: %s\n", buf);
sleep(5); //sleep 5 second
}
}
fprintf(stdout, "end thread receiveing data on fd: %d\n", sockfd);
return NULL;
}
int main(int argc, char *argv[])
{
if (argc <= 2)
{
fprintf(stderr, "usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char * ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd > 0);
ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd, listenfd, false);
while(1)
{
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if (ret < 0)
{
fprintf(stderr, "epoll failure\n");
break;
}
for(int i = 0; i < ret; i ++)
{
int sockfd = events[i].data.fd;
if (sockfd == listenfd)
{
struct sockaddr_in client_address;
socklen_t client_addlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addlength);
/**/
addfd(epollfd, connfd, true);
}
else if (events[i].events & EPOLLIN)
{
pthread_t thread;
fds fds_for_new_worker;
fds_for_new_worker.epollfd = epollfd;
fds_for_new_worker.sockfd = sockfd;
pthread_create(&thread, NULL, worker, (void *)&fds_for_new_worker);
}
else
{
fprintf(stderr, "something else happened\n");
}
}
}
close(listenfd);
return 0;
}