Давайте представим следующую задачу. Есть протокол TCP, и пусть наш клиент будет запрашивать у сервера содержимое каталогов.
Для нового соединения будем создавать новый поток, и для них (новых соединений) будем использовать заранее запущенные потоки из пула. Размер пула пускай будет фиксированный.
Клиент
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
#include <stdlib.h> #include <unistd.h> #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <string.h> #include <arpa/inet.h> #include "assert.h" int main(int argc, char **argv) { int sock_desc; int readed, i; struct sockaddr_in serv_addr; char buff[260]; char *data; size_t data_size = 0; char eof[2] = { 4 , 0 }; int done = 0; memset(buff, 0 , sizeof(buff)); memset(&serv_addr, 0, sizeof(serv_addr)); if(argc < 4) { printf("Usage: %s ip port directory [directory2..]\n", argv[0]); exit(EXIT_FAILURE); } ASSERT((sock_desc = socket(AF_INET, SOCK_STREAM, 0)) != -1); serv_addr.sin_family = AF_INET; //не AF_UNIX, а AF_INET, типа, интернет домен serv_addr.sin_port = htons(atoi(argv[2])); //host to network short [dir...] ;) ASSERT(inet_pton(AF_INET, argv[1], &serv_addr.sin_addr) == 1) ; //преобразуем ip в сетевой адрес ASSERT(connect(sock_desc, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) != -1); //установим соединение с серверным гнездом for(i = 3; i < argc; ++i) //третий элемент - имя первой директории data_size += strlen(argv[i]) + 1; data_size += 2; //EOF (?) ASSERT((data = malloc(data_size)) != NULL); memset(data, 0, data_size); strcat(data, argv[3]); for(i = 4; i < argc; ++i) // { strcat(data, eof); strcat(data, argv[i]); } strcat(data,"\r\n"); ASSERT(write(sock_desc, data, data_size) >= 0); //запишем в сокет free(data); memset(buff, 0 , sizeof(buff)); while(!done) { ASSERT((readed = read(sock_desc, buff, sizeof(buff))) >= 0); // if(buff[readed - 1] == 4) readed--, done = 1; //дойдем до End of transmission ASSERT(write(STDOUT_FILENO, buff, readed) >= 0) ; } close(sock_desc); return 0; } |
Сервер (с использованием POSIX-семафоров, ибо кошер):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
#include <stdlib.h> #include <unistd.h> #include <sys/socket.h> #include <sys/types.h> #include <string.h> #include <semaphore.h> #include <pthread.h> #include <errno.h> #include <stdio.h> #include <netinet/in.h> #include <dirent.h> #include "assert.h" #define THREADS_NUMBER 4 struct threadpool_t { pthread_t thread[THREADS_NUMBER]; sem_t mutex[THREADS_NUMBER]; int conn_desc[THREADS_NUMBER]; int cur_thread; } threadpool; void *handler(void *arg) { int id = * (int *) arg; int readed, i; size_t buffsize = 0; char *buff = NULL, *tmp, temp[1024], *stok; DIR *d; struct dirent *dir; char eof[2] = { 4 , 0 }; // конец файла - EOT и NULL free(arg); for(;;) { ASSERT(sem_wait(&threadpool.mutex[id]) != -1); //если нормально семафор уменьшился do { buffsize += 1024; ASSERT((tmp = realloc(buff, buffsize)) != NULL); buff = tmp; memset(buff - 1024 + buffsize, 0, 1024); ASSERT((readed = read(threadpool.conn_desc[id], buff - 1024 + buffsize, 1024)) >= 0); printf("%d", readed); if(readed != 1024 && readed != 2 && readed != 0) { i = -1; while(buff[++i] != 13 && buff[i] != 0); //пока не возврат каретки и не null memset(buff + i, 0, buffsize - i); stok = strtok(buff, eof); //поделим buff на токены по eof-разделителю while (stok != NULL) { memset(temp, 0, sizeof(temp)); d = opendir(stok); if(d == NULL) { ASSERT(write(threadpool.conn_desc[id], stok, strlen(stok)) > 0); sprintf(temp, ":%s\r\n", strerror(errno)); ASSERT(write(threadpool.conn_desc[id], temp, strlen(temp)) > 0); } while(d != NULL && (dir = readdir(d)) != NULL) { sprintf(temp, "%s\r\n", dir->d_name); ASSERT(write(threadpool.conn_desc[id], temp, strlen(temp)) > 0); memset(temp, 0, sizeof(temp)); } if(d != NULL) ASSERT(closedir(d) != -1); stok = strtok(NULL, eof); } } ASSERT(write(threadpool.conn_desc[id], eof, 1) > 0); if(readed != 1024) { buffsize = 0; free(buff); buff = NULL; } } while (readed); ASSERT(close(threadpool.conn_desc[id]) != -1); threadpool.conn_desc[id] = -1; } return NULL; } int main(int argc, char **argv) { int sock_desc, i; struct sockaddr_in serv_addr, cli_addr; socklen_t clilen = sizeof(cli_addr); int *arg; memset(&threadpool, 0, sizeof(threadpool)); //занулили структуру memset(&threadpool.conn_desc, -1, sizeof(threadpool.conn_desc)); //заминусили дескрипторы memset(&serv_addr, 0, sizeof(serv_addr)); //занулили адреса сервера ASSERT((sock_desc = socket(AF_INET, SOCK_STREAM, 0)) >= 0); //чекаем создание сокета serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = INADDR_ANY; serv_addr.sin_port = htons(3057); // всё устанавливаем ASSERT(bind(sock_desc, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == 0); //пробуем забиндиться listen(sock_desc, 5); //инициализируем гнездо, ориентированное на установление соединения for(i = 0; i < THREADS_NUMBER; ++i) { ASSERT((arg = malloc(sizeof(*arg))) != NULL); //проверим выделилась ли память *arg = i; ASSERT(sem_init(&threadpool.mutex[i], 0, 0) != -1); //инициализируем семафор со значением 0 и для этого процесса ASSERT(pthread_create(&threadpool.thread[i], NULL, &handler, (void *) arg) == 0); } for(;;) { ASSERT((i = accept(sock_desc,(struct sockaddr*) &cli_addr, &clilen)) != -1); //чекнем есть ли соединение и если что положим его в cli_addr и сохраним len для клиента while(threadpool.conn_desc[threadpool.cur_thread += (threadpool.cur_thread + 1) != THREADS_NUMBER ? 1 : -threadpool.cur_thread] != -1); //нувыпоняли threadpool.conn_desc[threadpool.cur_thread] = i; //установим дескриптор соединения для текущего объекта равным i ASSERT(sem_post(&threadpool.mutex[threadpool.cur_thread]) != -1); //увеличим значение для мьютекса для этого конкретного процесса } return 0; } |
Алсо, переопределим макроопределение ASSERT в заголовочнике:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#ifndef ASSERT_H #define ASSERT_H #endif #define ASSERT(cond) {\ if(!(cond))\ {\ sprintf(err, "%s, line %d: %s",__FILE__, __LINE__, #cond);\ perror(err);\ exit(EXIT_FAILURE);\ }\ } char err[200]; #endif /* ASSERT_H */ |
Если объяснять на пальцах, то получается следующая схема:
Кстати говоря, стоит отметить, что у нас изначально один сокет открыт на прослушку соединений, а потом, при установлении нового соединения, на сервере открывается новый сокет, и так для каждого.
Компильнуть можно вот таким Makefile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
CFLAGS=-Wall -Wextra -pedantic -ansi sLDFLAGS=-lsocket -lnsl -lrt SOLFIX=-D_REENTRANT cLDFLAGS=-lsocket -lnsl CC=gcc all: server client server: server.c assert.h $(CC) $(CFLAGS) -Wno-unused-parameter $(SOLFIX) $(sLDFLAGS) -o server server.c client: client.c assert.h $(CC) $(CFLAGS) $(sLDFLAGS) -o client client.c |
Такие дела.